package arbitrum

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/ethclient"
	"github.com/ethereum/go-ethereum/rpc"
	"github.com/fraktal/mev-beta/internal/logger"
	"github.com/fraktal/mev-beta/pkg/oracle"
)

// EnhancedDEXParser provides comprehensive parsing for all major DEXs on Arbitrum.
//
// Instances must be created with NewEnhancedDEXParser, which wires up the RPC
// client, registries, protocol parsers, caches and background goroutines, and
// must be released with Close.
type EnhancedDEXParser struct {
	client *rpc.Client
	logger *logger.Logger
	oracle *oracle.PriceOracle

	// Protocol-specific parsers
	protocolParsers map[Protocol]DEXParserInterface

	// Contract and signature registries
	contractRegistry  *ContractRegistry
	signatureRegistry *SignatureRegistry

	// Pool discovery and caching
	poolCache *PoolCache

	// Event enrichment service
	enrichmentService *EventEnrichmentService
	tokenMetadata     *TokenMetadataService

	// Configuration
	config *EnhancedParserConfig

	// Metrics and monitoring
	metrics     *ParserMetrics
	metricsLock sync.RWMutex

	// Concurrency control. workerPool is a token semaphore: it is pre-filled
	// with maxWorkers tokens; a worker takes one before starting and puts it
	// back when done.
	maxWorkers int
	workerPool chan struct{}

	// Shutdown management
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup
}

// EnhancedParserConfig contains configuration for the enhanced parser.
type EnhancedParserConfig struct {
	// RPC Configuration
	RPCEndpoint string        `json:"rpc_endpoint"`
	RPCTimeout  time.Duration `json:"rpc_timeout"`
	MaxRetries  int           `json:"max_retries"`

	// Parsing Configuration
	EnabledProtocols      []Protocol `json:"enabled_protocols"`
	MinLiquidityUSD       float64    `json:"min_liquidity_usd"`
	MaxSlippageBps        uint64     `json:"max_slippage_bps"`
	EnablePoolDiscovery   bool       `json:"enable_pool_discovery"`
	EnableEventEnrichment bool       `json:"enable_event_enrichment"`

	// Performance Configuration
	MaxWorkers int           `json:"max_workers"`
	CacheSize  int           `json:"cache_size"`
	CacheTTL   time.Duration `json:"cache_ttl"`
	BatchSize  int           `json:"batch_size"`

	// Storage Configuration
	EnablePersistence bool   `json:"enable_persistence"`
	DatabaseURL       string `json:"database_url"`
	RedisURL          string `json:"redis_url"`

	// Monitoring Configuration
	EnableMetrics     bool          `json:"enable_metrics"`
	MetricsInterval   time.Duration `json:"metrics_interval"`
	EnableHealthCheck bool          `json:"enable_health_check"`
}

// DefaultEnhancedParserConfig returns a default configuration.
//
// RPCEndpoint is left empty and must be supplied by the caller.
func DefaultEnhancedParserConfig() *EnhancedParserConfig {
	return &EnhancedParserConfig{
		RPCTimeout: 30 * time.Second,
		MaxRetries: 3,
		EnabledProtocols: []Protocol{
			ProtocolUniswapV2, ProtocolUniswapV3,
			ProtocolSushiSwapV2, ProtocolSushiSwapV3,
			ProtocolCamelotV2, ProtocolCamelotV3,
			ProtocolTraderJoeV1, ProtocolTraderJoeV2,
			ProtocolCurve, ProtocolBalancerV2,
			ProtocolKyberClassic, ProtocolKyberElastic,
			ProtocolGMX, ProtocolRamses, ProtocolChronos,
		},
		MinLiquidityUSD:       1000.0,
		MaxSlippageBps:        1000, // 10%
		EnablePoolDiscovery:   true,
		EnableEventEnrichment: true,
		MaxWorkers:            10,
		CacheSize:             10000,
		CacheTTL:              1 * time.Hour,
		BatchSize:             100,
		EnableMetrics:         true,
		MetricsInterval:       1 * time.Minute,
		EnableHealthCheck:     true,
	}
}

// NewEnhancedDEXParser creates a new enhanced DEX parser.
//
// A nil config is replaced by DefaultEnhancedParserConfig. A non-positive
// MaxWorkers falls back to the default worker count so the worker semaphore
// always holds at least one token. If any initialisation step fails, the RPC
// client is closed and the internal context is cancelled before the error is
// returned, so nothing is leaked.
func NewEnhancedDEXParser(config *EnhancedParserConfig, logger *logger.Logger, oracle *oracle.PriceOracle) (*EnhancedDEXParser, error) {
	if config == nil {
		config = DefaultEnhancedParserConfig()
	}

	// make(chan, n) panics for n < 0 and n == 0 would leave the pool without
	// any token, so normalise the worker count without mutating the caller's
	// config.
	workers := config.MaxWorkers
	if workers <= 0 {
		workers = DefaultEnhancedParserConfig().MaxWorkers
	}

	// Create RPC client
	client, err := rpc.DialContext(context.Background(), config.RPCEndpoint)
	if err != nil {
		return nil, fmt.Errorf("failed to connect to RPC endpoint: %w", err)
	}

	// Create context for shutdown management
	ctx, cancel := context.WithCancel(context.Background())

	parser := &EnhancedDEXParser{
		client:          client,
		logger:          logger,
		oracle:          oracle,
		protocolParsers: make(map[Protocol]DEXParserInterface),
		config:          config,
		maxWorkers:      workers,
		workerPool:      make(chan struct{}, workers),
		ctx:             ctx,
		cancel:          cancel,
		metrics: &ParserMetrics{
			ProtocolBreakdown:  make(map[Protocol]uint64),
			EventTypeBreakdown: make(map[EventType]uint64),
			StartTime:          time.Now(),
		},
	}

	// Initialize worker pool
	for i := 0; i < workers; i++ {
		parser.workerPool <- struct{}{}
	}

	// abort releases everything acquired so far before reporting a failure.
	abort := func(initErr error) (*EnhancedDEXParser, error) {
		cancel()
		client.Close()
		return nil, initErr
	}

	// Initialize registries
	if err := parser.initializeRegistries(); err != nil {
		return abort(fmt.Errorf("failed to initialize registries: %w", err))
	}

	// Initialize protocol parsers
	if err := parser.initializeProtocolParsers(); err != nil {
		return abort(fmt.Errorf("failed to initialize protocol parsers: %w", err))
	}

	// Initialize pool cache
	if err := parser.initializePoolCache(); err != nil {
		return abort(fmt.Errorf("failed to initialize pool cache: %w", err))
	}

	// Initialize token metadata service
	ethClient := ethclient.NewClient(client)
	parser.tokenMetadata = NewTokenMetadataService(ethClient, logger)

	// Initialize event enrichment service
	parser.enrichmentService = NewEventEnrichmentService(oracle, parser.tokenMetadata, logger)

	// Start background services
	parser.startBackgroundServices()

	logger.Info(fmt.Sprintf("Enhanced DEX parser initialized with %d protocols, %d workers",
		len(parser.protocolParsers), workers))

	return parser, nil
}

// ParseTransaction comprehensively parses a transaction for DEX interactions.
//
// It decodes the call data, then (when receipt is non-nil) the receipt logs,
// optionally enriches every resulting event, and records metrics. Parsing
// failures are collected in the result's Errors slice; the returned error is
// always nil.
func (p *EnhancedDEXParser) ParseTransaction(tx *types.Transaction, receipt *types.Receipt) (*ParseResult, error) {
	startTime := time.Now()

	result := &ParseResult{
		Events:          []*EnhancedDEXEvent{},
		NewPools:        []*PoolInfo{},
		ParsedContracts: []*ContractInfo{},
		IsSuccessful:    true,
	}

	// Parse transaction data (function calls)
	if txEvents, err := p.parseTransactionData(tx); err != nil {
		result.Errors = append(result.Errors, fmt.Errorf("transaction data parsing failed: %w", err))
	} else {
		result.Events = append(result.Events, txEvents...)
	}

	// Parse transaction logs (events)
	if receipt != nil {
		if logEvents, newPools, err := p.parseTransactionLogs(tx, receipt); err != nil {
			result.Errors = append(result.Errors, fmt.Errorf("transaction logs parsing failed: %w", err))
		} else {
			result.Events = append(result.Events, logEvents...)
			result.NewPools = append(result.NewPools, newPools...)
		}
	}

	// Enrich event data; enrichment is best-effort and never fails the parse.
	if p.config.EnableEventEnrichment {
		for _, event := range result.Events {
			if err := p.enrichEventData(event); err != nil {
				p.logger.Debug(fmt.Sprintf("Failed to enrich event data: %v", err))
			}
		}
	}

	// Update metrics
	p.updateMetrics(result, time.Since(startTime))

	result.ProcessingTimeMs = uint64(time.Since(startTime).Milliseconds())
	result.IsSuccessful = len(result.Errors) == 0

	return result, nil
}

// ParseBlock comprehensively parses all transactions in a block.
//
// Transactions are processed concurrently, but never more than maxWorkers at
// a time: a token is taken from workerPool before each worker goroutine is
// started and returned when it finishes. If the parser is shut down while
// transactions are still waiting for a token, the remaining ones are skipped
// and a cancellation error is recorded in the result. Per-transaction
// failures are collected in the result's Errors slice; a non-nil error is
// returned only when the block itself cannot be fetched.
func (p *EnhancedDEXParser) ParseBlock(blockNumber uint64) (*ParseResult, error) {
	// Get block with full transaction data
	block, err := p.getBlockByNumber(blockNumber)
	if err != nil {
		return nil, fmt.Errorf("failed to get block %d: %w", blockNumber, err)
	}
	// A JSON null RPC result leaves the pointer nil without an error.
	if block == nil {
		return nil, fmt.Errorf("failed to get block %d: block not found", blockNumber)
	}

	result := &ParseResult{
		Events:          []*EnhancedDEXEvent{},
		NewPools:        []*PoolInfo{},
		ParsedContracts: []*ContractInfo{},
		IsSuccessful:    true,
	}

	// Get transactions
	transactions := block.Transactions()

	// Both channels are buffered for the worst case so that workers never
	// block on delivery, regardless of how the collector is scheduled.
	results := make(chan *ParseResult, len(transactions))
	errCh := make(chan error, len(transactions))

	// Parse transactions in parallel, bounded by the worker semaphore.
	launched := 0
dispatch:
	for _, tx := range transactions {
		select {
		case <-p.workerPool:
		case <-p.ctx.Done():
			result.Errors = append(result.Errors,
				fmt.Errorf("parsing of block %d cancelled: %w", blockNumber, p.ctx.Err()))
			break dispatch
		}
		launched++

		go func(transaction *types.Transaction) {
			// Return the token; this cannot block because the channel's
			// capacity equals the number of tokens in circulation.
			defer func() { p.workerPool <- struct{}{} }()

			// Get receipt
			receipt, err := p.getTransactionReceipt(transaction.Hash())
			if err != nil {
				errCh <- fmt.Errorf("failed to get receipt for tx %s: %w", transaction.Hash().Hex(), err)
				return
			}

			// Parse transaction
			txResult, err := p.ParseTransaction(transaction, receipt)
			if err != nil {
				errCh <- fmt.Errorf("failed to parse tx %s: %w", transaction.Hash().Hex(), err)
				return
			}

			results <- txResult
		}(tx)
	}

	// Collect exactly one outcome per launched worker.
	for i := 0; i < launched; i++ {
		select {
		case txResult := <-results:
			result.Events = append(result.Events, txResult.Events...)
			result.NewPools = append(result.NewPools, txResult.NewPools...)
			result.ParsedContracts = append(result.ParsedContracts, txResult.ParsedContracts...)
			result.TotalGasUsed += txResult.TotalGasUsed
			result.Errors = append(result.Errors, txResult.Errors...)
		case err := <-errCh:
			result.Errors = append(result.Errors, err)
		}
	}

	result.IsSuccessful = len(result.Errors) == 0

	p.logger.Info(fmt.Sprintf("Parsed block %d: %d events, %d new pools, %d errors",
		blockNumber, len(result.Events), len(result.NewPools), len(result.Errors)))

	return result, nil
}

// parseTransactionData parses transaction input data.
//
// It returns (nil, nil) for contract creations, for call data shorter than a
// 4-byte selector, and for destination contracts that are not in the
// contract registry.
func (p *EnhancedDEXParser) parseTransactionData(tx *types.Transaction) ([]*EnhancedDEXEvent, error) {
	if tx.To() == nil || len(tx.Data()) < 4 {
		return nil, nil
	}

	// Check if contract is known
	contractInfo := p.contractRegistry.GetContract(*tx.To())
	if contractInfo == nil {
		return nil, nil
	}

	// Get protocol parser
	parser, exists := p.protocolParsers[contractInfo.Protocol]
	if !exists {
		return nil, fmt.Errorf("no parser for protocol %s", contractInfo.Protocol)
	}

	// Parse transaction data
	event, err := parser.ParseTransactionData(tx)
	if err != nil {
		return nil, err
	}

	if event != nil {
		return []*EnhancedDEXEvent{event}, nil
	}

	return nil, nil
}

// parseTransactionLogs parses transaction logs for events.
//
// A log that fails to parse is reported at debug level and skipped; the
// returned error is always nil.
func (p *EnhancedDEXParser) parseTransactionLogs(tx *types.Transaction, receipt *types.Receipt) ([]*EnhancedDEXEvent, []*PoolInfo, error) {
	events := []*EnhancedDEXEvent{}
	newPools := []*PoolInfo{}

	for _, log := range receipt.Logs {
		// Parse log with appropriate protocol parser
		if parsedEvents, discoveredPools, err := p.parseLog(log, tx, receipt); err != nil {
			p.logger.Debug(fmt.Sprintf("Failed to parse log: %v", err))
		} else {
			events = append(events, parsedEvents...)
			newPools = append(newPools, discoveredPools...)
		}
	}

	return events, newPools, nil
}

// parseLog parses a single log entry.
//
// Logs that are nil, carry no topics, or whose first topic is not a
// registered event signature are ignored and yield (nil, nil, nil). For a
// recognised event the transaction-level fields are filled in from tx and
// receipt, and pool-creation events are additionally turned into a PoolInfo
// and added to the pool cache when pool discovery is enabled.
func (p *EnhancedDEXParser) parseLog(log *types.Log, tx *types.Transaction, receipt *types.Receipt) ([]*EnhancedDEXEvent, []*PoolInfo, error) {
	// A log without topics has no signature to look up; indexing Topics[0]
	// on it would panic.
	if log == nil || len(log.Topics) == 0 {
		return nil, nil, nil
	}

	events := []*EnhancedDEXEvent{}
	newPools := []*PoolInfo{}

	// Check if this is a known event signature
	eventSig := p.signatureRegistry.GetEventSignature(log.Topics[0])
	if eventSig == nil {
		return nil, nil, nil
	}

	// Get protocol parser
	parser, exists := p.protocolParsers[eventSig.Protocol]
	if !exists {
		return nil, nil, fmt.Errorf("no parser for protocol %s", eventSig.Protocol)
	}

	// Parse log
	event, err := parser.ParseLog(log)
	if err != nil {
		return nil, nil, err
	}

	if event != nil {
		// Set transaction-level data
		event.TxHash = tx.Hash()
		event.From = getTransactionSender(tx)
		if tx.To() != nil {
			event.To = *tx.To()
		}
		event.GasUsed = receipt.GasUsed
		event.GasPrice = tx.GasPrice()
		// BlockNumber is a pointer and may be unset on locally built receipts.
		if receipt.BlockNumber != nil {
			event.BlockNumber = receipt.BlockNumber.Uint64()
		}
		event.BlockHash = receipt.BlockHash
		event.TxIndex = uint64(receipt.TransactionIndex)
		event.LogIndex = uint64(log.Index)

		events = append(events, event)

		// Check for pool creation events
		if event.EventType == EventTypePoolCreated && p.config.EnablePoolDiscovery {
			poolInfo, err := p.extractPoolInfo(event)
			if err != nil {
				// Pool discovery is best-effort: keep the event, report why
				// the pool was not recorded.
				p.logger.Debug(fmt.Sprintf("Failed to extract pool info: %v", err))
			} else {
				newPools = append(newPools, poolInfo)
				p.poolCache.AddPool(poolInfo)
			}
		}
	}

	return events, newPools, nil
}

// enrichEventData adds additional metadata and calculations to events.
//
// The EventEnrichmentService is used when available, with a 30-second
// timeout derived from the parser's context; if it fails, or if no service
// is configured, the legacy enrichment path is used instead.
func (p *EnhancedDEXParser) enrichEventData(event *EnhancedDEXEvent) error {
	// Use the EventEnrichmentService for comprehensive enrichment
	if p.enrichmentService != nil {
		ctx, cancel := context.WithTimeout(p.ctx, 30*time.Second)
		defer cancel()

		if err := p.enrichmentService.EnrichEvent(ctx, event); err != nil {
			p.logger.Debug(fmt.Sprintf("Failed to enrich event with service: %v", err))
			// Fall back to legacy enrichment methods
			return p.legacyEnrichmentFallback(event)
		}
		return nil
	}

	// Legacy fallback if service is not available
	return p.legacyEnrichmentFallback(event)
}

// legacyEnrichmentFallback provides fallback enrichment using the old methods.
//
// Each step is best-effort: a failing step is logged at debug level and the
// remaining steps still run. The returned error is always nil.
func (p *EnhancedDEXParser) legacyEnrichmentFallback(event *EnhancedDEXEvent) error {
	// Add token symbols and decimals
	if err := p.enrichTokenData(event); err != nil {
		p.logger.Debug(fmt.Sprintf("Failed to enrich token data: %v", err))
	}

	// Calculate USD values using oracle
	if p.oracle != nil {
		if err := p.enrichPriceData(event); err != nil {
			p.logger.Debug(fmt.Sprintf("Failed to enrich price data: %v", err))
		}
	}

	// Calculate price impact and slippage
	if err := p.enrichSlippageData(event); err != nil {
		p.logger.Debug(fmt.Sprintf("Failed to enrich slippage data: %v", err))
	}

	// Detect MEV patterns
	if err := p.enrichMEVData(event); err != nil {
		p.logger.Debug(fmt.Sprintf("Failed to enrich MEV data: %v", err))
	}

	return nil
}

// Helper methods

// getBlockByNumber fetches a block, including full transaction objects, via
// eth_getBlockByNumber using the configured RPC timeout. The returned block
// is nil (with a nil error) when the endpoint answers with JSON null.
//
// NOTE(review): go-ethereum's own ethclient decodes block responses through
// an intermediate JSON struct rather than unmarshalling into *types.Block
// directly — confirm that this call actually populates the block's
// transactions.
func (p *EnhancedDEXParser) getBlockByNumber(blockNumber uint64) (*types.Block, error) {
	var block *types.Block
	ctx, cancel := context.WithTimeout(p.ctx, p.config.RPCTimeout)
	defer cancel()

	err := p.client.CallContext(ctx, &block, "eth_getBlockByNumber", fmt.Sprintf("0x%x", blockNumber), true)
	return block, err
}

// getTransactionReceipt fetches the receipt for txHash via
// eth_getTransactionReceipt using the configured RPC timeout. The returned
// receipt is nil (with a nil error) when the endpoint answers with JSON null.
func (p *EnhancedDEXParser) getTransactionReceipt(txHash common.Hash) (*types.Receipt, error) {
	var receipt *types.Receipt
	ctx, cancel := context.WithTimeout(p.ctx, p.config.RPCTimeout)
	defer cancel()

	err := p.client.CallContext(ctx, &receipt, "eth_getTransactionReceipt", txHash)
	return receipt, err
}

// getTransactionSender is a placeholder that always returns the zero address.
func getTransactionSender(tx *types.Transaction) common.Address {
	// This would typically require signature recovery
	// For now, return zero address as placeholder
	return common.Address{}
}

// extractPoolInfo builds a PoolInfo from a pool-creation event by copying the
// event's pool, token, fee and transaction fields. The pool is marked active
// and stamped with the current time. The returned error is currently always
// nil.
func (p *EnhancedDEXParser) extractPoolInfo(event *EnhancedDEXEvent) (*PoolInfo, error) {
	// Extract pool information from pool creation events
	// Implementation would depend on the specific event structure
	return &PoolInfo{
		Address:      event.PoolAddress,
		Protocol:     event.Protocol,
		PoolType:     event.PoolType,
		Token0:       event.TokenIn,
		Token1:       event.TokenOut,
		Fee:          event.PoolFee,
		CreatedBlock: event.BlockNumber,
		CreatedTx:    event.TxHash,
		IsActive:     true,
		LastUpdated:  time.Now(),
	}, nil
}

// enrichTokenData is a stub for adding token symbols and decimals; it
// currently does nothing and returns nil.
func (p *EnhancedDEXParser) enrichTokenData(event *EnhancedDEXEvent) error {
	// Add token symbols and decimals
	// This would typically query token contracts or use a token registry
	return nil
}

// enrichPriceData is a stub for computing USD values; it currently does
// nothing and returns nil.
func (p *EnhancedDEXParser) enrichPriceData(event *EnhancedDEXEvent) error {
	// Calculate USD values using price oracle
	return nil
}

// enrichSlippageData is a stub for computing price impact and slippage; it
// currently does nothing and returns nil.
func (p *EnhancedDEXParser) enrichSlippageData(event *EnhancedDEXEvent) error {
	// Calculate price impact and slippage
	return nil
}

// enrichMEVData is a stub for MEV pattern detection; it currently does
// nothing and returns nil.
func (p *EnhancedDEXParser) enrichMEVData(event *EnhancedDEXEvent) error {
	// Detect MEV patterns (arbitrage, sandwich, liquidation)
	return nil
}

// updateMetrics folds the outcome of one parsed transaction into the parser
// metrics while holding the metrics write lock.
func (p *EnhancedDEXParser) updateMetrics(result *ParseResult, processingTime time.Duration) {
	p.metricsLock.Lock()
	defer p.metricsLock.Unlock()

	p.metrics.TotalTransactionsParsed++
	p.metrics.TotalEventsParsed += uint64(len(result.Events))
	p.metrics.TotalPoolsDiscovered += uint64(len(result.NewPools))

	if len(result.Errors) > 0 {
		p.metrics.ParseErrorCount++
	}

	// Update average processing time: rebuild the previous sum from the old
	// average, add the new sample, and divide by the new count.
	total := p.metrics.AvgProcessingTimeMs * float64(p.metrics.TotalTransactionsParsed-1)
	p.metrics.AvgProcessingTimeMs = (total + float64(processingTime.Milliseconds())) / float64(p.metrics.TotalTransactionsParsed)

	// Update protocol and event type breakdowns
	for _, event := range result.Events {
		p.metrics.ProtocolBreakdown[event.Protocol]++
		p.metrics.EventTypeBreakdown[event.EventType]++
	}

	p.metrics.LastUpdated = time.Now()
}

// Lifecycle methods

// initializeRegistries creates the contract and signature registries and
// then loads the Arbitrum data set.
func (p *EnhancedDEXParser) initializeRegistries() error {
	// Initialize contract and signature registries
	p.contractRegistry = NewContractRegistry()
	p.signatureRegistry = NewSignatureRegistry()

	// Load Arbitrum-specific contracts and signatures
	return p.loadArbitrumData()
}

// initializeProtocolParsers creates a parser for every enabled protocol.
// A protocol whose parser cannot be created is logged as a warning and
// skipped; the returned error is always nil.
func (p *EnhancedDEXParser) initializeProtocolParsers() error {
	// Initialize protocol-specific parsers
	for _, protocol := range p.config.EnabledProtocols {
		parser, err := p.createProtocolParser(protocol)
		if err != nil {
			p.logger.Warn(fmt.Sprintf("Failed to create parser for %s: %v", protocol, err))
			continue
		}
		p.protocolParsers[protocol] = parser
	}

	return nil
}

// initializePoolCache creates the pool cache from the configured size and TTL.
func (p *EnhancedDEXParser) initializePoolCache() error {
	p.poolCache = NewPoolCache(p.config.CacheSize, p.config.CacheTTL)
	return nil
}

// createProtocolParser is the factory for protocol-specific parsers. It
// returns an error for protocols it has no constructor for.
func (p *EnhancedDEXParser) createProtocolParser(protocol Protocol) (DEXParserInterface, error) {
	// Factory method to create protocol-specific parsers
	switch protocol {
	case ProtocolUniswapV2:
		return NewUniswapV2Parser(p.client, p.logger), nil
	case ProtocolUniswapV3:
		return NewUniswapV3Parser(p.client, p.logger), nil
	case ProtocolSushiSwapV2:
		return NewSushiSwapV2Parser(p.client, p.logger), nil
	case ProtocolSushiSwapV3:
		return NewSushiSwapV3Parser(p.client, p.logger), nil
	case ProtocolCamelotV2:
		return NewCamelotV2Parser(p.client, p.logger), nil
	case ProtocolCamelotV3:
		return NewCamelotV3Parser(p.client, p.logger), nil
	case ProtocolTraderJoeV1:
		return NewTraderJoeV1Parser(p.client, p.logger), nil
	case ProtocolTraderJoeV2:
		return NewTraderJoeV2Parser(p.client, p.logger), nil
	case ProtocolTraderJoeLB:
		return NewTraderJoeLBParser(p.client, p.logger), nil
	case ProtocolCurve:
		return NewCurveParser(p.client, p.logger), nil
	case ProtocolBalancerV2:
		return NewBalancerV2Parser(p.client, p.logger), nil
	case ProtocolKyberClassic:
		return NewKyberClassicParser(p.client, p.logger), nil
	case ProtocolKyberElastic:
		return NewKyberElasticParser(p.client, p.logger), nil
	case ProtocolGMX:
		return NewGMXParser(p.client, p.logger), nil
	case ProtocolRamses:
		return NewRamsesParser(p.client, p.logger), nil
	case ProtocolChronos:
		return NewChronosParser(p.client, p.logger), nil
	default:
		return nil, fmt.Errorf("unsupported protocol: %s", protocol)
	}
}

// loadArbitrumData is a stub for loading Arbitrum DEX data; it currently
// does nothing and returns nil.
func (p *EnhancedDEXParser) loadArbitrumData() error {
	// Load comprehensive Arbitrum DEX data
	// This would be loaded from configuration files or database
	return nil
}

// startBackgroundServices launches the metrics and health-check goroutines
// that are enabled in the config. Each one is registered on the parser's
// WaitGroup so Close can wait for it.
func (p *EnhancedDEXParser) startBackgroundServices() {
	// Start metrics collection
	if p.config.EnableMetrics {
		p.wg.Add(1)
		go p.metricsCollector()
	}

	// Start health checker
	if p.config.EnableHealthCheck {
		p.wg.Add(1)
		go p.healthChecker()
	}
}

// metricsCollector logs the parser metrics once per MetricsInterval until
// the parser's context is cancelled. A non-positive interval falls back to
// the default interval, because time.NewTicker panics on such values.
func (p *EnhancedDEXParser) metricsCollector() {
	defer p.wg.Done()

	interval := p.config.MetricsInterval
	if interval <= 0 {
		interval = DefaultEnhancedParserConfig().MetricsInterval
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			p.logMetrics()
		case <-p.ctx.Done():
			return
		}
	}
}

// healthChecker runs checkHealth every 30 seconds until the parser's context
// is cancelled, logging any failure at error level.
func (p *EnhancedDEXParser) healthChecker() {
	defer p.wg.Done()

	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if err := p.checkHealth(); err != nil {
				p.logger.Error(fmt.Sprintf("Health check failed: %v", err))
			}
		case <-p.ctx.Done():
			return
		}
	}
}

// logMetrics writes a one-line summary of the current metrics at info level
// while holding the metrics read lock.
func (p *EnhancedDEXParser) logMetrics() {
	p.metricsLock.RLock()
	defer p.metricsLock.RUnlock()

	p.logger.Info(fmt.Sprintf("Parser metrics: %d txs, %d events, %d pools, %.2fms avg",
		p.metrics.TotalTransactionsParsed,
		p.metrics.TotalEventsParsed,
		p.metrics.TotalPoolsDiscovered,
		p.metrics.AvgProcessingTimeMs))
}

// checkHealth verifies the RPC connection by calling eth_blockNumber with a
// 5-second timeout.
func (p *EnhancedDEXParser) checkHealth() error {
	// Check RPC connection
	ctx, cancel := context.WithTimeout(p.ctx, 5*time.Second)
	defer cancel()

	var blockNumber string
	return p.client.CallContext(ctx, &blockNumber, "eth_blockNumber")
}

// GetMetrics returns a snapshot of the current parser metrics.
//
// The breakdown maps are copied as well as the struct itself, so the caller
// can read the snapshot without racing against ongoing metric updates.
func (p *EnhancedDEXParser) GetMetrics() *ParserMetrics {
	p.metricsLock.RLock()
	defer p.metricsLock.RUnlock()

	// Copy the struct, then replace the maps: a plain struct copy would still
	// share the live maps with updateMetrics.
	snapshot := *p.metrics

	snapshot.ProtocolBreakdown = make(map[Protocol]uint64, len(p.metrics.ProtocolBreakdown))
	for protocol, count := range p.metrics.ProtocolBreakdown {
		snapshot.ProtocolBreakdown[protocol] = count
	}

	snapshot.EventTypeBreakdown = make(map[EventType]uint64, len(p.metrics.EventTypeBreakdown))
	for eventType, count := range p.metrics.EventTypeBreakdown {
		snapshot.EventTypeBreakdown[eventType] = count
	}

	return &snapshot
}

// Close shuts down the parser and cleans up resources.
//
// It cancels the parser's context, waits for the background goroutines to
// exit, and then closes the RPC client and the pool cache. The returned
// error is always nil.
func (p *EnhancedDEXParser) Close() error {
	p.logger.Info("Shutting down enhanced DEX parser...")

	// Cancel context to stop background services
	p.cancel()

	// Wait for background services to complete
	p.wg.Wait()

	// Close RPC client
	if p.client != nil {
		p.client.Close()
	}

	// Close pool cache
	if p.poolCache != nil {
		p.poolCache.Close()
	}

	p.logger.Info("Enhanced DEX parser shutdown complete")
	return nil
}