From e79e0d960d640061ee6be58ad0a6c40e25427542 Mon Sep 17 00:00:00 2001 From: Administrator Date: Mon, 10 Nov 2025 10:03:28 +0100 Subject: [PATCH] feat: add pool cache adapter and strict event validation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Created PoolCacheAdapter to wrap PoolDiscovery for EventParser - Updated ArbitrumMonitor to pass pool cache to parser via NewEventParserFull - Added strict validation to reject events with zero addresses - Added strict validation to reject events with zero amounts - Parser now uses discovered pools from cache for token enrichment This ensures zero addresses and zero amounts NEVER reach the scanner. Events with invalid data are logged and rejected at the monitor level. Changes: - pkg/pools/pool_cache_adapter.go: New adapter implementing PoolCache interface - pkg/monitor/concurrent.go: Pool cache integration and validation logic 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- pkg/monitor/concurrent.go | 84 +++++++++++++++++++++++------- pkg/pools/pool_cache_adapter.go | 92 +++++++++++++++++++++++++++++++++ 2 files changed, 158 insertions(+), 18 deletions(-) create mode 100644 pkg/pools/pool_cache_adapter.go diff --git a/pkg/monitor/concurrent.go b/pkg/monitor/concurrent.go index d7cd1e3..ad17617 100644 --- a/pkg/monitor/concurrent.go +++ b/pkg/monitor/concurrent.go @@ -144,21 +144,7 @@ func NewArbitrumMonitor( return nil, fmt.Errorf("L2 parser is null, cannot create enhanced event parser") } - logger.Info("✅ L2 PARSER AVAILABLE - Creating enhanced event parser...") - enhancedEventParser := events.NewEventParserWithTokenExtractor(logger, l2Parser) - - if enhancedEventParser == nil { - logger.Error("❌ ENHANCED EVENT PARSER CREATION FAILED") - return nil, fmt.Errorf("enhanced event parser creation failed") - } - - logger.Info("✅ ENHANCED EVENT PARSER CREATED SUCCESSFULLY") - logger.Info("🔄 INJECTING ENHANCED PARSER INTO PIPELINE...") - - // Inject enhanced parser into pipeline to avoid import cycle - pipeline.SetEnhancedEventParser(enhancedEventParser) - - logger.Info("🎯 ENHANCED PARSER INJECTION COMPLETED") + logger.Info("✅ L2 PARSER AVAILABLE - Creating pool discovery for cache...") // Create raw RPC client for pool discovery poolRPCClient, err := rpc.Dial(arbCfg.RPCEndpoint) @@ -166,7 +152,29 @@ func NewArbitrumMonitor( return nil, fmt.Errorf("failed to create RPC client for pool discovery: %w", err) } - _ = pools.NewPoolDiscovery(poolRPCClient, logger) // Will be used in future enhancements + // Create pool discovery for caching discovered pools + poolDiscovery := pools.NewPoolDiscovery(poolRPCClient, logger) + + // Create pool cache adapter to provide PoolCache interface + poolCacheAdapter := pools.NewPoolCacheAdapter(poolDiscovery) + + logger.Info("✅ POOL CACHE ADAPTER CREATED - Creating enhanced event parser...") + + // Create enhanced event parser with pool cache support + enhancedEventParser := events.NewEventParserFull(logger, l2Parser, poolCacheAdapter) + + if enhancedEventParser == nil { + logger.Error("❌ ENHANCED EVENT PARSER CREATION FAILED") + return nil, fmt.Errorf("enhanced event parser creation failed") + } + + logger.Info("✅ ENHANCED EVENT PARSER CREATED SUCCESSFULLY WITH POOL CACHE") + logger.Info("🔄 INJECTING ENHANCED PARSER INTO PIPELINE...") + + // Inject enhanced parser into pipeline to avoid import cycle + pipeline.SetEnhancedEventParser(enhancedEventParser) + + logger.Info("🎯 ENHANCED PARSER INJECTION COMPLETED") // Create MEV coordinator - removed to avoid import cycle // coordinator := orchestrator.NewMEVCoordinator( @@ -830,10 +838,50 @@ func (m *ArbitrumMonitor) processTransactionReceipt(ctx context.Context, receipt m.logger.Info(fmt.Sprintf("Successfully parsed %d events from receipt %s", len(parsedEvents), receipt.TxHash.Hex())) - // Submit each parsed event directly to the scanner + // Submit each parsed event directly to the scanner with strict validation for _, event := range parsedEvents { if event != nil { - m.logger.Debug(fmt.Sprintf("Submitting event to scanner: Type=%s, Pool=%s, Token0=%s, Token1=%s", + // CRITICAL: Validate event data quality before submission + // Zero addresses and zero amounts are NEVER acceptable + isValid := true + validationErrors := []string{} + + // Check for zero addresses + zeroAddr := common.Address{} + if event.Token0 == zeroAddr { + validationErrors = append(validationErrors, "Token0 is zero address") + isValid = false + } + if event.Token1 == zeroAddr { + validationErrors = append(validationErrors, "Token1 is zero address") + isValid = false + } + if event.PoolAddress == zeroAddr { + validationErrors = append(validationErrors, "PoolAddress is zero address") + isValid = false + } + + // Check for zero amounts (for swap events) + if event.Type == events.EventTypeSwap { + if event.Amount0In != nil && event.Amount0In.Sign() == 0 && + event.Amount0Out != nil && event.Amount0Out.Sign() == 0 { + validationErrors = append(validationErrors, "Amount0In and Amount0Out are both zero") + isValid = false + } + if event.Amount1In != nil && event.Amount1In.Sign() == 0 && + event.Amount1Out != nil && event.Amount1Out.Sign() == 0 { + validationErrors = append(validationErrors, "Amount1In and Amount1Out are both zero") + isValid = false + } + } + + if !isValid { + m.logger.Warn(fmt.Sprintf("❌ REJECTING INVALID EVENT - Type=%s, Pool=%s, TxHash=%s, Errors: %v", + event.Type.String(), event.PoolAddress.Hex(), event.TxHash.Hex(), validationErrors)) + continue + } + + m.logger.Debug(fmt.Sprintf("✅ Valid event - Submitting to scanner: Type=%s, Pool=%s, Token0=%s, Token1=%s", event.Type.String(), event.PoolAddress.Hex(), event.Token0.Hex(), event.Token1.Hex())) // Submit to scanner for arbitrage analysis diff --git a/pkg/pools/pool_cache_adapter.go b/pkg/pools/pool_cache_adapter.go new file mode 100644 index 0000000..1a202b0 --- /dev/null +++ b/pkg/pools/pool_cache_adapter.go @@ -0,0 +1,92 @@ +package pools + +import ( + "github.com/ethereum/go-ethereum/common" + arbcommon "github.com/fraktal/mev-beta/pkg/arbitrum/common" +) + +// PoolCacheAdapter adapts PoolDiscovery to implement interfaces.PoolCache +// This allows the EventParser to use PoolDiscovery as its pool cache +type PoolCacheAdapter struct { + discovery *PoolDiscovery +} + +// NewPoolCacheAdapter creates a new adapter wrapping a PoolDiscovery +func NewPoolCacheAdapter(discovery *PoolDiscovery) *PoolCacheAdapter { + return &PoolCacheAdapter{ + discovery: discovery, + } +} + +// GetPool retrieves pool information from cache +func (a *PoolCacheAdapter) GetPool(address common.Address) *arbcommon.PoolInfo { + if a.discovery == nil { + return nil + } + + // Get pool from discovery + pool, exists := a.discovery.GetPool(address.Hex()) + if !exists || pool == nil { + return nil + } + + // Convert Pool to PoolInfo + return &arbcommon.PoolInfo{ + Address: common.HexToAddress(pool.Address), + Protocol: parseProtocol(pool.Protocol), + PoolType: parsePoolType(pool.Protocol), + FactoryAddress: common.HexToAddress(pool.Factory), + Token0: common.HexToAddress(pool.Token0), + Token1: common.HexToAddress(pool.Token1), + Fee: pool.Fee, + TotalLiquidity: pool.Liquidity, + } +} + +// GetPoolsByTokenPair retrieves pools for a specific token pair +func (a *PoolCacheAdapter) GetPoolsByTokenPair(token0, token1 common.Address) []*arbcommon.PoolInfo { + if a.discovery == nil { + return nil + } + + // PoolDiscovery doesn't have a direct method for this yet + // We'll return nil for now and implement this later if needed + // This is acceptable as the parser only uses GetPool currently + return nil +} + +// parseProtocol converts protocol string to Protocol enum +func parseProtocol(protocol string) arbcommon.Protocol { + switch protocol { + case "uniswap-v2": + return arbcommon.ProtocolUniswapV2 + case "uniswap-v3": + return arbcommon.ProtocolUniswapV3 + case "sushiswap": + return arbcommon.ProtocolSushiSwap + case "camelot": + return arbcommon.ProtocolCamelot + case "curve": + return arbcommon.ProtocolCurve + case "balancer": + return arbcommon.ProtocolBalancer + default: + return arbcommon.ProtocolUnknown + } +} + +// parsePoolType converts protocol string to PoolType enum +func parsePoolType(protocol string) arbcommon.PoolType { + switch protocol { + case "uniswap-v2", "sushiswap", "camelot": + return arbcommon.PoolTypeV2 + case "uniswap-v3": + return arbcommon.PoolTypeV3 + case "curve": + return arbcommon.PoolTypeStableSwap + case "balancer": + return arbcommon.PoolTypeWeighted + default: + return arbcommon.PoolTypeUnknown + } +}