Files
mev-beta/pkg/monitor/concurrent.go
2025-09-16 11:05:47 -05:00

540 lines
20 KiB
Go

package monitor
import (
"context"
"fmt"
"math/big"
"sync"
"time"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc"
"github.com/fraktal/mev-beta/internal/config"
"github.com/fraktal/mev-beta/internal/logger"
"github.com/fraktal/mev-beta/internal/ratelimit"
"github.com/fraktal/mev-beta/pkg/arbitrum"
"github.com/fraktal/mev-beta/pkg/events"
"github.com/fraktal/mev-beta/pkg/market"
"github.com/fraktal/mev-beta/pkg/orchestrator"
"github.com/fraktal/mev-beta/pkg/pools"
"github.com/fraktal/mev-beta/pkg/scanner"
"golang.org/x/time/rate"
)
// ArbitrumMonitor monitors the Arbitrum sequencer for transactions with concurrency support
type ArbitrumMonitor struct {
config *config.ArbitrumConfig
botConfig *config.BotConfig
client *ethclient.Client
l2Parser *arbitrum.ArbitrumL2Parser
logger *logger.Logger
rateLimiter *ratelimit.LimiterManager
marketMgr *market.MarketManager
scanner *scanner.MarketScanner
pipeline *market.Pipeline
fanManager *market.FanManager
coordinator *orchestrator.MEVCoordinator
limiter *rate.Limiter
pollInterval time.Duration
running bool
mu sync.RWMutex
}
// NewArbitrumMonitor creates a new Arbitrum monitor with rate limiting
func NewArbitrumMonitor(
arbCfg *config.ArbitrumConfig,
botCfg *config.BotConfig,
logger *logger.Logger,
rateLimiter *ratelimit.LimiterManager,
marketMgr *market.MarketManager,
scanner *scanner.MarketScanner,
) (*ArbitrumMonitor, error) {
// Create Ethereum client
client, err := ethclient.Dial(arbCfg.RPCEndpoint)
if err != nil {
return nil, fmt.Errorf("failed to connect to Arbitrum node: %v", err)
}
// Create L2 parser for Arbitrum transaction parsing
l2Parser, err := arbitrum.NewArbitrumL2Parser(arbCfg.RPCEndpoint, logger)
if err != nil {
return nil, fmt.Errorf("failed to create L2 parser: %v", err)
}
// Create rate limiter based on config
limiter := rate.NewLimiter(
rate.Limit(arbCfg.RateLimit.RequestsPerSecond),
arbCfg.RateLimit.Burst,
)
// Create pipeline
pipeline := market.NewPipeline(botCfg, logger, marketMgr, scanner, client)
// Add default stages
pipeline.AddDefaultStages()
// Create fan manager
fanManager := market.NewFanManager(
&config.Config{
Arbitrum: *arbCfg,
Bot: *botCfg,
},
logger,
rateLimiter,
)
// Create event parser and pool discovery
eventParser := events.NewEventParser()
// Create raw RPC client for pool discovery
poolRPCClient, err := rpc.Dial(arbCfg.RPCEndpoint)
if err != nil {
return nil, fmt.Errorf("failed to create RPC client for pool discovery: %w", err)
}
poolDiscovery := pools.NewPoolDiscovery(poolRPCClient, logger)
// Create MEV coordinator
coordinator := orchestrator.NewMEVCoordinator(
&config.Config{
Arbitrum: *arbCfg,
Bot: *botCfg,
},
logger,
eventParser,
poolDiscovery,
marketMgr,
scanner,
)
return &ArbitrumMonitor{
config: arbCfg,
botConfig: botCfg,
client: client,
l2Parser: l2Parser,
logger: logger,
rateLimiter: rateLimiter,
marketMgr: marketMgr,
scanner: scanner,
pipeline: pipeline,
fanManager: fanManager,
coordinator: coordinator,
limiter: limiter,
pollInterval: time.Duration(botCfg.PollingInterval) * time.Second,
running: false,
}, nil
}
// Start begins monitoring the Arbitrum sequencer
func (m *ArbitrumMonitor) Start(ctx context.Context) error {
m.mu.Lock()
m.running = true
m.mu.Unlock()
m.logger.Info("Starting Arbitrum sequencer monitoring...")
// Start the MEV coordinator pipeline
if err := m.coordinator.Start(); err != nil {
return fmt.Errorf("failed to start MEV coordinator: %w", err)
}
// Get the latest block to start from
if err := m.rateLimiter.WaitForLimit(ctx, m.config.RPCEndpoint); err != nil {
return fmt.Errorf("rate limit error: %v", err)
}
header, err := m.client.HeaderByNumber(ctx, nil)
if err != nil {
return fmt.Errorf("failed to get latest block header: %v", err)
}
lastBlock := header.Number.Uint64()
m.logger.Info(fmt.Sprintf("Starting from block: %d", lastBlock))
// Subscribe to DEX events for real-time monitoring
if err := m.subscribeToDEXEvents(ctx); err != nil {
m.logger.Warn(fmt.Sprintf("Failed to subscribe to DEX events: %v", err))
} else {
m.logger.Info("Subscribed to DEX events")
}
for {
m.mu.RLock()
running := m.running
m.mu.RUnlock()
if !running {
break
}
select {
case <-ctx.Done():
m.Stop()
return nil
case <-time.After(m.pollInterval):
// Get the latest block
if err := m.rateLimiter.WaitForLimit(ctx, m.config.RPCEndpoint); err != nil {
m.logger.Error(fmt.Sprintf("Rate limit error: %v", err))
continue
}
header, err := m.client.HeaderByNumber(ctx, nil)
if err != nil {
m.logger.Error(fmt.Sprintf("Failed to get latest block header: %v", err))
continue
}
currentBlock := header.Number.Uint64()
// Process blocks from lastBlock+1 to currentBlock
for blockNum := lastBlock + 1; blockNum <= currentBlock; blockNum++ {
if err := m.processBlock(ctx, blockNum); err != nil {
m.logger.Error(fmt.Sprintf("Failed to process block %d: %v", blockNum, err))
}
}
lastBlock = currentBlock
}
}
return nil
}
// Stop stops the monitor
func (m *ArbitrumMonitor) Stop() {
m.mu.Lock()
defer m.mu.Unlock()
m.running = false
m.logger.Info("Stopping Arbitrum monitor...")
}
// processBlock processes a single block for potential swap transactions with enhanced L2 parsing
func (m *ArbitrumMonitor) processBlock(ctx context.Context, blockNumber uint64) error {
m.logger.Debug(fmt.Sprintf("Processing block %d", blockNumber))
// Wait for rate limiter
if err := m.rateLimiter.WaitForLimit(ctx, m.config.RPCEndpoint); err != nil {
return fmt.Errorf("rate limit error: %v", err)
}
// Get block using L2 parser to bypass transaction type issues
l2Block, err := m.l2Parser.GetBlockByNumber(ctx, blockNumber)
if err != nil {
m.logger.Error(fmt.Sprintf("Failed to get L2 block %d: %v", blockNumber, err))
return fmt.Errorf("failed to get L2 block %d: %v", blockNumber, err)
}
// Parse DEX transactions from the block
dexTransactions := m.l2Parser.ParseDEXTransactions(ctx, l2Block)
m.logger.Info(fmt.Sprintf("Block %d: Processing %d transactions, found %d DEX transactions",
blockNumber, len(l2Block.Transactions), len(dexTransactions)))
// Process DEX transactions
if len(dexTransactions) > 0 {
m.logger.Info(fmt.Sprintf("Block %d contains %d DEX transactions:", blockNumber, len(dexTransactions)))
for i, dexTx := range dexTransactions {
m.logger.Info(fmt.Sprintf(" [%d] %s: %s -> %s (%s) calling %s (%s)",
i+1, dexTx.Hash, dexTx.From, dexTx.To, dexTx.ContractName,
dexTx.FunctionName, dexTx.Protocol))
}
// TODO: Convert DEX transactions to standard format and process through pipeline
// For now, we're successfully detecting and logging DEX transactions
}
// If no DEX transactions found, report empty block
if len(dexTransactions) == 0 {
if len(l2Block.Transactions) == 0 {
m.logger.Info(fmt.Sprintf("Block %d: Empty block", blockNumber))
} else {
m.logger.Info(fmt.Sprintf("Block %d: No DEX transactions found in %d total transactions",
blockNumber, len(l2Block.Transactions)))
}
}
return nil
}
// subscribeToDEXEvents subscribes to DEX contract events for real-time monitoring
func (m *ArbitrumMonitor) subscribeToDEXEvents(ctx context.Context) error {
// Define official DEX contract addresses for Arbitrum mainnet
dexContracts := []struct {
Address common.Address
Name string
}{
// Official Arbitrum DEX Factories
{common.HexToAddress("0xf1D7CC64Fb4452F05c498126312eBE29f30Fbcf9"), "UniswapV2Factory"}, // Official Uniswap V2 Factory
{common.HexToAddress("0x1F98431c8aD98523631AE4a59f267346ea31F984"), "UniswapV3Factory"}, // Official Uniswap V3 Factory
{common.HexToAddress("0xc35DADB65012eC5796536bD9864eD8773aBc74C4"), "SushiSwapFactory"}, // Official SushiSwap V2 Factory
// Official Arbitrum DEX Routers
{common.HexToAddress("0x4752ba5dbc23f44d87826276bf6fd6b1c372ad24"), "UniswapV2Router02"}, // Official Uniswap V2 Router02
{common.HexToAddress("0xE592427A0AEce92De3Edee1F18E0157C05861564"), "UniswapV3Router"}, // Official Uniswap V3 SwapRouter
{common.HexToAddress("0x68b3465833fb72A70ecDF485E0e4C7bD8665Fc45"), "UniswapV3Router02"}, // Official Uniswap V3 SwapRouter02
{common.HexToAddress("0x1b02dA8Cb0d097eB8D57A175b88c7D8b47997506"), "SushiSwapRouter"}, // Official SushiSwap Router
{common.HexToAddress("0xC36442b4a4522E871399CD717aBDD847Ab11FE88"), "UniswapV3PositionManager"}, // Official Position Manager
// Additional official routers
{common.HexToAddress("0xa51afafe0263b40edaef0df8781ea9aa03e381a3"), "UniversalRouter"}, // Universal Router
{common.HexToAddress("0x4C60051384bd2d3C01bfc845Cf5F4b44bcbE9de5"), "GMX Router"}, // GMX DEX Router
// Popular Arbitrum pools (verified high volume pools)
{common.HexToAddress("0xC6962004f452bE9203591991D15f6b388e09E8D0"), "USDC/WETH UniswapV3 0.05%"}, // High volume pool
{common.HexToAddress("0x17c14D2c404D167802b16C450d3c99F88F2c4F4d"), "USDC/WETH UniswapV3 0.3%"}, // High volume pool
{common.HexToAddress("0x2f5e87C9312fa29aed5c179E456625D79015299c"), "WBTC/WETH UniswapV3 0.05%"}, // High volume pool
{common.HexToAddress("0x149e36E72726e0BceA5c59d40df2c43F60f5A22D"), "WBTC/WETH UniswapV3 0.3%"}, // High volume pool
{common.HexToAddress("0x641C00A822e8b671738d32a431a4Fb6074E5c79d"), "USDT/WETH UniswapV3 0.05%"}, // High volume pool
{common.HexToAddress("0xFe7D6a84287235C7b4b57C4fEb9a44d4C6Ed3BB8"), "ARB/WETH UniswapV3 0.05%"}, // ARB native token pool
}
// Define common DEX event signatures
eventSignatures := []common.Hash{
common.HexToHash("0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822"), // Swap (Uniswap V2)
common.HexToHash("0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67"), // Swap (Uniswap V3)
common.HexToHash("0x4c209b5fc8ad50758f13e2e1088ba56a560dff690a1c6fef26394f4c03821c4f"), // Mint (Uniswap V2)
common.HexToHash("0x7a53080ba414158be7ec69b987b5fb7d07dee101fe85488f0853ae16239d0bde"), // Burn (Uniswap V2)
common.HexToHash("0x783cca1c0412dd0d695e784568c96da2e9c22ff989357a2e8b1d9b2b4e6b7118"), // Mint (Uniswap V3)
common.HexToHash("0x0c396cd989a39f49a56c8a608a0409f2075c6b60e9c44533b5cf87abdbe393f1"), // Burn (Uniswap V3)
}
// Create filter query for DEX events
addresses := make([]common.Address, len(dexContracts))
for i, dex := range dexContracts {
addresses[i] = dex.Address
}
topics := [][]common.Hash{{}}
topics[0] = eventSignatures
query := ethereum.FilterQuery{
Addresses: addresses,
Topics: topics,
}
// Subscribe to logs
logs := make(chan types.Log)
sub, err := m.client.SubscribeFilterLogs(context.Background(), query, logs)
if err != nil {
return fmt.Errorf("failed to subscribe to DEX events: %v", err)
}
m.logger.Info("Subscribed to DEX events")
// Process logs in a goroutine
go func() {
defer func() {
if r := recover(); r != nil {
m.logger.Error(fmt.Sprintf("Panic in DEX event processor: %v", r))
}
}()
defer sub.Unsubscribe()
for {
select {
case log := <-logs:
m.processDEXEvent(ctx, log)
case err := <-sub.Err():
if err != nil {
m.logger.Error(fmt.Sprintf("DEX event subscription error: %v", err))
}
return
case <-ctx.Done():
return
}
}
}()
return nil
}
// processDEXEvent processes a DEX event log
func (m *ArbitrumMonitor) processDEXEvent(ctx context.Context, log types.Log) {
m.logger.Debug(fmt.Sprintf("Processing DEX event from contract %s, topic count: %d", log.Address.Hex(), len(log.Topics)))
// Check if this is a swap event
if len(log.Topics) > 0 {
eventSig := log.Topics[0]
// Check for common swap event signatures
switch eventSig.Hex() {
case "0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822": // Uniswap V2 Swap
m.logger.Info(fmt.Sprintf("Uniswap V2 Swap event detected: Contract=%s, TxHash=%s",
log.Address.Hex(), log.TxHash.Hex()))
case "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67": // Uniswap V3 Swap
m.logger.Info(fmt.Sprintf("Uniswap V3 Swap event detected: Contract=%s, TxHash=%s",
log.Address.Hex(), log.TxHash.Hex()))
case "0x4c209b5fc8ad50758f13e2e1088ba56a560dff690a1c6fef26394f4c03821c4f": // Uniswap V2 Mint
m.logger.Info(fmt.Sprintf("Uniswap V2 Mint event detected: Contract=%s, TxHash=%s",
log.Address.Hex(), log.TxHash.Hex()))
case "0x7a53080ba414158be7ec69b987b5fb7d07dee101fe85488f0853ae16239d0bde": // Uniswap V2 Burn
m.logger.Info(fmt.Sprintf("Uniswap V2 Burn event detected: Contract=%s, TxHash=%s",
log.Address.Hex(), log.TxHash.Hex()))
case "0x783cca1c0412dd0d695e784568c96da2e9c22ff989357a2e8b1d9b2b4e6b7118": // Uniswap V3 Mint
m.logger.Info(fmt.Sprintf("Uniswap V3 Mint event detected: Contract=%s, TxHash=%s",
log.Address.Hex(), log.TxHash.Hex()))
case "0x0c396cd989a39f49a56c8a608a0409f2075c6b60e9c44533b5cf87abdbe393f1": // Uniswap V3 Burn
m.logger.Info(fmt.Sprintf("Uniswap V3 Burn event detected: Contract=%s, TxHash=%s",
log.Address.Hex(), log.TxHash.Hex()))
default:
m.logger.Debug(fmt.Sprintf("Other DEX event detected: Contract=%s, EventSig=%s, TxHash=%s",
log.Address.Hex(), eventSig.Hex(), log.TxHash.Hex()))
}
// Fetch transaction receipt for detailed analysis
receipt, err := m.client.TransactionReceipt(ctx, log.TxHash)
if err != nil {
m.logger.Error(fmt.Sprintf("Failed to fetch receipt for transaction %s: %v", log.TxHash.Hex(), err))
return
}
// Process the transaction through the pipeline
// This will parse the DEX events and look for arbitrage opportunities
m.processTransactionReceipt(ctx, receipt, log.BlockNumber, log.BlockHash)
}
}
// processTransactionReceipt processes a transaction receipt for DEX events
func (m *ArbitrumMonitor) processTransactionReceipt(ctx context.Context, receipt *types.Receipt, blockNumber uint64, blockHash common.Hash) {
if receipt == nil {
return
}
m.logger.Debug(fmt.Sprintf("Processing transaction receipt %s from block %d",
receipt.TxHash.Hex(), blockNumber))
// Process transaction logs for DEX events
dexEvents := 0
for _, log := range receipt.Logs {
if len(log.Topics) > 0 {
eventSig := log.Topics[0]
// Check for common DEX event signatures
switch eventSig.Hex() {
case "0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822": // Uniswap V2 Swap
m.logger.Info(fmt.Sprintf("DEX Swap event detected in transaction %s: Uniswap V2", receipt.TxHash.Hex()))
dexEvents++
case "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67": // Uniswap V3 Swap
m.logger.Info(fmt.Sprintf("DEX Swap event detected in transaction %s: Uniswap V3", receipt.TxHash.Hex()))
dexEvents++
case "0x4c209b5fc8ad50758f13e2e1088ba56a560dff690a1c6fef26394f4c03821c4f": // Uniswap V2 Mint
m.logger.Info(fmt.Sprintf("DEX Mint event detected in transaction %s: Uniswap V2", receipt.TxHash.Hex()))
dexEvents++
case "0x7a53080ba414158be7ec69b987b5fb7d07dee101fe85488f0853ae16239d0bde": // Uniswap V2 Burn
m.logger.Info(fmt.Sprintf("DEX Burn event detected in transaction %s: Uniswap V2", receipt.TxHash.Hex()))
dexEvents++
case "0x783cca1c0412dd0d695e784568c96da2e9c22ff989357a2e8b1d9b2b4e6b7118": // Uniswap V3 Mint
m.logger.Info(fmt.Sprintf("DEX Mint event detected in transaction %s: Uniswap V3", receipt.TxHash.Hex()))
dexEvents++
case "0x0c396cd989a39f49a56c8a608a0409f2075c6b60e9c44533b5cf87abdbe393f1": // Uniswap V3 Burn
m.logger.Info(fmt.Sprintf("DEX Burn event detected in transaction %s: Uniswap V3", receipt.TxHash.Hex()))
dexEvents++
}
}
}
if dexEvents > 0 {
m.logger.Info(fmt.Sprintf("Transaction %s contains %d DEX events", receipt.TxHash.Hex(), dexEvents))
}
// Create a minimal transaction for the pipeline
// This is just a stub since we don't have the full transaction data
tx := types.NewTransaction(0, common.Address{}, big.NewInt(0), 0, big.NewInt(0), nil)
// Process through the new MEV coordinator
m.coordinator.ProcessTransaction(tx, receipt, blockNumber, uint64(time.Now().Unix()))
// Also process through the legacy pipeline for compatibility
transactions := []*types.Transaction{tx}
if err := m.pipeline.ProcessTransactions(ctx, transactions, blockNumber, uint64(time.Now().Unix())); err != nil {
m.logger.Debug(fmt.Sprintf("Legacy pipeline processing error for receipt %s: %v", receipt.TxHash.Hex(), err))
}
}
// processTransaction analyzes a transaction for potential swap opportunities
func (m *ArbitrumMonitor) processTransaction(ctx context.Context, tx *types.Transaction) error {
// Check if this is a potential swap transaction
// This is a simplified check - in practice, you would check for
// specific function signatures of Uniswap-like contracts
// For now, we'll just log all transactions
from, err := m.client.TransactionSender(ctx, tx, common.Hash{}, 0)
if err != nil {
// This can happen for pending transactions
from = common.HexToAddress("0x0")
}
m.logger.Debug(fmt.Sprintf("Transaction: %s, From: %s, To: %s, Value: %s ETH",
tx.Hash().Hex(),
from.Hex(),
func() string {
if tx.To() != nil {
return tx.To().Hex()
}
return "contract creation"
}(),
new(big.Float).Quo(new(big.Float).SetInt(tx.Value()), big.NewFloat(1e18)).String(),
))
// TODO: Add logic to detect swap transactions and analyze them
// This would involve:
// 1. Checking if the transaction is calling a Uniswap-like contract
// 2. Decoding the swap function call
// 3. Extracting the token addresses and amounts
// 4. Calculating potential price impact
return nil
}
// GetPendingTransactions retrieves pending transactions from the mempool
func (m *ArbitrumMonitor) GetPendingTransactions(ctx context.Context) ([]*types.Transaction, error) {
// This is a simplified implementation
// In practice, you might need to use a different approach to access pending transactions
// Query for pending transactions
txs := make([]*types.Transaction, 0)
return txs, nil
}
// getTransactionReceiptWithRetry attempts to get a transaction receipt with exponential backoff retry
func (m *ArbitrumMonitor) getTransactionReceiptWithRetry(ctx context.Context, txHash common.Hash, maxRetries int) (*types.Receipt, error) {
for attempt := 0; attempt < maxRetries; attempt++ {
m.logger.Debug(fmt.Sprintf("Attempting to fetch receipt for transaction %s (attempt %d/%d)", txHash.Hex(), attempt+1, maxRetries))
// Try to fetch the transaction receipt
receipt, err := m.client.TransactionReceipt(ctx, txHash)
if err == nil {
m.logger.Debug(fmt.Sprintf("Successfully fetched receipt for transaction %s on attempt %d", txHash.Hex(), attempt+1))
return receipt, nil
}
// Check for specific error types that shouldn't be retried
if ctx.Err() != nil {
return nil, ctx.Err()
}
// Log retry attempt for other errors
if attempt < maxRetries-1 {
backoffDuration := time.Duration(1<<uint(attempt)) * time.Second
m.logger.Warn(fmt.Sprintf("Receipt fetch for transaction %s attempt %d failed: %v, retrying in %v",
txHash.Hex(), attempt+1, err, backoffDuration))
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-time.After(backoffDuration):
// Continue to next attempt
}
} else {
m.logger.Error(fmt.Sprintf("Receipt fetch for transaction %s failed after %d attempts: %v", txHash.Hex(), maxRetries, err))
}
}
return nil, fmt.Errorf("failed to fetch receipt for transaction %s after %d attempts", txHash.Hex(), maxRetries)
}