Files
mev-beta/pkg/arbitrum/swap_pipeline.go
Krypto Kajun 850223a953 fix(multicall): resolve critical multicall parsing corruption issues
- Added comprehensive bounds checking to prevent buffer overruns in multicall parsing
- Implemented graduated validation system (Strict/Moderate/Permissive) to reduce false positives
- Added LRU caching system for address validation with 10-minute TTL
- Enhanced ABI decoder with missing Universal Router and Arbitrum-specific DEX signatures
- Fixed duplicate function declarations and import conflicts across multiple files
- Added error recovery mechanisms with multiple fallback strategies
- Updated tests to handle new validation behavior for suspicious addresses
- Fixed parser test expectations for improved validation system
- Applied gofmt formatting fixes to ensure code style compliance
- Fixed mutex copying issues in monitoring package by introducing MetricsSnapshot
- Resolved critical security vulnerabilities in heuristic address extraction
- Progress: Updated TODO audit from 10% to 35% complete

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-17 00:12:55 -05:00

845 lines
26 KiB
Go

package arbitrum
import (
"context"
"fmt"
"math/big"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/fraktal/mev-beta/internal/logger"
arbdiscovery "github.com/fraktal/mev-beta/pkg/arbitrum/discovery"
arbmarket "github.com/fraktal/mev-beta/pkg/arbitrum/market"
arbparser "github.com/fraktal/mev-beta/pkg/arbitrum/parser"
)
// SwapEventPipeline processes swap events from multiple sources
type SwapEventPipeline struct {
logger *logger.Logger
protocolRegistry *ArbitrumProtocolRegistry
marketDiscovery *arbdiscovery.MarketDiscovery
strategyEngine *MEVStrategyEngine
// Channels for different event sources
transactionSwaps chan *SwapEvent
eventLogSwaps chan *SwapEvent
poolDiscoverySwaps chan *SwapEvent
// Unified swap processing
unifiedSwaps chan *SwapEvent
// Metrics
eventsProcessed uint64
swapsProcessed uint64
arbitrageOps uint64
// Deduplication
seenSwaps map[string]time.Time
seenMu sync.RWMutex
maxAge time.Duration
// Shutdown management
ctx context.Context
cancel context.CancelFunc
}
// NewSwapEventPipeline creates a new swap event processing pipeline
func NewSwapEventPipeline(
logger *logger.Logger,
protocolRegistry *ArbitrumProtocolRegistry,
marketDiscovery *arbdiscovery.MarketDiscovery,
strategyEngine *MEVStrategyEngine,
) *SwapEventPipeline {
ctx, cancel := context.WithCancel(context.Background())
pipeline := &SwapEventPipeline{
logger: logger,
protocolRegistry: protocolRegistry,
marketDiscovery: marketDiscovery,
strategyEngine: strategyEngine,
transactionSwaps: make(chan *SwapEvent, 1000),
eventLogSwaps: make(chan *SwapEvent, 1000),
poolDiscoverySwaps: make(chan *SwapEvent, 100),
unifiedSwaps: make(chan *SwapEvent, 2000),
seenSwaps: make(map[string]time.Time),
maxAge: 5 * time.Minute, // Keep seen swaps for 5 minutes
ctx: ctx,
cancel: cancel,
}
return pipeline
}
// Start begins processing swap events from all sources
func (p *SwapEventPipeline) Start() error {
p.logger.Info("🚀 Starting unified swap event processing pipeline")
// Start processing workers for each event source
go p.processTransactionSwaps()
go p.processEventLogSwaps()
go p.processPoolDiscoverySwaps()
// Start unified swap processing
go p.processUnifiedSwaps()
// Start cleanup of old seen swaps
go p.cleanupSeenSwaps()
p.logger.Info("✅ Swap event processing pipeline started successfully")
return nil
}
// SubmitTransactionSwap submits a swap event from transaction parsing
func (p *SwapEventPipeline) SubmitTransactionSwap(swap *SwapEvent) {
select {
case p.transactionSwaps <- swap:
p.eventsProcessed++
default:
p.logger.Warn("Transaction swaps channel full, dropping event")
}
}
// SubmitEventLogSwap submits a swap event from event log monitoring
func (p *SwapEventPipeline) SubmitEventLogSwap(swap *SwapEvent) {
select {
case p.eventLogSwaps <- swap:
p.eventsProcessed++
default:
p.logger.Warn("Event log swaps channel full, dropping event")
}
}
// SubmitPoolDiscoverySwap submits a swap event from pool discovery
func (p *SwapEventPipeline) SubmitPoolDiscoverySwap(swap *SwapEvent) {
select {
case p.poolDiscoverySwaps <- swap:
p.eventsProcessed++
default:
p.logger.Warn("Pool discovery swaps channel full, dropping event")
}
}
// processTransactionSwaps processes swap events from transaction parsing
func (p *SwapEventPipeline) processTransactionSwaps() {
for {
select {
case <-p.ctx.Done():
return
case swap := <-p.transactionSwaps:
if p.isDuplicateSwap(swap) {
continue
}
// Add to unified processing
select {
case p.unifiedSwaps <- swap:
default:
p.logger.Warn("Unified swaps channel full, dropping transaction swap")
}
}
}
}
// processEventLogSwaps processes swap events from event log monitoring
func (p *SwapEventPipeline) processEventLogSwaps() {
for {
select {
case <-p.ctx.Done():
return
case swap := <-p.eventLogSwaps:
if p.isDuplicateSwap(swap) {
continue
}
// Add to unified processing
select {
case p.unifiedSwaps <- swap:
default:
p.logger.Warn("Unified swaps channel full, dropping event log swap")
}
}
}
}
// processPoolDiscoverySwaps processes swap events from pool discovery
func (p *SwapEventPipeline) processPoolDiscoverySwaps() {
for {
select {
case <-p.ctx.Done():
return
case swap := <-p.poolDiscoverySwaps:
if p.isDuplicateSwap(swap) {
continue
}
// Add to unified processing
select {
case p.unifiedSwaps <- swap:
default:
p.logger.Warn("Unified swaps channel full, dropping pool discovery swap")
}
}
}
}
// processUnifiedSwaps processes all swap events through the unified pipeline
func (p *SwapEventPipeline) processUnifiedSwaps() {
for {
select {
case <-p.ctx.Done():
return
case swap := <-p.unifiedSwaps:
p.processSwap(swap)
}
}
}
// processSwap processes a single swap event through the complete pipeline
func (p *SwapEventPipeline) processSwap(swap *SwapEvent) {
p.swapsProcessed++
// Log the swap event
if err := p.protocolRegistry.LogSwapEvent(swap); err != nil {
p.logger.Error(fmt.Sprintf("Failed to log swap event: %v", err))
}
// Update market discovery with new pool information if needed
p.updateMarketDiscovery(swap)
// Analyze for arbitrage opportunities
if err := p.analyzeForArbitrage(swap); err != nil {
p.logger.Error(fmt.Sprintf("Failed to analyze swap for arbitrage: %v", err))
}
// Mark as seen to prevent duplicates
p.markSwapAsSeen(swap)
}
// isDuplicateSwap checks if a swap event has already been processed recently
func (p *SwapEventPipeline) isDuplicateSwap(swap *SwapEvent) bool {
key := fmt.Sprintf("%s:%s:%s", swap.TxHash, swap.Pool, swap.TokenIn)
p.seenMu.RLock()
defer p.seenMu.RUnlock()
if lastSeen, exists := p.seenSwaps[key]; exists {
// If we've seen this swap within the max age, it's a duplicate
if time.Since(lastSeen) < p.maxAge {
return true
}
}
return false
}
// markSwapAsSeen marks a swap event as processed to prevent duplicates
func (p *SwapEventPipeline) markSwapAsSeen(swap *SwapEvent) {
key := fmt.Sprintf("%s:%s:%s", swap.TxHash, swap.Pool, swap.TokenIn)
p.seenMu.Lock()
defer p.seenMu.Unlock()
p.seenSwaps[key] = time.Now()
}
// cleanupSeenSwaps periodically removes old entries from the seen swaps map
func (p *SwapEventPipeline) cleanupSeenSwaps() {
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
for {
select {
case <-p.ctx.Done():
return
case <-ticker.C:
p.seenMu.Lock()
now := time.Now()
for key, lastSeen := range p.seenSwaps {
if now.Sub(lastSeen) > p.maxAge {
delete(p.seenSwaps, key)
}
}
p.seenMu.Unlock()
}
}
}
// updateMarketDiscovery updates the market discovery with new pool information
func (p *SwapEventPipeline) updateMarketDiscovery(swap *SwapEvent) {
// TODO: Add proper methods to MarketDiscovery to access pools
// For now, skip the pool existence check - this functionality will be restored after reorganization
/*
poolAddr := common.HexToAddress(swap.Pool)
p.marketDiscovery.mu.RLock()
_, exists := p.marketDiscovery.pools[poolAddr]
p.marketDiscovery.mu.RUnlock()
if !exists {
// Add new pool to tracking
poolInfo := &arbmarket.PoolInfoDetailed{
Address: poolAddr,
Factory: common.HexToAddress(swap.Router), // Simplified
FactoryType: swap.Protocol,
LastUpdated: time.Now(),
Priority: 50, // Default priority
Active: true,
}
p.marketDiscovery.mu.Lock()
p.marketDiscovery.pools[poolAddr] = poolInfo
p.marketDiscovery.mu.Unlock()
p.logger.Info(fmt.Sprintf("🆕 New pool added to tracking: %s (%s)", swap.Pool, swap.Protocol))
// CRITICAL: Sync this pool across all other factories
go p.syncPoolAcrossFactories(swap)
}
*/
}
// syncPoolAcrossFactories ensures when we discover a pool on one DEX, we check/add it on all others
// TODO: This function is temporarily disabled due to access to unexported fields
func (p *SwapEventPipeline) syncPoolAcrossFactories(originalSwap *SwapEvent) {
// Temporarily disabled until proper MarketDiscovery interface is implemented
return
/*
tokenIn := common.HexToAddress(originalSwap.TokenIn)
tokenOut := common.HexToAddress(originalSwap.TokenOut)
// Handle empty token addresses to prevent slice bounds panic
tokenInDisplay := "unknown"
tokenOutDisplay := "unknown"
if len(originalSwap.TokenIn) > 0 {
if len(originalSwap.TokenIn) > 8 {
tokenInDisplay = originalSwap.TokenIn[:8]
} else {
tokenInDisplay = originalSwap.TokenIn
}
} else {
// Handle completely empty token address
tokenInDisplay = "unknown"
}
if len(originalSwap.TokenOut) > 0 {
if len(originalSwap.TokenOut) > 8 {
tokenOutDisplay = originalSwap.TokenOut[:8]
} else {
tokenOutDisplay = originalSwap.TokenOut
}
} else {
// Handle completely empty token address
tokenOutDisplay = "unknown"
}
p.logger.Info(fmt.Sprintf("🔄 Syncing pool %s/%s across all factories",
tokenInDisplay, tokenOutDisplay))
// Get all factory configurations
factories := []struct {
protocol string
factory common.Address
name string
}{
{"uniswap_v2", common.HexToAddress("0xf1D7CC64Fb4452F05c498126312eBE29f30Fbcf9"), "UniswapV2"},
{"sushiswap", common.HexToAddress("0xc35DADB65012eC5796536bD9864eD8773aBc74C4"), "SushiSwap"},
{"camelot_v2", common.HexToAddress("0x6EcCab422D763aC031210895C81787E87B43A652"), "CamelotV2"},
{"uniswap_v3", common.HexToAddress("0x1F98431c8aD98523631AE4a59f267346ea31F984"), "UniswapV3"},
}
for _, factory := range factories {
// Skip the original factory
if factory.protocol == originalSwap.Protocol {
continue
}
// Check if pool exists on this factory
poolAddr := p.calculatePoolAddress(factory.factory, tokenIn, tokenOut, factory.protocol)
p.marketDiscovery.mu.RLock()
_, exists := p.marketDiscovery.pools[poolAddr]
p.marketDiscovery.mu.RUnlock()
if !exists {
// Add this potential pool for monitoring
poolInfo := &arbmarket.PoolInfoDetailed{
Address: poolAddr,
Factory: factory.factory,
FactoryType: factory.protocol,
Token0: tokenIn,
Token1: tokenOut,
LastUpdated: time.Now(),
Priority: 45, // Slightly lower priority for discovered pools
Active: true,
}
p.marketDiscovery.mu.Lock()
p.marketDiscovery.pools[poolAddr] = poolInfo
p.marketDiscovery.mu.Unlock()
// Handle empty pool address to prevent slice bounds panic
poolAddrDisplay := "unknown"
if len(poolAddr.Hex()) > 0 {
if len(poolAddr.Hex()) > 10 {
poolAddrDisplay = poolAddr.Hex()[:10]
} else {
poolAddrDisplay = poolAddr.Hex()
}
} else {
// Handle completely empty address
poolAddrDisplay = "unknown"
}
p.logger.Info(fmt.Sprintf("✅ Added cross-factory pool: %s on %s",
poolAddrDisplay, factory.name))
}
}
*/
}
// calculatePoolAddress calculates the deterministic pool address for a token pair
func (p *SwapEventPipeline) calculatePoolAddress(factory, tokenA, tokenB common.Address, protocol string) common.Address {
// Sort tokens
token0, token1 := tokenA, tokenB
if tokenA.Hex() > tokenB.Hex() {
token0, token1 = tokenB, tokenA
}
// This is a simplified calculation - in production would use CREATE2
// For now, generate a deterministic address based on factory and tokens
data := append(factory.Bytes(), token0.Bytes()...)
data = append(data, token1.Bytes()...)
hash := crypto.Keccak256Hash(data)
return common.BytesToAddress(hash.Bytes()[12:])
}
// analyzeForArbitrage analyzes a swap event for arbitrage opportunities
// TODO: Temporarily disabled until types are properly reorganized
func (p *SwapEventPipeline) analyzeForArbitrage(swap *SwapEvent) error {
// Temporarily disabled - to be restored after reorganization
return nil
/*
// CRITICAL: Check price impact first - if significant, immediately scan all pools
if swap.PriceImpact > 0.005 { // 0.5% price impact threshold
// Handle empty pool address to prevent slice bounds panic
poolDisplay := "unknown"
if len(swap.Pool) > 0 {
if len(swap.Pool) > 10 {
poolDisplay = swap.Pool[:10]
} else {
poolDisplay = swap.Pool
}
} else {
// Handle completely empty pool address
poolDisplay = "unknown"
}
p.logger.Info(fmt.Sprintf("⚡ High price impact detected: %.2f%% on %s - scanning for arbitrage",
swap.PriceImpact*100, poolDisplay))
// Immediately scan all pools with this token pair for arbitrage
go p.scanAllPoolsForArbitrage(swap)
}
// Check if strategy engine is available before using it
if p.strategyEngine == nil {
p.logger.Warn("Strategy engine not initialized, skipping detailed arbitrage analysis")
return nil
}
// Use the strategy engine to analyze for arbitrage opportunities
profitableStrategy, err := p.strategyEngine.AnalyzeArbitrageOpportunity(context.Background(), swap)
if err != nil {
p.logger.Warn(fmt.Sprintf("Strategy engine analysis failed: %v", err))
return nil // Don't fail the pipeline, just log the issue
}
// If we found a profitable arbitrage strategy, log it and potentially execute it
if profitableStrategy != nil && profitableStrategy.Type == "arbitrage" {
p.arbitrageOps++
// Calculate net profit after gas
gasInEth := new(big.Int).Mul(profitableStrategy.GasCost, big.NewInt(100000000)) // 0.1 gwei for Arbitrum
netProfit := new(big.Int).Sub(profitableStrategy.ExpectedProfit, gasInEth)
p.logger.Info(fmt.Sprintf("💰 Profitable arbitrage detected: Gross: %s ETH, Gas: %s ETH, Net: %s ETH",
formatEther(profitableStrategy.ExpectedProfit),
formatEther(gasInEth),
formatEther(netProfit)))
// Log the opportunity to the arbitrage opportunities file
// Handle empty transaction hash to prevent slice bounds panic
txHashDisplay := "unknown"
if len(swap.TxHash) > 0 {
if len(swap.TxHash) > 8 {
txHashDisplay = swap.TxHash[:8]
} else {
txHashDisplay = swap.TxHash
}
} else {
// Handle completely empty transaction hash
txHashDisplay = "unknown"
}
opportunity := &ArbitrageOpportunityDetailed{
ID: fmt.Sprintf("arb_%d_%s", time.Now().Unix(), txHashDisplay),
Type: "arbitrage",
TokenIn: common.HexToAddress(swap.TokenIn),
TokenOut: common.HexToAddress(swap.TokenOut),
AmountIn: big.NewInt(0), // Would extract from swap data
ExpectedAmountOut: big.NewInt(0), // Would calculate from arbitrage path
ActualAmountOut: big.NewInt(0),
Profit: profitableStrategy.ExpectedProfit,
ProfitUSD: 0.0, // Would calculate from token prices
ProfitMargin: profitableStrategy.ProfitMarginPct / 100.0,
GasCost: profitableStrategy.GasCost,
NetProfit: profitableStrategy.NetProfit,
ExchangeA: swap.Protocol,
ExchangeB: "", // Would determine from arbitrage path
PoolA: common.HexToAddress(swap.Pool),
PoolB: common.Address{}, // Would get from arbitrage path
PriceImpactA: 0.0, // Would calculate from swap data
PriceImpactB: 0.0, // Would calculate from arbitrage path
Confidence: profitableStrategy.Confidence,
RiskScore: profitableStrategy.RiskScore,
ExecutionTime: profitableStrategy.ExecutionTime,
Timestamp: time.Now(),
}
if err := p.marketDiscovery.logArbitrageOpportunity(opportunity); err != nil {
p.logger.Error(fmt.Sprintf("Failed to log arbitrage opportunity: %v", err))
}
}
return nil
*/
}
// scanAllPoolsForArbitrage scans all pools for arbitrage opportunities when price impact is detected
// TODO: Temporarily disabled until types are properly reorganized
func (p *SwapEventPipeline) scanAllPoolsForArbitrage(triggerSwap *SwapEvent) {
// Temporarily disabled - to be restored after reorganization
return
/*
tokenIn := common.HexToAddress(triggerSwap.TokenIn)
tokenOut := common.HexToAddress(triggerSwap.TokenOut)
// Handle empty token addresses to prevent slice bounds panic
tokenInDisplay := "unknown"
tokenOutDisplay := "unknown"
if len(triggerSwap.TokenIn) > 0 {
if len(triggerSwap.TokenIn) > 8 {
tokenInDisplay = triggerSwap.TokenIn[:8]
} else {
tokenInDisplay = triggerSwap.TokenIn
}
} else {
// Handle completely empty token address
tokenInDisplay = "unknown"
}
if len(triggerSwap.TokenOut) > 0 {
if len(triggerSwap.TokenOut) > 8 {
tokenOutDisplay = triggerSwap.TokenOut[:8]
} else {
tokenOutDisplay = triggerSwap.TokenOut
}
} else {
// Handle completely empty token address
tokenOutDisplay = "unknown"
}
p.logger.Info(fmt.Sprintf("🔍 Scanning all pools for arbitrage: %s/%s",
tokenInDisplay, tokenOutDisplay))
// Get all pools with this token pair
p.marketDiscovery.mu.RLock()
var relevantPools []*PoolInfoDetailed
for _, pool := range p.marketDiscovery.pools {
if (pool.Token0 == tokenIn && pool.Token1 == tokenOut) ||
(pool.Token0 == tokenOut && pool.Token1 == tokenIn) {
relevantPools = append(relevantPools, pool)
}
}
p.marketDiscovery.mu.RUnlock()
p.logger.Info(fmt.Sprintf("Found %d pools to scan for arbitrage", len(relevantPools)))
// Check for arbitrage between all pool pairs
for i := 0; i < len(relevantPools); i++ {
for j := i + 1; j < len(relevantPools); j++ {
poolA := relevantPools[i]
poolB := relevantPools[j]
// Skip if both pools are from the same protocol
if poolA.FactoryType == poolB.FactoryType {
continue
}
// ENHANCED: Fetch real prices from both pools for accurate arbitrage detection
priceA, err := p.fetchPoolPrice(poolA, tokenIn, tokenOut)
if err != nil {
p.logger.Debug(fmt.Sprintf("Failed to fetch price from %s: %v", poolA.FactoryType, err))
continue
}
priceB, err := p.fetchPoolPrice(poolB, tokenIn, tokenOut)
if err != nil {
p.logger.Debug(fmt.Sprintf("Failed to fetch price from %s: %v", poolB.FactoryType, err))
continue
}
// Calculate actual price difference
var priceDiff float64
if priceA > priceB {
priceDiff = (priceA - priceB) / priceB
} else {
priceDiff = (priceB - priceA) / priceA
}
// Account for transaction fees (0.3% typical) and gas costs
minProfitThreshold := 0.008 // 0.8% minimum for profitability after all costs
if priceDiff > minProfitThreshold {
// Calculate potential profit with $13 capital
availableCapital := 13.0 // $13 in ETH equivalent
grossProfit := availableCapital * priceDiff
gasCostUSD := 0.50 // ~$0.50 gas cost on Arbitrum
netProfitUSD := grossProfit - gasCostUSD
if netProfitUSD > 5.0 { // $5 minimum profit
p.logger.Info(fmt.Sprintf("🎯 PROFITABLE ARBITRAGE: $%.2f profit (%.2f%%) between %s and %s",
netProfitUSD,
priceDiff*100,
poolA.FactoryType,
poolB.FactoryType))
// Log to JSONL with detailed profit calculations
// Handle empty pool addresses to prevent slice bounds panic
poolAAddrDisplay := "unknown"
poolBAddrDisplay := "unknown"
if len(poolA.Address.Hex()) > 0 {
if len(poolA.Address.Hex()) > 8 {
poolAAddrDisplay = poolA.Address.Hex()[:8]
} else {
poolAAddrDisplay = poolA.Address.Hex()
}
} else {
// Handle completely empty address
poolAAddrDisplay = "unknown"
}
if len(poolB.Address.Hex()) > 0 {
if len(poolB.Address.Hex()) > 8 {
poolBAddrDisplay = poolB.Address.Hex()[:8]
} else {
poolBAddrDisplay = poolB.Address.Hex()
}
} else {
// Handle completely empty address
poolBAddrDisplay = "unknown"
}
opportunity := &ArbitrageOpportunityDetailed{
ID: fmt.Sprintf("arb_%d_%s_%s", time.Now().Unix(), poolAAddrDisplay, poolBAddrDisplay),
Type: "cross-dex",
TokenIn: tokenIn,
TokenOut: tokenOut,
ExchangeA: poolA.FactoryType,
ExchangeB: poolB.FactoryType,
PoolA: poolA.Address,
PoolB: poolB.Address,
ProfitMargin: priceDiff,
ProfitUSD: netProfitUSD,
PriceA: priceA,
PriceB: priceB,
CapitalRequired: availableCapital,
GasCostUSD: gasCostUSD,
Confidence: 0.85, // High confidence with real price data
Timestamp: time.Now(),
}
if err := p.marketDiscovery.logArbitrageOpportunity(opportunity); err != nil {
p.logger.Error(fmt.Sprintf("Failed to log arbitrage: %v", err))
}
} else {
p.logger.Debug(fmt.Sprintf("⚠️ Arbitrage found but profit $%.2f below $5 threshold", netProfitUSD))
}
}
}
}
*/
}
// fetchPoolPrice fetches the current price from a pool for the given token pair
// TODO: Temporarily disabled until types are properly reorganized
func (p *SwapEventPipeline) fetchPoolPrice(pool *arbmarket.PoolInfoDetailed, tokenIn, tokenOut common.Address) (float64, error) {
// Temporarily disabled - to be restored after reorganization
return 0.0, nil
/*
// Since binding packages are not available, we'll return an appropriate default
// based on the available pool data. In a real implementation, this would call
// the actual pool contract to get reserves and calculate the price.
// Check if the pool has valid reserve data to calculate price
if pool.Reserve0 == nil || pool.Reserve1 == nil {
return 0, fmt.Errorf("pool reserves not available for %s", pool.Address.Hex())
}
// Determine which reserve corresponds to tokenIn and tokenOut
var reserveIn, reserveOut *big.Int
if pool.Token0 == tokenIn && pool.Token1 == tokenOut {
reserveIn = pool.Reserve0
reserveOut = pool.Reserve1
} else if pool.Token0 == tokenOut && pool.Token1 == tokenIn {
reserveIn = pool.Reserve1
reserveOut = pool.Reserve0
} else {
return 0, fmt.Errorf("token pair does not match pool %s", pool.Address.Hex())
}
// Check if reserves are zero
if reserveIn.Sign() <= 0 {
return 0, fmt.Errorf("invalid reserveIn for pool %s", pool.Address.Hex())
}
// Calculate price as reserveOut / reserveIn, accounting for decimals
tokenInInfo, tokenInExists := p.marketDiscovery.tokens[tokenIn]
tokenOutInfo, tokenOutExists := p.marketDiscovery.tokens[tokenOut]
if !tokenInExists || !tokenOutExists {
// If token decimals not available, assume 18 decimals for both
reserveInFloat := new(big.Float).SetInt(reserveIn)
reserveOutFloat := new(big.Float).SetInt(reserveOut)
// Calculate price: reserveOut / reserveIn
// Check for zero division
if reserveInFloat.Cmp(big.NewFloat(0)) == 0 {
return 0, fmt.Errorf("division by zero: reserveIn is zero")
}
price := new(big.Float).Quo(reserveOutFloat, reserveInFloat)
priceFloat64, _ := price.Float64()
return priceFloat64, nil
}
// Calculate price considering decimals
reserveInFloat := new(big.Float).SetInt(reserveIn)
reserveOutFloat := new(big.Float).SetInt(reserveOut)
decimalsIn := float64(tokenInInfo.Decimals)
decimalsOut := float64(tokenOutInfo.Decimals)
// Normalize to the same decimal scale
if decimalsIn > decimalsOut {
// Adjust reserveOut up
multiplier := new(big.Float).SetFloat64(math.Pow10(int(decimalsIn - decimalsOut)))
reserveOutFloat.Mul(reserveOutFloat, multiplier)
} else if decimalsOut > decimalsIn {
// Adjust reserveIn up
multiplier := new(big.Float).SetFloat64(math.Pow10(int(decimalsOut - decimalsIn)))
reserveInFloat.Mul(reserveInFloat, multiplier)
}
// Calculate price: (reserveOut normalized) / (reserveIn normalized)
// Check for zero division
if reserveInFloat.Cmp(big.NewFloat(0)) == 0 {
return 0, fmt.Errorf("division by zero: normalized reserveIn is zero")
}
price := new(big.Float).Quo(reserveOutFloat, reserveInFloat)
priceFloat64, _ := price.Float64()
return priceFloat64, nil
*/
}
// formatEther formats a big.Int wei amount as ETH string
func formatEther(wei *big.Int) string {
if wei == nil {
return "0"
}
// Dividing by 1e18 is safe as it's a constant
ether := new(big.Float).Quo(new(big.Float).SetInt(wei), big.NewFloat(1e18))
result, _ := ether.Float64()
return fmt.Sprintf("%.6f", result)
}
// GetMetrics returns current pipeline metrics
func (p *SwapEventPipeline) GetMetrics() map[string]interface{} {
p.seenMu.RLock()
seenSwaps := len(p.seenSwaps)
p.seenMu.RUnlock()
return map[string]interface{}{
"events_processed": p.eventsProcessed,
"swaps_processed": p.swapsProcessed,
"arbitrage_ops": p.arbitrageOps,
"seen_swaps": seenSwaps,
"transaction_queue": len(p.transactionSwaps),
"event_log_queue": len(p.eventLogSwaps),
"pool_discovery_queue": len(p.poolDiscoverySwaps),
"unified_queue": len(p.unifiedSwaps),
}
}
// SubmitPoolStateUpdate processes pool state updates for dynamic state management
func (p *SwapEventPipeline) SubmitPoolStateUpdate(update *arbparser.PoolStateUpdate) {
// For now, just log the pool state update
// In production, this would update pool state in the market discovery
// Handle empty addresses to prevent slice bounds panic
poolAddrDisplay := "unknown"
tokenInDisplay := "unknown"
tokenOutDisplay := "unknown"
if len(update.Pool.Hex()) > 0 {
if len(update.Pool.Hex()) > 8 {
poolAddrDisplay = update.Pool.Hex()[:8]
} else {
poolAddrDisplay = update.Pool.Hex()
}
} else {
// Handle completely empty address
poolAddrDisplay = "unknown"
}
if len(update.TokenIn.Hex()) > 0 {
if len(update.TokenIn.Hex()) > 6 {
tokenInDisplay = update.TokenIn.Hex()[:6]
} else {
tokenInDisplay = update.TokenIn.Hex()
}
} else {
// Handle completely empty address
tokenInDisplay = "unknown"
}
if len(update.TokenOut.Hex()) > 0 {
if len(update.TokenOut.Hex()) > 6 {
tokenOutDisplay = update.TokenOut.Hex()[:6]
} else {
tokenOutDisplay = update.TokenOut.Hex()
}
} else {
// Handle completely empty address
tokenOutDisplay = "unknown"
}
p.logger.Debug(fmt.Sprintf("🔄 Pool state update: %s %s->%s (%s %s)",
poolAddrDisplay,
tokenInDisplay,
tokenOutDisplay,
formatEther(update.AmountIn),
update.UpdateType))
// Update market discovery with new pool state
// TODO: Temporarily disabled until proper interface is implemented
/*
if p.marketDiscovery != nil {
go p.marketDiscovery.UpdatePoolState(update)
}
*/
}
// Close stops the pipeline and cleans up resources
func (p *SwapEventPipeline) Close() error {
p.logger.Info("🛑 Stopping swap event processing pipeline")
p.cancel()
p.logger.Info("✅ Swap event processing pipeline stopped")
return nil
}