package sequencer import ( "context" "fmt" "math/big" "sync" "time" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/log" "github.com/gorilla/websocket" "github.com/your-org/mev-bot/pkg/arbitrage" "github.com/your-org/mev-bot/pkg/cache" "github.com/your-org/mev-bot/pkg/execution" "github.com/your-org/mev-bot/pkg/metrics" "github.com/your-org/mev-bot/pkg/parsers" "github.com/your-org/mev-bot/pkg/validation" ) // ReaderConfig contains configuration for the sequencer reader type ReaderConfig struct { // WebSocket connection WSURL string ReconnectDelay time.Duration MaxReconnectDelay time.Duration PingInterval time.Duration // RPC for fetching full transactions RPCURL string // Processing WorkerCount int BufferSize int // Filtering MinProfit *big.Int EnableFrontRunning bool // Performance MaxProcessingTime time.Duration } // DefaultReaderConfig returns default configuration func DefaultReaderConfig() *ReaderConfig { return &ReaderConfig{ WSURL: "wss://arb1.arbitrum.io/ws", ReconnectDelay: 1 * time.Second, MaxReconnectDelay: 60 * time.Second, PingInterval: 30 * time.Second, RPCURL: "https://arb1.arbitrum.io/rpc", WorkerCount: 10, BufferSize: 1000, MinProfit: big.NewInt(0.01e18), // 0.01 ETH EnableFrontRunning: true, MaxProcessingTime: 50 * time.Millisecond, } } // Reader reads pending transactions from the Arbitrum sequencer type Reader struct { config *ReaderConfig logger log.Logger // Components parsers parsers.Factory validator validation.Validator poolCache cache.PoolCache detector *arbitrage.Detector executor *execution.Executor swapFilter *SwapFilter // NEW: Swap filter for processing sequencer feed // Connections wsConn *websocket.Conn rpcClient *ethclient.Client // Channels swapEvents chan *SwapEvent // Changed from txHashes to pass full swap events stopCh chan struct{} wg sync.WaitGroup // State (protected by RWMutex) mu sync.RWMutex connected bool lastProcessed time.Time processedCount uint64 opportunityCount uint64 executionCount uint64 // NOTE: Metrics are now handled by pkg/metrics (Prometheus) // No local atomic counters needed - metrics package handles thread safety } // NewReader creates a new sequencer reader func NewReader( config *ReaderConfig, parsers parsers.Factory, validator validation.Validator, poolCache cache.PoolCache, detector *arbitrage.Detector, executor *execution.Executor, logger log.Logger, ) (*Reader, error) { if config == nil { config = DefaultReaderConfig() } // Connect to RPC for fetching full transactions rpcClient, err := ethclient.Dial(config.RPCURL) if err != nil { return nil, fmt.Errorf("failed to connect to RPC: %w", err) } // Create swap filter with pool cache swapFilter := NewSwapFilter(&SwapFilterConfig{ SwapChannelSize: config.BufferSize, Logger: logger, PoolCacheFile: "data/discovered_pools.json", }) return &Reader{ config: config, logger: logger.New("component", "sequencer_reader"), parsers: parsers, validator: validator, poolCache: poolCache, detector: detector, executor: executor, swapFilter: swapFilter, rpcClient: rpcClient, swapEvents: make(chan *SwapEvent, config.BufferSize), stopCh: make(chan struct{}), }, nil } // Start starts the sequencer reader func (r *Reader) Start(ctx context.Context) error { r.logger.Info("starting sequencer reader", "workers", r.config.WorkerCount, "buffer_size", r.config.BufferSize, ) // Start swap filter workers (channel-based processing) if r.swapFilter != nil { for i := 0; i < r.config.WorkerCount; i++ { r.swapFilter.StartWorker(ctx, func(swap *SwapEvent) error { // Process swap event r.logger.Info("🔄 SWAP DETECTED", "protocol", swap.Protocol.Name, "version", swap.Protocol.Version, "type", swap.Protocol.Type, "hash", swap.TxHash, "pool", swap.Pool.Address.Hex(), "seq", swap.SeqNumber, "block", swap.BlockNumber, ) // Send full swap event to arbitrage detection pipeline select { case r.swapEvents <- swap: // Successfully queued for arbitrage detection default: r.logger.Warn("arbitrage queue full", "tx", swap.TxHash) } return nil }) } } // Start existing workers for arbitrage detection for i := 0; i < r.config.WorkerCount; i++ { r.wg.Add(1) go r.worker(ctx, i) } // Start connection manager r.wg.Add(1) go r.maintainConnection(ctx) // Wait for context cancellation <-ctx.Done() r.logger.Info("stopping sequencer reader") close(r.stopCh) // Stop swap filter if r.swapFilter != nil { r.swapFilter.Stop() } r.wg.Wait() return ctx.Err() } // maintainConnection maintains the WebSocket connection with automatic reconnection func (r *Reader) maintainConnection(ctx context.Context) { defer r.wg.Done() reconnectDelay := r.config.ReconnectDelay for { select { case <-ctx.Done(): return default: } // Connect to sequencer conn, err := r.connect(ctx) if err != nil { r.logger.Error("connection failed", "error", err, "retry_in", reconnectDelay) time.Sleep(reconnectDelay) // Exponential backoff reconnectDelay *= 2 if reconnectDelay > r.config.MaxReconnectDelay { reconnectDelay = r.config.MaxReconnectDelay } continue } // Reset backoff on successful connection reconnectDelay = r.config.ReconnectDelay r.wsConn = conn r.mu.Lock() r.connected = true r.mu.Unlock() r.logger.Info("connected to sequencer") // Arbitrum sequencer feed broadcasts immediately - no subscription needed // Just start reading messages // Read messages until connection fails if err := r.readMessages(ctx, conn); err != nil { r.logger.Error("connection lost", "error", err) } r.mu.Lock() r.connected = false r.mu.Unlock() conn.Close() } } // connect establishes a WebSocket connection func (r *Reader) connect(ctx context.Context) (*websocket.Conn, error) { dialer := websocket.DefaultDialer dialer.HandshakeTimeout = 10 * time.Second conn, _, err := dialer.DialContext(ctx, r.config.WSURL, nil) if err != nil { return nil, fmt.Errorf("dial failed: %w", err) } // Set read/write deadlines conn.SetReadDeadline(time.Now().Add(r.config.PingInterval * 2)) conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) return conn, nil } // subscribe is not needed for Arbitrum sequencer feed // The feed broadcasts messages immediately after connection // Kept for compatibility but does nothing func (r *Reader) subscribe(ctx context.Context, conn *websocket.Conn) error { return nil } // readMessages reads messages from the WebSocket connection func (r *Reader) readMessages(ctx context.Context, conn *websocket.Conn) error { for { select { case <-ctx.Done(): return ctx.Err() case <-r.stopCh: return nil default: } // Set read deadline conn.SetReadDeadline(time.Now().Add(r.config.PingInterval * 2)) var msg map[string]interface{} if err := conn.ReadJSON(&msg); err != nil { return fmt.Errorf("read failed: %w", err) } // Arbitrum sequencer feed format: {"messages": [...]} if messages, ok := msg["messages"].([]interface{}); ok { for _, m := range messages { if msgMap, ok := m.(map[string]interface{}); ok { metrics.MessagesReceived.Inc() // Pass message to swap filter for processing if r.swapFilter != nil { r.swapFilter.ProcessMessage(msgMap) } } } } } } // worker processes transaction hashes func (r *Reader) worker(ctx context.Context, id int) { defer r.wg.Done() logger := r.logger.New("worker", id) for { select { case <-ctx.Done(): return case <-r.stopCh: return case swapEvent := <-r.swapEvents: if err := r.processSwapEvent(ctx, swapEvent); err != nil { logger.Debug("processing error", "tx", swapEvent.TxHash, "error", err) } } } } // processSwapEvent processes a swap event with transaction data already decoded func (r *Reader) processSwapEvent(ctx context.Context, swapEvent *SwapEvent) error { startTime := time.Now() // Enforce max processing time procCtx, cancel := context.WithTimeout(ctx, r.config.MaxProcessingTime) defer cancel() // Convert decoded transaction to *types.Transaction // This uses the transaction data we already received from the sequencer feed // NO BLOCKING RPC CALL - transaction is already decoded! tx, err := swapEvent.Transaction.ToEthereumTransaction() if err != nil { return fmt.Errorf("convert tx failed: %w", err) } parseStart := time.Now() // Parse transaction events (no receipt for pending transactions) events, err := r.parsers.ParseTransaction(procCtx, tx, nil) if err != nil { metrics.ParseErrors.Inc() return fmt.Errorf("parse failed: %w", err) } if len(events) == 0 { return nil // No swap events } metrics.ParseLatency.Observe(time.Since(parseStart).Seconds()) // Validate events validEvents := r.validator.FilterValid(procCtx, events) if len(validEvents) == 0 { metrics.ValidationErrors.Inc() return nil } detectStart := time.Now() // Detect arbitrage opportunities for each swap for _, event := range validEvents { // Get input token from the swap inputToken, _ := event.GetInputToken() // Detect opportunities starting with this token opportunities, err := r.detector.DetectOpportunities(procCtx, inputToken) if err != nil { continue } metrics.DetectionLatency.Observe(time.Since(detectStart).Seconds()) // Execute profitable opportunities for _, opp := range opportunities { if opp.NetProfit.Cmp(r.config.MinProfit) > 0 { metrics.RecordOpportunity("arbitrage") r.opportunityCount++ if r.config.EnableFrontRunning { execStart := time.Now() go r.executeFrontRun(ctx, opp, tx) metrics.ExecutionLatency.Observe(time.Since(execStart).Seconds()) } } } } metrics.TransactionsProcessed.Inc() r.processedCount++ r.lastProcessed = time.Now() totalLatency := time.Since(startTime) if totalLatency > r.config.MaxProcessingTime { r.logger.Warn("processing too slow", "latency", totalLatency, "target", r.config.MaxProcessingTime) } return nil } // executeFrontRun executes a front-running transaction func (r *Reader) executeFrontRun(ctx context.Context, opp *arbitrage.Opportunity, targetTx *types.Transaction) { metrics.ExecutionsAttempted.Inc() r.executionCount++ r.logger.Info("front-running opportunity", "opportunity_id", opp.ID, "type", opp.Type, "profit", opp.NetProfit.String(), "roi", fmt.Sprintf("%.2f%%", opp.ROI*100), "target_tx", targetTx.Hash().Hex(), ) // Execute the arbitrage result, err := r.executor.Execute(ctx, opp) if err != nil { r.logger.Error("execution failed", "opportunity_id", opp.ID, "error", err, ) return } if result.Success { r.logger.Info("execution succeeded", "opportunity_id", opp.ID, "tx_hash", result.TxHash.Hex(), "actual_profit", result.ActualProfit.String(), "gas_cost", result.GasCost.String(), "duration", result.Duration, ) } else { r.logger.Warn("execution failed", "opportunity_id", opp.ID, "tx_hash", result.TxHash.Hex(), "error", result.Error, ) } } // GetStats returns current statistics // NOTE: Detailed metrics are now available via Prometheus /metrics endpoint // This returns only basic connection state and local counters func (r *Reader) GetStats() map[string]interface{} { r.mu.RLock() defer r.mu.RUnlock() return map[string]interface{}{ "connected": r.connected, "processed_count": r.processedCount, "opportunity_count": r.opportunityCount, "execution_count": r.executionCount, "last_processed": r.lastProcessed.Format(time.RFC3339), "metrics_endpoint": "/metrics (Prometheus format)", } } // Stop stops the sequencer reader func (r *Reader) Stop() { close(r.stopCh) }