feat: create v2-prep branch with comprehensive planning

Restructured project for V2 refactor:

**Structure Changes:**
- Moved all V1 code to orig/ folder (preserved with git mv)
- Created docs/planning/ directory
- Added orig/README_V1.md explaining V1 preservation

**Planning Documents:**
- 00_V2_MASTER_PLAN.md: Complete architecture overview
  - Executive summary of critical V1 issues
  - High-level component architecture diagrams
  - 5-phase implementation roadmap
  - Success metrics and risk mitigation

- 07_TASK_BREAKDOWN.md: Atomic task breakdown
  - 99+ hours of detailed tasks
  - Every task < 2 hours (atomic)
  - Clear dependencies and success criteria
  - Organized by implementation phase

**V2 Key Improvements:**
- Per-exchange parsers (factory pattern; see the sketch after this list)
- Multi-layer strict validation
- Multi-index pool cache (see the cache sketch below)
- Background validation pipeline
- Comprehensive observability
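
As a rough illustration of the per-exchange parser direction, here is a minimal factory-pattern sketch. All names (ExchangeParser, SwapEvent, Register) are hypothetical, not the final V2 API:

package parsers

import (
	"fmt"

	"github.com/ethereum/go-ethereum/core/types"
)

// SwapEvent is a placeholder for the normalized event V2 parsers would emit.
type SwapEvent struct {
	Protocol string
	TxHash   string
}

// ExchangeParser is the per-protocol contract each DEX integration implements.
type ExchangeParser interface {
	Protocol() string
	ParseLog(log *types.Log) (*SwapEvent, error)
}

var registry = map[string]func() ExchangeParser{}

// Register is called from each parser package's init().
func Register(name string, factory func() ExchangeParser) {
	registry[name] = factory
}

// New returns a parser for the named protocol, or an error if none is registered.
func New(name string) (ExchangeParser, error) {
	factory, ok := registry[name]
	if !ok {
		return nil, fmt.Errorf("no parser registered for %q", name)
	}
	return factory(), nil
}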

**Critical Issues Addressed:**
- Zero address tokens (strict validation + cache enrichment)
- Parsing accuracy (protocol-specific parsers)
- No audit trail (background validation channel)
- Inefficient lookups (multi-index cache)
- Stats disconnection (event-driven metrics)
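
And a rough sketch of the multi-index cache idea (types and names hypothetical): one write keeps every lookup path consistent, so address, pair, and factory lookups are all single map hits instead of scans:

package cache

import (
	"strings"
	"sync"

	"github.com/ethereum/go-ethereum/common"
)

// PoolRecord is a hypothetical V2 record; only the indexed fields are shown.
type PoolRecord struct {
	Address, Factory, Token0, Token1 common.Address
}

// PoolCache indexes every record three ways so hot-path lookups never scan.
type PoolCache struct {
	mu        sync.RWMutex
	byAddress map[common.Address]*PoolRecord
	byPair    map[string][]*PoolRecord
	byFactory map[common.Address][]*PoolRecord
}

func NewPoolCache() *PoolCache {
	return &PoolCache{
		byAddress: make(map[common.Address]*PoolRecord),
		byPair:    make(map[string][]*PoolRecord),
		byFactory: make(map[common.Address][]*PoolRecord),
	}
}

// pairKey orders the tokens so (A,B) and (B,A) share one bucket.
func pairKey(a, b common.Address) string {
	x, y := strings.ToLower(a.Hex()), strings.ToLower(b.Hex())
	if x > y {
		x, y = y, x
	}
	return x + "_" + y
}

// Put updates all three indexes under one lock.
// (A real implementation would de-duplicate on re-insert.)
func (c *PoolCache) Put(p *PoolRecord) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.byAddress[p.Address] = p
	k := pairKey(p.Token0, p.Token1)
	c.byPair[k] = append(c.byPair[k], p)
	c.byFactory[p.Factory] = append(c.byFactory[p.Factory], p)
}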

Next Steps:
1. Review planning documents
2. Begin Phase 1: Foundation (P1-001 through P1-010)
3. Implement parsers in Phase 2
4. Build cache system in Phase 3
5. Add validation pipeline in Phase 4
6. Migrate and test in Phase 5

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
Commit 803de231ba (parent 1773daffe7)
Committer: Administrator
Date: 2025-11-10 10:14:26 +01:00
411 changed files with 20390 additions and 8680 deletions

orig/pkg/market/fan.go (new file, 182 lines)

@@ -0,0 +1,182 @@
package market
import (
"context"
"fmt"
"sync"
"time"
"github.com/ethereum/go-ethereum/core/types"
"github.com/fraktal/mev-beta/internal/config"
"github.com/fraktal/mev-beta/internal/logger"
"github.com/fraktal/mev-beta/internal/ratelimit"
)
// FanManager manages fan-in/fan-out patterns for multiple data sources
type FanManager struct {
config *config.Config
logger *logger.Logger
rateLimiter *ratelimit.LimiterManager
bufferSize int
maxWorkers int
}
// NewFanManager creates a new fan manager
func NewFanManager(cfg *config.Config, logger *logger.Logger, rateLimiter *ratelimit.LimiterManager) *FanManager {
return &FanManager{
config: cfg,
logger: logger,
rateLimiter: rateLimiter,
bufferSize: cfg.Bot.ChannelBufferSize,
maxWorkers: cfg.Bot.MaxWorkers,
}
}
// FanOut distributes work across multiple workers
func (fm *FanManager) FanOut(ctx context.Context, jobs <-chan *types.Transaction, numWorkers int) <-chan *types.Transaction {
// Create the output channel
out := make(chan *types.Transaction, fm.bufferSize)
// Create a wait group to wait for all workers
var wg sync.WaitGroup
// Start the workers
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
fm.worker(ctx, jobs, out, workerID)
}(i)
}
// Close the output channel when all workers are done
go func() {
wg.Wait()
close(out)
}()
return out
}
// worker processes jobs from the input channel and sends results to the output channel
func (fm *FanManager) worker(ctx context.Context, jobs <-chan *types.Transaction, out chan<- *types.Transaction, workerID int) {
for {
select {
case job, ok := <-jobs:
if !ok {
return // Channel closed
}
// Process the job (in this case, just pass it through)
// In practice, you would do some processing here
fm.logger.Debug(fmt.Sprintf("Worker %d processing transaction %s", workerID, job.Hash().Hex()))
// Simulate some work
time.Sleep(10 * time.Millisecond)
// Send the result to the output channel
select {
case out <- job:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}
// FanIn combines multiple input channels into a single output channel
func (fm *FanManager) FanIn(ctx context.Context, inputs ...<-chan *types.Transaction) <-chan *types.Transaction {
// Create the output channel
out := make(chan *types.Transaction, fm.bufferSize)
// Create a wait group to wait for all input channels
var wg sync.WaitGroup
// Start a goroutine for each input channel
for i, input := range inputs {
wg.Add(1)
go func(inputID int, inputChan <-chan *types.Transaction) {
defer wg.Done()
fm.fanInWorker(ctx, inputChan, out, inputID)
}(i, input)
}
// Close the output channel when all input channels are done
go func() {
wg.Wait()
close(out)
}()
return out
}
// fanInWorker reads from an input channel and writes to the output channel
func (fm *FanManager) fanInWorker(ctx context.Context, input <-chan *types.Transaction, out chan<- *types.Transaction, inputID int) {
for {
select {
case job, ok := <-input:
if !ok {
return // Channel closed
}
// Send the job to the output channel
select {
case out <- job:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}
// Multiplex distributes transactions across multiple endpoints with rate limiting.
// Each transaction is consumed by exactly one endpoint worker (all workers share the
// input channel), so this load-balances across endpoints rather than broadcasting.
func (fm *FanManager) Multiplex(ctx context.Context, transactions <-chan *types.Transaction) []<-chan *types.Transaction {
endpoints := fm.rateLimiter.GetEndpoints()
outputs := make([]<-chan *types.Transaction, len(endpoints))
// Create a channel for each endpoint
for i, endpoint := range endpoints {
// Create a buffered channel for this endpoint
endpointChan := make(chan *types.Transaction, fm.bufferSize)
outputs[i] = endpointChan
// Start a worker for this endpoint
go func(endpointURL string, outChan chan<- *types.Transaction) {
defer close(outChan)
for {
select {
case tx, ok := <-transactions:
if !ok {
return // Input channel closed
}
// Wait for rate limiter
if err := fm.rateLimiter.WaitForLimit(ctx, endpointURL); err != nil {
fm.logger.Error(fmt.Sprintf("Rate limiter error for %s: %v", endpointURL, err))
continue
}
// Send to endpoint-specific channel
select {
case outChan <- tx:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}(endpoint, endpointChan)
}
return outputs
}
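
A minimal usage sketch for the fan primitives above; the consumer body is illustrative, and it assumes a FanManager built elsewhere via NewFanManager:

package market

import (
	"context"

	"github.com/ethereum/go-ethereum/core/types"
)

// drainDecoded wires FanOut into a consumer loop: eight workers drain `pending`
// concurrently and the merged output is consumed as a single stream.
func drainDecoded(ctx context.Context, fm *FanManager, pending <-chan *types.Transaction) {
	decoded := fm.FanOut(ctx, pending, 8)
	for tx := range decoded {
		_ = tx.Hash() // real downstream processing would go here
	}
}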

orig/pkg/market/manager.go (new file, 303 lines)

@@ -0,0 +1,303 @@
package market
import (
"context"
"fmt"
"os"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/holiman/uint256"
"golang.org/x/sync/singleflight"
"github.com/fraktal/mev-beta/internal/config"
"github.com/fraktal/mev-beta/internal/logger"
"github.com/fraktal/mev-beta/pkg/uniswap"
)
// MarketManager manages market data and pool information
type MarketManager struct {
config *config.UniswapConfig
logger *logger.Logger
pools map[string]*PoolData
mu sync.RWMutex
cacheGroup singleflight.Group
cacheDuration time.Duration
maxCacheSize int
}
// PoolData represents data for a Uniswap V3 pool
type PoolData struct {
Address common.Address
Token0 common.Address
Token1 common.Address
Fee int64
Liquidity *uint256.Int
SqrtPriceX96 *uint256.Int
Tick int
TickSpacing int
LastUpdated time.Time
}
// NewMarketManager creates a new market manager
func NewMarketManager(cfg *config.UniswapConfig, logger *logger.Logger) *MarketManager {
return &MarketManager{
config: cfg,
logger: logger,
pools: make(map[string]*PoolData),
cacheDuration: time.Duration(cfg.Cache.Expiration) * time.Second,
maxCacheSize: cfg.Cache.MaxSize,
}
}
// GetPool retrieves pool data, either from cache or by fetching it
func (mm *MarketManager) GetPool(ctx context.Context, poolAddress common.Address) (*PoolData, error) {
// Check if we have it in cache and it's still valid
poolKey := poolAddress.Hex()
mm.mu.RLock()
if pool, exists := mm.pools[poolKey]; exists {
// Check if cache is still valid
if time.Since(pool.LastUpdated) < mm.cacheDuration {
mm.mu.RUnlock()
return pool, nil
}
}
mm.mu.RUnlock()
// Use singleflight to prevent duplicate requests for the same pool
result, err, _ := mm.cacheGroup.Do(poolKey, func() (interface{}, error) {
return mm.fetchPoolData(ctx, poolAddress)
})
if err != nil {
return nil, err
}
pool := result.(*PoolData)
// Update cache
mm.mu.Lock()
// Check if we need to evict old entries
if len(mm.pools) >= mm.maxCacheSize {
mm.evictOldest()
}
mm.pools[poolKey] = pool
mm.mu.Unlock()
return pool, nil
}
// fetchPoolData fetches pool data from the blockchain
func (mm *MarketManager) fetchPoolData(ctx context.Context, poolAddress common.Address) (*PoolData, error) {
// Validate that this is not a router address before attempting pool operations
knownRouters := map[common.Address]bool{
common.HexToAddress("0xE592427A0AEce92De3Edee1F18E0157C05861564"): true, // Uniswap V3 Router
common.HexToAddress("0x4752ba5dbc23f44d87826276bf6fd6b1c372ad24"): true, // Uniswap V2 Router02
common.HexToAddress("0xA51afAFe0263b40EdaEf0Df8781eA9aa03E381a3"): true, // Universal Router
common.HexToAddress("0x1111111254EEB25477B68fb85Ed929f73A960582"): true, // 1inch Router v5
common.HexToAddress("0xC36442b4a4522E871399CD717aBDD847Ab11FE88"): true, // Uniswap V3 Position Manager
common.HexToAddress("0x87d66368cD08a7Ca42252f5ab44B2fb6d1Fb8d15"): true, // TraderJoe Router
common.HexToAddress("0x82dfd2b94222bDB603Aa6B34A8D37311ab3DB800"): true, // Another router
common.HexToAddress("0x1b81D678ffb9C0263b24A97847620C99d213eB14"): true, // Another router
common.HexToAddress("0x0000000000000000000000000000000000000001"): true, // Our placeholder address
common.HexToAddress("0x0000000000000000000000000000000000000002"): true, // Our router conflict placeholder
}
if knownRouters[poolAddress] {
return nil, fmt.Errorf("cannot fetch pool data for router address %s", poolAddress.Hex())
}
// Check for addresses starting with 0xDEAD (our derived placeholders)
if len(poolAddress.Bytes()) >= 2 && poolAddress.Bytes()[0] == 0xDE && poolAddress.Bytes()[1] == 0xAD {
return nil, fmt.Errorf("cannot fetch pool data for placeholder address %s", poolAddress.Hex())
}
// Connect to an Ethereum client. A new connection is dialed on every cache miss,
// which is wasteful; a shared client would be the V2 approach.
// Get the RPC endpoint from the environment, falling back to a hardcoded URL.
// Note: the fallback embeds a provider credential and belongs in config instead.
rpcEndpoint := os.Getenv("ARBITRUM_RPC_ENDPOINT")
if rpcEndpoint == "" {
rpcEndpoint = "https://arbitrum-mainnet.core.chainstack.com/53c30e7a941160679fdcc396c894fc57" // fallback
}
client, err := ethclient.Dial(rpcEndpoint)
if err != nil {
return nil, fmt.Errorf("failed to connect to Ethereum node: %v", err)
}
defer client.Close()
// Create Uniswap V3 pool interface
pool := uniswap.NewUniswapV3Pool(poolAddress, client)
// Validate that this is a real pool contract
if !uniswap.IsValidPool(ctx, client, poolAddress) {
return nil, fmt.Errorf("invalid pool contract at address %s", poolAddress.Hex())
}
// Fetch real pool state from the blockchain
poolState, err := pool.GetPoolState(ctx)
if err != nil {
mm.logger.Warn(fmt.Sprintf("Failed to fetch real pool state for %s, using fallback data: %v", poolAddress.Hex(), err))
// Fallback to realistic mock data with per-pool variation
poolData := &PoolData{
Address: poolAddress,
Token0: common.HexToAddress("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"), // USDC
Token1: common.HexToAddress("0x82aF49447D8a07e3bd95BD0d56f35241523fBab1"), // WETH on Arbitrum
Fee: 3000, // 0.3%
Liquidity: uint256.NewInt(1000000000000000000), // 1 ETH equivalent
SqrtPriceX96: uint256.NewInt(2505414483750470000), // Realistic price
Tick: 200000, // Corresponding tick
TickSpacing: 60, // Tick spacing for 0.3% fee
LastUpdated: time.Now(),
}
// Add some variation based on pool address to make different pools have different data
addressBytes := poolAddress.Bytes()
variation := int64(addressBytes[19]) // Use last byte for variation
// Vary liquidity by up to ~±64% of the 1 ETH base
liquidityVariation := (variation - 128) * 5000000000000000 // 0.005 ETH per unit
baseLiquidity := int64(1000000000000000000)
newLiquidityValue := baseLiquidity + liquidityVariation
if newLiquidityValue > 0 {
poolData.Liquidity = uint256.NewInt(uint64(newLiquidityValue))
}
// Vary price slightly
priceVariation := (variation - 128) * 10000000000000
basePrice := int64(2505414483750470000)
newPriceValue := basePrice + priceVariation
if newPriceValue > 0 {
poolData.SqrtPriceX96 = uint256.NewInt(uint64(newPriceValue))
}
return poolData, nil
}
// Create PoolData from real blockchain state
poolData := &PoolData{
Address: poolAddress,
Token0: poolState.Token0,
Token1: poolState.Token1,
Fee: poolState.Fee,
Liquidity: poolState.Liquidity,
SqrtPriceX96: poolState.SqrtPriceX96,
Tick: poolState.Tick,
TickSpacing: getTickSpacing(poolState.Fee),
LastUpdated: time.Now(),
}
mm.logger.Debug(fmt.Sprintf("Fetched real pool data for %s: Token0=%s, Token1=%s, Fee=%d",
poolAddress.Hex(), poolState.Token0.Hex(), poolState.Token1.Hex(), poolState.Fee))
return poolData, nil
}
// getTickSpacing returns the tick spacing for a given fee tier
func getTickSpacing(fee int64) int {
switch fee {
case 100: // 0.01%
return 1
case 500: // 0.05%
return 10
case 3000: // 0.3%
return 60
case 10000: // 1%
return 200
default:
return 60 // Default to medium spacing
}
}
// evictOldest removes the oldest entry from the cache
func (mm *MarketManager) evictOldest() {
oldestKey := ""
var oldestTime time.Time
for key, pool := range mm.pools {
if oldestKey == "" || pool.LastUpdated.Before(oldestTime) {
oldestKey = key
oldestTime = pool.LastUpdated
}
}
if oldestKey != "" {
delete(mm.pools, oldestKey)
mm.logger.Debug(fmt.Sprintf("Evicted pool %s from cache", oldestKey))
}
}
// UpdatePool updates pool data
func (mm *MarketManager) UpdatePool(poolAddress common.Address, liquidity *uint256.Int, sqrtPriceX96 *uint256.Int, tick int) {
poolKey := poolAddress.Hex()
mm.mu.Lock()
defer mm.mu.Unlock()
if pool, exists := mm.pools[poolKey]; exists {
pool.Liquidity = liquidity
pool.SqrtPriceX96 = sqrtPriceX96
pool.Tick = tick
pool.LastUpdated = time.Now()
} else {
// Create new pool entry
pool := &PoolData{
Address: poolAddress,
Liquidity: liquidity,
SqrtPriceX96: sqrtPriceX96,
Tick: tick,
LastUpdated: time.Now(),
}
mm.pools[poolKey] = pool
}
}
// GetPoolsByTokens retrieves pools for a pair of tokens
func (mm *MarketManager) GetPoolsByTokens(token0, token1 common.Address) []*PoolData {
mm.mu.RLock()
defer mm.mu.RUnlock()
pools := make([]*PoolData, 0)
for _, pool := range mm.pools {
// Check if this pool contains the token pair
if (pool.Token0 == token0 && pool.Token1 == token1) ||
(pool.Token0 == token1 && pool.Token1 == token0) {
pools = append(pools, pool)
}
}
return pools
}
// GetAllPools returns all cached pools
func (mm *MarketManager) GetAllPools() []*PoolData {
mm.mu.RLock()
defer mm.mu.RUnlock()
pools := make([]*PoolData, 0, len(mm.pools))
for _, pool := range mm.pools {
pools = append(pools, pool)
}
return pools
}
// ClearCache clears all cached pool data
func (mm *MarketManager) ClearCache() {
mm.mu.Lock()
defer mm.mu.Unlock()
mm.pools = make(map[string]*PoolData)
mm.logger.Info("Cleared pool cache")
}
// GetCacheStats returns cache statistics
func (mm *MarketManager) GetCacheStats() (int, int) {
mm.mu.RLock()
defer mm.mu.RUnlock()
return len(mm.pools), mm.maxCacheSize
}
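
A small usage sketch for the cache above: concurrent callers asking for the same pool share one fetch via singleflight. It assumes an mm built with NewMarketManager; the pool address is the one used in the tests below.

package market

import (
	"context"
	"fmt"

	"github.com/ethereum/go-ethereum/common"
)

// warmPool fetches one pool through the cache, hitting the network only on a miss.
func warmPool(ctx context.Context, mm *MarketManager) error {
	addr := common.HexToAddress("0x88e6A0c2dDD26FEEb64F039a2c41296FcB3f5640")
	pool, err := mm.GetPool(ctx, addr)
	if err != nil {
		return fmt.Errorf("warm pool: %w", err)
	}
	fmt.Printf("pool %s fee=%d tick=%d\n", pool.Address.Hex(), pool.Fee, pool.Tick)
	return nil
}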

orig/pkg/market/manager_test.go (new file, 295 lines)

@@ -0,0 +1,295 @@
package market
import (
"context"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/holiman/uint256"
"github.com/stretchr/testify/assert"
"github.com/fraktal/mev-beta/internal/config"
"github.com/fraktal/mev-beta/internal/logger"
)
func TestNewMarketManager(t *testing.T) {
// Create test config
cfg := &config.UniswapConfig{
Cache: config.CacheConfig{
Expiration: 300,
MaxSize: 10000,
},
}
// Create test logger
logger := logger.New("info", "text", "")
// Create market manager
manager := NewMarketManager(cfg, logger)
// Verify manager was created correctly
assert.NotNil(t, manager)
assert.Equal(t, cfg, manager.config)
assert.NotNil(t, manager.pools)
assert.Equal(t, time.Duration(cfg.Cache.Expiration)*time.Second, manager.cacheDuration)
assert.Equal(t, cfg.Cache.MaxSize, manager.maxCacheSize)
}
func TestGetPoolCacheHit(t *testing.T) {
// Create market manager
cfg := &config.UniswapConfig{
Cache: config.CacheConfig{
Expiration: 300,
MaxSize: 10000,
},
}
logger := logger.New("info", "text", "")
manager := NewMarketManager(cfg, logger)
// Add a pool to the cache
poolAddress := common.HexToAddress("0x88e6A0c2dDD26FEEb64F039a2c41296FcB3f5640")
pool := &PoolData{
Address: poolAddress,
Token0: common.HexToAddress("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"),
Token1: common.HexToAddress("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"),
Fee: 3000,
Liquidity: uint256.NewInt(1000000000000000000),
SqrtPriceX96: uint256.NewInt(2505414483750470000),
Tick: 200000,
TickSpacing: 60,
LastUpdated: time.Now(),
}
manager.pools[poolAddress.Hex()] = pool
// Get the pool (should be a cache hit)
ctx := context.Background()
result, err := manager.GetPool(ctx, poolAddress)
// Verify results
assert.NoError(t, err)
assert.Equal(t, pool, result)
}
func TestGetPoolCacheMiss(t *testing.T) {
// Create market manager
cfg := &config.UniswapConfig{
Cache: config.CacheConfig{
Expiration: 300,
MaxSize: 10000,
},
}
logger := logger.New("info", "text", "")
manager := NewMarketManager(cfg, logger)
t.Skip("requires live RPC")
// Get a pool that's not in the cache (should trigger a fetch)
poolAddress := common.HexToAddress("0x88e6A0c2dDD26FEEb64F039a2c41296FcB3f5640")
ctx := context.Background()
result, err := manager.GetPool(ctx, poolAddress)
// Verify results (should get the fallback mock data from fetchPoolData)
assert.NoError(t, err)
assert.NotNil(t, result)
assert.Equal(t, poolAddress, result.Address)
assert.Equal(t, "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48", result.Token0.Hex())
// The fallback data uses WETH on Arbitrum, not mainnet WETH
assert.Equal(t, "0x82aF49447D8a07e3bd95BD0d56f35241523fBab1", result.Token1.Hex())
}
func TestGetPoolsByTokens(t *testing.T) {
// Create market manager
cfg := &config.UniswapConfig{
Cache: config.CacheConfig{
Expiration: 300,
MaxSize: 10000,
},
}
logger := logger.New("info", "text", "")
manager := NewMarketManager(cfg, logger)
// Add some pools to the cache
token0 := common.HexToAddress("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48") // USDC
token1 := common.HexToAddress("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2") // WETH
pool1 := &PoolData{
Address: common.HexToAddress("0x88e6A0c2dDD26FEEb64F039a2c41296FcB3f5640"),
Token0: token0,
Token1: token1,
Fee: 3000,
}
manager.pools[pool1.Address.Hex()] = pool1
pool2 := &PoolData{
Address: common.HexToAddress("0xB4e16d0168e52d35CaCD2c6185b44281Ec28C9Dc"),
Token0: token0,
Token1: token1,
Fee: 500,
}
manager.pools[pool2.Address.Hex()] = pool2
// Add a pool with different tokens
token2 := common.HexToAddress("0x1f9840a85d5aF5bf1D1762F925BDADdC4201F984") // UNI
pool3 := &PoolData{
Address: common.HexToAddress("0x1234567890123456789012345678901234567890"),
Token0: token0,
Token1: token2,
Fee: 3000,
}
manager.pools[pool3.Address.Hex()] = pool3
// Get pools for the token pair
pools := manager.GetPoolsByTokens(token0, token1)
// Verify results
assert.Len(t, pools, 2)
// Check that both pools are in the result
pool1Found := false
pool2Found := false
for _, pool := range pools {
if pool.Address == pool1.Address {
pool1Found = true
}
if pool.Address == pool2.Address {
pool2Found = true
}
}
assert.True(t, pool1Found)
assert.True(t, pool2Found)
}
func TestGetAllPools(t *testing.T) {
// Create market manager
cfg := &config.UniswapConfig{
Cache: config.CacheConfig{
Expiration: 300,
MaxSize: 10000,
},
}
logger := logger.New("info", "text", "")
manager := NewMarketManager(cfg, logger)
// Add some pools to the cache
pool1 := &PoolData{
Address: common.HexToAddress("0x88e6A0c2dDD26FEEb64F039a2c41296FcB3f5640"),
Token0: common.HexToAddress("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"),
Token1: common.HexToAddress("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"),
Fee: 3000,
}
manager.pools[pool1.Address.Hex()] = pool1
pool2 := &PoolData{
Address: common.HexToAddress("0xB4e16d0168e52d35CaCD2c6185b44281Ec28C9Dc"),
Token0: common.HexToAddress("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"),
Token1: common.HexToAddress("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"),
Fee: 500,
}
manager.pools[pool2.Address.Hex()] = pool2
// Get all pools
pools := manager.GetAllPools()
// Verify results
assert.Len(t, pools, 2)
}
func TestUpdatePoolExisting(t *testing.T) {
// Create market manager
cfg := &config.UniswapConfig{
Cache: config.CacheConfig{
Expiration: 300,
MaxSize: 10000,
},
}
logger := logger.New("info", "text", "")
manager := NewMarketManager(cfg, logger)
// Add a pool to the cache
poolAddress := common.HexToAddress("0x88e6A0c2dDD26FEEb64F039a2c41296FcB3f5640")
originalLiquidity := uint256.NewInt(1000000000000000000)
originalSqrtPrice := uint256.NewInt(2505414483750470000)
originalTick := 200000
pool := &PoolData{
Address: poolAddress,
Liquidity: originalLiquidity,
SqrtPriceX96: originalSqrtPrice,
Tick: originalTick,
LastUpdated: time.Now().Add(-time.Hour), // Set to past time
}
manager.pools[poolAddress.Hex()] = pool
// Update the pool
newLiquidity := uint256.NewInt(2000000000000000000)
newSqrtPrice := uint256.NewInt(3000000000000000000)
newTick := 250000
manager.UpdatePool(poolAddress, newLiquidity, newSqrtPrice, newTick)
// Verify the pool was updated
updatedPool := manager.pools[poolAddress.Hex()]
assert.Equal(t, newLiquidity, updatedPool.Liquidity)
assert.Equal(t, newSqrtPrice, updatedPool.SqrtPriceX96)
assert.Equal(t, newTick, updatedPool.Tick)
// Check that the last updated time is more recent (allowing for small time differences)
assert.True(t, updatedPool.LastUpdated.Unix() >= pool.LastUpdated.Unix())
}
func TestUpdatePoolNew(t *testing.T) {
// Create market manager
cfg := &config.UniswapConfig{
Cache: config.CacheConfig{
Expiration: 300,
MaxSize: 10000,
},
}
logger := logger.New("info", "text", "")
manager := NewMarketManager(cfg, logger)
// Update a pool that doesn't exist yet
poolAddress := common.HexToAddress("0x88e6A0c2dDD26FEEb64F039a2c41296FcB3f5640")
liquidity := uint256.NewInt(1000000000000000000)
sqrtPrice := uint256.NewInt(2505414483750470000)
tick := 200000
manager.UpdatePool(poolAddress, liquidity, sqrtPrice, tick)
// Verify the pool was created
createdPool := manager.pools[poolAddress.Hex()]
assert.NotNil(t, createdPool)
assert.Equal(t, poolAddress, createdPool.Address)
assert.Equal(t, liquidity, createdPool.Liquidity)
assert.Equal(t, sqrtPrice, createdPool.SqrtPriceX96)
assert.Equal(t, tick, createdPool.Tick)
}
func TestGetCacheStats(t *testing.T) {
// Create market manager
cfg := &config.UniswapConfig{
Cache: config.CacheConfig{
Expiration: 300,
MaxSize: 10000,
},
}
logger := logger.New("info", "text", "")
manager := NewMarketManager(cfg, logger)
// Add some pools to the cache
pool1 := &PoolData{
Address: common.HexToAddress("0x88e6A0c2dDD26FEEb64F039a2c41296FcB3f5640"),
}
manager.pools[pool1.Address.Hex()] = pool1
pool2 := &PoolData{
Address: common.HexToAddress("0xB4e16d0168e52d35CaCD2c6185b44281Ec28C9Dc"),
}
manager.pools[pool2.Address.Hex()] = pool2
// Get cache stats
currentSize, maxSize := manager.GetCacheStats()
// Verify results
assert.Equal(t, 2, currentSize)
assert.Equal(t, 10000, maxSize)
}


@@ -0,0 +1,632 @@
package market
import (
"context"
"fmt"
"math/big"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/holiman/uint256"
"github.com/fraktal/mev-beta/internal/logger"
"github.com/fraktal/mev-beta/pkg/database"
"github.com/fraktal/mev-beta/pkg/marketdata"
)
// MarketBuilder constructs comprehensive market structures from cached data
type MarketBuilder struct {
logger *logger.Logger
database *database.Database
client *ethclient.Client
dataLogger *marketdata.MarketDataLogger
// Built markets
markets map[string]*Market // key: "tokenA_tokenB"
marketsMutex sync.RWMutex
// Build configuration
buildConfig *BuildConfig
initialized bool
initMutex sync.Mutex
}
// Market represents a comprehensive trading market for a token pair
type Market struct {
TokenA common.Address `json:"tokenA"`
TokenB common.Address `json:"tokenB"`
Pools []*MarketPool `json:"pools"`
TotalLiquidity *big.Int `json:"totalLiquidity"`
BestPool *MarketPool `json:"bestPool"` // Pool with highest liquidity
// Market statistics
PoolCount int `json:"poolCount"`
Volume24h *big.Int `json:"volume24h"`
SwapCount24h int64 `json:"swapCount24h"`
LastUpdated time.Time `json:"lastUpdated"`
FirstSeen time.Time `json:"firstSeen"`
// Price information
WeightedPrice *big.Float `json:"weightedPrice"` // Liquidity-weighted price
PriceSpread float64 `json:"priceSpread"` // Price spread across pools (%)
// DEX coverage
Protocols map[string]int `json:"protocols"` // Protocol -> pool count
Factories []common.Address `json:"factories"` // All factories for this pair
}
// MarketPool represents a pool within a market
type MarketPool struct {
Address common.Address `json:"address"`
Factory common.Address `json:"factory"`
Protocol string `json:"protocol"`
Fee uint32 `json:"fee"`
// Current state
Liquidity *uint256.Int `json:"liquidity"`
SqrtPriceX96 *uint256.Int `json:"sqrtPriceX96"`
Tick int32 `json:"tick"`
Price *big.Float `json:"price"` // Calculated price
// Market share in this token pair
LiquidityShare float64 `json:"liquidityShare"` // % of total liquidity
VolumeShare24h float64 `json:"volumeShare24h"` // % of 24h volume
// Activity metrics
SwapCount int64 `json:"swapCount"`
Volume24h *big.Int `json:"volume24h"`
LastSwapTime time.Time `json:"lastSwapTime"`
AvgSwapSize *big.Int `json:"avgSwapSize"`
// Quality metrics
PriceDeviation float64 `json:"priceDeviation"` // Deviation from weighted avg (%)
Efficiency float64 `json:"efficiency"` // Volume/Liquidity ratio
Reliability float64 `json:"reliability"` // Uptime/activity score
}
// BuildConfig configures market building parameters
type BuildConfig struct {
// Pool filtering
MinLiquidity *big.Int `json:"minLiquidity"`
MinVolume24h *big.Int `json:"minVolume24h"`
MaxPriceDeviation float64 `json:"maxPriceDeviation"` // Max price deviation to include (%)
// Token filtering
RequiredTokens []common.Address `json:"requiredTokens"` // Must include these tokens
ExcludedTokens []common.Address `json:"excludedTokens"` // Exclude these tokens
OnlyVerifiedTokens bool `json:"onlyVerifiedTokens"`
// Market requirements
MinPoolsPerMarket int `json:"minPoolsPerMarket"`
RequireMultiDEX bool `json:"requireMultiDEX"` // Require pools from multiple DEXs
// Update behavior
RebuildInterval time.Duration `json:"rebuildInterval"`
AutoUpdate bool `json:"autoUpdate"`
// Performance
MaxMarketsToCache int `json:"maxMarketsToCache"`
ParallelBuildJobs int `json:"parallelBuildJobs"`
}
// NewMarketBuilder creates a new market builder
func NewMarketBuilder(logger *logger.Logger, database *database.Database, client *ethclient.Client, dataLogger *marketdata.MarketDataLogger) *MarketBuilder {
return &MarketBuilder{
logger: logger,
database: database,
client: client,
dataLogger: dataLogger,
markets: make(map[string]*Market),
buildConfig: &BuildConfig{
MinLiquidity: big.NewInt(1000000000000000000), // 1 ETH minimum
MinVolume24h: big.NewInt(100000000000000000), // 0.1 ETH minimum
MaxPriceDeviation: 5.0, // 5% max deviation
MinPoolsPerMarket: 2, // At least 2 pools
RequireMultiDEX: false, // Don't require multi-DEX
RebuildInterval: 30 * time.Minute, // Rebuild every 30 minutes
AutoUpdate: true,
MaxMarketsToCache: 1000, // Cache up to 1000 markets
ParallelBuildJobs: 4, // 4 parallel build jobs
},
}
}
// Initialize sets up the market builder
func (mb *MarketBuilder) Initialize(ctx context.Context) error {
mb.initMutex.Lock()
defer mb.initMutex.Unlock()
if mb.initialized {
return nil
}
// Validate configuration
if err := mb.validateConfig(); err != nil {
return fmt.Errorf("invalid build configuration: %w", err)
}
// Build initial markets from cached data
if err := mb.buildInitialMarkets(ctx); err != nil {
return fmt.Errorf("failed to build initial markets: %w", err)
}
// Start automatic rebuilding if enabled
if mb.buildConfig.AutoUpdate {
go mb.autoRebuildLoop()
}
mb.initialized = true
mb.logger.Info(fmt.Sprintf("Market builder initialized with %d markets", len(mb.markets)))
return nil
}
// buildInitialMarkets builds markets from existing cached data
func (mb *MarketBuilder) buildInitialMarkets(ctx context.Context) error {
if mb.dataLogger == nil {
return fmt.Errorf("data logger not available")
}
// Get all token pairs that have pools
tokenPairs := mb.extractTokenPairs()
if len(tokenPairs) == 0 {
mb.logger.Warn("No token pairs found in cached data")
return nil
}
mb.logger.Info(fmt.Sprintf("Building markets for %d token pairs", len(tokenPairs)))
// Build markets in parallel
semaphore := make(chan struct{}, mb.buildConfig.ParallelBuildJobs)
var wg sync.WaitGroup
for _, pair := range tokenPairs {
wg.Add(1)
go func(tokenPair TokenPair) {
defer wg.Done()
semaphore <- struct{}{} // Acquire
defer func() { <-semaphore }() // Release
if market, err := mb.buildMarketForPair(ctx, tokenPair.TokenA, tokenPair.TokenB); err != nil {
mb.logger.Debug(fmt.Sprintf("Failed to build market for %s/%s: %v",
tokenPair.TokenA.Hex(), tokenPair.TokenB.Hex(), err))
} else if market != nil {
mb.addMarket(market)
}
}(pair)
}
wg.Wait()
mb.logger.Info(fmt.Sprintf("Built %d markets from cached data", len(mb.markets)))
return nil
}
// TokenPair represents a token pair
type TokenPair struct {
TokenA common.Address
TokenB common.Address
}
// extractTokenPairs extracts unique token pairs from cached pools
func (mb *MarketBuilder) extractTokenPairs() []TokenPair {
tokenPairs := make(map[string]TokenPair)
// Extract from data logger cache (implementation would iterate through cached pools)
// For now, return some common pairs
commonPairs := []TokenPair{
{
TokenA: common.HexToAddress("0x82af49447d8a07e3bd95bd0d56f35241523fbab1"), // WETH
TokenB: common.HexToAddress("0xaf88d065e77c8cc2239327c5edb3a432268e5831"), // USDC
},
{
TokenA: common.HexToAddress("0x82af49447d8a07e3bd95bd0d56f35241523fbab1"), // WETH
TokenB: common.HexToAddress("0x912ce59144191c1204e64559fe8253a0e49e6548"), // ARB
},
{
TokenA: common.HexToAddress("0xaf88d065e77c8cc2239327c5edb3a432268e5831"), // USDC
TokenB: common.HexToAddress("0xfd086bc7cd5c481dcc9c85ebe478a1c0b69fcbb9"), // USDT
},
}
for _, pair := range commonPairs {
key := mb.makeTokenPairKey(pair.TokenA, pair.TokenB)
tokenPairs[key] = pair
}
result := make([]TokenPair, 0, len(tokenPairs))
for _, pair := range tokenPairs {
result = append(result, pair)
}
return result
}
// buildMarketForPair builds a comprehensive market for a token pair
func (mb *MarketBuilder) buildMarketForPair(ctx context.Context, tokenA, tokenB common.Address) (*Market, error) {
// Get pools for this token pair
pools := mb.dataLogger.GetPoolsForTokenPair(tokenA, tokenB)
if len(pools) < mb.buildConfig.MinPoolsPerMarket {
return nil, fmt.Errorf("insufficient pools (%d < %d required)", len(pools), mb.buildConfig.MinPoolsPerMarket)
}
// Filter and convert pools
marketPools := make([]*MarketPool, 0, len(pools))
totalLiquidity := big.NewInt(0)
totalVolume := big.NewInt(0)
protocols := make(map[string]int)
factories := make(map[common.Address]bool)
for _, pool := range pools {
// Apply filters
if !mb.passesFilters(pool) {
continue
}
marketPool := &MarketPool{
Address: pool.Address,
Factory: pool.Factory,
Protocol: pool.Protocol,
Fee: pool.Fee,
Liquidity: pool.Liquidity,
SqrtPriceX96: pool.SqrtPriceX96,
Tick: pool.Tick,
SwapCount: pool.SwapCount,
Volume24h: pool.Volume24h,
LastSwapTime: pool.LastSwapTime,
}
// Calculate price from sqrtPriceX96
if pool.SqrtPriceX96 != nil && pool.SqrtPriceX96.Sign() > 0 {
marketPool.Price = mb.calculatePriceFromSqrt(pool.SqrtPriceX96)
}
marketPools = append(marketPools, marketPool)
// Update totals
if pool.Liquidity != nil {
totalLiquidity.Add(totalLiquidity, pool.Liquidity.ToBig())
}
if pool.Volume24h != nil {
totalVolume.Add(totalVolume, pool.Volume24h)
}
// Track protocols and factories
protocols[pool.Protocol]++
factories[pool.Factory] = true
}
if len(marketPools) < mb.buildConfig.MinPoolsPerMarket {
return nil, fmt.Errorf("insufficient qualifying pools after filtering")
}
// Check multi-DEX requirement
if mb.buildConfig.RequireMultiDEX && len(protocols) < 2 {
return nil, fmt.Errorf("requires multiple DEXs but only found %d", len(protocols))
}
// Calculate market metrics
weightedPrice := mb.calculateWeightedPrice(marketPools)
priceSpread := mb.calculatePriceSpread(marketPools, weightedPrice)
bestPool := mb.findBestPool(marketPools)
// Update pool market shares and metrics
mb.updatePoolMetrics(marketPools, totalLiquidity, totalVolume, weightedPrice)
// Create factory slice
factorySlice := make([]common.Address, 0, len(factories))
for factory := range factories {
factorySlice = append(factorySlice, factory)
}
market := &Market{
TokenA: tokenA,
TokenB: tokenB,
Pools: marketPools,
TotalLiquidity: totalLiquidity,
BestPool: bestPool,
PoolCount: len(marketPools),
Volume24h: totalVolume,
WeightedPrice: weightedPrice,
PriceSpread: priceSpread,
Protocols: protocols,
Factories: factorySlice,
LastUpdated: time.Now(),
FirstSeen: time.Now(), // Would be minimum of all pool first seen times
}
return market, nil
}
// passesFilters checks if a pool passes the configured filters
func (mb *MarketBuilder) passesFilters(pool *marketdata.PoolInfo) bool {
// Check minimum liquidity
if pool.Liquidity != nil && mb.buildConfig.MinLiquidity != nil {
if pool.Liquidity.ToBig().Cmp(mb.buildConfig.MinLiquidity) < 0 {
return false
}
}
// Check minimum volume
if pool.Volume24h != nil && mb.buildConfig.MinVolume24h != nil {
if pool.Volume24h.Cmp(mb.buildConfig.MinVolume24h) < 0 {
return false
}
}
return true
}
// calculatePriceFromSqrt calculates price from sqrtPriceX96
func (mb *MarketBuilder) calculatePriceFromSqrt(sqrtPriceX96 *uint256.Int) *big.Float {
// Convert sqrtPriceX96 to price
// price = (sqrtPriceX96 / 2^96)^2
sqrtPrice := new(big.Float).SetInt(sqrtPriceX96.ToBig())
q96 := new(big.Float).SetInt(new(big.Int).Lsh(big.NewInt(1), 96))
normalizedSqrt := new(big.Float).Quo(sqrtPrice, q96)
price := new(big.Float).Mul(normalizedSqrt, normalizedSqrt)
return price
}
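// Worked example (illustrative): a sqrtPriceX96 equal to 2^96 normalizes to 1.0,
// giving price = 1.0; doubling sqrtPriceX96 quadruples the price, since price
// scales with the square of the normalized sqrt value.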
// calculateWeightedPrice calculates liquidity-weighted average price
func (mb *MarketBuilder) calculateWeightedPrice(pools []*MarketPool) *big.Float {
if len(pools) == 0 {
return big.NewFloat(0)
}
weightedSum := big.NewFloat(0)
totalWeight := big.NewFloat(0)
for _, pool := range pools {
if pool.Price != nil && pool.Liquidity != nil {
weight := new(big.Float).SetInt(pool.Liquidity.ToBig())
weightedPrice := new(big.Float).Mul(pool.Price, weight)
weightedSum.Add(weightedSum, weightedPrice)
totalWeight.Add(totalWeight, weight)
}
}
if totalWeight.Sign() == 0 {
return big.NewFloat(0)
}
return new(big.Float).Quo(weightedSum, totalWeight)
}
// calculatePriceSpread calculates price spread across pools
func (mb *MarketBuilder) calculatePriceSpread(pools []*MarketPool, weightedPrice *big.Float) float64 {
if len(pools) == 0 || weightedPrice.Sign() == 0 {
return 0
}
maxDeviation := 0.0
for _, pool := range pools {
if pool.Price != nil {
deviation := new(big.Float).Sub(pool.Price, weightedPrice)
deviation.Abs(deviation)
deviationRatio := new(big.Float).Quo(deviation, weightedPrice)
if ratio, _ := deviationRatio.Float64(); ratio > maxDeviation {
maxDeviation = ratio
}
}
}
return maxDeviation * 100 // Convert to percentage
}
// findBestPool finds the pool with highest liquidity
func (mb *MarketBuilder) findBestPool(pools []*MarketPool) *MarketPool {
var best *MarketPool
var maxLiquidity *big.Int
for _, pool := range pools {
if pool.Liquidity != nil {
liquidity := pool.Liquidity.ToBig()
if maxLiquidity == nil || liquidity.Cmp(maxLiquidity) > 0 {
maxLiquidity = liquidity
best = pool
}
}
}
return best
}
// updatePoolMetrics calculates market share and other metrics for pools
func (mb *MarketBuilder) updatePoolMetrics(pools []*MarketPool, totalLiquidity, totalVolume *big.Int, weightedPrice *big.Float) {
for _, pool := range pools {
// Calculate liquidity share
if pool.Liquidity != nil && totalLiquidity.Sign() > 0 {
liquidityFloat := new(big.Float).SetInt(pool.Liquidity.ToBig())
totalLiquidityFloat := new(big.Float).SetInt(totalLiquidity)
shareRatio := new(big.Float).Quo(liquidityFloat, totalLiquidityFloat)
pool.LiquidityShare, _ = shareRatio.Float64()
}
// Calculate volume share
if pool.Volume24h != nil && totalVolume.Sign() > 0 {
volumeFloat := new(big.Float).SetInt(pool.Volume24h)
totalVolumeFloat := new(big.Float).SetInt(totalVolume)
shareRatio := new(big.Float).Quo(volumeFloat, totalVolumeFloat)
pool.VolumeShare24h, _ = shareRatio.Float64()
}
// Calculate price deviation
if pool.Price != nil && weightedPrice.Sign() > 0 {
deviation := new(big.Float).Sub(pool.Price, weightedPrice)
deviation.Abs(deviation)
deviationRatio := new(big.Float).Quo(deviation, weightedPrice)
pool.PriceDeviation, _ = deviationRatio.Float64()
pool.PriceDeviation *= 100 // Convert to percentage
}
// Calculate efficiency (volume/liquidity ratio)
if pool.Volume24h != nil && pool.Liquidity != nil && pool.Liquidity.Sign() > 0 {
volumeFloat := new(big.Float).SetInt(pool.Volume24h)
liquidityFloat := new(big.Float).SetInt(pool.Liquidity.ToBig())
efficiency := new(big.Float).Quo(volumeFloat, liquidityFloat)
pool.Efficiency, _ = efficiency.Float64()
}
// Calculate average swap size
if pool.Volume24h != nil && pool.SwapCount > 0 {
avgSize := new(big.Int).Div(pool.Volume24h, big.NewInt(pool.SwapCount))
pool.AvgSwapSize = avgSize
}
// Calculate reliability (simplified - based on recent activity)
if time.Since(pool.LastSwapTime) < 24*time.Hour {
pool.Reliability = 1.0
} else if time.Since(pool.LastSwapTime) < 7*24*time.Hour {
pool.Reliability = 0.5
} else {
pool.Reliability = 0.1
}
}
}
// addMarket adds a market to the cache
func (mb *MarketBuilder) addMarket(market *Market) {
mb.marketsMutex.Lock()
defer mb.marketsMutex.Unlock()
key := mb.makeTokenPairKey(market.TokenA, market.TokenB)
mb.markets[key] = market
mb.logger.Debug(fmt.Sprintf("Added market %s with %d pools (total liquidity: %s)",
key, market.PoolCount, market.TotalLiquidity.String()))
}
// makeTokenPairKey creates a consistent key for token pairs
func (mb *MarketBuilder) makeTokenPairKey(tokenA, tokenB common.Address) string {
// Ensure consistent ordering (smaller address first)
if tokenA.Big().Cmp(tokenB.Big()) > 0 {
tokenA, tokenB = tokenB, tokenA
}
return fmt.Sprintf("%s_%s", tokenA.Hex(), tokenB.Hex())
}
// validateConfig validates the build configuration
func (mb *MarketBuilder) validateConfig() error {
if mb.buildConfig.MinPoolsPerMarket < 1 {
return fmt.Errorf("minPoolsPerMarket must be at least 1")
}
if mb.buildConfig.ParallelBuildJobs < 1 {
return fmt.Errorf("parallelBuildJobs must be at least 1")
}
if mb.buildConfig.MaxMarketsToCache < 1 {
return fmt.Errorf("maxMarketsToCache must be at least 1")
}
return nil
}
// autoRebuildLoop automatically rebuilds markets at intervals.
// Note: there is no stop signal; this goroutine runs for the life of the process
// (Stop only clears the initialized flag).
func (mb *MarketBuilder) autoRebuildLoop() {
ticker := time.NewTicker(mb.buildConfig.RebuildInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
if err := mb.RebuildMarkets(ctx); err != nil {
mb.logger.Warn(fmt.Sprintf("Failed to rebuild markets: %v", err))
}
cancel()
}
}
}
// GetMarket returns a market for a token pair
func (mb *MarketBuilder) GetMarket(tokenA, tokenB common.Address) (*Market, bool) {
mb.marketsMutex.RLock()
defer mb.marketsMutex.RUnlock()
key := mb.makeTokenPairKey(tokenA, tokenB)
market, exists := mb.markets[key]
return market, exists
}
// GetAllMarkets returns all cached markets
func (mb *MarketBuilder) GetAllMarkets() []*Market {
mb.marketsMutex.RLock()
defer mb.marketsMutex.RUnlock()
markets := make([]*Market, 0, len(mb.markets))
for _, market := range mb.markets {
markets = append(markets, market)
}
return markets
}
// RebuildMarkets rebuilds all markets from current cached data
func (mb *MarketBuilder) RebuildMarkets(ctx context.Context) error {
mb.logger.Info("Rebuilding markets from cached data...")
// Clear existing markets
mb.marketsMutex.Lock()
oldCount := len(mb.markets)
mb.markets = make(map[string]*Market)
mb.marketsMutex.Unlock()
// Rebuild
if err := mb.buildInitialMarkets(ctx); err != nil {
return fmt.Errorf("failed to rebuild markets: %w", err)
}
newCount := len(mb.markets)
mb.logger.Info(fmt.Sprintf("Rebuilt markets: %d -> %d", oldCount, newCount))
return nil
}
// GetStatistics returns comprehensive market builder statistics
func (mb *MarketBuilder) GetStatistics() map[string]interface{} {
mb.marketsMutex.RLock()
defer mb.marketsMutex.RUnlock()
totalPools := 0
totalLiquidity := big.NewInt(0)
protocolCounts := make(map[string]int)
for _, market := range mb.markets {
totalPools += market.PoolCount
totalLiquidity.Add(totalLiquidity, market.TotalLiquidity)
for protocol, count := range market.Protocols {
protocolCounts[protocol] += count
}
}
return map[string]interface{}{
"totalMarkets": len(mb.markets),
"totalPools": totalPools,
"totalLiquidity": totalLiquidity.String(),
"protocolCounts": protocolCounts,
"initialized": mb.initialized,
"autoUpdate": mb.buildConfig.AutoUpdate,
}
}
// Stop gracefully shuts down the market builder
func (mb *MarketBuilder) Stop() {
mb.initMutex.Lock()
defer mb.initMutex.Unlock()
if !mb.initialized {
return
}
mb.logger.Info("Market builder stopped")
mb.initialized = false
}
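
A small usage sketch for the builder above (assumes an initialized MarketBuilder; the token addresses are the Arbitrum WETH/USDC pair used in extractTokenPairs):

package market

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common"
)

// bestVenue looks up the cached market for a pair and reports the deepest pool.
func bestVenue(mb *MarketBuilder) {
	weth := common.HexToAddress("0x82af49447d8a07e3bd95bd0d56f35241523fbab1")
	usdc := common.HexToAddress("0xaf88d065e77c8cc2239327c5edb3a432268e5831")
	if market, ok := mb.GetMarket(weth, usdc); ok && market.BestPool != nil {
		fmt.Printf("best pool %s (%s), %.1f%% of pair liquidity\n",
			market.BestPool.Address.Hex(), market.BestPool.Protocol,
			market.BestPool.LiquidityShare*100)
	}
}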

orig/pkg/market/pipeline.go (new file, 829 lines)


@@ -0,0 +1,829 @@
package market
import (
"context"
"errors"
"fmt"
"math"
"math/big"
"strings"
"sync"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/holiman/uint256"
"github.com/fraktal/mev-beta/internal/config"
"github.com/fraktal/mev-beta/internal/logger"
"github.com/fraktal/mev-beta/pkg/events"
"github.com/fraktal/mev-beta/pkg/scanner"
marketscanner "github.com/fraktal/mev-beta/pkg/scanner/market"
stypes "github.com/fraktal/mev-beta/pkg/types"
"github.com/fraktal/mev-beta/pkg/uniswap"
"github.com/fraktal/mev-beta/pkg/validation"
)
// Pipeline processes transactions through multiple stages
type Pipeline struct {
config *config.BotConfig
logger *logger.Logger
marketMgr *MarketManager
scanner *scanner.Scanner
stages []PipelineStage
bufferSize int
concurrency int
eventParser *events.EventParser
validator *validation.InputValidator
ethClient *ethclient.Client // Add Ethereum client for fetching receipts
}
// PipelineStage represents a stage in the processing pipeline
type PipelineStage func(context.Context, <-chan *events.Event, chan<- *events.Event) error
// NewPipeline creates a new transaction processing pipeline
func NewPipeline(
cfg *config.BotConfig,
logger *logger.Logger,
marketMgr *MarketManager,
scanner *scanner.Scanner,
ethClient *ethclient.Client, // Add Ethereum client parameter
) *Pipeline {
// Enhanced parser setup moved to monitor to avoid import cycle
// The monitor will be responsible for setting up enhanced parsing
pipeline := &Pipeline{
config: cfg,
logger: logger,
marketMgr: marketMgr,
scanner: scanner,
bufferSize: cfg.ChannelBufferSize,
concurrency: cfg.MaxWorkers,
eventParser: events.NewEventParser(),
validator: validation.NewInputValidator(nil, logger),
ethClient: ethClient, // Store the Ethereum client
}
// Add default stages
pipeline.AddStage(TransactionDecoderStage(cfg, logger, marketMgr, pipeline.validator, pipeline.ethClient))
return pipeline
}
// SetEnhancedEventParser allows injecting an enhanced event parser after creation
// This avoids import cycle issues while enabling enhanced parsing capabilities
func (p *Pipeline) SetEnhancedEventParser(parser *events.EventParser) {
if parser != nil {
p.eventParser = parser
p.logger.Info("✅ ENHANCED EVENT PARSER INJECTED INTO PIPELINE - Enhanced parsing now active")
} else {
p.logger.Warn("❌ ENHANCED PARSER INJECTION FAILED - Received nil parser")
}
}
// AddDefaultStages adds the default processing stages to the pipeline
func (p *Pipeline) AddDefaultStages() {
p.AddStage(TransactionDecoderStage(p.config, p.logger, p.marketMgr, p.validator, p.ethClient))
p.AddStage(MarketAnalysisStage(p.config, p.logger, p.marketMgr, p.validator))
p.AddStage(ArbitrageDetectionStage(p.config, p.logger, p.marketMgr, p.validator))
}
// AddStage adds a processing stage to the pipeline
func (p *Pipeline) AddStage(stage PipelineStage) {
p.stages = append(p.stages, stage)
}
// ProcessTransactions processes a batch of transactions through the pipeline
func (p *Pipeline) ProcessTransactions(ctx context.Context, transactions []*types.Transaction, blockNumber uint64, timestamp uint64) error {
if len(p.stages) == 0 {
return fmt.Errorf("no pipeline stages configured")
}
// Parse events from transaction receipts
eventChan := make(chan *events.Event, p.bufferSize)
// Parse transactions in a goroutine
go func() {
defer close(eventChan)
for _, tx := range transactions {
// Validate transaction input
validationResult, err := p.validator.ValidateTransaction(tx)
if err != nil || !validationResult.IsValid {
// Skip logging for known problematic transactions to reduce spam
txHash := tx.Hash().Hex()
if !p.isKnownProblematicTransaction(txHash) {
p.logger.Warn(fmt.Sprintf("Invalid transaction %s: %v", txHash, err))
}
continue
}
// Fetch transaction receipt
receipt, err := p.ethClient.TransactionReceipt(ctx, tx.Hash())
if err != nil {
p.logger.Error(fmt.Sprintf("Error fetching receipt for transaction %s: %v", tx.Hash().Hex(), err))
continue
}
// Parse events from receipt logs
events, err := p.eventParser.ParseTransactionReceipt(receipt, blockNumber, timestamp)
if err != nil {
p.logger.Error(fmt.Sprintf("Error parsing receipt for transaction %s: %v", tx.Hash().Hex(), err))
continue
}
for _, event := range events {
// Validate the parsed event
if err := p.validator.ValidateEvent(event); err != nil {
p.logger.Warn(fmt.Sprintf("Invalid event from transaction %s: %v", tx.Hash().Hex(), err))
continue
}
select {
case eventChan <- event:
case <-ctx.Done():
return
}
}
}
}()
// Process through each stage
var currentChan <-chan *events.Event = eventChan
for i, stage := range p.stages {
// Create output channel for this stage
outputChan := make(chan *events.Event, p.bufferSize)
go func(stage PipelineStage, input <-chan *events.Event, output chan<- *events.Event, stageIndex int) {
err := stage(ctx, input, output)
if err != nil {
p.logger.Error(fmt.Sprintf("Pipeline stage %d error: %v", stageIndex, err))
}
}(stage, currentChan, outputChan, i)
currentChan = outputChan
}
// Process the final output
if currentChan != nil {
go func() {
defer func() {
if r := recover(); r != nil {
p.logger.Error(fmt.Sprintf("Final output processor panic recovered: %v", r))
}
}()
p.processSwapDetails(ctx, currentChan)
}()
}
return nil
}
// processSwapDetails processes the final output of the pipeline
func (p *Pipeline) processSwapDetails(ctx context.Context, eventDetails <-chan *events.Event) {
for {
select {
case event, ok := <-eventDetails:
if !ok {
return // Channel closed
}
// Submit to the scanner for processing
p.scanner.SubmitEvent(*event)
case <-ctx.Done():
return
}
}
}
// TransactionDecoderStage decodes transactions to identify swap opportunities
func TransactionDecoderStage(
cfg *config.BotConfig,
logger *logger.Logger,
marketMgr *MarketManager,
validator *validation.InputValidator,
ethClient *ethclient.Client, // Add Ethereum client parameter
) PipelineStage {
return func(ctx context.Context, input <-chan *events.Event, output chan<- *events.Event) error {
var wg sync.WaitGroup
// Process events concurrently
for i := 0; i < cfg.MaxWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case event, ok := <-input:
if !ok {
return // Channel closed
}
// Process the event (in this case, it's already decoded)
// In a real implementation, you might do additional processing here
if event != nil {
// Additional validation at the stage level
if err := validator.ValidateEvent(event); err != nil {
logger.Warn(fmt.Sprintf("Event validation failed in decoder stage: %v", err))
continue
}
select {
case output <- event:
case <-ctx.Done():
return
}
}
case <-ctx.Done():
return
}
}
}()
}
// Wait for all workers to finish, then close the output channel
go func() {
wg.Wait()
// Safely close the output channel
defer func() {
if r := recover(); r != nil {
logger.Debug("Channel already closed in TransactionDecoderStage")
}
}()
select {
case <-ctx.Done():
// Context cancelled, don't close channel as it might be used elsewhere
default:
close(output)
}
}()
return nil
}
}
// MarketAnalysisStage performs market analysis on event details
func MarketAnalysisStage(
cfg *config.BotConfig,
logger *logger.Logger,
marketMgr *MarketManager,
validator *validation.InputValidator,
) PipelineStage {
return func(ctx context.Context, input <-chan *events.Event, output chan<- *events.Event) error {
var wg sync.WaitGroup
// Process events concurrently
for i := 0; i < cfg.MaxWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case event, ok := <-input:
if !ok {
return // Channel closed
}
// Validate event before processing
if err := validator.ValidateEvent(event); err != nil {
logger.Warn(fmt.Sprintf("Event validation failed in analysis stage: %v", err))
continue
}
// Only process swap events
if event.Type != events.Swap {
// Forward non-swap events without processing
select {
case output <- event:
case <-ctx.Done():
return
}
continue
}
// Get pool data from market manager
poolData, err := marketMgr.GetPool(ctx, event.PoolAddress)
if err != nil {
if errors.Is(err, marketscanner.ErrInvalidPoolCandidate) {
logger.Debug("Skipping pool data fetch due to invalid candidate",
"pool", event.PoolAddress,
"error", err)
} else {
// DEBUG logging only - pool validation expected to fail for known-bad pools
// This is normal behavior, not an error condition
errorMsg := fmt.Sprintf("Pool fetch skipped for %s: %v", event.PoolAddress, err)
contextMsg := fmt.Sprintf("pipeline_stage:market_processing event_type:%s protocol:%s",
event.Type.String(), event.Protocol)
logger.Debug(fmt.Sprintf("%s [context: %s]", errorMsg, contextMsg))
}
// Forward the event even if we can't get pool data
select {
case output <- event:
case <-ctx.Done():
return
}
continue
}
// Calculate price impact using Uniswap V3 math
priceImpact, err := calculatePriceImpact(event, poolData)
if err != nil {
logger.Error(fmt.Sprintf("Error calculating price impact for pool %s: %v", event.PoolAddress, err))
// Forward the event even if we can't calculate price impact
select {
case output <- event:
case <-ctx.Done():
return
}
continue
}
// Add price impact to the event
// Note: In a real implementation, you might want to create a new struct
// that extends EventDetails with additional fields
logger.Debug(fmt.Sprintf("Price impact for pool %s: %f", event.PoolAddress, priceImpact))
// Forward the processed event
select {
case output <- event:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}()
}
// Wait for all workers to finish, then close the output channel
go func() {
wg.Wait()
// Safely close the output channel
defer func() {
if r := recover(); r != nil {
logger.Debug("Channel already closed in MarketAnalysisStage")
}
}()
select {
case <-ctx.Done():
// Context cancelled, don't close channel as it might be used elsewhere
default:
close(output)
}
}()
return nil
}
}
// calculatePriceImpact calculates the price impact of a swap using Uniswap V3 math
func calculatePriceImpact(event *events.Event, poolData *PoolData) (float64, error) {
// Convert event amounts to uint256 for calculations
amount0In := uint256.NewInt(0)
amount0In.SetFromBig(event.Amount0)
amount1In := uint256.NewInt(0)
amount1In.SetFromBig(event.Amount1)
// Determine which token is being swapped in
var amountIn *uint256.Int
if amount0In.Cmp(uint256.NewInt(0)) > 0 {
amountIn = amount0In
} else {
amountIn = amount1In
}
// If no amount is being swapped in, return 0 impact
if amountIn.Cmp(uint256.NewInt(0)) == 0 {
return 0.0, nil
}
// Calculate price impact as a percentage of liquidity
// priceImpact = amountIn / liquidity
liquidity := poolData.Liquidity
// If liquidity is 0, we can't calculate impact
if liquidity.Cmp(uint256.NewInt(0)) == 0 {
return 0.0, nil
}
// Calculate the impact ratio in floating point.
// Note: integer Div would truncate to zero whenever amountIn < liquidity,
// so the ratio must be computed with big.Float.
amountFloat := new(big.Float).SetInt(amountIn.ToBig())
liquidityFloat := new(big.Float).SetInt(liquidity.ToBig())
ratio := new(big.Float).Quo(amountFloat, liquidityFloat)
percentage, _ := ratio.Float64()
// Convert to percentage (multiply by 100)
return percentage * 100.0, nil
}
// ArbitrageDetectionStage detects arbitrage opportunities
func ArbitrageDetectionStage(
cfg *config.BotConfig,
logger *logger.Logger,
marketMgr *MarketManager,
validator *validation.InputValidator,
) PipelineStage {
return func(ctx context.Context, input <-chan *events.Event, output chan<- *events.Event) error {
var wg sync.WaitGroup
// Process events concurrently
for i := 0; i < cfg.MaxWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case event, ok := <-input:
if !ok {
return // Channel closed
}
// Validate event before processing
if err := validator.ValidateEvent(event); err != nil {
logger.Warn(fmt.Sprintf("Event validation failed in arbitrage detection stage: %v", err))
continue
}
// Only process swap events
if event.Type != events.Swap {
// Forward non-swap events without processing
select {
case output <- event:
case <-ctx.Done():
return
}
continue
}
// Look for arbitrage opportunities
opportunities, err := findArbitrageOpportunities(ctx, event, marketMgr, logger)
if err != nil {
logger.Error(fmt.Sprintf("Error finding arbitrage opportunities for pool %s: %v", event.PoolAddress, err))
// Forward the event even if we encounter an error
select {
case output <- event:
case <-ctx.Done():
return
}
continue
}
// Log any found opportunities
if len(opportunities) > 0 {
logger.Info(fmt.Sprintf("Found %d arbitrage opportunities for pool %s", len(opportunities), event.PoolAddress))
for _, opp := range opportunities {
logger.Info(fmt.Sprintf("Arbitrage opportunity: %+v", opp))
}
}
// Forward the processed event
select {
case output <- event:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}()
}
// Wait for all workers to finish, then close the output channel
go func() {
wg.Wait()
// Safely close the output channel
defer func() {
if r := recover(); r != nil {
logger.Debug("Channel already closed in ArbitrageDetectionStage")
}
}()
select {
case <-ctx.Done():
// Context cancelled, don't close channel as it might be used elsewhere
default:
close(output)
}
}()
return nil
}
}
// findArbitrageOpportunities looks for arbitrage opportunities based on a swap event
func findArbitrageOpportunities(ctx context.Context, event *events.Event, marketMgr *MarketManager, logger *logger.Logger) ([]stypes.ArbitrageOpportunity, error) {
opportunities := make([]stypes.ArbitrageOpportunity, 0)
// Get all pools for the same token pair
pools := marketMgr.GetPoolsByTokens(event.Token0, event.Token1)
// If we don't have multiple pools, we can't do arbitrage
if len(pools) < 2 {
return opportunities, nil
}
// Find the pool that triggered the event
var eventPool *PoolData
for _, pool := range pools {
if pool.Address == event.PoolAddress {
eventPool = pool
break
}
}
// If we can't find the event pool, return
if eventPool == nil {
return opportunities, nil
}
// Convert sqrtPriceX96 to price for the event pool
eventPoolPrice := uniswap.SqrtPriceX96ToPrice(eventPool.SqrtPriceX96.ToBig())
// Compare with other pools
for _, pool := range pools {
// Skip the event pool
if pool.Address == event.PoolAddress {
continue
}
// Convert sqrtPriceX96 to price for comparison pool
compPoolPrice := uniswap.SqrtPriceX96ToPrice(pool.SqrtPriceX96.ToBig())
// Calculate potential profit using sophisticated arbitrage mathematics
// This involves complex calculations considering:
// 1. Price impact on both pools
// 2. Gas costs and fees
// 3. Optimal trade size
// 4. Slippage and MEV competition
profit := calculateSophisticatedArbitrageProfit(eventPoolPrice, compPoolPrice, *event, pool, logger)
// If the model reports a positive net profit, size and record the opportunity
if profit.Cmp(big.NewFloat(0)) > 0 {
// profit already accounts for price impact, fees, and MEV competition (see below)
profitFloat, _ := profit.Float64()
estimatedAmount := big.NewInt(1000000) // 1 USDC equivalent test amount
// Convert the modeled profit to integer precision for gas accounting
profitBigInt := big.NewInt(int64(profitFloat * 1000000))
// Estimate gas costs for arbitrage transaction (typical multi-hop swap)
gasPrice := big.NewInt(1000000000) // 1 gwei
gasUnits := big.NewInt(300000) // ~300k gas for complex arbitrage
gasEstimate := new(big.Int).Mul(gasPrice, gasUnits)
// Calculate net profit after gas
netProfit := new(big.Int).Sub(profitBigInt, gasEstimate)
// Only include if profitable after gas costs
if netProfit.Sign() > 0 {
// Calculate ROI
roi := 0.0
if estimatedAmount.Sign() > 0 {
roiFloat := new(big.Float).Quo(new(big.Float).SetInt(netProfit), new(big.Float).SetInt(estimatedAmount))
roi, _ = roiFloat.Float64()
roi *= 100 // Convert to percentage
}
opp := stypes.ArbitrageOpportunity{
Path: []string{event.Token0.Hex(), event.Token1.Hex()},
Pools: []string{event.PoolAddress.Hex(), pool.Address.Hex()},
Profit: netProfit,
GasEstimate: gasEstimate,
ROI: roi,
Protocol: event.Protocol,
}
opportunities = append(opportunities, opp)
}
}
}
return opportunities, nil
}
// isKnownProblematicTransaction checks if a transaction hash is known to be problematic
func (p *Pipeline) isKnownProblematicTransaction(txHash string) bool {
// List of known problematic transaction hashes that should be skipped
problematicTxs := map[string]bool{
"0xe79e4719c6770b41405f691c18be3346b691e220d730d6b61abb5dd3ac9d71f0": true,
// Add other problematic transaction hashes here
}
return problematicTxs[txHash]
}
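// Illustrative usage (hypothetical call site, not present in the original code):
// a pipeline stage could consult this denylist before decoding, e.g.
//
//	if p.isKnownProblematicTransaction(tx.Hash().Hex()) {
//		continue // skip transactions known to break parsing
//	}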
// calculateSophisticatedArbitrageProfit calculates profit using advanced arbitrage mathematics
func calculateSophisticatedArbitrageProfit(
eventPoolPrice *big.Float,
compPoolPrice *big.Float,
event events.Event,
pool *PoolData,
logger *logger.Logger,
) *big.Float {
// Advanced arbitrage profit calculation considering:
// 1. Optimal trade size calculation
// 2. Price impact modeling for both pools
// 3. Gas costs and protocol fees
// 4. MEV competition adjustment
// 5. Slippage protection
	// Absolute price gap between the pools (positive means the comparison pool trades higher)
	priceDiff := new(big.Float).Sub(compPoolPrice, eventPoolPrice)
if priceDiff.Sign() <= 0 {
return big.NewFloat(0) // No profit if prices are equal or inverted
}
// Calculate relative price difference
relativeDiff := new(big.Float).Quo(priceDiff, eventPoolPrice)
relativeDiffFloat, _ := relativeDiff.Float64()
	// Sophisticated optimal trade size calculation using Uniswap V3 mathematics
	optimalTradeSize := calculateOptimalTradeSize(event, pool, relativeDiffFloat)
	// Calculate price impact on both pools, guarding nil liquidity before ToBig
	// (calculateTradeImpact falls back to a default impact for unknown liquidity)
	var eventLiq, compLiq *big.Int
	if event.Liquidity != nil {
		eventLiq = event.Liquidity.ToBig()
	}
	if pool.Liquidity != nil {
		compLiq = pool.Liquidity.ToBig()
	}
	eventPoolImpact := calculateTradeImpact(optimalTradeSize, eventLiq, "source")
	compPoolImpact := calculateTradeImpact(optimalTradeSize, compLiq, "destination")
	// Total price impact (reduces profit)
	totalImpact := eventPoolImpact + compPoolImpact
	// Adjusted edge after price impact
	adjustedRelativeDiff := relativeDiffFloat - totalImpact
	if adjustedRelativeDiff <= 0 {
		return big.NewFloat(0)
	}
// Calculate gross profit in wei
optimalTradeSizeBig := big.NewInt(optimalTradeSize)
grossProfit := new(big.Float).Mul(
new(big.Float).SetInt(optimalTradeSizeBig),
big.NewFloat(adjustedRelativeDiff),
)
// Subtract sophisticated gas cost estimation
gasCost := calculateSophisticatedGasCost(event, pool)
gasCostFloat := new(big.Float).SetInt(gasCost)
// Subtract protocol fees (0.3% for Uniswap)
protocolFeeRate := 0.003
protocolFee := new(big.Float).Mul(
new(big.Float).SetInt(optimalTradeSizeBig),
big.NewFloat(protocolFeeRate),
)
// MEV competition adjustment (reduces profit by estimated competition)
mevCompetitionFactor := calculateMEVCompetitionFactor(adjustedRelativeDiff)
// Calculate net profit
netProfit := new(big.Float).Sub(grossProfit, gasCostFloat)
netProfit.Sub(netProfit, protocolFee)
netProfit.Mul(netProfit, big.NewFloat(1.0-mevCompetitionFactor))
// Apply minimum profit threshold (0.01 ETH)
minProfitThreshold := big.NewFloat(10000000000000000) // 0.01 ETH in wei
if netProfit.Cmp(minProfitThreshold) < 0 {
return big.NewFloat(0)
}
logger.Debug(fmt.Sprintf("Sophisticated arbitrage calculation: optimal_size=%d, price_impact=%.4f%%, gas=%s, mev_factor=%.2f, net_profit=%s",
optimalTradeSize, totalImpact*100, gasCost.String(), mevCompetitionFactor, netProfit.String()))
return netProfit
}
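// Worked example (illustrative numbers, not from live data): with a 1% relative
// price gap, a 1 ETH (1e18 wei) optimal trade size, and ~0.2% combined price
// impact, the adjusted edge is 0.8%, so gross profit is 1e18 * 0.008 = 8e15 wei.
// A UniswapV3 route costs 371,000 gas at 1 gwei (3.71e14 wei) and the protocol
// fee is 1e18 * 0.003 = 3e15 wei, leaving ~4.63e15 wei. Scaling by (1 - 0.4)
// for the high-competition tier gives ~2.78e15 wei, below the 1e16 wei
// (0.01 ETH) minimum, so this particular scenario would return 0.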
// calculateOptimalTradeSize calculates the optimal trade size for maximum profit.
// priceDiffFraction is the relative price difference as a fraction (0.01 = 1%).
func calculateOptimalTradeSize(event events.Event, pool *PoolData, priceDiffFraction float64) int64 {
	// Kelly-criterion-style sizing adapted for arbitrage: the optimal size
	// scales with the edge and is constrained by available liquidity
	eventLiquidity := int64(1000000000000000000) // Default 1 ETH if unknown
	if event.Liquidity != nil && event.Liquidity.Sign() > 0 {
		if lb := event.Liquidity.ToBig(); lb.IsInt64() {
			eventLiquidity = lb.Int64()
		} else {
			eventLiquidity = math.MaxInt64 // Clamp: Int64() is undefined for out-of-range values
		}
	}
	poolLiquidity := int64(1000000000000000000) // Default 1 ETH if unknown
	if pool.Liquidity != nil && pool.Liquidity.Sign() > 0 {
		if lb := pool.Liquidity.ToBig(); lb.IsInt64() {
			poolLiquidity = lb.Int64()
		} else {
			poolLiquidity = math.MaxInt64 // Clamp: Int64() is undefined for out-of-range values
		}
	}
// Use the smaller liquidity as constraint
minLiquidity := eventLiquidity
if poolLiquidity < minLiquidity {
minLiquidity = poolLiquidity
}
	// Optimal size is typically a few percent of available liquidity,
	// scaled up with the price gap (a fraction, not a percent) and capped below
	sizeFactor := 0.02 + (priceDiffFraction * 5) // 2% base, grows with the edge
	if sizeFactor > 0.15 { // Cap at 15% of liquidity
		sizeFactor = 0.15
	}
optimalSize := int64(float64(minLiquidity) * sizeFactor)
// Minimum trade size (0.001 ETH)
minTradeSize := int64(1000000000000000)
if optimalSize < minTradeSize {
optimalSize = minTradeSize
}
// Maximum trade size (5 ETH to avoid overflow)
maxTradeSize := int64(5000000000000000000) // 5 ETH in wei
if optimalSize > maxTradeSize {
optimalSize = maxTradeSize
}
return optimalSize
}
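// Worked example (illustrative numbers): with 1 ETH (1e18 wei) of liquidity on
// both sides and a relative price difference of 0.01, sizeFactor = 0.02 +
// 0.01*5 = 0.07, giving an optimal size of 7e16 wei (0.07 ETH), comfortably
// inside the [0.001 ETH, 5 ETH] bounds enforced above.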
// calculateTradeImpact calculates price impact for a given trade size
func calculateTradeImpact(tradeSize int64, liquidity *big.Int, poolType string) float64 {
	if liquidity == nil || liquidity.Sign() == 0 {
		return 0.05 // 5% default impact for unknown liquidity
	}
	// Calculate the utilization ratio via big.Float so large liquidity values
	// don't overflow int64 (liquidity.Int64() is undefined out of range)
	utilizationRatio, _ := new(big.Float).Quo(
		new(big.Float).SetInt64(tradeSize),
		new(big.Float).SetInt(liquidity),
	).Float64()
// Different impact models for different pool types
var impact float64
switch poolType {
case "source":
// Source pool (where we buy) - typically has higher impact
impact = utilizationRatio * (1 + utilizationRatio*2) // Quadratic model
case "destination":
// Destination pool (where we sell) - typically has lower impact
impact = utilizationRatio * (1 + utilizationRatio*1.5) // Less aggressive model
default:
// Default model
impact = utilizationRatio * (1 + utilizationRatio)
}
	// For very large trades (>10% utilization), switch to a square-root curve
	// so impact grows sublinearly rather than quadratically with trade size
	if utilizationRatio > 0.1 {
		impact = math.Sqrt(impact)
	}
// Cap impact at 50%
if impact > 0.5 {
impact = 0.5
}
return impact
}
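// Worked example (illustrative numbers): a 7e16 wei trade against 1e18 wei of
// source-side liquidity gives a utilization ratio of 0.07, so impact =
// 0.07 * (1 + 0.07*2) ≈ 0.0798 (about 8%); the square-root branch is skipped
// because utilization stays at or below 0.1.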
// calculateSophisticatedGasCost estimates gas costs for arbitrage execution
func calculateSophisticatedGasCost(event events.Event, pool *PoolData) *big.Int {
// Base gas costs for different operations
baseGasSwap := int64(150000) // Base gas for a swap
baseGasTransfer := int64(21000) // Base gas for transfer
// Additional gas for complex operations
var totalGas int64 = baseGasSwap*2 + baseGasTransfer // Two swaps + transfer
// Add gas for protocol-specific operations
switch {
case strings.Contains(event.Protocol, "UniswapV3"):
totalGas += 50000 // V3 callback gas
case strings.Contains(event.Protocol, "UniswapV2"):
totalGas += 20000 // V2 additional gas
case strings.Contains(event.Protocol, "Curve"):
totalGas += 80000 // Curve math complexity
default:
totalGas += 30000 // Unknown protocol buffer
}
// Current gas price on Arbitrum (approximate)
gasPriceGwei := int64(1) // 1 gwei typical for Arbitrum
gasPriceWei := gasPriceGwei * 1000000000
// Calculate total cost
totalCost := totalGas * gasPriceWei
return big.NewInt(totalCost)
}
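// Worked example: a UniswapV3 arbitrage totals 150,000*2 + 21,000 + 50,000 =
// 371,000 gas; at the assumed 1 gwei gas price that is 3.71e14 wei
// (~0.000371 ETH) of execution cost.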
// calculateMEVCompetitionFactor estimates profit reduction due to MEV competition
func calculateMEVCompetitionFactor(profitMargin float64) float64 {
// Higher profit margins attract more competition
// This is based on empirical MEV research
if profitMargin < 0.001 { // < 0.1%
return 0.1 // Low competition
} else if profitMargin < 0.005 { // < 0.5%
return 0.2 // Moderate competition
} else if profitMargin < 0.01 { // < 1%
return 0.4 // High competition
} else if profitMargin < 0.02 { // < 2%
return 0.6 // Very high competition
} else {
return 0.8 // Extreme competition for large profits
}
}

View File

@@ -0,0 +1,206 @@
package market
import (
"context"
"math/big"
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/holiman/uint256"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/fraktal/mev-beta/internal/config"
"github.com/fraktal/mev-beta/internal/logger"
"github.com/fraktal/mev-beta/pkg/events"
scannerpkg "github.com/fraktal/mev-beta/pkg/scanner"
)
// MockMarketManager is a mock implementation of MarketManager for testing
type MockMarketManager struct {
mock.Mock
}
func (m *MockMarketManager) GetPool(ctx context.Context, poolAddress common.Address) (*PoolData, error) {
args := m.Called(ctx, poolAddress)
return args.Get(0).(*PoolData), args.Error(1)
}
func (m *MockMarketManager) GetPoolsByTokens(token0, token1 common.Address) []*PoolData {
args := m.Called(token0, token1)
return args.Get(0).([]*PoolData)
}
// MockLogger is a mock implementation of logger.Logger for testing
type MockLogger struct {
mock.Mock
}
func (m *MockLogger) Debug(msg string) {
m.Called(msg)
}
func (m *MockLogger) Info(msg string) {
m.Called(msg)
}
func (m *MockLogger) Warn(msg string) {
m.Called(msg)
}
func (m *MockLogger) Error(msg string, err ...interface{}) {
m.Called(msg, err)
}
func TestNewPipeline(t *testing.T) {
// Create mock config
cfg := &config.BotConfig{
MaxWorkers: 5,
ChannelBufferSize: 10,
}
// Create mock logger
logger := logger.New("info", "text", "")
// Create mock market manager
marketMgr := &MarketManager{}
// Create mock scanner
scannerObj := &scannerpkg.Scanner{}
// Create pipeline
pipeline := NewPipeline(cfg, logger, marketMgr, scannerObj, nil)
// Verify pipeline was created correctly
assert.NotNil(t, pipeline)
assert.Equal(t, cfg, pipeline.config)
assert.Equal(t, logger, pipeline.logger)
assert.Equal(t, marketMgr, pipeline.marketMgr)
assert.Equal(t, scannerObj, pipeline.scanner)
assert.Equal(t, cfg.ChannelBufferSize, pipeline.bufferSize)
assert.Equal(t, cfg.MaxWorkers, pipeline.concurrency)
assert.NotNil(t, pipeline.eventParser)
assert.Len(t, pipeline.stages, 1) // Should have TransactionDecoderStage by default
}
func TestAddStage(t *testing.T) {
// Create pipeline
cfg := &config.BotConfig{
MaxWorkers: 5,
ChannelBufferSize: 10,
}
logger := logger.New("info", "text", "")
marketMgr := &MarketManager{}
scannerObj := &scannerpkg.Scanner{}
pipeline := NewPipeline(cfg, logger, marketMgr, scannerObj, nil)
// Add a new stage
newStage := func(ctx context.Context, input <-chan *events.Event, output chan<- *events.Event) error {
return nil
}
pipeline.AddStage(newStage)
// Verify stage was added
assert.Len(t, pipeline.stages, 2) // TransactionDecoderStage + newStage
}
func TestAddDefaultStages(t *testing.T) {
// Create pipeline
cfg := &config.BotConfig{
MaxWorkers: 5,
ChannelBufferSize: 10,
}
logger := logger.New("info", "text", "")
marketMgr := &MarketManager{}
scannerObj := &scannerpkg.Scanner{}
pipeline := NewPipeline(cfg, logger, marketMgr, scannerObj, nil)
// Add default stages
pipeline.AddDefaultStages()
	// Verify stages were added: the initial TransactionDecoder plus the three
	// defaults (TransactionDecoder, MarketAnalysis, ArbitrageDetection) = 4 total
	assert.Len(t, pipeline.stages, 4)
}
func TestTransactionDecoderStage(t *testing.T) {
// Create mock config
cfg := &config.BotConfig{
MaxWorkers: 1, // Use 1 worker for simplicity in test
ChannelBufferSize: 10,
}
// Create mock logger
log := logger.New("info", "text", "")
// Create mock market manager
marketMgr := &MarketManager{}
// Create the stage
stage := TransactionDecoderStage(cfg, log, marketMgr, nil, nil)
// Verify the stage function was created
assert.NotNil(t, stage)
}
func TestCalculatePriceImpact(t *testing.T) {
// Create test event
event := &events.Event{
Amount0: big.NewInt(1000000000), // 1000 tokens
Amount1: big.NewInt(0),
}
// Create test pool data
liquidity := uint256.NewInt(1000000000000000000) // 1 ETH in liquidity
poolData := &PoolData{
Liquidity: liquidity,
}
// Calculate price impact
impact, err := calculatePriceImpact(event, poolData)
// Verify results
assert.NoError(t, err)
	assert.InDelta(t, 0.001, impact, 0.001) // Tiny trade against 1 ETH of liquidity: impact must land in [0, 0.002]
}
func TestCalculatePriceImpactNoAmount(t *testing.T) {
// Create test event with no amount
event := &events.Event{
Amount0: big.NewInt(0),
Amount1: big.NewInt(0),
}
// Create test pool data
liquidity := uint256.NewInt(10000000000000000000) // 10 ETH in liquidity
poolData := &PoolData{
Liquidity: liquidity,
}
// Calculate price impact
impact, err := calculatePriceImpact(event, poolData)
// Verify results
assert.NoError(t, err)
assert.Equal(t, 0.0, impact)
}
func TestCalculatePriceImpactNoLiquidity(t *testing.T) {
// Create test event
event := &events.Event{
Amount0: big.NewInt(1000000000),
Amount1: big.NewInt(0),
}
// Create test pool data with zero liquidity
liquidity := uint256.NewInt(0)
poolData := &PoolData{
Liquidity: liquidity,
}
// Calculate price impact
impact, err := calculatePriceImpact(event, poolData)
// Verify results
assert.NoError(t, err)
assert.Equal(t, 0.0, impact)
}
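// TestCalculateMEVCompetitionFactor is a minimal table-driven sketch added for
// illustration (not part of the original suite); it assumes the test file shares
// the market package with the implementation. Expected values mirror the tier
// thresholds defined in calculateMEVCompetitionFactor.
func TestCalculateMEVCompetitionFactor(t *testing.T) {
	cases := []struct {
		name   string
		margin float64
		want   float64
	}{
		{"low competition", 0.0005, 0.1},
		{"moderate competition", 0.003, 0.2},
		{"high competition", 0.008, 0.4},
		{"very high competition", 0.015, 0.6},
		{"extreme competition", 0.05, 0.8},
	}
	for _, tc := range cases {
		assert.Equal(t, tc.want, calculateMEVCompetitionFactor(tc.margin), tc.name)
	}
}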