mev-beta/internal/recovery/error_handler.go
Krypto Kajun 850223a953 fix(multicall): resolve critical multicall parsing corruption issues
- Added comprehensive bounds checking to prevent buffer overruns in multicall parsing
- Implemented graduated validation system (Strict/Moderate/Permissive) to reduce false positives
- Added LRU caching system for address validation with 10-minute TTL
- Enhanced ABI decoder with missing Universal Router and Arbitrum-specific DEX signatures
- Fixed duplicate function declarations and import conflicts across multiple files
- Added error recovery mechanisms with multiple fallback strategies
- Updated tests to handle new validation behavior for suspicious addresses
- Fixed parser test expectations for improved validation system
- Applied gofmt formatting fixes to ensure code style compliance
- Fixed mutex copying issues in monitoring package by introducing MetricsSnapshot
- Resolved critical security vulnerabilities in heuristic address extraction
- Progress: Updated TODO audit from 10% to 35% complete

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-17 00:12:55 -05:00

622 lines
16 KiB
Go

package recovery

import (
	"context"
	"sync"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/fraktal/mev-beta/internal/logger"
)
// ErrorSeverity represents the severity level of an error
type ErrorSeverity int
const (
SeverityLow ErrorSeverity = iota
SeverityMedium
SeverityHigh
SeverityCritical
)
func (s ErrorSeverity) String() string {
switch s {
case SeverityLow:
return "LOW"
case SeverityMedium:
return "MEDIUM"
case SeverityHigh:
return "HIGH"
case SeverityCritical:
return "CRITICAL"
default:
return "UNKNOWN"
}
}
// ErrorType categorizes different types of errors
type ErrorType int
const (
ErrorTypeAddressCorruption ErrorType = iota
ErrorTypeContractCallFailed
ErrorTypeRPCConnectionFailed
ErrorTypeDataParsingFailed
ErrorTypeValidationFailed
ErrorTypeTimeoutError
)
func (e ErrorType) String() string {
switch e {
case ErrorTypeAddressCorruption:
return "ADDRESS_CORRUPTION"
case ErrorTypeContractCallFailed:
return "CONTRACT_CALL_FAILED"
case ErrorTypeRPCConnectionFailed:
return "RPC_CONNECTION_FAILED"
case ErrorTypeDataParsingFailed:
return "DATA_PARSING_FAILED"
case ErrorTypeValidationFailed:
return "VALIDATION_FAILED"
case ErrorTypeTimeoutError:
return "TIMEOUT_ERROR"
default:
return "UNKNOWN_ERROR"
}
}
// RecoveryAction represents an action to take when an error occurs
type RecoveryAction int
const (
ActionSkipAndContinue RecoveryAction = iota
ActionRetryWithBackoff
ActionUseFallbackData
ActionCircuitBreaker
ActionEmergencyStop
)
func (a RecoveryAction) String() string {
switch a {
case ActionSkipAndContinue:
return "SKIP_AND_CONTINUE"
case ActionRetryWithBackoff:
return "RETRY_WITH_BACKOFF"
case ActionUseFallbackData:
return "USE_FALLBACK_DATA"
case ActionCircuitBreaker:
return "CIRCUIT_BREAKER"
case ActionEmergencyStop:
return "EMERGENCY_STOP"
default:
return "UNKNOWN_ACTION"
}
}
// ErrorEvent represents a specific error occurrence
type ErrorEvent struct {
Timestamp time.Time
Type ErrorType
Severity ErrorSeverity
Component string
Address common.Address
Message string
Context map[string]interface{}
AttemptCount int
LastAttempt time.Time
Resolved bool
ResolvedAt time.Time
}
// RecoveryRule defines how to handle specific error patterns
type RecoveryRule struct {
ErrorType ErrorType
MaxSeverity ErrorSeverity
Action RecoveryAction
MaxRetries int
BackoffInterval time.Duration
CircuitBreakerThreshold int
ContextMatchers map[string]interface{}
}
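// Illustrative sketch (not part of the original file): a custom rule showing how
// ContextMatchers can scope a rule to errors whose context carries a specific
// marker. The "heuristic_extraction" key and the chosen values are hypothetical.
var exampleHeuristicParsingRule = RecoveryRule{
	ErrorType:       ErrorTypeDataParsingFailed,
	MaxSeverity:     SeverityHigh,
	Action:          ActionUseFallbackData,
	MaxRetries:      1,
	BackoffInterval: time.Second,
	ContextMatchers: map[string]interface{}{"heuristic_extraction": true},
}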
// ErrorHandler provides comprehensive error handling and recovery capabilities
type ErrorHandler struct {
mu sync.RWMutex
logger *logger.Logger
errorHistory []ErrorEvent
componentStats map[string]*ComponentStats
circuitBreakers map[string]*CircuitBreaker
recoveryRules []RecoveryRule
fallbackProvider FallbackDataProvider
maxHistorySize int
alertThresholds map[ErrorType]int
enabled bool
}
// ComponentStats tracks error statistics for components
type ComponentStats struct {
mu sync.RWMutex
Component string
TotalErrors int
ErrorsByType map[ErrorType]int
ErrorsBySeverity map[ErrorSeverity]int
LastError time.Time
ConsecutiveFailures int
SuccessCount int
IsHealthy bool
LastHealthCheck time.Time
}
// CircuitBreaker implements circuit breaker pattern for failing components
type CircuitBreaker struct {
mu sync.RWMutex
Name string
State CircuitState
FailureCount int
Threshold int
Timeout time.Duration
LastFailure time.Time
LastSuccess time.Time
HalfOpenAllowed bool
}
type CircuitState int
const (
CircuitClosed CircuitState = iota
CircuitOpen
CircuitHalfOpen
)
func (s CircuitState) String() string {
switch s {
case CircuitClosed:
return "CLOSED"
case CircuitOpen:
return "OPEN"
case CircuitHalfOpen:
return "HALF_OPEN"
default:
return "UNKNOWN"
}
}
// FallbackDataProvider interface for providing fallback data when primary sources fail
type FallbackDataProvider interface {
GetFallbackTokenInfo(ctx context.Context, address common.Address) (*FallbackTokenInfo, error)
GetFallbackPoolInfo(ctx context.Context, address common.Address) (*FallbackPoolInfo, error)
GetFallbackContractType(ctx context.Context, address common.Address) (string, error)
}
type FallbackTokenInfo struct {
Address common.Address
Symbol string
Name string
Decimals uint8
IsVerified bool
Source string
Confidence float64
}
type FallbackPoolInfo struct {
Address common.Address
Token0 common.Address
Token1 common.Address
Protocol string
Fee uint32
IsVerified bool
Source string
Confidence float64
}
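// Illustrative sketch (not part of the original file): a minimal in-memory
// FallbackDataProvider backed by static maps. A production provider would more
// likely consult a local snapshot or a secondary data source; the
// staticFallbackProvider type and its fields are assumptions made for this example.
type staticFallbackProvider struct {
	tokens map[common.Address]*FallbackTokenInfo
	pools  map[common.Address]*FallbackPoolInfo
}
func (p *staticFallbackProvider) GetFallbackTokenInfo(ctx context.Context, address common.Address) (*FallbackTokenInfo, error) {
	if info, ok := p.tokens[address]; ok {
		return info, nil
	}
	// Unknown address: return a zero-confidence placeholder instead of failing hard.
	return &FallbackTokenInfo{Address: address, Source: "static", Confidence: 0}, nil
}
func (p *staticFallbackProvider) GetFallbackPoolInfo(ctx context.Context, address common.Address) (*FallbackPoolInfo, error) {
	if info, ok := p.pools[address]; ok {
		return info, nil
	}
	return &FallbackPoolInfo{Address: address, Source: "static", Confidence: 0}, nil
}
func (p *staticFallbackProvider) GetFallbackContractType(ctx context.Context, address common.Address) (string, error) {
	if _, ok := p.pools[address]; ok {
		return "pool", nil
	}
	if _, ok := p.tokens[address]; ok {
		return "token", nil
	}
	return "unknown", nil
}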
// NewErrorHandler creates a new error handler with default configuration
func NewErrorHandler(logger *logger.Logger) *ErrorHandler {
handler := &ErrorHandler{
logger: logger,
errorHistory: make([]ErrorEvent, 0),
componentStats: make(map[string]*ComponentStats),
circuitBreakers: make(map[string]*CircuitBreaker),
maxHistorySize: 1000,
alertThresholds: make(map[ErrorType]int),
enabled: true,
}
// Initialize default recovery rules
handler.initializeDefaultRules()
// Initialize default alert thresholds
handler.initializeAlertThresholds()
return handler
}
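// Illustrative sketch (not part of the original file): constructing the handler and
// wiring an optional fallback provider. The caller is assumed to supply the
// *logger.Logger instance; the helper name is hypothetical.
func exampleNewHandler(log *logger.Logger, provider FallbackDataProvider) *ErrorHandler {
	handler := NewErrorHandler(log)
	if provider != nil {
		handler.SetFallbackProvider(provider)
	}
	return handler
}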
// initializeDefaultRules sets up default recovery rules for common error scenarios
func (eh *ErrorHandler) initializeDefaultRules() {
eh.recoveryRules = []RecoveryRule{
{
ErrorType: ErrorTypeAddressCorruption,
MaxSeverity: SeverityMedium,
Action: ActionRetryWithBackoff,
MaxRetries: 2,
BackoffInterval: 500 * time.Millisecond,
},
{
ErrorType: ErrorTypeAddressCorruption,
MaxSeverity: SeverityCritical,
Action: ActionUseFallbackData,
MaxRetries: 0,
BackoffInterval: 0,
},
{
ErrorType: ErrorTypeContractCallFailed,
MaxSeverity: SeverityMedium,
Action: ActionRetryWithBackoff,
MaxRetries: 3,
BackoffInterval: 2 * time.Second,
},
{
ErrorType: ErrorTypeRPCConnectionFailed,
MaxSeverity: SeverityHigh,
Action: ActionCircuitBreaker,
MaxRetries: 5,
BackoffInterval: 5 * time.Second,
CircuitBreakerThreshold: 10,
},
{
ErrorType: ErrorTypeDataParsingFailed,
MaxSeverity: SeverityMedium,
Action: ActionUseFallbackData,
MaxRetries: 2,
BackoffInterval: 1 * time.Second,
},
{
ErrorType: ErrorTypeValidationFailed,
MaxSeverity: SeverityLow,
Action: ActionSkipAndContinue,
MaxRetries: 0,
BackoffInterval: 0,
},
{
ErrorType: ErrorTypeValidationFailed,
MaxSeverity: SeverityHigh,
Action: ActionRetryWithBackoff,
MaxRetries: 1,
BackoffInterval: 500 * time.Millisecond,
},
{
ErrorType: ErrorTypeTimeoutError,
MaxSeverity: SeverityMedium,
Action: ActionRetryWithBackoff,
MaxRetries: 3,
BackoffInterval: 3 * time.Second,
},
}
}
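// Note: findRecoveryRule scans these rules in slice order and returns the first rule
// whose ErrorType matches and whose MaxSeverity is at least the event's severity
// (it checks severity <= MaxSeverity). For example, a SeverityCritical
// ADDRESS_CORRUPTION event skips the retry rule above and lands on the
// fallback-data rule.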
// initializeAlertThresholds sets up alert thresholds for different error types
func (eh *ErrorHandler) initializeAlertThresholds() {
eh.alertThresholds[ErrorTypeAddressCorruption] = 5
eh.alertThresholds[ErrorTypeContractCallFailed] = 20
eh.alertThresholds[ErrorTypeRPCConnectionFailed] = 10
eh.alertThresholds[ErrorTypeDataParsingFailed] = 15
eh.alertThresholds[ErrorTypeValidationFailed] = 25
eh.alertThresholds[ErrorTypeTimeoutError] = 30
}
// HandleError processes an error and determines the appropriate recovery action
func (eh *ErrorHandler) HandleError(ctx context.Context, errorType ErrorType, severity ErrorSeverity, component string, address common.Address, message string, errCtx map[string]interface{}) RecoveryAction {
if !eh.enabled {
return ActionSkipAndContinue
}
eh.mu.Lock()
defer eh.mu.Unlock()
// Record the error event
event := ErrorEvent{
Timestamp: time.Now(),
Type: errorType,
Severity: severity,
Component: component,
Address: address,
Message: message,
Context: errCtx,
AttemptCount: 1,
LastAttempt: time.Now(),
}
// Update error history
eh.addToHistory(event)
// Update component statistics
eh.updateComponentStats(component, errorType, severity)
// Check circuit breakers
if eh.shouldTriggerCircuitBreaker(component, errorType) {
eh.triggerCircuitBreaker(component)
return ActionCircuitBreaker
}
// Find matching recovery rule
rule := eh.findRecoveryRule(errorType, severity, errCtx)
if rule == nil {
// Default action for unmatched errors
return ActionSkipAndContinue
}
// Log the error and recovery action
eh.logger.Error("Error handled by recovery system",
"type", errorType.String(),
"severity", severity.String(),
"component", component,
"address", address.Hex(),
"message", message,
"action", rule.Action.String())
// Check if alert threshold is reached
eh.checkAlertThresholds(errorType)
return rule.Action
}
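// Illustrative sketch (not part of the original file): how a caller might act on the
// returned RecoveryAction. The fetchPoolState helper and the "parser" component name
// are hypothetical stand-ins for real call sites.
func exampleDispatch(ctx context.Context, eh *ErrorHandler, addr common.Address, fetchPoolState func() error) error {
	err := fetchPoolState()
	if err == nil {
		eh.RecordSuccess("parser")
		return nil
	}
	action := eh.HandleError(ctx, ErrorTypeContractCallFailed, SeverityMedium, "parser", addr, err.Error(), nil)
	switch action {
	case ActionRetryWithBackoff:
		time.Sleep(2 * time.Second) // real code would honor the rule's BackoffInterval and MaxRetries
		return fetchPoolState()
	case ActionUseFallbackData, ActionSkipAndContinue:
		return nil // degrade gracefully; fallback handling happens elsewhere
	default:
		return err // circuit breaker or emergency stop: surface the failure
	}
}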
// addToHistory adds an error event to the history buffer
func (eh *ErrorHandler) addToHistory(event ErrorEvent) {
eh.errorHistory = append(eh.errorHistory, event)
// Trim history if it exceeds max size
if len(eh.errorHistory) > eh.maxHistorySize {
eh.errorHistory = eh.errorHistory[len(eh.errorHistory)-eh.maxHistorySize:]
}
}
// updateComponentStats updates statistics for a component
func (eh *ErrorHandler) updateComponentStats(component string, errorType ErrorType, severity ErrorSeverity) {
stats, exists := eh.componentStats[component]
if !exists {
stats = &ComponentStats{
Component: component,
ErrorsByType: make(map[ErrorType]int),
ErrorsBySeverity: make(map[ErrorSeverity]int),
IsHealthy: true,
}
eh.componentStats[component] = stats
}
stats.mu.Lock()
defer stats.mu.Unlock()
stats.TotalErrors++
stats.ErrorsByType[errorType]++
stats.ErrorsBySeverity[severity]++
stats.LastError = time.Now()
stats.ConsecutiveFailures++
// Mark as unhealthy if too many consecutive failures
if stats.ConsecutiveFailures > 10 {
stats.IsHealthy = false
}
}
// findRecoveryRule finds the best matching recovery rule for an error
func (eh *ErrorHandler) findRecoveryRule(errorType ErrorType, severity ErrorSeverity, errCtx map[string]interface{}) *RecoveryRule {
for _, rule := range eh.recoveryRules {
if rule.ErrorType == errorType && severity <= rule.MaxSeverity {
// Check context matchers if present
if len(rule.ContextMatchers) > 0 {
if !eh.matchesContext(errCtx, rule.ContextMatchers) {
continue
}
}
return &rule
}
}
return nil
}
// matchesContext checks if the error context matches the rule's context matchers
func (eh *ErrorHandler) matchesContext(errorContext, ruleMatchers map[string]interface{}) bool {
for key, expectedValue := range ruleMatchers {
if actualValue, exists := errorContext[key]; !exists || actualValue != expectedValue {
return false
}
}
return true
}
// shouldTriggerCircuitBreaker determines if a circuit breaker should be triggered
func (eh *ErrorHandler) shouldTriggerCircuitBreaker(component string, errorType ErrorType) bool {
stats, exists := eh.componentStats[component]
if !exists {
return false
}
stats.mu.RLock()
defer stats.mu.RUnlock()
// Trigger if consecutive failures exceed threshold for critical errors
if errorType == ErrorTypeRPCConnectionFailed && stats.ConsecutiveFailures >= 5 {
return true
}
if errorType == ErrorTypeAddressCorruption && stats.ConsecutiveFailures >= 3 {
return true
}
return false
}
// triggerCircuitBreaker activates a circuit breaker for a component
func (eh *ErrorHandler) triggerCircuitBreaker(component string) {
breaker := &CircuitBreaker{
Name: component,
State: CircuitOpen,
FailureCount: 0,
Threshold: 5,
Timeout: 30 * time.Second,
LastFailure: time.Now(),
}
eh.circuitBreakers[component] = breaker
eh.logger.Warn("Circuit breaker triggered",
"component", component,
"timeout", breaker.Timeout)
}
// checkAlertThresholds checks if error counts have reached alert thresholds
func (eh *ErrorHandler) checkAlertThresholds(errorType ErrorType) {
threshold, exists := eh.alertThresholds[errorType]
if !exists {
return
}
// Count recent errors of this type (last hour)
recentCount := 0
cutoff := time.Now().Add(-1 * time.Hour)
for _, event := range eh.errorHistory {
if event.Type == errorType && event.Timestamp.After(cutoff) {
recentCount++
}
}
if recentCount >= threshold {
eh.logger.Warn("Error threshold reached - alert triggered",
"error_type", errorType.String(),
"count", recentCount,
"threshold", threshold)
// Here you would trigger your alerting system
}
}
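// Note: the one-hour window above is counted over errorHistory, which is capped at
// maxHistorySize (1000 events), so sustained high error rates can be undercounted
// relative to the configured thresholds.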
// GetComponentHealth returns the health status of all components
func (eh *ErrorHandler) GetComponentHealth() map[string]*ComponentStats {
eh.mu.RLock()
defer eh.mu.RUnlock()
// Return a copy to prevent external modification
result := make(map[string]*ComponentStats)
for name, stats := range eh.componentStats {
result[name] = &ComponentStats{
Component: stats.Component,
TotalErrors: stats.TotalErrors,
ErrorsByType: make(map[ErrorType]int),
ErrorsBySeverity: make(map[ErrorSeverity]int),
LastError: stats.LastError,
ConsecutiveFailures: stats.ConsecutiveFailures,
SuccessCount: stats.SuccessCount,
IsHealthy: stats.IsHealthy,
LastHealthCheck: stats.LastHealthCheck,
}
// Copy maps
for k, v := range stats.ErrorsByType {
result[name].ErrorsByType[k] = v
}
for k, v := range stats.ErrorsBySeverity {
result[name].ErrorsBySeverity[k] = v
}
}
return result
}
// RecordSuccess records a successful operation for a component
func (eh *ErrorHandler) RecordSuccess(component string) {
eh.mu.Lock()
defer eh.mu.Unlock()
stats, exists := eh.componentStats[component]
if !exists {
stats = &ComponentStats{
Component: component,
ErrorsByType: make(map[ErrorType]int),
ErrorsBySeverity: make(map[ErrorSeverity]int),
IsHealthy: true,
}
eh.componentStats[component] = stats
}
stats.mu.Lock()
defer stats.mu.Unlock()
stats.SuccessCount++
stats.ConsecutiveFailures = 0
stats.IsHealthy = true
stats.LastHealthCheck = time.Now()
// Reset circuit breaker if it exists
if breaker, exists := eh.circuitBreakers[component]; exists {
breaker.mu.Lock()
breaker.State = CircuitClosed
breaker.FailureCount = 0
breaker.LastSuccess = time.Now()
breaker.mu.Unlock()
}
}
// IsCircuitOpen checks if a circuit breaker is open for a component
func (eh *ErrorHandler) IsCircuitOpen(component string) bool {
	eh.mu.RLock()
	defer eh.mu.RUnlock()
	breaker, exists := eh.circuitBreakers[component]
	if !exists {
		return false
	}
	// Take the write lock: an open breaker may transition to half-open here, and
	// mutating State under a read lock would race with concurrent callers.
	breaker.mu.Lock()
	defer breaker.mu.Unlock()
	if breaker.State == CircuitOpen {
		// Check if timeout has passed
		if time.Since(breaker.LastFailure) > breaker.Timeout {
			breaker.State = CircuitHalfOpen
			breaker.HalfOpenAllowed = true
			return false
		}
		return true
	}
	return false
}
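// Illustrative sketch (not part of the original file): gating work on the breaker
// state and reporting the outcome back so the breaker can close again. The
// "rpc_client" component name and callRPC helper are hypothetical; the boolean
// return indicates whether the call was attempted at all.
func exampleGuardedCall(ctx context.Context, eh *ErrorHandler, callRPC func() error) (bool, error) {
	if eh.IsCircuitOpen("rpc_client") {
		// Breaker is open: skip the call and let the caller decide how to degrade.
		return false, nil
	}
	if err := callRPC(); err != nil {
		eh.HandleError(ctx, ErrorTypeRPCConnectionFailed, SeverityHigh, "rpc_client", common.Address{}, err.Error(), nil)
		return true, err
	}
	eh.RecordSuccess("rpc_client")
	return true, nil
}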
// SetFallbackProvider sets the fallback data provider
func (eh *ErrorHandler) SetFallbackProvider(provider FallbackDataProvider) {
eh.mu.Lock()
defer eh.mu.Unlock()
eh.fallbackProvider = provider
}
// GetErrorSummary returns a summary of recent errors
func (eh *ErrorHandler) GetErrorSummary(duration time.Duration) map[string]interface{} {
eh.mu.RLock()
defer eh.mu.RUnlock()
cutoff := time.Now().Add(-duration)
summary := map[string]interface{}{
"total_errors": 0,
"errors_by_type": make(map[string]int),
"errors_by_severity": make(map[string]int),
"errors_by_component": make(map[string]int),
"time_range": duration.String(),
}
for _, event := range eh.errorHistory {
if event.Timestamp.After(cutoff) {
summary["total_errors"] = summary["total_errors"].(int) + 1
typeKey := event.Type.String()
summary["errors_by_type"].(map[string]int)[typeKey]++
severityKey := event.Severity.String()
summary["errors_by_severity"].(map[string]int)[severityKey]++
summary["errors_by_component"].(map[string]int)[event.Component]++
}
}
return summary
}
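// Illustrative sketch (not part of the original file): periodically checking the
// handler's aggregate state via GetErrorSummary and GetComponentHealth. The
// one-minute interval is an arbitrary choice, and Warn-level logging is used only
// because that is the level the handler already relies on.
func exampleHealthLoop(ctx context.Context, eh *ErrorHandler) {
	ticker := time.NewTicker(1 * time.Minute)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			summary := eh.GetErrorSummary(1 * time.Hour)
			unhealthy := 0
			for _, stats := range eh.GetComponentHealth() {
				if !stats.IsHealthy {
					unhealthy++
				}
			}
			if unhealthy > 0 {
				eh.logger.Warn("recovery health degraded",
					"unhealthy_components", unhealthy,
					"total_errors_last_hour", summary["total_errors"])
			}
		}
	}
}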