- Added comprehensive bounds checking to prevent buffer overruns in multicall parsing - Implemented graduated validation system (Strict/Moderate/Permissive) to reduce false positives - Added LRU caching system for address validation with 10-minute TTL - Enhanced ABI decoder with missing Universal Router and Arbitrum-specific DEX signatures - Fixed duplicate function declarations and import conflicts across multiple files - Added error recovery mechanisms with multiple fallback strategies - Updated tests to handle new validation behavior for suspicious addresses - Fixed parser test expectations for improved validation system - Applied gofmt formatting fixes to ensure code style compliance - Fixed mutex copying issues in monitoring package by introducing MetricsSnapshot - Resolved critical security vulnerabilities in heuristic address extraction - Progress: Updated TODO audit from 10% to 35% complete 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
368 lines
10 KiB
Go
368 lines
10 KiB
Go
// Package execution implements the execution layer for the MEV bot.
|
|
// It manages prioritized execution of arbitrage opportunities with features like
|
|
// priority-based queues, circuit breakers, rate limiting, and retry logic.
|
|
package execution
|
|
|
|
import (
|
|
"container/heap"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"math/big"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/fraktal/mev-beta/internal/logger"
|
|
pkgtypes "github.com/fraktal/mev-beta/pkg/types"
|
|
)
|
|
|
|
var (
|
|
// ErrQueueFullLowerPriority indicates that the queue is full and the new opportunity has lower priority than existing items
|
|
ErrQueueFullLowerPriority = errors.New("queue full and new opportunity has lower priority")
|
|
)
|
|
|
|
const (
|
|
// Default queue configuration
|
|
DefaultMaxQueueSize = 100
|
|
DefaultExecutionRate = 500 * time.Millisecond
|
|
DefaultMaxRetries = 3
|
|
|
|
// Circuit breaker configuration
|
|
CircuitBreakerMaxFailures = 5
|
|
CircuitBreakerTimeWindow = 5 * time.Minute
|
|
|
|
// Priority calculation factors
|
|
ProfitScoreScaleFactor = 10.0
|
|
ConfidenceBoostFactor = 20.0
|
|
MarginBoostFactor = 10.0
|
|
DefaultTimeDecayPriority = 100.0
|
|
|
|
// Execution simulation parameters
|
|
ConfidenceThresholdForSuccess = 0.7
|
|
SimulatedExecutionTime = 200 * time.Millisecond
|
|
)
|
|
|
|
// ExecutionQueue manages prioritized execution of arbitrage opportunities
|
|
type ExecutionQueue struct {
|
|
logger *logger.Logger
|
|
queue *PriorityQueue
|
|
mu sync.RWMutex
|
|
maxQueueSize int
|
|
executionRate time.Duration
|
|
circuitBreaker *CircuitBreaker
|
|
|
|
// Execution stats
|
|
totalExecuted int64
|
|
successCount int64
|
|
failureCount int64
|
|
totalProfitUSD float64
|
|
}
|
|
|
|
// ExecutionItem represents an arbitrage opportunity in the execution queue
|
|
type ExecutionItem struct {
|
|
Opportunity *pkgtypes.ArbitrageOpportunity
|
|
Priority float64 // Higher = more urgent
|
|
Timestamp time.Time
|
|
Retries int
|
|
MaxRetries int
|
|
}
|
|
|
|
// PriorityQueue implements a priority queue for execution items
|
|
type PriorityQueue []*ExecutionItem
|
|
|
|
func (pq PriorityQueue) Len() int { return len(pq) }
|
|
|
|
func (pq PriorityQueue) Less(i, j int) bool {
|
|
// Higher priority first, then by timestamp for tie-breaking
|
|
if pq[i].Priority != pq[j].Priority {
|
|
return pq[i].Priority > pq[j].Priority
|
|
}
|
|
return pq[i].Timestamp.Before(pq[j].Timestamp)
|
|
}
|
|
|
|
func (pq PriorityQueue) Swap(i, j int) {
|
|
pq[i], pq[j] = pq[j], pq[i]
|
|
}
|
|
|
|
func (pq *PriorityQueue) Push(x interface{}) {
|
|
*pq = append(*pq, x.(*ExecutionItem))
|
|
}
|
|
|
|
func (pq *PriorityQueue) Pop() interface{} {
|
|
old := *pq
|
|
n := len(old)
|
|
item := old[n-1]
|
|
*pq = old[0 : n-1]
|
|
return item
|
|
}
|
|
|
|
// CircuitBreaker prevents execution when too many failures occur
|
|
type CircuitBreaker struct {
|
|
maxFailures int
|
|
timeWindow time.Duration
|
|
failures []time.Time
|
|
isOpen bool
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
// NewExecutionQueue creates a new execution queue
|
|
func NewExecutionQueue(logger *logger.Logger) *ExecutionQueue {
|
|
pq := &PriorityQueue{}
|
|
heap.Init(pq)
|
|
|
|
return &ExecutionQueue{
|
|
logger: logger,
|
|
queue: pq,
|
|
maxQueueSize: DefaultMaxQueueSize,
|
|
executionRate: DefaultExecutionRate, // Execute every 500ms max
|
|
circuitBreaker: &CircuitBreaker{
|
|
maxFailures: CircuitBreakerMaxFailures,
|
|
timeWindow: CircuitBreakerTimeWindow,
|
|
failures: make([]time.Time, 0),
|
|
},
|
|
}
|
|
}
|
|
|
|
// AddOpportunity adds an arbitrage opportunity to the execution queue
|
|
func (eq *ExecutionQueue) AddOpportunity(opportunity *pkgtypes.ArbitrageOpportunity) error {
|
|
eq.mu.Lock()
|
|
defer eq.mu.Unlock()
|
|
|
|
// Check if queue is full
|
|
if eq.queue.Len() >= eq.maxQueueSize {
|
|
// Remove lowest priority item
|
|
if eq.queue.Len() > 0 {
|
|
lowestPriorityItem := (*eq.queue)[eq.queue.Len()-1]
|
|
if eq.calculatePriority(opportunity) > lowestPriorityItem.Priority {
|
|
heap.Pop(eq.queue) // Remove lowest priority
|
|
eq.logger.Info("Queue full, replaced lower priority opportunity")
|
|
} else {
|
|
return ErrQueueFullLowerPriority
|
|
}
|
|
}
|
|
}
|
|
|
|
// Create execution item
|
|
item := &ExecutionItem{
|
|
Opportunity: opportunity,
|
|
Priority: eq.calculatePriority(opportunity),
|
|
Timestamp: time.Now(),
|
|
MaxRetries: DefaultMaxRetries,
|
|
}
|
|
|
|
heap.Push(eq.queue, item)
|
|
|
|
// Convert UniversalDecimal to float64 for display
|
|
profitFloat := 0.0
|
|
if opportunity.NetProfit != nil {
|
|
profitEth := new(big.Float).Quo(new(big.Float).SetInt(opportunity.NetProfit), big.NewFloat(1e18))
|
|
var accuracy big.Accuracy
|
|
profitFloat, accuracy = profitEth.Float64()
|
|
// Check if the conversion was exact (accuracy == 0) or if there was a loss of precision
|
|
if accuracy != big.Exact {
|
|
eq.logger.Warn(fmt.Sprintf("NetProfit conversion to float64 may have lost precision, accuracy: %v", accuracy))
|
|
}
|
|
}
|
|
eq.logger.Info(fmt.Sprintf("📋 Added arbitrage opportunity to queue: %.6f ETH profit, priority: %.2f",
|
|
profitFloat, item.Priority))
|
|
|
|
return nil
|
|
}
|
|
|
|
// calculatePriority calculates execution priority based on profit, confidence, and time sensitivity
|
|
func (eq *ExecutionQueue) calculatePriority(opp *pkgtypes.ArbitrageOpportunity) float64 {
|
|
// Base priority on profit potential
|
|
// Convert NetProfit to float64 for priority calculation
|
|
profitScore := 0.0
|
|
if opp.NetProfit != nil {
|
|
profitEth := new(big.Float).Quo(new(big.Float).SetInt(opp.NetProfit), big.NewFloat(1e18))
|
|
var accuracy big.Accuracy
|
|
profitScore, accuracy = profitEth.Float64()
|
|
// Check if the conversion was exact (accuracy == 0) or if there was a loss of precision
|
|
if accuracy != big.Exact {
|
|
eq.logger.Warn(fmt.Sprintf("NetProfit conversion to float64 may have lost precision, accuracy: %v", accuracy))
|
|
}
|
|
profitScore *= ProfitScoreScaleFactor // Scale by factor
|
|
}
|
|
|
|
// Boost for high confidence
|
|
confidenceBoost := opp.Confidence * ConfidenceBoostFactor
|
|
|
|
// Boost for large profit margins (indicates stable opportunity)
|
|
marginBoost := opp.ROI * MarginBoostFactor // ROI is already a float64 percentage
|
|
|
|
// Time decay - use current time as opportunities don't have timestamps
|
|
// This could be enhanced by adding creation timestamp to ArbitrageOpportunity
|
|
timeDecay := DefaultTimeDecayPriority // Default high priority for new opportunities
|
|
|
|
priority := profitScore + confidenceBoost + marginBoost + timeDecay
|
|
|
|
return priority
|
|
}
|
|
|
|
// Start begins processing the execution queue
|
|
func (eq *ExecutionQueue) Start(ctx context.Context) {
|
|
eq.logger.Info("🚀 Starting execution queue processor")
|
|
|
|
ticker := time.NewTicker(eq.executionRate)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
eq.logger.Info("⏹️ Execution queue stopped")
|
|
return
|
|
case <-ticker.C:
|
|
eq.processNext()
|
|
}
|
|
}
|
|
}
|
|
|
|
// processNext processes the next item in the queue
|
|
func (eq *ExecutionQueue) processNext() {
|
|
eq.mu.Lock()
|
|
|
|
// Check circuit breaker
|
|
if eq.circuitBreaker.IsOpen() {
|
|
eq.mu.Unlock()
|
|
eq.logger.Warn("⚠️ Circuit breaker open, skipping execution")
|
|
return
|
|
}
|
|
|
|
if eq.queue.Len() == 0 {
|
|
eq.mu.Unlock()
|
|
return
|
|
}
|
|
|
|
item := heap.Pop(eq.queue).(*ExecutionItem)
|
|
eq.mu.Unlock()
|
|
|
|
// Execute the opportunity
|
|
success := eq.executeOpportunity(item)
|
|
|
|
if success {
|
|
eq.successCount++
|
|
// Convert NetProfit to float64 for tracking
|
|
profitFloat := 0.0
|
|
if item.Opportunity.NetProfit != nil {
|
|
profitEth := new(big.Float).Quo(new(big.Float).SetInt(item.Opportunity.NetProfit), big.NewFloat(1e18))
|
|
var accuracy big.Accuracy
|
|
profitFloat, accuracy = profitEth.Float64()
|
|
// Check if the conversion was exact (accuracy == 0) or if there was a loss of precision
|
|
if accuracy != big.Exact {
|
|
eq.logger.Warn(fmt.Sprintf("NetProfit conversion to float64 may have lost precision, accuracy: %v", accuracy))
|
|
}
|
|
}
|
|
eq.totalProfitUSD += profitFloat
|
|
eq.logger.Info(fmt.Sprintf("✅ Execution successful: %.6f ETH profit", profitFloat))
|
|
} else {
|
|
eq.failureCount++
|
|
eq.circuitBreaker.RecordFailure()
|
|
|
|
// Retry if not exceeded max retries
|
|
if item.Retries < item.MaxRetries {
|
|
item.Retries++
|
|
item.Priority *= 0.9 // Slightly lower priority for retries
|
|
|
|
eq.mu.Lock()
|
|
heap.Push(eq.queue, item)
|
|
eq.mu.Unlock()
|
|
|
|
eq.logger.Warn(fmt.Sprintf("🔄 Execution failed, retrying (%d/%d)", item.Retries, item.MaxRetries))
|
|
} else {
|
|
eq.logger.Error("❌ Execution failed after max retries")
|
|
}
|
|
}
|
|
|
|
eq.totalExecuted++
|
|
}
|
|
|
|
// executeOpportunity executes a single arbitrage opportunity
|
|
func (eq *ExecutionQueue) executeOpportunity(item *ExecutionItem) bool {
|
|
opp := item.Opportunity
|
|
|
|
// Convert NetProfit to float64 for logging
|
|
profitFloat := 0.0
|
|
if opp.NetProfit != nil {
|
|
profitEth := new(big.Float).Quo(new(big.Float).SetInt(opp.NetProfit), big.NewFloat(1e18))
|
|
var accuracy big.Accuracy
|
|
profitFloat, accuracy = profitEth.Float64()
|
|
// Check if the conversion was exact (accuracy == 0) or if there was a loss of precision
|
|
if accuracy != big.Exact {
|
|
eq.logger.Warn(fmt.Sprintf("NetProfit conversion to float64 may have lost precision, accuracy: %v", accuracy))
|
|
}
|
|
}
|
|
|
|
// Get exchange info from path if available
|
|
exchangeInfo := "multi-DEX"
|
|
if len(opp.Path) > 0 {
|
|
exchangeInfo = fmt.Sprintf("%d-hop", len(opp.Path))
|
|
}
|
|
|
|
eq.logger.Info(fmt.Sprintf("⚡ Executing arbitrage: %s path, %.6f ETH profit",
|
|
exchangeInfo, profitFloat))
|
|
|
|
// TODO: Implement actual execution logic
|
|
// For now, simulate execution with success probability based on confidence
|
|
simulatedSuccess := opp.Confidence > ConfidenceThresholdForSuccess // 70% confidence threshold
|
|
|
|
// Simulate execution time
|
|
time.Sleep(SimulatedExecutionTime)
|
|
|
|
return simulatedSuccess
|
|
}
|
|
|
|
// IsOpen checks if the circuit breaker is open
|
|
func (cb *CircuitBreaker) IsOpen() bool {
|
|
cb.mu.Lock()
|
|
defer cb.mu.Unlock()
|
|
|
|
// Clean old failures
|
|
now := time.Now()
|
|
validFailures := make([]time.Time, 0)
|
|
for _, failure := range cb.failures {
|
|
if now.Sub(failure) < cb.timeWindow {
|
|
validFailures = append(validFailures, failure)
|
|
}
|
|
}
|
|
cb.failures = validFailures
|
|
|
|
// Check if we should open the circuit breaker
|
|
if len(cb.failures) >= cb.maxFailures {
|
|
cb.isOpen = true
|
|
} else {
|
|
cb.isOpen = false
|
|
}
|
|
|
|
return cb.isOpen
|
|
}
|
|
|
|
// RecordFailure records a failed execution
|
|
func (cb *CircuitBreaker) RecordFailure() {
|
|
cb.mu.Lock()
|
|
defer cb.mu.Unlock()
|
|
|
|
cb.failures = append(cb.failures, time.Now())
|
|
}
|
|
|
|
// GetStats returns execution queue statistics
|
|
func (eq *ExecutionQueue) GetStats() map[string]interface{} {
|
|
eq.mu.RLock()
|
|
defer eq.mu.RUnlock()
|
|
|
|
successRate := 0.0
|
|
if eq.totalExecuted > 0 {
|
|
successRate = float64(eq.successCount) / float64(eq.totalExecuted) * 100
|
|
}
|
|
|
|
return map[string]interface{}{
|
|
"queue_size": eq.queue.Len(),
|
|
"total_executed": eq.totalExecuted,
|
|
"success_count": eq.successCount,
|
|
"failure_count": eq.failureCount,
|
|
"success_rate": successRate,
|
|
"total_profit_usd": eq.totalProfitUSD,
|
|
"circuit_breaker_open": eq.circuitBreaker.IsOpen(),
|
|
}
|
|
}
|