package scanner import ( "fmt" "sync" "time" "github.com/ethereum/go-ethereum/common" "github.com/fraktal/mev-beta/internal/config" "github.com/fraktal/mev-beta/internal/logger" "github.com/fraktal/mev-beta/pkg/cache" "github.com/fraktal/mev-beta/pkg/contracts" "github.com/fraktal/mev-beta/pkg/database" "github.com/fraktal/mev-beta/pkg/events" "github.com/fraktal/mev-beta/pkg/marketdata" "github.com/fraktal/mev-beta/pkg/profitcalc" "github.com/fraktal/mev-beta/pkg/scanner/analysis" "github.com/fraktal/mev-beta/pkg/scanner/market" "github.com/fraktal/mev-beta/pkg/scanner/swap" ) // Scanner is the main market scanner that handles event processing type Scanner struct { marketScanner *market.MarketScanner swapAnalyzer *swap.SwapAnalyzer liquidityAnalyzer *analysis.LiquidityAnalyzer config *config.BotConfig logger *logger.Logger workerPool chan chan events.Event workers []*EventWorker wg sync.WaitGroup parsingMonitor *ParsingMonitor // Parsing performance monitor reserveCache *cache.ReserveCache // ADDED: Reserve cache for event-driven invalidation } // EventWorker represents a worker that processes event details type EventWorker struct { ID int WorkerPool chan chan events.Event JobChannel chan events.Event QuitChan chan bool scanner *Scanner } // NewScanner creates a new market scanner with concurrency support func NewScanner(cfg *config.BotConfig, logger *logger.Logger, contractExecutor *contracts.ContractExecutor, db *database.Database, reserveCache *cache.ReserveCache) *Scanner { scanner := &Scanner{ config: cfg, logger: logger, workerPool: make(chan chan events.Event, cfg.MaxWorkers), workers: make([]*EventWorker, 0, cfg.MaxWorkers), reserveCache: reserveCache, // ADDED: Store reserve cache for event-driven invalidation } // Initialize the market scanner marketScanner := market.NewMarketScanner(cfg, logger, contractExecutor, db) scanner.marketScanner = marketScanner // Initialize the swap analyzer swapAnalyzer := swap.NewSwapAnalyzer( logger, marketScanner.GetMarketDataLogger(), marketScanner.GetProfitCalculator(), marketScanner.GetOpportunityRanker(), ) scanner.swapAnalyzer = swapAnalyzer // Initialize the liquidity analyzer liquidityAnalyzer := analysis.NewLiquidityAnalyzer( logger, marketScanner.GetMarketDataLogger(), ) scanner.liquidityAnalyzer = liquidityAnalyzer // Initialize parsing monitor parsingMonitor := NewParsingMonitor(logger, nil) scanner.parsingMonitor = parsingMonitor // Create workers for i := 0; i < cfg.MaxWorkers; i++ { worker := NewEventWorker(i, scanner.workerPool, scanner) scanner.workers = append(scanner.workers, worker) worker.Start() } return scanner } // NewEventWorker creates a new event worker func NewEventWorker(id int, workerPool chan chan events.Event, scanner *Scanner) *EventWorker { return &EventWorker{ ID: id, WorkerPool: workerPool, JobChannel: make(chan events.Event), QuitChan: make(chan bool), scanner: scanner, } } // Start begins the worker func (w *EventWorker) Start() { go func() { for { // Register the worker in the worker pool w.WorkerPool <- w.JobChannel select { case job := <-w.JobChannel: // Process the job w.Process(job) case <-w.QuitChan: // Stop the worker return } } }() } // Stop terminates the worker func (w *EventWorker) Stop() { go func() { w.QuitChan <- true }() } // Process handles an event detail func (w *EventWorker) Process(event events.Event) { // RACE CONDITION FIX: Process synchronously in the worker goroutine // instead of spawning another nested goroutine to avoid WaitGroup race defer w.scanner.wg.Done() // Log the processing w.scanner.logger.Debug(fmt.Sprintf("Worker %d processing %s event in pool %s from protocol %s", w.ID, event.Type.String(), event.PoolAddress, event.Protocol)) // EVENT-DRIVEN CACHE INVALIDATION // Invalidate reserve cache when pool state changes (Swap, AddLiquidity, RemoveLiquidity) // This ensures profit calculations always use fresh reserve data if w.scanner.reserveCache != nil { switch event.Type { case events.Swap, events.AddLiquidity, events.RemoveLiquidity: // Pool state changed - invalidate cached reserves for this pool w.scanner.reserveCache.Invalidate(event.PoolAddress) w.scanner.logger.Debug(fmt.Sprintf("Cache invalidated for pool %s due to %s event", event.PoolAddress.Hex(), event.Type.String())) } } // Analyze based on event type switch event.Type { case events.Swap: w.scanner.swapAnalyzer.AnalyzeSwapEvent(event, w.scanner.marketScanner) case events.AddLiquidity: w.scanner.liquidityAnalyzer.AnalyzeLiquidityEvent(event, w.scanner.marketScanner, true) case events.RemoveLiquidity: w.scanner.liquidityAnalyzer.AnalyzeLiquidityEvent(event, w.scanner.marketScanner, false) case events.NewPool: w.scanner.liquidityAnalyzer.AnalyzeNewPoolEvent(event, w.scanner.marketScanner) default: w.scanner.logger.Debug(fmt.Sprintf("Worker %d received unknown event type: %d", w.ID, event.Type)) } } // SubmitEvent submits an event for processing by the worker pool func (s *Scanner) SubmitEvent(event events.Event) { startTime := time.Now() // CRITICAL FIX: Validate pool address before submission if event.PoolAddress == (common.Address{}) { s.logger.Warn(fmt.Sprintf("REJECTED: Event with zero PoolAddress rejected - TxHash: %s, Protocol: %s, Type: %v, Token0: %s, Token1: %s", event.TransactionHash.Hex(), event.Protocol, event.Type, event.Token0.Hex(), event.Token1.Hex())) // Record parsing failure s.parsingMonitor.RecordParsingEvent(ParsingEvent{ TransactionHash: event.TransactionHash, Protocol: event.Protocol, Success: false, RejectionReason: "zero_address", PoolAddress: event.PoolAddress, Token0: event.Token0, Token1: event.Token1, ParseTimeMs: float64(time.Since(startTime).Nanoseconds()) / 1000000, Timestamp: time.Now(), }) return // Reject events with zero pool addresses } // Additional validation: Pool address should not match token addresses if event.PoolAddress == event.Token0 || event.PoolAddress == event.Token1 { s.logger.Warn(fmt.Sprintf("REJECTED: Event with pool address matching token address - TxHash: %s, Pool: %s, Token0: %s, Token1: %s", event.TransactionHash.Hex(), event.PoolAddress.Hex(), event.Token0.Hex(), event.Token1.Hex())) // Record parsing failure s.parsingMonitor.RecordParsingEvent(ParsingEvent{ TransactionHash: event.TransactionHash, Protocol: event.Protocol, Success: false, RejectionReason: "duplicate_address", PoolAddress: event.PoolAddress, Token0: event.Token0, Token1: event.Token1, ParseTimeMs: float64(time.Since(startTime).Nanoseconds()) / 1000000, Timestamp: time.Now(), }) return // Reject events where pool address matches token addresses } // Additional validation: Check for suspicious zero-padded addresses poolHex := event.PoolAddress.Hex() if len(poolHex) == 42 && poolHex[:20] == "0x000000000000000000" { s.logger.Warn(fmt.Sprintf("REJECTED: Event with suspicious zero-padded pool address - TxHash: %s, Pool: %s", event.TransactionHash.Hex(), poolHex)) // Record parsing failure s.parsingMonitor.RecordParsingEvent(ParsingEvent{ TransactionHash: event.TransactionHash, Protocol: event.Protocol, Success: false, RejectionReason: "suspicious_address", PoolAddress: event.PoolAddress, Token0: event.Token0, Token1: event.Token1, ParseTimeMs: float64(time.Since(startTime).Nanoseconds()) / 1000000, Timestamp: time.Now(), }) return // Reject events with zero-padded addresses } // Record successful parsing s.parsingMonitor.RecordParsingEvent(ParsingEvent{ TransactionHash: event.TransactionHash, Protocol: event.Protocol, Success: true, PoolAddress: event.PoolAddress, Token0: event.Token0, Token1: event.Token1, ParseTimeMs: float64(time.Since(startTime).Nanoseconds()) / 1000000, Timestamp: time.Now(), }) s.wg.Add(1) // Get an available worker job channel jobChannel := <-s.workerPool // Send the job to the worker jobChannel <- event } // GetTopOpportunities returns the top ranked arbitrage opportunities func (s *Scanner) GetTopOpportunities(limit int) []*profitcalc.RankedOpportunity { return s.marketScanner.GetTopOpportunities(limit) } // GetExecutableOpportunities returns executable arbitrage opportunities func (s *Scanner) GetExecutableOpportunities(limit int) []*profitcalc.RankedOpportunity { return s.marketScanner.GetExecutableOpportunities(limit) } // GetOpportunityStats returns statistics about tracked opportunities func (s *Scanner) GetOpportunityStats() map[string]interface{} { return s.marketScanner.GetOpportunityStats() } // GetMarketDataStats returns comprehensive market data statistics func (s *Scanner) GetMarketDataStats() map[string]interface{} { return s.marketScanner.GetMarketDataStats() } // GetCachedTokenInfo returns information about a cached token func (s *Scanner) GetCachedTokenInfo(tokenAddr common.Address) (*marketdata.TokenInfo, bool) { return s.marketScanner.GetCachedTokenInfo(tokenAddr) } // GetCachedPoolInfo returns information about a cached pool func (s *Scanner) GetCachedPoolInfo(poolAddr common.Address) (*marketdata.PoolInfo, bool) { return s.marketScanner.GetCachedPoolInfo(poolAddr) } // GetPoolsForTokenPair returns all cached pools for a token pair func (s *Scanner) GetPoolsForTokenPair(token0, token1 common.Address) []*marketdata.PoolInfo { return s.marketScanner.GetPoolsForTokenPair(token0, token1) } // GetActiveFactories returns all active DEX factories func (s *Scanner) GetActiveFactories() []*marketdata.FactoryInfo { return s.marketScanner.GetActiveFactories() } // WaitGroup returns the scanner's wait group for synchronization func (s *Scanner) WaitGroup() *sync.WaitGroup { return &s.wg } // GetParsingStats returns comprehensive parsing performance statistics func (s *Scanner) GetParsingStats() map[string]interface{} { return s.parsingMonitor.GetCurrentStats() } // GetParsingHealthStatus returns the current parsing health status func (s *Scanner) GetParsingHealthStatus() map[string]interface{} { healthStatus := s.parsingMonitor.GetHealthStatus() return map[string]interface{}{ "health_status": healthStatus, } } // GetParsingPerformanceMetrics returns detailed parsing performance metrics func (s *Scanner) GetParsingPerformanceMetrics() map[string]interface{} { return s.parsingMonitor.GetDashboardData() }