Files
mev-beta/pkg/market/market_builder.go
Krypto Kajun fac8a64092 feat: Implement comprehensive Market Manager with database and logging
- Add complete Market Manager package with in-memory storage and CRUD operations
- Implement arbitrage detection with profit calculations and thresholds
- Add database adapter with PostgreSQL schema for persistence
- Create comprehensive logging system with specialized log files
- Add detailed documentation and implementation plans
- Include example application and comprehensive test suite
- Update Makefile with market manager build targets
- Add check-implementations command for verification
2025-09-18 03:52:33 -05:00

632 lines
19 KiB
Go

package market
import (
"context"
"fmt"
"math/big"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/fraktal/mev-beta/internal/logger"
"github.com/fraktal/mev-beta/pkg/database"
"github.com/fraktal/mev-beta/pkg/marketdata"
"github.com/holiman/uint256"
)
// MarketBuilder constructs comprehensive market structures from cached data.
//
// Built markets are kept in an in-memory map that is accessed under
// marketsMutex. Initialize and Stop hold initMutex while they read or write
// the initialized flag.
type MarketBuilder struct {
logger *logger.Logger
database *database.Database
client *ethclient.Client
dataLogger *marketdata.MarketDataLogger
// Built markets
markets map[string]*Market // key: the pair's two token addresses in hex, smaller address first, joined by "_" (see makeTokenPairKey)
marketsMutex sync.RWMutex
// Build configuration
buildConfig *BuildConfig
initialized bool
initMutex sync.Mutex
}
// Market represents a comprehensive trading market for a token pair: the
// pools that qualified for the pair plus aggregate figures computed over them.
type Market struct {
TokenA common.Address `json:"tokenA"`
TokenB common.Address `json:"tokenB"`
Pools []*MarketPool `json:"pools"` // Pools that passed the build filters
TotalLiquidity *big.Int `json:"totalLiquidity"` // Sum of the Liquidity of all pools in Pools
BestPool *MarketPool `json:"bestPool"` // Pool with highest liquidity
// Market statistics
PoolCount int `json:"poolCount"` // Number of entries in Pools at build time
Volume24h *big.Int `json:"volume24h"` // Sum of the Volume24h of all pools in Pools
SwapCount24h int64 `json:"swapCount24h"` // NOTE(review): not populated by buildMarketForPair in this file
LastUpdated time.Time `json:"lastUpdated"` // Time the market was built
FirstSeen time.Time `json:"firstSeen"` // Currently set to the build time, not the earliest pool sighting
// Price information
WeightedPrice *big.Float `json:"weightedPrice"` // Liquidity-weighted price
PriceSpread float64 `json:"priceSpread"` // Largest deviation of any pool price from WeightedPrice (%)
// DEX coverage
Protocols map[string]int `json:"protocols"` // Protocol -> pool count
Factories []common.Address `json:"factories"` // All factories for this pair (deduplicated, unordered)
}
// MarketPool represents a pool within a market. The identity and state
// fields are copied from the cached pool data; the share, activity and
// quality metrics are derived by updatePoolMetrics.
type MarketPool struct {
Address common.Address `json:"address"`
Factory common.Address `json:"factory"`
Protocol string `json:"protocol"`
Fee uint32 `json:"fee"`
// Current state
Liquidity *uint256.Int `json:"liquidity"`
SqrtPriceX96 *uint256.Int `json:"sqrtPriceX96"`
Tick int32 `json:"tick"`
Price *big.Float `json:"price"` // Calculated price: (SqrtPriceX96 / 2^96)^2; nil when SqrtPriceX96 is nil or zero
// Market share in this token pair
// NOTE(review): updatePoolMetrics stores both shares as 0-1 fractions, not
// percentages, unlike PriceDeviation which is multiplied by 100.
LiquidityShare float64 `json:"liquidityShare"` // Share of total liquidity
VolumeShare24h float64 `json:"volumeShare24h"` // Share of 24h volume
// Activity metrics
SwapCount int64 `json:"swapCount"`
Volume24h *big.Int `json:"volume24h"`
LastSwapTime time.Time `json:"lastSwapTime"`
AvgSwapSize *big.Int `json:"avgSwapSize"` // Volume24h / SwapCount (integer division); nil when SwapCount is 0
// Quality metrics
PriceDeviation float64 `json:"priceDeviation"` // Deviation from weighted avg (%)
Efficiency float64 `json:"efficiency"` // Volume/Liquidity ratio
Reliability float64 `json:"reliability"` // Recency score: 1.0 (swap within 24h), 0.5 (within 7 days), otherwise 0.1
}
// BuildConfig configures market building parameters.
//
// NOTE(review): in the code visible in this file only MinLiquidity,
// MinVolume24h, MinPoolsPerMarket, RequireMultiDEX, RebuildInterval,
// AutoUpdate and ParallelBuildJobs influence behavior; MaxMarketsToCache is
// validated but not enforced, and the remaining fields are not read here.
// Confirm against the rest of the package before relying on them.
type BuildConfig struct {
// Pool filtering
MinLiquidity *big.Int `json:"minLiquidity"` // Pools with known liquidity below this are dropped; nil disables the check
MinVolume24h *big.Int `json:"minVolume24h"` // Pools with known 24h volume below this are dropped; nil disables the check
MaxPriceDeviation float64 `json:"maxPriceDeviation"` // Max price deviation to include (%)
// Token filtering
RequiredTokens []common.Address `json:"requiredTokens"` // Must include these tokens
ExcludedTokens []common.Address `json:"excludedTokens"` // Exclude these tokens
OnlyVerifiedTokens bool `json:"onlyVerifiedTokens"`
// Market requirements
MinPoolsPerMarket int `json:"minPoolsPerMarket"` // Checked both before and after pool filtering; must be >= 1
RequireMultiDEX bool `json:"requireMultiDEX"` // Require pools from multiple DEXs
// Update behavior
RebuildInterval time.Duration `json:"rebuildInterval"` // Period of the background rebuild ticker
AutoUpdate bool `json:"autoUpdate"` // When true, Initialize starts the background rebuild loop
// Performance
MaxMarketsToCache int `json:"maxMarketsToCache"` // Must be >= 1
ParallelBuildJobs int `json:"parallelBuildJobs"` // Number of markets built concurrently; must be >= 1
}
// NewMarketBuilder returns a MarketBuilder wired to the given logger,
// database, Ethereum client and market data logger. The builder starts with
// an empty market cache and the default build configuration; no markets are
// built until Initialize is called.
func NewMarketBuilder(logger *logger.Logger, database *database.Database, client *ethclient.Client, dataLogger *marketdata.MarketDataLogger) *MarketBuilder {
	// Default build configuration.
	defaults := new(BuildConfig)
	defaults.MinLiquidity = big.NewInt(1000000000000000000) // 1e18 (1 ETH) minimum
	defaults.MinVolume24h = big.NewInt(100000000000000000)  // 1e17 (0.1 ETH) minimum
	defaults.MaxPriceDeviation = 5.0                        // percent
	defaults.MinPoolsPerMarket = 2
	defaults.RequireMultiDEX = false
	defaults.RebuildInterval = 30 * time.Minute
	defaults.AutoUpdate = true
	defaults.MaxMarketsToCache = 1000
	defaults.ParallelBuildJobs = 4

	builder := new(MarketBuilder)
	builder.logger = logger
	builder.database = database
	builder.client = client
	builder.dataLogger = dataLogger
	builder.markets = make(map[string]*Market)
	builder.buildConfig = defaults
	return builder
}
// Initialize validates the build configuration, builds the initial set of
// markets from cached data and, when AutoUpdate is enabled, starts the
// background rebuild loop.
//
// The call is a no-op returning nil while the builder is already initialized
// (that is, until Stop resets the flag). initMutex is held for the whole
// call, so concurrent callers block until the first one finishes.
//
// An error is returned when the configuration is invalid or the initial
// build fails; the builder then stays uninitialized and the call may be
// retried.
func (mb *MarketBuilder) Initialize(ctx context.Context) error {
	mb.initMutex.Lock()
	defer mb.initMutex.Unlock()

	if mb.initialized {
		return nil
	}

	// Validate configuration
	if err := mb.validateConfig(); err != nil {
		return fmt.Errorf("invalid build configuration: %w", err)
	}

	// Build initial markets from cached data
	if err := mb.buildInitialMarkets(ctx); err != nil {
		return fmt.Errorf("failed to build initial markets: %w", err)
	}

	// Snapshot the market count under the markets lock and before the rebuild
	// goroutine exists: once that goroutine runs it may replace mb.markets, so
	// an unguarded read here would be a data race.
	mb.marketsMutex.RLock()
	marketCount := len(mb.markets)
	mb.marketsMutex.RUnlock()

	mb.initialized = true

	// Start automatic rebuilding if enabled
	if mb.buildConfig.AutoUpdate {
		go mb.autoRebuildLoop()
	}

	mb.logger.Info(fmt.Sprintf("Market builder initialized with %d markets", marketCount))
	return nil
}
// buildInitialMarkets builds a market for every known token pair and stores
// the successful ones in the cache.
//
// At most ParallelBuildJobs markets are built concurrently. A worker slot is
// acquired before each goroutine is started, so the number of live
// goroutines is bounded as well. A failure to build an individual pair is
// logged at debug level and does not fail the overall build.
//
// It returns an error when no data logger is configured, or when ctx is
// cancelled before every pair has been scheduled; in the latter case the
// builds already in flight are allowed to finish first. A nil ctx is treated
// as context.Background().
func (mb *MarketBuilder) buildInitialMarkets(ctx context.Context) error {
	if mb.dataLogger == nil {
		return fmt.Errorf("data logger not available")
	}
	if ctx == nil {
		ctx = context.Background()
	}

	// Get all token pairs that have pools
	tokenPairs := mb.extractTokenPairs()
	if len(tokenPairs) == 0 {
		mb.logger.Warn("No token pairs found in cached data")
		return nil
	}
	mb.logger.Info(fmt.Sprintf("Building markets for %d token pairs", len(tokenPairs)))

	// A zero-capacity semaphore could never be acquired, so fall back to one
	// worker if the configured value is not usable.
	jobs := mb.buildConfig.ParallelBuildJobs
	if jobs < 1 {
		jobs = 1
	}
	semaphore := make(chan struct{}, jobs)

	// acquire blocks until a worker slot is free or ctx is done.
	acquire := func() error {
		if err := ctx.Err(); err != nil {
			return err
		}
		select {
		case semaphore <- struct{}{}:
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}

	var (
		wg          sync.WaitGroup
		interrupted error
	)
	for _, pair := range tokenPairs {
		if interrupted = acquire(); interrupted != nil {
			break
		}
		wg.Add(1)
		go func(tokenPair TokenPair) {
			defer wg.Done()
			defer func() { <-semaphore }() // Release

			market, err := mb.buildMarketForPair(ctx, tokenPair.TokenA, tokenPair.TokenB)
			if err != nil {
				mb.logger.Debug(fmt.Sprintf("Failed to build market for %s/%s: %v",
					tokenPair.TokenA.Hex(), tokenPair.TokenB.Hex(), err))
				return
			}
			if market != nil {
				mb.addMarket(market)
			}
		}(pair)
	}
	wg.Wait()

	if interrupted != nil {
		return fmt.Errorf("market build interrupted: %w", interrupted)
	}

	// The cache may be touched by a concurrent rebuild, so read it under the lock.
	mb.marketsMutex.RLock()
	built := len(mb.markets)
	mb.marketsMutex.RUnlock()

	mb.logger.Info(fmt.Sprintf("Built %d markets from cached data", built))
	return nil
}
// TokenPair represents a token pair, identified by the addresses of its two
// tokens. The struct itself does not impose an ordering on TokenA and TokenB;
// makeTokenPairKey normalizes the order when a map key is needed.
type TokenPair struct {
TokenA common.Address
TokenB common.Address
}
// extractTokenPairs returns the unique token pairs to build markets for.
//
// It does not yet inspect the data logger cache: the result is a fixed list
// of common pairs, deduplicated by their normalized pair key. The order of
// the returned slice is unspecified because it comes from map iteration.
func (mb *MarketBuilder) extractTokenPairs() []TokenPair {
	const (
		weth = "0x82af49447d8a07e3bd95bd0d56f35241523fbab1"
		usdc = "0xaf88d065e77c8cc2239327c5edb3a432268e5831"
		arb  = "0x912ce59144191c1204e64559fe8253a0e49e6548"
		usdt = "0xfd086bc7cd5c481dcc9c85ebe478a1c0b69fcbb9"
	)

	// Placeholder until the pairs are read from the cached pools.
	seeds := []TokenPair{
		{TokenA: common.HexToAddress(weth), TokenB: common.HexToAddress(usdc)},
		{TokenA: common.HexToAddress(weth), TokenB: common.HexToAddress(arb)},
		{TokenA: common.HexToAddress(usdc), TokenB: common.HexToAddress(usdt)},
	}

	// Deduplicate by normalized key so A/B and B/A collapse into one entry.
	unique := make(map[string]TokenPair)
	for i := range seeds {
		unique[mb.makeTokenPairKey(seeds[i].TokenA, seeds[i].TokenB)] = seeds[i]
	}

	pairs := make([]TokenPair, 0, len(unique))
	for _, p := range unique {
		pairs = append(pairs, p)
	}
	return pairs
}
// buildMarketForPair assembles a Market for the given token pair from the
// pools the data logger reports for it.
//
// Pools that fail passesFilters are dropped. The remaining pools are
// converted to MarketPool values, their liquidity and 24h volume are summed,
// and the market-level and per-pool metrics are computed from them.
//
// An error is returned when fewer than MinPoolsPerMarket pools exist before
// or after filtering, or when RequireMultiDEX is set and the surviving pools
// belong to fewer than two protocols. ctx is currently unused.
func (mb *MarketBuilder) buildMarketForPair(ctx context.Context, tokenA, tokenB common.Address) (*Market, error) {
	candidates := mb.dataLogger.GetPoolsForTokenPair(tokenA, tokenB)
	if required := mb.buildConfig.MinPoolsPerMarket; len(candidates) < required {
		return nil, fmt.Errorf("insufficient pools (%d < %d required)", len(candidates), required)
	}

	var (
		accepted       = make([]*MarketPool, 0, len(candidates))
		liquiditySum   = new(big.Int)
		volumeSum      = new(big.Int)
		protocolCounts = make(map[string]int)
		factorySet     = make(map[common.Address]bool)
	)

	for _, src := range candidates {
		if !mb.passesFilters(src) {
			continue
		}

		entry := &MarketPool{
			Address:      src.Address,
			Factory:      src.Factory,
			Protocol:     src.Protocol,
			Fee:          src.Fee,
			Liquidity:    src.Liquidity,
			SqrtPriceX96: src.SqrtPriceX96,
			Tick:         src.Tick,
			SwapCount:    src.SwapCount,
			Volume24h:    src.Volume24h,
			LastSwapTime: src.LastSwapTime,
		}
		// A price can only be derived from a known, non-zero sqrt price.
		if sqrt := src.SqrtPriceX96; sqrt != nil && sqrt.Sign() > 0 {
			entry.Price = mb.calculatePriceFromSqrt(sqrt)
		}
		accepted = append(accepted, entry)

		// Running totals; pools with unknown values contribute nothing.
		if src.Liquidity != nil {
			liquiditySum.Add(liquiditySum, src.Liquidity.ToBig())
		}
		if src.Volume24h != nil {
			volumeSum.Add(volumeSum, src.Volume24h)
		}

		protocolCounts[src.Protocol]++
		factorySet[src.Factory] = true
	}

	if len(accepted) < mb.buildConfig.MinPoolsPerMarket {
		return nil, fmt.Errorf("insufficient qualifying pools after filtering")
	}
	if mb.buildConfig.RequireMultiDEX && len(protocolCounts) < 2 {
		return nil, fmt.Errorf("requires multiple DEXs but only found %d", len(protocolCounts))
	}

	// Market-level metrics first, then the per-pool metrics that depend on them.
	avgPrice := mb.calculateWeightedPrice(accepted)
	spread := mb.calculatePriceSpread(accepted, avgPrice)
	deepest := mb.findBestPool(accepted)
	mb.updatePoolMetrics(accepted, liquiditySum, volumeSum, avgPrice)

	factoryList := make([]common.Address, 0, len(factorySet))
	for addr := range factorySet {
		factoryList = append(factoryList, addr)
	}

	result := new(Market)
	result.TokenA = tokenA
	result.TokenB = tokenB
	result.Pools = accepted
	result.TotalLiquidity = liquiditySum
	result.BestPool = deepest
	result.PoolCount = len(accepted)
	result.Volume24h = volumeSum
	result.WeightedPrice = avgPrice
	result.PriceSpread = spread
	result.Protocols = protocolCounts
	result.Factories = factoryList
	result.LastUpdated = time.Now()
	result.FirstSeen = time.Now() // Would be minimum of all pool first seen times
	return result, nil
}
// passesFilters reports whether a pool satisfies the configured minimum
// liquidity and minimum 24h volume.
//
// A check is skipped, and therefore passes, when either the pool's value or
// the configured minimum is nil.
func (mb *MarketBuilder) passesFilters(pool *marketdata.PoolInfo) bool {
	cfg := mb.buildConfig

	tooIlliquid := pool.Liquidity != nil && cfg.MinLiquidity != nil &&
		pool.Liquidity.ToBig().Cmp(cfg.MinLiquidity) < 0
	if tooIlliquid {
		return false
	}

	tooQuiet := pool.Volume24h != nil && cfg.MinVolume24h != nil &&
		pool.Volume24h.Cmp(cfg.MinVolume24h) < 0
	return !tooQuiet
}
// calculatePriceFromSqrt converts a Q96 fixed-point square-root price into a
// price: price = (sqrtPriceX96 / 2^96)^2.
//
// No token-decimal adjustment is applied. sqrtPriceX96 must not be nil.
func (mb *MarketBuilder) calculatePriceFromSqrt(sqrtPriceX96 *uint256.Int) *big.Float {
	// Each step uses a fresh big.Float so the result precision is derived
	// from the operands rather than from a reused receiver.
	root := new(big.Float).Quo(
		new(big.Float).SetInt(sqrtPriceX96.ToBig()),
		new(big.Float).SetInt(new(big.Int).Lsh(big.NewInt(1), 96)), // 2^96
	)
	return new(big.Float).Mul(root, root)
}
// calculateWeightedPrice returns the average pool price weighted by pool
// liquidity: sum(price * liquidity) / sum(liquidity).
//
// Pools without a price or without liquidity are ignored. The result is zero
// when no pool contributes or when the contributing liquidity sums to zero.
func (mb *MarketBuilder) calculateWeightedPrice(pools []*MarketPool) *big.Float {
	numerator, denominator := big.NewFloat(0), big.NewFloat(0)

	for _, p := range pools {
		if p.Price == nil || p.Liquidity == nil {
			continue
		}
		weight := new(big.Float).SetInt(p.Liquidity.ToBig())
		numerator.Add(numerator, new(big.Float).Mul(p.Price, weight))
		denominator.Add(denominator, weight)
	}

	// Also covers the empty-input case: nothing was accumulated.
	if denominator.Sign() == 0 {
		return big.NewFloat(0)
	}
	return new(big.Float).Quo(numerator, denominator)
}
// calculatePriceSpread returns, as a percentage, the largest relative
// deviation of any pool price from weightedPrice:
// max(|price - weightedPrice| / weightedPrice) * 100.
//
// Pools without a price are ignored. The result is 0 when pools is empty or
// weightedPrice is zero.
func (mb *MarketBuilder) calculatePriceSpread(pools []*MarketPool, weightedPrice *big.Float) float64 {
	if len(pools) == 0 || weightedPrice.Sign() == 0 {
		return 0
	}

	var widest float64
	for _, p := range pools {
		if p.Price == nil {
			continue
		}
		gap := new(big.Float).Sub(p.Price, weightedPrice)
		gap.Abs(gap)
		// The accuracy flag is irrelevant here: only the magnitude is compared.
		relative, _ := new(big.Float).Quo(gap, weightedPrice).Float64()
		if relative > widest {
			widest = relative
		}
	}
	return widest * 100 // Convert to percentage
}
// findBestPool returns the pool with the highest liquidity. On a tie the
// earliest such pool in the slice wins. Pools with nil liquidity are skipped;
// nil is returned when no pool has a liquidity value.
func (mb *MarketBuilder) findBestPool(pools []*MarketPool) *MarketPool {
	var leader *MarketPool
	for _, candidate := range pools {
		if candidate.Liquidity == nil {
			continue
		}
		// Strictly greater, so an equal later pool never displaces the leader.
		if leader == nil || candidate.Liquidity.ToBig().Cmp(leader.Liquidity.ToBig()) > 0 {
			leader = candidate
		}
	}
	return leader
}
// updatePoolMetrics fills in the derived metrics of every pool in place:
//
//   - LiquidityShare and VolumeShare24h: the pool's part of totalLiquidity
//     and totalVolume, stored as a 0-1 fraction
//   - PriceDeviation: |price - weightedPrice| / weightedPrice, in percent
//   - Efficiency: Volume24h / Liquidity
//   - AvgSwapSize: Volume24h / SwapCount (integer division)
//   - Reliability: 1.0, 0.5 or 0.1 depending on how recent the last swap is
//
// A metric is left untouched when an input it needs is nil or its divisor is
// not positive; Reliability is always set.
func (mb *MarketBuilder) updatePoolMetrics(pools []*MarketPool, totalLiquidity, totalVolume *big.Int, weightedPrice *big.Float) {
	// quotient returns num/den as a float64; den must be non-zero.
	quotient := func(num, den *big.Int) float64 {
		q := new(big.Float).Quo(new(big.Float).SetInt(num), new(big.Float).SetInt(den))
		value, _ := q.Float64()
		return value
	}

	const (
		day  = 24 * time.Hour
		week = 7 * day
	)

	for _, p := range pools {
		if p.Liquidity != nil && totalLiquidity.Sign() > 0 {
			p.LiquidityShare = quotient(p.Liquidity.ToBig(), totalLiquidity)
		}

		if p.Volume24h != nil && totalVolume.Sign() > 0 {
			p.VolumeShare24h = quotient(p.Volume24h, totalVolume)
		}

		if p.Price != nil && weightedPrice.Sign() > 0 {
			gap := new(big.Float).Sub(p.Price, weightedPrice)
			gap.Abs(gap)
			relative, _ := new(big.Float).Quo(gap, weightedPrice).Float64()
			p.PriceDeviation = relative * 100 // Convert to percentage
		}

		if p.Volume24h != nil && p.Liquidity != nil && p.Liquidity.Sign() > 0 {
			p.Efficiency = quotient(p.Volume24h, p.Liquidity.ToBig())
		}

		if p.Volume24h != nil && p.SwapCount > 0 {
			p.AvgSwapSize = new(big.Int).Div(p.Volume24h, big.NewInt(p.SwapCount))
		}

		// Simplified reliability score based on recent activity only.
		switch idle := time.Since(p.LastSwapTime); {
		case idle < day:
			p.Reliability = 1.0
		case idle < week:
			p.Reliability = 0.5
		default:
			p.Reliability = 0.1
		}
	}
}
// addMarket stores market in the cache under its normalized token pair key,
// replacing any market already cached for that pair, and logs the addition
// at debug level. The cache write happens under the markets write lock.
func (mb *MarketBuilder) addMarket(market *Market) {
	pairKey := mb.makeTokenPairKey(market.TokenA, market.TokenB)

	mb.marketsMutex.Lock()
	defer mb.marketsMutex.Unlock()

	mb.markets[pairKey] = market
	summary := fmt.Sprintf("Added market %s with %d pools (total liquidity: %s)",
		pairKey, market.PoolCount, market.TotalLiquidity.String())
	mb.logger.Debug(summary)
}
// makeTokenPairKey returns the cache key for a token pair. The key is the
// same regardless of argument order: the numerically smaller address comes
// first, followed by "_" and the larger one, both in hex form.
func (mb *MarketBuilder) makeTokenPairKey(tokenA, tokenB common.Address) string {
	low, high := tokenA, tokenB
	if low.Big().Cmp(high.Big()) > 0 {
		low, high = high, low
	}
	return low.Hex() + "_" + high.Hex()
}
// validateConfig checks that the build configuration is usable and returns
// an error describing the first problem found, or nil when it is valid.
//
// Rejected configurations:
//   - a nil configuration
//   - MinPoolsPerMarket, ParallelBuildJobs or MaxMarketsToCache below 1
//   - AutoUpdate enabled with a non-positive RebuildInterval, which would
//     make time.NewTicker panic inside the rebuild loop
func (mb *MarketBuilder) validateConfig() error {
	cfg := mb.buildConfig
	switch {
	case cfg == nil:
		return fmt.Errorf("build configuration is missing")
	case cfg.MinPoolsPerMarket < 1:
		return fmt.Errorf("minPoolsPerMarket must be at least 1")
	case cfg.ParallelBuildJobs < 1:
		return fmt.Errorf("parallelBuildJobs must be at least 1")
	case cfg.MaxMarketsToCache < 1:
		return fmt.Errorf("maxMarketsToCache must be at least 1")
	case cfg.AutoUpdate && cfg.RebuildInterval <= 0:
		return fmt.Errorf("rebuildInterval must be positive when autoUpdate is enabled")
	}
	return nil
}
// autoRebuildLoop rebuilds all markets every RebuildInterval. It is meant to
// run in its own goroutine, which Initialize starts when AutoUpdate is set.
//
// Each rebuild runs with a 10 minute timeout, and a failed rebuild is logged
// as a warning without ending the loop. On every tick the loop first checks
// whether the builder is still initialized and returns if it is not, so the
// goroutine ends on the first tick after Stop instead of running for the
// lifetime of the process.
//
// RebuildInterval must be positive; time.NewTicker panics otherwise.
func (mb *MarketBuilder) autoRebuildLoop() {
	ticker := time.NewTicker(mb.buildConfig.RebuildInterval)
	defer ticker.Stop()

	for range ticker.C {
		// initialized is only written under initMutex, so read it the same way.
		mb.initMutex.Lock()
		running := mb.initialized
		mb.initMutex.Unlock()
		if !running {
			return
		}

		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
		err := mb.RebuildMarkets(ctx)
		cancel()
		if err != nil {
			mb.logger.Warn(fmt.Sprintf("Failed to rebuild markets: %v", err))
		}
	}
}
// GetMarket looks up the cached market for a token pair. The order of tokenA
// and tokenB does not matter. The boolean result reports whether a market is
// cached for the pair.
func (mb *MarketBuilder) GetMarket(tokenA, tokenB common.Address) (*Market, bool) {
	pairKey := mb.makeTokenPairKey(tokenA, tokenB)

	mb.marketsMutex.RLock()
	found, ok := mb.markets[pairKey]
	mb.marketsMutex.RUnlock()

	return found, ok
}
// GetAllMarkets returns a new slice holding every cached market, in no
// particular order. The slice is never nil; the Market values are shared
// with the cache rather than copied.
func (mb *MarketBuilder) GetAllMarkets() []*Market {
	mb.marketsMutex.RLock()
	defer mb.marketsMutex.RUnlock()

	snapshot := make([]*Market, len(mb.markets))
	next := 0
	for _, m := range mb.markets {
		snapshot[next] = m
		next++
	}
	return snapshot
}
// RebuildMarkets discards all cached markets and builds them again from the
// current cached data.
//
// The cache is emptied before the rebuild starts, so lookups made while the
// rebuild is running may miss markets that existed before. If the rebuild
// fails the error is returned wrapped and the cache is left with whatever
// was rebuilt up to that point.
func (mb *MarketBuilder) RebuildMarkets(ctx context.Context) error {
	mb.logger.Info("Rebuilding markets from cached data...")

	// Clear existing markets
	mb.marketsMutex.Lock()
	oldCount := len(mb.markets)
	mb.markets = make(map[string]*Market)
	mb.marketsMutex.Unlock()

	// Rebuild
	if err := mb.buildInitialMarkets(ctx); err != nil {
		return fmt.Errorf("failed to rebuild markets: %w", err)
	}

	// Read the new size under the lock: another goroutine may be adding to or
	// replacing the map at the same time.
	mb.marketsMutex.RLock()
	newCount := len(mb.markets)
	mb.marketsMutex.RUnlock()

	mb.logger.Info(fmt.Sprintf("Rebuilt markets: %d -> %d", oldCount, newCount))
	return nil
}
// GetStatistics returns a summary of the cached markets, computed under the
// markets read lock. The returned map has these keys:
//
//   - "totalMarkets":   number of cached markets (int)
//   - "totalPools":     sum of PoolCount over all markets (int)
//   - "totalLiquidity": sum of TotalLiquidity over all markets, as a decimal string
//   - "protocolCounts": pool count per protocol over all markets (map[string]int)
//   - "initialized":    current value of the initialized flag (bool)
//   - "autoUpdate":     the configured AutoUpdate setting (bool)
func (mb *MarketBuilder) GetStatistics() map[string]interface{} {
	mb.marketsMutex.RLock()
	defer mb.marketsMutex.RUnlock()

	var poolTotal int
	liquidityTotal := new(big.Int)
	perProtocol := make(map[string]int)

	for _, m := range mb.markets {
		poolTotal += m.PoolCount
		liquidityTotal.Add(liquidityTotal, m.TotalLiquidity)
		for name, n := range m.Protocols {
			perProtocol[name] += n
		}
	}

	stats := make(map[string]interface{}, 6)
	stats["totalMarkets"] = len(mb.markets)
	stats["totalPools"] = poolTotal
	stats["totalLiquidity"] = liquidityTotal.String()
	stats["protocolCounts"] = perProtocol
	stats["initialized"] = mb.initialized
	stats["autoUpdate"] = mb.buildConfig.AutoUpdate
	return stats
}
// Stop marks the market builder as no longer initialized and logs that it
// has stopped. Calling Stop on a builder that is not initialized does
// nothing. The cached markets are left in place.
func (mb *MarketBuilder) Stop() {
	mb.initMutex.Lock()
	defer mb.initMutex.Unlock()

	if mb.initialized {
		mb.logger.Info("Market builder stopped")
		mb.initialized = false
	}
}