package pools import ( "context" "encoding/hex" "encoding/json" "fmt" "math" "math/big" "os" "strings" "sync" "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/rpc" "github.com/fraktal/mev-beta/internal/logger" "github.com/fraktal/mev-beta/pkg/security" "github.com/fraktal/mev-beta/pkg/uniswap" ) // Pool represents a discovered liquidity pool type Pool struct { Address string `json:"address"` Token0 string `json:"token0"` Token1 string `json:"token1"` Fee uint32 `json:"fee"` Protocol string `json:"protocol"` Factory string `json:"factory"` Reserves0 *big.Int `json:"reserves0,omitempty"` Reserves1 *big.Int `json:"reserves1,omitempty"` Liquidity *big.Int `json:"liquidity,omitempty"` LastUpdated time.Time `json:"lastUpdated"` TotalVolume *big.Int `json:"totalVolume"` SwapCount uint64 `json:"swapCount"` CreatedAt time.Time `json:"createdAt"` BlockNumber uint64 `json:"blockNumber"` } // Exchange represents a discovered exchange/DEX type Exchange struct { Name string `json:"name"` Router string `json:"router"` Factory string `json:"factory"` Protocol string `json:"protocol"` Version string `json:"version"` Discovered time.Time `json:"discovered"` PoolCount int `json:"poolCount"` TotalVolume *big.Int `json:"totalVolume"` } // LiquidityEvent represents a liquidity change event type LiquidityEvent struct { TxHash string `json:"txHash"` Pool string `json:"pool"` Type string `json:"type"` // "mint", "burn", "sync" Amount0 *big.Int `json:"amount0"` Amount1 *big.Int `json:"amount1"` Liquidity *big.Int `json:"liquidity"` Timestamp time.Time `json:"timestamp"` BlockNumber uint64 `json:"blockNumber"` } // SwapEvent represents a swap event with price impact analysis type SwapEvent struct { TxHash string `json:"txHash"` Pool string `json:"pool"` TokenIn string `json:"tokenIn"` TokenOut string `json:"tokenOut"` AmountIn *big.Int `json:"amountIn"` AmountOut *big.Int `json:"amountOut"` PriceImpact float64 `json:"priceImpact"` IsSignificant bool `json:"isSignificant"` Timestamp time.Time `json:"timestamp"` BlockNumber uint64 `json:"blockNumber"` } // PoolDiscovery manages dynamic pool and exchange discovery type PoolDiscovery struct { client *rpc.Client logger *logger.Logger create2Calculator *CREATE2Calculator // Storage pools map[string]*Pool // address -> pool exchanges map[string]*Exchange // address -> exchange mutex sync.RWMutex // Persistence poolsFile string exchangesFile string // Event signatures for discovery eventSignatures map[string]string // Factory contracts for pool creation events knownFactories map[string]string // Configuration minLiquidityThreshold *big.Int priceImpactThreshold float64 } // NewPoolDiscovery creates a new pool discovery system func NewPoolDiscovery(rpcClient *rpc.Client, logger *logger.Logger) *PoolDiscovery { // Create ethclient from rpc client for CREATE2 calculator ethClient := ethclient.NewClient(rpcClient) pd := &PoolDiscovery{ client: rpcClient, logger: logger, create2Calculator: NewCREATE2Calculator(logger, ethClient), pools: make(map[string]*Pool), exchanges: make(map[string]*Exchange), poolsFile: "data/pools.json", exchangesFile: "data/exchanges.json", eventSignatures: make(map[string]string), knownFactories: make(map[string]string), minLiquidityThreshold: big.NewInt(1000000000000000000), // 1 ETH equivalent priceImpactThreshold: 0.01, // 1% price impact threshold } pd.initializeEventSignatures() pd.initializeKnownFactories() pd.loadPersistedData() return pd } // initializeEventSignatures sets up event signatures for discovery func (pd *PoolDiscovery) initializeEventSignatures() { // Uniswap V2 events pd.eventSignatures["0x0d3648bd0f6ba80134a33ba9275ac585d9d315f0ad8355cddefde31afa28d0e9"] = "PairCreated" // Uniswap V2 pd.eventSignatures["0x783cca1c0412dd0d695e784568c96da2e9c22ff989357a2e8b1d9b2b4e6b7118"] = "PoolCreated" // Uniswap V3 pd.eventSignatures["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"] = "Transfer" // ERC20 Transfer pd.eventSignatures["0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822"] = "Swap" // Uniswap V2 Swap pd.eventSignatures["0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67"] = "Swap" // Uniswap V3 Swap pd.eventSignatures["0x4c209b5fc8ad50758f13e2e1088ba56a560dff690a1c6fef26394f4c03821c4f"] = "Mint" // Liquidity mint pd.eventSignatures["0xdccd412f0b1252819cb1fd330b93224ca42612892bb3f4f789976e6d81936496"] = "Burn" // Liquidity burn pd.eventSignatures["0x1c411e9a96e071241c2f21f7726b17ae89e3cab4c78be50e062b03a9fffbbad1"] = "Sync" // Reserve sync } // initializeKnownFactories sets up known factory contracts to monitor for new pools func (pd *PoolDiscovery) initializeKnownFactories() { // Uniswap V2 Factory pd.knownFactories["0xf1d7cc64fb4452f05c498126312ebe29f30fbcf9"] = "UniswapV2" // Uniswap V3 Factory pd.knownFactories["0x1f98431c8ad98523631ae4a59f267346ea31f984"] = "UniswapV3" // SushiSwap Factory pd.knownFactories["0xc35dadb65012ec5796536bd9864ed8773abc74c4"] = "SushiSwap" // Camelot V2 Factory pd.knownFactories["0x6eccab422d763ac031210895c81787e87b43a652"] = "Camelot" // Balancer V2 Vault pd.knownFactories["0xba12222222228d8ba445958a75a0704d566bf2c8"] = "Balancer" // Curve Factory pd.knownFactories["0xb17b674d9c5cb2e441f8e196a2f048a81355d031"] = "Curve" } // DiscoverFromTransaction analyzes a transaction for new pools and exchanges func (pd *PoolDiscovery) DiscoverFromTransaction(txHash, to, input string, logs []interface{}) { pd.mutex.Lock() defer pd.mutex.Unlock() // Check if transaction is to an unknown contract (potential new DEX) if to != "" && pd.isUnknownContract(to) { pd.analyzeUnknownContract(to, input) } // Analyze logs for pool creation and swap events for _, logEntry := range logs { pd.analyzeLogEntry(logEntry, txHash) } } // isUnknownContract checks if a contract address is unknown func (pd *PoolDiscovery) isUnknownContract(address string) bool { _, exists := pd.exchanges[strings.ToLower(address)] return !exists } // analyzeUnknownContract analyzes an unknown contract to determine if it's a DEX func (pd *PoolDiscovery) analyzeUnknownContract(address, input string) { // Check function signatures that indicate DEX functionality if len(input) < 10 { return } functionSig := input[:10] isDEX := false protocol := "Unknown" switch functionSig { case "0x38ed1739", "0x18cbafe5", "0x7ff36ab5": // Uniswap V2 functions isDEX = true protocol = "UniswapV2-Like" case "0x414bf389", "0xac9650d8", "0x5ae401dc", "0x1f0464d1": // Uniswap V3 functions isDEX = true protocol = "UniswapV3-Like" case "0xa9059cbb", "0x095ea7b3": // ERC20 functions (might be router) isDEX = true protocol = "Router-Like" } if isDEX { exchange := &Exchange{ Name: fmt.Sprintf("Unknown-%s", address[:8]), Router: address, Protocol: protocol, Discovered: time.Now(), TotalVolume: big.NewInt(0), } pd.exchanges[strings.ToLower(address)] = exchange pd.logger.Opportunity("", address, "", "NEW_EXCHANGE_DISCOVERED", protocol, 0, 0, 0, 0, map[string]interface{}{ "router": address, "protocol": protocol, "discovered": time.Now(), }) pd.persistData() } } // analyzeLogEntry analyzes a log entry for pool creation or swap events func (pd *PoolDiscovery) analyzeLogEntry(logEntry interface{}, txHash string) { // Convert log entry to map for analysis logMap, ok := logEntry.(map[string]interface{}) if !ok { return } topics, ok := logMap["topics"].([]interface{}) if !ok || len(topics) == 0 { return } topic0, ok := topics[0].(string) if !ok { return } eventType, exists := pd.eventSignatures[topic0] if !exists { return } address, ok := logMap["address"].(string) if !ok { return } switch eventType { case "PairCreated", "PoolCreated": pd.handlePoolCreation(address, topics, logMap, txHash) case "Swap": pd.handleSwapEvent(address, topics, logMap, txHash) case "Mint", "Burn": pd.handleLiquidityEvent(address, topics, logMap, txHash, eventType) case "Sync": pd.handleSyncEvent(address, topics, logMap, txHash) } } // handlePoolCreation processes pool creation events func (pd *PoolDiscovery) handlePoolCreation(factoryAddress string, topics []interface{}, logData map[string]interface{}, txHash string) { if len(topics) < 4 { return } // Extract pool information from topics token0 := pd.addressFromTopic(topics[1]) token1 := pd.addressFromTopic(topics[2]) poolAddress := pd.addressFromTopic(topics[3]) protocol := "Unknown" if proto, exists := pd.knownFactories[strings.ToLower(factoryAddress)]; exists { protocol = proto } pool := &Pool{ Address: poolAddress, Token0: token0, Token1: token1, Protocol: protocol, Factory: factoryAddress, LastUpdated: time.Now(), TotalVolume: big.NewInt(0), SwapCount: 0, } pd.pools[strings.ToLower(poolAddress)] = pool pd.logger.Opportunity(txHash, factoryAddress, poolAddress, "NEW_POOL_DISCOVERED", protocol, 0, 0, 0, 0, map[string]interface{}{ "poolAddress": poolAddress, "token0": token0, "token1": token1, "factory": factoryAddress, "protocol": protocol, }) pd.persistData() } // handleSwapEvent processes swap events and calculates price impact func (pd *PoolDiscovery) handleSwapEvent(poolAddress string, topics []interface{}, logData map[string]interface{}, txHash string) { pool, exists := pd.pools[strings.ToLower(poolAddress)] if !exists { // Unknown pool, try to discover it pd.discoverPoolFromSwap(poolAddress, txHash) return } // Extract swap data from log data, ok := logData["data"].(string) if !ok { return } swapData := pd.parseSwapData(data, pool.Protocol) if swapData == nil { return } // Calculate price impact priceImpact := pd.calculatePriceImpact(pool, swapData.AmountIn, swapData.AmountOut) isSignificant := priceImpact >= pd.priceImpactThreshold // Update pool statistics pool.SwapCount++ if pool.TotalVolume == nil { pool.TotalVolume = big.NewInt(0) } pool.TotalVolume.Add(pool.TotalVolume, swapData.AmountIn) pool.LastUpdated = time.Now() // Log significant swaps if isSignificant { amountInFloat, _ := new(big.Float).Quo(new(big.Float).SetInt(swapData.AmountIn), big.NewFloat(1e18)).Float64() amountOutFloat, _ := new(big.Float).Quo(new(big.Float).SetInt(swapData.AmountOut), big.NewFloat(1e18)).Float64() pd.logger.Opportunity(txHash, "", poolAddress, "SIGNIFICANT_SWAP", pool.Protocol, amountInFloat, amountOutFloat, 0, priceImpact*100, map[string]interface{}{ "pool": poolAddress, "token0": pool.Token0, "token1": pool.Token1, "priceImpact": fmt.Sprintf("%.2f%%", priceImpact*100), "swapCount": pool.SwapCount, "totalVolume": pool.TotalVolume.String(), }) } } // SwapData represents parsed swap data type SwapData struct { AmountIn *big.Int AmountOut *big.Int TokenIn string TokenOut string } // DetailedSwapInfo represents enhanced swap information from L2 parser type DetailedSwapInfo struct { TxHash string From string To string MethodName string Protocol string AmountIn *big.Int AmountOut *big.Int AmountMin *big.Int TokenIn string TokenOut string Fee uint32 Recipient string IsValid bool } // parseSwapData parses swap data from log data func (pd *PoolDiscovery) parseSwapData(data, protocol string) *SwapData { if len(data) < 2 { return nil } // Remove 0x prefix dataBytes, err := hex.DecodeString(data[2:]) if err != nil { return nil } if len(dataBytes) < 128 { // 4 * 32 bytes minimum for swap data return nil } // Parse amounts based on protocol var amountIn, amountOut *big.Int switch protocol { case "UniswapV2", "SushiSwap", "Camelot": // Uniswap V2 Swap(sender,amount0In,amount1In,amount0Out,amount1Out,to) amount0In := new(big.Int).SetBytes(dataBytes[32:64]) amount1In := new(big.Int).SetBytes(dataBytes[64:96]) amount0Out := new(big.Int).SetBytes(dataBytes[96:128]) amount1Out := new(big.Int).SetBytes(dataBytes[128:160]) if amount0In.Cmp(big.NewInt(0)) > 0 { amountIn = amount0In amountOut = amount1Out } else { amountIn = amount1In amountOut = amount0Out } case "UniswapV3": // Uniswap V3 has different swap event structure amountIn = new(big.Int).SetBytes(dataBytes[0:32]) amountOut = new(big.Int).SetBytes(dataBytes[32:64]) default: // Generic parsing amountIn = new(big.Int).SetBytes(dataBytes[0:32]) amountOut = new(big.Int).SetBytes(dataBytes[32:64]) } return &SwapData{ AmountIn: amountIn, AmountOut: amountOut, } } // calculatePriceImpact calculates the price impact of a swap func (pd *PoolDiscovery) calculatePriceImpact(pool *Pool, amountIn, amountOut *big.Int) float64 { if pool.Reserves0 == nil || pool.Reserves1 == nil || pool.Reserves0.Cmp(big.NewInt(0)) == 0 { return 0.0 } // Simplified price impact calculation // Real implementation would use more sophisticated formulas based on AMM type // Calculate expected amount out without price impact reserve0Float := new(big.Float).SetInt(pool.Reserves0) reserve1Float := new(big.Float).SetInt(pool.Reserves1) amountInFloat := new(big.Float).SetInt(amountIn) amountOutFloat := new(big.Float).SetInt(amountOut) // Current price currentPrice := new(big.Float).Quo(reserve1Float, reserve0Float) // Expected amount out expectedOut := new(big.Float).Mul(amountInFloat, currentPrice) // Price impact = (expected - actual) / expected diff := new(big.Float).Sub(expectedOut, amountOutFloat) impact := new(big.Float).Quo(diff, expectedOut) impactFloat, _ := impact.Float64() if impactFloat < 0 { impactFloat = -impactFloat } return impactFloat } // Additional helper methods... func (pd *PoolDiscovery) addressFromTopic(topic interface{}) string { topicStr, ok := topic.(string) if !ok || len(topicStr) < 42 { return "" } // Extract address from topic (last 20 bytes) return "0x" + topicStr[26:] } func (pd *PoolDiscovery) handleLiquidityEvent(poolAddress string, topics []interface{}, logData map[string]interface{}, txHash, eventType string) { pool, exists := pd.pools[strings.ToLower(poolAddress)] if !exists { // Try to discover this unknown pool pd.discoverPoolFromSwap(poolAddress, txHash) return } // Parse liquidity event data data, ok := logData["data"].(string) if !ok { return } eventData := pd.parseLiquidityData(data, eventType) if eventData == nil { return } // Update pool liquidity if eventType == "Mint" && eventData.AmountIn != nil { if pool.Liquidity == nil { pool.Liquidity = big.NewInt(0) } pool.Liquidity.Add(pool.Liquidity, eventData.AmountIn) } else if eventType == "Burn" && eventData.AmountIn != nil { if pool.Liquidity != nil { pool.Liquidity.Sub(pool.Liquidity, eventData.AmountIn) } } pool.LastUpdated = time.Now() // Create liquidity event for potential future use _ = &LiquidityEvent{ TxHash: txHash, Pool: poolAddress, Type: strings.ToLower(eventType), Amount0: eventData.AmountIn, Amount1: eventData.AmountOut, Liquidity: pool.Liquidity, Timestamp: time.Now(), BlockNumber: 0, // Would be set by caller } pd.logger.Opportunity(txHash, "", poolAddress, "LIQUIDITY_EVENT", pool.Protocol, 0, 0, 0, 0, map[string]interface{}{ "eventType": eventType, "amount0": eventData.AmountIn.String(), "amount1": eventData.AmountOut.String(), "newLiquidity": pool.Liquidity.String(), }) pd.persistData() } func (pd *PoolDiscovery) handleSyncEvent(poolAddress string, topics []interface{}, logData map[string]interface{}, txHash string) { pool, exists := pd.pools[strings.ToLower(poolAddress)] if !exists { pd.discoverPoolFromSwap(poolAddress, txHash) return } // Parse sync event data (Uniswap V2 reserves update) data, ok := logData["data"].(string) if !ok { return } syncData := pd.parseSyncData(data) if syncData == nil { return } // Update pool reserves pool.Reserves0 = syncData.Reserve0 pool.Reserves1 = syncData.Reserve1 pool.LastUpdated = time.Now() pd.logger.Debug(fmt.Sprintf("Updated reserves for pool %s: Reserve0=%s, Reserve1=%s", poolAddress, syncData.Reserve0.String(), syncData.Reserve1.String())) pd.persistData() } func (pd *PoolDiscovery) discoverPoolFromSwap(poolAddress, txHash string) { // Check if we already know this pool if _, exists := pd.pools[strings.ToLower(poolAddress)]; exists { return } pd.logger.Info(fmt.Sprintf("Discovering unknown pool from swap: %s", poolAddress)) // Create Ethereum client to query pool contract // Get RPC endpoint from config or environment rpcEndpoint := os.Getenv("ARBITRUM_RPC_ENDPOINT") if rpcEndpoint == "" { rpcEndpoint = "wss://arbitrum-mainnet.core.chainstack.com/53c30e7a941160679fdcc396c894fc57" // fallback } client, err := ethclient.Dial(rpcEndpoint) if err != nil { pd.logger.Error(fmt.Sprintf("Failed to connect to Ethereum node for pool discovery: %v", err)) return } defer client.Close() address := common.HexToAddress(poolAddress) // Validate that this is a real pool contract if !uniswap.IsValidPool(context.Background(), client, address) { pd.logger.Debug(fmt.Sprintf("Address %s is not a valid pool contract", poolAddress)) return } // Create Uniswap V3 pool interface to fetch real data uniswapPool := uniswap.NewUniswapV3Pool(address, client) // Fetch pool state with timeout ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() poolState, err := uniswapPool.GetPoolState(ctx) if err != nil { pd.logger.Error(fmt.Sprintf("Failed to fetch pool state for %s: %v", poolAddress, err)) return } // Determine protocol (could be enhanced to detect different protocols) protocol := "UniswapV3" // Try to determine factory address by checking against known factories factory := "" for factoryAddr, proto := range pd.knownFactories { // This is a simplified check - in practice you'd call the pool's factory() function if proto == protocol { factory = factoryAddr break } } // Safely convert fee from int64 to uint32 var feeUint64 uint64 if poolState.Fee < 0 { feeUint64 = 0 } else { feeUint64 = uint64(poolState.Fee) } safeFee, err := security.SafeUint32(feeUint64) if err != nil { pd.logger.Warn(fmt.Sprintf("Failed to safely convert fee %d for pool %s: %v", poolState.Fee, poolAddress, err)) // Use a default fee value if conversion fails // Truncate to 32 bits safely by using math.MaxUint32 as mask if poolState.Fee > math.MaxUint32 { safeFee = math.MaxUint32 } else { safeFee = uint32(poolState.Fee) } } // Create pool entry with real data pool := &Pool{ Address: poolAddress, Token0: poolState.Token0.Hex(), Token1: poolState.Token1.Hex(), // Safely convert fee from uint64 to uint32 Fee: safeFee, Protocol: protocol, Factory: factory, Liquidity: poolState.Liquidity.ToBig(), LastUpdated: time.Now(), TotalVolume: big.NewInt(0), SwapCount: 0, } pd.pools[strings.ToLower(poolAddress)] = pool pd.logger.Opportunity(txHash, "", poolAddress, "POOL_DISCOVERED", protocol, 0, 0, 0, 0, map[string]interface{}{ "source": "swap_event", "poolAddress": poolAddress, "protocol": protocol, "token0": poolState.Token0.Hex(), "token1": poolState.Token1.Hex(), "fee": poolState.Fee, "liquidity": poolState.Liquidity.String(), "discoveredAt": time.Now(), }) pd.persistData() } // parseLiquidityData parses liquidity event data func (pd *PoolDiscovery) parseLiquidityData(data, eventType string) *SwapData { if len(data) < 2 { return nil } dataBytes, err := hex.DecodeString(data[2:]) if err != nil { return nil } if len(dataBytes) < 64 { // 2 * 32 bytes minimum return nil } amount0 := new(big.Int).SetBytes(dataBytes[0:32]) amount1 := new(big.Int).SetBytes(dataBytes[32:64]) return &SwapData{ AmountIn: amount0, AmountOut: amount1, } } // SyncData represents reserves from a sync event type SyncData struct { Reserve0 *big.Int Reserve1 *big.Int } // parseSyncData parses sync event data (Uniswap V2) func (pd *PoolDiscovery) parseSyncData(data string) *SyncData { if len(data) < 2 { return nil } dataBytes, err := hex.DecodeString(data[2:]) if err != nil { return nil } if len(dataBytes) < 64 { // 2 * 32 bytes return nil } reserve0 := new(big.Int).SetBytes(dataBytes[0:32]) reserve1 := new(big.Int).SetBytes(dataBytes[32:64]) return &SyncData{ Reserve0: reserve0, Reserve1: reserve1, } } // persistData saves pools and exchanges to files func (pd *PoolDiscovery) persistData() { // Ensure data directory exists os.MkdirAll("data", 0750) // Save pools poolsData, _ := json.MarshalIndent(pd.pools, "", " ") os.WriteFile(pd.poolsFile, poolsData, 0644) // Save exchanges exchangesData, _ := json.MarshalIndent(pd.exchanges, "", " ") os.WriteFile(pd.exchangesFile, exchangesData, 0644) } // loadPersistedData loads pools and exchanges from files func (pd *PoolDiscovery) loadPersistedData() { // Load pools if data, err := os.ReadFile(pd.poolsFile); err == nil { json.Unmarshal(data, &pd.pools) pd.logger.Info(fmt.Sprintf("Loaded %d pools from cache", len(pd.pools))) } // Load exchanges if data, err := os.ReadFile(pd.exchangesFile); err == nil { json.Unmarshal(data, &pd.exchanges) pd.logger.Info(fmt.Sprintf("Loaded %d exchanges from cache", len(pd.exchanges))) } } // GetPoolCount returns the number of discovered pools func (pd *PoolDiscovery) GetPoolCount() int { pd.mutex.RLock() defer pd.mutex.RUnlock() return len(pd.pools) } // GetExchangeCount returns the number of discovered exchanges func (pd *PoolDiscovery) GetExchangeCount() int { pd.mutex.RLock() defer pd.mutex.RUnlock() return len(pd.exchanges) } // GetPool returns a pool by address func (pd *PoolDiscovery) GetPool(address string) (*Pool, bool) { pd.mutex.RLock() defer pd.mutex.RUnlock() pool, exists := pd.pools[strings.ToLower(address)] return pool, exists } // GetAllPools returns all discovered pools func (pd *PoolDiscovery) GetAllPools() map[string]*Pool { pd.mutex.RLock() defer pd.mutex.RUnlock() pools := make(map[string]*Pool) for k, v := range pd.pools { pools[k] = v } return pools } // DiscoverPoolsForTokenPair uses CREATE2 to discover all possible pools for a token pair func (pd *PoolDiscovery) DiscoverPoolsForTokenPair(token0, token1 common.Address) ([]*Pool, error) { // Use CREATE2 calculator to find all possible pool addresses poolIdentifiers, err := pd.create2Calculator.FindPoolsForTokenPair(token0, token1) if err != nil { return nil, fmt.Errorf("failed to calculate pool addresses: %w", err) } pools := make([]*Pool, 0) for _, poolId := range poolIdentifiers { // Check if pool exists on-chain exists, err := pd.verifyPoolExists(poolId.PoolAddr) if err != nil { pd.logger.Debug(fmt.Sprintf("Failed to verify pool %s: %v", poolId.PoolAddr.Hex(), err)) continue } if !exists { pd.logger.Debug(fmt.Sprintf("Pool %s does not exist on-chain", poolId.PoolAddr.Hex())) continue } // Create pool object pool := &Pool{ Address: poolId.PoolAddr.Hex(), Token0: poolId.Token0.Hex(), Token1: poolId.Token1.Hex(), Fee: poolId.Fee, Protocol: poolId.Factory, Factory: poolId.Factory, CreatedAt: time.Now(), } // Get additional pool data if err := pd.enrichPoolData(pool); err != nil { pd.logger.Debug(fmt.Sprintf("Failed to enrich pool data for %s: %v", pool.Address, err)) } pools = append(pools, pool) // Add to our cache pd.addPool(pool) } pd.logger.Info(fmt.Sprintf("Discovered %d pools for token pair %s/%s", len(pools), token0.Hex(), token1.Hex())) return pools, nil } // verifyPoolExists checks if a pool actually exists at the calculated address func (pd *PoolDiscovery) verifyPoolExists(poolAddr common.Address) (bool, error) { // Check if there's code at the address var result string err := pd.client.Call(&result, "eth_getCode", poolAddr.Hex(), "latest") if err != nil { return false, fmt.Errorf("failed to get code: %w", err) } // If there's no code, the pool doesn't exist if result == "0x" || result == "" { return false, nil } return true, nil } // enrichPoolData gets additional data about a pool func (pd *PoolDiscovery) enrichPoolData(pool *Pool) error { poolAddr := common.HexToAddress(pool.Address) // For Uniswap V3 pools, get slot0 data if pool.Protocol == "uniswap_v3" || pool.Protocol == "camelot_v3" { return pd.enrichUniswapV3PoolData(pool, poolAddr) } // For Uniswap V2 style pools, get reserves if pool.Protocol == "uniswap_v2" || pool.Protocol == "sushiswap" { return pd.enrichUniswapV2PoolData(pool, poolAddr) } return nil } // enrichUniswapV3PoolData gets Uniswap V3 specific pool data func (pd *PoolDiscovery) enrichUniswapV3PoolData(pool *Pool, poolAddr common.Address) error { // Get slot0 data (price, tick, etc.) slot0ABI := `[{"inputs":[],"name":"slot0","outputs":[{"internalType":"uint160","name":"sqrtPriceX96","type":"uint160"},{"internalType":"int24","name":"tick","type":"int24"},{"internalType":"uint16","name":"observationIndex","type":"uint16"},{"internalType":"uint16","name":"observationCardinality","type":"uint16"},{"internalType":"uint16","name":"observationCardinalityNext","type":"uint16"},{"internalType":"uint8","name":"feeProtocol","type":"uint8"},{"internalType":"bool","name":"unlocked","type":"bool"}],"stateMutability":"view","type":"function"}]` contractABI, err := uniswap.ParseABI(slot0ABI) if err != nil { return fmt.Errorf("failed to parse slot0 ABI: %w", err) } callData, err := contractABI.Pack("slot0") if err != nil { return fmt.Errorf("failed to pack slot0 call: %w", err) } var result string err = pd.client.Call(&result, "eth_call", map[string]interface{}{ "to": poolAddr.Hex(), "data": "0x" + hex.EncodeToString(callData), }, "latest") if err != nil { return fmt.Errorf("slot0 call failed: %w", err) } // Decode result resultBytes, err := hex.DecodeString(strings.TrimPrefix(result, "0x")) if err != nil { return fmt.Errorf("failed to decode result: %w", err) } if len(resultBytes) == 0 { return fmt.Errorf("empty result from slot0 call") } // Store the fact that this is a valid V3 pool pool.BlockNumber = 0 // Will be set when we detect the creation event return nil } // enrichUniswapV2PoolData gets Uniswap V2 specific pool data func (pd *PoolDiscovery) enrichUniswapV2PoolData(pool *Pool, poolAddr common.Address) error { // Get reserves from getReserves() reservesABI := `[{"inputs":[],"name":"getReserves","outputs":[{"internalType":"uint112","name":"_reserve0","type":"uint112"},{"internalType":"uint112","name":"_reserve1","type":"uint112"},{"internalType":"uint32","name":"_blockTimestampLast","type":"uint32"}],"stateMutability":"view","type":"function"}]` contractABI, err := uniswap.ParseABI(reservesABI) if err != nil { return fmt.Errorf("failed to parse reserves ABI: %w", err) } callData, err := contractABI.Pack("getReserves") if err != nil { return fmt.Errorf("failed to pack getReserves call: %w", err) } var result string err = pd.client.Call(&result, "eth_call", map[string]interface{}{ "to": poolAddr.Hex(), "data": "0x" + hex.EncodeToString(callData), }, "latest") if err != nil { return fmt.Errorf("getReserves call failed: %w", err) } // Decode result resultBytes, err := hex.DecodeString(strings.TrimPrefix(result, "0x")) if err != nil { return fmt.Errorf("failed to decode result: %w", err) } if len(resultBytes) >= 64 { // Extract reserves (first 32 bytes for reserve0, second 32 bytes for reserve1) reserve0 := new(big.Int).SetBytes(resultBytes[:32]) reserve1 := new(big.Int).SetBytes(resultBytes[32:64]) pool.Reserves0 = reserve0 pool.Reserves1 = reserve1 } return nil } // ValidatePoolAddress validates a pool address using CREATE2 calculation func (pd *PoolDiscovery) ValidatePoolAddress(factoryName string, token0, token1 common.Address, fee uint32, poolAddr common.Address) bool { return pd.create2Calculator.ValidatePoolAddress(factoryName, token0, token1, fee, poolAddr) } // ProcessDetailedSwap processes a swap with detailed information from L2 parser func (pd *PoolDiscovery) ProcessDetailedSwap(swapInfo *DetailedSwapInfo) { if !swapInfo.IsValid { return } // Convert amounts to float for logging var amountInFloat, amountOutFloat, amountMinFloat float64 if swapInfo.AmountIn != nil { amountInFloat, _ = new(big.Float).Quo(new(big.Float).SetInt(swapInfo.AmountIn), big.NewFloat(1e18)).Float64() } if swapInfo.AmountOut != nil { amountOutFloat, _ = new(big.Float).Quo(new(big.Float).SetInt(swapInfo.AmountOut), big.NewFloat(1e18)).Float64() } if swapInfo.AmountMin != nil { amountMinFloat, _ = new(big.Float).Quo(new(big.Float).SetInt(swapInfo.AmountMin), big.NewFloat(1e18)).Float64() } // Estimate profit (simplified - could be enhanced) profitUSD := 0.0 // Would require price oracle integration // Log the detailed opportunity pd.logger.Opportunity( swapInfo.TxHash, swapInfo.From, swapInfo.To, swapInfo.MethodName, swapInfo.Protocol, amountInFloat, amountOutFloat, amountMinFloat, profitUSD, map[string]interface{}{ "tokenIn": swapInfo.TokenIn, "tokenOut": swapInfo.TokenOut, "recipient": swapInfo.Recipient, "fee": swapInfo.Fee, "functionSig": "", // Could be added if needed "contractName": swapInfo.Protocol, "deadline": 0, // Could be added if needed }, ) } // addPool adds a pool to the cache func (pd *PoolDiscovery) addPool(pool *Pool) { pd.mutex.Lock() defer pd.mutex.Unlock() if pd.pools == nil { pd.pools = make(map[string]*Pool) } pd.pools[pool.Address] = pool }