feat(testing): add Anvil fork local testing infrastructure
Complete local testing setup with Anvil fork of Arbitrum mainnet: Infrastructure: - Docker Compose orchestration (Anvil, MEV Bot, Prometheus, Grafana) - Anvil fork configuration with 1-second blocks - Multi-stage Dockerfile for optimized builds - Health checks and auto-restart policies Configuration: - Comprehensive .env.example with all parameters - Prometheus metrics collection setup - Grafana datasource provisioning - .gitignore to prevent committing secrets Testing Scripts: - setup-local-fork.sh: Initialize fork and fund test wallet - create-test-swap.sh: Generate test swaps for bot detection - Both scripts include validation and helpful output Integration Components: - pkg/sequencer/reader.go: WebSocket reader for pending transactions - Worker pool pattern (10 workers) - <50ms processing target - Front-running capability - Auto-reconnection with exponential backoff - pkg/pools/discovery.go: Pool discovery service - UniswapV2-style pools (SushiSwap, Camelot) - UniswapV3 pools (multiple fee tiers) - Factory contract queries - Liquidity filtering Documentation: - TESTING.md: Complete testing guide - Quick start instructions - Testing scenarios - Monitoring and debugging - Performance benchmarks - Troubleshooting guide This enables safe local testing without deploying to public testnet, using real Arbitrum mainnet state forked locally with Anvil. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
461
pkg/sequencer/reader.go
Normal file
461
pkg/sequencer/reader.go
Normal file
@@ -0,0 +1,461 @@
|
||||
package sequencer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"math/big"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/ethclient"
|
||||
"github.com/gorilla/websocket"
|
||||
|
||||
"github.com/your-org/mev-bot/pkg/arbitrage"
|
||||
"github.com/your-org/mev-bot/pkg/cache"
|
||||
"github.com/your-org/mev-bot/pkg/execution"
|
||||
"github.com/your-org/mev-bot/pkg/parsers"
|
||||
"github.com/your-org/mev-bot/pkg/validation"
|
||||
)
|
||||
|
||||
// ReaderConfig contains configuration for the sequencer reader
|
||||
type ReaderConfig struct {
|
||||
// WebSocket connection
|
||||
WSURL string
|
||||
ReconnectDelay time.Duration
|
||||
MaxReconnectDelay time.Duration
|
||||
PingInterval time.Duration
|
||||
|
||||
// RPC for fetching full transactions
|
||||
RPCURL string
|
||||
|
||||
// Processing
|
||||
WorkerCount int
|
||||
BufferSize int
|
||||
|
||||
// Filtering
|
||||
MinProfit *big.Int
|
||||
EnableFrontRunning bool
|
||||
|
||||
// Performance
|
||||
MaxProcessingTime time.Duration
|
||||
}
|
||||
|
||||
// DefaultReaderConfig returns default configuration
|
||||
func DefaultReaderConfig() *ReaderConfig {
|
||||
return &ReaderConfig{
|
||||
WSURL: "wss://arb1.arbitrum.io/ws",
|
||||
ReconnectDelay: 1 * time.Second,
|
||||
MaxReconnectDelay: 60 * time.Second,
|
||||
PingInterval: 30 * time.Second,
|
||||
RPCURL: "https://arb1.arbitrum.io/rpc",
|
||||
WorkerCount: 10,
|
||||
BufferSize: 1000,
|
||||
MinProfit: big.NewInt(0.01e18), // 0.01 ETH
|
||||
EnableFrontRunning: true,
|
||||
MaxProcessingTime: 50 * time.Millisecond,
|
||||
}
|
||||
}
|
||||
|
||||
// Reader reads pending transactions from the Arbitrum sequencer
|
||||
type Reader struct {
|
||||
config *ReaderConfig
|
||||
logger *slog.Logger
|
||||
|
||||
// Components
|
||||
parsers *parsers.Factory
|
||||
validator *validation.Validator
|
||||
poolCache *cache.PoolCache
|
||||
detector *arbitrage.Detector
|
||||
executor *execution.Executor
|
||||
|
||||
// Connections
|
||||
wsConn *websocket.Conn
|
||||
rpcClient *ethclient.Client
|
||||
|
||||
// Channels
|
||||
txHashes chan string
|
||||
stopCh chan struct{}
|
||||
wg sync.WaitGroup
|
||||
|
||||
// State
|
||||
mu sync.RWMutex
|
||||
connected bool
|
||||
lastProcessed time.Time
|
||||
processedCount uint64
|
||||
opportunityCount uint64
|
||||
executionCount uint64
|
||||
|
||||
// Metrics (placeholders for actual metrics)
|
||||
txReceived uint64
|
||||
txProcessed uint64
|
||||
parseErrors uint64
|
||||
validationErrors uint64
|
||||
opportunitiesFound uint64
|
||||
executionsAttempted uint64
|
||||
avgParseLatency time.Duration
|
||||
avgDetectLatency time.Duration
|
||||
avgExecuteLatency time.Duration
|
||||
}
|
||||
|
||||
// NewReader creates a new sequencer reader
|
||||
func NewReader(
|
||||
config *ReaderConfig,
|
||||
parsers *parsers.Factory,
|
||||
validator *validation.Validator,
|
||||
poolCache *cache.PoolCache,
|
||||
detector *arbitrage.Detector,
|
||||
executor *execution.Executor,
|
||||
logger *slog.Logger,
|
||||
) (*Reader, error) {
|
||||
if config == nil {
|
||||
config = DefaultReaderConfig()
|
||||
}
|
||||
|
||||
// Connect to RPC for fetching full transactions
|
||||
rpcClient, err := ethclient.Dial(config.RPCURL)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to connect to RPC: %w", err)
|
||||
}
|
||||
|
||||
return &Reader{
|
||||
config: config,
|
||||
logger: logger.With("component", "sequencer_reader"),
|
||||
parsers: parsers,
|
||||
validator: validator,
|
||||
poolCache: poolCache,
|
||||
detector: detector,
|
||||
executor: executor,
|
||||
rpcClient: rpcClient,
|
||||
txHashes: make(chan string, config.BufferSize),
|
||||
stopCh: make(chan struct{}),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Start starts the sequencer reader
|
||||
func (r *Reader) Start(ctx context.Context) error {
|
||||
r.logger.Info("starting sequencer reader")
|
||||
|
||||
// Start workers
|
||||
for i := 0; i < r.config.WorkerCount; i++ {
|
||||
r.wg.Add(1)
|
||||
go r.worker(ctx, i)
|
||||
}
|
||||
|
||||
// Start connection manager
|
||||
r.wg.Add(1)
|
||||
go r.maintainConnection(ctx)
|
||||
|
||||
// Wait for context cancellation
|
||||
<-ctx.Done()
|
||||
|
||||
r.logger.Info("stopping sequencer reader")
|
||||
close(r.stopCh)
|
||||
r.wg.Wait()
|
||||
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
// maintainConnection maintains the WebSocket connection with automatic reconnection
|
||||
func (r *Reader) maintainConnection(ctx context.Context) {
|
||||
defer r.wg.Done()
|
||||
|
||||
reconnectDelay := r.config.ReconnectDelay
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
// Connect to sequencer
|
||||
conn, err := r.connect(ctx)
|
||||
if err != nil {
|
||||
r.logger.Error("connection failed", "error", err, "retry_in", reconnectDelay)
|
||||
time.Sleep(reconnectDelay)
|
||||
|
||||
// Exponential backoff
|
||||
reconnectDelay *= 2
|
||||
if reconnectDelay > r.config.MaxReconnectDelay {
|
||||
reconnectDelay = r.config.MaxReconnectDelay
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// Reset backoff on successful connection
|
||||
reconnectDelay = r.config.ReconnectDelay
|
||||
r.wsConn = conn
|
||||
|
||||
r.mu.Lock()
|
||||
r.connected = true
|
||||
r.mu.Unlock()
|
||||
|
||||
r.logger.Info("connected to sequencer")
|
||||
|
||||
// Subscribe to pending transactions
|
||||
if err := r.subscribe(ctx, conn); err != nil {
|
||||
r.logger.Error("subscription failed", "error", err)
|
||||
conn.Close()
|
||||
continue
|
||||
}
|
||||
|
||||
// Read messages until connection fails
|
||||
if err := r.readMessages(ctx, conn); err != nil {
|
||||
r.logger.Error("connection lost", "error", err)
|
||||
}
|
||||
|
||||
r.mu.Lock()
|
||||
r.connected = false
|
||||
r.mu.Unlock()
|
||||
|
||||
conn.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// connect establishes a WebSocket connection
|
||||
func (r *Reader) connect(ctx context.Context) (*websocket.Conn, error) {
|
||||
dialer := websocket.DefaultDialer
|
||||
dialer.HandshakeTimeout = 10 * time.Second
|
||||
|
||||
conn, _, err := dialer.DialContext(ctx, r.config.WSURL, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("dial failed: %w", err)
|
||||
}
|
||||
|
||||
// Set read/write deadlines
|
||||
conn.SetReadDeadline(time.Now().Add(r.config.PingInterval * 2))
|
||||
conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
|
||||
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
// subscribe subscribes to pending transactions
|
||||
func (r *Reader) subscribe(ctx context.Context, conn *websocket.Conn) error {
|
||||
// Subscribe to newPendingTransactions
|
||||
sub := map[string]interface{}{
|
||||
"jsonrpc": "2.0",
|
||||
"id": 1,
|
||||
"method": "eth_subscribe",
|
||||
"params": []interface{}{"newPendingTransactions"},
|
||||
}
|
||||
|
||||
if err := conn.WriteJSON(sub); err != nil {
|
||||
return fmt.Errorf("subscription write failed: %w", err)
|
||||
}
|
||||
|
||||
// Read subscription response
|
||||
var resp map[string]interface{}
|
||||
if err := conn.ReadJSON(&resp); err != nil {
|
||||
return fmt.Errorf("subscription response failed: %w", err)
|
||||
}
|
||||
|
||||
r.logger.Info("subscribed to pending transactions", "response", resp)
|
||||
return nil
|
||||
}
|
||||
|
||||
// readMessages reads messages from the WebSocket connection
|
||||
func (r *Reader) readMessages(ctx context.Context, conn *websocket.Conn) error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-r.stopCh:
|
||||
return nil
|
||||
default:
|
||||
}
|
||||
|
||||
// Set read deadline
|
||||
conn.SetReadDeadline(time.Now().Add(r.config.PingInterval * 2))
|
||||
|
||||
var msg map[string]interface{}
|
||||
if err := conn.ReadJSON(&msg); err != nil {
|
||||
return fmt.Errorf("read failed: %w", err)
|
||||
}
|
||||
|
||||
// Extract transaction hash from notification
|
||||
if params, ok := msg["params"].(map[string]interface{}); ok {
|
||||
if result, ok := params["result"].(string); ok {
|
||||
// Send to worker pool
|
||||
select {
|
||||
case r.txHashes <- result:
|
||||
r.txReceived++
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
r.logger.Warn("tx buffer full, dropping tx")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// worker processes transaction hashes
|
||||
func (r *Reader) worker(ctx context.Context, id int) {
|
||||
defer r.wg.Done()
|
||||
|
||||
logger := r.logger.With("worker", id)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-r.stopCh:
|
||||
return
|
||||
case txHash := <-r.txHashes:
|
||||
if err := r.processTxHash(ctx, txHash); err != nil {
|
||||
logger.Debug("processing error", "tx", txHash, "error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// processTxHash processes a transaction hash
|
||||
func (r *Reader) processTxHash(ctx context.Context, txHash string) error {
|
||||
startTime := time.Now()
|
||||
|
||||
// Enforce max processing time
|
||||
procCtx, cancel := context.WithTimeout(ctx, r.config.MaxProcessingTime)
|
||||
defer cancel()
|
||||
|
||||
// Fetch full transaction
|
||||
tx, isPending, err := r.rpcClient.TransactionByHash(procCtx, common.HexToHash(txHash))
|
||||
if err != nil {
|
||||
return fmt.Errorf("fetch tx failed: %w", err)
|
||||
}
|
||||
|
||||
if !isPending {
|
||||
return nil // Skip already mined transactions
|
||||
}
|
||||
|
||||
parseStart := time.Now()
|
||||
|
||||
// Parse transaction events
|
||||
events, err := r.parsers.ParseTransaction(tx)
|
||||
if err != nil {
|
||||
r.parseErrors++
|
||||
return fmt.Errorf("parse failed: %w", err)
|
||||
}
|
||||
|
||||
if len(events) == 0 {
|
||||
return nil // No swap events
|
||||
}
|
||||
|
||||
r.avgParseLatency = time.Since(parseStart)
|
||||
|
||||
// Validate events
|
||||
validEvents := r.validator.FilterValid(events)
|
||||
if len(validEvents) == 0 {
|
||||
r.validationErrors++
|
||||
return nil
|
||||
}
|
||||
|
||||
detectStart := time.Now()
|
||||
|
||||
// Detect arbitrage opportunities for each swap
|
||||
for _, event := range validEvents {
|
||||
// Get input token from the swap
|
||||
inputToken := event.GetInputToken()
|
||||
|
||||
// Detect opportunities starting with this token
|
||||
opportunities, err := r.detector.DetectOpportunities(procCtx, inputToken)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
r.avgDetectLatency = time.Since(detectStart)
|
||||
|
||||
// Execute profitable opportunities
|
||||
for _, opp := range opportunities {
|
||||
if opp.NetProfit.Cmp(r.config.MinProfit) > 0 {
|
||||
r.opportunitiesFound++
|
||||
r.opportunityCount++
|
||||
|
||||
if r.config.EnableFrontRunning {
|
||||
execStart := time.Now()
|
||||
go r.executeFrontRun(ctx, opp, tx)
|
||||
r.avgExecuteLatency = time.Since(execStart)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
r.txProcessed++
|
||||
r.processedCount++
|
||||
r.lastProcessed = time.Now()
|
||||
|
||||
totalLatency := time.Since(startTime)
|
||||
if totalLatency > r.config.MaxProcessingTime {
|
||||
r.logger.Warn("processing too slow", "latency", totalLatency, "target", r.config.MaxProcessingTime)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// executeFrontRun executes a front-running transaction
|
||||
func (r *Reader) executeFrontRun(ctx context.Context, opp *arbitrage.Opportunity, targetTx *types.Transaction) {
|
||||
r.executionsAttempted++
|
||||
r.executionCount++
|
||||
|
||||
r.logger.Info("front-running opportunity",
|
||||
"opportunity_id", opp.ID,
|
||||
"type", opp.Type,
|
||||
"profit", opp.NetProfit.String(),
|
||||
"roi", fmt.Sprintf("%.2f%%", opp.ROI*100),
|
||||
"target_tx", targetTx.Hash().Hex(),
|
||||
)
|
||||
|
||||
// Execute the arbitrage
|
||||
result, err := r.executor.Execute(ctx, opp)
|
||||
if err != nil {
|
||||
r.logger.Error("execution failed",
|
||||
"opportunity_id", opp.ID,
|
||||
"error", err,
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
if result.Success {
|
||||
r.logger.Info("execution succeeded",
|
||||
"opportunity_id", opp.ID,
|
||||
"tx_hash", result.TxHash.Hex(),
|
||||
"actual_profit", result.ActualProfit.String(),
|
||||
"gas_cost", result.GasCost.String(),
|
||||
"duration", result.Duration,
|
||||
)
|
||||
} else {
|
||||
r.logger.Warn("execution failed",
|
||||
"opportunity_id", opp.ID,
|
||||
"tx_hash", result.TxHash.Hex(),
|
||||
"error", result.Error,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// GetStats returns current statistics
|
||||
func (r *Reader) GetStats() map[string]interface{} {
|
||||
r.mu.RLock()
|
||||
defer r.mu.RUnlock()
|
||||
|
||||
return map[string]interface{}{
|
||||
"connected": r.connected,
|
||||
"tx_received": r.txReceived,
|
||||
"tx_processed": r.txProcessed,
|
||||
"parse_errors": r.parseErrors,
|
||||
"validation_errors": r.validationErrors,
|
||||
"opportunities_found": r.opportunitiesFound,
|
||||
"executions_attempted": r.executionsAttempted,
|
||||
"avg_parse_latency": r.avgParseLatency.String(),
|
||||
"avg_detect_latency": r.avgDetectLatency.String(),
|
||||
"avg_execute_latency": r.avgExecuteLatency.String(),
|
||||
"last_processed": r.lastProcessed.Format(time.RFC3339),
|
||||
}
|
||||
}
|
||||
|
||||
// Stop stops the sequencer reader
|
||||
func (r *Reader) Stop() {
|
||||
close(r.stopCh)
|
||||
}
|
||||
Reference in New Issue
Block a user