package arbitrage import ( "context" "fmt" "log/slog" "math/big" "sync" "time" "github.com/ethereum/go-ethereum/common" "github.com/your-org/mev-bot/pkg/cache" mevtypes "github.com/your-org/mev-bot/pkg/types" ) // DetectorConfig contains configuration for the opportunity detector type DetectorConfig struct { // Path finding MaxPathsToEvaluate int EvaluationTimeout time.Duration // Input amount optimization MinInputAmount *big.Int MaxInputAmount *big.Int OptimizeInput bool // Gas price DefaultGasPrice *big.Int MaxGasPrice *big.Int // Token whitelist (empty = all tokens allowed) WhitelistedTokens []common.Address // Concurrent evaluation MaxConcurrentEvaluations int } // DefaultDetectorConfig returns default configuration func DefaultDetectorConfig() *DetectorConfig { return &DetectorConfig{ MaxPathsToEvaluate: 50, EvaluationTimeout: 5 * time.Second, MinInputAmount: new(big.Int).Mul(big.NewInt(1), big.NewInt(1e17)), // 0.1 ETH MaxInputAmount: new(big.Int).Mul(big.NewInt(10), big.NewInt(1e18)), // 10 ETH OptimizeInput: true, DefaultGasPrice: big.NewInt(1e9), // 1 gwei MaxGasPrice: big.NewInt(100e9), // 100 gwei WhitelistedTokens: []common.Address{}, MaxConcurrentEvaluations: 10, } } // Detector detects arbitrage opportunities type Detector struct { config *DetectorConfig pathFinder *PathFinder calculator *Calculator poolCache cache.PoolCache logger *slog.Logger // Statistics stats *OpportunityStats statsMutex sync.RWMutex // Channels for opportunity stream opportunityCh chan *Opportunity } // NewDetector creates a new opportunity detector func NewDetector( config *DetectorConfig, pathFinder *PathFinder, calculator *Calculator, poolCache cache.PoolCache, logger *slog.Logger, ) *Detector { if config == nil { config = DefaultDetectorConfig() } return &Detector{ config: config, pathFinder: pathFinder, calculator: calculator, poolCache: poolCache, logger: logger.With("component", "detector"), stats: &OpportunityStats{}, opportunityCh: make(chan *Opportunity, 100), } } // DetectOpportunities finds all arbitrage opportunities for a token func (d *Detector) DetectOpportunities(ctx context.Context, token common.Address) ([]*Opportunity, error) { d.logger.Debug("detecting opportunities", "token", token.Hex()) startTime := time.Now() // Check if token is whitelisted (if whitelist is configured) if !d.isTokenWhitelisted(token) { return nil, fmt.Errorf("token %s not whitelisted", token.Hex()) } // Find all possible paths paths, err := d.pathFinder.FindAllArbitragePaths(ctx, token) if err != nil { return nil, fmt.Errorf("failed to find paths: %w", err) } if len(paths) == 0 { d.logger.Debug("no paths found", "token", token.Hex()) return []*Opportunity{}, nil } d.logger.Info("found paths for evaluation", "token", token.Hex(), "pathCount", len(paths), ) // Limit number of paths to evaluate if len(paths) > d.config.MaxPathsToEvaluate { paths = paths[:d.config.MaxPathsToEvaluate] } // Evaluate paths concurrently opportunities, err := d.evaluatePathsConcurrently(ctx, paths) if err != nil { return nil, fmt.Errorf("failed to evaluate paths: %w", err) } // Filter to only profitable opportunities profitable := d.filterProfitable(opportunities) // Update statistics d.updateStats(profitable) d.logger.Info("detection complete", "token", token.Hex(), "totalPaths", len(paths), "evaluated", len(opportunities), "profitable", len(profitable), "duration", time.Since(startTime), ) return profitable, nil } // DetectOpportunitiesForSwap detects opportunities triggered by a new swap event func (d *Detector) DetectOpportunitiesForSwap(ctx context.Context, swapEvent *mevtypes.SwapEvent) ([]*Opportunity, error) { d.logger.Debug("detecting opportunities from swap", "pool", swapEvent.PoolAddress.Hex(), "protocol", swapEvent.Protocol, ) // Get affected tokens tokenIn, _ := swapEvent.GetInputToken() tokenOut, _ := swapEvent.GetOutputToken() tokens := []common.Address{tokenIn, tokenOut} allOpportunities := make([]*Opportunity, 0) // Check for opportunities involving either token for _, token := range tokens { opps, err := d.DetectOpportunities(ctx, token) if err != nil { d.logger.Warn("failed to detect opportunities for token", "token", token.Hex(), "error", err, ) continue } allOpportunities = append(allOpportunities, opps...) } d.logger.Info("detection from swap complete", "pool", swapEvent.PoolAddress.Hex(), "opportunitiesFound", len(allOpportunities), ) return allOpportunities, nil } // DetectBetweenTokens finds arbitrage opportunities between two specific tokens func (d *Detector) DetectBetweenTokens(ctx context.Context, tokenA, tokenB common.Address) ([]*Opportunity, error) { d.logger.Debug("detecting opportunities between tokens", "tokenA", tokenA.Hex(), "tokenB", tokenB.Hex(), ) // Find two-pool arbitrage paths paths, err := d.pathFinder.FindTwoPoolPaths(ctx, tokenA, tokenB) if err != nil { return nil, fmt.Errorf("failed to find two-pool paths: %w", err) } // Evaluate paths opportunities, err := d.evaluatePathsConcurrently(ctx, paths) if err != nil { return nil, fmt.Errorf("failed to evaluate paths: %w", err) } // Filter profitable profitable := d.filterProfitable(opportunities) d.logger.Info("detection between tokens complete", "tokenA", tokenA.Hex(), "tokenB", tokenB.Hex(), "profitable", len(profitable), ) return profitable, nil } // evaluatePathsConcurrently evaluates multiple paths concurrently func (d *Detector) evaluatePathsConcurrently(ctx context.Context, paths []*Path) ([]*Opportunity, error) { evalCtx, cancel := context.WithTimeout(ctx, d.config.EvaluationTimeout) defer cancel() // Semaphore for limiting concurrent evaluations sem := make(chan struct{}, d.config.MaxConcurrentEvaluations) var wg sync.WaitGroup results := make(chan *Opportunity, len(paths)) errors := make(chan error, len(paths)) for _, path := range paths { wg.Add(1) go func(p *Path) { defer wg.Done() // Acquire semaphore select { case sem <- struct{}{}: defer func() { <-sem }() case <-evalCtx.Done(): errors <- evalCtx.Err() return } opp, err := d.evaluatePath(evalCtx, p) if err != nil { d.logger.Debug("failed to evaluate path", "error", err) errors <- err return } if opp != nil { results <- opp } }(path) } // Wait for all evaluations to complete go func() { wg.Wait() close(results) close(errors) }() // Collect results opportunities := make([]*Opportunity, 0) for opp := range results { opportunities = append(opportunities, opp) } return opportunities, nil } // evaluatePath evaluates a single path for profitability func (d *Detector) evaluatePath(ctx context.Context, path *Path) (*Opportunity, error) { gasPrice := d.config.DefaultGasPrice // Determine input amount inputAmount := d.config.MinInputAmount var opportunity *Opportunity var err error if d.config.OptimizeInput { // Optimize input amount for maximum profit opportunity, err = d.calculator.OptimizeInputAmount(ctx, path, gasPrice, d.config.MaxInputAmount) } else { // Use fixed input amount opportunity, err = d.calculator.CalculateProfitability(ctx, path, inputAmount, gasPrice) } if err != nil { return nil, fmt.Errorf("failed to calculate profitability: %w", err) } return opportunity, nil } // filterProfitable filters opportunities to only include profitable ones func (d *Detector) filterProfitable(opportunities []*Opportunity) []*Opportunity { profitable := make([]*Opportunity, 0) for _, opp := range opportunities { if opp.IsProfitable() && opp.CanExecute() { profitable = append(profitable, opp) } } return profitable } // isTokenWhitelisted checks if a token is whitelisted func (d *Detector) isTokenWhitelisted(token common.Address) bool { if len(d.config.WhitelistedTokens) == 0 { return true // No whitelist = all tokens allowed } for _, whitelisted := range d.config.WhitelistedTokens { if token == whitelisted { return true } } return false } // updateStats updates detection statistics func (d *Detector) updateStats(opportunities []*Opportunity) { d.statsMutex.Lock() defer d.statsMutex.Unlock() d.stats.TotalDetected += len(opportunities) d.stats.LastDetected = time.Now() for _, opp := range opportunities { if opp.IsProfitable() { d.stats.TotalProfitable++ } if opp.CanExecute() { d.stats.TotalExecutable++ } // Update max profit if d.stats.MaxProfit == nil || opp.NetProfit.Cmp(d.stats.MaxProfit) > 0 { d.stats.MaxProfit = new(big.Int).Set(opp.NetProfit) } // Update total profit if d.stats.TotalProfit == nil { d.stats.TotalProfit = big.NewInt(0) } d.stats.TotalProfit.Add(d.stats.TotalProfit, opp.NetProfit) } // Calculate average profit if d.stats.TotalDetected > 0 && d.stats.TotalProfit != nil { d.stats.AverageProfit = new(big.Int).Div( d.stats.TotalProfit, big.NewInt(int64(d.stats.TotalDetected)), ) } } // GetStats returns current detection statistics func (d *Detector) GetStats() OpportunityStats { d.statsMutex.RLock() defer d.statsMutex.RUnlock() // Create a copy to avoid race conditions stats := *d.stats if d.stats.AverageProfit != nil { stats.AverageProfit = new(big.Int).Set(d.stats.AverageProfit) } if d.stats.MaxProfit != nil { stats.MaxProfit = new(big.Int).Set(d.stats.MaxProfit) } if d.stats.TotalProfit != nil { stats.TotalProfit = new(big.Int).Set(d.stats.TotalProfit) } if d.stats.MedianProfit != nil { stats.MedianProfit = new(big.Int).Set(d.stats.MedianProfit) } return stats } // OpportunityStream returns a channel that receives detected opportunities func (d *Detector) OpportunityStream() <-chan *Opportunity { return d.opportunityCh } // PublishOpportunity publishes an opportunity to the stream func (d *Detector) PublishOpportunity(opp *Opportunity) { select { case d.opportunityCh <- opp: default: d.logger.Warn("opportunity channel full, dropping opportunity", "id", opp.ID) } } // MonitorSwaps monitors swap events and detects opportunities func (d *Detector) MonitorSwaps(ctx context.Context, swapCh <-chan *mevtypes.SwapEvent) { d.logger.Info("starting swap monitor") for { select { case <-ctx.Done(): d.logger.Info("swap monitor stopped") return case swap, ok := <-swapCh: if !ok { d.logger.Info("swap channel closed") return } // Detect opportunities for this swap opportunities, err := d.DetectOpportunitiesForSwap(ctx, swap) if err != nil { d.logger.Error("failed to detect opportunities for swap", "pool", swap.PoolAddress.Hex(), "error", err, ) continue } // Publish opportunities to stream for _, opp := range opportunities { d.PublishOpportunity(opp) } } } } // ScanForOpportunities continuously scans for arbitrage opportunities func (d *Detector) ScanForOpportunities(ctx context.Context, interval time.Duration, tokens []common.Address) { d.logger.Info("starting opportunity scanner", "interval", interval, "tokenCount", len(tokens), ) ticker := time.NewTicker(interval) defer ticker.Stop() for { select { case <-ctx.Done(): d.logger.Info("opportunity scanner stopped") return case <-ticker.C: d.logger.Debug("scanning for opportunities") for _, token := range tokens { opportunities, err := d.DetectOpportunities(ctx, token) if err != nil { d.logger.Warn("failed to detect opportunities", "token", token.Hex(), "error", err, ) continue } // Publish opportunities for _, opp := range opportunities { d.PublishOpportunity(opp) } } } } } // RankOpportunities ranks opportunities by priority func (d *Detector) RankOpportunities(opportunities []*Opportunity) []*Opportunity { // Sort by priority (highest first) ranked := make([]*Opportunity, len(opportunities)) copy(ranked, opportunities) // Simple bubble sort (good enough for small lists) for i := 0; i < len(ranked)-1; i++ { for j := 0; j < len(ranked)-i-1; j++ { if ranked[j].Priority < ranked[j+1].Priority { ranked[j], ranked[j+1] = ranked[j+1], ranked[j] } } } return ranked }