// Package orchestrator wires the MEV detection components into a single
// streaming pipeline: raw transactions -> parsed events -> pool updates ->
// arbitrage opportunities.
package orchestrator

import (
	"context"
	"fmt"
	"math/big"
	"sync"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"

	"github.com/fraktal/mev-beta/internal/config"
	"github.com/fraktal/mev-beta/internal/logger"
	"github.com/fraktal/mev-beta/pkg/arbitrage"
	"github.com/fraktal/mev-beta/pkg/events"
	"github.com/fraktal/mev-beta/pkg/market"
	"github.com/fraktal/mev-beta/pkg/pools"
	"github.com/fraktal/mev-beta/pkg/scanner"
)

// MEVCoordinator orchestrates the entire MEV detection pipeline. Stages are
// connected by buffered channels and use non-blocking sends, so a slow stage
// sheds load (logs and drops) instead of stalling the upstream feed.
type MEVCoordinator struct {
	// Configuration
	config *config.Config
	logger *logger.Logger

	// Core components
	eventParser      *events.EventParser
	poolDiscovery    *pools.PoolDiscovery
	marketManager    *market.MarketManager
	marketScanner    *scanner.MarketScanner
	arbitrageScanner *arbitrage.MultiHopScanner

	// Data flow channels
	rawTransactions chan *RawTransaction
	parsedEvents    chan *events.Event
	poolUpdates     chan *PoolUpdate
	arbitrageOppCh  chan *ArbitrageOpportunity

	// Coordination. ctx/cancel control the lifetime of every pipeline
	// goroutine; wg tracks them so Stop can wait for a clean drain.
	wg     sync.WaitGroup
	ctx    context.Context
	cancel context.CancelFunc

	// Metrics
	metrics *CoordinatorMetrics
}

// RawTransaction bundles a transaction with its receipt and block context
// for processing through the pipeline.
type RawTransaction struct {
	Transaction *types.Transaction
	Receipt     *types.Receipt
	BlockNumber uint64
	Timestamp   uint64
}

// PoolUpdate represents a pool state update derived from an observed event.
type PoolUpdate struct {
	PoolAddress common.Address
	Token0      common.Address
	Token1      common.Address
	Reserve0    *big.Int
	Reserve1    *big.Int
	Timestamp   time.Time
}

// ArbitrageOpportunity represents a detected arbitrage opportunity.
type ArbitrageOpportunity struct {
	Paths        []*arbitrage.ArbitragePath
	TriggerToken common.Address
	Amount       *big.Int
	NetProfit    *big.Int // sum of NetProfit over all Paths
	ROI          float64  // ROI of the first (best) path
	DetectedAt   time.Time
}

// CoordinatorMetrics tracks performance counters for the pipeline.
// All fields are guarded by mutex.
type CoordinatorMetrics struct {
	TransactionsProcessed uint64
	EventsParsed          uint64
	PoolsDiscovered       uint64
	OpportunitiesFound    uint64
	AverageProcessingTime time.Duration
	mutex                 sync.RWMutex
}

// NewMEVCoordinator creates a new MEV coordinator wired to the supplied
// components. Channel capacities come from config.Bot.ChannelBufferSize
// (the opportunity channel uses a fixed buffer of 100).
func NewMEVCoordinator(
	config *config.Config,
	logger *logger.Logger,
	eventParser *events.EventParser,
	poolDiscovery *pools.PoolDiscovery,
	marketManager *market.MarketManager,
	marketScanner *scanner.MarketScanner,
) *MEVCoordinator {
	ctx, cancel := context.WithCancel(context.Background())
	arbitrageScanner := arbitrage.NewMultiHopScanner(logger, marketManager)

	return &MEVCoordinator{
		config:           config,
		logger:           logger,
		eventParser:      eventParser,
		poolDiscovery:    poolDiscovery,
		marketManager:    marketManager,
		marketScanner:    marketScanner,
		arbitrageScanner: arbitrageScanner,

		// Buffered channels for smooth data flow
		rawTransactions: make(chan *RawTransaction, config.Bot.ChannelBufferSize),
		parsedEvents:    make(chan *events.Event, config.Bot.ChannelBufferSize),
		poolUpdates:     make(chan *PoolUpdate, config.Bot.ChannelBufferSize),
		arbitrageOppCh:  make(chan *ArbitrageOpportunity, 100),

		ctx:     ctx,
		cancel:  cancel,
		metrics: &CoordinatorMetrics{},
	}
}

// Start begins the MEV coordination pipeline by launching all stage
// goroutines. It never fails today; the error return is kept for
// forward-compatibility with callers.
func (mc *MEVCoordinator) Start() error {
	mc.logger.Info("Starting MEV Coordinator pipeline...")

	// Start pipeline stages
	mc.startTransactionProcessor()
	mc.startEventProcessor()
	mc.startPoolUpdateProcessor()
	mc.startArbitrageProcessor()
	mc.startOpportunityHandler()
	mc.startMetricsReporter()

	mc.logger.Info("MEV Coordinator pipeline started successfully")
	return nil
}

// Stop gracefully shuts down the coordinator: it cancels the shared context
// and waits for every tracked goroutine to exit.
//
// The channels are deliberately NOT closed. Producers (ProcessTransaction
// and the event-submission loop) may still attempt non-blocking sends while
// Stop runs, and a send on a closed channel panics. Every receiver exits via
// ctx.Done(), so closing is unnecessary; the channels are reclaimed by GC.
func (mc *MEVCoordinator) Stop() {
	mc.logger.Info("Stopping MEV Coordinator pipeline...")

	// Cancel context to stop all goroutines
	mc.cancel()

	// Wait for all goroutines to finish
	mc.wg.Wait()

	mc.logger.Info("MEV Coordinator pipeline stopped")
}

// ProcessTransaction submits a transaction for processing through the
// pipeline. The send is non-blocking: if the intake channel is full the
// transaction is logged and dropped rather than stalling the caller.
func (mc *MEVCoordinator) ProcessTransaction(tx *types.Transaction, receipt *types.Receipt, blockNumber uint64, timestamp uint64) {
	select {
	case mc.rawTransactions <- &RawTransaction{
		Transaction: tx,
		Receipt:     receipt,
		BlockNumber: blockNumber,
		Timestamp:   timestamp,
	}:
	case <-mc.ctx.Done():
		return
	default:
		// Channel full, log and skip
		mc.logger.Warn("Transaction processing channel full, skipping transaction")
	}
}

// startTransactionProcessor consumes raw transactions, extracts events and
// records per-transaction processing latency.
func (mc *MEVCoordinator) startTransactionProcessor() {
	mc.wg.Add(1)
	go func() {
		defer mc.wg.Done()

		for {
			select {
			case rawTx := <-mc.rawTransactions:
				start := time.Now()
				mc.processRawTransaction(rawTx)
				// Update metrics
				mc.updateMetrics(time.Since(start))

			case <-mc.ctx.Done():
				return
			}
		}
	}()
}

// processRawTransaction parses a single raw transaction into events and
// forwards them to the event stage.
func (mc *MEVCoordinator) processRawTransaction(rawTx *RawTransaction) {
	// Parse events from the receipt logs. ("parsed", not "events", to avoid
	// shadowing the imported events package.)
	parsed, err := mc.eventParser.ParseTransactionReceipt(
		rawTx.Receipt,
		rawTx.BlockNumber,
		rawTx.Timestamp,
	)
	if err != nil {
		mc.logger.Debug(fmt.Sprintf("Failed to parse transaction %s: %v",
			rawTx.Transaction.Hash().Hex(), err))
		return
	}

	// Also try to parse events directly from transaction data; failures here
	// are best-effort and intentionally ignored.
	txEvents, err := mc.eventParser.ParseTransaction(rawTx.Transaction, rawTx.BlockNumber, rawTx.Timestamp)
	if err == nil {
		parsed = append(parsed, txEvents...)
	}

	// Submit events for processing; drop (with a warning) if the stage is full.
	for _, event := range parsed {
		select {
		case mc.parsedEvents <- event:
		case <-mc.ctx.Done():
			return
		default:
			mc.logger.Warn("Event processing channel full, skipping event")
		}
	}

	mc.metrics.mutex.Lock()
	mc.metrics.TransactionsProcessed++
	mc.metrics.EventsParsed += uint64(len(parsed))
	mc.metrics.mutex.Unlock()
}

// startEventProcessor launches config.Bot.MaxWorkers concurrent consumers of
// the parsed-event channel.
func (mc *MEVCoordinator) startEventProcessor() {
	// Start multiple event processors for concurrency
	for i := 0; i < mc.config.Bot.MaxWorkers; i++ {
		mc.wg.Add(1)
		go func(workerID int) {
			defer mc.wg.Done()

			for {
				select {
				case event := <-mc.parsedEvents:
					mc.processEvent(event, workerID)

				case <-mc.ctx.Done():
					return
				}
			}
		}(i)
	}
}

// processEvent routes a single parsed event to pool discovery, the market
// scanner and — for significant swaps — the arbitrage scanner.
func (mc *MEVCoordinator) processEvent(event *events.Event, workerID int) {
	mc.logger.Debug(fmt.Sprintf("Worker %d processing %s event from pool %s",
		workerID, event.Type.String(), event.PoolAddress.Hex()))

	// Submit to pool discovery for potential new pool detection
	mc.submitToPoolDiscovery(event)

	// Submit to market scanner for analysis
	mc.marketScanner.SubmitEvent(*event)

	// Update pool state if this is a swap event
	if event.Type == events.Swap {
		mc.updatePoolState(event)

		// Trigger arbitrage scan for significant swaps
		if mc.isSignificantSwap(event) {
			mc.triggerArbitrageScan(event)
		}
	}
}

// submitToPoolDiscovery submits the event to the pool discovery system.
// NOTE(review): the log payload is currently an empty placeholder — the
// discovery call receives no real data/topics; confirm against PoolDiscovery.
func (mc *MEVCoordinator) submitToPoolDiscovery(event *events.Event) {
	// Convert event to pool discovery format
	logData := map[string]interface{}{
		"data":   "",
		"topics": []interface{}{},
	}

	// Submit for discovery analysis
	mc.poolDiscovery.DiscoverFromTransaction(
		event.TransactionHash.Hex(),
		event.PoolAddress.Hex(),
		"",
		[]interface{}{logData},
	)
}

// updatePoolState pushes swap-derived pool state into the market manager and
// emits a best-effort PoolUpdate notification.
func (mc *MEVCoordinator) updatePoolState(event *events.Event) {
	if event.SqrtPriceX96 != nil && event.Liquidity != nil {
		mc.marketManager.UpdatePool(
			event.PoolAddress,
			event.Liquidity,
			event.SqrtPriceX96,
			event.Tick,
		)

		// Send pool update notification
		select {
		case mc.poolUpdates <- &PoolUpdate{
			PoolAddress: event.PoolAddress,
			Token0:      event.Token0,
			Token1:      event.Token1,
			Timestamp:   time.Now(),
		}:
		case <-mc.ctx.Done():
			return
		default:
			// Channel full, continue
		}
	}
}

// isSignificantSwap reports whether either swap leg exceeds the minimum
// size threshold for triggering an arbitrage scan.
func (mc *MEVCoordinator) isSignificantSwap(event *events.Event) bool {
	// Check if swap amount is above threshold
	minAmount := big.NewInt(1000000000000000000) // 1 ETH equivalent

	if event.Amount0 != nil && event.Amount0.Cmp(minAmount) > 0 {
		return true
	}
	if event.Amount1 != nil && event.Amount1.Cmp(minAmount) > 0 {
		return true
	}
	return false
}

// triggerArbitrageScan runs an asynchronous, bounded (5s) arbitrage scan
// seeded by the swap's token and amount. The goroutine is registered with
// the WaitGroup so Stop waits for in-flight scans; the Add happens
// synchronously inside an already-tracked worker goroutine, so it cannot
// race with wg.Wait observing a zero counter.
func (mc *MEVCoordinator) triggerArbitrageScan(event *events.Event) {
	mc.wg.Add(1)
	go func() {
		defer mc.wg.Done()

		// Determine the token and amount to scan for
		var triggerToken common.Address
		var amount *big.Int

		if event.Amount0 != nil && event.Amount0.Sign() > 0 {
			triggerToken = event.Token0
			amount = event.Amount0
		} else if event.Amount1 != nil && event.Amount1.Sign() > 0 {
			triggerToken = event.Token1
			amount = event.Amount1
		} else {
			return
		}

		// Scan for arbitrage opportunities; the timeout inherits pipeline
		// cancellation from mc.ctx.
		ctx, cancel := context.WithTimeout(mc.ctx, 5*time.Second)
		defer cancel()

		paths, err := mc.arbitrageScanner.ScanForArbitrage(ctx, triggerToken, amount)
		if err != nil {
			mc.logger.Debug(fmt.Sprintf("Arbitrage scan failed: %v", err))
			return
		}

		if len(paths) > 0 {
			// Calculate total profit from all paths
			totalProfit := big.NewInt(0)
			for _, path := range paths {
				totalProfit.Add(totalProfit, path.NetProfit)
			}

			opportunity := &ArbitrageOpportunity{
				Paths:        paths,
				TriggerToken: triggerToken,
				Amount:       amount,
				NetProfit:    totalProfit,
				ROI:          paths[0].ROI, // Use ROI from best path
				DetectedAt:   time.Now(),
			}

			select {
			case mc.arbitrageOppCh <- opportunity:
			case <-mc.ctx.Done():
				return
			default:
				mc.logger.Warn("Arbitrage opportunity channel full")
			}
		}
	}()
}

// startPoolUpdateProcessor consumes pool update notifications (currently
// only logged at debug level).
func (mc *MEVCoordinator) startPoolUpdateProcessor() {
	mc.wg.Add(1)
	go func() {
		defer mc.wg.Done()

		for {
			select {
			case poolUpdate := <-mc.poolUpdates:
				mc.logger.Debug(fmt.Sprintf("Pool update for %s: %s/%s",
					poolUpdate.PoolAddress.Hex(),
					poolUpdate.Token0.Hex(),
					poolUpdate.Token1.Hex()))

			case <-mc.ctx.Done():
				return
			}
		}
	}()
}

// startArbitrageProcessor is a placeholder pipeline stage. Opportunities are
// consumed by startOpportunityHandler; this goroutine only parks until
// shutdown so the stage slot exists for future processing logic.
func (mc *MEVCoordinator) startArbitrageProcessor() {
	mc.wg.Add(1)
	go func() {
		defer mc.wg.Done()
		<-mc.ctx.Done()
	}()
}

// startOpportunityHandler consumes detected arbitrage opportunities.
func (mc *MEVCoordinator) startOpportunityHandler() {
	mc.wg.Add(1)
	go func() {
		defer mc.wg.Done()

		for {
			select {
			case opp := <-mc.arbitrageOppCh:
				mc.handleArbitrageOpportunity(opp)

			case <-mc.ctx.Done():
				return
			}
		}
	}()
}

// handleArbitrageOpportunity logs a detected opportunity (token, amount,
// profit, ROI and the route of each path) and bumps the counter.
func (mc *MEVCoordinator) handleArbitrageOpportunity(opp *ArbitrageOpportunity) {
	mc.logger.Info(fmt.Sprintf("🎯 ARBITRAGE OPPORTUNITY DETECTED!"))
	mc.logger.Info(fmt.Sprintf("   Token: %s", opp.TriggerToken.Hex()))
	mc.logger.Info(fmt.Sprintf("   Amount: %s wei", opp.Amount.String()))
	mc.logger.Info(fmt.Sprintf("   Net Profit: %s wei", opp.NetProfit.String()))
	mc.logger.Info(fmt.Sprintf("   ROI: %.2f%%", opp.ROI))
	mc.logger.Info(fmt.Sprintf("   Paths: %d", len(opp.Paths)))

	for i, path := range opp.Paths {
		mc.logger.Info(fmt.Sprintf("   Path %d: %d hops, %s profit, %.2f%% ROI",
			i+1, len(path.Tokens)-1, path.NetProfit.String(), path.ROI))

		// Guard against a malformed empty path before indexing Tokens[0].
		if len(path.Tokens) == 0 {
			continue
		}
		pathStr := path.Tokens[0].Hex()[:8]
		for j := 1; j < len(path.Tokens); j++ {
			pathStr += " -> " + path.Tokens[j].Hex()[:8]
		}
		mc.logger.Info(fmt.Sprintf("     Route: %s", pathStr))
	}

	mc.metrics.mutex.Lock()
	mc.metrics.OpportunitiesFound++
	mc.metrics.mutex.Unlock()
}

// startMetricsReporter logs pipeline metrics every 30 seconds.
func (mc *MEVCoordinator) startMetricsReporter() {
	mc.wg.Add(1)
	go func() {
		defer mc.wg.Done()

		ticker := time.NewTicker(30 * time.Second)
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
				mc.reportMetrics()

			case <-mc.ctx.Done():
				return
			}
		}
	}()
}

// reportMetrics logs a snapshot of the current counters under a read lock.
func (mc *MEVCoordinator) reportMetrics() {
	mc.metrics.mutex.RLock()
	defer mc.metrics.mutex.RUnlock()

	mc.logger.Info(fmt.Sprintf("📊 MEV COORDINATOR METRICS:"))
	mc.logger.Info(fmt.Sprintf("   Transactions Processed: %d", mc.metrics.TransactionsProcessed))
	mc.logger.Info(fmt.Sprintf("   Events Parsed: %d", mc.metrics.EventsParsed))
	mc.logger.Info(fmt.Sprintf("   Pools Discovered: %d", mc.metrics.PoolsDiscovered))
	mc.logger.Info(fmt.Sprintf("   Opportunities Found: %d", mc.metrics.OpportunitiesFound))
	mc.logger.Info(fmt.Sprintf("   Avg Processing Time: %v", mc.metrics.AverageProcessingTime))
}

// updateMetrics folds one processing duration into the running average.
// The "average" is a simple exponential smoothing (old+new)/2, which
// weights recent samples heavily — adequate for a dashboard figure.
func (mc *MEVCoordinator) updateMetrics(processingTime time.Duration) {
	mc.metrics.mutex.Lock()
	defer mc.metrics.mutex.Unlock()

	// Calculate rolling average
	if mc.metrics.AverageProcessingTime == 0 {
		mc.metrics.AverageProcessingTime = processingTime
	} else {
		mc.metrics.AverageProcessingTime =
			(mc.metrics.AverageProcessingTime + processingTime) / 2
	}
}