Restructured project for V2 refactor:

**Structure Changes:**
- Moved all V1 code to orig/ folder (preserved with git mv)
- Created docs/planning/ directory
- Added orig/README_V1.md explaining V1 preservation

**Planning Documents:**
- 00_V2_MASTER_PLAN.md: Complete architecture overview
  - Executive summary of critical V1 issues
  - High-level component architecture diagrams
  - 5-phase implementation roadmap
  - Success metrics and risk mitigation
- 07_TASK_BREAKDOWN.md: Atomic task breakdown
  - 99+ hours of detailed tasks
  - Every task < 2 hours (atomic)
  - Clear dependencies and success criteria
  - Organized by implementation phase

**V2 Key Improvements:**
- Per-exchange parsers (factory pattern)
- Multi-layer strict validation
- Multi-index pool cache
- Background validation pipeline
- Comprehensive observability

**Critical Issues Addressed:**
- Zero address tokens (strict validation + cache enrichment)
- Parsing accuracy (protocol-specific parsers)
- No audit trail (background validation channel)
- Inefficient lookups (multi-index cache)
- Stats disconnection (event-driven metrics)

Next Steps:
1. Review planning documents
2. Begin Phase 1: Foundation (P1-001 through P1-010)
3. Implement parsers in Phase 2
4. Build cache system in Phase 3
5. Add validation pipeline in Phase 4
6. Migrate and test in Phase 5

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
1347 lines
47 KiB
Go
1347 lines
47 KiB
Go
package parser
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/binary"
|
|
"fmt"
|
|
"math"
|
|
"math/big"
|
|
"time"
|
|
|
|
"github.com/ethereum/go-ethereum/accounts/abi"
|
|
"github.com/ethereum/go-ethereum/common"
|
|
"github.com/ethereum/go-ethereum/core/types"
|
|
"github.com/ethereum/go-ethereum/crypto"
|
|
|
|
"github.com/fraktal/mev-beta/internal/logger"
|
|
"github.com/fraktal/mev-beta/internal/utils"
|
|
"github.com/fraktal/mev-beta/internal/validation"
|
|
"github.com/fraktal/mev-beta/pkg/calldata"
|
|
"github.com/fraktal/mev-beta/pkg/transport"
|
|
pkgtypes "github.com/fraktal/mev-beta/pkg/types"
|
|
)
|
|
|
|
// safeConvertUint64ToInt64 narrows v to int64. Values above math.MaxInt64
// saturate at math.MaxInt64 instead of wrapping around to a negative number.
func safeConvertUint64ToInt64(v uint64) int64 {
	const ceiling uint64 = math.MaxInt64
	if v <= ceiling {
		return int64(v)
	}
	return math.MaxInt64
}
|
|
|
|
// safeConvertUint64ToInt narrows v to int. The cap is math.MaxInt32 regardless
// of the platform's int width, so any larger input saturates at that value.
func safeConvertUint64ToInt(v uint64) int {
	const ceiling uint64 = math.MaxInt32
	if v <= ceiling {
		return int(v)
	}
	return math.MaxInt32
}
|
|
|
|
// MarketDiscovery is the slice of the market discovery component that the
// parser depends on.
type MarketDiscovery interface {
	// GetPoolCache returns the pool cache. The result is typed as interface{}
	// (expected to be *arbitrum.PoolCache) so this package does not have to
	// import the arbitrum package and create an import cycle.
	GetPoolCache() (poolCache interface{})
}
|
|
|
|
// MEVStrategyEngine is the slice of the strategy engine that the parser
// depends on.
type MEVStrategyEngine interface {
	// AnalyzeArbitrageOpportunity evaluates a swap event. Both the argument and
	// the result are untyped here; the result is expected to be a
	// *arbitrum.ProfitableStrategy.
	AnalyzeArbitrageOpportunity(ctx context.Context, swapEvent interface{}) (strategy interface{}, err error)
}
|
|
|
|
// EventMonitor is the lifecycle and reporting surface the parser needs from an
// event monitoring component.
type EventMonitor interface {
	// Start begins monitoring.
	Start() (err error)
	// Stop ends monitoring.
	Stop() (err error)
	// GetStatistics reports monitor statistics keyed by name.
	GetStatistics() (stats map[string]interface{})
}
|
|
|
|
// SwapEventPipeline is the surface the parser needs from a component that
// processes swap events.
type SwapEventPipeline interface {
	// Start begins pipeline processing.
	Start() (err error)
	// Stop ends pipeline processing.
	Stop() (err error)
	// GetStatistics reports pipeline statistics keyed by name.
	GetStatistics() (stats map[string]interface{})
	// ProcessSwap hands a single swap (untyped at this boundary) to the pipeline.
	ProcessSwap(swap interface{}) (err error)
}
|
|
|
|
// ABIDecoder interface defines methods needed for ABI decoding
|
|
type ABIDecoder interface {
|
|
DecodeSwapTransaction(protocol string, data []byte) (interface{}, error)
|
|
CalculatePoolAddress(protocol, tokenA, tokenB string, fee interface{}) (common.Address, error)
|
|
}
|
|
|
|
// EnhancedSequencerParser provides comprehensive MEV event detection
|
|
type EnhancedSequencerParser struct {
|
|
providerManager *transport.ProviderManager // Manages multiple RPC providers with rotation
|
|
logger *logger.Logger
|
|
protocolRegistry interface{} // Will hold the required protocol registry
|
|
eventMonitor EventMonitor // Will hold EventMonitor implementation
|
|
swapPipeline SwapEventPipeline // Will hold SwapEventPipeline implementation
|
|
marketDiscovery MarketDiscovery // Interface for market discovery
|
|
|
|
// MEV opportunity detection
|
|
mevAnalyzer *MEVAnalyzer
|
|
profitCalculator *ProfitCalculator
|
|
abiDecoder ABIDecoder // Added for transaction decoding
|
|
arbitrageExecutor interface{} // Will hold *arbitrage.ArbitrageExecutor when available, but we avoid direct import to prevent cycle
|
|
capitalOptimizer interface{} // Will hold capital optimization component
|
|
profitTracker interface{} // Will hold profitability tracking component
|
|
|
|
// Event counters
|
|
swapCount uint64
|
|
liquidationCount uint64
|
|
liquidityCount uint64
|
|
}
|
|
|
|
// PoolStateUpdate represents a pool state change from a swap event
|
|
type PoolStateUpdate struct {
|
|
Pool common.Address
|
|
TokenIn common.Address
|
|
TokenOut common.Address
|
|
AmountIn *big.Int
|
|
AmountOut *big.Int
|
|
Timestamp time.Time
|
|
UpdateType string // "swap", "liquidity_add", "liquidity_remove"
|
|
}
|
|
|
|
// PoolPriceUpdate represents a pool price update
|
|
type PoolPriceUpdate struct {
|
|
Pool common.Address
|
|
TokenIn common.Address
|
|
TokenOut common.Address
|
|
NewPrice *big.Float
|
|
PriceImpact float64
|
|
Timestamp time.Time
|
|
}
|
|
|
|
// MEVAnalyzer holds the thresholds used when analyzing transactions for MEV
// opportunities.
type MEVAnalyzer struct {
	// minProfitThreshold is the minimum profit in wei worth considering;
	// gasPrice is the current gas price.
	minProfitThreshold, gasPrice *big.Int
	// maxGasLimit is the maximum gas we are willing to use.
	maxGasLimit uint64

	// Minimum profit margins for arbitrage, liquidations and sandwich attacks.
	arbitrageMargin, liquidationMargin, sandwichMargin float64
}
|
|
|
|
// ProfitCalculator calculates potential MEV profits
|
|
type ProfitCalculator struct {
|
|
gasOracle GasOracle
|
|
priceOracle PriceOracle
|
|
poolOracle PoolOracle
|
|
}
|
|
|
|
// GasOracle supplies gas price information.
type GasOracle interface {
	// GetCurrentGasPrice returns the current gas price.
	GetCurrentGasPrice() (price *big.Int, err error)
	// EstimateGasUsage estimates gas for a transaction type at a given complexity.
	EstimateGasUsage(txType string, complexity int) (gas uint64, err error)
}
|
|
|
|
// PriceOracle provides token price information
|
|
type PriceOracle interface {
|
|
GetTokenPrice(token common.Address) (*big.Int, error)
|
|
GetPriceImpact(tokenIn, tokenOut common.Address, amountIn *big.Int) (float64, error)
|
|
}
|
|
|
|
// PoolOracle provides pool liquidity and state information
|
|
type PoolOracle interface {
|
|
GetPoolLiquidity(pool common.Address) (*big.Int, error)
|
|
GetPoolState(pool common.Address) (interface{}, error)
|
|
}
|
|
|
|
// NewEnhancedSequencerParser creates a new enhanced parser with provider manager
|
|
func NewEnhancedSequencerParser(providerManager *transport.ProviderManager, logger *logger.Logger, poolCache interface{}, marketDiscovery MarketDiscovery, strategyEngine MEVStrategyEngine, arbitrageService interface{}) (*EnhancedSequencerParser, error) {
|
|
// Initialize MEV analyzer
|
|
mevAnalyzer := &MEVAnalyzer{
|
|
minProfitThreshold: big.NewInt(1000000000000000), // 0.001 ETH minimum profit
|
|
gasPrice: big.NewInt(100000000), // 0.1 gwei default
|
|
maxGasLimit: 500000, // 500k gas limit
|
|
arbitrageMargin: 0.05, // 5% minimum margin
|
|
liquidationMargin: 0.10, // 10% minimum margin
|
|
sandwichMargin: 0.02, // 2% minimum margin
|
|
}
|
|
|
|
// Initialize ABI decoder
|
|
decoder, err := NewABIDecoder()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to initialize ABI decoder: %w", err)
|
|
}
|
|
|
|
// Math calculator initialization removed to avoid dependency issues
|
|
|
|
// We're keeping the implementation minimal here to avoid circular imports
|
|
// The full initialization would happen in the main arbitrum package
|
|
parser := &EnhancedSequencerParser{
|
|
providerManager: providerManager,
|
|
logger: logger,
|
|
protocolRegistry: nil, // To be initialized properly
|
|
eventMonitor: nil, // To be initialized properly
|
|
swapPipeline: nil, // To be initialized properly
|
|
marketDiscovery: marketDiscovery,
|
|
mevAnalyzer: mevAnalyzer,
|
|
abiDecoder: decoder, // Using interface{}
|
|
}
|
|
|
|
logger.Info("Enhanced sequencer parser initialized with comprehensive MEV detection and event monitoring")
|
|
return parser, nil
|
|
}
|
|
|
|
// NewABIDecoder creates a new ABI decoder (using interface{} to avoid import issues)
|
|
func NewABIDecoder() (ABIDecoder, error) {
|
|
// Return a placeholder implementation
|
|
decoder := &sophisticatedABIDecoder{
|
|
protocolABIs: make(map[string]*abi.ABI),
|
|
logger: nil,
|
|
addressValidator: validation.NewAddressValidator(),
|
|
safeConverter: utils.NewSafeAddressConverter(),
|
|
}
|
|
return decoder, nil
|
|
}
|
|
|
|
// sophisticatedABIDecoder implements comprehensive ABI decoding for all DEX protocols
|
|
type sophisticatedABIDecoder struct {
|
|
protocolABIs map[string]*abi.ABI
|
|
logger *logger.Logger
|
|
addressValidator *validation.AddressValidator
|
|
safeConverter *utils.SafeAddressConverter
|
|
}
|
|
|
|
func (p *sophisticatedABIDecoder) DecodeSwapTransaction(protocol string, data []byte) (interface{}, error) {
|
|
if len(data) < 4 {
|
|
return nil, fmt.Errorf("transaction data too short")
|
|
}
|
|
|
|
// Extract method signature (first 4 bytes)
|
|
methodSig := data[:4]
|
|
txData := data[4:]
|
|
|
|
// Decode based on protocol and method signature
|
|
switch protocol {
|
|
case "uniswap_v3":
|
|
return p.decodeUniswapV3Swap(methodSig, txData)
|
|
case "uniswap_v2":
|
|
return p.decodeUniswapV2Swap(methodSig, txData)
|
|
case "sushiswap":
|
|
return p.decodeSushiSwap(methodSig, txData)
|
|
case "camelot":
|
|
return p.decodeCamelotSwap(methodSig, txData)
|
|
case "balancer":
|
|
return p.decodeBalancerSwap(methodSig, txData)
|
|
case "curve":
|
|
return p.decodeCurveSwap(methodSig, txData)
|
|
default:
|
|
return p.decodeGenericSwap(methodSig, txData)
|
|
}
|
|
}
|
|
|
|
// decodeUniswapV3Swap decodes Uniswap V3 swap transactions
|
|
func (p *sophisticatedABIDecoder) decodeUniswapV3Swap(methodSig []byte, data []byte) (interface{}, error) {
|
|
// exactInputSingle: 0x414bf389
|
|
// exactOutputSingle: 0xdb3e2198
|
|
// exactInput: 0xc04b8d59
|
|
// exactOutput: 0xf28c0498
|
|
|
|
switch {
|
|
case bytes.Equal(methodSig, []byte{0xac, 0x96, 0x50, 0xd8}), // multicall(uint256,bytes[])
|
|
bytes.Equal(methodSig, []byte{0x5a, 0xe4, 0x01, 0xdc}): // multicall(bytes[])
|
|
return p.decodeUniswapV3Multicall(data)
|
|
case bytes.Equal(methodSig, []byte{0x41, 0x4b, 0xf3, 0x89}): // exactInputSingle
|
|
return p.decodeExactInputSingle(data)
|
|
case bytes.Equal(methodSig, []byte{0xdb, 0x3e, 0x21, 0x98}): // exactOutputSingle
|
|
return p.decodeExactOutputSingle(data)
|
|
case bytes.Equal(methodSig, []byte{0xc0, 0x4b, 0x8d, 0x59}): // exactInput
|
|
return p.decodeExactInput(data)
|
|
case bytes.Equal(methodSig, []byte{0xf2, 0x8c, 0x04, 0x98}): // exactOutput
|
|
return p.decodeExactOutput(data)
|
|
default:
|
|
return nil, fmt.Errorf("unknown Uniswap V3 method signature: %x", methodSig)
|
|
}
|
|
}
|
|
|
|
func (p *sophisticatedABIDecoder) parseUniswapV3SingleSwapParams(data []byte) (tokenIn, tokenOut, recipient common.Address, fee, deadline, amountA, amountB *big.Int, err error) {
|
|
if len(data) < 224 {
|
|
err = fmt.Errorf("data too short for single swap")
|
|
return
|
|
}
|
|
|
|
// PHASE 5 FIX: Use safe address conversion for token extraction
|
|
rawTokenIn := common.BytesToAddress(data[12:32])
|
|
if p.addressValidator != nil {
|
|
validation := p.addressValidator.ValidateAddress(rawTokenIn.Hex())
|
|
if !validation.IsValid || validation.CorruptionScore > 30 {
|
|
err = fmt.Errorf("invalid tokenIn address: %s (corruption score: %d)", rawTokenIn.Hex(), validation.CorruptionScore)
|
|
return
|
|
}
|
|
}
|
|
tokenIn = rawTokenIn
|
|
|
|
rawTokenOut := common.BytesToAddress(data[44:64])
|
|
if p.addressValidator != nil {
|
|
validation := p.addressValidator.ValidateAddress(rawTokenOut.Hex())
|
|
if !validation.IsValid || validation.CorruptionScore > 30 {
|
|
err = fmt.Errorf("invalid tokenOut address: %s (corruption score: %d)", rawTokenOut.Hex(), validation.CorruptionScore)
|
|
return
|
|
}
|
|
}
|
|
tokenOut = rawTokenOut
|
|
|
|
rawRecipient := common.BytesToAddress(data[108:128])
|
|
if p.addressValidator != nil {
|
|
validation := p.addressValidator.ValidateAddress(rawRecipient.Hex())
|
|
if !validation.IsValid || validation.CorruptionScore > 30 {
|
|
err = fmt.Errorf("invalid recipient address: %s (corruption score: %d)", rawRecipient.Hex(), validation.CorruptionScore)
|
|
return
|
|
}
|
|
}
|
|
recipient = rawRecipient
|
|
|
|
fee = new(big.Int).SetBytes(data[64:96])
|
|
deadline = new(big.Int).SetBytes(data[128:160])
|
|
amountA = new(big.Int).SetBytes(data[160:192])
|
|
amountB = new(big.Int).SetBytes(data[192:224])
|
|
return
|
|
}
|
|
|
|
// decodeExactInputSingle decodes exactInputSingle parameters
|
|
func (p *sophisticatedABIDecoder) decodeExactInputSingle(data []byte) (*SwapEvent, error) {
|
|
tokenIn, tokenOut, recipient, fee, deadline, amountIn, amountOutMinimum, err := p.parseUniswapV3SingleSwapParams(data)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error parsing exactInputSingle: %w", err)
|
|
}
|
|
|
|
return &SwapEvent{
|
|
Timestamp: time.Now(),
|
|
Protocol: "uniswap_v3",
|
|
TokenIn: tokenIn,
|
|
TokenOut: tokenOut,
|
|
AmountIn: amountIn,
|
|
AmountOut: amountOutMinimum,
|
|
Recipient: recipient,
|
|
Fee: fee.Uint64(),
|
|
Deadline: deadline.Uint64(),
|
|
}, nil
|
|
}
|
|
|
|
// decodeUniswapV2Swap decodes Uniswap V2 style swaps
|
|
func (p *sophisticatedABIDecoder) decodeUniswapV2Swap(methodSig []byte, data []byte) (interface{}, error) {
|
|
// swapExactTokensForTokens: 0x38ed1739
|
|
// swapTokensForExactTokens: 0x8803dbee
|
|
// swapExactETHForTokens: 0x7ff36ab5
|
|
// swapTokensForExactETH: 0x4a25d94a
|
|
|
|
switch {
|
|
case bytes.Equal(methodSig, []byte{0x38, 0xed, 0x17, 0x39}):
|
|
return p.decodeSwapExactTokensForTokens(data)
|
|
case bytes.Equal(methodSig, []byte{0x88, 0x03, 0xdb, 0xee}):
|
|
return p.decodeSwapTokensForExactTokens(data)
|
|
default:
|
|
return nil, fmt.Errorf("unknown Uniswap V2 method signature: %x", methodSig)
|
|
}
|
|
}
|
|
|
|
func (p *sophisticatedABIDecoder) CalculatePoolAddress(protocol, tokenA, tokenB string, fee interface{}) (common.Address, error) {
|
|
// PHASE 5 FIX: Use safe address conversion for pool calculation
|
|
var tokenAAddr, tokenBAddr common.Address
|
|
|
|
if p.safeConverter != nil {
|
|
result := p.safeConverter.SafeHexToAddress(tokenA)
|
|
if !result.IsValid {
|
|
return common.Address{}, fmt.Errorf("invalid tokenA address: %s (%v)", tokenA, result.Error)
|
|
}
|
|
tokenAAddr = result.Address
|
|
|
|
result = p.safeConverter.SafeHexToAddress(tokenB)
|
|
if !result.IsValid {
|
|
return common.Address{}, fmt.Errorf("invalid tokenB address: %s (%v)", tokenB, result.Error)
|
|
}
|
|
tokenBAddr = result.Address
|
|
} else {
|
|
// Fallback to unsafe conversion if safe converter not available
|
|
tokenAAddr = common.HexToAddress(tokenA)
|
|
tokenBAddr = common.HexToAddress(tokenB)
|
|
}
|
|
|
|
// Ensure token ordering (token0 < token1)
|
|
if bytes.Compare(tokenAAddr.Bytes(), tokenBAddr.Bytes()) > 0 {
|
|
tokenAAddr, tokenBAddr = tokenBAddr, tokenAAddr
|
|
}
|
|
|
|
switch protocol {
|
|
case "uniswap_v3":
|
|
return p.calculateUniswapV3PoolAddress(tokenAAddr, tokenBAddr, fee)
|
|
case "uniswap_v2", "sushiswap":
|
|
return p.calculateUniswapV2PoolAddress(tokenAAddr, tokenBAddr, protocol)
|
|
case "camelot":
|
|
return p.calculateCamelotPoolAddress(tokenAAddr, tokenBAddr)
|
|
case "balancer":
|
|
return p.calculateBalancerPoolAddress(tokenAAddr, tokenBAddr)
|
|
default:
|
|
return common.Address{}, fmt.Errorf("unsupported protocol: %s", protocol)
|
|
}
|
|
}
|
|
|
|
// calculateUniswapV3PoolAddress calculates Uniswap V3 pool address using CREATE2
|
|
func (p *sophisticatedABIDecoder) calculateUniswapV3PoolAddress(token0, token1 common.Address, fee interface{}) (common.Address, error) {
|
|
feeInt, ok := fee.(int)
|
|
if !ok {
|
|
return common.Address{}, fmt.Errorf("invalid fee type for Uniswap V3")
|
|
}
|
|
|
|
// Uniswap V3 Factory on Arbitrum: 0x1F98431c8aD98523631AE4a59f267346ea31F984
|
|
factory := common.HexToAddress("0x1F98431c8aD98523631AE4a59f267346ea31F984")
|
|
initCodeHash := common.HexToHash("0xe34f199b19b2b4f47f68442619d555527d244f78a3297ea89325f843f87b8b54")
|
|
|
|
// Encode (address token0, address token1, uint24 fee)
|
|
encoded := make([]byte, 0, 96)
|
|
encoded = append(encoded, token0.Bytes()...)
|
|
encoded = append(encoded, token1.Bytes()...)
|
|
feeBytes := make([]byte, 32)
|
|
binary.BigEndian.PutUint32(feeBytes[28:], uint32(feeInt))
|
|
encoded = append(encoded, feeBytes...)
|
|
|
|
salt := crypto.Keccak256(encoded)
|
|
return p.computeCREATE2Address(factory, salt, initCodeHash), nil
|
|
}
|
|
|
|
// computeCREATE2Address computes CREATE2 address
|
|
func (p *sophisticatedABIDecoder) computeCREATE2Address(factory common.Address, salt []byte, initCodeHash common.Hash) common.Address {
|
|
data := make([]byte, 0, 85)
|
|
data = append(data, 0xff)
|
|
data = append(data, factory.Bytes()...)
|
|
data = append(data, salt...)
|
|
data = append(data, initCodeHash.Bytes()...)
|
|
|
|
hash := crypto.Keccak256(data)
|
|
return common.BytesToAddress(hash[12:])
|
|
}
|
|
|
|
// ParseBlockForMEV analyzes a block for all MEV opportunities
|
|
func (p *EnhancedSequencerParser) ParseBlockForMEV(ctx context.Context, blockNumber uint64) (*MEVOpportunities, error) {
|
|
startTime := time.Now()
|
|
|
|
// Create opportunities structure early to ensure we always return something
|
|
opportunities := &MEVOpportunities{
|
|
BlockNumber: blockNumber,
|
|
Timestamp: time.Now(),
|
|
SwapEvents: make([]*SwapEvent, 0),
|
|
LiquidationEvents: make([]*LiquidationEvent, 0),
|
|
LiquidityEvents: make([]*LiquidityEvent, 0),
|
|
ArbitrageOps: make([]*pkgtypes.ArbitrageOpportunity, 0),
|
|
SandwichOps: make([]*SandwichOpportunity, 0),
|
|
LiquidationOps: make([]*LiquidationOpportunity, 0),
|
|
}
|
|
|
|
// Try to fetch block with more robust error handling
|
|
block, err := p.fetchBlockSafely(ctx, blockNumber)
|
|
if err != nil {
|
|
p.logger.Debug(fmt.Sprintf("Could not fetch block %d: %v", blockNumber, err))
|
|
// Return empty opportunities instead of failing completely
|
|
opportunities.ProcessingTime = time.Since(startTime)
|
|
return opportunities, nil
|
|
}
|
|
|
|
// Update timestamp with actual block time
|
|
blockTime := block.Time()
|
|
if blockTime > math.MaxInt64 {
|
|
return nil, fmt.Errorf("block timestamp %d exceeds maximum int64 value", blockTime)
|
|
}
|
|
opportunities.Timestamp = time.Unix(safeConvertUint64ToInt64(blockTime), 0)
|
|
|
|
// Process all transactions in the block
|
|
for _, tx := range block.Transactions() {
|
|
if err := p.analyzeTransaction(ctx, tx, opportunities); err != nil {
|
|
p.logger.Debug(fmt.Sprintf("Failed to analyze transaction %s: %v", tx.Hash().Hex(), err))
|
|
continue
|
|
}
|
|
}
|
|
|
|
// Detect MEV opportunities from parsed events
|
|
p.detectArbitrageOpportunities(opportunities)
|
|
p.detectSandwichAttacks(opportunities)
|
|
p.detectLiquidationOpportunities(opportunities)
|
|
|
|
// Calculate overall MEV potential for the block
|
|
opportunities.TotalProfit = p.calculateTotalProfit(opportunities)
|
|
opportunities.ProcessingTime = time.Since(startTime)
|
|
|
|
// Log comprehensive block analysis results
|
|
p.logger.Info(fmt.Sprintf("Block %d: Processed %d swaps, %d arbitrage ops, %d liquidations, %d MEV opportunities (%.2f ms)",
|
|
blockNumber, len(opportunities.SwapEvents), len(opportunities.ArbitrageOps),
|
|
len(opportunities.LiquidationOps), len(opportunities.SandwichOps),
|
|
float64(opportunities.ProcessingTime.Nanoseconds())/1e6))
|
|
|
|
return opportunities, nil
|
|
}
|
|
|
|
// GetStatistics returns parser statistics
|
|
func (p *EnhancedSequencerParser) GetStatistics() map[string]interface{} {
|
|
count := 0
|
|
if p.protocolRegistry != nil {
|
|
// In a complete implementation, we would have a way to count active protocols
|
|
// For now, just return a placeholder
|
|
count = 0
|
|
}
|
|
|
|
return map[string]interface{}{
|
|
"swaps_detected": p.swapCount,
|
|
"liquidations_detected": p.liquidationCount,
|
|
"liquidity_events": p.liquidityCount,
|
|
"active_protocols": count,
|
|
}
|
|
}
|
|
|
|
// Close closes the parser and all associated resources
|
|
func (p *EnhancedSequencerParser) Close() error {
|
|
// In a complete implementation, we would properly close all resources
|
|
// For now, return nil to avoid issues with the interface
|
|
return nil
|
|
}
|
|
|
|
// calculateTotalProfit calculates the total profit from all opportunities
|
|
func (p *EnhancedSequencerParser) calculateTotalProfit(opportunities *MEVOpportunities) *big.Int {
|
|
total := big.NewInt(0)
|
|
for _, op := range opportunities.ArbitrageOps {
|
|
if op.Profit != nil {
|
|
total.Add(total, op.Profit)
|
|
}
|
|
}
|
|
for _, op := range opportunities.SandwichOps {
|
|
if op.ExpectedProfit != nil {
|
|
total.Add(total, op.ExpectedProfit)
|
|
}
|
|
}
|
|
for _, op := range opportunities.LiquidationOps {
|
|
if op.ExpectedProfit != nil {
|
|
total.Add(total, op.ExpectedProfit)
|
|
}
|
|
}
|
|
return total
|
|
}
|
|
|
|
// calculateROI calculates the return on investment as a percentage
|
|
func (p *EnhancedSequencerParser) calculateROI(profit, investment *big.Int) float64 {
|
|
if investment == nil || investment.Cmp(big.NewInt(0)) == 0 {
|
|
return 0.0
|
|
}
|
|
|
|
// Convert to float64 for percentage calculation
|
|
profitFloat := new(big.Float).SetInt(profit)
|
|
investmentFloat := new(big.Float).SetInt(investment)
|
|
|
|
// Calculate ROI = (profit / investment) * 100
|
|
roi := new(big.Float).Quo(profitFloat, investmentFloat)
|
|
roi.Mul(roi, big.NewFloat(100))
|
|
|
|
result, _ := roi.Float64()
|
|
return result
|
|
}
|
|
|
|
// Helper functions for sophisticated ABI decoder
|
|
|
|
// decodeExactOutputSingle decodes exactOutputSingle parameters
|
|
func (p *sophisticatedABIDecoder) decodeExactOutputSingle(data []byte) (*SwapEvent, error) {
|
|
tokenIn, tokenOut, recipient, fee, deadline, amountOut, amountInMaximum, err := p.parseUniswapV3SingleSwapParams(data)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error parsing exactOutputSingle: %w", err)
|
|
}
|
|
|
|
return &SwapEvent{
|
|
Timestamp: time.Now(),
|
|
Protocol: "uniswap_v3",
|
|
TokenIn: tokenIn,
|
|
TokenOut: tokenOut,
|
|
AmountIn: amountInMaximum,
|
|
AmountOut: amountOut,
|
|
Recipient: recipient,
|
|
Fee: fee.Uint64(),
|
|
Deadline: deadline.Uint64(),
|
|
}, nil
|
|
}
|
|
|
|
// decodeExactInput decodes multi-hop exactInput
|
|
func (p *sophisticatedABIDecoder) decodeExactInput(data []byte) (*SwapEvent, error) {
|
|
// Multi-hop swaps have path encoding
|
|
if len(data) < 160 {
|
|
return nil, fmt.Errorf("data too short for exactInput")
|
|
}
|
|
|
|
// PHASE 5 FIX: Validate recipient address in multi-hop swaps
|
|
rawRecipient := common.BytesToAddress(data[12:32])
|
|
if p.addressValidator != nil {
|
|
validation := p.addressValidator.ValidateAddress(rawRecipient.Hex())
|
|
if !validation.IsValid || validation.CorruptionScore > 30 {
|
|
return nil, fmt.Errorf("invalid recipient address in exactInput: %s (corruption score: %d)", rawRecipient.Hex(), validation.CorruptionScore)
|
|
}
|
|
}
|
|
recipient := rawRecipient
|
|
deadline := new(big.Int).SetBytes(data[32:64])
|
|
amountIn := new(big.Int).SetBytes(data[64:96])
|
|
amountOutMinimum := new(big.Int).SetBytes(data[96:128])
|
|
|
|
// Extract first and last tokens from path (simplified)
|
|
pathOffset := new(big.Int).SetBytes(data[128:160]).Uint64()
|
|
if len(data) < safeConvertUint64ToInt(pathOffset)+64 {
|
|
return nil, fmt.Errorf("invalid path data")
|
|
}
|
|
|
|
// PHASE 5 FIX: Validate tokenIn in multi-hop path
|
|
rawTokenIn := common.BytesToAddress(data[pathOffset+12 : pathOffset+32])
|
|
if p.addressValidator != nil {
|
|
validation := p.addressValidator.ValidateAddress(rawTokenIn.Hex())
|
|
if !validation.IsValid || validation.CorruptionScore > 30 {
|
|
return nil, fmt.Errorf("invalid tokenIn in exactInput path: %s (corruption score: %d)", rawTokenIn.Hex(), validation.CorruptionScore)
|
|
}
|
|
}
|
|
tokenIn := rawTokenIn
|
|
|
|
// For multi-hop, we'd need to parse the full path, simplified here
|
|
tokenOut := common.Address{} // Would extract from end of path
|
|
|
|
return &SwapEvent{
|
|
Timestamp: time.Now(),
|
|
Protocol: "uniswap_v3",
|
|
TokenIn: tokenIn,
|
|
TokenOut: tokenOut,
|
|
AmountIn: amountIn,
|
|
AmountOut: amountOutMinimum,
|
|
Recipient: recipient,
|
|
Deadline: deadline.Uint64(),
|
|
}, nil
|
|
}
|
|
|
|
// decodeExactOutput decodes multi-hop exactOutput
|
|
func (p *sophisticatedABIDecoder) decodeExactOutput(data []byte) (*SwapEvent, error) {
|
|
return p.decodeExactInput(data) // Similar structure, different semantics
|
|
}
|
|
|
|
// decodeUniswapV3Multicall decodes multicall payloads by inspecting embedded calls
|
|
func (p *sophisticatedABIDecoder) decodeUniswapV3Multicall(data []byte) (interface{}, error) {
|
|
if len(data) == 0 {
|
|
return nil, fmt.Errorf("empty payload for Uniswap V3 multicall")
|
|
}
|
|
|
|
calls, err := calldata.DecodeMulticallCalls(data)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to decode multicall calls: %w", err)
|
|
}
|
|
|
|
for _, call := range calls {
|
|
if len(call) < 4 {
|
|
continue
|
|
}
|
|
nested, err := p.decodeUniswapV3Swap(call[:4], call[4:])
|
|
if err != nil {
|
|
continue
|
|
}
|
|
if swap, ok := nested.(*SwapEvent); ok {
|
|
return swap, nil
|
|
}
|
|
}
|
|
|
|
// Fallback: derive tokens directly if no nested call decoded successfully
|
|
tokens, err := calldata.ExtractTokensFromMulticallWithContext(data, &calldata.MulticallContext{
|
|
Protocol: "uniswap_v3",
|
|
Stage: "arbitrum.parser.decodeUniswapV3Multicall",
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to extract tokens from multicall: %w", err)
|
|
}
|
|
if len(tokens) == 0 {
|
|
return nil, fmt.Errorf("no recognizable swaps found in multicall payload")
|
|
}
|
|
|
|
swap := &SwapEvent{
|
|
Timestamp: time.Now(),
|
|
Protocol: "uniswap_v3",
|
|
TokenIn: tokens[0],
|
|
TokenOut: common.Address{},
|
|
AmountIn: big.NewInt(0),
|
|
AmountOut: big.NewInt(0),
|
|
}
|
|
if len(tokens) > 1 {
|
|
swap.TokenOut = tokens[1]
|
|
}
|
|
|
|
return swap, nil
|
|
}
|
|
|
|
// decodeSushiSwap decodes SushiSwap transactions
|
|
func (p *sophisticatedABIDecoder) decodeSushiSwap(methodSig []byte, data []byte) (interface{}, error) {
|
|
// SushiSwap uses Uniswap V2 style interface
|
|
return p.decodeUniswapV2Swap(methodSig, data)
|
|
}
|
|
|
|
// decodeCamelotSwap decodes Camelot DEX transactions
|
|
func (p *sophisticatedABIDecoder) decodeCamelotSwap(methodSig []byte, data []byte) (interface{}, error) {
|
|
// Camelot has similar interface to Uniswap V2 with some modifications
|
|
return p.decodeUniswapV2Swap(methodSig, data)
|
|
}
|
|
|
|
// decodeBalancerSwap decodes Balancer V2 transactions
|
|
func (p *sophisticatedABIDecoder) decodeBalancerSwap(methodSig []byte, data []byte) (interface{}, error) {
|
|
// Balancer has different structure - batchSwap, swap, etc.
|
|
switch {
|
|
case bytes.Equal(methodSig, []byte{0x94, 0x5b, 0xcf, 0x7c}): // batchSwap
|
|
return p.decodeBalancerBatchSwap(data)
|
|
case bytes.Equal(methodSig, []byte{0x52, 0xba, 0xb9, 0x02}): // swap
|
|
return p.decodeBalancerSingleSwap(data)
|
|
default:
|
|
return nil, fmt.Errorf("unknown Balancer method signature: %x", methodSig)
|
|
}
|
|
}
|
|
|
|
// decodeCurveSwap decodes Curve finance transactions
|
|
func (p *sophisticatedABIDecoder) decodeCurveSwap(methodSig []byte, data []byte) (interface{}, error) {
|
|
// Curve has exchange, exchange_underlying methods
|
|
switch {
|
|
case bytes.Equal(methodSig, []byte{0x3d, 0xf0, 0x21, 0x24}): // exchange
|
|
return p.decodeCurveExchange(data)
|
|
case bytes.Equal(methodSig, []byte{0xa6, 0x41, 0x7e, 0xd6}): // exchange_underlying
|
|
return p.decodeCurveExchangeUnderlying(data)
|
|
default:
|
|
return nil, fmt.Errorf("unknown Curve method signature: %x", methodSig)
|
|
}
|
|
}
|
|
|
|
// decodeGenericSwap attempts to decode unknown protocols
|
|
func (p *sophisticatedABIDecoder) decodeGenericSwap(methodSig []byte, data []byte) (interface{}, error) {
|
|
// Generic decoder for unknown protocols
|
|
// Try to extract basic swap information using common patterns
|
|
if len(data) < 64 {
|
|
return nil, fmt.Errorf("insufficient data for generic decode")
|
|
}
|
|
|
|
return &SwapEvent{
|
|
Timestamp: time.Now(),
|
|
Protocol: "unknown",
|
|
TokenIn: common.Address{},
|
|
TokenOut: common.Address{},
|
|
AmountIn: big.NewInt(0),
|
|
AmountOut: big.NewInt(0),
|
|
}, nil
|
|
}
|
|
|
|
// Additional helper functions for specific decoders
|
|
func (p *sophisticatedABIDecoder) decodeSwapExactTokensForTokens(data []byte) (*SwapEvent, error) {
|
|
if len(data) < 160 {
|
|
return nil, fmt.Errorf("data too short for swapExactTokensForTokens")
|
|
}
|
|
|
|
amountIn := new(big.Int).SetBytes(data[0:32])
|
|
amountOutMin := new(big.Int).SetBytes(data[32:64])
|
|
pathOffset := new(big.Int).SetBytes(data[64:96]).Uint64()
|
|
|
|
// PHASE 5 FIX: Validate recipient address in V2 swaps
|
|
rawRecipient := common.BytesToAddress(data[108:128])
|
|
if p.addressValidator != nil {
|
|
validation := p.addressValidator.ValidateAddress(rawRecipient.Hex())
|
|
if !validation.IsValid || validation.CorruptionScore > 30 {
|
|
return nil, fmt.Errorf("invalid recipient address in V2 swap: %s (corruption score: %d)", rawRecipient.Hex(), validation.CorruptionScore)
|
|
}
|
|
}
|
|
recipient := rawRecipient
|
|
deadline := new(big.Int).SetBytes(data[128:160])
|
|
|
|
// Extract tokens from path
|
|
if len(data) < safeConvertUint64ToInt(pathOffset)+64 {
|
|
return nil, fmt.Errorf("invalid path data")
|
|
}
|
|
|
|
// PHASE 5 FIX: Validate token addresses in V2 swap path
|
|
rawTokenIn := common.BytesToAddress(data[pathOffset+12 : pathOffset+32])
|
|
if p.addressValidator != nil {
|
|
validation := p.addressValidator.ValidateAddress(rawTokenIn.Hex())
|
|
if !validation.IsValid || validation.CorruptionScore > 30 {
|
|
return nil, fmt.Errorf("invalid tokenIn in V2 swap path: %s (corruption score: %d)", rawTokenIn.Hex(), validation.CorruptionScore)
|
|
}
|
|
}
|
|
tokenIn := rawTokenIn
|
|
|
|
rawTokenOut := common.BytesToAddress(data[pathOffset+44 : pathOffset+64])
|
|
if p.addressValidator != nil {
|
|
validation := p.addressValidator.ValidateAddress(rawTokenOut.Hex())
|
|
if !validation.IsValid || validation.CorruptionScore > 30 {
|
|
return nil, fmt.Errorf("invalid tokenOut in V2 swap path: %s (corruption score: %d)", rawTokenOut.Hex(), validation.CorruptionScore)
|
|
}
|
|
}
|
|
tokenOut := rawTokenOut
|
|
|
|
return &SwapEvent{
|
|
Timestamp: time.Now(),
|
|
Protocol: "uniswap_v2",
|
|
TokenIn: tokenIn,
|
|
TokenOut: tokenOut,
|
|
AmountIn: amountIn,
|
|
AmountOut: amountOutMin,
|
|
Recipient: recipient,
|
|
Deadline: deadline.Uint64(),
|
|
}, nil
|
|
}
|
|
|
|
func (p *sophisticatedABIDecoder) decodeSwapTokensForExactTokens(data []byte) (*SwapEvent, error) {
|
|
// Similar to above but with different semantics
|
|
return p.decodeSwapExactTokensForTokens(data)
|
|
}
|
|
|
|
func (p *sophisticatedABIDecoder) calculateUniswapV2PoolAddress(token0, token1 common.Address, protocol string) (common.Address, error) {
|
|
var factory common.Address
|
|
var initCodeHash common.Hash
|
|
|
|
switch protocol {
|
|
case "uniswap_v2":
|
|
factory = common.HexToAddress("0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f")
|
|
initCodeHash = common.HexToHash("0x96e8ac4277198ff8b6f785478aa9a39f403cb768dd02cbee326c3e7da348845f")
|
|
case "sushiswap":
|
|
factory = common.HexToAddress("0xc35DADB65012eC5796536bD9864eD8773aBc74C4")
|
|
initCodeHash = common.HexToHash("0xe18a34eb0e04b04f7a0ac29a6e80748dca96319b42c54d679cb821dca90c6303")
|
|
default:
|
|
return common.Address{}, fmt.Errorf("unsupported Uniswap V2 protocol: %s", protocol)
|
|
}
|
|
|
|
salt := crypto.Keccak256(append(token0.Bytes(), token1.Bytes()...))
|
|
return p.computeCREATE2Address(factory, salt, initCodeHash), nil
|
|
}
|
|
|
|
func (p *sophisticatedABIDecoder) calculateCamelotPoolAddress(token0, token1 common.Address) (common.Address, error) {
|
|
factory := common.HexToAddress("0x6EcCab422D763aC031210895C81787E87B91425E")
|
|
initCodeHash := common.HexToHash("0xa856464ae65f7619087bc369daaf7e387dae1e5af69cfa7935850ebf754b04c1")
|
|
|
|
salt := crypto.Keccak256(append(token0.Bytes(), token1.Bytes()...))
|
|
return p.computeCREATE2Address(factory, salt, initCodeHash), nil
|
|
}
|
|
|
|
func (p *sophisticatedABIDecoder) calculateBalancerPoolAddress(token0, token1 common.Address) (common.Address, error) {
|
|
// Balancer uses a different pool creation mechanism
|
|
// This is a simplified implementation
|
|
return common.Address{}, fmt.Errorf("Balancer pool address calculation not yet implemented")
|
|
}
|
|
|
|
func (p *sophisticatedABIDecoder) decodeBalancerBatchSwap(data []byte) (*SwapEvent, error) {
|
|
// Balancer batchSwap is complex - simplified here
|
|
return &SwapEvent{
|
|
Timestamp: time.Now(),
|
|
Protocol: "balancer",
|
|
TokenIn: common.Address{},
|
|
TokenOut: common.Address{},
|
|
AmountIn: big.NewInt(0),
|
|
AmountOut: big.NewInt(0),
|
|
}, nil
|
|
}
|
|
|
|
func (p *sophisticatedABIDecoder) decodeBalancerSingleSwap(data []byte) (*SwapEvent, error) {
|
|
// Balancer single swap decoder
|
|
return p.decodeBalancerBatchSwap(data)
|
|
}
|
|
|
|
func (p *sophisticatedABIDecoder) decodeCurveExchange(data []byte) (*SwapEvent, error) {
|
|
if len(data) < 96 {
|
|
return nil, fmt.Errorf("data too short for Curve exchange")
|
|
}
|
|
|
|
i := new(big.Int).SetBytes(data[0:32])
|
|
j := new(big.Int).SetBytes(data[32:64])
|
|
dx := new(big.Int).SetBytes(data[64:96])
|
|
|
|
return &SwapEvent{
|
|
Timestamp: time.Now(),
|
|
Protocol: "curve",
|
|
TokenIn: common.Address{}, // Would need to resolve from pool and index
|
|
TokenOut: common.Address{}, // Would need to resolve from pool and index
|
|
AmountIn: dx,
|
|
AmountOut: big.NewInt(0), // Would need to calculate
|
|
CurveI: i.Uint64(),
|
|
CurveJ: j.Uint64(),
|
|
}, nil
|
|
}
|
|
|
|
func (p *sophisticatedABIDecoder) decodeCurveExchangeUnderlying(data []byte) (*SwapEvent, error) {
|
|
// Similar to exchange but with underlying tokens
|
|
return p.decodeCurveExchange(data)
|
|
}
|
|
|
|
// analyzeTransaction examines a transaction for DEX interactions and MEV opportunities
|
|
func (p *EnhancedSequencerParser) analyzeTransaction(ctx context.Context, tx *types.Transaction, opportunities *MEVOpportunities) error {
|
|
// Get HTTP client from provider manager
|
|
ethClient, err := p.providerManager.GetHTTPClient()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get HTTP client: %w", err)
|
|
}
|
|
|
|
// Get transaction receipt for logs
|
|
receipt, err := ethClient.TransactionReceipt(ctx, tx.Hash())
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get receipt: %w", err)
|
|
}
|
|
|
|
// Analyze transaction data for direct DEX calls
|
|
if err := p.analyzeTransactionData(tx, receipt, opportunities); err != nil {
|
|
p.logger.Debug(fmt.Sprintf("Failed to analyze tx data for %s: %v", tx.Hash().Hex(), err))
|
|
}
|
|
|
|
// Analyze transaction logs for DEX events
|
|
if err := p.analyzeTransactionLogs(tx, receipt, opportunities); err != nil {
|
|
p.logger.Debug(fmt.Sprintf("Failed to analyze tx logs for %s: %v", tx.Hash().Hex(), err))
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// analyzeTransactionData examines transaction input data for DEX calls
|
|
func (p *EnhancedSequencerParser) analyzeTransactionData(tx *types.Transaction, receipt *types.Receipt, opportunities *MEVOpportunities) error {
|
|
if tx.To() == nil || len(tx.Data()) < 4 {
|
|
return nil // No contract call or insufficient data
|
|
}
|
|
|
|
// Check if this is a call to a known DEX router
|
|
router := *tx.To()
|
|
protocol := p.identifyDEXProtocol(router)
|
|
if protocol == "" {
|
|
return nil // Not a known DEX
|
|
}
|
|
|
|
// Decode the swap transaction
|
|
swapData, err := p.abiDecoder.DecodeSwapTransaction(protocol, tx.Data())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
swapEvent, ok := swapData.(*SwapEvent)
|
|
if !ok {
|
|
return fmt.Errorf("invalid swap event type")
|
|
}
|
|
|
|
// Enrich swap event with transaction metadata
|
|
swapEvent.BlockNumber = receipt.BlockNumber.Uint64()
|
|
swapEvent.TxHash = tx.Hash().Hex()
|
|
swapEvent.Router = router
|
|
swapEvent.GasUsed = receipt.GasUsed
|
|
swapEvent.GasPrice = tx.GasPrice().String()
|
|
swapEvent.Sender = p.getSender(tx)
|
|
|
|
// Calculate price impact and MEV score
|
|
swapEvent.PriceImpact = p.calculatePriceImpact(swapEvent)
|
|
swapEvent.MEVScore = p.calculateMEVScore(swapEvent)
|
|
swapEvent.Profitable = swapEvent.MEVScore > 1.0
|
|
|
|
opportunities.SwapEvents = append(opportunities.SwapEvents, swapEvent)
|
|
return nil
|
|
}
|
|
|
|
// analyzeTransactionLogs examines transaction logs for DEX events
|
|
func (p *EnhancedSequencerParser) analyzeTransactionLogs(tx *types.Transaction, receipt *types.Receipt, opportunities *MEVOpportunities) error {
|
|
for _, log := range receipt.Logs {
|
|
// Check for Swap events from Uniswap V2/V3 style DEXs
|
|
if len(log.Topics) > 0 {
|
|
swapEvent := p.parseSwapLog(log)
|
|
if swapEvent != nil {
|
|
swapEvent.BlockNumber = receipt.BlockNumber.Uint64()
|
|
swapEvent.TxHash = tx.Hash().Hex()
|
|
swapEvent.GasUsed = receipt.GasUsed
|
|
swapEvent.GasPrice = tx.GasPrice().String()
|
|
swapEvent.Sender = p.getSender(tx)
|
|
|
|
// Calculate metrics
|
|
swapEvent.PriceImpact = p.calculatePriceImpact(swapEvent)
|
|
swapEvent.MEVScore = p.calculateMEVScore(swapEvent)
|
|
swapEvent.Profitable = swapEvent.MEVScore > 1.0
|
|
|
|
opportunities.SwapEvents = append(opportunities.SwapEvents, swapEvent)
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// identifyDEXProtocol identifies which DEX protocol based on router address
|
|
func (p *EnhancedSequencerParser) identifyDEXProtocol(router common.Address) string {
|
|
// Known DEX router addresses on Arbitrum
|
|
knownRouters := map[common.Address]string{
|
|
common.HexToAddress("0xE592427A0AEce92De3Edee1F18E0157C05861564"): "uniswap_v3", // Uniswap V3 Router
|
|
common.HexToAddress("0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D"): "uniswap_v2", // Uniswap V2 Router
|
|
common.HexToAddress("0x1b02dA8Cb0d097eB8D57A175b88c7D8b47997506"): "sushiswap", // SushiSwap Router
|
|
common.HexToAddress("0xc873fEcbd354f5A56E00E710B90EF4201db2448d"): "camelot", // Camelot Router
|
|
common.HexToAddress("0xBA12222222228d8Ba445958a75a0704d566BF2C8"): "balancer", // Balancer V2 Vault
|
|
common.HexToAddress("0x7f90122BF0700F9E7e1F688fe926940E8839F353"): "curve", // Curve Router
|
|
}
|
|
|
|
if protocol, exists := knownRouters[router]; exists {
|
|
return protocol
|
|
}
|
|
return ""
|
|
}
|
|
|
|
// parseSwapLog parses swap events from transaction logs
|
|
func (p *EnhancedSequencerParser) parseSwapLog(log *types.Log) *SwapEvent {
|
|
// Uniswap V2 Swap event: Swap(address,uint256,uint256,uint256,uint256,address)
|
|
uniV2SwapSig := crypto.Keccak256Hash([]byte("Swap(address,uint256,uint256,uint256,uint256,address)"))
|
|
// Uniswap V3 Swap event: Swap(address,address,int256,int256,uint160,uint128,int24)
|
|
uniV3SwapSig := crypto.Keccak256Hash([]byte("Swap(address,address,int256,int256,uint160,uint128,int24)"))
|
|
|
|
if len(log.Topics) == 0 {
|
|
return nil
|
|
}
|
|
|
|
switch log.Topics[0] {
|
|
case uniV2SwapSig:
|
|
return p.parseUniV2SwapLog(log)
|
|
case uniV3SwapSig:
|
|
return p.parseUniV3SwapLog(log)
|
|
default:
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// parseUniV2SwapLog parses Uniswap V2 style swap events
|
|
func (p *EnhancedSequencerParser) parseUniV2SwapLog(log *types.Log) *SwapEvent {
|
|
if len(log.Topics) < 3 || len(log.Data) < 128 {
|
|
return nil
|
|
}
|
|
|
|
// Extract data from log
|
|
sender := common.BytesToAddress(log.Topics[1].Bytes())
|
|
recipient := common.BytesToAddress(log.Topics[2].Bytes())
|
|
|
|
// Parse amounts from log data
|
|
amount0In := new(big.Int).SetBytes(log.Data[0:32])
|
|
amount1In := new(big.Int).SetBytes(log.Data[32:64])
|
|
amount0Out := new(big.Int).SetBytes(log.Data[64:96])
|
|
amount1Out := new(big.Int).SetBytes(log.Data[96:128])
|
|
|
|
// Determine swap direction and amounts
|
|
var tokenIn, tokenOut common.Address
|
|
var amountIn, amountOut *big.Int
|
|
|
|
if amount0In.Cmp(big.NewInt(0)) > 0 {
|
|
// Swapping token0 for token1
|
|
tokenIn = p.getToken0FromPool(log.Address)
|
|
tokenOut = p.getToken1FromPool(log.Address)
|
|
amountIn = amount0In
|
|
amountOut = amount1Out
|
|
} else {
|
|
// Swapping token1 for token0
|
|
tokenIn = p.getToken1FromPool(log.Address)
|
|
tokenOut = p.getToken0FromPool(log.Address)
|
|
amountIn = amount1In
|
|
amountOut = amount0Out
|
|
}
|
|
|
|
return &SwapEvent{
|
|
Timestamp: time.Now(),
|
|
Protocol: "uniswap_v2",
|
|
Pool: log.Address,
|
|
TokenIn: tokenIn,
|
|
TokenOut: tokenOut,
|
|
AmountIn: amountIn,
|
|
AmountOut: amountOut,
|
|
Sender: sender,
|
|
Recipient: recipient,
|
|
}
|
|
}
|
|
|
|
// parseUniV3SwapLog parses Uniswap V3 swap events
|
|
func (p *EnhancedSequencerParser) parseUniV3SwapLog(log *types.Log) *SwapEvent {
|
|
if len(log.Topics) < 3 || len(log.Data) < 160 {
|
|
return nil
|
|
}
|
|
|
|
// Extract data from log
|
|
sender := common.BytesToAddress(log.Topics[1].Bytes())
|
|
recipient := common.BytesToAddress(log.Topics[2].Bytes())
|
|
|
|
// Parse amounts from log data (int256 values)
|
|
amount0 := new(big.Int).SetBytes(log.Data[0:32])
|
|
amount1 := new(big.Int).SetBytes(log.Data[32:64])
|
|
|
|
// Determine swap direction
|
|
var tokenIn, tokenOut common.Address
|
|
var amountIn, amountOut *big.Int
|
|
|
|
if amount0.Sign() > 0 {
|
|
// Positive amount0 means token0 is input
|
|
tokenIn = p.getToken0FromPool(log.Address)
|
|
tokenOut = p.getToken1FromPool(log.Address)
|
|
amountIn = amount0
|
|
amountOut = new(big.Int).Abs(amount1) // amount1 will be negative
|
|
} else {
|
|
// Negative amount0 means token1 is input
|
|
tokenIn = p.getToken1FromPool(log.Address)
|
|
tokenOut = p.getToken0FromPool(log.Address)
|
|
amountIn = new(big.Int).Abs(amount1)
|
|
amountOut = new(big.Int).Abs(amount0)
|
|
}
|
|
|
|
return &SwapEvent{
|
|
Timestamp: time.Now(),
|
|
Protocol: "uniswap_v3",
|
|
Pool: log.Address,
|
|
TokenIn: tokenIn,
|
|
TokenOut: tokenOut,
|
|
AmountIn: amountIn,
|
|
AmountOut: amountOut,
|
|
Sender: sender,
|
|
Recipient: recipient,
|
|
}
|
|
}
|
|
|
|
// Helper functions for token resolution and MEV detection
|
|
func (p *EnhancedSequencerParser) getToken0FromPool(poolAddress common.Address) common.Address {
|
|
// In production, this would query the pool contract for token0
|
|
// For now, return a placeholder that can be resolved later
|
|
return common.Address{}
|
|
}
|
|
|
|
func (p *EnhancedSequencerParser) getToken1FromPool(poolAddress common.Address) common.Address {
|
|
// In production, this would query the pool contract for token1
|
|
return common.Address{}
|
|
}
|
|
|
|
func (p *EnhancedSequencerParser) getSender(tx *types.Transaction) common.Address {
|
|
// Extract sender from transaction
|
|
signer := types.NewLondonSigner(tx.ChainId())
|
|
sender, _ := types.Sender(signer, tx)
|
|
return sender
|
|
}
|
|
|
|
func (p *EnhancedSequencerParser) calculatePriceImpact(swap *SwapEvent) float64 {
|
|
// Calculate price impact based on swap amounts
|
|
if swap.AmountIn.Cmp(big.NewInt(0)) == 0 {
|
|
return 0
|
|
}
|
|
// Simplified price impact calculation
|
|
ratio := new(big.Float).Quo(new(big.Float).SetInt(swap.AmountOut), new(big.Float).SetInt(swap.AmountIn))
|
|
result, _ := ratio.Float64()
|
|
return result
|
|
}
|
|
|
|
func (p *EnhancedSequencerParser) calculateMEVScore(swap *SwapEvent) float64 {
|
|
// Calculate MEV potential score
|
|
baseScore := swap.PriceImpact * 0.5
|
|
if swap.AmountIn.Cmp(big.NewInt(1e18)) > 0 { // Large swap
|
|
baseScore += 2.0
|
|
}
|
|
return baseScore
|
|
}
|
|
|
|
// detectArbitrageOpportunities finds arbitrage opportunities from swap events
|
|
func (p *EnhancedSequencerParser) detectArbitrageOpportunities(opportunities *MEVOpportunities) {
|
|
// Group swaps by token pairs
|
|
tokenPairs := make(map[string][]*SwapEvent)
|
|
for _, swap := range opportunities.SwapEvents {
|
|
pairKey := p.getPairKey(swap.TokenIn, swap.TokenOut)
|
|
tokenPairs[pairKey] = append(tokenPairs[pairKey], swap)
|
|
}
|
|
|
|
// Look for price differences between DEXs
|
|
for _, swaps := range tokenPairs {
|
|
if len(swaps) < 2 {
|
|
continue // Need at least 2 swaps for arbitrage
|
|
}
|
|
|
|
// Find price differences
|
|
for i, swap1 := range swaps {
|
|
for _, swap2 := range swaps[i+1:] {
|
|
if arb := p.calculateArbitrage(swap1, swap2); arb != nil {
|
|
opportunities.ArbitrageOps = append(opportunities.ArbitrageOps, arb)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (p *EnhancedSequencerParser) detectSandwichAttacks(opportunities *MEVOpportunities) {
|
|
// Detect potential sandwich attack opportunities
|
|
for _, swap := range opportunities.SwapEvents {
|
|
if swap.AmountIn.Cmp(big.NewInt(1e18)) > 0 && swap.PriceImpact > 0.01 {
|
|
// Large swap with significant price impact - sandwich opportunity
|
|
sandwich := &SandwichOpportunity{
|
|
TargetTx: swap.TxHash,
|
|
TokenIn: swap.TokenIn,
|
|
TokenOut: swap.TokenOut,
|
|
ExpectedProfit: p.calculateSandwichProfit(swap),
|
|
FrontrunAmount: new(big.Int).Div(swap.AmountIn, big.NewInt(10)), // 10% of target
|
|
BackrunAmount: new(big.Int).Div(swap.AmountIn, big.NewInt(10)),
|
|
MaxSlippage: 0.05, // 5% max slippage
|
|
GasCost: big.NewInt(300000), // Estimated gas for sandwich
|
|
ProfitMargin: swap.PriceImpact,
|
|
}
|
|
opportunities.SandwichOps = append(opportunities.SandwichOps, sandwich)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (p *EnhancedSequencerParser) detectLiquidationOpportunities(opportunities *MEVOpportunities) {
|
|
// Placeholder for liquidation detection
|
|
// Would analyze lending protocols for underwater positions
|
|
}
|
|
|
|
func (p *EnhancedSequencerParser) getPairKey(token0, token1 common.Address) string {
|
|
// Create canonical pair key
|
|
if bytes.Compare(token0.Bytes(), token1.Bytes()) > 0 {
|
|
return token1.Hex() + "-" + token0.Hex()
|
|
}
|
|
return token0.Hex() + "-" + token1.Hex()
|
|
}
|
|
|
|
func (p *EnhancedSequencerParser) calculateArbitrage(swap1, swap2 *SwapEvent) *pkgtypes.ArbitrageOpportunity {
|
|
// Calculate potential arbitrage between two swaps
|
|
priceDiff := math.Abs(swap1.PriceImpact - swap2.PriceImpact)
|
|
if priceDiff < 0.001 { // 0.1% minimum difference
|
|
return nil
|
|
}
|
|
|
|
// Estimate profit
|
|
baseAmount := big.NewInt(1e18) // 1 token
|
|
profit := new(big.Int).Mul(baseAmount, big.NewInt(safeConvertUint64ToInt64(uint64(priceDiff*1000))))
|
|
|
|
return &pkgtypes.ArbitrageOpportunity{
|
|
Path: []string{swap1.TokenIn.Hex(), swap1.TokenOut.Hex()},
|
|
Pools: []string{swap1.Pool.Hex(), swap2.Pool.Hex()},
|
|
AmountIn: baseAmount,
|
|
Profit: profit,
|
|
NetProfit: profit, // Simplified - in production would subtract gas
|
|
GasEstimate: big.NewInt(150000), // Estimated gas cost
|
|
ROI: priceDiff * 100, // Convert to percentage
|
|
Protocol: swap1.Protocol,
|
|
ExecutionTime: 5000, // 5 seconds
|
|
Confidence: 0.8,
|
|
PriceImpact: priceDiff,
|
|
MaxSlippage: 0.01, // 1% max slippage
|
|
TokenIn: swap1.TokenIn,
|
|
TokenOut: swap1.TokenOut,
|
|
Timestamp: time.Now().Unix(),
|
|
Risk: 0.3, // Medium risk
|
|
}
|
|
}
|
|
|
|
func (p *EnhancedSequencerParser) calculateSandwichProfit(swap *SwapEvent) *big.Int {
|
|
// Estimate sandwich attack profit
|
|
impact := big.NewFloat(swap.PriceImpact)
|
|
amount := new(big.Float).SetInt(swap.AmountIn)
|
|
profit := new(big.Float).Mul(impact, amount)
|
|
profit.Mul(profit, big.NewFloat(0.5)) // 50% capture rate
|
|
|
|
// CRITICAL FIX: Multiply by 1e18 before converting to preserve decimal precision
|
|
// Bug: Direct .Int(nil) conversion truncates all decimals, rejecting valid <1 wei profits
|
|
// Fix: Scale up to wei (multiply by 10^18) before conversion
|
|
weiMultiplier := new(big.Float).SetInt(new(big.Int).Exp(
|
|
big.NewInt(10),
|
|
big.NewInt(18),
|
|
nil,
|
|
))
|
|
profitWei := new(big.Float).Mul(profit, weiMultiplier)
|
|
|
|
result := new(big.Int)
|
|
profitWei.Int(result)
|
|
return result
|
|
}
|
|
|
|
// fetchBlockSafely attempts to fetch a block with robust error handling for different transaction types
|
|
func (p *EnhancedSequencerParser) fetchBlockSafely(ctx context.Context, blockNumber uint64) (*types.Block, error) {
|
|
// Get HTTP client from provider manager
|
|
ethClient, err := p.providerManager.GetHTTPClient()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get HTTP client: %w", err)
|
|
}
|
|
|
|
// First try to get block header only
|
|
blockNumBigInt := new(big.Int).SetUint64(blockNumber)
|
|
header, err := ethClient.HeaderByNumber(ctx, blockNumBigInt)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to fetch block header: %w", err)
|
|
}
|
|
|
|
// Try to get full block with transactions
|
|
block, err := ethClient.BlockByNumber(ctx, blockNumBigInt)
|
|
if err != nil {
|
|
// If full block fetch fails, we'll use the RPC client to get block data differently
|
|
return p.fetchBlockViaRPC(ctx, blockNumber, header)
|
|
}
|
|
|
|
return block, nil
|
|
}
|
|
|
|
// fetchBlockViaRPC fetches block data using RPC calls to handle unsupported transaction types
|
|
func (p *EnhancedSequencerParser) fetchBlockViaRPC(ctx context.Context, blockNumber uint64, header *types.Header) (*types.Block, error) {
|
|
p.logger.Debug(fmt.Sprintf("Using alternative transaction fetching for block %d", blockNumber))
|
|
|
|
// Get RPC client from provider manager
|
|
rpcClient, err := p.providerManager.GetRPCClient(false) // Prefer HTTP for this operation
|
|
if err != nil {
|
|
p.logger.Debug(fmt.Sprintf("Failed to get RPC client: %v", err))
|
|
return types.NewBlockWithHeader(header), nil
|
|
}
|
|
|
|
// Get transaction receipts directly instead of trying to parse transaction types
|
|
blockHex := fmt.Sprintf("0x%x", blockNumber)
|
|
|
|
// Use raw RPC call to get block with transaction hashes only
|
|
var blockData map[string]interface{}
|
|
err = rpcClient.CallContext(ctx, &blockData, "eth_getBlockByNumber", blockHex, false)
|
|
if err != nil {
|
|
p.logger.Debug(fmt.Sprintf("Failed to get block via RPC: %v", err))
|
|
return types.NewBlockWithHeader(header), nil
|
|
}
|
|
|
|
// Extract transaction hashes
|
|
txHashes, ok := blockData["transactions"].([]interface{})
|
|
if !ok || len(txHashes) == 0 {
|
|
return types.NewBlockWithHeader(header), nil
|
|
}
|
|
|
|
// Process transaction receipts to find DEX interactions
|
|
p.processTransactionReceipts(ctx, txHashes, blockNumber)
|
|
|
|
// Return header-only block since we processed transactions separately
|
|
return types.NewBlockWithHeader(header), nil
|
|
}
|
|
|
|
// processTransactionReceipts processes transaction receipts to find DEX swaps
|
|
func (p *EnhancedSequencerParser) processTransactionReceipts(ctx context.Context, txHashes []interface{}, blockNumber uint64) {
|
|
// Get HTTP client from provider manager
|
|
ethClient, err := p.providerManager.GetHTTPClient()
|
|
if err != nil {
|
|
p.logger.Debug(fmt.Sprintf("Failed to get HTTP client for receipts: %v", err))
|
|
return
|
|
}
|
|
|
|
for _, hashInterface := range txHashes {
|
|
txHashStr, ok := hashInterface.(string)
|
|
if !ok {
|
|
continue
|
|
}
|
|
|
|
txHash := common.HexToHash(txHashStr)
|
|
receipt, err := ethClient.TransactionReceipt(ctx, txHash)
|
|
if err != nil {
|
|
continue // Skip failed receipts
|
|
}
|
|
|
|
// Analyze logs for DEX events even without full transaction data
|
|
p.analyzeReceiptLogs(receipt, blockNumber)
|
|
}
|
|
}
|
|
|
|
// analyzeReceiptLogs analyzes transaction logs for DEX events
|
|
func (p *EnhancedSequencerParser) analyzeReceiptLogs(receipt *types.Receipt, blockNumber uint64) {
|
|
for _, log := range receipt.Logs {
|
|
if swapEvent := p.parseSwapLog(log); swapEvent != nil {
|
|
swapEvent.BlockNumber = blockNumber
|
|
swapEvent.TxHash = receipt.TxHash.Hex()
|
|
swapEvent.GasUsed = receipt.GasUsed
|
|
|
|
// Calculate basic metrics
|
|
swapEvent.PriceImpact = p.calculatePriceImpact(swapEvent)
|
|
swapEvent.MEVScore = p.calculateMEVScore(swapEvent)
|
|
swapEvent.Profitable = swapEvent.MEVScore > 1.0
|
|
|
|
// Log the detected swap
|
|
p.logger.Info(fmt.Sprintf("🔄 Detected %s swap: %s → %s, Amount: %s",
|
|
swapEvent.Protocol,
|
|
swapEvent.TokenIn.Hex()[:8],
|
|
swapEvent.TokenOut.Hex()[:8],
|
|
swapEvent.AmountIn.String()))
|
|
}
|
|
}
|
|
}
|