CRITICAL FIX: Eliminated blocking RPC call in reader.go that was fetching transaction data we already had from the sequencer feed. Changes for consistency and reusability: 1. Added RawBytes field to DecodedTransaction to store RLP-encoded transaction 2. Created reusable ToEthereumTransaction() method for type conversion 3. Changed channel from 'chan string' (txHashes) to 'chan *SwapEvent' (swapEvents) 4. Updated processSwapEvent to use transaction from swap event instead of RPC Impact: - REMOVES blocking RPC call from hot path (pkg/sequencer/reader.go:357) - Eliminates network latency from transaction processing pipeline - Uses data already available from Arbitrum sequencer feed - Improves throughput and reduces RPC dependency This fixes the #1 CRITICAL blocker for production deployment identified in PRODUCTION_READINESS.md. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
491 lines
12 KiB
Go
491 lines
12 KiB
Go
package sequencer
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"math/big"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/ethereum/go-ethereum/core/types"
|
|
"github.com/ethereum/go-ethereum/ethclient"
|
|
"github.com/ethereum/go-ethereum/log"
|
|
"github.com/gorilla/websocket"
|
|
|
|
"github.com/your-org/mev-bot/pkg/arbitrage"
|
|
"github.com/your-org/mev-bot/pkg/cache"
|
|
"github.com/your-org/mev-bot/pkg/execution"
|
|
"github.com/your-org/mev-bot/pkg/parsers"
|
|
"github.com/your-org/mev-bot/pkg/validation"
|
|
)
|
|
|
|
// ReaderConfig groups every tunable setting of a sequencer Reader.
type ReaderConfig struct {
	// WSURL is the WebSocket endpoint of the sequencer feed.
	WSURL string
	// ReconnectDelay is the first wait applied after a failed connection
	// attempt; the wait doubles on each further failure.
	ReconnectDelay time.Duration
	// MaxReconnectDelay is the upper bound for the reconnect wait.
	MaxReconnectDelay time.Duration
	// PingInterval drives the WebSocket read deadline, which is set to
	// twice this value.
	PingInterval time.Duration

	// RPCURL is the endpoint handed to ethclient.Dial by NewReader.
	RPCURL string

	// WorkerCount is the number of swap-filter workers and the number of
	// arbitrage workers started by Start.
	WorkerCount int
	// BufferSize is the capacity of the swap event queues.
	BufferSize int

	// MinProfit is the net profit an opportunity must strictly exceed
	// before it is counted and executed.
	MinProfit *big.Int
	// EnableFrontRunning toggles execution of profitable opportunities.
	EnableFrontRunning bool

	// MaxProcessingTime bounds the context used while processing a single
	// swap event and is the threshold for the "processing too slow" warning.
	MaxProcessingTime time.Duration
}
|
|
|
|
// DefaultReaderConfig returns default configuration
|
|
func DefaultReaderConfig() *ReaderConfig {
|
|
return &ReaderConfig{
|
|
WSURL: "wss://arb1.arbitrum.io/ws",
|
|
ReconnectDelay: 1 * time.Second,
|
|
MaxReconnectDelay: 60 * time.Second,
|
|
PingInterval: 30 * time.Second,
|
|
RPCURL: "https://arb1.arbitrum.io/rpc",
|
|
WorkerCount: 10,
|
|
BufferSize: 1000,
|
|
MinProfit: big.NewInt(0.01e18), // 0.01 ETH
|
|
EnableFrontRunning: true,
|
|
MaxProcessingTime: 50 * time.Millisecond,
|
|
}
|
|
}
|
|
|
|
// Reader reads pending transactions from the Arbitrum sequencer
|
|
type Reader struct {
|
|
config *ReaderConfig
|
|
logger *slog.Logger
|
|
|
|
// Components
|
|
parsers parsers.Factory
|
|
validator validation.Validator
|
|
poolCache cache.PoolCache
|
|
detector *arbitrage.Detector
|
|
executor *execution.Executor
|
|
swapFilter *SwapFilter // NEW: Swap filter for processing sequencer feed
|
|
|
|
// Connections
|
|
wsConn *websocket.Conn
|
|
rpcClient *ethclient.Client
|
|
|
|
// Channels
|
|
swapEvents chan *SwapEvent // Changed from txHashes to pass full swap events
|
|
stopCh chan struct{}
|
|
wg sync.WaitGroup
|
|
|
|
// State (protected by RWMutex)
|
|
mu sync.RWMutex
|
|
connected bool
|
|
lastProcessed time.Time
|
|
processedCount uint64
|
|
opportunityCount uint64
|
|
executionCount uint64
|
|
|
|
// Metrics (atomic operations - thread-safe without mutex)
|
|
txReceived atomic.Uint64
|
|
txProcessed atomic.Uint64
|
|
parseErrors atomic.Uint64
|
|
validationErrors atomic.Uint64
|
|
opportunitiesFound atomic.Uint64
|
|
executionsAttempted atomic.Uint64
|
|
avgParseLatency atomic.Int64 // stored as nanoseconds
|
|
avgDetectLatency atomic.Int64 // stored as nanoseconds
|
|
avgExecuteLatency atomic.Int64 // stored as nanoseconds
|
|
}
|
|
|
|
// NewReader creates a new sequencer reader
|
|
func NewReader(
|
|
config *ReaderConfig,
|
|
parsers parsers.Factory,
|
|
validator validation.Validator,
|
|
poolCache cache.PoolCache,
|
|
detector *arbitrage.Detector,
|
|
executor *execution.Executor,
|
|
logger *slog.Logger,
|
|
) (*Reader, error) {
|
|
if config == nil {
|
|
config = DefaultReaderConfig()
|
|
}
|
|
|
|
// Connect to RPC for fetching full transactions
|
|
rpcClient, err := ethclient.Dial(config.RPCURL)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to connect to RPC: %w", err)
|
|
}
|
|
|
|
// Create swap filter with pool cache
|
|
swapFilter := NewSwapFilter(&SwapFilterConfig{
|
|
SwapChannelSize: config.BufferSize,
|
|
Logger: loggerAdapter(logger),
|
|
PoolCacheFile: "data/discovered_pools.json",
|
|
})
|
|
|
|
return &Reader{
|
|
config: config,
|
|
logger: logger.With("component", "sequencer_reader"),
|
|
parsers: parsers,
|
|
validator: validator,
|
|
poolCache: poolCache,
|
|
detector: detector,
|
|
executor: executor,
|
|
swapFilter: swapFilter,
|
|
rpcClient: rpcClient,
|
|
swapEvents: make(chan *SwapEvent, config.BufferSize),
|
|
stopCh: make(chan struct{}),
|
|
}, nil
|
|
}
|
|
|
|
// loggerAdapter converts slog.Logger to log.Logger interface
|
|
func loggerAdapter(l *slog.Logger) log.Logger {
|
|
// For now, create a simple wrapper
|
|
// TODO: Implement proper adapter if needed
|
|
return log.Root()
|
|
}
|
|
|
|
// Start starts the sequencer reader
|
|
func (r *Reader) Start(ctx context.Context) error {
|
|
r.logger.Info("starting sequencer reader",
|
|
"workers", r.config.WorkerCount,
|
|
"buffer_size", r.config.BufferSize,
|
|
)
|
|
|
|
// Start swap filter workers (channel-based processing)
|
|
if r.swapFilter != nil {
|
|
for i := 0; i < r.config.WorkerCount; i++ {
|
|
r.swapFilter.StartWorker(ctx, func(swap *SwapEvent) error {
|
|
// Process swap event
|
|
r.logger.Info("🔄 SWAP DETECTED",
|
|
"protocol", swap.Protocol.Name,
|
|
"version", swap.Protocol.Version,
|
|
"type", swap.Protocol.Type,
|
|
"hash", swap.TxHash,
|
|
"pool", swap.Pool.Address.Hex(),
|
|
"seq", swap.SeqNumber,
|
|
"block", swap.BlockNumber,
|
|
)
|
|
|
|
// Send full swap event to arbitrage detection pipeline
|
|
select {
|
|
case r.swapEvents <- swap:
|
|
// Successfully queued for arbitrage detection
|
|
default:
|
|
r.logger.Warn("arbitrage queue full", "tx", swap.TxHash)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|
|
}
|
|
|
|
// Start existing workers for arbitrage detection
|
|
for i := 0; i < r.config.WorkerCount; i++ {
|
|
r.wg.Add(1)
|
|
go r.worker(ctx, i)
|
|
}
|
|
|
|
// Start connection manager
|
|
r.wg.Add(1)
|
|
go r.maintainConnection(ctx)
|
|
|
|
// Wait for context cancellation
|
|
<-ctx.Done()
|
|
|
|
r.logger.Info("stopping sequencer reader")
|
|
close(r.stopCh)
|
|
|
|
// Stop swap filter
|
|
if r.swapFilter != nil {
|
|
r.swapFilter.Stop()
|
|
}
|
|
|
|
r.wg.Wait()
|
|
|
|
return ctx.Err()
|
|
}
|
|
|
|
// maintainConnection maintains the WebSocket connection with automatic reconnection
|
|
func (r *Reader) maintainConnection(ctx context.Context) {
|
|
defer r.wg.Done()
|
|
|
|
reconnectDelay := r.config.ReconnectDelay
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
|
|
// Connect to sequencer
|
|
conn, err := r.connect(ctx)
|
|
if err != nil {
|
|
r.logger.Error("connection failed", "error", err, "retry_in", reconnectDelay)
|
|
time.Sleep(reconnectDelay)
|
|
|
|
// Exponential backoff
|
|
reconnectDelay *= 2
|
|
if reconnectDelay > r.config.MaxReconnectDelay {
|
|
reconnectDelay = r.config.MaxReconnectDelay
|
|
}
|
|
continue
|
|
}
|
|
|
|
// Reset backoff on successful connection
|
|
reconnectDelay = r.config.ReconnectDelay
|
|
r.wsConn = conn
|
|
|
|
r.mu.Lock()
|
|
r.connected = true
|
|
r.mu.Unlock()
|
|
|
|
r.logger.Info("connected to sequencer")
|
|
|
|
// Arbitrum sequencer feed broadcasts immediately - no subscription needed
|
|
// Just start reading messages
|
|
|
|
// Read messages until connection fails
|
|
if err := r.readMessages(ctx, conn); err != nil {
|
|
r.logger.Error("connection lost", "error", err)
|
|
}
|
|
|
|
r.mu.Lock()
|
|
r.connected = false
|
|
r.mu.Unlock()
|
|
|
|
conn.Close()
|
|
}
|
|
}
|
|
|
|
// connect establishes a WebSocket connection
|
|
func (r *Reader) connect(ctx context.Context) (*websocket.Conn, error) {
|
|
dialer := websocket.DefaultDialer
|
|
dialer.HandshakeTimeout = 10 * time.Second
|
|
|
|
conn, _, err := dialer.DialContext(ctx, r.config.WSURL, nil)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("dial failed: %w", err)
|
|
}
|
|
|
|
// Set read/write deadlines
|
|
conn.SetReadDeadline(time.Now().Add(r.config.PingInterval * 2))
|
|
conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
|
|
|
|
return conn, nil
|
|
}
|
|
|
|
// subscribe is not needed for Arbitrum sequencer feed
|
|
// The feed broadcasts messages immediately after connection
|
|
// Kept for compatibility but does nothing
|
|
func (r *Reader) subscribe(ctx context.Context, conn *websocket.Conn) error {
|
|
return nil
|
|
}
|
|
|
|
// readMessages reads messages from the WebSocket connection
|
|
func (r *Reader) readMessages(ctx context.Context, conn *websocket.Conn) error {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case <-r.stopCh:
|
|
return nil
|
|
default:
|
|
}
|
|
|
|
// Set read deadline
|
|
conn.SetReadDeadline(time.Now().Add(r.config.PingInterval * 2))
|
|
|
|
var msg map[string]interface{}
|
|
if err := conn.ReadJSON(&msg); err != nil {
|
|
return fmt.Errorf("read failed: %w", err)
|
|
}
|
|
|
|
// Arbitrum sequencer feed format: {"messages": [...]}
|
|
if messages, ok := msg["messages"].([]interface{}); ok {
|
|
for _, m := range messages {
|
|
if msgMap, ok := m.(map[string]interface{}); ok {
|
|
r.txReceived.Add(1)
|
|
|
|
// Pass message to swap filter for processing
|
|
if r.swapFilter != nil {
|
|
r.swapFilter.ProcessMessage(msgMap)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// worker processes transaction hashes
|
|
func (r *Reader) worker(ctx context.Context, id int) {
|
|
defer r.wg.Done()
|
|
|
|
logger := r.logger.With("worker", id)
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-r.stopCh:
|
|
return
|
|
case swapEvent := <-r.swapEvents:
|
|
if err := r.processSwapEvent(ctx, swapEvent); err != nil {
|
|
logger.Debug("processing error", "tx", swapEvent.TxHash, "error", err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// processSwapEvent processes a swap event with transaction data already decoded
|
|
func (r *Reader) processSwapEvent(ctx context.Context, swapEvent *SwapEvent) error {
|
|
startTime := time.Now()
|
|
|
|
// Enforce max processing time
|
|
procCtx, cancel := context.WithTimeout(ctx, r.config.MaxProcessingTime)
|
|
defer cancel()
|
|
|
|
// Convert decoded transaction to *types.Transaction
|
|
// This uses the transaction data we already received from the sequencer feed
|
|
// NO BLOCKING RPC CALL - transaction is already decoded!
|
|
tx, err := swapEvent.Transaction.ToEthereumTransaction()
|
|
if err != nil {
|
|
return fmt.Errorf("convert tx failed: %w", err)
|
|
}
|
|
|
|
parseStart := time.Now()
|
|
|
|
// Parse transaction events (no receipt for pending transactions)
|
|
events, err := r.parsers.ParseTransaction(procCtx, tx, nil)
|
|
if err != nil {
|
|
r.parseErrors.Add(1)
|
|
return fmt.Errorf("parse failed: %w", err)
|
|
}
|
|
|
|
if len(events) == 0 {
|
|
return nil // No swap events
|
|
}
|
|
|
|
r.avgParseLatency.Store(time.Since(parseStart).Nanoseconds())
|
|
|
|
// Validate events
|
|
validEvents := r.validator.FilterValid(procCtx, events)
|
|
if len(validEvents) == 0 {
|
|
r.validationErrors.Add(1)
|
|
return nil
|
|
}
|
|
|
|
detectStart := time.Now()
|
|
|
|
// Detect arbitrage opportunities for each swap
|
|
for _, event := range validEvents {
|
|
// Get input token from the swap
|
|
inputToken, _ := event.GetInputToken()
|
|
|
|
// Detect opportunities starting with this token
|
|
opportunities, err := r.detector.DetectOpportunities(procCtx, inputToken)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
r.avgDetectLatency.Store(time.Since(detectStart).Nanoseconds())
|
|
|
|
// Execute profitable opportunities
|
|
for _, opp := range opportunities {
|
|
if opp.NetProfit.Cmp(r.config.MinProfit) > 0 {
|
|
r.opportunitiesFound.Add(1)
|
|
r.opportunityCount++
|
|
|
|
if r.config.EnableFrontRunning {
|
|
execStart := time.Now()
|
|
go r.executeFrontRun(ctx, opp, tx)
|
|
r.avgExecuteLatency.Store(time.Since(execStart).Nanoseconds())
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
r.txProcessed.Add(1)
|
|
r.processedCount++
|
|
r.lastProcessed = time.Now()
|
|
|
|
totalLatency := time.Since(startTime)
|
|
if totalLatency > r.config.MaxProcessingTime {
|
|
r.logger.Warn("processing too slow", "latency", totalLatency, "target", r.config.MaxProcessingTime)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// executeFrontRun executes a front-running transaction
|
|
func (r *Reader) executeFrontRun(ctx context.Context, opp *arbitrage.Opportunity, targetTx *types.Transaction) {
|
|
r.executionsAttempted.Add(1)
|
|
r.executionCount++
|
|
|
|
r.logger.Info("front-running opportunity",
|
|
"opportunity_id", opp.ID,
|
|
"type", opp.Type,
|
|
"profit", opp.NetProfit.String(),
|
|
"roi", fmt.Sprintf("%.2f%%", opp.ROI*100),
|
|
"target_tx", targetTx.Hash().Hex(),
|
|
)
|
|
|
|
// Execute the arbitrage
|
|
result, err := r.executor.Execute(ctx, opp)
|
|
if err != nil {
|
|
r.logger.Error("execution failed",
|
|
"opportunity_id", opp.ID,
|
|
"error", err,
|
|
)
|
|
return
|
|
}
|
|
|
|
if result.Success {
|
|
r.logger.Info("execution succeeded",
|
|
"opportunity_id", opp.ID,
|
|
"tx_hash", result.TxHash.Hex(),
|
|
"actual_profit", result.ActualProfit.String(),
|
|
"gas_cost", result.GasCost.String(),
|
|
"duration", result.Duration,
|
|
)
|
|
} else {
|
|
r.logger.Warn("execution failed",
|
|
"opportunity_id", opp.ID,
|
|
"tx_hash", result.TxHash.Hex(),
|
|
"error", result.Error,
|
|
)
|
|
}
|
|
}
|
|
|
|
// GetStats returns current statistics
|
|
func (r *Reader) GetStats() map[string]interface{} {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
|
|
return map[string]interface{}{
|
|
"connected": r.connected,
|
|
"tx_received": r.txReceived.Load(),
|
|
"tx_processed": r.txProcessed.Load(),
|
|
"parse_errors": r.parseErrors.Load(),
|
|
"validation_errors": r.validationErrors.Load(),
|
|
"opportunities_found": r.opportunitiesFound.Load(),
|
|
"executions_attempted": r.executionsAttempted.Load(),
|
|
"avg_parse_latency": time.Duration(r.avgParseLatency.Load()).String(),
|
|
"avg_detect_latency": time.Duration(r.avgDetectLatency.Load()).String(),
|
|
"avg_execute_latency": time.Duration(r.avgExecuteLatency.Load()).String(),
|
|
"last_processed": r.lastProcessed.Format(time.RFC3339),
|
|
}
|
|
}
|
|
|
|
// Stop stops the sequencer reader
|
|
func (r *Reader) Stop() {
|
|
close(r.stopCh)
|
|
}
|