package pools import ( "context" "fmt" "log/slog" "math/big" "sync" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethclient" "github.com/your-org/mev-bot/pkg/cache" mevtypes "github.com/your-org/mev-bot/pkg/types" ) // Known factory addresses on Arbitrum var ( UniswapV2FactoryAddress = common.HexToAddress("0xf1D7CC64Fb4452F05c498126312eBE29f30Fbcf9") // SushiSwap UniswapV3FactoryAddress = common.HexToAddress("0x1F98431c8aD98523631AE4a59f267346ea31F984") // Uniswap V3 CamelotFactoryAddress = common.HexToAddress("0x6EcCab422D763aC031210895C81787E87B43A652") // Camelot CurveRegistryAddress = common.HexToAddress("0x445FE580eF8d70FF569aB36e80c647af338db351") // Curve (mainnet, example) ) // Top traded tokens on Arbitrum var TopTokens = []common.Address{ common.HexToAddress("0x82aF49447D8a07e3bd95BD0d56f35241523fBab1"), // WETH common.HexToAddress("0xFF970a61A04b1cA14834A43f5dE4533eBDDB5CC8"), // USDC common.HexToAddress("0xFd086bC7CD5C481DCC9C85ebE478A1C0b69FCbb9"), // USDT common.HexToAddress("0x2f2a2543B76A4166549F7aaB2e75Bef0aefC5B0f"), // WBTC common.HexToAddress("0xDA10009cBd5D07dd0CeCc66161FC93D7c9000da1"), // DAI common.HexToAddress("0xf97f4df75117a78c1A5a0DBb814Af92458539FB4"), // LINK common.HexToAddress("0xFA7F8980b0f1E64A2062791cc3b0871572f1F7f0"), // UNI } // DiscoveryConfig contains configuration for pool discovery type DiscoveryConfig struct { // Connection RPCURL string // Discovery parameters StartBlock uint64 MaxPools int MinLiquidity *big.Int BatchSize int ConcurrentFetches int // Token pairs to discover TokenPairs []TokenPair } // TokenPair represents a pair of tokens type TokenPair struct { Token0 common.Address Token1 common.Address } // DefaultDiscoveryConfig returns default configuration func DefaultDiscoveryConfig() *DiscoveryConfig { // Generate pairs from top tokens pairs := make([]TokenPair, 0) for i := 0; i < len(TopTokens); i++ { for j := i + 1; j < len(TopTokens); j++ { pairs = append(pairs, TokenPair{ Token0: TopTokens[i], Token1: TopTokens[j], }) } } return &DiscoveryConfig{ RPCURL: "https://arb1.arbitrum.io/rpc", StartBlock: 0, MaxPools: 1000, MinLiquidity: big.NewInt(1e18), // 1 ETH minimum BatchSize: 100, ConcurrentFetches: 10, TokenPairs: pairs, } } // Discovery discovers pools on Arbitrum type Discovery struct { config *DiscoveryConfig client *ethclient.Client cache cache.PoolCache logger *slog.Logger mu sync.Mutex poolsDiscovered int } // NewDiscovery creates a new pool discovery service func NewDiscovery(config *DiscoveryConfig, poolCache cache.PoolCache, logger *slog.Logger) (*Discovery, error) { if config == nil { config = DefaultDiscoveryConfig() } // Fill in defaults for missing fields if config.TokenPairs == nil || len(config.TokenPairs) == 0 { // Generate pairs from top tokens pairs := make([]TokenPair, 0) for i := 0; i < len(TopTokens); i++ { for j := i + 1; j < len(TopTokens); j++ { pairs = append(pairs, TokenPair{ Token0: TopTokens[i], Token1: TopTokens[j], }) } } config.TokenPairs = pairs } if config.MinLiquidity == nil { config.MinLiquidity = big.NewInt(1e18) } if config.MaxPools == 0 { config.MaxPools = 1000 } client, err := ethclient.Dial(config.RPCURL) if err != nil { return nil, fmt.Errorf("failed to connect to RPC: %w", err) } return &Discovery{ config: config, client: client, cache: poolCache, logger: logger.With("component", "pool_discovery"), }, nil } // DiscoverAll discovers all pools from known DEXes func (d *Discovery) DiscoverAll(ctx context.Context) error { d.logger.Info("starting pool discovery") // Try hardcoded pools first (for Anvil fork testing without archive access) if err := d.loadHardcodedPools(ctx); err == nil && d.poolsDiscovered > 0 { count, _ := d.cache.Count(ctx) d.logger.Info("pool discovery complete (using hardcoded pools)", "pools_discovered", d.poolsDiscovered, "total_cached", count) return nil } // Fallback to RPC discovery if hardcoded pools fail // Discover UniswapV2-style pools (SushiSwap, Camelot, etc.) if err := d.discoverUniswapV2Pools(ctx); err != nil { d.logger.Error("uniswap v2 discovery failed", "error", err) } // Discover UniswapV3 pools if err := d.discoverUniswapV3Pools(ctx); err != nil { d.logger.Error("uniswap v3 discovery failed", "error", err) } count, _ := d.cache.Count(ctx) d.logger.Info("pool discovery complete", "pools_discovered", d.poolsDiscovered, "total_cached", count) return nil } // loadHardcodedPools loads well-known pools for testing (no RPC required) func (d *Discovery) loadHardcodedPools(ctx context.Context) error { d.logger.Info("loading hardcoded pools for testing") // Well-known pools on Arbitrum mainnet with estimated reserves // These can be used for testing without requiring archive RPC access testPools := []struct { address string protocol mevtypes.ProtocolType token0 string token1 string reserve0 string // in wei reserve1 string // in wei fee uint32 }{ // SushiSwap WETH/USDC { address: "0x905dfCD5649217c42684f23958568e533C711Aa3", protocol: mevtypes.ProtocolUniswapV2, token0: "0x82aF49447D8a07e3bd95BD0d56f35241523fBab1", // WETH token1: "0xFF970a61A04b1cA14834A43f5dE4533eBDDB5CC8", // USDC reserve0: "1000000000000000000000", // 1000 WETH reserve1: "2000000000000", // 2M USDC (6 decimals) fee: 300, }, // SushiSwap WETH/USDT { address: "0xCB0E5bFa72bBb4d16AB5aA0c60601c438F04b4ad", protocol: mevtypes.ProtocolUniswapV2, token0: "0x82aF49447D8a07e3bd95BD0d56f35241523fBab1", // WETH token1: "0xFd086bC7CD5C481DCC9C85ebE478A1C0b69FCbb9", // USDT reserve0: "800000000000000000000", // 800 WETH reserve1: "1600000000000", // 1.6M USDT (6 decimals) fee: 300, }, // SushiSwap WETH/WBTC { address: "0x515e252b2b5c22b4b2b6Df66c2eBeeA871AA4d69", protocol: mevtypes.ProtocolUniswapV2, token0: "0x82aF49447D8a07e3bd95BD0d56f35241523fBab1", // WETH token1: "0x2f2a2543B76A4166549F7aaB2e75Bef0aefC5B0f", // WBTC reserve0: "500000000000000000000", // 500 WETH reserve1: "1500000000", // 15 WBTC (8 decimals) fee: 300, }, // Camelot WETH/USDC { address: "0x84652bb2539513BAf36e225c930Fdd8eaa63CE27", protocol: mevtypes.ProtocolCamelot, token0: "0x82aF49447D8a07e3bd95BD0d56f35241523fBab1", // WETH token1: "0xFF970a61A04b1cA14834A43f5dE4533eBDDB5CC8", // USDC reserve0: "1200000000000000000000", // 1200 WETH reserve1: "2400000000000", // 2.4M USDC fee: 300, }, // Camelot WETH/ARB { address: "0xA6c5C7D189fA4eB5Af8ba34E63dCDD3a635D433f", protocol: mevtypes.ProtocolCamelot, token0: "0x82aF49447D8a07e3bd95BD0d56f35241523fBab1", // WETH token1: "0x912CE59144191C1204E64559FE8253a0e49E6548", // ARB reserve0: "600000000000000000000", // 600 WETH reserve1: "800000000000000000000000", // 800k ARB fee: 300, }, } // Token decimals mapping tokenDecimals := map[string]uint8{ "0x82aF49447D8a07e3bd95BD0d56f35241523fBab1": 18, // WETH "0xFF970a61A04b1cA14834A43f5dE4533eBDDB5CC8": 6, // USDC "0xFd086bC7CD5C481DCC9C85ebE478A1C0b69FCbb9": 6, // USDT "0x2f2a2543B76A4166549F7aaB2e75Bef0aefC5B0f": 8, // WBTC "0x912CE59144191C1204E64559FE8253a0e49E6548": 18, // ARB } for _, pool := range testPools { reserve0, _ := new(big.Int).SetString(pool.reserve0, 10) reserve1, _ := new(big.Int).SetString(pool.reserve1, 10) // Estimate liquidity in USD (simplified - assume $2000/ETH equivalent) liquidityUSD := new(big.Int).Add(reserve0, reserve1) poolInfo := &mevtypes.PoolInfo{ Address: common.HexToAddress(pool.address), Protocol: pool.protocol, Token0: common.HexToAddress(pool.token0), Token1: common.HexToAddress(pool.token1), Token0Decimals: tokenDecimals[pool.token0], Token1Decimals: tokenDecimals[pool.token1], Reserve0: reserve0, Reserve1: reserve1, Fee: pool.fee, LiquidityUSD: liquidityUSD, } // Add to cache if err := d.cache.Add(ctx, poolInfo); err != nil { d.logger.Warn("failed to add hardcoded pool to cache", "pool", pool.address, "error", err) continue } d.mu.Lock() d.poolsDiscovered++ d.mu.Unlock() d.logger.Debug("loaded hardcoded pool", "protocol", pool.protocol, "pool", pool.address, "token0", pool.token0, "token1", pool.token1, ) } return nil } // discoverUniswapV2Pools discovers UniswapV2-style pools func (d *Discovery) discoverUniswapV2Pools(ctx context.Context) error { d.logger.Info("discovering UniswapV2-style pools") factories := []struct { address common.Address protocol mevtypes.ProtocolType }{ {UniswapV2FactoryAddress, mevtypes.ProtocolUniswapV2}, {CamelotFactoryAddress, mevtypes.ProtocolCamelot}, } for _, factory := range factories { d.logger.Info("querying factory", "protocol", factory.protocol, "address", factory.address.Hex()) // Query each token pair for _, pair := range d.config.TokenPairs { select { case <-ctx.Done(): return ctx.Err() default: } poolAddr, err := d.getUniswapV2Pool(ctx, factory.address, pair.Token0, pair.Token1) if err != nil { continue } if poolAddr == (common.Address{}) { continue // Pool doesn't exist } // Fetch pool info poolInfo, err := d.fetchUniswapV2PoolInfo(ctx, poolAddr, pair.Token0, pair.Token1, factory.protocol) if err != nil { d.logger.Debug("failed to fetch pool info", "pool", poolAddr.Hex(), "error", err) continue } // Check minimum liquidity if poolInfo.LiquidityUSD.Cmp(d.config.MinLiquidity) < 0 { continue } // Add to cache if err := d.cache.Add(ctx, poolInfo); err != nil { d.logger.Warn("failed to add pool to cache", "pool", poolAddr.Hex(), "error", err) continue } d.mu.Lock() d.poolsDiscovered++ d.mu.Unlock() d.logger.Debug("discovered pool", "protocol", factory.protocol, "pool", poolAddr.Hex(), "token0", pair.Token0.Hex(), "token1", pair.Token1.Hex(), "liquidity", poolInfo.LiquidityUSD.String(), ) } } return nil } // getUniswapV2Pool gets a UniswapV2 pool address for a token pair func (d *Discovery) getUniswapV2Pool(ctx context.Context, factory common.Address, token0, token1 common.Address) (common.Address, error) { // getPair(address,address) returns (address) // This is a simplified version - in production, use generated bindings calldata := append([]byte{0xe6, 0xa4, 0x39, 0x05}, // getPair selector append(padLeft(token0.Bytes(), 32), padLeft(token1.Bytes(), 32)...)...) result, err := d.client.CallContract(ctx, ethereum.CallMsg{ To: &factory, Data: calldata, }, nil) if err != nil { return common.Address{}, err } if len(result) == 0 { return common.Address{}, nil } return common.BytesToAddress(result[12:]), nil } // fetchUniswapV2PoolInfo fetches pool information func (d *Discovery) fetchUniswapV2PoolInfo(ctx context.Context, poolAddr, token0, token1 common.Address, protocol mevtypes.ProtocolType) (*mevtypes.PoolInfo, error) { // getReserves() returns (uint112,uint112,uint32) // Simplified - in production use generated bindings calldata := []byte{0x09, 0x02, 0xf1, 0xac} // getReserves selector result, err := d.client.CallContract(ctx, ethereum.CallMsg{ To: &poolAddr, Data: calldata, }, nil) if err != nil { return nil, err } if len(result) < 64 { return nil, fmt.Errorf("invalid reserves response") } reserve0 := new(big.Int).SetBytes(result[0:32]) reserve1 := new(big.Int).SetBytes(result[32:64]) // Estimate liquidity in USD (simplified - in production, use price oracle) liquidityUSD := new(big.Int).Add(reserve0, reserve1) return &mevtypes.PoolInfo{ Address: poolAddr, Protocol: protocol, Token0: token0, Token1: token1, Reserve0: reserve0, Reserve1: reserve1, Fee: 300, // 0.3% for UniswapV2 LiquidityUSD: liquidityUSD, }, nil } // discoverUniswapV3Pools discovers UniswapV3 pools func (d *Discovery) discoverUniswapV3Pools(ctx context.Context) error { d.logger.Info("discovering UniswapV3 pools") // UniswapV3 has multiple fee tiers feeTiers := []uint32{100, 500, 3000, 10000} for _, pair := range d.config.TokenPairs { for _, fee := range feeTiers { select { case <-ctx.Done(): return ctx.Err() default: } poolAddr, err := d.getUniswapV3Pool(ctx, pair.Token0, pair.Token1, fee) if err != nil { continue } if poolAddr == (common.Address{}) { continue // Pool doesn't exist } // Fetch pool info poolInfo, err := d.fetchUniswapV3PoolInfo(ctx, poolAddr, pair.Token0, pair.Token1, fee) if err != nil { d.logger.Debug("failed to fetch pool info", "pool", poolAddr.Hex(), "error", err) continue } // Check minimum liquidity if poolInfo.LiquidityUSD.Cmp(d.config.MinLiquidity) < 0 { continue } // Add to cache if err := d.cache.Add(ctx, poolInfo); err != nil { d.logger.Warn("failed to add pool to cache", "pool", poolAddr.Hex(), "error", err) continue } d.mu.Lock() d.poolsDiscovered++ d.mu.Unlock() d.logger.Debug("discovered pool", "protocol", mevtypes.ProtocolUniswapV3, "pool", poolAddr.Hex(), "token0", pair.Token0.Hex(), "token1", pair.Token1.Hex(), "fee", fee, "liquidity", poolInfo.LiquidityUSD.String(), ) // Check if we've reached max pools if d.poolsDiscovered >= d.config.MaxPools { return nil } } } return nil } // getUniswapV3Pool gets a UniswapV3 pool address func (d *Discovery) getUniswapV3Pool(ctx context.Context, token0, token1 common.Address, fee uint32) (common.Address, error) { // getPool(address,address,uint24) returns (address) // Simplified - in production use generated bindings feeBytes := make([]byte, 32) copy(feeBytes[29:], big.NewInt(int64(fee)).Bytes()) calldata := append([]byte{0x17, 0x79, 0x05, 0x7a}, // getPool selector append(append(padLeft(token0.Bytes(), 32), padLeft(token1.Bytes(), 32)...), feeBytes...)...) factoryAddr := UniswapV3FactoryAddress result, err := d.client.CallContract(ctx, ethereum.CallMsg{ To: &factoryAddr, Data: calldata, }, nil) if err != nil { return common.Address{}, err } if len(result) == 0 { return common.Address{}, nil } return common.BytesToAddress(result[12:]), nil } // fetchUniswapV3PoolInfo fetches UniswapV3 pool information func (d *Discovery) fetchUniswapV3PoolInfo(ctx context.Context, poolAddr, token0, token1 common.Address, fee uint32) (*mevtypes.PoolInfo, error) { // liquidity() returns (uint128) // Simplified - in production use generated bindings calldata := []byte{0x1a, 0x68, 0x65, 0x02} // liquidity selector result, err := d.client.CallContract(ctx, ethereum.CallMsg{ To: &poolAddr, Data: calldata, }, nil) if err != nil { return nil, err } if len(result) < 16 { return nil, fmt.Errorf("invalid liquidity response") } liquidity := new(big.Int).SetBytes(result[16:32]) return &mevtypes.PoolInfo{ Address: poolAddr, Protocol: mevtypes.ProtocolUniswapV3, Token0: token0, Token1: token1, Reserve0: liquidity, // Simplified Reserve1: liquidity, Fee: fee, LiquidityUSD: liquidity, }, nil } // padLeft pads bytes to the left with zeros func padLeft(data []byte, length int) []byte { if len(data) >= length { return data } padded := make([]byte, length) copy(padded[length-len(data):], data) return padded } // GetStats returns discovery statistics func (d *Discovery) GetStats() map[string]interface{} { d.mu.Lock() defer d.mu.Unlock() count, _ := d.cache.Count(context.Background()) return map[string]interface{}{ "pools_discovered": d.poolsDiscovered, "pools_cached": count, } }