diff --git a/pkg/monitor/concurrent.go b/pkg/monitor/concurrent.go index d7cd1e3..ad17617 100644 --- a/pkg/monitor/concurrent.go +++ b/pkg/monitor/concurrent.go @@ -144,21 +144,7 @@ func NewArbitrumMonitor( return nil, fmt.Errorf("L2 parser is null, cannot create enhanced event parser") } - logger.Info("✅ L2 PARSER AVAILABLE - Creating enhanced event parser...") - enhancedEventParser := events.NewEventParserWithTokenExtractor(logger, l2Parser) - - if enhancedEventParser == nil { - logger.Error("❌ ENHANCED EVENT PARSER CREATION FAILED") - return nil, fmt.Errorf("enhanced event parser creation failed") - } - - logger.Info("✅ ENHANCED EVENT PARSER CREATED SUCCESSFULLY") - logger.Info("🔄 INJECTING ENHANCED PARSER INTO PIPELINE...") - - // Inject enhanced parser into pipeline to avoid import cycle - pipeline.SetEnhancedEventParser(enhancedEventParser) - - logger.Info("🎯 ENHANCED PARSER INJECTION COMPLETED") + logger.Info("✅ L2 PARSER AVAILABLE - Creating pool discovery for cache...") // Create raw RPC client for pool discovery poolRPCClient, err := rpc.Dial(arbCfg.RPCEndpoint) @@ -166,7 +152,29 @@ func NewArbitrumMonitor( return nil, fmt.Errorf("failed to create RPC client for pool discovery: %w", err) } - _ = pools.NewPoolDiscovery(poolRPCClient, logger) // Will be used in future enhancements + // Create pool discovery for caching discovered pools + poolDiscovery := pools.NewPoolDiscovery(poolRPCClient, logger) + + // Create pool cache adapter to provide PoolCache interface + poolCacheAdapter := pools.NewPoolCacheAdapter(poolDiscovery) + + logger.Info("✅ POOL CACHE ADAPTER CREATED - Creating enhanced event parser...") + + // Create enhanced event parser with pool cache support + enhancedEventParser := events.NewEventParserFull(logger, l2Parser, poolCacheAdapter) + + if enhancedEventParser == nil { + logger.Error("❌ ENHANCED EVENT PARSER CREATION FAILED") + return nil, fmt.Errorf("enhanced event parser creation failed") + } + + logger.Info("✅ ENHANCED EVENT PARSER CREATED SUCCESSFULLY WITH POOL CACHE") + logger.Info("🔄 INJECTING ENHANCED PARSER INTO PIPELINE...") + + // Inject enhanced parser into pipeline to avoid import cycle + pipeline.SetEnhancedEventParser(enhancedEventParser) + + logger.Info("🎯 ENHANCED PARSER INJECTION COMPLETED") // Create MEV coordinator - removed to avoid import cycle // coordinator := orchestrator.NewMEVCoordinator( @@ -830,10 +838,50 @@ func (m *ArbitrumMonitor) processTransactionReceipt(ctx context.Context, receipt m.logger.Info(fmt.Sprintf("Successfully parsed %d events from receipt %s", len(parsedEvents), receipt.TxHash.Hex())) - // Submit each parsed event directly to the scanner + // Submit each parsed event directly to the scanner with strict validation for _, event := range parsedEvents { if event != nil { - m.logger.Debug(fmt.Sprintf("Submitting event to scanner: Type=%s, Pool=%s, Token0=%s, Token1=%s", + // CRITICAL: Validate event data quality before submission + // Zero addresses and zero amounts are NEVER acceptable + isValid := true + validationErrors := []string{} + + // Check for zero addresses + zeroAddr := common.Address{} + if event.Token0 == zeroAddr { + validationErrors = append(validationErrors, "Token0 is zero address") + isValid = false + } + if event.Token1 == zeroAddr { + validationErrors = append(validationErrors, "Token1 is zero address") + isValid = false + } + if event.PoolAddress == zeroAddr { + validationErrors = append(validationErrors, "PoolAddress is zero address") + isValid = false + } + + // Check for zero amounts (for swap events) + if event.Type == events.EventTypeSwap { + if event.Amount0In != nil && event.Amount0In.Sign() == 0 && + event.Amount0Out != nil && event.Amount0Out.Sign() == 0 { + validationErrors = append(validationErrors, "Amount0In and Amount0Out are both zero") + isValid = false + } + if event.Amount1In != nil && event.Amount1In.Sign() == 0 && + event.Amount1Out != nil && event.Amount1Out.Sign() == 0 { + validationErrors = append(validationErrors, "Amount1In and Amount1Out are both zero") + isValid = false + } + } + + if !isValid { + m.logger.Warn(fmt.Sprintf("❌ REJECTING INVALID EVENT - Type=%s, Pool=%s, TxHash=%s, Errors: %v", + event.Type.String(), event.PoolAddress.Hex(), event.TxHash.Hex(), validationErrors)) + continue + } + + m.logger.Debug(fmt.Sprintf("✅ Valid event - Submitting to scanner: Type=%s, Pool=%s, Token0=%s, Token1=%s", event.Type.String(), event.PoolAddress.Hex(), event.Token0.Hex(), event.Token1.Hex())) // Submit to scanner for arbitrage analysis diff --git a/pkg/pools/pool_cache_adapter.go b/pkg/pools/pool_cache_adapter.go new file mode 100644 index 0000000..1a202b0 --- /dev/null +++ b/pkg/pools/pool_cache_adapter.go @@ -0,0 +1,92 @@ +package pools + +import ( + "github.com/ethereum/go-ethereum/common" + arbcommon "github.com/fraktal/mev-beta/pkg/arbitrum/common" +) + +// PoolCacheAdapter adapts PoolDiscovery to implement interfaces.PoolCache +// This allows the EventParser to use PoolDiscovery as its pool cache +type PoolCacheAdapter struct { + discovery *PoolDiscovery +} + +// NewPoolCacheAdapter creates a new adapter wrapping a PoolDiscovery +func NewPoolCacheAdapter(discovery *PoolDiscovery) *PoolCacheAdapter { + return &PoolCacheAdapter{ + discovery: discovery, + } +} + +// GetPool retrieves pool information from cache +func (a *PoolCacheAdapter) GetPool(address common.Address) *arbcommon.PoolInfo { + if a.discovery == nil { + return nil + } + + // Get pool from discovery + pool, exists := a.discovery.GetPool(address.Hex()) + if !exists || pool == nil { + return nil + } + + // Convert Pool to PoolInfo + return &arbcommon.PoolInfo{ + Address: common.HexToAddress(pool.Address), + Protocol: parseProtocol(pool.Protocol), + PoolType: parsePoolType(pool.Protocol), + FactoryAddress: common.HexToAddress(pool.Factory), + Token0: common.HexToAddress(pool.Token0), + Token1: common.HexToAddress(pool.Token1), + Fee: pool.Fee, + TotalLiquidity: pool.Liquidity, + } +} + +// GetPoolsByTokenPair retrieves pools for a specific token pair +func (a *PoolCacheAdapter) GetPoolsByTokenPair(token0, token1 common.Address) []*arbcommon.PoolInfo { + if a.discovery == nil { + return nil + } + + // PoolDiscovery doesn't have a direct method for this yet + // We'll return nil for now and implement this later if needed + // This is acceptable as the parser only uses GetPool currently + return nil +} + +// parseProtocol converts protocol string to Protocol enum +func parseProtocol(protocol string) arbcommon.Protocol { + switch protocol { + case "uniswap-v2": + return arbcommon.ProtocolUniswapV2 + case "uniswap-v3": + return arbcommon.ProtocolUniswapV3 + case "sushiswap": + return arbcommon.ProtocolSushiSwap + case "camelot": + return arbcommon.ProtocolCamelot + case "curve": + return arbcommon.ProtocolCurve + case "balancer": + return arbcommon.ProtocolBalancer + default: + return arbcommon.ProtocolUnknown + } +} + +// parsePoolType converts protocol string to PoolType enum +func parsePoolType(protocol string) arbcommon.PoolType { + switch protocol { + case "uniswap-v2", "sushiswap", "camelot": + return arbcommon.PoolTypeV2 + case "uniswap-v3": + return arbcommon.PoolTypeV3 + case "curve": + return arbcommon.PoolTypeStableSwap + case "balancer": + return arbcommon.PoolTypeWeighted + default: + return arbcommon.PoolTypeUnknown + } +}