Restructured project for V2 refactor: **Structure Changes:** - Moved all V1 code to orig/ folder (preserved with git mv) - Created docs/planning/ directory - Added orig/README_V1.md explaining V1 preservation **Planning Documents:** - 00_V2_MASTER_PLAN.md: Complete architecture overview - Executive summary of critical V1 issues - High-level component architecture diagrams - 5-phase implementation roadmap - Success metrics and risk mitigation - 07_TASK_BREAKDOWN.md: Atomic task breakdown - 99+ hours of detailed tasks - Every task < 2 hours (atomic) - Clear dependencies and success criteria - Organized by implementation phase **V2 Key Improvements:** - Per-exchange parsers (factory pattern) - Multi-layer strict validation - Multi-index pool cache - Background validation pipeline - Comprehensive observability **Critical Issues Addressed:** - Zero address tokens (strict validation + cache enrichment) - Parsing accuracy (protocol-specific parsers) - No audit trail (background validation channel) - Inefficient lookups (multi-index cache) - Stats disconnection (event-driven metrics) Next Steps: 1. Review planning documents 2. Begin Phase 1: Foundation (P1-001 through P1-010) 3. Implement parsers in Phase 2 4. Build cache system in Phase 3 5. Add validation pipeline in Phase 4 6. Migrate and test in Phase 5 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
1099 lines
33 KiB
Go
1099 lines
33 KiB
Go
package pools
|
|
|
|
import (
|
|
"context"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"fmt"
|
|
"math"
|
|
"math/big"
|
|
"os"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/ethereum/go-ethereum/common"
|
|
"github.com/ethereum/go-ethereum/ethclient"
|
|
"github.com/ethereum/go-ethereum/rpc"
|
|
|
|
"github.com/fraktal/mev-beta/internal/logger"
|
|
"github.com/fraktal/mev-beta/pkg/security"
|
|
"github.com/fraktal/mev-beta/pkg/uniswap"
|
|
)
|
|
|
|
// parseSignedInt256 correctly parses a signed 256-bit integer from 32 bytes
|
|
// This is critical for UniswapV3 events which use int256 for amounts
|
|
func parseSignedInt256(data []byte) *big.Int {
|
|
if len(data) != 32 {
|
|
return big.NewInt(0)
|
|
}
|
|
|
|
value := new(big.Int).SetBytes(data)
|
|
|
|
// Check if the value is negative (MSB set)
|
|
if len(data) > 0 && data[0]&0x80 != 0 {
|
|
// Convert from two's complement
|
|
// Subtract 2^256 to get the negative value
|
|
maxUint256 := new(big.Int)
|
|
maxUint256.Lsh(big.NewInt(1), 256)
|
|
value.Sub(value, maxUint256)
|
|
}
|
|
|
|
return value
|
|
}
|
|
|
|
// Pool represents a discovered liquidity pool
|
|
type Pool struct {
|
|
Address string `json:"address"`
|
|
Token0 string `json:"token0"`
|
|
Token1 string `json:"token1"`
|
|
Fee uint32 `json:"fee"`
|
|
Protocol string `json:"protocol"`
|
|
Factory string `json:"factory"`
|
|
Reserves0 *big.Int `json:"reserves0,omitempty"`
|
|
Reserves1 *big.Int `json:"reserves1,omitempty"`
|
|
Liquidity *big.Int `json:"liquidity,omitempty"`
|
|
LastUpdated time.Time `json:"lastUpdated"`
|
|
TotalVolume *big.Int `json:"totalVolume"`
|
|
SwapCount uint64 `json:"swapCount"`
|
|
CreatedAt time.Time `json:"createdAt"`
|
|
BlockNumber uint64 `json:"blockNumber"`
|
|
}
|
|
|
|
// Exchange represents a discovered exchange/DEX
|
|
type Exchange struct {
|
|
Name string `json:"name"`
|
|
Router string `json:"router"`
|
|
Factory string `json:"factory"`
|
|
Protocol string `json:"protocol"`
|
|
Version string `json:"version"`
|
|
Discovered time.Time `json:"discovered"`
|
|
PoolCount int `json:"poolCount"`
|
|
TotalVolume *big.Int `json:"totalVolume"`
|
|
}
|
|
|
|
// LiquidityEvent represents a liquidity change event
|
|
type LiquidityEvent struct {
|
|
TxHash string `json:"txHash"`
|
|
Pool string `json:"pool"`
|
|
Type string `json:"type"` // "mint", "burn", "sync"
|
|
Amount0 *big.Int `json:"amount0"`
|
|
Amount1 *big.Int `json:"amount1"`
|
|
Liquidity *big.Int `json:"liquidity"`
|
|
Timestamp time.Time `json:"timestamp"`
|
|
BlockNumber uint64 `json:"blockNumber"`
|
|
}
|
|
|
|
// SwapEvent represents a swap event with price impact analysis
|
|
type SwapEvent struct {
|
|
TxHash string `json:"txHash"`
|
|
Pool string `json:"pool"`
|
|
TokenIn string `json:"tokenIn"`
|
|
TokenOut string `json:"tokenOut"`
|
|
AmountIn *big.Int `json:"amountIn"`
|
|
AmountOut *big.Int `json:"amountOut"`
|
|
PriceImpact float64 `json:"priceImpact"`
|
|
IsSignificant bool `json:"isSignificant"`
|
|
Timestamp time.Time `json:"timestamp"`
|
|
BlockNumber uint64 `json:"blockNumber"`
|
|
}
|
|
|
|
// PoolDiscovery manages dynamic pool and exchange discovery
|
|
type PoolDiscovery struct {
|
|
client *rpc.Client
|
|
logger *logger.Logger
|
|
create2Calculator *CREATE2Calculator
|
|
blacklist *PoolBlacklist // Pool blacklist manager
|
|
|
|
// Storage
|
|
pools map[string]*Pool // address -> pool
|
|
exchanges map[string]*Exchange // address -> exchange
|
|
mutex sync.RWMutex
|
|
|
|
// Persistence
|
|
poolsFile string
|
|
exchangesFile string
|
|
|
|
// Event signatures for discovery
|
|
eventSignatures map[string]string
|
|
|
|
// Factory contracts for pool creation events
|
|
knownFactories map[string]string
|
|
|
|
// Configuration
|
|
minLiquidityThreshold *big.Int
|
|
priceImpactThreshold float64
|
|
}
|
|
|
|
// NewPoolDiscovery creates a new pool discovery system
|
|
func NewPoolDiscovery(rpcClient *rpc.Client, logger *logger.Logger) *PoolDiscovery {
|
|
// Create ethclient from rpc client for CREATE2 calculator
|
|
ethClient := ethclient.NewClient(rpcClient)
|
|
|
|
pd := &PoolDiscovery{
|
|
client: rpcClient,
|
|
logger: logger,
|
|
create2Calculator: NewCREATE2Calculator(logger, ethClient),
|
|
blacklist: NewPoolBlacklist(logger), // Initialize blacklist
|
|
pools: make(map[string]*Pool),
|
|
exchanges: make(map[string]*Exchange),
|
|
poolsFile: "data/pools.json",
|
|
exchangesFile: "data/exchanges.json",
|
|
eventSignatures: make(map[string]string),
|
|
knownFactories: make(map[string]string),
|
|
minLiquidityThreshold: big.NewInt(1000000000000000000), // 1 ETH equivalent
|
|
priceImpactThreshold: 0.01, // 1% price impact threshold
|
|
}
|
|
|
|
pd.initializeEventSignatures()
|
|
pd.initializeKnownFactories()
|
|
pd.loadPersistedData()
|
|
|
|
return pd
|
|
}
|
|
|
|
// initializeEventSignatures sets up event signatures for discovery
|
|
func (pd *PoolDiscovery) initializeEventSignatures() {
|
|
// Uniswap V2 events
|
|
pd.eventSignatures["0x0d3648bd0f6ba80134a33ba9275ac585d9d315f0ad8355cddefde31afa28d0e9"] = "PairCreated" // Uniswap V2
|
|
pd.eventSignatures["0x783cca1c0412dd0d695e784568c96da2e9c22ff989357a2e8b1d9b2b4e6b7118"] = "PoolCreated" // Uniswap V3
|
|
pd.eventSignatures["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"] = "Transfer" // ERC20 Transfer
|
|
pd.eventSignatures["0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822"] = "Swap" // Uniswap V2 Swap
|
|
pd.eventSignatures["0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67"] = "Swap" // Uniswap V3 Swap
|
|
pd.eventSignatures["0x4c209b5fc8ad50758f13e2e1088ba56a560dff690a1c6fef26394f4c03821c4f"] = "Mint" // Liquidity mint
|
|
pd.eventSignatures["0xdccd412f0b1252819cb1fd330b93224ca42612892bb3f4f789976e6d81936496"] = "Burn" // Liquidity burn
|
|
pd.eventSignatures["0x1c411e9a96e071241c2f21f7726b17ae89e3cab4c78be50e062b03a9fffbbad1"] = "Sync" // Reserve sync
|
|
}
|
|
|
|
// initializeKnownFactories sets up known factory contracts to monitor for new pools
|
|
func (pd *PoolDiscovery) initializeKnownFactories() {
|
|
// Uniswap V2 Factory
|
|
pd.knownFactories["0xf1d7cc64fb4452f05c498126312ebe29f30fbcf9"] = "UniswapV2"
|
|
// Uniswap V3 Factory
|
|
pd.knownFactories["0x1f98431c8ad98523631ae4a59f267346ea31f984"] = "UniswapV3"
|
|
// SushiSwap Factory
|
|
pd.knownFactories["0xc35dadb65012ec5796536bd9864ed8773abc74c4"] = "SushiSwap"
|
|
// Camelot V2 Factory
|
|
pd.knownFactories["0x6eccab422d763ac031210895c81787e87b43a652"] = "Camelot"
|
|
// Balancer V2 Vault
|
|
pd.knownFactories["0xba12222222228d8ba445958a75a0704d566bf2c8"] = "Balancer"
|
|
// Curve Factory
|
|
pd.knownFactories["0xb17b674d9c5cb2e441f8e196a2f048a81355d031"] = "Curve"
|
|
}
|
|
|
|
// DiscoverFromTransaction analyzes a transaction for new pools and exchanges
|
|
func (pd *PoolDiscovery) DiscoverFromTransaction(txHash, to, input string, logs []interface{}) {
|
|
pd.mutex.Lock()
|
|
defer pd.mutex.Unlock()
|
|
|
|
// Check if transaction is to an unknown contract (potential new DEX)
|
|
if to != "" && pd.isUnknownContract(to) {
|
|
pd.analyzeUnknownContract(to, input)
|
|
}
|
|
|
|
// Analyze logs for pool creation and swap events
|
|
for _, logEntry := range logs {
|
|
pd.analyzeLogEntry(logEntry, txHash)
|
|
}
|
|
}
|
|
|
|
// isUnknownContract checks if a contract address is unknown
|
|
func (pd *PoolDiscovery) isUnknownContract(address string) bool {
|
|
_, exists := pd.exchanges[strings.ToLower(address)]
|
|
return !exists
|
|
}
|
|
|
|
// analyzeUnknownContract analyzes an unknown contract to determine if it's a DEX
|
|
func (pd *PoolDiscovery) analyzeUnknownContract(address, input string) {
|
|
// Check function signatures that indicate DEX functionality
|
|
if len(input) < 10 {
|
|
return
|
|
}
|
|
|
|
functionSig := input[:10]
|
|
isDEX := false
|
|
protocol := "Unknown"
|
|
|
|
switch functionSig {
|
|
case "0x38ed1739", "0x18cbafe5", "0x7ff36ab5": // Uniswap V2 functions
|
|
isDEX = true
|
|
protocol = "UniswapV2-Like"
|
|
case "0x414bf389", "0xac9650d8", "0x5ae401dc", "0x1f0464d1": // Uniswap V3 functions
|
|
isDEX = true
|
|
protocol = "UniswapV3-Like"
|
|
case "0xa9059cbb", "0x095ea7b3": // ERC20 functions (might be router)
|
|
isDEX = true
|
|
protocol = "Router-Like"
|
|
}
|
|
|
|
if isDEX {
|
|
exchange := &Exchange{
|
|
Name: fmt.Sprintf("Unknown-%s", address[:8]),
|
|
Router: address,
|
|
Protocol: protocol,
|
|
Discovered: time.Now(),
|
|
TotalVolume: big.NewInt(0),
|
|
}
|
|
|
|
pd.exchanges[strings.ToLower(address)] = exchange
|
|
pd.logger.Opportunity("", address, "", "NEW_EXCHANGE_DISCOVERED", protocol, 0, 0, 0, 0, map[string]interface{}{
|
|
"router": address,
|
|
"protocol": protocol,
|
|
"discovered": time.Now(),
|
|
})
|
|
|
|
pd.persistData()
|
|
}
|
|
}
|
|
|
|
// analyzeLogEntry analyzes a log entry for pool creation or swap events
|
|
func (pd *PoolDiscovery) analyzeLogEntry(logEntry interface{}, txHash string) {
|
|
// Convert log entry to map for analysis
|
|
logMap, ok := logEntry.(map[string]interface{})
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
topics, ok := logMap["topics"].([]interface{})
|
|
if !ok || len(topics) == 0 {
|
|
return
|
|
}
|
|
|
|
topic0, ok := topics[0].(string)
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
eventType, exists := pd.eventSignatures[topic0]
|
|
if !exists {
|
|
return
|
|
}
|
|
|
|
address, ok := logMap["address"].(string)
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
switch eventType {
|
|
case "PairCreated", "PoolCreated":
|
|
pd.handlePoolCreation(address, topics, logMap, txHash)
|
|
case "Swap":
|
|
pd.handleSwapEvent(address, topics, logMap, txHash)
|
|
case "Mint", "Burn":
|
|
pd.handleLiquidityEvent(address, topics, logMap, txHash, eventType)
|
|
case "Sync":
|
|
pd.handleSyncEvent(address, topics, logMap, txHash)
|
|
}
|
|
}
|
|
|
|
// handlePoolCreation processes pool creation events
|
|
func (pd *PoolDiscovery) handlePoolCreation(factoryAddress string, topics []interface{}, logData map[string]interface{}, txHash string) {
|
|
if len(topics) < 4 {
|
|
return
|
|
}
|
|
|
|
// Extract pool information from topics
|
|
token0 := pd.addressFromTopic(topics[1])
|
|
token1 := pd.addressFromTopic(topics[2])
|
|
poolAddress := pd.addressFromTopic(topics[3])
|
|
|
|
protocol := "Unknown"
|
|
if proto, exists := pd.knownFactories[strings.ToLower(factoryAddress)]; exists {
|
|
protocol = proto
|
|
}
|
|
|
|
pool := &Pool{
|
|
Address: poolAddress,
|
|
Token0: token0,
|
|
Token1: token1,
|
|
Protocol: protocol,
|
|
Factory: factoryAddress,
|
|
LastUpdated: time.Now(),
|
|
TotalVolume: big.NewInt(0),
|
|
SwapCount: 0,
|
|
}
|
|
|
|
pd.pools[strings.ToLower(poolAddress)] = pool
|
|
|
|
pd.logger.Opportunity(txHash, factoryAddress, poolAddress, "NEW_POOL_DISCOVERED", protocol, 0, 0, 0, 0, map[string]interface{}{
|
|
"poolAddress": poolAddress,
|
|
"token0": token0,
|
|
"token1": token1,
|
|
"factory": factoryAddress,
|
|
"protocol": protocol,
|
|
})
|
|
|
|
pd.persistData()
|
|
}
|
|
|
|
// handleSwapEvent processes swap events and calculates price impact
|
|
func (pd *PoolDiscovery) handleSwapEvent(poolAddress string, topics []interface{}, logData map[string]interface{}, txHash string) {
|
|
pool, exists := pd.pools[strings.ToLower(poolAddress)]
|
|
if !exists {
|
|
// Unknown pool, try to discover it
|
|
pd.discoverPoolFromSwap(poolAddress, txHash)
|
|
return
|
|
}
|
|
|
|
// Extract swap data from log
|
|
data, ok := logData["data"].(string)
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
swapData := pd.parseSwapData(data, pool.Protocol)
|
|
if swapData == nil {
|
|
return
|
|
}
|
|
|
|
// Calculate price impact
|
|
priceImpact := pd.calculatePriceImpact(pool, swapData.AmountIn, swapData.AmountOut)
|
|
isSignificant := priceImpact >= pd.priceImpactThreshold
|
|
|
|
// Update pool statistics
|
|
pool.SwapCount++
|
|
if pool.TotalVolume == nil {
|
|
pool.TotalVolume = big.NewInt(0)
|
|
}
|
|
pool.TotalVolume.Add(pool.TotalVolume, swapData.AmountIn)
|
|
pool.LastUpdated = time.Now()
|
|
|
|
// Log significant swaps
|
|
if isSignificant {
|
|
amountInFloat, _ := new(big.Float).Quo(new(big.Float).SetInt(swapData.AmountIn), big.NewFloat(1e18)).Float64()
|
|
amountOutFloat, _ := new(big.Float).Quo(new(big.Float).SetInt(swapData.AmountOut), big.NewFloat(1e18)).Float64()
|
|
|
|
pd.logger.Opportunity(txHash, "", poolAddress, "SIGNIFICANT_SWAP", pool.Protocol, amountInFloat, amountOutFloat, 0, priceImpact*100, map[string]interface{}{
|
|
"pool": poolAddress,
|
|
"token0": pool.Token0,
|
|
"token1": pool.Token1,
|
|
"priceImpact": fmt.Sprintf("%.2f%%", priceImpact*100),
|
|
"swapCount": pool.SwapCount,
|
|
"totalVolume": pool.TotalVolume.String(),
|
|
})
|
|
}
|
|
}
|
|
|
|
// SwapData represents parsed swap data
|
|
type SwapData struct {
|
|
AmountIn *big.Int
|
|
AmountOut *big.Int
|
|
TokenIn string
|
|
TokenOut string
|
|
}
|
|
|
|
// DetailedSwapInfo represents enhanced swap information from L2 parser
|
|
type DetailedSwapInfo struct {
|
|
TxHash string
|
|
From string
|
|
To string
|
|
MethodName string
|
|
Protocol string
|
|
AmountIn *big.Int
|
|
AmountOut *big.Int
|
|
AmountMin *big.Int
|
|
TokenIn string
|
|
TokenOut string
|
|
Fee uint32
|
|
Recipient string
|
|
IsValid bool
|
|
}
|
|
|
|
// parseSwapData parses swap data from log data
|
|
func (pd *PoolDiscovery) parseSwapData(data, protocol string) *SwapData {
|
|
if len(data) < 2 {
|
|
return nil
|
|
}
|
|
|
|
// Remove 0x prefix
|
|
dataBytes, err := hex.DecodeString(data[2:])
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
|
|
if len(dataBytes) < 128 { // 4 * 32 bytes minimum for swap data
|
|
return nil
|
|
}
|
|
|
|
// Parse amounts based on protocol
|
|
var amountIn, amountOut *big.Int
|
|
|
|
switch protocol {
|
|
case "UniswapV2", "SushiSwap", "Camelot":
|
|
// Uniswap V2 Swap(sender,amount0In,amount1In,amount0Out,amount1Out,to)
|
|
amount0In := new(big.Int).SetBytes(dataBytes[32:64])
|
|
amount1In := new(big.Int).SetBytes(dataBytes[64:96])
|
|
amount0Out := new(big.Int).SetBytes(dataBytes[96:128])
|
|
amount1Out := new(big.Int).SetBytes(dataBytes[128:160])
|
|
|
|
if amount0In.Cmp(big.NewInt(0)) > 0 {
|
|
amountIn = amount0In
|
|
amountOut = amount1Out
|
|
} else {
|
|
amountIn = amount1In
|
|
amountOut = amount0Out
|
|
}
|
|
|
|
case "UniswapV3":
|
|
// Uniswap V3 uses signed int256 for amounts
|
|
amountIn = parseSignedInt256(dataBytes[0:32])
|
|
amountOut = parseSignedInt256(dataBytes[32:64])
|
|
// Convert to absolute values for display
|
|
amountIn = new(big.Int).Abs(amountIn)
|
|
amountOut = new(big.Int).Abs(amountOut)
|
|
|
|
default:
|
|
// Generic parsing
|
|
amountIn = new(big.Int).SetBytes(dataBytes[0:32])
|
|
amountOut = new(big.Int).SetBytes(dataBytes[32:64])
|
|
}
|
|
|
|
return &SwapData{
|
|
AmountIn: amountIn,
|
|
AmountOut: amountOut,
|
|
}
|
|
}
|
|
|
|
// calculatePriceImpact calculates the price impact of a swap
|
|
func (pd *PoolDiscovery) calculatePriceImpact(pool *Pool, amountIn, amountOut *big.Int) float64 {
|
|
if pool.Reserves0 == nil || pool.Reserves1 == nil || pool.Reserves0.Cmp(big.NewInt(0)) == 0 {
|
|
return 0.0
|
|
}
|
|
|
|
// Simplified price impact calculation
|
|
// Real implementation would use more sophisticated formulas based on AMM type
|
|
|
|
// Calculate expected amount out without price impact
|
|
reserve0Float := new(big.Float).SetInt(pool.Reserves0)
|
|
reserve1Float := new(big.Float).SetInt(pool.Reserves1)
|
|
amountInFloat := new(big.Float).SetInt(amountIn)
|
|
amountOutFloat := new(big.Float).SetInt(amountOut)
|
|
|
|
// Current price
|
|
currentPrice := new(big.Float).Quo(reserve1Float, reserve0Float)
|
|
|
|
// Expected amount out
|
|
expectedOut := new(big.Float).Mul(amountInFloat, currentPrice)
|
|
|
|
// Price impact = (expected - actual) / expected
|
|
diff := new(big.Float).Sub(expectedOut, amountOutFloat)
|
|
impact := new(big.Float).Quo(diff, expectedOut)
|
|
|
|
impactFloat, _ := impact.Float64()
|
|
if impactFloat < 0 {
|
|
impactFloat = -impactFloat
|
|
}
|
|
|
|
return impactFloat
|
|
}
|
|
|
|
// Additional helper methods...
|
|
func (pd *PoolDiscovery) addressFromTopic(topic interface{}) string {
|
|
topicStr, ok := topic.(string)
|
|
if !ok || len(topicStr) < 42 {
|
|
return ""
|
|
}
|
|
// Extract address from topic (last 20 bytes)
|
|
return "0x" + topicStr[26:]
|
|
}
|
|
|
|
func (pd *PoolDiscovery) handleLiquidityEvent(poolAddress string, topics []interface{}, logData map[string]interface{}, txHash, eventType string) {
|
|
pool, exists := pd.pools[strings.ToLower(poolAddress)]
|
|
if !exists {
|
|
// Try to discover this unknown pool
|
|
pd.discoverPoolFromSwap(poolAddress, txHash)
|
|
return
|
|
}
|
|
|
|
// Parse liquidity event data
|
|
data, ok := logData["data"].(string)
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
eventData := pd.parseLiquidityData(data, eventType, pool.Protocol)
|
|
if eventData == nil {
|
|
return
|
|
}
|
|
|
|
// Update pool liquidity
|
|
if eventType == "Mint" && eventData.AmountIn != nil {
|
|
if pool.Liquidity == nil {
|
|
pool.Liquidity = big.NewInt(0)
|
|
}
|
|
pool.Liquidity.Add(pool.Liquidity, eventData.AmountIn)
|
|
} else if eventType == "Burn" && eventData.AmountIn != nil {
|
|
if pool.Liquidity != nil {
|
|
pool.Liquidity.Sub(pool.Liquidity, eventData.AmountIn)
|
|
}
|
|
}
|
|
|
|
pool.LastUpdated = time.Now()
|
|
|
|
// Create liquidity event for potential future use
|
|
_ = &LiquidityEvent{
|
|
TxHash: txHash,
|
|
Pool: poolAddress,
|
|
Type: strings.ToLower(eventType),
|
|
Amount0: eventData.AmountIn,
|
|
Amount1: eventData.AmountOut,
|
|
Liquidity: pool.Liquidity,
|
|
Timestamp: time.Now(),
|
|
BlockNumber: 0, // Would be set by caller
|
|
}
|
|
|
|
pd.logger.Opportunity(txHash, "", poolAddress, "LIQUIDITY_EVENT", pool.Protocol, 0, 0, 0, 0, map[string]interface{}{
|
|
"eventType": eventType,
|
|
"amount0": eventData.AmountIn.String(),
|
|
"amount1": eventData.AmountOut.String(),
|
|
"newLiquidity": pool.Liquidity.String(),
|
|
})
|
|
|
|
pd.persistData()
|
|
}
|
|
|
|
func (pd *PoolDiscovery) handleSyncEvent(poolAddress string, topics []interface{}, logData map[string]interface{}, txHash string) {
|
|
pool, exists := pd.pools[strings.ToLower(poolAddress)]
|
|
if !exists {
|
|
pd.discoverPoolFromSwap(poolAddress, txHash)
|
|
return
|
|
}
|
|
|
|
// Parse sync event data (Uniswap V2 reserves update)
|
|
data, ok := logData["data"].(string)
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
syncData := pd.parseSyncData(data)
|
|
if syncData == nil {
|
|
return
|
|
}
|
|
|
|
// Update pool reserves
|
|
pool.Reserves0 = syncData.Reserve0
|
|
pool.Reserves1 = syncData.Reserve1
|
|
pool.LastUpdated = time.Now()
|
|
|
|
pd.logger.Debug(fmt.Sprintf("Updated reserves for pool %s: Reserve0=%s, Reserve1=%s",
|
|
poolAddress, syncData.Reserve0.String(), syncData.Reserve1.String()))
|
|
|
|
pd.persistData()
|
|
}
|
|
|
|
func (pd *PoolDiscovery) discoverPoolFromSwap(poolAddress, txHash string) {
|
|
// Check if we already know this pool
|
|
if _, exists := pd.pools[strings.ToLower(poolAddress)]; exists {
|
|
return
|
|
}
|
|
|
|
pd.logger.Info(fmt.Sprintf("Discovering unknown pool from swap: %s", poolAddress))
|
|
|
|
// Create Ethereum client to query pool contract
|
|
// Get RPC endpoint from config or environment
|
|
rpcEndpoint := os.Getenv("ARBITRUM_RPC_ENDPOINT")
|
|
if rpcEndpoint == "" {
|
|
rpcEndpoint = "wss://arbitrum-mainnet.core.chainstack.com/53c30e7a941160679fdcc396c894fc57" // fallback
|
|
}
|
|
client, err := ethclient.Dial(rpcEndpoint)
|
|
if err != nil {
|
|
pd.logger.Error(fmt.Sprintf("Failed to connect to Ethereum node for pool discovery: %v", err))
|
|
return
|
|
}
|
|
defer client.Close()
|
|
|
|
address := common.HexToAddress(poolAddress)
|
|
|
|
// Validate that this is a real pool contract
|
|
if !uniswap.IsValidPool(context.Background(), client, address) {
|
|
pd.logger.Debug(fmt.Sprintf("Address %s is not a valid pool contract", poolAddress))
|
|
return
|
|
}
|
|
|
|
// Create Uniswap V3 pool interface to fetch real data
|
|
uniswapPool := uniswap.NewUniswapV3Pool(address, client)
|
|
|
|
// Fetch pool state with timeout
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
poolState, err := uniswapPool.GetPoolState(ctx)
|
|
if err != nil {
|
|
pd.logger.Error(fmt.Sprintf("Failed to fetch pool state for %s: %v", poolAddress, err))
|
|
return
|
|
}
|
|
|
|
// Determine protocol (could be enhanced to detect different protocols)
|
|
protocol := "UniswapV3"
|
|
|
|
// Try to determine factory address by checking against known factories
|
|
factory := ""
|
|
for factoryAddr, proto := range pd.knownFactories {
|
|
// This is a simplified check - in practice you'd call the pool's factory() function
|
|
if proto == protocol {
|
|
factory = factoryAddr
|
|
break
|
|
}
|
|
}
|
|
|
|
// Safely convert fee from int64 to uint32
|
|
var feeUint64 uint64
|
|
if poolState.Fee < 0 {
|
|
feeUint64 = 0
|
|
} else {
|
|
feeUint64 = uint64(poolState.Fee)
|
|
}
|
|
safeFee, err := security.SafeUint32(feeUint64)
|
|
if err != nil {
|
|
pd.logger.Warn(fmt.Sprintf("Failed to safely convert fee %d for pool %s: %v", poolState.Fee, poolAddress, err))
|
|
// Use a default fee value if conversion fails
|
|
// Truncate to 32 bits safely by using math.MaxUint32 as mask
|
|
if poolState.Fee > math.MaxUint32 {
|
|
safeFee = math.MaxUint32
|
|
} else {
|
|
safeFee = uint32(poolState.Fee)
|
|
}
|
|
}
|
|
|
|
// Create pool entry with real data
|
|
pool := &Pool{
|
|
Address: poolAddress,
|
|
Token0: poolState.Token0.Hex(),
|
|
Token1: poolState.Token1.Hex(),
|
|
// Safely convert fee from uint64 to uint32
|
|
Fee: safeFee,
|
|
Protocol: protocol,
|
|
Factory: factory,
|
|
Liquidity: poolState.Liquidity.ToBig(),
|
|
LastUpdated: time.Now(),
|
|
TotalVolume: big.NewInt(0),
|
|
SwapCount: 0,
|
|
}
|
|
|
|
pd.pools[strings.ToLower(poolAddress)] = pool
|
|
|
|
pd.logger.Opportunity(txHash, "", poolAddress, "POOL_DISCOVERED", protocol, 0, 0, 0, 0, map[string]interface{}{
|
|
"source": "swap_event",
|
|
"poolAddress": poolAddress,
|
|
"protocol": protocol,
|
|
"token0": poolState.Token0.Hex(),
|
|
"token1": poolState.Token1.Hex(),
|
|
"fee": poolState.Fee,
|
|
"liquidity": poolState.Liquidity.String(),
|
|
"discoveredAt": time.Now(),
|
|
})
|
|
|
|
pd.persistData()
|
|
}
|
|
|
|
// parseLiquidityData parses liquidity event data
|
|
func (pd *PoolDiscovery) parseLiquidityData(data, eventType, protocol string) *SwapData {
|
|
if len(data) < 2 {
|
|
return nil
|
|
}
|
|
|
|
dataBytes, err := hex.DecodeString(data[2:])
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
|
|
if len(dataBytes) < 64 { // 2 * 32 bytes minimum
|
|
return nil
|
|
}
|
|
|
|
var amount0, amount1 *big.Int
|
|
|
|
// UniswapV3 uses signed int256 for liquidity amounts
|
|
if protocol == "UniswapV3" {
|
|
amount0 = parseSignedInt256(dataBytes[0:32])
|
|
amount1 = parseSignedInt256(dataBytes[32:64])
|
|
// Convert to absolute values
|
|
amount0 = new(big.Int).Abs(amount0)
|
|
amount1 = new(big.Int).Abs(amount1)
|
|
} else {
|
|
// V2 and others use unsigned uint256
|
|
amount0 = new(big.Int).SetBytes(dataBytes[0:32])
|
|
amount1 = new(big.Int).SetBytes(dataBytes[32:64])
|
|
}
|
|
|
|
return &SwapData{
|
|
AmountIn: amount0,
|
|
AmountOut: amount1,
|
|
}
|
|
}
|
|
|
|
// SyncData represents reserves from a sync event
|
|
type SyncData struct {
|
|
Reserve0 *big.Int
|
|
Reserve1 *big.Int
|
|
}
|
|
|
|
// parseSyncData parses sync event data (Uniswap V2)
|
|
func (pd *PoolDiscovery) parseSyncData(data string) *SyncData {
|
|
if len(data) < 2 {
|
|
return nil
|
|
}
|
|
|
|
dataBytes, err := hex.DecodeString(data[2:])
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
|
|
if len(dataBytes) < 64 { // 2 * 32 bytes
|
|
return nil
|
|
}
|
|
|
|
reserve0 := new(big.Int).SetBytes(dataBytes[0:32])
|
|
reserve1 := new(big.Int).SetBytes(dataBytes[32:64])
|
|
|
|
return &SyncData{
|
|
Reserve0: reserve0,
|
|
Reserve1: reserve1,
|
|
}
|
|
}
|
|
|
|
// persistData saves pools and exchanges to files
|
|
func (pd *PoolDiscovery) persistData() {
|
|
// Ensure data directory exists
|
|
os.MkdirAll("data", 0750)
|
|
|
|
// Save pools
|
|
poolsData, _ := json.MarshalIndent(pd.pools, "", " ")
|
|
os.WriteFile(pd.poolsFile, poolsData, 0644)
|
|
|
|
// Save exchanges
|
|
exchangesData, _ := json.MarshalIndent(pd.exchanges, "", " ")
|
|
os.WriteFile(pd.exchangesFile, exchangesData, 0644)
|
|
}
|
|
|
|
// loadPersistedData loads pools and exchanges from files
|
|
func (pd *PoolDiscovery) loadPersistedData() {
|
|
// Load pools
|
|
if data, err := os.ReadFile(pd.poolsFile); err == nil {
|
|
if err := json.Unmarshal(data, &pd.pools); err != nil {
|
|
pd.logger.Warn(fmt.Sprintf("Failed to unmarshal pools from %s: %v", pd.poolsFile, err))
|
|
} else {
|
|
pd.logger.Info(fmt.Sprintf("Loaded %d pools from cache", len(pd.pools)))
|
|
}
|
|
} else {
|
|
pd.logger.Warn(fmt.Sprintf("Failed to read pools file %s: %v", pd.poolsFile, err))
|
|
}
|
|
|
|
// Load exchanges
|
|
if data, err := os.ReadFile(pd.exchangesFile); err == nil {
|
|
if err := json.Unmarshal(data, &pd.exchanges); err != nil {
|
|
pd.logger.Warn(fmt.Sprintf("Failed to unmarshal exchanges from %s: %v", pd.exchangesFile, err))
|
|
} else {
|
|
pd.logger.Info(fmt.Sprintf("Loaded %d exchanges from cache", len(pd.exchanges)))
|
|
}
|
|
} else {
|
|
// Don't warn for missing exchanges file - it's optional
|
|
if !os.IsNotExist(err) {
|
|
pd.logger.Warn(fmt.Sprintf("Failed to read exchanges file %s: %v", pd.exchangesFile, err))
|
|
}
|
|
}
|
|
}
|
|
|
|
// GetPoolCount returns the number of discovered pools
|
|
func (pd *PoolDiscovery) GetPoolCount() int {
|
|
pd.mutex.RLock()
|
|
defer pd.mutex.RUnlock()
|
|
return len(pd.pools)
|
|
}
|
|
|
|
// SavePoolCache saves discovered pools and exchanges to disk
|
|
func (pd *PoolDiscovery) SavePoolCache() {
|
|
pd.mutex.RLock()
|
|
defer pd.mutex.RUnlock()
|
|
pd.persistData()
|
|
}
|
|
|
|
// GetExchangeCount returns the number of discovered exchanges
|
|
func (pd *PoolDiscovery) GetExchangeCount() int {
|
|
pd.mutex.RLock()
|
|
defer pd.mutex.RUnlock()
|
|
return len(pd.exchanges)
|
|
}
|
|
|
|
// GetPool returns a pool by address (checks blacklist)
|
|
func (pd *PoolDiscovery) GetPool(address string) (*Pool, bool) {
|
|
pd.mutex.RLock()
|
|
defer pd.mutex.RUnlock()
|
|
|
|
// Check blacklist first
|
|
poolAddr := common.HexToAddress(address)
|
|
if pd.blacklist != nil && pd.blacklist.IsBlacklisted(poolAddr) {
|
|
pd.logger.Debug(fmt.Sprintf("Pool %s is blacklisted, skipping", address[:10]))
|
|
return nil, false
|
|
}
|
|
|
|
pool, exists := pd.pools[strings.ToLower(address)]
|
|
return pool, exists
|
|
}
|
|
|
|
// IsPoolBlacklisted checks if a pool is blacklisted
|
|
func (pd *PoolDiscovery) IsPoolBlacklisted(address common.Address) bool {
|
|
if pd.blacklist == nil {
|
|
return false
|
|
}
|
|
return pd.blacklist.IsBlacklisted(address)
|
|
}
|
|
|
|
// RecordPoolFailure records a pool failure for blacklisting
|
|
func (pd *PoolDiscovery) RecordPoolFailure(poolAddress common.Address, reason string, protocol string, token0, token1 common.Address) {
|
|
if pd.blacklist != nil {
|
|
pd.blacklist.RecordFailure(poolAddress, reason, protocol, token0, token1)
|
|
}
|
|
}
|
|
|
|
// GetBlacklistStats returns blacklist statistics
|
|
func (pd *PoolDiscovery) GetBlacklistStats() map[string]interface{} {
|
|
if pd.blacklist == nil {
|
|
return map[string]interface{}{"status": "blacklist not initialized"}
|
|
}
|
|
return pd.blacklist.GetBlacklistStats()
|
|
}
|
|
|
|
// GetAllPools returns all discovered pools
|
|
func (pd *PoolDiscovery) GetAllPools() map[string]*Pool {
|
|
pd.mutex.RLock()
|
|
defer pd.mutex.RUnlock()
|
|
|
|
pools := make(map[string]*Pool)
|
|
for k, v := range pd.pools {
|
|
pools[k] = v
|
|
}
|
|
return pools
|
|
}
|
|
|
|
// DiscoverPoolsForTokenPair uses CREATE2 to discover all possible pools for a token pair
|
|
func (pd *PoolDiscovery) DiscoverPoolsForTokenPair(token0, token1 common.Address) ([]*Pool, error) {
|
|
// Use CREATE2 calculator to find all possible pool addresses
|
|
poolIdentifiers, err := pd.create2Calculator.FindPoolsForTokenPair(token0, token1)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to calculate pool addresses: %w", err)
|
|
}
|
|
|
|
pools := make([]*Pool, 0)
|
|
|
|
for _, poolId := range poolIdentifiers {
|
|
// Check if pool exists on-chain
|
|
exists, err := pd.verifyPoolExists(poolId.PoolAddr)
|
|
if err != nil {
|
|
pd.logger.Debug(fmt.Sprintf("Failed to verify pool %s: %v", poolId.PoolAddr.Hex(), err))
|
|
continue
|
|
}
|
|
|
|
if !exists {
|
|
pd.logger.Debug(fmt.Sprintf("Pool %s does not exist on-chain", poolId.PoolAddr.Hex()))
|
|
continue
|
|
}
|
|
|
|
// Create pool object
|
|
pool := &Pool{
|
|
Address: poolId.PoolAddr.Hex(),
|
|
Token0: poolId.Token0.Hex(),
|
|
Token1: poolId.Token1.Hex(),
|
|
Fee: poolId.Fee,
|
|
Protocol: poolId.Factory,
|
|
Factory: poolId.Factory,
|
|
CreatedAt: time.Now(),
|
|
}
|
|
|
|
// Get additional pool data
|
|
if err := pd.enrichPoolData(pool); err != nil {
|
|
pd.logger.Debug(fmt.Sprintf("Failed to enrich pool data for %s: %v", pool.Address, err))
|
|
}
|
|
|
|
pools = append(pools, pool)
|
|
|
|
// Add to our cache
|
|
pd.addPool(pool)
|
|
}
|
|
|
|
pd.logger.Info(fmt.Sprintf("Discovered %d pools for token pair %s/%s",
|
|
len(pools), token0.Hex(), token1.Hex()))
|
|
|
|
return pools, nil
|
|
}
|
|
|
|
// verifyPoolExists checks if a pool actually exists at the calculated address
|
|
func (pd *PoolDiscovery) verifyPoolExists(poolAddr common.Address) (bool, error) {
|
|
// Check if there's code at the address
|
|
var result string
|
|
err := pd.client.Call(&result, "eth_getCode", poolAddr.Hex(), "latest")
|
|
if err != nil {
|
|
return false, fmt.Errorf("failed to get code: %w", err)
|
|
}
|
|
|
|
// If there's no code, the pool doesn't exist
|
|
if result == "0x" || result == "" {
|
|
return false, nil
|
|
}
|
|
|
|
return true, nil
|
|
}
|
|
|
|
// enrichPoolData gets additional data about a pool
|
|
func (pd *PoolDiscovery) enrichPoolData(pool *Pool) error {
|
|
poolAddr := common.HexToAddress(pool.Address)
|
|
|
|
// For Uniswap V3 pools, get slot0 data
|
|
if pool.Protocol == "uniswap_v3" || pool.Protocol == "camelot_v3" {
|
|
return pd.enrichUniswapV3PoolData(pool, poolAddr)
|
|
}
|
|
|
|
// For Uniswap V2 style pools, get reserves
|
|
if pool.Protocol == "uniswap_v2" || pool.Protocol == "sushiswap" {
|
|
return pd.enrichUniswapV2PoolData(pool, poolAddr)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// enrichUniswapV3PoolData gets Uniswap V3 specific pool data
|
|
func (pd *PoolDiscovery) enrichUniswapV3PoolData(pool *Pool, poolAddr common.Address) error {
|
|
// Get slot0 data (price, tick, etc.)
|
|
slot0ABI := `[{"inputs":[],"name":"slot0","outputs":[{"internalType":"uint160","name":"sqrtPriceX96","type":"uint160"},{"internalType":"int24","name":"tick","type":"int24"},{"internalType":"uint16","name":"observationIndex","type":"uint16"},{"internalType":"uint16","name":"observationCardinality","type":"uint16"},{"internalType":"uint16","name":"observationCardinalityNext","type":"uint16"},{"internalType":"uint8","name":"feeProtocol","type":"uint8"},{"internalType":"bool","name":"unlocked","type":"bool"}],"stateMutability":"view","type":"function"}]`
|
|
|
|
contractABI, err := uniswap.ParseABI(slot0ABI)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to parse slot0 ABI: %w", err)
|
|
}
|
|
|
|
callData, err := contractABI.Pack("slot0")
|
|
if err != nil {
|
|
return fmt.Errorf("failed to pack slot0 call: %w", err)
|
|
}
|
|
|
|
var result string
|
|
err = pd.client.Call(&result, "eth_call", map[string]interface{}{
|
|
"to": poolAddr.Hex(),
|
|
"data": "0x" + hex.EncodeToString(callData),
|
|
}, "latest")
|
|
if err != nil {
|
|
return fmt.Errorf("slot0 call failed: %w", err)
|
|
}
|
|
|
|
// Decode result
|
|
resultBytes, err := hex.DecodeString(strings.TrimPrefix(result, "0x"))
|
|
if err != nil {
|
|
return fmt.Errorf("failed to decode result: %w", err)
|
|
}
|
|
|
|
if len(resultBytes) == 0 {
|
|
return fmt.Errorf("empty result from slot0 call")
|
|
}
|
|
|
|
// Store the fact that this is a valid V3 pool
|
|
pool.BlockNumber = 0 // Will be set when we detect the creation event
|
|
|
|
return nil
|
|
}
|
|
|
|
// enrichUniswapV2PoolData gets Uniswap V2 specific pool data
|
|
func (pd *PoolDiscovery) enrichUniswapV2PoolData(pool *Pool, poolAddr common.Address) error {
|
|
// Get reserves from getReserves()
|
|
reservesABI := `[{"inputs":[],"name":"getReserves","outputs":[{"internalType":"uint112","name":"_reserve0","type":"uint112"},{"internalType":"uint112","name":"_reserve1","type":"uint112"},{"internalType":"uint32","name":"_blockTimestampLast","type":"uint32"}],"stateMutability":"view","type":"function"}]`
|
|
|
|
contractABI, err := uniswap.ParseABI(reservesABI)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to parse reserves ABI: %w", err)
|
|
}
|
|
|
|
callData, err := contractABI.Pack("getReserves")
|
|
if err != nil {
|
|
return fmt.Errorf("failed to pack getReserves call: %w", err)
|
|
}
|
|
|
|
var result string
|
|
err = pd.client.Call(&result, "eth_call", map[string]interface{}{
|
|
"to": poolAddr.Hex(),
|
|
"data": "0x" + hex.EncodeToString(callData),
|
|
}, "latest")
|
|
if err != nil {
|
|
return fmt.Errorf("getReserves call failed: %w", err)
|
|
}
|
|
|
|
// Decode result
|
|
resultBytes, err := hex.DecodeString(strings.TrimPrefix(result, "0x"))
|
|
if err != nil {
|
|
return fmt.Errorf("failed to decode result: %w", err)
|
|
}
|
|
|
|
if len(resultBytes) >= 64 {
|
|
// Extract reserves (first 32 bytes for reserve0, second 32 bytes for reserve1)
|
|
reserve0 := new(big.Int).SetBytes(resultBytes[:32])
|
|
reserve1 := new(big.Int).SetBytes(resultBytes[32:64])
|
|
|
|
pool.Reserves0 = reserve0
|
|
pool.Reserves1 = reserve1
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ValidatePoolAddress validates a pool address using CREATE2 calculation
|
|
func (pd *PoolDiscovery) ValidatePoolAddress(factoryName string, token0, token1 common.Address, fee uint32, poolAddr common.Address) bool {
|
|
return pd.create2Calculator.ValidatePoolAddress(factoryName, token0, token1, fee, poolAddr)
|
|
}
|
|
|
|
// ProcessDetailedSwap processes a swap with detailed information from L2 parser
|
|
func (pd *PoolDiscovery) ProcessDetailedSwap(swapInfo *DetailedSwapInfo) {
|
|
if !swapInfo.IsValid {
|
|
return
|
|
}
|
|
|
|
// Convert amounts to float for logging
|
|
var amountInFloat, amountOutFloat, amountMinFloat float64
|
|
|
|
if swapInfo.AmountIn != nil {
|
|
amountInFloat, _ = new(big.Float).Quo(new(big.Float).SetInt(swapInfo.AmountIn), big.NewFloat(1e18)).Float64()
|
|
}
|
|
|
|
if swapInfo.AmountOut != nil {
|
|
amountOutFloat, _ = new(big.Float).Quo(new(big.Float).SetInt(swapInfo.AmountOut), big.NewFloat(1e18)).Float64()
|
|
}
|
|
|
|
if swapInfo.AmountMin != nil {
|
|
amountMinFloat, _ = new(big.Float).Quo(new(big.Float).SetInt(swapInfo.AmountMin), big.NewFloat(1e18)).Float64()
|
|
}
|
|
|
|
// Estimate profit (simplified - could be enhanced)
|
|
profitUSD := 0.0 // Would require price oracle integration
|
|
|
|
// Log the detailed opportunity
|
|
pd.logger.Opportunity(
|
|
swapInfo.TxHash,
|
|
swapInfo.From,
|
|
swapInfo.To,
|
|
swapInfo.MethodName,
|
|
swapInfo.Protocol,
|
|
amountInFloat,
|
|
amountOutFloat,
|
|
amountMinFloat,
|
|
profitUSD,
|
|
map[string]interface{}{
|
|
"tokenIn": swapInfo.TokenIn,
|
|
"tokenOut": swapInfo.TokenOut,
|
|
"recipient": swapInfo.Recipient,
|
|
"fee": swapInfo.Fee,
|
|
"functionSig": "", // Could be added if needed
|
|
"contractName": swapInfo.Protocol,
|
|
"deadline": 0, // Could be added if needed
|
|
},
|
|
)
|
|
}
|
|
|
|
// addPool adds a pool to the cache
|
|
func (pd *PoolDiscovery) addPool(pool *Pool) {
|
|
pd.mutex.Lock()
|
|
defer pd.mutex.Unlock()
|
|
|
|
if pd.pools == nil {
|
|
pd.pools = make(map[string]*Pool)
|
|
}
|
|
|
|
pd.pools[pool.Address] = pool
|
|
}
|