Compare commits

3 Commits

Author SHA1 Message Date
Administrator
9935246022 fix: correct Protocol and PoolType enum mappings
- Use ProtocolSushiSwapV2/V3 instead of ProtocolSushiSwap
- Use ProtocolCamelotV2/V3 instead of ProtocolCamelot
- Use ProtocolBalancerV2/V3 instead of ProtocolBalancer
- Use PoolTypeConstantProduct instead of PoolTypeV2
- Use PoolTypeConcentrated instead of PoolTypeV3
- Add default fallbacks to prevent undefined enum usage

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-10 10:09:08 +01:00
Administrator
e79e0d960d feat: add pool cache adapter and strict event validation
- Created PoolCacheAdapter to wrap PoolDiscovery for EventParser
- Updated ArbitrumMonitor to pass pool cache to parser via NewEventParserFull
- Added strict validation to reject events with zero addresses
- Added strict validation to reject events with zero amounts
- Parser now uses discovered pools from cache for token enrichment

This keeps events with a zero token or pool address, and swap events where both the in and out amounts of a token are zero, from reaching the scanner.
Events with invalid data are logged and rejected at the monitor level.

Changes:
- pkg/pools/pool_cache_adapter.go: New adapter implementing PoolCache interface
- pkg/monitor/concurrent.go: Pool cache integration and validation logic

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-10 10:03:28 +01:00
Administrator
e02ded0a6a fix: use pool cache to avoid zero addresses in Uniswap V3 parsing
- Added poolCache field to EventParser struct with PoolCache interface
- Modified getPoolTokens() to check cache before returning zero addresses
- Created PoolCache interface in pkg/interfaces for clean separation
- Added debug logging to identify pools missing from cache
- Documented long-term architecture improvements in PARSER_ARCHITECTURE_IMPROVEMENTS.md

This fixes the critical issue where Uniswap V3 swap events would show zero
addresses for tokens when transaction calldata was unavailable. The parser
now falls back to the pool cache which contains previously discovered pool
information.

Benefits:
- Eliminates zero address errors for known pools
- Reduces unnecessary RPC calls
- Provides visibility into which pools are missing from cache
- Lays foundation for per-exchange parser architecture

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-10 09:59:37 +01:00
5 changed files with 469 additions and 22 deletions

@@ -0,0 +1,259 @@
# Parser Architecture Improvements
## Current Issue
Zero address tokens appearing in parsed events due to missing token data when transaction fetch fails.
## Immediate Fix Applied (2025-11-09)
- Added pool cache to EventParser
- Parser now checks pool cache before returning zero addresses
- Logs when pools are missing from cache to identify parsing errors
## Proposed Long-term Architecture Improvements
### 1. Individual Parsers Per Exchange Type
**Current:** Single monolithic EventParser handles all DEX types
**Proposed:** Factory pattern with exchange-specific parsers
```go
type ExchangeParser interface {
	ParseEvent(log *types.Log, tx *types.Transaction) (*Event, error)
	ValidateEvent(event *Event) error
}
type UniswapV2Parser struct{}
type UniswapV3Parser struct{}
type SushiSwapParser struct{}
type CurveParser struct{}
```
**Benefits:**
- Cleaner code with focused responsibility
- Easier to add new DEX types
- Better testability
- Exchange-specific optimizations
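
The factory itself is not shown above. The following is a minimal sketch of one way it could dispatch to per-exchange parsers. `ParserFactory`, `Register`, and the string dispatch key are hypothetical names, `Log` and `Event` are simplified stand-ins for `types.Log` and the project's `Event`, and the transaction argument is left out. Whether the real dispatch key should be an event signature, a factory address, or both is a design decision this document does not settle.

```go
package main

import (
	"errors"
	"fmt"
)

// Log and Event are simplified stand-ins for types.Log and the project's Event.
type Log struct {
	Key  string // hypothetical dispatch key
	Data []byte
}

type Event struct {
	Protocol string
	Pool     string
}

// ExchangeParser follows the proposed interface, without the transaction argument.
type ExchangeParser interface {
	ParseEvent(log *Log) (*Event, error)
	ValidateEvent(event *Event) error
}

// UniswapV2Parser is a toy parser: it treats the log data as the pool identifier.
type UniswapV2Parser struct{}

func (UniswapV2Parser) ParseEvent(log *Log) (*Event, error) {
	return &Event{Protocol: "uniswap-v2", Pool: string(log.Data)}, nil
}

func (UniswapV2Parser) ValidateEvent(event *Event) error {
	if event.Pool == "" {
		return errors.New("empty pool")
	}
	return nil
}

// ParserFactory (hypothetical name) maps a dispatch key to its parser.
type ParserFactory struct {
	parsers map[string]ExchangeParser
}

func NewParserFactory() *ParserFactory {
	return &ParserFactory{parsers: make(map[string]ExchangeParser)}
}

// Register associates a dispatch key with a parser, replacing any earlier one.
func (f *ParserFactory) Register(key string, p ExchangeParser) {
	f.parsers[key] = p
}

// Parse selects the parser for the log, parses, then validates the result.
func (f *ParserFactory) Parse(log *Log) (*Event, error) {
	p, ok := f.parsers[log.Key]
	if !ok {
		return nil, fmt.Errorf("no parser registered for key %q", log.Key)
	}
	event, err := p.ParseEvent(log)
	if err != nil {
		return nil, fmt.Errorf("parse: %w", err)
	}
	if err := p.ValidateEvent(event); err != nil {
		return nil, fmt.Errorf("validate: %w", err)
	}
	return event, nil
}

func main() {
	f := NewParserFactory()
	f.Register("v2-swap", UniswapV2Parser{})
	event, err := f.Parse(&Log{Key: "v2-swap", Data: []byte("pool-1")})
	fmt.Println(event, err)
}
```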
---
### 2. Background Pool Data Validation Channel
**Proposed:** Separate goroutine for pool state validation and updates
```go
type PoolValidationEvent struct {
	PoolAddress   common.Address
	ParsedData    *PoolData
	CachedData    *PoolData
	Changed       bool
	ChangedFields []string
}
// Background validation: runs until ctx is cancelled or the channel is closed
func (p *Parser) validatePoolData(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case event, ok := <-p.poolValidationChan:
			if !ok {
				return
			}
			cached := p.poolCache.GetPool(event.PoolAddress)
			if cached != nil {
				// Validate parsed data against cache
				if event.ParsedData.Token0 != cached.Token0 {
					p.logger.Warn("Token0 mismatch",
						"pool", event.PoolAddress,
						"parsed", event.ParsedData.Token0,
						"cached", cached.Token0)
				}
				// Log ALL discrepancies
			}
			// Update cache with latest data
			p.poolCache.Update(event.PoolAddress, event.ParsedData)
		}
	}
}
```
**Benefits:**
- Real-time validation of parsing accuracy
- Identifies when sequencer data changes
- Helps catch parsing bugs immediately
- Non-blocking - doesn't slow down main parsing
- Audit trail of pool state changes
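
The "non-blocking" benefit depends on how events are sent into the channel, which the snippet above does not show. Below is a minimal sketch of a producer that drops events rather than stalling when the buffer is full. `Validator`, `Submit`, and `Run` are hypothetical names, and the cache is reduced to a map of pool to token0. Unlike the proposal above, this sketch keeps the cached value on a mismatch instead of overwriting it; that is an assumption made for illustration only.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// PoolValidationEvent is reduced to one field pair for the sketch.
type PoolValidationEvent struct {
	Pool   string
	Token0 string
}

// Validator (hypothetical) owns the channel and a toy cache of pool -> token0.
type Validator struct {
	events chan PoolValidationEvent

	mu         sync.Mutex
	cache      map[string]string
	mismatches int
	dropped    int
}

func NewValidator(buffer int) *Validator {
	return &Validator{
		events: make(chan PoolValidationEvent, buffer),
		cache:  make(map[string]string),
	}
}

// Submit never blocks the caller: when the buffer is full the event is
// counted as dropped instead of stalling the parsing path.
func (v *Validator) Submit(ev PoolValidationEvent) bool {
	select {
	case v.events <- ev:
		return true
	default:
		v.mu.Lock()
		v.dropped++
		v.mu.Unlock()
		return false
	}
}

// Run validates events until ctx is cancelled or the channel is closed.
func (v *Validator) Run(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case ev, ok := <-v.events:
			if !ok {
				return
			}
			v.validate(ev)
		}
	}
}

// validate compares the parsed token with the cached one for the same pool.
func (v *Validator) validate(ev PoolValidationEvent) {
	v.mu.Lock()
	defer v.mu.Unlock()
	if cached, known := v.cache[ev.Pool]; known && cached != ev.Token0 {
		v.mismatches++ // keep the cached value; a real validator would also log
		return
	}
	v.cache[ev.Pool] = ev.Token0
}

// demo feeds three events through a buffer of two and returns the counters.
func demo() (mismatches, dropped int) {
	v := NewValidator(2)
	v.Submit(PoolValidationEvent{Pool: "pool-1", Token0: "WETH"})
	v.Submit(PoolValidationEvent{Pool: "pool-1", Token0: "USDC"}) // mismatch
	v.Submit(PoolValidationEvent{Pool: "pool-2", Token0: "WETH"}) // buffer full
	close(v.events)
	v.Run(context.Background())
	return v.mismatches, v.dropped
}

func main() {
	mismatches, dropped := demo()
	fmt.Println("mismatches:", mismatches, "dropped:", dropped)
}
```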
---
### 3. Pool Data Validation Against Cache
**Current:** Parse data, submit event, hope it's correct
**Proposed:** Validate parsed data against known good cache data
```go
func (p *Parser) validateAndEnrichEvent(event *Event) error {
	// If pool is in cache, validate parsed data
	if cached := p.poolCache.GetPool(event.PoolAddress); cached != nil {
		validationErrors := []string{}
		// Validate Token0
		if event.Token0 != cached.Token0 && event.Token0 != (common.Address{}) {
			validationErrors = append(validationErrors,
				fmt.Sprintf("Token0 mismatch: parsed=%s, cached=%s",
					event.Token0, cached.Token0))
		}
		// Validate Token1
		if event.Token1 != cached.Token1 && event.Token1 != (common.Address{}) {
			validationErrors = append(validationErrors,
				fmt.Sprintf("Token1 mismatch: parsed=%s, cached=%s",
					event.Token1, cached.Token1))
		}
		// Validate Fee
		if event.Fee != cached.Fee && event.Fee != 0 {
			validationErrors = append(validationErrors,
				fmt.Sprintf("Fee mismatch: parsed=%d, cached=%d",
					event.Fee, cached.Fee))
		}
		if len(validationErrors) > 0 {
			p.logger.Error("Event validation failed",
				"pool", event.PoolAddress,
				"errors", validationErrors)
			return fmt.Errorf("validation errors: %v", validationErrors)
		}
		// Enrich event with cached data if parsed data is missing
		if event.Token0 == (common.Address{}) {
			event.Token0 = cached.Token0
		}
		if event.Token1 == (common.Address{}) {
			event.Token1 = cached.Token1
		}
	}
	return nil
}
```
**Benefits:**
- Self-healing: fixes missing data from cache
- Detects parsing errors immediately
- Provides confidence in parsed data
- Creates audit trail of validation failures
---
### 4. Fast Mapping for Pool Retrieval
**Current:** Already implemented with `PoolCache` using `map[common.Address]*PoolInfo`
**Optimization:** Add multi-index lookups
```go
type PoolCache struct {
	byAddress       map[common.Address]*PoolInfo
	byTokenPair     map[string][]*PoolInfo // "token0-token1" sorted
	byProtocol      map[Protocol][]*PoolInfo
	byLiquidityRank []common.Address // Sorted by liquidity
}
// Method signatures only (bodies omitted).
// Map-backed lookups are O(1) on average; GetTopByLiquidity is O(limit).
func (c *PoolCache) GetByAddress(addr common.Address) *PoolInfo
func (c *PoolCache) GetByTokenPair(t0, t1 common.Address) []*PoolInfo
func (c *PoolCache) GetByProtocol(protocol Protocol) []*PoolInfo
func (c *PoolCache) GetTopByLiquidity(limit int) []*PoolInfo
```
**Benefits:**
- Average O(1) lookups by address, token pair, and protocol
- Faster arbitrage path finding
- Better pool discovery
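
The token-pair index only works if both orderings of a pair map to the same key. Here is a minimal sketch of that normalization with two of the four indexes. `Address` is a local stand-in for `common.Address`, and `MultiIndexCache`, `pairKey`, `Add`, and `addr` are hypothetical names. The sketch is not safe for concurrent use; a shared cache would need locking.

```go
package main

import (
	"bytes"
	"encoding/hex"
	"fmt"
)

// Address stands in for go-ethereum's common.Address (a 20-byte array).
type Address [20]byte

type PoolInfo struct {
	Address Address
	Token0  Address
	Token1  Address
}

// pairKey builds an order-independent key so (A, B) and (B, A) share an entry.
func pairKey(a, b Address) string {
	if bytes.Compare(a[:], b[:]) > 0 {
		a, b = b, a
	}
	return hex.EncodeToString(a[:]) + "-" + hex.EncodeToString(b[:])
}

// MultiIndexCache keeps two indexes over the same PoolInfo pointers.
type MultiIndexCache struct {
	byAddress   map[Address]*PoolInfo
	byTokenPair map[string][]*PoolInfo
}

func NewMultiIndexCache() *MultiIndexCache {
	return &MultiIndexCache{
		byAddress:   make(map[Address]*PoolInfo),
		byTokenPair: make(map[string][]*PoolInfo),
	}
}

// Add indexes a pool once; repeated adds of the same address are ignored.
func (c *MultiIndexCache) Add(p *PoolInfo) {
	if _, exists := c.byAddress[p.Address]; exists {
		return
	}
	c.byAddress[p.Address] = p
	key := pairKey(p.Token0, p.Token1)
	c.byTokenPair[key] = append(c.byTokenPair[key], p)
}

// GetByAddress returns the pool at addr, or nil when it is not indexed.
func (c *MultiIndexCache) GetByAddress(addr Address) *PoolInfo {
	return c.byAddress[addr]
}

// GetByTokenPair returns all pools for the pair, regardless of argument order.
func (c *MultiIndexCache) GetByTokenPair(t0, t1 Address) []*PoolInfo {
	return c.byTokenPair[pairKey(t0, t1)]
}

// addr builds a test address whose last byte is b.
func addr(b byte) Address {
	var a Address
	a[19] = b
	return a
}

func main() {
	c := NewMultiIndexCache()
	c.Add(&PoolInfo{Address: addr(1), Token0: addr(10), Token1: addr(20)})
	c.Add(&PoolInfo{Address: addr(2), Token0: addr(20), Token1: addr(10)})
	fmt.Println("pools for pair:", len(c.GetByTokenPair(addr(10), addr(20))))
}
```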
---
### 5. Comprehensive Logging for Debugging
```go
type ParsingMetrics struct {
	TotalEvents        int64
	SuccessfulParses   int64
	FailedParses       int64
	ZeroAddressCount   int64
	ValidationFailures int64
	CacheHits          int64
	CacheMisses        int64
	DataDiscrepancies  int64
}
// percent returns part/total as a percentage, or 0 when total is 0,
// so no NaN is logged before the first event has been counted.
func percent(part, total int64) float64 {
	if total == 0 {
		return 0
	}
	return float64(part) / float64(total) * 100
}
func (p *Parser) logParsingMetrics() {
	m := p.metrics
	p.logger.Info("Parsing metrics",
		"total", m.TotalEvents,
		"success_rate", percent(m.SuccessfulParses, m.TotalEvents),
		"zero_address_rate", percent(m.ZeroAddressCount, m.TotalEvents),
		"cache_hit_rate", percent(m.CacheHits, m.CacheHits+m.CacheMisses),
		"validation_failure_rate", percent(m.ValidationFailures, m.TotalEvents))
}
```
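
If several goroutines parse events at once, plain `int64` counters would race. The sketch below shows one way to make the counters safe, assuming Go 1.19 or later for `atomic.Int64`. The struct is trimmed to two counters, and `Record`, `ZeroAddressRate`, and `demo` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// ParsingMetrics is trimmed to two counters; atomic.Int64 requires Go 1.19+.
type ParsingMetrics struct {
	TotalEvents      atomic.Int64
	ZeroAddressCount atomic.Int64
}

// Record counts one parsed event and whether it carried a zero address.
func (m *ParsingMetrics) Record(zeroAddress bool) {
	m.TotalEvents.Add(1)
	if zeroAddress {
		m.ZeroAddressCount.Add(1)
	}
}

// ZeroAddressRate returns a percentage, or 0 before any event is recorded.
func (m *ParsingMetrics) ZeroAddressRate() float64 {
	total := m.TotalEvents.Load()
	if total == 0 {
		return 0
	}
	return float64(m.ZeroAddressCount.Load()) / float64(total) * 100
}

// demo records events from several goroutines and returns the totals.
func demo(workers, perWorker int) (total int64, rate float64) {
	var m ParsingMetrics
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < perWorker; i++ {
				m.Record(i%4 == 0) // every fourth event has a zero address
			}
		}()
	}
	wg.Wait()
	return m.TotalEvents.Load(), m.ZeroAddressRate()
}

func main() {
	total, rate := demo(4, 100)
	fmt.Println("total:", total, "zero_address_rate:", rate)
}
```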
---
## Implementation Roadmap
### Phase 1: Immediate (Current)
- ✅ Add pool cache to parser
- ✅ Log missing pools
- ✅ Check cache before returning zero addresses
### Phase 2: Validation (Next)
- [ ] Add validation channel
- [ ] Implement background validator goroutine
- [ ] Add validation metrics
- [ ] Create alerting for validation failures
### Phase 3: Per-Exchange Parsers
- [ ] Create ExchangeParser interface
- [ ] Implement UniswapV2Parser
- [ ] Implement UniswapV3Parser
- [ ] Migrate existing code
- [ ] Add parser factory
### Phase 4: Advanced Features
- [ ] Multi-index pool cache
- [ ] Historical state tracking
- [ ] Anomaly detection
- [ ] Performance profiling
---
## Expected Benefits
### Immediate
- ✅ Fewer zero address errors
- ✅ Better debugging visibility
- ✅ Reduced RPC calls (use cache)
### After Full Implementation
- 99%+ parsing accuracy (target)
- Self-healing parser that fixes missing data
- Real-time detection of parsing issues
- Complete audit trail for troubleshooting
- Faster arbitrage detection
- Easier to add new DEXes
---
## Metrics to Track
1. **Parsing Accuracy**
- Zero address rate (target: < 0.1%)
- Validation failure rate (target: < 0.5%)
- Cache hit rate (target: > 95%)
2. **Performance**
- Parse time per event (target: < 1ms)
- Cache lookup time (target: < 0.1ms)
- Validation overhead (target: < 10%)
3. **Reliability**
- Data discrepancy rate (target: < 0.1%)
- Parser error rate (target: < 0.01%)
- Event drop rate (target: 0%)
---
**Status:** Phase 1 completed 2025-11-09
**Next:** Implement Phase 2 (validation channel)

@@ -123,7 +123,11 @@ type EventParser struct {
// CRITICAL FIX: Token extractor interface for working token extraction
tokenExtractor interfaces.TokenExtractor
logger *logger.Logger
// CRITICAL FIX: Pool cache to avoid zero addresses for known pools
poolCache interfaces.PoolCache
logger *logger.Logger
}
func (ep *EventParser) logDebug(message string, kv ...interface{}) {
@@ -167,6 +171,11 @@ func NewEventParserWithLogger(log *logger.Logger) *EventParser {
// NewEventParserWithTokenExtractor instantiates an EventParser with a TokenExtractor for enhanced parsing.
// This is the primary constructor for using the working L2 parser logic.
func NewEventParserWithTokenExtractor(log *logger.Logger, tokenExtractor interfaces.TokenExtractor) *EventParser {
return NewEventParserFull(log, tokenExtractor, nil)
}
// NewEventParserFull instantiates an EventParser with full customization options
func NewEventParserFull(log *logger.Logger, tokenExtractor interfaces.TokenExtractor, poolCache interfaces.PoolCache) *EventParser {
if log == nil {
log = logger.New("info", "text", "")
}
@@ -174,6 +183,7 @@ func NewEventParserWithTokenExtractor(log *logger.Logger, tokenExtractor interfa
parser := &EventParser{
logger: log,
tokenExtractor: tokenExtractor,
poolCache: poolCache,
// Official Arbitrum DEX Factory Addresses
UniswapV2Factory: common.HexToAddress("0xf1D7CC64Fb4452F05c498126312eBE29f30Fbcf9"), // Official Uniswap V2 Factory on Arbitrum
UniswapV3Factory: common.HexToAddress("0x1F98431c8aD98523631AE4a59f267346ea31F984"), // Official Uniswap V3 Factory on Arbitrum
@@ -1820,9 +1830,25 @@ func (ep *EventParser) getPoolTokens(poolAddress common.Address, txHash common.H
}
}
// Return zero addresses - scanner will enrich with pool cache data if needed
// This is acceptable because the comment at concurrent.go:381 says
// "Scanner will enrich event with token addresses from cache if missing"
// CRITICAL FIX: Use pool cache to get tokens from known pools
// This avoids RPC calls and zero addresses for pools we've already discovered
if ep.poolCache != nil {
poolInfo := ep.poolCache.GetPool(poolAddress)
if poolInfo != nil && poolInfo.Token0 != (common.Address{}) && poolInfo.Token1 != (common.Address{}) {
ep.logDebug("enriched pool tokens from cache",
"pool", poolAddress.Hex()[:10],
"token0", poolInfo.Token0.Hex()[:10],
"token1", poolInfo.Token1.Hex()[:10])
return poolInfo.Token0, poolInfo.Token1
}
}
// If pool not in cache, log at debug level - this helps identify parsing errors
ep.logDebug("pool not found in cache, returning zero addresses",
"pool", poolAddress.Hex()[:10],
"txHash", txHash.Hex()[:10])
// Return zero addresses - this will now be logged so we can track which pools are missing
return common.Address{}, common.Address{}
}
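
The cache fallback added to `getPoolTokens` can be exercised in isolation. The following is a minimal sketch under stated assumptions: `Address` and `PoolInfo` are local stand-ins for `common.Address` and `arbcommon.PoolInfo`, `mapCache` is a hypothetical in-memory cache, and `tokensFromCache` is a hypothetical helper that mirrors the three checks in the hunk above (nil cache, missing pool, incomplete entry).

```go
package main

import "fmt"

// Address stands in for go-ethereum's common.Address (a 20-byte array).
type Address [20]byte

// PoolInfo is reduced to the two fields the fallback reads.
type PoolInfo struct {
	Token0 Address
	Token1 Address
}

// PoolCache matches the single method the parser uses.
type PoolCache interface {
	GetPool(address Address) *PoolInfo
}

// mapCache is a hypothetical in-memory cache for the sketch.
type mapCache map[Address]*PoolInfo

func (c mapCache) GetPool(address Address) *PoolInfo { return c[address] }

// tokensFromCache returns cached tokens when the pool is known and complete.
// ok is false when the caller would fall back to zero addresses.
func tokensFromCache(cache PoolCache, pool Address) (token0, token1 Address, ok bool) {
	if cache == nil {
		return Address{}, Address{}, false
	}
	info := cache.GetPool(pool)
	if info == nil || info.Token0 == (Address{}) || info.Token1 == (Address{}) {
		return Address{}, Address{}, false
	}
	return info.Token0, info.Token1, true
}

// addr builds a test address whose last byte is b.
func addr(b byte) Address {
	var a Address
	a[19] = b
	return a
}

func main() {
	pool := addr(1)
	cache := mapCache{pool: {Token0: addr(10), Token1: addr(20)}}
	_, _, ok := tokensFromCache(cache, pool)
	fmt.Println("known pool resolved:", ok)
	_, _, ok = tokensFromCache(cache, addr(2))
	fmt.Println("unknown pool resolved:", ok)
}
```

Returning an explicit `ok` flag, rather than zero addresses, is a choice made for this sketch; the committed code keeps the two-value return and logs the miss.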

@@ -0,0 +1,15 @@
package interfaces
import (
"github.com/ethereum/go-ethereum/common"
arbcommon "github.com/fraktal/mev-beta/pkg/arbitrum/common"
)
// PoolCache provides access to cached pool information
type PoolCache interface {
// GetPool retrieves pool information from cache
GetPool(address common.Address) *arbcommon.PoolInfo
// GetPoolsByTokenPair retrieves pools for a specific token pair
GetPoolsByTokenPair(token0, token1 common.Address) []*arbcommon.PoolInfo
}

@@ -144,21 +144,7 @@ func NewArbitrumMonitor(
return nil, fmt.Errorf("L2 parser is null, cannot create enhanced event parser")
}
logger.Info("✅ L2 PARSER AVAILABLE - Creating enhanced event parser...")
enhancedEventParser := events.NewEventParserWithTokenExtractor(logger, l2Parser)
if enhancedEventParser == nil {
logger.Error("❌ ENHANCED EVENT PARSER CREATION FAILED")
return nil, fmt.Errorf("enhanced event parser creation failed")
}
logger.Info("✅ ENHANCED EVENT PARSER CREATED SUCCESSFULLY")
logger.Info("🔄 INJECTING ENHANCED PARSER INTO PIPELINE...")
// Inject enhanced parser into pipeline to avoid import cycle
pipeline.SetEnhancedEventParser(enhancedEventParser)
logger.Info("🎯 ENHANCED PARSER INJECTION COMPLETED")
logger.Info("✅ L2 PARSER AVAILABLE - Creating pool discovery for cache...")
// Create raw RPC client for pool discovery
poolRPCClient, err := rpc.Dial(arbCfg.RPCEndpoint)
@@ -166,7 +152,29 @@ func NewArbitrumMonitor(
return nil, fmt.Errorf("failed to create RPC client for pool discovery: %w", err)
}
_ = pools.NewPoolDiscovery(poolRPCClient, logger) // Will be used in future enhancements
// Create pool discovery for caching discovered pools
poolDiscovery := pools.NewPoolDiscovery(poolRPCClient, logger)
// Create pool cache adapter to provide PoolCache interface
poolCacheAdapter := pools.NewPoolCacheAdapter(poolDiscovery)
logger.Info("✅ POOL CACHE ADAPTER CREATED - Creating enhanced event parser...")
// Create enhanced event parser with pool cache support
enhancedEventParser := events.NewEventParserFull(logger, l2Parser, poolCacheAdapter)
if enhancedEventParser == nil {
logger.Error("❌ ENHANCED EVENT PARSER CREATION FAILED")
return nil, fmt.Errorf("enhanced event parser creation failed")
}
logger.Info("✅ ENHANCED EVENT PARSER CREATED SUCCESSFULLY WITH POOL CACHE")
logger.Info("🔄 INJECTING ENHANCED PARSER INTO PIPELINE...")
// Inject enhanced parser into pipeline to avoid import cycle
pipeline.SetEnhancedEventParser(enhancedEventParser)
logger.Info("🎯 ENHANCED PARSER INJECTION COMPLETED")
// Create MEV coordinator - removed to avoid import cycle
// coordinator := orchestrator.NewMEVCoordinator(
@@ -830,10 +838,50 @@ func (m *ArbitrumMonitor) processTransactionReceipt(ctx context.Context, receipt
m.logger.Info(fmt.Sprintf("Successfully parsed %d events from receipt %s", len(parsedEvents), receipt.TxHash.Hex()))
// Submit each parsed event directly to the scanner
// Submit each parsed event directly to the scanner with strict validation
for _, event := range parsedEvents {
if event != nil {
m.logger.Debug(fmt.Sprintf("Submitting event to scanner: Type=%s, Pool=%s, Token0=%s, Token1=%s",
// CRITICAL: Validate event data quality before submission
// Zero addresses and zero amounts are NEVER acceptable
isValid := true
validationErrors := []string{}
// Check for zero addresses
zeroAddr := common.Address{}
if event.Token0 == zeroAddr {
validationErrors = append(validationErrors, "Token0 is zero address")
isValid = false
}
if event.Token1 == zeroAddr {
validationErrors = append(validationErrors, "Token1 is zero address")
isValid = false
}
if event.PoolAddress == zeroAddr {
validationErrors = append(validationErrors, "PoolAddress is zero address")
isValid = false
}
// Check for zero amounts (for swap events)
if event.Type == events.EventTypeSwap {
if event.Amount0In != nil && event.Amount0In.Sign() == 0 &&
event.Amount0Out != nil && event.Amount0Out.Sign() == 0 {
validationErrors = append(validationErrors, "Amount0In and Amount0Out are both zero")
isValid = false
}
if event.Amount1In != nil && event.Amount1In.Sign() == 0 &&
event.Amount1Out != nil && event.Amount1Out.Sign() == 0 {
validationErrors = append(validationErrors, "Amount1In and Amount1Out are both zero")
isValid = false
}
}
if !isValid {
m.logger.Warn(fmt.Sprintf("❌ REJECTING INVALID EVENT - Type=%s, Pool=%s, TxHash=%s, Errors: %v",
event.Type.String(), event.PoolAddress.Hex(), event.TxHash.Hex(), validationErrors))
continue
}
m.logger.Debug(fmt.Sprintf("✅ Valid event - Submitting to scanner: Type=%s, Pool=%s, Token0=%s, Token1=%s",
event.Type.String(), event.PoolAddress.Hex(), event.Token0.Hex(), event.Token1.Hex()))
// Submit to scanner for arbitrage analysis
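
The inline checks above could be pulled into a function that is testable without a monitor. Below is a minimal sketch of that extraction. `validateEvent`, `bothZero`, `addr`, and `amount` are hypothetical helpers, and `Event` is a stand-in with only the fields the checks read. As in the hunk, a nil amount is not treated as zero, so an event with missing amounts still passes.

```go
package main

import (
	"fmt"
	"math/big"
)

// Address stands in for go-ethereum's common.Address (a 20-byte array).
type Address [20]byte

// Event is a stand-in holding only the fields the validation reads.
type Event struct {
	IsSwap      bool
	PoolAddress Address
	Token0      Address
	Token1      Address
	Amount0In   *big.Int
	Amount0Out  *big.Int
	Amount1In   *big.Int
	Amount1Out  *big.Int
}

// bothZero reports whether both amounts are present and equal to zero,
// matching the diff: nil amounts are not treated as zero.
func bothZero(in, out *big.Int) bool {
	return in != nil && in.Sign() == 0 && out != nil && out.Sign() == 0
}

// validateEvent returns every reason to reject the event, or nil if it is valid.
func validateEvent(e *Event) []string {
	var reasons []string
	zero := Address{}
	if e.Token0 == zero {
		reasons = append(reasons, "Token0 is zero address")
	}
	if e.Token1 == zero {
		reasons = append(reasons, "Token1 is zero address")
	}
	if e.PoolAddress == zero {
		reasons = append(reasons, "PoolAddress is zero address")
	}
	if e.IsSwap {
		if bothZero(e.Amount0In, e.Amount0Out) {
			reasons = append(reasons, "Amount0In and Amount0Out are both zero")
		}
		if bothZero(e.Amount1In, e.Amount1Out) {
			reasons = append(reasons, "Amount1In and Amount1Out are both zero")
		}
	}
	return reasons
}

// addr builds a test address whose last byte is b.
func addr(b byte) Address {
	var a Address
	a[19] = b
	return a
}

// amount wraps big.NewInt to keep call sites short.
func amount(n int64) *big.Int { return big.NewInt(n) }

func main() {
	e := &Event{IsSwap: true, Amount0In: amount(0), Amount0Out: amount(0)}
	for _, reason := range validateEvent(e) {
		fmt.Println("reject:", reason)
	}
}
```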

@@ -0,0 +1,99 @@
package pools
import (
"github.com/ethereum/go-ethereum/common"
arbcommon "github.com/fraktal/mev-beta/pkg/arbitrum/common"
)
// PoolCacheAdapter adapts PoolDiscovery to implement interfaces.PoolCache
// This allows the EventParser to use PoolDiscovery as its pool cache
type PoolCacheAdapter struct {
discovery *PoolDiscovery
}
// NewPoolCacheAdapter creates a new adapter wrapping a PoolDiscovery
func NewPoolCacheAdapter(discovery *PoolDiscovery) *PoolCacheAdapter {
return &PoolCacheAdapter{
discovery: discovery,
}
}
// GetPool retrieves pool information from cache
func (a *PoolCacheAdapter) GetPool(address common.Address) *arbcommon.PoolInfo {
if a.discovery == nil {
return nil
}
// Get pool from discovery
pool, exists := a.discovery.GetPool(address.Hex())
if !exists || pool == nil {
return nil
}
// Convert Pool to PoolInfo
return &arbcommon.PoolInfo{
Address: common.HexToAddress(pool.Address),
Protocol: parseProtocol(pool.Protocol),
PoolType: parsePoolType(pool.Protocol),
FactoryAddress: common.HexToAddress(pool.Factory),
Token0: common.HexToAddress(pool.Token0),
Token1: common.HexToAddress(pool.Token1),
Fee: pool.Fee,
TotalLiquidity: pool.Liquidity,
}
}
// GetPoolsByTokenPair retrieves pools for a specific token pair
func (a *PoolCacheAdapter) GetPoolsByTokenPair(token0, token1 common.Address) []*arbcommon.PoolInfo {
if a.discovery == nil {
return nil
}
// PoolDiscovery doesn't have a direct method for this yet
// We'll return nil for now and implement this later if needed
// This is acceptable as the parser only uses GetPool currently
return nil
}
// parseProtocol converts protocol string to Protocol enum
func parseProtocol(protocol string) arbcommon.Protocol {
switch protocol {
case "uniswap-v2":
return arbcommon.ProtocolUniswapV2
case "uniswap-v3":
return arbcommon.ProtocolUniswapV3
case "sushiswap", "sushiswap-v2":
return arbcommon.ProtocolSushiSwapV2
case "sushiswap-v3":
return arbcommon.ProtocolSushiSwapV3
case "camelot", "camelot-v2":
return arbcommon.ProtocolCamelotV2
case "camelot-v3":
return arbcommon.ProtocolCamelotV3
case "curve":
return arbcommon.ProtocolCurve
case "balancer", "balancer-v2":
return arbcommon.ProtocolBalancerV2
case "balancer-v3":
return arbcommon.ProtocolBalancerV3
default:
// Default to UniswapV2 for unknown protocols
return arbcommon.ProtocolUniswapV2
}
}
// parsePoolType converts protocol string to PoolType enum
func parsePoolType(protocol string) arbcommon.PoolType {
switch protocol {
case "uniswap-v2", "sushiswap", "sushiswap-v2", "camelot", "camelot-v2":
return arbcommon.PoolTypeConstantProduct
case "uniswap-v3", "sushiswap-v3", "camelot-v3":
return arbcommon.PoolTypeConcentrated
case "curve":
return arbcommon.PoolTypeStableSwap
case "balancer", "balancer-v2", "balancer-v3":
return arbcommon.PoolTypeWeighted
default:
return arbcommon.PoolTypeConstantProduct
}
}
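
Both switch statements fall back to a default, so an unrecognized protocol string is silently reported as Uniswap V2 with a constant-product pool. If callers ever need to tell "unknown" apart from "Uniswap V2", a lookup that also returns a found flag is one option. The sketch below is only an illustration: `Protocol` and its constants are local stand-ins for the `arbcommon` enum, `ProtocolUnknown` is hypothetical and not confirmed to exist in `arbcommon`, and `lookupProtocol` is a hypothetical name.

```go
package main

import "fmt"

// Protocol is a stand-in for arbcommon.Protocol; the values are illustrative.
type Protocol int

const (
	ProtocolUnknown Protocol = iota // hypothetical: not confirmed in arbcommon
	ProtocolUniswapV2
	ProtocolUniswapV3
	ProtocolSushiSwapV2
	ProtocolCurve
)

// protocolByName covers a subset of the strings handled by parseProtocol.
var protocolByName = map[string]Protocol{
	"uniswap-v2":   ProtocolUniswapV2,
	"uniswap-v3":   ProtocolUniswapV3,
	"sushiswap":    ProtocolSushiSwapV2,
	"sushiswap-v2": ProtocolSushiSwapV2,
	"curve":        ProtocolCurve,
}

// lookupProtocol returns the protocol for name and whether it was recognized.
// Unrecognized names yield ProtocolUnknown (the zero value) and false.
func lookupProtocol(name string) (Protocol, bool) {
	p, ok := protocolByName[name]
	return p, ok
}

func main() {
	for _, name := range []string{"sushiswap", "not-a-dex"} {
		p, ok := lookupProtocol(name)
		fmt.Println(name, p, ok)
	}
}
```

A caller that wants the committed behavior can still substitute `ProtocolUniswapV2` when the flag is false, while logging the unrecognized string.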