saving in place

This commit is contained in:
Krypto Kajun
2025-10-04 09:31:02 -05:00
parent 76c1b5cee1
commit f358f49aa9
295 changed files with 72071 additions and 17209 deletions

View File

@@ -21,26 +21,31 @@ import (
"github.com/fraktal/mev-beta/pkg/oracle"
"github.com/fraktal/mev-beta/pkg/pools"
"github.com/fraktal/mev-beta/pkg/scanner"
arbitragetypes "github.com/fraktal/mev-beta/pkg/types"
"github.com/holiman/uint256"
"golang.org/x/time/rate"
)
// ArbitrumMonitor monitors the Arbitrum sequencer for transactions with concurrency support
type ArbitrumMonitor struct {
config *config.ArbitrumConfig
botConfig *config.BotConfig
client *ethclient.Client
l2Parser *arbitrum.ArbitrumL2Parser
logger *logger.Logger
rateLimiter *ratelimit.LimiterManager
marketMgr *market.MarketManager
scanner *scanner.MarketScanner
pipeline *market.Pipeline
fanManager *market.FanManager
config *config.ArbitrumConfig
botConfig *config.BotConfig
client *ethclient.Client
connectionManager *arbitrum.ConnectionManager
l2Parser *arbitrum.ArbitrumL2Parser
logger *logger.Logger
rateLimiter *ratelimit.LimiterManager
marketMgr *market.MarketManager
scanner *scanner.Scanner
pipeline *market.Pipeline
fanManager *market.FanManager
// coordinator *orchestrator.MEVCoordinator // Removed to avoid import cycle
limiter *rate.Limiter
pollInterval time.Duration
running bool
mu sync.RWMutex
limiter *rate.Limiter
pollInterval time.Duration
running bool
mu sync.RWMutex
transactionChannel chan interface{}
lastHealthCheck time.Time
}
// NewArbitrumMonitor creates a new Arbitrum monitor with rate limiting
@@ -50,13 +55,16 @@ func NewArbitrumMonitor(
logger *logger.Logger,
rateLimiter *ratelimit.LimiterManager,
marketMgr *market.MarketManager,
scanner *scanner.MarketScanner,
scanner *scanner.Scanner,
) (*ArbitrumMonitor, error) {
// Create Ethereum client
client, err := ethclient.Dial(arbCfg.RPCEndpoint)
// Create Ethereum client with connection manager for retry and fallback support
ctx := context.Background()
connectionManager := arbitrum.NewConnectionManager(arbCfg, logger)
rateLimitedClient, err := connectionManager.GetClientWithRetry(ctx, 3)
if err != nil {
return nil, fmt.Errorf("failed to connect to Arbitrum node: %v", err)
return nil, fmt.Errorf("failed to connect to Arbitrum node with retries: %v", err)
}
client := rateLimitedClient.Client
// Create price oracle for L2 parser
priceOracle := oracle.NewPriceOracle(client, logger)
@@ -114,20 +122,23 @@ func NewArbitrumMonitor(
// )
return &ArbitrumMonitor{
config: arbCfg,
botConfig: botCfg,
client: client,
l2Parser: l2Parser,
logger: logger,
rateLimiter: rateLimiter,
marketMgr: marketMgr,
scanner: scanner,
pipeline: pipeline,
fanManager: fanManager,
config: arbCfg,
botConfig: botCfg,
client: client,
connectionManager: connectionManager,
l2Parser: l2Parser,
logger: logger,
rateLimiter: rateLimiter,
marketMgr: marketMgr,
scanner: scanner,
pipeline: pipeline,
fanManager: fanManager,
// coordinator: coordinator, // Removed to avoid import cycle
limiter: limiter,
pollInterval: time.Duration(botCfg.PollingInterval) * time.Second,
running: false,
limiter: limiter,
pollInterval: time.Duration(botCfg.PollingInterval) * time.Second,
running: false,
transactionChannel: make(chan interface{}, 50000), // Initialize with large buffer for Arbitrum's high TX volume
lastHealthCheck: time.Now(),
}, nil
}
@@ -164,6 +175,12 @@ func (m *ArbitrumMonitor) Start(ctx context.Context) error {
m.logger.Info("Subscribed to DEX events")
}
// Start transaction processor goroutine
go m.processTransactionChannel(ctx)
// Start connection health checker
go m.checkConnectionHealth(ctx)
for {
m.mu.RLock()
running := m.running
@@ -246,12 +263,16 @@ func (m *ArbitrumMonitor) processBlock(ctx context.Context, blockNumber uint64)
dexTransactions := m.l2Parser.ParseDEXTransactions(ctx, l2Block)
parseDuration := time.Since(parseStart)
// Calculate proper parsing rate (transactions per second of parsing time)
parseRateTPS := float64(len(l2Block.Transactions)) / parseDuration.Seconds()
// Log parsing performance
m.logger.Performance("monitor", "parse_dex_transactions", parseDuration, map[string]interface{}{
"block_number": blockNumber,
"total_txs": len(l2Block.Transactions),
"dex_txs": len(dexTransactions),
"parse_rate_tps": float64(len(l2Block.Transactions)) / parseDuration.Seconds(),
"block_number": blockNumber,
"total_txs": len(l2Block.Transactions),
"dex_txs": len(dexTransactions),
"parse_rate_txs_per_sec": parseRateTPS, // This is parsing throughput, not network TPS
"parse_duration_ms": parseDuration.Milliseconds(),
})
m.logger.Info(fmt.Sprintf("Block %d: Processing %d transactions, found %d DEX transactions",
@@ -267,10 +288,20 @@ func (m *ArbitrumMonitor) processBlock(ctx context.Context, blockNumber uint64)
m.logger.Info(fmt.Sprintf(" [%d] %s: %s -> %s (%s) calling %s (%s)",
i+1, dexTx.Hash, dexTx.From, dexTx.To, dexTx.ContractName,
dexTx.FunctionName, dexTx.Protocol))
}
// TODO: Convert DEX transactions to standard format and process through pipeline
// For now, we're successfully detecting and logging DEX transactions
// Convert DEX transactions to standard format for processing
standardizedTx := m.convertToStandardFormat(&dexTx)
if standardizedTx != nil {
// Send to pipeline for arbitrage analysis
select {
case m.transactionChannel <- standardizedTx:
// Successfully sent to pipeline
default:
// Channel full, log warning
m.logger.Warn(fmt.Sprintf("Transaction pipeline full, dropping tx %s", dexTx.Hash))
}
}
}
}
// If no DEX transactions found, report empty block
@@ -286,6 +317,150 @@ func (m *ArbitrumMonitor) processBlock(ctx context.Context, blockNumber uint64)
return nil
}
// checkConnectionHealth periodically verifies the RPC connection and
// triggers reconnection when the probe fails. It runs until ctx is
// cancelled.
func (m *ArbitrumMonitor) checkConnectionHealth(ctx context.Context) {
	// Keep the monitor alive even if a health probe panics.
	defer func() {
		if r := recover(); r != nil {
			m.logger.Error(fmt.Sprintf("Panic in connection health checker: %v", r))
		}
	}()

	const healthCheckInterval = 30 * time.Second
	ticker := time.NewTicker(healthCheckInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			m.performHealthCheck(ctx)
		case <-ctx.Done():
			m.logger.Info("Connection health checker shutting down")
			return
		}
	}
}
// performHealthCheck probes the RPC connection and swaps in a fresh client
// when the probe fails. It is called only from checkConnectionHealth's
// ticker goroutine, so invocations never overlap.
//
// The network probe and the reconnect attempt run WITHOUT holding the
// mutex: the previous version held m.mu (write lock) across the 5-second
// probe plus up to three reconnect retries, starving every RLock in the
// monitor (e.g. the running check in Start's loop). The lock is now taken
// only to snapshot and to update shared state.
func (m *ArbitrumMonitor) performHealthCheck(ctx context.Context) {
	// Snapshot state under a read lock; skip if a check ran too recently.
	m.mu.RLock()
	client := m.client
	tooRecent := time.Since(m.lastHealthCheck) < 10*time.Second
	m.mu.RUnlock()
	if tooRecent {
		return
	}

	// Test the connection by fetching the latest header with a short timeout.
	testCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	_, err := client.HeaderByNumber(testCtx, nil)
	cancel()

	if err != nil {
		m.logger.Warn(fmt.Sprintf("Connection health check failed: %v, attempting reconnection", err))

		// Attempt to get a new healthy client (up to 3 retries).
		rateLimitedClient, reconnectErr := m.connectionManager.GetClientWithRetry(ctx, 3)
		if reconnectErr != nil {
			m.logger.Error(fmt.Sprintf("Failed to reconnect: %v", reconnectErr))
		} else {
			// Swap clients under the write lock.
			// NOTE(review): other goroutines read m.client without holding
			// m.mu; that pre-existing pattern is not widened here, but
			// m.client access should be audited for races.
			m.mu.Lock()
			m.client.Close()
			m.client = rateLimitedClient.Client
			m.mu.Unlock()
			m.logger.Info("Successfully reconnected to Arbitrum RPC")
		}
	}

	m.mu.Lock()
	m.lastHealthCheck = time.Now()
	m.mu.Unlock()
}
// processTransactionChannel drains the transaction channel and forwards
// each entry for processing until the context is cancelled.
func (m *ArbitrumMonitor) processTransactionChannel(ctx context.Context) {
	// Recover so a malformed transaction cannot kill the processor goroutine.
	defer func() {
		if r := recover(); r != nil {
			m.logger.Error(fmt.Sprintf("Panic in transaction processor: %v", r))
		}
	}()

	for {
		select {
		case tx := <-m.transactionChannel:
			if tx == nil {
				continue
			}
			if err := m.processChannelTransaction(ctx, tx); err != nil {
				m.logger.Debug(fmt.Sprintf("Error processing transaction from channel: %v", err))
			}
		case <-ctx.Done():
			m.logger.Info("Transaction processor shutting down")
			return
		}
	}
}
// processChannelTransaction handles one transaction taken off the channel.
// Non-map payloads are ignored without error.
func (m *ArbitrumMonitor) processChannelTransaction(ctx context.Context, tx interface{}) error {
	txMap, ok := tx.(map[string]interface{})
	if !ok {
		return nil
	}

	// Trace which transaction is being picked up, when a hash is present.
	if hash, exists := txMap["hash"]; exists {
		m.logger.Debug(fmt.Sprintf("Processing transaction from pipeline: %s", hash))
	}

	// Hand the pre-parsed DEX data straight to the scanner path; the full
	// pipeline is bypassed because parsing already happened upstream.
	if err := m.processTransactionMap(ctx, txMap); err != nil {
		return fmt.Errorf("transaction processing error: %w", err)
	}
	return nil
}
// processTransactionMap builds a minimal swap event from a transaction map
// and submits it to the scanner for arbitrage analysis.
func (m *ArbitrumMonitor) processTransactionMap(ctx context.Context, txMap map[string]interface{}) error {
	hash, _ := txMap["hash"].(string)
	protocol, _ := txMap["protocol"].(string)
	functionName, _ := txMap["function"].(string)

	m.logger.Debug(fmt.Sprintf("Analyzing transaction %s: %s.%s", hash, protocol, functionName))

	// Amounts, price, liquidity and tick are placeholders; deeper analysis
	// downstream is expected to fill them in.
	event := events.Event{
		Type:            events.Swap, // came from DEX parsing, so treat as a swap
		Protocol:        protocol,
		BlockNumber:     getUint64(txMap, "block_number"),
		TransactionHash: common.HexToHash(hash),
		Timestamp:       getUint64(txMap, "timestamp"),
		Amount0:         big.NewInt(0),
		Amount1:         big.NewInt(0),
		SqrtPriceX96:    uint256.NewInt(0),
		Liquidity:       uint256.NewInt(0),
		Tick:            0,
	}

	// Submit directly to the scanner for arbitrage analysis.
	m.scanner.SubmitEvent(event)
	return nil
}
// getUint64 safely extracts a uint64 from a generic map, converting the
// common numeric types that appear in transaction maps (including float64,
// which JSON decoding produces for all numbers, and the narrower integer
// and float widths). Missing keys and non-numeric values yield 0. Negative
// integers wrap per Go's conversion rules, matching the previous behavior.
func getUint64(m map[string]interface{}, key string) uint64 {
	val, ok := m[key]
	if !ok {
		return 0
	}
	switch v := val.(type) {
	case uint64:
		return v
	case int64:
		return uint64(v)
	case int:
		return uint64(v)
	case float64:
		return uint64(v)
	case uint:
		return uint64(v)
	case uint32:
		return uint64(v)
	case int32:
		return uint64(v)
	case float32:
		return uint64(v)
	}
	return 0
}
// subscribeToDEXEvents subscribes to DEX contract events for real-time monitoring
func (m *ArbitrumMonitor) subscribeToDEXEvents(ctx context.Context) error {
// Define official DEX contract addresses for Arbitrum mainnet
@@ -344,7 +519,7 @@ func (m *ArbitrumMonitor) subscribeToDEXEvents(ctx context.Context) error {
// Subscribe to logs
logs := make(chan types.Log)
sub, err := m.client.SubscribeFilterLogs(context.Background(), query, logs)
sub, err := m.client.SubscribeFilterLogs(ctx, query, logs)
if err != nil {
return fmt.Errorf("failed to subscribe to DEX events: %v", err)
}
@@ -507,12 +682,20 @@ func (m *ArbitrumMonitor) processTransaction(ctx context.Context, tx *types.Tran
new(big.Float).Quo(new(big.Float).SetInt(tx.Value()), big.NewFloat(1e18)).String(),
))
// TODO: Add logic to detect swap transactions and analyze them
// This would involve:
// 1. Checking if the transaction is calling a Uniswap-like contract
// 2. Decoding the swap function call
// 3. Extracting the token addresses and amounts
// 4. Calculating potential price impact
// Detect and analyze swap transactions
if m.isSwapTransaction(tx) {
swapData := m.analyzeSwapTransaction(tx, from)
if swapData != nil {
m.logger.Info(fmt.Sprintf("Detected swap: %s -> %s, Amount: %s",
swapData.TokenIn.Hex(), swapData.TokenOut.Hex(), swapData.AmountIn.String()))
// Calculate potential arbitrage opportunity
if opportunity := m.calculateArbitrageOpportunity(swapData); opportunity != nil {
m.logger.Info(fmt.Sprintf("Potential arbitrage detected: %s profit",
opportunity.NetProfit.String()))
}
}
}
return nil
}
@@ -564,3 +747,163 @@ func (m *ArbitrumMonitor) getTransactionReceiptWithRetry(ctx context.Context, tx
return nil, fmt.Errorf("failed to fetch receipt for transaction %s after %d attempts", txHash.Hex(), maxRetries)
}
// convertToStandardFormat flattens a parsed DEX transaction into the
// generic map shape consumed by the arbitrage pipeline.
func (m *ArbitrumMonitor) convertToStandardFormat(dexTx *arbitrum.DEXTransaction) interface{} {
	// Token and amount details are not extracted here; the pipeline derives
	// them from input_data during deeper analysis.
	standardized := make(map[string]interface{}, 11)
	standardized["hash"] = dexTx.Hash
	standardized["from"] = dexTx.From
	standardized["to"] = dexTx.To
	standardized["value"] = dexTx.Value
	standardized["protocol"] = dexTx.Protocol
	standardized["function"] = dexTx.FunctionName
	standardized["function_sig"] = dexTx.FunctionSig
	standardized["contract"] = dexTx.ContractName
	standardized["block_number"] = dexTx.BlockNumber
	standardized["input_data"] = dexTx.InputData
	standardized["timestamp"] = time.Now().Unix()
	return standardized
}
// SwapData represents analyzed swap transaction data extracted from DEX
// swap calldata. The simplified parsers in this file populate only AmountIn
// and Protocol; the remaining fields stay zero-valued until full ABI
// decoding fills them in.
type SwapData struct {
	TokenIn   common.Address // input token (zero value when not decoded)
	TokenOut  common.Address // output token (zero value when not decoded)
	AmountIn  *big.Int       // input amount, in the token's smallest unit
	AmountOut *big.Int       // output amount; parsers here always set 0
	Pool      common.Address // pool address (zero value when not decoded)
	Protocol  string         // "UniswapV2", "UniswapV3", or "Unknown"
}
// Use the canonical ArbitrageOpportunity from types package
// isSwapTransaction reports whether tx targets a known DEX swap entry
// point, identified by the 4-byte function selector of its calldata.
func (m *ArbitrumMonitor) isSwapTransaction(tx *types.Transaction) bool {
	// Contract creations (nil To) and calldata shorter than a selector
	// can never be swaps.
	if tx.To() == nil || len(tx.Data()) < 4 {
		return false
	}

	// Match the selector against known swap entry points. A switch avoids
	// re-allocating the lookup map on every transaction, which the previous
	// implementation did on this hot per-transaction path.
	switch fmt.Sprintf("0x%x", tx.Data()[:4]) {
	case "0x38ed1739", // swapExactTokensForTokens
		"0x7ff36ab5", // swapExactETHForTokens
		"0x18cbafe5", // swapExactTokensForETH
		"0x8803dbee", // swapTokensForExactTokens
		"0x414bf389", // exactInputSingle (Uniswap V3)
		"0x09b81346": // exactInput (Uniswap V3)
		return true
	}
	return false
}
// analyzeSwapTransaction dispatches calldata to the parser matching its
// 4-byte selector and returns the extracted swap data (nil if unparseable).
func (m *ArbitrumMonitor) analyzeSwapTransaction(tx *types.Transaction, from common.Address) *SwapData {
	data := tx.Data()
	if len(data) < 4 {
		return nil
	}

	// Route by function selector; unknown selectors get a best-effort parse.
	selector := fmt.Sprintf("0x%x", data[:4])
	if selector == "0x414bf389" { // Uniswap V3 exactInputSingle
		return m.parseUniswapV3SingleSwap(data)
	}
	if selector == "0x38ed1739" || selector == "0x7ff36ab5" || selector == "0x18cbafe5" { // Uniswap V2 style
		return m.parseUniswapV2Swap(data)
	}
	// Fallback parsing attempt for any other selector.
	return m.parseGenericSwap(data)
}
// parseUniswapV2Swap extracts the input amount from Uniswap V2 style swap
// calldata. AmountOut is left at zero: recovering it needs full ABI
// decoding.
func (m *ArbitrumMonitor) parseUniswapV2Swap(data []byte) *SwapData {
	// Need at least the 4-byte selector plus two 32-byte ABI words.
	const minLen = 4 + 2*32
	if len(data) < minLen {
		return nil
	}

	// The first ABI word after the selector holds the amount (simplified).
	swap := &SwapData{
		AmountIn:  new(big.Int).SetBytes(data[4:36]),
		AmountOut: big.NewInt(0),
		Protocol:  "UniswapV2",
	}
	return swap
}
// parseUniswapV3SingleSwap extracts the input amount from Uniswap V3
// exactInputSingle calldata.
//
// exactInputSingle (selector 0x414bf389) takes a single
// ExactInputSingleParams struct, ABI-encoded as eight static 32-byte words
// after the 4-byte selector:
//
//	[4:36]    tokenIn
//	[36:68]   tokenOut
//	[68:100]  fee
//	[100:132] recipient
//	[132:164] deadline
//	[164:196] amountIn
//	...       amountOutMinimum, sqrtPriceLimitX96
//
// The previous implementation read data[68:100] — the fee word — and so
// reported the pool fee tier as the swap amount. amountIn lives at
// data[164:196], which the existing length check already covers.
func (m *ArbitrumMonitor) parseUniswapV3SingleSwap(data []byte) *SwapData {
	if len(data) < 196 { // selector + six words, enough to reach amountIn
		return nil
	}

	amountIn := new(big.Int).SetBytes(data[164:196])
	return &SwapData{
		AmountIn:  amountIn,
		AmountOut: big.NewInt(0), // would need full ABI decoding
		Protocol:  "UniswapV3",
	}
}
// parseGenericSwap is the fallback parser for unrecognized selectors: it
// treats the first ABI word after the selector as the input amount.
func (m *ArbitrumMonitor) parseGenericSwap(data []byte) *SwapData {
	if len(data) < 36 { // selector + one 32-byte word
		return nil
	}

	result := &SwapData{
		AmountIn:  new(big.Int).SetBytes(data[4:36]),
		AmountOut: big.NewInt(0),
		Protocol:  "Unknown",
	}
	return result
}
// calculateArbitrageOpportunity applies a simplified heuristic to decide
// whether a swap is worth pursuing. A production implementation would
// compare prices across multiple DEXs; here profit is assumed to be 0.1%
// of the input amount.
func (m *ArbitrumMonitor) calculateArbitrageOpportunity(swapData *SwapData) *arbitragetypes.ArbitrageOpportunity {
	// Ignore swaps below roughly 1 ETH worth of input.
	minAmount := big.NewInt(1000000000000000000)
	if swapData.AmountIn.Cmp(minAmount) < 0 {
		return nil
	}

	// Assume 0.1% of the input as profit (placeholder for real pool queries).
	estimatedProfit := new(big.Int).Div(swapData.AmountIn, big.NewInt(1000))

	// Require strictly more than 0.01 ETH of estimated profit.
	profitFloor := big.NewInt(10000000000000000)
	if estimatedProfit.Cmp(profitFloor) <= 0 {
		return nil
	}

	return &arbitragetypes.ArbitrageOpportunity{
		Path:          []string{swapData.TokenIn.Hex(), swapData.TokenOut.Hex()},
		Pools:         []string{swapData.Pool.Hex()},
		AmountIn:      swapData.AmountIn,
		Profit:        estimatedProfit,
		NetProfit:     estimatedProfit,
		GasEstimate:   big.NewInt(100000), // rough gas estimate
		ROI:           0.1,
		Protocol:      swapData.Protocol,
		ExecutionTime: 1000, // ~1 second estimate
		Confidence:    0.7,
		PriceImpact:   0.01,
		MaxSlippage:   0.05,
		TokenIn:       swapData.TokenIn,
		TokenOut:      swapData.TokenOut,
		Timestamp:     time.Now().Unix(),
		Risk:          0.3,
	}
}