Files
mev-beta/pkg/scanner/concurrent.go
Krypto Kajun 850223a953 fix(multicall): resolve critical multicall parsing corruption issues
- Added comprehensive bounds checking to prevent buffer overruns in multicall parsing
- Implemented graduated validation system (Strict/Moderate/Permissive) to reduce false positives
- Added LRU caching system for address validation with 10-minute TTL
- Enhanced ABI decoder with missing Universal Router and Arbitrum-specific DEX signatures
- Fixed duplicate function declarations and import conflicts across multiple files
- Added error recovery mechanisms with multiple fallback strategies
- Updated tests to handle new validation behavior for suspicious addresses
- Fixed parser test expectations for improved validation system
- Applied gofmt formatting fixes to ensure code style compliance
- Fixed mutex copying issues in monitoring package by introducing MetricsSnapshot
- Resolved critical security vulnerabilities in heuristic address extraction
- Progress: Updated TODO audit from 10% to 35% complete

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-17 00:12:55 -05:00

205 lines
6.2 KiB
Go

package scanner
import (
"fmt"
"sync"
"github.com/ethereum/go-ethereum/common"
"github.com/fraktal/mev-beta/internal/config"
"github.com/fraktal/mev-beta/internal/logger"
"github.com/fraktal/mev-beta/pkg/contracts"
"github.com/fraktal/mev-beta/pkg/database"
"github.com/fraktal/mev-beta/pkg/events"
"github.com/fraktal/mev-beta/pkg/marketdata"
"github.com/fraktal/mev-beta/pkg/profitcalc"
"github.com/fraktal/mev-beta/pkg/scanner/analysis"
"github.com/fraktal/mev-beta/pkg/scanner/market"
"github.com/fraktal/mev-beta/pkg/scanner/swap"
)
// Scanner is the main market scanner that handles event processing.
//
// Events enter through SubmitEvent and are handed to one of a fixed set of
// EventWorkers, which dispatch them to the swap or liquidity analyzer.
type Scanner struct {
// marketScanner is passed to the analyzers and backs the Get* accessors.
marketScanner *market.MarketScanner
// swapAnalyzer handles events of type events.Swap.
swapAnalyzer *swap.SwapAnalyzer
// liquidityAnalyzer handles AddLiquidity, RemoveLiquidity and NewPool events.
liquidityAnalyzer *analysis.LiquidityAnalyzer
// config is the bot configuration supplied to NewScanner.
config *config.BotConfig
// logger receives the debug and error messages emitted by the scanner and its workers.
logger *logger.Logger
// workerPool carries the job channels of workers that are ready for an event;
// SubmitEvent receives from it to pick a worker.
workerPool chan chan events.Event
// workers holds every EventWorker created by NewScanner.
workers []*EventWorker
// wg is incremented in SubmitEvent and decremented when a worker finishes
// analyzing the event; it is exposed through WaitGroup.
wg sync.WaitGroup
}
// EventWorker represents a worker that processes event details.
//
// A worker repeatedly advertises its JobChannel on WorkerPool, waits for an
// event on that channel, and passes it to Process.
type EventWorker struct {
// ID identifies the worker in log messages.
ID int
// WorkerPool is the shared channel on which the worker registers its JobChannel.
WorkerPool chan chan events.Event
// JobChannel is the worker's own channel for receiving events to process.
JobChannel chan events.Event
// QuitChan tells the worker loop to exit when a value is received on it.
QuitChan chan bool
// scanner is the owning Scanner, providing the analyzers, logger and wait group.
scanner *Scanner
}
// NewScanner builds a Scanner and starts its worker goroutines.
//
// It creates the market scanner from cfg, logger, contractExecutor and db,
// wires the swap and liquidity analyzers to the market scanner's data logger
// (and, for swaps, its profit calculator and opportunity ranker), and then
// creates and starts cfg.MaxWorkers event workers that share one worker pool.
func NewScanner(cfg *config.BotConfig, logger *logger.Logger, contractExecutor *contracts.ContractExecutor, db *database.Database) *Scanner {
	s := &Scanner{
		config:     cfg,
		logger:     logger,
		workerPool: make(chan chan events.Event, cfg.MaxWorkers),
		workers:    make([]*EventWorker, 0, cfg.MaxWorkers),
	}

	// The market scanner is the shared dependency of both analyzers.
	ms := market.NewMarketScanner(cfg, logger, contractExecutor, db)
	s.marketScanner = ms

	s.swapAnalyzer = swap.NewSwapAnalyzer(
		logger,
		ms.GetMarketDataLogger(),
		ms.GetProfitCalculator(),
		ms.GetOpportunityRanker(),
	)

	s.liquidityAnalyzer = analysis.NewLiquidityAnalyzer(
		logger,
		ms.GetMarketDataLogger(),
	)

	// Spin up workers until the configured count is reached; the next worker's
	// ID is the number of workers created so far.
	for len(s.workers) < cfg.MaxWorkers {
		w := NewEventWorker(len(s.workers), s.workerPool, s)
		s.workers = append(s.workers, w)
		w.Start()
	}

	return s
}
// NewEventWorker returns a worker with the given id that registers on
// workerPool and reports to scanner. The worker gets its own unbuffered job
// and quit channels; it does not run until Start is called.
func NewEventWorker(id int, workerPool chan chan events.Event, scanner *Scanner) *EventWorker {
	w := new(EventWorker)
	w.ID = id
	w.WorkerPool = workerPool
	w.JobChannel = make(chan events.Event)
	w.QuitChan = make(chan bool)
	w.scanner = scanner
	return w
}
// Start launches the worker's processing loop in a new goroutine.
//
// On each iteration the worker offers its JobChannel on WorkerPool to signal
// that it is ready, then waits for an event on that channel and passes it to
// Process. A value received on QuitChan ends the loop, both while the worker
// is waiting to register and while it is waiting for a job.
func (w *EventWorker) Start() {
	go func() {
		for {
			// Register as available. Listening on QuitChan here as well keeps
			// the worker stoppable when the pool is full or unbuffered, and
			// lets a pending quit win before the job channel is re-registered.
			select {
			case w.WorkerPool <- w.JobChannel:
			case <-w.QuitChan:
				return
			}

			select {
			case job := <-w.JobChannel:
				w.Process(job)
			case <-w.QuitChan:
				return
			}
		}
	}()
}
// Stop asks the worker to exit. The quit signal is sent on QuitChan from a
// separate goroutine, so Stop itself never blocks; the send completes once
// the worker loop receives from QuitChan.
func (w *EventWorker) Stop() {
	signalQuit := func() {
		w.QuitChan <- true
	}
	go signalQuit()
}
// Process analyzes a single event asynchronously.
//
// The analysis runs in its own goroutine, so Process returns immediately and
// the worker can accept its next job. When the goroutine finishes it calls
// Done on the scanner's wait group; SubmitEvent performs the matching Add.
func (w *EventWorker) Process(event events.Event) {
	analyze := func() {
		sc := w.scanner
		defer sc.wg.Done()

		sc.logger.Debug(fmt.Sprintf("Worker %d processing %s event in pool %s from protocol %s",
			w.ID, event.Type.String(), event.PoolAddress, event.Protocol))

		// Route the event to the analyzer responsible for its type.
		switch kind := event.Type; kind {
		case events.Swap:
			sc.swapAnalyzer.AnalyzeSwapEvent(event, sc.marketScanner)
		case events.AddLiquidity, events.RemoveLiquidity:
			// The last argument is true for AddLiquidity and false for RemoveLiquidity.
			sc.liquidityAnalyzer.AnalyzeLiquidityEvent(event, sc.marketScanner, kind == events.AddLiquidity)
		case events.NewPool:
			sc.liquidityAnalyzer.AnalyzeNewPoolEvent(event, sc.marketScanner)
		default:
			sc.logger.Debug(fmt.Sprintf("Worker %d received unknown event type: %d", w.ID, event.Type))
		}
	}
	go analyze()
}
// SubmitEvent hands an event to the next available worker.
//
// It increments the scanner's wait group, then blocks until a worker has
// registered its job channel in the pool and has received the event. Events
// whose PoolAddress is the zero address are reported through the error log
// but are still submitted.
func (s *Scanner) SubmitEvent(event events.Event) {
	// DEBUG: flag zero-address events at the point of submission.
	var zeroAddr common.Address
	if event.PoolAddress == zeroAddr {
		msg := fmt.Sprintf("ZERO ADDRESS DEBUG [SUBMIT]: Event submitted with zero PoolAddress - TxHash: %s, Protocol: %s, Type: %v",
			event.TransactionHash.Hex(), event.Protocol, event.Type)
		s.logger.Error(msg)
	}

	s.wg.Add(1)

	// Take a ready worker's job channel from the pool and deliver the event on it.
	ready := <-s.workerPool
	ready <- event
}
// GetTopOpportunities returns the top ranked arbitrage opportunities as
// reported by the underlying market scanner; limit is forwarded unchanged.
func (s *Scanner) GetTopOpportunities(limit int) []*profitcalc.RankedOpportunity {
	ranked := s.marketScanner.GetTopOpportunities(limit)
	return ranked
}
// GetExecutableOpportunities returns the executable arbitrage opportunities
// as reported by the underlying market scanner; limit is forwarded unchanged.
func (s *Scanner) GetExecutableOpportunities(limit int) []*profitcalc.RankedOpportunity {
	executable := s.marketScanner.GetExecutableOpportunities(limit)
	return executable
}
// GetOpportunityStats returns the opportunity statistics map produced by the
// underlying market scanner.
func (s *Scanner) GetOpportunityStats() map[string]interface{} {
	stats := s.marketScanner.GetOpportunityStats()
	return stats
}
// GetMarketDataStats returns the market data statistics map produced by the
// underlying market scanner.
func (s *Scanner) GetMarketDataStats() map[string]interface{} {
	stats := s.marketScanner.GetMarketDataStats()
	return stats
}
// GetCachedTokenInfo looks up tokenAddr through the underlying market scanner
// and returns its token info together with the scanner's boolean result.
func (s *Scanner) GetCachedTokenInfo(tokenAddr common.Address) (*marketdata.TokenInfo, bool) {
	info, ok := s.marketScanner.GetCachedTokenInfo(tokenAddr)
	return info, ok
}
// GetCachedPoolInfo looks up poolAddr through the underlying market scanner
// and returns its pool info together with the scanner's boolean result.
func (s *Scanner) GetCachedPoolInfo(poolAddr common.Address) (*marketdata.PoolInfo, bool) {
	info, ok := s.marketScanner.GetCachedPoolInfo(poolAddr)
	return info, ok
}
// GetPoolsForTokenPair returns the cached pools that the underlying market
// scanner reports for the pair (token0, token1).
func (s *Scanner) GetPoolsForTokenPair(token0, token1 common.Address) []*marketdata.PoolInfo {
	pools := s.marketScanner.GetPoolsForTokenPair(token0, token1)
	return pools
}
// GetActiveFactories returns the active DEX factories reported by the
// underlying market scanner.
func (s *Scanner) GetActiveFactories() []*marketdata.FactoryInfo {
	factories := s.marketScanner.GetActiveFactories()
	return factories
}
// WaitGroup exposes a pointer to the scanner's wait group so callers can wait
// for submitted events to finish processing. The pointer refers to the
// scanner's own field, not a copy.
func (s *Scanner) WaitGroup() *sync.WaitGroup {
	wg := &s.wg
	return wg
}