- Added comprehensive bounds checking to prevent buffer overruns in multicall parsing
- Implemented graduated validation system (Strict/Moderate/Permissive) to reduce false positives
- Added LRU caching system for address validation with 10-minute TTL
- Enhanced ABI decoder with missing Universal Router and Arbitrum-specific DEX signatures
- Fixed duplicate function declarations and import conflicts across multiple files
- Added error recovery mechanisms with multiple fallback strategies
- Updated tests to handle new validation behavior for suspicious addresses
- Fixed parser test expectations for improved validation system
- Applied gofmt formatting fixes to ensure code style compliance
- Fixed mutex copying issues in monitoring package by introducing MetricsSnapshot
- Resolved critical security vulnerabilities in heuristic address extraction
- Progress: Updated TODO audit from 10% to 35% complete

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
633 lines
19 KiB
Go
633 lines
19 KiB
Go
package market
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"math/big"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/ethereum/go-ethereum/common"
|
|
"github.com/ethereum/go-ethereum/ethclient"
|
|
"github.com/holiman/uint256"
|
|
|
|
"github.com/fraktal/mev-beta/internal/logger"
|
|
"github.com/fraktal/mev-beta/pkg/database"
|
|
"github.com/fraktal/mev-beta/pkg/marketdata"
|
|
)
|
|
|
|
// MarketBuilder constructs comprehensive market structures from cached data
|
|
type MarketBuilder struct {
|
|
logger *logger.Logger
|
|
database *database.Database
|
|
client *ethclient.Client
|
|
dataLogger *marketdata.MarketDataLogger
|
|
|
|
// Built markets
|
|
markets map[string]*Market // key: "tokenA_tokenB"
|
|
marketsMutex sync.RWMutex
|
|
|
|
// Build configuration
|
|
buildConfig *BuildConfig
|
|
initialized bool
|
|
initMutex sync.Mutex
|
|
}
|
|
|
|
// Market represents a comprehensive trading market for a token pair
|
|
type Market struct {
|
|
TokenA common.Address `json:"tokenA"`
|
|
TokenB common.Address `json:"tokenB"`
|
|
Pools []*MarketPool `json:"pools"`
|
|
TotalLiquidity *big.Int `json:"totalLiquidity"`
|
|
BestPool *MarketPool `json:"bestPool"` // Pool with highest liquidity
|
|
|
|
// Market statistics
|
|
PoolCount int `json:"poolCount"`
|
|
Volume24h *big.Int `json:"volume24h"`
|
|
SwapCount24h int64 `json:"swapCount24h"`
|
|
LastUpdated time.Time `json:"lastUpdated"`
|
|
FirstSeen time.Time `json:"firstSeen"`
|
|
|
|
// Price information
|
|
WeightedPrice *big.Float `json:"weightedPrice"` // Liquidity-weighted price
|
|
PriceSpread float64 `json:"priceSpread"` // Price spread across pools (%)
|
|
|
|
// DEX coverage
|
|
Protocols map[string]int `json:"protocols"` // Protocol -> pool count
|
|
Factories []common.Address `json:"factories"` // All factories for this pair
|
|
}
|
|
|
|
// MarketPool represents a pool within a market
|
|
type MarketPool struct {
|
|
Address common.Address `json:"address"`
|
|
Factory common.Address `json:"factory"`
|
|
Protocol string `json:"protocol"`
|
|
Fee uint32 `json:"fee"`
|
|
|
|
// Current state
|
|
Liquidity *uint256.Int `json:"liquidity"`
|
|
SqrtPriceX96 *uint256.Int `json:"sqrtPriceX96"`
|
|
Tick int32 `json:"tick"`
|
|
Price *big.Float `json:"price"` // Calculated price
|
|
|
|
// Market share in this token pair
|
|
LiquidityShare float64 `json:"liquidityShare"` // % of total liquidity
|
|
VolumeShare24h float64 `json:"volumeShare24h"` // % of 24h volume
|
|
|
|
// Activity metrics
|
|
SwapCount int64 `json:"swapCount"`
|
|
Volume24h *big.Int `json:"volume24h"`
|
|
LastSwapTime time.Time `json:"lastSwapTime"`
|
|
AvgSwapSize *big.Int `json:"avgSwapSize"`
|
|
|
|
// Quality metrics
|
|
PriceDeviation float64 `json:"priceDeviation"` // Deviation from weighted avg (%)
|
|
Efficiency float64 `json:"efficiency"` // Volume/Liquidity ratio
|
|
Reliability float64 `json:"reliability"` // Uptime/activity score
|
|
}
|
|
|
|
// BuildConfig configures market building parameters
|
|
type BuildConfig struct {
|
|
// Pool filtering
|
|
MinLiquidity *big.Int `json:"minLiquidity"`
|
|
MinVolume24h *big.Int `json:"minVolume24h"`
|
|
MaxPriceDeviation float64 `json:"maxPriceDeviation"` // Max price deviation to include (%)
|
|
|
|
// Token filtering
|
|
RequiredTokens []common.Address `json:"requiredTokens"` // Must include these tokens
|
|
ExcludedTokens []common.Address `json:"excludedTokens"` // Exclude these tokens
|
|
OnlyVerifiedTokens bool `json:"onlyVerifiedTokens"`
|
|
|
|
// Market requirements
|
|
MinPoolsPerMarket int `json:"minPoolsPerMarket"`
|
|
RequireMultiDEX bool `json:"requireMultiDEX"` // Require pools from multiple DEXs
|
|
|
|
// Update behavior
|
|
RebuildInterval time.Duration `json:"rebuildInterval"`
|
|
AutoUpdate bool `json:"autoUpdate"`
|
|
|
|
// Performance
|
|
MaxMarketsToCache int `json:"maxMarketsToCache"`
|
|
ParallelBuildJobs int `json:"parallelBuildJobs"`
|
|
}
|
|
|
|
// NewMarketBuilder creates a new market builder
|
|
func NewMarketBuilder(logger *logger.Logger, database *database.Database, client *ethclient.Client, dataLogger *marketdata.MarketDataLogger) *MarketBuilder {
|
|
return &MarketBuilder{
|
|
logger: logger,
|
|
database: database,
|
|
client: client,
|
|
dataLogger: dataLogger,
|
|
markets: make(map[string]*Market),
|
|
buildConfig: &BuildConfig{
|
|
MinLiquidity: big.NewInt(1000000000000000000), // 1 ETH minimum
|
|
MinVolume24h: big.NewInt(100000000000000000), // 0.1 ETH minimum
|
|
MaxPriceDeviation: 5.0, // 5% max deviation
|
|
MinPoolsPerMarket: 2, // At least 2 pools
|
|
RequireMultiDEX: false, // Don't require multi-DEX
|
|
RebuildInterval: 30 * time.Minute, // Rebuild every 30 minutes
|
|
AutoUpdate: true,
|
|
MaxMarketsToCache: 1000, // Cache up to 1000 markets
|
|
ParallelBuildJobs: 4, // 4 parallel build jobs
|
|
},
|
|
}
|
|
}
|
|
|
|
// Initialize sets up the market builder
|
|
func (mb *MarketBuilder) Initialize(ctx context.Context) error {
|
|
mb.initMutex.Lock()
|
|
defer mb.initMutex.Unlock()
|
|
|
|
if mb.initialized {
|
|
return nil
|
|
}
|
|
|
|
// Validate configuration
|
|
if err := mb.validateConfig(); err != nil {
|
|
return fmt.Errorf("invalid build configuration: %w", err)
|
|
}
|
|
|
|
// Build initial markets from cached data
|
|
if err := mb.buildInitialMarkets(ctx); err != nil {
|
|
return fmt.Errorf("failed to build initial markets: %w", err)
|
|
}
|
|
|
|
// Start automatic rebuilding if enabled
|
|
if mb.buildConfig.AutoUpdate {
|
|
go mb.autoRebuildLoop()
|
|
}
|
|
|
|
mb.initialized = true
|
|
mb.logger.Info(fmt.Sprintf("Market builder initialized with %d markets", len(mb.markets)))
|
|
|
|
return nil
|
|
}
|
|
|
|
// buildInitialMarkets builds markets from existing cached data
|
|
func (mb *MarketBuilder) buildInitialMarkets(ctx context.Context) error {
|
|
if mb.dataLogger == nil {
|
|
return fmt.Errorf("data logger not available")
|
|
}
|
|
|
|
// Get all token pairs that have pools
|
|
tokenPairs := mb.extractTokenPairs()
|
|
if len(tokenPairs) == 0 {
|
|
mb.logger.Warn("No token pairs found in cached data")
|
|
return nil
|
|
}
|
|
|
|
mb.logger.Info(fmt.Sprintf("Building markets for %d token pairs", len(tokenPairs)))
|
|
|
|
// Build markets in parallel
|
|
semaphore := make(chan struct{}, mb.buildConfig.ParallelBuildJobs)
|
|
var wg sync.WaitGroup
|
|
|
|
for _, pair := range tokenPairs {
|
|
wg.Add(1)
|
|
go func(tokenPair TokenPair) {
|
|
defer wg.Done()
|
|
|
|
semaphore <- struct{}{} // Acquire
|
|
defer func() { <-semaphore }() // Release
|
|
|
|
if market, err := mb.buildMarketForPair(ctx, tokenPair.TokenA, tokenPair.TokenB); err != nil {
|
|
mb.logger.Debug(fmt.Sprintf("Failed to build market for %s/%s: %v",
|
|
tokenPair.TokenA.Hex(), tokenPair.TokenB.Hex(), err))
|
|
} else if market != nil {
|
|
mb.addMarket(market)
|
|
}
|
|
}(pair)
|
|
}
|
|
|
|
wg.Wait()
|
|
|
|
mb.logger.Info(fmt.Sprintf("Built %d markets from cached data", len(mb.markets)))
|
|
return nil
|
|
}
|
|
|
|
// TokenPair represents a token pair
|
|
type TokenPair struct {
|
|
TokenA common.Address
|
|
TokenB common.Address
|
|
}
|
|
|
|
// extractTokenPairs extracts unique token pairs from cached pools
|
|
func (mb *MarketBuilder) extractTokenPairs() []TokenPair {
|
|
tokenPairs := make(map[string]TokenPair)
|
|
|
|
// Extract from data logger cache (implementation would iterate through cached pools)
|
|
// For now, return some common pairs
|
|
commonPairs := []TokenPair{
|
|
{
|
|
TokenA: common.HexToAddress("0x82af49447d8a07e3bd95bd0d56f35241523fbab1"), // WETH
|
|
TokenB: common.HexToAddress("0xaf88d065e77c8cc2239327c5edb3a432268e5831"), // USDC
|
|
},
|
|
{
|
|
TokenA: common.HexToAddress("0x82af49447d8a07e3bd95bd0d56f35241523fbab1"), // WETH
|
|
TokenB: common.HexToAddress("0x912ce59144191c1204e64559fe8253a0e49e6548"), // ARB
|
|
},
|
|
{
|
|
TokenA: common.HexToAddress("0xaf88d065e77c8cc2239327c5edb3a432268e5831"), // USDC
|
|
TokenB: common.HexToAddress("0xfd086bc7cd5c481dcc9c85ebe478a1c0b69fcbb9"), // USDT
|
|
},
|
|
}
|
|
|
|
for _, pair := range commonPairs {
|
|
key := mb.makeTokenPairKey(pair.TokenA, pair.TokenB)
|
|
tokenPairs[key] = pair
|
|
}
|
|
|
|
result := make([]TokenPair, 0, len(tokenPairs))
|
|
for _, pair := range tokenPairs {
|
|
result = append(result, pair)
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
// buildMarketForPair builds a comprehensive market for a token pair
|
|
func (mb *MarketBuilder) buildMarketForPair(ctx context.Context, tokenA, tokenB common.Address) (*Market, error) {
|
|
// Get pools for this token pair
|
|
pools := mb.dataLogger.GetPoolsForTokenPair(tokenA, tokenB)
|
|
if len(pools) < mb.buildConfig.MinPoolsPerMarket {
|
|
return nil, fmt.Errorf("insufficient pools (%d < %d required)", len(pools), mb.buildConfig.MinPoolsPerMarket)
|
|
}
|
|
|
|
// Filter and convert pools
|
|
marketPools := make([]*MarketPool, 0, len(pools))
|
|
totalLiquidity := big.NewInt(0)
|
|
totalVolume := big.NewInt(0)
|
|
protocols := make(map[string]int)
|
|
factories := make(map[common.Address]bool)
|
|
|
|
for _, pool := range pools {
|
|
// Apply filters
|
|
if !mb.passesFilters(pool) {
|
|
continue
|
|
}
|
|
|
|
marketPool := &MarketPool{
|
|
Address: pool.Address,
|
|
Factory: pool.Factory,
|
|
Protocol: pool.Protocol,
|
|
Fee: pool.Fee,
|
|
Liquidity: pool.Liquidity,
|
|
SqrtPriceX96: pool.SqrtPriceX96,
|
|
Tick: pool.Tick,
|
|
SwapCount: pool.SwapCount,
|
|
Volume24h: pool.Volume24h,
|
|
LastSwapTime: pool.LastSwapTime,
|
|
}
|
|
|
|
// Calculate price from sqrtPriceX96
|
|
if pool.SqrtPriceX96 != nil && pool.SqrtPriceX96.Sign() > 0 {
|
|
marketPool.Price = mb.calculatePriceFromSqrt(pool.SqrtPriceX96)
|
|
}
|
|
|
|
marketPools = append(marketPools, marketPool)
|
|
|
|
// Update totals
|
|
if pool.Liquidity != nil {
|
|
totalLiquidity.Add(totalLiquidity, pool.Liquidity.ToBig())
|
|
}
|
|
if pool.Volume24h != nil {
|
|
totalVolume.Add(totalVolume, pool.Volume24h)
|
|
}
|
|
|
|
// Track protocols and factories
|
|
protocols[pool.Protocol]++
|
|
factories[pool.Factory] = true
|
|
}
|
|
|
|
if len(marketPools) < mb.buildConfig.MinPoolsPerMarket {
|
|
return nil, fmt.Errorf("insufficient qualifying pools after filtering")
|
|
}
|
|
|
|
// Check multi-DEX requirement
|
|
if mb.buildConfig.RequireMultiDEX && len(protocols) < 2 {
|
|
return nil, fmt.Errorf("requires multiple DEXs but only found %d", len(protocols))
|
|
}
|
|
|
|
// Calculate market metrics
|
|
weightedPrice := mb.calculateWeightedPrice(marketPools)
|
|
priceSpread := mb.calculatePriceSpread(marketPools, weightedPrice)
|
|
bestPool := mb.findBestPool(marketPools)
|
|
|
|
// Update pool market shares and metrics
|
|
mb.updatePoolMetrics(marketPools, totalLiquidity, totalVolume, weightedPrice)
|
|
|
|
// Create factory slice
|
|
factorySlice := make([]common.Address, 0, len(factories))
|
|
for factory := range factories {
|
|
factorySlice = append(factorySlice, factory)
|
|
}
|
|
|
|
market := &Market{
|
|
TokenA: tokenA,
|
|
TokenB: tokenB,
|
|
Pools: marketPools,
|
|
TotalLiquidity: totalLiquidity,
|
|
BestPool: bestPool,
|
|
PoolCount: len(marketPools),
|
|
Volume24h: totalVolume,
|
|
WeightedPrice: weightedPrice,
|
|
PriceSpread: priceSpread,
|
|
Protocols: protocols,
|
|
Factories: factorySlice,
|
|
LastUpdated: time.Now(),
|
|
FirstSeen: time.Now(), // Would be minimum of all pool first seen times
|
|
}
|
|
|
|
return market, nil
|
|
}
|
|
|
|
// passesFilters checks if a pool passes the configured filters
|
|
func (mb *MarketBuilder) passesFilters(pool *marketdata.PoolInfo) bool {
|
|
// Check minimum liquidity
|
|
if pool.Liquidity != nil && mb.buildConfig.MinLiquidity != nil {
|
|
if pool.Liquidity.ToBig().Cmp(mb.buildConfig.MinLiquidity) < 0 {
|
|
return false
|
|
}
|
|
}
|
|
|
|
// Check minimum volume
|
|
if pool.Volume24h != nil && mb.buildConfig.MinVolume24h != nil {
|
|
if pool.Volume24h.Cmp(mb.buildConfig.MinVolume24h) < 0 {
|
|
return false
|
|
}
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
// calculatePriceFromSqrt calculates price from sqrtPriceX96
|
|
func (mb *MarketBuilder) calculatePriceFromSqrt(sqrtPriceX96 *uint256.Int) *big.Float {
|
|
// Convert sqrtPriceX96 to price
|
|
// price = (sqrtPriceX96 / 2^96)^2
|
|
|
|
sqrtPrice := new(big.Float).SetInt(sqrtPriceX96.ToBig())
|
|
q96 := new(big.Float).SetInt(new(big.Int).Lsh(big.NewInt(1), 96))
|
|
|
|
normalizedSqrt := new(big.Float).Quo(sqrtPrice, q96)
|
|
price := new(big.Float).Mul(normalizedSqrt, normalizedSqrt)
|
|
|
|
return price
|
|
}
|
|
|
|
// calculateWeightedPrice calculates liquidity-weighted average price
|
|
func (mb *MarketBuilder) calculateWeightedPrice(pools []*MarketPool) *big.Float {
|
|
if len(pools) == 0 {
|
|
return big.NewFloat(0)
|
|
}
|
|
|
|
weightedSum := big.NewFloat(0)
|
|
totalWeight := big.NewFloat(0)
|
|
|
|
for _, pool := range pools {
|
|
if pool.Price != nil && pool.Liquidity != nil {
|
|
weight := new(big.Float).SetInt(pool.Liquidity.ToBig())
|
|
weightedPrice := new(big.Float).Mul(pool.Price, weight)
|
|
|
|
weightedSum.Add(weightedSum, weightedPrice)
|
|
totalWeight.Add(totalWeight, weight)
|
|
}
|
|
}
|
|
|
|
if totalWeight.Sign() == 0 {
|
|
return big.NewFloat(0)
|
|
}
|
|
|
|
return new(big.Float).Quo(weightedSum, totalWeight)
|
|
}
|
|
|
|
// calculatePriceSpread calculates price spread across pools
|
|
func (mb *MarketBuilder) calculatePriceSpread(pools []*MarketPool, weightedPrice *big.Float) float64 {
|
|
if len(pools) == 0 || weightedPrice.Sign() == 0 {
|
|
return 0
|
|
}
|
|
|
|
maxDeviation := 0.0
|
|
|
|
for _, pool := range pools {
|
|
if pool.Price != nil {
|
|
deviation := new(big.Float).Sub(pool.Price, weightedPrice)
|
|
deviation.Abs(deviation)
|
|
deviationRatio := new(big.Float).Quo(deviation, weightedPrice)
|
|
|
|
if ratio, _ := deviationRatio.Float64(); ratio > maxDeviation {
|
|
maxDeviation = ratio
|
|
}
|
|
}
|
|
}
|
|
|
|
return maxDeviation * 100 // Convert to percentage
|
|
}
|
|
|
|
// findBestPool finds the pool with highest liquidity
|
|
func (mb *MarketBuilder) findBestPool(pools []*MarketPool) *MarketPool {
|
|
var best *MarketPool
|
|
var maxLiquidity *big.Int
|
|
|
|
for _, pool := range pools {
|
|
if pool.Liquidity != nil {
|
|
liquidity := pool.Liquidity.ToBig()
|
|
if maxLiquidity == nil || liquidity.Cmp(maxLiquidity) > 0 {
|
|
maxLiquidity = liquidity
|
|
best = pool
|
|
}
|
|
}
|
|
}
|
|
|
|
return best
|
|
}
|
|
|
|
// updatePoolMetrics calculates market share and other metrics for pools
|
|
func (mb *MarketBuilder) updatePoolMetrics(pools []*MarketPool, totalLiquidity, totalVolume *big.Int, weightedPrice *big.Float) {
|
|
for _, pool := range pools {
|
|
// Calculate liquidity share
|
|
if pool.Liquidity != nil && totalLiquidity.Sign() > 0 {
|
|
liquidityFloat := new(big.Float).SetInt(pool.Liquidity.ToBig())
|
|
totalLiquidityFloat := new(big.Float).SetInt(totalLiquidity)
|
|
shareRatio := new(big.Float).Quo(liquidityFloat, totalLiquidityFloat)
|
|
pool.LiquidityShare, _ = shareRatio.Float64()
|
|
}
|
|
|
|
// Calculate volume share
|
|
if pool.Volume24h != nil && totalVolume.Sign() > 0 {
|
|
volumeFloat := new(big.Float).SetInt(pool.Volume24h)
|
|
totalVolumeFloat := new(big.Float).SetInt(totalVolume)
|
|
shareRatio := new(big.Float).Quo(volumeFloat, totalVolumeFloat)
|
|
pool.VolumeShare24h, _ = shareRatio.Float64()
|
|
}
|
|
|
|
// Calculate price deviation
|
|
if pool.Price != nil && weightedPrice.Sign() > 0 {
|
|
deviation := new(big.Float).Sub(pool.Price, weightedPrice)
|
|
deviation.Abs(deviation)
|
|
deviationRatio := new(big.Float).Quo(deviation, weightedPrice)
|
|
pool.PriceDeviation, _ = deviationRatio.Float64()
|
|
pool.PriceDeviation *= 100 // Convert to percentage
|
|
}
|
|
|
|
// Calculate efficiency (volume/liquidity ratio)
|
|
if pool.Volume24h != nil && pool.Liquidity != nil && pool.Liquidity.Sign() > 0 {
|
|
volumeFloat := new(big.Float).SetInt(pool.Volume24h)
|
|
liquidityFloat := new(big.Float).SetInt(pool.Liquidity.ToBig())
|
|
efficiency := new(big.Float).Quo(volumeFloat, liquidityFloat)
|
|
pool.Efficiency, _ = efficiency.Float64()
|
|
}
|
|
|
|
// Calculate average swap size
|
|
if pool.Volume24h != nil && pool.SwapCount > 0 {
|
|
avgSize := new(big.Int).Div(pool.Volume24h, big.NewInt(pool.SwapCount))
|
|
pool.AvgSwapSize = avgSize
|
|
}
|
|
|
|
// Calculate reliability (simplified - based on recent activity)
|
|
if time.Since(pool.LastSwapTime) < 24*time.Hour {
|
|
pool.Reliability = 1.0
|
|
} else if time.Since(pool.LastSwapTime) < 7*24*time.Hour {
|
|
pool.Reliability = 0.5
|
|
} else {
|
|
pool.Reliability = 0.1
|
|
}
|
|
}
|
|
}
|
|
|
|
// addMarket adds a market to the cache
|
|
func (mb *MarketBuilder) addMarket(market *Market) {
|
|
mb.marketsMutex.Lock()
|
|
defer mb.marketsMutex.Unlock()
|
|
|
|
key := mb.makeTokenPairKey(market.TokenA, market.TokenB)
|
|
mb.markets[key] = market
|
|
|
|
mb.logger.Debug(fmt.Sprintf("Added market %s with %d pools (total liquidity: %s)",
|
|
key, market.PoolCount, market.TotalLiquidity.String()))
|
|
}
|
|
|
|
// makeTokenPairKey creates a consistent key for token pairs
|
|
func (mb *MarketBuilder) makeTokenPairKey(tokenA, tokenB common.Address) string {
|
|
// Ensure consistent ordering (smaller address first)
|
|
if tokenA.Big().Cmp(tokenB.Big()) > 0 {
|
|
tokenA, tokenB = tokenB, tokenA
|
|
}
|
|
return fmt.Sprintf("%s_%s", tokenA.Hex(), tokenB.Hex())
|
|
}
|
|
|
|
// validateConfig validates the build configuration
|
|
func (mb *MarketBuilder) validateConfig() error {
|
|
if mb.buildConfig.MinPoolsPerMarket < 1 {
|
|
return fmt.Errorf("minPoolsPerMarket must be at least 1")
|
|
}
|
|
if mb.buildConfig.ParallelBuildJobs < 1 {
|
|
return fmt.Errorf("parallelBuildJobs must be at least 1")
|
|
}
|
|
if mb.buildConfig.MaxMarketsToCache < 1 {
|
|
return fmt.Errorf("maxMarketsToCache must be at least 1")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// autoRebuildLoop automatically rebuilds markets at intervals
|
|
func (mb *MarketBuilder) autoRebuildLoop() {
|
|
ticker := time.NewTicker(mb.buildConfig.RebuildInterval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
|
|
if err := mb.RebuildMarkets(ctx); err != nil {
|
|
mb.logger.Warn(fmt.Sprintf("Failed to rebuild markets: %v", err))
|
|
}
|
|
cancel()
|
|
}
|
|
}
|
|
}
|
|
|
|
// GetMarket returns a market for a token pair
|
|
func (mb *MarketBuilder) GetMarket(tokenA, tokenB common.Address) (*Market, bool) {
|
|
mb.marketsMutex.RLock()
|
|
defer mb.marketsMutex.RUnlock()
|
|
|
|
key := mb.makeTokenPairKey(tokenA, tokenB)
|
|
market, exists := mb.markets[key]
|
|
return market, exists
|
|
}
|
|
|
|
// GetAllMarkets returns all cached markets
|
|
func (mb *MarketBuilder) GetAllMarkets() []*Market {
|
|
mb.marketsMutex.RLock()
|
|
defer mb.marketsMutex.RUnlock()
|
|
|
|
markets := make([]*Market, 0, len(mb.markets))
|
|
for _, market := range mb.markets {
|
|
markets = append(markets, market)
|
|
}
|
|
|
|
return markets
|
|
}
|
|
|
|
// RebuildMarkets rebuilds all markets from current cached data
|
|
func (mb *MarketBuilder) RebuildMarkets(ctx context.Context) error {
|
|
mb.logger.Info("Rebuilding markets from cached data...")
|
|
|
|
// Clear existing markets
|
|
mb.marketsMutex.Lock()
|
|
oldCount := len(mb.markets)
|
|
mb.markets = make(map[string]*Market)
|
|
mb.marketsMutex.Unlock()
|
|
|
|
// Rebuild
|
|
if err := mb.buildInitialMarkets(ctx); err != nil {
|
|
return fmt.Errorf("failed to rebuild markets: %w", err)
|
|
}
|
|
|
|
newCount := len(mb.markets)
|
|
mb.logger.Info(fmt.Sprintf("Rebuilt markets: %d -> %d", oldCount, newCount))
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetStatistics returns comprehensive market builder statistics
|
|
func (mb *MarketBuilder) GetStatistics() map[string]interface{} {
|
|
mb.marketsMutex.RLock()
|
|
defer mb.marketsMutex.RUnlock()
|
|
|
|
totalPools := 0
|
|
totalLiquidity := big.NewInt(0)
|
|
protocolCounts := make(map[string]int)
|
|
|
|
for _, market := range mb.markets {
|
|
totalPools += market.PoolCount
|
|
totalLiquidity.Add(totalLiquidity, market.TotalLiquidity)
|
|
|
|
for protocol, count := range market.Protocols {
|
|
protocolCounts[protocol] += count
|
|
}
|
|
}
|
|
|
|
return map[string]interface{}{
|
|
"totalMarkets": len(mb.markets),
|
|
"totalPools": totalPools,
|
|
"totalLiquidity": totalLiquidity.String(),
|
|
"protocolCounts": protocolCounts,
|
|
"initialized": mb.initialized,
|
|
"autoUpdate": mb.buildConfig.AutoUpdate,
|
|
}
|
|
}
|
|
|
|
// Stop gracefully shuts down the market builder
|
|
func (mb *MarketBuilder) Stop() {
|
|
mb.initMutex.Lock()
|
|
defer mb.initMutex.Unlock()
|
|
|
|
if !mb.initialized {
|
|
return
|
|
}
|
|
|
|
mb.logger.Info("Market builder stopped")
|
|
mb.initialized = false
|
|
}
|