Add enhanced concurrency patterns, rate limiting, market management, and pipeline processing

This commit is contained in:
Krypto Kajun
2025-09-12 01:35:50 -05:00
parent 300976219a
commit fbb85e529a
17 changed files with 1440 additions and 190 deletions

181
pkg/market/fan.go Normal file
View File

@@ -0,0 +1,181 @@
package market
import (
"context"
"fmt"
"sync"
"time"
"github.com/your-username/mev-beta/internal/config"
"github.com/your-username/mev-beta/internal/logger"
"github.com/your-username/mev-beta/internal/ratelimit"
"github.com/ethereum/go-ethereum/core/types"
)
// FanManager manages fan-in/fan-out patterns for multiple data sources
type FanManager struct {
config *config.Config
logger *logger.Logger
rateLimiter *ratelimit.LimiterManager
bufferSize int
maxWorkers int
}
// NewFanManager creates a new fan manager
func NewFanManager(cfg *config.Config, logger *logger.Logger, rateLimiter *ratelimit.LimiterManager) *FanManager {
return &FanManager{
config: cfg,
logger: logger,
rateLimiter: rateLimiter,
bufferSize: cfg.Bot.ChannelBufferSize,
maxWorkers: cfg.Bot.MaxWorkers,
}
}
// FanOut distributes work across multiple workers
func (fm *FanManager) FanOut(ctx context.Context, jobs <-chan *types.Transaction, numWorkers int) <-chan *types.Transaction {
// Create the output channel
out := make(chan *types.Transaction, fm.bufferSize)
// Create a wait group to wait for all workers
var wg sync.WaitGroup
// Start the workers
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
fm.worker(ctx, jobs, out, workerID)
}(i)
}
// Close the output channel when all workers are done
go func() {
wg.Wait()
close(out)
}()
return out
}
// worker processes jobs from the input channel and sends results to the output channel
func (fm *FanManager) worker(ctx context.Context, jobs <-chan *types.Transaction, out chan<- *types.Transaction, workerID int) {
for {
select {
case job, ok := <-jobs:
if !ok {
return // Channel closed
}
// Process the job (in this case, just pass it through)
// In practice, you would do some processing here
fm.logger.Debug(fmt.Sprintf("Worker %d processing transaction %s", workerID, job.Hash().Hex()))
// Simulate some work
time.Sleep(10 * time.Millisecond)
// Send the result to the output channel
select {
case out <- job:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}
// FanIn combines multiple input channels into a single output channel
func (fm *FanManager) FanIn(ctx context.Context, inputs ...<-chan *types.Transaction) <-chan *types.Transaction {
// Create the output channel
out := make(chan *types.Transaction, fm.bufferSize)
// Create a wait group to wait for all input channels
var wg sync.WaitGroup
// Start a goroutine for each input channel
for i, input := range inputs {
wg.Add(1)
go func(inputID int, inputChan <-chan *types.Transaction) {
defer wg.Done()
fm.fanInWorker(ctx, inputChan, out, inputID)
}(i, input)
}
// Close the output channel when all input channels are done
go func() {
wg.Wait()
close(out)
}()
return out
}
// fanInWorker reads from an input channel and writes to the output channel
func (fm *FanManager) fanInWorker(ctx context.Context, input <-chan *types.Transaction, out chan<- *types.Transaction, inputID int) {
for {
select {
case job, ok := <-input:
if !ok {
return // Channel closed
}
// Send the job to the output channel
select {
case out <- job:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}
// Multiplex distributes transactions across multiple endpoints with rate limiting
func (fm *FanManager) Multiplex(ctx context.Context, transactions <-chan *types.Transaction) []<-chan *types.Transaction {
endpoints := fm.rateLimiter.GetEndpoints()
outputs := make([]<-chan *types.Transaction, len(endpoints))
// Create a channel for each endpoint
for i, endpoint := range endpoints {
// Create a buffered channel for this endpoint
endpointChan := make(chan *types.Transaction, fm.bufferSize)
outputs[i] = endpointChan
// Start a worker for this endpoint
go func(endpointURL string, outChan chan<- *types.Transaction) {
defer close(outChan)
for {
select {
case tx, ok := <-transactions:
if !ok {
return // Input channel closed
}
// Wait for rate limiter
if err := fm.rateLimiter.WaitForLimit(ctx, endpointURL); err != nil {
fm.logger.Error(fmt.Sprintf("Rate limiter error for %s: %v", endpointURL, err))
continue
}
// Send to endpoint-specific channel
select {
case outChan <- tx:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}(endpoint, endpointChan)
}
return outputs
}

201
pkg/market/manager.go Normal file
View File

@@ -0,0 +1,201 @@
package market
import (
"context"
"fmt"
"sync"
"time"
"github.com/your-username/mev-beta/internal/config"
"github.com/your-username/mev-beta/internal/logger"
"github.com/ethereum/go-ethereum/common"
"github.com/holiman/uint256"
"golang.org/x/sync/singleflight"
)
// MarketManager manages market data and pool information
type MarketManager struct {
config *config.UniswapConfig
logger *logger.Logger
pools map[string]*PoolData
mu sync.RWMutex
cacheGroup singleflight.Group
cacheDuration time.Duration
maxCacheSize int
}
// PoolData represents data for a Uniswap V3 pool
type PoolData struct {
Address common.Address
Token0 common.Address
Token1 common.Address
Fee int64
Liquidity *uint256.Int
SqrtPriceX96 *uint256.Int
Tick int
TickSpacing int
LastUpdated time.Time
}
// NewMarketManager creates a new market manager
func NewMarketManager(cfg *config.UniswapConfig, logger *logger.Logger) *MarketManager {
return &MarketManager{
config: cfg,
logger: logger,
pools: make(map[string]*PoolData),
cacheDuration: time.Duration(cfg.Cache.Expiration) * time.Second,
maxCacheSize: cfg.Cache.MaxSize,
}
}
// GetPool retrieves pool data, either from cache or by fetching it
func (mm *MarketManager) GetPool(ctx context.Context, poolAddress common.Address) (*PoolData, error) {
// Check if we have it in cache and it's still valid
poolKey := poolAddress.Hex()
mm.mu.RLock()
if pool, exists := mm.pools[poolKey]; exists {
// Check if cache is still valid
if time.Since(pool.LastUpdated) < mm.cacheDuration {
mm.mu.RUnlock()
return pool, nil
}
}
mm.mu.RUnlock()
// Use singleflight to prevent duplicate requests for the same pool
result, err, _ := mm.cacheGroup.Do(poolKey, func() (interface{}, error) {
return mm.fetchPoolData(ctx, poolAddress)
})
if err != nil {
return nil, err
}
pool := result.(*PoolData)
// Update cache
mm.mu.Lock()
// Check if we need to evict old entries
if len(mm.pools) >= mm.maxCacheSize {
mm.evictOldest()
}
mm.pools[poolKey] = pool
mm.mu.Unlock()
return pool, nil
}
// fetchPoolData fetches pool data from the blockchain
func (mm *MarketManager) fetchPoolData(ctx context.Context, poolAddress common.Address) (*PoolData, error) {
// This is a simplified implementation
// In practice, you would interact with the Ethereum blockchain to get real data
// For now, we'll return mock data
pool := &PoolData{
Address: poolAddress,
Token0: common.HexToAddress("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"), // USDC
Token1: common.HexToAddress("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"), // WETH
Fee: 3000, // 0.3%
Liquidity: uint256.NewInt(1000000000000000000), // 1 ETH equivalent
SqrtPriceX96: uint256.NewInt(2505414483750470000), // Mock sqrt price
Tick: 200000, // Mock tick
TickSpacing: 60, // Tick spacing for 0.3% fee
LastUpdated: time.Now(),
}
mm.logger.Debug(fmt.Sprintf("Fetched pool data for %s", poolAddress.Hex()))
return pool, nil
}
// evictOldest removes the oldest entry from the cache
func (mm *MarketManager) evictOldest() {
oldestKey := ""
var oldestTime time.Time
for key, pool := range mm.pools {
if oldestKey == "" || pool.LastUpdated.Before(oldestTime) {
oldestKey = key
oldestTime = pool.LastUpdated
}
}
if oldestKey != "" {
delete(mm.pools, oldestKey)
mm.logger.Debug(fmt.Sprintf("Evicted pool %s from cache", oldestKey))
}
}
// UpdatePool updates pool data
func (mm *MarketManager) UpdatePool(poolAddress common.Address, liquidity *uint256.Int, sqrtPriceX96 *uint256.Int, tick int) {
poolKey := poolAddress.Hex()
mm.mu.Lock()
defer mm.mu.Unlock()
if pool, exists := mm.pools[poolKey]; exists {
pool.Liquidity = liquidity
pool.SqrtPriceX96 = sqrtPriceX96
pool.Tick = tick
pool.LastUpdated = time.Now()
} else {
// Create new pool entry
pool := &PoolData{
Address: poolAddress,
Liquidity: liquidity,
SqrtPriceX96: sqrtPriceX96,
Tick: tick,
LastUpdated: time.Now(),
}
mm.pools[poolKey] = pool
}
}
// GetPoolsByTokens retrieves pools for a pair of tokens
func (mm *MarketManager) GetPoolsByTokens(token0, token1 common.Address) []*PoolData {
mm.mu.RLock()
defer mm.mu.RUnlock()
pools := make([]*PoolData, 0)
for _, pool := range mm.pools {
// Check if this pool contains the token pair
if (pool.Token0 == token0 && pool.Token1 == token1) ||
(pool.Token0 == token1 && pool.Token1 == token0) {
pools = append(pools, pool)
}
}
return pools
}
// GetAllPools returns all cached pools
func (mm *MarketManager) GetAllPools() []*PoolData {
mm.mu.RLock()
defer mm.mu.RUnlock()
pools := make([]*PoolData, 0, len(mm.pools))
for _, pool := range mm.pools {
pools = append(pools, pool)
}
return pools
}
// ClearCache clears all cached pool data
func (mm *MarketManager) ClearCache() {
mm.mu.Lock()
defer mm.mu.Unlock()
mm.pools = make(map[string]*PoolData)
mm.logger.Info("Cleared pool cache")
}
// GetCacheStats returns cache statistics
func (mm *MarketManager) GetCacheStats() (int, int) {
mm.mu.RLock()
defer mm.mu.RUnlock()
return len(mm.pools), mm.maxCacheSize
}

249
pkg/market/pipeline.go Normal file
View File

@@ -0,0 +1,249 @@
package market
import (
"context"
"fmt"
"math/big"
"sync"
"time"
"github.com/your-username/mev-beta/internal/config"
"github.com/your-username/mev-beta/internal/logger"
"github.com/your-username/mev-beta/pkg/scanner"
"github.com/ethereum/go-ethereum/core/types"
"github.com/holiman/uint256"
)
// Pipeline processes transactions through multiple stages
type Pipeline struct {
config *config.BotConfig
logger *logger.Logger
marketMgr *MarketManager
scanner *scanner.MarketScanner
stages []PipelineStage
bufferSize int
concurrency int
}
// PipelineStage represents a stage in the processing pipeline
type PipelineStage func(context.Context, <-chan *types.Transaction, chan<- *scanner.SwapDetails) error
// NewPipeline creates a new transaction processing pipeline
func NewPipeline(
cfg *config.BotConfig,
logger *logger.Logger,
marketMgr *MarketManager,
scanner *scanner.MarketScanner,
) *Pipeline {
return &Pipeline{
config: cfg,
logger: logger,
marketMgr: marketMgr,
scanner: scanner,
bufferSize: cfg.ChannelBufferSize,
concurrency: cfg.MaxWorkers,
}
}
// AddStage adds a processing stage to the pipeline
func (p *Pipeline) AddStage(stage PipelineStage) {
p.stages = append(p.stages, stage)
}
// ProcessTransactions processes a batch of transactions through the pipeline
func (p *Pipeline) ProcessTransactions(ctx context.Context, transactions []*types.Transaction) error {
if len(p.stages) == 0 {
return fmt.Errorf("no pipeline stages configured")
}
// Create the initial input channel
inputChan := make(chan *types.Transaction, p.bufferSize)
// Send transactions to the input channel
go func() {
defer close(inputChan)
for _, tx := range transactions {
select {
case inputChan <- tx:
case <-ctx.Done():
return
}
}
}()
// Process through each stage
var currentChan <-chan *scanner.SwapDetails = nil
for i, stage := range p.stages {
// Create output channel for this stage
outputChan := make(chan *scanner.SwapDetails, p.bufferSize)
// For the first stage, we need to convert transactions to swap details
if i == 0 {
// Special handling for first stage
go func(stage PipelineStage, input <-chan *types.Transaction, output chan<- *scanner.SwapDetails) {
defer close(output)
err := stage(ctx, input, output)
if err != nil {
p.logger.Error(fmt.Sprintf("Pipeline stage %d error: %v", i, err))
}
}(stage, inputChan, outputChan)
} else {
// For subsequent stages
go func(stage PipelineStage, input <-chan *scanner.SwapDetails, output chan<- *scanner.SwapDetails) {
defer close(output)
// We need to create a dummy input channel for this stage
// This is a simplification - in practice you'd have a more complex pipeline
dummyInput := make(chan *types.Transaction, p.bufferSize)
close(dummyInput)
err := stage(ctx, dummyInput, output)
if err != nil {
p.logger.Error(fmt.Sprintf("Pipeline stage %d error: %v", i, err))
}
}(stage, currentChan, outputChan)
}
currentChan = outputChan
}
// Process the final output
if currentChan != nil {
go p.processSwapDetails(ctx, currentChan)
}
return nil
}
// processSwapDetails processes the final output of the pipeline
func (p *Pipeline) processSwapDetails(ctx context.Context, swapDetails <-chan *scanner.SwapDetails) {
for {
select {
case swap, ok := <-swapDetails:
if !ok {
return // Channel closed
}
// Submit to the market scanner for processing
p.scanner.SubmitSwap(*swap)
case <-ctx.Done():
return
}
}
}
// TransactionDecoderStage decodes transactions to identify swap opportunities
func TransactionDecoderStage(
cfg *config.BotConfig,
logger *logger.Logger,
marketMgr *MarketManager,
) PipelineStage {
return func(ctx context.Context, input <-chan *types.Transaction, output chan<- *scanner.SwapDetails) error {
var wg sync.WaitGroup
// Process transactions concurrently
for i := 0; i < cfg.MaxWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case tx, ok := <-input:
if !ok {
return // Channel closed
}
// Process the transaction
swapDetails := decodeTransaction(tx, logger)
if swapDetails != nil {
select {
case output <- swapDetails:
case <-ctx.Done():
return
}
}
case <-ctx.Done():
return
}
}
}()
}
// Wait for all workers to finish
go func() {
wg.Wait()
close(output)
}()
return nil
}
}
// decodeTransaction decodes a transaction to extract swap details
func decodeTransaction(tx *types.Transaction, logger *logger.Logger) *scanner.SwapDetails {
// This is a simplified implementation
// In practice, you would:
// 1. Check if the transaction is calling a Uniswap-like contract
// 2. Decode the function call data
// 3. Extract token addresses, amounts, etc.
// For now, we'll return mock data for demonstration
if tx.To() != nil {
swap := &scanner.SwapDetails{
PoolAddress: tx.To().Hex(),
Token0: "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48", // USDC
Token1: "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2", // WETH
Amount0In: big.NewInt(1000000000), // 1000 USDC
Amount0Out: big.NewInt(0),
Amount1In: big.NewInt(0),
Amount1Out: big.NewInt(500000000000000000), // 0.5 WETH
SqrtPriceX96: uint256.NewInt(2505414483750470000),
Liquidity: uint256.NewInt(1000000000000000000),
Tick: 200000,
Timestamp: time.Now(),
TransactionHash: tx.Hash(),
}
logger.Debug(fmt.Sprintf("Decoded swap transaction: %s", tx.Hash().Hex()))
return swap
}
return nil
}
// MarketAnalysisStage performs market analysis on swap details
func MarketAnalysisStage(
cfg *config.BotConfig,
logger *logger.Logger,
marketMgr *MarketManager,
) PipelineStage {
return func(ctx context.Context, input <-chan *types.Transaction, output chan<- *scanner.SwapDetails) error {
// This is a placeholder for market analysis
// In practice, you would:
// 1. Get pool data from market manager
// 2. Analyze price impact
// 3. Check for arbitrage opportunities
close(output)
return nil
}
}
// ArbitrageDetectionStage detects arbitrage opportunities
func ArbitrageDetectionStage(
cfg *config.BotConfig,
logger *logger.Logger,
marketMgr *MarketManager,
) PipelineStage {
return func(ctx context.Context, input <-chan *types.Transaction, output chan<- *scanner.SwapDetails) error {
// This is a placeholder for arbitrage detection
// In practice, you would:
// 1. Compare prices across multiple pools
// 2. Calculate potential profit
// 3. Filter based on profitability
close(output)
return nil
}
}