Files
mev-beta/pkg/arbitrum/parser/core.go

1347 lines
47 KiB
Go

package parser
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"math"
"math/big"
"time"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/fraktal/mev-beta/internal/logger"
"github.com/fraktal/mev-beta/internal/utils"
"github.com/fraktal/mev-beta/internal/validation"
"github.com/fraktal/mev-beta/pkg/calldata"
"github.com/fraktal/mev-beta/pkg/transport"
pkgtypes "github.com/fraktal/mev-beta/pkg/types"
)
// safeConvertUint64ToInt64 narrows v to int64. Values that do not fit are
// saturated to math.MaxInt64 instead of wrapping around to a negative number.
func safeConvertUint64ToInt64(v uint64) int64 {
	const limit = uint64(math.MaxInt64)
	if v <= limit {
		return int64(v)
	}
	return math.MaxInt64
}
// safeConvertUint64ToInt narrows v to int. The result is saturated at
// math.MaxInt32 (not the platform's maximum int), so it is representable
// regardless of whether int is 32 or 64 bits wide.
func safeConvertUint64ToInt(v uint64) int {
	const limit = uint64(math.MaxInt32)
	if v <= limit {
		return int(v)
	}
	return math.MaxInt32
}
// MarketDiscovery is the slice of market-discovery behaviour this package
// depends on. It is declared locally so that the parser can hold a market
// discovery component without importing the package that implements it.
type MarketDiscovery interface {
// GetPoolCache returns the pool cache as an untyped value. Callers have to
// type-assert the result themselves.
GetPoolCache() interface{} // Will return *arbitrum.PoolCache but using interface{} to prevent import cycle
}
// MEVStrategyEngine is the slice of the strategy engine this package depends
// on. Both the swap event and the analysis result cross the boundary as
// untyped values.
type MEVStrategyEngine interface {
// AnalyzeArbitrageOpportunity evaluates swapEvent under ctx and returns an
// analysis result or an error.
AnalyzeArbitrageOpportunity(ctx context.Context, swapEvent interface{}) (interface{}, error) // Will return *arbitrum.ProfitableStrategy, error
}
// EventMonitor is the contract the parser expects from an event-monitoring
// component: it can be started, stopped, and asked for statistics.
type EventMonitor interface {
// Start begins monitoring; a non-nil error reports that it could not start.
Start() error
// Stop ends monitoring; a non-nil error reports that it could not stop.
Stop() error
// GetStatistics returns the monitor's statistics keyed by name.
GetStatistics() map[string]interface{}
}
// SwapEventPipeline is the contract the parser expects from a component that
// processes swap events. It has the same lifecycle methods as EventMonitor
// plus an entry point for individual swaps.
type SwapEventPipeline interface {
// Start begins processing; a non-nil error reports that it could not start.
Start() error
// Stop ends processing; a non-nil error reports that it could not stop.
Stop() error
// GetStatistics returns the pipeline's statistics keyed by name.
GetStatistics() map[string]interface{}
// ProcessSwap hands one swap to the pipeline. The swap is passed untyped.
ProcessSwap(swap interface{}) error
}
// ABIDecoder is the decoding contract used by EnhancedSequencerParser. In this
// file it is implemented by sophisticatedABIDecoder (see NewABIDecoder).
type ABIDecoder interface {
// DecodeSwapTransaction decodes raw transaction input (selector included) for
// the named protocol. The result is untyped; analyzeTransactionData expects a
// *SwapEvent.
DecodeSwapTransaction(protocol string, data []byte) (interface{}, error)
// CalculatePoolAddress derives the pool address for a token pair on the named
// protocol. Tokens are hex strings; fee is protocol specific and untyped.
CalculatePoolAddress(protocol, tokenA, tokenB string, fee interface{}) (common.Address, error)
}
// EnhancedSequencerParser scans blocks for DEX activity and derives MEV
// opportunities from it (see ParseBlockForMEV). Several collaborators are
// typed as interface{} or as local interfaces so that this package does not
// import the packages that implement them.
type EnhancedSequencerParser struct {
// providerManager supplies the RPC client used to fetch receipts.
providerManager *transport.ProviderManager // Manages multiple RPC providers with rotation
// logger receives debug and info output from the parser.
logger *logger.Logger
// The following three fields are left nil by NewEnhancedSequencerParser.
protocolRegistry interface{} // Will hold the required protocol registry
eventMonitor EventMonitor // Will hold EventMonitor implementation
swapPipeline SwapEventPipeline // Will hold SwapEventPipeline implementation
marketDiscovery MarketDiscovery // Interface for market discovery
// MEV opportunity detection
mevAnalyzer *MEVAnalyzer
// profitCalculator is not populated by NewEnhancedSequencerParser.
profitCalculator *ProfitCalculator
abiDecoder ABIDecoder // Added for transaction decoding
arbitrageExecutor interface{} // Will hold *arbitrage.ArbitrageExecutor when available, but we avoid direct import to prevent cycle
capitalOptimizer interface{} // Will hold capital optimization component
profitTracker interface{} // Will hold profitability tracking component
// Event counters, reported by GetStatistics.
swapCount uint64
liquidationCount uint64
liquidityCount uint64
}
// PoolStateUpdate describes a change to a pool's state: which pool, the token
// pair and amounts involved, when it happened, and what kind of change it was.
type PoolStateUpdate struct {
// Pool is the address of the affected pool.
Pool common.Address
// TokenIn and TokenOut identify the token pair of the update.
TokenIn common.Address
TokenOut common.Address
// AmountIn and AmountOut are the token amounts associated with the update.
AmountIn *big.Int
AmountOut *big.Int
// Timestamp records when the update occurred.
Timestamp time.Time
UpdateType string // "swap", "liquidity_add", "liquidity_remove"
}
// PoolPriceUpdate describes a new price for a token pair in a pool, together
// with the price impact attributed to the change.
type PoolPriceUpdate struct {
// Pool is the address of the pool whose price changed.
Pool common.Address
// TokenIn and TokenOut identify the token pair the price refers to.
TokenIn common.Address
TokenOut common.Address
// NewPrice is the updated price as an arbitrary-precision float.
NewPrice *big.Float
// PriceImpact is the impact of the change; the unit (fraction or percent) is
// not established in this file.
PriceImpact float64
// Timestamp records when the price update occurred.
Timestamp time.Time
}
// MEVAnalyzer holds the thresholds used when judging whether an opportunity
// is worth pursuing. NewEnhancedSequencerParser fills it with defaults.
type MEVAnalyzer struct {
minProfitThreshold *big.Int // Minimum profit in wei to consider
gasPrice *big.Int // Current gas price
maxGasLimit uint64 // Maximum gas we're willing to use
// Profit margins, expressed as fractions (the constructor uses 0.05 for 5%).
arbitrageMargin float64 // Minimum profit margin for arbitrage
liquidationMargin float64 // Minimum profit margin for liquidations
sandwichMargin float64 // Minimum profit margin for sandwich attacks
}
// ProfitCalculator bundles the three oracles (gas, token price, pool state)
// needed to estimate MEV profit.
type ProfitCalculator struct {
// gasOracle supplies gas prices and gas-usage estimates.
gasOracle GasOracle
// priceOracle supplies token prices and price-impact estimates.
priceOracle PriceOracle
// poolOracle supplies pool liquidity and pool state.
poolOracle PoolOracle
}
// GasOracle is the source of gas price and gas usage information used by
// ProfitCalculator.
type GasOracle interface {
// GetCurrentGasPrice returns the current gas price.
GetCurrentGasPrice() (*big.Int, error)
// EstimateGasUsage estimates the gas needed for a transaction of the given
// type and complexity. The accepted txType values are defined by the
// implementation.
EstimateGasUsage(txType string, complexity int) (uint64, error)
}
// PriceOracle is the source of token pricing information used by
// ProfitCalculator.
type PriceOracle interface {
// GetTokenPrice returns the price of token; the denomination is defined by
// the implementation.
GetTokenPrice(token common.Address) (*big.Int, error)
// GetPriceImpact estimates the price impact of swapping amountIn of tokenIn
// for tokenOut.
GetPriceImpact(tokenIn, tokenOut common.Address, amountIn *big.Int) (float64, error)
}
// PoolOracle is the source of pool liquidity and state information used by
// ProfitCalculator.
type PoolOracle interface {
// GetPoolLiquidity returns the liquidity of pool.
GetPoolLiquidity(pool common.Address) (*big.Int, error)
// GetPoolState returns the state of pool as an untyped value; callers must
// type-assert the result.
GetPoolState(pool common.Address) (interface{}, error)
}
// NewEnhancedSequencerParser wires up a parser around the given provider
// manager, logger and market discovery component.
//
// The poolCache, strategyEngine and arbitrageService arguments are accepted
// but not stored. The protocol registry, event monitor and swap pipeline are
// left nil; per the original notes, full initialization is expected to happen
// in the main arbitrum package to avoid circular imports.
//
// An error is returned only when the ABI decoder cannot be created.
func NewEnhancedSequencerParser(providerManager *transport.ProviderManager, logger *logger.Logger, poolCache interface{}, marketDiscovery MarketDiscovery, strategyEngine MEVStrategyEngine, arbitrageService interface{}) (*EnhancedSequencerParser, error) {
	// The decoder is the only step that can fail, so build it first.
	abiDecoder, err := NewABIDecoder()
	if err != nil {
		return nil, fmt.Errorf("failed to initialize ABI decoder: %w", err)
	}

	// Default analysis thresholds.
	analyzer := new(MEVAnalyzer)
	analyzer.minProfitThreshold = big.NewInt(1000000000000000) // 0.001 ETH
	analyzer.gasPrice = big.NewInt(100000000)                  // 0.1 gwei
	analyzer.maxGasLimit = 500000
	analyzer.arbitrageMargin = 0.05
	analyzer.liquidationMargin = 0.10
	analyzer.sandwichMargin = 0.02

	// Fields that are not assigned here (protocolRegistry, eventMonitor,
	// swapPipeline, ...) keep their nil zero value.
	created := new(EnhancedSequencerParser)
	created.providerManager = providerManager
	created.logger = logger
	created.marketDiscovery = marketDiscovery
	created.mevAnalyzer = analyzer
	created.abiDecoder = abiDecoder

	logger.Info("Enhanced sequencer parser initialized with comprehensive MEV detection and event monitoring")
	return created, nil
}
// NewABIDecoder returns the package's ABIDecoder implementation. The decoder
// starts with an empty protocol ABI table, no logger, a fresh address
// validator and a fresh safe address converter. The returned error is always
// nil.
func NewABIDecoder() (ABIDecoder, error) {
	return &sophisticatedABIDecoder{
		protocolABIs:     map[string]*abi.ABI{},
		addressValidator: validation.NewAddressValidator(),
		safeConverter:    utils.NewSafeAddressConverter(),
	}, nil
}
// sophisticatedABIDecoder is the ABIDecoder implementation returned by
// NewABIDecoder. It decodes swap calldata for the supported DEX protocols and
// derives pool addresses.
type sophisticatedABIDecoder struct {
// protocolABIs maps a protocol name to its parsed ABI; NewABIDecoder creates
// it empty.
protocolABIs map[string]*abi.ABI
// logger is left nil by NewABIDecoder.
logger *logger.Logger
// addressValidator, when non-nil, is used to reject suspicious addresses
// extracted from calldata.
addressValidator *validation.AddressValidator
// safeConverter, when non-nil, is used to parse hex token addresses in
// CalculatePoolAddress.
safeConverter *utils.SafeAddressConverter
}
// DecodeSwapTransaction splits raw transaction input into its 4-byte selector
// and argument bytes and passes both to the decoder for the named protocol.
// Unrecognized protocol names fall through to the generic decoder.
//
// It returns an error when data is shorter than a selector, or whatever error
// the protocol decoder reports.
func (p *sophisticatedABIDecoder) DecodeSwapTransaction(protocol string, data []byte) (interface{}, error) {
	const selectorLen = 4
	if len(data) < selectorLen {
		return nil, fmt.Errorf("transaction data too short")
	}
	selector, args := data[:selectorLen], data[selectorLen:]

	// Pick the protocol-specific decoder, then invoke it once.
	var decode func(methodSig []byte, data []byte) (interface{}, error)
	switch protocol {
	case "uniswap_v3":
		decode = p.decodeUniswapV3Swap
	case "uniswap_v2":
		decode = p.decodeUniswapV2Swap
	case "sushiswap":
		decode = p.decodeSushiSwap
	case "camelot":
		decode = p.decodeCamelotSwap
	case "balancer":
		decode = p.decodeBalancerSwap
	case "curve":
		decode = p.decodeCurveSwap
	default:
		decode = p.decodeGenericSwap
	}
	return decode(selector, args)
}
// decodeUniswapV3Swap routes Uniswap V3 router calldata to the decoder that
// matches its selector. data is the calldata with the selector removed.
//
// Recognized selectors:
//
//	0xac9650d8  multicall(bytes[])
//	0x5ae401dc  multicall(uint256,bytes[])
//	0x414bf389  exactInputSingle
//	0xdb3e2198  exactOutputSingle
//	0xc04b8d59  exactInput
//	0xf28c0498  exactOutput
//
// Any other selector yields an error.
func (p *sophisticatedABIDecoder) decodeUniswapV3Swap(methodSig []byte, data []byte) (interface{}, error) {
	selectorIs := func(b0, b1, b2, b3 byte) bool {
		return bytes.Equal(methodSig, []byte{b0, b1, b2, b3})
	}

	if selectorIs(0xac, 0x96, 0x50, 0xd8) || selectorIs(0x5a, 0xe4, 0x01, 0xdc) {
		return p.decodeUniswapV3Multicall(data)
	}
	if selectorIs(0x41, 0x4b, 0xf3, 0x89) {
		return p.decodeExactInputSingle(data)
	}
	if selectorIs(0xdb, 0x3e, 0x21, 0x98) {
		return p.decodeExactOutputSingle(data)
	}
	if selectorIs(0xc0, 0x4b, 0x8d, 0x59) {
		return p.decodeExactInput(data)
	}
	if selectorIs(0xf2, 0x8c, 0x04, 0x98) {
		return p.decodeExactOutput(data)
	}
	return nil, fmt.Errorf("unknown Uniswap V3 method signature: %x", methodSig)
}
// parseUniswapV3SingleSwapParams reads the first seven 32-byte words shared by
// exactInputSingle and exactOutputSingle calldata:
//
//	word 0  tokenIn
//	word 1  tokenOut
//	word 2  fee
//	word 3  recipient
//	word 4  deadline
//	word 5  amountA (amountIn / amountOut)
//	word 6  amountB (amountOutMinimum / amountInMaximum)
//
// When an address validator is configured, each address is rejected if it is
// flagged invalid or has a corruption score above 30. Addresses are validated
// in the order tokenIn, tokenOut, recipient; results validated before a
// failure remain set in the returned values.
func (p *sophisticatedABIDecoder) parseUniswapV3SingleSwapParams(data []byte) (tokenIn, tokenOut, recipient common.Address, fee, deadline, amountA, amountB *big.Int, err error) {
	const word = 32
	if len(data) < 7*word {
		err = fmt.Errorf("data too short for single swap")
		return tokenIn, tokenOut, recipient, fee, deadline, amountA, amountB, err
	}

	// An address occupies the low 20 bytes of its word.
	addressAt := func(i int) common.Address {
		return common.BytesToAddress(data[i*word+12 : (i+1)*word])
	}
	uintAt := func(i int) *big.Int {
		return new(big.Int).SetBytes(data[i*word : (i+1)*word])
	}
	checker := p.addressValidator

	candidateIn := addressAt(0)
	if checker != nil {
		res := checker.ValidateAddress(candidateIn.Hex())
		if !(res.IsValid && res.CorruptionScore <= 30) {
			err = fmt.Errorf("invalid tokenIn address: %s (corruption score: %d)", candidateIn.Hex(), res.CorruptionScore)
			return tokenIn, tokenOut, recipient, fee, deadline, amountA, amountB, err
		}
	}
	tokenIn = candidateIn

	candidateOut := addressAt(1)
	if checker != nil {
		res := checker.ValidateAddress(candidateOut.Hex())
		if !(res.IsValid && res.CorruptionScore <= 30) {
			err = fmt.Errorf("invalid tokenOut address: %s (corruption score: %d)", candidateOut.Hex(), res.CorruptionScore)
			return tokenIn, tokenOut, recipient, fee, deadline, amountA, amountB, err
		}
	}
	tokenOut = candidateOut

	candidateRecipient := addressAt(3)
	if checker != nil {
		res := checker.ValidateAddress(candidateRecipient.Hex())
		if !(res.IsValid && res.CorruptionScore <= 30) {
			err = fmt.Errorf("invalid recipient address: %s (corruption score: %d)", candidateRecipient.Hex(), res.CorruptionScore)
			return tokenIn, tokenOut, recipient, fee, deadline, amountA, amountB, err
		}
	}
	recipient = candidateRecipient

	fee = uintAt(2)
	deadline = uintAt(4)
	amountA = uintAt(5)
	amountB = uintAt(6)
	return tokenIn, tokenOut, recipient, fee, deadline, amountA, amountB, err
}
// decodeExactInputSingle builds a SwapEvent from exactInputSingle calldata
// (selector already removed). AmountIn is the exact input amount and AmountOut
// is the caller's minimum acceptable output. Timestamp is the decode time.
func (p *sophisticatedABIDecoder) decodeExactInputSingle(data []byte) (*SwapEvent, error) {
	tokenIn, tokenOut, recipient, fee, deadline, amountIn, amountOutMinimum, err := p.parseUniswapV3SingleSwapParams(data)
	if err != nil {
		return nil, fmt.Errorf("error parsing exactInputSingle: %w", err)
	}

	event := new(SwapEvent)
	event.Timestamp = time.Now()
	event.Protocol = "uniswap_v3"
	event.TokenIn = tokenIn
	event.TokenOut = tokenOut
	event.AmountIn = amountIn
	event.AmountOut = amountOutMinimum
	event.Recipient = recipient
	event.Fee = fee.Uint64()
	event.Deadline = deadline.Uint64()
	return event, nil
}
// decodeUniswapV2Swap routes Uniswap V2 style router calldata by selector.
// data is the calldata with the selector removed.
//
// Only two selectors are decoded:
//
//	0x38ed1739  swapExactTokensForTokens
//	0x8803dbee  swapTokensForExactTokens
//
// Everything else, including the ETH variants (0x7ff36ab5
// swapExactETHForTokens, 0x4a25d94a swapTokensForExactETH), is reported as an
// unknown selector.
func (p *sophisticatedABIDecoder) decodeUniswapV2Swap(methodSig []byte, data []byte) (interface{}, error) {
	switch string(methodSig) {
	case "\x38\xed\x17\x39":
		return p.decodeSwapExactTokensForTokens(data)
	case "\x88\x03\xdb\xee":
		return p.decodeSwapTokensForExactTokens(data)
	}
	return nil, fmt.Errorf("unknown Uniswap V2 method signature: %x", methodSig)
}
// CalculatePoolAddress derives the pool address for a token pair on the given
// protocol.
//
// tokenA and tokenB are hex addresses. With a safe converter configured they
// are parsed through it and an invalid address is an error; without one they
// are parsed with common.HexToAddress, which does not report failures. The two
// addresses are then put in ascending byte order before being handed to the
// protocol-specific calculation. fee is only used for "uniswap_v3".
//
// Unsupported protocol names yield an error.
func (p *sophisticatedABIDecoder) CalculatePoolAddress(protocol, tokenA, tokenB string, fee interface{}) (common.Address, error) {
	var lower, upper common.Address
	if conv := p.safeConverter; conv == nil {
		// No converter available: fall back to the unchecked conversion.
		lower, upper = common.HexToAddress(tokenA), common.HexToAddress(tokenB)
	} else {
		parsedA := conv.SafeHexToAddress(tokenA)
		if !parsedA.IsValid {
			return common.Address{}, fmt.Errorf("invalid tokenA address: %s (%v)", tokenA, parsedA.Error)
		}
		parsedB := conv.SafeHexToAddress(tokenB)
		if !parsedB.IsValid {
			return common.Address{}, fmt.Errorf("invalid tokenB address: %s (%v)", tokenB, parsedB.Error)
		}
		lower, upper = parsedA.Address, parsedB.Address
	}

	// Pools are keyed by the sorted pair (token0 < token1).
	if bytes.Compare(lower.Bytes(), upper.Bytes()) > 0 {
		lower, upper = upper, lower
	}

	if protocol == "uniswap_v3" {
		return p.calculateUniswapV3PoolAddress(lower, upper, fee)
	}
	if protocol == "uniswap_v2" || protocol == "sushiswap" {
		return p.calculateUniswapV2PoolAddress(lower, upper, protocol)
	}
	if protocol == "camelot" {
		return p.calculateCamelotPoolAddress(lower, upper)
	}
	if protocol == "balancer" {
		return p.calculateBalancerPoolAddress(lower, upper)
	}
	return common.Address{}, fmt.Errorf("unsupported protocol: %s", protocol)
}
// calculateUniswapV3PoolAddress derives a Uniswap V3 pool address with CREATE2.
//
// token0 and token1 must already be sorted (token0 < token1), as done by
// CalculatePoolAddress. fee is the pool fee tier and may be an int, uint32 or
// uint64; it must fit in a uint24. Any other type, a negative value, or a
// value that does not fit is an error.
//
// The salt is keccak256(abi.encode(token0, token1, fee)). abi.encode pads
// every value to a 32-byte word, so the preimage is 96 bytes with each address
// right-aligned in its word; hashing the bare 20-byte addresses would produce
// a different salt and therefore a wrong pool address.
func (p *sophisticatedABIDecoder) calculateUniswapV3PoolAddress(token0, token1 common.Address, fee interface{}) (common.Address, error) {
	const (
		wordLen   = 32
		maxUint24 = 1<<24 - 1
	)

	var feeValue uint64
	switch v := fee.(type) {
	case int:
		if v < 0 {
			return common.Address{}, fmt.Errorf("negative fee for Uniswap V3: %d", v)
		}
		feeValue = uint64(v)
	case uint32:
		feeValue = uint64(v)
	case uint64:
		feeValue = v
	default:
		return common.Address{}, fmt.Errorf("invalid fee type for Uniswap V3")
	}
	if feeValue > maxUint24 {
		return common.Address{}, fmt.Errorf("fee %d does not fit in uint24 for Uniswap V3", feeValue)
	}

	// Uniswap V3 Factory on Arbitrum: 0x1F98431c8aD98523631AE4a59f267346ea31F984
	factory := common.HexToAddress("0x1F98431c8aD98523631AE4a59f267346ea31F984")
	initCodeHash := common.HexToHash("0xe34f199b19b2b4f47f68442619d555527d244f78a3297ea89325f843f87b8b54")

	// abi.encode(address token0, address token1, uint24 fee): three 32-byte
	// words, each value right-aligned (big-endian) in its word.
	encoded := make([]byte, 3*wordLen)
	copy(encoded[wordLen-common.AddressLength:wordLen], token0.Bytes())
	copy(encoded[2*wordLen-common.AddressLength:2*wordLen], token1.Bytes())
	binary.BigEndian.PutUint32(encoded[3*wordLen-4:], uint32(feeValue))

	salt := crypto.Keccak256(encoded)
	return p.computeCREATE2Address(factory, salt, initCodeHash), nil
}
// computeCREATE2Address returns the CREATE2 deployment address for the given
// factory, salt and init-code hash: the low 20 bytes of
// keccak256(0xff ++ factory ++ salt ++ initCodeHash).
func (p *sophisticatedABIDecoder) computeCREATE2Address(factory common.Address, salt []byte, initCodeHash common.Hash) common.Address {
	// Keccak256 hashes the concatenation of its arguments, so the preimage
	// does not need to be assembled into a single buffer first.
	digest := crypto.Keccak256([]byte{0xff}, factory.Bytes(), salt, initCodeHash.Bytes())
	return common.BytesToAddress(digest[12:])
}
// ParseBlockForMEV fetches block blockNumber, analyzes every transaction in
// it, and runs the arbitrage, sandwich and liquidation detectors over the
// collected events.
//
// A block that cannot be fetched is not treated as a failure: the error is
// logged at debug level and an empty result is returned with a nil error.
// Per-transaction analysis errors are likewise logged and skipped. The only
// error returned is for a block timestamp that does not fit in an int64.
func (p *EnhancedSequencerParser) ParseBlockForMEV(ctx context.Context, blockNumber uint64) (*MEVOpportunities, error) {
	began := time.Now()

	// Allocate the result up front so that the fetch-failure path has
	// something non-nil to return.
	found := &MEVOpportunities{
		BlockNumber:       blockNumber,
		Timestamp:         time.Now(),
		SwapEvents:        []*SwapEvent{},
		LiquidationEvents: []*LiquidationEvent{},
		LiquidityEvents:   []*LiquidityEvent{},
		ArbitrageOps:      []*pkgtypes.ArbitrageOpportunity{},
		SandwichOps:       []*SandwichOpportunity{},
		LiquidationOps:    []*LiquidationOpportunity{},
	}

	block, fetchErr := p.fetchBlockSafely(ctx, blockNumber)
	if fetchErr != nil {
		p.logger.Debug(fmt.Sprintf("Could not fetch block %d: %v", blockNumber, fetchErr))
		found.ProcessingTime = time.Since(began)
		return found, nil
	}

	// Replace the provisional timestamp with the block's own time.
	unixSeconds := block.Time()
	if unixSeconds > math.MaxInt64 {
		return nil, fmt.Errorf("block timestamp %d exceeds maximum int64 value", unixSeconds)
	}
	found.Timestamp = time.Unix(safeConvertUint64ToInt64(unixSeconds), 0)

	for _, tx := range block.Transactions() {
		analyzeErr := p.analyzeTransaction(ctx, tx, found)
		if analyzeErr == nil {
			continue
		}
		p.logger.Debug(fmt.Sprintf("Failed to analyze transaction %s: %v", tx.Hash().Hex(), analyzeErr))
	}

	// The detectors work on the events gathered above.
	p.detectArbitrageOpportunities(found)
	p.detectSandwichAttacks(found)
	p.detectLiquidationOpportunities(found)

	found.TotalProfit = p.calculateTotalProfit(found)
	found.ProcessingTime = time.Since(began)

	elapsedMs := float64(found.ProcessingTime.Nanoseconds()) / 1e6
	p.logger.Info(fmt.Sprintf("Block %d: Processed %d swaps, %d arbitrage ops, %d liquidations, %d MEV opportunities (%.2f ms)",
		blockNumber, len(found.SwapEvents), len(found.ArbitrageOps),
		len(found.LiquidationOps), len(found.SandwichOps),
		elapsedMs))
	return found, nil
}
// GetStatistics reports the parser's event counters keyed by name:
// "swaps_detected", "liquidations_detected", "liquidity_events" and
// "active_protocols".
func (p *EnhancedSequencerParser) GetStatistics() map[string]interface{} {
	// Counting active protocols is not implemented yet, so the value is a
	// placeholder 0 whether or not a protocol registry is set.
	activeProtocols := 0

	stats := make(map[string]interface{}, 4)
	stats["swaps_detected"] = p.swapCount
	stats["liquidations_detected"] = p.liquidationCount
	stats["liquidity_events"] = p.liquidityCount
	stats["active_protocols"] = activeProtocols
	return stats
}
// Close is the parser's shutdown hook. It is currently a placeholder: nothing
// is released and the result is always nil.
func (p *EnhancedSequencerParser) Close() error {
	// Resource cleanup is not implemented yet.
	return nil
}
// calculateTotalProfit sums the profit of every arbitrage opportunity and the
// expected profit of every sandwich and liquidation opportunity. Entries
// without a profit value (nil) are skipped. The result is a freshly allocated
// value and is zero when nothing contributes.
func (p *EnhancedSequencerParser) calculateTotalProfit(opportunities *MEVOpportunities) *big.Int {
	sum := new(big.Int)
	accumulate := func(amount *big.Int) {
		if amount != nil {
			sum.Add(sum, amount)
		}
	}

	for _, arb := range opportunities.ArbitrageOps {
		accumulate(arb.Profit)
	}
	for _, sandwich := range opportunities.SandwichOps {
		accumulate(sandwich.ExpectedProfit)
	}
	for _, liquidation := range opportunities.LiquidationOps {
		accumulate(liquidation.ExpectedProfit)
	}
	return sum
}
// calculateROI returns profit as a percentage of investment, i.e.
// (profit / investment) * 100.
//
// It returns 0 when either argument is nil or when investment is zero, so the
// division is never attempted on missing or degenerate input. A negative
// profit yields a negative percentage.
func (p *EnhancedSequencerParser) calculateROI(profit, investment *big.Int) float64 {
	// big.Float.SetInt dereferences its argument, so a nil profit must be
	// rejected here just like a nil investment.
	if profit == nil || investment == nil || investment.Sign() == 0 {
		return 0.0
	}

	ratio := new(big.Float).Quo(
		new(big.Float).SetInt(profit),
		new(big.Float).SetInt(investment),
	)
	ratio.Mul(ratio, big.NewFloat(100))

	// The accuracy indicator is dropped on purpose: a rounded float64 is
	// precise enough for a percentage.
	percent, _ := ratio.Float64()
	return percent
}
// Helper functions for sophisticated ABI decoder

// decodeExactOutputSingle builds a SwapEvent from exactOutputSingle calldata
// (selector already removed). AmountOut is the exact output amount and
// AmountIn is the caller's maximum acceptable input. Timestamp is the decode
// time.
func (p *sophisticatedABIDecoder) decodeExactOutputSingle(data []byte) (*SwapEvent, error) {
	tokenIn, tokenOut, recipient, fee, deadline, amountOut, amountInMaximum, err := p.parseUniswapV3SingleSwapParams(data)
	if err != nil {
		return nil, fmt.Errorf("error parsing exactOutputSingle: %w", err)
	}

	event := new(SwapEvent)
	event.Timestamp = time.Now()
	event.Protocol = "uniswap_v3"
	event.TokenIn = tokenIn
	event.TokenOut = tokenOut
	event.AmountIn = amountInMaximum
	event.AmountOut = amountOut
	event.Recipient = recipient
	event.Fee = fee.Uint64()
	event.Deadline = deadline.Uint64()
	return event, nil
}
// decodeExactInput decodes multi-hop exactInput calldata (selector already
// removed) into a SwapEvent.
//
// The single argument is a struct with a dynamic member, so the ABI encoding
// is:
//
//	word 0           offset of the struct from the start of data
//	struct word 0    offset of path, relative to the struct
//	struct word 1    recipient
//	struct word 2    deadline
//	struct word 3    amountIn
//	struct word 4    amountOutMinimum
//	at path offset   path length in bytes, followed by the packed path
//
// The path is packed as token(20) ++ fee(3) ++ token(20) [++ fee(3) ++
// token(20) ...]. TokenIn is taken from the first 20 bytes of the path and
// TokenOut from the last 20 bytes.
//
// When an address validator is configured, the recipient and both tokens are
// rejected if flagged invalid or if their corruption score exceeds 30.
// Malformed offsets or path lengths yield an error instead of a panic.
func (p *sophisticatedABIDecoder) decodeExactInput(data []byte) (*SwapEvent, error) {
	const (
		wordLen       = 32
		addressLen    = 20
		feeLen        = 3
		structHeadLen = 5 * wordLen
	)

	// boundedInt reads a 32-byte big-endian word as an int and reports whether
	// it lies in [0, limit].
	boundedInt := func(word []byte, limit int) (int, bool) {
		v := new(big.Int).SetBytes(word)
		if limit < 0 || !v.IsUint64() || v.Uint64() > uint64(limit) {
			return 0, false
		}
		return int(v.Uint64()), true
	}

	if len(data) < wordLen+structHeadLen {
		return nil, fmt.Errorf("data too short for exactInput")
	}
	structStart, ok := boundedInt(data[:wordLen], len(data)-structHeadLen)
	if !ok {
		return nil, fmt.Errorf("data too short for exactInput")
	}
	params := data[structStart:]

	rawRecipient := common.BytesToAddress(params[wordLen+12 : 2*wordLen])
	if p.addressValidator != nil {
		res := p.addressValidator.ValidateAddress(rawRecipient.Hex())
		if !res.IsValid || res.CorruptionScore > 30 {
			return nil, fmt.Errorf("invalid recipient address in exactInput: %s (corruption score: %d)", rawRecipient.Hex(), res.CorruptionScore)
		}
	}
	recipient := rawRecipient

	deadline := new(big.Int).SetBytes(params[2*wordLen : 3*wordLen])
	amountIn := new(big.Int).SetBytes(params[3*wordLen : 4*wordLen])
	amountOutMinimum := new(big.Int).SetBytes(params[4*wordLen : 5*wordLen])

	// Locate the path: its offset must leave room for the length word, and the
	// length must fit in what follows.
	pathStart, ok := boundedInt(params[:wordLen], len(params)-wordLen)
	if !ok {
		return nil, fmt.Errorf("invalid path data")
	}
	pathLen, ok := boundedInt(params[pathStart:pathStart+wordLen], len(params)-pathStart-wordLen)
	if !ok || pathLen < 2*addressLen+feeLen || (pathLen-addressLen)%(addressLen+feeLen) != 0 {
		return nil, fmt.Errorf("invalid path data")
	}
	path := params[pathStart+wordLen : pathStart+wordLen+pathLen]

	rawTokenIn := common.BytesToAddress(path[:addressLen])
	if p.addressValidator != nil {
		res := p.addressValidator.ValidateAddress(rawTokenIn.Hex())
		if !res.IsValid || res.CorruptionScore > 30 {
			return nil, fmt.Errorf("invalid tokenIn in exactInput path: %s (corruption score: %d)", rawTokenIn.Hex(), res.CorruptionScore)
		}
	}
	tokenIn := rawTokenIn

	rawTokenOut := common.BytesToAddress(path[pathLen-addressLen:])
	if p.addressValidator != nil {
		res := p.addressValidator.ValidateAddress(rawTokenOut.Hex())
		if !res.IsValid || res.CorruptionScore > 30 {
			return nil, fmt.Errorf("invalid tokenOut in exactInput path: %s (corruption score: %d)", rawTokenOut.Hex(), res.CorruptionScore)
		}
	}
	tokenOut := rawTokenOut

	return &SwapEvent{
		Timestamp: time.Now(),
		Protocol:  "uniswap_v3",
		TokenIn:   tokenIn,
		TokenOut:  tokenOut,
		AmountIn:  amountIn,
		AmountOut: amountOutMinimum,
		Recipient: recipient,
		Deadline:  deadline.Uint64(),
	}, nil
}
// decodeExactOutput decodes multi-hop exactOutput calldata (selector already
// removed) into a SwapEvent.
//
// The calldata has the same shape as exactInput, so decodeExactInput does the
// parsing, but the meaning of the fields differs:
//
//   - the two amount words are (amountOut, amountInMaximum) rather than
//     (amountIn, amountOutMinimum);
//   - the path is encoded in reverse, so its first token is the output token.
//
// decodeExactInput fills the event with exact-input semantics, so the token
// and amount pairs are swapped here. That way AmountOut is the exact output
// and AmountIn the maximum input, consistent with decodeExactOutputSingle.
func (p *sophisticatedABIDecoder) decodeExactOutput(data []byte) (*SwapEvent, error) {
	swap, err := p.decodeExactInput(data)
	if err != nil {
		return nil, err
	}
	swap.TokenIn, swap.TokenOut = swap.TokenOut, swap.TokenIn
	swap.AmountIn, swap.AmountOut = swap.AmountOut, swap.AmountIn
	return swap, nil
}
// decodeUniswapV3Multicall looks inside a Uniswap V3 multicall payload for a
// swap.
//
// Each embedded call is run through decodeUniswapV3Swap and the first one that
// decodes to a *SwapEvent is returned. Embedded calls that are too short or
// fail to decode are skipped. If none decodes, the tokens are extracted from
// the raw payload instead and a SwapEvent with zero amounts is returned:
// TokenIn is the first extracted token and TokenOut the second, when present.
//
// Errors are returned for an empty payload, an undecodable multicall, a failed
// token extraction, or a payload with no recognizable tokens.
func (p *sophisticatedABIDecoder) decodeUniswapV3Multicall(data []byte) (interface{}, error) {
	if len(data) == 0 {
		return nil, fmt.Errorf("empty payload for Uniswap V3 multicall")
	}

	innerCalls, err := calldata.DecodeMulticallCalls(data)
	if err != nil {
		return nil, fmt.Errorf("failed to decode multicall calls: %w", err)
	}

	for _, inner := range innerCalls {
		if len(inner) < 4 {
			continue
		}
		decoded, decodeErr := p.decodeUniswapV3Swap(inner[:4], inner[4:])
		if decodeErr != nil {
			continue
		}
		if event, isSwap := decoded.(*SwapEvent); isSwap {
			return event, nil
		}
	}

	// No embedded call decoded: fall back to token extraction.
	extractionCtx := &calldata.MulticallContext{
		Protocol: "uniswap_v3",
		Stage:    "arbitrum.parser.decodeUniswapV3Multicall",
	}
	tokens, err := calldata.ExtractTokensFromMulticallWithContext(data, extractionCtx)
	if err != nil {
		return nil, fmt.Errorf("failed to extract tokens from multicall: %w", err)
	}
	if len(tokens) == 0 {
		return nil, fmt.Errorf("no recognizable swaps found in multicall payload")
	}

	fallback := new(SwapEvent)
	fallback.Timestamp = time.Now()
	fallback.Protocol = "uniswap_v3"
	fallback.TokenIn = tokens[0]
	fallback.TokenOut = common.Address{}
	fallback.AmountIn = big.NewInt(0)
	fallback.AmountOut = big.NewInt(0)
	if len(tokens) >= 2 {
		fallback.TokenOut = tokens[1]
	}
	return fallback, nil
}
// decodeSushiSwap decodes SushiSwap router calldata. SushiSwap exposes the
// Uniswap V2 router interface, so decoding is delegated to the V2 decoder.
func (p *sophisticatedABIDecoder) decodeSushiSwap(methodSig []byte, data []byte) (interface{}, error) {
	decoded, err := p.decodeUniswapV2Swap(methodSig, data)
	return decoded, err
}
// decodeCamelotSwap decodes Camelot router calldata. It is treated as Uniswap
// V2 compatible, so decoding is delegated to the V2 decoder; Camelot-specific
// variations are not handled separately.
func (p *sophisticatedABIDecoder) decodeCamelotSwap(methodSig []byte, data []byte) (interface{}, error) {
	decoded, err := p.decodeUniswapV2Swap(methodSig, data)
	return decoded, err
}
// decodeBalancerSwap routes Balancer V2 Vault calldata by selector. data is
// the calldata with the selector removed.
//
// The canonical Vault selectors are matched (0x945bcec9 for batchSwap,
// 0x52bbbe29 for swap). The values this function matched previously
// (0x945bcf7c, 0x52bab902) do not correspond to those Vault functions.
// NOTE(review): they are kept so behaviour for existing callers is unchanged;
// confirm against the Vault ABI and remove them if they are typos.
//
// Any other selector yields an error.
func (p *sophisticatedABIDecoder) decodeBalancerSwap(methodSig []byte, data []byte) (interface{}, error) {
	switch {
	case bytes.Equal(methodSig, []byte{0x94, 0x5b, 0xce, 0xc9}), // batchSwap
		bytes.Equal(methodSig, []byte{0x94, 0x5b, 0xcf, 0x7c}): // legacy value, see NOTE above
		return p.decodeBalancerBatchSwap(data)
	case bytes.Equal(methodSig, []byte{0x52, 0xbb, 0xbe, 0x29}), // swap
		bytes.Equal(methodSig, []byte{0x52, 0xba, 0xb9, 0x02}): // legacy value, see NOTE above
		return p.decodeBalancerSingleSwap(data)
	default:
		return nil, fmt.Errorf("unknown Balancer method signature: %x", methodSig)
	}
}
// decodeCurveSwap routes Curve calldata by selector. data is the calldata with
// the selector removed.
//
//	0x3df02124  exchange
//	0xa6417ed6  exchange_underlying
//
// Any other selector yields an error.
func (p *sophisticatedABIDecoder) decodeCurveSwap(methodSig []byte, data []byte) (interface{}, error) {
	selectorIs := func(b0, b1, b2, b3 byte) bool {
		return bytes.Equal(methodSig, []byte{b0, b1, b2, b3})
	}

	if selectorIs(0x3d, 0xf0, 0x21, 0x24) {
		return p.decodeCurveExchange(data)
	}
	if selectorIs(0xa6, 0x41, 0x7e, 0xd6) {
		return p.decodeCurveExchangeUnderlying(data)
	}
	return nil, fmt.Errorf("unknown Curve method signature: %x", methodSig)
}
// decodeGenericSwap is the fallback for protocols without a dedicated decoder.
// It does not interpret the calldata: provided at least 64 bytes of arguments
// are present it returns a placeholder SwapEvent with protocol "unknown", zero
// token addresses and zero amounts. The selector is not inspected.
func (p *sophisticatedABIDecoder) decodeGenericSwap(methodSig []byte, data []byte) (interface{}, error) {
	const minArgsLen = 64
	if len(data) >= minArgsLen {
		placeholder := new(SwapEvent)
		placeholder.Timestamp = time.Now()
		placeholder.Protocol = "unknown"
		placeholder.TokenIn = common.Address{}
		placeholder.TokenOut = common.Address{}
		placeholder.AmountIn = big.NewInt(0)
		placeholder.AmountOut = big.NewInt(0)
		return placeholder, nil
	}
	return nil, fmt.Errorf("insufficient data for generic decode")
}
// Additional helper functions for specific decoders

// decodeSwapExactTokensForTokens decodes Uniswap V2 style
// swapExactTokensForTokens calldata (selector already removed) into a
// SwapEvent.
//
// ABI layout:
//
//	word 0          amountIn
//	word 1          amountOutMin
//	word 2          offset of path from the start of data
//	word 3          recipient ("to")
//	word 4          deadline
//	at path offset  number of path elements, followed by one 32-byte word
//	                per element
//
// TokenIn is the first path element and TokenOut the last one, so multi-hop
// routes report the final output token. A path needs at least two elements.
//
// When an address validator is configured, the recipient and both tokens are
// rejected if flagged invalid or if their corruption score exceeds 30.
// Malformed offsets or element counts yield an error instead of a panic.
func (p *sophisticatedABIDecoder) decodeSwapExactTokensForTokens(data []byte) (*SwapEvent, error) {
	const wordLen = 32
	if len(data) < 5*wordLen {
		return nil, fmt.Errorf("data too short for swapExactTokensForTokens")
	}
	amountIn := new(big.Int).SetBytes(data[0:32])
	amountOutMin := new(big.Int).SetBytes(data[32:64])
	pathOffset := new(big.Int).SetBytes(data[64:96])

	rawRecipient := common.BytesToAddress(data[108:128])
	if p.addressValidator != nil {
		res := p.addressValidator.ValidateAddress(rawRecipient.Hex())
		if !res.IsValid || res.CorruptionScore > 30 {
			return nil, fmt.Errorf("invalid recipient address in V2 swap: %s (corruption score: %d)", rawRecipient.Hex(), res.CorruptionScore)
		}
	}
	recipient := rawRecipient
	deadline := new(big.Int).SetBytes(data[128:160])

	// The offset points at the array's length word, which must lie inside data.
	if !pathOffset.IsUint64() || pathOffset.Uint64() > uint64(len(data)-wordLen) {
		return nil, fmt.Errorf("invalid path data")
	}
	lengthAt := int(pathOffset.Uint64())
	elements := data[lengthAt+wordLen:]

	// The element count must be at least 2 and fit in the remaining bytes.
	hops := new(big.Int).SetBytes(data[lengthAt : lengthAt+wordLen])
	if !hops.IsUint64() || hops.Uint64() < 2 || hops.Uint64() > uint64(len(elements)/wordLen) {
		return nil, fmt.Errorf("invalid path data")
	}
	last := int(hops.Uint64()) - 1

	rawTokenIn := common.BytesToAddress(elements[12:wordLen])
	if p.addressValidator != nil {
		res := p.addressValidator.ValidateAddress(rawTokenIn.Hex())
		if !res.IsValid || res.CorruptionScore > 30 {
			return nil, fmt.Errorf("invalid tokenIn in V2 swap path: %s (corruption score: %d)", rawTokenIn.Hex(), res.CorruptionScore)
		}
	}
	tokenIn := rawTokenIn

	rawTokenOut := common.BytesToAddress(elements[last*wordLen+12 : (last+1)*wordLen])
	if p.addressValidator != nil {
		res := p.addressValidator.ValidateAddress(rawTokenOut.Hex())
		if !res.IsValid || res.CorruptionScore > 30 {
			return nil, fmt.Errorf("invalid tokenOut in V2 swap path: %s (corruption score: %d)", rawTokenOut.Hex(), res.CorruptionScore)
		}
	}
	tokenOut := rawTokenOut

	return &SwapEvent{
		Timestamp: time.Now(),
		Protocol:  "uniswap_v2",
		TokenIn:   tokenIn,
		TokenOut:  tokenOut,
		AmountIn:  amountIn,
		AmountOut: amountOutMin,
		Recipient: recipient,
		Deadline:  deadline.Uint64(),
	}, nil
}
// decodeSwapTokensForExactTokens decodes Uniswap V2 style
// swapTokensForExactTokens calldata (selector already removed) into a
// SwapEvent.
//
// The calldata has the same shape as swapExactTokensForTokens, so that decoder
// does the parsing. The first two words are (amountOut, amountInMax) here
// rather than (amountIn, amountOutMin), so the amounts are swapped afterwards:
// AmountOut is the exact output and AmountIn the maximum input. The path is in
// the same input-to-output order for both functions, so the tokens are left as
// decoded.
func (p *sophisticatedABIDecoder) decodeSwapTokensForExactTokens(data []byte) (*SwapEvent, error) {
	swap, err := p.decodeSwapExactTokensForTokens(data)
	if err != nil {
		return nil, err
	}
	swap.AmountIn, swap.AmountOut = swap.AmountOut, swap.AmountIn
	return swap, nil
}
// calculateUniswapV2PoolAddress derives the pair address for a Uniswap V2
// style factory with CREATE2. token0 and token1 must already be sorted. The
// salt is keccak256(token0 ++ token1) over the bare 20-byte addresses.
//
// protocol selects the factory and init-code hash and must be "uniswap_v2" or
// "sushiswap"; anything else is an error.
func (p *sophisticatedABIDecoder) calculateUniswapV2PoolAddress(token0, token1 common.Address, protocol string) (common.Address, error) {
	var factoryHex, initCodeHashHex string
	switch protocol {
	case "uniswap_v2":
		factoryHex = "0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f"
		initCodeHashHex = "0x96e8ac4277198ff8b6f785478aa9a39f403cb768dd02cbee326c3e7da348845f"
	case "sushiswap":
		factoryHex = "0xc35DADB65012eC5796536bD9864eD8773aBc74C4"
		initCodeHashHex = "0xe18a34eb0e04b04f7a0ac29a6e80748dca96319b42c54d679cb821dca90c6303"
	default:
		return common.Address{}, fmt.Errorf("unsupported Uniswap V2 protocol: %s", protocol)
	}

	// Keccak256 hashes the concatenation of its arguments.
	pairSalt := crypto.Keccak256(token0.Bytes(), token1.Bytes())
	pool := p.computeCREATE2Address(common.HexToAddress(factoryHex), pairSalt, common.HexToHash(initCodeHashHex))
	return pool, nil
}
// calculateCamelotPoolAddress derives the Camelot pair address with CREATE2,
// using a fixed factory address and init-code hash. token0 and token1 must
// already be sorted. The salt is keccak256(token0 ++ token1) over the bare
// 20-byte addresses. The returned error is always nil.
func (p *sophisticatedABIDecoder) calculateCamelotPoolAddress(token0, token1 common.Address) (common.Address, error) {
	const (
		factoryHex      = "0x6EcCab422D763aC031210895C81787E87B91425E"
		initCodeHashHex = "0xa856464ae65f7619087bc369daaf7e387dae1e5af69cfa7935850ebf754b04c1"
	)

	// Keccak256 hashes the concatenation of its arguments.
	pairSalt := crypto.Keccak256(token0.Bytes(), token1.Bytes())
	pool := p.computeCREATE2Address(common.HexToAddress(factoryHex), pairSalt, common.HexToHash(initCodeHashHex))
	return pool, nil
}
// calculateBalancerPoolAddress is a stub. Balancer pools are not created
// through the pair-based scheme used for the other protocols, and no
// derivation is implemented, so this always returns the zero address and an
// error.
func (p *sophisticatedABIDecoder) calculateBalancerPoolAddress(token0, token1 common.Address) (common.Address, error) {
	var none common.Address
	return none, fmt.Errorf("Balancer pool address calculation not yet implemented")
}
// decodeBalancerBatchSwap is a simplified placeholder for Balancer batchSwap
// decoding. The calldata is not inspected: the result is always a SwapEvent
// with protocol "balancer", zero token addresses and zero amounts, and the
// error is always nil.
func (p *sophisticatedABIDecoder) decodeBalancerBatchSwap(data []byte) (*SwapEvent, error) {
	placeholder := new(SwapEvent)
	placeholder.Timestamp = time.Now()
	placeholder.Protocol = "balancer"
	placeholder.TokenIn = common.Address{}
	placeholder.TokenOut = common.Address{}
	placeholder.AmountIn = big.NewInt(0)
	placeholder.AmountOut = big.NewInt(0)
	return placeholder, nil
}
// decodeBalancerSingleSwap handles the Balancer single-swap call. It has no
// decoding of its own and returns whatever the batch-swap decoder returns for
// the same data.
func (p *sophisticatedABIDecoder) decodeBalancerSingleSwap(data []byte) (*SwapEvent, error) {
	event, err := p.decodeBalancerBatchSwap(data)
	return event, err
}
// decodeCurveExchange decodes Curve exchange calldata (selector already
// removed). The first three 32-byte words are read as i, j and dx: CurveI and
// CurveJ carry the coin indices and AmountIn carries dx.
//
// Token addresses are left zero, because resolving them needs the pool's coin
// list, and AmountOut is zero because the output is not computed here.
func (p *sophisticatedABIDecoder) decodeCurveExchange(data []byte) (*SwapEvent, error) {
	const word = 32
	if len(data) < 3*word {
		return nil, fmt.Errorf("data too short for Curve exchange")
	}
	wordAt := func(n int) *big.Int {
		return new(big.Int).SetBytes(data[n*word : (n+1)*word])
	}
	coinIn, coinOut, amount := wordAt(0), wordAt(1), wordAt(2)

	event := new(SwapEvent)
	event.Timestamp = time.Now()
	event.Protocol = "curve"
	event.TokenIn = common.Address{}
	event.TokenOut = common.Address{}
	event.AmountIn = amount
	event.AmountOut = big.NewInt(0)
	event.CurveI = coinIn.Uint64()
	event.CurveJ = coinOut.Uint64()
	return event, nil
}
// decodeCurveExchangeUnderlying decodes Curve exchange_underlying calldata. It
// is handled exactly like exchange; the fact that the indices refer to
// underlying tokens is not reflected in the result.
func (p *sophisticatedABIDecoder) decodeCurveExchangeUnderlying(data []byte) (*SwapEvent, error) {
	event, err := p.decodeCurveExchange(data)
	return event, err
}
// analyzeTransaction fetches the receipt for tx and feeds the transaction
// through both analyses: calldata (direct DEX calls) and logs (DEX events).
// Results are appended to opportunities.
//
// Only infrastructure failures are returned: no HTTP client, or no receipt.
// Failures inside either analysis are logged at debug level and do not stop
// the other analysis from running.
func (p *EnhancedSequencerParser) analyzeTransaction(ctx context.Context, tx *types.Transaction, opportunities *MEVOpportunities) error {
	client, clientErr := p.providerManager.GetHTTPClient()
	if clientErr != nil {
		return fmt.Errorf("failed to get HTTP client: %w", clientErr)
	}

	// The receipt carries the logs and the gas actually used.
	receipt, receiptErr := client.TransactionReceipt(ctx, tx.Hash())
	if receiptErr != nil {
		return fmt.Errorf("failed to get receipt: %w", receiptErr)
	}

	dataErr := p.analyzeTransactionData(tx, receipt, opportunities)
	if dataErr != nil {
		p.logger.Debug(fmt.Sprintf("Failed to analyze tx data for %s: %v", tx.Hash().Hex(), dataErr))
	}

	logsErr := p.analyzeTransactionLogs(tx, receipt, opportunities)
	if logsErr != nil {
		p.logger.Debug(fmt.Sprintf("Failed to analyze tx logs for %s: %v", tx.Hash().Hex(), logsErr))
	}
	return nil
}
// analyzeTransactionData decodes tx's calldata when the transaction targets a
// known DEX router and appends the resulting swap event to opportunities.
//
// Contract creations, calls with fewer than 4 bytes of input, and calls to
// addresses that are not recognized routers are ignored (nil is returned). A
// decode failure, or a decoder result that is not a *SwapEvent, is returned as
// an error.
func (p *EnhancedSequencerParser) analyzeTransactionData(tx *types.Transaction, receipt *types.Receipt, opportunities *MEVOpportunities) error {
	target := tx.To()
	if target == nil || len(tx.Data()) < 4 {
		return nil
	}

	router := *target
	protocol := p.identifyDEXProtocol(router)
	if protocol == "" {
		return nil
	}

	decoded, err := p.abiDecoder.DecodeSwapTransaction(protocol, tx.Data())
	if err != nil {
		return err
	}
	event, isSwap := decoded.(*SwapEvent)
	if !isSwap {
		return fmt.Errorf("invalid swap event type")
	}

	// Attach transaction and receipt metadata to the decoded event.
	event.BlockNumber = receipt.BlockNumber.Uint64()
	event.TxHash = tx.Hash().Hex()
	event.Router = router
	event.GasUsed = receipt.GasUsed
	event.GasPrice = tx.GasPrice().String()
	event.Sender = p.getSender(tx)

	// Price impact is computed before the MEV score; the event counts as
	// profitable when the score exceeds 1.0.
	event.PriceImpact = p.calculatePriceImpact(event)
	event.MEVScore = p.calculateMEVScore(event)
	event.Profitable = event.MEVScore > 1.0

	opportunities.SwapEvents = append(opportunities.SwapEvents, event)
	return nil
}
// analyzeTransactionLogs walks the receipt's logs and records every log that
// parseSwapLog recognises as a swap.
//
// Each recognised event is stamped with block, hash, gas and sender metadata
// taken from the transaction and receipt, scored, and appended to
// opportunities.SwapEvents. The function currently always returns nil.
func (p *EnhancedSequencerParser) analyzeTransactionLogs(tx *types.Transaction, receipt *types.Receipt, opportunities *MEVOpportunities) error {
	for _, entry := range receipt.Logs {
		// A log without topics has no event signature to match.
		if len(entry.Topics) == 0 {
			continue
		}

		swap := p.parseSwapLog(entry)
		if swap == nil {
			continue
		}

		swap.BlockNumber = receipt.BlockNumber.Uint64()
		swap.TxHash = tx.Hash().Hex()
		swap.GasUsed = receipt.GasUsed
		swap.GasPrice = tx.GasPrice().String()
		// Overwrites the sender parsed from the log topics with the
		// transaction's signer.
		swap.Sender = p.getSender(tx)

		// MEVScore reads PriceImpact, so compute the impact first.
		swap.PriceImpact = p.calculatePriceImpact(swap)
		swap.MEVScore = p.calculateMEVScore(swap)
		swap.Profitable = swap.MEVScore > 1.0

		opportunities.SwapEvents = append(opportunities.SwapEvents, swap)
	}
	return nil
}
// identifyDEXProtocol maps a router address to the protocol label used by the
// ABI decoder. It returns the empty string when the address is not one of the
// hard-coded routers below.
//
// NOTE(review): confirm that every address is the Arbitrum deployment of the
// contract named in its comment.
func (p *EnhancedSequencerParser) identifyDEXProtocol(router common.Address) string {
	switch router {
	case common.HexToAddress("0xE592427A0AEce92De3Edee1F18E0157C05861564"): // Uniswap V3 Router
		return "uniswap_v3"
	case common.HexToAddress("0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D"): // Uniswap V2 Router
		return "uniswap_v2"
	case common.HexToAddress("0x1b02dA8Cb0d097eB8D57A175b88c7D8b47997506"): // SushiSwap Router
		return "sushiswap"
	case common.HexToAddress("0xc873fEcbd354f5A56E00E710B90EF4201db2448d"): // Camelot Router
		return "camelot"
	case common.HexToAddress("0xBA12222222228d8Ba445958a75a0704d566BF2C8"): // Balancer V2 Vault
		return "balancer"
	case common.HexToAddress("0x7f90122BF0700F9E7e1F688fe926940E8839F353"): // Curve Router
		return "curve"
	}
	return ""
}
// parseSwapLog dispatches a log to the matching swap parser based on its
// first topic (the event signature hash).
//
// It returns nil for logs without topics and for logs whose signature is
// neither the Uniswap V2 nor the Uniswap V3 Swap event.
func (p *EnhancedSequencerParser) parseSwapLog(log *types.Log) *SwapEvent {
	if len(log.Topics) == 0 {
		return nil
	}
	signature := log.Topics[0]

	// Uniswap V2: Swap(address,uint256,uint256,uint256,uint256,address)
	if signature == crypto.Keccak256Hash([]byte("Swap(address,uint256,uint256,uint256,uint256,address)")) {
		return p.parseUniV2SwapLog(log)
	}

	// Uniswap V3: Swap(address,address,int256,int256,uint160,uint128,int24)
	if signature == crypto.Keccak256Hash([]byte("Swap(address,address,int256,int256,uint160,uint128,int24)")) {
		return p.parseUniV3SwapLog(log)
	}

	return nil
}
// parseUniV2SwapLog builds a SwapEvent from a Uniswap V2 style Swap log.
//
// The log must carry at least three topics (signature, sender, recipient) and
// 128 bytes of data holding amount0In, amount1In, amount0Out and amount1Out as
// consecutive 32-byte words; otherwise nil is returned. The direction is taken
// from amount0In: a non-zero value means token0 was sold for token1, anything
// else is treated as token1 sold for token0.
func (p *EnhancedSequencerParser) parseUniV2SwapLog(log *types.Log) *SwapEvent {
	if len(log.Topics) < 3 || len(log.Data) < 128 {
		return nil
	}

	// word reads the i-th 32-byte data word as an unsigned integer.
	word := func(i int) *big.Int {
		return new(big.Int).SetBytes(log.Data[32*i : 32*(i+1)])
	}
	amount0In, amount1In := word(0), word(1)
	amount0Out, amount1Out := word(2), word(3)

	swap := &SwapEvent{
		Protocol:  "uniswap_v2",
		Pool:      log.Address,
		Sender:    common.BytesToAddress(log.Topics[1].Bytes()),
		Recipient: common.BytesToAddress(log.Topics[2].Bytes()),
	}

	if amount0In.Sign() > 0 {
		// token0 in, token1 out
		swap.TokenIn = p.getToken0FromPool(log.Address)
		swap.TokenOut = p.getToken1FromPool(log.Address)
		swap.AmountIn, swap.AmountOut = amount0In, amount1Out
	} else {
		// token1 in, token0 out
		swap.TokenIn = p.getToken1FromPool(log.Address)
		swap.TokenOut = p.getToken0FromPool(log.Address)
		swap.AmountIn, swap.AmountOut = amount1In, amount0Out
	}

	// The timestamp is the local parse time, not the block time.
	swap.Timestamp = time.Now()
	return swap
}
// parseUniV3SwapLog builds a SwapEvent from a Uniswap V3 Swap log.
//
// The log must carry at least three topics (signature, sender, recipient) and
// 160 bytes of data; otherwise nil is returned. Only the first two data words
// are read: amount0 and amount1, both int256. They are decoded as signed
// two's-complement values so that a negative delta is seen as negative.
//
// Direction: a positive amount0 is treated as token0 being the input;
// otherwise token1 is treated as the input. AmountIn and AmountOut are always
// stored as absolute values.
func (p *EnhancedSequencerParser) parseUniV3SwapLog(log *types.Log) *SwapEvent {
	if len(log.Topics) < 3 || len(log.Data) < 160 {
		return nil
	}

	sender := common.BytesToAddress(log.Topics[1].Bytes())
	recipient := common.BytesToAddress(log.Topics[2].Bytes())

	// signedWord decodes a 32-byte big-endian two's-complement integer.
	// big.Int.SetBytes is unsigned, so without the correction a negative
	// int256 would come out as a value close to 2^256 and the sign test
	// below could never take the token1-in branch for real swaps.
	signedWord := func(word []byte) *big.Int {
		v := new(big.Int).SetBytes(word)
		if word[0]&0x80 != 0 {
			v.Sub(v, new(big.Int).Lsh(big.NewInt(1), 256))
		}
		return v
	}
	amount0 := signedWord(log.Data[0:32])
	amount1 := signedWord(log.Data[32:64])

	var tokenIn, tokenOut common.Address
	var amountIn, amountOut *big.Int
	if amount0.Sign() > 0 {
		// Positive amount0 means token0 is the input.
		tokenIn = p.getToken0FromPool(log.Address)
		tokenOut = p.getToken1FromPool(log.Address)
		amountIn = amount0
		amountOut = new(big.Int).Abs(amount1) // amount1 is the negative side
	} else {
		// Non-positive amount0 means token1 is the input.
		tokenIn = p.getToken1FromPool(log.Address)
		tokenOut = p.getToken0FromPool(log.Address)
		amountIn = new(big.Int).Abs(amount1)
		amountOut = new(big.Int).Abs(amount0)
	}

	return &SwapEvent{
		Timestamp: time.Now(), // local parse time, not block time
		Protocol:  "uniswap_v3",
		Pool:      log.Address,
		TokenIn:   tokenIn,
		TokenOut:  tokenOut,
		AmountIn:  amountIn,
		AmountOut: amountOut,
		Sender:    sender,
		Recipient: recipient,
	}
}
// Helper functions for token resolution and MEV detection

// getToken0FromPool is a placeholder for resolving a pool's token0. It does
// not query anything yet and always returns the zero address, which callers
// are expected to resolve later.
func (p *EnhancedSequencerParser) getToken0FromPool(poolAddress common.Address) common.Address {
	// In production, this would query the pool contract for token0.
	var unresolved common.Address
	return unresolved
}
// getToken1FromPool is a placeholder for resolving a pool's token1. It does
// not query anything yet and always returns the zero address.
func (p *EnhancedSequencerParser) getToken1FromPool(poolAddress common.Address) common.Address {
	// In production, this would query the pool contract for token1.
	var unresolved common.Address
	return unresolved
}
// getSender recovers the address that signed tx.
//
// The signer is chosen with types.LatestSignerForChainID so that transaction
// types newer than London are accepted as well, instead of being rejected by a
// hard-coded London signer. Recovery is best-effort: if the signature cannot
// be recovered the zero address is returned, so callers never see an error.
func (p *EnhancedSequencerParser) getSender(tx *types.Transaction) common.Address {
	signer := types.LatestSignerForChainID(tx.ChainId())
	sender, err := types.Sender(signer, tx)
	if err != nil {
		// Deliberately swallowed: the sender is informational metadata and a
		// zero address marks it as unknown.
		return common.Address{}
	}
	return sender
}
// calculatePriceImpact returns AmountOut divided by AmountIn as a float64.
//
// This is a simplified stand-in for a real price-impact figure: it is the raw
// output/input ratio of the swap. The result is 0 when the swap or either
// amount is nil, or when AmountIn is zero, so that events with missing amounts
// cannot cause a nil-pointer panic or a division by zero.
func (p *EnhancedSequencerParser) calculatePriceImpact(swap *SwapEvent) float64 {
	if swap == nil || swap.AmountIn == nil || swap.AmountOut == nil {
		return 0
	}
	if swap.AmountIn.Sign() == 0 {
		return 0
	}

	ratio := new(big.Float).Quo(
		new(big.Float).SetInt(swap.AmountOut),
		new(big.Float).SetInt(swap.AmountIn),
	)
	// The accuracy flag is ignored: the value only feeds heuristic scoring.
	result, _ := ratio.Float64()
	return result
}
// calculateMEVScore derives a heuristic score from an already-populated
// swap: half of swap.PriceImpact, plus a flat bonus of 2.0 when AmountIn is
// strictly greater than 1e18 base units.
//
// A nil swap scores 0, and a nil AmountIn simply does not earn the size
// bonus, so incomplete events cannot cause a nil-pointer panic.
func (p *EnhancedSequencerParser) calculateMEVScore(swap *SwapEvent) float64 {
	if swap == nil {
		return 0
	}

	score := swap.PriceImpact * 0.5
	if swap.AmountIn != nil && swap.AmountIn.Cmp(big.NewInt(1e18)) > 0 { // Large swap
		score += 2.0
	}
	return score
}
// detectArbitrageOpportunities compares every pair of recorded swaps that
// trade the same token pair and appends each non-nil result of
// calculateArbitrage to opportunities.ArbitrageOps.
//
// Swaps are bucketed by the canonical key from getPairKey. Buckets are
// visited in map iteration order, so the order of the appended opportunities
// is not deterministic.
func (p *EnhancedSequencerParser) detectArbitrageOpportunities(opportunities *MEVOpportunities) {
	byPair := make(map[string][]*SwapEvent)
	for _, ev := range opportunities.SwapEvents {
		key := p.getPairKey(ev.TokenIn, ev.TokenOut)
		byPair[key] = append(byPair[key], ev)
	}

	for _, group := range byPair {
		// A bucket with fewer than two swaps yields no (i, j) pair, so the
		// loops below simply do nothing for it.
		for i := 0; i < len(group); i++ {
			for j := i + 1; j < len(group); j++ {
				arb := p.calculateArbitrage(group[i], group[j])
				if arb == nil {
					continue
				}
				opportunities.ArbitrageOps = append(opportunities.ArbitrageOps, arb)
			}
		}
	}
}
// detectSandwichAttacks flags recorded swaps that look large enough to be
// sandwiched and appends one SandwichOpportunity per match to
// opportunities.SandwichOps.
//
// A swap qualifies when AmountIn is strictly greater than 1e18 base units and
// PriceImpact is above 0.01. Front-run and back-run sizes are each one tenth
// of the target's AmountIn; slippage and gas figures are fixed estimates.
func (p *EnhancedSequencerParser) detectSandwichAttacks(opportunities *MEVOpportunities) {
	sizeThreshold := big.NewInt(1e18)
	tenth := big.NewInt(10)

	for _, target := range opportunities.SwapEvents {
		// Kept as a positive test so a NaN impact is rejected, exactly as a
		// plain "> 0.01" comparison would.
		qualifies := target.AmountIn.Cmp(sizeThreshold) > 0 && target.PriceImpact > 0.01
		if !qualifies {
			continue
		}

		opportunities.SandwichOps = append(opportunities.SandwichOps, &SandwichOpportunity{
			TargetTx:       target.TxHash,
			TokenIn:        target.TokenIn,
			TokenOut:       target.TokenOut,
			ExpectedProfit: p.calculateSandwichProfit(target),
			FrontrunAmount: new(big.Int).Div(target.AmountIn, tenth), // 10% of target
			BackrunAmount:  new(big.Int).Div(target.AmountIn, tenth),
			MaxSlippage:    0.05,               // 5% max slippage
			GasCost:        big.NewInt(300000), // Estimated gas for sandwich
			ProfitMargin:   target.PriceImpact,
		})
	}
}
// detectLiquidationOpportunities is a placeholder: the body is empty, so it
// neither reads nor modifies opportunities.
func (p *EnhancedSequencerParser) detectLiquidationOpportunities(opportunities *MEVOpportunities) {
	// Placeholder for liquidation detection.
	// Intended to analyze lending protocols for underwater positions.
}
// getPairKey returns an order-independent key for a token pair: the two
// addresses in hex, smaller byte value first, joined by "-". Swapping the
// arguments yields the same key.
func (p *EnhancedSequencerParser) getPairKey(token0, token1 common.Address) string {
	first, second := token0, token1
	if bytes.Compare(first.Bytes(), second.Bytes()) > 0 {
		first, second = second, first
	}
	return first.Hex() + "-" + second.Hex()
}
// calculateArbitrage estimates an arbitrage opportunity between two swaps of
// the same token pair.
//
// The signal is the absolute difference between the two swaps' PriceImpact
// values. nil is returned when that difference is below 0.001 (0.1%) or is
// not a finite number. Otherwise the profit estimate is 1e18 multiplied by the
// integer part of difference*1000, capped at math.MaxInt64. Most other fields
// are fixed estimates.
func (p *EnhancedSequencerParser) calculateArbitrage(swap1, swap2 *SwapEvent) *pkgtypes.ArbitrageOpportunity {
	priceDiff := math.Abs(swap1.PriceImpact - swap2.PriceImpact)
	// A NaN or infinite difference (for example from an overflowing ratio)
	// carries no usable signal; converting it to an integer below would give
	// an implementation-specific result.
	if math.IsNaN(priceDiff) || math.IsInf(priceDiff, 0) {
		return nil
	}
	if priceDiff < 0.001 { // 0.1% minimum difference
		return nil
	}

	// Clamp before converting: float64 values at or above 2^63 do not fit in
	// an int64.
	const int64Limit = float64(1 << 63)
	multiplier := int64(math.MaxInt64)
	if scaled := priceDiff * 1000; scaled < int64Limit {
		multiplier = int64(scaled)
	}

	baseAmount := big.NewInt(1e18) // 1 token
	profit := new(big.Int).Mul(baseAmount, big.NewInt(multiplier))

	return &pkgtypes.ArbitrageOpportunity{
		Path:     []string{swap1.TokenIn.Hex(), swap1.TokenOut.Hex()},
		Pools:    []string{swap1.Pool.Hex(), swap2.Pool.Hex()},
		AmountIn: baseAmount,
		Profit:   profit,
		// Simplified - in production would subtract gas. Stored as its own
		// big.Int so later in-place edits cannot also change Profit.
		NetProfit:     new(big.Int).Set(profit),
		GasEstimate:   big.NewInt(150000), // Estimated gas cost
		ROI:           priceDiff * 100,    // Convert to percentage
		Protocol:      swap1.Protocol,
		ExecutionTime: 5000, // 5 seconds
		Confidence:    0.8,
		PriceImpact:   priceDiff,
		MaxSlippage:   0.01, // 1% max slippage
		TokenIn:       swap1.TokenIn,
		TokenOut:      swap1.TokenOut,
		Timestamp:     time.Now().Unix(),
		Risk:          0.3, // Medium risk
	}
}
// calculateSandwichProfit estimates the profit of sandwiching swap as
// PriceImpact * AmountIn * 0.5 (a 50% capture rate), then multiplies by 10^18
// and truncates to an integer.
//
// NOTE(review): the 10^18 scaling was introduced so that fractional results
// are not truncated to zero. If AmountIn is already in base units (wei) the
// returned value is 10^18 times larger than a wei amount; confirm which unit
// consumers expect.
func (p *EnhancedSequencerParser) calculateSandwichProfit(swap *SwapEvent) *big.Int {
	// Gross estimate: impact * amount, then halved for the capture rate.
	estimate := new(big.Float).Mul(
		big.NewFloat(swap.PriceImpact),
		new(big.Float).SetInt(swap.AmountIn),
	)
	estimate.Mul(estimate, big.NewFloat(0.5))

	// Scale up by 10^18 before the integer conversion so the fractional part
	// survives truncation.
	tenPow18 := new(big.Int).Exp(big.NewInt(10), big.NewInt(18), nil)
	scaled := new(big.Float).Mul(estimate, new(big.Float).SetInt(tenPow18))

	out := new(big.Int)
	scaled.Int(out)
	return out
}
// fetchBlockSafely fetches the block with the given number, falling back to a
// header-only block when the full block cannot be retrieved.
//
// The full block is requested first. Only if that fails is the header
// fetched and passed to fetchBlockViaRPC, so the common success path costs a
// single request instead of two. The full-block error is logged at debug
// level rather than dropped.
//
// An error is returned when no HTTP client is available or when the fallback
// header fetch also fails.
func (p *EnhancedSequencerParser) fetchBlockSafely(ctx context.Context, blockNumber uint64) (*types.Block, error) {
	ethClient, err := p.providerManager.GetHTTPClient()
	if err != nil {
		return nil, fmt.Errorf("failed to get HTTP client: %w", err)
	}

	blockNumBigInt := new(big.Int).SetUint64(blockNumber)

	// Happy path: the full block with its transactions.
	block, blockErr := ethClient.BlockByNumber(ctx, blockNumBigInt)
	if blockErr == nil {
		return block, nil
	}
	p.logger.Debug(fmt.Sprintf("Full block fetch failed for block %d, falling back to header and raw RPC: %v", blockNumber, blockErr))

	// Fallback: the header is needed to build the header-only block.
	header, err := ethClient.HeaderByNumber(ctx, blockNumBigInt)
	if err != nil {
		return nil, fmt.Errorf("failed to fetch block header: %w", err)
	}
	return p.fetchBlockViaRPC(ctx, blockNumber, header)
}
// fetchBlockViaRPC is the fallback used when the typed block fetch fails.
//
// It asks the node for the block with transaction hashes only (raw
// eth_getBlockByNumber), hands those hashes to processTransactionReceipts, and
// returns a block built from header alone. Every failure along the way is
// logged at debug level and still yields the header-only block with a nil
// error, so this function never reports an error to its caller.
func (p *EnhancedSequencerParser) fetchBlockViaRPC(ctx context.Context, blockNumber uint64, header *types.Header) (*types.Block, error) {
	p.logger.Debug(fmt.Sprintf("Using alternative transaction fetching for block %d", blockNumber))

	// All exits return the same thing; build it on demand in one place.
	headerOnly := func() (*types.Block, error) {
		return types.NewBlockWithHeader(header), nil
	}

	rpcClient, err := p.providerManager.GetRPCClient(false) // Prefer HTTP for this operation
	if err != nil {
		p.logger.Debug(fmt.Sprintf("Failed to get RPC client: %v", err))
		return headerOnly()
	}

	// Request hashes only (last argument false) so the client never has to
	// decode the transaction bodies.
	var rawBlock map[string]interface{}
	numberArg := fmt.Sprintf("0x%x", blockNumber)
	if err = rpcClient.CallContext(ctx, &rawBlock, "eth_getBlockByNumber", numberArg, false); err != nil {
		p.logger.Debug(fmt.Sprintf("Failed to get block via RPC: %v", err))
		return headerOnly()
	}

	hashes, ok := rawBlock["transactions"].([]interface{})
	if ok && len(hashes) > 0 {
		// Swaps are detected from receipts as a side effect; they are not
		// attached to the returned block.
		p.processTransactionReceipts(ctx, hashes, blockNumber)
	}

	return headerOnly()
}
// processTransactionReceipts fetches the receipt for each transaction hash
// and passes it to analyzeReceiptLogs.
//
// txHashes holds the raw JSON values from a block response; entries that are
// not strings are skipped, as are hashes whose receipt cannot be fetched
// (best-effort). The loop stops as soon as ctx is cancelled or expired instead
// of issuing a failing request for every remaining hash.
func (p *EnhancedSequencerParser) processTransactionReceipts(ctx context.Context, txHashes []interface{}, blockNumber uint64) {
	ethClient, err := p.providerManager.GetHTTPClient()
	if err != nil {
		p.logger.Debug(fmt.Sprintf("Failed to get HTTP client for receipts: %v", err))
		return
	}

	for _, hashInterface := range txHashes {
		// Once the context is done every further request would fail.
		if ctx.Err() != nil {
			return
		}

		txHashStr, ok := hashInterface.(string)
		if !ok {
			continue
		}

		receipt, err := ethClient.TransactionReceipt(ctx, common.HexToHash(txHashStr))
		if err != nil {
			continue // Skip failed receipts
		}

		// Analyze logs for DEX events even without full transaction data.
		p.analyzeReceiptLogs(receipt, blockNumber)
	}
}
// analyzeReceiptLogs scans a receipt's logs for swap events and logs each one
// at info level.
//
// Recognised events are stamped with the block number, transaction hash and
// gas used, then scored. They are only logged here; this function does not
// store them anywhere.
func (p *EnhancedSequencerParser) analyzeReceiptLogs(receipt *types.Receipt, blockNumber uint64) {
	for _, entry := range receipt.Logs {
		swap := p.parseSwapLog(entry)
		if swap == nil {
			continue
		}

		swap.BlockNumber = blockNumber
		swap.TxHash = receipt.TxHash.Hex()
		swap.GasUsed = receipt.GasUsed

		// MEVScore reads PriceImpact, so compute the impact first.
		swap.PriceImpact = p.calculatePriceImpact(swap)
		swap.MEVScore = p.calculateMEVScore(swap)
		swap.Profitable = swap.MEVScore > 1.0

		// Only the first 8 characters of each token address are logged.
		tokenIn := swap.TokenIn.Hex()[:8]
		tokenOut := swap.TokenOut.Hex()[:8]
		p.logger.Info(fmt.Sprintf("🔄 Detected %s swap: %s → %s, Amount: %s",
			swap.Protocol,
			tokenIn,
			tokenOut,
			swap.AmountIn.String()))
	}
}