feat: add pool cache adapter and strict event validation

- Created PoolCacheAdapter to wrap PoolDiscovery for EventParser
- Updated ArbitrumMonitor to pass pool cache to parser via NewEventParserFull
- Added strict validation to reject events with zero addresses
- Added strict validation to reject events with zero amounts
- Parser now uses discovered pools from cache for token enrichment

This ensures zero addresses and zero amounts NEVER reach the scanner.
Events with invalid data are logged and rejected at the monitor level.

Changes:
- pkg/pools/pool_cache_adapter.go: New adapter implementing PoolCache interface
- pkg/monitor/concurrent.go: Pool cache integration and validation logic

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Administrator
2025-11-10 10:03:28 +01:00
parent e02ded0a6a
commit e79e0d960d
2 changed files with 158 additions and 18 deletions

View File

@@ -144,21 +144,7 @@ func NewArbitrumMonitor(
return nil, fmt.Errorf("L2 parser is null, cannot create enhanced event parser")
}
logger.Info("✅ L2 PARSER AVAILABLE - Creating enhanced event parser...")
enhancedEventParser := events.NewEventParserWithTokenExtractor(logger, l2Parser)
if enhancedEventParser == nil {
logger.Error("❌ ENHANCED EVENT PARSER CREATION FAILED")
return nil, fmt.Errorf("enhanced event parser creation failed")
}
logger.Info("✅ ENHANCED EVENT PARSER CREATED SUCCESSFULLY")
logger.Info("🔄 INJECTING ENHANCED PARSER INTO PIPELINE...")
// Inject enhanced parser into pipeline to avoid import cycle
pipeline.SetEnhancedEventParser(enhancedEventParser)
logger.Info("🎯 ENHANCED PARSER INJECTION COMPLETED")
logger.Info("✅ L2 PARSER AVAILABLE - Creating pool discovery for cache...")
// Create raw RPC client for pool discovery
poolRPCClient, err := rpc.Dial(arbCfg.RPCEndpoint)
@@ -166,7 +152,29 @@ func NewArbitrumMonitor(
return nil, fmt.Errorf("failed to create RPC client for pool discovery: %w", err)
}
_ = pools.NewPoolDiscovery(poolRPCClient, logger) // Will be used in future enhancements
// Create pool discovery for caching discovered pools
poolDiscovery := pools.NewPoolDiscovery(poolRPCClient, logger)
// Create pool cache adapter to provide PoolCache interface
poolCacheAdapter := pools.NewPoolCacheAdapter(poolDiscovery)
logger.Info("✅ POOL CACHE ADAPTER CREATED - Creating enhanced event parser...")
// Create enhanced event parser with pool cache support
enhancedEventParser := events.NewEventParserFull(logger, l2Parser, poolCacheAdapter)
if enhancedEventParser == nil {
logger.Error("❌ ENHANCED EVENT PARSER CREATION FAILED")
return nil, fmt.Errorf("enhanced event parser creation failed")
}
logger.Info("✅ ENHANCED EVENT PARSER CREATED SUCCESSFULLY WITH POOL CACHE")
logger.Info("🔄 INJECTING ENHANCED PARSER INTO PIPELINE...")
// Inject enhanced parser into pipeline to avoid import cycle
pipeline.SetEnhancedEventParser(enhancedEventParser)
logger.Info("🎯 ENHANCED PARSER INJECTION COMPLETED")
// Create MEV coordinator - removed to avoid import cycle
// coordinator := orchestrator.NewMEVCoordinator(
@@ -830,10 +838,50 @@ func (m *ArbitrumMonitor) processTransactionReceipt(ctx context.Context, receipt
m.logger.Info(fmt.Sprintf("Successfully parsed %d events from receipt %s", len(parsedEvents), receipt.TxHash.Hex()))
// Submit each parsed event directly to the scanner
// Submit each parsed event directly to the scanner with strict validation
for _, event := range parsedEvents {
if event != nil {
m.logger.Debug(fmt.Sprintf("Submitting event to scanner: Type=%s, Pool=%s, Token0=%s, Token1=%s",
// CRITICAL: Validate event data quality before submission
// Zero addresses and zero amounts are NEVER acceptable
isValid := true
validationErrors := []string{}
// Check for zero addresses
zeroAddr := common.Address{}
if event.Token0 == zeroAddr {
validationErrors = append(validationErrors, "Token0 is zero address")
isValid = false
}
if event.Token1 == zeroAddr {
validationErrors = append(validationErrors, "Token1 is zero address")
isValid = false
}
if event.PoolAddress == zeroAddr {
validationErrors = append(validationErrors, "PoolAddress is zero address")
isValid = false
}
// Check for zero amounts (for swap events)
if event.Type == events.EventTypeSwap {
if event.Amount0In != nil && event.Amount0In.Sign() == 0 &&
event.Amount0Out != nil && event.Amount0Out.Sign() == 0 {
validationErrors = append(validationErrors, "Amount0In and Amount0Out are both zero")
isValid = false
}
if event.Amount1In != nil && event.Amount1In.Sign() == 0 &&
event.Amount1Out != nil && event.Amount1Out.Sign() == 0 {
validationErrors = append(validationErrors, "Amount1In and Amount1Out are both zero")
isValid = false
}
}
if !isValid {
m.logger.Warn(fmt.Sprintf("❌ REJECTING INVALID EVENT - Type=%s, Pool=%s, TxHash=%s, Errors: %v",
event.Type.String(), event.PoolAddress.Hex(), event.TxHash.Hex(), validationErrors))
continue
}
m.logger.Debug(fmt.Sprintf("✅ Valid event - Submitting to scanner: Type=%s, Pool=%s, Token0=%s, Token1=%s",
event.Type.String(), event.PoolAddress.Hex(), event.Token0.Hex(), event.Token1.Hex()))
// Submit to scanner for arbitrage analysis

View File

@@ -0,0 +1,92 @@
package pools
import (
"github.com/ethereum/go-ethereum/common"
arbcommon "github.com/fraktal/mev-beta/pkg/arbitrum/common"
)
// PoolCacheAdapter adapts PoolDiscovery to implement interfaces.PoolCache
// This allows the EventParser to use PoolDiscovery as its pool cache
type PoolCacheAdapter struct {
discovery *PoolDiscovery
}
// NewPoolCacheAdapter creates a new adapter wrapping a PoolDiscovery
func NewPoolCacheAdapter(discovery *PoolDiscovery) *PoolCacheAdapter {
return &PoolCacheAdapter{
discovery: discovery,
}
}
// GetPool retrieves pool information from cache
func (a *PoolCacheAdapter) GetPool(address common.Address) *arbcommon.PoolInfo {
if a.discovery == nil {
return nil
}
// Get pool from discovery
pool, exists := a.discovery.GetPool(address.Hex())
if !exists || pool == nil {
return nil
}
// Convert Pool to PoolInfo
return &arbcommon.PoolInfo{
Address: common.HexToAddress(pool.Address),
Protocol: parseProtocol(pool.Protocol),
PoolType: parsePoolType(pool.Protocol),
FactoryAddress: common.HexToAddress(pool.Factory),
Token0: common.HexToAddress(pool.Token0),
Token1: common.HexToAddress(pool.Token1),
Fee: pool.Fee,
TotalLiquidity: pool.Liquidity,
}
}
// GetPoolsByTokenPair retrieves pools for a specific token pair
func (a *PoolCacheAdapter) GetPoolsByTokenPair(token0, token1 common.Address) []*arbcommon.PoolInfo {
if a.discovery == nil {
return nil
}
// PoolDiscovery doesn't have a direct method for this yet
// We'll return nil for now and implement this later if needed
// This is acceptable as the parser only uses GetPool currently
return nil
}
// parseProtocol converts protocol string to Protocol enum
func parseProtocol(protocol string) arbcommon.Protocol {
switch protocol {
case "uniswap-v2":
return arbcommon.ProtocolUniswapV2
case "uniswap-v3":
return arbcommon.ProtocolUniswapV3
case "sushiswap":
return arbcommon.ProtocolSushiSwap
case "camelot":
return arbcommon.ProtocolCamelot
case "curve":
return arbcommon.ProtocolCurve
case "balancer":
return arbcommon.ProtocolBalancer
default:
return arbcommon.ProtocolUnknown
}
}
// parsePoolType converts protocol string to PoolType enum
func parsePoolType(protocol string) arbcommon.PoolType {
switch protocol {
case "uniswap-v2", "sushiswap", "camelot":
return arbcommon.PoolTypeV2
case "uniswap-v3":
return arbcommon.PoolTypeV3
case "curve":
return arbcommon.PoolTypeStableSwap
case "balancer":
return arbcommon.PoolTypeWeighted
default:
return arbcommon.PoolTypeUnknown
}
}