package market

import (
	"context"
	"errors"
	"fmt"
	"math"
	"math/big"
	"strings"
	"sync"

	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/ethclient"
	"github.com/holiman/uint256"

	"github.com/fraktal/mev-beta/internal/config"
	"github.com/fraktal/mev-beta/internal/logger"
	"github.com/fraktal/mev-beta/pkg/events"
	"github.com/fraktal/mev-beta/pkg/scanner"
	marketscanner "github.com/fraktal/mev-beta/pkg/scanner/market"
	stypes "github.com/fraktal/mev-beta/pkg/types"
	"github.com/fraktal/mev-beta/pkg/uniswap"
	"github.com/fraktal/mev-beta/pkg/validation"
)

// Pipeline processes transactions through an ordered list of stages and
// hands the surviving events to the scanner.
type Pipeline struct {
	config      *config.BotConfig
	logger      *logger.Logger
	marketMgr   *MarketManager
	scanner     *scanner.Scanner
	stages      []PipelineStage
	bufferSize  int
	concurrency int
	eventParser *events.EventParser
	validator   *validation.InputValidator
	ethClient   *ethclient.Client // used to fetch transaction receipts
}

// PipelineStage is one step of the pipeline: it reads events from the input
// channel and writes the events it wants to keep to the output channel.
type PipelineStage func(context.Context, <-chan *events.Event, chan<- *events.Event) error

// NewPipeline creates a transaction processing pipeline with a single
// TransactionDecoderStage registered.
//
// The event parser defaults to events.NewEventParser(); an enhanced parser can
// be injected afterwards with SetEnhancedEventParser (done by the monitor to
// avoid an import cycle).
func NewPipeline(
	cfg *config.BotConfig,
	logger *logger.Logger,
	marketMgr *MarketManager,
	scanner *scanner.Scanner,
	ethClient *ethclient.Client,
) *Pipeline {
	p := &Pipeline{
		config:      cfg,
		logger:      logger,
		marketMgr:   marketMgr,
		scanner:     scanner,
		bufferSize:  cfg.ChannelBufferSize,
		concurrency: cfg.MaxWorkers,
		eventParser: events.NewEventParser(),
		validator:   validation.NewInputValidator(nil, logger),
		ethClient:   ethClient,
	}

	// Default stage.
	p.AddStage(TransactionDecoderStage(cfg, logger, marketMgr, p.validator, p.ethClient))

	return p
}

// SetEnhancedEventParser replaces the pipeline's event parser after creation.
// A nil parser is rejected with a warning and the current parser is kept.
func (p *Pipeline) SetEnhancedEventParser(parser *events.EventParser) {
	if parser == nil {
		p.logger.Warn("❌ ENHANCED PARSER INJECTION FAILED - Received nil parser")
		return
	}
	p.eventParser = parser
	p.logger.Info("✅ ENHANCED EVENT PARSER INJECTED INTO PIPELINE - Enhanced parsing now active")
}

// AddDefaultStages appends the decoder, market analysis and arbitrage
// detection stages, in that order.
//
// NOTE(review): NewPipeline already registers a decoder stage, so calling this
// on a pipeline built by NewPipeline results in two decoder stages. The second
// one only re-validates and forwards events.
func (p *Pipeline) AddDefaultStages() {
	p.AddStage(TransactionDecoderStage(p.config, p.logger, p.marketMgr, p.validator, p.ethClient))
	p.AddStage(MarketAnalysisStage(p.config, p.logger, p.marketMgr, p.validator))
	p.AddStage(ArbitrageDetectionStage(p.config, p.logger, p.marketMgr, p.validator))
}

// AddStage appends a processing stage to the end of the pipeline.
func (p *Pipeline) AddStage(stage PipelineStage) {
	p.stages = append(p.stages, stage)
}

// ProcessTransactions starts processing a batch of transactions.
//
// It launches a producer that turns transaction receipts into events, one
// goroutine per stage, and a final consumer that submits events to the
// scanner. All of that work is asynchronous: the method returns as soon as the
// goroutines are started. It returns an error only when no stages are
// configured.
func (p *Pipeline) ProcessTransactions(ctx context.Context, transactions []*types.Transaction, blockNumber uint64, timestamp uint64) error {
	if len(p.stages) == 0 {
		return fmt.Errorf("no pipeline stages configured")
	}

	// make panics on a negative capacity; fall back to unbuffered channels.
	bufSize := p.bufferSize
	if bufSize < 0 {
		bufSize = 0
	}

	eventChan := make(chan *events.Event, bufSize)
	go p.produceEvents(ctx, transactions, blockNumber, timestamp, eventChan)

	// Chain the stages: each stage reads the previous stage's output.
	var currentChan <-chan *events.Event = eventChan
	for i, stage := range p.stages {
		outputChan := make(chan *events.Event, bufSize)
		go func(stage PipelineStage, input <-chan *events.Event, output chan<- *events.Event, stageIndex int) {
			if err := stage(ctx, input, output); err != nil {
				p.logger.Error(fmt.Sprintf("Pipeline stage %d error: %v", stageIndex, err))
			}
		}(stage, currentChan, outputChan, i)
		currentChan = outputChan
	}

	// Drain the last stage into the scanner.
	finalChan := currentChan
	go func() {
		defer func() {
			if r := recover(); r != nil {
				p.logger.Error(fmt.Sprintf("Final output processor panic recovered: %v", r))
			}
		}()
		p.processSwapDetails(ctx, finalChan)
	}()

	return nil
}

// produceEvents validates each transaction, fetches its receipt, parses the
// receipt into events and sends every valid event to out. It closes out when
// it is done or when ctx is cancelled.
func (p *Pipeline) produceEvents(
	ctx context.Context,
	transactions []*types.Transaction,
	blockNumber uint64,
	timestamp uint64,
	out chan<- *events.Event,
) {
	defer close(out)

	if p.ethClient == nil {
		// Without a client every receipt fetch would dereference nil.
		p.logger.Error("Cannot fetch transaction receipts: no Ethereum client configured")
		return
	}

	for _, tx := range transactions {
		// Stop early instead of issuing RPC calls that are bound to fail.
		if ctx.Err() != nil {
			return
		}
		if tx == nil {
			continue
		}

		validationResult, err := p.validator.ValidateTransaction(tx)
		if err != nil || !validationResult.IsValid {
			// Known problematic transactions are skipped silently to reduce log spam.
			txHash := tx.Hash().Hex()
			if !p.isKnownProblematicTransaction(txHash) {
				p.logger.Warn(fmt.Sprintf("Invalid transaction %s: %v", txHash, err))
			}
			continue
		}

		receipt, err := p.ethClient.TransactionReceipt(ctx, tx.Hash())
		if err != nil {
			p.logger.Error(fmt.Sprintf("Error fetching receipt for transaction %s: %v", tx.Hash().Hex(), err))
			continue
		}

		parsed, err := p.eventParser.ParseTransactionReceipt(receipt, blockNumber, timestamp)
		if err != nil {
			p.logger.Error(fmt.Sprintf("Error parsing receipt for transaction %s: %v", tx.Hash().Hex(), err))
			continue
		}

		for _, event := range parsed {
			if err := p.validator.ValidateEvent(event); err != nil {
				p.logger.Warn(fmt.Sprintf("Invalid event from transaction %s: %v", tx.Hash().Hex(), err))
				continue
			}

			select {
			case out <- event:
			case <-ctx.Done():
				return
			}
		}
	}
}

// processSwapDetails consumes the final output of the pipeline and submits
// each event to the scanner until the channel is closed or ctx is cancelled.
func (p *Pipeline) processSwapDetails(ctx context.Context, eventDetails <-chan *events.Event) {
	for {
		select {
		case event, ok := <-eventDetails:
			if !ok {
				return // channel closed
			}
			if event == nil {
				continue // nothing to dereference
			}
			p.scanner.SubmitEvent(*event)
		case <-ctx.Done():
			return
		}
	}
}

// runStageWorkers is the worker pool shared by the built-in stages.
//
// It starts `workers` goroutines (at least one) that read events from input,
// call handle, and forward the event to output when handle returns true. Once
// every worker has returned, output is closed unless ctx has been cancelled.
// The function itself does not block.
func runStageWorkers(
	ctx context.Context,
	workers int,
	log *logger.Logger,
	stageName string,
	input <-chan *events.Event,
	output chan<- *events.Event,
	handle func(event *events.Event) bool,
) {
	// With zero workers nothing would drain input and output would be closed
	// immediately, silently dropping every event.
	if workers < 1 {
		workers = 1
	}

	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for {
				select {
				case event, ok := <-input:
					if !ok {
						return // channel closed
					}
					if !handle(event) {
						continue
					}
					select {
					case output <- event:
					case <-ctx.Done():
						return
					}
				case <-ctx.Done():
					return
				}
			}
		}()
	}

	// Close the output once all workers are done.
	go func() {
		wg.Wait()

		defer func() {
			if r := recover(); r != nil {
				log.Debug("Channel already closed in " + stageName)
			}
		}()

		select {
		case <-ctx.Done():
			// Context cancelled: leave the channel open, readers exit on ctx.
		default:
			close(output)
		}
	}()
}

// TransactionDecoderStage returns a stage that re-validates each event and
// forwards the valid ones. Nil events and events that fail validation are
// dropped. marketMgr and ethClient are accepted but not used by this stage.
func TransactionDecoderStage(
	cfg *config.BotConfig,
	logger *logger.Logger,
	marketMgr *MarketManager,
	validator *validation.InputValidator,
	ethClient *ethclient.Client,
) PipelineStage {
	return func(ctx context.Context, input <-chan *events.Event, output chan<- *events.Event) error {
		handle := func(event *events.Event) bool {
			if event == nil {
				return false
			}
			if err := validator.ValidateEvent(event); err != nil {
				logger.Warn(fmt.Sprintf("Event validation failed in decoder stage: %v", err))
				return false
			}
			return true
		}

		runStageWorkers(ctx, cfg.MaxWorkers, logger, "TransactionDecoderStage", input, output, handle)
		return nil
	}
}

// MarketAnalysisStage returns a stage that, for swap events, looks up the pool
// and logs the estimated price impact. Every valid event is forwarded whether
// or not the analysis succeeds; nil and invalid events are dropped.
func MarketAnalysisStage(
	cfg *config.BotConfig,
	logger *logger.Logger,
	marketMgr *MarketManager,
	validator *validation.InputValidator,
) PipelineStage {
	return func(ctx context.Context, input <-chan *events.Event, output chan<- *events.Event) error {
		handle := func(event *events.Event) bool {
			if event == nil {
				return false
			}
			if err := validator.ValidateEvent(event); err != nil {
				logger.Warn(fmt.Sprintf("Event validation failed in analysis stage: %v", err))
				return false
			}

			// Non-swap events pass through untouched.
			if event.Type != events.Swap {
				return true
			}

			poolData, err := marketMgr.GetPool(ctx, event.PoolAddress)
			if err != nil {
				if errors.Is(err, marketscanner.ErrInvalidPoolCandidate) {
					logger.Debug("Skipping pool data fetch due to invalid candidate", "pool", event.PoolAddress, "error", err)
				} else {
					errorMsg := fmt.Sprintf("Error getting pool data for %s: %v", event.PoolAddress, err)
					contextMsg := fmt.Sprintf("pipeline_stage:market_processing event_type:%s protocol:%s",
						event.Type.String(), event.Protocol)
					logger.Error(fmt.Sprintf("%s [context: %s]", errorMsg, contextMsg))
				}
				return true // forward even without pool data
			}

			priceImpact, err := calculatePriceImpact(event, poolData)
			if err != nil {
				logger.Error(fmt.Sprintf("Error calculating price impact for pool %s: %v", event.PoolAddress, err))
				return true // forward even without a price impact
			}

			// The impact is only logged; the event itself is not modified.
			logger.Debug(fmt.Sprintf("Price impact for pool %s: %f", event.PoolAddress, priceImpact))
			return true
		}

		runStageWorkers(ctx, cfg.MaxWorkers, logger, "MarketAnalysisStage", input, output, handle)
		return nil
	}
}

// calculatePriceImpact estimates the price impact of a swap as
// amountIn / liquidity, expressed as a percentage.
//
// The inbound amount is the first strictly positive value of event.Amount0
// and event.Amount1. It returns 0 when no positive amount is present or when
// the pool liquidity is nil or zero, and an error when the event or pool data
// is nil or the amount does not fit in 256 bits.
func calculatePriceImpact(event *events.Event, poolData *PoolData) (float64, error) {
	if event == nil {
		return 0, errors.New("nil event")
	}
	if poolData == nil {
		return 0, errors.New("nil pool data")
	}

	// Select by sign on the big.Int: converting a negative (outbound) amount
	// to uint256 wraps it into a huge positive number.
	amount := event.Amount0
	if amount == nil || amount.Sign() <= 0 {
		amount = event.Amount1
	}
	if amount == nil || amount.Sign() <= 0 {
		return 0.0, nil
	}

	amountIn := uint256.NewInt(0)
	if overflow := amountIn.SetFromBig(amount); overflow {
		return 0, fmt.Errorf("swap amount %s overflows uint256", amount)
	}

	liquidity := poolData.Liquidity
	if liquidity == nil || liquidity.Sign() == 0 {
		return 0.0, nil
	}

	// Divide in floating point; integer division would truncate every ratio
	// below 1 to zero.
	ratio := new(big.Float).Quo(
		new(big.Float).SetInt(amountIn.ToBig()),
		new(big.Float).SetInt(liquidity.ToBig()),
	)
	fraction, _ := ratio.Float64() // accuracy flag is irrelevant for an estimate

	return fraction * 100.0, nil
}

// ArbitrageDetectionStage returns a stage that searches for arbitrage
// opportunities on every swap event and logs what it finds. Every valid event
// is forwarded; nil and invalid events are dropped.
func ArbitrageDetectionStage(
	cfg *config.BotConfig,
	logger *logger.Logger,
	marketMgr *MarketManager,
	validator *validation.InputValidator,
) PipelineStage {
	return func(ctx context.Context, input <-chan *events.Event, output chan<- *events.Event) error {
		handle := func(event *events.Event) bool {
			if event == nil {
				return false
			}
			if err := validator.ValidateEvent(event); err != nil {
				logger.Warn(fmt.Sprintf("Event validation failed in arbitrage detection stage: %v", err))
				return false
			}

			// Non-swap events pass through untouched.
			if event.Type != events.Swap {
				return true
			}

			opportunities, err := findArbitrageOpportunities(ctx, event, marketMgr, logger)
			if err != nil {
				logger.Error(fmt.Sprintf("Error finding arbitrage opportunities for pool %s: %v", event.PoolAddress, err))
				return true // forward even on error
			}

			if len(opportunities) > 0 {
				logger.Info(fmt.Sprintf("Found %d arbitrage opportunities for pool %s", len(opportunities), event.PoolAddress))
				for _, opp := range opportunities {
					logger.Info(fmt.Sprintf("Arbitrage opportunity: %+v", opp))
				}
			}
			return true
		}

		runStageWorkers(ctx, cfg.MaxWorkers, logger, "ArbitrageDetectionStage", input, output, handle)
		return nil
	}
}

// findArbitrageOpportunities compares the pool that emitted the swap event
// with every other known pool for the same token pair and returns the pairs
// whose estimated profit stays positive after gas.
//
// It returns an empty, non-nil slice when fewer than two pools are known or
// the event's pool is not among them, and an error when event or marketMgr is
// nil.
func findArbitrageOpportunities(ctx context.Context, event *events.Event, marketMgr *MarketManager, logger *logger.Logger) ([]stypes.ArbitrageOpportunity, error) {
	if event == nil || marketMgr == nil {
		return nil, errors.New("event and market manager are required")
	}

	opportunities := make([]stypes.ArbitrageOpportunity, 0)

	pools := marketMgr.GetPoolsByTokens(event.Token0, event.Token1)
	if len(pools) < 2 {
		return opportunities, nil // arbitrage needs at least two pools
	}

	// Locate the pool that triggered the event.
	var eventPool *PoolData
	for _, pool := range pools {
		if pool != nil && pool.Address == event.PoolAddress {
			eventPool = pool
			break
		}
	}
	if eventPool == nil || eventPool.SqrtPriceX96 == nil {
		return opportunities, nil
	}

	eventPoolPrice := uniswap.SqrtPriceX96ToPrice(eventPool.SqrtPriceX96.ToBig())

	for _, pool := range pools {
		if pool == nil || pool.Address == event.PoolAddress || pool.SqrtPriceX96 == nil {
			continue
		}

		compPoolPrice := uniswap.SqrtPriceX96ToPrice(pool.SqrtPriceX96.ToBig())

		profit := calculateSophisticatedArbitrageProfit(eventPoolPrice, compPoolPrice, *event, pool, logger)
		if profit == nil || profit.Sign() <= 0 || profit.IsInf() {
			continue
		}

		// profit is already denominated in wei and is at least 1e16 when
		// non-zero, so it is converted exactly. Scaling it by 1e6 and
		// narrowing to int64 overflows, and the result of an out-of-range
		// float-to-int conversion is implementation-dependent.
		profitWei, _ := profit.Int(nil)

		// Flat gas estimate for a multi-hop swap: ~300k gas at 1 gwei.
		// Allocated per opportunity so results never share a *big.Int.
		// NOTE(review): calculateSophisticatedArbitrageProfit has already
		// subtracted a gas cost; this second deduction is kept as-is.
		gasEstimate := new(big.Int).Mul(big.NewInt(1000000000), big.NewInt(300000))

		netProfit := new(big.Int).Sub(profitWei, gasEstimate)
		if netProfit.Sign() <= 0 {
			continue
		}

		// NOTE(review): ROI is measured against a fixed 1,000,000-unit test
		// amount, not the trade size used for the profit estimate — confirm.
		estimatedAmount := big.NewInt(1000000)
		roiFloat := new(big.Float).Quo(new(big.Float).SetInt(netProfit), new(big.Float).SetInt(estimatedAmount))
		roi, _ := roiFloat.Float64()
		roi *= 100 // percentage

		opportunities = append(opportunities, stypes.ArbitrageOpportunity{
			Path:        []string{event.Token0.Hex(), event.Token1.Hex()},
			Pools:       []string{event.PoolAddress.Hex(), pool.Address.Hex()},
			Profit:      netProfit,
			GasEstimate: gasEstimate,
			ROI:         roi,
			Protocol:    event.Protocol,
		})
	}

	return opportunities, nil
}

// isKnownProblematicTransaction reports whether txHash is on the hard-coded
// list of transactions whose validation failures should not be logged.
func (p *Pipeline) isKnownProblematicTransaction(txHash string) bool {
	switch txHash {
	case "0xe79e4719c6770b41405f691c18be3346b691e220d730d6b61abb5dd3ac9d71f0":
		// Add other problematic transaction hashes as further case values.
		return true
	default:
		return false
	}
}

// liquidityAsBig converts an optional uint256 liquidity to *big.Int,
// returning nil for a nil input.
func liquidityAsBig(l *uint256.Int) *big.Int {
	if l == nil {
		return nil
	}
	return l.ToBig()
}

// liquidityAsInt64 converts an optional uint256 liquidity to int64. It returns
// fallback when l is nil or zero and saturates at math.MaxInt64 when the value
// does not fit.
func liquidityAsInt64(l *uint256.Int, fallback int64) int64 {
	if l == nil || l.Sign() <= 0 {
		return fallback
	}
	b := l.ToBig()
	if !b.IsInt64() {
		return math.MaxInt64
	}
	return b.Int64()
}

// calculateSophisticatedArbitrageProfit estimates the net profit, in wei, of
// buying on the event pool and selling on pool.
//
// The estimate takes the relative price difference, subtracts the modelled
// price impact on both pools, multiplies by the chosen trade size, subtracts
// gas and a 0.3% protocol fee, and scales the remainder down by an MEV
// competition factor. It returns zero when the comparison pool is not priced
// higher, when any input is unusable, or when the result is below 0.01 ETH.
func calculateSophisticatedArbitrageProfit(
	eventPoolPrice *big.Float,
	compPoolPrice *big.Float,
	event events.Event,
	pool *PoolData,
	logger *logger.Logger,
) *big.Float {
	if eventPoolPrice == nil || compPoolPrice == nil || pool == nil {
		return big.NewFloat(0)
	}
	// A zero reference price would make the relative difference infinite, and
	// Inf - Inf panics inside math/big.
	if eventPoolPrice.IsInf() || compPoolPrice.IsInf() || eventPoolPrice.Sign() <= 0 {
		return big.NewFloat(0)
	}

	priceDiff := new(big.Float).Sub(compPoolPrice, eventPoolPrice)
	if priceDiff.Sign() <= 0 {
		return big.NewFloat(0) // prices equal or inverted
	}

	relativeDiff := new(big.Float).Quo(priceDiff, eventPoolPrice)
	relativeDiffFloat, _ := relativeDiff.Float64()

	optimalTradeSize := calculateOptimalTradeSize(event, pool, relativeDiffFloat)

	// Price impact on both legs reduces the exploitable spread.
	eventPoolImpact := calculateTradeImpact(optimalTradeSize, liquidityAsBig(event.Liquidity), "source")
	compPoolImpact := calculateTradeImpact(optimalTradeSize, liquidityAsBig(pool.Liquidity), "destination")
	totalImpact := eventPoolImpact + compPoolImpact

	adjustedRelativeDiff := relativeDiffFloat - totalImpact
	if adjustedRelativeDiff <= 0 || math.IsInf(adjustedRelativeDiff, 0) || math.IsNaN(adjustedRelativeDiff) {
		return big.NewFloat(0)
	}

	// Gross profit in wei.
	optimalTradeSizeBig := big.NewInt(optimalTradeSize)
	grossProfit := new(big.Float).Mul(
		new(big.Float).SetInt(optimalTradeSizeBig),
		big.NewFloat(adjustedRelativeDiff),
	)

	gasCost := calculateSophisticatedGasCost(event, pool)
	gasCostFloat := new(big.Float).SetInt(gasCost)

	// Protocol fee: 0.3% of the trade size.
	protocolFeeRate := 0.003
	protocolFee := new(big.Float).Mul(
		new(big.Float).SetInt(optimalTradeSizeBig),
		big.NewFloat(protocolFeeRate),
	)

	mevCompetitionFactor := calculateMEVCompetitionFactor(adjustedRelativeDiff)

	netProfit := new(big.Float).Sub(grossProfit, gasCostFloat)
	netProfit.Sub(netProfit, protocolFee)
	netProfit.Mul(netProfit, big.NewFloat(1.0-mevCompetitionFactor))

	// Minimum profit threshold: 0.01 ETH in wei.
	minProfitThreshold := big.NewFloat(10000000000000000)
	if netProfit.Cmp(minProfitThreshold) < 0 {
		return big.NewFloat(0)
	}

	logger.Debug(fmt.Sprintf("Sophisticated arbitrage calculation: optimal_size=%d, price_impact=%.4f%%, gas=%s, mev_factor=%.2f, net_profit=%s",
		optimalTradeSize, totalImpact*100, gasCost.String(), mevCompetitionFactor, netProfit.String()))

	return netProfit
}

// calculateOptimalTradeSize picks a trade size, in wei, as a fraction of the
// smaller of the two pools' liquidity.
//
// The fraction is 2% plus five times priceDiffPercent, capped at 15%. The
// result is clamped to the range [0.001 ETH, 5 ETH]. Unknown (nil or zero)
// liquidity defaults to 1 ETH.
func calculateOptimalTradeSize(event events.Event, pool *PoolData, priceDiffPercent float64) int64 {
	const (
		defaultLiquidity = int64(1000000000000000000) // 1 ETH
		minTradeSize     = int64(1000000000000000)    // 0.001 ETH
		maxTradeSize     = int64(5000000000000000000) // 5 ETH
	)

	var poolLiquidityRaw *uint256.Int
	if pool != nil {
		poolLiquidityRaw = pool.Liquidity
	}

	// Liquidity is a 256-bit value; saturate rather than rely on
	// big.Int.Int64, whose result is undefined when the value does not fit.
	minLiquidity := liquidityAsInt64(event.Liquidity, defaultLiquidity)
	if poolLiquidity := liquidityAsInt64(poolLiquidityRaw, defaultLiquidity); poolLiquidity < minLiquidity {
		minLiquidity = poolLiquidity
	}

	sizeFactor := 0.02 + (priceDiffPercent * 5)
	if sizeFactor > 0.15 {
		sizeFactor = 0.15
	}

	// Clamp in floating point first so NaN or infinite values never reach the
	// integer conversion.
	size := float64(minLiquidity) * sizeFactor
	switch {
	case math.IsNaN(size) || size < float64(minTradeSize):
		return minTradeSize
	case size > float64(maxTradeSize):
		return maxTradeSize
	}
	return int64(size)
}

// calculateTradeImpact models the fractional price impact of trading
// tradeSize against a pool with the given liquidity.
//
// poolType selects the model: "source", "destination", or anything else for
// the default. Impact is a quadratic function of tradeSize/liquidity, square
// rooted when utilization exceeds 10%, and capped at 0.5. Unknown liquidity
// (nil, zero or negative) yields a flat 0.05.
func calculateTradeImpact(tradeSize int64, liquidity *big.Int, poolType string) float64 {
	if liquidity == nil || liquidity.Sign() <= 0 {
		return 0.05 // 5% default impact for unknown liquidity
	}

	// Convert through big.Float: big.Int.Int64 is undefined above MaxInt64
	// and could yield a zero or negative denominator.
	liquidityFloat, _ := new(big.Float).SetInt(liquidity).Float64()
	utilizationRatio := float64(tradeSize) / liquidityFloat

	var impact float64
	switch poolType {
	case "source":
		// Source pool (where we buy): most aggressive model.
		impact = utilizationRatio * (1 + utilizationRatio*2)
	case "destination":
		// Destination pool (where we sell): less aggressive model.
		impact = utilizationRatio * (1 + utilizationRatio*1.5)
	default:
		impact = utilizationRatio * (1 + utilizationRatio)
	}

	// Square root for very large trades.
	if utilizationRatio > 0.1 {
		impact = math.Sqrt(impact)
	}

	// Cap impact at 50%.
	if impact > 0.5 {
		impact = 0.5
	}

	return impact
}

// calculateSophisticatedGasCost estimates the gas cost, in wei, of executing
// the arbitrage: two swaps plus one transfer, a protocol-dependent surcharge
// chosen from event.Protocol, priced at a fixed 1 gwei. pool is not used.
func calculateSophisticatedGasCost(event events.Event, pool *PoolData) *big.Int {
	const (
		baseGasSwap     = int64(150000)     // base gas for a swap
		baseGasTransfer = int64(21000)      // base gas for a transfer
		gasPriceWei     = int64(1000000000) // 1 gwei
	)

	totalGas := baseGasSwap*2 + baseGasTransfer

	switch {
	case strings.Contains(event.Protocol, "UniswapV3"):
		totalGas += 50000 // V3 callback gas
	case strings.Contains(event.Protocol, "UniswapV2"):
		totalGas += 20000 // V2 additional gas
	case strings.Contains(event.Protocol, "Curve"):
		totalGas += 80000 // Curve math complexity
	default:
		totalGas += 30000 // unknown protocol buffer
	}

	return big.NewInt(totalGas * gasPriceWei)
}

// calculateMEVCompetitionFactor maps a profit margin (as a fraction) to the
// share of profit assumed lost to competing searchers: the larger the margin,
// the larger the share, from 0.1 below 0.1% up to 0.8 at 2% and above.
func calculateMEVCompetitionFactor(profitMargin float64) float64 {
	switch {
	case profitMargin < 0.001: // < 0.1%
		return 0.1
	case profitMargin < 0.005: // < 0.5%
		return 0.2
	case profitMargin < 0.01: // < 1%
		return 0.4
	case profitMargin < 0.02: // < 2%
		return 0.6
	default:
		return 0.8
	}
}