Files
mev-beta/orig/pkg/scanner/swap/analyzer.go
Administrator 803de231ba feat: create v2-prep branch with comprehensive planning
Restructured project for V2 refactor:

**Structure Changes:**
- Moved all V1 code to orig/ folder (preserved with git mv)
- Created docs/planning/ directory
- Added orig/README_V1.md explaining V1 preservation

**Planning Documents:**
- 00_V2_MASTER_PLAN.md: Complete architecture overview
  - Executive summary of critical V1 issues
  - High-level component architecture diagrams
  - 5-phase implementation roadmap
  - Success metrics and risk mitigation

- 07_TASK_BREAKDOWN.md: Atomic task breakdown
  - 99+ hours of detailed tasks
  - Every task < 2 hours (atomic)
  - Clear dependencies and success criteria
  - Organized by implementation phase

**V2 Key Improvements:**
- Per-exchange parsers (factory pattern)
- Multi-layer strict validation
- Multi-index pool cache
- Background validation pipeline
- Comprehensive observability

**Critical Issues Addressed:**
- Zero address tokens (strict validation + cache enrichment)
- Parsing accuracy (protocol-specific parsers)
- No audit trail (background validation channel)
- Inefficient lookups (multi-index cache)
- Stats disconnection (event-driven metrics)

Next Steps:
1. Review planning documents
2. Begin Phase 1: Foundation (P1-001 through P1-010)
3. Implement parsers in Phase 2
4. Build cache system in Phase 3
5. Add validation pipeline in Phase 4
6. Migrate and test in Phase 5

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-10 10:14:26 +01:00

1054 lines
40 KiB
Go

package swap
import (
"context"
"errors"
"fmt"
"math/big"
"strings"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/holiman/uint256"
"github.com/fraktal/mev-beta/internal/logger"
"github.com/fraktal/mev-beta/pkg/events"
"github.com/fraktal/mev-beta/pkg/marketdata"
"github.com/fraktal/mev-beta/pkg/profitcalc"
scannercommon "github.com/fraktal/mev-beta/pkg/scanner/common"
"github.com/fraktal/mev-beta/pkg/scanner/market"
stypes "github.com/fraktal/mev-beta/pkg/types"
"github.com/fraktal/mev-beta/pkg/uniswap"
)
// SwapAnalyzer handles analysis of swap events for price movement opportunities
type SwapAnalyzer struct {
logger *logger.Logger
marketDataLogger *marketdata.MarketDataLogger
profitCalculator *profitcalc.ProfitCalculator
opportunityRanker *profitcalc.OpportunityRanker
}
var (
factoryProtocolMap = map[common.Address]string{
common.HexToAddress("0x1F98431c8aD98523631AE4a59f267346ea31F984"): "UniswapV3",
common.HexToAddress("0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f"): "UniswapV2",
common.HexToAddress("0xf1D7CC64Fb4452F05c498126312eBE29f30Fbcf9"): "UniswapV2",
common.HexToAddress("0xc35DADB65012eC5796536bD9864eD8773aBc74C4"): "SushiSwap",
common.HexToAddress("0xBA12222222228d8Ba445958a75a0704d566BF2C8"): "Balancer",
common.HexToAddress("0xF18056Bbd320E96A48e3Fbf8bC061322531aac99"): "Curve",
common.HexToAddress("0x5ffe7FB82894076ECB99A30D6A32e969e6e35E98"): "Curve",
common.HexToAddress("0x5F1dddbf348aC2fbe22a163e30F99F9ECE3DD50a"): "KyberElastic",
common.HexToAddress("0x1a3c9B1d2F0529D97f2afC5136Cc23e58f1FD35B"): "Camelot",
}
protocolDefaultFactory = map[string]common.Address{
"UniswapV3": common.HexToAddress("0x1F98431c8aD98523631AE4a59f267346ea31F984"),
"UniswapV2": common.HexToAddress("0xf1D7CC64Fb4452F05c498126312eBE29f30Fbcf9"),
"SushiSwap": common.HexToAddress("0xc35DADB65012eC5796536bD9864eD8773aBc74C4"),
"Balancer": common.HexToAddress("0xBA12222222228d8Ba445958a75a0704d566BF2C8"),
"Curve": common.HexToAddress("0xF18056Bbd320E96A48e3Fbf8bC061322531aac99"),
"KyberElastic": common.HexToAddress("0x5F1dddbf348aC2fbe22a163e30F99F9ECE3DD50a"),
"Camelot": common.HexToAddress("0x1a3c9B1d2F0529D97f2afC5136Cc23e58f1FD35B"),
}
protocolSpecialByAddress = map[common.Address]string{
common.HexToAddress("0xBA12222222228d8Ba445958a75a0704d566BF2C8"): "Balancer",
common.HexToAddress("0xF18056Bbd320E96A48e3Fbf8bC061322531aac99"): "Curve",
common.HexToAddress("0x5F1dddbf348aC2fbe22a163e30F99F9ECE3DD50a"): "KyberElastic",
}
)
// NewSwapAnalyzer creates a new swap analyzer
func NewSwapAnalyzer(
logger *logger.Logger,
marketDataLogger *marketdata.MarketDataLogger,
profitCalculator *profitcalc.ProfitCalculator,
opportunityRanker *profitcalc.OpportunityRanker,
) *SwapAnalyzer {
return &SwapAnalyzer{
logger: logger,
marketDataLogger: marketDataLogger,
profitCalculator: profitCalculator,
opportunityRanker: opportunityRanker,
}
}
// AnalyzeSwapEvent analyzes a swap event for arbitrage opportunities
func (s *SwapAnalyzer) AnalyzeSwapEvent(event events.Event, marketScanner *market.MarketScanner) {
s.logger.Debug(fmt.Sprintf("Analyzing swap event in pool %s", event.PoolAddress))
// Validate pool address before attempting expensive lookups
if event.PoolAddress == (common.Address{}) {
s.logger.Warn(fmt.Sprintf("Skipping swap analysis: empty pool address (tx: %s)", event.TransactionHash.Hex()))
return
}
if event.PoolAddress == event.Token0 || event.PoolAddress == event.Token1 {
s.logger.Warn(fmt.Sprintf("Skipping swap analysis: pool address %s matches token address (tx: %s)", event.PoolAddress.Hex(), event.TransactionHash.Hex()))
return
}
if strings.HasPrefix(strings.ToLower(event.PoolAddress.Hex()), "0x000000") {
s.logger.Warn(fmt.Sprintf("Skipping swap analysis: suspicious pool address %s (tx: %s)", event.PoolAddress.Hex(), event.TransactionHash.Hex()))
return
}
// Get comprehensive pool data to determine factory and fee
poolInfo, poolExists := s.marketDataLogger.GetPoolInfo(event.PoolAddress)
factory := common.Address{}
fee := uint32(3000) // Default 0.3%
if poolExists {
factory = poolInfo.Factory
fee = poolInfo.Fee
} else {
// Determine factory from known DEX protocols
factory = marketScanner.GetFactoryForProtocol(event.Protocol)
}
// Create comprehensive swap event data for market data logger
swapData := &marketdata.SwapEventData{
TxHash: event.TransactionHash,
BlockNumber: event.BlockNumber,
LogIndex: uint(0), // Default log index (would need to be extracted from receipt)
Timestamp: time.Now(),
PoolAddress: event.PoolAddress,
Factory: factory,
Protocol: event.Protocol,
Token0: event.Token0,
Token1: event.Token1,
Sender: common.Address{}, // Default sender (would need to be extracted from transaction)
Recipient: common.Address{}, // Default recipient (would need to be extracted from transaction)
SqrtPriceX96: event.SqrtPriceX96,
Liquidity: event.Liquidity,
Tick: int32(event.Tick),
}
// Extract swap amounts from event (handle signed amounts correctly)
if event.Amount0 != nil && event.Amount1 != nil {
amount0Float := new(big.Float).SetInt(event.Amount0)
amount1Float := new(big.Float).SetInt(event.Amount1)
// Determine input/output based on sign (negative means token was removed from pool = output)
if amount0Float.Sign() < 0 {
// Token0 out, Token1 in
swapData.Amount0Out = new(big.Int).Abs(event.Amount0)
swapData.Amount1In = event.Amount1
swapData.Amount0In = big.NewInt(0)
swapData.Amount1Out = big.NewInt(0)
} else if amount1Float.Sign() < 0 {
// Token0 in, Token1 out
swapData.Amount0In = event.Amount0
swapData.Amount1Out = new(big.Int).Abs(event.Amount1)
swapData.Amount0Out = big.NewInt(0)
swapData.Amount1In = big.NewInt(0)
} else {
// Both positive (shouldn't happen in normal swaps, but handle gracefully)
swapData.Amount0In = event.Amount0
swapData.Amount1In = event.Amount1
swapData.Amount0Out = big.NewInt(0)
swapData.Amount1Out = big.NewInt(0)
}
}
// Calculate USD values using profit calculator's price oracle
swapData.AmountInUSD, swapData.AmountOutUSD, swapData.FeeUSD = s.calculateSwapUSDValues(swapData, fee)
// Calculate price impact based on pool liquidity and swap amounts
swapData.PriceImpact = s.calculateSwapPriceImpact(event, swapData)
// Get pool data with caching
poolData, err := marketScanner.GetPoolData(event.PoolAddress.Hex())
if err != nil {
if errors.Is(err, market.ErrInvalidPoolCandidate) {
s.logger.Debug("Skipping pool data fetch for invalid candidate",
"pool", event.PoolAddress,
"tx", event.TransactionHash,
"error", err)
} else {
// Enhanced error logging with context - check if this is an ERC-20 token misclassified as a pool
errorMsg := fmt.Sprintf("Error getting pool data for %s: %v", event.PoolAddress, err)
contextMsg := fmt.Sprintf("event_type:%s protocol:%s block:%d tx:%s",
event.Type.String(), event.Protocol, event.BlockNumber, event.TransactionHash.Hex())
s.logger.Debug(fmt.Sprintf("Pool skipped %s [context: %s]", errorMsg, contextMsg))
}
return
}
// CRITICAL FIX: Use actual token addresses from pool contract, not zero addresses from event
// The swap parser leaves Token0/Token1 as zeros expecting the caller to fill them,
// but poolData already contains the correct addresses from token0()/token1() calls
if poolData.Token0 != (common.Address{}) && poolData.Token1 != (common.Address{}) {
swapData.Token0 = poolData.Token0
swapData.Token1 = poolData.Token1
event.Token0 = poolData.Token0
event.Token1 = poolData.Token1
s.logger.Debug(fmt.Sprintf("Updated swap token addresses from pool data: token0=%s, token1=%s",
poolData.Token0.Hex(), poolData.Token1.Hex()))
} else {
// If pool data doesn't have token addresses, this is invalid - reject the event
s.logger.Warn(fmt.Sprintf("Pool data missing token addresses for pool %s, skipping event",
event.PoolAddress.Hex()))
return
}
finalProtocol := s.detectSwapProtocol(event, poolInfo, poolData, factory)
if finalProtocol == "" || strings.EqualFold(finalProtocol, "unknown") {
if fallback := canonicalProtocolName(event.Protocol); fallback != "" {
finalProtocol = fallback
} else {
finalProtocol = "Unknown"
}
}
event.Protocol = finalProtocol
swapData.Protocol = finalProtocol
if poolData != nil {
poolData.Protocol = finalProtocol
}
factory = s.resolveFactory(factory, finalProtocol, marketScanner)
swapData.Factory = factory
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := s.marketDataLogger.LogSwapEvent(ctx, event, swapData); err != nil {
s.logger.Debug(fmt.Sprintf("Failed to log swap event to market data logger: %v", err))
}
// Log the swap event to database (legacy)
marketScanner.LogSwapEvent(event)
s.marketDataLogger.UpdatePoolMetadata(event.PoolAddress, finalProtocol, factory)
// DEBUG: Log zero address events to trace their source
if event.PoolAddress == (common.Address{}) {
s.logger.Error(fmt.Sprintf("ZERO ADDRESS DEBUG [ANALYZER]: Received Event with zero PoolAddress - TxHash: %s, Protocol: %s, Token0: %s, Token1: %s, Type: %v",
event.TransactionHash.Hex(), event.Protocol, event.Token0.Hex(), event.Token1.Hex(), event.Type))
}
// Calculate price impact
priceMovement, err := s.calculatePriceMovement(event, poolData)
if err != nil {
s.logger.Error(fmt.Sprintf("Error calculating price movement for pool %s: %v", event.PoolAddress, err))
return
}
// Log opportunity with actual swap amounts from event (legacy)
s.logSwapOpportunity(event, poolData, priceMovement, marketScanner)
// Check if the movement is significant
if marketScanner.IsSignificantMovement(priceMovement, marketScanner.Config().MinProfitThreshold) {
s.logger.Info(fmt.Sprintf("Significant price movement detected in pool %s: %+v", event.PoolAddress, priceMovement))
// Look for arbitrage opportunities
opportunities := s.findArbitrageOpportunities(event, priceMovement, marketScanner)
if len(opportunities) > 0 {
s.logger.Info(fmt.Sprintf("Found %d arbitrage opportunities for pool %s", len(opportunities), event.PoolAddress))
for _, opp := range opportunities {
s.logger.Info(fmt.Sprintf("Arbitrage opportunity: %+v", opp))
// Execute the arbitrage opportunity
marketScanner.ExecuteArbitrageOpportunity(opp)
}
} else {
s.logger.Debug(fmt.Sprintf("Price movement in pool %s is not significant: %f", event.PoolAddress, priceMovement.PriceImpact))
}
}
}
// logSwapOpportunity logs swap opportunities using actual amounts from events
func (s *SwapAnalyzer) logSwapOpportunity(event events.Event, poolData *market.CachedData, priceMovement *market.PriceMovement, marketScanner *market.MarketScanner) {
// Convert amounts from big.Int to big.Float for profit calculation
amountInFloat := big.NewFloat(0)
amountOutFloat := big.NewFloat(0)
amountInDisplay := float64(0)
amountOutDisplay := float64(0)
// For swap events, Amount0 and Amount1 represent the actual swap amounts
// The sign indicates direction (positive = token added to pool, negative = token removed from pool)
if event.Amount0 != nil {
amount0Float := new(big.Float).SetInt(event.Amount0)
if event.Amount1 != nil {
amount1Float := new(big.Float).SetInt(event.Amount1)
// Determine input/output based on sign (negative means token was removed from pool = output)
if amount0Float.Sign() < 0 {
// Token0 out, Token1 in
amountOutFloat = new(big.Float).Abs(amount0Float)
amountInFloat = amount1Float
amountOutDisplay, _ = new(big.Float).Quo(amountOutFloat, big.NewFloat(1e18)).Float64()
amountInDisplay, _ = new(big.Float).Quo(amountInFloat, big.NewFloat(1e18)).Float64()
} else {
// Token0 in, Token1 out
amountInFloat = amount0Float
amountOutFloat = new(big.Float).Abs(amount1Float)
amountInDisplay, _ = new(big.Float).Quo(amountInFloat, big.NewFloat(1e18)).Float64()
amountOutDisplay, _ = new(big.Float).Quo(amountOutFloat, big.NewFloat(1e18)).Float64()
}
}
}
// CRITICAL FIX: Skip zero-amount events early to reduce log noise
// Zero amounts indicate parsing failures or events from failed transactions
// This eliminates ~55% of false positives from being logged
if amountInFloat.Sign() == 0 || amountOutFloat.Sign() == 0 {
s.logger.Debug(fmt.Sprintf("⏭️ Skipping swap with zero amount in tx %s: amountIn=%v, amountOut=%v (failed transaction or parsing error)",
event.TransactionHash.Hex()[:10], amountInDisplay, amountOutDisplay))
return
}
// CRITICAL FIX: Skip dust amounts early (< 0.0001 ETH) to prevent extreme profit margin calculations
// Dust threshold: 0.0001 ETH = $0.10 at $1k ETH (economically insignificant for MEV)
// This prevents division-by-zero-like conditions in profit margin calculations
minAmountETH := big.NewFloat(0.0001)
amountInETH := new(big.Float).Quo(amountInFloat, big.NewFloat(1e18))
amountOutETH := new(big.Float).Quo(amountOutFloat, big.NewFloat(1e18))
if amountInETH.Cmp(minAmountETH) < 0 || amountOutETH.Cmp(minAmountETH) < 0 {
amountInETHFloat, _ := amountInETH.Float64()
amountOutETHFloat, _ := amountOutETH.Float64()
s.logger.Debug(fmt.Sprintf("⏭️ Skipping dust swap in tx %s: amountIn=%.6f ETH, amountOut=%.6f ETH (below 0.0001 ETH threshold)",
event.TransactionHash.Hex()[:10], amountInETHFloat, amountOutETHFloat))
return
}
// Analyze arbitrage opportunity using the profit calculator
var estimatedProfitUSD float64 = 0.0
var profitData map[string]interface{}
if amountInFloat.Sign() > 0 && amountOutFloat.Sign() > 0 {
opportunity := s.profitCalculator.AnalyzeSwapOpportunity(
context.Background(),
event.Token0,
event.Token1,
new(big.Float).Quo(amountInFloat, big.NewFloat(1e18)), // Convert to ETH units
new(big.Float).Quo(amountOutFloat, big.NewFloat(1e18)), // Convert to ETH units
event.Protocol,
)
// CRITICAL FIX #3: Remove confidence threshold filter to enable emerging token arbitrage
// Previously skipped opportunities where token confidence < 10%
// Best arbitrage is in emerging/unknown tokens - this filter was rejecting 20-30% of opportunities
// Now analyze all tokens independently of price confidence - profit matters more than known price
// If needed, confidence can be used for ranking/prioritization, but not for filtering
//
// REMOVED FILTER:
// if opportunity.Confidence < 0.10 {
// return
// }
if opportunity != nil {
// Add opportunity to ranking system
rankedOpp := s.opportunityRanker.AddOpportunity(opportunity)
// Use the calculated profit for logging
if opportunity.NetProfit != nil {
estimatedProfitFloat, _ := opportunity.NetProfit.Float64()
estimatedProfitUSD = estimatedProfitFloat * 2000 // Assume 1 ETH = $2000 for USD conversion
}
// Add detailed profit analysis to additional data
profitData = map[string]interface{}{
"arbitrageId": opportunity.ID,
"isExecutable": opportunity.IsExecutable,
"rejectReason": opportunity.RejectReason,
"confidence": opportunity.Confidence,
"profitMargin": opportunity.ProfitMargin,
"netProfitETH": s.profitCalculator.FormatEther(opportunity.NetProfit),
"gasCostETH": s.profitCalculator.FormatEther(opportunity.GasCost),
"estimatedProfitETH": s.profitCalculator.FormatEther(opportunity.EstimatedProfit),
}
// Add ranking data if available
if rankedOpp != nil {
profitData["opportunityScore"] = rankedOpp.Score
profitData["opportunityRank"] = rankedOpp.Rank
profitData["competitionRisk"] = rankedOpp.CompetitionRisk
profitData["updateCount"] = rankedOpp.UpdateCount
}
}
} else if priceMovement != nil {
// Fallback to simple price impact calculation
estimatedProfitUSD = priceMovement.PriceImpact * 100
}
// Resolve token symbols
tokenIn := s.resolveTokenSymbol(event.Token0.Hex())
tokenOut := s.resolveTokenSymbol(event.Token1.Hex())
// Create additional data with profit analysis
additionalData := map[string]interface{}{
"poolAddress": event.PoolAddress.Hex(),
"protocol": event.Protocol,
"token0": event.Token0.Hex(),
"token1": event.Token1.Hex(),
"tokenIn": tokenIn,
"tokenOut": tokenOut,
"blockNumber": event.BlockNumber,
}
// Add price impact if available
if priceMovement != nil {
additionalData["priceImpact"] = priceMovement.PriceImpact
}
// Merge profit analysis data
if profitData != nil {
for k, v := range profitData {
additionalData[k] = v
}
}
// Log the opportunity using actual swap amounts and profit analysis
s.logger.Opportunity(event.TransactionHash.Hex(), "", event.PoolAddress.Hex(), "Swap", event.Protocol,
amountInDisplay, amountOutDisplay, 0.0, estimatedProfitUSD, additionalData)
}
// resolveTokenSymbol converts token address to human-readable symbol
func (s *SwapAnalyzer) resolveTokenSymbol(tokenAddress string) string {
// Convert to lowercase for consistent lookup
addr := strings.ToLower(tokenAddress)
// Known Arbitrum token mappings (same as in L2 parser)
tokenMap := map[string]string{
"0x82af49447d8a07e3bd95bd0d56f35241523fbab1": "WETH",
"0xaf88d065e77c8cc2239327c5edb3a432268e5831": "USDC",
"0xff970a61a04b1ca14834a43f5de4533ebddb5cc8": "USDC.e",
"0xfd086bc7cd5c481dcc9c85ebe478a1c0b69fcbb9": "USDT",
"0x2f2a2543b76a4166549f7aab2e75bef0aefc5b0f": "WBTC",
"0x912ce59144191c1204e64559fe8253a0e49e6548": "ARB",
"0xfc5a1a6eb076a2c7ad06ed22c90d7e710e35ad0a": "GMX",
"0xf97f4df75117a78c1a5a0dbb814af92458539fb4": "LINK",
"0xfa7f8980b0f1e64a2062791cc3b0871572f1f7f0": "UNI",
"0xba5ddd1f9d7f570dc94a51479a000e3bce967196": "AAVE",
"0x0de59c86c306b9fead9fb67e65551e2b6897c3f6": "KUMA",
"0x6efa9b8883dfb78fd75cd89d8474c44c3cbda469": "DIA",
"0x440017a1b021006d556d7fc06a54c32e42eb745b": "G@ARB",
"0x11cdb42b0eb46d95f990bedd4695a6e3fa034978": "CRV",
"0x040d1edc9569d4bab2d15287dc5a4f10f56a56b8": "BAL",
"0x354a6da3fcde098f8389cad84b0182725c6c91de": "COMP",
"0x2e9a6df78e42c50b0cefcf9000d0c3a4d34e1dd5": "MKR",
}
if symbol, exists := tokenMap[addr]; exists {
return symbol
}
// Return truncated address if not in mapping
if len(tokenAddress) > 10 {
return tokenAddress[:6] + "..." + tokenAddress[len(tokenAddress)-4:]
}
return tokenAddress
}
// calculatePriceMovement calculates the price movement from a swap event using cached mathematical functions
func (s *SwapAnalyzer) calculatePriceMovement(event events.Event, poolData *market.CachedData) (*market.PriceMovement, error) {
s.logger.Debug(fmt.Sprintf("Calculating price movement for pool %s", event.PoolAddress))
// Get current price from pool data using cached function
currentPrice := uniswap.SqrtPriceX96ToPriceCached(poolData.SqrtPriceX96.ToBig())
if currentPrice == nil {
return nil, fmt.Errorf("failed to calculate current price from sqrtPriceX96")
}
// Calculate price impact based on pool's actual liquidity (not swap amount ratio)
// FIXED: Previously used amount1/amount0 which is WRONG - that's the trade ratio, not pool price
// Correct approach: Calculate impact as (amountIn / liquidity) for the affected side
var priceImpact float64
// Use absolute values for amounts (UniswapV3 uses signed int256)
amount0Abs := new(big.Int).Abs(event.Amount0)
amount1Abs := new(big.Int).Abs(event.Amount1)
// Determine which direction the swap went (which amount is "in" vs "out")
var amountIn *big.Int
if event.Amount0.Sign() > 0 && event.Amount1.Sign() < 0 {
// Token0 in, Token1 out
amountIn = amount0Abs
} else if event.Amount0.Sign() < 0 && event.Amount1.Sign() > 0 {
// Token1 in, Token0 out
amountIn = amount1Abs
} else {
// Both same sign or zero, cannot determine - use larger amount
if amount0Abs.Cmp(amount1Abs) > 0 {
amountIn = amount0Abs
} else {
amountIn = amount1Abs
}
}
// Calculate price impact as percentage of liquidity affected
// priceImpact ≈ amountIn / (liquidity / sqrt(price))
if poolData.Liquidity != nil && poolData.Liquidity.Sign() > 0 {
liquidityFloat := new(big.Float).SetInt(poolData.Liquidity.ToBig())
amountInFloat := new(big.Float).SetInt(amountIn)
// Approximate price impact (simplified model)
// For V3: impact ≈ (amountIn * sqrt(price)) / liquidity
// Normalize both to ETH units (divide amountIn by 1e18)
amountInETH := new(big.Float).Quo(amountInFloat, big.NewFloat(1e18))
halfLiquidity := new(big.Float).Quo(liquidityFloat, big.NewFloat(2.0))
if halfLiquidity.Sign() > 0 {
priceImpactFloat := new(big.Float).Quo(amountInETH, halfLiquidity)
priceImpact, _ = priceImpactFloat.Float64()
// Validate: reject impossible price impacts (>100% = 1.0)
if priceImpact > 1.0 {
s.logger.Debug(fmt.Sprintf("High price impact detected (%.4f), capping at 1.0 - swap too large for liquidity", priceImpact))
priceImpact = 1.0
}
}
}
// Set amountOut (opposite direction from amountIn)
var amountOut *big.Int
if event.Amount0.Sign() > 0 && event.Amount1.Sign() < 0 {
// Token0 in, Token1 out
amountOut = amount1Abs
} else if event.Amount0.Sign() < 0 && event.Amount1.Sign() > 0 {
// Token1 in, Token0 out
amountOut = amount0Abs
} else {
// Fallback: use amounts as-is
if amount0Abs.Cmp(amount1Abs) > 0 {
amountOut = amount1Abs
} else {
amountOut = amount0Abs
}
}
// Calculate PriceAfter based on the swap's impact
// For Uniswap V3, use the constant product formula with liquidity
priceAfter, tickAfter := s.calculatePriceAfterSwap(
poolData,
event.Amount0,
event.Amount1,
currentPrice,
)
movement := &market.PriceMovement{
Token0: event.Token0.Hex(),
Token1: event.Token1.Hex(),
Pool: event.PoolAddress.Hex(),
Protocol: event.Protocol,
AmountIn: amountIn,
AmountOut: amountOut,
PriceBefore: currentPrice,
PriceAfter: priceAfter,
PriceImpact: priceImpact,
TickBefore: poolData.Tick,
TickAfter: tickAfter,
Timestamp: time.Now(),
}
amount0Str := "0"
if event.Amount0 != nil {
amount0Str = event.Amount0.String()
}
s.logger.Debug(fmt.Sprintf("Price movement calculated: impact=%.6f%%, amount_in=%s", priceImpact*100, amount0Str))
return movement, nil
}
// calculatePriceAfterSwap calculates the price and tick after a swap
// Uses Uniswap V3 constant product formula with proper bounds checking
// FIXED: Added proper scaling and bounds checking to prevent negative sqrtPrice
func (s *SwapAnalyzer) calculatePriceAfterSwap(
poolData *market.CachedData,
amount0 *big.Int,
amount1 *big.Int,
priceBefore *big.Float,
) (*big.Float, int) {
// If we don't have liquidity data, we can't calculate the new price accurately
if poolData.Liquidity == nil || poolData.Liquidity.Sign() == 0 {
s.logger.Debug("No liquidity data available, returning price before")
return priceBefore, poolData.Tick
}
// Bounds checking: ensure priceBefore is valid
if priceBefore.Sign() <= 0 {
s.logger.Warn("Invalid priceBefore (<=0), returning fallback")
return priceBefore, poolData.Tick
}
liquidityFloat := new(big.Float).SetInt(poolData.Liquidity.ToBig())
// Check for near-zero liquidity to avoid division issues
minLiquidity := big.NewFloat(1e6) // Minimum reasonable liquidity
if liquidityFloat.Cmp(minLiquidity) < 0 {
s.logger.Debug(fmt.Sprintf("Liquidity too low (%.2f), returning price before", liquidityFloat))
return priceBefore, poolData.Tick
}
// Convert amounts to absolute values for calculation
amount0Abs := new(big.Int).Abs(amount0)
amount1Abs := new(big.Int).Abs(amount1)
amount0Float := new(big.Float).SetInt(amount0Abs)
amount1Float := new(big.Float).SetInt(amount1Abs)
// Calculate price impact as percentage of liquidity
// FIXED: Use percentage-based calculation to prevent large deltas
var priceImpactRatio *big.Float
// Determine swap direction and calculate impact ratio
if amount0.Sign() > 0 && amount1.Sign() < 0 {
// Token0 in, Token1 out -> price moves based on amount0/liquidity ratio
priceImpactRatio = new(big.Float).Quo(amount0Float, liquidityFloat)
} else if amount0.Sign() < 0 && amount1.Sign() > 0 {
// Token1 in, Token0 out -> price moves based on amount1/liquidity ratio
priceImpactRatio = new(big.Float).Quo(amount1Float, liquidityFloat)
} else {
// Can't determine direction or both zero - return original price
s.logger.Debug("Cannot determine swap direction, returning price before")
return priceBefore, poolData.Tick
}
// FIXED: Cap the price impact ratio to prevent extreme movements
maxImpactRatio := big.NewFloat(0.5) // Max 50% impact per swap
if priceImpactRatio.Cmp(maxImpactRatio) > 0 {
s.logger.Debug(fmt.Sprintf("Price impact ratio %.6f exceeds max %.2f, capping",
priceImpactRatio, maxImpactRatio))
priceImpactRatio = maxImpactRatio
}
// Calculate price change: priceAfter = priceBefore * (1 ± impact)
// For token0 in: price decreases (1 - impact)
// For token1 in: price increases (1 + impact)
one := big.NewFloat(1.0)
var priceMultiplier *big.Float
if amount0.Sign() > 0 && amount1.Sign() < 0 {
// Token0 in -> price decreases
priceMultiplier = new(big.Float).Sub(one, priceImpactRatio)
} else {
// Token1 in -> price increases
priceMultiplier = new(big.Float).Add(one, priceImpactRatio)
}
// FIXED: Ensure multiplier is positive and reasonable
minMultiplier := big.NewFloat(0.01) // Price can't drop below 1% of original
maxMultiplier := big.NewFloat(100.0) // Price can't increase more than 100x
if priceMultiplier.Cmp(minMultiplier) < 0 {
s.logger.Warn(fmt.Sprintf("Price multiplier %.6f too low, capping at %.2f",
priceMultiplier, minMultiplier))
priceMultiplier = minMultiplier
}
if priceMultiplier.Cmp(maxMultiplier) > 0 {
s.logger.Warn(fmt.Sprintf("Price multiplier %.6f too high, capping at %.2f",
priceMultiplier, maxMultiplier))
priceMultiplier = maxMultiplier
}
// Calculate final price
priceAfter := new(big.Float).Mul(priceBefore, priceMultiplier)
// Final validation
if priceAfter.Sign() <= 0 {
s.logger.Warn(fmt.Sprintf("Calculated priceAfter is non-positive (%.6f), using price before",
priceAfter))
return priceBefore, poolData.Tick
}
// Calculate tick after (approximate)
priceAfterFloat64, _ := priceAfter.Float64()
if priceAfterFloat64 <= 0 {
return priceBefore, poolData.Tick
}
tickAfter := uniswap.SqrtPriceX96ToTick(uniswap.PriceToSqrtPriceX96(priceAfter))
s.logger.Debug(fmt.Sprintf("Price after swap: before=%.10f, after=%.10f, tick: %d -> %d, impact: %.4f%%",
priceBefore, priceAfter, poolData.Tick, tickAfter, priceImpactRatio))
return priceAfter, tickAfter
}
// findArbitrageOpportunities looks for arbitrage opportunities based on price movements
func (s *SwapAnalyzer) findArbitrageOpportunities(event events.Event, movement *market.PriceMovement, marketScanner *market.MarketScanner) []stypes.ArbitrageOpportunity {
s.logger.Debug(fmt.Sprintf("Searching for arbitrage opportunities for pool %s", event.PoolAddress))
opportunities := make([]stypes.ArbitrageOpportunity, 0)
// Get related pools for the same token pair
relatedPools := marketScanner.FindRelatedPools(event.Token0, event.Token1)
// If we have related pools, compare prices
if len(relatedPools) > 0 {
// Get the current price in this pool
currentPrice := movement.PriceBefore
// PHASE 2 OPTIMIZATION: Batch fetch all related pools in single RPC call (99% RPC reduction!)
// Collect all pool addresses first
poolAddresses := make([]common.Address, 0, len(relatedPools))
poolIndexMap := make(map[common.Address]int) // Map address to relatedPools index
for i, pool := range relatedPools {
// Skip the same pool
if pool.Address == event.PoolAddress {
continue
}
poolAddresses = append(poolAddresses, pool.Address)
poolIndexMap[pool.Address] = i
}
// Batch fetch all pool data in a single RPC call
poolDataMap, err := marketScanner.BatchFetchPoolData(poolAddresses)
if err != nil {
s.logger.Warn(fmt.Sprintf("Batch fetch failed for %d related pools: %v, opportunities may be missed",
len(poolAddresses), err))
// Continue anyway - some pools may have been fetched
}
// Process each fetched pool
for poolAddr, poolData := range poolDataMap {
if poolData == nil {
continue
}
// Check if poolData.SqrtPriceX96 is nil to prevent panic
if poolData.SqrtPriceX96 == nil {
s.logger.Error(fmt.Sprintf("Pool data for %s has nil SqrtPriceX96", poolAddr.Hex()))
continue
}
// Calculate price in the related pool using cached function
relatedPrice := uniswap.SqrtPriceX96ToPriceCached(poolData.SqrtPriceX96.ToBig())
// Check if currentPrice or relatedPrice is nil to prevent panic
if currentPrice == nil || relatedPrice == nil {
s.logger.Error(fmt.Sprintf("Nil price detected for pool comparison"))
continue
}
// Calculate price difference
priceDiff := new(big.Float).Sub(currentPrice, relatedPrice)
// Check if relatedPrice is zero to prevent division by zero
zero := new(big.Float).SetFloat64(0.0)
if relatedPrice.Cmp(zero) == 0 {
s.logger.Debug(fmt.Sprintf("Skipping pool %s: related price is zero", poolAddr.Hex()))
continue
}
priceDiffRatio := new(big.Float).Quo(priceDiff, relatedPrice)
// If there's a significant price difference, we might have an arbitrage opportunity
priceDiffFloat, _ := priceDiffRatio.Float64()
// Validate: reject impossible price differences (>1000% = 10.0)
if priceDiffFloat > 10.0 || priceDiffFloat < -10.0 {
s.logger.Debug(fmt.Sprintf("Skipping pool %s: price difference too large (%.2f)", poolAddr.Hex(), priceDiffFloat))
continue
}
// Take absolute value for comparison
priceDiffAbs := priceDiffFloat
if priceDiffAbs < 0 {
priceDiffAbs = -priceDiffAbs
}
// Lower threshold for Arbitrum where spreads are smaller
arbitrageThreshold := 0.001 // 0.1% threshold instead of 0.5%
if priceDiffAbs > arbitrageThreshold {
// Estimate potential profit
estimatedProfit := marketScanner.EstimateProfit(event, poolData, priceDiffFloat)
if estimatedProfit != nil && estimatedProfit.Sign() > 0 {
// Calculate gas cost with dynamic pricing
// Get real-time gas price from network if available via marketScanner's client
// Fallback to conservative 0.2 gwei if unavailable
gasPrice := big.NewInt(200000000) // 0.2 gwei fallback (increased from 0.1 for safety)
gasUnits := big.NewInt(400000) // 400k gas (increased from 300k for complex arb)
// Note: Future enhancement - get dynamic gas price from marketScanner.client.SuggestGasPrice()
gasCost := new(big.Int).Mul(gasPrice, gasUnits)
// Calculate net profit after gas
netProfit := new(big.Int).Sub(estimatedProfit, gasCost)
// Only create opportunity if still profitable after gas
if netProfit.Sign() > 0 {
now := time.Now()
// Use a reasonable test amount (e.g., 0.1 ETH in wei)
testAmount := big.NewInt(100000000000000000) // 0.1 ETH
opp := stypes.ArbitrageOpportunity{
ID: fmt.Sprintf("arb_%d_%s", now.Unix(), event.PoolAddress.Hex()[:10]),
Path: []string{event.Token0.Hex(), event.Token1.Hex()},
Pools: []string{event.PoolAddress.Hex(), poolAddr.Hex()},
AmountIn: testAmount,
Profit: estimatedProfit,
NetProfit: netProfit,
GasEstimate: gasUnits,
GasCost: gasCost,
EstimatedProfit: netProfit,
RequiredAmount: testAmount,
ROI: priceDiffAbs * 100, // Convert to percentage (use absolute value)
Protocol: fmt.Sprintf("%s->%s", event.Protocol, poolData.Protocol),
ExecutionTime: 200, // Estimated 200ms for direct arb
Confidence: 0.7, // Higher confidence for direct arb
PriceImpact: priceDiffAbs, // Use absolute value for price impact
MaxSlippage: 1.0, // 1% max slippage
TokenIn: event.Token0,
TokenOut: event.Token1,
Timestamp: now.Unix(),
DetectedAt: now,
ExpiresAt: now.Add(3 * time.Second), // 3 second expiry for direct arb
Urgency: 7, // Higher urgency
Risk: 0.2, // Lower risk
Profitable: true,
}
opportunities = append(opportunities, opp)
s.logger.Info(fmt.Sprintf("Found arbitrage opportunity: %+v", opp))
}
}
}
}
}
// Also look for triangular arbitrage opportunities
triangularOpps := marketScanner.FindTriangularArbitrageOpportunities(event)
opportunities = append(opportunities, triangularOpps...)
return opportunities
}
// calculateSwapUSDValues calculates USD values for swap amounts using the profit calculator's price oracle
func (s *SwapAnalyzer) calculateSwapUSDValues(swapData *marketdata.SwapEventData, fee uint32) (amountInUSD, amountOutUSD, feeUSD float64) {
if s.profitCalculator == nil {
return 0, 0, 0
}
// Get token prices in USD
token0Price := s.getTokenPriceUSD(swapData.Token0)
token1Price := s.getTokenPriceUSD(swapData.Token1)
// Calculate decimals for proper conversion
token0Decimals := s.getTokenDecimals(swapData.Token0)
token1Decimals := s.getTokenDecimals(swapData.Token1)
// Calculate amount in USD
if swapData.Amount0In != nil && swapData.Amount0In.Sign() > 0 {
amount0InFloat := s.bigIntToFloat(swapData.Amount0In, token0Decimals)
amountInUSD = amount0InFloat * token0Price
} else if swapData.Amount1In != nil && swapData.Amount1In.Sign() > 0 {
amount1InFloat := s.bigIntToFloat(swapData.Amount1In, token1Decimals)
amountInUSD = amount1InFloat * token1Price
}
// Calculate amount out USD
if swapData.Amount0Out != nil && swapData.Amount0Out.Sign() > 0 {
amount0OutFloat := s.bigIntToFloat(swapData.Amount0Out, token0Decimals)
amountOutUSD = amount0OutFloat * token0Price
} else if swapData.Amount1Out != nil && swapData.Amount1Out.Sign() > 0 {
amount1OutFloat := s.bigIntToFloat(swapData.Amount1Out, token1Decimals)
amountOutUSD = amount1OutFloat * token1Price
}
// Calculate fee USD (fee tier as percentage of input amount)
feePercent := float64(fee) / 1000000.0 // Convert from basis points
feeUSD = amountInUSD * feePercent
return amountInUSD, amountOutUSD, feeUSD
}
// calculateSwapPriceImpact calculates the price impact of a swap based on pool liquidity and amounts
func (s *SwapAnalyzer) calculateSwapPriceImpact(event events.Event, swapData *marketdata.SwapEventData) float64 {
if event.SqrtPriceX96 == nil || event.Liquidity == nil {
return 0.0
}
// Get pre-swap price from sqrtPriceX96
prePrice := s.sqrtPriceX96ToPrice(event.SqrtPriceX96)
if prePrice == 0 {
return 0.0
}
// Calculate effective swap size in token0 terms
var swapSize *big.Int
if swapData.Amount0In != nil && swapData.Amount0In.Sign() > 0 {
swapSize = swapData.Amount0In
} else if swapData.Amount0Out != nil && swapData.Amount0Out.Sign() > 0 {
swapSize = swapData.Amount0Out
} else {
return 0.0
}
// Calculate price impact as percentage of pool liquidity
liquidity := event.Liquidity.ToBig()
if liquidity.Sign() == 0 {
return 0.0
}
// Proper price impact calculation for AMMs: impact = swapSize / (liquidity + swapSize)
// This is more accurate than the quadratic approximation for real AMMs
swapSizeFloat := new(big.Float).SetInt(swapSize)
liquidityFloat := new(big.Float).SetInt(liquidity)
// Calculate the price impact ratio
priceImpactRatio := new(big.Float).Quo(swapSizeFloat, new(big.Float).Add(liquidityFloat, swapSizeFloat))
// Convert to percentage
priceImpactPercent, _ := priceImpactRatio.Float64()
return priceImpactPercent * 100.0
}
// getTokenPriceUSD gets the USD price of a token using various price sources
func (s *SwapAnalyzer) getTokenPriceUSD(tokenAddr common.Address) float64 {
if price, exists := scannercommon.GetTokenPriceUSD(tokenAddr); exists {
return price
}
// For unknown tokens, return 0 (in production, would query price oracle or DEX)
return 0.0
}
// getTokenDecimals returns the decimal places for a token
func (s *SwapAnalyzer) getTokenDecimals(tokenAddr common.Address) uint8 {
if decimals, exists := scannercommon.GetTokenDecimals(tokenAddr); exists {
return decimals
}
// Default to 18 for unknown tokens
return 18
}
// bigIntToFloat converts a big.Int amount to float64 accounting for token decimals
func (s *SwapAnalyzer) bigIntToFloat(amount *big.Int, decimals uint8) float64 {
if amount == nil {
return 0.0
}
divisor := new(big.Int).Exp(big.NewInt(10), big.NewInt(int64(decimals)), nil)
amountFloat := new(big.Float).SetInt(amount)
divisorFloat := new(big.Float).SetInt(divisor)
result := new(big.Float).Quo(amountFloat, divisorFloat)
resultFloat, _ := result.Float64()
return resultFloat
}
// sqrtPriceX96ToPrice converts sqrtPriceX96 to a regular price using cached mathematical functions
func (s *SwapAnalyzer) sqrtPriceX96ToPrice(sqrtPriceX96 *uint256.Int) float64 {
if sqrtPriceX96 == nil {
return 0.0
}
// Use cached function for optimized calculation
price := uniswap.SqrtPriceX96ToPriceCached(sqrtPriceX96.ToBig())
priceFloat, _ := price.Float64()
return priceFloat
}
func canonicalProtocolName(raw string) string {
raw = strings.TrimSpace(raw)
if raw == "" {
return ""
}
lower := strings.ToLower(raw)
if idx := strings.Index(lower, "_fee"); idx != -1 {
lower = lower[:idx]
}
replacer := strings.NewReplacer(" ", "", "-", "", ".", "", ":", "")
lower = replacer.Replace(lower)
lower = strings.ReplaceAll(lower, "_", "")
switch {
case strings.Contains(lower, "kyberelastic"):
return "KyberElastic"
case strings.Contains(lower, "kyberclassic"):
return "KyberClassic"
case strings.Contains(lower, "kyberswap"):
return "Kyber"
case strings.Contains(lower, "kyber"):
return "Kyber"
case strings.Contains(lower, "balancer"):
return "Balancer"
case strings.Contains(lower, "curve"):
return "Curve"
case strings.Contains(lower, "camelot"):
return "Camelot"
case strings.Contains(lower, "traderjoe"):
return "TraderJoe"
case strings.Contains(lower, "sushiswap") || strings.Contains(lower, "sushi"):
return "SushiSwap"
case strings.Contains(lower, "pancake"):
return "PancakeSwap"
case strings.Contains(lower, "ramses"):
return "Ramses"
case strings.Contains(lower, "uniswap") && strings.Contains(lower, "v3"):
return "UniswapV3"
case strings.Contains(lower, "uniswap") && strings.Contains(lower, "v2"):
return "UniswapV2"
case strings.Contains(lower, "uniswap"):
return "Uniswap"
}
return strings.Title(strings.ReplaceAll(raw, "_", ""))
}
func protocolFromFactory(factory common.Address) string {
if factory == (common.Address{}) {
return ""
}
if protocol, ok := factoryProtocolMap[factory]; ok {
return protocol
}
return ""
}
func factoryForProtocol(protocol string) common.Address {
canonical := canonicalProtocolName(protocol)
if canonical == "" {
return common.Address{}
}
if addr, ok := protocolDefaultFactory[canonical]; ok {
return addr
}
return common.Address{}
}
func (s *SwapAnalyzer) resolveFactory(existing common.Address, protocol string, marketScanner *market.MarketScanner) common.Address {
if existing != (common.Address{}) {
return existing
}
if addr := factoryForProtocol(protocol); addr != (common.Address{}) {
return addr
}
if marketScanner != nil {
if addr := marketScanner.GetFactoryForProtocol(protocol); addr != (common.Address{}) {
return addr
}
}
return existing
}
func (s *SwapAnalyzer) detectSwapProtocol(event events.Event, poolInfo *marketdata.PoolInfo, poolData *market.CachedData, factory common.Address) string {
candidates := []string{event.Protocol}
if poolInfo != nil {
candidates = append(candidates, poolInfo.Protocol)
if factory == (common.Address{}) && poolInfo.Factory != (common.Address{}) {
factory = poolInfo.Factory
}
}
if poolData != nil {
candidates = append(candidates, poolData.Protocol)
}
if proto := protocolFromFactory(factory); proto != "" {
candidates = append(candidates, proto)
}
for _, candidate := range candidates {
if canonical := canonicalProtocolName(candidate); canonical != "" && !strings.EqualFold(canonical, "unknown") {
return canonical
}
}
if proto, ok := protocolSpecialByAddress[event.PoolAddress]; ok {
return proto
}
return "Unknown"
}