Files
mev-beta/pkg/scanner/analysis/pool_analyzer.go
Krypto Kajun 850223a953 fix(multicall): resolve critical multicall parsing corruption issues
- Added comprehensive bounds checking to prevent buffer overruns in multicall parsing
- Implemented graduated validation system (Strict/Moderate/Permissive) to reduce false positives
- Added LRU caching system for address validation with 10-minute TTL
- Enhanced ABI decoder with missing Universal Router and Arbitrum-specific DEX signatures
- Fixed duplicate function declarations and import conflicts across multiple files
- Added error recovery mechanisms with multiple fallback strategies
- Updated tests to handle new validation behavior for suspicious addresses
- Fixed parser test expectations for improved validation system
- Applied gofmt formatting fixes to ensure code style compliance
- Fixed mutex copying issues in monitoring package by introducing MetricsSnapshot
- Resolved critical security vulnerabilities in heuristic address extraction
- Progress: Updated TODO audit from 10% to 35% complete

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-17 00:12:55 -05:00

178 lines
6.2 KiB
Go

package analysis
import (
"context"
"fmt"
"math/big"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/fraktal/mev-beta/internal/logger"
"github.com/fraktal/mev-beta/pkg/events"
"github.com/fraktal/mev-beta/pkg/marketdata"
scannercommon "github.com/fraktal/mev-beta/pkg/scanner/common"
"github.com/fraktal/mev-beta/pkg/scanner/market"
)
// LiquidityAnalyzer handles analysis of liquidity events
type LiquidityAnalyzer struct {
logger *logger.Logger
marketDataLogger *marketdata.MarketDataLogger
}
// NewLiquidityAnalyzer creates a new liquidity analyzer
func NewLiquidityAnalyzer(logger *logger.Logger, marketDataLogger *marketdata.MarketDataLogger) *LiquidityAnalyzer {
return &LiquidityAnalyzer{
logger: logger,
marketDataLogger: marketDataLogger,
}
}
// AnalyzeLiquidityEvent analyzes liquidity events (add/remove)
func (l *LiquidityAnalyzer) AnalyzeLiquidityEvent(event events.Event, marketScanner *market.MarketScanner, isAdd bool) {
action := "adding"
eventType := "mint"
if !isAdd {
action = "removing"
eventType = "burn"
}
l.logger.Debug(fmt.Sprintf("Analyzing liquidity event (%s) in pool %s", action, event.PoolAddress))
// Get comprehensive pool data to determine factory
poolInfo, poolExists := l.marketDataLogger.GetPoolInfo(event.PoolAddress)
factory := common.Address{}
if poolExists {
factory = poolInfo.Factory
} else {
// Determine factory from known DEX protocols
factory = marketScanner.GetFactoryForProtocol(event.Protocol)
}
// Create comprehensive liquidity event data for market data logger
liquidityData := &marketdata.LiquidityEventData{
TxHash: event.TransactionHash,
BlockNumber: event.BlockNumber,
LogIndex: uint(0), // Default log index (would need to be extracted from receipt)
Timestamp: time.Now(),
EventType: eventType,
PoolAddress: event.PoolAddress,
Factory: factory,
Protocol: event.Protocol,
Token0: event.Token0,
Token1: event.Token1,
Amount0: event.Amount0,
Amount1: event.Amount1,
Liquidity: event.Liquidity,
Owner: common.Address{}, // Default owner (would need to be extracted from transaction)
Recipient: common.Address{}, // Default recipient (would need to be extracted from transaction)
}
// Calculate USD values for liquidity amounts
liquidityData.Amount0USD, liquidityData.Amount1USD, liquidityData.TotalUSD = l.calculateLiquidityUSDValues(liquidityData)
// Log comprehensive liquidity event to market data logger
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := l.marketDataLogger.LogLiquidityEvent(ctx, event, liquidityData); err != nil {
l.logger.Debug(fmt.Sprintf("Failed to log liquidity event to market data logger: %v", err))
}
// Log the liquidity event to database (legacy)
marketScanner.LogLiquidityEvent(event, eventType)
// Update cached pool data
marketScanner.UpdatePoolData(event)
l.logger.Info(fmt.Sprintf("Liquidity %s event processed for pool %s", action, event.PoolAddress))
}
// AnalyzeNewPoolEvent analyzes new pool creation events
func (l *LiquidityAnalyzer) AnalyzeNewPoolEvent(event events.Event, marketScanner *market.MarketScanner) {
l.logger.Info(fmt.Sprintf("New pool created: %s (protocol: %s)", event.PoolAddress, event.Protocol))
// Add to known pools by fetching and caching the pool data
l.logger.Debug(fmt.Sprintf("Adding new pool %s to monitoring", event.PoolAddress))
// Fetch pool data to validate it's a real pool
poolData, err := marketScanner.GetPoolData(event.PoolAddress.Hex())
if err != nil {
l.logger.Error(fmt.Sprintf("Failed to fetch data for new pool %s: %v", event.PoolAddress, err))
return
}
// Validate that this is a real pool contract
if poolData.Address == (common.Address{}) {
l.logger.Warn(fmt.Sprintf("Invalid pool contract at address %s", event.PoolAddress.Hex()))
return
}
// Log pool data to database
marketScanner.LogPoolData(poolData)
l.logger.Info(fmt.Sprintf("Successfully added new pool %s to monitoring (tokens: %s-%s, fee: %d)",
event.PoolAddress.Hex(), poolData.Token0.Hex(), poolData.Token1.Hex(), poolData.Fee))
}
// calculateLiquidityUSDValues calculates USD values for liquidity event amounts
func (l *LiquidityAnalyzer) calculateLiquidityUSDValues(liquidityData *marketdata.LiquidityEventData) (amount0USD, amount1USD, totalUSD float64) {
// Get token prices in USD (using a simplified approach)
token0Price := l.getTokenPriceUSD(liquidityData.Token0)
token1Price := l.getTokenPriceUSD(liquidityData.Token1)
// Calculate decimals for proper conversion
token0Decimals := l.getTokenDecimals(liquidityData.Token0)
token1Decimals := l.getTokenDecimals(liquidityData.Token1)
// Calculate amount0 USD
if liquidityData.Amount0 != nil {
amount0Float := l.bigIntToFloat(liquidityData.Amount0, token0Decimals)
amount0USD = amount0Float * token0Price
}
// Calculate amount1 USD
if liquidityData.Amount1 != nil {
amount1Float := l.bigIntToFloat(liquidityData.Amount1, token1Decimals)
amount1USD = amount1Float * token1Price
}
// Total USD value
totalUSD = amount0USD + amount1USD
return amount0USD, amount1USD, totalUSD
}
// getTokenPriceUSD gets the USD price of a token using various price sources
func (l *LiquidityAnalyzer) getTokenPriceUSD(tokenAddr common.Address) float64 {
if price, exists := scannercommon.GetTokenPriceUSD(tokenAddr); exists {
return price
}
// For unknown tokens, return 0 (in production, would query price oracle or DEX)
return 0.0
}
// getTokenDecimals returns the decimal places for a token
func (l *LiquidityAnalyzer) getTokenDecimals(tokenAddr common.Address) uint8 {
if decimals, exists := scannercommon.GetTokenDecimals(tokenAddr); exists {
return decimals
}
// Default to 18 for unknown tokens
return 18
}
// bigIntToFloat converts a big.Int amount to float64 accounting for token decimals
func (l *LiquidityAnalyzer) bigIntToFloat(amount *big.Int, decimals uint8) float64 {
if amount == nil {
return 0.0
}
divisor := new(big.Int).Exp(big.NewInt(10), big.NewInt(int64(decimals)), nil)
amountFloat := new(big.Float).SetInt(amount)
divisorFloat := new(big.Float).SetInt(divisor)
result := new(big.Float).Quo(amountFloat, divisorFloat)
resultFloat, _ := result.Float64()
return resultFloat
}