Files
mev-beta/pkg/monitor/concurrent.go
Administrator e79e0d960d feat: add pool cache adapter and strict event validation
- Created PoolCacheAdapter to wrap PoolDiscovery for EventParser
- Updated ArbitrumMonitor to pass pool cache to parser via NewEventParserFull
- Added strict validation to reject events with zero addresses
- Added strict validation to reject events with zero amounts
- Parser now uses discovered pools from cache for token enrichment

This ensures zero addresses and zero amounts NEVER reach the scanner.
Events with invalid data are logged and rejected at the monitor level.

Changes:
- pkg/pools/pool_cache_adapter.go: New adapter implementing PoolCache interface
- pkg/monitor/concurrent.go: Pool cache integration and validation logic

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-10 10:03:28 +01:00

1485 lines
49 KiB
Go

package monitor
import (
"context"
"encoding/hex"
"encoding/json"
"fmt"
"math/big"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc"
"golang.org/x/time/rate"
"github.com/fraktal/mev-beta/internal/config"
"github.com/fraktal/mev-beta/internal/logger"
"github.com/fraktal/mev-beta/internal/ratelimit"
"github.com/fraktal/mev-beta/pkg/arbitrum"
parserpkg "github.com/fraktal/mev-beta/pkg/arbitrum/parser"
"github.com/fraktal/mev-beta/pkg/calldata"
pkgerrors "github.com/fraktal/mev-beta/pkg/errors"
"github.com/fraktal/mev-beta/pkg/events"
"github.com/fraktal/mev-beta/pkg/market"
"github.com/fraktal/mev-beta/pkg/oracle"
"github.com/fraktal/mev-beta/pkg/pools"
"github.com/fraktal/mev-beta/pkg/scanner"
arbitragetypes "github.com/fraktal/mev-beta/pkg/types"
"github.com/fraktal/mev-beta/pkg/uniswap"
)
// safeConvertInt64ToUint64 converts v to uint64. Negative inputs are clamped
// to 0 so the result never wraps around to a very large unsigned value.
func safeConvertInt64ToUint64(v int64) uint64 {
	if v >= 0 {
		return uint64(v)
	}
	return 0
}
// ArbitrumMonitor monitors the Arbitrum sequencer for transactions with concurrency support.
// It polls for new blocks, parses DEX transactions out of them, and forwards the
// resulting events to the scanner. Instances are built by NewArbitrumMonitor.
type ArbitrumMonitor struct {
// config holds the Arbitrum settings; its RPCEndpoint is the key passed to rateLimiter waits.
config *config.ArbitrumConfig
// botConfig holds the bot settings supplied to NewArbitrumMonitor.
botConfig *config.BotConfig
// client is the RPC client used for header, receipt and log queries.
// performHealthCheck replaces it after a successful reconnect.
client *ethclient.Client
// connectionManager hands out clients (GetClientWithRetry) for the initial connection and reconnects.
connectionManager *arbitrum.ConnectionManager
// l2Parser fetches L2 blocks and extracts DEX transactions from them.
l2Parser *arbitrum.ArbitrumL2Parser
logger *logger.Logger
// rateLimiter is waited on (WaitForLimit) before block and header RPC calls.
rateLimiter *ratelimit.LimiterManager
marketMgr *market.MarketManager
// scanner receives parsed events through SubmitEvent.
scanner *scanner.Scanner
// pipeline is created in NewArbitrumMonitor and given the enhanced event parser.
pipeline *market.Pipeline
fanManager *market.FanManager
// eventParser parses events directly from transaction receipts.
eventParser *events.EventParser
// coordinator *orchestrator.MEVCoordinator // Removed to avoid import cycle
// limiter is the rate.Limiter built from arbCfg.RateLimit in NewArbitrumMonitor.
limiter *rate.Limiter
// pollInterval is the delay between block polls in Start.
pollInterval time.Duration
// running is the flag Start's loop checks on every iteration; Stop clears it.
running bool
// mu guards running and opportunityExecutor; performHealthCheck also holds it
// while it touches client and lastHealthCheck.
mu sync.RWMutex
// transactionChannel buffers standardized DEX transactions for processTransactionChannel.
transactionChannel chan interface{}
// lastHealthCheck is when performHealthCheck last ran to completion.
lastHealthCheck time.Time
// opportunityExecutor is installed via SetOpportunityExecutor.
opportunityExecutor *parserpkg.Executor
}
// Package-level payload-capture state.
// NOTE(review): presumably payloadCaptureOnce guards a one-time resolution of
// payloadCaptureDir — confirm at the use site.
var (
payloadCaptureDir string
payloadCaptureOnce sync.Once
)
// NewArbitrumMonitor creates a new Arbitrum monitor with rate limiting.
//
// It connects to the node through a ConnectionManager (3 retries), builds the
// L2 parser, the market pipeline and fan manager, and an event parser backed by
// a pool-discovery cache, then injects that parser into the pipeline.
//
// logger, arbCfg and botCfg are required; an error is returned if any of them
// is nil, or if connecting to the node or constructing a component fails.
func NewArbitrumMonitor(
	arbCfg *config.ArbitrumConfig,
	botCfg *config.BotConfig,
	logger *logger.Logger,
	rateLimiter *ratelimit.LimiterManager,
	marketMgr *market.MarketManager,
	scanner *scanner.Scanner,
) (*ArbitrumMonitor, error) {
	// Validate the logger first: every later step, including reporting the
	// other validation failures, goes through it.
	if logger == nil {
		fmt.Println("❌ logger is nil - using printf")
		return nil, fmt.Errorf("logger is nil")
	}
	logger.Info("🏁 STARTING NewArbitrumMonitor CREATION - Enhanced parser integration will begin")

	if arbCfg == nil {
		logger.Error("❌ arbCfg is nil")
		return nil, fmt.Errorf("arbCfg is nil")
	}
	// botCfg is dereferenced below (fan manager config, polling interval).
	if botCfg == nil {
		logger.Error("❌ botCfg is nil")
		return nil, fmt.Errorf("botCfg is nil")
	}
	logger.Info("🔧 Parameters validated - proceeding with monitor creation")

	// Create Ethereum client with connection manager for retry and fallback support.
	ctx := context.Background()
	connectionManager := arbitrum.NewConnectionManager(arbCfg, logger)
	rateLimitedClient, err := connectionManager.GetClientWithRetry(ctx, 3)
	if err != nil {
		return nil, fmt.Errorf("failed to connect to Arbitrum node with retries: %w", err)
	}
	client := rateLimitedClient.Client

	// The L2 parser needs a price oracle.
	priceOracle := oracle.NewPriceOracle(client, logger)
	l2Parser, err := arbitrum.NewArbitrumL2Parser(arbCfg.RPCEndpoint, logger, priceOracle)
	if err != nil {
		return nil, fmt.Errorf("failed to create L2 parser: %w", err)
	}

	// Rate limiter derived from the configured requests-per-second and burst.
	limiter := rate.NewLimiter(
		rate.Limit(arbCfg.RateLimit.RequestsPerSecond),
		arbCfg.RateLimit.Burst,
	)

	// Market pipeline with its default stages.
	pipeline := market.NewPipeline(botCfg, logger, marketMgr, scanner, client)
	pipeline.AddDefaultStages()

	fanManager := market.NewFanManager(
		&config.Config{
			Arbitrum: *arbCfg,
			Bot:      *botCfg,
		},
		logger,
		rateLimiter,
	)

	// Enhanced event parser with L2 token extraction.
	logger.Info("🔧 CREATING ENHANCED EVENT PARSER WITH L2 TOKEN EXTRACTION")
	if l2Parser == nil {
		logger.Error("❌ L2 PARSER IS NULL - Cannot create enhanced event parser")
		return nil, fmt.Errorf("L2 parser is null, cannot create enhanced event parser")
	}
	logger.Info("✅ L2 PARSER AVAILABLE - Creating pool discovery for cache...")

	// Pool discovery uses its own raw RPC client.
	poolRPCClient, err := rpc.Dial(arbCfg.RPCEndpoint)
	if err != nil {
		return nil, fmt.Errorf("failed to create RPC client for pool discovery: %w", err)
	}
	poolDiscovery := pools.NewPoolDiscovery(poolRPCClient, logger)
	// The adapter exposes pool discovery through the PoolCache interface the parser expects.
	poolCacheAdapter := pools.NewPoolCacheAdapter(poolDiscovery)
	logger.Info("✅ POOL CACHE ADAPTER CREATED - Creating enhanced event parser...")

	enhancedEventParser := events.NewEventParserFull(logger, l2Parser, poolCacheAdapter)
	if enhancedEventParser == nil {
		// Nothing will own the dialed connection on this path, so release it.
		poolRPCClient.Close()
		logger.Error("❌ ENHANCED EVENT PARSER CREATION FAILED")
		return nil, fmt.Errorf("enhanced event parser creation failed")
	}
	logger.Info("✅ ENHANCED EVENT PARSER CREATED SUCCESSFULLY WITH POOL CACHE")

	// The parser is injected rather than constructed inside the pipeline to avoid an import cycle.
	logger.Info("🔄 INJECTING ENHANCED PARSER INTO PIPELINE...")
	pipeline.SetEnhancedEventParser(enhancedEventParser)
	logger.Info("🎯 ENHANCED PARSER INJECTION COMPLETED")

	// The MEV coordinator is intentionally not created here (import cycle).
	return &ArbitrumMonitor{
		config:            arbCfg,
		botConfig:         botCfg,
		client:            client,
		connectionManager: connectionManager,
		l2Parser:          l2Parser,
		logger:            logger,
		rateLimiter:       rateLimiter,
		marketMgr:         marketMgr,
		scanner:           scanner,
		pipeline:          pipeline,
		fanManager:        fanManager,
		eventParser:       enhancedEventParser, // used for direct receipt parsing
		limiter:           limiter,
		pollInterval:      time.Duration(botCfg.PollingInterval) * time.Second,
		running:           false,
		// Large buffer so bursts of DEX transactions are not dropped immediately.
		transactionChannel: make(chan interface{}, 50000),
		lastHealthCheck:    time.Now(),
	}, nil
}
// SetOpportunityExecutor installs the arbitrage executor that is meant to
// receive detected opportunities. The field is written under the monitor's
// write lock.
func (m *ArbitrumMonitor) SetOpportunityExecutor(executor *parserpkg.Executor) {
	m.mu.Lock()
	m.opportunityExecutor = executor
	m.mu.Unlock()
}
// Start begins monitoring the Arbitrum sequencer and blocks until ctx is
// cancelled or Stop is called.
//
// It records the current chain head, tries to subscribe to DEX events (a
// failure there is only logged), launches the transaction-channel processor and
// the connection health checker, and then polls every pollInterval, passing
// each new block number to processBlock.
//
// An error is returned only when the initial rate-limit wait or head lookup
// fails; in that case the running flag is cleared again. Errors during polling
// are logged and the loop continues.
func (m *ArbitrumMonitor) Start(ctx context.Context) error {
	m.mu.Lock()
	m.running = true
	m.mu.Unlock()

	// abort undoes the running flag so a failed start does not leave the
	// monitor reporting itself as running.
	abort := func(err error) error {
		m.mu.Lock()
		m.running = false
		m.mu.Unlock()
		return err
	}

	m.logger.Info("Starting Arbitrum sequencer monitoring...")

	// Get the latest block to start from.
	if err := m.rateLimiter.WaitForLimit(ctx, m.config.RPCEndpoint); err != nil {
		return abort(fmt.Errorf("rate limit error: %w", err))
	}
	// The client can be replaced by the health checker, so read it under the lock.
	m.mu.RLock()
	initialClient := m.client
	m.mu.RUnlock()
	header, err := initialClient.HeaderByNumber(ctx, nil)
	if err != nil {
		return abort(fmt.Errorf("failed to get latest block header: %w", err))
	}
	lastBlock := header.Number.Uint64()
	m.logger.Info(fmt.Sprintf("Starting from block: %d", lastBlock))

	// Subscribe to DEX events for real-time monitoring; block polling below
	// continues regardless of the outcome.
	if err := m.subscribeToDEXEvents(ctx); err != nil {
		m.logger.Warn(fmt.Sprintf("Failed to subscribe to DEX events: %v", err))
	} else {
		m.logger.Info("Subscribed to DEX events")
	}

	// Background workers; both exit when ctx is cancelled.
	go m.processTransactionChannel(ctx)
	go m.checkConnectionHealth(ctx)

	for {
		m.mu.RLock()
		running := m.running
		m.mu.RUnlock()
		if !running {
			return nil
		}

		select {
		case <-ctx.Done():
			m.Stop()
			return nil
		case <-time.After(m.pollInterval):
		}

		if err := m.rateLimiter.WaitForLimit(ctx, m.config.RPCEndpoint); err != nil {
			m.logger.Error(fmt.Sprintf("Rate limit error: %v", err))
			continue
		}

		m.mu.RLock()
		client := m.client
		m.mu.RUnlock()
		header, err := client.HeaderByNumber(ctx, nil)
		if err != nil {
			m.logger.Error(fmt.Sprintf("Failed to get latest block header: %v", err))
			continue
		}

		currentBlock := header.Number.Uint64()
		// Never move the cursor backwards: a node that reports a lower head
		// (e.g. after a reconnect to a lagging endpoint) would otherwise cause
		// already-processed blocks to be processed again later.
		if currentBlock <= lastBlock {
			continue
		}

		// Process blocks from lastBlock+1 to currentBlock.
		for blockNum := lastBlock + 1; blockNum <= currentBlock; blockNum++ {
			// Stop catching up as soon as shutdown is requested; the select
			// above handles the actual exit.
			if ctx.Err() != nil {
				break
			}
			if err := m.processBlock(ctx, blockNum); err != nil {
				m.logger.Error(fmt.Sprintf("Failed to process block %d: %v", blockNum, err))
			}
		}
		lastBlock = currentBlock
	}
}
// Stop stops the monitor by clearing the running flag under the write lock and
// logging the shutdown. Start checks the flag at the top of each loop
// iteration, so the polling loop exits on its next pass. The goroutines
// launched by Start are not signalled here; they return when their context is
// cancelled.
func (m *ArbitrumMonitor) Stop() {
m.mu.Lock()
defer m.mu.Unlock()
m.running = false
m.logger.Info("Stopping Arbitrum monitor...")
}
// processBlock processes a single block for potential swap transactions with
// enhanced L2 parsing.
//
// It waits on the rate limiter, fetches the block through the L2 parser,
// extracts DEX transactions, and for each one (a) parses the events in its
// receipt and submits them to the scanner and (b) queues a standardized copy on
// transactionChannel, dropping it with a warning if the channel is full.
//
// It returns an error only when the rate-limit wait or the block fetch fails;
// per-transaction failures are logged and skipped.
func (m *ArbitrumMonitor) processBlock(ctx context.Context, blockNumber uint64) error {
	startTime := time.Now()
	m.logger.Debug(fmt.Sprintf("Processing block %d", blockNumber))

	if err := m.rateLimiter.WaitForLimit(ctx, m.config.RPCEndpoint); err != nil {
		return fmt.Errorf("rate limit error: %w", err)
	}

	// Get block using L2 parser to bypass transaction type issues.
	rpcStart := time.Now()
	l2Block, err := m.l2Parser.GetBlockByNumber(ctx, blockNumber)
	rpcDuration := time.Since(rpcStart)

	// Log RPC performance whether or not the call succeeded.
	errorMsg := ""
	if err != nil {
		errorMsg = err.Error()
	}
	m.logger.RPC(m.config.RPCEndpoint, "GetBlockByNumber", rpcDuration, err == nil, errorMsg)
	if err != nil {
		m.logger.Error(fmt.Sprintf("Failed to get L2 block %d: %v", blockNumber, err))
		return fmt.Errorf("failed to get L2 block %d: %w", blockNumber, err)
	}

	// Parse DEX transactions from the block.
	parseStart := time.Now()
	dexTransactions := m.l2Parser.ParseDEXTransactions(ctx, l2Block)
	parseDuration := time.Since(parseStart)

	// Parsing throughput (transactions per second of parsing time), not network
	// TPS. A zero-length measurement would make the division yield NaN or Inf,
	// so the rate is reported as 0 in that case.
	var parseRateTPS float64
	if secs := parseDuration.Seconds(); secs > 0 {
		parseRateTPS = float64(len(l2Block.Transactions)) / secs
	}

	m.logger.Performance("monitor", "parse_dex_transactions", parseDuration, map[string]interface{}{
		"block_number":           blockNumber,
		"total_txs":              len(l2Block.Transactions),
		"dex_txs":                len(dexTransactions),
		"parse_rate_txs_per_sec": parseRateTPS,
		"parse_duration_ms":      parseDuration.Milliseconds(),
	})

	m.logger.Info(fmt.Sprintf("Block %d: Processing %d transactions, found %d DEX transactions",
		blockNumber, len(l2Block.Transactions), len(dexTransactions)))

	m.logger.BlockProcessing(blockNumber, len(l2Block.Transactions), len(dexTransactions), time.Since(startTime))

	// Nothing to analyse: report why and stop.
	if len(dexTransactions) == 0 {
		if len(l2Block.Transactions) == 0 {
			m.logger.Info(fmt.Sprintf("Block %d: Empty block", blockNumber))
		} else {
			m.logger.Info(fmt.Sprintf("Block %d: No DEX transactions found in %d total transactions",
				blockNumber, len(l2Block.Transactions)))
		}
		return nil
	}

	m.logger.Info(fmt.Sprintf("Block %d contains %d DEX transactions:", blockNumber, len(dexTransactions)))
	for i, dexTx := range dexTransactions {
		m.logger.Info(fmt.Sprintf(" [%d] %s: %s -> %s (%s) calling %s (%s)",
			i+1, dexTx.Hash, dexTx.From, dexTx.To, dexTx.ContractName,
			dexTx.FunctionName, dexTx.Protocol))

		// Parse swap events from the transaction receipt and hand them to the scanner.
		m.submitBlockReceiptEvents(ctx, dexTx.Hash, blockNumber)

		// Also send to pipeline for compatibility.
		standardizedTx := m.convertToStandardFormat(&dexTx)
		if standardizedTx != nil {
			select {
			case m.transactionChannel <- standardizedTx:
				// Queued for the transaction processor.
			default:
				// Never block block processing on a full queue.
				m.logger.Warn(fmt.Sprintf("Transaction pipeline full, dropping tx %s", dexTx.Hash))
			}
		}
	}
	return nil
}

// submitBlockReceiptEvents fetches the receipt for txHash, parses its events
// and submits each non-nil event to the scanner. Fetch and parse failures are
// logged at debug level and are not fatal to block processing.
func (m *ArbitrumMonitor) submitBlockReceiptEvents(ctx context.Context, txHash string, blockNumber uint64) {
	// The health checker may swap the client after a reconnect; read it under the lock.
	m.mu.RLock()
	client := m.client
	m.mu.RUnlock()

	receipt, err := client.TransactionReceipt(ctx, common.HexToHash(txHash))
	if err != nil {
		m.logger.Debug(fmt.Sprintf("Failed to fetch receipt for DEX tx %s: %v", txHash, err))
		return
	}
	if receipt == nil {
		return
	}
	// Same rule as processTransactionReceipt: only successful transactions
	// (status 1) are parsed for events.
	if receipt.Status != 1 {
		m.logger.Debug(fmt.Sprintf("Skipping failed DEX tx %s (status=%d)", txHash, receipt.Status))
		return
	}

	// NOTE(review): the timestamp is the local wall clock, not the block time.
	timestamp := safeConvertInt64ToUint64(time.Now().Unix())
	parsedEvents, err := m.eventParser.ParseTransactionReceipt(receipt, blockNumber, timestamp)
	if err != nil {
		m.logger.Debug(fmt.Sprintf("Failed to parse events from receipt %s: %v", txHash, err))
		return
	}
	if len(parsedEvents) == 0 {
		return
	}
	m.logger.Info(fmt.Sprintf("✅ Parsed %d events from DEX tx %s", len(parsedEvents), txHash))

	for _, event := range parsedEvents {
		if event == nil {
			continue
		}
		// Logged before submission; per the original note the scanner enriches
		// events whose token addresses are missing.
		m.logger.Info(fmt.Sprintf("📤 Submitting event: Type=%s, Pool=%s, Tokens=%s↔%s",
			event.Type.String(), event.PoolAddress.Hex()[:10],
			event.Token0.Hex()[:10], event.Token1.Hex()[:10]))
		m.scanner.SubmitEvent(*event)
	}
}
// checkConnectionHealth runs performHealthCheck on a 30-second ticker until ctx
// is cancelled. A panic inside the loop is recovered and logged, after which
// the goroutine ends.
func (m *ArbitrumMonitor) checkConnectionHealth(ctx context.Context) {
	defer func() {
		if recovered := recover(); recovered != nil {
			m.logger.Error(fmt.Sprintf("Panic in connection health checker: %v", recovered))
		}
	}()

	const interval = 30 * time.Second
	tick := time.NewTicker(interval)
	defer tick.Stop()

	done := ctx.Done()
	for {
		select {
		case <-tick.C:
			m.performHealthCheck(ctx)
		case <-done:
			m.logger.Info("Connection health checker shutting down")
			return
		}
	}
}
// performHealthCheck checks connection health and reconnects if necessary.
//
// The check is skipped when the previous one finished less than 10 seconds
// ago. Otherwise the current client is probed with a 5-second head lookup; on
// failure a replacement client is requested from the connection manager (3
// retries) and swapped in.
//
// The monitor lock is held only while shared fields are read or written, never
// across network calls, so the polling loop, Stop and SetOpportunityExecutor
// are not stalled for the duration of a probe or reconnect.
func (m *ArbitrumMonitor) performHealthCheck(ctx context.Context) {
	m.mu.Lock()
	if time.Since(m.lastHealthCheck) < 10*time.Second {
		m.mu.Unlock()
		return
	}
	// Claim the slot immediately so an overlapping caller is throttled while
	// this check is in flight.
	m.lastHealthCheck = time.Now()
	client := m.client
	m.mu.Unlock()

	// As before, the throttle window is measured from when the check finishes.
	defer func() {
		m.mu.Lock()
		m.lastHealthCheck = time.Now()
		m.mu.Unlock()
	}()

	// Test connection by trying to get the latest block header.
	testCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	_, err := client.HeaderByNumber(testCtx, nil)
	cancel()
	if err == nil {
		return
	}
	// A failure caused by shutdown is not a connection problem; do not reconnect.
	if ctx.Err() != nil {
		return
	}

	m.logger.Warn(fmt.Sprintf("Connection health check failed: %v, attempting reconnection", err))

	rateLimitedClient, reconnectErr := m.connectionManager.GetClientWithRetry(ctx, 3)
	if reconnectErr != nil {
		m.logger.Error(fmt.Sprintf("Failed to reconnect: %v", reconnectErr))
		return
	}
	newClient := rateLimitedClient.Client
	if newClient == nil {
		m.logger.Error("Failed to reconnect: connection manager returned no client")
		return
	}

	m.mu.Lock()
	oldClient := m.client
	m.client = newClient
	m.mu.Unlock()

	// Close the replaced client, but never the one just installed in case the
	// manager handed back the same instance.
	if oldClient != nil && oldClient != newClient {
		oldClient.Close()
	}
	m.logger.Info("Successfully reconnected to Arbitrum RPC")
}
// processTransactionChannel processes transactions from the transaction
// channel until ctx is cancelled or the channel is closed.
//
// Each transaction is handled with its own panic recovery, so a panic while
// processing one item is logged and the consumer keeps draining the channel
// instead of exiting for good.
func (m *ArbitrumMonitor) processTransactionChannel(ctx context.Context) {
	// Last-resort guard for panics outside per-transaction handling.
	defer func() {
		if r := recover(); r != nil {
			m.logger.Error(fmt.Sprintf("Panic in transaction processor: %v", r))
		}
	}()

	for {
		select {
		case <-ctx.Done():
			m.logger.Info("Transaction processor shutting down")
			return
		case tx, ok := <-m.transactionChannel:
			if !ok {
				// A closed channel yields nil forever; stop instead of spinning.
				m.logger.Info("Transaction channel closed, transaction processor shutting down")
				return
			}
			if tx == nil {
				continue
			}
			m.runChannelTransaction(ctx, tx)
		}
	}
}

// runChannelTransaction processes one queued transaction, logging (rather than
// propagating) both returned errors and panics.
func (m *ArbitrumMonitor) runChannelTransaction(ctx context.Context, tx interface{}) {
	defer func() {
		if r := recover(); r != nil {
			m.logger.Error(fmt.Sprintf("Panic in transaction processor: %v", r))
		}
	}()
	if err := m.processChannelTransaction(ctx, tx); err != nil {
		m.logger.Debug(fmt.Sprintf("Error processing transaction from channel: %v", err))
	}
}
// processChannelTransaction handles one item taken from the transaction
// channel. Items that are not a map[string]interface{} are ignored and yield
// nil. For a map, the "hash" entry (when present) is logged and the map is
// passed to processTransactionMap; any error from that call is wrapped and
// returned.
func (m *ArbitrumMonitor) processChannelTransaction(ctx context.Context, tx interface{}) error {
	txMap, isMap := tx.(map[string]interface{})
	if !isMap {
		return nil
	}

	if hash, found := txMap["hash"]; found {
		m.logger.Debug(fmt.Sprintf("Processing transaction from pipeline: %s", hash))
	}

	// The DEX data is already parsed, so the full pipeline is bypassed here.
	err := m.processTransactionMap(ctx, txMap)
	if err == nil {
		return nil
	}
	return fmt.Errorf("transaction processing error: %w", err)
}
// processTransactionMap logs the hash, protocol and function of a transaction
// map and then deliberately does nothing else; it always returns nil.
//
// The legacy event creation that used to live here produced incomplete events
// with zero addresses, so it is disabled. Events are created by the L2 parser
// instead; this no-op remains so the transaction channel flow keeps working.
func (m *ArbitrumMonitor) processTransactionMap(ctx context.Context, txMap map[string]interface{}) error {
	// text returns the string stored under key, or "" when absent or not a string.
	text := func(key string) string {
		value, _ := txMap[key].(string)
		return value
	}

	hash := text("hash")
	protocol := text("protocol")
	functionName := text("function")

	m.logger.Debug(fmt.Sprintf("Analyzing transaction %s: %s.%s", hash, protocol, functionName))
	m.logger.Debug(fmt.Sprintf("Skipping legacy event creation for %s - events created by L2 parser instead", hash))
	return nil
}
// getUint64 safely extracts a uint64 from a map.
//
// It returns 0 when key is absent or holds an unsupported type. Supported
// types are uint64, uint, uint32, int64, int, int32 and float64. Values that
// cannot be represented are clamped instead of wrapping: negative numbers and
// NaN become 0, and floats of 2^64 or more (including +Inf) become the maximum
// uint64. Floats in range are truncated toward zero.
func getUint64(m map[string]interface{}, key string) uint64 {
	val, ok := m[key]
	if !ok {
		return 0
	}

	switch v := val.(type) {
	case uint64:
		return v
	case uint:
		return uint64(v)
	case uint32:
		return uint64(v)
	case int64:
		if v < 0 {
			return 0
		}
		return uint64(v)
	case int:
		if v < 0 {
			return 0
		}
		return uint64(v)
	case int32:
		if v < 0 {
			return 0
		}
		return uint64(v)
	case float64:
		// Converting an out-of-range float to an integer has an
		// implementation-dependent result in Go, so bound it first.
		const twoPow64 = 1 << 64
		switch {
		case v != v: // NaN is the only value not equal to itself
			return 0
		case v <= 0:
			return 0
		case v >= twoPow64:
			return ^uint64(0)
		}
		return uint64(v)
	}
	return 0
}
// subscribeToDEXEvents subscribes to logs from a fixed list of DEX contracts,
// filtered to Uniswap V2/V3 Swap, Mint and Burn event signatures.
//
// When the subscription succeeds, a goroutine forwards every log to
// processDEXEvent until the subscription reports an error or ctx is cancelled.
// When it fails with an error mentioning "notifications not supported",
// "websocket" or "subscription", polling via pollDEXEvents is started instead
// and nil is returned. Any other failure is returned to the caller.
func (m *ArbitrumMonitor) subscribeToDEXEvents(ctx context.Context) error {
	// Monitored contracts on Arbitrum mainnet; the labels are descriptive only.
	watched := []struct {
		Address common.Address
		Name    string
	}{
		// Factories
		{common.HexToAddress("0xf1D7CC64Fb4452F05c498126312eBE29f30Fbcf9"), "UniswapV2Factory"},
		{common.HexToAddress("0x1F98431c8aD98523631AE4a59f267346ea31F984"), "UniswapV3Factory"},
		{common.HexToAddress("0xc35DADB65012eC5796536bD9864eD8773aBc74C4"), "SushiSwapFactory"},
		// Routers
		{common.HexToAddress("0x4752ba5dbc23f44d87826276bf6fd6b1c372ad24"), "UniswapV2Router02"},
		{common.HexToAddress("0xE592427A0AEce92De3Edee1F18E0157C05861564"), "UniswapV3Router"},
		{common.HexToAddress("0x68b3465833fb72A70ecDF485E0e4C7bD8665Fc45"), "UniswapV3Router02"},
		{common.HexToAddress("0x1b02dA8Cb0d097eB8D57A175b88c7D8b47997506"), "SushiSwapRouter"},
		{common.HexToAddress("0xC36442b4a4522E871399CD717aBDD847Ab11FE88"), "UniswapV3PositionManager"},
		// Additional routers
		{common.HexToAddress("0xa51afafe0263b40edaef0df8781ea9aa03e381a3"), "UniversalRouter"},
		{common.HexToAddress("0x4C60051384bd2d3C01bfc845Cf5F4b44bcbE9de5"), "GMX Router"},
		// High-volume pools
		{common.HexToAddress("0xC6962004f452bE9203591991D15f6b388e09E8D0"), "USDC/WETH UniswapV3 0.05%"},
		{common.HexToAddress("0x17c14D2c404D167802b16C450d3c99F88F2c4F4d"), "USDC/WETH UniswapV3 0.3%"},
		{common.HexToAddress("0x2f5e87C9312fa29aed5c179E456625D79015299c"), "WBTC/WETH UniswapV3 0.05%"},
		{common.HexToAddress("0x149e36E72726e0BceA5c59d40df2c43F60f5A22D"), "WBTC/WETH UniswapV3 0.3%"},
		{common.HexToAddress("0x641C00A822e8b671738d32a431a4Fb6074E5c79d"), "USDT/WETH UniswapV3 0.05%"},
		{common.HexToAddress("0xFe7D6a84287235C7b4b57C4fEb9a44d4C6Ed3BB8"), "ARB/WETH UniswapV3 0.05%"},
	}

	// topic0 values accepted by the filter.
	signatures := []common.Hash{
		common.HexToHash("0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822"), // Swap (Uniswap V2)
		common.HexToHash("0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67"), // Swap (Uniswap V3)
		common.HexToHash("0x4c209b5fc8ad50758f13e2e1088ba56a560dff690a1c6fef26394f4c03821c4f"), // Mint (Uniswap V2)
		common.HexToHash("0x7a53080ba414158be7ec69b987b5fb7d07dee101fe85488f0853ae16239d0bde"), // Burn (Uniswap V2)
		common.HexToHash("0x783cca1c0412dd0d695e784568c96da2e9c22ff989357a2e8b1d9b2b4e6b7118"), // Mint (Uniswap V3)
		common.HexToHash("0x0c396cd989a39f49a56c8a608a0409f2075c6b60e9c44533b5cf87abdbe393f1"), // Burn (Uniswap V3)
	}

	contractAddrs := make([]common.Address, 0, len(watched))
	for _, contract := range watched {
		contractAddrs = append(contractAddrs, contract.Address)
	}

	query := ethereum.FilterQuery{
		Addresses: contractAddrs,
		Topics:    [][]common.Hash{signatures},
	}

	// Try a push subscription first; fall back to polling when unsupported.
	logs := make(chan types.Log)
	sub, err := m.client.SubscribeFilterLogs(ctx, query, logs)
	if err != nil {
		reason := err.Error()
		unsupported := strings.Contains(reason, "notifications not supported") ||
			strings.Contains(reason, "websocket") ||
			strings.Contains(reason, "subscription")
		if !unsupported {
			return fmt.Errorf("failed to subscribe to DEX events: %v", err)
		}
		m.logger.Warn(fmt.Sprintf("WebSocket subscription not supported, falling back to polling: %v", err))
		go m.pollDEXEvents(ctx, query)
		return nil
	}
	m.logger.Info("Subscribed to DEX events via WebSocket")

	// Consume logs in the background until the subscription ends.
	consume := func() {
		defer func() {
			if recovered := recover(); recovered != nil {
				m.logger.Error(fmt.Sprintf("Panic in DEX event processor: %v", recovered))
			}
		}()
		defer sub.Unsubscribe()

		for {
			select {
			case <-ctx.Done():
				return
			case subErr := <-sub.Err():
				if subErr != nil {
					m.logger.Error(fmt.Sprintf("DEX event subscription error: %v", subErr))
				}
				return
			case entry := <-logs:
				m.processDEXEvent(ctx, entry)
			}
		}
	}
	go consume()

	return nil
}
// pollDEXEvents polls for DEX events when WebSocket is not available.
//
// Every 2 seconds it asks for the current block number and, when the head has
// advanced, fetches the logs matching query for the new block range and passes
// each one to processDEXEvent. The first successful poll covers only the
// current head block. A failed poll leaves the cursor untouched, so the same
// range is retried (extended to the new head) on the next tick.
//
// The function returns when ctx is cancelled. It is meant to run in its own
// goroutine, so a panic is recovered and logged rather than crashing the
// process.
func (m *ArbitrumMonitor) pollDEXEvents(ctx context.Context, query ethereum.FilterQuery) {
	defer func() {
		if r := recover(); r != nil {
			m.logger.Error(fmt.Sprintf("Panic in DEX event poller: %v", r))
		}
	}()

	m.logger.Info("Starting DEX event polling (WebSocket not available)")
	ticker := time.NewTicker(2 * time.Second) // Poll every 2 seconds
	defer ticker.Stop()

	// lastBlock is the highest block already queried. A separate flag marks
	// whether it has been initialised, because 0 is a valid block number and
	// "currentBlock - 1" would wrap around when the head is block 0.
	var (
		lastBlock   uint64
		initialised bool
	)

	for {
		select {
		case <-ctx.Done():
			m.logger.Info("Stopping DEX event polling")
			return
		case <-ticker.C:
		}

		// The health checker may swap the client after a reconnect; read it under the lock.
		m.mu.RLock()
		client := m.client
		m.mu.RUnlock()

		currentBlock, err := client.BlockNumber(ctx)
		if err != nil {
			m.logger.Error(fmt.Sprintf("Failed to get block number: %v", err))
			continue
		}

		if !initialised {
			initialised = true
			if currentBlock > 0 {
				lastBlock = currentBlock - 1
			}
		}
		// Skip if no new blocks.
		if currentBlock <= lastBlock {
			continue
		}

		// Query for logs in new blocks. SetUint64 avoids the sign change an
		// int64 cast would introduce for very large block numbers.
		fromBlock := lastBlock + 1
		query.FromBlock = new(big.Int).SetUint64(fromBlock)
		query.ToBlock = new(big.Int).SetUint64(currentBlock)

		logs, err := client.FilterLogs(ctx, query)
		if err != nil {
			m.logger.Error(fmt.Sprintf("Failed to filter logs: %v", err))
			continue
		}

		if len(logs) > 0 {
			m.logger.Debug(fmt.Sprintf("Found %d DEX events in blocks %d-%d", len(logs), fromBlock, currentBlock))
			for _, entry := range logs {
				m.processDEXEvent(ctx, entry)
			}
		}
		lastBlock = currentBlock
	}
}
// processDEXEvent handles one DEX log. Logs without topics are only noted at
// debug level. Otherwise the first topic is matched against the known Uniswap
// V2/V3 Swap, Mint and Burn signatures for logging, the transaction's receipt
// is fetched, and the receipt is handed to processTransactionReceipt. A failed
// receipt fetch is logged and ends processing of this log.
//
// NOTE(review): the receipt is fetched once per log, so a transaction that
// emitted several matching logs appears to be processed once for each of them.
func (m *ArbitrumMonitor) processDEXEvent(ctx context.Context, log types.Log) {
	m.logger.Debug(fmt.Sprintf("Processing DEX event from contract %s, topic count: %d", log.Address.Hex(), len(log.Topics)))

	if len(log.Topics) == 0 {
		return
	}
	eventSig := log.Topics[0]
	contract := log.Address.Hex()
	txHash := log.TxHash.Hex()

	// Pick the log line for recognised signatures; empty means "unknown".
	var knownFormat string
	switch eventSig.Hex() {
	case "0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822": // Uniswap V2 Swap
		knownFormat = "Uniswap V2 Swap event detected: Contract=%s, TxHash=%s"
	case "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67": // Uniswap V3 Swap
		knownFormat = "Uniswap V3 Swap event detected: Contract=%s, TxHash=%s"
	case "0x4c209b5fc8ad50758f13e2e1088ba56a560dff690a1c6fef26394f4c03821c4f": // Uniswap V2 Mint
		knownFormat = "Uniswap V2 Mint event detected: Contract=%s, TxHash=%s"
	case "0x7a53080ba414158be7ec69b987b5fb7d07dee101fe85488f0853ae16239d0bde": // Uniswap V2 Burn
		knownFormat = "Uniswap V2 Burn event detected: Contract=%s, TxHash=%s"
	case "0x783cca1c0412dd0d695e784568c96da2e9c22ff989357a2e8b1d9b2b4e6b7118": // Uniswap V3 Mint
		knownFormat = "Uniswap V3 Mint event detected: Contract=%s, TxHash=%s"
	case "0x0c396cd989a39f49a56c8a608a0409f2075c6b60e9c44533b5cf87abdbe393f1": // Uniswap V3 Burn
		knownFormat = "Uniswap V3 Burn event detected: Contract=%s, TxHash=%s"
	}

	if knownFormat != "" {
		m.logger.Info(fmt.Sprintf(knownFormat, contract, txHash))
	} else {
		m.logger.Debug(fmt.Sprintf("Other DEX event detected: Contract=%s, EventSig=%s, TxHash=%s",
			contract, eventSig.Hex(), txHash))
	}

	// The receipt carries every log of the transaction for detailed analysis.
	receipt, err := m.client.TransactionReceipt(ctx, log.TxHash)
	if err != nil {
		m.logger.Error(fmt.Sprintf("Failed to fetch receipt for transaction %s: %v", txHash, err))
		return
	}

	m.processTransactionReceipt(ctx, receipt, log.BlockNumber, log.BlockHash)
}
// processTransactionReceipt inspects a receipt for DEX events and submits the
// valid ones to the scanner.
//
// Nil receipts and receipts whose status is not 1 are skipped. If at least one
// log carries a known Uniswap V2/V3 Swap, Mint or Burn signature, the full
// transaction is fetched (best effort) and the receipt is parsed with the event
// parser. Each parsed event is rejected, with a warning, when Token0, Token1 or
// PoolAddress is the zero address, or when it is a swap whose in and out
// amounts for a token are both present and zero.
func (m *ArbitrumMonitor) processTransactionReceipt(ctx context.Context, receipt *types.Receipt, blockNumber uint64, blockHash common.Hash) {
	if receipt == nil {
		return
	}

	// Failed transactions are skipped; the original rationale is that their
	// event data is invalid/zero because the swap reverted.
	if receipt.Status != 1 {
		m.logger.Info(fmt.Sprintf("⏭️ Skipping failed transaction %s (status=%d)", receipt.TxHash.Hex()[:10], receipt.Status))
		return
	}

	txHashHex := receipt.TxHash.Hex()
	m.logger.Debug(fmt.Sprintf("Processing SUCCESSFUL transaction receipt %s from block %d",
		txHashHex, blockNumber))

	// Count the logs whose first topic is a known DEX event signature.
	dexEvents := 0
	for _, entry := range receipt.Logs {
		if len(entry.Topics) == 0 {
			continue
		}

		var detected string
		switch entry.Topics[0].Hex() {
		case "0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822": // Uniswap V2 Swap
			detected = "DEX Swap event detected in transaction %s: Uniswap V2"
		case "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67": // Uniswap V3 Swap
			detected = "DEX Swap event detected in transaction %s: Uniswap V3"
		case "0x4c209b5fc8ad50758f13e2e1088ba56a560dff690a1c6fef26394f4c03821c4f": // Uniswap V2 Mint
			detected = "DEX Mint event detected in transaction %s: Uniswap V2"
		case "0x7a53080ba414158be7ec69b987b5fb7d07dee101fe85488f0853ae16239d0bde": // Uniswap V2 Burn
			detected = "DEX Burn event detected in transaction %s: Uniswap V2"
		case "0x783cca1c0412dd0d695e784568c96da2e9c22ff989357a2e8b1d9b2b4e6b7118": // Uniswap V3 Mint
			detected = "DEX Mint event detected in transaction %s: Uniswap V3"
		case "0x0c396cd989a39f49a56c8a608a0409f2075c6b60e9c44533b5cf87abdbe393f1": // Uniswap V3 Burn
			detected = "DEX Burn event detected in transaction %s: Uniswap V3"
		}
		if detected == "" {
			continue
		}
		m.logger.Info(fmt.Sprintf(detected, txHashHex))
		dexEvents++
	}

	if dexEvents == 0 {
		return
	}
	m.logger.Info(fmt.Sprintf("Transaction %s contains %d DEX events", txHashHex, dexEvents))

	// Fetch the full transaction so the parser can extract token addresses from
	// calldata. Parsing continues without it if the fetch fails.
	tx, err := m.client.TransactionInBlock(ctx, blockHash, receipt.TransactionIndex)
	if err != nil {
		m.logger.Warn(fmt.Sprintf("Failed to fetch transaction %s for token extraction: %v", txHashHex, err))
		tx = nil
	}

	m.logger.Debug(fmt.Sprintf("Parsing events from receipt %s using event parser (tx=%v)", txHashHex, tx != nil))
	timestamp := safeConvertInt64ToUint64(time.Now().Unix())
	parsedEvents, err := m.eventParser.ParseTransactionReceiptWithTx(receipt, tx, blockNumber, timestamp)
	if err != nil {
		m.logger.Error(fmt.Sprintf("Failed to parse events from receipt %s: %v", txHashHex, err))
		return
	}
	m.logger.Info(fmt.Sprintf("Successfully parsed %d events from receipt %s", len(parsedEvents), txHashHex))

	// Strict validation: events with zero addresses or zero swap amounts must
	// not reach the scanner.
	var zeroAddr common.Address
	for _, event := range parsedEvents {
		if event == nil {
			continue
		}

		var validationErrors []string
		if event.Token0 == zeroAddr {
			validationErrors = append(validationErrors, "Token0 is zero address")
		}
		if event.Token1 == zeroAddr {
			validationErrors = append(validationErrors, "Token1 is zero address")
		}
		if event.PoolAddress == zeroAddr {
			validationErrors = append(validationErrors, "PoolAddress is zero address")
		}

		// Amount checks apply to swaps only, and only when both sides are set.
		if event.Type == events.EventTypeSwap {
			if event.Amount0In != nil && event.Amount0In.Sign() == 0 &&
				event.Amount0Out != nil && event.Amount0Out.Sign() == 0 {
				validationErrors = append(validationErrors, "Amount0In and Amount0Out are both zero")
			}
			if event.Amount1In != nil && event.Amount1In.Sign() == 0 &&
				event.Amount1Out != nil && event.Amount1Out.Sign() == 0 {
				validationErrors = append(validationErrors, "Amount1In and Amount1Out are both zero")
			}
		}

		if len(validationErrors) > 0 {
			m.logger.Warn(fmt.Sprintf("❌ REJECTING INVALID EVENT - Type=%s, Pool=%s, TxHash=%s, Errors: %v",
				event.Type.String(), event.PoolAddress.Hex(), event.TxHash.Hex(), validationErrors))
			continue
		}

		m.logger.Debug(fmt.Sprintf("✅ Valid event - Submitting to scanner: Type=%s, Pool=%s, Token0=%s, Token1=%s",
			event.Type.String(), event.PoolAddress.Hex(), event.Token0.Hex(), event.Token1.Hex()))

		m.scanner.SubmitEvent(*event)
		m.logger.Info("✅ Event submitted to scanner successfully")
	}
}
// processTransaction logs a transaction's hash, sender, recipient and value,
// and, when isSwapTransaction reports it as a swap, logs the analysed swap and
// any arbitrage opportunity computed from it. It always returns nil.
//
// The sender is looked up through the client; if that fails (which can happen
// for pending transactions) the zero address is used instead.
func (m *ArbitrumMonitor) processTransaction(ctx context.Context, tx *types.Transaction) error {
	from, err := m.client.TransactionSender(ctx, tx, common.Hash{}, 0)
	if err != nil {
		from = common.HexToAddress("0x0")
	}

	// A nil recipient means the transaction deploys a contract.
	recipient := "contract creation"
	if to := tx.To(); to != nil {
		recipient = to.Hex()
	}

	// Value is divided by 1e18 for display in ETH.
	wei := new(big.Float).SetInt(tx.Value())
	valueETH := new(big.Float).Quo(wei, big.NewFloat(1e18)).String()

	m.logger.Debug(fmt.Sprintf("Transaction: %s, From: %s, To: %s, Value: %s ETH",
		tx.Hash().Hex(),
		from.Hex(),
		recipient,
		valueETH,
	))

	if !m.isSwapTransaction(tx) {
		return nil
	}
	swapData := m.analyzeSwapTransaction(tx, from)
	if swapData == nil {
		return nil
	}

	m.logger.Info(fmt.Sprintf("Detected swap: %s -> %s, Amount: %s",
		swapData.TokenIn.Hex(), swapData.TokenOut.Hex(), swapData.AmountIn.String()))

	opportunity := m.calculateArbitrageOpportunity(swapData)
	if opportunity != nil {
		m.logger.Info(fmt.Sprintf("Potential arbitrage detected: %s profit",
			opportunity.NetProfit.String()))
	}
	return nil
}
// GetPendingTransactions is a placeholder for mempool access.
//
// The current implementation performs no query: it ignores ctx and always
// returns an empty, non-nil slice together with a nil error.
func (m *ArbitrumMonitor) GetPendingTransactions(ctx context.Context) ([]*types.Transaction, error) {
	return []*types.Transaction{}, nil
}
// GetL2Parser exposes the monitor's ArbitrumL2Parser so that callers can
// configure it after the monitor has been constructed (for example to register
// discovered pools). The stored pointer is returned as-is, not a copy.
func (m *ArbitrumMonitor) GetL2Parser() *arbitrum.ArbitrumL2Parser {
	return m.l2Parser
}
// getTransactionReceiptWithRetry fetches the receipt for txHash, retrying up
// to maxRetries times with exponential backoff (1s, 2s, 4s, ...) between
// attempts.
//
// Returns the receipt on the first successful fetch. If ctx is cancelled or
// its deadline passes (either right after a failed attempt or while waiting
// out a backoff), the context error is returned wrapped by
// pkgerrors.WrapContextError together with attempt metadata. If every attempt
// fails, the returned error wraps the last RPC error so callers can inspect it
// with errors.Is / errors.As. With maxRetries <= 0 no attempt is made and a
// plain "failed ... after N attempts" error is returned.
func (m *ArbitrumMonitor) getTransactionReceiptWithRetry(ctx context.Context, txHash common.Hash, maxRetries int) (*types.Receipt, error) {
	// lastErr remembers the most recent fetch failure so it can be surfaced
	// in the final error instead of being lost.
	var lastErr error

	for attempt := 0; attempt < maxRetries; attempt++ {
		m.logger.Debug(fmt.Sprintf("Attempting to fetch receipt for transaction %s (attempt %d/%d)", txHash.Hex(), attempt+1, maxRetries))

		// Try to fetch the transaction receipt
		receipt, err := m.client.TransactionReceipt(ctx, txHash)
		if err == nil {
			m.logger.Debug(fmt.Sprintf("Successfully fetched receipt for transaction %s on attempt %d", txHash.Hex(), attempt+1))
			return receipt, nil
		}
		lastErr = err

		// A cancelled or expired context will not recover; do not retry.
		if ctx.Err() != nil {
			return nil, pkgerrors.WrapContextError(ctx.Err(), "fetchTransactionReceipt",
				map[string]interface{}{
					"txHash":     txHash.Hex(),
					"attempt":    attempt + 1,
					"maxRetries": maxRetries,
					"lastError":  err.Error(),
				})
		}

		// Final attempt: nothing left to wait for.
		if attempt == maxRetries-1 {
			m.logger.Error(fmt.Sprintf("Receipt fetch for transaction %s failed after %d attempts: %v", txHash.Hex(), maxRetries, err))
			break
		}

		// Exponential backoff: 2^attempt seconds.
		backoffDuration := time.Duration(1<<uint(attempt)) * time.Second
		m.logger.Warn(fmt.Sprintf("Receipt fetch for transaction %s attempt %d failed: %v, retrying in %v",
			txHash.Hex(), attempt+1, err, backoffDuration))

		// Use an explicit timer so it can be released immediately when the
		// context ends before the backoff elapses.
		timer := time.NewTimer(backoffDuration)
		select {
		case <-ctx.Done():
			timer.Stop()
			return nil, pkgerrors.WrapContextError(ctx.Err(), "fetchTransactionReceipt.backoff",
				map[string]interface{}{
					"txHash":          txHash.Hex(),
					"attempt":         attempt + 1,
					"maxRetries":      maxRetries,
					"backoffDuration": backoffDuration.String(),
					"lastError":       err.Error(),
				})
		case <-timer.C:
			// Continue to next attempt
		}
	}

	if lastErr != nil {
		return nil, fmt.Errorf("failed to fetch receipt for transaction %s after %d attempts: %w", txHash.Hex(), maxRetries, lastErr)
	}
	// No attempt was made (maxRetries <= 0), so there is no cause to wrap.
	return nil, fmt.Errorf("failed to fetch receipt for transaction %s after %d attempts", txHash.Hex(), maxRetries)
}
// capturePayloadIfEnabled writes a JSON snapshot of dexTx to disk when payload
// capture is switched on through the PAYLOAD_CAPTURE_DIR environment variable.
//
// The capture directory is resolved (and created) exactly once per process via
// payloadCaptureOnce; if the variable is empty or the directory cannot be
// created, capture stays disabled. The whole routine is best-effort: nil
// transactions, transactions without input data, marshalling failures and
// write failures are all silently ignored.
func capturePayloadIfEnabled(dexTx *arbitrum.DEXTransaction) {
	if dexTx == nil || len(dexTx.InputData) == 0 {
		return
	}

	// One-time resolution of the capture directory.
	payloadCaptureOnce.Do(func() {
		target := strings.TrimSpace(os.Getenv("PAYLOAD_CAPTURE_DIR"))
		if target != "" && os.MkdirAll(target, 0o755) == nil {
			payloadCaptureDir = target
		}
	})
	if payloadCaptureDir == "" {
		return
	}

	snapshot := map[string]interface{}{
		"hash":          dexTx.Hash,
		"from":          dexTx.From,
		"to":            dexTx.To,
		"value":         dexTx.Value.String(),
		"protocol":      dexTx.Protocol,
		"function":      dexTx.FunctionName,
		"function_sig":  dexTx.FunctionSig,
		"input_data":    fmt.Sprintf("0x%s", hex.EncodeToString(dexTx.InputData)),
		"contract_name": dexTx.ContractName,
		"block_number":  dexTx.BlockNumber,
	}

	encoded, err := json.MarshalIndent(snapshot, "", " ")
	if err != nil {
		return
	}

	// File name: <UTC timestamp with milliseconds>_<hash without 0x>.json
	stamp := time.Now().UTC().Format("20060102T150405.000Z")
	hashHex := strings.TrimPrefix(dexTx.Hash, "0x")
	target := filepath.Join(payloadCaptureDir, fmt.Sprintf("%s_%s.json", stamp, hashHex))

	// Best-effort write: a failed capture must never disturb monitoring.
	_ = os.WriteFile(target, encoded, 0o600)
}
// convertToStandardFormat turns a DEX transaction into the generic
// map[string]interface{} representation handed to the arbitrage pipeline.
//
// As a side effect the transaction is first offered to capturePayloadIfEnabled.
// The "timestamp" entry is the wall-clock Unix time of the conversion, not a
// value taken from the transaction. Token and amount details are not decoded
// here; they remain inside "input_data" for later analysis.
func (m *ArbitrumMonitor) convertToStandardFormat(dexTx *arbitrum.DEXTransaction) interface{} {
	capturePayloadIfEnabled(dexTx)

	standardTx := make(map[string]interface{}, 11)

	// Identity and routing.
	standardTx["hash"] = dexTx.Hash
	standardTx["from"] = dexTx.From
	standardTx["to"] = dexTx.To
	standardTx["value"] = dexTx.Value

	// Decoded call metadata.
	standardTx["protocol"] = dexTx.Protocol
	standardTx["function"] = dexTx.FunctionName
	standardTx["function_sig"] = dexTx.FunctionSig
	standardTx["contract"] = dexTx.ContractName
	standardTx["block_number"] = dexTx.BlockNumber

	// Raw calldata as 0x-prefixed hex, plus the conversion time.
	standardTx["input_data"] = fmt.Sprintf("0x%s", hex.EncodeToString(dexTx.InputData))
	standardTx["timestamp"] = time.Now().Unix()

	return standardTx
}
// SwapData is the result of decoding a swap transaction's calldata.
type SwapData struct {
	// TokenIn and TokenOut are the tokens sold and bought; for multi-hop
	// swaps the parsers set them to the first and last entry of Path.
	TokenIn, TokenOut common.Address
	// AmountIn and AmountOut are the decoded input and output amounts.
	// Parsers that cannot decode an output amount store zero in AmountOut.
	AmountIn, AmountOut *big.Int
	// Pool is the primary pool for the swap; the parsers in this file fall
	// back to the transaction recipient when no pool is known.
	Pool common.Address
	// Protocol is the DEX protocol label (e.g. "UniswapV2", "Unknown").
	Protocol string
	// Path is the token route and Pools the pool addresses associated with it.
	Path, Pools []common.Address
}
// Use the canonical ArbitrageOpportunity from types package
// isSwapTransaction reports whether tx looks like a DEX swap, judged solely by
// the 4-byte function selector at the start of its calldata. Contract
// creations (nil recipient) and calldata shorter than a selector never match.
func (m *ArbitrumMonitor) isSwapTransaction(tx *types.Transaction) bool {
	input := tx.Data()
	if tx.To() == nil || len(input) < 4 {
		return false
	}

	switch fmt.Sprintf("0x%x", input[:4]) {
	case "0x38ed1739", // swapExactTokensForTokens
		"0x7ff36ab5", // swapExactETHForTokens
		"0x18cbafe5", // swapExactTokensForETH
		"0x8803dbee", // swapTokensForExactTokens
		"0x414bf389", // exactInputSingle (Uniswap V3)
		// Labelled "exactInput (Uniswap V3)" by the original author.
		// NOTE(review): this selector looks like SwapRouter02 exactOutput
		// rather than exactInput - confirm against the router ABI.
		"0x09b81346":
		return true
	default:
		return false
	}
}
// analyzeSwapTransaction decodes the calldata of tx into a SwapData.
//
// Decoding is delegated to calldata.DecodeSwapCall; when that fails the
// calldata is handed to parseGenericSwap instead. The decoded values are then
// normalised:
//   - a missing pool address falls back to the transaction recipient,
//   - a nil input amount becomes zero,
//   - a nil or zero output amount falls back to the minimum output, then zero,
//   - a path of two or more tokens overrides TokenIn/TokenOut; otherwise a
//     two-element path is synthesised when both tokens are known,
//   - an empty pool list is seeded with the single pool address, if any.
//
// Returns nil when the calldata is shorter than a function selector. The from
// parameter is currently unused.
func (m *ArbitrumMonitor) analyzeSwapTransaction(tx *types.Transaction, from common.Address) *SwapData {
	input := tx.Data()
	if len(input) < 4 {
		return nil
	}

	decoded, err := calldata.DecodeSwapCall(input, nil)
	if err != nil {
		return m.parseGenericSwap(tx.To(), input)
	}

	zero := common.Address{}
	result := &SwapData{
		TokenIn:   decoded.TokenIn,
		TokenOut:  decoded.TokenOut,
		AmountIn:  decoded.AmountIn,
		AmountOut: decoded.AmountOut,
		Pool:      decoded.PoolAddress,
		Protocol:  decoded.Protocol,
		Path:      decoded.Path,
		Pools:     decoded.Pools,
	}

	// Pool: fall back to the contract the transaction was sent to.
	if result.Pool == zero {
		if to := tx.To(); to != nil {
			result.Pool = *to
		}
	}

	// Amounts: never leave nil pointers in the result.
	if result.AmountIn == nil {
		result.AmountIn = big.NewInt(0)
	}
	if result.AmountOut == nil || result.AmountOut.Sign() == 0 {
		result.AmountOut = decoded.AmountOutMinimum
	}
	if result.AmountOut == nil {
		result.AmountOut = big.NewInt(0)
	}

	// Tokens and path: keep them consistent with each other.
	switch {
	case len(result.Path) >= 2:
		result.TokenIn = result.Path[0]
		result.TokenOut = result.Path[len(result.Path)-1]
	case result.TokenIn != zero && result.TokenOut != zero:
		result.Path = []common.Address{result.TokenIn, result.TokenOut}
	}

	// Pools: seed with the single known pool when the decoder supplied none.
	if len(result.Pools) == 0 && result.Pool != zero {
		result.Pools = []common.Address{result.Pool}
	}

	return result
}
// parseUniswapV2Swap decodes Uniswap V2 style router calldata whose first
// three ABI words are (amount, amount, offset-of-path), e.g.
// swapExactTokensForTokens.
//
// The first word is taken as AmountIn. The dynamic address[] path is located
// through the offset stored in the third word; its first and last entries
// become TokenIn and TokenOut. For every consecutive token pair a pair address
// is derived with the CREATE2 formula from a hard-coded factory address and
// init-code hash. Pool is the router address when one is given, otherwise the
// first derived pair.
//
// Returns nil when the calldata is too short or when the path offset or
// length points outside the calldata. AmountOut is always zero because the
// output amount is not decoded here.
func (m *ArbitrumMonitor) parseUniswapV2Swap(router *common.Address, data []byte) *SwapData {
	const (
		selectorLen = 4
		wordLen     = 32
		// Selector plus three words: the third word (bytes 68..99) holds the
		// path offset that is read below, so it must be present.
		minCallLen = selectorLen + 3*wordLen
	)
	if len(data) < minCallLen {
		return nil
	}

	// Extract amount from first parameter (simplified)
	amountIn := new(big.Int).SetBytes(data[selectorLen : selectorLen+wordLen])

	var tokenIn, tokenOut, poolAddr common.Address
	pathAddrs := make([]common.Address, 0)

	// Parse dynamic path parameter to extract first/last token. The offset is
	// relative to the start of the arguments, i.e. just after the selector.
	offset := new(big.Int).SetBytes(data[selectorLen+2*wordLen : minCallLen])
	if offset.Sign() > 0 {
		// Reject offsets that cannot possibly lie inside the calldata before
		// converting to int, so the index arithmetic below cannot overflow.
		if !offset.IsInt64() || offset.Int64() > int64(len(data)) {
			return nil
		}
		pathStart := selectorLen + int(offset.Int64())
		if pathStart+wordLen > len(data) {
			return nil
		}

		count := new(big.Int).SetBytes(data[pathStart : pathStart+wordLen])
		if !count.IsInt64() {
			return nil
		}
		if pathLen := count.Int64(); pathLen >= 2 {
			// Every declared entry must be fully contained in the calldata.
			available := (len(data) - pathStart - wordLen) / wordLen
			if pathLen > int64(available) {
				return nil
			}

			entries := int(pathLen)
			pathAddrs = make([]common.Address, 0, entries)
			for i := 0; i < entries; i++ {
				entryStart := pathStart + wordLen + i*wordLen
				// Addresses occupy the low 20 bytes of their 32-byte word.
				pathAddrs = append(pathAddrs, common.BytesToAddress(data[entryStart+12:entryStart+wordLen]))
			}
			tokenIn = pathAddrs[0]
			tokenOut = pathAddrs[entries-1]
		}
	}

	if router != nil {
		poolAddr = *router
	}

	pools := make([]common.Address, 0)
	if len(pathAddrs) >= 2 {
		// Factory and init-code hash used for CREATE2 pair derivation.
		factory := common.HexToAddress("0xf1D7CC64Fb4452F05c498126312eBE29f30Fbcf9")
		initCodeHash := common.HexToHash("0x96e8ac4277198ff8b6f785478aa9a39f403cb768dd02cbee326c3e7da348845f")

		for i := 0; i+1 < len(pathAddrs); i++ {
			// Pair tokens are ordered by numeric address value.
			token0, token1 := pathAddrs[i], pathAddrs[i+1]
			if token0.Big().Cmp(token1.Big()) > 0 {
				token0, token1 = token1, token0
			}

			salt := crypto.Keccak256(append(token0.Bytes(), token1.Bytes()...))

			// CREATE2: keccak256(0xff ++ factory ++ salt ++ initCodeHash)[12:]
			preimage := make([]byte, 0, 85)
			preimage = append(preimage, 0xff)
			preimage = append(preimage, factory.Bytes()...)
			preimage = append(preimage, salt...)
			preimage = append(preimage, initCodeHash.Bytes()...)
			hash := crypto.Keccak256(preimage)

			var pair common.Address
			copy(pair[:], hash[12:])
			pools = append(pools, pair)

			// Without a router address, the first derived pair stands in as Pool.
			if poolAddr == (common.Address{}) {
				poolAddr = pair
			}
		}
	}

	return &SwapData{
		TokenIn:   tokenIn,
		TokenOut:  tokenOut,
		AmountIn:  amountIn,
		AmountOut: big.NewInt(0), // Would need full ABI decoding
		Pool:      poolAddr,
		Protocol:  "UniswapV2",
		Path:      pathAddrs,
		Pools:     pools,
	}
}
// parseUniswapV3SingleSwap decodes Uniswap V3 exactInputSingle calldata.
//
// It reads argument word 0 as tokenIn, word 1 as tokenOut and word 5 as
// amountIn; all other words are ignored. The router address, when given, is
// recorded both as Pool and as the single entry of Pools (the zero address is
// used when router is nil). AmountOut is always zero.
//
// Returns nil when data is shorter than 196 bytes (selector plus six words).
func (m *ArbitrumMonitor) parseUniswapV3SingleSwap(router *common.Address, data []byte) *SwapData {
	if len(data) < 196 { // Minimum size for exactInputSingle params
		return nil
	}

	// word returns the i-th 32-byte ABI argument following the selector.
	word := func(i int) []byte {
		return data[4+i*32 : 4+(i+1)*32]
	}

	// Addresses sit in the low 20 bytes of their word.
	sold := common.BytesToAddress(word(0)[12:])
	bought := common.BytesToAddress(word(1)[12:])
	amount := new(big.Int).SetBytes(word(5))

	var target common.Address
	if router != nil {
		target = *router
	}

	return &SwapData{
		TokenIn:   sold,
		TokenOut:  bought,
		AmountIn:  amount,
		AmountOut: big.NewInt(0), // Would need full ABI decoding
		Pool:      target,
		Protocol:  "UniswapV3",
		Path:      []common.Address{sold, bought},
		Pools:     []common.Address{target},
	}
}
// parseGenericSwap is the last-resort parser for calldata of unknown layout.
//
// It interprets the first 32-byte argument as the input amount and records the
// router address (when non-nil and non-zero) as the only pool. Tokens and path
// are left unset, AmountOut is zero and Protocol is "Unknown".
//
// Returns nil when data is shorter than a selector plus one argument word.
func (m *ArbitrumMonitor) parseGenericSwap(router *common.Address, data []byte) *SwapData {
	if len(data) < 36 {
		return nil
	}

	swap := &SwapData{
		// Very basic fallback - just extract first amount
		AmountIn:  new(big.Int).SetBytes(data[4:36]),
		AmountOut: big.NewInt(0),
		Protocol:  "Unknown",
		Pools:     make([]common.Address, 0),
	}

	if router != nil {
		swap.Pool = *router
	}
	if swap.Pool != (common.Address{}) {
		swap.Pools = append(swap.Pools, swap.Pool)
	}

	return swap
}
// estimateOutputFromPools estimates the output amount of swapData by walking
// its path hop by hop and multiplying the running amount by each pool's spot
// price (derived from the pool's SqrtPriceX96).
//
// Hop i uses Pools[i] to go from Path[i] to Path[i+1]; the walk stops at
// whichever of the two slices runs out first. The price is inverted when the
// hop runs from the pool's Token1 to its Token0.
//
// An error is returned when the market manager is not configured, path
// metadata is missing, a pool lookup fails, a pool has no liquidity or a
// non-positive price, a pool's tokens do not match the hop, the running amount
// exceeds 25% of the pool's liquidity figure, or the final estimate is not
// positive.
func (m *ArbitrumMonitor) estimateOutputFromPools(ctx context.Context, swapData *SwapData) (*big.Int, error) {
	if m.marketMgr == nil {
		return nil, fmt.Errorf("market manager not configured")
	}
	if len(swapData.Pools) == 0 || len(swapData.Path) < 2 {
		return nil, fmt.Errorf("insufficient path metadata")
	}

	const prec = 256
	running := new(big.Float).SetPrec(prec).SetInt(swapData.AmountIn)
	unit := new(big.Float).SetPrec(prec).SetFloat64(1.0)

	for hop, poolAddr := range swapData.Pools {
		// Each hop needs a destination token at Path[hop+1].
		if hop+1 >= len(swapData.Path) {
			break
		}

		info, err := m.marketMgr.GetPool(ctx, poolAddr)
		if err != nil {
			return nil, err
		}

		if info.Liquidity == nil || info.Liquidity.IsZero() {
			return nil, fmt.Errorf("pool %s has zero liquidity", poolAddr.Hex())
		}
		liquidity := new(big.Float).SetPrec(prec).SetInt(info.Liquidity.ToBig())
		if liquidity.Sign() <= 0 {
			return nil, fmt.Errorf("invalid liquidity for pool %s", poolAddr.Hex())
		}

		price := uniswap.SqrtPriceX96ToPrice(info.SqrtPriceX96.ToBig())
		if price.Sign() <= 0 {
			return nil, fmt.Errorf("invalid pool price for %s", poolAddr.Hex())
		}

		// Work out in which direction this hop crosses the pool.
		hopIn, hopOut := swapData.Path[hop], swapData.Path[hop+1]
		forward := info.Token0 == hopIn && info.Token1 == hopOut
		backward := info.Token0 == hopOut && info.Token1 == hopIn
		if !forward && !backward {
			return nil, fmt.Errorf("pool %s does not match hop tokens", poolAddr.Hex())
		}

		var hopPrice *big.Float
		if forward {
			// price already token1/token0
			hopPrice = new(big.Float).SetPrec(prec).Copy(price)
		} else {
			hopPrice = new(big.Float).SetPrec(prec).Quo(unit, price)
		}

		// Refuse to extrapolate the spot price for swaps that are large
		// relative to the pool's liquidity figure.
		share, _ := new(big.Float).Quo(running, liquidity).Float64()
		if share > 0.25 {
			return nil, fmt.Errorf("swap size too large for pool %s (ratio %.2f)", poolAddr.Hex(), share)
		}

		running.Mul(running, hopPrice)
	}

	// Truncate the running amount towards zero to obtain an integer estimate.
	estimate := new(big.Int)
	running.Int(estimate)
	if estimate.Sign() <= 0 {
		return nil, fmt.Errorf("non-positive estimated output")
	}
	return estimate, nil
}
// estimateGasCostWei returns an estimated execution cost in wei: a fixed gas
// budget of 220000 units multiplied by the node's suggested gas price. When
// the suggestion cannot be obtained, or is nil or zero, a price of 1.5 gwei is
// assumed instead.
func (m *ArbitrumMonitor) estimateGasCostWei(ctx context.Context) *big.Int {
	const (
		gasUnits            = 220000
		fallbackGasPriceWei = 1500000000 // fallback 1.5 gwei
	)

	price, err := m.client.SuggestGasPrice(ctx)
	if usable := err == nil && price != nil && price.Sign() != 0; !usable {
		price = big.NewInt(fallbackGasPriceWei)
	}

	return new(big.Int).Mul(big.NewInt(gasUnits), price)
}
// calculateArbitrageOpportunity evaluates a decoded swap for arbitrage
// potential and returns a populated ArbitrageOpportunity, or nil when the swap
// does not qualify.
//
// A swap is rejected when its input amount is below 1e18 base units, when
// either token is the zero address, when the estimated output does not exceed
// the input, or when the profit net of estimated gas is not above 1e16 wei
// (0.01 ETH). The output is estimated via estimateOutputFromPools within a
// 150ms budget.
//
// NOTE(review): when the pool-based estimate fails, the output is assumed to
// be the input plus 0.1%, and output and input amounts are compared directly
// even though they may be denominated in different tokens - confirm this is
// intended before relying on the resulting profit figures.
//
// Side effect: when an opportunity executor is configured, the opportunity is
// handed to it on a new goroutine using a background context; dispatch
// failures are only logged.
func (m *ArbitrumMonitor) calculateArbitrageOpportunity(swapData *SwapData) *arbitragetypes.ArbitrageOpportunity {
	// Only consider opportunities above a minimum threshold
	minInput := big.NewInt(1000000000000000000) // 1 ETH worth
	if swapData.AmountIn.Cmp(minInput) < 0 {
		return nil
	}

	zero := common.Address{}
	if swapData.TokenIn == zero || swapData.TokenOut == zero {
		return nil
	}

	// Token route as hex strings; default to the direct in -> out pair.
	var hopNames []string
	if len(swapData.Path) >= 2 {
		hopNames = make([]string, 0, len(swapData.Path))
		for _, token := range swapData.Path {
			hopNames = append(hopNames, token.Hex())
		}
	} else {
		hopNames = []string{swapData.TokenIn.Hex(), swapData.TokenOut.Hex()}
	}

	// Pool list as hex strings; default to the single pool if one is known.
	poolNames := make([]string, 0, len(swapData.Pools))
	for _, poolAddr := range swapData.Pools {
		poolNames = append(poolNames, poolAddr.Hex())
	}
	if len(poolNames) == 0 && swapData.Pool != zero {
		poolNames = append(poolNames, swapData.Pool.Hex())
	}

	ctx, cancel := context.WithTimeout(context.Background(), 150*time.Millisecond)
	defer cancel()

	expectedOut, estimateErr := m.estimateOutputFromPools(ctx, swapData)
	if estimateErr != nil {
		// fallback to heuristic estimate if precise calculation fails
		bonus := new(big.Int).Div(swapData.AmountIn, big.NewInt(1000))
		expectedOut = new(big.Int).Add(swapData.AmountIn, bonus)
	}
	if expectedOut.Cmp(swapData.AmountIn) <= 0 {
		return nil
	}

	gross := new(big.Int).Sub(expectedOut, swapData.AmountIn)
	gasCost := m.estimateGasCostWei(ctx)
	net := new(big.Int).Sub(gross, gasCost)

	minNet := big.NewInt(10000000000000000) // require >0.01 ETH net profit
	if net.Sign() <= 0 || net.Cmp(minNet) <= 0 {
		return nil
	}

	// Return on investment as a fraction of the input amount.
	inputFloat := new(big.Float).SetPrec(256).SetInt(swapData.AmountIn)
	netFloat := new(big.Float).SetPrec(256).SetInt(net)
	roi := 0.0
	if inputFloat.Sign() > 0 {
		roi, _ = new(big.Float).Quo(netFloat, inputFloat).Float64()
	}

	// Confidence starts at 0.75 and grows with ROI, capped at 0.98.
	confidence := 0.75
	if roi > 0 {
		confidence += roi
		if confidence > 0.98 {
			confidence = 0.98
		}
	}

	opp := &arbitragetypes.ArbitrageOpportunity{
		Path:            hopNames,
		Pools:           poolNames,
		AmountIn:        new(big.Int).Set(swapData.AmountIn),
		RequiredAmount:  new(big.Int).Set(swapData.AmountIn),
		Profit:          new(big.Int).Set(gross),
		NetProfit:       new(big.Int).Set(net),
		GasEstimate:     new(big.Int).Set(gasCost),
		EstimatedProfit: new(big.Int).Set(gross),
		ROI:             roi * 100,
		Protocol:        swapData.Protocol,
		ExecutionTime:   1200,
		Confidence:      confidence,
		PriceImpact:     0.01,
		MaxSlippage:     0.03,
		TokenIn:         swapData.TokenIn,
		TokenOut:        swapData.TokenOut,
		Timestamp:       time.Now().Unix(),
		DetectedAt:      time.Now(),
		ExpiresAt:       time.Now().Add(30 * time.Second),
		Risk:            0.3,
	}

	// Read the executor under the lock, then dispatch outside of it.
	m.mu.RLock()
	dispatcher := m.opportunityExecutor
	m.mu.RUnlock()

	if dispatcher != nil {
		go func() {
			if err := dispatcher.ExecuteArbitrage(context.Background(), opp); err != nil {
				m.logger.Warn(fmt.Sprintf("Failed to dispatch arbitrage opportunity: %v", err))
			}
		}()
	}

	return opp
}