package analysis import ( "context" "fmt" "math/big" "time" "github.com/ethereum/go-ethereum/common" "github.com/fraktal/mev-beta/internal/logger" "github.com/fraktal/mev-beta/pkg/events" "github.com/fraktal/mev-beta/pkg/marketdata" scannercommon "github.com/fraktal/mev-beta/pkg/scanner/common" "github.com/fraktal/mev-beta/pkg/scanner/market" ) // LiquidityAnalyzer handles analysis of liquidity events type LiquidityAnalyzer struct { logger *logger.Logger marketDataLogger *marketdata.MarketDataLogger } // NewLiquidityAnalyzer creates a new liquidity analyzer func NewLiquidityAnalyzer(logger *logger.Logger, marketDataLogger *marketdata.MarketDataLogger) *LiquidityAnalyzer { return &LiquidityAnalyzer{ logger: logger, marketDataLogger: marketDataLogger, } } // AnalyzeLiquidityEvent analyzes liquidity events (add/remove) func (l *LiquidityAnalyzer) AnalyzeLiquidityEvent(event events.Event, marketScanner *market.MarketScanner, isAdd bool) { action := "adding" eventType := "mint" if !isAdd { action = "removing" eventType = "burn" } l.logger.Debug(fmt.Sprintf("Analyzing liquidity event (%s) in pool %s", action, event.PoolAddress)) // Get comprehensive pool data to determine factory poolInfo, poolExists := l.marketDataLogger.GetPoolInfo(event.PoolAddress) factory := common.Address{} if poolExists { factory = poolInfo.Factory } else { // Determine factory from known DEX protocols factory = marketScanner.GetFactoryForProtocol(event.Protocol) } // Create comprehensive liquidity event data for market data logger liquidityData := &marketdata.LiquidityEventData{ TxHash: event.TransactionHash, BlockNumber: event.BlockNumber, LogIndex: uint(0), // Default log index (would need to be extracted from receipt) Timestamp: time.Now(), EventType: eventType, PoolAddress: event.PoolAddress, Factory: factory, Protocol: event.Protocol, Token0: event.Token0, Token1: event.Token1, Amount0: event.Amount0, Amount1: event.Amount1, Liquidity: event.Liquidity, Owner: common.Address{}, // Default owner (would need to be extracted from transaction) Recipient: common.Address{}, // Default recipient (would need to be extracted from transaction) } // Calculate USD values for liquidity amounts liquidityData.Amount0USD, liquidityData.Amount1USD, liquidityData.TotalUSD = l.calculateLiquidityUSDValues(liquidityData) // Log comprehensive liquidity event to market data logger ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if err := l.marketDataLogger.LogLiquidityEvent(ctx, event, liquidityData); err != nil { l.logger.Debug(fmt.Sprintf("Failed to log liquidity event to market data logger: %v", err)) } // Log the liquidity event to database (legacy) marketScanner.LogLiquidityEvent(event, eventType) // Update cached pool data marketScanner.UpdatePoolData(event) l.logger.Info(fmt.Sprintf("Liquidity %s event processed for pool %s", action, event.PoolAddress)) } // AnalyzeNewPoolEvent analyzes new pool creation events func (l *LiquidityAnalyzer) AnalyzeNewPoolEvent(event events.Event, marketScanner *market.MarketScanner) { l.logger.Info(fmt.Sprintf("New pool created: %s (protocol: %s)", event.PoolAddress, event.Protocol)) // Add to known pools by fetching and caching the pool data l.logger.Debug(fmt.Sprintf("Adding new pool %s to monitoring", event.PoolAddress)) // Fetch pool data to validate it's a real pool poolData, err := marketScanner.GetPoolData(event.PoolAddress.Hex()) if err != nil { l.logger.Error(fmt.Sprintf("Failed to fetch data for new pool %s: %v", event.PoolAddress, err)) return } // Validate that this is a real pool contract if poolData.Address == (common.Address{}) { l.logger.Warn(fmt.Sprintf("Invalid pool contract at address %s", event.PoolAddress.Hex())) return } // Log pool data to database marketScanner.LogPoolData(poolData) l.logger.Info(fmt.Sprintf("Successfully added new pool %s to monitoring (tokens: %s-%s, fee: %d)", event.PoolAddress.Hex(), poolData.Token0.Hex(), poolData.Token1.Hex(), poolData.Fee)) } // calculateLiquidityUSDValues calculates USD values for liquidity event amounts func (l *LiquidityAnalyzer) calculateLiquidityUSDValues(liquidityData *marketdata.LiquidityEventData) (amount0USD, amount1USD, totalUSD float64) { // Get token prices in USD (using a simplified approach) token0Price := l.getTokenPriceUSD(liquidityData.Token0) token1Price := l.getTokenPriceUSD(liquidityData.Token1) // Calculate decimals for proper conversion token0Decimals := l.getTokenDecimals(liquidityData.Token0) token1Decimals := l.getTokenDecimals(liquidityData.Token1) // Calculate amount0 USD if liquidityData.Amount0 != nil { amount0Float := l.bigIntToFloat(liquidityData.Amount0, token0Decimals) amount0USD = amount0Float * token0Price } // Calculate amount1 USD if liquidityData.Amount1 != nil { amount1Float := l.bigIntToFloat(liquidityData.Amount1, token1Decimals) amount1USD = amount1Float * token1Price } // Total USD value totalUSD = amount0USD + amount1USD return amount0USD, amount1USD, totalUSD } // getTokenPriceUSD gets the USD price of a token using various price sources func (l *LiquidityAnalyzer) getTokenPriceUSD(tokenAddr common.Address) float64 { if price, exists := scannercommon.GetTokenPriceUSD(tokenAddr); exists { return price } // For unknown tokens, return 0 (in production, would query price oracle or DEX) return 0.0 } // getTokenDecimals returns the decimal places for a token func (l *LiquidityAnalyzer) getTokenDecimals(tokenAddr common.Address) uint8 { if decimals, exists := scannercommon.GetTokenDecimals(tokenAddr); exists { return decimals } // Default to 18 for unknown tokens return 18 } // bigIntToFloat converts a big.Int amount to float64 accounting for token decimals func (l *LiquidityAnalyzer) bigIntToFloat(amount *big.Int, decimals uint8) float64 { if amount == nil { return 0.0 } divisor := new(big.Int).Exp(big.NewInt(10), big.NewInt(int64(decimals)), nil) amountFloat := new(big.Float).SetInt(amount) divisorFloat := new(big.Float).SetInt(divisor) result := new(big.Float).Quo(amountFloat, divisorFloat) resultFloat, _ := result.Float64() return resultFloat }