Files
mev-beta/orig/pkg/pools/discovery.go
Administrator 803de231ba feat: create v2-prep branch with comprehensive planning
Restructured project for V2 refactor:

**Structure Changes:**
- Moved all V1 code to orig/ folder (preserved with git mv)
- Created docs/planning/ directory
- Added orig/README_V1.md explaining V1 preservation

**Planning Documents:**
- 00_V2_MASTER_PLAN.md: Complete architecture overview
  - Executive summary of critical V1 issues
  - High-level component architecture diagrams
  - 5-phase implementation roadmap
  - Success metrics and risk mitigation

- 07_TASK_BREAKDOWN.md: Atomic task breakdown
  - 99+ hours of detailed tasks
  - Every task < 2 hours (atomic)
  - Clear dependencies and success criteria
  - Organized by implementation phase

**V2 Key Improvements:**
- Per-exchange parsers (factory pattern)
- Multi-layer strict validation
- Multi-index pool cache
- Background validation pipeline
- Comprehensive observability

**Critical Issues Addressed:**
- Zero address tokens (strict validation + cache enrichment)
- Parsing accuracy (protocol-specific parsers)
- No audit trail (background validation channel)
- Inefficient lookups (multi-index cache)
- Stats disconnection (event-driven metrics)

Next Steps:
1. Review planning documents
2. Begin Phase 1: Foundation (P1-001 through P1-010)
3. Implement parsers in Phase 2
4. Build cache system in Phase 3
5. Add validation pipeline in Phase 4
6. Migrate and test in Phase 5

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-10 10:14:26 +01:00

1099 lines
33 KiB
Go

package pools
import (
"context"
"encoding/hex"
"encoding/json"
"fmt"
"math"
"math/big"
"os"
"strings"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc"
"github.com/fraktal/mev-beta/internal/logger"
"github.com/fraktal/mev-beta/pkg/security"
"github.com/fraktal/mev-beta/pkg/uniswap"
)
// parseSignedInt256 correctly parses a signed 256-bit integer from 32 bytes
// This is critical for UniswapV3 events which use int256 for amounts
func parseSignedInt256(data []byte) *big.Int {
if len(data) != 32 {
return big.NewInt(0)
}
value := new(big.Int).SetBytes(data)
// Check if the value is negative (MSB set)
if len(data) > 0 && data[0]&0x80 != 0 {
// Convert from two's complement
// Subtract 2^256 to get the negative value
maxUint256 := new(big.Int)
maxUint256.Lsh(big.NewInt(1), 256)
value.Sub(value, maxUint256)
}
return value
}
// Pool represents a discovered liquidity pool
type Pool struct {
Address string `json:"address"`
Token0 string `json:"token0"`
Token1 string `json:"token1"`
Fee uint32 `json:"fee"`
Protocol string `json:"protocol"`
Factory string `json:"factory"`
Reserves0 *big.Int `json:"reserves0,omitempty"`
Reserves1 *big.Int `json:"reserves1,omitempty"`
Liquidity *big.Int `json:"liquidity,omitempty"`
LastUpdated time.Time `json:"lastUpdated"`
TotalVolume *big.Int `json:"totalVolume"`
SwapCount uint64 `json:"swapCount"`
CreatedAt time.Time `json:"createdAt"`
BlockNumber uint64 `json:"blockNumber"`
}
// Exchange represents a discovered exchange/DEX
type Exchange struct {
Name string `json:"name"`
Router string `json:"router"`
Factory string `json:"factory"`
Protocol string `json:"protocol"`
Version string `json:"version"`
Discovered time.Time `json:"discovered"`
PoolCount int `json:"poolCount"`
TotalVolume *big.Int `json:"totalVolume"`
}
// LiquidityEvent represents a liquidity change event
type LiquidityEvent struct {
TxHash string `json:"txHash"`
Pool string `json:"pool"`
Type string `json:"type"` // "mint", "burn", "sync"
Amount0 *big.Int `json:"amount0"`
Amount1 *big.Int `json:"amount1"`
Liquidity *big.Int `json:"liquidity"`
Timestamp time.Time `json:"timestamp"`
BlockNumber uint64 `json:"blockNumber"`
}
// SwapEvent represents a swap event with price impact analysis
type SwapEvent struct {
TxHash string `json:"txHash"`
Pool string `json:"pool"`
TokenIn string `json:"tokenIn"`
TokenOut string `json:"tokenOut"`
AmountIn *big.Int `json:"amountIn"`
AmountOut *big.Int `json:"amountOut"`
PriceImpact float64 `json:"priceImpact"`
IsSignificant bool `json:"isSignificant"`
Timestamp time.Time `json:"timestamp"`
BlockNumber uint64 `json:"blockNumber"`
}
// PoolDiscovery manages dynamic pool and exchange discovery
type PoolDiscovery struct {
client *rpc.Client
logger *logger.Logger
create2Calculator *CREATE2Calculator
blacklist *PoolBlacklist // Pool blacklist manager
// Storage
pools map[string]*Pool // address -> pool
exchanges map[string]*Exchange // address -> exchange
mutex sync.RWMutex
// Persistence
poolsFile string
exchangesFile string
// Event signatures for discovery
eventSignatures map[string]string
// Factory contracts for pool creation events
knownFactories map[string]string
// Configuration
minLiquidityThreshold *big.Int
priceImpactThreshold float64
}
// NewPoolDiscovery creates a new pool discovery system
func NewPoolDiscovery(rpcClient *rpc.Client, logger *logger.Logger) *PoolDiscovery {
// Create ethclient from rpc client for CREATE2 calculator
ethClient := ethclient.NewClient(rpcClient)
pd := &PoolDiscovery{
client: rpcClient,
logger: logger,
create2Calculator: NewCREATE2Calculator(logger, ethClient),
blacklist: NewPoolBlacklist(logger), // Initialize blacklist
pools: make(map[string]*Pool),
exchanges: make(map[string]*Exchange),
poolsFile: "data/pools.json",
exchangesFile: "data/exchanges.json",
eventSignatures: make(map[string]string),
knownFactories: make(map[string]string),
minLiquidityThreshold: big.NewInt(1000000000000000000), // 1 ETH equivalent
priceImpactThreshold: 0.01, // 1% price impact threshold
}
pd.initializeEventSignatures()
pd.initializeKnownFactories()
pd.loadPersistedData()
return pd
}
// initializeEventSignatures sets up event signatures for discovery
func (pd *PoolDiscovery) initializeEventSignatures() {
// Uniswap V2 events
pd.eventSignatures["0x0d3648bd0f6ba80134a33ba9275ac585d9d315f0ad8355cddefde31afa28d0e9"] = "PairCreated" // Uniswap V2
pd.eventSignatures["0x783cca1c0412dd0d695e784568c96da2e9c22ff989357a2e8b1d9b2b4e6b7118"] = "PoolCreated" // Uniswap V3
pd.eventSignatures["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"] = "Transfer" // ERC20 Transfer
pd.eventSignatures["0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822"] = "Swap" // Uniswap V2 Swap
pd.eventSignatures["0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67"] = "Swap" // Uniswap V3 Swap
pd.eventSignatures["0x4c209b5fc8ad50758f13e2e1088ba56a560dff690a1c6fef26394f4c03821c4f"] = "Mint" // Liquidity mint
pd.eventSignatures["0xdccd412f0b1252819cb1fd330b93224ca42612892bb3f4f789976e6d81936496"] = "Burn" // Liquidity burn
pd.eventSignatures["0x1c411e9a96e071241c2f21f7726b17ae89e3cab4c78be50e062b03a9fffbbad1"] = "Sync" // Reserve sync
}
// initializeKnownFactories sets up known factory contracts to monitor for new pools
func (pd *PoolDiscovery) initializeKnownFactories() {
// Uniswap V2 Factory
pd.knownFactories["0xf1d7cc64fb4452f05c498126312ebe29f30fbcf9"] = "UniswapV2"
// Uniswap V3 Factory
pd.knownFactories["0x1f98431c8ad98523631ae4a59f267346ea31f984"] = "UniswapV3"
// SushiSwap Factory
pd.knownFactories["0xc35dadb65012ec5796536bd9864ed8773abc74c4"] = "SushiSwap"
// Camelot V2 Factory
pd.knownFactories["0x6eccab422d763ac031210895c81787e87b43a652"] = "Camelot"
// Balancer V2 Vault
pd.knownFactories["0xba12222222228d8ba445958a75a0704d566bf2c8"] = "Balancer"
// Curve Factory
pd.knownFactories["0xb17b674d9c5cb2e441f8e196a2f048a81355d031"] = "Curve"
}
// DiscoverFromTransaction analyzes a transaction for new pools and exchanges
func (pd *PoolDiscovery) DiscoverFromTransaction(txHash, to, input string, logs []interface{}) {
pd.mutex.Lock()
defer pd.mutex.Unlock()
// Check if transaction is to an unknown contract (potential new DEX)
if to != "" && pd.isUnknownContract(to) {
pd.analyzeUnknownContract(to, input)
}
// Analyze logs for pool creation and swap events
for _, logEntry := range logs {
pd.analyzeLogEntry(logEntry, txHash)
}
}
// isUnknownContract checks if a contract address is unknown
func (pd *PoolDiscovery) isUnknownContract(address string) bool {
_, exists := pd.exchanges[strings.ToLower(address)]
return !exists
}
// analyzeUnknownContract analyzes an unknown contract to determine if it's a DEX
func (pd *PoolDiscovery) analyzeUnknownContract(address, input string) {
// Check function signatures that indicate DEX functionality
if len(input) < 10 {
return
}
functionSig := input[:10]
isDEX := false
protocol := "Unknown"
switch functionSig {
case "0x38ed1739", "0x18cbafe5", "0x7ff36ab5": // Uniswap V2 functions
isDEX = true
protocol = "UniswapV2-Like"
case "0x414bf389", "0xac9650d8", "0x5ae401dc", "0x1f0464d1": // Uniswap V3 functions
isDEX = true
protocol = "UniswapV3-Like"
case "0xa9059cbb", "0x095ea7b3": // ERC20 functions (might be router)
isDEX = true
protocol = "Router-Like"
}
if isDEX {
exchange := &Exchange{
Name: fmt.Sprintf("Unknown-%s", address[:8]),
Router: address,
Protocol: protocol,
Discovered: time.Now(),
TotalVolume: big.NewInt(0),
}
pd.exchanges[strings.ToLower(address)] = exchange
pd.logger.Opportunity("", address, "", "NEW_EXCHANGE_DISCOVERED", protocol, 0, 0, 0, 0, map[string]interface{}{
"router": address,
"protocol": protocol,
"discovered": time.Now(),
})
pd.persistData()
}
}
// analyzeLogEntry analyzes a log entry for pool creation or swap events
func (pd *PoolDiscovery) analyzeLogEntry(logEntry interface{}, txHash string) {
// Convert log entry to map for analysis
logMap, ok := logEntry.(map[string]interface{})
if !ok {
return
}
topics, ok := logMap["topics"].([]interface{})
if !ok || len(topics) == 0 {
return
}
topic0, ok := topics[0].(string)
if !ok {
return
}
eventType, exists := pd.eventSignatures[topic0]
if !exists {
return
}
address, ok := logMap["address"].(string)
if !ok {
return
}
switch eventType {
case "PairCreated", "PoolCreated":
pd.handlePoolCreation(address, topics, logMap, txHash)
case "Swap":
pd.handleSwapEvent(address, topics, logMap, txHash)
case "Mint", "Burn":
pd.handleLiquidityEvent(address, topics, logMap, txHash, eventType)
case "Sync":
pd.handleSyncEvent(address, topics, logMap, txHash)
}
}
// handlePoolCreation processes pool creation events
func (pd *PoolDiscovery) handlePoolCreation(factoryAddress string, topics []interface{}, logData map[string]interface{}, txHash string) {
if len(topics) < 4 {
return
}
// Extract pool information from topics
token0 := pd.addressFromTopic(topics[1])
token1 := pd.addressFromTopic(topics[2])
poolAddress := pd.addressFromTopic(topics[3])
protocol := "Unknown"
if proto, exists := pd.knownFactories[strings.ToLower(factoryAddress)]; exists {
protocol = proto
}
pool := &Pool{
Address: poolAddress,
Token0: token0,
Token1: token1,
Protocol: protocol,
Factory: factoryAddress,
LastUpdated: time.Now(),
TotalVolume: big.NewInt(0),
SwapCount: 0,
}
pd.pools[strings.ToLower(poolAddress)] = pool
pd.logger.Opportunity(txHash, factoryAddress, poolAddress, "NEW_POOL_DISCOVERED", protocol, 0, 0, 0, 0, map[string]interface{}{
"poolAddress": poolAddress,
"token0": token0,
"token1": token1,
"factory": factoryAddress,
"protocol": protocol,
})
pd.persistData()
}
// handleSwapEvent processes swap events and calculates price impact
func (pd *PoolDiscovery) handleSwapEvent(poolAddress string, topics []interface{}, logData map[string]interface{}, txHash string) {
pool, exists := pd.pools[strings.ToLower(poolAddress)]
if !exists {
// Unknown pool, try to discover it
pd.discoverPoolFromSwap(poolAddress, txHash)
return
}
// Extract swap data from log
data, ok := logData["data"].(string)
if !ok {
return
}
swapData := pd.parseSwapData(data, pool.Protocol)
if swapData == nil {
return
}
// Calculate price impact
priceImpact := pd.calculatePriceImpact(pool, swapData.AmountIn, swapData.AmountOut)
isSignificant := priceImpact >= pd.priceImpactThreshold
// Update pool statistics
pool.SwapCount++
if pool.TotalVolume == nil {
pool.TotalVolume = big.NewInt(0)
}
pool.TotalVolume.Add(pool.TotalVolume, swapData.AmountIn)
pool.LastUpdated = time.Now()
// Log significant swaps
if isSignificant {
amountInFloat, _ := new(big.Float).Quo(new(big.Float).SetInt(swapData.AmountIn), big.NewFloat(1e18)).Float64()
amountOutFloat, _ := new(big.Float).Quo(new(big.Float).SetInt(swapData.AmountOut), big.NewFloat(1e18)).Float64()
pd.logger.Opportunity(txHash, "", poolAddress, "SIGNIFICANT_SWAP", pool.Protocol, amountInFloat, amountOutFloat, 0, priceImpact*100, map[string]interface{}{
"pool": poolAddress,
"token0": pool.Token0,
"token1": pool.Token1,
"priceImpact": fmt.Sprintf("%.2f%%", priceImpact*100),
"swapCount": pool.SwapCount,
"totalVolume": pool.TotalVolume.String(),
})
}
}
// SwapData represents parsed swap data
type SwapData struct {
AmountIn *big.Int
AmountOut *big.Int
TokenIn string
TokenOut string
}
// DetailedSwapInfo represents enhanced swap information from L2 parser
type DetailedSwapInfo struct {
TxHash string
From string
To string
MethodName string
Protocol string
AmountIn *big.Int
AmountOut *big.Int
AmountMin *big.Int
TokenIn string
TokenOut string
Fee uint32
Recipient string
IsValid bool
}
// parseSwapData parses swap data from log data
func (pd *PoolDiscovery) parseSwapData(data, protocol string) *SwapData {
if len(data) < 2 {
return nil
}
// Remove 0x prefix
dataBytes, err := hex.DecodeString(data[2:])
if err != nil {
return nil
}
if len(dataBytes) < 128 { // 4 * 32 bytes minimum for swap data
return nil
}
// Parse amounts based on protocol
var amountIn, amountOut *big.Int
switch protocol {
case "UniswapV2", "SushiSwap", "Camelot":
// Uniswap V2 Swap(sender,amount0In,amount1In,amount0Out,amount1Out,to)
amount0In := new(big.Int).SetBytes(dataBytes[32:64])
amount1In := new(big.Int).SetBytes(dataBytes[64:96])
amount0Out := new(big.Int).SetBytes(dataBytes[96:128])
amount1Out := new(big.Int).SetBytes(dataBytes[128:160])
if amount0In.Cmp(big.NewInt(0)) > 0 {
amountIn = amount0In
amountOut = amount1Out
} else {
amountIn = amount1In
amountOut = amount0Out
}
case "UniswapV3":
// Uniswap V3 uses signed int256 for amounts
amountIn = parseSignedInt256(dataBytes[0:32])
amountOut = parseSignedInt256(dataBytes[32:64])
// Convert to absolute values for display
amountIn = new(big.Int).Abs(amountIn)
amountOut = new(big.Int).Abs(amountOut)
default:
// Generic parsing
amountIn = new(big.Int).SetBytes(dataBytes[0:32])
amountOut = new(big.Int).SetBytes(dataBytes[32:64])
}
return &SwapData{
AmountIn: amountIn,
AmountOut: amountOut,
}
}
// calculatePriceImpact calculates the price impact of a swap
func (pd *PoolDiscovery) calculatePriceImpact(pool *Pool, amountIn, amountOut *big.Int) float64 {
if pool.Reserves0 == nil || pool.Reserves1 == nil || pool.Reserves0.Cmp(big.NewInt(0)) == 0 {
return 0.0
}
// Simplified price impact calculation
// Real implementation would use more sophisticated formulas based on AMM type
// Calculate expected amount out without price impact
reserve0Float := new(big.Float).SetInt(pool.Reserves0)
reserve1Float := new(big.Float).SetInt(pool.Reserves1)
amountInFloat := new(big.Float).SetInt(amountIn)
amountOutFloat := new(big.Float).SetInt(amountOut)
// Current price
currentPrice := new(big.Float).Quo(reserve1Float, reserve0Float)
// Expected amount out
expectedOut := new(big.Float).Mul(amountInFloat, currentPrice)
// Price impact = (expected - actual) / expected
diff := new(big.Float).Sub(expectedOut, amountOutFloat)
impact := new(big.Float).Quo(diff, expectedOut)
impactFloat, _ := impact.Float64()
if impactFloat < 0 {
impactFloat = -impactFloat
}
return impactFloat
}
// Additional helper methods...
func (pd *PoolDiscovery) addressFromTopic(topic interface{}) string {
topicStr, ok := topic.(string)
if !ok || len(topicStr) < 42 {
return ""
}
// Extract address from topic (last 20 bytes)
return "0x" + topicStr[26:]
}
func (pd *PoolDiscovery) handleLiquidityEvent(poolAddress string, topics []interface{}, logData map[string]interface{}, txHash, eventType string) {
pool, exists := pd.pools[strings.ToLower(poolAddress)]
if !exists {
// Try to discover this unknown pool
pd.discoverPoolFromSwap(poolAddress, txHash)
return
}
// Parse liquidity event data
data, ok := logData["data"].(string)
if !ok {
return
}
eventData := pd.parseLiquidityData(data, eventType, pool.Protocol)
if eventData == nil {
return
}
// Update pool liquidity
if eventType == "Mint" && eventData.AmountIn != nil {
if pool.Liquidity == nil {
pool.Liquidity = big.NewInt(0)
}
pool.Liquidity.Add(pool.Liquidity, eventData.AmountIn)
} else if eventType == "Burn" && eventData.AmountIn != nil {
if pool.Liquidity != nil {
pool.Liquidity.Sub(pool.Liquidity, eventData.AmountIn)
}
}
pool.LastUpdated = time.Now()
// Create liquidity event for potential future use
_ = &LiquidityEvent{
TxHash: txHash,
Pool: poolAddress,
Type: strings.ToLower(eventType),
Amount0: eventData.AmountIn,
Amount1: eventData.AmountOut,
Liquidity: pool.Liquidity,
Timestamp: time.Now(),
BlockNumber: 0, // Would be set by caller
}
pd.logger.Opportunity(txHash, "", poolAddress, "LIQUIDITY_EVENT", pool.Protocol, 0, 0, 0, 0, map[string]interface{}{
"eventType": eventType,
"amount0": eventData.AmountIn.String(),
"amount1": eventData.AmountOut.String(),
"newLiquidity": pool.Liquidity.String(),
})
pd.persistData()
}
func (pd *PoolDiscovery) handleSyncEvent(poolAddress string, topics []interface{}, logData map[string]interface{}, txHash string) {
pool, exists := pd.pools[strings.ToLower(poolAddress)]
if !exists {
pd.discoverPoolFromSwap(poolAddress, txHash)
return
}
// Parse sync event data (Uniswap V2 reserves update)
data, ok := logData["data"].(string)
if !ok {
return
}
syncData := pd.parseSyncData(data)
if syncData == nil {
return
}
// Update pool reserves
pool.Reserves0 = syncData.Reserve0
pool.Reserves1 = syncData.Reserve1
pool.LastUpdated = time.Now()
pd.logger.Debug(fmt.Sprintf("Updated reserves for pool %s: Reserve0=%s, Reserve1=%s",
poolAddress, syncData.Reserve0.String(), syncData.Reserve1.String()))
pd.persistData()
}
func (pd *PoolDiscovery) discoverPoolFromSwap(poolAddress, txHash string) {
// Check if we already know this pool
if _, exists := pd.pools[strings.ToLower(poolAddress)]; exists {
return
}
pd.logger.Info(fmt.Sprintf("Discovering unknown pool from swap: %s", poolAddress))
// Create Ethereum client to query pool contract
// Get RPC endpoint from config or environment
rpcEndpoint := os.Getenv("ARBITRUM_RPC_ENDPOINT")
if rpcEndpoint == "" {
rpcEndpoint = "wss://arbitrum-mainnet.core.chainstack.com/53c30e7a941160679fdcc396c894fc57" // fallback
}
client, err := ethclient.Dial(rpcEndpoint)
if err != nil {
pd.logger.Error(fmt.Sprintf("Failed to connect to Ethereum node for pool discovery: %v", err))
return
}
defer client.Close()
address := common.HexToAddress(poolAddress)
// Validate that this is a real pool contract
if !uniswap.IsValidPool(context.Background(), client, address) {
pd.logger.Debug(fmt.Sprintf("Address %s is not a valid pool contract", poolAddress))
return
}
// Create Uniswap V3 pool interface to fetch real data
uniswapPool := uniswap.NewUniswapV3Pool(address, client)
// Fetch pool state with timeout
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
poolState, err := uniswapPool.GetPoolState(ctx)
if err != nil {
pd.logger.Error(fmt.Sprintf("Failed to fetch pool state for %s: %v", poolAddress, err))
return
}
// Determine protocol (could be enhanced to detect different protocols)
protocol := "UniswapV3"
// Try to determine factory address by checking against known factories
factory := ""
for factoryAddr, proto := range pd.knownFactories {
// This is a simplified check - in practice you'd call the pool's factory() function
if proto == protocol {
factory = factoryAddr
break
}
}
// Safely convert fee from int64 to uint32
var feeUint64 uint64
if poolState.Fee < 0 {
feeUint64 = 0
} else {
feeUint64 = uint64(poolState.Fee)
}
safeFee, err := security.SafeUint32(feeUint64)
if err != nil {
pd.logger.Warn(fmt.Sprintf("Failed to safely convert fee %d for pool %s: %v", poolState.Fee, poolAddress, err))
// Use a default fee value if conversion fails
// Truncate to 32 bits safely by using math.MaxUint32 as mask
if poolState.Fee > math.MaxUint32 {
safeFee = math.MaxUint32
} else {
safeFee = uint32(poolState.Fee)
}
}
// Create pool entry with real data
pool := &Pool{
Address: poolAddress,
Token0: poolState.Token0.Hex(),
Token1: poolState.Token1.Hex(),
// Safely convert fee from uint64 to uint32
Fee: safeFee,
Protocol: protocol,
Factory: factory,
Liquidity: poolState.Liquidity.ToBig(),
LastUpdated: time.Now(),
TotalVolume: big.NewInt(0),
SwapCount: 0,
}
pd.pools[strings.ToLower(poolAddress)] = pool
pd.logger.Opportunity(txHash, "", poolAddress, "POOL_DISCOVERED", protocol, 0, 0, 0, 0, map[string]interface{}{
"source": "swap_event",
"poolAddress": poolAddress,
"protocol": protocol,
"token0": poolState.Token0.Hex(),
"token1": poolState.Token1.Hex(),
"fee": poolState.Fee,
"liquidity": poolState.Liquidity.String(),
"discoveredAt": time.Now(),
})
pd.persistData()
}
// parseLiquidityData parses liquidity event data
func (pd *PoolDiscovery) parseLiquidityData(data, eventType, protocol string) *SwapData {
if len(data) < 2 {
return nil
}
dataBytes, err := hex.DecodeString(data[2:])
if err != nil {
return nil
}
if len(dataBytes) < 64 { // 2 * 32 bytes minimum
return nil
}
var amount0, amount1 *big.Int
// UniswapV3 uses signed int256 for liquidity amounts
if protocol == "UniswapV3" {
amount0 = parseSignedInt256(dataBytes[0:32])
amount1 = parseSignedInt256(dataBytes[32:64])
// Convert to absolute values
amount0 = new(big.Int).Abs(amount0)
amount1 = new(big.Int).Abs(amount1)
} else {
// V2 and others use unsigned uint256
amount0 = new(big.Int).SetBytes(dataBytes[0:32])
amount1 = new(big.Int).SetBytes(dataBytes[32:64])
}
return &SwapData{
AmountIn: amount0,
AmountOut: amount1,
}
}
// SyncData represents reserves from a sync event
type SyncData struct {
Reserve0 *big.Int
Reserve1 *big.Int
}
// parseSyncData parses sync event data (Uniswap V2)
func (pd *PoolDiscovery) parseSyncData(data string) *SyncData {
if len(data) < 2 {
return nil
}
dataBytes, err := hex.DecodeString(data[2:])
if err != nil {
return nil
}
if len(dataBytes) < 64 { // 2 * 32 bytes
return nil
}
reserve0 := new(big.Int).SetBytes(dataBytes[0:32])
reserve1 := new(big.Int).SetBytes(dataBytes[32:64])
return &SyncData{
Reserve0: reserve0,
Reserve1: reserve1,
}
}
// persistData saves pools and exchanges to files
func (pd *PoolDiscovery) persistData() {
// Ensure data directory exists
os.MkdirAll("data", 0750)
// Save pools
poolsData, _ := json.MarshalIndent(pd.pools, "", " ")
os.WriteFile(pd.poolsFile, poolsData, 0644)
// Save exchanges
exchangesData, _ := json.MarshalIndent(pd.exchanges, "", " ")
os.WriteFile(pd.exchangesFile, exchangesData, 0644)
}
// loadPersistedData loads pools and exchanges from files
func (pd *PoolDiscovery) loadPersistedData() {
// Load pools
if data, err := os.ReadFile(pd.poolsFile); err == nil {
if err := json.Unmarshal(data, &pd.pools); err != nil {
pd.logger.Warn(fmt.Sprintf("Failed to unmarshal pools from %s: %v", pd.poolsFile, err))
} else {
pd.logger.Info(fmt.Sprintf("Loaded %d pools from cache", len(pd.pools)))
}
} else {
pd.logger.Warn(fmt.Sprintf("Failed to read pools file %s: %v", pd.poolsFile, err))
}
// Load exchanges
if data, err := os.ReadFile(pd.exchangesFile); err == nil {
if err := json.Unmarshal(data, &pd.exchanges); err != nil {
pd.logger.Warn(fmt.Sprintf("Failed to unmarshal exchanges from %s: %v", pd.exchangesFile, err))
} else {
pd.logger.Info(fmt.Sprintf("Loaded %d exchanges from cache", len(pd.exchanges)))
}
} else {
// Don't warn for missing exchanges file - it's optional
if !os.IsNotExist(err) {
pd.logger.Warn(fmt.Sprintf("Failed to read exchanges file %s: %v", pd.exchangesFile, err))
}
}
}
// GetPoolCount returns the number of discovered pools
func (pd *PoolDiscovery) GetPoolCount() int {
pd.mutex.RLock()
defer pd.mutex.RUnlock()
return len(pd.pools)
}
// SavePoolCache saves discovered pools and exchanges to disk
func (pd *PoolDiscovery) SavePoolCache() {
pd.mutex.RLock()
defer pd.mutex.RUnlock()
pd.persistData()
}
// GetExchangeCount returns the number of discovered exchanges
func (pd *PoolDiscovery) GetExchangeCount() int {
pd.mutex.RLock()
defer pd.mutex.RUnlock()
return len(pd.exchanges)
}
// GetPool returns a pool by address (checks blacklist)
func (pd *PoolDiscovery) GetPool(address string) (*Pool, bool) {
pd.mutex.RLock()
defer pd.mutex.RUnlock()
// Check blacklist first
poolAddr := common.HexToAddress(address)
if pd.blacklist != nil && pd.blacklist.IsBlacklisted(poolAddr) {
pd.logger.Debug(fmt.Sprintf("Pool %s is blacklisted, skipping", address[:10]))
return nil, false
}
pool, exists := pd.pools[strings.ToLower(address)]
return pool, exists
}
// IsPoolBlacklisted checks if a pool is blacklisted
func (pd *PoolDiscovery) IsPoolBlacklisted(address common.Address) bool {
if pd.blacklist == nil {
return false
}
return pd.blacklist.IsBlacklisted(address)
}
// RecordPoolFailure records a pool failure for blacklisting
func (pd *PoolDiscovery) RecordPoolFailure(poolAddress common.Address, reason string, protocol string, token0, token1 common.Address) {
if pd.blacklist != nil {
pd.blacklist.RecordFailure(poolAddress, reason, protocol, token0, token1)
}
}
// GetBlacklistStats returns blacklist statistics
func (pd *PoolDiscovery) GetBlacklistStats() map[string]interface{} {
if pd.blacklist == nil {
return map[string]interface{}{"status": "blacklist not initialized"}
}
return pd.blacklist.GetBlacklistStats()
}
// GetAllPools returns all discovered pools
func (pd *PoolDiscovery) GetAllPools() map[string]*Pool {
pd.mutex.RLock()
defer pd.mutex.RUnlock()
pools := make(map[string]*Pool)
for k, v := range pd.pools {
pools[k] = v
}
return pools
}
// DiscoverPoolsForTokenPair uses CREATE2 to discover all possible pools for a token pair
func (pd *PoolDiscovery) DiscoverPoolsForTokenPair(token0, token1 common.Address) ([]*Pool, error) {
// Use CREATE2 calculator to find all possible pool addresses
poolIdentifiers, err := pd.create2Calculator.FindPoolsForTokenPair(token0, token1)
if err != nil {
return nil, fmt.Errorf("failed to calculate pool addresses: %w", err)
}
pools := make([]*Pool, 0)
for _, poolId := range poolIdentifiers {
// Check if pool exists on-chain
exists, err := pd.verifyPoolExists(poolId.PoolAddr)
if err != nil {
pd.logger.Debug(fmt.Sprintf("Failed to verify pool %s: %v", poolId.PoolAddr.Hex(), err))
continue
}
if !exists {
pd.logger.Debug(fmt.Sprintf("Pool %s does not exist on-chain", poolId.PoolAddr.Hex()))
continue
}
// Create pool object
pool := &Pool{
Address: poolId.PoolAddr.Hex(),
Token0: poolId.Token0.Hex(),
Token1: poolId.Token1.Hex(),
Fee: poolId.Fee,
Protocol: poolId.Factory,
Factory: poolId.Factory,
CreatedAt: time.Now(),
}
// Get additional pool data
if err := pd.enrichPoolData(pool); err != nil {
pd.logger.Debug(fmt.Sprintf("Failed to enrich pool data for %s: %v", pool.Address, err))
}
pools = append(pools, pool)
// Add to our cache
pd.addPool(pool)
}
pd.logger.Info(fmt.Sprintf("Discovered %d pools for token pair %s/%s",
len(pools), token0.Hex(), token1.Hex()))
return pools, nil
}
// verifyPoolExists checks if a pool actually exists at the calculated address
func (pd *PoolDiscovery) verifyPoolExists(poolAddr common.Address) (bool, error) {
// Check if there's code at the address
var result string
err := pd.client.Call(&result, "eth_getCode", poolAddr.Hex(), "latest")
if err != nil {
return false, fmt.Errorf("failed to get code: %w", err)
}
// If there's no code, the pool doesn't exist
if result == "0x" || result == "" {
return false, nil
}
return true, nil
}
// enrichPoolData gets additional data about a pool
func (pd *PoolDiscovery) enrichPoolData(pool *Pool) error {
poolAddr := common.HexToAddress(pool.Address)
// For Uniswap V3 pools, get slot0 data
if pool.Protocol == "uniswap_v3" || pool.Protocol == "camelot_v3" {
return pd.enrichUniswapV3PoolData(pool, poolAddr)
}
// For Uniswap V2 style pools, get reserves
if pool.Protocol == "uniswap_v2" || pool.Protocol == "sushiswap" {
return pd.enrichUniswapV2PoolData(pool, poolAddr)
}
return nil
}
// enrichUniswapV3PoolData gets Uniswap V3 specific pool data
func (pd *PoolDiscovery) enrichUniswapV3PoolData(pool *Pool, poolAddr common.Address) error {
// Get slot0 data (price, tick, etc.)
slot0ABI := `[{"inputs":[],"name":"slot0","outputs":[{"internalType":"uint160","name":"sqrtPriceX96","type":"uint160"},{"internalType":"int24","name":"tick","type":"int24"},{"internalType":"uint16","name":"observationIndex","type":"uint16"},{"internalType":"uint16","name":"observationCardinality","type":"uint16"},{"internalType":"uint16","name":"observationCardinalityNext","type":"uint16"},{"internalType":"uint8","name":"feeProtocol","type":"uint8"},{"internalType":"bool","name":"unlocked","type":"bool"}],"stateMutability":"view","type":"function"}]`
contractABI, err := uniswap.ParseABI(slot0ABI)
if err != nil {
return fmt.Errorf("failed to parse slot0 ABI: %w", err)
}
callData, err := contractABI.Pack("slot0")
if err != nil {
return fmt.Errorf("failed to pack slot0 call: %w", err)
}
var result string
err = pd.client.Call(&result, "eth_call", map[string]interface{}{
"to": poolAddr.Hex(),
"data": "0x" + hex.EncodeToString(callData),
}, "latest")
if err != nil {
return fmt.Errorf("slot0 call failed: %w", err)
}
// Decode result
resultBytes, err := hex.DecodeString(strings.TrimPrefix(result, "0x"))
if err != nil {
return fmt.Errorf("failed to decode result: %w", err)
}
if len(resultBytes) == 0 {
return fmt.Errorf("empty result from slot0 call")
}
// Store the fact that this is a valid V3 pool
pool.BlockNumber = 0 // Will be set when we detect the creation event
return nil
}
// enrichUniswapV2PoolData gets Uniswap V2 specific pool data
func (pd *PoolDiscovery) enrichUniswapV2PoolData(pool *Pool, poolAddr common.Address) error {
// Get reserves from getReserves()
reservesABI := `[{"inputs":[],"name":"getReserves","outputs":[{"internalType":"uint112","name":"_reserve0","type":"uint112"},{"internalType":"uint112","name":"_reserve1","type":"uint112"},{"internalType":"uint32","name":"_blockTimestampLast","type":"uint32"}],"stateMutability":"view","type":"function"}]`
contractABI, err := uniswap.ParseABI(reservesABI)
if err != nil {
return fmt.Errorf("failed to parse reserves ABI: %w", err)
}
callData, err := contractABI.Pack("getReserves")
if err != nil {
return fmt.Errorf("failed to pack getReserves call: %w", err)
}
var result string
err = pd.client.Call(&result, "eth_call", map[string]interface{}{
"to": poolAddr.Hex(),
"data": "0x" + hex.EncodeToString(callData),
}, "latest")
if err != nil {
return fmt.Errorf("getReserves call failed: %w", err)
}
// Decode result
resultBytes, err := hex.DecodeString(strings.TrimPrefix(result, "0x"))
if err != nil {
return fmt.Errorf("failed to decode result: %w", err)
}
if len(resultBytes) >= 64 {
// Extract reserves (first 32 bytes for reserve0, second 32 bytes for reserve1)
reserve0 := new(big.Int).SetBytes(resultBytes[:32])
reserve1 := new(big.Int).SetBytes(resultBytes[32:64])
pool.Reserves0 = reserve0
pool.Reserves1 = reserve1
}
return nil
}
// ValidatePoolAddress validates a pool address using CREATE2 calculation
func (pd *PoolDiscovery) ValidatePoolAddress(factoryName string, token0, token1 common.Address, fee uint32, poolAddr common.Address) bool {
return pd.create2Calculator.ValidatePoolAddress(factoryName, token0, token1, fee, poolAddr)
}
// ProcessDetailedSwap processes a swap with detailed information from L2 parser
func (pd *PoolDiscovery) ProcessDetailedSwap(swapInfo *DetailedSwapInfo) {
if !swapInfo.IsValid {
return
}
// Convert amounts to float for logging
var amountInFloat, amountOutFloat, amountMinFloat float64
if swapInfo.AmountIn != nil {
amountInFloat, _ = new(big.Float).Quo(new(big.Float).SetInt(swapInfo.AmountIn), big.NewFloat(1e18)).Float64()
}
if swapInfo.AmountOut != nil {
amountOutFloat, _ = new(big.Float).Quo(new(big.Float).SetInt(swapInfo.AmountOut), big.NewFloat(1e18)).Float64()
}
if swapInfo.AmountMin != nil {
amountMinFloat, _ = new(big.Float).Quo(new(big.Float).SetInt(swapInfo.AmountMin), big.NewFloat(1e18)).Float64()
}
// Estimate profit (simplified - could be enhanced)
profitUSD := 0.0 // Would require price oracle integration
// Log the detailed opportunity
pd.logger.Opportunity(
swapInfo.TxHash,
swapInfo.From,
swapInfo.To,
swapInfo.MethodName,
swapInfo.Protocol,
amountInFloat,
amountOutFloat,
amountMinFloat,
profitUSD,
map[string]interface{}{
"tokenIn": swapInfo.TokenIn,
"tokenOut": swapInfo.TokenOut,
"recipient": swapInfo.Recipient,
"fee": swapInfo.Fee,
"functionSig": "", // Could be added if needed
"contractName": swapInfo.Protocol,
"deadline": 0, // Could be added if needed
},
)
}
// addPool adds a pool to the cache
func (pd *PoolDiscovery) addPool(pool *Pool) {
pd.mutex.Lock()
defer pd.mutex.Unlock()
if pd.pools == nil {
pd.pools = make(map[string]*Pool)
}
pd.pools[pool.Address] = pool
}