package market

import (
	"context"
	"fmt"
	"math/big"
	"sync"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/holiman/uint256"

	"github.com/fraktal/mev-beta/internal/config"
	"github.com/fraktal/mev-beta/internal/logger"
	"github.com/fraktal/mev-beta/pkg/events"
	"github.com/fraktal/mev-beta/pkg/scanner"
	"github.com/fraktal/mev-beta/pkg/uniswap"
)

// Pipeline processes transactions through multiple stages.
type Pipeline struct {
	config      *config.BotConfig
	logger      *logger.Logger
	marketMgr   *MarketManager
	scanner     *scanner.MarketScanner
	stages      []PipelineStage
	bufferSize  int
	concurrency int
	eventParser *events.EventParser
}

// PipelineStage represents a stage in the processing pipeline.
//
// A stage reads events from the input channel and writes them to the output
// channel. When run by Pipeline.ProcessTransactions the output channel is
// closed as soon as the stage function returns, so a stage must not return
// while any of its goroutines may still send on the output channel.
type PipelineStage func(context.Context, <-chan *scanner.EventDetails, chan<- *scanner.EventDetails) error

// NewPipeline creates a new transaction processing pipeline.
//
// The channel buffer size and worker count are taken from cfg, and the
// transaction decoder stage is registered as the first stage.
func NewPipeline(
	cfg *config.BotConfig,
	logger *logger.Logger,
	marketMgr *MarketManager,
	scanner *scanner.MarketScanner,
) *Pipeline {
	pipeline := &Pipeline{
		config:      cfg,
		logger:      logger,
		marketMgr:   marketMgr,
		scanner:     scanner,
		bufferSize:  cfg.ChannelBufferSize,
		concurrency: cfg.MaxWorkers,
		eventParser: events.NewEventParser(),
	}

	// Add default stages
	pipeline.AddStage(TransactionDecoderStage(cfg, logger, marketMgr))

	return pipeline
}

// AddDefaultStages adds the default processing stages (decoder, market
// analysis, arbitrage detection) to the pipeline.
//
// NOTE(review): NewPipeline already registers the decoder stage, so calling
// this on a freshly constructed pipeline registers the decoder twice. The
// decoder only forwards events, so the duplicate costs an extra hop but does
// not change the events that reach the scanner.
func (p *Pipeline) AddDefaultStages() {
	p.AddStage(TransactionDecoderStage(p.config, p.logger, p.marketMgr))
	p.AddStage(MarketAnalysisStage(p.config, p.logger, p.marketMgr))
	p.AddStage(ArbitrageDetectionStage(p.config, p.logger, p.marketMgr))
}

// AddStage appends a processing stage to the end of the pipeline.
func (p *Pipeline) AddStage(stage PipelineStage) {
	p.stages = append(p.stages, stage)
}

// ProcessTransactions processes a batch of transactions through the pipeline.
//
// It starts the parsing, conversion, stage, and submission goroutines and
// returns immediately; processing continues in the background until the
// transactions are exhausted or ctx is cancelled. The only error returned is
// the one reported when no stages are configured; stage errors are logged.
func (p *Pipeline) ProcessTransactions(ctx context.Context, transactions []*types.Transaction, blockNumber uint64, timestamp uint64) error {
	if len(p.stages) == 0 {
		return fmt.Errorf("no pipeline stages configured")
	}

	// Parse events from transactions in the background.
	eventChan := make(chan *events.Event, p.bufferSize)
	go p.parseTransactions(ctx, transactions, blockNumber, timestamp, eventChan)

	// Stages operate on scanner.EventDetails, so convert the parsed events
	// once up front and then chain every stage uniformly.
	currentChan := p.convertEvents(ctx, eventChan)
	for i, stage := range p.stages {
		outputChan := make(chan *scanner.EventDetails, p.bufferSize)
		go p.runStage(ctx, i, stage, currentChan, outputChan)
		currentChan = outputChan
	}

	// Process the final output.
	go p.processSwapDetails(ctx, currentChan)

	return nil
}

// parseTransactions parses DEX-related transactions into events and sends
// them on out. It closes out when all transactions have been handled or ctx
// is cancelled.
func (p *Pipeline) parseTransactions(
	ctx context.Context,
	transactions []*types.Transaction,
	blockNumber uint64,
	timestamp uint64,
	out chan<- *events.Event,
) {
	defer close(out)

	for _, tx := range transactions {
		// Skip transactions that don't interact with DEX contracts.
		if !p.eventParser.IsDEXInteraction(tx) {
			continue
		}

		parsed, err := p.eventParser.ParseTransaction(tx, blockNumber, timestamp)
		if err != nil {
			p.logger.Error(fmt.Sprintf("Error parsing transaction %s: %v", tx.Hash().Hex(), err))
			continue
		}

		for _, event := range parsed {
			select {
			case out <- event:
			case <-ctx.Done():
				return
			}
		}
	}
}

// convertEvents starts a goroutine that converts parsed events into
// scanner.EventDetails and returns the channel carrying the converted values.
// The returned channel is closed when in is drained or ctx is cancelled.
func (p *Pipeline) convertEvents(ctx context.Context, in <-chan *events.Event) <-chan *scanner.EventDetails {
	converted := make(chan *scanner.EventDetails, p.bufferSize)

	go func() {
		defer close(converted)
		for event := range in {
			if event == nil {
				continue
			}
			select {
			case converted <- toEventDetails(event):
			case <-ctx.Done():
				return
			}
		}
	}()

	return converted
}

// toEventDetails maps a parsed event onto the scanner's EventDetails type.
// Amount0 is recorded as Amount0In and Amount1 as Amount1Out; the remaining
// two amount fields are set to zero.
func toEventDetails(event *events.Event) *scanner.EventDetails {
	return &scanner.EventDetails{
		Type:            event.Type,
		Protocol:        event.Protocol,
		PoolAddress:     event.PoolAddress.Hex(),
		Token0:          event.Token0.Hex(),
		Token1:          event.Token1.Hex(),
		Amount0In:       event.Amount0,
		Amount0Out:      big.NewInt(0),
		Amount1In:       big.NewInt(0),
		Amount1Out:      event.Amount1,
		SqrtPriceX96:    event.SqrtPriceX96,
		Liquidity:       event.Liquidity,
		Tick:            event.Tick,
		Timestamp:       time.Unix(int64(event.Timestamp), 0),
		TransactionHash: event.TransactionHash,
	}
}

// runStage runs a single stage to completion, logs any error it returns, and
// then closes the stage's output channel so the next stage can terminate.
// The stage index is passed explicitly so the log message never observes a
// shared loop variable.
func (p *Pipeline) runStage(
	ctx context.Context,
	index int,
	stage PipelineStage,
	input <-chan *scanner.EventDetails,
	output chan<- *scanner.EventDetails,
) {
	defer closeStageOutput(output)

	if err := stage(ctx, input, output); err != nil {
		p.logger.Error(fmt.Sprintf("Pipeline stage %d error: %v", index, err))
	}
}

// closeStageOutput closes output, tolerating a channel that has already been
// closed. Both the built-in stages and the pipeline close a stage's output,
// and closing a closed channel panics, so the panic is deliberately recovered.
func closeStageOutput(output chan<- *scanner.EventDetails) {
	defer func() {
		// Channel already closed, that's fine.
		_ = recover()
	}()
	close(output)
}

// processSwapDetails processes the final output of the pipeline by submitting
// each event to the market scanner. It returns when the channel is closed or
// ctx is cancelled.
func (p *Pipeline) processSwapDetails(ctx context.Context, eventDetails <-chan *scanner.EventDetails) {
	for {
		select {
		case event, ok := <-eventDetails:
			if !ok {
				return // Channel closed
			}
			if event == nil {
				// Nothing to submit; dereferencing would panic.
				continue
			}

			// Submit to the market scanner for processing.
			p.scanner.SubmitEvent(*event)

		case <-ctx.Done():
			return
		}
	}
}

// runStageWorkers is the worker pool shared by the built-in stages.
//
// It starts workers goroutines (at least one) that read events from input,
// drop nil events, call process (if non-nil) for its side effects, and then
// forward the event to output. It blocks until input is closed or ctx is
// cancelled and every worker has exited, then closes output and returns nil.
// Blocking until the workers are done guarantees nothing sends on output
// after it has been closed.
func runStageWorkers(
	ctx context.Context,
	workers int,
	input <-chan *scanner.EventDetails,
	output chan<- *scanner.EventDetails,
	process func(ctx context.Context, event *scanner.EventDetails),
) error {
	if workers < 1 {
		// With zero workers nothing would drain input and every event
		// would be lost.
		workers = 1
	}

	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case event, ok := <-input:
					if !ok {
						return // Channel closed
					}
					if event == nil {
						continue
					}
					if process != nil {
						process(ctx, event)
					}
					select {
					case output <- event:
					case <-ctx.Done():
						return
					}
				}
			}
		}()
	}

	wg.Wait()
	closeStageOutput(output)
	return nil
}

// TransactionDecoderStage decodes transactions to identify swap opportunities.
//
// Events arrive already decoded, so the returned stage forwards every non-nil
// event unchanged using cfg.MaxWorkers concurrent workers.
func TransactionDecoderStage(
	cfg *config.BotConfig,
	logger *logger.Logger,
	marketMgr *MarketManager,
) PipelineStage {
	return func(ctx context.Context, input <-chan *scanner.EventDetails, output chan<- *scanner.EventDetails) error {
		return runStageWorkers(ctx, cfg.MaxWorkers, input, output, nil)
	}
}

// MarketAnalysisStage performs market analysis on event details.
//
// For swap events the returned stage looks up the pool through marketMgr and
// logs the calculated price impact at debug level. Every non-nil event is
// forwarded unchanged, including non-swap events and events whose pool lookup
// or impact calculation failed (those failures are logged).
func MarketAnalysisStage(
	cfg *config.BotConfig,
	logger *logger.Logger,
	marketMgr *MarketManager,
) PipelineStage {
	analyze := func(ctx context.Context, event *scanner.EventDetails) {
		// Only process swap events.
		if event.Type != events.Swap {
			return
		}

		// Get pool data from market manager.
		poolData, err := marketMgr.GetPool(ctx, common.HexToAddress(event.PoolAddress))
		if err != nil {
			logger.Error(fmt.Sprintf("Error getting pool data for %s: %v", event.PoolAddress, err))
			return
		}

		priceImpact, err := calculatePriceImpact(event, poolData)
		if err != nil {
			logger.Error(fmt.Sprintf("Error calculating price impact for pool %s: %v", event.PoolAddress, err))
			return
		}

		// The impact is only logged; EventDetails has no field to carry it.
		logger.Debug(fmt.Sprintf("Price impact for pool %s: %f", event.PoolAddress, priceImpact))
	}

	return func(ctx context.Context, input <-chan *scanner.EventDetails, output chan<- *scanner.EventDetails) error {
		return runStageWorkers(ctx, cfg.MaxWorkers, input, output, analyze)
	}
}

// calculatePriceImpact estimates the price impact of a swap as the input
// amount divided by the pool liquidity, expressed as a percentage.
//
// Amount0In is used when it is positive, otherwise Amount1In. The result is
// 0 when neither amount is positive or when the pool liquidity is zero. An
// error is returned for a nil event, missing pool liquidity, or an amount
// that does not fit in 256 bits.
func calculatePriceImpact(event *scanner.EventDetails, poolData *PoolData) (float64, error) {
	if event == nil {
		return 0.0, fmt.Errorf("nil event")
	}
	if poolData == nil || poolData.Liquidity == nil {
		return 0.0, fmt.Errorf("pool liquidity unavailable")
	}

	// Determine which token is being swapped in. Nil and non-positive
	// amounts are treated as "no input": converting a negative big.Int to
	// uint256 would wrap around to an enormous positive value.
	amountInBig := event.Amount0In
	if amountInBig == nil || amountInBig.Sign() <= 0 {
		amountInBig = event.Amount1In
	}
	if amountInBig == nil || amountInBig.Sign() <= 0 {
		return 0.0, nil
	}

	amountIn := new(uint256.Int)
	if overflow := amountIn.SetFromBig(amountInBig); overflow {
		return 0.0, fmt.Errorf("swap amount %s overflows uint256", amountInBig.String())
	}

	// If liquidity is 0, we can't calculate impact.
	liquidity := poolData.Liquidity
	if liquidity.IsZero() {
		return 0.0, nil
	}

	// Divide in floating point: integer division would truncate to zero for
	// every swap smaller than the pool liquidity.
	ratio := new(big.Float).Quo(
		new(big.Float).SetInt(amountIn.ToBig()),
		new(big.Float).SetInt(liquidity.ToBig()),
	)
	// The accuracy flag is ignored on purpose: the value is an estimate and
	// float64 rounding is acceptable.
	percentage, _ := ratio.Float64()

	// Convert to percentage (multiply by 100).
	return percentage * 100.0, nil
}

// ArbitrageDetectionStage detects arbitrage opportunities.
//
// For swap events the returned stage calls findArbitrageOpportunities and
// logs whatever it finds. Every non-nil event is forwarded unchanged,
// including non-swap events and events for which detection failed.
func ArbitrageDetectionStage(
	cfg *config.BotConfig,
	logger *logger.Logger,
	marketMgr *MarketManager,
) PipelineStage {
	detect := func(ctx context.Context, event *scanner.EventDetails) {
		// Only process swap events.
		if event.Type != events.Swap {
			return
		}

		opportunities, err := findArbitrageOpportunities(ctx, event, marketMgr, logger)
		if err != nil {
			logger.Error(fmt.Sprintf("Error finding arbitrage opportunities for pool %s: %v", event.PoolAddress, err))
			return
		}

		if len(opportunities) == 0 {
			return
		}
		logger.Info(fmt.Sprintf("Found %d arbitrage opportunities for pool %s", len(opportunities), event.PoolAddress))
		for _, opp := range opportunities {
			logger.Info(fmt.Sprintf("Arbitrage opportunity: %+v", opp))
		}
	}

	return func(ctx context.Context, input <-chan *scanner.EventDetails, output chan<- *scanner.EventDetails) error {
		return runStageWorkers(ctx, cfg.MaxWorkers, input, output, detect)
	}
}

// findArbitrageOpportunities looks for arbitrage opportunities based on a
// swap event.
//
// It compares the price of the pool that emitted the event with every other
// known pool for the same token pair and reports an opportunity for each pool
// whose price is higher. The Profit, GasEstimate, and ROI fields of the
// reported opportunities are placeholder values, not computed figures. The
// returned slice is never nil, and the error is currently always nil.
func findArbitrageOpportunities(ctx context.Context, event *scanner.EventDetails, marketMgr *MarketManager, logger *logger.Logger) ([]scanner.ArbitrageOpportunity, error) {
	opportunities := make([]scanner.ArbitrageOpportunity, 0)
	if event == nil {
		return opportunities, nil
	}

	// Get all pools for the same token pair.
	token0 := common.HexToAddress(event.Token0)
	token1 := common.HexToAddress(event.Token1)
	pools := marketMgr.GetPoolsByTokens(token0, token1)

	// If we don't have multiple pools, we can't do arbitrage.
	if len(pools) < 2 {
		return opportunities, nil
	}

	// Find the pool that triggered the event.
	eventPoolAddress := common.HexToAddress(event.PoolAddress)
	var eventPool *PoolData
	for _, pool := range pools {
		if pool != nil && pool.Address == eventPoolAddress {
			eventPool = pool
			break
		}
	}
	if eventPool == nil {
		return opportunities, nil
	}

	// Convert sqrtPriceX96 to price for the event pool. Without a price
	// there is nothing to compare against.
	eventSqrtPrice := eventPool.SqrtPriceX96.ToBig()
	if eventSqrtPrice == nil {
		return opportunities, nil
	}
	eventPoolPrice := uniswap.SqrtPriceX96ToPrice(eventSqrtPrice)

	// Compare with other pools.
	for _, pool := range pools {
		if pool == nil || pool.Address == eventPoolAddress {
			continue
		}

		compSqrtPrice := pool.SqrtPriceX96.ToBig()
		if compSqrtPrice == nil {
			continue
		}
		compPoolPrice := uniswap.SqrtPriceX96ToPrice(compSqrtPrice)

		// Calculate potential profit (simplified). In practice, this would
		// involve more complex calculations.
		profit := new(big.Float).Sub(compPoolPrice, eventPoolPrice)
		if profit.Sign() <= 0 {
			continue
		}

		opportunities = append(opportunities, scanner.ArbitrageOpportunity{
			Path:        []string{event.Token0, event.Token1},
			Pools:       []string{event.PoolAddress, pool.Address.Hex()},
			Profit:      big.NewInt(1000000000000000000), // 1 ETH (mock value)
			GasEstimate: big.NewInt(200000000000000000),  // 0.2 ETH (mock value)
			ROI:         5.0,                             // 500% (mock value)
			Protocol:    event.Protocol,
		})
	}

	return opportunities, nil
}