// Package arbitrage provides the core arbitrage detection and analysis engine // for the MEV bot. This package is responsible for identifying profitable // arbitrage opportunities across multiple DEX protocols on Arbitrum. // // The detection engine continuously scans for price discrepancies between // exchanges and calculates potential profits after accounting for gas costs, // slippage, and other factors. It uses advanced mathematical models to // optimize trade sizing and minimize risks. package arbitrage import ( "context" "fmt" "math/big" "sync" "time" "github.com/ethereum/go-ethereum/common" "github.com/fraktal/mev-beta/internal/logger" "github.com/fraktal/mev-beta/pkg/exchanges" "github.com/fraktal/mev-beta/pkg/math" "github.com/fraktal/mev-beta/pkg/types" ) // ArbitrageDetectionEngine is the core component responsible for discovering // profitable arbitrage opportunities in real-time across multiple DEX protocols. // The engine uses sophisticated algorithms to: // // 1. Continuously scan token pairs across supported exchanges // 2. Identify price discrepancies that exceed minimum profit thresholds // 3. Calculate optimal trade sizes considering slippage and gas costs // 4. Filter opportunities based on risk and confidence metrics // 5. Provide real-time opportunity feeds to execution systems // // The engine operates with configurable parameters for different trading strategies // and risk profiles, making it suitable for both conservative and aggressive MEV extraction. type ArbitrageDetectionEngine struct { // Core dependencies for arbitrage detection registry *exchanges.ExchangeRegistry // Registry of supported exchanges and their configurations calculator *math.ArbitrageCalculator // Mathematical engine for profit calculations gasEstimator math.GasEstimator // Gas cost estimation for transaction profitability logger *logger.Logger // Structured logging for monitoring and debugging decimalConverter *math.DecimalConverter // Handles precision math for token amounts // Callback function for handling discovered opportunities // This is typically connected to an execution engine opportunityHandler func(*types.ArbitrageOpportunity) // Configuration parameters that control detection behavior config DetectionConfig // State management for concurrent operation runningMutex sync.RWMutex // Protects running state from race conditions isRunning bool // Indicates if the engine is currently active stopChan chan struct{} // Channel for graceful shutdown signaling opportunityChan chan *types.ArbitrageOpportunity // Buffered channel for opportunity distribution // Performance tracking and metrics scanCount uint64 // Total number of scans performed since startup opportunityCount uint64 // Total number of opportunities discovered lastScanTime time.Time // Timestamp of the most recent scan completion // Concurrent processing infrastructure scanWorkers *WorkerPool // Pool of workers for parallel opportunity scanning pathWorkers *WorkerPool // Pool of workers for complex path analysis } // DetectionConfig contains all configuration parameters for the arbitrage detection engine. // These parameters control scanning behavior, opportunity filtering, and performance characteristics. // The configuration allows fine-tuning for different trading strategies and risk profiles. type DetectionConfig struct { // Scanning timing and concurrency parameters ScanInterval time.Duration // How frequently to scan for new opportunities MaxConcurrentScans int // Maximum number of simultaneous scanning operations MaxConcurrentPaths int // Maximum number of paths to analyze in parallel // Opportunity filtering criteria - these determine what qualifies as actionable MinProfitThreshold *math.UniversalDecimal // Minimum profit required to consider an opportunity MaxPriceImpact *math.UniversalDecimal // Maximum acceptable price impact (slippage) MaxHops int // Maximum number of hops in a trading path // Token filtering and prioritization HighPriorityTokens []common.Address // Tokens to scan more frequently (e.g., WETH, USDC) TokenWhitelist []common.Address // Only scan these tokens if specified TokenBlacklist []common.Address // Never scan these tokens (e.g., known scam tokens) // Exchange selection and weighting EnabledExchanges []math.ExchangeType // Which exchanges to include in scanning ExchangeWeights map[math.ExchangeType]float64 // Relative weights for exchange prioritization // Performance optimization settings CachePoolData bool // Whether to cache pool data between scans CacheTTL time.Duration // How long to keep cached data valid BatchSize int // Number of opportunities to process in each batch // Risk management constraints MaxPositionSize *math.UniversalDecimal // Maximum size for any single arbitrage RequiredConfidence float64 // Minimum confidence score (0.0-1.0) for execution } // WorkerPool manages concurrent workers for scanning type WorkerPool struct { workers int taskChan chan ScanTask wg sync.WaitGroup ctx context.Context cancel context.CancelFunc } // ScanTask represents a scanning task type ScanTask struct { TokenPair exchanges.TokenPair Exchanges []*exchanges.ExchangeConfig InputAmount *math.UniversalDecimal ResultChan chan ScanResult } // ScanResult contains the result of a scanning task type ScanResult struct { Opportunity *types.ArbitrageOpportunity Error error ScanTime time.Duration } // NewArbitrageDetectionEngine creates and initializes a new arbitrage detection engine // with the specified dependencies and configuration. The engine is created in a stopped // state and must be started explicitly using the Start() method. // // Parameters: // - registry: Exchange registry containing supported DEX configurations // - gasEstimator: Component for estimating transaction gas costs // - logger: Structured logger for monitoring and debugging // - config: Configuration parameters for detection behavior // // Returns: // - *ArbitrageDetectionEngine: Configured but not yet started detection engine func NewArbitrageDetectionEngine( registry *exchanges.ExchangeRegistry, gasEstimator math.GasEstimator, logger *logger.Logger, config DetectionConfig, ) *ArbitrageDetectionEngine { calculator := math.NewArbitrageCalculator(gasEstimator) engine := &ArbitrageDetectionEngine{ registry: registry, calculator: calculator, gasEstimator: gasEstimator, logger: logger, decimalConverter: math.NewDecimalConverter(), config: config, isRunning: false, stopChan: make(chan struct{}), opportunityChan: make(chan *types.ArbitrageOpportunity, 1000), // Buffered channel } // Set default configuration if not provided engine.setDefaultConfig() return engine } // setDefaultConfig sets default configuration values func (engine *ArbitrageDetectionEngine) setDefaultConfig() { if engine.config.ScanInterval == 0 { engine.config.ScanInterval = 1 * time.Second } if engine.config.MaxConcurrentScans == 0 { engine.config.MaxConcurrentScans = 10 } if engine.config.MaxConcurrentPaths == 0 { engine.config.MaxConcurrentPaths = 50 } if engine.config.MinProfitThreshold == nil { // Set minimum profit to 0.12 ETH to ensure profitability after gas costs // Typical gas cost: 300k-400k gas @ 0.2-0.3 gwei = 0.06-0.12 ETH // Adding safety margin for network volatility engine.config.MinProfitThreshold, _ = engine.decimalConverter.FromString("0.12", 18, "ETH") } if engine.config.MaxPriceImpact == nil { engine.config.MaxPriceImpact, _ = engine.decimalConverter.FromString("2", 4, "PERCENT") } if engine.config.MaxHops == 0 { engine.config.MaxHops = 3 } if engine.config.CacheTTL == 0 { engine.config.CacheTTL = 30 * time.Second } if engine.config.BatchSize == 0 { engine.config.BatchSize = 20 } if engine.config.RequiredConfidence == 0 { engine.config.RequiredConfidence = 0.7 } if len(engine.config.EnabledExchanges) == 0 { // Enable all exchanges by default for _, exchangeConfig := range engine.registry.GetAllExchanges() { engine.config.EnabledExchanges = append(engine.config.EnabledExchanges, exchangeConfig.Type) } } } // Start begins the arbitrage detection process and initializes all background workers. // This method starts the main detection loop, worker pools, and opportunity processing. // The engine will continue running until Stop() is called or the context is cancelled. // // The startup process includes: // 1. Validating that the engine is not already running // 2. Initializing worker pools for concurrent processing // 3. Starting the main detection loop // 4. Starting the opportunity processing pipeline // // Parameters: // - ctx: Context for controlling the engine lifecycle // // Returns: // - error: Any startup errors func (engine *ArbitrageDetectionEngine) Start(ctx context.Context) error { // Ensure thread-safe state management during startup engine.runningMutex.Lock() defer engine.runningMutex.Unlock() if engine.isRunning { return fmt.Errorf("detection engine is already running") } engine.logger.Info("Starting arbitrage detection engine...") engine.logger.Info(fmt.Sprintf("Configuration - Scan Interval: %v, Max Concurrent Scans: %d, Min Profit: %s ETH", engine.config.ScanInterval, engine.config.MaxConcurrentScans, engine.decimalConverter.ToHumanReadable(engine.config.MinProfitThreshold))) // Initialize worker pools if err := engine.initializeWorkerPools(ctx); err != nil { return fmt.Errorf("failed to initialize worker pools: %w", err) } engine.isRunning = true // Start main detection loop go engine.detectionLoop(ctx) // Start opportunity processing go engine.opportunityProcessor(ctx) engine.logger.Info("Arbitrage detection engine started successfully") return nil } // Stop halts the arbitrage detection process func (engine *ArbitrageDetectionEngine) Stop() error { engine.runningMutex.Lock() defer engine.runningMutex.Unlock() if !engine.isRunning { return fmt.Errorf("detection engine is not running") } engine.logger.Info("Stopping arbitrage detection engine...") // Signal stop close(engine.stopChan) // Stop worker pools if engine.scanWorkers != nil { engine.scanWorkers.Stop() } if engine.pathWorkers != nil { engine.pathWorkers.Stop() } engine.isRunning = false engine.logger.Info(fmt.Sprintf("Detection engine stopped. Total scans: %d, Opportunities found: %d", engine.scanCount, engine.opportunityCount)) return nil } // initializeWorkerPools sets up worker pools for concurrent processing func (engine *ArbitrageDetectionEngine) initializeWorkerPools(ctx context.Context) error { // Initialize scan worker pool engine.scanWorkers = NewWorkerPool(engine.config.MaxConcurrentScans, ctx) engine.scanWorkers.Start(engine.processScanTask) // Initialize path worker pool engine.pathWorkers = NewWorkerPool(engine.config.MaxConcurrentPaths, ctx) engine.pathWorkers.Start(engine.processPathTask) return nil } // detectionLoop runs the main detection logic func (engine *ArbitrageDetectionEngine) detectionLoop(ctx context.Context) { ticker := time.NewTicker(engine.config.ScanInterval) defer ticker.Stop() for { select { case <-ctx.Done(): engine.logger.Info("Detection loop stopped due to context cancellation") return case <-engine.stopChan: engine.logger.Info("Detection loop stopped") return case <-ticker.C: engine.performScan(ctx) } } } // performScan executes a complete arbitrage scanning cycle across all configured // token pairs and exchanges. This is the core scanning logic that runs periodically // to discover new arbitrage opportunities. // // The scanning process includes: // 1. Identifying token pairs to analyze // 2. Determining input amounts to test // 3. Creating scanning tasks for worker pools // 4. Processing results and filtering opportunities // // Each scan is tracked for performance monitoring and optimization. // // Parameters: // - ctx: Context for cancellation and timeout control func (engine *ArbitrageDetectionEngine) performScan(ctx context.Context) { // Track scan performance for monitoring scanStart := time.Now() engine.scanCount++ // Increment total scan counter engine.logger.Debug(fmt.Sprintf("Starting arbitrage scan #%d", engine.scanCount)) // Get token pairs to scan tokenPairs := engine.getTokenPairsToScan() // Get input amounts to test inputAmounts := engine.getInputAmountsToTest() // Create scan tasks scanTasks := make([]ScanTask, 0) for _, pair := range tokenPairs { // Get exchanges that support this pair supportingExchanges := engine.registry.GetExchangesForPair( common.HexToAddress(pair.Token0.Address), common.HexToAddress(pair.Token1.Address), ) // Filter enabled exchanges enabledExchanges := engine.filterEnabledExchanges(supportingExchanges) if len(enabledExchanges) < 2 { continue // Need at least 2 exchanges for arbitrage } for _, inputAmount := range inputAmounts { task := ScanTask{ TokenPair: pair, Exchanges: enabledExchanges, InputAmount: inputAmount, ResultChan: make(chan ScanResult, 1), } scanTasks = append(scanTasks, task) } } engine.logger.Debug(fmt.Sprintf("Created %d scan tasks for %d token pairs", len(scanTasks), len(tokenPairs))) // Process scan tasks in batches engine.processScanTasksBatch(ctx, scanTasks) scanDuration := time.Since(scanStart) engine.lastScanTime = time.Now() engine.logger.Debug(fmt.Sprintf("Completed arbitrage scan #%d in %v", engine.scanCount, scanDuration)) } // getTokenPairsToScan returns token pairs to scan for arbitrage func (engine *ArbitrageDetectionEngine) getTokenPairsToScan() []exchanges.TokenPair { // Get high priority tokens first highPriorityTokens := engine.registry.GetHighPriorityTokens(10) // Create pairs from high priority tokens pairs := make([]exchanges.TokenPair, 0) for i, token0 := range highPriorityTokens { for j, token1 := range highPriorityTokens { if i >= j { continue // Avoid duplicates and self-pairs } // Check if pair is supported if engine.registry.IsPairSupported( common.HexToAddress(token0.Address), common.HexToAddress(token1.Address), ) { pairs = append(pairs, exchanges.TokenPair{ Token0: token0, Token1: token1, }) } } } return pairs } // getInputAmountsToTest returns different input amounts to test for arbitrage func (engine *ArbitrageDetectionEngine) getInputAmountsToTest() []*math.UniversalDecimal { amounts := make([]*math.UniversalDecimal, 0) // Test different input amounts to find optimal arbitrage size testAmounts := []string{"0.1", "0.5", "1", "2", "5", "10"} for _, amountStr := range testAmounts { if amount, err := engine.decimalConverter.FromString(amountStr, 18, "ETH"); err == nil { amounts = append(amounts, amount) } } return amounts } // filterEnabledExchanges filters exchanges based on configuration func (engine *ArbitrageDetectionEngine) filterEnabledExchanges(exchangeConfigs []*exchanges.ExchangeConfig) []*exchanges.ExchangeConfig { enabled := make([]*exchanges.ExchangeConfig, 0) enabledMap := make(map[math.ExchangeType]bool) for _, exchangeType := range engine.config.EnabledExchanges { enabledMap[exchangeType] = true } for _, exchange := range exchangeConfigs { if enabledMap[exchange.Type] { enabled = append(enabled, exchange) } } return enabled } // processScanTasksBatch processes scan tasks in batches for efficiency func (engine *ArbitrageDetectionEngine) processScanTasksBatch(ctx context.Context, tasks []ScanTask) { batchSize := engine.config.BatchSize for i := 0; i < len(tasks); i += batchSize { end := i + batchSize if end > len(tasks) { end = len(tasks) } batch := tasks[i:end] engine.processScanBatch(ctx, batch) // Small delay between batches to avoid overwhelming the system select { case <-ctx.Done(): return case <-time.After(10 * time.Millisecond): } } } // processScanBatch processes a batch of scan tasks concurrently func (engine *ArbitrageDetectionEngine) processScanBatch(ctx context.Context, batch []ScanTask) { resultChans := make([]chan ScanResult, len(batch)) // Submit tasks to worker pool for i, task := range batch { resultChans[i] = task.ResultChan select { case engine.scanWorkers.taskChan <- task: case <-ctx.Done(): return } } // Collect results for _, resultChan := range resultChans { select { case result := <-resultChan: if result.Error != nil { engine.logger.Debug(fmt.Sprintf("Scan task error: %v", result.Error)) continue } if result.Opportunity != nil && engine.calculator.IsOpportunityProfitable(result.Opportunity) { engine.opportunityCount++ // Send opportunity to processing channel select { case engine.opportunityChan <- result.Opportunity: profitDisplay := ethAmountString(engine.decimalConverter, nil, result.Opportunity.NetProfit) engine.logger.Info(fmt.Sprintf("🎯 Found profitable arbitrage: %s ETH profit, %0.1f%% confidence", profitDisplay, result.Opportunity.Confidence*100)) default: engine.logger.Warn("Opportunity channel full, dropping opportunity") } } case <-ctx.Done(): return case <-time.After(5 * time.Second): engine.logger.Warn("Scan task timed out") } } } // processScanTask processes a single scan task func (engine *ArbitrageDetectionEngine) processScanTask(task ScanTask) { start := time.Now() // Find arbitrage paths between exchanges paths := engine.findArbitragePaths(task.TokenPair, task.Exchanges) var bestOpportunity *types.ArbitrageOpportunity for _, path := range paths { // Calculate arbitrage opportunity opportunity, err := engine.calculator.CalculateArbitrageOpportunity( path, task.InputAmount, math.TokenInfo{ Address: task.TokenPair.Token0.Address, Symbol: task.TokenPair.Token0.Symbol, Decimals: task.TokenPair.Token0.Decimals, }, math.TokenInfo{ Address: task.TokenPair.Token1.Address, Symbol: task.TokenPair.Token1.Symbol, Decimals: task.TokenPair.Token1.Decimals, }, ) if err != nil { continue } // Check if this is the best opportunity so far if bestOpportunity == nil || engine.isOpportunityBetter(opportunity, bestOpportunity) { bestOpportunity = opportunity } } result := ScanResult{ Opportunity: bestOpportunity, ScanTime: time.Since(start), } task.ResultChan <- result } // findArbitragePaths finds possible arbitrage paths between exchanges func (engine *ArbitrageDetectionEngine) findArbitragePaths(pair exchanges.TokenPair, exchangeConfigs []*exchanges.ExchangeConfig) [][]*math.PoolData { paths := make([][]*math.PoolData, 0) // For simplicity, we'll focus on 2-hop arbitrage (buy on exchange A, sell on exchange B) // Production implementation would include multi-hop paths token0Addr := common.HexToAddress(pair.Token0.Address) token1Addr := common.HexToAddress(pair.Token1.Address) for i, exchange1 := range exchangeConfigs { for j, exchange2 := range exchangeConfigs { if i == j { continue // Same exchange } // Find pools on each exchange pool1 := engine.findBestPool(exchange1, token0Addr, token1Addr) pool2 := engine.findBestPool(exchange2, token1Addr, token0Addr) // Reverse direction if pool1 != nil && pool2 != nil { path := []*math.PoolData{pool1, pool2} paths = append(paths, path) } } } return paths } // findBestPool finds the best pool for a token pair on an exchange func (engine *ArbitrageDetectionEngine) findBestPool(exchange *exchanges.ExchangeConfig, token0, token1 common.Address) *math.PoolData { // Get the pool detector and liquidity fetcher from the registry poolDetector := engine.registry.GetPoolDetector(exchange.Type) liquidityFetcher := engine.registry.GetLiquidityFetcher(exchange.Type) if poolDetector == nil || liquidityFetcher == nil { return nil } // Get pools for this pair pools, err := poolDetector.GetAllPools(token0, token1) if err != nil || len(pools) == 0 { return nil } // For now, return data for the first pool // Production implementation would compare liquidity and select the best poolData, err := liquidityFetcher.GetPoolData(pools[0]) if err != nil { return nil } return poolData } // isOpportunityBetter compares two opportunities and returns true if the first is better func (engine *ArbitrageDetectionEngine) isOpportunityBetter(opp1, opp2 *types.ArbitrageOpportunity) bool { if opp1 == nil { return false } if opp2 == nil { return true } if opp1.Quantities != nil && opp2.Quantities != nil { net1, err1 := engine.decimalAmountToUniversal(opp1.Quantities.NetProfit) net2, err2 := engine.decimalAmountToUniversal(opp2.Quantities.NetProfit) if err1 == nil && err2 == nil { cmp, err := engine.decimalConverter.Compare(net1, net2) if err == nil { if cmp > 0 { return true } else if cmp < 0 { return false } } } } // Fallback to canonical big.Int comparison if opp1.NetProfit != nil && opp2.NetProfit != nil { if opp1.NetProfit.Cmp(opp2.NetProfit) > 0 { return true } else if opp1.NetProfit.Cmp(opp2.NetProfit) < 0 { return false } } return opp1.Confidence > opp2.Confidence } // processPathTask processes a path finding task func (engine *ArbitrageDetectionEngine) processPathTask(task ScanTask) { // This would be used for more complex path finding algorithms // For now, defer to the main scan task processing engine.processScanTask(task) } func (engine *ArbitrageDetectionEngine) decimalAmountToUniversal(dec types.DecimalAmount) (*math.UniversalDecimal, error) { if dec.Value == "" { return nil, fmt.Errorf("decimal amount empty") } value, ok := new(big.Int).SetString(dec.Value, 10) if !ok { return nil, fmt.Errorf("invalid decimal amount %s", dec.Value) } return math.NewUniversalDecimal(value, dec.Decimals, dec.Symbol) } // opportunityProcessor processes discovered opportunities func (engine *ArbitrageDetectionEngine) opportunityProcessor(ctx context.Context) { for { select { case <-ctx.Done(): return case <-engine.stopChan: return case opportunity := <-engine.opportunityChan: engine.processOpportunity(opportunity) } } } // processOpportunity handles the detailed processing of a discovered arbitrage opportunity. // This includes logging detailed information about the opportunity, validating its parameters, // and potentially forwarding it to execution systems. // // The processing includes: // 1. Detailed logging of opportunity parameters // 2. Validation of profit calculations // 3. Risk assessment // 4. Forwarding to registered opportunity handlers // // Parameters: // - opportunity: The arbitrage opportunity to process func (engine *ArbitrageDetectionEngine) processOpportunity(opportunity *types.ArbitrageOpportunity) { // Log the opportunity discovery with truncated addresses for readability engine.logger.Info(fmt.Sprintf("Processing arbitrage opportunity: %s -> %s", opportunity.TokenIn.Hex()[:8], opportunity.TokenOut.Hex()[:8])) if opportunity.Quantities != nil { if amt, err := engine.decimalAmountToUniversal(opportunity.Quantities.AmountIn); err == nil { engine.logger.Info(fmt.Sprintf(" Input Amount: %s %s", engine.decimalConverter.ToHumanReadable(amt), amt.Symbol)) } } else if opportunity.AmountIn != nil { amountDisplay := ethAmountString(engine.decimalConverter, nil, opportunity.AmountIn) engine.logger.Info(fmt.Sprintf(" Input Amount: %s", amountDisplay)) } engine.logger.Info(fmt.Sprintf(" Input Token: %s", opportunity.TokenIn.Hex())) if opportunity.Quantities != nil { if net, err := engine.decimalAmountToUniversal(opportunity.Quantities.NetProfit); err == nil { engine.logger.Info(fmt.Sprintf(" Net Profit: %s %s", engine.decimalConverter.ToHumanReadable(net), net.Symbol)) } } else if opportunity.NetProfit != nil { netProfitDisplay := ethAmountString(engine.decimalConverter, nil, opportunity.NetProfit) engine.logger.Info(fmt.Sprintf(" Net Profit: %s ETH", netProfitDisplay)) } engine.logger.Info(fmt.Sprintf(" ROI: %.2f%%", opportunity.ROI)) if opportunity.Quantities != nil { if impact, err := engine.decimalAmountToUniversal(opportunity.Quantities.PriceImpact); err == nil { engine.logger.Info(fmt.Sprintf(" Price Impact: %s %s", engine.decimalConverter.ToHumanReadable(impact), impact.Symbol)) } } else { engine.logger.Info(fmt.Sprintf(" Price Impact: %.2f%%", opportunity.PriceImpact)) } engine.logger.Info(fmt.Sprintf(" Confidence: %.1f%%", opportunity.Confidence*100)) engine.logger.Info(fmt.Sprintf(" Risk Level: %.2f", opportunity.Risk)) engine.logger.Info(fmt.Sprintf(" Protocol: %s", opportunity.Protocol)) engine.logger.Info(fmt.Sprintf(" Path length: %d", len(opportunity.Path))) if engine.opportunityHandler != nil { // Do not block detection loop while execution occurs go engine.opportunityHandler(opportunity) } } // SetOpportunityHandler registers a callback function that will be invoked when a profitable // arbitrage opportunity is discovered. This is the primary integration point between the // detection engine and execution systems. // // The handler function should: // 1. Perform additional validation if needed // 2. Make final go/no-go decisions based on current market conditions // 3. Trigger transaction execution if appropriate // 4. Handle any execution errors or failures // // Note: The handler is called asynchronously to avoid blocking the detection loop. // // Parameters: // - handler: Function to call when opportunities are discovered func (engine *ArbitrageDetectionEngine) SetOpportunityHandler(handler func(*types.ArbitrageOpportunity)) { engine.opportunityHandler = handler } // GetOpportunityChannel returns the channel for receiving opportunities func (engine *ArbitrageDetectionEngine) GetOpportunityChannel() <-chan *types.ArbitrageOpportunity { return engine.opportunityChan } // GetStats returns detection engine statistics func (engine *ArbitrageDetectionEngine) GetStats() DetectionStats { engine.runningMutex.RLock() defer engine.runningMutex.RUnlock() return DetectionStats{ IsRunning: engine.isRunning, TotalScans: engine.scanCount, OpportunitiesFound: engine.opportunityCount, LastScanTime: engine.lastScanTime, ScanInterval: engine.config.ScanInterval, ConfiguredExchanges: len(engine.config.EnabledExchanges), } } // ScanOpportunities scans for arbitrage opportunities using the provided parameters func (engine *ArbitrageDetectionEngine) ScanOpportunities(ctx context.Context, params []*DetectionParams) ([]*types.ArbitrageOpportunity, error) { if !engine.isRunning { return nil, fmt.Errorf("detection engine is not running, call Start() first") } var opportunities []*types.ArbitrageOpportunity // Process each detection parameter for _, param := range params { paramOpportunities := engine.scanForSingleParam(param) opportunities = append(opportunities, paramOpportunities...) } return opportunities, nil } // scanForSingleParam handles scanning for a single detection parameter func (engine *ArbitrageDetectionEngine) scanForSingleParam(param *DetectionParams) []*types.ArbitrageOpportunity { tokenPair := engine.createTokenPair(param) // Get exchange configurations for this token pair exchangeConfigs := engine.registry.GetExchangesForPair(common.HexToAddress(tokenPair.Token0.Address), common.HexToAddress(tokenPair.Token1.Address)) if len(exchangeConfigs) < 2 { return nil // Need at least 2 exchanges for arbitrage } // Find all possible arbitrage paths between the tokens paths := engine.findArbitragePaths(tokenPair, exchangeConfigs) return engine.processPathsForOpportunities(paths, param) } // createTokenPair creates a token pair from detection parameters func (engine *ArbitrageDetectionEngine) createTokenPair(param *DetectionParams) exchanges.TokenPair { // Create token info using simplified approach for now // In production, this would query contract metadata token0Info := exchanges.TokenInfo{ Address: param.TokenA.Hex(), Symbol: param.TokenA.Hex()[:8], // Use first 8 chars of address as symbol Name: "Unknown Token", Decimals: 18, // Standard ERC-20 decimals } token1Info := exchanges.TokenInfo{ Address: param.TokenB.Hex(), Symbol: param.TokenB.Hex()[:8], // Use first 8 chars of address as symbol Name: "Unknown Token", Decimals: 18, // Standard ERC-20 decimals } return exchanges.TokenPair{ Token0: token0Info, Token1: token1Info, } } // processPathsForOpportunities processes paths to find profitable opportunities func (engine *ArbitrageDetectionEngine) processPathsForOpportunities(paths [][]*math.PoolData, param *DetectionParams) []*types.ArbitrageOpportunity { var opportunities []*types.ArbitrageOpportunity for _, path := range paths { if len(path) == 0 { continue } pathOpportunities := engine.evaluatePath(path, param) opportunities = append(opportunities, pathOpportunities...) } return opportunities } // evaluatePath evaluates an arbitrage path for opportunities func (engine *ArbitrageDetectionEngine) evaluatePath(path []*math.PoolData, param *DetectionParams) []*types.ArbitrageOpportunity { var opportunities []*types.ArbitrageOpportunity // Get token info for the first and last pools in the path tokenA := path[0].Token0 tokenZ := path[len(path)-1].Token1 if path[len(path)-1].Token0.Address == tokenA.Address { tokenZ = path[len(path)-1].Token0 } // Test various input amounts to find the most profitable one inputAmounts := engine.getInputAmountsToTest() for _, inputAmount := range inputAmounts { // Calculate arbitrage opportunity using the calculator opportunity, err := engine.calculator.CalculateArbitrageOpportunity(path, inputAmount, tokenA, tokenZ) if err != nil { engine.logger.Debug(fmt.Sprintf("Failed to calculate opportunity for path: %v", err)) continue } // Apply filters based on the parameters if opportunity.NetProfit.Cmp(param.MinProfit) < 0 { continue // Below minimum profit threshold } // Check slippage threshold if opportunity.PriceImpact > param.MaxSlippage { continue // Above maximum slippage tolerance } // Add to opportunities if it passes all checks opportunities = append(opportunities, opportunity) // For now, break after finding one good opportunity per path // to avoid too many similar results (can be made configurable) break } return opportunities } // DetectionStats contains statistics about the detection engine type DetectionStats struct { IsRunning bool TotalScans uint64 OpportunitiesFound uint64 LastScanTime time.Time ScanInterval time.Duration ConfiguredExchanges int } // NewWorkerPool creates a new worker pool func NewWorkerPool(workers int, ctx context.Context) *WorkerPool { ctx, cancel := context.WithCancel(ctx) return &WorkerPool{ workers: workers, taskChan: make(chan ScanTask, workers*2), // Buffered channel ctx: ctx, cancel: cancel, } } // Start starts the worker pool func (wp *WorkerPool) Start(taskProcessor func(ScanTask)) { for i := 0; i < wp.workers; i++ { wp.wg.Add(1) go func() { defer wp.wg.Done() for { select { case <-wp.ctx.Done(): return case task := <-wp.taskChan: taskProcessor(task) } } }() } } // Stop stops the worker pool func (wp *WorkerPool) Stop() { wp.cancel() close(wp.taskChan) wp.wg.Wait() }