package market

import (
	"context"
	"fmt"
	"math/big"
	"sync"

	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/ethclient"
	"github.com/fraktal/mev-beta/internal/config"
	"github.com/fraktal/mev-beta/internal/logger"
	"github.com/fraktal/mev-beta/pkg/events"
	"github.com/fraktal/mev-beta/pkg/scanner"
	"github.com/fraktal/mev-beta/pkg/uniswap"
	"github.com/fraktal/mev-beta/pkg/validation"
	"github.com/holiman/uint256"
)

// Pipeline processes transactions through multiple stages. Events parsed from
// transaction receipts flow through the registered stages in order, and the
// output of the last stage is submitted to the market scanner.
type Pipeline struct {
	config      *config.BotConfig
	logger      *logger.Logger
	marketMgr   *MarketManager
	scanner     *scanner.MarketScanner
	stages      []PipelineStage            // stages run in registration order
	bufferSize  int                        // capacity of every inter-stage channel
	concurrency int                        // copied from cfg.MaxWorkers
	eventParser *events.EventParser        // turns receipts into events
	validator   *validation.InputValidator // validates transactions and events
	ethClient   *ethclient.Client          // used to fetch transaction receipts
}

// PipelineStage represents a stage in the processing pipeline. A stage reads
// events from its input channel and writes events to its output channel. The
// built-in stages start their workers and return immediately; they close the
// output channel once all of their workers have finished.
type PipelineStage func(context.Context, <-chan *events.Event, chan<- *events.Event) error

// NewPipeline creates a new transaction processing pipeline. The returned
// pipeline already has the transaction decoder stage registered.
func NewPipeline(
	cfg *config.BotConfig,
	logger *logger.Logger,
	marketMgr *MarketManager,
	scanner *scanner.MarketScanner,
	ethClient *ethclient.Client,
) *Pipeline {
	pipeline := &Pipeline{
		config:      cfg,
		logger:      logger,
		marketMgr:   marketMgr,
		scanner:     scanner,
		bufferSize:  cfg.ChannelBufferSize,
		concurrency: cfg.MaxWorkers,
		eventParser: events.NewEventParser(),
		validator:   validation.NewInputValidator(),
		ethClient:   ethClient,
	}

	// The decoder stage is always present so that a freshly built pipeline is usable.
	pipeline.AddStage(TransactionDecoderStage(cfg, logger, marketMgr, pipeline.validator, pipeline.ethClient))

	return pipeline
}

// AddDefaultStages adds the default processing stages to the pipeline:
// transaction decoding, market analysis and arbitrage detection.
//
// NOTE(review): NewPipeline already registers a decoder stage, so calling this
// on a pipeline built by NewPipeline results in two decoder stages. Events are
// then validated twice; confirm whether that is intended.
func (p *Pipeline) AddDefaultStages() {
	p.AddStage(TransactionDecoderStage(p.config, p.logger, p.marketMgr, p.validator, p.ethClient))
	p.AddStage(MarketAnalysisStage(p.config, p.logger, p.marketMgr, p.validator))
	p.AddStage(ArbitrageDetectionStage(p.config, p.logger, p.marketMgr, p.validator))
}

// AddStage adds a processing stage to the pipeline. A nil stage is ignored,
// because invoking it later would panic inside ProcessTransactions.
func (p *Pipeline) AddStage(stage PipelineStage) {
	if stage == nil {
		return
	}
	p.stages = append(p.stages, stage)
}

// ProcessTransactions processes a batch of transactions through the pipeline.
//
// The work is asynchronous: the method starts the receipt parser, every stage
// and the final consumer in their own goroutines and then returns. It returns
// an error only when the pipeline cannot be started (no stages registered, or
// transactions were supplied but no Ethereum client is configured). Errors
// raised while processing are logged, not returned.
func (p *Pipeline) ProcessTransactions(ctx context.Context, transactions []*types.Transaction, blockNumber uint64, timestamp uint64) error {
	if len(p.stages) == 0 {
		return fmt.Errorf("no pipeline stages configured")
	}
	// Without a client the parser goroutine would panic on the first receipt
	// fetch and take the whole process down, so fail fast instead.
	if len(transactions) > 0 && p.ethClient == nil {
		return fmt.Errorf("no ethereum client configured")
	}

	// make panics on a negative capacity; fall back to unbuffered channels.
	bufferSize := p.bufferSize
	if bufferSize < 0 {
		bufferSize = 0
	}

	eventChan := make(chan *events.Event, bufferSize)
	go p.parseTransactions(ctx, transactions, blockNumber, timestamp, eventChan)

	// Chain the stages: each stage reads the previous stage's output.
	var currentChan <-chan *events.Event = eventChan
	for i, stage := range p.stages {
		outputChan := make(chan *events.Event, bufferSize)

		go func(stage PipelineStage, input <-chan *events.Event, output chan<- *events.Event, stageIndex int) {
			if err := stage(ctx, input, output); err != nil {
				p.logger.Error(fmt.Sprintf("Pipeline stage %d error: %v", stageIndex, err))
			}
		}(stage, currentChan, outputChan, i)

		currentChan = outputChan
	}

	// Drain the last stage into the scanner.
	finalChan := currentChan
	go func() {
		defer func() {
			if r := recover(); r != nil {
				p.logger.Error(fmt.Sprintf("Final output processor panic recovered: %v", r))
			}
		}()
		p.processSwapDetails(ctx, finalChan)
	}()

	return nil
}

// parseTransactions fetches the receipt of every transaction, parses its logs
// into events and sends each valid event to out. It closes out when it is done
// or when ctx is cancelled. Transactions that are nil, invalid, or whose
// receipt cannot be fetched or parsed are logged and skipped.
func (p *Pipeline) parseTransactions(ctx context.Context, transactions []*types.Transaction, blockNumber, timestamp uint64, out chan<- *events.Event) {
	defer close(out)

	for _, tx := range transactions {
		// Stop early instead of logging one receipt error per remaining transaction.
		if ctx.Err() != nil {
			return
		}
		if tx == nil {
			p.logger.Warn("Skipping nil transaction")
			continue
		}

		txHash := tx.Hash()

		if err := p.validator.ValidateTransaction(tx); err != nil {
			p.logger.Warn(fmt.Sprintf("Invalid transaction %s: %v", txHash.Hex(), err))
			continue
		}

		receipt, err := p.ethClient.TransactionReceipt(ctx, txHash)
		if err != nil {
			p.logger.Error(fmt.Sprintf("Error fetching receipt for transaction %s: %v", txHash.Hex(), err))
			continue
		}

		parsed, err := p.eventParser.ParseTransactionReceipt(receipt, blockNumber, timestamp)
		if err != nil {
			p.logger.Error(fmt.Sprintf("Error parsing receipt for transaction %s: %v", txHash.Hex(), err))
			continue
		}

		for _, event := range parsed {
			if err := p.validator.ValidateEvent(event); err != nil {
				p.logger.Warn(fmt.Sprintf("Invalid event from transaction %s: %v", txHash.Hex(), err))
				continue
			}
			if !forwardEvent(ctx, out, event) {
				return
			}
		}
	}
}

// processSwapDetails processes the final output of the pipeline by submitting
// every event to the market scanner. It returns when the channel is closed or
// ctx is cancelled.
func (p *Pipeline) processSwapDetails(ctx context.Context, eventDetails <-chan *events.Event) {
	for {
		select {
		case event, ok := <-eventDetails:
			if !ok {
				return // Channel closed
			}
			// A nil event cannot be dereferenced; skip it rather than abort the consumer.
			if event == nil {
				continue
			}
			p.scanner.SubmitEvent(*event)
		case <-ctx.Done():
			return
		}
	}
}

// forwardEvent sends event on output, giving up if ctx is cancelled first.
// It reports whether the event was sent.
func forwardEvent(ctx context.Context, output chan<- *events.Event, event *events.Event) bool {
	select {
	case output <- event:
		return true
	case <-ctx.Done():
		return false
	}
}

// runStage starts the worker pool shared by the built-in stages and returns
// without waiting for it.
//
// Each worker reads events from input until the channel is closed or ctx is
// cancelled. Nil events are dropped. For every other event accept is called;
// the event is forwarded to output when accept returns true and dropped
// otherwise. At least one worker is always started so that input is drained
// even when workers is zero or negative. Once every worker has returned,
// output is closed so that downstream consumers observe the end of the stream.
func runStage(
	ctx context.Context,
	log *logger.Logger,
	stageName string,
	workers int,
	input <-chan *events.Event,
	output chan<- *events.Event,
	accept func(event *events.Event) bool,
) {
	if workers < 1 {
		workers = 1
	}

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case event, ok := <-input:
					if !ok {
						return // Channel closed
					}
					if event == nil || !accept(event) {
						continue
					}
					if !forwardEvent(ctx, output, event) {
						return
					}
				}
			}
		}()
	}

	go func() {
		wg.Wait()
		// The stage is the only sender on output, so closing here is safe. The
		// recover keeps the stage tolerant of a caller that closed the channel itself.
		defer func() {
			if r := recover(); r != nil {
				log.Debug("Channel already closed in " + stageName)
			}
		}()
		close(output)
	}()
}

// TransactionDecoderStage decodes transactions to identify swap opportunities.
// Events arriving here are already decoded, so the stage re-validates each
// event and forwards the valid ones. marketMgr and ethClient are accepted for
// signature compatibility and are not used.
func TransactionDecoderStage(
	cfg *config.BotConfig,
	logger *logger.Logger,
	marketMgr *MarketManager,
	validator *validation.InputValidator,
	ethClient *ethclient.Client,
) PipelineStage {
	return func(ctx context.Context, input <-chan *events.Event, output chan<- *events.Event) error {
		runStage(ctx, logger, "TransactionDecoderStage", cfg.MaxWorkers, input, output, func(event *events.Event) bool {
			if err := validator.ValidateEvent(event); err != nil {
				logger.Warn(fmt.Sprintf("Event validation failed in decoder stage: %v", err))
				return false
			}
			return true
		})
		return nil
	}
}

// MarketAnalysisStage performs market analysis on event details. For swap
// events it looks up the pool and logs the estimated price impact. Every valid
// event is forwarded unchanged, including when the lookup or the calculation fails.
func MarketAnalysisStage(
	cfg *config.BotConfig,
	logger *logger.Logger,
	marketMgr *MarketManager,
	validator *validation.InputValidator,
) PipelineStage {
	return func(ctx context.Context, input <-chan *events.Event, output chan<- *events.Event) error {
		runStage(ctx, logger, "MarketAnalysisStage", cfg.MaxWorkers, input, output, func(event *events.Event) bool {
			if err := validator.ValidateEvent(event); err != nil {
				logger.Warn(fmt.Sprintf("Event validation failed in analysis stage: %v", err))
				return false
			}

			// Non-swap events pass through without analysis.
			if event.Type != events.Swap {
				return true
			}

			poolData, err := marketMgr.GetPool(ctx, event.PoolAddress)
			if err != nil {
				logger.Error(fmt.Sprintf("Error getting pool data for %s: %v", event.PoolAddress, err))
				return true
			}

			priceImpact, err := calculatePriceImpact(event, poolData)
			if err != nil {
				logger.Error(fmt.Sprintf("Error calculating price impact for pool %s: %v", event.PoolAddress, err))
				return true
			}

			// The impact is only logged; the event itself carries no field for it.
			logger.Debug(fmt.Sprintf("Price impact for pool %s: %f", event.PoolAddress, priceImpact))
			return true
		})
		return nil
	}
}

// positiveAmount returns the first strictly positive amount as a uint256, or
// zero when there is none. The sign is checked on the big.Int because
// converting a negative value to uint256 yields its two's complement, which
// would look like an enormous positive amount.
func positiveAmount(amounts ...*big.Int) *uint256.Int {
	result := uint256.NewInt(0)
	for _, amount := range amounts {
		if amount != nil && amount.Sign() > 0 {
			result.SetFromBig(amount)
			return result
		}
	}
	return result
}

// calculatePriceImpact estimates the price impact of a swap as the swapped-in
// amount relative to the pool liquidity, expressed as a percentage
// (amountIn / liquidity * 100).
//
// The swapped-in amount is Amount0 when it is positive, otherwise Amount1 when
// that is positive. The function returns 0 when nothing is swapped in or when
// the pool liquidity is missing or zero, and an error when event or poolData
// is nil.
func calculatePriceImpact(event *events.Event, poolData *PoolData) (float64, error) {
	if event == nil {
		return 0, fmt.Errorf("nil event")
	}
	if poolData == nil {
		return 0, fmt.Errorf("nil pool data")
	}

	amountIn := positiveAmount(event.Amount0, event.Amount1)
	if amountIn.IsZero() {
		return 0, nil
	}

	liquidity := poolData.Liquidity
	if liquidity == nil || liquidity.IsZero() {
		return 0, nil
	}

	// Divide as floats: integer division would truncate every ratio below 1 to 0.
	ratio := new(big.Float).Quo(
		new(big.Float).SetInt(amountIn.ToBig()),
		new(big.Float).SetInt(liquidity.ToBig()),
	)
	fraction, _ := ratio.Float64() // accuracy flag is irrelevant for a logged estimate

	return fraction * 100.0, nil
}

// ArbitrageDetectionStage detects arbitrage opportunities. For swap events it
// searches for opportunities and logs any that are found. Every valid event is
// forwarded unchanged, including when the search fails.
func ArbitrageDetectionStage(
	cfg *config.BotConfig,
	logger *logger.Logger,
	marketMgr *MarketManager,
	validator *validation.InputValidator,
) PipelineStage {
	return func(ctx context.Context, input <-chan *events.Event, output chan<- *events.Event) error {
		runStage(ctx, logger, "ArbitrageDetectionStage", cfg.MaxWorkers, input, output, func(event *events.Event) bool {
			if err := validator.ValidateEvent(event); err != nil {
				logger.Warn(fmt.Sprintf("Event validation failed in arbitrage detection stage: %v", err))
				return false
			}

			// Non-swap events pass through without analysis.
			if event.Type != events.Swap {
				return true
			}

			opportunities, err := findArbitrageOpportunities(ctx, event, marketMgr, logger)
			if err != nil {
				logger.Error(fmt.Sprintf("Error finding arbitrage opportunities for pool %s: %v", event.PoolAddress, err))
				return true
			}

			if len(opportunities) > 0 {
				logger.Info(fmt.Sprintf("Found %d arbitrage opportunities for pool %s", len(opportunities), event.PoolAddress))
				for _, opp := range opportunities {
					logger.Info(fmt.Sprintf("Arbitrage opportunity: %+v", opp))
				}
			}
			return true
		})
		return nil
	}
}

// findArbitrageOpportunities looks for arbitrage opportunities based on a swap
// event. It compares the price of the pool that emitted the event with every
// other known pool for the same token pair and reports an opportunity for each
// pool whose price is higher.
//
// The Profit, GasEstimate and ROI of a reported opportunity are mock values,
// not computed figures. The returned slice is never nil. ctx and logger are
// currently unused.
func findArbitrageOpportunities(ctx context.Context, event *events.Event, marketMgr *MarketManager, logger *logger.Logger) ([]scanner.ArbitrageOpportunity, error) {
	opportunities := make([]scanner.ArbitrageOpportunity, 0)
	if event == nil {
		return opportunities, fmt.Errorf("nil event")
	}

	// Arbitrage needs at least two pools trading the same pair.
	pools := marketMgr.GetPoolsByTokens(event.Token0, event.Token1)
	if len(pools) < 2 {
		return opportunities, nil
	}

	// Locate the pool that emitted the event.
	var eventPool *PoolData
	for _, pool := range pools {
		if pool != nil && pool.Address == event.PoolAddress {
			eventPool = pool
			break
		}
	}
	if eventPool == nil {
		return opportunities, nil
	}

	eventPoolPrice := uniswap.SqrtPriceX96ToPrice(eventPool.SqrtPriceX96.ToBig())

	for _, pool := range pools {
		if pool == nil || pool.Address == event.PoolAddress {
			continue
		}

		compPoolPrice := uniswap.SqrtPriceX96ToPrice(pool.SqrtPriceX96.ToBig())

		// Simplified check: any positive price difference counts as an opportunity.
		priceDiff := new(big.Float).Sub(compPoolPrice, eventPoolPrice)
		if priceDiff.Sign() <= 0 {
			continue
		}

		opportunities = append(opportunities, scanner.ArbitrageOpportunity{
			Path:        []string{event.Token0.Hex(), event.Token1.Hex()},
			Pools:       []string{event.PoolAddress.Hex(), pool.Address.Hex()},
			Profit:      big.NewInt(1000000000000000000), // 1 ETH (mock value)
			GasEstimate: big.NewInt(200000000000000000),  // 0.2 ETH (mock value)
			ROI:         5.0,                             // 500% (mock value)
			Protocol:    event.Protocol,
		})
	}

	return opportunities, nil
}