Files
mev-beta/pkg/events/parser.go
Administrator e02ded0a6a fix: use pool cache to avoid zero addresses in Uniswap V3 parsing
- Added poolCache field to EventParser struct with PoolCache interface
- Modified getPoolTokens() to check cache before returning zero addresses
- Created PoolCache interface in pkg/interfaces for clean separation
- Added debug logging to identify pools missing from cache
- Documented long-term architecture improvements in PARSER_ARCHITECTURE_IMPROVEMENTS.md

This fixes the critical issue where Uniswap V3 swap events would show zero
addresses for tokens when transaction calldata was unavailable. The parser
now falls back to the pool cache which contains previously discovered pool
information.

Benefits:
- Eliminates zero address errors for known pools
- Reduces unnecessary RPC calls
- Provides visibility into which pools are missing from cache
- Lays foundation for per-exchange parser architecture

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-10 09:59:37 +01:00

1925 lines
63 KiB
Go

package events
import (
"bytes"
"encoding/hex"
"fmt"
"math/big"
"strings"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/holiman/uint256"
"github.com/fraktal/mev-beta/internal/logger"
"github.com/fraktal/mev-beta/pkg/calldata"
"github.com/fraktal/mev-beta/pkg/interfaces"
"github.com/fraktal/mev-beta/pkg/uniswap"
)
// parseSignedInt256 decodes a 32-byte big-endian two's-complement word into a
// signed big integer. Uniswap V3 events encode their amounts as int256, so the
// sign bit has to be honoured.
//
// An error is returned when data is not exactly 32 bytes long, or when every
// byte is zero: an all-zero word is deliberately rejected because it is
// treated as a sign of corrupted input or a failed transaction.
func parseSignedInt256(data []byte) (*big.Int, error) {
	const wordSize = 32
	if n := len(data); n != wordSize {
		return nil, fmt.Errorf("invalid data length: expected 32 bytes, got %d", n)
	}

	// Scan until the first non-zero byte is found.
	nonZero := false
	for i := 0; i < wordSize && !nonZero; i++ {
		nonZero = data[i] != 0
	}
	if !nonZero {
		return nil, fmt.Errorf("data is all zeros - likely corrupted or from failed transaction")
	}

	result := new(big.Int).SetBytes(data)
	if data[0] < 0x80 {
		// Sign bit clear: the unsigned reading is already the value.
		return result, nil
	}

	// Sign bit set: undo two's complement by subtracting 2^256.
	modulus := new(big.Int).Lsh(big.NewInt(1), 256)
	return result.Sub(result, modulus), nil
}
// EventType enumerates the kinds of DEX activity the parser can report.
type EventType int

// Supported event kinds. Unknown is the zero value.
const (
	Unknown EventType = iota
	Swap
	AddLiquidity
	RemoveLiquidity
	NewPool
)

// String returns the name of the event type; any value outside the declared
// constants is rendered as "Unknown".
func (et EventType) String() string {
	names := [...]string{
		Unknown:         "Unknown",
		Swap:            "Swap",
		AddLiquidity:    "AddLiquidity",
		RemoveLiquidity: "RemoveLiquidity",
		NewPool:         "NewPool",
	}
	if et < Unknown || int(et) >= len(names) {
		return "Unknown"
	}
	return names[et]
}
// Event is the normalized record the parser emits for a DEX log or for a
// decoded router call. Which fields are populated depends on the producer:
// the V2 and mint/burn log parsers in this file leave SqrtPriceX96 and
// Liquidity nil, while calldata-derived events set them to zero values.
type Event struct {
// Type is the kind of activity (Swap, AddLiquidity, RemoveLiquidity, ...).
Type EventType
Protocol string // UniswapV2, UniswapV3, SushiSwap, etc.; some calldata parsers append "_fee_<fee>"
// PoolAddress is the emitting contract for log-based events; calldata-based
// events use a derived pool address or the transaction's target address.
PoolAddress common.Address
// Token0 and Token1 are the pool tokens; they may be the zero address when
// the tokens could not be resolved.
Token0 common.Address
Token1 common.Address
// Amount0 and Amount1 are token amounts. For V2 swap logs a positive value is
// an input and a negative value is an output; V3 swap logs carry the signed
// values as emitted.
Amount0 *big.Int
Amount1 *big.Int
// SqrtPriceX96, Liquidity and Tick are taken from Uniswap V3 Swap logs.
SqrtPriceX96 *uint256.Int
Liquidity *uint256.Int
Tick int
// Timestamp and BlockNumber are passed in by the caller, not read from the log.
Timestamp uint64
TransactionHash common.Hash
BlockNumber uint64
}
// EventParser parses DEX events from Ethereum transaction receipts (logs) and
// from transaction calldata. Instances are built with NewEventParserFull or
// one of its convenience wrappers, which fill in the addresses, event
// signatures and known pools.
type EventParser struct {
// Known DEX factory contract addresses
UniswapV2Factory common.Address
UniswapV3Factory common.Address
SushiSwapFactory common.Address
// Router addresses (a zero address means "not configured")
UniswapV2Router01 common.Address
UniswapV2Router02 common.Address
UniswapV3Router common.Address
SushiSwapRouter common.Address
// Known pool addresses mapped to their protocol name (for quick lookup)
knownPools map[common.Address]string
// Event signature hashes (topic 0) used to dispatch logs to a parser
swapEventV2Sig common.Hash
swapEventV3Sig common.Hash
mintEventV2Sig common.Hash
mintEventV3Sig common.Hash
burnEventV2Sig common.Hash
burnEventV3Sig common.Hash
// Optional token extractor injected through the constructor; may be nil
tokenExtractor interfaces.TokenExtractor
// Optional pool cache injected through the constructor; may be nil.
// Intended to avoid zero token addresses for known pools.
poolCache interfaces.PoolCache
// logger receives diagnostics; the log helpers fall back to stdout when nil
logger *logger.Logger
}
// logDebug emits a debug-level message followed by optional key/value pairs.
// With a configured logger the arguments go to its Debug method; otherwise
// they are printed to stdout behind a "[DEBUG]" marker.
func (ep *EventParser) logDebug(message string, kv ...interface{}) {
	if ep.logger == nil {
		fallback := make([]interface{}, 0, len(kv)+2)
		fallback = append(fallback, "[DEBUG]", message)
		fallback = append(fallback, kv...)
		fmt.Println(fallback...)
		return
	}

	fields := make([]interface{}, 0, len(kv)+1)
	fields = append(fields, message)
	fields = append(fields, kv...)
	ep.logger.Debug(fields...)
}
// logInfo emits an info-level message followed by optional key/value pairs.
// With a configured logger the arguments go to its Info method; otherwise
// they are printed to stdout behind an "[INFO]" marker.
func (ep *EventParser) logInfo(message string, kv ...interface{}) {
	if ep.logger == nil {
		fallback := make([]interface{}, 0, len(kv)+2)
		fallback = append(fallback, "[INFO]", message)
		fallback = append(fallback, kv...)
		fmt.Println(fallback...)
		return
	}

	fields := make([]interface{}, 0, len(kv)+1)
	fields = append(fields, message)
	fields = append(fields, kv...)
	ep.logger.Info(fields...)
}
// logWarn emits a warning-level message followed by optional key/value pairs.
// With a configured logger the arguments go to its Warn method; otherwise
// they are printed to stdout behind a "[WARN]" marker.
func (ep *EventParser) logWarn(message string, kv ...interface{}) {
	if ep.logger == nil {
		fallback := make([]interface{}, 0, len(kv)+2)
		fallback = append(fallback, "[WARN]", message)
		fallback = append(fallback, kv...)
		fmt.Println(fallback...)
		return
	}

	fields := make([]interface{}, 0, len(kv)+1)
	fields = append(fields, message)
	fields = append(fields, kv...)
	ep.logger.Warn(fields...)
}
// NewEventParser returns an event parser with no caller-supplied logger; it
// simply delegates to NewEventParserWithLogger with a nil logger.
func NewEventParser() *EventParser {
	var noLogger *logger.Logger
	return NewEventParserWithLogger(noLogger)
}
// NewEventParserWithLogger returns an EventParser that uses the given logger
// and has no token extractor. It delegates to NewEventParserWithTokenExtractor;
// log may be nil.
func NewEventParserWithLogger(log *logger.Logger) *EventParser {
	var noExtractor interfaces.TokenExtractor
	return NewEventParserWithTokenExtractor(log, noExtractor)
}
// NewEventParserWithTokenExtractor returns an EventParser that uses the given
// logger and token extractor but no pool cache. It delegates to
// NewEventParserFull; both arguments may be nil.
func NewEventParserWithTokenExtractor(log *logger.Logger, tokenExtractor interfaces.TokenExtractor) *EventParser {
	var noCache interfaces.PoolCache
	return NewEventParserFull(log, tokenExtractor, noCache)
}
// NewEventParserFull builds an EventParser with every dependency supplied by
// the caller.
//
// Parameters:
//   - log: logger used for diagnostics; when nil, one is created with
//     logger.New("info", "text", "").
//   - tokenExtractor: optional token extractor, stored as given (may be nil).
//   - poolCache: optional pool cache, stored as given (may be nil).
//
// The returned parser carries the hard-coded factory/router addresses, the
// topic-0 hashes of the supported Swap/Mint/Burn events, and a small set of
// pre-registered pools.
func NewEventParserFull(log *logger.Logger, tokenExtractor interfaces.TokenExtractor, poolCache interfaces.PoolCache) *EventParser {
	if log == nil {
		log = logger.New("info", "text", "")
	}

	parser := &EventParser{
		logger:         log,
		tokenExtractor: tokenExtractor,
		poolCache:      poolCache,

		// Official Arbitrum DEX Factory Addresses
		UniswapV2Factory: common.HexToAddress("0xf1D7CC64Fb4452F05c498126312eBE29f30Fbcf9"), // Official Uniswap V2 Factory on Arbitrum
		UniswapV3Factory: common.HexToAddress("0x1F98431c8aD98523631AE4a59f267346ea31F984"), // Official Uniswap V3 Factory on Arbitrum
		SushiSwapFactory: common.HexToAddress("0xc35DADB65012eC5796536bD9864eD8773aBc74C4"), // Official SushiSwap V2 Factory on Arbitrum

		// Official Arbitrum DEX Router Addresses
		UniswapV2Router01: common.HexToAddress("0x0000000000000000000000000000000000000000"), // V2Router01 not deployed on Arbitrum
		UniswapV2Router02: common.HexToAddress("0x4752ba5dbc23f44d87826276bf6fd6b1c372ad24"), // Official Uniswap V2 Router02 on Arbitrum
		UniswapV3Router:   common.HexToAddress("0xE592427A0AEce92De3Edee1F18E0157C05861564"), // Official Uniswap V3 SwapRouter on Arbitrum
		SushiSwapRouter:   common.HexToAddress("0x1b02dA8Cb0d097eB8D57A175b88c7D8b47997506"), // Official SushiSwap Router on Arbitrum

		knownPools: make(map[common.Address]string),
	}

	// Event signatures. The hash covers the full canonical parameter list,
	// regardless of which parameters are indexed.
	parser.swapEventV2Sig = crypto.Keccak256Hash([]byte("Swap(address,uint256,uint256,uint256,uint256,address)"))
	parser.swapEventV3Sig = crypto.Keccak256Hash([]byte("Swap(address,address,int256,int256,uint160,uint128,int24)"))
	parser.mintEventV2Sig = crypto.Keccak256Hash([]byte("Mint(address,uint256,uint256)"))
	parser.mintEventV3Sig = crypto.Keccak256Hash([]byte("Mint(address,address,int24,int24,uint128,uint256,uint256)"))
	// The V2 pair Burn event has a trailing `to` address; omitting it yields a
	// hash that no Uniswap V2 pair ever emits.
	parser.burnEventV2Sig = crypto.Keccak256Hash([]byte("Burn(address,uint256,uint256,address)"))
	parser.burnEventV3Sig = crypto.Keccak256Hash([]byte("Burn(address,int24,int24,uint128,uint256,uint256)"))

	// Pre-populate known Arbitrum pools (high volume pools)
	parser.knownPools[common.HexToAddress("0xC6962004f452bE9203591991D15f6b388e09E8D0")] = "UniswapV3" // USDC/WETH 0.05%
	parser.knownPools[common.HexToAddress("0x88e6A0c2dDD26FEEb64F039a2c41296FcB3f5640")] = "UniswapV3" // USDC/WETH 0.3%
	parser.knownPools[common.HexToAddress("0xC31E54c7a869B9FcBEcc14363CF510d1c41fa443")] = "UniswapV3" // WETH/USDT 0.05%
	parser.knownPools[common.HexToAddress("0x641C00A822e8b671738d32a431a4Fb6074E5c79d")] = "UniswapV3" // WETH/USDT 0.3%

	// Add test addresses to known pools
	parser.knownPools[common.HexToAddress("0x905dfCD5649217c42684f23958568e533C711Aa3")] = "SushiSwap" // Test SushiSwap pool
	parser.knownPools[common.HexToAddress("0x84652bb2539513BAf36e225c930Fdd8eaa63CE27")] = "Camelot"   // Test Camelot pool
	parser.knownPools[common.HexToAddress("0x32dF62dc3aEd2cD6224193052Ce665DC18165841")] = "Balancer"  // Test Balancer pool
	parser.knownPools[common.HexToAddress("0x7f90122BF0700F9E7e1F688fe926940E8839F353")] = "Curve"     // Test Curve pool

	// The token extractor is injected via the constructor parameter, which
	// allows flexible implementations without circular imports.
	return parser
}
// ParseTransactionReceipt extracts DEX events from the logs of a receipt when
// the originating transaction is not available. It is equivalent to calling
// ParseTransactionReceiptWithTx with a nil transaction.
func (ep *EventParser) ParseTransactionReceipt(receipt *types.Receipt, blockNumber uint64, timestamp uint64) ([]*Event, error) {
	var noTx *types.Transaction
	return ep.ParseTransactionReceiptWithTx(receipt, noTx, blockNumber, timestamp)
}
// ParseTransactionReceiptWithTx extracts DEX events from the logs of a
// receipt, optionally using the originating transaction to resolve tokens.
//
// When tx is non-nil its calldata is parsed first; every calldata event with
// two non-zero tokens is recorded in a per-call cache keyed by the hex pool
// address, and that cache is handed to the swap log parsers.
//
// Logs whose first topic matches none of the known signatures are ignored.
// A log that matches but fails to parse is reported at debug level and
// skipped, so one malformed log never hides the remaining events.
//
// Returns the parsed events (possibly empty). An error is returned only when
// receipt is nil.
func (ep *EventParser) ParseTransactionReceiptWithTx(receipt *types.Receipt, tx *types.Transaction, blockNumber uint64, timestamp uint64) ([]*Event, error) {
	if receipt == nil {
		return nil, fmt.Errorf("nil transaction receipt")
	}

	events := make([]*Event, 0)

	// If we have the transaction, extract tokens from calldata first. This is
	// best-effort: a calldata failure must not prevent log parsing.
	var txTokenCache map[string][]common.Address
	if tx != nil {
		txTokenCache = make(map[string][]common.Address)
		txEvents, txErr := ep.ParseTransaction(tx, blockNumber, timestamp)
		if txErr != nil {
			ep.logDebug("calldata parsing failed, continuing with logs only",
				"tx_hash", receipt.TxHash.Hex(),
				"error", txErr.Error(),
			)
		}
		for _, ev := range txEvents {
			if ev != nil && ev.Token0 != (common.Address{}) && ev.Token1 != (common.Address{}) {
				// Cache tokens by pool address for enriching log events.
				txTokenCache[ev.PoolAddress.Hex()] = []common.Address{ev.Token0, ev.Token1}
			}
		}
	}

	for _, log := range receipt.Logs {
		// Skip missing entries and anonymous logs (no signature topic).
		if log == nil || len(log.Topics) == 0 {
			continue
		}

		var (
			event *Event
			err   error
		)
		switch log.Topics[0] {
		case ep.swapEventV2Sig:
			event, err = ep.parseUniswapV2Swap(log, blockNumber, timestamp, receipt.TxHash, txTokenCache)
		case ep.swapEventV3Sig:
			event, err = ep.parseUniswapV3Swap(log, blockNumber, timestamp, receipt.TxHash, txTokenCache)
		case ep.mintEventV2Sig:
			event, err = ep.parseUniswapV2Mint(log, blockNumber, timestamp, receipt.TxHash)
		case ep.mintEventV3Sig:
			event, err = ep.parseUniswapV3Mint(log, blockNumber, timestamp, receipt.TxHash)
		case ep.burnEventV2Sig:
			event, err = ep.parseUniswapV2Burn(log, blockNumber, timestamp, receipt.TxHash)
		case ep.burnEventV3Sig:
			event, err = ep.parseUniswapV3Burn(log, blockNumber, timestamp, receipt.TxHash)
		}

		if err != nil {
			// Report the failure, then continue with the other logs.
			ep.logDebug("skipping unparseable DEX log",
				"tx_hash", receipt.TxHash.Hex(),
				"log_index", log.Index,
				"error", err.Error(),
			)
			continue
		}
		if event != nil {
			events = append(events, event)
		}
	}

	return events, nil
}
// IsDEXInteraction reports whether tx is addressed to one of the configured
// factory or router contracts or to a registered pool.
//
// It returns false for a nil transaction, for contract creations (no
// recipient), and for transactions sent to the zero address. The zero address
// is excluded because it is used as the "not deployed" placeholder for router
// fields, which would otherwise make every transfer to 0x0 look like a DEX
// call.
func (ep *EventParser) IsDEXInteraction(tx *types.Transaction) bool {
	if tx == nil || tx.To() == nil {
		return false
	}

	to := *tx.To()
	if to == (common.Address{}) {
		return false
	}

	switch to {
	case ep.UniswapV2Factory, ep.UniswapV3Factory, ep.SushiSwapFactory:
		// Factory contracts
		return true
	case ep.UniswapV2Router01, ep.UniswapV2Router02, ep.UniswapV3Router, ep.SushiSwapRouter:
		// Router contracts
		return true
	}

	// Known pools
	_, known := ep.knownPools[to]
	return known
}
// identifyProtocol returns the protocol name ("UniswapV2", "UniswapV3",
// "SushiSwap", or a known pool's registered protocol) that tx interacts with.
//
// The recipient address is checked first against the configured factories,
// routers and known pools. If none match, the 4-byte function selector is
// used as a fallback. "Unknown" is returned for contract creations and for
// anything that cannot be classified.
func (ep *EventParser) identifyProtocol(tx *types.Transaction) string {
	if tx.To() == nil {
		return "Unknown"
	}

	to := *tx.To()

	// Check factory contracts
	if to == ep.UniswapV2Factory {
		return "UniswapV2"
	}
	if to == ep.UniswapV3Factory {
		return "UniswapV3"
	}
	if to == ep.SushiSwapFactory {
		return "SushiSwap"
	}

	// Check router contracts
	if to == ep.UniswapV2Router01 || to == ep.UniswapV2Router02 {
		return "UniswapV2"
	}
	if to == ep.UniswapV3Router {
		return "UniswapV3"
	}
	if to == ep.SushiSwapRouter {
		return "SushiSwap"
	}

	// Check known pools
	if protocol, exists := ep.knownPools[to]; exists {
		return protocol
	}

	// Try to identify from the function selector in the transaction data.
	// common.Bytes2Hex returns hex WITHOUT a "0x" prefix, so the case labels
	// must not carry one either (the same convention ParseTransaction uses).
	//
	// NOTE(review): the function names in the comments below are carried over
	// unchanged; several do not look like the canonical name for their
	// selector and should be verified against the router ABIs.
	if len(tx.Data()) >= 4 {
		sig := common.Bytes2Hex(tx.Data()[:4])
		switch sig {
		case "ac9650d8": // multicall (Uniswap V3)
			return "UniswapV3"
		case "1f0464d1": // multicall with blockhash (Uniswap V3)
			return "UniswapV3"
		case "88316456": // swap (Uniswap V2)
			return "UniswapV2"
		case "128acb08": // swap (SushiSwap)
			return "SushiSwap"
		case "38ed1739": // swapExactTokensForTokens (Uniswap V2)
			return "UniswapV2"
		case "8803dbee": // swapTokensForExactTokens (Uniswap V2)
			return "UniswapV2"
		case "7ff36ab5": // swapExactETHForTokens (Uniswap V2)
			return "UniswapV2"
		case "b6f9de95": // swapExactTokensForETH (Uniswap V2)
			return "UniswapV2"
		case "414bf389": // exactInputSingle (Uniswap V3)
			return "UniswapV3"
		case "db3e2198": // exactInput (Uniswap V3)
			return "UniswapV3"
		case "f305d719": // exactOutputSingle (Uniswap V3)
			return "UniswapV3"
		case "04e45aaf": // exactOutput (Uniswap V3)
			return "UniswapV3"
		case "18cbafe5": // swapExactTokensForTokensSupportingFeeOnTransferTokens (Uniswap V2)
			return "UniswapV2"
		case "18cffa1c": // swapExactETHForTokensSupportingFeeOnTransferTokens (Uniswap V2)
			return "UniswapV2"
		case "791ac947": // swapExactTokensForETHSupportingFeeOnTransferTokens (Uniswap V2)
			return "UniswapV2"
		case "5ae401dc": // multicall (Uniswap V3)
			return "UniswapV3"
		}
	}

	return "Unknown"
}
// parseUniswapV2Swap converts a Uniswap V2 style Swap log into an Event.
//
// Expected layout of
// Swap(address indexed sender, uint amount0In, uint amount1In,
// uint amount0Out, uint amount1Out, address indexed to):
//   - topics: signature, sender, to (3 topics)
//   - data:   amount0In, amount1In, amount0Out, amount1Out (4 words)
//
// Amount0/Amount1 on the event are signed: a positive value is the amount
// paid in, otherwise the negated amount paid out. Token addresses are not in
// the log, so they are resolved through getPoolTokens using txTokenCache.
//
// Returns an error when the log does not have the expected shape.
func (ep *EventParser) parseUniswapV2Swap(log *types.Log, blockNumber uint64, timestamp uint64, txHash common.Hash, txTokenCache map[string][]common.Address) (*Event, error) {
	// Both `sender` and `to` are indexed, so a genuine log has three topics.
	if len(log.Topics) != 3 || len(log.Data) != 32*4 {
		return nil, fmt.Errorf("invalid Uniswap V2 Swap event log")
	}

	// Parse the data fields
	amount0In := new(big.Int).SetBytes(log.Data[0:32])
	amount1In := new(big.Int).SetBytes(log.Data[32:64])
	amount0Out := new(big.Int).SetBytes(log.Data[64:96])
	amount1Out := new(big.Int).SetBytes(log.Data[96:128])

	// Collapse the in/out pair of each token into one signed amount.
	var amount0, amount1 *big.Int
	if amount0In.Cmp(big.NewInt(0)) > 0 {
		amount0 = amount0In
	} else {
		amount0 = new(big.Int).Neg(amount0Out)
	}
	if amount1In.Cmp(big.NewInt(0)) > 0 {
		amount1 = amount1In
	} else {
		amount1 = new(big.Int).Neg(amount1Out)
	}

	// A log without an emitting address cannot be attributed to a pool; warn
	// but still produce the event.
	if log.Address == (common.Address{}) {
		ep.logWarn("swap event emitted without pool address",
			"block_number", blockNumber,
			"log_index", log.Index,
			"topic_count", len(log.Topics),
			"data_bytes", len(log.Data),
		)
	}

	// Swap logs do not contain token addresses; resolve them for this pool.
	token0, token1 := ep.getPoolTokens(log.Address, txHash, txTokenCache)

	event := &Event{
		Type:            Swap,
		Protocol:        "UniswapV2",
		PoolAddress:     log.Address,
		Token0:          token0,
		Token1:          token1,
		Amount0:         amount0,
		Amount1:         amount1,
		Timestamp:       timestamp,
		TransactionHash: txHash,
		BlockNumber:     blockNumber,
	}
	return event, nil
}
// parseUniswapV3Swap converts a Uniswap V3 Swap log into an Event.
//
// Expected layout of
// Swap(address indexed sender, address indexed recipient, int256 amount0,
// int256 amount1, uint160 sqrtPriceX96, uint128 liquidity, int24 tick):
//   - topics: signature, sender, recipient (3 topics)
//   - data:   amount0, amount1, sqrtPriceX96, liquidity, tick (5 words)
//
// amount0/amount1 are decoded with parseSignedInt256, so an all-zero amount
// word is reported as an error. Token addresses are not in the log and are
// resolved through getPoolTokens using txTokenCache.
//
// Returns an error when the log does not have the expected shape or an amount
// cannot be decoded.
func (ep *EventParser) parseUniswapV3Swap(log *types.Log, blockNumber uint64, timestamp uint64, txHash common.Hash, txTokenCache map[string][]common.Address) (*Event, error) {
	if len(log.Topics) != 3 || len(log.Data) != 32*5 {
		return nil, fmt.Errorf("invalid Uniswap V3 Swap event log")
	}

	// Parse the data fields - UniswapV3 uses signed int256 for amounts
	amount0, err := parseSignedInt256(log.Data[0:32])
	if err != nil {
		return nil, fmt.Errorf("failed to parse amount0: %w", err)
	}
	amount1, err := parseSignedInt256(log.Data[32:64])
	if err != nil {
		return nil, fmt.Errorf("failed to parse amount1: %w", err)
	}

	sqrtPriceX96 := new(big.Int).SetBytes(log.Data[64:96])
	liquidity := new(big.Int).SetBytes(log.Data[96:128])

	// tick is an int24 sign-extended to 32 bytes. Decode it as a signed value
	// before narrowing: reading it as unsigned would hand a 256-bit number to
	// Int64(), whose result is undefined for values that do not fit.
	tick := new(big.Int).SetBytes(log.Data[128:160])
	if log.Data[128]&0x80 != 0 {
		tick.Sub(tick, new(big.Int).Lsh(big.NewInt(1), 256))
	}

	// Swap logs do not contain token addresses; resolve them for this pool.
	token0, token1 := ep.getPoolTokens(log.Address, txHash, txTokenCache)

	event := &Event{
		Type:            Swap,
		Protocol:        "UniswapV3",
		PoolAddress:     log.Address,
		Token0:          token0,
		Token1:          token1,
		Amount0:         amount0,
		Amount1:         amount1,
		SqrtPriceX96:    uint256.MustFromBig(sqrtPriceX96), // 32-byte input cannot overflow
		Liquidity:       uint256.MustFromBig(liquidity),    // 32-byte input cannot overflow
		Tick:            int(tick.Int64()),
		Timestamp:       timestamp,
		TransactionHash: txHash,
		BlockNumber:     blockNumber,
	}
	return event, nil
}
// parseUniswapV2Mint turns a Uniswap V2 Mint log into an AddLiquidity event.
// The log must carry exactly two topics and two 32-byte data words, which are
// read as unsigned amount0 and amount1. Token addresses are left unset.
//
// Returns an error when the log does not have that shape.
func (ep *EventParser) parseUniswapV2Mint(log *types.Log, blockNumber uint64, timestamp uint64, txHash common.Hash) (*Event, error) {
	const (
		wantTopics = 2
		wantBytes  = 2 * 32
	)
	if len(log.Topics) != wantTopics || len(log.Data) != wantBytes {
		return nil, fmt.Errorf("invalid Uniswap V2 Mint event log")
	}

	ev := new(Event)
	ev.Type = AddLiquidity
	ev.Protocol = "UniswapV2"
	ev.PoolAddress = log.Address
	ev.Amount0 = new(big.Int).SetBytes(log.Data[:32])
	ev.Amount1 = new(big.Int).SetBytes(log.Data[32:])
	ev.Timestamp = timestamp
	ev.TransactionHash = txHash
	ev.BlockNumber = blockNumber
	return ev, nil
}
// parseUniswapV3Mint turns a Uniswap V3 Mint log into an AddLiquidity event.
//
// Expected layout of
// Mint(address sender, address indexed owner, int24 indexed tickLower,
// int24 indexed tickUpper, uint128 amount, uint256 amount0, uint256 amount1):
//   - topics: signature, owner, tickLower, tickUpper (4 topics)
//   - data:   sender, amount (liquidity), amount0, amount1 (4 words)
//
// Only amount0 and amount1 are copied to the event; token addresses are left
// unset. Returns an error when the log does not have the expected shape.
func (ep *EventParser) parseUniswapV3Mint(log *types.Log, blockNumber uint64, timestamp uint64, txHash common.Hash) (*Event, error) {
	// Three parameters are indexed, so a genuine log has four topics.
	if len(log.Topics) != 4 || len(log.Data) != 32*4 {
		return nil, fmt.Errorf("invalid Uniswap V3 Mint event log")
	}

	// Words 0 and 1 are `sender` and the liquidity `amount`; the token amounts
	// are the last two words.
	amount0 := new(big.Int).SetBytes(log.Data[64:96])
	amount1 := new(big.Int).SetBytes(log.Data[96:128])

	event := &Event{
		Type:            AddLiquidity,
		Protocol:        "UniswapV3",
		PoolAddress:     log.Address,
		Amount0:         amount0,
		Amount1:         amount1,
		Timestamp:       timestamp,
		TransactionHash: txHash,
		BlockNumber:     blockNumber,
	}
	return event, nil
}
// parseUniswapV2Burn turns a Uniswap V2 Burn log into a RemoveLiquidity event.
//
// Expected layout of
// Burn(address indexed sender, uint amount0, uint amount1, address indexed to):
//   - topics: signature, sender, to (3 topics)
//   - data:   amount0, amount1 (2 words)
//
// Token addresses are left unset. Returns an error when the log does not have
// the expected shape.
func (ep *EventParser) parseUniswapV2Burn(log *types.Log, blockNumber uint64, timestamp uint64, txHash common.Hash) (*Event, error) {
	// Both `sender` and `to` are indexed, so a genuine log has three topics.
	if len(log.Topics) != 3 || len(log.Data) != 32*2 {
		return nil, fmt.Errorf("invalid Uniswap V2 Burn event log")
	}

	// Parse the data fields
	amount0 := new(big.Int).SetBytes(log.Data[0:32])
	amount1 := new(big.Int).SetBytes(log.Data[32:64])

	event := &Event{
		Type:            RemoveLiquidity,
		Protocol:        "UniswapV2",
		PoolAddress:     log.Address,
		Amount0:         amount0,
		Amount1:         amount1,
		Timestamp:       timestamp,
		TransactionHash: txHash,
		BlockNumber:     blockNumber,
	}
	return event, nil
}
// parseUniswapV3Burn turns a Uniswap V3 Burn log into a RemoveLiquidity event.
//
// Expected layout of
// Burn(address indexed owner, int24 indexed tickLower,
// int24 indexed tickUpper, uint128 amount, uint256 amount0, uint256 amount1):
//   - topics: signature, owner, tickLower, tickUpper (4 topics)
//   - data:   amount (liquidity), amount0, amount1 (3 words)
//
// Only amount0 and amount1 are copied to the event; token addresses are left
// unset. Returns an error when the log does not have the expected shape.
func (ep *EventParser) parseUniswapV3Burn(log *types.Log, blockNumber uint64, timestamp uint64, txHash common.Hash) (*Event, error) {
	// Three parameters are indexed, leaving three data words.
	if len(log.Topics) != 4 || len(log.Data) != 32*3 {
		return nil, fmt.Errorf("invalid Uniswap V3 Burn event log")
	}

	// Word 0 is the burned liquidity `amount`; the token amounts follow it.
	amount0 := new(big.Int).SetBytes(log.Data[32:64])
	amount1 := new(big.Int).SetBytes(log.Data[64:96])

	event := &Event{
		Type:            RemoveLiquidity,
		Protocol:        "UniswapV3",
		PoolAddress:     log.Address,
		Amount0:         amount0,
		Amount1:         amount1,
		Timestamp:       timestamp,
		TransactionHash: txHash,
		BlockNumber:     blockNumber,
	}
	return event, nil
}
// ParseTransaction derives swap events from a transaction's calldata.
//
// Transactions that are nil, have no recipient, or are not DEX interactions
// (see IsDEXInteraction) yield an empty slice and no error. Otherwise the
// 4-byte selector picks a dedicated decoder; unrecognized selectors produce a
// generic Swap event that uses the recipient as pool address, zero tokens and
// the transaction value as Amount0.
//
// Returns an empty slice and an error when the calldata is shorter than a
// selector or the chosen decoder fails.
func (ep *EventParser) ParseTransaction(tx *types.Transaction, blockNumber uint64, timestamp uint64) ([]*Event, error) {
	// Non-DEX transactions (and contract creations) produce no events.
	if tx == nil || tx.To() == nil || !ep.IsDEXInteraction(tx) {
		return []*Event{}, nil
	}

	// Determine the protocol
	protocol := ep.identifyProtocol(tx)

	data := tx.Data()
	if len(data) < 4 {
		return []*Event{}, fmt.Errorf("insufficient transaction data")
	}

	// Function selector as lowercase hex without "0x" prefix.
	selector := common.Bytes2Hex(data[:4])

	var (
		event    *Event
		err      error
		funcName string
	)
	switch selector {
	case "38ed1739":
		funcName = "swapExactTokensForTokens"
		event, err = ep.parseSwapExactTokensForTokensFromTx(tx, protocol, blockNumber, timestamp)
	case "414bf389":
		funcName = "exactInputSingle"
		event, err = ep.parseExactInputSingleFromTx(tx, protocol, blockNumber, timestamp)
	case "c04b8d59":
		// exactInput((bytes,address,uint256,uint256,uint256)) on the V3
		// SwapRouter. This case used to be keyed on db3e2198, which is the
		// selector of exactOutputSingle.
		funcName = "exactInput"
		event, err = ep.parseExactInputFromTx(tx, protocol, blockNumber, timestamp)
	case "7ff36ab5", "18cffa1c":
		// swapExactETHForTokens variants.
		// NOTE(review): 18cffa1c is kept from the original list; confirm which
		// function it belongs to.
		funcName = "swapExactETHForTokens"
		event, err = ep.parseSwapExactETHForTokensFromTx(tx, protocol, blockNumber, timestamp)
	case "ac9650d8":
		funcName = "multicall"
		event, err = ep.parseMulticallFromTx(tx, protocol, blockNumber, timestamp)
	case "db3e2198":
		// exactOutputSingle on the V3 SwapRouter. This case used to be keyed on
		// f305d719 (addLiquidityETH on the V2 router, which is not a swap and
		// has a different layout); that selector now takes the default path.
		funcName = "exactOutputSingle"
		event, err = ep.parseExactOutputSingleFromTx(tx, protocol, blockNumber, timestamp)
	default:
		// For unknown functions, create a basic event. The router address is
		// used as a fallback since tokens cannot be extracted.
		event = &Event{
			Type:            Swap,
			Protocol:        protocol,
			PoolAddress:     *tx.To(),         // Router address as fallback for unknown functions
			Token0:          common.Address{}, // Will be determined from logs
			Token1:          common.Address{}, // Will be determined from logs
			Amount0:         tx.Value(),       // Use transaction value as fallback
			Amount1:         big.NewInt(0),
			SqrtPriceX96:    uint256.NewInt(0),
			Liquidity:       uint256.NewInt(0),
			Tick:            0,
			Timestamp:       timestamp,
			TransactionHash: tx.Hash(),
			BlockNumber:     blockNumber,
		}
	}

	if err != nil {
		return []*Event{}, fmt.Errorf("failed to parse %s: %w", funcName, err)
	}

	events := make([]*Event, 0, 1)
	if event != nil {
		events = append(events, event)
	}
	return events, nil
}
// parseSwapExactTokensForTokensFromTx decodes the calldata of
// swapExactTokensForTokens(uint amountIn, uint amountOutMin, address[] path,
// address to, uint deadline) into a Swap event.
//
// The caller must have verified that the calldata holds at least the 4-byte
// selector. Token0/Token1 are the first two entries of path (the first hop);
// they stay zero when the path cannot be decoded. The pool address comes from
// derivePoolAddress. Amount0 is amountIn and Amount1 is amountOutMin.
//
// Returns an error when fewer than five argument words are present.
func (ep *EventParser) parseSwapExactTokensForTokensFromTx(tx *types.Transaction, protocol string, blockNumber uint64, timestamp uint64) (*Event, error) {
	data := tx.Data()[4:] // Skip function selector
	if len(data) < 160 {  // 5 parameters * 32 bytes
		return nil, fmt.Errorf("insufficient data for swapExactTokensForTokens")
	}

	// Static head words.
	amountIn := new(big.Int).SetBytes(data[0:32])
	amountOutMin := new(big.Int).SetBytes(data[32:64])

	// Word 2 is the byte offset of the path array. An ABI address[] is encoded
	// as a word holding the ELEMENT COUNT followed by one 32-byte word per
	// address (left-padded), not as packed 20-byte values.
	var token0, token1 common.Address
	size := uint64(len(data))
	offsetWord := new(big.Int).SetBytes(data[64:96])
	if offsetWord.IsUint64() && offsetWord.Uint64() <= size-32 {
		lengthPos := offsetWord.Uint64()
		count := new(big.Int).SetBytes(data[lengthPos : lengthPos+32])
		first := lengthPos + 32 // start of element 0
		// Need at least two declared elements that are fully present.
		if count.IsUint64() && count.Uint64() >= 2 && size-first >= 64 {
			token0 = common.BytesToAddress(data[first+12 : first+32])
			token1 = common.BytesToAddress(data[first+44 : first+64])
		}
	}

	// Derive actual pool address from token pair
	poolAddress := ep.derivePoolAddress(token0, token1, protocol)

	event := &Event{
		Type:            Swap,
		Protocol:        protocol,
		PoolAddress:     poolAddress,
		Token0:          token0,
		Token1:          token1,
		Amount0:         amountIn,
		Amount1:         amountOutMin,
		SqrtPriceX96:    uint256.NewInt(0),
		Liquidity:       uint256.NewInt(0),
		Tick:            0,
		Timestamp:       timestamp,
		TransactionHash: tx.Hash(),
		BlockNumber:     blockNumber,
	}
	return event, nil
}
// parseExactInputSingleFromTx decodes exactInputSingle calldata into a Swap
// event. The argument area is read as eight 32-byte words: word 0 is tokenIn,
// word 1 tokenOut, word 2 the fee, word 5 amountIn and word 6 the minimum
// output. The caller must have verified that the calldata holds at least the
// 4-byte selector.
//
// The event's Protocol is "<protocol>_fee_<fee>" so the fee tier travels with
// it; the pool address is derived with the plain protocol name.
//
// Returns an error when fewer than eight words are present.
func (ep *EventParser) parseExactInputSingleFromTx(tx *types.Transaction, protocol string, blockNumber uint64, timestamp uint64) (*Event, error) {
	args := tx.Data()[4:]
	if len(args) < 8*32 {
		return nil, fmt.Errorf("insufficient data for exactInputSingle")
	}

	// word returns the i-th 32-byte argument word.
	word := func(i int) []byte { return args[i*32 : (i+1)*32] }

	// Addresses occupy the low 20 bytes of their word.
	tokenIn := common.BytesToAddress(word(0)[12:])
	tokenOut := common.BytesToAddress(word(1)[12:])
	feeTier := new(big.Int).SetBytes(word(2)).Uint64()
	amountIn := new(big.Int).SetBytes(word(5))
	minOut := new(big.Int).SetBytes(word(6))

	pool := ep.derivePoolAddress(tokenIn, tokenOut, protocol)

	return &Event{
		Type:            Swap,
		Protocol:        fmt.Sprintf("%s_fee_%d", protocol, feeTier),
		PoolAddress:     pool,
		Token0:          tokenIn,
		Token1:          tokenOut,
		Amount0:         amountIn,
		Amount1:         minOut,
		SqrtPriceX96:    uint256.NewInt(0),
		Liquidity:       uint256.NewInt(0),
		Tick:            0,
		Timestamp:       timestamp,
		TransactionHash: tx.Hash(),
		BlockNumber:     blockNumber,
	}, nil
}
// parseExactInputFromTx decodes the calldata of the Uniswap V3 multi-hop call
// exactInput((bytes path, address recipient, uint256 deadline,
// uint256 amountIn, uint256 amountOutMinimum)) into a Swap event.
//
// Because the struct contains a dynamic `bytes` member, the arguments start
// with an offset word pointing at the tuple; inside the tuple, word 0 is the
// offset of the path and words 3 and 4 are amountIn and amountOutMinimum.
// The path itself is packed as token(20) + fee(3) + token(20) + ...
//
// The caller must have verified that the calldata holds at least the 4-byte
// selector. Token0/Token1 are the first two tokens of the path (the first
// hop); they stay zero when the path cannot be decoded. The pool address
// comes from derivePoolAddress.
//
// Returns an error when the calldata is too short to contain the tuple head.
func (ep *EventParser) parseExactInputFromTx(tx *types.Transaction, protocol string, blockNumber uint64, timestamp uint64) (*Event, error) {
	data := tx.Data()[4:] // Skip function selector
	if len(data) < 160 {  // tuple offset + at least part of the tuple head
		return nil, fmt.Errorf("insufficient data for exactInput")
	}

	// Locate the tuple; its head is five words long.
	tupleOffset := new(big.Int).SetBytes(data[0:32])
	if !tupleOffset.IsUint64() || tupleOffset.Uint64() > uint64(len(data)) {
		return nil, fmt.Errorf("insufficient data for exactInput")
	}
	params := data[tupleOffset.Uint64():]
	if len(params) < 160 {
		return nil, fmt.Errorf("insufficient data for exactInput")
	}

	amountIn := new(big.Int).SetBytes(params[96:128])
	amountOutMin := new(big.Int).SetBytes(params[128:160])

	// The path offset is relative to the start of the tuple. At that position
	// sits the byte length of the path, followed by the packed path bytes.
	var token0, token1 common.Address
	pathOffset := new(big.Int).SetBytes(params[0:32])
	if pathOffset.IsUint64() && pathOffset.Uint64() <= uint64(len(params))-32 {
		lengthPos := pathOffset.Uint64()
		pathLength := new(big.Int).SetBytes(params[lengthPos : lengthPos+32])
		rest := params[lengthPos+32:]
		if pathLength.IsUint64() && pathLength.Uint64() <= uint64(len(rest)) {
			path := rest[:pathLength.Uint64()]
			if len(path) >= 23 { // tokenA(20) + fee(3)
				token0 = common.BytesToAddress(path[0:20])
			}
			if len(path) >= 43 { // tokenA(20) + fee(3) + tokenB(20)
				token1 = common.BytesToAddress(path[23:43])
			}
		}
	}

	// Derive actual pool address from token pair
	poolAddress := ep.derivePoolAddress(token0, token1, protocol)

	event := &Event{
		Type:            Swap,
		Protocol:        protocol,
		PoolAddress:     poolAddress,
		Token0:          token0,
		Token1:          token1,
		Amount0:         amountIn,
		Amount1:         amountOutMin,
		SqrtPriceX96:    uint256.NewInt(0),
		Liquidity:       uint256.NewInt(0),
		Tick:            0,
		Timestamp:       timestamp,
		TransactionHash: tx.Hash(),
		BlockNumber:     blockNumber,
	}
	return event, nil
}
// parseSwapExactETHForTokensFromTx decodes the calldata of
// swapExactETHForTokens(uint amountOutMin, address[] path, address to,
// uint deadline) into a Swap event.
//
// The caller must have verified that the calldata holds at least the 4-byte
// selector. Token0 is the 0xEeee... placeholder address for native ETH and
// Amount0 is the transaction value. Token1 is the second entry of path and
// stays zero when the path cannot be decoded. The pool address is set to the
// transaction's recipient.
//
// Returns an error when fewer than four argument words are present.
func (ep *EventParser) parseSwapExactETHForTokensFromTx(tx *types.Transaction, protocol string, blockNumber uint64, timestamp uint64) (*Event, error) {
	data := tx.Data()[4:] // Skip function selector
	if len(data) < 128 {  // 4 parameters * 32 bytes
		return nil, fmt.Errorf("insufficient data for swapExactETHForTokens")
	}

	amountOutMin := new(big.Int).SetBytes(data[0:32])

	// Word 1 is the byte offset of the path array. An ABI address[] is encoded
	// as a word holding the ELEMENT COUNT followed by one 32-byte word per
	// address (left-padded), not as packed 20-byte values.
	var token1 common.Address
	size := uint64(len(data))
	offsetWord := new(big.Int).SetBytes(data[32:64])
	if offsetWord.IsUint64() && offsetWord.Uint64() <= size-32 {
		lengthPos := offsetWord.Uint64()
		count := new(big.Int).SetBytes(data[lengthPos : lengthPos+32])
		first := lengthPos + 32 // start of element 0
		// Need at least two declared elements that are fully present.
		if count.IsUint64() && count.Uint64() >= 2 && size-first >= 64 {
			token1 = common.BytesToAddress(data[first+44 : first+64])
		}
	}

	event := &Event{
		Type:            Swap,
		Protocol:        protocol,
		PoolAddress:     *tx.To(),
		Token0:          common.HexToAddress("0xEeeeeEeeeEeEeeEeEeEeeEEEeeeeEeeeeeeeEEeE"), // ETH
		Token1:          token1,
		Amount0:         tx.Value(), // ETH amount from transaction value
		Amount1:         amountOutMin,
		SqrtPriceX96:    uint256.NewInt(0),
		Liquidity:       uint256.NewInt(0),
		Tick:            0,
		Timestamp:       timestamp,
		TransactionHash: tx.Hash(),
		BlockNumber:     blockNumber,
	}
	return event, nil
}
// parseExactOutputSingleFromTx decodes exactOutputSingle calldata into a Swap
// event. The argument area is read as eight 32-byte words: word 0 is tokenIn,
// word 1 tokenOut, word 2 the fee, word 5 the exact output amount and word 6
// the maximum input. The caller must have verified that the calldata holds at
// least the 4-byte selector.
//
// Amount0 carries the maximum input and Amount1 the exact output. The event's
// Protocol is "<protocol>_fee_<fee>"; the pool address is derived with the
// plain protocol name.
//
// Returns an error when fewer than eight words are present.
func (ep *EventParser) parseExactOutputSingleFromTx(tx *types.Transaction, protocol string, blockNumber uint64, timestamp uint64) (*Event, error) {
	args := tx.Data()[4:]
	if len(args) < 8*32 {
		return nil, fmt.Errorf("insufficient data for exactOutputSingle")
	}

	// word returns the i-th 32-byte argument word.
	word := func(i int) []byte { return args[i*32 : (i+1)*32] }

	// Addresses occupy the low 20 bytes of their word.
	tokenIn := common.BytesToAddress(word(0)[12:])
	tokenOut := common.BytesToAddress(word(1)[12:])
	feeTier := new(big.Int).SetBytes(word(2)).Uint64()
	exactOut := new(big.Int).SetBytes(word(5))
	maxIn := new(big.Int).SetBytes(word(6))

	pool := ep.derivePoolAddress(tokenIn, tokenOut, protocol)

	return &Event{
		Type:            Swap,
		Protocol:        fmt.Sprintf("%s_fee_%d", protocol, feeTier),
		PoolAddress:     pool,
		Token0:          tokenIn,
		Token1:          tokenOut,
		Amount0:         maxIn,    // Maximum input amount
		Amount1:         exactOut, // Exact output amount
		SqrtPriceX96:    uint256.NewInt(0),
		Liquidity:       uint256.NewInt(0),
		Tick:            0,
		Timestamp:       timestamp,
		TransactionHash: tx.Hash(),
		BlockNumber:     blockNumber,
	}, nil
}
// parseMulticallFromTx parses multicall transactions to extract token addresses and amounts
func (ep *EventParser) parseMulticallFromTx(tx *types.Transaction, protocol string, blockNumber uint64, timestamp uint64) (*Event, error) {
data := tx.Data()[4:] // Skip function selector
if len(data) < 64 { // Need at least bytes array offset and length
return nil, fmt.Errorf("insufficient data for multicall")
}
// Extract tokens from multicall data using comprehensive scanning
tokenCtx := &calldata.MulticallContext{
TxHash: tx.Hash().Hex(),
Protocol: protocol,
Stage: "events.parser.parseMulticallFromTx",
BlockNumber: blockNumber,
}
swap := ep.extractSwapFromMulticallData(data, tokenCtx)
var (
token0 common.Address
token1 common.Address
amount0 *big.Int
amount1 *big.Int
poolAddress common.Address
)
// CRITICAL FIX: Check if we have valid tokens from multicall parsing
validTokens := swap != nil &&
swap.TokenIn != (common.Address{}) &&
swap.TokenOut != (common.Address{})
if validTokens {
// Use multicall parsed tokens
token0 = swap.TokenIn
token1 = swap.TokenOut
amount0 = swap.AmountIn
if swap.AmountOut != nil {
amount1 = new(big.Int).Set(swap.AmountOut)
} else if swap.AmountOutMinimum != nil {
amount1 = new(big.Int).Set(swap.AmountOutMinimum)
}
if swap.PoolAddress != (common.Address{}) {
poolAddress = swap.PoolAddress
}
if protocol == "" {
protocol = swap.Protocol
}
ep.logDebug("multicall extracted swap tokens",
"tx_hash", tx.Hash().Hex(),
"token0", token0.Hex(),
"token1", token1.Hex(),
)
} else {
// CRITICAL FIX: Try direct function parsing (like L2 parser does)
directTokens := ep.parseDirectFunction(tx)
if len(directTokens) >= 2 {
ep.logInfo("direct parsing recovered swap tokens",
"tx_hash", tx.Hash().Hex(),
"token0", directTokens[0].Hex(),
"token1", directTokens[1].Hex(),
)
token0 = directTokens[0]
token1 = directTokens[1]
// Extract amounts from transaction data
amount0, amount1 = ep.extractAmountsFromData(tx.Data())
} else {
methodID := "none"
if len(tx.Data()) >= 4 {
methodID = hex.EncodeToString(tx.Data()[:4])
}
ep.logWarn("direct parsing failed to recover tokens",
"tx_hash", tx.Hash().Hex(),
"method_id", methodID,
"data_len", len(tx.Data()),
)
}
if token0 == (common.Address{}) || token1 == (common.Address{}) {
// Enhanced recovery when both multicall and direct parsing fail
recoveredTokens, recoveryErr := ep.protocolSpecificRecovery(data, tokenCtx, protocol)
if recoveryErr == nil && len(recoveredTokens) >= 2 {
token0 = recoveredTokens[0]
token1 = recoveredTokens[1]
// Extract amounts from transaction data
amount0, amount1 = ep.extractAmountsFromData(tx.Data())
}
}
}
if poolAddress == (common.Address{}) {
if token0 != (common.Address{}) && token1 != (common.Address{}) {
poolAddress = ep.derivePoolAddress(token0, token1, protocol)
// Validate derived pool address
if poolAddress == (common.Address{}) {
// Pool derivation failed, skip this event
return nil, fmt.Errorf("pool derivation failed for tokens %s, %s", token0.Hex(), token1.Hex())
}
} else {
// Protocol-specific error recovery for token extraction
recoveredTokens, recoveryErr := ep.protocolSpecificRecovery(data, tokenCtx, protocol)
if recoveryErr != nil || len(recoveredTokens) < 2 {
// Cannot derive pool address without token information, skip this event
return nil, fmt.Errorf("cannot recover tokens from multicall: %v", recoveryErr)
}
token0 = recoveredTokens[0]
token1 = recoveredTokens[1]
poolAddress = ep.derivePoolAddress(token0, token1, protocol)
if poolAddress == (common.Address{}) {
// Even after recovery, pool derivation failed
return nil, fmt.Errorf("pool derivation failed even after token recovery")
}
}
}
// Final validation: Ensure pool address is valid and not suspicious
if poolAddress == (common.Address{}) || poolAddress == token0 || poolAddress == token1 {
// Invalid pool address, skip this event
return nil, fmt.Errorf("invalid pool address: %s", poolAddress.Hex())
}
// Check for suspicious zero-padded addresses
poolHex := poolAddress.Hex()
if len(poolHex) == 42 && poolHex[:20] == "0x000000000000000000" {
// Suspicious zero-padded address, skip this event
return nil, fmt.Errorf("suspicious zero-padded pool address: %s", poolHex)
}
if amount0 == nil {
amount0 = tx.Value()
}
if amount1 == nil {
amount1 = big.NewInt(0)
}
event := &Event{
Type: Swap,
Protocol: protocol,
PoolAddress: poolAddress,
Token0: token0,
Token1: token1,
Amount0: amount0,
Amount1: amount1,
SqrtPriceX96: uint256.NewInt(0),
Liquidity: uint256.NewInt(0),
Tick: 0,
Timestamp: timestamp,
TransactionHash: tx.Hash(),
BlockNumber: blockNumber,
}
return event, nil
}
// extractSwapFromMulticallData returns the first usable swap decoded from a
// multicall payload, or whatever the final fallback produces (possibly nil).
//
// Strategies are attempted in this order:
//  1. ep.tokenExtractor.ExtractTokensFromMulticallData (only if an extractor is set)
//  2. ep.tokenExtractor.ExtractTokensFromCalldata (only if an extractor is set
//     and data holds at least 4 bytes)
//  3. calldata.DecodeSwapCallsFromMulticall, keeping the first non-nil swap
//     whose TokenIn and TokenOut both pass isValidTokenAddress
//  4. extractSwapFromMulticallFallback
//
// Swaps built by strategies 1 and 2 take their amounts from
// extractAmountsFromData and leave PoolAddress unset.
func (ep *EventParser) extractSwapFromMulticallData(data []byte, ctx *calldata.MulticallContext) *calldata.SwapCall {
	var zero common.Address

	if ep.tokenExtractor == nil {
		ep.logInfo("No token extractor available, using fallback parsing",
			"protocol", ctx.Protocol,
			"stage", "fallback_start")
	} else {
		ep.logInfo("Using enhanced token extractor for multicall parsing",
			"protocol", ctx.Protocol,
			"stage", "multicall_start")

		// Strategy 1: the extractor's multicall-aware scan reports tokens as strings.
		inHex, outHex := ep.tokenExtractor.ExtractTokensFromMulticallData(data)
		if inHex != "" && outHex != "" {
			ep.logInfo("Enhanced parsing success - Token extractor",
				"protocol", ctx.Protocol,
				"token0", inHex,
				"token1", outHex,
				"stage", "multicall_extraction")
			amountIn, amountOut := ep.extractAmountsFromData(data)
			return &calldata.SwapCall{
				TokenIn:   common.HexToAddress(inHex),
				TokenOut:  common.HexToAddress(outHex),
				Protocol:  ctx.Protocol,
				AmountIn:  amountIn,
				AmountOut: amountOut,
			}
		}

		// Strategy 2: treat the payload as a single call's calldata.
		if len(data) >= 4 {
			tokenIn, tokenOut, err := ep.tokenExtractor.ExtractTokensFromCalldata(data)
			if err == nil && tokenIn != zero && tokenOut != zero {
				ep.logInfo("Enhanced parsing success - Direct calldata",
					"protocol", ctx.Protocol,
					"token0", tokenIn.Hex(),
					"token1", tokenOut.Hex(),
					"stage", "calldata_extraction")
				amountIn, amountOut := ep.extractAmountsFromData(data)
				return &calldata.SwapCall{
					TokenIn:   tokenIn,
					TokenOut:  tokenOut,
					Protocol:  ctx.Protocol,
					AmountIn:  amountIn,
					AmountOut: amountOut,
				}
			}
		}
	}

	// Strategy 3: structured multicall decoding; the first swap with two
	// acceptable token addresses wins.
	if decoded, err := calldata.DecodeSwapCallsFromMulticall(data, ctx); err == nil {
		for _, candidate := range decoded {
			if candidate != nil &&
				ep.isValidTokenAddress(candidate.TokenIn) &&
				ep.isValidTokenAddress(candidate.TokenOut) {
				return candidate
			}
		}
	}

	// Strategy 4: last-resort heuristic parsing.
	return ep.extractSwapFromMulticallFallback(data, ctx)
}
// isValidTokenAddress reports whether addr is plausible as an ERC-20 token
// address. It rejects the zero address, the router/factory addresses
// configured on the parser, a few hard-coded router/manager contracts, and
// any address with fewer than 8 non-zero bytes (such values are usually
// numeric parameters rather than addresses).
func (ep *EventParser) isValidTokenAddress(addr common.Address) bool {
	switch addr {
	case (common.Address{}):
		return false
	case ep.UniswapV2Router02,
		ep.UniswapV3Router,
		ep.UniswapV2Factory,
		ep.UniswapV3Factory,
		ep.SushiSwapFactory:
		// Infrastructure contracts configured on the parser.
		return false
	case common.HexToAddress("0xA51afAFe0263b40EdaEf0Df8781eA9aa03E381a3"), // Universal Router
		common.HexToAddress("0x1111111254EEB25477B68fb85Ed929f73A960582"), // 1inch Router v5
		common.HexToAddress("0xC36442b4a4522E871399CD717aBDD847Ab11FE88"): // Uniswap V3 Position Manager
		return false
	}

	// Entropy heuristic: count the bytes that carry information.
	const minNonZeroBytes = 8
	populated := 0
	for _, b := range addr {
		if b != 0 {
			populated++
		}
	}
	return populated >= minNonZeroBytes
}
// derivePoolAddress computes the pool address for a token pair under the
// given protocol. It returns the zero address when the tokens fail
// validation, when the protocol name matches no supported family, or when the
// derived address fails validatePoolAddressDerivation.
//
// Protocol matching is a case-insensitive substring test:
//   - "uniswapv3": derived via uniswap.CalculatePoolAddress for the fee tiers
//     100, 500, 3000 and 10000
//   - "sushi": derived via calculateUniswapV2Pair with ep.SushiSwapFactory
//   - "uniswapv2" or "camelot": derived via calculateUniswapV2Pair with
//     ep.UniswapV2Factory
//
// For Uniswap V3 the fee tier cannot be inferred from the token pair alone,
// and an address derivation succeeds for every tier. Previously the loop
// therefore always stopped at the first tier. Now a candidate that the pool
// cache already knows is preferred; when no candidate is cached (or no cache
// is configured) the first derived candidate is used, as before.
// NOTE(review): this assumes ep.poolCache.GetPool is a cheap lookup that
// returns nil for unknown pools — confirm against the PoolCache implementation.
func (ep *EventParser) derivePoolAddress(token0, token1 common.Address, protocol string) common.Address {
	// Both tokens must look like real token contracts.
	if !isValidPoolTokenAddress(token0) || !isValidPoolTokenAddress(token1) {
		return common.Address{}
	}
	// A pair of identical tokens is not a pool.
	if token0 == token1 {
		return common.Address{}
	}
	// Routers and position managers are never pool tokens.
	if isKnownRouterOrManager(token0) || isKnownRouterOrManager(token1) {
		return common.Address{}
	}

	// Derivation uses the canonical (ascending) token order.
	if bytes.Compare(token0.Bytes(), token1.Bytes()) > 0 {
		token0, token1 = token1, token0
	}

	var derivedPool common.Address
	protocolLower := strings.ToLower(protocol)

	switch {
	case strings.Contains(protocolLower, "uniswapv3"):
		// Fee tiers: 100 (0.01%), 500 (0.05%), 3000 (0.3%), 10000 (1%).
		for _, fee := range []int64{100, 500, 3000, 10000} {
			candidate := uniswap.CalculatePoolAddress(ep.UniswapV3Factory, token0, token1, fee)
			if candidate == (common.Address{}) {
				continue
			}
			// Remember the first derivable candidate as the default.
			if derivedPool == (common.Address{}) {
				derivedPool = candidate
			}
			// A candidate already present in the pool cache is a pool we
			// have actually observed, so it beats the default guess.
			if ep.poolCache != nil && ep.poolCache.GetPool(candidate) != nil {
				derivedPool = candidate
				break
			}
		}
	case strings.Contains(protocolLower, "sushi"):
		derivedPool = calculateUniswapV2Pair(ep.SushiSwapFactory, token0, token1)
	case strings.Contains(protocolLower, "uniswapv2"), strings.Contains(protocolLower, "camelot"):
		derivedPool = calculateUniswapV2Pair(ep.UniswapV2Factory, token0, token1)
	}

	// Reject zero, token-equal, router and low-entropy results.
	if !validatePoolAddressDerivation(derivedPool, token0, token1, protocol) {
		return common.Address{}
	}
	return derivedPool
}
// calculateUniswapV2Pair derives a pair address using the CREATE2 formula
//
//	keccak256(0xff ++ factory ++ keccak256(tokenLow ++ tokenHigh) ++ initCodeHash)[12:]
//
// where the two tokens are ordered ascending by numeric value. It returns the
// zero address when factory or either token is the zero address.
// NOTE(review): a single hard-coded init-code hash is used for every factory;
// confirm it matches each factory this helper is called with.
func calculateUniswapV2Pair(factory, token0, token1 common.Address) common.Address {
	var zero common.Address
	if factory == zero || token0 == zero || token1 == zero {
		return zero
	}

	// Sort the pair so the salt does not depend on argument order.
	low, high := token0, token1
	if low.Big().Cmp(high.Big()) > 0 {
		low, high = high, low
	}

	salt := crypto.Keccak256(low.Bytes(), high.Bytes())
	initCodeHash := common.HexToHash("0x96e8ac4277198ff8b6f785478aa9a39f403cb768dd02cbee326c3e7da348845f")

	// Keccak256 hashes its arguments as one concatenated byte stream.
	digest := crypto.Keccak256([]byte{0xff}, factory.Bytes(), salt, initCodeHash.Bytes())

	// The address is the low 20 bytes of the 32-byte digest.
	return common.BytesToAddress(digest[12:])
}
// AddKnownPool records address as a known pool belonging to protocol,
// overwriting any protocol previously stored for that address.
//
// The map is allocated on first use so that calling this on a parser whose
// knownPools map was never initialized does not panic on the write.
func (ep *EventParser) AddKnownPool(address common.Address, protocol string) {
	if ep.knownPools == nil {
		ep.knownPools = make(map[common.Address]string)
	}
	ep.knownPools[address] = protocol
}
// GetKnownPools returns the parser's pool-address-to-protocol map.
//
// The map is handed out directly rather than copied, so changes made through
// the returned value are visible to the parser.
func (ep *EventParser) GetKnownPools() map[common.Address]string {
	pools := ep.knownPools
	return pools
}
// isValidPoolTokenAddress reports whether addr passes the structural checks
// applied to pool token addresses:
//   - its first 9 bytes are not all zero (this also rejects the zero address
//     and zero-padded values such as small integers), and
//   - it contains at least 8 non-zero bytes overall.
func isValidPoolTokenAddress(addr common.Address) bool {
	const (
		zeroPrefixLen   = 9 // 18 hex digits after the "0x" prefix
		minNonZeroBytes = 8
	)

	prefixAllZero := true
	populated := 0
	for i, b := range addr {
		if b == 0 {
			continue
		}
		populated++
		if i < zeroPrefixLen {
			prefixAllZero = false
		}
	}

	if prefixAllZero {
		return false
	}
	return populated >= minNonZeroBytes
}
// isKnownRouterOrManager reports whether addr is one of the hard-coded router
// or position-manager contracts listed below. Every other address, including
// the WETH contracts on Arbitrum (0x82aF49447D8a07e3bd95BD0d56f35241523fBab1)
// and Mainnet (0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2), which are valid
// tokens, yields false.
func isKnownRouterOrManager(addr common.Address) bool {
	switch addr {
	case
		// Uniswap routers
		common.HexToAddress("0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D"), // Uniswap V2 Router 02
		common.HexToAddress("0xE592427A0AEce92De3Edee1F18E0157C05861564"), // Uniswap V3 Router
		common.HexToAddress("0x68b3465833fb72A70ecDF485E0e4C7bD8665Fc45"), // Uniswap V3 Router 2
		common.HexToAddress("0xA51afAFe0263b40EdaEf0Df8781eA9aa03E381a3"), // Universal Router
		// Position managers
		common.HexToAddress("0xC36442b4a4522E871399CD717aBDD847Ab11FE88"), // Uniswap V3 Position Manager
		// Other routers
		common.HexToAddress("0x1111111254EEB25477B68fb85Ed929f73A960582"), // 1inch Router v5
		common.HexToAddress("0x1111111254fb6c44bAC0beD2854e76F90643097d"), // 1inch Router v4
		common.HexToAddress("0xd9e1cE17f2641f24aE83637ab66a2cca9C378B9F"): // SushiSwap Router
		return true
	default:
		return false
	}
}
// validatePoolAddressDerivation is the final sanity gate for a derived pool
// address. It returns false when poolAddr is the zero address, equals either
// token, is a known router/manager, or starts with nine zero bytes. For
// protocols whose lower-cased name contains "uniswapv3" the result of
// validateUniswapV3PoolStructure is returned; otherwise the address is accepted.
func validatePoolAddressDerivation(poolAddr, token0, token1 common.Address, protocol string) bool {
	switch {
	case poolAddr == (common.Address{}):
		return false
	case poolAddr == token0, poolAddr == token1:
		// A pool can never be one of its own tokens.
		return false
	case isKnownRouterOrManager(poolAddr):
		return false
	case strings.HasPrefix(poolAddr.Hex(), "0x000000000000000000"):
		// Zero-padded values are almost certainly misparsed integers.
		return false
	}

	if strings.Contains(strings.ToLower(protocol), "uniswapv3") {
		return validateUniswapV3PoolStructure(poolAddr)
	}
	return true
}
// validateUniswapV3PoolStructure applies an offline plausibility check to a
// Uniswap V3 pool address: it must contain at least 12 non-zero bytes.
//
// This is only an entropy heuristic. It does not query the chain, so it cannot
// confirm that a pool contract actually exists at the address.
func validateUniswapV3PoolStructure(poolAddr common.Address) bool {
	const minNonZeroBytes = 12

	populated := 0
	for _, b := range poolAddr {
		if b != 0 {
			populated++
		}
	}
	return populated >= minNonZeroBytes
}
// protocolSpecificRecovery dispatches token recovery to the routine matching
// the protocol name. Matching is a case-insensitive substring test evaluated
// in the order uniswap, sushi, 1inch, camelot; anything else goes to the
// generic recovery routine.
func (ep *EventParser) protocolSpecificRecovery(data []byte, ctx *calldata.MulticallContext, protocol string) ([]common.Address, error) {
	name := strings.ToLower(protocol)

	if strings.Contains(name, "uniswap") {
		return ep.recoverUniswapTokens(data, ctx)
	}
	if strings.Contains(name, "sushi") {
		return ep.recoverSushiSwapTokens(data, ctx)
	}
	if strings.Contains(name, "1inch") {
		return ep.recover1InchTokens(data, ctx)
	}
	if strings.Contains(name, "camelot") {
		return ep.recoverCamelotTokens(data, ctx)
	}
	return ep.recoverGenericTokens(data, ctx)
}
// recoverUniswapTokens tries to recover a token pair from Uniswap calldata.
// It attempts, in order:
//  1. calldata.ExtractTokensFromMulticallWithRecovery
//  2. extractTokensFromSignature for each common Uniswap swap function name
//  3. heuristicTokenExtraction
//
// The first step yielding at least two addresses wins; the result (or error)
// of step 3 is returned when the earlier steps fail.
func (ep *EventParser) recoverUniswapTokens(data []byte, ctx *calldata.MulticallContext) ([]common.Address, error) {
	if found, err := calldata.ExtractTokensFromMulticallWithRecovery(data, ctx, true); err == nil && len(found) >= 2 {
		return found, nil
	}

	// Swap entry points commonly seen on Uniswap routers.
	swapFunctions := [...]string{
		"exactInputSingle",
		"exactInput",
		"exactOutputSingle",
		"exactOutput",
		"swapExactTokensForTokens",
		"swapTokensForExactTokens",
	}
	for _, name := range swapFunctions {
		recovered := ep.extractTokensFromSignature(data, name)
		if len(recovered) >= 2 {
			return recovered, nil
		}
	}

	return ep.heuristicTokenExtraction(data, "uniswap")
}
// recoverSushiSwapTokens tries to recover a token pair from SushiSwap
// calldata: first via calldata.ExtractTokensFromMulticallWithRecovery, then
// via heuristicTokenExtraction when that yields fewer than two addresses.
func (ep *EventParser) recoverSushiSwapTokens(data []byte, ctx *calldata.MulticallContext) ([]common.Address, error) {
	if found, err := calldata.ExtractTokensFromMulticallWithRecovery(data, ctx, true); err == nil && len(found) >= 2 {
		return found, nil
	}
	return ep.heuristicTokenExtraction(data, "sushiswap")
}
// recover1InchTokens tries to recover a token pair from 1inch calldata:
// first via calldata.ExtractTokensFromMulticallWithRecovery, then via
// extractFrom1InchSwap when that yields fewer than two addresses.
func (ep *EventParser) recover1InchTokens(data []byte, ctx *calldata.MulticallContext) ([]common.Address, error) {
	if found, err := calldata.ExtractTokensFromMulticallWithRecovery(data, ctx, true); err == nil && len(found) >= 2 {
		return found, nil
	}
	return ep.extractFrom1InchSwap(data)
}
// recoverCamelotTokens tries to recover a token pair from Camelot calldata:
// first via calldata.ExtractTokensFromMulticallWithRecovery, then via
// heuristicTokenExtraction when that yields fewer than two addresses.
func (ep *EventParser) recoverCamelotTokens(data []byte, ctx *calldata.MulticallContext) ([]common.Address, error) {
	if found, err := calldata.ExtractTokensFromMulticallWithRecovery(data, ctx, true); err == nil && len(found) >= 2 {
		return found, nil
	}
	return ep.heuristicTokenExtraction(data, "camelot")
}
// recoverGenericTokens is the recovery routine for protocols without a
// dedicated one: it tries calldata.ExtractTokensFromMulticallWithRecovery and
// falls back to heuristicTokenExtraction when that yields fewer than two
// addresses.
func (ep *EventParser) recoverGenericTokens(data []byte, ctx *calldata.MulticallContext) ([]common.Address, error) {
	if found, err := calldata.ExtractTokensFromMulticallWithRecovery(data, ctx, true); err == nil && len(found) >= 2 {
		return found, nil
	}
	return ep.heuristicTokenExtraction(data, "generic")
}
// extractTokensFromSignature looks for token addresses in the first two
// 32-byte words of data, reading each address from the low 20 bytes of its
// word. It returns the addresses that pass isValidPoolTokenAddress, in order,
// without duplicates; the result may hold zero, one or two entries.
//
// The signature argument is currently not used to select decoding positions;
// the same two slots are inspected for every signature.
//
// The second slot is only inspected when data holds at least 96 bytes.
// Previously, a rejected first slot followed by a valid second slot caused an
// index-out-of-range panic because the duplicate check indexed an empty slice.
func (ep *EventParser) extractTokensFromSignature(data []byte, signature string) []common.Address {
	var tokens []common.Address
	if len(data) < 64 {
		return tokens
	}

	// First word: address occupies bytes 12..32.
	if first := common.BytesToAddress(data[12:32]); isValidPoolTokenAddress(first) {
		tokens = append(tokens, first)
	}

	// Second word: address occupies bytes 44..64.
	if len(data) >= 96 {
		second := common.BytesToAddress(data[44:64])
		// Only compare against the first token when one was actually kept.
		isDuplicate := len(tokens) > 0 && second == tokens[0]
		if isValidPoolTokenAddress(second) && !isDuplicate {
			tokens = append(tokens, second)
		}
	}
	return tokens
}
// heuristicTokenExtraction walks data in 32-byte words and returns the first
// two distinct addresses that pass isValidPoolTokenAddress and are not known
// routers/managers. It returns an error naming protocol when fewer than two
// such addresses are found.
//
// An ABI-encoded address is right-aligned in its word: 12 zero bytes followed
// by the 20 address bytes. When a word has that zero padding the address is
// read from bytes 12..32. Otherwise the first 20 bytes are used, which is what
// this function always did before. Reading the first 20 bytes of a padded word
// could never succeed, because the resulting value starts with 12 zero bytes
// and is rejected by isValidPoolTokenAddress, so padded words were previously
// always skipped.
func (ep *EventParser) heuristicTokenExtraction(data []byte, protocol string) ([]common.Address, error) {
	const (
		wordLen = 32
		addrLen = 20
		padLen  = wordLen - addrLen
	)

	tokens := make([]common.Address, 0, 2)
	for start := 0; start+wordLen <= len(data); start += wordLen {
		word := data[start : start+wordLen]

		// Detect the zero padding of a right-aligned ABI address.
		padded := true
		for _, b := range word[:padLen] {
			if b != 0 {
				padded = false
				break
			}
		}

		raw := word[:addrLen]
		if padded {
			raw = word[padLen:]
		}

		addr := common.BytesToAddress(raw)
		if !isValidPoolTokenAddress(addr) || isKnownRouterOrManager(addr) {
			continue
		}
		// At most one token has been collected at this point.
		if len(tokens) == 1 && tokens[0] == addr {
			continue
		}

		tokens = append(tokens, addr)
		if len(tokens) == 2 {
			return tokens, nil
		}
	}

	return nil, fmt.Errorf("insufficient tokens extracted via heuristic method for %s", protocol)
}
// extractFrom1InchSwap inspects the first four 32-byte words of data and
// returns the first two distinct addresses (read from the low 20 bytes of each
// word) that pass isValidPoolTokenAddress and are not known routers/managers.
//
// It returns an error when data is shorter than 128 bytes or when fewer than
// two such addresses are found. This is a simplified scan, not a decoder for
// 1inch's aggregation calldata.
func (ep *EventParser) extractFrom1InchSwap(data []byte) ([]common.Address, error) {
	if len(data) < 128 {
		return nil, fmt.Errorf("insufficient data for 1inch swap extraction")
	}

	var tokens []common.Address
	for _, wordStart := range [...]int{0, 32, 64, 96} {
		wordEnd := wordStart + 32
		if wordEnd > len(data) {
			continue
		}

		addr := common.BytesToAddress(data[wordStart+12 : wordEnd])
		if !isValidPoolTokenAddress(addr) || isKnownRouterOrManager(addr) {
			continue
		}
		// Only one token can be held here, so one comparison covers dedup.
		if len(tokens) == 1 && tokens[0] == addr {
			continue
		}

		tokens = append(tokens, addr)
		if len(tokens) >= 2 {
			break
		}
	}

	if len(tokens) < 2 {
		return nil, fmt.Errorf("insufficient tokens found in 1inch swap data")
	}
	return tokens, nil
}
// extractSwapFromMulticallFallback builds a minimal SwapCall when structured
// multicall decoding has failed. Tokens come from
// calldata.ExtractTokensFromMulticallWithRecovery, or from
// heuristicScanForTokens when that fails or yields fewer than two addresses.
// It returns nil when no token pair can be found.
//
// The returned call is tagged with Selector "fallback_parsed" and Protocol
// "Multicall_Fallback", and carries a placeholder AmountIn of 1.
//
// PoolAddress is derived with the protocol from ctx when one is set.
// Previously the literal "Multicall" was always used, which matches none of
// the protocol families derivePoolAddress understands, so the field was always
// the zero address. "Multicall" is still used when ctx carries no protocol.
func (ep *EventParser) extractSwapFromMulticallFallback(data []byte, ctx *calldata.MulticallContext) *calldata.SwapCall {
	tokens, err := calldata.ExtractTokensFromMulticallWithRecovery(data, ctx, true)
	if err != nil || len(tokens) < 2 {
		// Structured recovery failed; fall back to pattern scanning.
		tokens = ep.heuristicScanForTokens(data)
	}
	if len(tokens) < 2 {
		return nil
	}

	poolProtocol := "Multicall"
	if ctx != nil && ctx.Protocol != "" {
		poolProtocol = ctx.Protocol
	}

	return &calldata.SwapCall{
		Selector:    "fallback_parsed",
		Protocol:    "Multicall_Fallback",
		TokenIn:     tokens[0],
		TokenOut:    tokens[1],
		AmountIn:    big.NewInt(1), // Placeholder amount
		PoolAddress: ep.derivePoolAddress(tokens[0], tokens[1], poolProtocol),
	}
}
// heuristicScanForTokens searches raw calldata for up to two distinct
// addresses that pass isValidPoolTokenAddress and are not known
// routers/managers. The result holds at most two entries and may be empty.
//
// Two passes are made, stopping as soon as two tokens are found:
//  1. 32-byte aligned: the low 20 bytes of every word, which is where an
//     ABI-encoded address sits.
//  2. Byte-sliding: every 20-byte window, as a last resort for packed data.
//
// The aligned pass runs first because the sliding pass accepts windows that
// straddle the zero padding of an address word. When the sliding pass ran
// first, such misaligned windows were picked up before the real addresses were
// ever considered, and the aligned pass could then append a third entry.
func (ep *EventParser) heuristicScanForTokens(data []byte) []common.Address {
	const (
		wanted  = 2
		addrLen = 20
		wordLen = 32
		padLen  = wordLen - addrLen
	)

	var tokens []common.Address
	seen := make(map[common.Address]bool)

	// consider appends the address encoded in raw when it is new and plausible.
	consider := func(raw []byte) {
		addr := common.BytesToAddress(raw)
		if seen[addr] || !isValidPoolTokenAddress(addr) || isKnownRouterOrManager(addr) {
			return
		}
		seen[addr] = true
		tokens = append(tokens, addr)
	}

	// Pass 1: addresses right-aligned in 32-byte words.
	for i := padLen; i+addrLen <= len(data) && len(tokens) < wanted; i += wordLen {
		consider(data[i : i+addrLen])
	}

	// Pass 2: any 20-byte window, only if still short of two tokens.
	for i := 0; i+addrLen <= len(data) && len(tokens) < wanted; i++ {
		consider(data[i : i+addrLen])
	}

	return tokens
}
// Direct function parsing methods (similar to the L2 parser approach).

// parseDirectFunction extracts swap tokens straight from a transaction's
// input by dispatching on its 4-byte method selector. It returns nil for
// contract-creation transactions, for inputs shorter than 4 bytes, and
// whenever the selected decoder finds no token pair. Unrecognized selectors
// are handed to parseGenericSwapDirect.
//
// NOTE(review): the selector labels below are carried over from the original
// comments and have not been checked against the router ABIs — in particular,
// confirm that b858183f really is a multicall selector.
func (ep *EventParser) parseDirectFunction(tx *types.Transaction) []common.Address {
	if tx.To() == nil {
		return nil
	}
	input := tx.Data()
	if len(input) < 4 {
		return nil
	}

	selector := hex.EncodeToString(input[:4])
	ep.logDebug("attempting direct parsing",
		"tx_hash", tx.Hash().Hex(),
		"method_id", selector,
		"data_len", len(input),
	)

	switch selector {
	case "414bf389": // exactInputSingle
		return ep.parseExactInputSingleDirect(input)
	case "472b43f3", // swapExactTokensForTokens (UniswapV2)
		"18cbafe5", // swapExactTokensForTokensSupportingFeeOnTransferTokens
		"5c11d795": // swapExactTokensForTokensSupportingFeeOnTransferTokens (SushiSwap)
		return ep.parseSwapExactTokensForTokensDirect(input)
	case "b858183f": // multicall (Universal Router)
		return ep.parseMulticallDirect(input)
	}

	// Unknown selector: fall back to positional scanning.
	return ep.parseGenericSwapDirect(input)
}
// parseExactInputSingleDirect reads tokenIn and tokenOut from exactInputSingle
// calldata (selector included). The parameter struct is laid out as tokenIn,
// tokenOut, fee, recipient, deadline, amountIn, amountOutMinimum,
// sqrtPriceLimitX96, so the two tokens occupy the first two words after the
// selector.
//
// It returns nil when data is shorter than 164 bytes or when either address
// fails isValidPoolTokenAddress (which also rejects the zero address).
func (ep *EventParser) parseExactInputSingleDirect(data []byte) []common.Address {
	const minLen = 164 // 4-byte selector + 160 bytes
	if len(data) < minLen {
		return nil
	}

	// Each address is right-aligned in its word: selector (4) + padding (12).
	in := common.BytesToAddress(data[16:36])
	out := common.BytesToAddress(data[48:68])

	var zero common.Address
	switch {
	case in == zero, out == zero:
		return nil
	case !isValidPoolTokenAddress(in), !isValidPoolTokenAddress(out):
		return nil
	}
	return []common.Address{in, out}
}
// parseSwapExactTokensForTokensDirect extracts the first and last token of the
// path from UniswapV2-style swap calldata (selector included), e.g.
//
//	swapExactTokensForTokens(uint256 amountIn, uint256 amountOutMin,
//	                         address[] path, address to, uint256 deadline)
//
// The path is a dynamic array, so the third argument word (bytes 68..100)
// holds its offset relative to the start of the arguments. The array length
// sits at that offset and the elements follow it. The previous implementation
// read the length from the fixed position 100, which is the word of the
// fourth argument rather than the array.
//
// It returns nil when the calldata is too short, when an offset or length is
// out of range, when the path holds fewer than 2 or more than 10 entries, or
// when either end of the path fails isValidPoolTokenAddress.
func (ep *EventParser) parseSwapExactTokensForTokensDirect(data []byte) []common.Address {
	const (
		selectorLen   = 4
		wordLen       = 32
		addrPad       = 12
		addrLen       = 20
		pathOffsetPos = selectorLen + 2*wordLen // third argument word
		maxPathLen    = 10                      // reasonable upper bound on hops
	)

	if len(data) < 164 { // 4 + 160 bytes minimum
		return nil
	}

	// Offset of the path array, relative to the first argument word.
	offsetWord := new(big.Int).SetBytes(data[pathOffsetPos : pathOffsetPos+wordLen])
	if !offsetWord.IsUint64() || offsetWord.Uint64() > uint64(len(data)) {
		return nil
	}
	lengthPos := selectorLen + int(offsetWord.Uint64())
	if lengthPos+wordLen > len(data) {
		return nil
	}

	// Number of addresses in the path.
	lengthWord := new(big.Int).SetBytes(data[lengthPos : lengthPos+wordLen])
	if !lengthWord.IsUint64() {
		return nil
	}
	pathLength := lengthWord.Uint64()
	if pathLength < 2 || pathLength > maxPathLen {
		return nil
	}

	// Elements follow the length word; each address is right-aligned.
	firstTokenPos := lengthPos + wordLen + addrPad
	lastTokenPos := firstTokenPos + int(pathLength-1)*wordLen
	if lastTokenPos+addrLen > len(data) {
		return nil
	}

	tokenIn := common.BytesToAddress(data[firstTokenPos : firstTokenPos+addrLen])
	tokenOut := common.BytesToAddress(data[lastTokenPos : lastTokenPos+addrLen])
	if !isValidPoolTokenAddress(tokenIn) || !isValidPoolTokenAddress(tokenOut) {
		return nil
	}
	return []common.Address{tokenIn, tokenOut}
}
// parseMulticallDirect decodes multicall calldata (selector included) whose
// argument is an array of byte strings, and returns the tokens of the first
// inner call that parseDirectFunction can decode.
//
// Expected layout: selector (4 bytes), one offset word, the array length at
// byte 36, then one offset word per element starting at byte 68. Each element
// offset is relative to byte 68 and points at that element's length word,
// which is followed by the element bytes.
//
// The previous implementation assumed exactly one element-offset word and only
// looked at the first call, so any multicall with more than one call was read
// at the wrong position. Each element's own offset is now followed, and the
// calls are tried in order until one yields a token pair. For a canonically
// encoded single-call multicall the bytes inspected are unchanged.
//
// It returns nil when the calldata is too short, when the array length is 0 or
// above 50, or when no inner call yields tokens. Inner calls smaller than 4 or
// larger than 1000 bytes are skipped.
func (ep *EventParser) parseMulticallDirect(data []byte) []common.Address {
	const (
		wordLen      = 32
		countPos     = 36                 // selector (4) + array offset word (32)
		headPos      = countPos + wordLen // first element-offset word
		maxCalls     = 50
		minCallBytes = 4
		maxCallBytes = 1000
	)

	if len(data) < headPos { // 4 + 64 bytes minimum
		return nil
	}

	callCount := new(big.Int).SetBytes(data[countPos+16 : countPos+wordLen]).Uint64()
	if callCount == 0 || callCount > maxCalls {
		return nil
	}

	for i := 0; i < int(callCount); i++ {
		offsetPos := headPos + i*wordLen
		if offsetPos+wordLen > len(data) {
			// Truncated head: no further elements can be located.
			return nil
		}

		// Element offset, relative to the start of the offset table.
		rel := new(big.Int).SetBytes(data[offsetPos : offsetPos+wordLen])
		if !rel.IsUint64() || rel.Uint64() > uint64(len(data)) {
			continue
		}
		lengthPos := headPos + int(rel.Uint64())
		if lengthPos+wordLen > len(data) {
			continue
		}

		size := new(big.Int).SetBytes(data[lengthPos+16 : lengthPos+wordLen]).Uint64()
		if size < minCallBytes || size > maxCallBytes {
			continue
		}
		start := lengthPos + wordLen
		end := start + int(size)
		if end > len(data) {
			continue
		}

		// Wrap the inner calldata in a throwaway transaction so the selector
		// dispatch in parseDirectFunction can be reused.
		inner := types.NewTransaction(0, common.Address{}, big.NewInt(0), 0, big.NewInt(0), data[start:end])
		if tokens := ep.parseDirectFunction(inner); len(tokens) >= 2 {
			return tokens
		}
	}
	return nil
}
// parseGenericSwapDirect is the positional fallback for unrecognized
// selectors. It inspects the address position of the first six argument words
// (byte offsets 16, 48, 80, 112, 144 and 176 of selector-prefixed calldata)
// and returns the first two distinct addresses that pass
// isValidPoolTokenAddress and are not known routers/managers, or nil when
// fewer than two are found.
func (ep *EventParser) parseGenericSwapDirect(data []byte) []common.Address {
	const (
		firstPos = 16  // selector (4) + address padding (12)
		lastPos  = 176 // sixth argument word
		stride   = 32
		addrLen  = 20
	)

	var found []common.Address
	for pos := firstPos; pos <= lastPos; pos += stride {
		if pos+addrLen > len(data) {
			// Positions only increase, so nothing further fits either.
			break
		}

		addr := common.BytesToAddress(data[pos : pos+addrLen])
		if addr == (common.Address{}) || !isValidPoolTokenAddress(addr) || isKnownRouterOrManager(addr) {
			continue
		}
		// At most one address is held here, so one comparison covers dedup.
		if len(found) == 1 && found[0] == addr {
			continue
		}

		found = append(found, addr)
		if len(found) >= 2 {
			return found[:2]
		}
	}
	return nil
}
// getPoolTokens resolves the two token addresses of a pool without touching
// the chain. Sources are consulted in priority order:
//  1. txTokenCache, keyed by the pool's hex address (tokens recovered from
//     transaction calldata)
//  2. ep.poolCache, when one is configured and holds an entry with both
//     tokens set
//
// When neither source knows the pool a debug line is logged and two zero
// addresses are returned.
func (ep *EventParser) getPoolTokens(poolAddress common.Address, txHash common.Hash, txTokenCache map[string][]common.Address) (token0, token1 common.Address) {
	// Indexing a nil map is safe, so no separate nil check is required.
	if pair, ok := txTokenCache[poolAddress.Hex()]; ok && len(pair) >= 2 {
		ep.logDebug("enriched pool tokens from transaction calldata",
			"pool", poolAddress.Hex()[:10],
			"token0", pair[0].Hex()[:10],
			"token1", pair[1].Hex()[:10])
		return pair[0], pair[1]
	}

	// Known pools from the cache avoid both RPC calls and zero addresses.
	if ep.poolCache != nil {
		var zero common.Address
		if info := ep.poolCache.GetPool(poolAddress); info != nil && info.Token0 != zero && info.Token1 != zero {
			ep.logDebug("enriched pool tokens from cache",
				"pool", poolAddress.Hex()[:10],
				"token0", info.Token0.Hex()[:10],
				"token1", info.Token1.Hex()[:10])
			return info.Token0, info.Token1
		}
	}

	// Logging the miss makes it possible to track which pools lack cache entries.
	ep.logDebug("pool not found in cache, returning zero addresses",
		"pool", poolAddress.Hex()[:10],
		"txHash", txHash.Hex()[:10])
	return common.Address{}, common.Address{}
}
// extractAmountsFromData guesses the (amountIn, amountOut) pair of a swap from
// raw calldata by probing well-known positions.
//
// Results, in order of precedence:
//   - data shorter than 128 bytes: (0, 0)
//   - words at bytes 160..192 and 192..224 (UniswapV3 single-swap layout), if
//     data has at least 224 bytes and the pair is plausible
//   - words at bytes 4..36 and 36..68 (UniswapV2 layout), if plausible
//   - the first two consecutive 32-byte aligned words that are both greater
//     than 1000 and below 10^30
//   - otherwise (1000000, 1000000), a non-zero placeholder so that later
//     divisions cannot hit zero
//
// A pair at a fixed position is plausible when amountIn is positive and both
// values are below 10^30.
func (ep *EventParser) extractAmountsFromData(data []byte) (*big.Int, *big.Int) {
	if len(data) < 128 {
		return big.NewInt(0), big.NewInt(0)
	}

	ceiling := new(big.Int).Exp(big.NewInt(10), big.NewInt(30), nil) // 10^30
	wordAt := func(from int) *big.Int {
		return new(big.Int).SetBytes(data[from : from+32])
	}
	belowCeiling := func(v *big.Int) bool { return v.Cmp(ceiling) < 0 }

	// Fixed layouts: UniswapV3 first, then UniswapV2.
	for _, at := range [...]int{160, 4} {
		if at+64 > len(data) {
			continue
		}
		in, out := wordAt(at), wordAt(at+32)
		if in.Sign() > 0 && belowCeiling(in) && out.Sign() >= 0 && belowCeiling(out) {
			return in, out
		}
	}

	// Scan for two adjacent words that both look like token amounts.
	floor := big.NewInt(1000) // anything at or below 1000 wei is ignored
	for at := 0; at+64 <= len(data); at += 32 {
		first, second := wordAt(at), wordAt(at+32)
		if first.Cmp(floor) > 0 && belowCeiling(first) &&
			second.Cmp(floor) > 0 && belowCeiling(second) {
			return first, second
		}
	}

	// Nothing plausible found: 1M wei placeholder for both sides.
	return big.NewInt(1000000), big.NewInt(1000000)
}
// NOTE: parseTokensFromKnownMethod, which extracted tokens from known DEX
// method signatures, has been removed. Its role is now filled by the
// TokenExtractor interface, avoiding duplication with the L2 parser implementation.