package arbitrum

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/ethclient"

	"github.com/fraktal/mev-beta/internal/logger"
	"github.com/fraktal/mev-beta/pkg/arbitrum/discovery"
)

// Defaults and display settings used by MarketDiscoveryManager.
const (
	// discoveryRequestsPerSecond is the rate limit applied to the wrapped RPC client.
	discoveryRequestsPerSecond = 10.0
	// defaultDiscoveryMaxBlocksPerScan caps the inclusive block count of one incremental scan.
	defaultDiscoveryMaxBlocksPerScan = 1000
	// defaultDiscoveryScanInterval is the pause between incremental scans.
	defaultDiscoveryScanInterval = 30 * time.Second
	// discoveryStartBlockBuffer is how many blocks behind the chain head scanning starts.
	discoveryStartBlockBuffer = 10
	// discoveryAddrDisplayLen / discoveryTokenDisplayLen bound the hex prefixes written to the log.
	discoveryAddrDisplayLen  = 8
	discoveryTokenDisplayLen = 6
	// factoryMonitorInterval is the tick period of the placeholder factory monitor.
	factoryMonitorInterval = 30 * time.Second
)

// MarketDiscoveryManager is the main market discovery manager that coordinates
// all submodules. It delegates discovery work to discovery.MarketDiscovery and
// owns the background incremental block scanner.
type MarketDiscoveryManager struct {
	marketDiscovery *discovery.MarketDiscovery
	// Add other submodules as needed
	logger *logger.Logger
	config *discovery.MarketConfig
	client *ethclient.Client

	// Incremental scanning state. Every field below is guarded by scanMu.
	lastScannedBlock uint64
	maxBlocksPerScan uint64
	scanInterval     time.Duration
	stopChan         chan struct{} // stop signal of the current (or most recent) scanner run
	isScanning       bool
	scanMu           sync.Mutex
}

// NewMarketDiscoveryManager creates a new market discovery manager.
//
// The supplied client is wrapped with NewRateLimitedClient before being handed
// to the discovery module. The market configuration at configPath is loaded and
// stored on the manager. An error is returned if either the discovery module
// cannot be created or the configuration cannot be loaded.
func NewMarketDiscoveryManager(client *ethclient.Client, logger *logger.Logger, configPath string) (*MarketDiscoveryManager, error) {
	// Wrap client with rate limiting (10 requests per second default).
	rateLimitedClient := NewRateLimitedClient(client, discoveryRequestsPerSecond, logger)

	md, err := discovery.NewMarketDiscovery(rateLimitedClient.Client, logger, configPath)
	if err != nil {
		return nil, fmt.Errorf("failed to create market discovery: %w", err)
	}

	// Load the config to make it available on the manager.
	config, err := discovery.LoadMarketConfig(configPath)
	if err != nil {
		return nil, fmt.Errorf("failed to load config: %w", err)
	}

	return &MarketDiscoveryManager{
		marketDiscovery:  md,
		logger:           logger,
		config:           config,
		client:           rateLimitedClient.Client,
		maxBlocksPerScan: defaultDiscoveryMaxBlocksPerScan,
		scanInterval:     defaultDiscoveryScanInterval,
		stopChan:         make(chan struct{}),
	}, nil
}

// DiscoverPools discovers pools from factories within a block range by
// delegating to the underlying discovery module.
func (mdm *MarketDiscoveryManager) DiscoverPools(ctx context.Context, fromBlock, toBlock uint64) (*discovery.PoolDiscoveryResult, error) {
	return mdm.marketDiscovery.DiscoverPools(ctx, fromBlock, toBlock)
}

// ScanForArbitrage scans all pools for arbitrage opportunities by delegating
// to the underlying discovery module.
func (mdm *MarketDiscoveryManager) ScanForArbitrage(ctx context.Context, blockNumber uint64) (*discovery.MarketScanResult, error) {
	return mdm.marketDiscovery.ScanForArbitrage(ctx, blockNumber)
}

// BuildComprehensiveMarkets builds comprehensive markets for all exchanges and
// top tokens by delegating to the underlying discovery module.
func (mdm *MarketDiscoveryManager) BuildComprehensiveMarkets() error {
	return mdm.marketDiscovery.BuildComprehensiveMarkets()
}

// StartIncrementalScanning starts incremental block scanning for new pools.
//
// It returns an error if a scanner is already running or if the current block
// number cannot be fetched; in the latter case the manager is left stopped so
// a later call can retry. On success a background goroutine is started that
// runs until ctx is cancelled or StopIncrementalScanning is called.
func (mdm *MarketDiscoveryManager) StartIncrementalScanning(ctx context.Context) error {
	// Every run gets its own stop channel: a closed channel can never be
	// reused, so sharing one across runs would make a restarted scanner exit
	// immediately and make the following Stop panic on a double close.
	stop := make(chan struct{})

	mdm.scanMu.Lock()
	if mdm.isScanning {
		mdm.scanMu.Unlock()
		return fmt.Errorf("incremental scanning already running")
	}
	// Claim the scanner before the RPC call so concurrent starts are rejected.
	mdm.isScanning = true
	mdm.stopChan = stop
	mdm.scanMu.Unlock()

	mdm.logger.Info("🔄 Starting incremental market discovery scanning")

	// Get the current block number to start scanning from.
	currentBlock, err := mdm.client.BlockNumber(ctx)
	if err != nil {
		// Release the claim, otherwise scanning could never be started again.
		mdm.releaseScanner(stop)
		return fmt.Errorf("failed to get current block number: %w", err)
	}

	// Start a small buffer behind the head so the first scan covers the most
	// recent blocks. Saturate at zero: a plain subtraction would wrap around
	// on a chain shorter than the buffer.
	var startBlock uint64
	if currentBlock > discoveryStartBlockBuffer {
		startBlock = currentBlock - discoveryStartBlockBuffer
	}

	mdm.scanMu.Lock()
	interval := mdm.scanInterval
	if mdm.stopChan == stop {
		// Only touch the cursor while this run still owns the scanner.
		mdm.lastScannedBlock = startBlock
	}
	mdm.scanMu.Unlock()

	// If Stop was called while the RPC was in flight, stop is already closed
	// and the loop returns on its first select.
	go mdm.scanLoop(ctx, stop, interval)

	return nil
}

// runIncrementalScanning runs the incremental scanning loop using the stop
// channel and scan interval currently stored on the manager. It blocks until
// ctx is cancelled or the stop channel is closed.
func (mdm *MarketDiscoveryManager) runIncrementalScanning(ctx context.Context) {
	mdm.scanMu.Lock()
	stop, interval := mdm.stopChan, mdm.scanInterval
	mdm.scanMu.Unlock()

	mdm.scanLoop(ctx, stop, interval)
}

// scanLoop performs one incremental scan per tick until ctx is cancelled or
// stop is closed. Scan errors are logged and do not terminate the loop.
func (mdm *MarketDiscoveryManager) scanLoop(ctx context.Context, stop chan struct{}, interval time.Duration) {
	// time.NewTicker panics on a non-positive duration.
	if interval <= 0 {
		interval = defaultDiscoveryScanInterval
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	defer mdm.releaseScanner(stop)

	for {
		select {
		case <-ctx.Done():
			return
		case <-stop:
			return
		case <-ticker.C:
			if err := mdm.performIncrementalScan(ctx); err != nil {
				mdm.logger.Error(fmt.Sprintf("Incremental scan failed: %v", err))
			}
		}
	}
}

// releaseScanner marks scanning as stopped, but only if stop still identifies
// the current run; a finished older run must not clear the flag of a newer one.
func (mdm *MarketDiscoveryManager) releaseScanner(stop chan struct{}) {
	mdm.scanMu.Lock()
	defer mdm.scanMu.Unlock()

	if mdm.stopChan == stop {
		mdm.isScanning = false
	}
}

// performIncrementalScan performs a single incremental scan: it fetches the
// chain head, discovers pools in the next unscanned block range (capped at
// maxBlocksPerScan blocks), logs the results and advances lastScannedBlock.
// It returns nil without scanning when there are no new blocks.
func (mdm *MarketDiscoveryManager) performIncrementalScan(ctx context.Context) error {
	// Get the current block number.
	currentBlock, err := mdm.client.BlockNumber(ctx)
	if err != nil {
		return fmt.Errorf("failed to get current block number: %w", err)
	}

	mdm.scanMu.Lock()
	lastScanned, maxBlocks := mdm.lastScannedBlock, mdm.maxBlocksPerScan
	mdm.scanMu.Unlock()

	fromBlock, toBlock, ok := incrementalScanRange(lastScanned, currentBlock, maxBlocks)
	if !ok {
		// No new blocks to scan.
		return nil
	}

	mdm.logger.Info(fmt.Sprintf("🔍 Performing incremental scan: blocks %d to %d", fromBlock, toBlock))

	// Discover pools in the range.
	result, err := mdm.marketDiscovery.DiscoverPools(ctx, fromBlock, toBlock)
	if err != nil {
		return fmt.Errorf("failed to discover pools: %w", err)
	}

	mdm.logDiscoveredPools(result, fromBlock, toBlock)

	// Update the last scanned block only after a successful discovery so a
	// failed range is retried on the next tick.
	mdm.scanMu.Lock()
	mdm.lastScannedBlock = toBlock
	mdm.scanMu.Unlock()

	return nil
}

// incrementalScanRange computes the inclusive block range to scan next.
//
// lastScanned is the last block already processed, head is the current chain
// head and maxBlocks caps the number of blocks in the range (zero falls back
// to the default). ok is false when there is nothing new to scan, including
// when head is behind lastScanned.
func incrementalScanRange(lastScanned, head, maxBlocks uint64) (fromBlock, toBlock uint64, ok bool) {
	// Checking emptiness first keeps the unsigned subtraction below from
	// wrapping around and producing a range beyond the chain head.
	if lastScanned >= head {
		return 0, 0, false
	}
	if maxBlocks == 0 {
		maxBlocks = defaultDiscoveryMaxBlocksPerScan
	}

	fromBlock, toBlock = lastScanned+1, head

	// The range is inclusive, so it holds toBlock-fromBlock+1 blocks.
	if toBlock-fromBlock >= maxBlocks {
		toBlock = fromBlock + (maxBlocks - 1)
	}
	return fromBlock, toBlock, true
}

// logDiscoveredPools logs a summary line and one line per new pool for a
// completed scan of blocks fromBlock..toBlock. It logs nothing when result is
// nil or reports no pools.
func (mdm *MarketDiscoveryManager) logDiscoveredPools(result *discovery.PoolDiscoveryResult, fromBlock, toBlock uint64) {
	if result == nil || result.PoolsFound <= 0 {
		return
	}

	mdm.logger.Info(fmt.Sprintf("🆕 Discovered %d new pools in blocks %d-%d", result.PoolsFound, fromBlock, toBlock))

	for _, pool := range result.NewPools {
		mdm.logger.Info(fmt.Sprintf(" 🏦 Pool %s (factory %s, tokens %s-%s)",
			abbreviateHex(pool.Address.Hex(), discoveryAddrDisplayLen),
			abbreviateHex(pool.Factory.Hex(), discoveryAddrDisplayLen),
			abbreviateHex(pool.Token0.Hex(), discoveryTokenDisplayLen),
			abbreviateHex(pool.Token1.Hex(), discoveryTokenDisplayLen)))
	}
}

// abbreviateHex returns at most the first n bytes of hex for display, or
// "unknown" when hex is empty. It never slices out of bounds.
func abbreviateHex(hex string, n int) string {
	switch {
	case hex == "":
		return "unknown"
	case n >= 0 && len(hex) > n:
		return hex[:n]
	default:
		return hex
	}
}

// StopIncrementalScanning stops the incremental scanning. It returns an error
// if no scanner is running.
func (mdm *MarketDiscoveryManager) StopIncrementalScanning() error {
	mdm.scanMu.Lock()
	defer mdm.scanMu.Unlock()

	if !mdm.isScanning {
		return fmt.Errorf("incremental scanning not running")
	}

	// StartIncrementalScanning installs a fresh channel for every run, so the
	// channel closed here has not been closed before.
	if mdm.stopChan != nil {
		close(mdm.stopChan)
	}
	mdm.isScanning = false

	return nil
}

// SetMaxBlocksPerScan sets the maximum number of blocks to scan in a single
// increment. A value of zero is rejected (and logged) because a scan must
// cover at least one block to make progress.
func (mdm *MarketDiscoveryManager) SetMaxBlocksPerScan(maxBlocks uint64) {
	if maxBlocks == 0 {
		mdm.logger.Error("Ignoring max blocks per scan of 0: value must be positive")
		return
	}

	mdm.scanMu.Lock()
	defer mdm.scanMu.Unlock()
	mdm.maxBlocksPerScan = maxBlocks
}

// SetScanInterval sets the interval between incremental scans. The interval
// is read when a scanner is started, so a change does not affect a scanner
// that is already running. Non-positive intervals are rejected (and logged)
// because time.NewTicker panics on them.
func (mdm *MarketDiscoveryManager) SetScanInterval(interval time.Duration) {
	if interval <= 0 {
		mdm.logger.Error(fmt.Sprintf("Ignoring scan interval %v: value must be positive", interval))
		return
	}

	mdm.scanMu.Lock()
	defer mdm.scanMu.Unlock()
	mdm.scanInterval = interval
}

// GetLastScannedBlock returns the last block that was scanned.
func (mdm *MarketDiscoveryManager) GetLastScannedBlock() uint64 {
	mdm.scanMu.Lock()
	defer mdm.scanMu.Unlock()
	return mdm.lastScannedBlock
}

// GetStatistics returns market discovery statistics as reported by the
// underlying discovery module.
func (mdm *MarketDiscoveryManager) GetStatistics() map[string]interface{} {
	return mdm.marketDiscovery.GetStatistics()
}

// GetPoolCache returns the pool cache for external use.
//
// This is a simplified implementation: every call returns a new, empty
// *PoolCache rather than a cache populated from the discovered pools.
func (mdm *MarketDiscoveryManager) GetPoolCache() interface{} {
	return &PoolCache{
		pools:     make(map[common.Address]*CachedPoolInfo),
		cacheLock: sync.RWMutex{},
		maxSize:   10000,
		ttl:       time.Hour,
	}
}

// StartFactoryEventMonitoring begins real-time monitoring of factory events
// for new pool discovery.
//
// The monitoring itself is currently a placeholder: the goroutine started here
// only emits a periodic debug log until ctx is cancelled. The method always
// returns nil.
func (mdm *MarketDiscoveryManager) StartFactoryEventMonitoring(ctx context.Context, client *ethclient.Client) error {
	mdm.logger.Info("🏭 Starting real-time factory event monitoring")

	// Creating per-factory event subscriptions would require access to the
	// factory information held by the market discovery module; for now only
	// the placeholder loop is started.
	go mdm.monitorFactoryEvents(ctx, client)

	return nil
}

// monitorFactoryEvents continuously monitors factory events for new pool
// creation. It is a placeholder that logs a debug message on every tick and
// returns when ctx is cancelled; client is not used yet.
func (mdm *MarketDiscoveryManager) monitorFactoryEvents(ctx context.Context, client *ethclient.Client) {
	ticker := time.NewTicker(factoryMonitorInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// In a real implementation, this would check for new pools.
			mdm.logger.Debug("Factory event monitoring tick")
		}
	}
}

// Close closes all log files and resources. In this simplified version there
// is nothing to release, so it always returns nil; it does not stop a running
// incremental scanner.
func (mdm *MarketDiscoveryManager) Close() error {
	return nil
}