Files
mev-beta/pkg/scanner/swap/analyzer.go
Krypto Kajun 0358eed3a6 fix(arbitrage): critical fixes for struct initialization and price impact calculations
- Triangular arbitrage: populate all 25+ ArbitrageOpportunity fields
- Direct arbitrage: complete field initialization with gas cost calculation
- Price impact: add division-by-zero protection and validation
- Absolute value handling for swap amounts to prevent uint256 max display

Remaining issue: Some events still show uint256 max - needs investigation
of alternative parsing code path (possibly pkg/pools/discovery.go)
2025-10-25 20:12:45 -05:00

847 lines
32 KiB
Go

package swap
import (
"context"
"errors"
"fmt"
"math/big"
"strings"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/holiman/uint256"
"github.com/fraktal/mev-beta/internal/logger"
"github.com/fraktal/mev-beta/pkg/events"
"github.com/fraktal/mev-beta/pkg/marketdata"
"github.com/fraktal/mev-beta/pkg/profitcalc"
scannercommon "github.com/fraktal/mev-beta/pkg/scanner/common"
"github.com/fraktal/mev-beta/pkg/scanner/market"
stypes "github.com/fraktal/mev-beta/pkg/types"
"github.com/fraktal/mev-beta/pkg/uniswap"
)
// SwapAnalyzer handles analysis of swap events for price movement opportunities.
// It holds the collaborators that the analysis methods in this file call into.
type SwapAnalyzer struct {
// logger receives the Debug/Info/Warn/Error/Opportunity output of the analyzer.
logger *logger.Logger
// marketDataLogger is queried for pool info and receives swap events and
// pool metadata updates.
marketDataLogger *marketdata.MarketDataLogger
// profitCalculator analyzes swap opportunities and formats ether amounts.
profitCalculator *profitcalc.ProfitCalculator
// opportunityRanker receives analyzed opportunities and returns ranking data.
opportunityRanker *profitcalc.OpportunityRanker
}
// Static protocol lookup tables shared by the protocol/factory helpers below.
var (
// factoryProtocolMap maps a factory address to the canonical protocol
// name; protocolFromFactory reads it. Several addresses may map to the same
// protocol (two entries each for UniswapV2 and Curve).
factoryProtocolMap = map[common.Address]string{
common.HexToAddress("0x1F98431c8aD98523631AE4a59f267346ea31F984"): "UniswapV3",
common.HexToAddress("0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f"): "UniswapV2",
common.HexToAddress("0xf1D7CC64Fb4452F05c498126312eBE29f30Fbcf9"): "UniswapV2",
common.HexToAddress("0xc35DADB65012eC5796536bD9864eD8773aBc74C4"): "SushiSwap",
common.HexToAddress("0xBA12222222228d8Ba445958a75a0704d566BF2C8"): "Balancer",
common.HexToAddress("0xF18056Bbd320E96A48e3Fbf8bC061322531aac99"): "Curve",
common.HexToAddress("0x5ffe7FB82894076ECB99A30D6A32e969e6e35E98"): "Curve",
common.HexToAddress("0x5F1dddbf348aC2fbe22a163e30F99F9ECE3DD50a"): "KyberElastic",
common.HexToAddress("0x1a3c9B1d2F0529D97f2afC5136Cc23e58f1FD35B"): "Camelot",
}
// protocolDefaultFactory is the reverse direction: one default factory
// address per canonical protocol name; factoryForProtocol reads it. Keys must
// match the spellings returned by canonicalProtocolName.
protocolDefaultFactory = map[string]common.Address{
"UniswapV3": common.HexToAddress("0x1F98431c8aD98523631AE4a59f267346ea31F984"),
"UniswapV2": common.HexToAddress("0xf1D7CC64Fb4452F05c498126312eBE29f30Fbcf9"),
"SushiSwap": common.HexToAddress("0xc35DADB65012eC5796536bD9864eD8773aBc74C4"),
"Balancer": common.HexToAddress("0xBA12222222228d8Ba445958a75a0704d566BF2C8"),
"Curve": common.HexToAddress("0xF18056Bbd320E96A48e3Fbf8bC061322531aac99"),
"KyberElastic": common.HexToAddress("0x5F1dddbf348aC2fbe22a163e30F99F9ECE3DD50a"),
"Camelot": common.HexToAddress("0x1a3c9B1d2F0529D97f2afC5136Cc23e58f1FD35B"),
}
// protocolSpecialByAddress is consulted by detectSwapProtocol as a last
// resort, keyed by the event's pool address, when no other candidate yields a
// usable protocol name.
protocolSpecialByAddress = map[common.Address]string{
common.HexToAddress("0xBA12222222228d8Ba445958a75a0704d566BF2C8"): "Balancer",
common.HexToAddress("0xF18056Bbd320E96A48e3Fbf8bC061322531aac99"): "Curve",
common.HexToAddress("0x5F1dddbf348aC2fbe22a163e30F99F9ECE3DD50a"): "KyberElastic",
}
)
// NewSwapAnalyzer builds a SwapAnalyzer around the supplied collaborators.
// The arguments are stored as given; no validation or copying takes place.
func NewSwapAnalyzer(
	logger *logger.Logger,
	marketDataLogger *marketdata.MarketDataLogger,
	profitCalculator *profitcalc.ProfitCalculator,
	opportunityRanker *profitcalc.OpportunityRanker,
) *SwapAnalyzer {
	analyzer := new(SwapAnalyzer)
	analyzer.logger = logger
	analyzer.marketDataLogger = marketDataLogger
	analyzer.profitCalculator = profitCalculator
	analyzer.opportunityRanker = opportunityRanker
	return analyzer
}
// AnalyzeSwapEvent analyzes a swap event for arbitrage opportunities.
//
// The steps, in order:
//  1. Reject events whose pool address is empty, equals one of the event's
//     token addresses, or starts with "0x000000".
//  2. Build a marketdata.SwapEventData record (amounts split into in/out by
//     sign, USD values, price impact).
//  3. Fetch pool data from marketScanner and overwrite the token addresses
//     with the ones stored there; events whose pool data is missing or lacks
//     token addresses are dropped.
//  4. Resolve the final protocol name and factory, then log the swap to the
//     market data logger and to the scanner's legacy log.
//  5. Compute the price movement, log the opportunity, and, when the scanner
//     considers the movement significant, search for and execute arbitrage
//     opportunities.
//
// event is received by value, so the token/protocol updates made here are not
// visible to the caller. The function reports problems only through the logger.
func (s *SwapAnalyzer) AnalyzeSwapEvent(event events.Event, marketScanner *market.MarketScanner) {
	s.logger.Debug(fmt.Sprintf("Analyzing swap event in pool %s", event.PoolAddress))

	// Validate pool address before attempting expensive lookups.
	if event.PoolAddress == (common.Address{}) {
		s.logger.Warn(fmt.Sprintf("Skipping swap analysis: empty pool address (tx: %s)", event.TransactionHash.Hex()))
		return
	}
	if event.PoolAddress == event.Token0 || event.PoolAddress == event.Token1 {
		s.logger.Warn(fmt.Sprintf("Skipping swap analysis: pool address %s matches token address (tx: %s)", event.PoolAddress.Hex(), event.TransactionHash.Hex()))
		return
	}
	if strings.HasPrefix(strings.ToLower(event.PoolAddress.Hex()), "0x000000") {
		s.logger.Warn(fmt.Sprintf("Skipping swap analysis: suspicious pool address %s (tx: %s)", event.PoolAddress.Hex(), event.TransactionHash.Hex()))
		return
	}

	// Get comprehensive pool data to determine factory and fee.
	poolInfo, poolExists := s.marketDataLogger.GetPoolInfo(event.PoolAddress)
	var factory common.Address
	fee := uint32(3000) // Default 0.3%
	if poolExists && poolInfo != nil {
		factory = poolInfo.Factory
		fee = poolInfo.Fee
	} else {
		// Determine factory from known DEX protocols.
		factory = marketScanner.GetFactoryForProtocol(event.Protocol)
	}

	// Create comprehensive swap event data for market data logger.
	swapData := &marketdata.SwapEventData{
		TxHash:       event.TransactionHash,
		BlockNumber:  event.BlockNumber,
		LogIndex:     uint(0), // Default log index (would need to be extracted from receipt)
		Timestamp:    time.Now(),
		PoolAddress:  event.PoolAddress,
		Factory:      factory,
		Protocol:     event.Protocol,
		Token0:       event.Token0,
		Token1:       event.Token1,
		Sender:       common.Address{}, // Default sender (would need to be extracted from transaction)
		Recipient:    common.Address{}, // Default recipient (would need to be extracted from transaction)
		SqrtPriceX96: event.SqrtPriceX96,
		Liquidity:    event.Liquidity,
		Tick:         int32(event.Tick),
	}

	// Extract swap amounts from event. A negative amount means the token was
	// removed from the pool, i.e. it is the output side of the swap.
	if event.Amount0 != nil && event.Amount1 != nil {
		switch {
		case event.Amount0.Sign() < 0:
			// Token0 out, Token1 in.
			swapData.Amount0Out = new(big.Int).Abs(event.Amount0)
			swapData.Amount1In = event.Amount1
			swapData.Amount0In = big.NewInt(0)
			swapData.Amount1Out = big.NewInt(0)
		case event.Amount1.Sign() < 0:
			// Token0 in, Token1 out.
			swapData.Amount0In = event.Amount0
			swapData.Amount1Out = new(big.Int).Abs(event.Amount1)
			swapData.Amount0Out = big.NewInt(0)
			swapData.Amount1In = big.NewInt(0)
		default:
			// Both non-negative (shouldn't happen in normal swaps, but handle gracefully).
			swapData.Amount0In = event.Amount0
			swapData.Amount1In = event.Amount1
			swapData.Amount0Out = big.NewInt(0)
			swapData.Amount1Out = big.NewInt(0)
		}
	}

	// Calculate USD values using the profit calculator's price oracle.
	swapData.AmountInUSD, swapData.AmountOutUSD, swapData.FeeUSD = s.calculateSwapUSDValues(swapData, fee)
	// Calculate price impact based on pool liquidity and swap amounts.
	swapData.PriceImpact = s.calculateSwapPriceImpact(event, swapData)

	// Get pool data with caching.
	poolData, err := marketScanner.GetPoolData(event.PoolAddress.Hex())
	if err != nil {
		if errors.Is(err, market.ErrInvalidPoolCandidate) {
			s.logger.Debug("Skipping pool data fetch for invalid candidate",
				"pool", event.PoolAddress,
				"tx", event.TransactionHash,
				"error", err)
		} else {
			// Enhanced error logging with context - check if this is an ERC-20 token misclassified as a pool.
			errorMsg := fmt.Sprintf("Error getting pool data for %s: %v", event.PoolAddress, err)
			contextMsg := fmt.Sprintf("event_type:%s protocol:%s block:%d tx:%s",
				event.Type.String(), event.Protocol, event.BlockNumber, event.TransactionHash.Hex())
			s.logger.Error(fmt.Sprintf("%s [context: %s]", errorMsg, contextMsg))
		}
		return
	}
	// Everything below dereferences poolData, so a nil result without an
	// error must be rejected here rather than checked after first use.
	if poolData == nil {
		s.logger.Warn(fmt.Sprintf("Pool data unavailable for pool %s, skipping event", event.PoolAddress.Hex()))
		return
	}

	// Use the token addresses stored in the pool data rather than the ones on
	// the event: the event's Token0/Token1 may be zero addresses.
	if poolData.Token0 == (common.Address{}) || poolData.Token1 == (common.Address{}) {
		// Pool data without token addresses is invalid - reject the event.
		s.logger.Warn(fmt.Sprintf("Pool data missing token addresses for pool %s, skipping event",
			event.PoolAddress.Hex()))
		return
	}
	swapData.Token0 = poolData.Token0
	swapData.Token1 = poolData.Token1
	event.Token0 = poolData.Token0
	event.Token1 = poolData.Token1
	s.logger.Debug(fmt.Sprintf("Updated swap token addresses from pool data: token0=%s, token1=%s",
		poolData.Token0.Hex(), poolData.Token1.Hex()))

	// Settle on one protocol name and propagate it to every record.
	finalProtocol := s.detectSwapProtocol(event, poolInfo, poolData, factory)
	if finalProtocol == "" || strings.EqualFold(finalProtocol, "unknown") {
		if fallback := canonicalProtocolName(event.Protocol); fallback != "" {
			finalProtocol = fallback
		} else {
			finalProtocol = "Unknown"
		}
	}
	event.Protocol = finalProtocol
	swapData.Protocol = finalProtocol
	poolData.Protocol = finalProtocol

	factory = s.resolveFactory(factory, finalProtocol, marketScanner)
	swapData.Factory = factory

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := s.marketDataLogger.LogSwapEvent(ctx, event, swapData); err != nil {
		// Best effort: a failed market-data write must not stop the analysis.
		s.logger.Debug(fmt.Sprintf("Failed to log swap event to market data logger: %v", err))
	}
	// Log the swap event to database (legacy).
	marketScanner.LogSwapEvent(event)
	s.marketDataLogger.UpdatePoolMetadata(event.PoolAddress, finalProtocol, factory)

	// Calculate price movement.
	priceMovement, err := s.calculatePriceMovement(event, poolData)
	if err != nil {
		s.logger.Error(fmt.Sprintf("Error calculating price movement for pool %s: %v", event.PoolAddress, err))
		return
	}

	// Log opportunity with actual swap amounts from event (legacy).
	s.logSwapOpportunity(event, poolData, priceMovement, marketScanner)

	// Only significant movements are worth an arbitrage search.
	if !marketScanner.IsSignificantMovement(priceMovement, marketScanner.Config().MinProfitThreshold) {
		s.logger.Debug(fmt.Sprintf("Price movement in pool %s is not significant: %f", event.PoolAddress, priceMovement.PriceImpact))
		return
	}
	s.logger.Info(fmt.Sprintf("Significant price movement detected in pool %s: %+v", event.PoolAddress, priceMovement))

	// Look for arbitrage opportunities.
	opportunities := s.findArbitrageOpportunities(event, priceMovement, marketScanner)
	if len(opportunities) == 0 {
		s.logger.Debug(fmt.Sprintf("No arbitrage opportunities found for pool %s", event.PoolAddress))
		return
	}
	s.logger.Info(fmt.Sprintf("Found %d arbitrage opportunities for pool %s", len(opportunities), event.PoolAddress))
	for _, opp := range opportunities {
		s.logger.Info(fmt.Sprintf("Arbitrage opportunity: %+v", opp))
		// Execute the arbitrage opportunity.
		marketScanner.ExecuteArbitrageOpportunity(opp)
	}
}
// logSwapOpportunity logs swap opportunities using actual amounts from events.
//
// The swap direction is derived from the sign of event.Amount0: a negative
// amount0 means token0 left the pool (token1 is the input), otherwise token0
// is treated as the input. The tokenIn/tokenOut labels in the logged data
// follow that direction. When both derived amounts are positive and a profit
// calculator is configured, the swap is analyzed and, if a ranker is
// configured, ranked; otherwise the estimated profit falls back to a figure
// derived from priceMovement.PriceImpact.
//
// poolData and marketScanner are accepted for signature compatibility and are
// not read here.
func (s *SwapAnalyzer) logSwapOpportunity(event events.Event, poolData *market.CachedData, priceMovement *market.PriceMovement, marketScanner *market.MarketScanner) {
	// Convert amounts from big.Int to big.Float for profit calculation.
	amountInFloat := big.NewFloat(0)
	amountOutFloat := big.NewFloat(0)
	amountInDisplay := float64(0)
	amountOutDisplay := float64(0)

	// Without amounts there is no direction information; default to token0 -> token1.
	inputToken, outputToken := event.Token0, event.Token1

	// The sign indicates direction (positive = token added to pool, negative =
	// token removed from pool).
	if event.Amount0 != nil && event.Amount1 != nil {
		amount0Float := new(big.Float).SetInt(event.Amount0)
		amount1Float := new(big.Float).SetInt(event.Amount1)
		if amount0Float.Sign() < 0 {
			// Token0 out, Token1 in.
			amountOutFloat = new(big.Float).Abs(amount0Float)
			amountInFloat = amount1Float
			inputToken, outputToken = event.Token1, event.Token0
		} else {
			// Token0 in, Token1 out.
			amountInFloat = amount0Float
			amountOutFloat = new(big.Float).Abs(amount1Float)
		}
		// NOTE(review): display amounts assume 18 decimals for both tokens;
		// tokens with other decimals will be mis-scaled in the log - confirm
		// whether getTokenDecimals should be used here.
		amountInDisplay, _ = new(big.Float).Quo(amountInFloat, big.NewFloat(1e18)).Float64()
		amountOutDisplay, _ = new(big.Float).Quo(amountOutFloat, big.NewFloat(1e18)).Float64()
	}

	// Analyze arbitrage opportunity using the profit calculator.
	var estimatedProfitUSD float64
	var profitData map[string]interface{}
	if s.profitCalculator != nil && amountInFloat.Sign() > 0 && amountOutFloat.Sign() > 0 {
		// NOTE(review): tokens are passed in pool order (token0, token1) even
		// when the swap direction is token1 -> token0 - confirm against
		// AnalyzeSwapOpportunity's expectations.
		opportunity := s.profitCalculator.AnalyzeSwapOpportunity(
			context.Background(),
			event.Token0,
			event.Token1,
			new(big.Float).Quo(amountInFloat, big.NewFloat(1e18)),  // Convert to ETH units
			new(big.Float).Quo(amountOutFloat, big.NewFloat(1e18)), // Convert to ETH units
			event.Protocol,
		)
		if opportunity != nil {
			// Use the calculated profit for logging.
			if opportunity.NetProfit != nil {
				estimatedProfitFloat, _ := opportunity.NetProfit.Float64()
				estimatedProfitUSD = estimatedProfitFloat * 2000 // Assume 1 ETH = $2000 for USD conversion
			}
			// Add detailed profit analysis to additional data.
			profitData = map[string]interface{}{
				"arbitrageId":        opportunity.ID,
				"isExecutable":       opportunity.IsExecutable,
				"rejectReason":       opportunity.RejectReason,
				"confidence":         opportunity.Confidence,
				"profitMargin":       opportunity.ProfitMargin,
				"netProfitETH":       s.profitCalculator.FormatEther(opportunity.NetProfit),
				"gasCostETH":         s.profitCalculator.FormatEther(opportunity.GasCost),
				"estimatedProfitETH": s.profitCalculator.FormatEther(opportunity.EstimatedProfit),
			}
			// Add the opportunity to the ranking system and record ranking data if available.
			if s.opportunityRanker != nil {
				if rankedOpp := s.opportunityRanker.AddOpportunity(opportunity); rankedOpp != nil {
					profitData["opportunityScore"] = rankedOpp.Score
					profitData["opportunityRank"] = rankedOpp.Rank
					profitData["competitionRisk"] = rankedOpp.CompetitionRisk
					profitData["updateCount"] = rankedOpp.UpdateCount
				}
			}
		}
	} else if priceMovement != nil {
		// Fallback to simple price impact calculation.
		estimatedProfitUSD = priceMovement.PriceImpact * 100
	}

	// Create additional data with profit analysis. token0/token1 stay in pool
	// order; tokenIn/tokenOut follow the swap direction.
	additionalData := map[string]interface{}{
		"poolAddress": event.PoolAddress.Hex(),
		"protocol":    event.Protocol,
		"token0":      event.Token0.Hex(),
		"token1":      event.Token1.Hex(),
		"tokenIn":     s.resolveTokenSymbol(inputToken.Hex()),
		"tokenOut":    s.resolveTokenSymbol(outputToken.Hex()),
		"blockNumber": event.BlockNumber,
	}
	// Add price impact if available.
	if priceMovement != nil {
		additionalData["priceImpact"] = priceMovement.PriceImpact
	}
	// Merge profit analysis data (ranging over a nil map is a no-op).
	for k, v := range profitData {
		additionalData[k] = v
	}

	// Log the opportunity using actual swap amounts and profit analysis.
	s.logger.Opportunity(event.TransactionHash.Hex(), "", event.PoolAddress.Hex(), "Swap", event.Protocol,
		amountInDisplay, amountOutDisplay, 0.0, estimatedProfitUSD, additionalData)
}
// resolveTokenSymbol turns a token address string into a short label.
// Known addresses (matched case-insensitively) yield their ticker symbol.
// Any other input longer than 10 bytes is abbreviated to its first six and
// last four characters joined by "...", and shorter input is returned as is.
func (s *SwapAnalyzer) resolveTokenSymbol(tokenAddress string) string {
	// Lookup keys are lowercase, so normalise the input before matching.
	switch strings.ToLower(tokenAddress) {
	case "0x82af49447d8a07e3bd95bd0d56f35241523fbab1":
		return "WETH"
	case "0xaf88d065e77c8cc2239327c5edb3a432268e5831":
		return "USDC"
	case "0xff970a61a04b1ca14834a43f5de4533ebddb5cc8":
		return "USDC.e"
	case "0xfd086bc7cd5c481dcc9c85ebe478a1c0b69fcbb9":
		return "USDT"
	case "0x2f2a2543b76a4166549f7aab2e75bef0aefc5b0f":
		return "WBTC"
	case "0x912ce59144191c1204e64559fe8253a0e49e6548":
		return "ARB"
	case "0xfc5a1a6eb076a2c7ad06ed22c90d7e710e35ad0a":
		return "GMX"
	case "0xf97f4df75117a78c1a5a0dbb814af92458539fb4":
		return "LINK"
	case "0xfa7f8980b0f1e64a2062791cc3b0871572f1f7f0":
		return "UNI"
	case "0xba5ddd1f9d7f570dc94a51479a000e3bce967196":
		return "AAVE"
	case "0x0de59c86c306b9fead9fb67e65551e2b6897c3f6":
		return "KUMA"
	case "0x6efa9b8883dfb78fd75cd89d8474c44c3cbda469":
		return "DIA"
	case "0x440017a1b021006d556d7fc06a54c32e42eb745b":
		return "G@ARB"
	case "0x11cdb42b0eb46d95f990bedd4695a6e3fa034978":
		return "CRV"
	case "0x040d1edc9569d4bab2d15287dc5a4f10f56a56b8":
		return "BAL"
	case "0x354a6da3fcde098f8389cad84b0182725c6c91de":
		return "COMP"
	case "0x2e9a6df78e42c50b0cefcf9000d0c3a4d34e1dd5":
		return "MKR"
	}

	// Unknown token: abbreviate, keeping the caller's original casing.
	if n := len(tokenAddress); n > 10 {
		return tokenAddress[:6] + "..." + tokenAddress[n-4:]
	}
	return tokenAddress
}
// calculatePriceMovement calculates the price movement from a swap event using
// cached mathematical functions.
//
// The current price is derived from poolData.SqrtPriceX96. PriceImpact is the
// relative deviation |amount1/amount0 - price| / price; it is computed only
// when both event amounts are strictly positive and stays 0 otherwise.
// Impacts above 10.0 (1000%) are treated as invalid and reset to 0.
// PriceAfter/TickAfter are currently copies of the "before" values.
//
// Missing amounts (nil) are treated as zero rather than dereferenced. An error
// is returned when poolData or its SqrtPriceX96 is nil, or when the price
// conversion yields nil.
//
// NOTE(review): AnalyzeSwapEvent documents a negative amount as the output
// side of a swap, so for signed events the "both positive" condition below
// never holds and the impact stays 0 - confirm whether absolute values are
// intended here.
func (s *SwapAnalyzer) calculatePriceMovement(event events.Event, poolData *market.CachedData) (*market.PriceMovement, error) {
	s.logger.Debug(fmt.Sprintf("Calculating price movement for pool %s", event.PoolAddress))

	if poolData == nil {
		return nil, fmt.Errorf("pool data is nil for pool %s", event.PoolAddress.Hex())
	}
	if poolData.SqrtPriceX96 == nil {
		return nil, fmt.Errorf("pool data for %s has nil SqrtPriceX96", event.PoolAddress.Hex())
	}

	// Get current price from pool data using cached function.
	currentPrice := uniswap.SqrtPriceX96ToPriceCached(poolData.SqrtPriceX96.ToBig())
	if currentPrice == nil {
		return nil, fmt.Errorf("failed to calculate current price from sqrtPriceX96")
	}

	// Normalise missing amounts to zero so the arithmetic below cannot hit a
	// nil *big.Int.
	amount0, amount1 := event.Amount0, event.Amount1
	if amount0 == nil {
		amount0 = new(big.Int)
	}
	if amount1 == nil {
		amount1 = new(big.Int)
	}

	// Calculate price impact based on swap amounts.
	var priceImpact float64
	if amount0.Sign() > 0 && amount1.Sign() > 0 && currentPrice.Sign() != 0 {
		// Price impact = |amount1 / amount0 - current_price| / current_price.
		// amount0 > 0 and currentPrice != 0 rule out division by zero.
		swapPrice := new(big.Float).Quo(new(big.Float).SetInt(amount1), new(big.Float).SetInt(amount0))
		priceDiff := new(big.Float).Sub(swapPrice, currentPrice)
		priceDiff.Abs(priceDiff)
		priceImpact, _ = new(big.Float).Quo(priceDiff, currentPrice).Float64()
		// Validate: reject impossible price impacts (>1000% = 10.0).
		if priceImpact > 10.0 {
			s.logger.Warn(fmt.Sprintf("Price impact too large (%.2f), capping at 0", priceImpact))
			priceImpact = 0.0
		}
	}

	// Use absolute values for amounts (UniswapV3 uses signed int256, but amounts should be positive).
	movement := &market.PriceMovement{
		Token0:      event.Token0.Hex(),
		Token1:      event.Token1.Hex(),
		Pool:        event.PoolAddress.Hex(),
		Protocol:    event.Protocol,
		AmountIn:    new(big.Int).Abs(amount0),
		AmountOut:   new(big.Int).Abs(amount1),
		PriceBefore: currentPrice,
		PriceAfter:  currentPrice, // For now, assume same price (could be calculated based on swap)
		PriceImpact: priceImpact,
		TickBefore:  poolData.Tick,
		TickAfter:   poolData.Tick, // For now, assume same tick
		Timestamp:   time.Now(),
	}

	s.logger.Debug(fmt.Sprintf("Price movement calculated: impact=%.6f%%, amount_in=%s", priceImpact*100, amount0.String()))
	return movement, nil
}
// findArbitrageOpportunities looks for arbitrage opportunities based on price
// movements.
//
// Direct arbitrage: every pool returned by marketScanner.FindRelatedPools for
// the event's token pair (except the event's own pool) is priced from its
// SqrtPriceX96 and compared with movement.PriceBefore. A relative difference
// above 0.1% and within +/-1000% is passed to marketScanner.EstimateProfit;
// an opportunity is emitted only when the estimate minus a fixed gas cost
// (300k gas at 0.1 gwei) is still positive.
//
// Triangular arbitrage: the results of
// marketScanner.FindTriangularArbitrageOpportunities are appended afterwards.
//
// A nil movement, or nil pool data for a related pool, skips the affected
// comparison instead of panicking. The returned slice is never nil.
func (s *SwapAnalyzer) findArbitrageOpportunities(event events.Event, movement *market.PriceMovement, marketScanner *market.MarketScanner) []stypes.ArbitrageOpportunity {
	s.logger.Debug(fmt.Sprintf("Searching for arbitrage opportunities for pool %s", event.PoolAddress))
	opportunities := make([]stypes.ArbitrageOpportunity, 0)

	// Lower threshold for Arbitrum where spreads are smaller:
	// 0.1% threshold instead of 0.5%.
	const arbitrageThreshold = 0.001

	// Get the current price in this pool.
	var currentPrice *big.Float
	if movement != nil {
		currentPrice = movement.PriceBefore
	}

	// Compare with prices in related pools for the same token pair.
	for _, pool := range marketScanner.FindRelatedPools(event.Token0, event.Token1) {
		// Skip the same pool.
		if pool.Address == event.PoolAddress {
			continue
		}

		poolData, err := marketScanner.GetPoolData(pool.Address.Hex())
		if err != nil {
			// Enhanced error logging with context for related pool analysis.
			errorMsg := fmt.Sprintf("Error getting pool data for related pool %s: %v", pool.Address.Hex(), err)
			contextMsg := fmt.Sprintf("original_pool:%s related_pool:%s token_pair:%s-%s",
				event.PoolAddress.Hex(), pool.Address.Hex(), pool.Token0.Hex(), pool.Token1.Hex())
			s.logger.Error(fmt.Sprintf("%s [context: %s]", errorMsg, contextMsg))
			continue
		}
		// Guard against missing pool data or a missing sqrt price.
		if poolData == nil || poolData.SqrtPriceX96 == nil {
			s.logger.Error(fmt.Sprintf("Pool data for %s has nil SqrtPriceX96", pool.Address.Hex()))
			continue
		}

		// Calculate price in the related pool using cached function.
		relatedPrice := uniswap.SqrtPriceX96ToPriceCached(poolData.SqrtPriceX96.ToBig())
		if currentPrice == nil || relatedPrice == nil {
			s.logger.Error("Nil price detected for pool comparison")
			continue
		}
		// A zero related price would make the ratio below divide by zero.
		if relatedPrice.Sign() == 0 {
			s.logger.Debug(fmt.Sprintf("Skipping pool %s: related price is zero", pool.Address.Hex()))
			continue
		}

		// Relative price difference: (current - related) / related.
		priceDiff := new(big.Float).Sub(currentPrice, relatedPrice)
		priceDiffFloat, _ := new(big.Float).Quo(priceDiff, relatedPrice).Float64()
		// Validate: reject impossible price differences (>1000% = 10.0).
		if priceDiffFloat > 10.0 || priceDiffFloat < -10.0 {
			s.logger.Debug(fmt.Sprintf("Skipping pool %s: price difference too large (%.2f)", pool.Address.Hex(), priceDiffFloat))
			continue
		}

		// Take absolute value for comparison.
		priceDiffAbs := priceDiffFloat
		if priceDiffAbs < 0 {
			priceDiffAbs = -priceDiffAbs
		}
		if priceDiffAbs <= arbitrageThreshold {
			continue
		}

		// Estimate potential profit (the signed difference is passed on).
		estimatedProfit := marketScanner.EstimateProfit(event, pool, priceDiffFloat)
		if estimatedProfit == nil || estimatedProfit.Sign() <= 0 {
			continue
		}

		// Gas cost in wei (300k gas * current gas price estimate). Allocated
		// per opportunity because the pointers are stored in the result.
		gasPrice := big.NewInt(100000000) // 0.1 gwei default
		gasUnits := big.NewInt(300000)
		gasCost := new(big.Int).Mul(gasPrice, gasUnits)

		// Only create opportunity if still profitable after gas.
		netProfit := new(big.Int).Sub(estimatedProfit, gasCost)
		if netProfit.Sign() <= 0 {
			continue
		}

		now := time.Now()
		// Use a reasonable test amount (e.g., 0.1 ETH in wei).
		testAmount := big.NewInt(100000000000000000) // 0.1 ETH
		opp := stypes.ArbitrageOpportunity{
			ID:              fmt.Sprintf("arb_%d_%s", now.Unix(), event.PoolAddress.Hex()[:10]),
			Path:            []string{event.Token0.Hex(), event.Token1.Hex()},
			Pools:           []string{event.PoolAddress.Hex(), pool.Address.Hex()},
			AmountIn:        testAmount,
			Profit:          estimatedProfit,
			NetProfit:       netProfit,
			GasEstimate:     gasUnits,
			GasCost:         gasCost,
			EstimatedProfit: netProfit,
			RequiredAmount:  testAmount,
			ROI:             priceDiffAbs * 100, // Convert to percentage (use absolute value)
			Protocol:        fmt.Sprintf("%s->%s", event.Protocol, pool.Protocol),
			ExecutionTime:   200,          // Estimated 200ms for direct arb
			Confidence:      0.7,          // Higher confidence for direct arb
			PriceImpact:     priceDiffAbs, // Use absolute value for price impact
			MaxSlippage:     1.0,          // 1% max slippage
			TokenIn:         event.Token0,
			TokenOut:        event.Token1,
			Timestamp:       now.Unix(),
			DetectedAt:      now,
			ExpiresAt:       now.Add(3 * time.Second), // 3 second expiry for direct arb
			Urgency:         7,                        // Higher urgency
			Risk:            0.2,                      // Lower risk
			Profitable:      true,
		}
		opportunities = append(opportunities, opp)
		s.logger.Info(fmt.Sprintf("Found arbitrage opportunity: %+v", opp))
	}

	// Also look for triangular arbitrage opportunities.
	triangularOpps := marketScanner.FindTriangularArbitrageOpportunities(event)
	opportunities = append(opportunities, triangularOpps...)
	return opportunities
}
// calculateSwapUSDValues prices a swap in USD.
//
// It returns the USD value of the input amount, the USD value of the output
// amount and the fee in USD. For each side the token0 amount is used when it
// is strictly positive, otherwise the token1 amount. The fee is the input
// value multiplied by fee/1,000,000 (so a fee of 3000 yields 0.3%). All three
// results are 0 when no profit calculator is configured.
func (s *SwapAnalyzer) calculateSwapUSDValues(swapData *marketdata.SwapEventData, fee uint32) (amountInUSD, amountOutUSD, feeUSD float64) {
	if s.profitCalculator == nil {
		return 0, 0, 0
	}

	// USD price and decimals for both sides of the pair.
	price0 := s.getTokenPriceUSD(swapData.Token0)
	price1 := s.getTokenPriceUSD(swapData.Token1)
	decimals0 := s.getTokenDecimals(swapData.Token0)
	decimals1 := s.getTokenDecimals(swapData.Token1)

	// Input side: token0 takes precedence over token1.
	switch {
	case swapData.Amount0In != nil && swapData.Amount0In.Sign() > 0:
		amountInUSD = s.bigIntToFloat(swapData.Amount0In, decimals0) * price0
	case swapData.Amount1In != nil && swapData.Amount1In.Sign() > 0:
		amountInUSD = s.bigIntToFloat(swapData.Amount1In, decimals1) * price1
	}

	// Output side: same precedence.
	switch {
	case swapData.Amount0Out != nil && swapData.Amount0Out.Sign() > 0:
		amountOutUSD = s.bigIntToFloat(swapData.Amount0Out, decimals0) * price0
	case swapData.Amount1Out != nil && swapData.Amount1Out.Sign() > 0:
		amountOutUSD = s.bigIntToFloat(swapData.Amount1Out, decimals1) * price1
	}

	// Fee as a fraction of the input value.
	feeFraction := float64(fee) / 1000000.0
	feeUSD = amountInUSD * feeFraction
	return amountInUSD, amountOutUSD, feeUSD
}
// calculateSwapPriceImpact estimates the price impact of a swap as a
// percentage: 100 * swapSize / (liquidity + swapSize).
//
// swapSize is the token0 amount of the swap (input if strictly positive,
// otherwise output). The result is 0 when the event carries no sqrt price or
// liquidity, when the derived price is 0, when no positive token0 amount is
// present, or when the liquidity is zero.
func (s *SwapAnalyzer) calculateSwapPriceImpact(event events.Event, swapData *marketdata.SwapEventData) float64 {
	if event.SqrtPriceX96 == nil || event.Liquidity == nil {
		return 0.0
	}
	// The pre-swap price is only used as a sanity gate.
	if s.sqrtPriceX96ToPrice(event.SqrtPriceX96) == 0 {
		return 0.0
	}

	// Effective swap size in token0 terms.
	var size *big.Int
	switch {
	case swapData.Amount0In != nil && swapData.Amount0In.Sign() > 0:
		size = swapData.Amount0In
	case swapData.Amount0Out != nil && swapData.Amount0Out.Sign() > 0:
		size = swapData.Amount0Out
	default:
		return 0.0
	}

	poolLiquidity := event.Liquidity.ToBig()
	if poolLiquidity.Sign() == 0 {
		return 0.0
	}

	// impact = size / (liquidity + size), reported as a percentage.
	sizeFloat := new(big.Float).SetInt(size)
	denominator := new(big.Float).Add(new(big.Float).SetInt(poolLiquidity), sizeFloat)
	ratio, _ := new(big.Float).Quo(sizeFloat, denominator).Float64()
	return ratio * 100.0
}
// getTokenPriceUSD returns the USD price that scannercommon.GetTokenPriceUSD
// reports for tokenAddr, or 0 when that lookup does not know the token.
func (s *SwapAnalyzer) getTokenPriceUSD(tokenAddr common.Address) float64 {
	price, known := scannercommon.GetTokenPriceUSD(tokenAddr)
	if !known {
		// Unknown token: no price available (a production system would
		// query a price oracle or DEX here).
		return 0.0
	}
	return price
}
// getTokenDecimals returns the decimals that scannercommon.GetTokenDecimals
// reports for tokenAddr, falling back to 18 when the token is unknown.
func (s *SwapAnalyzer) getTokenDecimals(tokenAddr common.Address) uint8 {
	decimals, known := scannercommon.GetTokenDecimals(tokenAddr)
	if !known {
		// Unknown tokens are assumed to use 18 decimals.
		return 18
	}
	return decimals
}
// bigIntToFloat scales a raw token amount down by 10^decimals and returns the
// result as a float64. A nil amount yields 0. The conversion to float64 may
// lose precision; the accuracy flag is intentionally ignored.
func (s *SwapAnalyzer) bigIntToFloat(amount *big.Int, decimals uint8) float64 {
	if amount == nil {
		return 0.0
	}
	// scale = 10^decimals, computed exactly as an integer first.
	scale := new(big.Float).SetInt(
		new(big.Int).Exp(big.NewInt(10), big.NewInt(int64(decimals)), nil),
	)
	scaled, _ := new(big.Float).Quo(new(big.Float).SetInt(amount), scale).Float64()
	return scaled
}
// sqrtPriceX96ToPrice converts sqrtPriceX96 to a regular price using cached
// mathematical functions and returns it as a float64.
//
// It returns 0 when sqrtPriceX96 is nil or when the cached conversion yields
// no price; the other callers of SqrtPriceX96ToPriceCached in this file treat
// a nil result as possible, so it is guarded here as well instead of being
// dereferenced. The float64 conversion may lose precision.
func (s *SwapAnalyzer) sqrtPriceX96ToPrice(sqrtPriceX96 *uint256.Int) float64 {
	if sqrtPriceX96 == nil {
		return 0.0
	}
	// Use cached function for optimized calculation.
	price := uniswap.SqrtPriceX96ToPriceCached(sqrtPriceX96.ToBig())
	if price == nil {
		return 0.0
	}
	priceFloat, _ := price.Float64()
	return priceFloat
}
// canonicalProtocolName maps a free-form protocol label onto the canonical
// spelling used by the lookup tables in this file.
//
// The label is trimmed, lower-cased, cut at the first "_fee" and stripped of
// spaces, '-', '.', ':' and '_' before being matched by substring against the
// known protocols, in priority order. Blank input yields "". A label that
// matches nothing is returned title-cased with underscores removed.
func canonicalProtocolName(raw string) string {
	trimmed := strings.TrimSpace(raw)
	if trimmed == "" {
		return ""
	}

	key := strings.ToLower(trimmed)
	if cut := strings.Index(key, "_fee"); cut >= 0 {
		key = key[:cut]
	}
	key = strings.NewReplacer(" ", "", "-", "", ".", "", ":", "", "_", "").Replace(key)

	// Rules are evaluated top to bottom; a rule matches when every fragment
	// occurs in the normalised key. More specific rules come first.
	rules := []struct {
		fragments []string
		name      string
	}{
		{[]string{"kyberelastic"}, "KyberElastic"},
		{[]string{"kyberclassic"}, "KyberClassic"},
		{[]string{"kyber"}, "Kyber"},
		{[]string{"balancer"}, "Balancer"},
		{[]string{"curve"}, "Curve"},
		{[]string{"camelot"}, "Camelot"},
		{[]string{"traderjoe"}, "TraderJoe"},
		{[]string{"sushi"}, "SushiSwap"},
		{[]string{"pancake"}, "PancakeSwap"},
		{[]string{"ramses"}, "Ramses"},
		{[]string{"uniswap", "v3"}, "UniswapV3"},
		{[]string{"uniswap", "v2"}, "UniswapV2"},
		{[]string{"uniswap"}, "Uniswap"},
	}
	for _, rule := range rules {
		matched := true
		for _, fragment := range rule.fragments {
			if !strings.Contains(key, fragment) {
				matched = false
				break
			}
		}
		if matched {
			return rule.name
		}
	}

	// Unknown protocol: keep the caller's label, minus underscores.
	return strings.Title(strings.ReplaceAll(trimmed, "_", ""))
}
// protocolFromFactory returns the protocol name registered for factory in
// factoryProtocolMap, or "" when factory is the zero address or not registered.
func protocolFromFactory(factory common.Address) string {
	var zero common.Address
	if factory == zero {
		return ""
	}
	// A missing key yields the zero value "", which is the "unknown" result.
	return factoryProtocolMap[factory]
}
// factoryForProtocol returns the default factory address registered in
// protocolDefaultFactory for the canonical form of protocol. The zero address
// is returned when the name is blank or has no registered default.
func factoryForProtocol(protocol string) common.Address {
	if name := canonicalProtocolName(protocol); name != "" {
		if factory, found := protocolDefaultFactory[name]; found {
			return factory
		}
	}
	return common.Address{}
}
// resolveFactory picks a factory address for protocol. An already known
// (non-zero) existing address always wins; otherwise the static default table
// is consulted, then marketScanner (if non-nil). When nothing matches, existing
// (the zero address at that point) is returned.
func (s *SwapAnalyzer) resolveFactory(existing common.Address, protocol string, marketScanner *market.MarketScanner) common.Address {
	var none common.Address
	if existing != none {
		return existing
	}

	if fromTable := factoryForProtocol(protocol); fromTable != none {
		return fromTable
	}

	if marketScanner == nil {
		return existing
	}
	if fromScanner := marketScanner.GetFactoryForProtocol(protocol); fromScanner != none {
		return fromScanner
	}
	return existing
}
// detectSwapProtocol determines the canonical protocol name for a swap.
//
// Candidates are tried in this order and the first one whose canonical form is
// neither empty nor "unknown" (case-insensitive) wins:
//  1. the protocol on the event,
//  2. the protocol on poolInfo (if poolInfo is non-nil),
//  3. the protocol on poolData (if poolData is non-nil),
//  4. the protocol registered for the factory address; when factory is the
//     zero address, poolInfo.Factory is used instead if available.
//
// If none qualifies, the event's pool address is looked up in
// protocolSpecialByAddress; failing that, "Unknown" is returned.
func (s *SwapAnalyzer) detectSwapProtocol(event events.Event, poolInfo *marketdata.PoolInfo, poolData *market.CachedData, factory common.Address) string {
	// usable canonicalises a candidate and reports whether it is acceptable.
	usable := func(candidate string) (string, bool) {
		name := canonicalProtocolName(candidate)
		return name, name != "" && !strings.EqualFold(name, "unknown")
	}

	if name, ok := usable(event.Protocol); ok {
		return name
	}

	if poolInfo != nil {
		if name, ok := usable(poolInfo.Protocol); ok {
			return name
		}
		// Borrow the factory from the pool info when the caller has none.
		if factory == (common.Address{}) && poolInfo.Factory != (common.Address{}) {
			factory = poolInfo.Factory
		}
	}

	if poolData != nil {
		if name, ok := usable(poolData.Protocol); ok {
			return name
		}
	}

	if fromFactory := protocolFromFactory(factory); fromFactory != "" {
		if name, ok := usable(fromFactory); ok {
			return name
		}
	}

	if special, found := protocolSpecialByAddress[event.PoolAddress]; found {
		return special
	}
	return "Unknown"
}