package main import ( "context" "fmt" "log/slog" "math/big" "os" "os/signal" "syscall" "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/your-org/mev-bot/pkg/arbitrage" "github.com/your-org/mev-bot/pkg/cache" "github.com/your-org/mev-bot/pkg/execution" "github.com/your-org/mev-bot/pkg/observability" "github.com/your-org/mev-bot/pkg/parsers" "github.com/your-org/mev-bot/pkg/pools" "github.com/your-org/mev-bot/pkg/sequencer" mevtypes "github.com/your-org/mev-bot/pkg/types" "github.com/your-org/mev-bot/pkg/validation" ) func main() { // Initialize logger logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ Level: slog.LevelInfo, })) logger.Info("🤖 Starting MEV Bot V2") logger.Info("================================") // Load configuration config, err := LoadConfig() if err != nil { logger.Error("failed to load configuration", "error", err) os.Exit(1) } logger.Info("configuration loaded", "chain_id", config.ChainID, "rpc_url", config.RPCURL, "wallet", config.WalletAddress.Hex(), ) // Log safety configuration for testing/monitoring logger.Info("================================") logger.Info("SAFETY CONFIGURATION") logger.Info("================================") logger.Info("execution settings", "dry_run_mode", config.DryRun, "enable_execution", config.EnableExecution, "enable_simulation", config.EnableSimulation, "enable_front_running", config.EnableFrontRunning, ) // Helper function to convert big.Int wei to ETH string (avoids Int64 overflow) weiToEth := func(wei *big.Int) string { if wei == nil { return "0.0000" } ethFloat := new(big.Float).SetInt(wei) ethFloat = ethFloat.Quo(ethFloat, big.NewFloat(1e18)) result, _ := ethFloat.Float64() return fmt.Sprintf("%.4f", result) } logger.Info("risk limits", "max_position_size_eth", weiToEth(config.MaxPositionSize), "max_daily_volume_eth", weiToEth(config.MaxDailyVolume), "max_slippage_bps", config.MaxSlippageBPS, "max_gas_price_gwei", config.MaxGasPrice, ) logger.Info("profit thresholds", "min_profit_eth", weiToEth(config.MinProfit), "min_roi_percent", fmt.Sprintf("%.2f%%", config.MinROI*100), "min_swap_amount_eth", weiToEth(config.MinSwapAmount), ) logger.Info("circuit breaker", "enabled", true, "max_consecutive_losses", config.MaxConsecutiveLosses, "max_hourly_loss_eth", weiToEth(config.MaxHourlyLoss), "max_daily_loss_eth", weiToEth(config.MaxDailyLoss), ) logger.Info("emergency stop", "file_path", config.EmergencyStopFile, "check_interval_seconds", 10, ) logger.Info("================================") // Initialize observability metrics := observability.NewMetrics("mev_bot") logger.Info("metrics initialized", "port", config.MetricsPort) // Initialize pool cache poolCache := cache.NewPoolCache() logger.Info("pool cache initialized") // Initialize parser factory parserFactory := parsers.NewFactory() // Register parsers parserFactory.RegisterParser(mevtypes.ProtocolUniswapV2, parsers.NewUniswapV2Parser(poolCache, logger)) parserFactory.RegisterParser(mevtypes.ProtocolUniswapV3, parsers.NewUniswapV3Parser(poolCache, logger)) parserFactory.RegisterParser(mevtypes.ProtocolCurve, parsers.NewCurveParser(poolCache, logger)) logger.Info("parsers registered", "count", 3) // Initialize validator validatorRules := validation.DefaultValidationRules() validatorRules.MinAmount = config.MinSwapAmount validatorRules.AllowedProtocols[mevtypes.ProtocolUniswapV2] = true validatorRules.AllowedProtocols[mevtypes.ProtocolUniswapV3] = true validatorRules.AllowedProtocols[mevtypes.ProtocolCurve] = true validatorRules.AllowedProtocols[mevtypes.ProtocolSushiSwap] = true validatorRules.AllowedProtocols[mevtypes.ProtocolCamelot] = true validator := validation.NewValidator(validatorRules) logger.Info("validator initialized") // Initialize arbitrage detector pathFinderConfig := &arbitrage.PathFinderConfig{ MaxHops: config.MaxHops, MaxPathsPerPair: config.MaxPaths, MinLiquidity: config.MinPoolLiquidity, } pathFinder := arbitrage.NewPathFinder(poolCache, pathFinderConfig, logger) gasEstimator := arbitrage.NewGasEstimator(nil, logger) calculatorConfig := &arbitrage.CalculatorConfig{ MinProfitWei: config.MinProfit, MinROI: config.MinROI, SlippageTolerance: float64(config.MaxSlippageBPS) / 10000.0, // Convert BPS to decimal } calculator := arbitrage.NewCalculator(calculatorConfig, gasEstimator, logger) detectorConfig := &arbitrage.DetectorConfig{ MaxConcurrentEvaluations: config.MaxConcurrentDetection, EvaluationTimeout: 5 * time.Second, MaxPathsToEvaluate: config.MaxPaths, } detector := arbitrage.NewDetector(detectorConfig, pathFinder, calculator, poolCache, logger) logger.Info("arbitrage detector initialized") // Initialize execution engine builderConfig := execution.DefaultTransactionBuilderConfig() builderConfig.DefaultSlippageBPS = config.MaxSlippageBPS builderConfig.MaxGasLimit = config.MaxGasLimit builder := execution.NewTransactionBuilder(builderConfig, big.NewInt(config.ChainID), logger) riskConfig := execution.DefaultRiskManagerConfig() riskConfig.MaxPositionSize = config.MaxPositionSize riskConfig.MaxDailyVolume = config.MaxDailyVolume riskConfig.MinProfitAfterGas = config.MinProfit riskConfig.MinROI = config.MinROI riskManager := execution.NewRiskManager(riskConfig, nil, logger) flashloanConfig := execution.DefaultFlashloanConfig() flashloanConfig.ExecutorContract = config.ExecutorContract flashloanMgr := execution.NewFlashloanManager(flashloanConfig, logger) executorConfig := &execution.ExecutorConfig{ PrivateKey: config.PrivateKey, WalletAddress: config.WalletAddress, RPCEndpoint: config.RPCURL, PrivateRPCEndpoint: config.PrivateRPCURL, UsePrivateRPC: config.UsePrivateRPC, ConfirmationBlocks: config.ConfirmationBlocks, TimeoutPerTx: config.TxTimeout, MaxRetries: config.MaxRetries, GasPriceStrategy: config.GasPriceStrategy, MonitorInterval: 1 * time.Second, CleanupInterval: 1 * time.Minute, } executor, err := execution.NewExecutor(executorConfig, builder, riskManager, flashloanMgr, logger) if err != nil { logger.Error("failed to initialize executor", "error", err) os.Exit(1) } logger.Info("execution engine initialized") // Initialize pool discovery discoveryConfig := &pools.DiscoveryConfig{ RPCURL: config.RPCURL, MaxPools: config.MaxPoolsToDiscover, MinLiquidity: config.MinPoolLiquidity, } discovery, err := pools.NewDiscovery(discoveryConfig, poolCache, logger) if err != nil { logger.Error("failed to initialize pool discovery", "error", err) os.Exit(1) } // Initialize sequencer reader seqConfig := &sequencer.ReaderConfig{ WSURL: config.SequencerWSURL, RPCURL: config.RPCURL, WorkerCount: config.WorkerCount, BufferSize: config.BufferSize, MinProfit: config.MinProfit, EnableFrontRunning: config.EnableFrontRunning, } seqReader, err := sequencer.NewReader(seqConfig, parserFactory, validator, poolCache, detector, executor, logger) if err != nil { logger.Error("failed to initialize sequencer reader", "error", err) os.Exit(1) } logger.Info("sequencer reader initialized") // Create context ctx, cancel := context.WithCancel(context.Background()) defer cancel() // Discover pools logger.Info("🔍 Discovering pools...") if err := discovery.DiscoverAll(ctx); err != nil { logger.Error("pool discovery failed", "error", err) os.Exit(1) } stats := discovery.GetStats() logger.Info("✅ Pool discovery complete", "pools_discovered", stats["pools_discovered"], "pools_cached", stats["pools_cached"], ) count, _ := poolCache.Count(ctx) if count == 0 { logger.Error("no pools discovered - cannot continue") os.Exit(1) } // Start sequencer reader logger.Info("🚀 Starting sequencer reader...") go func() { if err := seqReader.Start(ctx); err != nil { logger.Error("sequencer reader failed", "error", err) cancel() } }() // Metrics server (placeholder - implement if needed) _ = metrics // TODO: Implement metrics server logger.Info("📊 Metrics initialized", "port", config.MetricsPort) // Start stats reporter go reportStats(ctx, logger, seqReader, poolCache, riskManager) // Start emergency stop monitor go monitorEmergencyStop(ctx, cancel, logger, config.EmergencyStopFile) logger.Info("✨ MEV Bot V2 is running") logger.Info("================================") logger.Info("Press Ctrl+C to stop") // Wait for interrupt or context cancellation sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) select { case <-sigCh: logger.Info("🛑 Interrupt signal received") case <-ctx.Done(): logger.Info("🛑 Emergency stop triggered") } logger.Info("🛑 Shutting down...") cancel() // Stop services seqReader.Stop() executor.Stop() logger.Info("👋 Shutdown complete") } // reportStats periodically reports statistics func reportStats(ctx context.Context, logger *slog.Logger, seqReader *sequencer.Reader, poolCache cache.PoolCache, riskManager *execution.RiskManager) { ticker := time.NewTicker(60 * time.Second) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: seqStats := seqReader.GetStats() riskStats := riskManager.GetStats() count, _ := poolCache.Count(ctx) logger.Info("📈 Stats Report", "pools_cached", count, "tx_processed", seqStats["tx_processed"], "opportunities_found", seqStats["opportunities_found"], "executions_attempted", seqStats["executions_attempted"], "circuit_breaker_open", riskStats["circuit_breaker_open"], "daily_volume", riskStats["daily_volume"], ) } } } // monitorEmergencyStop periodically checks for the emergency stop file func monitorEmergencyStop(ctx context.Context, cancel context.CancelFunc, logger *slog.Logger, emergencyStopFile string) { ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() logger.Info("🚨 Emergency stop monitor started", "file_path", emergencyStopFile, "check_interval_seconds", 10, ) for { select { case <-ctx.Done(): return case <-ticker.C: // Check if emergency stop file exists if _, err := os.Stat(emergencyStopFile); err == nil { logger.Error("🚨 EMERGENCY STOP FILE DETECTED - Initiating shutdown", "file_path", emergencyStopFile, ) cancel() return } } } } // Config holds application configuration type Config struct { // Chain ChainID int64 RPCURL string WSURL string // Sequencer SequencerWSURL string // Wallet WalletAddress common.Address PrivateKey []byte // Execution ExecutorContract common.Address PrivateRPCURL string UsePrivateRPC bool ConfirmationBlocks uint64 TxTimeout time.Duration MaxRetries int GasPriceStrategy string MaxGasLimit uint64 MaxGasPrice uint64 EnableSimulation bool EnableFrontRunning bool EnableExecution bool DryRun bool // Safety MaxConsecutiveLosses int MaxHourlyLoss *big.Int MaxDailyLoss *big.Int EmergencyStopFile string // Arbitrage MaxHops int MaxPaths int MinProfit *big.Int MinROI float64 MaxSlippageBPS uint16 MinSwapAmount *big.Int MinPoolLiquidity *big.Int MaxConcurrentDetection int // Risk MaxPositionSize *big.Int MaxDailyVolume *big.Int // Discovery MaxPoolsToDiscover int // Performance WorkerCount int BufferSize int // Monitoring MetricsPort int } // LoadConfig loads configuration from environment variables func LoadConfig() (*Config, error) { // Get private key privateKeyHex := os.Getenv("PRIVATE_KEY") if privateKeyHex == "" { return nil, fmt.Errorf("PRIVATE_KEY not set") } privateKey, err := crypto.HexToECDSA(privateKeyHex) if err != nil { return nil, fmt.Errorf("invalid private key: %w", err) } walletAddress := crypto.PubkeyToAddress(privateKey.PublicKey) // Get executor contract (optional) - supports both naming conventions executorContract := common.HexToAddress(getEnvOrDefault("EXECUTOR_CONTRACT", getEnvOrDefault("CONTRACT_ARBITRAGE_EXECUTOR", "0x0000000000000000000000000000000000000000"))) return &Config{ // Chain ChainID: 42161, // Arbitrum RPCURL: getEnvOrDefault("RPC_URL", getEnvOrDefault("ARBITRUM_RPC_ENDPOINT", "https://arb1.arbitrum.io/rpc")), WSURL: getEnvOrDefault("WS_URL", getEnvOrDefault("ARBITRUM_WS_ENDPOINT", "wss://arb1.arbitrum.io/ws")), // Sequencer SequencerWSURL: getEnvOrDefault("SEQUENCER_WS_URL", getEnvOrDefault("ARBITRUM_WS_ENDPOINT", "wss://arb1.arbitrum.io/ws")), // Wallet WalletAddress: walletAddress, PrivateKey: crypto.FromECDSA(privateKey), // Execution ExecutorContract: executorContract, PrivateRPCURL: os.Getenv("PRIVATE_RPC_URL"), UsePrivateRPC: os.Getenv("USE_PRIVATE_RPC") == "true", ConfirmationBlocks: 1, TxTimeout: 5 * time.Minute, MaxRetries: 3, GasPriceStrategy: getEnvOrDefault("GAS_PRICE_STRATEGY", "fast"), MaxGasLimit: 3000000, MaxGasPrice: 50, // Default 50 gwei EnableSimulation: getEnvOrDefault("ENABLE_SIMULATION", "true") == "true", EnableFrontRunning: getEnvOrDefault("ENABLE_FRONT_RUNNING", "false") == "true", EnableExecution: getEnvOrDefault("ENABLE_EXECUTION", "false") == "true", DryRun: getEnvOrDefault("DRY_RUN_MODE", "true") == "true", // Safety MaxConsecutiveLosses: 3, // Default 3 consecutive losses MaxHourlyLoss: new(big.Int).SetUint64(100000000000000000), // Default 0.1 ETH (10^17 wei) MaxDailyLoss: new(big.Int).SetUint64(500000000000000000), // Default 0.5 ETH (5*10^17 wei) EmergencyStopFile: getEnvOrDefault("EMERGENCY_STOP_FILE", "/tmp/mev-bot-emergency-stop"), // Arbitrage MaxHops: 3, MaxPaths: 100, MinProfit: new(big.Int).SetUint64(10000000000000000), // 0.01 ETH (10^16 wei) MinROI: 0.01, // 1% MaxSlippageBPS: 200, // 2% MinSwapAmount: new(big.Int).SetUint64(1000000000000000), // 0.001 ETH (10^15 wei) MinPoolLiquidity: new(big.Int).Exp(big.NewInt(10), big.NewInt(18), nil), // 1 ETH MaxConcurrentDetection: 10, // Risk MaxPositionSize: new(big.Int).Mul(big.NewInt(10), new(big.Int).Exp(big.NewInt(10), big.NewInt(18), nil)), // 10 ETH MaxDailyVolume: new(big.Int).Mul(big.NewInt(100), new(big.Int).Exp(big.NewInt(10), big.NewInt(18), nil)), // 100 ETH // Discovery MaxPoolsToDiscover: 1000, // Performance WorkerCount: 10, BufferSize: 1000, // Monitoring MetricsPort: 9090, }, nil } // getEnvOrDefault gets an environment variable or returns a default value func getEnvOrDefault(key, defaultValue string) string { if value := os.Getenv(key); value != "" { return value } return defaultValue }