package sequencer

import (
	"context"
	"fmt"
	"log/slog"
	"math/big"
	"sync"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/ethclient"
	"github.com/gorilla/websocket"

	"github.com/your-org/mev-bot/pkg/arbitrage"
	"github.com/your-org/mev-bot/pkg/cache"
	"github.com/your-org/mev-bot/pkg/execution"
	"github.com/your-org/mev-bot/pkg/parsers"
	"github.com/your-org/mev-bot/pkg/validation"
)

// ReaderConfig contains configuration for the sequencer reader.
type ReaderConfig struct {
	// WebSocket connection.
	WSURL             string        // endpoint dialed by connect
	ReconnectDelay    time.Duration // initial delay between reconnect attempts
	MaxReconnectDelay time.Duration // upper bound for the exponential backoff
	// PingInterval is only used to derive the read deadline (2*PingInterval);
	// this file does not send pings itself.
	PingInterval time.Duration

	// RPC endpoint used for fetching full transactions by hash.
	RPCURL string

	// Processing.
	WorkerCount int // number of worker goroutines started by Start
	BufferSize  int // capacity of the tx-hash channel between reader and workers

	// Filtering.
	MinProfit          *big.Int // opportunities must have NetProfit strictly greater than this
	EnableFrontRunning bool     // when false, opportunities are counted but not executed

	// Performance.
	MaxProcessingTime time.Duration // per-transaction processing timeout
}

// DefaultReaderConfig returns default configuration.
func DefaultReaderConfig() *ReaderConfig {
	return &ReaderConfig{
		WSURL:              "wss://arb1.arbitrum.io/ws",
		ReconnectDelay:     1 * time.Second,
		MaxReconnectDelay:  60 * time.Second,
		PingInterval:       30 * time.Second,
		RPCURL:             "https://arb1.arbitrum.io/rpc",
		WorkerCount:        10,
		BufferSize:         1000,
		MinProfit:          big.NewInt(1e16), // 0.01 ETH expressed in wei
		EnableFrontRunning: true,
		MaxProcessingTime:  50 * time.Millisecond,
	}
}

// Reader reads pending transactions from the Arbitrum sequencer.
type Reader struct {
	config *ReaderConfig
	logger *slog.Logger

	// Components.
	parsers   parsers.Factory
	validator validation.Validator
	poolCache cache.PoolCache
	detector  *arbitrage.Detector
	executor  *execution.Executor

	// Connections.
	wsConn    *websocket.Conn // most recent WebSocket connection; guarded by mu
	rpcClient *ethclient.Client

	// Channels.
	txHashes chan string
	stopCh   chan struct{}
	stopOnce sync.Once // ensures stopCh is closed exactly once
	wg       sync.WaitGroup

	// State. Everything below is guarded by mu.
	mu               sync.RWMutex
	connected        bool
	lastProcessed    time.Time
	processedCount   uint64
	opportunityCount uint64
	executionCount   uint64

	// Metrics (placeholders for actual metrics). The avg* fields currently
	// hold the most recent sample rather than a running average.
	txReceived          uint64
	txProcessed         uint64
	parseErrors         uint64
	validationErrors    uint64
	opportunitiesFound  uint64
	executionsAttempted uint64
	avgParseLatency     time.Duration
	avgDetectLatency    time.Duration
	avgExecuteLatency   time.Duration
}

// NewReader creates a new sequencer reader.
//
// A nil config is replaced by DefaultReaderConfig and a nil logger by
// slog.Default. The RPC client for config.RPCURL is dialed here; an error is
// returned if that dial fails.
func NewReader(
	config *ReaderConfig,
	parsers parsers.Factory,
	validator validation.Validator,
	poolCache cache.PoolCache,
	detector *arbitrage.Detector,
	executor *execution.Executor,
	logger *slog.Logger,
) (*Reader, error) {
	if config == nil {
		config = DefaultReaderConfig()
	}
	if logger == nil {
		logger = slog.Default()
	}

	// Connect to RPC for fetching full transactions.
	rpcClient, err := ethclient.Dial(config.RPCURL)
	if err != nil {
		return nil, fmt.Errorf("failed to connect to RPC: %w", err)
	}

	return &Reader{
		config:    config,
		logger:    logger.With("component", "sequencer_reader"),
		parsers:   parsers,
		validator: validator,
		poolCache: poolCache,
		detector:  detector,
		executor:  executor,
		rpcClient: rpcClient,
		// A negative capacity would panic in make; treat it as unbuffered.
		txHashes: make(chan string, max(config.BufferSize, 0)),
		stopCh:   make(chan struct{}),
	}, nil
}

// Start starts the sequencer reader and blocks until ctx is cancelled or Stop
// is called. It then waits for the workers, the connection manager and any
// in-flight executions to finish, closes the RPC client, and returns
// ctx.Err() (nil when the shutdown was triggered by Stop).
//
// A Reader is single-use: it cannot be started again after Start returns.
func (r *Reader) Start(ctx context.Context) error {
	r.logger.Info("starting sequencer reader")

	// runCtx is cancelled on either ctx cancellation or Stop, so every
	// goroutine only has to watch a single context.
	runCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Start workers.
	for i := 0; i < r.config.WorkerCount; i++ {
		r.wg.Add(1)
		go r.worker(runCtx, i)
	}

	// Start connection manager.
	r.wg.Add(1)
	go r.maintainConnection(runCtx)

	// Wait for a shutdown signal.
	select {
	case <-ctx.Done():
	case <-r.stopCh:
	}

	r.logger.Info("stopping sequencer reader")
	r.Stop()
	cancel()
	r.wg.Wait()

	if r.rpcClient != nil {
		r.rpcClient.Close()
	}
	return ctx.Err()
}

// maintainConnection maintains the WebSocket connection with automatic
// reconnection. Failed connect or subscribe attempts are retried with
// exponential backoff capped at MaxReconnectDelay; after an established
// session ends, the backoff is reset and a reconnect is attempted immediately.
func (r *Reader) maintainConnection(ctx context.Context) {
	defer r.wg.Done()

	delay := r.config.ReconnectDelay
	for {
		if r.isShuttingDown(ctx) {
			return
		}

		conn, err := r.connect(ctx)
		if err != nil {
			if r.isShuttingDown(ctx) {
				return
			}
			r.logger.Error("connection failed", "error", err, "retry_in", delay)
			if !r.sleepOrStop(ctx, delay) {
				return
			}
			delay = min(delay*2, r.config.MaxReconnectDelay)
			continue
		}

		subscribed, err := r.runSession(ctx, conn)
		switch {
		case r.isShuttingDown(ctx):
			return
		case !subscribed:
			// Back off here too; otherwise a server that rejects the
			// subscription would be hammered in a tight loop.
			r.logger.Error("subscription failed", "error", err, "retry_in", delay)
			if !r.sleepOrStop(ctx, delay) {
				return
			}
			delay = min(delay*2, r.config.MaxReconnectDelay)
		default:
			if err != nil {
				r.logger.Error("connection lost", "error", err)
			}
			// The session was established, so reset the backoff.
			delay = r.config.ReconnectDelay
		}
	}
}

// runSession subscribes on conn and reads messages until the connection fails
// or the reader shuts down. It always closes conn before returning. The
// boolean result reports whether the subscription succeeded; the error is the
// subscribe or read error, if any.
func (r *Reader) runSession(ctx context.Context, conn *websocket.Conn) (bool, error) {
	// Closing the connection is the only way to interrupt a blocked read, so
	// a watcher closes it as soon as shutdown is requested or the session ends.
	sessionDone := make(chan struct{})
	watcherDone := make(chan struct{})
	go func() {
		defer close(watcherDone)
		select {
		case <-ctx.Done():
		case <-r.stopCh:
		case <-sessionDone:
		}
		// Best-effort close; the session is over either way.
		_ = conn.Close()
	}()
	defer func() {
		close(sessionDone)
		<-watcherDone

		r.mu.Lock()
		r.connected = false
		r.mu.Unlock()
	}()

	r.mu.Lock()
	r.wsConn = conn
	r.connected = true
	r.mu.Unlock()
	r.logger.Info("connected to sequencer")

	// Subscribe to pending transactions.
	if err := r.subscribe(ctx, conn); err != nil {
		return false, err
	}

	// Read messages until connection fails.
	return true, r.readMessages(ctx, conn)
}

// isShuttingDown reports whether ctx has been cancelled or Stop was called.
func (r *Reader) isShuttingDown(ctx context.Context) bool {
	select {
	case <-ctx.Done():
		return true
	case <-r.stopCh:
		return true
	default:
		return false
	}
}

// sleepOrStop waits for d and returns true, or returns false early when the
// reader is shutting down.
func (r *Reader) sleepOrStop(ctx context.Context, d time.Duration) bool {
	timer := time.NewTimer(d)
	defer timer.Stop()

	select {
	case <-ctx.Done():
		return false
	case <-r.stopCh:
		return false
	case <-timer.C:
		return true
	}
}

// connect establishes a WebSocket connection to config.WSURL and sets the
// initial read and write deadlines.
func (r *Reader) connect(ctx context.Context) (*websocket.Conn, error) {
	// Copy the default dialer: websocket.DefaultDialer is a shared pointer
	// and must not be mutated.
	dialer := *websocket.DefaultDialer
	dialer.HandshakeTimeout = 10 * time.Second

	conn, _, err := dialer.DialContext(ctx, r.config.WSURL, nil)
	if err != nil {
		return nil, fmt.Errorf("dial failed: %w", err)
	}

	// Set read/write deadlines.
	if err := conn.SetReadDeadline(time.Now().Add(r.config.PingInterval * 2)); err != nil {
		_ = conn.Close() // already failing; the deadline error is the one reported
		return nil, fmt.Errorf("set read deadline failed: %w", err)
	}
	if err := conn.SetWriteDeadline(time.Now().Add(10 * time.Second)); err != nil {
		_ = conn.Close() // already failing; the deadline error is the one reported
		return nil, fmt.Errorf("set write deadline failed: %w", err)
	}

	return conn, nil
}

// subscribe subscribes to pending transactions by sending an eth_subscribe
// request for newPendingTransactions and reading one response. It returns an
// error if the write or read fails or if the response carries a JSON-RPC
// "error" member.
func (r *Reader) subscribe(ctx context.Context, conn *websocket.Conn) error {
	sub := map[string]any{
		"jsonrpc": "2.0",
		"id":      1,
		"method":  "eth_subscribe",
		"params":  []any{"newPendingTransactions"},
	}
	if err := conn.WriteJSON(sub); err != nil {
		return fmt.Errorf("subscription write failed: %w", err)
	}

	// Read subscription response.
	var resp map[string]any
	if err := conn.ReadJSON(&resp); err != nil {
		return fmt.Errorf("subscription response failed: %w", err)
	}
	if rpcErr, ok := resp["error"]; ok && rpcErr != nil {
		return fmt.Errorf("subscription rejected: %v", rpcErr)
	}

	r.logger.Info("subscribed to pending transactions", "response", resp)
	return nil
}

// readMessages reads messages from the WebSocket connection and forwards each
// string found at params.result to the worker pool. When the buffer is full
// the hash is dropped rather than blocking the read loop. It returns when a
// read fails, ctx is cancelled, or Stop is called.
func (r *Reader) readMessages(ctx context.Context, conn *websocket.Conn) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-r.stopCh:
			return nil
		default:
		}

		// Refresh the read deadline before every read.
		if err := conn.SetReadDeadline(time.Now().Add(r.config.PingInterval * 2)); err != nil {
			return fmt.Errorf("set read deadline failed: %w", err)
		}

		var msg map[string]any
		if err := conn.ReadJSON(&msg); err != nil {
			return fmt.Errorf("read failed: %w", err)
		}

		// Extract transaction hash from notification; ignore anything else.
		params, ok := msg["params"].(map[string]any)
		if !ok {
			continue
		}
		txHash, ok := params["result"].(string)
		if !ok {
			continue
		}

		// Send to worker pool without blocking.
		select {
		case r.txHashes <- txHash:
			r.incrCounters(&r.txReceived)
		case <-ctx.Done():
			return ctx.Err()
		default:
			r.logger.Warn("tx buffer full, dropping tx")
		}
	}
}

// worker processes transaction hashes from the txHashes channel until ctx is
// cancelled or Stop is called. Processing errors are logged at debug level.
func (r *Reader) worker(ctx context.Context, id int) {
	defer r.wg.Done()

	logger := r.logger.With("worker", id)

	for {
		select {
		case <-ctx.Done():
			return
		case <-r.stopCh:
			return
		case txHash := <-r.txHashes:
			if err := r.processTxHash(ctx, txHash); err != nil {
				logger.Debug("processing error", "tx", txHash, "error", err)
			}
		}
	}
}

// processTxHash processes a transaction hash: it fetches the transaction,
// parses and validates its events, runs opportunity detection for each valid
// event, and (when front-running is enabled) launches an execution for every
// opportunity whose NetProfit exceeds MinProfit. Fetching, parsing, validation
// and detection share a MaxProcessingTime timeout; executions run on ctx so
// they outlive that timeout.
func (r *Reader) processTxHash(ctx context.Context, txHash string) error {
	startTime := time.Now()

	// Enforce max processing time.
	procCtx, cancel := context.WithTimeout(ctx, r.config.MaxProcessingTime)
	defer cancel()

	// Fetch full transaction.
	tx, isPending, err := r.rpcClient.TransactionByHash(procCtx, common.HexToHash(txHash))
	if err != nil {
		return fmt.Errorf("fetch tx failed: %w", err)
	}
	if !isPending {
		return nil // Skip already mined transactions.
	}

	// Parse transaction events (no receipt for pending transactions).
	parseStart := time.Now()
	events, err := r.parsers.ParseTransaction(procCtx, tx, nil)
	if err != nil {
		r.incrCounters(&r.parseErrors)
		return fmt.Errorf("parse failed: %w", err)
	}
	if len(events) == 0 {
		return nil // No swap events.
	}
	r.setLatency(&r.avgParseLatency, time.Since(parseStart))

	// Validate events.
	validEvents := r.validator.FilterValid(procCtx, events)
	if len(validEvents) == 0 {
		r.incrCounters(&r.validationErrors)
		return nil
	}

	// A nil threshold would panic inside big.Int.Cmp; treat it as zero.
	minProfit := r.config.MinProfit
	if minProfit == nil {
		minProfit = new(big.Int)
	}

	// Detect arbitrage opportunities for each swap.
	detectStart := time.Now()
	for _, event := range validEvents {
		// NOTE(review): the second result of GetInputToken is discarded, as in
		// the original code; its type is not visible here — confirm whether it
		// signals a failure that should skip this event.
		inputToken, _ := event.GetInputToken()

		opportunities, err := r.detector.DetectOpportunities(procCtx, inputToken)
		if err != nil {
			r.logger.Debug("opportunity detection failed", "tx", txHash, "error", err)
			continue
		}
		r.setLatency(&r.avgDetectLatency, time.Since(detectStart))

		// Execute profitable opportunities.
		for _, opp := range opportunities {
			if opp.NetProfit.Cmp(minProfit) <= 0 {
				continue
			}
			r.incrCounters(&r.opportunitiesFound, &r.opportunityCount)

			if !r.config.EnableFrontRunning {
				continue
			}
			// Track the execution in the WaitGroup so Start waits for it on
			// shutdown. Adding here is safe because the calling worker is
			// itself still counted.
			r.wg.Add(1)
			go func(o *arbitrage.Opportunity) {
				defer r.wg.Done()
				r.executeFrontRun(ctx, o, tx)
			}(opp)
		}
	}

	r.mu.Lock()
	r.txProcessed++
	r.processedCount++
	r.lastProcessed = time.Now()
	r.mu.Unlock()

	totalLatency := time.Since(startTime)
	if totalLatency > r.config.MaxProcessingTime {
		r.logger.Warn("processing too slow", "latency", totalLatency, "target", r.config.MaxProcessingTime)
	}

	return nil
}

// executeFrontRun executes a front-running transaction for opp and logs the
// outcome. The time spent in this call is recorded as the execute latency.
func (r *Reader) executeFrontRun(ctx context.Context, opp *arbitrage.Opportunity, targetTx *types.Transaction) {
	execStart := time.Now()
	defer func() {
		r.setLatency(&r.avgExecuteLatency, time.Since(execStart))
	}()

	r.incrCounters(&r.executionsAttempted, &r.executionCount)

	r.logger.Info("front-running opportunity",
		"opportunity_id", opp.ID,
		"type", opp.Type,
		"profit", opp.NetProfit.String(),
		"roi", fmt.Sprintf("%.2f%%", opp.ROI*100),
		"target_tx", targetTx.Hash().Hex(),
	)

	// Execute the arbitrage.
	result, err := r.executor.Execute(ctx, opp)
	if err != nil {
		r.logger.Error("execution failed",
			"opportunity_id", opp.ID,
			"error", err,
		)
		return
	}

	if result.Success {
		r.logger.Info("execution succeeded",
			"opportunity_id", opp.ID,
			"tx_hash", result.TxHash.Hex(),
			"actual_profit", result.ActualProfit.String(),
			"gas_cost", result.GasCost.String(),
			"duration", result.Duration,
		)
		return
	}

	r.logger.Warn("execution failed",
		"opportunity_id", opp.ID,
		"tx_hash", result.TxHash.Hex(),
		"error", result.Error,
	)
}

// incrCounters increments each of the given Reader counters by one while
// holding the state mutex.
func (r *Reader) incrCounters(counters ...*uint64) {
	r.mu.Lock()
	for _, c := range counters {
		*c++
	}
	r.mu.Unlock()
}

// setLatency stores d in the given Reader latency field while holding the
// state mutex.
func (r *Reader) setLatency(field *time.Duration, d time.Duration) {
	r.mu.Lock()
	*field = d
	r.mu.Unlock()
}

// GetStats returns current statistics as a snapshot taken under the state
// mutex. Latencies are rendered with Duration.String and last_processed in
// RFC 3339 format.
func (r *Reader) GetStats() map[string]any {
	r.mu.RLock()
	defer r.mu.RUnlock()

	return map[string]any{
		"connected":            r.connected,
		"tx_received":          r.txReceived,
		"tx_processed":         r.txProcessed,
		"parse_errors":         r.parseErrors,
		"validation_errors":    r.validationErrors,
		"opportunities_found":  r.opportunitiesFound,
		"executions_attempted": r.executionsAttempted,
		"avg_parse_latency":    r.avgParseLatency.String(),
		"avg_detect_latency":   r.avgDetectLatency.String(),
		"avg_execute_latency":  r.avgExecuteLatency.String(),
		"last_processed":       r.lastProcessed.Format(time.RFC3339),
	}
}

// Stop stops the sequencer reader. It is safe to call more than once and
// concurrently with Start; only the first call closes the stop channel.
func (r *Reader) Stop() {
	r.stopOnce.Do(func() {
		close(r.stopCh)
	})
}