- Added comprehensive bounds checking to prevent buffer overruns in multicall parsing - Implemented graduated validation system (Strict/Moderate/Permissive) to reduce false positives - Added LRU caching system for address validation with 10-minute TTL - Enhanced ABI decoder with missing Universal Router and Arbitrum-specific DEX signatures - Fixed duplicate function declarations and import conflicts across multiple files - Added error recovery mechanisms with multiple fallback strategies - Updated tests to handle new validation behavior for suspicious addresses - Fixed parser test expectations for improved validation system - Applied gofmt formatting fixes to ensure code style compliance - Fixed mutex copying issues in monitoring package by introducing MetricsSnapshot - Resolved critical security vulnerabilities in heuristic address extraction - Progress: Updated TODO audit from 10% to 35% complete 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
845 lines
26 KiB
Go
845 lines
26 KiB
Go
package arbitrum
|
|
|
|
import (
	"context"
	"fmt"
	"math/big"
	"sync"
	"sync/atomic"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/crypto"

	"github.com/fraktal/mev-beta/internal/logger"
	arbdiscovery "github.com/fraktal/mev-beta/pkg/arbitrum/discovery"
	arbmarket "github.com/fraktal/mev-beta/pkg/arbitrum/market"
	arbparser "github.com/fraktal/mev-beta/pkg/arbitrum/parser"
)
|
|
|
|
// SwapEventPipeline processes swap events from multiple sources
|
|
type SwapEventPipeline struct {
|
|
logger *logger.Logger
|
|
protocolRegistry *ArbitrumProtocolRegistry
|
|
marketDiscovery *arbdiscovery.MarketDiscovery
|
|
strategyEngine *MEVStrategyEngine
|
|
|
|
// Channels for different event sources
|
|
transactionSwaps chan *SwapEvent
|
|
eventLogSwaps chan *SwapEvent
|
|
poolDiscoverySwaps chan *SwapEvent
|
|
|
|
// Unified swap processing
|
|
unifiedSwaps chan *SwapEvent
|
|
|
|
// Metrics
|
|
eventsProcessed uint64
|
|
swapsProcessed uint64
|
|
arbitrageOps uint64
|
|
|
|
// Deduplication
|
|
seenSwaps map[string]time.Time
|
|
seenMu sync.RWMutex
|
|
maxAge time.Duration
|
|
|
|
// Shutdown management
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
}
|
|
|
|
// NewSwapEventPipeline creates a new swap event processing pipeline
|
|
func NewSwapEventPipeline(
|
|
logger *logger.Logger,
|
|
protocolRegistry *ArbitrumProtocolRegistry,
|
|
marketDiscovery *arbdiscovery.MarketDiscovery,
|
|
strategyEngine *MEVStrategyEngine,
|
|
) *SwapEventPipeline {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
pipeline := &SwapEventPipeline{
|
|
logger: logger,
|
|
protocolRegistry: protocolRegistry,
|
|
marketDiscovery: marketDiscovery,
|
|
strategyEngine: strategyEngine,
|
|
transactionSwaps: make(chan *SwapEvent, 1000),
|
|
eventLogSwaps: make(chan *SwapEvent, 1000),
|
|
poolDiscoverySwaps: make(chan *SwapEvent, 100),
|
|
unifiedSwaps: make(chan *SwapEvent, 2000),
|
|
seenSwaps: make(map[string]time.Time),
|
|
maxAge: 5 * time.Minute, // Keep seen swaps for 5 minutes
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
}
|
|
|
|
return pipeline
|
|
}
|
|
|
|
// Start begins processing swap events from all sources
|
|
func (p *SwapEventPipeline) Start() error {
|
|
p.logger.Info("🚀 Starting unified swap event processing pipeline")
|
|
|
|
// Start processing workers for each event source
|
|
go p.processTransactionSwaps()
|
|
go p.processEventLogSwaps()
|
|
go p.processPoolDiscoverySwaps()
|
|
|
|
// Start unified swap processing
|
|
go p.processUnifiedSwaps()
|
|
|
|
// Start cleanup of old seen swaps
|
|
go p.cleanupSeenSwaps()
|
|
|
|
p.logger.Info("✅ Swap event processing pipeline started successfully")
|
|
return nil
|
|
}
|
|
|
|
// SubmitTransactionSwap submits a swap event from transaction parsing
|
|
func (p *SwapEventPipeline) SubmitTransactionSwap(swap *SwapEvent) {
|
|
select {
|
|
case p.transactionSwaps <- swap:
|
|
p.eventsProcessed++
|
|
default:
|
|
p.logger.Warn("Transaction swaps channel full, dropping event")
|
|
}
|
|
}
|
|
|
|
// SubmitEventLogSwap submits a swap event from event log monitoring
|
|
func (p *SwapEventPipeline) SubmitEventLogSwap(swap *SwapEvent) {
|
|
select {
|
|
case p.eventLogSwaps <- swap:
|
|
p.eventsProcessed++
|
|
default:
|
|
p.logger.Warn("Event log swaps channel full, dropping event")
|
|
}
|
|
}
|
|
|
|
// SubmitPoolDiscoverySwap submits a swap event from pool discovery
|
|
func (p *SwapEventPipeline) SubmitPoolDiscoverySwap(swap *SwapEvent) {
|
|
select {
|
|
case p.poolDiscoverySwaps <- swap:
|
|
p.eventsProcessed++
|
|
default:
|
|
p.logger.Warn("Pool discovery swaps channel full, dropping event")
|
|
}
|
|
}
|
|
|
|
// processTransactionSwaps processes swap events from transaction parsing
|
|
func (p *SwapEventPipeline) processTransactionSwaps() {
|
|
for {
|
|
select {
|
|
case <-p.ctx.Done():
|
|
return
|
|
case swap := <-p.transactionSwaps:
|
|
if p.isDuplicateSwap(swap) {
|
|
continue
|
|
}
|
|
|
|
// Add to unified processing
|
|
select {
|
|
case p.unifiedSwaps <- swap:
|
|
default:
|
|
p.logger.Warn("Unified swaps channel full, dropping transaction swap")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// processEventLogSwaps processes swap events from event log monitoring
|
|
func (p *SwapEventPipeline) processEventLogSwaps() {
|
|
for {
|
|
select {
|
|
case <-p.ctx.Done():
|
|
return
|
|
case swap := <-p.eventLogSwaps:
|
|
if p.isDuplicateSwap(swap) {
|
|
continue
|
|
}
|
|
|
|
// Add to unified processing
|
|
select {
|
|
case p.unifiedSwaps <- swap:
|
|
default:
|
|
p.logger.Warn("Unified swaps channel full, dropping event log swap")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// processPoolDiscoverySwaps processes swap events from pool discovery
|
|
func (p *SwapEventPipeline) processPoolDiscoverySwaps() {
|
|
for {
|
|
select {
|
|
case <-p.ctx.Done():
|
|
return
|
|
case swap := <-p.poolDiscoverySwaps:
|
|
if p.isDuplicateSwap(swap) {
|
|
continue
|
|
}
|
|
|
|
// Add to unified processing
|
|
select {
|
|
case p.unifiedSwaps <- swap:
|
|
default:
|
|
p.logger.Warn("Unified swaps channel full, dropping pool discovery swap")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// processUnifiedSwaps processes all swap events through the unified pipeline
|
|
func (p *SwapEventPipeline) processUnifiedSwaps() {
|
|
for {
|
|
select {
|
|
case <-p.ctx.Done():
|
|
return
|
|
case swap := <-p.unifiedSwaps:
|
|
p.processSwap(swap)
|
|
}
|
|
}
|
|
}
|
|
|
|
// processSwap processes a single swap event through the complete pipeline
|
|
func (p *SwapEventPipeline) processSwap(swap *SwapEvent) {
|
|
p.swapsProcessed++
|
|
|
|
// Log the swap event
|
|
if err := p.protocolRegistry.LogSwapEvent(swap); err != nil {
|
|
p.logger.Error(fmt.Sprintf("Failed to log swap event: %v", err))
|
|
}
|
|
|
|
// Update market discovery with new pool information if needed
|
|
p.updateMarketDiscovery(swap)
|
|
|
|
// Analyze for arbitrage opportunities
|
|
if err := p.analyzeForArbitrage(swap); err != nil {
|
|
p.logger.Error(fmt.Sprintf("Failed to analyze swap for arbitrage: %v", err))
|
|
}
|
|
|
|
// Mark as seen to prevent duplicates
|
|
p.markSwapAsSeen(swap)
|
|
}
|
|
|
|
// isDuplicateSwap checks if a swap event has already been processed recently
|
|
func (p *SwapEventPipeline) isDuplicateSwap(swap *SwapEvent) bool {
|
|
key := fmt.Sprintf("%s:%s:%s", swap.TxHash, swap.Pool, swap.TokenIn)
|
|
|
|
p.seenMu.RLock()
|
|
defer p.seenMu.RUnlock()
|
|
|
|
if lastSeen, exists := p.seenSwaps[key]; exists {
|
|
// If we've seen this swap within the max age, it's a duplicate
|
|
if time.Since(lastSeen) < p.maxAge {
|
|
return true
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// markSwapAsSeen marks a swap event as processed to prevent duplicates
|
|
func (p *SwapEventPipeline) markSwapAsSeen(swap *SwapEvent) {
|
|
key := fmt.Sprintf("%s:%s:%s", swap.TxHash, swap.Pool, swap.TokenIn)
|
|
|
|
p.seenMu.Lock()
|
|
defer p.seenMu.Unlock()
|
|
|
|
p.seenSwaps[key] = time.Now()
|
|
}
|
|
|
|
// cleanupSeenSwaps periodically removes old entries from the seen swaps map
|
|
func (p *SwapEventPipeline) cleanupSeenSwaps() {
|
|
ticker := time.NewTicker(1 * time.Minute)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-p.ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
p.seenMu.Lock()
|
|
now := time.Now()
|
|
for key, lastSeen := range p.seenSwaps {
|
|
if now.Sub(lastSeen) > p.maxAge {
|
|
delete(p.seenSwaps, key)
|
|
}
|
|
}
|
|
p.seenMu.Unlock()
|
|
}
|
|
}
|
|
}
|
|
|
|
// updateMarketDiscovery updates the market discovery with new pool information
|
|
func (p *SwapEventPipeline) updateMarketDiscovery(swap *SwapEvent) {
|
|
// TODO: Add proper methods to MarketDiscovery to access pools
|
|
// For now, skip the pool existence check - this functionality will be restored after reorganization
|
|
/*
|
|
poolAddr := common.HexToAddress(swap.Pool)
|
|
p.marketDiscovery.mu.RLock()
|
|
_, exists := p.marketDiscovery.pools[poolAddr]
|
|
p.marketDiscovery.mu.RUnlock()
|
|
|
|
if !exists {
|
|
// Add new pool to tracking
|
|
poolInfo := &arbmarket.PoolInfoDetailed{
|
|
Address: poolAddr,
|
|
Factory: common.HexToAddress(swap.Router), // Simplified
|
|
FactoryType: swap.Protocol,
|
|
LastUpdated: time.Now(),
|
|
Priority: 50, // Default priority
|
|
Active: true,
|
|
}
|
|
|
|
p.marketDiscovery.mu.Lock()
|
|
p.marketDiscovery.pools[poolAddr] = poolInfo
|
|
p.marketDiscovery.mu.Unlock()
|
|
|
|
p.logger.Info(fmt.Sprintf("🆕 New pool added to tracking: %s (%s)", swap.Pool, swap.Protocol))
|
|
|
|
// CRITICAL: Sync this pool across all other factories
|
|
go p.syncPoolAcrossFactories(swap)
|
|
}
|
|
*/
|
|
}
|
|
|
|
// syncPoolAcrossFactories ensures when we discover a pool on one DEX, we check/add it on all others
|
|
// TODO: This function is temporarily disabled due to access to unexported fields
|
|
func (p *SwapEventPipeline) syncPoolAcrossFactories(originalSwap *SwapEvent) {
|
|
// Temporarily disabled until proper MarketDiscovery interface is implemented
|
|
return
|
|
/*
|
|
tokenIn := common.HexToAddress(originalSwap.TokenIn)
|
|
tokenOut := common.HexToAddress(originalSwap.TokenOut)
|
|
|
|
// Handle empty token addresses to prevent slice bounds panic
|
|
tokenInDisplay := "unknown"
|
|
tokenOutDisplay := "unknown"
|
|
if len(originalSwap.TokenIn) > 0 {
|
|
if len(originalSwap.TokenIn) > 8 {
|
|
tokenInDisplay = originalSwap.TokenIn[:8]
|
|
} else {
|
|
tokenInDisplay = originalSwap.TokenIn
|
|
}
|
|
} else {
|
|
// Handle completely empty token address
|
|
tokenInDisplay = "unknown"
|
|
}
|
|
if len(originalSwap.TokenOut) > 0 {
|
|
if len(originalSwap.TokenOut) > 8 {
|
|
tokenOutDisplay = originalSwap.TokenOut[:8]
|
|
} else {
|
|
tokenOutDisplay = originalSwap.TokenOut
|
|
}
|
|
} else {
|
|
// Handle completely empty token address
|
|
tokenOutDisplay = "unknown"
|
|
}
|
|
p.logger.Info(fmt.Sprintf("🔄 Syncing pool %s/%s across all factories",
|
|
tokenInDisplay, tokenOutDisplay))
|
|
|
|
// Get all factory configurations
|
|
factories := []struct {
|
|
protocol string
|
|
factory common.Address
|
|
name string
|
|
}{
|
|
{"uniswap_v2", common.HexToAddress("0xf1D7CC64Fb4452F05c498126312eBE29f30Fbcf9"), "UniswapV2"},
|
|
{"sushiswap", common.HexToAddress("0xc35DADB65012eC5796536bD9864eD8773aBc74C4"), "SushiSwap"},
|
|
{"camelot_v2", common.HexToAddress("0x6EcCab422D763aC031210895C81787E87B43A652"), "CamelotV2"},
|
|
{"uniswap_v3", common.HexToAddress("0x1F98431c8aD98523631AE4a59f267346ea31F984"), "UniswapV3"},
|
|
}
|
|
|
|
for _, factory := range factories {
|
|
// Skip the original factory
|
|
if factory.protocol == originalSwap.Protocol {
|
|
continue
|
|
}
|
|
|
|
// Check if pool exists on this factory
|
|
poolAddr := p.calculatePoolAddress(factory.factory, tokenIn, tokenOut, factory.protocol)
|
|
|
|
p.marketDiscovery.mu.RLock()
|
|
_, exists := p.marketDiscovery.pools[poolAddr]
|
|
p.marketDiscovery.mu.RUnlock()
|
|
|
|
if !exists {
|
|
// Add this potential pool for monitoring
|
|
poolInfo := &arbmarket.PoolInfoDetailed{
|
|
Address: poolAddr,
|
|
Factory: factory.factory,
|
|
FactoryType: factory.protocol,
|
|
Token0: tokenIn,
|
|
Token1: tokenOut,
|
|
LastUpdated: time.Now(),
|
|
Priority: 45, // Slightly lower priority for discovered pools
|
|
Active: true,
|
|
}
|
|
|
|
p.marketDiscovery.mu.Lock()
|
|
p.marketDiscovery.pools[poolAddr] = poolInfo
|
|
p.marketDiscovery.mu.Unlock()
|
|
|
|
// Handle empty pool address to prevent slice bounds panic
|
|
poolAddrDisplay := "unknown"
|
|
if len(poolAddr.Hex()) > 0 {
|
|
if len(poolAddr.Hex()) > 10 {
|
|
poolAddrDisplay = poolAddr.Hex()[:10]
|
|
} else {
|
|
poolAddrDisplay = poolAddr.Hex()
|
|
}
|
|
} else {
|
|
// Handle completely empty address
|
|
poolAddrDisplay = "unknown"
|
|
}
|
|
p.logger.Info(fmt.Sprintf("✅ Added cross-factory pool: %s on %s",
|
|
poolAddrDisplay, factory.name))
|
|
}
|
|
}
|
|
*/
|
|
}
|
|
|
|
// calculatePoolAddress calculates the deterministic pool address for a token pair
|
|
func (p *SwapEventPipeline) calculatePoolAddress(factory, tokenA, tokenB common.Address, protocol string) common.Address {
|
|
// Sort tokens
|
|
token0, token1 := tokenA, tokenB
|
|
if tokenA.Hex() > tokenB.Hex() {
|
|
token0, token1 = tokenB, tokenA
|
|
}
|
|
|
|
// This is a simplified calculation - in production would use CREATE2
|
|
// For now, generate a deterministic address based on factory and tokens
|
|
data := append(factory.Bytes(), token0.Bytes()...)
|
|
data = append(data, token1.Bytes()...)
|
|
hash := crypto.Keccak256Hash(data)
|
|
|
|
return common.BytesToAddress(hash.Bytes()[12:])
|
|
}
|
|
|
|
// analyzeForArbitrage analyzes a swap event for arbitrage opportunities
|
|
// TODO: Temporarily disabled until types are properly reorganized
|
|
func (p *SwapEventPipeline) analyzeForArbitrage(swap *SwapEvent) error {
|
|
// Temporarily disabled - to be restored after reorganization
|
|
return nil
|
|
/*
|
|
// CRITICAL: Check price impact first - if significant, immediately scan all pools
|
|
if swap.PriceImpact > 0.005 { // 0.5% price impact threshold
|
|
// Handle empty pool address to prevent slice bounds panic
|
|
poolDisplay := "unknown"
|
|
if len(swap.Pool) > 0 {
|
|
if len(swap.Pool) > 10 {
|
|
poolDisplay = swap.Pool[:10]
|
|
} else {
|
|
poolDisplay = swap.Pool
|
|
}
|
|
} else {
|
|
// Handle completely empty pool address
|
|
poolDisplay = "unknown"
|
|
}
|
|
p.logger.Info(fmt.Sprintf("⚡ High price impact detected: %.2f%% on %s - scanning for arbitrage",
|
|
swap.PriceImpact*100, poolDisplay))
|
|
|
|
// Immediately scan all pools with this token pair for arbitrage
|
|
go p.scanAllPoolsForArbitrage(swap)
|
|
}
|
|
|
|
// Check if strategy engine is available before using it
|
|
if p.strategyEngine == nil {
|
|
p.logger.Warn("Strategy engine not initialized, skipping detailed arbitrage analysis")
|
|
return nil
|
|
}
|
|
|
|
// Use the strategy engine to analyze for arbitrage opportunities
|
|
profitableStrategy, err := p.strategyEngine.AnalyzeArbitrageOpportunity(context.Background(), swap)
|
|
if err != nil {
|
|
p.logger.Warn(fmt.Sprintf("Strategy engine analysis failed: %v", err))
|
|
return nil // Don't fail the pipeline, just log the issue
|
|
}
|
|
|
|
// If we found a profitable arbitrage strategy, log it and potentially execute it
|
|
if profitableStrategy != nil && profitableStrategy.Type == "arbitrage" {
|
|
p.arbitrageOps++
|
|
|
|
// Calculate net profit after gas
|
|
gasInEth := new(big.Int).Mul(profitableStrategy.GasCost, big.NewInt(100000000)) // 0.1 gwei for Arbitrum
|
|
netProfit := new(big.Int).Sub(profitableStrategy.ExpectedProfit, gasInEth)
|
|
|
|
p.logger.Info(fmt.Sprintf("💰 Profitable arbitrage detected: Gross: %s ETH, Gas: %s ETH, Net: %s ETH",
|
|
formatEther(profitableStrategy.ExpectedProfit),
|
|
formatEther(gasInEth),
|
|
formatEther(netProfit)))
|
|
|
|
// Log the opportunity to the arbitrage opportunities file
|
|
// Handle empty transaction hash to prevent slice bounds panic
|
|
txHashDisplay := "unknown"
|
|
if len(swap.TxHash) > 0 {
|
|
if len(swap.TxHash) > 8 {
|
|
txHashDisplay = swap.TxHash[:8]
|
|
} else {
|
|
txHashDisplay = swap.TxHash
|
|
}
|
|
} else {
|
|
// Handle completely empty transaction hash
|
|
txHashDisplay = "unknown"
|
|
}
|
|
|
|
opportunity := &ArbitrageOpportunityDetailed{
|
|
ID: fmt.Sprintf("arb_%d_%s", time.Now().Unix(), txHashDisplay),
|
|
Type: "arbitrage",
|
|
TokenIn: common.HexToAddress(swap.TokenIn),
|
|
TokenOut: common.HexToAddress(swap.TokenOut),
|
|
AmountIn: big.NewInt(0), // Would extract from swap data
|
|
ExpectedAmountOut: big.NewInt(0), // Would calculate from arbitrage path
|
|
ActualAmountOut: big.NewInt(0),
|
|
Profit: profitableStrategy.ExpectedProfit,
|
|
ProfitUSD: 0.0, // Would calculate from token prices
|
|
ProfitMargin: profitableStrategy.ProfitMarginPct / 100.0,
|
|
GasCost: profitableStrategy.GasCost,
|
|
NetProfit: profitableStrategy.NetProfit,
|
|
ExchangeA: swap.Protocol,
|
|
ExchangeB: "", // Would determine from arbitrage path
|
|
PoolA: common.HexToAddress(swap.Pool),
|
|
PoolB: common.Address{}, // Would get from arbitrage path
|
|
PriceImpactA: 0.0, // Would calculate from swap data
|
|
PriceImpactB: 0.0, // Would calculate from arbitrage path
|
|
Confidence: profitableStrategy.Confidence,
|
|
RiskScore: profitableStrategy.RiskScore,
|
|
ExecutionTime: profitableStrategy.ExecutionTime,
|
|
Timestamp: time.Now(),
|
|
}
|
|
|
|
if err := p.marketDiscovery.logArbitrageOpportunity(opportunity); err != nil {
|
|
p.logger.Error(fmt.Sprintf("Failed to log arbitrage opportunity: %v", err))
|
|
}
|
|
}
|
|
|
|
return nil
|
|
*/
|
|
}
|
|
|
|
// scanAllPoolsForArbitrage scans all pools for arbitrage opportunities when price impact is detected
|
|
// TODO: Temporarily disabled until types are properly reorganized
|
|
func (p *SwapEventPipeline) scanAllPoolsForArbitrage(triggerSwap *SwapEvent) {
|
|
// Temporarily disabled - to be restored after reorganization
|
|
return
|
|
/*
|
|
tokenIn := common.HexToAddress(triggerSwap.TokenIn)
|
|
tokenOut := common.HexToAddress(triggerSwap.TokenOut)
|
|
|
|
// Handle empty token addresses to prevent slice bounds panic
|
|
tokenInDisplay := "unknown"
|
|
tokenOutDisplay := "unknown"
|
|
if len(triggerSwap.TokenIn) > 0 {
|
|
if len(triggerSwap.TokenIn) > 8 {
|
|
tokenInDisplay = triggerSwap.TokenIn[:8]
|
|
} else {
|
|
tokenInDisplay = triggerSwap.TokenIn
|
|
}
|
|
} else {
|
|
// Handle completely empty token address
|
|
tokenInDisplay = "unknown"
|
|
}
|
|
if len(triggerSwap.TokenOut) > 0 {
|
|
if len(triggerSwap.TokenOut) > 8 {
|
|
tokenOutDisplay = triggerSwap.TokenOut[:8]
|
|
} else {
|
|
tokenOutDisplay = triggerSwap.TokenOut
|
|
}
|
|
} else {
|
|
// Handle completely empty token address
|
|
tokenOutDisplay = "unknown"
|
|
}
|
|
p.logger.Info(fmt.Sprintf("🔍 Scanning all pools for arbitrage: %s/%s",
|
|
tokenInDisplay, tokenOutDisplay))
|
|
|
|
// Get all pools with this token pair
|
|
p.marketDiscovery.mu.RLock()
|
|
var relevantPools []*PoolInfoDetailed
|
|
for _, pool := range p.marketDiscovery.pools {
|
|
if (pool.Token0 == tokenIn && pool.Token1 == tokenOut) ||
|
|
(pool.Token0 == tokenOut && pool.Token1 == tokenIn) {
|
|
relevantPools = append(relevantPools, pool)
|
|
}
|
|
}
|
|
p.marketDiscovery.mu.RUnlock()
|
|
|
|
p.logger.Info(fmt.Sprintf("Found %d pools to scan for arbitrage", len(relevantPools)))
|
|
|
|
// Check for arbitrage between all pool pairs
|
|
for i := 0; i < len(relevantPools); i++ {
|
|
for j := i + 1; j < len(relevantPools); j++ {
|
|
poolA := relevantPools[i]
|
|
poolB := relevantPools[j]
|
|
|
|
// Skip if both pools are from the same protocol
|
|
if poolA.FactoryType == poolB.FactoryType {
|
|
continue
|
|
}
|
|
|
|
// ENHANCED: Fetch real prices from both pools for accurate arbitrage detection
|
|
priceA, err := p.fetchPoolPrice(poolA, tokenIn, tokenOut)
|
|
if err != nil {
|
|
p.logger.Debug(fmt.Sprintf("Failed to fetch price from %s: %v", poolA.FactoryType, err))
|
|
continue
|
|
}
|
|
|
|
priceB, err := p.fetchPoolPrice(poolB, tokenIn, tokenOut)
|
|
if err != nil {
|
|
p.logger.Debug(fmt.Sprintf("Failed to fetch price from %s: %v", poolB.FactoryType, err))
|
|
continue
|
|
}
|
|
|
|
// Calculate actual price difference
|
|
var priceDiff float64
|
|
if priceA > priceB {
|
|
priceDiff = (priceA - priceB) / priceB
|
|
} else {
|
|
priceDiff = (priceB - priceA) / priceA
|
|
}
|
|
|
|
// Account for transaction fees (0.3% typical) and gas costs
|
|
minProfitThreshold := 0.008 // 0.8% minimum for profitability after all costs
|
|
|
|
if priceDiff > minProfitThreshold {
|
|
// Calculate potential profit with $13 capital
|
|
availableCapital := 13.0 // $13 in ETH equivalent
|
|
grossProfit := availableCapital * priceDiff
|
|
gasCostUSD := 0.50 // ~$0.50 gas cost on Arbitrum
|
|
netProfitUSD := grossProfit - gasCostUSD
|
|
|
|
if netProfitUSD > 5.0 { // $5 minimum profit
|
|
p.logger.Info(fmt.Sprintf("🎯 PROFITABLE ARBITRAGE: $%.2f profit (%.2f%%) between %s and %s",
|
|
netProfitUSD,
|
|
priceDiff*100,
|
|
poolA.FactoryType,
|
|
poolB.FactoryType))
|
|
|
|
// Log to JSONL with detailed profit calculations
|
|
// Handle empty pool addresses to prevent slice bounds panic
|
|
poolAAddrDisplay := "unknown"
|
|
poolBAddrDisplay := "unknown"
|
|
if len(poolA.Address.Hex()) > 0 {
|
|
if len(poolA.Address.Hex()) > 8 {
|
|
poolAAddrDisplay = poolA.Address.Hex()[:8]
|
|
} else {
|
|
poolAAddrDisplay = poolA.Address.Hex()
|
|
}
|
|
} else {
|
|
// Handle completely empty address
|
|
poolAAddrDisplay = "unknown"
|
|
}
|
|
if len(poolB.Address.Hex()) > 0 {
|
|
if len(poolB.Address.Hex()) > 8 {
|
|
poolBAddrDisplay = poolB.Address.Hex()[:8]
|
|
} else {
|
|
poolBAddrDisplay = poolB.Address.Hex()
|
|
}
|
|
} else {
|
|
// Handle completely empty address
|
|
poolBAddrDisplay = "unknown"
|
|
}
|
|
|
|
opportunity := &ArbitrageOpportunityDetailed{
|
|
ID: fmt.Sprintf("arb_%d_%s_%s", time.Now().Unix(), poolAAddrDisplay, poolBAddrDisplay),
|
|
Type: "cross-dex",
|
|
TokenIn: tokenIn,
|
|
TokenOut: tokenOut,
|
|
ExchangeA: poolA.FactoryType,
|
|
ExchangeB: poolB.FactoryType,
|
|
PoolA: poolA.Address,
|
|
PoolB: poolB.Address,
|
|
ProfitMargin: priceDiff,
|
|
ProfitUSD: netProfitUSD,
|
|
PriceA: priceA,
|
|
PriceB: priceB,
|
|
CapitalRequired: availableCapital,
|
|
GasCostUSD: gasCostUSD,
|
|
Confidence: 0.85, // High confidence with real price data
|
|
Timestamp: time.Now(),
|
|
}
|
|
|
|
if err := p.marketDiscovery.logArbitrageOpportunity(opportunity); err != nil {
|
|
p.logger.Error(fmt.Sprintf("Failed to log arbitrage: %v", err))
|
|
}
|
|
} else {
|
|
p.logger.Debug(fmt.Sprintf("⚠️ Arbitrage found but profit $%.2f below $5 threshold", netProfitUSD))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
*/
|
|
}
|
|
|
|
// fetchPoolPrice fetches the current price from a pool for the given token pair
|
|
// TODO: Temporarily disabled until types are properly reorganized
|
|
func (p *SwapEventPipeline) fetchPoolPrice(pool *arbmarket.PoolInfoDetailed, tokenIn, tokenOut common.Address) (float64, error) {
|
|
// Temporarily disabled - to be restored after reorganization
|
|
return 0.0, nil
|
|
/*
|
|
// Since binding packages are not available, we'll return an appropriate default
|
|
// based on the available pool data. In a real implementation, this would call
|
|
// the actual pool contract to get reserves and calculate the price.
|
|
|
|
// Check if the pool has valid reserve data to calculate price
|
|
if pool.Reserve0 == nil || pool.Reserve1 == nil {
|
|
return 0, fmt.Errorf("pool reserves not available for %s", pool.Address.Hex())
|
|
}
|
|
|
|
// Determine which reserve corresponds to tokenIn and tokenOut
|
|
var reserveIn, reserveOut *big.Int
|
|
if pool.Token0 == tokenIn && pool.Token1 == tokenOut {
|
|
reserveIn = pool.Reserve0
|
|
reserveOut = pool.Reserve1
|
|
} else if pool.Token0 == tokenOut && pool.Token1 == tokenIn {
|
|
reserveIn = pool.Reserve1
|
|
reserveOut = pool.Reserve0
|
|
} else {
|
|
return 0, fmt.Errorf("token pair does not match pool %s", pool.Address.Hex())
|
|
}
|
|
|
|
// Check if reserves are zero
|
|
if reserveIn.Sign() <= 0 {
|
|
return 0, fmt.Errorf("invalid reserveIn for pool %s", pool.Address.Hex())
|
|
}
|
|
|
|
// Calculate price as reserveOut / reserveIn, accounting for decimals
|
|
tokenInInfo, tokenInExists := p.marketDiscovery.tokens[tokenIn]
|
|
tokenOutInfo, tokenOutExists := p.marketDiscovery.tokens[tokenOut]
|
|
|
|
if !tokenInExists || !tokenOutExists {
|
|
// If token decimals not available, assume 18 decimals for both
|
|
reserveInFloat := new(big.Float).SetInt(reserveIn)
|
|
reserveOutFloat := new(big.Float).SetInt(reserveOut)
|
|
|
|
// Calculate price: reserveOut / reserveIn
|
|
// Check for zero division
|
|
if reserveInFloat.Cmp(big.NewFloat(0)) == 0 {
|
|
return 0, fmt.Errorf("division by zero: reserveIn is zero")
|
|
}
|
|
price := new(big.Float).Quo(reserveOutFloat, reserveInFloat)
|
|
priceFloat64, _ := price.Float64()
|
|
|
|
return priceFloat64, nil
|
|
}
|
|
|
|
// Calculate price considering decimals
|
|
reserveInFloat := new(big.Float).SetInt(reserveIn)
|
|
reserveOutFloat := new(big.Float).SetInt(reserveOut)
|
|
|
|
decimalsIn := float64(tokenInInfo.Decimals)
|
|
decimalsOut := float64(tokenOutInfo.Decimals)
|
|
|
|
// Normalize to the same decimal scale
|
|
if decimalsIn > decimalsOut {
|
|
// Adjust reserveOut up
|
|
multiplier := new(big.Float).SetFloat64(math.Pow10(int(decimalsIn - decimalsOut)))
|
|
reserveOutFloat.Mul(reserveOutFloat, multiplier)
|
|
} else if decimalsOut > decimalsIn {
|
|
// Adjust reserveIn up
|
|
multiplier := new(big.Float).SetFloat64(math.Pow10(int(decimalsOut - decimalsIn)))
|
|
reserveInFloat.Mul(reserveInFloat, multiplier)
|
|
}
|
|
|
|
// Calculate price: (reserveOut normalized) / (reserveIn normalized)
|
|
// Check for zero division
|
|
if reserveInFloat.Cmp(big.NewFloat(0)) == 0 {
|
|
return 0, fmt.Errorf("division by zero: normalized reserveIn is zero")
|
|
}
|
|
price := new(big.Float).Quo(reserveOutFloat, reserveInFloat)
|
|
priceFloat64, _ := price.Float64()
|
|
|
|
return priceFloat64, nil
|
|
*/
|
|
}
|
|
|
|
// formatEther renders a wei amount as a decimal ETH string with exactly six
// digits after the decimal point (1 ETH = 1e18 wei). A nil amount yields "0".
//
// The division is done exactly with big.Rat rather than through float64, so
// amounts too large for a float64 to represent exactly still print the correct
// digits. The last digit is rounded to nearest, halves away from zero.
func formatEther(wei *big.Int) string {
	if wei == nil {
		return "0"
	}

	// The denominator is a non-zero constant, so SetFrac cannot panic.
	weiPerEther := big.NewInt(1e18)
	return new(big.Rat).SetFrac(wei, weiPerEther).FloatString(6)
}
|
|
|
|
// GetMetrics returns current pipeline metrics
|
|
func (p *SwapEventPipeline) GetMetrics() map[string]interface{} {
|
|
p.seenMu.RLock()
|
|
seenSwaps := len(p.seenSwaps)
|
|
p.seenMu.RUnlock()
|
|
|
|
return map[string]interface{}{
|
|
"events_processed": p.eventsProcessed,
|
|
"swaps_processed": p.swapsProcessed,
|
|
"arbitrage_ops": p.arbitrageOps,
|
|
"seen_swaps": seenSwaps,
|
|
"transaction_queue": len(p.transactionSwaps),
|
|
"event_log_queue": len(p.eventLogSwaps),
|
|
"pool_discovery_queue": len(p.poolDiscoverySwaps),
|
|
"unified_queue": len(p.unifiedSwaps),
|
|
}
|
|
}
|
|
|
|
// SubmitPoolStateUpdate processes pool state updates for dynamic state management
|
|
func (p *SwapEventPipeline) SubmitPoolStateUpdate(update *arbparser.PoolStateUpdate) {
|
|
// For now, just log the pool state update
|
|
// In production, this would update pool state in the market discovery
|
|
// Handle empty addresses to prevent slice bounds panic
|
|
poolAddrDisplay := "unknown"
|
|
tokenInDisplay := "unknown"
|
|
tokenOutDisplay := "unknown"
|
|
if len(update.Pool.Hex()) > 0 {
|
|
if len(update.Pool.Hex()) > 8 {
|
|
poolAddrDisplay = update.Pool.Hex()[:8]
|
|
} else {
|
|
poolAddrDisplay = update.Pool.Hex()
|
|
}
|
|
} else {
|
|
// Handle completely empty address
|
|
poolAddrDisplay = "unknown"
|
|
}
|
|
if len(update.TokenIn.Hex()) > 0 {
|
|
if len(update.TokenIn.Hex()) > 6 {
|
|
tokenInDisplay = update.TokenIn.Hex()[:6]
|
|
} else {
|
|
tokenInDisplay = update.TokenIn.Hex()
|
|
}
|
|
} else {
|
|
// Handle completely empty address
|
|
tokenInDisplay = "unknown"
|
|
}
|
|
if len(update.TokenOut.Hex()) > 0 {
|
|
if len(update.TokenOut.Hex()) > 6 {
|
|
tokenOutDisplay = update.TokenOut.Hex()[:6]
|
|
} else {
|
|
tokenOutDisplay = update.TokenOut.Hex()
|
|
}
|
|
} else {
|
|
// Handle completely empty address
|
|
tokenOutDisplay = "unknown"
|
|
}
|
|
p.logger.Debug(fmt.Sprintf("🔄 Pool state update: %s %s->%s (%s %s)",
|
|
poolAddrDisplay,
|
|
tokenInDisplay,
|
|
tokenOutDisplay,
|
|
formatEther(update.AmountIn),
|
|
update.UpdateType))
|
|
|
|
// Update market discovery with new pool state
|
|
// TODO: Temporarily disabled until proper interface is implemented
|
|
/*
|
|
if p.marketDiscovery != nil {
|
|
go p.marketDiscovery.UpdatePoolState(update)
|
|
}
|
|
*/
|
|
}
|
|
|
|
// Close stops the pipeline and cleans up resources
|
|
func (p *SwapEventPipeline) Close() error {
|
|
p.logger.Info("🛑 Stopping swap event processing pipeline")
|
|
|
|
p.cancel()
|
|
|
|
p.logger.Info("✅ Swap event processing pipeline stopped")
|
|
return nil
|
|
}
|