// Package monitor watches the Arbitrum sequencer for new blocks and DEX
// contract events and hands what it finds to the market pipeline.
package monitor

import (
	"context"
	"fmt"
	"math/big"
	"sync"
	"time"

	"github.com/ethereum/go-ethereum"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/ethclient"
	"github.com/fraktal/mev-beta/internal/config"
	"github.com/fraktal/mev-beta/internal/logger"
	"github.com/fraktal/mev-beta/internal/ratelimit"
	"github.com/fraktal/mev-beta/pkg/arbitrum"
	"github.com/fraktal/mev-beta/pkg/market"
	"github.com/fraktal/mev-beta/pkg/scanner"
	"golang.org/x/time/rate"
)

// dexEvent describes one log event signature the monitor recognises.
type dexEvent struct {
	signature common.Hash // topic[0] of the event
	protocol  string      // protocol label used in log messages, e.g. "Uniswap V2"
	kind      string      // event name used in log messages, e.g. "Swap"
}

// monitoredDEXEvents is the single source of truth for the event signatures
// that are subscribed to and recognised when classifying logs. The order is
// the order in which the signatures are placed in the subscription filter.
var monitoredDEXEvents = []dexEvent{
	{common.HexToHash("0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822"), "Uniswap V2", "Swap"},
	{common.HexToHash("0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67"), "Uniswap V3", "Swap"},
	{common.HexToHash("0x4c209b5fc8ad50758f13e2e1088ba56a560dff690a1c6fef26394f4c03821c4f"), "Uniswap V2", "Mint"},
	{common.HexToHash("0x7a53080ba414158be7ec69b987b5fb7d07dee101fe85488f0853ae16239d0bde"), "Uniswap V2", "Burn"},
	{common.HexToHash("0x783cca1c0412dd0d695e784568c96da2e9c22ff989357a2e8b1d9b2b4e6b7118"), "Uniswap V3", "Mint"},
	{common.HexToHash("0x0c396cd989a39f49a56c8a608a0409f2075c6b60e9c44533b5cf87abdbe393f1"), "Uniswap V3", "Burn"},
}

// lookupDEXEvent returns the known event whose signature equals sig. The
// second result is false when sig is not one of monitoredDEXEvents. A linear
// scan is used because the table holds only a handful of entries.
func lookupDEXEvent(sig common.Hash) (dexEvent, bool) {
	for _, ev := range monitoredDEXEvents {
		if ev.signature == sig {
			return ev, true
		}
	}
	return dexEvent{}, false
}

// ArbitrumMonitor monitors the Arbitrum sequencer for transactions with concurrency support
type ArbitrumMonitor struct {
	config       *config.ArbitrumConfig
	botConfig    *config.BotConfig
	client       *ethclient.Client
	l2Parser     *arbitrum.ArbitrumL2Parser
	logger       *logger.Logger
	rateLimiter  *ratelimit.LimiterManager
	marketMgr    *market.MarketManager
	scanner      *scanner.MarketScanner
	pipeline     *market.Pipeline
	fanManager   *market.FanManager
	limiter      *rate.Limiter
	pollInterval time.Duration // delay between polls for a new head block
	running      bool          // true between Start and Stop; guarded by mu
	mu           sync.RWMutex  // guards running
}

// NewArbitrumMonitor creates a new Arbitrum monitor with rate limiting.
//
// It dials arbCfg.RPCEndpoint, builds the L2 parser against the same endpoint,
// creates a pipeline with its default stages and a fan manager, and derives
// the poll interval from botCfg.PollingInterval (interpreted as seconds).
// An error is returned when the node cannot be dialed or the L2 parser cannot
// be created; in the latter case the already-dialed client is closed.
func NewArbitrumMonitor(
	arbCfg *config.ArbitrumConfig,
	botCfg *config.BotConfig,
	logger *logger.Logger,
	rateLimiter *ratelimit.LimiterManager,
	marketMgr *market.MarketManager,
	scanner *scanner.MarketScanner,
) (*ArbitrumMonitor, error) {
	// Create Ethereum client
	client, err := ethclient.Dial(arbCfg.RPCEndpoint)
	if err != nil {
		return nil, fmt.Errorf("failed to connect to Arbitrum node: %w", err)
	}

	// Create L2 parser for Arbitrum transaction parsing
	l2Parser, err := arbitrum.NewArbitrumL2Parser(arbCfg.RPCEndpoint, logger)
	if err != nil {
		// Do not leak the connection opened above.
		client.Close()
		return nil, fmt.Errorf("failed to create L2 parser: %w", err)
	}

	// Create rate limiter based on config
	limiter := rate.NewLimiter(
		rate.Limit(arbCfg.RateLimit.RequestsPerSecond),
		arbCfg.RateLimit.Burst,
	)

	// Create pipeline and add default stages
	pipeline := market.NewPipeline(botCfg, logger, marketMgr, scanner, client)
	pipeline.AddDefaultStages()

	// Create fan manager
	fanManager := market.NewFanManager(
		&config.Config{
			Arbitrum: *arbCfg,
			Bot:      *botCfg,
		},
		logger,
		rateLimiter,
	)

	return &ArbitrumMonitor{
		config:       arbCfg,
		botConfig:    botCfg,
		client:       client,
		l2Parser:     l2Parser,
		logger:       logger,
		rateLimiter:  rateLimiter,
		marketMgr:    marketMgr,
		scanner:      scanner,
		pipeline:     pipeline,
		fanManager:   fanManager,
		limiter:      limiter,
		pollInterval: time.Duration(botCfg.PollingInterval) * time.Second,
		running:      false,
	}, nil
}

// isRunning reports whether the monitor is currently marked as running.
func (m *ArbitrumMonitor) isRunning() bool {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return m.running
}

// setRunning updates the running flag under the mutex.
func (m *ArbitrumMonitor) setRunning(running bool) {
	m.mu.Lock()
	m.running = running
	m.mu.Unlock()
}

// Start begins monitoring the Arbitrum sequencer.
//
// It records the current head block, tries to subscribe to DEX events (a
// subscription failure is only logged as a warning), and then polls every
// pollInterval, processing each block after the last one seen. Start blocks
// until ctx is cancelled or Stop is called, in which case it returns nil. It
// returns an error only when the initial rate-limit wait or head lookup fails;
// the running flag is cleared again in that case.
func (m *ArbitrumMonitor) Start(ctx context.Context) error {
	m.setRunning(true)

	m.logger.Info("Starting Arbitrum sequencer monitoring...")

	// Get the latest block to start from
	if err := m.rateLimiter.WaitForLimit(ctx, m.config.RPCEndpoint); err != nil {
		m.setRunning(false)
		return fmt.Errorf("rate limit error: %w", err)
	}

	header, err := m.client.HeaderByNumber(ctx, nil)
	if err != nil {
		m.setRunning(false)
		return fmt.Errorf("failed to get latest block header: %w", err)
	}

	lastBlock := header.Number.Uint64()
	m.logger.Info(fmt.Sprintf("Starting from block: %d", lastBlock))

	// Subscribe to DEX events for real-time monitoring. Success is logged by
	// subscribeToDEXEvents itself, so only the failure is reported here.
	if err := m.subscribeToDEXEvents(ctx); err != nil {
		m.logger.Warn(fmt.Sprintf("Failed to subscribe to DEX events: %v", err))
	}

	for m.isRunning() {
		select {
		case <-ctx.Done():
			m.Stop()
			return nil
		case <-time.After(m.pollInterval):
		}

		// Get the latest block
		if err := m.rateLimiter.WaitForLimit(ctx, m.config.RPCEndpoint); err != nil {
			m.logger.Error(fmt.Sprintf("Rate limit error: %v", err))
			continue
		}

		header, err := m.client.HeaderByNumber(ctx, nil)
		if err != nil {
			m.logger.Error(fmt.Sprintf("Failed to get latest block header: %v", err))
			continue
		}

		currentBlock := header.Number.Uint64()

		// Process blocks from lastBlock+1 to currentBlock. A block that fails
		// is logged and skipped rather than retried.
		for blockNum := lastBlock + 1; blockNum <= currentBlock; blockNum++ {
			// Once ctx is cancelled every remaining call would fail too;
			// stop catching up and let the select above shut down.
			if ctx.Err() != nil {
				break
			}
			if err := m.processBlock(ctx, blockNum); err != nil {
				m.logger.Error(fmt.Sprintf("Failed to process block %d: %v", blockNum, err))
			}
		}

		lastBlock = currentBlock
	}

	return nil
}

// Stop stops the monitor by clearing the running flag. The polling loop in
// Start observes the flag at the top of its next iteration.
func (m *ArbitrumMonitor) Stop() {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.running = false
	m.logger.Info("Stopping Arbitrum monitor...")
}

// processBlock processes a single block for potential swap transactions with enhanced L2 parsing.
//
// It waits on the rate limiter, fetches the block through the L2 parser,
// extracts the DEX transactions and logs a summary. Errors are returned to the
// caller, which is responsible for logging them.
func (m *ArbitrumMonitor) processBlock(ctx context.Context, blockNumber uint64) error {
	m.logger.Debug(fmt.Sprintf("Processing block %d", blockNumber))

	// Wait for rate limiter
	if err := m.rateLimiter.WaitForLimit(ctx, m.config.RPCEndpoint); err != nil {
		return fmt.Errorf("rate limit error: %w", err)
	}

	// Get block using L2 parser to bypass transaction type issues
	l2Block, err := m.l2Parser.GetBlockByNumber(ctx, blockNumber)
	if err != nil {
		return fmt.Errorf("failed to get L2 block %d: %w", blockNumber, err)
	}

	// Parse DEX transactions from the block
	dexTransactions := m.l2Parser.ParseDEXTransactions(ctx, l2Block)
	totalTxs := len(l2Block.Transactions)

	m.logger.Info(fmt.Sprintf("Block %d: Processing %d transactions, found %d DEX transactions",
		blockNumber, totalTxs, len(dexTransactions)))

	switch {
	case len(dexTransactions) > 0:
		m.logger.Info(fmt.Sprintf("Block %d contains %d DEX transactions:", blockNumber, len(dexTransactions)))
		for i, dexTx := range dexTransactions {
			m.logger.Info(fmt.Sprintf(" [%d] %s: %s -> %s (%s) calling %s (%s)",
				i+1, dexTx.Hash, dexTx.From, dexTx.To, dexTx.ContractName, dexTx.FunctionName, dexTx.Protocol))
		}
		// TODO: Convert DEX transactions to standard format and process through pipeline
		// For now, we're successfully detecting and logging DEX transactions
	case totalTxs == 0:
		m.logger.Info(fmt.Sprintf("Block %d: Empty block", blockNumber))
	default:
		m.logger.Info(fmt.Sprintf("Block %d: No DEX transactions found in %d total transactions",
			blockNumber, totalTxs))
	}

	return nil
}

// subscribeToDEXEvents subscribes to DEX contract events for real-time monitoring.
//
// It builds a log filter over a fixed list of Arbitrum DEX contract addresses
// and the signatures in monitoredDEXEvents, subscribes through the client and
// starts a goroutine that forwards every received log to processDEXEvent. The
// goroutine exits, and unsubscribes, when ctx is cancelled or the subscription
// reports an error. An error is returned only when the subscription cannot be
// established.
func (m *ArbitrumMonitor) subscribeToDEXEvents(ctx context.Context) error {
	// Define official DEX contract addresses for Arbitrum mainnet
	dexContracts := []struct {
		Address common.Address
		Name    string
	}{
		// Official Arbitrum DEX Factories
		{common.HexToAddress("0xf1D7CC64Fb4452F05c498126312eBE29f30Fbcf9"), "UniswapV2Factory"}, // Official Uniswap V2 Factory
		{common.HexToAddress("0x1F98431c8aD98523631AE4a59f267346ea31F984"), "UniswapV3Factory"}, // Official Uniswap V3 Factory
		{common.HexToAddress("0xc35DADB65012eC5796536bD9864eD8773aBc74C4"), "SushiSwapFactory"}, // Official SushiSwap V2 Factory

		// Official Arbitrum DEX Routers
		{common.HexToAddress("0x4752ba5dbc23f44d87826276bf6fd6b1c372ad24"), "UniswapV2Router02"},        // Official Uniswap V2 Router02
		{common.HexToAddress("0xE592427A0AEce92De3Edee1F18E0157C05861564"), "UniswapV3Router"},          // Official Uniswap V3 SwapRouter
		{common.HexToAddress("0x68b3465833fb72A70ecDF485E0e4C7bD8665Fc45"), "UniswapV3Router02"},        // Official Uniswap V3 SwapRouter02
		{common.HexToAddress("0x1b02dA8Cb0d097eB8D57A175b88c7D8b47997506"), "SushiSwapRouter"},          // Official SushiSwap Router
		{common.HexToAddress("0xC36442b4a4522E871399CD717aBDD847Ab11FE88"), "UniswapV3PositionManager"}, // Official Position Manager

		// Additional official routers
		{common.HexToAddress("0xa51afafe0263b40edaef0df8781ea9aa03e381a3"), "UniversalRouter"}, // Universal Router
		{common.HexToAddress("0x4C60051384bd2d3C01bfc845Cf5F4b44bcbE9de5"), "GMX Router"},      // GMX DEX Router

		// Popular Arbitrum pools (verified high volume pools)
		{common.HexToAddress("0xC6962004f452bE9203591991D15f6b388e09E8D0"), "USDC/WETH UniswapV3 0.05%"}, // High volume pool
		{common.HexToAddress("0x17c14D2c404D167802b16C450d3c99F88F2c4F4d"), "USDC/WETH UniswapV3 0.3%"},  // High volume pool
		{common.HexToAddress("0x2f5e87C9312fa29aed5c179E456625D79015299c"), "WBTC/WETH UniswapV3 0.05%"}, // High volume pool
		{common.HexToAddress("0x149e36E72726e0BceA5c59d40df2c43F60f5A22D"), "WBTC/WETH UniswapV3 0.3%"},  // High volume pool
		{common.HexToAddress("0x641C00A822e8b671738d32a431a4Fb6074E5c79d"), "USDT/WETH UniswapV3 0.05%"}, // High volume pool
		{common.HexToAddress("0xFe7D6a84287235C7b4b57C4fEb9a44d4C6Ed3BB8"), "ARB/WETH UniswapV3 0.05%"},  // ARB native token pool
	}

	// Create filter query for DEX events
	addresses := make([]common.Address, len(dexContracts))
	for i, dex := range dexContracts {
		addresses[i] = dex.Address
	}

	signatures := make([]common.Hash, len(monitoredDEXEvents))
	for i, ev := range monitoredDEXEvents {
		signatures[i] = ev.signature
	}

	query := ethereum.FilterQuery{
		Addresses: addresses,
		// A single topic position holding all signatures.
		Topics: [][]common.Hash{signatures},
	}

	// Subscribe to logs. The caller's ctx is used so that establishing the
	// subscription can be cancelled together with the monitor.
	logs := make(chan types.Log)
	sub, err := m.client.SubscribeFilterLogs(ctx, query, logs)
	if err != nil {
		return fmt.Errorf("failed to subscribe to DEX events: %w", err)
	}

	m.logger.Info("Subscribed to DEX events")

	// Process logs in a goroutine
	go func() {
		defer func() {
			if r := recover(); r != nil {
				m.logger.Error(fmt.Sprintf("Panic in DEX event processor: %v", r))
			}
		}()
		defer sub.Unsubscribe()

		for {
			select {
			case event := <-logs:
				m.processDEXEvent(ctx, event)
			case err := <-sub.Err():
				if err != nil {
					m.logger.Error(fmt.Sprintf("DEX event subscription error: %v", err))
				}
				return
			case <-ctx.Done():
				return
			}
		}
	}()

	return nil
}

// processDEXEvent processes a DEX event log.
//
// Logs without topics are only reported at debug level. For every other log
// the event is classified by its first topic, the transaction receipt is
// fetched, and the receipt is handed to processTransactionReceipt. A failure
// to fetch the receipt is logged and ends processing of this log.
func (m *ArbitrumMonitor) processDEXEvent(ctx context.Context, log types.Log) {
	m.logger.Debug(fmt.Sprintf("Processing DEX event from contract %s, topic count: %d", log.Address.Hex(), len(log.Topics)))

	if len(log.Topics) == 0 {
		return
	}

	// Classify the event by its signature (topic 0)
	eventSig := log.Topics[0]
	if ev, known := lookupDEXEvent(eventSig); known {
		m.logger.Info(fmt.Sprintf("%s %s event detected: Contract=%s, TxHash=%s",
			ev.protocol, ev.kind, log.Address.Hex(), log.TxHash.Hex()))
	} else {
		m.logger.Debug(fmt.Sprintf("Other DEX event detected: Contract=%s, EventSig=%s, TxHash=%s",
			log.Address.Hex(), eventSig.Hex(), log.TxHash.Hex()))
	}

	// Fetch transaction receipt for detailed analysis
	receipt, err := m.client.TransactionReceipt(ctx, log.TxHash)
	if err != nil {
		m.logger.Error(fmt.Sprintf("Failed to fetch receipt for transaction %s: %v", log.TxHash.Hex(), err))
		return
	}

	// Process the transaction through the pipeline
	// This will parse the DEX events and look for arbitrage opportunities
	m.processTransactionReceipt(ctx, receipt, log.BlockNumber, log.BlockHash)
}

// processTransactionReceipt processes a transaction receipt for DEX events.
//
// It counts and logs the known DEX events among the receipt's logs and then
// submits a placeholder transaction to the pipeline for blockNumber, using the
// current wall-clock time as the timestamp. A nil receipt is ignored.
// blockHash is currently unused.
func (m *ArbitrumMonitor) processTransactionReceipt(ctx context.Context, receipt *types.Receipt, blockNumber uint64, blockHash common.Hash) {
	if receipt == nil {
		return
	}

	m.logger.Debug(fmt.Sprintf("Processing transaction receipt %s from block %d",
		receipt.TxHash.Hex(), blockNumber))

	// Process transaction logs for DEX events
	dexEvents := 0
	for _, entry := range receipt.Logs {
		if len(entry.Topics) == 0 {
			continue
		}
		ev, known := lookupDEXEvent(entry.Topics[0])
		if !known {
			continue
		}
		m.logger.Info(fmt.Sprintf("DEX %s event detected in transaction %s: %s",
			ev.kind, receipt.TxHash.Hex(), ev.protocol))
		dexEvents++
	}

	if dexEvents > 0 {
		m.logger.Info(fmt.Sprintf("Transaction %s contains %d DEX events", receipt.TxHash.Hex(), dexEvents))
	}

	// Create a minimal transaction for the pipeline
	// This is just a stub since we don't have the full transaction data
	tx := types.NewTransaction(0, common.Address{}, big.NewInt(0), 0, big.NewInt(0), nil)

	// Create a slice with just this transaction
	transactions := []*types.Transaction{tx}

	// Process through the pipeline
	if err := m.pipeline.ProcessTransactions(ctx, transactions, blockNumber, uint64(time.Now().Unix())); err != nil {
		m.logger.Error(fmt.Sprintf("Pipeline processing error for receipt %s: %v", receipt.TxHash.Hex(), err))
	}
}

// processTransaction analyzes a transaction for potential swap opportunities.
//
// At present it only logs the transaction hash, sender, recipient and value
// (divided by 1e18 for display) at debug level and always returns nil.
func (m *ArbitrumMonitor) processTransaction(ctx context.Context, tx *types.Transaction) error {
	// Check if this is a potential swap transaction
	// This is a simplified check - in practice, you would check for
	// specific function signatures of Uniswap-like contracts

	// For now, we'll just log all transactions
	from, err := m.client.TransactionSender(ctx, tx, common.Hash{}, 0)
	if err != nil {
		// This can happen for pending transactions; fall back to the zero address.
		from = common.Address{}
	}

	to := "contract creation"
	if tx.To() != nil {
		to = tx.To().Hex()
	}

	m.logger.Debug(fmt.Sprintf("Transaction: %s, From: %s, To: %s, Value: %s ETH",
		tx.Hash().Hex(),
		from.Hex(),
		to,
		new(big.Float).Quo(new(big.Float).SetInt(tx.Value()), big.NewFloat(1e18)).String(),
	))

	// TODO: Add logic to detect swap transactions and analyze them
	// This would involve:
	// 1. Checking if the transaction is calling a Uniswap-like contract
	// 2. Decoding the swap function call
	// 3. Extracting the token addresses and amounts
	// 4. Calculating potential price impact

	return nil
}

// GetPendingTransactions retrieves pending transactions from the mempool.
//
// The current implementation is a placeholder: it performs no query and always
// returns an empty, non-nil slice and a nil error.
func (m *ArbitrumMonitor) GetPendingTransactions(ctx context.Context) ([]*types.Transaction, error) {
	// This is a simplified implementation
	// In practice, you might need to use a different approach to access pending transactions

	// Query for pending transactions
	txs := make([]*types.Transaction, 0)

	return txs, nil
}

// getTransactionReceiptWithRetry attempts to get a transaction receipt with exponential backoff retry
func (m *ArbitrumMonitor) getTransactionReceiptWithRetry(ctx context.Context, txHash common.Hash, maxRetries int) (*types.Receipt, error) {
	for attempt := 0; attempt < maxRetries; attempt++ {
		m.logger.Debug(fmt.Sprintf("Attempting to fetch receipt for transaction %s (attempt %d/%d)", txHash.Hex(), attempt+1, maxRetries))

		// Try to fetch the transaction receipt
		receipt, err := m.client.TransactionReceipt(ctx, txHash)
		if err == nil {
			m.logger.Debug(fmt.Sprintf("Successfully fetched receipt for transaction %s on attempt %d", txHash.Hex(), attempt+1))
			return receipt, nil
		}

		// Check for specific error types that shouldn't be retried
		if ctx.Err() != nil {
			return nil, ctx.Err()
		}

		// Log retry attempt for other errors
		if attempt < maxRetries-1 {
			backoffDuration := time.Duration(1<