diff --git a/pkg/arbitrum/market_discovery.go b/pkg/arbitrum/market_discovery.go new file mode 100644 index 0000000..7417d35 --- /dev/null +++ b/pkg/arbitrum/market_discovery.go @@ -0,0 +1,1932 @@ +package arbitrum + +import ( + "context" + "encoding/json" + "fmt" + "math/big" + "os" + "sync" + "time" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/fraktal/mev-beta/internal/logger" + exchangeMath "github.com/fraktal/mev-beta/pkg/math" + "gopkg.in/yaml.v3" +) + +// MarketDiscovery manages pool discovery and market building +type MarketDiscovery struct { + client *ethclient.Client + logger *logger.Logger + config *MarketConfig + mathCalc *exchangeMath.MathCalculator + + // Market state + pools map[common.Address]*PoolInfoDetailed + tokens map[common.Address]*TokenInfo + factories map[common.Address]*FactoryInfo + routers map[common.Address]*RouterInfo + mu sync.RWMutex + + // Logging + marketScanLogger *os.File + arbLogger *os.File + + // Performance tracking + poolsDiscovered uint64 + arbitrageOpps uint64 + lastScanTime time.Time + totalScanTime time.Duration +} + +// MarketConfig represents the configuration for market discovery +type MarketConfig struct { + Version string `yaml:"version"` + Network string `yaml:"network"` + ChainID int64 `yaml:"chain_id"` + Tokens map[string]*TokenConfigInfo `yaml:"tokens"` + Factories map[string]*FactoryConfig `yaml:"factories"` + Routers map[string]*RouterConfig `yaml:"routers"` + PriorityPools []PriorityPoolConfig `yaml:"priority_pools"` + MarketScan MarketScanConfig `yaml:"market_scan"` + Arbitrage ArbitrageConfig `yaml:"arbitrage"` + Logging LoggingConfig `yaml:"logging"` + Risk RiskConfig `yaml:"risk"` + Monitoring MonitoringConfig `yaml:"monitoring"` +} + +type TokenConfigInfo struct { + Address string `yaml:"address"` + Symbol string `yaml:"symbol"` + Decimals int `yaml:"decimals"` + Priority int `yaml:"priority"` +} + +type FactoryConfig struct { + Address string `yaml:"address"` + Type string `yaml:"type"` + InitCodeHash string `yaml:"init_code_hash"` + FeeTiers []uint32 `yaml:"fee_tiers"` + Priority int `yaml:"priority"` +} + +type RouterConfig struct { + Address string `yaml:"address"` + Factory string `yaml:"factory"` + Type string `yaml:"type"` + Priority int `yaml:"priority"` +} + +type PriorityPoolConfig struct { + Pool string `yaml:"pool"` + Factory string `yaml:"factory"` + Token0 string `yaml:"token0"` + Token1 string `yaml:"token1"` + Fee uint32 `yaml:"fee"` + Priority int `yaml:"priority"` +} + +type MarketScanConfig struct { + ScanInterval int `yaml:"scan_interval"` + MaxPools int `yaml:"max_pools"` + MinLiquidityUSD float64 `yaml:"min_liquidity_usd"` + MinVolume24hUSD float64 `yaml:"min_volume_24h_usd"` + Discovery PoolDiscoveryConfig `yaml:"discovery"` +} + +type PoolDiscoveryConfig struct { + MaxBlocksBack uint64 `yaml:"max_blocks_back"` + MinPoolAge uint64 `yaml:"min_pool_age"` + DiscoveryInterval uint64 `yaml:"discovery_interval"` +} + +type ArbitrageConfig struct { + MinProfitUSD float64 `yaml:"min_profit_usd"` + MaxSlippage float64 `yaml:"max_slippage"` + MaxGasPrice float64 `yaml:"max_gas_price"` + ProfitMargins map[string]float64 `yaml:"profit_margins"` +} + +type LoggingConfig struct { + Level string `yaml:"level"` + Files map[string]string `yaml:"files"` + RealTime map[string]interface{} `yaml:"real_time"` +} + +type RiskConfig struct { + MaxPositionETH float64 `yaml:"max_position_eth"` + MaxDailyLossETH float64 `yaml:"max_daily_loss_eth"` + MaxConcurrentTxs int `yaml:"max_concurrent_txs"` + CircuitBreaker map[string]interface{} `yaml:"circuit_breaker"` +} + +type MonitoringConfig struct { + Enabled bool `yaml:"enabled"` + UpdateInterval int `yaml:"update_interval"` + Metrics []string `yaml:"metrics"` +} + +// PoolInfoDetailed represents detailed pool information for market discovery +type PoolInfoDetailed struct { + Address common.Address `json:"address"` + Factory common.Address `json:"factory"` + FactoryType string `json:"factory_type"` + Token0 common.Address `json:"token0"` + Token1 common.Address `json:"token1"` + Fee uint32 `json:"fee"` + Reserve0 *big.Int `json:"reserve0"` + Reserve1 *big.Int `json:"reserve1"` + Liquidity *big.Int `json:"liquidity"` + SqrtPriceX96 *big.Int `json:"sqrt_price_x96,omitempty"` // For V3 pools + Tick int32 `json:"tick,omitempty"` // For V3 pools + LastUpdated time.Time `json:"last_updated"` + Volume24h *big.Int `json:"volume_24h"` + Priority int `json:"priority"` + Active bool `json:"active"` +} + +type TokenInfo struct { + Address common.Address `json:"address"` + Symbol string `json:"symbol"` + Name string `json:"name"` + Decimals uint8 `json:"decimals"` + Priority int `json:"priority"` + LastPrice *big.Int `json:"last_price"` + Volume24h *big.Int `json:"volume_24h"` +} + +type FactoryInfo struct { + Address common.Address `json:"address"` + Type string `json:"type"` + InitCodeHash common.Hash `json:"init_code_hash"` + FeeTiers []uint32 `json:"fee_tiers"` + PoolCount uint64 `json:"pool_count"` + Priority int `json:"priority"` +} + +type RouterInfo struct { + Address common.Address `json:"address"` + Factory common.Address `json:"factory"` + Type string `json:"type"` + Priority int `json:"priority"` +} + +// MarketScanResult represents the result of a market scan +type MarketScanResult struct { + Timestamp time.Time `json:"timestamp"` + BlockNumber uint64 `json:"block_number"` + PoolsScanned int `json:"pools_scanned"` + NewPoolsFound int `json:"new_pools_found"` + ArbitrageOpps []*ArbitrageOpportunityDetailed `json:"arbitrage_opportunities"` + TopPools []*PoolInfoDetailed `json:"top_pools"` + ScanDuration time.Duration `json:"scan_duration"` + GasPrice *big.Int `json:"gas_price"` + NetworkConditions map[string]interface{} `json:"network_conditions"` +} + +type ArbitrageOpportunityDetailed struct { + ID string `json:"id"` + Type string `json:"type"` + TokenIn common.Address `json:"token_in"` + TokenOut common.Address `json:"token_out"` + AmountIn *big.Int `json:"amount_in"` + ExpectedAmountOut *big.Int `json:"expected_amount_out"` + ActualAmountOut *big.Int `json:"actual_amount_out"` + Profit *big.Int `json:"profit"` + ProfitUSD float64 `json:"profit_usd"` + ProfitMargin float64 `json:"profit_margin"` + GasCost *big.Int `json:"gas_cost"` + NetProfit *big.Int `json:"net_profit"` + ExchangeA string `json:"exchange_a"` + ExchangeB string `json:"exchange_b"` + PoolA common.Address `json:"pool_a"` + PoolB common.Address `json:"pool_b"` + PriceA float64 `json:"price_a"` + PriceB float64 `json:"price_b"` + PriceImpactA float64 `json:"price_impact_a"` + PriceImpactB float64 `json:"price_impact_b"` + CapitalRequired float64 `json:"capital_required"` + GasCostUSD float64 `json:"gas_cost_usd"` + Confidence float64 `json:"confidence"` + RiskScore float64 `json:"risk_score"` + ExecutionTime time.Duration `json:"execution_time"` + Timestamp time.Time `json:"timestamp"` +} + +// NewMarketDiscovery creates a new market discovery instance +func NewMarketDiscovery(client *ethclient.Client, logger *logger.Logger, configPath string) (*MarketDiscovery, error) { + // Load configuration + config, err := LoadMarketConfig(configPath) + if err != nil { + return nil, fmt.Errorf("failed to load config: %w", err) + } + + // Initialize math calculator + mathCalc := exchangeMath.NewMathCalculator() + + md := &MarketDiscovery{ + client: client, + logger: logger, + config: config, + mathCalc: mathCalc, + pools: make(map[common.Address]*PoolInfoDetailed), + tokens: make(map[common.Address]*TokenInfo), + factories: make(map[common.Address]*FactoryInfo), + routers: make(map[common.Address]*RouterInfo), + } + + // Initialize logging + if err := md.initializeLogging(); err != nil { + return nil, fmt.Errorf("failed to initialize logging: %w", err) + } + + // Load initial configuration + if err := md.loadInitialMarkets(); err != nil { + return nil, fmt.Errorf("failed to load initial markets: %w", err) + } + + logger.Info("Market discovery initialized with comprehensive pool detection") + return md, nil +} + +// LoadMarketConfig loads market configuration from YAML file +func LoadMarketConfig(configPath string) (*MarketConfig, error) { + data, err := os.ReadFile(configPath) + if err != nil { + return nil, fmt.Errorf("failed to read config file: %w", err) + } + + var config MarketConfig + if err := yaml.Unmarshal(data, &config); err != nil { + return nil, fmt.Errorf("failed to parse config: %w", err) + } + + return &config, nil +} + +// initializeLogging sets up JSONL logging files +func (md *MarketDiscovery) initializeLogging() error { + // Create logs directory if it doesn't exist + if err := os.MkdirAll("logs", 0755); err != nil { + return fmt.Errorf("failed to create logs directory: %w", err) + } + + // Open market scan log file + marketScanFile, err := os.OpenFile(md.config.Logging.Files["market_scans"], os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) + if err != nil { + return fmt.Errorf("failed to open market scan log file: %w", err) + } + md.marketScanLogger = marketScanFile + + // Open arbitrage log file + arbFile, err := os.OpenFile(md.config.Logging.Files["arbitrage"], os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) + if err != nil { + return fmt.Errorf("failed to open arbitrage log file: %w", err) + } + md.arbLogger = arbFile + + return nil +} + +// loadInitialMarkets loads initial tokens, factories, and priority pools +func (md *MarketDiscovery) loadInitialMarkets() error { + md.mu.Lock() + defer md.mu.Unlock() + + // Load tokens + for _, token := range md.config.Tokens { + tokenAddr := common.HexToAddress(token.Address) + md.tokens[tokenAddr] = &TokenInfo{ + Address: tokenAddr, + Symbol: token.Symbol, + Decimals: uint8(token.Decimals), + Priority: token.Priority, + } + } + + // Load factories + for _, factory := range md.config.Factories { + factoryAddr := common.HexToAddress(factory.Address) + md.factories[factoryAddr] = &FactoryInfo{ + Address: factoryAddr, + Type: factory.Type, + InitCodeHash: common.HexToHash(factory.InitCodeHash), + FeeTiers: factory.FeeTiers, + Priority: factory.Priority, + } + } + + // Load routers + for _, router := range md.config.Routers { + routerAddr := common.HexToAddress(router.Address) + factoryAddr := common.Address{} + if router.Factory != "" { + for _, f := range md.config.Factories { + if f.Type == router.Factory { + factoryAddr = common.HexToAddress(f.Address) + break + } + } + } + + md.routers[routerAddr] = &RouterInfo{ + Address: routerAddr, + Factory: factoryAddr, + Type: router.Type, + Priority: router.Priority, + } + } + + // Load priority pools + for _, poolConfig := range md.config.PriorityPools { + poolAddr := common.HexToAddress(poolConfig.Pool) + token0 := common.HexToAddress(poolConfig.Token0) + token1 := common.HexToAddress(poolConfig.Token1) + + // Find factory + var factoryAddr common.Address + var factoryType string + for _, f := range md.config.Factories { + if f.Type == poolConfig.Factory { + factoryAddr = common.HexToAddress(f.Address) + factoryType = f.Type + break + } + } + + pool := &PoolInfoDetailed{ + Address: poolAddr, + Factory: factoryAddr, + FactoryType: factoryType, + Token0: token0, + Token1: token1, + Fee: poolConfig.Fee, + Priority: poolConfig.Priority, + Active: true, + LastUpdated: time.Now(), + } + + md.pools[poolAddr] = pool + } + + md.logger.Info(fmt.Sprintf("Loaded initial markets: %d tokens, %d factories, %d routers, %d priority pools", + len(md.tokens), len(md.factories), len(md.routers), len(md.pools))) + + return nil +} + +// buildComprehensiveMarkets builds markets for all exchanges and top token pairs +func (md *MarketDiscovery) buildComprehensiveMarkets() error { + md.logger.Info("đŸ—ī¸ Building comprehensive markets for all exchanges and top tokens") + + // Get top tokens (sorted by priority) + topTokens := md.getTopTokens(10) // Reduced from 20 to 10 tokens to reduce load + md.logger.Info(fmt.Sprintf("đŸ’ŧ Found %d top tokens for market building", len(topTokens))) + + // Build markets for each factory + marketsBuilt := 0 + for factoryAddr, factoryInfo := range md.factories { + markets, err := md.buildFactoryMarkets(factoryAddr, factoryInfo, topTokens) + if err != nil { + md.logger.Error(fmt.Sprintf("Failed to build markets for factory %s: %v", factoryAddr.Hex(), err)) + continue + } + + marketsBuilt += len(markets) + md.logger.Info(fmt.Sprintf("✅ Built %d markets for %s factory", len(markets), factoryInfo.Type)) + } + + md.logger.Info(fmt.Sprintf("📊 Total markets built: %d", marketsBuilt)) + + // Log available markets + md.logAvailableMarkets() + + return nil +} + +// getTopTokens returns the top N tokens sorted by priority +func (md *MarketDiscovery) getTopTokens(limit int) []*TokenInfo { + md.mu.RLock() + defer md.mu.RUnlock() + + // Convert map to slice + tokens := make([]*TokenInfo, 0, len(md.tokens)) + for _, token := range md.tokens { + tokens = append(tokens, token) + } + + // Sort by priority (highest first) + for i := 0; i < len(tokens)-1; i++ { + for j := i + 1; j < len(tokens); j++ { + if tokens[i].Priority < tokens[j].Priority { + tokens[i], tokens[j] = tokens[j], tokens[i] + } + } + } + + // Limit to top N (reduced for performance) + limit = 10 // Reduced from 20 to 10 to reduce load + if len(tokens) > limit { + tokens = tokens[:limit] + } + + return tokens +} + +// buildFactoryMarkets builds markets for a specific factory and token pairs +func (md *MarketDiscovery) buildFactoryMarkets(factoryAddr common.Address, factoryInfo *FactoryInfo, tokens []*TokenInfo) ([]*PoolInfoDetailed, error) { + var markets []*PoolInfoDetailed + + // Find WETH token (most important for pairing) + var wethToken *TokenInfo + for _, token := range tokens { + if token.Symbol == "WETH" { + wethToken = token + break + } + } + + // If no WETH found, use the highest priority token + if wethToken == nil && len(tokens) > 0 { + wethToken = tokens[0] + } + + // Build markets for each token pair + for i, tokenA := range tokens { + for j := i + 1; j < len(tokens); j++ { + tokenB := tokens[j] + + // Build markets for this token pair + pairMarkets, err := md.buildTokenPairMarkets(factoryAddr, factoryInfo, tokenA, tokenB) + if err != nil { + md.logger.Debug(fmt.Sprintf("Failed to build markets for %s-%s pair: %v", tokenA.Symbol, tokenB.Symbol, err)) + continue + } + + markets = append(markets, pairMarkets...) + } + + // Also build markets for token-WETH pairs if WETH exists and is not this token + if wethToken != nil && tokenA.Address != wethToken.Address { + wethMarkets, err := md.buildTokenPairMarkets(factoryAddr, factoryInfo, tokenA, wethToken) + if err != nil { + md.logger.Debug(fmt.Sprintf("Failed to build markets for %s-WETH pair: %v", tokenA.Symbol, err)) + continue + } + + markets = append(markets, wethMarkets...) + } + } + + // Add built markets to tracking + md.mu.Lock() + for _, market := range markets { + // Only add if not already tracking + if _, exists := md.pools[market.Address]; !exists { + md.pools[market.Address] = market + } + } + md.mu.Unlock() + + return markets, nil +} + +// buildTokenPairMarkets builds markets for a specific token pair and factory +func (md *MarketDiscovery) buildTokenPairMarkets(factoryAddr common.Address, factoryInfo *FactoryInfo, tokenA, tokenB *TokenInfo) ([]*PoolInfoDetailed, error) { + var markets []*PoolInfoDetailed + + // For factories with fee tiers (Uniswap V3 style), build markets for each fee tier + if len(factoryInfo.FeeTiers) > 0 { + // Build markets for each fee tier + for _, feeTier := range factoryInfo.FeeTiers { + // Generate deterministic pool address using CREATE2 + poolAddr, err := md.calculatePoolAddress(factoryAddr, factoryInfo, tokenA, tokenB, feeTier) + if err != nil { + continue + } + + market := &PoolInfoDetailed{ + Address: poolAddr, + Factory: factoryAddr, + FactoryType: factoryInfo.Type, + Token0: tokenA.Address, + Token1: tokenB.Address, + Fee: feeTier, + Reserve0: big.NewInt(0), + Reserve1: big.NewInt(0), + Liquidity: big.NewInt(0), + SqrtPriceX96: big.NewInt(0), + Tick: 0, + LastUpdated: time.Now(), + Volume24h: big.NewInt(0), + Priority: (tokenA.Priority + tokenB.Priority) / 2, + Active: true, + } + + markets = append(markets, market) + } + } else { + // For factories without fee tiers (Uniswap V2 style), build a single market + // Generate deterministic pool address using CREATE2 + poolAddr, err := md.calculatePoolAddress(factoryAddr, factoryInfo, tokenA, tokenB, 0) + if err != nil { + return nil, err + } + + market := &PoolInfoDetailed{ + Address: poolAddr, + Factory: factoryAddr, + FactoryType: factoryInfo.Type, + Token0: tokenA.Address, + Token1: tokenB.Address, + Reserve0: big.NewInt(0), + Reserve1: big.NewInt(0), + Liquidity: big.NewInt(0), + LastUpdated: time.Now(), + Volume24h: big.NewInt(0), + Priority: (tokenA.Priority + tokenB.Priority) / 2, + Active: true, + } + + markets = append(markets, market) + } + + return markets, nil +} + +// calculatePoolAddress calculates the deterministic pool address using CREATE2 +func (md *MarketDiscovery) calculatePoolAddress(factoryAddr common.Address, factoryInfo *FactoryInfo, tokenA, tokenB *TokenInfo, feeTier uint32) (common.Address, error) { + // Sort tokens to ensure consistent ordering + token0, token1 := tokenA.Address, tokenB.Address + if token0.Big().Cmp(token1.Big()) > 0 { + token0, token1 = token1, token0 + } + + switch factoryInfo.Type { + case "uniswap_v3", "camelot_v3", "algebra": + // For Uniswap V3 style factories with fee tiers + return md.calculateUniswapV3PoolAddress(factoryAddr, factoryInfo, token0, token1, feeTier) + case "uniswap_v2", "sushiswap": + // For Uniswap V2 style factories + return md.calculateUniswapV2PoolAddress(factoryAddr, factoryInfo, token0, token1) + case "balancer_v2": + // For Balancer (simplified - in practice would need more info) + return md.calculateBalancerPoolAddress(factoryAddr, token0, token1) + case "curve": + // For Curve (simplified - in practice would need more info) + return md.calculateCurvePoolAddress(factoryAddr, token0, token1) + default: + // Generic CREATE2 calculation + return md.calculateGenericPoolAddress(factoryAddr, factoryInfo, token0, token1, feeTier) + } +} + +// calculateUniswapV3PoolAddress calculates pool address for Uniswap V3 style factories +func (md *MarketDiscovery) calculateUniswapV3PoolAddress(factoryAddr common.Address, factoryInfo *FactoryInfo, token0, token1 common.Address, feeTier uint32) (common.Address, error) { + // Encode the pool key: keccak256(abi.encode(token0, token1, fee)) + poolKey := crypto.Keccak256(append(append(token0.Bytes(), token1.Bytes()...), big.NewInt(int64(feeTier)).Bytes()...)) + + // Calculate CREATE2 address + // keccak256(0xff ++ address ++ salt ++ keccak256(init_code))[12:] + salt := poolKey + initCodeHash := factoryInfo.InitCodeHash.Bytes() + + create2Input := append([]byte{0xff}, factoryAddr.Bytes()...) + create2Input = append(create2Input, salt...) + create2Input = append(create2Input, initCodeHash...) + + poolAddrBytes := crypto.Keccak256(create2Input) + + // Take last 20 bytes for address + poolAddr := common.BytesToAddress(poolAddrBytes[12:]) + + return poolAddr, nil +} + +// calculateUniswapV2PoolAddress calculates pool address for Uniswap V2 style factories +func (md *MarketDiscovery) calculateUniswapV2PoolAddress(factoryAddr common.Address, factoryInfo *FactoryInfo, token0, token1 common.Address) (common.Address, error) { + // For Uniswap V2: keccak256(0xff ++ address ++ keccak256(token0 ++ token1) ++ initcode_hash)[12:] + poolKey := crypto.Keccak256(append(token0.Bytes(), token1.Bytes()...)) + + create2Input := append([]byte{0xff}, factoryAddr.Bytes()...) + create2Input = append(create2Input, poolKey...) + create2Input = append(create2Input, factoryInfo.InitCodeHash.Bytes()...) + + poolAddrBytes := crypto.Keccak256(create2Input) + + // Take last 20 bytes for address + poolAddr := common.BytesToAddress(poolAddrBytes[12:]) + + return poolAddr, nil +} + +// calculateBalancerPoolAddress calculates pool address for Balancer pools (simplified) +func (md *MarketDiscovery) calculateBalancerPoolAddress(factoryAddr, token0, token1 common.Address) (common.Address, error) { + // Simplified implementation - in practice would need more complex logic + // For Balancer V2, pool addresses are typically determined by the vault + // This is a placeholder implementation + placeholder := crypto.Keccak256(append(append(factoryAddr.Bytes(), token0.Bytes()...), token1.Bytes()...)) + return common.BytesToAddress(placeholder[12:]), nil +} + +// calculateCurvePoolAddress calculates pool address for Curve pools (simplified) +func (md *MarketDiscovery) calculateCurvePoolAddress(factoryAddr, token0, token1 common.Address) (common.Address, error) { + // Simplified implementation - Curve pools are typically deployed via factories + // with more complex logic. This is a placeholder implementation + placeholder := crypto.Keccak256(append(append(factoryAddr.Bytes(), token0.Bytes()...), token1.Bytes()...)) + return common.BytesToAddress(placeholder[12:]), nil +} + +// calculateGenericPoolAddress calculates pool address for generic factories +func (md *MarketDiscovery) calculateGenericPoolAddress(factoryAddr common.Address, factoryInfo *FactoryInfo, token0, token1 common.Address, feeTier uint32) (common.Address, error) { + // Generic CREATE2 calculation using tokens and fee as salt + saltInput := append(append(token0.Bytes(), token1.Bytes()...), big.NewInt(int64(feeTier)).Bytes()...) + salt := crypto.Keccak256(saltInput) + + create2Input := append([]byte{0xff}, factoryAddr.Bytes()...) + create2Input = append(create2Input, salt...) + create2Input = append(create2Input, factoryInfo.InitCodeHash.Bytes()...) + + poolAddrBytes := crypto.Keccak256(create2Input) + + // Take last 20 bytes for address + poolAddr := common.BytesToAddress(poolAddrBytes[12:]) + + return poolAddr, nil +} + +// logAvailableMarkets logs all available markets grouped by exchange +func (md *MarketDiscovery) logAvailableMarkets() { + md.mu.RLock() + defer md.mu.RUnlock() + + // Group markets by factory type + marketsByFactory := make(map[string][]*PoolInfoDetailed) + for _, pool := range md.pools { + factoryType := pool.FactoryType + marketsByFactory[factoryType] = append(marketsByFactory[factoryType], pool) + } + + // Log markets for each factory + md.logger.Info("📈 Available Markets by Exchange:") + for factoryType, pools := range marketsByFactory { + // Count unique token pairs + tokenPairs := make(map[string]bool) + for _, pool := range pools { + // Handle empty addresses to prevent slice bounds panic + token0Display := "unknown" + token1Display := "unknown" + if len(pool.Token0.Hex()) > 0 { + if len(pool.Token0.Hex()) > 6 { + token0Display = pool.Token0.Hex()[:6] + } else { + token0Display = pool.Token0.Hex() + } + } + if len(pool.Token1.Hex()) > 0 { + if len(pool.Token1.Hex()) > 6 { + token1Display = pool.Token1.Hex()[:6] + } else { + token1Display = pool.Token1.Hex() + } + } + pairKey := fmt.Sprintf("%s-%s", token0Display, token1Display) + tokenPairs[pairKey] = true + } + + md.logger.Info(fmt.Sprintf(" %s: %d pools, %d unique token pairs", + factoryType, len(pools), len(tokenPairs))) + + // Log top 5 pools by priority + for i, pool := range pools { + if i >= 5 { + break + } + md.logger.Debug(fmt.Sprintf(" đŸĻ Pool %s (%s-%s, Fee: %d)", + pool.Address.Hex()[:10], + pool.Token0.Hex()[:6], + pool.Token1.Hex()[:6], + pool.Fee)) + } + + if len(pools) > 5 { + md.logger.Debug(fmt.Sprintf(" ... and %d more pools", len(pools)-5)) + } + } +} +func (md *MarketDiscovery) DiscoverPools(ctx context.Context, fromBlock, toBlock uint64) (*PoolDiscoveryResult, error) { + startTime := time.Now() + discovered := &PoolDiscoveryResult{ + Timestamp: startTime, + FromBlock: fromBlock, + ToBlock: toBlock, + NewPools: make([]*PoolInfoDetailed, 0), + } + + // Discover pools from each factory + for factoryAddr, factoryInfo := range md.factories { + pools, err := md.discoverPoolsFromFactory(ctx, factoryAddr, factoryInfo, fromBlock, toBlock) + if err != nil { + md.logger.Error(fmt.Sprintf("Failed to discover pools from factory %s: %v", factoryAddr.Hex(), err)) + continue + } + + discovered.NewPools = append(discovered.NewPools, pools...) + } + + discovered.PoolsFound = len(discovered.NewPools) + discovered.ScanDuration = time.Since(startTime) + + // Log discovery results + if err := md.logPoolDiscovery(discovered); err != nil { + md.logger.Error(fmt.Sprintf("Failed to log pool discovery: %v", err)) + } + + md.poolsDiscovered += uint64(discovered.PoolsFound) + return discovered, nil +} + +// ScanForArbitrage scans all pools for arbitrage opportunities +func (md *MarketDiscovery) ScanForArbitrage(ctx context.Context, blockNumber uint64) (*MarketScanResult, error) { + startTime := time.Now() + md.lastScanTime = startTime + + result := &MarketScanResult{ + Timestamp: startTime, + BlockNumber: blockNumber, + ArbitrageOpps: make([]*ArbitrageOpportunityDetailed, 0), + TopPools: make([]*PoolInfoDetailed, 0), + NetworkConditions: make(map[string]interface{}), + } + + // Update pool states + if err := md.updatePoolStates(ctx); err != nil { + return nil, fmt.Errorf("failed to update pool states: %w", err) + } + + // Get current gas price + gasPrice, err := md.client.SuggestGasPrice(ctx) + if err != nil { + gasPrice = big.NewInt(5000000000) // 5 gwei fallback + } + result.GasPrice = gasPrice + + // Scan for arbitrage opportunities + opportunities := md.findArbitrageOpportunities(ctx, gasPrice) + result.ArbitrageOpps = opportunities + result.PoolsScanned = len(md.pools) + + // Get top pools by liquidity + result.TopPools = md.getTopPoolsByLiquidity(10) + + result.ScanDuration = time.Since(startTime) + md.totalScanTime += result.ScanDuration + + // Log scan results + if err := md.logMarketScan(result); err != nil { + md.logger.Error(fmt.Sprintf("Failed to log market scan: %v", err)) + } + + md.arbitrageOpps += uint64(len(opportunities)) + return result, nil +} + +// findArbitrageOpportunities finds arbitrage opportunities across all pools +func (md *MarketDiscovery) findArbitrageOpportunities(ctx context.Context, gasPrice *big.Int) []*ArbitrageOpportunityDetailed { + opportunities := make([]*ArbitrageOpportunityDetailed, 0) + + // Group pools by token pairs + tokenPairPools := md.groupPoolsByTokenPairs() + + // Check each token pair for arbitrage + for tokenPair, pools := range tokenPairPools { + if len(pools) < 2 { + continue // Need at least 2 pools for arbitrage + } + + // Check all pool combinations + for i := 0; i < len(pools); i++ { + for j := i + 1; j < len(pools); j++ { + poolA := pools[i] + poolB := pools[j] + + // Skip if same factory type (no arbitrage opportunity) + if poolA.FactoryType == poolB.FactoryType { + continue + } + + // Calculate arbitrage + arb := md.calculateArbitrage(poolA, poolB, gasPrice, tokenPair) + if arb != nil && arb.NetProfit.Sign() > 0 { + opportunities = append(opportunities, arb) + } + } + } + } + + // Sort by net profit (highest first) + for i := 0; i < len(opportunities)-1; i++ { + for j := i + 1; j < len(opportunities); j++ { + if opportunities[i].NetProfit.Cmp(opportunities[j].NetProfit) < 0 { + opportunities[i], opportunities[j] = opportunities[j], opportunities[i] + } + } + } + + // Log arbitrage opportunities + for _, opp := range opportunities { + if err := md.logArbitrageOpportunity(opp); err != nil { + md.logger.Error(fmt.Sprintf("Failed to log arbitrage opportunity: %v", err)) + } + } + + return opportunities +} + +// calculateArbitrage calculates arbitrage between two pools +func (md *MarketDiscovery) calculateArbitrage(poolA, poolB *PoolInfoDetailed, gasPrice *big.Int, tokenPair string) *ArbitrageOpportunityDetailed { + // Skip pools with zero or nil reserves (uninitialized pools) + if poolA.Reserve0 == nil || poolA.Reserve1 == nil || poolB.Reserve0 == nil || poolB.Reserve1 == nil || + poolA.Reserve0.Sign() <= 0 || poolA.Reserve1.Sign() <= 0 || poolB.Reserve0.Sign() <= 0 || poolB.Reserve1.Sign() <= 0 { + return nil + } + + // Get math calculators for each pool type + mathA := md.mathCalc.GetMathForExchange(poolA.FactoryType) + mathB := md.mathCalc.GetMathForExchange(poolB.FactoryType) + + // Get spot prices + priceA, err := mathA.GetSpotPrice(poolA.Reserve0, poolA.Reserve1) + if err != nil { + return nil + } + + priceB, err := mathB.GetSpotPrice(poolB.Reserve0, poolB.Reserve1) + if err != nil { + return nil + } + + // Calculate price difference + priceDiff := new(big.Float).Sub(priceA, priceB) + priceDiff.Quo(priceDiff, priceA) + + priceDiffFloat, _ := priceDiff.Float64() + + // Check if price difference exceeds minimum threshold + if abs(priceDiffFloat) < md.config.Arbitrage.ProfitMargins["arbitrage"] { + return nil + } + + // Calculate optimal arbitrage amount (simplified) + amountIn := big.NewInt(100000000000000000) // 0.1 ETH test amount + + // Calculate amounts + amountOutA, _ := mathA.CalculateAmountOut(amountIn, poolA.Reserve0, poolA.Reserve1, poolA.Fee) + if amountOutA == nil { + return nil + } + + amountOutB, _ := mathB.CalculateAmountIn(amountOutA, poolB.Reserve1, poolB.Reserve0, poolB.Fee) + if amountOutB == nil { + return nil + } + + // Calculate profit + profit := new(big.Int).Sub(amountOutB, amountIn) + if profit.Sign() <= 0 { + return nil + } + + // Calculate gas cost + gasCost := new(big.Int).Mul(gasPrice, big.NewInt(300000)) // ~300k gas + + // Net profit + netProfit := new(big.Int).Sub(profit, gasCost) + if netProfit.Sign() <= 0 { + return nil + } + + // Convert to USD (simplified - assume ETH price) + profitUSD := float64(netProfit.Uint64()) / 1e18 * 2000 // Assume $2000 ETH + + if profitUSD < md.config.Arbitrage.MinProfitUSD { + return nil + } + + // Calculate price impacts + priceImpactA, _ := mathA.CalculatePriceImpact(amountIn, poolA.Reserve0, poolA.Reserve1) + priceImpactB, _ := mathB.CalculatePriceImpact(amountOutA, poolB.Reserve1, poolB.Reserve0) + + return &ArbitrageOpportunityDetailed{ + ID: fmt.Sprintf("arb_%d_%s", time.Now().Unix(), tokenPair), + Type: "arbitrage", + TokenIn: poolA.Token0, + TokenOut: poolA.Token1, + AmountIn: amountIn, + ExpectedAmountOut: amountOutA, + ActualAmountOut: amountOutB, + Profit: profit, + ProfitUSD: profitUSD, + ProfitMargin: priceDiffFloat, + GasCost: gasCost, + NetProfit: netProfit, + ExchangeA: poolA.FactoryType, + ExchangeB: poolB.FactoryType, + PoolA: poolA.Address, + PoolB: poolB.Address, + PriceImpactA: priceImpactA, + PriceImpactB: priceImpactB, + Confidence: 0.8, + RiskScore: 0.3, + ExecutionTime: time.Duration(15) * time.Second, + Timestamp: time.Now(), + } +} + +// Helper methods +func abs(x float64) float64 { + if x < 0 { + return -x + } + return x +} + +// groupPoolsByTokenPairs groups pools by token pairs +func (md *MarketDiscovery) groupPoolsByTokenPairs() map[string][]*PoolInfoDetailed { + groups := make(map[string][]*PoolInfoDetailed) + + md.mu.RLock() + defer md.mu.RUnlock() + + for _, pool := range md.pools { + if !pool.Active { + continue + } + + // Create token pair key (sorted) + var pairKey string + if pool.Token0.Big().Cmp(pool.Token1.Big()) < 0 { + pairKey = fmt.Sprintf("%s-%s", pool.Token0.Hex(), pool.Token1.Hex()) + } else { + pairKey = fmt.Sprintf("%s-%s", pool.Token1.Hex(), pool.Token0.Hex()) + } + + groups[pairKey] = append(groups[pairKey], pool) + } + + return groups +} + +// getTopPoolsByLiquidity returns top pools sorted by liquidity +func (md *MarketDiscovery) getTopPoolsByLiquidity(limit int) []*PoolInfoDetailed { + md.mu.RLock() + defer md.mu.RUnlock() + + pools := make([]*PoolInfoDetailed, 0, len(md.pools)) + for _, pool := range md.pools { + if pool.Active && pool.Liquidity != nil { + pools = append(pools, pool) + } + } + + // Sort by liquidity (highest first) + for i := 0; i < len(pools)-1; i++ { + for j := i + 1; j < len(pools); j++ { + if pools[i].Liquidity.Cmp(pools[j].Liquidity) < 0 { + pools[i], pools[j] = pools[j], pools[i] + } + } + } + + if len(pools) > limit { + pools = pools[:limit] + } + + return pools +} + +// Logging methods +func (md *MarketDiscovery) logMarketScan(result *MarketScanResult) error { + data, err := json.Marshal(result) + if err != nil { + return err + } + + _, err = md.marketScanLogger.Write(append(data, '\n')) + if err != nil { + return err + } + + return md.marketScanLogger.Sync() +} + +func (md *MarketDiscovery) logArbitrageOpportunity(opp *ArbitrageOpportunityDetailed) error { + data, err := json.Marshal(opp) + if err != nil { + return err + } + + _, err = md.arbLogger.Write(append(data, '\n')) + if err != nil { + return err + } + + return md.arbLogger.Sync() +} + +func (md *MarketDiscovery) logPoolDiscovery(result *PoolDiscoveryResult) error { + data, err := json.Marshal(result) + if err != nil { + return err + } + + _, err = md.marketScanLogger.Write(append(data, '\n')) + if err != nil { + return err + } + + return md.marketScanLogger.Sync() +} + +// PoolDiscoveryResult represents pool discovery results +type PoolDiscoveryResult struct { + Timestamp time.Time `json:"timestamp"` + FromBlock uint64 `json:"from_block"` + ToBlock uint64 `json:"to_block"` + NewPools []*PoolInfoDetailed `json:"new_pools"` + PoolsFound int `json:"pools_found"` + ScanDuration time.Duration `json:"scan_duration"` +} + +// Placeholder methods (to be implemented) +func (md *MarketDiscovery) discoverPoolsFromFactory(ctx context.Context, factoryAddr common.Address, factoryInfo *FactoryInfo, fromBlock, toBlock uint64) ([]*PoolInfoDetailed, error) { + // Implementation would query factory events for pool creation + return []*PoolInfoDetailed{}, nil +} + +func (md *MarketDiscovery) updatePoolStates(ctx context.Context) error { + md.mu.Lock() + defer md.mu.Unlock() + + md.logger.Info("🔄 Updating pool states for all tracked pools") + + updatedCount := 0 + errorCount := 0 + + // Update state for each pool + for _, pool := range md.pools { + // Skip inactive pools + if !pool.Active { + continue + } + + // Update pool state based on protocol type + switch pool.FactoryType { + case "uniswap_v2", "sushiswap", "camelot_v2": + if err := md.updateUniswapV2PoolState(ctx, pool); err != nil { + md.logger.Debug(fmt.Sprintf("Failed to update Uniswap V2 pool %s: %v", pool.Address.Hex(), err)) + errorCount++ + continue + } + case "uniswap_v3", "camelot_v3", "algebra": + if err := md.updateUniswapV3PoolState(ctx, pool); err != nil { + md.logger.Debug(fmt.Sprintf("Failed to update Uniswap V3 pool %s: %v", pool.Address.Hex(), err)) + errorCount++ + continue + } + case "balancer_v2": + if err := md.updateBalancerPoolState(ctx, pool); err != nil { + md.logger.Debug(fmt.Sprintf("Failed to update Balancer pool %s: %v", pool.Address.Hex(), err)) + errorCount++ + continue + } + case "curve": + if err := md.updateCurvePoolState(ctx, pool); err != nil { + md.logger.Debug(fmt.Sprintf("Failed to update Curve pool %s: %v", pool.Address.Hex(), err)) + errorCount++ + continue + } + default: + // For unknown protocols, skip updating state + md.logger.Debug(fmt.Sprintf("Skipping state update for unknown protocol pool %s (%s)", pool.Address.Hex(), pool.FactoryType)) + continue + } + + updatedCount++ + pool.LastUpdated = time.Now() + } + + md.logger.Info(fmt.Sprintf("✅ Updated %d pool states, %d errors", updatedCount, errorCount)) + return nil +} + +// updateUniswapV2PoolState updates the state of a Uniswap V2 style pool +func (md *MarketDiscovery) updateUniswapV2PoolState(ctx context.Context, pool *PoolInfoDetailed) error { + // For Uniswap V2, we need to call getReserves() function + // This is a simplified implementation - in production, you'd use the actual ABI + + // Generate a deterministic reserve value based on pool address for testing + // In a real implementation, you'd make an actual contract call + poolAddrBytes := pool.Address.Bytes() + + // Use last 8 bytes of address to generate deterministic reserves + reserveSeed := uint64(0) + for i := 0; i < 8 && i < len(poolAddrBytes); i++ { + reserveSeed = (reserveSeed << 8) | uint64(poolAddrBytes[len(poolAddrBytes)-1-i]) + } + + // Generate deterministic reserves (in wei) + reserve0 := big.NewInt(int64(reserveSeed % 1000000000000000000)) // 0-1 ETH equivalent + reserve1 := big.NewInt(int64((reserveSeed >> 32) % 1000000000000000000)) + + // Scale reserves appropriately (assume token decimals) + // This is a simplified approach - in reality you'd look up token decimals + reserve0.Mul(reserve0, big.NewInt(1000000000000)) // Scale by 10^12 + reserve1.Mul(reserve1, big.NewInt(1000000000000)) // Scale by 10^12 + + pool.Reserve0 = reserve0 + pool.Reserve1 = reserve1 + pool.Liquidity = big.NewInt(0).Add(reserve0, reserve1) // Simplified liquidity + + // Update 24h volume (simulated) + volumeSeed := uint64(0) + for i := 0; i < 8 && i < len(poolAddrBytes); i++ { + volumeSeed = (volumeSeed << 8) | uint64(poolAddrBytes[i]) + } + pool.Volume24h = big.NewInt(int64(volumeSeed % 10000000000000000000)) // 0-10 ETH equivalent + + return nil +} + +// updateUniswapV3PoolState updates the state of a Uniswap V3 style pool +func (md *MarketDiscovery) updateUniswapV3PoolState(ctx context.Context, pool *PoolInfoDetailed) error { + // For Uniswap V3, we need to get slot0 data and liquidity + // This is a simplified implementation - in production, you'd use the uniswap package + + poolAddrBytes := pool.Address.Bytes() + + // Generate deterministic slot0-like values + sqrtPriceSeed := uint64(0) + for i := 0; i < 8 && i < len(poolAddrBytes); i++ { + sqrtPriceSeed = (sqrtPriceSeed << 8) | uint64(poolAddrBytes[len(poolAddrBytes)-1-i]) + } + + // Generate sqrtPriceX96 (should be 96-bit fixed point number) + // For simplicity, we'll use a value that represents a reasonable price + sqrtPriceX96 := big.NewInt(int64(sqrtPriceSeed % 1000000000000000000)) + sqrtPriceX96.Mul(sqrtPriceX96, big.NewInt(10000000000000000)) // Scale appropriately + + liquiditySeed := uint64(0) + for i := 0; i < 8 && i < len(poolAddrBytes); i++ { + liquiditySeed = (liquiditySeed << 8) | uint64(poolAddrBytes[i]) + } + + liquidity := big.NewInt(int64(liquiditySeed % 1000000000000000000)) // Larger liquidity values + liquidity.Mul(liquidity, big.NewInt(100)) // Scale up to simulate larger liquidity + + pool.SqrtPriceX96 = sqrtPriceX96 + pool.Liquidity = liquidity + + // Generate reserves from sqrtPrice and liquidity (simplified) + // In reality, you'd derive reserves from actual contract state + reserve0 := big.NewInt(0).Div(liquidity, big.NewInt(1000000)) // Simplified calculation + reserve1 := big.NewInt(0).Mul(liquidity, big.NewInt(1000)) // Simplified calculation + + pool.Reserve0 = reserve0 + pool.Reserve1 = reserve1 + + // Update 24h volume (simulated) + volumeSeed := uint64(0) + for i := 0; i < 8 && i < len(poolAddrBytes); i++ { + volumeSeed = (volumeSeed << 8) | uint64(poolAddrBytes[(i+4)%len(poolAddrBytes)]) + } + // Use big.Int to avoid overflow + volumeBig := big.NewInt(int64(volumeSeed)) + volumeBig.Mod(volumeBig, big.NewInt(1000000000000000000)) // Mod by 1 ETH + volumeBig.Mul(volumeBig, big.NewInt(100)) // Scale to 100 ETH max + pool.Volume24h = volumeBig + + return nil +} + +// updateBalancerPoolState updates the state of a Balancer pool +func (md *MarketDiscovery) updateBalancerPoolState(ctx context.Context, pool *PoolInfoDetailed) error { + // Simplified Balancer pool state update + poolAddrBytes := pool.Address.Bytes() + + // Generate deterministic reserves for Balancer pools + reserve0 := big.NewInt(0) + reserve1 := big.NewInt(0) + + for i := 0; i < len(poolAddrBytes) && i < 8; i++ { + reserve0.Add(reserve0, big.NewInt(int64(poolAddrBytes[i])< 0 { + return fmt.Errorf("errors closing resources: %v", errors) + } + + return nil +} + +// BuildComprehensiveMarkets builds comprehensive markets for all exchanges and top tokens +// This should be called after initialization is complete to avoid deadlocks +func (md *MarketDiscovery) BuildComprehensiveMarkets() error { + return md.buildComprehensiveMarkets() +} + +// GetPoolCache returns the pool cache for external use +func (md *MarketDiscovery) GetPoolCache() *PoolCache { + // This is a simplified implementation - in practice, you'd want to return + // a proper pool cache or create one from the current pools + return &PoolCache{ + pools: make(map[common.Address]*CachedPoolInfo), + cacheLock: sync.RWMutex{}, + maxSize: 10000, + ttl: time.Hour, + } +} + +// StartFactoryEventMonitoring begins real-time monitoring of factory events for new pool discovery +func (md *MarketDiscovery) StartFactoryEventMonitoring(ctx context.Context, client *ethclient.Client) error { + md.logger.Info("🏭 Starting real-time factory event monitoring") + + // Create event subscriptions for each factory + for factoryAddr, factoryInfo := range md.factories { + go md.monitorFactoryEvents(ctx, client, factoryAddr, factoryInfo) + } + + return nil +} + +// monitorFactoryEvents continuously monitors a factory for new pool creation events +func (md *MarketDiscovery) monitorFactoryEvents(ctx context.Context, client *ethclient.Client, factoryAddr common.Address, factoryInfo *FactoryInfo) { + // Different protocols have different pool creation events + var creationTopic common.Hash + + switch factoryInfo.Type { + case "uniswap_v2", "sushiswap": + creationTopic = crypto.Keccak256Hash([]byte("PairCreated(address,address,address,uint256)")) + case "uniswap_v3", "algebra", "camelot_v3": + creationTopic = crypto.Keccak256Hash([]byte("PoolCreated(address,address,uint24,address)")) + case "balancer_v2": + creationTopic = crypto.Keccak256Hash([]byte("PoolCreated(address,address,address)")) + case "curve": + creationTopic = crypto.Keccak256Hash([]byte("PoolAdded(address)")) + default: + // Default to common creation event + creationTopic = crypto.Keccak256Hash([]byte("PoolCreated(address)")) + } + + // Create filter query for pool creation events + query := ethereum.FilterQuery{ + Addresses: []common.Address{factoryAddr}, + Topics: [][]common.Hash{{creationTopic}}, + } + + // Poll for new events periodically (increased interval to reduce load) + ticker := time.NewTicker(30 * time.Second) // Check every 30 seconds + defer ticker.Stop() + + var lastBlock uint64 = 0 + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + // Get latest block + latestBlock, err := client.BlockNumber(ctx) + if err != nil { + md.logger.Error(fmt.Sprintf("Failed to get latest block for factory %s: %v", factoryAddr.Hex(), err)) + continue + } + + // Set fromBlock to last processed block + 1, or latest - 1000 if starting + // Reduced block range to avoid limits + fromBlock := lastBlock + 1 + if fromBlock == 1 || latestBlock-fromBlock > 1000 { + fromBlock = latestBlock - 1000 + if fromBlock < 1 { + fromBlock = 1 + } + } + + // Set toBlock to latest block + toBlock := latestBlock + + // Skip if no new blocks + if fromBlock > toBlock { + continue + } + + // Update query block range + query.FromBlock = new(big.Int).SetUint64(fromBlock) + query.ToBlock = new(big.Int).SetUint64(toBlock) + + // Query logs + logs, err := client.FilterLogs(ctx, query) + if err != nil { + md.logger.Error(fmt.Sprintf("Failed to filter logs for factory %s: %v", factoryAddr.Hex(), err)) + continue + } + + // Process new pool creation events + for _, log := range logs { + if err := md.processPoolCreationEvent(log, factoryInfo); err != nil { + md.logger.Error(fmt.Sprintf("Failed to process pool creation event: %v", err)) + } + } + + // Update last processed block + lastBlock = toBlock + } + } +} + +// processPoolCreationEvent processes a pool creation event and adds the new pool to tracking +func (md *MarketDiscovery) processPoolCreationEvent(log types.Log, factoryInfo *FactoryInfo) error { + // Parse the pool address from the event log + // This is a simplified implementation - in practice, you'd parse the actual + // event parameters based on the factory's ABI + + // For most factory events, the pool address is in the first topic after the event signature + if len(log.Topics) < 2 { + return fmt.Errorf("insufficient topics in pool creation event") + } + + // The pool address is typically in topics[1] for most factory events + poolAddr := common.HexToAddress(log.Topics[1].Hex()) + + // Check if we're already tracking this pool + md.mu.Lock() + if _, exists := md.pools[poolAddr]; exists { + md.mu.Unlock() + return nil // Already tracking this pool + } + md.mu.Unlock() + + // Parse token pair from the event (this would be done properly with ABI decoding) + var token0, token1 common.Address + var fee uint32 + + // Extract tokens from event topics or data + if err := md.parsePoolCreationData(log, factoryInfo, &token0, &token1, &fee); err != nil { + md.logger.Error(fmt.Sprintf("Failed to parse pool creation data: %v", err)) + return err + } + + // Create new pool info + poolInfo := &PoolInfoDetailed{ + Address: poolAddr, + Factory: factoryInfo.Address, + FactoryType: factoryInfo.Type, + Token0: token0, + Token1: token1, + Fee: fee, + LastUpdated: time.Now(), + Priority: factoryInfo.Priority, + Active: true, + } + + // Add to tracking + md.mu.Lock() + md.pools[poolAddr] = poolInfo + md.mu.Unlock() + + // Log the new pool discovery + md.logger.Info(fmt.Sprintf("🆕 New %s pool discovered: %s (%s-%s)", + factoryInfo.Type, poolAddr.Hex(), token0.Hex()[:6], token1.Hex()[:6])) + + // 🚀 CRITICAL: Build equivalent markets across all other exchanges + crossMarkets := md.buildCrossExchangeMarkets(token0, token1, factoryInfo.Address) + if len(crossMarkets) > 0 { + md.logger.Info(fmt.Sprintf("🔗 Built %d cross-exchange markets for %s-%s pair", + len(crossMarkets), token0.Hex()[:6], token1.Hex()[:6])) + } + + // Combine all discovered pools for logging + allNewPools := []*PoolInfoDetailed{poolInfo} + allNewPools = append(allNewPools, crossMarkets...) + + // Log discovery result + discoveryResult := &PoolDiscoveryResult{ + Timestamp: time.Now(), + FromBlock: log.BlockNumber, + ToBlock: log.BlockNumber, + NewPools: allNewPools, + PoolsFound: len(allNewPools), + } + + if err := md.logPoolDiscovery(discoveryResult); err != nil { + md.logger.Error(fmt.Sprintf("Failed to log pool discovery: %v", err)) + } + + md.poolsDiscovered += uint64(len(allNewPools)) + + // đŸŽ¯ Immediately check for arbitrage opportunities with new markets + go md.analyzeNewMarketOpportunities(allNewPools) + + return nil +} + +// parsePoolCreationData extracts token pair and fee information from pool creation event +func (md *MarketDiscovery) parsePoolCreationData(log types.Log, factoryInfo *FactoryInfo, token0, token1 *common.Address, fee *uint32) error { + // Parse based on factory type and event structure + switch factoryInfo.Type { + case "uniswap_v3", "camelot_v3", "algebra": + // Uniswap V3: PoolCreated(address indexed token0, address indexed token1, uint24 indexed fee, address pool) + if len(log.Topics) < 4 { + return fmt.Errorf("insufficient topics for V3 pool creation event") + } + *token0 = common.HexToAddress(log.Topics[1].Hex()) + *token1 = common.HexToAddress(log.Topics[2].Hex()) + // Fee is in topics[3] for V3 + feeValue := log.Topics[3].Big().Uint64() + *fee = uint32(feeValue) + + case "uniswap_v2", "sushiswap": + // Uniswap V2: PairCreated(address indexed token0, address indexed token1, address pair, uint256) + if len(log.Topics) < 3 { + return fmt.Errorf("insufficient topics for V2 pool creation event") + } + *token0 = common.HexToAddress(log.Topics[1].Hex()) + *token1 = common.HexToAddress(log.Topics[2].Hex()) + *fee = 3000 // V2 pools have fixed 0.3% fee + + case "balancer_v2": + // Balancer: PoolRegistered(bytes32 indexed poolId, address indexed poolAddress, uint8 specialization) + if len(log.Data) < 64 { + return fmt.Errorf("insufficient data for Balancer pool creation") + } + // For Balancer, we'd need to query the pool contract for tokens + // Simplified implementation + *token0 = common.HexToAddress("0x82af49447d8a07e3bd95bd0d56f35241523fbab1") // WETH + *token1 = common.HexToAddress("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48") // USDC + *fee = 300 // 0.3% default + + case "curve": + // Curve pools - simplified since structure varies greatly + *token0 = common.HexToAddress("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48") // USDC + *token1 = common.HexToAddress("0xFd086bC7CD5C481DCC9C85ebE478A1C0b69FCbb9") // USDT + *fee = 400 // 0.04% typical for stable pairs + + default: + return fmt.Errorf("unknown factory type: %s", factoryInfo.Type) + } + + // Ensure token0 < token1 for consistency + if token0.Big().Cmp(token1.Big()) > 0 { + *token0, *token1 = *token1, *token0 + } + + return nil +} + +// buildCrossExchangeMarkets builds equivalent markets across all other exchanges for a token pair +func (md *MarketDiscovery) buildCrossExchangeMarkets(token0, token1, originFactory common.Address) []*PoolInfoDetailed { + var newMarkets []*PoolInfoDetailed + + md.mu.RLock() + defer md.mu.RUnlock() + + // Build markets for each factory except the origin factory + for factoryAddr, factoryInfo := range md.factories { + if factoryAddr == originFactory { + continue // Skip the factory where the pool was originally discovered + } + + // Get token info for priority calculation + var tokenAInfo, tokenBInfo *TokenInfo + if t0Info, exists := md.tokens[token0]; exists { + tokenAInfo = t0Info + } + if t1Info, exists := md.tokens[token1]; exists { + tokenBInfo = t1Info + } + + // Calculate priority (default to factory priority if tokens not found) + priority := factoryInfo.Priority + if tokenAInfo != nil && tokenBInfo != nil { + priority = (tokenAInfo.Priority + tokenBInfo.Priority) / 2 + } + + // Build markets for this factory + factoryMarkets := md.buildMarketsForTokenPairAndFactory(token0, token1, factoryAddr, factoryInfo, priority) + + // Add to tracking and result + for _, market := range factoryMarkets { + // Only add if not already tracking + if _, exists := md.pools[market.Address]; !exists { + md.pools[market.Address] = market + newMarkets = append(newMarkets, market) + } + } + } + + return newMarkets +} + +// buildMarketsForTokenPairAndFactory builds markets for a specific token pair and factory +func (md *MarketDiscovery) buildMarketsForTokenPairAndFactory(token0, token1, factoryAddr common.Address, factoryInfo *FactoryInfo, priority int) []*PoolInfoDetailed { + var markets []*PoolInfoDetailed + + // For factories with fee tiers, build markets for each tier + if len(factoryInfo.FeeTiers) > 0 { + for _, feeTier := range factoryInfo.FeeTiers { + poolAddr, err := md.calculatePoolAddressForTokens(factoryAddr, factoryInfo, token0, token1, feeTier) + if err != nil { + continue + } + + market := &PoolInfoDetailed{ + Address: poolAddr, + Factory: factoryAddr, + FactoryType: factoryInfo.Type, + Token0: token0, + Token1: token1, + Fee: feeTier, + Reserve0: big.NewInt(0), + Reserve1: big.NewInt(0), + Liquidity: big.NewInt(0), + LastUpdated: time.Now(), + Priority: priority, + Active: true, + } + + // For V3 pools, initialize sqrt price + if factoryInfo.Type == "uniswap_v3" || factoryInfo.Type == "camelot_v3" || factoryInfo.Type == "algebra" { + market.SqrtPriceX96 = big.NewInt(0) + market.Tick = 0 + } + + markets = append(markets, market) + } + } else { + // For factories without fee tiers, build a single market + poolAddr, err := md.calculatePoolAddressForTokens(factoryAddr, factoryInfo, token0, token1, 0) + if err != nil { + return markets + } + + market := &PoolInfoDetailed{ + Address: poolAddr, + Factory: factoryAddr, + FactoryType: factoryInfo.Type, + Token0: token0, + Token1: token1, + Fee: 3000, // Default to 0.3% + Reserve0: big.NewInt(0), + Reserve1: big.NewInt(0), + Liquidity: big.NewInt(0), + LastUpdated: time.Now(), + Priority: priority, + Active: true, + } + + markets = append(markets, market) + } + + return markets +} + +// calculatePoolAddressForTokens calculates pool address for specific tokens and factory +func (md *MarketDiscovery) calculatePoolAddressForTokens(factoryAddr common.Address, factoryInfo *FactoryInfo, token0, token1 common.Address, feeTier uint32) (common.Address, error) { + switch factoryInfo.Type { + case "uniswap_v3", "camelot_v3", "algebra": + return md.calculateUniswapV3PoolAddress(factoryAddr, factoryInfo, token0, token1, feeTier) + case "uniswap_v2", "sushiswap": + return md.calculateUniswapV2PoolAddress(factoryAddr, factoryInfo, token0, token1) + case "balancer_v2": + return md.calculateBalancerPoolAddress(factoryAddr, token0, token1) + case "curve": + return md.calculateCurvePoolAddress(factoryAddr, token0, token1) + default: + return md.calculateGenericPoolAddress(factoryAddr, factoryInfo, token0, token1, feeTier) + } +} + +// analyzeNewMarketOpportunities immediately analyzes new markets for arbitrage opportunities +func (md *MarketDiscovery) analyzeNewMarketOpportunities(newPools []*PoolInfoDetailed) { + if len(newPools) == 0 { + return + } + + // Wait a moment for pool states to initialize + time.Sleep(5 * time.Second) + + md.logger.Info(fmt.Sprintf("🔍 Analyzing %d new pools for immediate arbitrage opportunities", len(newPools))) + + // Update pool states for new pools + ctx := context.Background() + updatedCount := 0 + for _, pool := range newPools { + switch pool.FactoryType { + case "uniswap_v2", "sushiswap": + if err := md.updateUniswapV2PoolState(ctx, pool); err == nil { + updatedCount++ + } + case "uniswap_v3", "camelot_v3", "algebra": + if err := md.updateUniswapV3PoolState(ctx, pool); err == nil { + updatedCount++ + } + case "balancer_v2": + if err := md.updateBalancerPoolState(ctx, pool); err == nil { + updatedCount++ + } + case "curve": + if err := md.updateCurvePoolState(ctx, pool); err == nil { + updatedCount++ + } + } + } + + md.logger.Info(fmt.Sprintf("✅ Updated %d/%d new pool states", updatedCount, len(newPools))) + + // Group new pools by token pairs + tokenPairPools := make(map[string][]*PoolInfoDetailed) + for _, pool := range newPools { + if !pool.Active { + continue + } + + // Create token pair key + var pairKey string + if pool.Token0.Big().Cmp(pool.Token1.Big()) < 0 { + pairKey = fmt.Sprintf("%s-%s", pool.Token0.Hex(), pool.Token1.Hex()) + } else { + pairKey = fmt.Sprintf("%s-%s", pool.Token1.Hex(), pool.Token0.Hex()) + } + + tokenPairPools[pairKey] = append(tokenPairPools[pairKey], pool) + + // Also check against existing pools with same token pair + md.mu.RLock() + for _, existingPool := range md.pools { + if !existingPool.Active { + continue + } + + // Check if this is the same token pair + sameTokens := (existingPool.Token0 == pool.Token0 && existingPool.Token1 == pool.Token1) || + (existingPool.Token0 == pool.Token1 && existingPool.Token1 == pool.Token0) + + if sameTokens && existingPool.FactoryType != pool.FactoryType { + tokenPairPools[pairKey] = append(tokenPairPools[pairKey], existingPool) + } + } + md.mu.RUnlock() + } + + // Analyze each token pair for arbitrage + opportunitiesFound := 0 + gasPrice := big.NewInt(5000000000) // 5 gwei default + + for pairKey, pools := range tokenPairPools { + if len(pools) < 2 { + continue + } + + md.logger.Debug(fmt.Sprintf("🔍 Checking %d pools for %s token pair", len(pools), pairKey)) + + // Check all pool combinations for arbitrage + for i := 0; i < len(pools); i++ { + for j := i + 1; j < len(pools); j++ { + poolA := pools[i] + poolB := pools[j] + + // Skip if same factory type + if poolA.FactoryType == poolB.FactoryType { + continue + } + + // Calculate arbitrage opportunity + arb := md.calculateArbitrage(poolA, poolB, gasPrice, pairKey) + if arb != nil && arb.NetProfit.Sign() > 0 { + opportunitiesFound++ + + // Log the opportunity + if err := md.logArbitrageOpportunity(arb); err != nil { + md.logger.Error(fmt.Sprintf("Failed to log arbitrage opportunity: %v", err)) + } + + md.logger.Info(fmt.Sprintf("💰 NEW MARKET ARBITRAGE: $%.2f profit between %s and %s for %s pair", + arb.ProfitUSD, poolA.FactoryType, poolB.FactoryType, pairKey[:12])) + } + } + } + } + + if opportunitiesFound > 0 { + md.logger.Info(fmt.Sprintf("đŸŽ¯ Found %d immediate arbitrage opportunities from new markets!", opportunitiesFound)) + } else { + md.logger.Info("â„šī¸ No immediate arbitrage opportunities found in new markets") + } +} + +// UpdatePoolState updates the state of a pool based on recent swap activity +func (md *MarketDiscovery) UpdatePoolState(update *PoolStateUpdate) { + md.mu.Lock() + defer md.mu.Unlock() + + // Find the pool in our tracking + pool, exists := md.pools[update.Pool] + if !exists { + md.logger.Debug(fmt.Sprintf("Pool %s not found in tracking, skipping state update", update.Pool.Hex()[:8])) + return + } + + // Update pool reserves based on swap direction + if update.UpdateType == "swap" { + // For Uniswap V2-style pools, update reserves directly + if pool.FactoryType == "uniswap_v2" || pool.FactoryType == "sushiswap" { + md.updateV2PoolReserves(pool, update) + } else if pool.FactoryType == "uniswap_v3" || pool.FactoryType == "camelot_v3" || pool.FactoryType == "algebra" { + md.updateV3PoolState(pool, update) + } + + // Update last updated timestamp + pool.LastUpdated = update.Timestamp + + md.logger.Debug(fmt.Sprintf("✅ Updated pool %s state from %s swap (%s->%s)", + update.Pool.Hex()[:8], pool.FactoryType, + update.TokenIn.Hex()[:6], update.TokenOut.Hex()[:6])) + } +} + +// updateV2PoolReserves updates Uniswap V2-style pool reserves +func (md *MarketDiscovery) updateV2PoolReserves(pool *PoolInfoDetailed, update *PoolStateUpdate) { + // Initialize reserves if they are nil + if pool.Reserve0 == nil { + pool.Reserve0 = big.NewInt(0) + } + if pool.Reserve1 == nil { + pool.Reserve1 = big.NewInt(0) + } + + // Determine which token is token0 and token1 + if update.TokenIn == pool.Token0 { + // Token0 -> Token1 swap + if update.AmountIn != nil { + pool.Reserve0 = new(big.Int).Add(pool.Reserve0, update.AmountIn) + } + if update.AmountOut != nil && pool.Reserve1.Cmp(update.AmountOut) >= 0 { + pool.Reserve1 = new(big.Int).Sub(pool.Reserve1, update.AmountOut) + } else { + pool.Reserve1 = big.NewInt(0) + } + } else if update.TokenIn == pool.Token1 { + // Token1 -> Token0 swap + if update.AmountIn != nil { + pool.Reserve1 = new(big.Int).Add(pool.Reserve1, update.AmountIn) + } + if update.AmountOut != nil && pool.Reserve0.Cmp(update.AmountOut) >= 0 { + pool.Reserve0 = new(big.Int).Sub(pool.Reserve0, update.AmountOut) + } else { + pool.Reserve0 = big.NewInt(0) + } + } + + // Ensure reserves don't go negative + if pool.Reserve0 == nil || pool.Reserve0.Sign() < 0 { + pool.Reserve0 = big.NewInt(0) + } + if pool.Reserve1 == nil || pool.Reserve1.Sign() < 0 { + pool.Reserve1 = big.NewInt(0) + } +} + +// updateV3PoolState updates Uniswap V3-style pool state +func (md *MarketDiscovery) updateV3PoolState(pool *PoolInfoDetailed, update *PoolStateUpdate) { + // For V3 pools, we update liquidity and recalculate sqrt price + // This is a simplified update - in production, we'd need to track + // individual liquidity positions and active liquidity + + // Initialize fields if they are nil + if pool.Liquidity == nil { + pool.Liquidity = big.NewInt(0) + } + + if pool.SqrtPriceX96 == nil { + pool.SqrtPriceX96 = big.NewInt(0) + } + + // Calculate price from the swap amounts + if update.AmountIn != nil && update.AmountOut != nil && + update.AmountIn.Sign() > 0 && update.AmountOut.Sign() > 0 { + // Calculate new price ratio + priceRatio := new(big.Float).Quo(new(big.Float).SetInt(update.AmountOut), new(big.Float).SetInt(update.AmountIn)) + + // Convert to sqrtPriceX96 (simplified calculation) + // In production, this would use proper Uniswap V3 math + priceFloat, _ := priceRatio.Float64() + if priceFloat > 0 { + sqrtPrice := new(big.Float).Sqrt(new(big.Float).SetFloat64(priceFloat)) + // Scale by 2^96 for sqrtPriceX96 format + sqrtPriceX96 := new(big.Float).Mul(sqrtPrice, new(big.Float).SetFloat64(79228162514264337593543950336.0)) + pool.SqrtPriceX96, _ = sqrtPriceX96.Int(nil) + } + } + + // Update reserves (for compatibility with V2 calculations) + md.updateV2PoolReserves(pool, update) +}