Files
mev-beta/pkg/execution/queue.go
Krypto Kajun 850223a953 fix(multicall): resolve critical multicall parsing corruption issues
- Added comprehensive bounds checking to prevent buffer overruns in multicall parsing
- Implemented graduated validation system (Strict/Moderate/Permissive) to reduce false positives
- Added LRU caching system for address validation with 10-minute TTL
- Enhanced ABI decoder with missing Universal Router and Arbitrum-specific DEX signatures
- Fixed duplicate function declarations and import conflicts across multiple files
- Added error recovery mechanisms with multiple fallback strategies
- Updated tests to handle new validation behavior for suspicious addresses
- Fixed parser test expectations for improved validation system
- Applied gofmt formatting fixes to ensure code style compliance
- Fixed mutex copying issues in monitoring package by introducing MetricsSnapshot
- Resolved critical security vulnerabilities in heuristic address extraction
- Progress: Updated TODO audit from 10% to 35% complete

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-17 00:12:55 -05:00

368 lines
10 KiB
Go

// Package execution implements the execution layer for the MEV bot.
// It manages prioritized execution of arbitrage opportunities with features like
// priority-based queues, circuit breakers, rate limiting, and retry logic.
package execution
import (
"container/heap"
"context"
"errors"
"fmt"
"math/big"
"sync"
"time"
"github.com/fraktal/mev-beta/internal/logger"
pkgtypes "github.com/fraktal/mev-beta/pkg/types"
)
var (
// ErrQueueFullLowerPriority is returned by AddOpportunity when the queue is at
// capacity and the new opportunity's priority does not exceed the priority of
// the queued item it is compared against; the new opportunity is not enqueued.
ErrQueueFullLowerPriority = errors.New("queue full and new opportunity has lower priority")
)
const (
// Default queue configuration, applied by NewExecutionQueue and AddOpportunity.

// DefaultMaxQueueSize is the queue length at which AddOpportunity starts
// evicting or rejecting opportunities.
DefaultMaxQueueSize = 100
// DefaultExecutionRate is the ticker interval used by Start; at most one
// queued item is processed per tick.
DefaultExecutionRate = 500 * time.Millisecond
// DefaultMaxRetries is the MaxRetries value given to every new ExecutionItem.
DefaultMaxRetries = 3

// Circuit breaker configuration, applied by NewExecutionQueue.

// CircuitBreakerMaxFailures is the number of failures inside the time window
// at which CircuitBreaker.IsOpen reports true.
CircuitBreakerMaxFailures = 5
// CircuitBreakerTimeWindow is how long a recorded failure keeps counting
// toward the breaker threshold.
CircuitBreakerTimeWindow = 5 * time.Minute

// Priority calculation factors, used by calculatePriority.

// ProfitScoreScaleFactor multiplies NetProfit/1e18.
ProfitScoreScaleFactor = 10.0
// ConfidenceBoostFactor multiplies the opportunity's Confidence.
ConfidenceBoostFactor = 20.0
// MarginBoostFactor multiplies the opportunity's ROI.
MarginBoostFactor = 10.0
// DefaultTimeDecayPriority is a flat amount added to every priority; no
// actual time-based decay is computed in this file.
DefaultTimeDecayPriority = 100.0

// Execution simulation parameters, used by executeOpportunity.

// ConfidenceThresholdForSuccess is the Confidence a simulated execution must
// strictly exceed to be reported as successful.
ConfidenceThresholdForSuccess = 0.7
// SimulatedExecutionTime is how long executeOpportunity sleeps to imitate
// execution latency.
SimulatedExecutionTime = 200 * time.Millisecond
)
// ExecutionQueue manages prioritized execution of arbitrage opportunities.
// Opportunities are added with AddOpportunity and drained one per tick by the
// loop in Start; failures feed a circuit breaker that pauses processing.
type ExecutionQueue struct {
logger *logger.Logger
// queue is the heap of pending items, highest priority first.
queue *PriorityQueue
// mu is held while queue is accessed; GetStats takes its read lock.
mu sync.RWMutex
// maxQueueSize is the length at which AddOpportunity evicts or rejects.
maxQueueSize int
// executionRate is the ticker interval used by Start.
executionRate time.Duration
circuitBreaker *CircuitBreaker
// Execution stats
// totalExecuted counts every execution attempt, including retries.
totalExecuted int64
successCount int64
failureCount int64
// totalProfitUSD accumulates NetProfit/1e18 of successful executions.
// NOTE(review): despite the name, no USD conversion happens in this file.
totalProfitUSD float64
}
// ExecutionItem represents an arbitrage opportunity in the execution queue,
// together with the scheduling metadata the queue maintains for it.
type ExecutionItem struct {
// Opportunity is the opportunity to execute.
Opportunity *pkgtypes.ArbitrageOpportunity
// Priority orders items in the queue; it is multiplied by 0.9 on each retry.
Priority float64 // Higher = more urgent
// Timestamp is when the item was enqueued; earlier items win priority ties.
Timestamp time.Time
// Retries is the number of retries already scheduled for this item.
Retries int
// MaxRetries is the retry count after which a failing item is dropped.
MaxRetries int
}
// PriorityQueue implements a priority queue for execution items.
// Together with its Len, Less, Swap, Push and Pop methods it is used with the
// container/heap functions; Less orders higher Priority values first, so the
// heap root is the most urgent item.
type PriorityQueue []*ExecutionItem
// Len reports how many items are currently held in the queue.
func (pq PriorityQueue) Len() int {
	size := len(pq)
	return size
}
// Less reports whether the item at index i must be ordered before the item at
// index j. The item with the higher Priority comes first; when priorities are
// equal, the item with the earlier Timestamp comes first.
func (pq PriorityQueue) Less(i, j int) bool {
	left, right := pq[i], pq[j]
	switch {
	case left.Priority == right.Priority:
		// Tie: fall back to arrival order.
		return left.Timestamp.Before(right.Timestamp)
	default:
		return left.Priority > right.Priority
	}
}
// Swap exchanges the items stored at indexes i and j.
func (pq PriorityQueue) Swap(i, j int) {
	held := pq[i]
	pq[i] = pq[j]
	pq[j] = held
}
// Push appends x, which must be a *ExecutionItem, to the end of the queue.
// It is meant to be invoked through heap.Push, which restores heap order
// afterwards. A value of any other type causes a panic.
func (pq *PriorityQueue) Push(x interface{}) {
	entry := x.(*ExecutionItem)
	items := *pq
	*pq = append(items, entry)
}
// Pop removes and returns the last element of the underlying slice. It is
// meant to be invoked through heap.Pop (or heap.Remove), which first moves the
// element to be removed into the last position.
//
// The vacated slot in the backing array is set to nil so that the array no
// longer keeps the removed item reachable after the queue shrinks.
//
// Pop panics if the queue is empty.
func (pq *PriorityQueue) Pop() interface{} {
	old := *pq
	n := len(old)
	item := old[n-1]
	old[n-1] = nil // release the reference held by the backing array
	*pq = old[:n-1]
	return item
}
// CircuitBreaker prevents execution when too many failures occur.
// It keeps the timestamps of recent failures and reports itself open while
// the number of failures inside timeWindow is at least maxFailures.
type CircuitBreaker struct {
// maxFailures is the failure count at which the breaker opens.
maxFailures int
// timeWindow is how long a recorded failure keeps counting.
timeWindow time.Duration
// failures holds failure timestamps; IsOpen prunes entries older than timeWindow.
failures []time.Time
// isOpen caches the result of the most recent IsOpen call.
isOpen bool
// mu is locked by IsOpen and RecordFailure around access to the fields above.
mu sync.RWMutex
}
// NewExecutionQueue builds an ExecutionQueue that logs through the given
// logger. The queue starts empty and is configured with the package defaults:
// DefaultMaxQueueSize, DefaultExecutionRate, and a circuit breaker using
// CircuitBreakerMaxFailures and CircuitBreakerTimeWindow.
func NewExecutionQueue(logger *logger.Logger) *ExecutionQueue {
	breaker := &CircuitBreaker{
		maxFailures: CircuitBreakerMaxFailures,
		timeWindow:  CircuitBreakerTimeWindow,
		failures:    make([]time.Time, 0),
	}

	eq := &ExecutionQueue{
		logger:         logger,
		queue:          &PriorityQueue{},
		maxQueueSize:   DefaultMaxQueueSize,
		executionRate:  DefaultExecutionRate, // at most one execution per interval
		circuitBreaker: breaker,
	}
	heap.Init(eq.queue)
	return eq
}
// AddOpportunity adds an arbitrage opportunity to the execution queue.
//
// The opportunity's priority is computed once with calculatePriority. If the
// queue already holds maxQueueSize items, the queued item that sorts last
// (lowest priority, latest timestamp on ties) is located; the new opportunity
// replaces it only when its priority is strictly higher. Otherwise
// ErrQueueFullLowerPriority is returned and the queue is left unchanged.
//
// The heap only guarantees that the root is the best item; neither the last
// slice element nor heap.Pop yields the worst one, so the worst item is found
// with a linear scan and removed with heap.Remove.
func (eq *ExecutionQueue) AddOpportunity(opportunity *pkgtypes.ArbitrageOpportunity) error {
	eq.mu.Lock()
	defer eq.mu.Unlock()

	priority := eq.calculatePriority(opportunity)

	// Make room when the queue is at capacity.
	if n := eq.queue.Len(); n > 0 && n >= eq.maxQueueSize {
		worst := 0
		for i := 1; i < n; i++ {
			// Less(worst, i) means i sorts after worst, i.e. i is less urgent.
			if eq.queue.Less(worst, i) {
				worst = i
			}
		}
		if priority <= (*eq.queue)[worst].Priority {
			return ErrQueueFullLowerPriority
		}
		heap.Remove(eq.queue, worst)
		eq.logger.Info("Queue full, replaced lower priority opportunity")
	}

	item := &ExecutionItem{
		Opportunity: opportunity,
		Priority:    priority,
		Timestamp:   time.Now(),
		MaxRetries:  DefaultMaxRetries,
	}
	heap.Push(eq.queue, item)

	// Convert NetProfit (divided by 1e18) to float64 for display only.
	profitFloat := 0.0
	if opportunity.NetProfit != nil {
		profitEth := new(big.Float).Quo(new(big.Float).SetInt(opportunity.NetProfit), big.NewFloat(1e18))
		var accuracy big.Accuracy
		profitFloat, accuracy = profitEth.Float64()
		// Anything other than big.Exact means the float64 value was rounded.
		if accuracy != big.Exact {
			eq.logger.Warn(fmt.Sprintf("NetProfit conversion to float64 may have lost precision, accuracy: %v", accuracy))
		}
	}

	eq.logger.Info(fmt.Sprintf("📋 Added arbitrage opportunity to queue: %.6f ETH profit, priority: %.2f",
		profitFloat, item.Priority))
	return nil
}
// calculatePriority scores an opportunity for ordering in the queue. The score
// is the sum of four terms:
//   - NetProfit divided by 1e18, times ProfitScoreScaleFactor (0 when NetProfit is nil)
//   - Confidence times ConfidenceBoostFactor
//   - ROI times MarginBoostFactor
//   - the flat DefaultTimeDecayPriority
//
// A warning is logged when converting the profit to float64 is not exact.
func (eq *ExecutionQueue) calculatePriority(opp *pkgtypes.ArbitrageOpportunity) float64 {
	// Profit term.
	var profitScore float64
	if wei := opp.NetProfit; wei != nil {
		weiPerEth := big.NewFloat(1e18)
		eth, acc := new(big.Float).Quo(new(big.Float).SetInt(wei), weiPerEth).Float64()
		if acc != big.Exact {
			eq.logger.Warn(fmt.Sprintf("NetProfit conversion to float64 may have lost precision, accuracy: %v", acc))
		}
		profitScore = eth * ProfitScoreScaleFactor
	}

	// Confidence and margin terms.
	confidenceBoost := opp.Confidence * ConfidenceBoostFactor
	marginBoost := opp.ROI * MarginBoostFactor

	// Opportunities carry no creation timestamp, so the time term is a flat
	// constant rather than a real decay.
	timeDecay := DefaultTimeDecayPriority

	return profitScore + confidenceBoost + marginBoost + timeDecay
}
// Start runs the queue processor until ctx is cancelled. It blocks the calling
// goroutine, invoking processNext once per executionRate tick, and returns
// after logging that the queue has stopped.
func (eq *ExecutionQueue) Start(ctx context.Context) {
	eq.logger.Info("🚀 Starting execution queue processor")

	tick := time.NewTicker(eq.executionRate)
	defer tick.Stop()

	done := ctx.Done()
	for {
		select {
		case <-tick.C:
			eq.processNext()
		case <-done:
			eq.logger.Info("⏹️ Execution queue stopped")
			return
		}
	}
}
// processNext processes the next item in the queue.
//
// It does nothing when the circuit breaker is open or the queue is empty.
// Otherwise the highest-priority item is popped and executed with the queue
// lock released, so AddOpportunity is not blocked while execution runs.
//
// On success the success and profit statistics are updated. On failure the
// failure is recorded with the circuit breaker and, while the item has retries
// left, it is re-queued with its priority reduced by 10%.
//
// All statistics counters are updated while holding eq.mu, because GetStats
// reads them under the same lock from other goroutines.
func (eq *ExecutionQueue) processNext() {
	eq.mu.Lock()
	if eq.circuitBreaker.IsOpen() {
		eq.mu.Unlock()
		eq.logger.Warn("⚠️ Circuit breaker open, skipping execution")
		return
	}
	if eq.queue.Len() == 0 {
		eq.mu.Unlock()
		return
	}
	item := heap.Pop(eq.queue).(*ExecutionItem)
	eq.mu.Unlock()

	// Execute without holding the lock; execution may take a while.
	if eq.executeOpportunity(item) {
		// Convert NetProfit (divided by 1e18) to float64 for tracking.
		profitFloat := 0.0
		if item.Opportunity.NetProfit != nil {
			profitEth := new(big.Float).Quo(new(big.Float).SetInt(item.Opportunity.NetProfit), big.NewFloat(1e18))
			var accuracy big.Accuracy
			profitFloat, accuracy = profitEth.Float64()
			// Anything other than big.Exact means the float64 value was rounded.
			if accuracy != big.Exact {
				eq.logger.Warn(fmt.Sprintf("NetProfit conversion to float64 may have lost precision, accuracy: %v", accuracy))
			}
		}

		eq.mu.Lock()
		eq.successCount++
		eq.totalProfitUSD += profitFloat
		eq.totalExecuted++
		eq.mu.Unlock()

		eq.logger.Info(fmt.Sprintf("✅ Execution successful: %.6f ETH profit", profitFloat))
		return
	}

	eq.circuitBreaker.RecordFailure()

	eq.mu.Lock()
	eq.failureCount++
	eq.totalExecuted++
	retry := item.Retries < item.MaxRetries
	if retry {
		item.Retries++
		item.Priority *= 0.9 // slightly lower priority for retries
		heap.Push(eq.queue, item)
	}
	eq.mu.Unlock()

	if retry {
		eq.logger.Warn(fmt.Sprintf("🔄 Execution failed, retrying (%d/%d)", item.Retries, item.MaxRetries))
	} else {
		eq.logger.Error("❌ Execution failed after max retries")
	}
}
// executeOpportunity performs a simulated execution of the item's opportunity
// and reports whether it succeeded. It logs the path length and the profit
// (NetProfit divided by 1e18), sleeps for SimulatedExecutionTime, and treats
// the execution as successful when the opportunity's Confidence is strictly
// greater than ConfidenceThresholdForSuccess.
func (eq *ExecutionQueue) executeOpportunity(item *ExecutionItem) bool {
	opp := item.Opportunity

	// Profit as float64, for logging only.
	var profitFloat float64
	if wei := opp.NetProfit; wei != nil {
		weiPerEth := big.NewFloat(1e18)
		value, acc := new(big.Float).Quo(new(big.Float).SetInt(wei), weiPerEth).Float64()
		if acc != big.Exact {
			eq.logger.Warn(fmt.Sprintf("NetProfit conversion to float64 may have lost precision, accuracy: %v", acc))
		}
		profitFloat = value
	}

	// Describe the route by hop count when a path is present.
	var exchangeInfo string
	if hops := len(opp.Path); hops > 0 {
		exchangeInfo = fmt.Sprintf("%d-hop", hops)
	} else {
		exchangeInfo = "multi-DEX"
	}

	eq.logger.Info(fmt.Sprintf("⚡ Executing arbitrage: %s path, %.6f ETH profit", exchangeInfo, profitFloat))

	// TODO: Implement actual execution logic
	// Until then the outcome is derived from the confidence alone.
	outcome := opp.Confidence > ConfidenceThresholdForSuccess

	// Stand-in for the time a real execution would take.
	time.Sleep(SimulatedExecutionTime)

	return outcome
}
// IsOpen reports whether the circuit breaker is currently open. Each call
// first discards recorded failures that are at least timeWindow old, then
// opens the breaker when the number of remaining failures has reached
// maxFailures and closes it otherwise.
func (cb *CircuitBreaker) IsOpen() bool {
	cb.mu.Lock()
	defer cb.mu.Unlock()

	// Keep only the failures that still fall inside the window.
	now := time.Now()
	recent := make([]time.Time, 0)
	for i := range cb.failures {
		at := cb.failures[i]
		if now.Sub(at) >= cb.timeWindow {
			continue
		}
		recent = append(recent, at)
	}
	cb.failures = recent

	cb.isOpen = len(recent) >= cb.maxFailures
	return cb.isOpen
}
// RecordFailure notes that an execution failed by appending the current time
// to the breaker's failure history. The breaker state itself is only
// re-evaluated by IsOpen.
func (cb *CircuitBreaker) RecordFailure() {
	cb.mu.Lock()
	defer cb.mu.Unlock()

	occurredAt := time.Now()
	cb.failures = append(cb.failures, occurredAt)
}
// GetStats returns a snapshot of the queue's statistics, taken under the
// queue's read lock. The map contains the current queue size, the execution
// counters, the success rate as a percentage of all executions (0 when none
// have run), the accumulated profit, and whether the circuit breaker is open.
func (eq *ExecutionQueue) GetStats() map[string]interface{} {
	eq.mu.RLock()
	defer eq.mu.RUnlock()

	var successRate float64
	if executed := eq.totalExecuted; executed > 0 {
		successRate = float64(eq.successCount) / float64(executed) * 100
	}

	stats := make(map[string]interface{}, 7)
	stats["queue_size"] = eq.queue.Len()
	stats["total_executed"] = eq.totalExecuted
	stats["success_count"] = eq.successCount
	stats["failure_count"] = eq.failureCount
	stats["success_rate"] = successRate
	stats["total_profit_usd"] = eq.totalProfitUSD
	stats["circuit_breaker_open"] = eq.circuitBreaker.IsOpen()
	return stats
}