package arbitrum

import (
	"context"
	"fmt"
	"math/big"
	"sync"
	"sync/atomic"
	"time"

	"github.com/ethereum/go-ethereum"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/crypto"
	"github.com/ethereum/go-ethereum/rpc"

	"github.com/fraktal/mev-beta/internal/logger"
	arbcommon "github.com/fraktal/mev-beta/pkg/arbitrum/common"
)

// EventMonitor monitors DEX events across all supported protocols for comprehensive MEV detection.
type EventMonitor struct {
	client           *RateLimitedClient
	rpcClient        *rpc.Client
	logger           *logger.Logger
	protocolRegistry *ArbitrumProtocolRegistry
	poolCache        *PoolCache
	swapPipeline     *SwapEventPipeline

	// Event signatures (topic0 hashes) for all major DEX protocols
	eventSignatures map[arbcommon.Protocol]map[string]common.Hash

	// Active monitoring subscriptions
	subscriptions map[string]*subscription
	subMu         sync.RWMutex

	// Metrics (updated from multiple goroutines, so kept atomic)
	eventsProcessed atomic.Uint64
	swapsDetected   atomic.Uint64
	liquidityEvents atomic.Uint64

	// Channels for event processing
	swapEvents          chan *SwapEvent
	liquidationEvents   chan *LiquidationEvent
	liquidityEventsChan chan *LiquidityEvent

	// Shutdown management
	ctx    context.Context
	cancel context.CancelFunc
}

// subscription represents an active event subscription.
type subscription struct {
	protocol arbcommon.Protocol
	query    ethereum.FilterQuery
	cancel   context.CancelFunc
}

// NewEventMonitor creates a new event monitor.
func NewEventMonitor(
	client *RateLimitedClient,
	rpcClient *rpc.Client,
	logger *logger.Logger,
	protocolRegistry *ArbitrumProtocolRegistry,
	poolCache *PoolCache,
) *EventMonitor {
	ctx, cancel := context.WithCancel(context.Background())

	em := &EventMonitor{
		client:              client,
		rpcClient:           rpcClient,
		logger:              logger,
		protocolRegistry:    protocolRegistry,
		poolCache:           poolCache,
		eventSignatures:     make(map[arbcommon.Protocol]map[string]common.Hash),
		subscriptions:       make(map[string]*subscription),
		swapEvents:          make(chan *SwapEvent, 1000),
		liquidationEvents:   make(chan *LiquidationEvent, 100),
		liquidityEventsChan: make(chan *LiquidityEvent, 500),
		ctx:                 ctx,
		cancel:              cancel,
	}

	// Initialize event signatures for all protocols
	em.initializeEventSignatures()

	return em
}
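// Illustrative usage sketch: how a caller might wire the monitor into its own
// lifecycle. The surrounding types (RateLimitedClient, ArbitrumProtocolRegistry,
// PoolCache) are assumed to be constructed elsewhere in the package;
// runEventMonitor itself is not part of the monitor's API, just an example.
func runEventMonitor(
	ctx context.Context,
	client *RateLimitedClient,
	rpcClient *rpc.Client,
	log *logger.Logger,
	registry *ArbitrumProtocolRegistry,
	cache *PoolCache,
) error {
	em := NewEventMonitor(client, rpcClient, log, registry, cache)
	if err := em.Start(); err != nil {
		return fmt.Errorf("start event monitor: %w", err)
	}
	defer em.Close()

	// Block until the caller cancels, then shut the monitor down.
	<-ctx.Done()
	return ctx.Err()
}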
// initializeEventSignatures sets up event signatures for all supported protocols.
func (em *EventMonitor) initializeEventSignatures() {
	// Uniswap V2 swap and liquidity events
	em.eventSignatures[arbcommon.ProtocolUniswapV2] = map[string]common.Hash{
		"Swap": crypto.Keccak256Hash([]byte("Swap(address,uint256,uint256,uint256,uint256,address)")),
		"Mint": crypto.Keccak256Hash([]byte("Mint(address,uint256,uint256)")),
		"Burn": crypto.Keccak256Hash([]byte("Burn(address,uint256,uint256,address)")),
	}

	// Uniswap V3 swap and liquidity events
	em.eventSignatures[arbcommon.ProtocolUniswapV3] = map[string]common.Hash{
		"Swap": crypto.Keccak256Hash([]byte("Swap(address,address,int256,int256,uint160,uint128,int24)")),
		"Mint": crypto.Keccak256Hash([]byte("Mint(address,address,int24,int24,uint128,uint256,uint256)")),
		"Burn": crypto.Keccak256Hash([]byte("Burn(address,int24,int24,uint128,uint256,uint256)")),
	}

	// SushiSwap (same events as Uniswap V2)
	em.eventSignatures[arbcommon.ProtocolSushiSwapV2] = em.eventSignatures[arbcommon.ProtocolUniswapV2]

	// Camelot V3 (Algebra-style pools, same event layout as Uniswap V3)
	em.eventSignatures[arbcommon.ProtocolCamelotV3] = map[string]common.Hash{
		"Swap": crypto.Keccak256Hash([]byte("Swap(address,address,int256,int256,uint160,uint128,int24)")),
		"Mint": crypto.Keccak256Hash([]byte("Mint(address,address,int24,int24,uint128,uint256,uint256)")),
		"Burn": crypto.Keccak256Hash([]byte("Burn(address,int24,int24,uint128,uint256,uint256)")),
	}

	// Balancer V2
	em.eventSignatures[arbcommon.ProtocolBalancerV2] = map[string]common.Hash{
		"Swap":               crypto.Keccak256Hash([]byte("Swap(bytes32,address,address,uint256,uint256)")),
		"PoolBalanceChanged": crypto.Keccak256Hash([]byte("PoolBalanceChanged(bytes32,address,address[],int256[],uint256[])")),
	}

	// Curve Finance
	em.eventSignatures[arbcommon.ProtocolCurve] = map[string]common.Hash{
		"TokenExchange":           crypto.Keccak256Hash([]byte("TokenExchange(address,int128,uint256,int128,uint256)")),
		"TokenExchangeUnderlying": crypto.Keccak256Hash([]byte("TokenExchangeUnderlying(address,int128,uint256,int128,uint256)")),
		"AddLiquidity":            crypto.Keccak256Hash([]byte("AddLiquidity(address,uint256[],uint256[],uint256,uint256)")),
		"RemoveLiquidity":         crypto.Keccak256Hash([]byte("RemoveLiquidity(address,uint256[],uint256)")),
	}

	// 1inch aggregator
	em.eventSignatures[arbcommon.Protocol1Inch] = map[string]common.Hash{
		"Swapped": crypto.Keccak256Hash([]byte("Swapped(address,address,address,uint256,uint256)")),
	}

	// GMX
	em.eventSignatures[arbcommon.ProtocolGMX] = map[string]common.Hash{
		"Swap":              crypto.Keccak256Hash([]byte("Swap(address,address,address,uint256,uint256,uint256,uint256)")),
		"LiquidatePosition": crypto.Keccak256Hash([]byte("LiquidatePosition(bytes32,address,address,address,bool,uint256,uint256,uint256,int256,uint256)")),
	}

	// Radiant Capital (Aave V2 fork)
	em.eventSignatures[arbcommon.ProtocolRadiant] = map[string]common.Hash{
		"LiquidationCall": crypto.Keccak256Hash([]byte("LiquidationCall(address,address,address,uint256,uint256,address,bool)")),
		"Deposit":         crypto.Keccak256Hash([]byte("Deposit(address,address,address,uint256,uint16)")),
		"Withdraw":        crypto.Keccak256Hash([]byte("Withdraw(address,address,address,uint256)")),
	}
}

// Start begins monitoring all DEX events.
func (em *EventMonitor) Start() error {
	em.logger.Info("🚀 Starting comprehensive DEX event monitoring")

	// Start event processing workers
	go em.processSwapEvents()
	go em.processLiquidationEvents()
	go em.processLiquidityEvents()

	// Start monitoring all active protocols
	if err := em.startProtocolMonitoring(); err != nil {
		return fmt.Errorf("failed to start protocol monitoring: %w", err)
	}

	em.logger.Info("✅ DEX event monitoring started successfully")
	return nil
}

// startProtocolMonitoring sets up monitoring for all active protocols.
func (em *EventMonitor) startProtocolMonitoring() error {
	protocols := em.protocolRegistry.GetActiveProtocols()

	for _, protocol := range protocols {
		proto := arbcommon.Protocol(protocol.Name)

		// Get all known pool addresses for this protocol
		poolAddresses := em.poolCache.GetPoolAddressesByProtocol(proto)

		// If no pools are known yet, monitor factory events to discover them
		if len(poolAddresses) == 0 {
			if err := em.monitorFactoryEvents(proto); err != nil {
				em.logger.Error(fmt.Sprintf("Failed to monitor factory events for %s: %v", protocol.Name, err))
				continue
			}
		}

		// Monitor swap events for existing pools
		if err := em.monitorSwapEvents(proto, poolAddresses); err != nil {
			em.logger.Error(fmt.Sprintf("Failed to monitor swap events for %s: %v", protocol.Name, err))
			continue
		}

		// Monitor liquidation events for lending/perp protocols
		if proto == arbcommon.ProtocolGMX || proto == arbcommon.ProtocolRadiant {
			if err := em.monitorLiquidationEvents(proto); err != nil {
				em.logger.Error(fmt.Sprintf("Failed to monitor liquidation events for %s: %v", protocol.Name, err))
				continue
			}
		}
	}

	return nil
}
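// Illustrative sketch: every entry registered in initializeEventSignatures is
// simply the Keccak-256 hash of the event's canonical signature string, i.e.
// the value that shows up as Topics[0] in matching logs. The check below shows
// how one might sanity-test that against the well-known Uniswap V2 Swap topic;
// verifySwapTopic is an example helper, not part of the monitor's API.
func verifySwapTopic() error {
	want := common.HexToHash("0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822")
	got := crypto.Keccak256Hash([]byte("Swap(address,uint256,uint256,uint256,uint256,address)"))
	if got != want {
		return fmt.Errorf("unexpected Uniswap V2 Swap topic: %s", got.Hex())
	}
	return nil
}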
// monitorSwapEvents sets up monitoring for swap events on specific pools.
func (em *EventMonitor) monitorSwapEvents(protocol arbcommon.Protocol, poolAddresses []common.Address) error {
	if len(poolAddresses) == 0 {
		return nil // No pools to monitor yet
	}

	// Get event signatures for this protocol
	signatures, exists := em.eventSignatures[protocol]
	if !exists {
		return fmt.Errorf("no event signatures found for protocol %s", protocol)
	}

	// Get the Swap event signature
	swapTopic, exists := signatures["Swap"]
	if !exists {
		return fmt.Errorf("no Swap event signature found for protocol %s", protocol)
	}

	// Create filter query for Swap events on the known pools
	query := ethereum.FilterQuery{
		Addresses: poolAddresses,
		Topics:    [][]common.Hash{{swapTopic}},
	}

	// Create subscription
	subCtx, cancel := context.WithCancel(em.ctx)
	sub := &subscription{
		protocol: protocol,
		query:    query,
		cancel:   cancel,
	}

	// Store subscription
	em.subMu.Lock()
	em.subscriptions[string(protocol)+"_swap"] = sub
	em.subMu.Unlock()

	// Start monitoring in a goroutine
	go em.monitorEvents(subCtx, sub, "Swap")

	em.logger.Info(fmt.Sprintf("🔍 Monitoring Swap events for %d %s pools", len(poolAddresses), protocol))
	return nil
}

// monitorFactoryEvents sets up monitoring for factory events to discover new pools.
func (em *EventMonitor) monitorFactoryEvents(protocol arbcommon.Protocol) error {
	// Get factory addresses for this protocol
	factories := em.protocolRegistry.GetFactoryAddresses(protocol)
	if len(factories) == 0 {
		return nil // No factories to monitor
	}

	// Get event signatures for this protocol
	signatures, exists := em.eventSignatures[protocol]
	if !exists {
		return fmt.Errorf("no event signatures found for protocol %s", protocol)
	}

	// Different protocols emit different pool-creation events
	var creationTopic common.Hash
	switch protocol {
	case arbcommon.ProtocolUniswapV2, arbcommon.ProtocolSushiSwapV2:
		creationTopic, exists = signatures["PairCreated"]
		if !exists {
			// Default to the canonical V2 factory event
			creationTopic = crypto.Keccak256Hash([]byte("PairCreated(address,address,address,uint256)"))
		}
	case arbcommon.ProtocolUniswapV3, arbcommon.ProtocolCamelotV3:
		creationTopic, exists = signatures["PoolCreated"]
		if !exists {
			// Default to the canonical V3 factory event
			creationTopic = crypto.Keccak256Hash([]byte("PoolCreated(address,address,uint24,int24,address)"))
		}
	case arbcommon.ProtocolBalancerV2:
		creationTopic = crypto.Keccak256Hash([]byte("PoolCreated(address,address,address)"))
	case arbcommon.ProtocolCurve:
		creationTopic = crypto.Keccak256Hash([]byte("PoolAdded(address)"))
	default:
		// For other protocols, fall back to a generic creation event
		creationTopic = crypto.Keccak256Hash([]byte("PoolCreated(address)"))
	}

	// Create filter query for pool creation events
	query := ethereum.FilterQuery{
		Addresses: factories,
		Topics:    [][]common.Hash{{creationTopic}},
	}

	// Create subscription
	subCtx, cancel := context.WithCancel(em.ctx)
	sub := &subscription{
		protocol: protocol,
		query:    query,
		cancel:   cancel,
	}

	// Store subscription
	em.subMu.Lock()
	em.subscriptions[string(protocol)+"_factory"] = sub
	em.subMu.Unlock()

	// Start monitoring in a goroutine
	go em.monitorEvents(subCtx, sub, "Factory")

	em.logger.Info(fmt.Sprintf("🏭 Monitoring factory events for %s pool creation", protocol))
	return nil
}
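// Illustrative sketch: the queries above filter on a single topic0, but
// go-ethereum's FilterQuery supports OR-matching within a topic position, so
// all of a protocol's registered events (Swap, Mint, Burn, ...) could be pulled
// through one subscription if desired. buildPoolActivityQuery is an example
// helper, not part of the monitor's API.
func buildPoolActivityQuery(pools []common.Address, signatures map[string]common.Hash) ethereum.FilterQuery {
	topics := make([]common.Hash, 0, len(signatures))
	for _, topic := range signatures {
		topics = append(topics, topic)
	}
	return ethereum.FilterQuery{
		Addresses: pools,
		Topics:    [][]common.Hash{topics}, // topic0 may match any registered event
	}
}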
// monitorLiquidationEvents sets up monitoring for liquidation events.
func (em *EventMonitor) monitorLiquidationEvents(protocol arbcommon.Protocol) error {
	// Get contract addresses for this protocol
	contracts := em.protocolRegistry.GetContractAddresses(protocol)
	if len(contracts) == 0 {
		return nil // No contracts to monitor
	}

	// Get event signatures for this protocol
	signatures, exists := em.eventSignatures[protocol]
	if !exists {
		return fmt.Errorf("no event signatures found for protocol %s", protocol)
	}

	// Get the liquidation event signature for this protocol
	var liquidationTopic common.Hash
	switch protocol {
	case arbcommon.ProtocolGMX:
		liquidationTopic, exists = signatures["LiquidatePosition"]
	case arbcommon.ProtocolRadiant:
		liquidationTopic, exists = signatures["LiquidationCall"]
	default:
		return nil // No liquidation events for this protocol
	}
	if !exists {
		return fmt.Errorf("no liquidation event signature found for protocol %s", protocol)
	}

	// Create filter query for liquidation events
	query := ethereum.FilterQuery{
		Addresses: contracts,
		Topics:    [][]common.Hash{{liquidationTopic}},
	}

	// Create subscription
	subCtx, cancel := context.WithCancel(em.ctx)
	sub := &subscription{
		protocol: protocol,
		query:    query,
		cancel:   cancel,
	}

	// Store subscription
	em.subMu.Lock()
	em.subscriptions[string(protocol)+"_liquidation"] = sub
	em.subMu.Unlock()

	// Start monitoring in a goroutine
	go em.monitorEvents(subCtx, sub, "Liquidation")

	em.logger.Info(fmt.Sprintf("💧 Monitoring liquidation events for %s", protocol))
	return nil
}

// monitorEvents continuously polls for new logs matching a subscription's filter.
func (em *EventMonitor) monitorEvents(ctx context.Context, sub *subscription, eventType string) {
	ticker := time.NewTicker(1 * time.Second) // Poll every second
	defer ticker.Stop()

	const maxLookback = 10000 // Never scan more than this many blocks per catch-up
	var lastBlock uint64

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// Get the latest block number
			latestBlock, err := em.client.BlockNumber(ctx)
			if err != nil {
				em.logger.Error(fmt.Sprintf("Failed to get latest block: %v", err))
				continue
			}

			// Skip if there are no new blocks to process
			if lastBlock >= latestBlock {
				continue
			}

			// Resume from the last processed block, bounded to the lookback
			// window on the first run or after falling far behind.
			fromBlock := lastBlock + 1
			if lastBlock == 0 || latestBlock-fromBlock > maxLookback {
				if latestBlock > maxLookback {
					fromBlock = latestBlock - maxLookback
				} else {
					fromBlock = 1
				}
			}
			toBlock := latestBlock

			// Update the query block range
			query := sub.query
			query.FromBlock = new(big.Int).SetUint64(fromBlock)
			query.ToBlock = new(big.Int).SetUint64(toBlock)

			// Query logs with circuit breaker and rate limiting
			var logs []types.Log
			err = em.client.CallWithRateLimit(ctx, func() error {
				var innerErr error
				logs, innerErr = em.client.FilterLogs(ctx, query)
				return innerErr
			})
			if err != nil {
				em.logger.Error(fmt.Sprintf("Failed to filter logs for %s events: %v", eventType, err))
				continue
			}

			// Process logs
			for _, log := range logs {
				if err := em.processLog(log, sub.protocol, eventType); err != nil {
					em.logger.Error(fmt.Sprintf("Failed to process %s log: %v", eventType, err))
				}
				em.eventsProcessed.Add(1)
			}

			// Remember the last processed block
			lastBlock = toBlock
		}
	}
}
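// Illustrative alternative to the polling loop above: if the underlying RPC
// endpoint supports eth_subscribe (websocket transport), the same FilterQuery
// can be streamed with SubscribeFilterLogs instead of periodic FilterLogs
// calls. This sketch is written against the generic ethereum.LogFilterer
// interface and is not wired into EventMonitor, which deliberately polls
// through its rate-limited client.
func streamLogs(ctx context.Context, backend ethereum.LogFilterer, query ethereum.FilterQuery, out chan<- types.Log) error {
	logsCh := make(chan types.Log, 256)
	sub, err := backend.SubscribeFilterLogs(ctx, query, logsCh)
	if err != nil {
		return fmt.Errorf("subscribe failed: %w", err)
	}
	defer sub.Unsubscribe()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case err := <-sub.Err():
			return fmt.Errorf("subscription error: %w", err)
		case l := <-logsCh:
			select {
			case out <- l:
			default: // drop if the consumer is saturated, mirroring the monitor's channels
			}
		}
	}
}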
// processLog dispatches a single log entry to the appropriate handler.
func (em *EventMonitor) processLog(log types.Log, protocol arbcommon.Protocol, eventType string) error {
	switch eventType {
	case "Swap":
		return em.processSwapLog(log, protocol)
	case "Factory":
		return em.processFactoryLog(log, protocol)
	case "Liquidation":
		return em.processLiquidationLog(log, protocol)
	default:
		// Generic event processing
		em.logger.Debug(fmt.Sprintf("Processing generic %s event from %s", eventType, protocol))
		return nil
	}
}

// processSwapLog processes a swap event log.
func (em *EventMonitor) processSwapLog(log types.Log, protocol arbcommon.Protocol) error {
	// Parse the swap event based on protocol
	swapEvent, err := em.parseSwapEvent(log, protocol)
	if err != nil {
		return fmt.Errorf("failed to parse swap event: %w", err)
	}

	// Send to the swap events channel without blocking the polling loop
	select {
	case em.swapEvents <- swapEvent:
		em.swapsDetected.Add(1)
	default:
		em.logger.Warn("Swap events channel full, dropping event")
	}

	return nil
}

// parseSwapEvent parses a swap event log into a SwapEvent.
//
// This is a simplified implementation: it records where and when the swap
// happened but leaves the token and amount fields zeroed. A full implementation
// would decode log.Topics and log.Data against the protocol's ABI.
func (em *EventMonitor) parseSwapEvent(log types.Log, protocol arbcommon.Protocol) (*SwapEvent, error) {
	swapEvent := &SwapEvent{
		Timestamp:   time.Now(),
		BlockNumber: log.BlockNumber,
		TxHash:      log.TxHash.Hex(),
		Protocol:    string(protocol),
		Pool:        log.Address.Hex(),
		// Placeholder values until protocol-specific ABI decoding is wired in
		TokenIn:     "0x0000000000000000000000000000000000000000",
		TokenOut:    "0x0000000000000000000000000000000000000000",
		AmountIn:    "0",
		AmountOut:   "0",
		Sender:      "0x0000000000000000000000000000000000000000",
		Recipient:   "0x0000000000000000000000000000000000000000",
		GasPrice:    "0",
		GasUsed:     0,
		PriceImpact: 0.0,
		MEVScore:    0.0,
		Profitable:  false,
	}

	// Make sure the pool is tracked for future monitoring
	em.poolCache.AddPoolIfNotExists(log.Address, protocol)

	return swapEvent, nil
}
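// Illustrative sketch of the decoding that parseSwapEvent leaves out, assuming
// a Uniswap V2-style layout: Swap(address indexed sender, uint256 amount0In,
// uint256 amount1In, uint256 amount0Out, uint256 amount1Out, address indexed to),
// i.e. sender/to in Topics[1]/Topics[2] and four 32-byte words in Data.
// decodeV2SwapLog is an example helper, not wired into the monitor.
func decodeV2SwapLog(log types.Log) (sender, to common.Address, amount0In, amount1In, amount0Out, amount1Out *big.Int, err error) {
	if len(log.Topics) < 3 || len(log.Data) < 128 {
		err = fmt.Errorf("unexpected V2 Swap log shape: %d topics, %d data bytes", len(log.Topics), len(log.Data))
		return
	}
	sender = common.BytesToAddress(log.Topics[1].Bytes())
	to = common.BytesToAddress(log.Topics[2].Bytes())
	amount0In = new(big.Int).SetBytes(log.Data[0:32])
	amount1In = new(big.Int).SetBytes(log.Data[32:64])
	amount0Out = new(big.Int).SetBytes(log.Data[64:96])
	amount1Out = new(big.Int).SetBytes(log.Data[96:128])
	return
}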
// processFactoryLog processes a factory event log (pool creation).
func (em *EventMonitor) processFactoryLog(log types.Log, protocol arbcommon.Protocol) error {
	// Parse the pool creation event to extract the token addresses.
	// For Uniswap V2/SushiSwap: PairCreated(address indexed token0, address indexed token1, address pair, uint256)
	// For Uniswap V3:           PoolCreated(address indexed token0, address indexed token1, uint24 indexed fee, int24 tickSpacing, address pool)
	var token0, token1 common.Address
	if len(log.Topics) >= 3 {
		token0 = common.BytesToAddress(log.Topics[1].Bytes())
		token1 = common.BytesToAddress(log.Topics[2].Bytes())
	}

	// The newly created pool address is carried in the event data; log.Address
	// is the factory that emitted the event, so only fall back to it when the
	// data layout is unknown.
	poolAddr := log.Address
	switch protocol {
	case arbcommon.ProtocolUniswapV2, arbcommon.ProtocolSushiSwapV2:
		if len(log.Data) >= 32 {
			poolAddr = common.BytesToAddress(log.Data[:32]) // pair is the first data word
		}
	case arbcommon.ProtocolUniswapV3, arbcommon.ProtocolCamelotV3:
		if len(log.Data) >= 64 {
			poolAddr = common.BytesToAddress(log.Data[32:64]) // pool follows tickSpacing
		}
	}

	// Validate the pool address before doing any work with it
	if poolAddr == (common.Address{}) {
		return fmt.Errorf("invalid zero pool address from %s factory log", protocol)
	}

	// Shorten addresses for logging; "unknown" marks tokens we could not parse
	token0Display, token1Display := "unknown", "unknown"
	if token0 != (common.Address{}) {
		token0Display = token0.Hex()[:8]
	}
	if token1 != (common.Address{}) {
		token1Display = token1.Hex()[:8]
	}
	em.logger.Info(fmt.Sprintf("🆕 New %s pool created: %s/%s at %s",
		protocol, token0Display, token1Display, poolAddr.Hex()[:10]))

	// Add to the pool cache and start tracking this pool
	em.poolCache.AddPoolIfNotExists(poolAddr, protocol)

	// Create a liquidity event to trigger cross-factory syncing
	liquidityEvent := &LiquidityEvent{
		Timestamp:    time.Now(),
		BlockNumber:  log.BlockNumber,
		TxHash:       log.TxHash.Hex(),
		Protocol:     string(protocol),
		Pool:         poolAddr.Hex(),
		Token0:       token0.Hex(),
		Token1:       token1.Hex(),
		EventType:    "pool_created",
		Amount0:      "0",
		Amount1:      "0",
		Liquidity:    "0",
		PriceAfter:   "0",
		ImpactSize:   0.0,
		ArbitrageOpp: false,
	}

	// Send to the liquidity events channel for processing
	select {
	case em.liquidityEventsChan <- liquidityEvent:
		em.liquidityEvents.Add(1)
	default:
		em.logger.Warn("Liquidity events channel full, dropping pool creation event")
	}

	// If the swap pipeline is attached, trigger cross-factory syncing with a
	// synthetic swap event for the newly discovered pool.
	if em.swapPipeline != nil {
		mockSwap := &SwapEvent{
			Protocol: string(protocol),
			Pool:     poolAddr.Hex(),
			TokenIn:  token0.Hex(),
			TokenOut: token1.Hex(),
		}
		go em.swapPipeline.SubmitPoolDiscoverySwap(mockSwap)
	}

	return nil
}

// processLiquidationLog processes a liquidation event log.
//
// Like parseSwapEvent, this is a simplified implementation: the protocol-specific
// parameters (user, collateral, debt, amounts) are not decoded yet.
func (em *EventMonitor) processLiquidationLog(log types.Log, protocol arbcommon.Protocol) error {
	liquidationEvent := &LiquidationEvent{
		Timestamp:   time.Now(),
		BlockNumber: log.BlockNumber,
		TxHash:      log.TxHash.Hex(),
		Protocol:    string(protocol),
	}

	// Send to the liquidation events channel without blocking the polling loop
	select {
	case em.liquidationEvents <- liquidationEvent:
	default:
		em.logger.Warn("Liquidation events channel full, dropping event")
	}

	return nil
}

// processSwapEvents consumes swap events from the channel and logs them.
func (em *EventMonitor) processSwapEvents() {
	for {
		select {
		case <-em.ctx.Done():
			return
		case swapEvent := <-em.swapEvents:
			if err := em.protocolRegistry.LogSwapEvent(swapEvent); err != nil {
				em.logger.Error(fmt.Sprintf("Failed to log swap event: %v", err))
			}
		}
	}
}

// processLiquidationEvents consumes liquidation events from the channel and logs them.
func (em *EventMonitor) processLiquidationEvents() {
	for {
		select {
		case <-em.ctx.Done():
			return
		case liquidationEvent := <-em.liquidationEvents:
			if err := em.protocolRegistry.LogLiquidationEvent(liquidationEvent); err != nil {
				em.logger.Error(fmt.Sprintf("Failed to log liquidation event: %v", err))
			}
		}
	}
}

// processLiquidityEvents consumes liquidity events from the channel and logs them.
func (em *EventMonitor) processLiquidityEvents() {
	for {
		select {
		case <-em.ctx.Done():
			return
		case liquidityEvent := <-em.liquidityEventsChan:
			if err := em.protocolRegistry.LogLiquidityEvent(liquidityEvent); err != nil {
				em.logger.Error(fmt.Sprintf("Failed to log liquidity event: %v", err))
			}
		}
	}
}

// GetMetrics returns current monitoring metrics.
func (em *EventMonitor) GetMetrics() map[string]interface{} {
	em.subMu.RLock()
	activeSubscriptions := len(em.subscriptions)
	em.subMu.RUnlock()

	return map[string]interface{}{
		"events_processed":     em.eventsProcessed.Load(),
		"swaps_detected":       em.swapsDetected.Load(),
		"liquidity_events":     em.liquidityEvents.Load(),
		"active_subscriptions": activeSubscriptions,
	}
}

// Close stops all monitoring and cleans up resources.
func (em *EventMonitor) Close() error {
	em.logger.Info("🛑 Stopping DEX event monitoring")

	// Cancel all subscriptions
	em.subMu.Lock()
	for _, sub := range em.subscriptions {
		sub.cancel()
	}
	em.subscriptions = make(map[string]*subscription)
	em.subMu.Unlock()

	// Cancel the main context to stop the worker goroutines
	em.cancel()

	em.logger.Info("✅ DEX event monitoring stopped")
	return nil
}
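// Illustrative sketch of the decoding that processLiquidationLog leaves out,
// assuming an Aave-V2-style LiquidationCall layout as used by Radiant:
// LiquidationCall(address indexed collateralAsset, address indexed debtAsset,
// address indexed user, uint256 debtToCover, uint256 liquidatedCollateralAmount,
// address liquidator, bool receiveAToken). decodeLiquidationCall is an example
// helper, not wired into the monitor; GMX's LiquidatePosition has a different
// layout and would need its own decoder.
func decodeLiquidationCall(log types.Log) (collateralAsset, debtAsset, user, liquidator common.Address, debtToCover, liquidatedCollateral *big.Int, err error) {
	if len(log.Topics) < 4 || len(log.Data) < 128 {
		err = fmt.Errorf("unexpected LiquidationCall log shape: %d topics, %d data bytes", len(log.Topics), len(log.Data))
		return
	}
	collateralAsset = common.BytesToAddress(log.Topics[1].Bytes())
	debtAsset = common.BytesToAddress(log.Topics[2].Bytes())
	user = common.BytesToAddress(log.Topics[3].Bytes())
	debtToCover = new(big.Int).SetBytes(log.Data[0:32])
	liquidatedCollateral = new(big.Int).SetBytes(log.Data[32:64])
	liquidator = common.BytesToAddress(log.Data[64:96])
	return
}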