// Package recovery classifies reported errors, keeps per-component error
// statistics and circuit-breaker state, and selects a RecoveryAction for each
// error according to a table of recovery rules.
package recovery

import (
	"context"
	"reflect"
	"sync"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/fraktal/mev-beta/internal/logger"
)

// ErrorSeverity represents the severity level of an error. Values are ordered
// from SeverityLow to SeverityCritical so they can be compared with <=.
type ErrorSeverity int

const (
	SeverityLow ErrorSeverity = iota
	SeverityMedium
	SeverityHigh
	SeverityCritical
)

// String returns the upper-case name of the severity, or "UNKNOWN" for a
// value outside the declared constants.
func (s ErrorSeverity) String() string {
	switch s {
	case SeverityLow:
		return "LOW"
	case SeverityMedium:
		return "MEDIUM"
	case SeverityHigh:
		return "HIGH"
	case SeverityCritical:
		return "CRITICAL"
	default:
		return "UNKNOWN"
	}
}

// ErrorType categorizes different types of errors.
type ErrorType int

const (
	ErrorTypeAddressCorruption ErrorType = iota
	ErrorTypeContractCallFailed
	ErrorTypeRPCConnectionFailed
	ErrorTypeDataParsingFailed
	ErrorTypeValidationFailed
	ErrorTypeTimeoutError
)

// String returns the upper-case name of the error type, or "UNKNOWN_ERROR"
// for a value outside the declared constants.
func (e ErrorType) String() string {
	switch e {
	case ErrorTypeAddressCorruption:
		return "ADDRESS_CORRUPTION"
	case ErrorTypeContractCallFailed:
		return "CONTRACT_CALL_FAILED"
	case ErrorTypeRPCConnectionFailed:
		return "RPC_CONNECTION_FAILED"
	case ErrorTypeDataParsingFailed:
		return "DATA_PARSING_FAILED"
	case ErrorTypeValidationFailed:
		return "VALIDATION_FAILED"
	case ErrorTypeTimeoutError:
		return "TIMEOUT_ERROR"
	default:
		return "UNKNOWN_ERROR"
	}
}

// RecoveryAction represents an action to take when an error occurs.
type RecoveryAction int

const (
	ActionSkipAndContinue RecoveryAction = iota
	ActionRetryWithBackoff
	ActionUseFallbackData
	ActionCircuitBreaker
	ActionEmergencyStop
)

// String returns the upper-case name of the action, or "UNKNOWN_ACTION" for a
// value outside the declared constants.
func (a RecoveryAction) String() string {
	switch a {
	case ActionSkipAndContinue:
		return "SKIP_AND_CONTINUE"
	case ActionRetryWithBackoff:
		return "RETRY_WITH_BACKOFF"
	case ActionUseFallbackData:
		return "USE_FALLBACK_DATA"
	case ActionCircuitBreaker:
		return "CIRCUIT_BREAKER"
	case ActionEmergencyStop:
		return "EMERGENCY_STOP"
	default:
		return "UNKNOWN_ACTION"
	}
}

// ErrorEvent represents a specific error occurrence. HandleError fills in
// Timestamp, Type, Severity, Component, Address, Message, Context,
// AttemptCount (always 1) and LastAttempt; the remaining fields are not
// written by the code in this file.
type ErrorEvent struct {
	Timestamp    time.Time
	Type         ErrorType
	Severity     ErrorSeverity
	Component    string
	Address      common.Address
	Message      string
	Context      map[string]interface{}
	AttemptCount int
	LastAttempt  time.Time
	Resolved     bool
	ResolvedAt   time.Time
}

// RecoveryRule defines how to handle specific error patterns. A rule applies
// to an error when the error type equals ErrorType, the error severity is at
// most MaxSeverity, and every entry of ContextMatchers (if any) is present
// with an equal value in the error context.
//
// NOTE(review): MaxRetries, BackoffInterval and CircuitBreakerThreshold are
// not read anywhere in this file; presumably callers that act on the returned
// action consume them - confirm.
type RecoveryRule struct {
	ErrorType               ErrorType
	MaxSeverity             ErrorSeverity
	Action                  RecoveryAction
	MaxRetries              int
	BackoffInterval         time.Duration
	CircuitBreakerThreshold int
	ContextMatchers         map[string]interface{}
}

// ErrorHandler provides comprehensive error handling and recovery
// capabilities. The exported methods in this file acquire mu themselves; the
// unexported helpers expect the caller to already hold it.
type ErrorHandler struct {
	mu               sync.RWMutex
	logger           *logger.Logger
	errorHistory     []ErrorEvent
	componentStats   map[string]*ComponentStats
	circuitBreakers  map[string]*CircuitBreaker
	recoveryRules    []RecoveryRule
	fallbackProvider FallbackDataProvider
	maxHistorySize   int
	alertThresholds  map[ErrorType]int
	enabled          bool
}

// ComponentStats tracks error statistics for components.
type ComponentStats struct {
	mu                  sync.RWMutex
	Component           string
	TotalErrors         int
	ErrorsByType        map[ErrorType]int
	ErrorsBySeverity    map[ErrorSeverity]int
	LastError           time.Time
	ConsecutiveFailures int
	SuccessCount        int
	IsHealthy           bool
	LastHealthCheck     time.Time
}

// CircuitBreaker implements circuit breaker pattern for failing components.
type CircuitBreaker struct {
	mu              sync.RWMutex
	Name            string
	State           CircuitState
	FailureCount    int
	Threshold       int
	Timeout         time.Duration
	LastFailure     time.Time
	LastSuccess     time.Time
	HalfOpenAllowed bool
}

// CircuitState is the state of a CircuitBreaker.
type CircuitState int

const (
	CircuitClosed CircuitState = iota
	CircuitOpen
	CircuitHalfOpen
)

// String returns the upper-case name of the state, or "UNKNOWN" for a value
// outside the declared constants.
func (s CircuitState) String() string {
	switch s {
	case CircuitClosed:
		return "CLOSED"
	case CircuitOpen:
		return "OPEN"
	case CircuitHalfOpen:
		return "HALF_OPEN"
	default:
		return "UNKNOWN"
	}
}

// FallbackDataProvider interface for providing fallback data when primary
// sources fail.
type FallbackDataProvider interface {
	GetFallbackTokenInfo(ctx context.Context, address common.Address) (*FallbackTokenInfo, error)
	GetFallbackPoolInfo(ctx context.Context, address common.Address) (*FallbackPoolInfo, error)
	GetFallbackContractType(ctx context.Context, address common.Address) (string, error)
}

// FallbackTokenInfo is the token description returned by
// FallbackDataProvider.GetFallbackTokenInfo.
type FallbackTokenInfo struct {
	Address    common.Address
	Symbol     string
	Name       string
	Decimals   uint8
	IsVerified bool
	Source     string
	Confidence float64
}

// FallbackPoolInfo is the pool description returned by
// FallbackDataProvider.GetFallbackPoolInfo.
type FallbackPoolInfo struct {
	Address    common.Address
	Token0     common.Address
	Token1     common.Address
	Protocol   string
	Fee        uint32
	IsVerified bool
	Source     string
	Confidence float64
}

// NewErrorHandler creates a new error handler with default configuration: it
// is enabled, keeps at most 1000 events of history, and is populated with the
// default recovery rules and alert thresholds.
func NewErrorHandler(logger *logger.Logger) *ErrorHandler {
	handler := &ErrorHandler{
		logger:          logger,
		errorHistory:    make([]ErrorEvent, 0),
		componentStats:  make(map[string]*ComponentStats),
		circuitBreakers: make(map[string]*CircuitBreaker),
		maxHistorySize:  1000,
		alertThresholds: make(map[ErrorType]int),
		enabled:         true,
	}

	handler.initializeDefaultRules()
	handler.initializeAlertThresholds()

	return handler
}

// initializeDefaultRules sets up default recovery rules for common error
// scenarios. Order matters: findRecoveryRule returns the first rule that
// matches, so for a given error type the rule with the lower MaxSeverity must
// come first.
func (eh *ErrorHandler) initializeDefaultRules() {
	eh.recoveryRules = []RecoveryRule{
		{
			ErrorType:       ErrorTypeAddressCorruption,
			MaxSeverity:     SeverityMedium,
			Action:          ActionRetryWithBackoff,
			MaxRetries:      2,
			BackoffInterval: 500 * time.Millisecond,
		},
		{
			ErrorType:       ErrorTypeAddressCorruption,
			MaxSeverity:     SeverityCritical,
			Action:          ActionUseFallbackData,
			MaxRetries:      0,
			BackoffInterval: 0,
		},
		{
			ErrorType:       ErrorTypeContractCallFailed,
			MaxSeverity:     SeverityMedium,
			Action:          ActionRetryWithBackoff,
			MaxRetries:      3,
			BackoffInterval: 2 * time.Second,
		},
		{
			ErrorType:               ErrorTypeRPCConnectionFailed,
			MaxSeverity:             SeverityHigh,
			Action:                  ActionCircuitBreaker,
			MaxRetries:              5,
			BackoffInterval:         5 * time.Second,
			CircuitBreakerThreshold: 10,
		},
		{
			ErrorType:       ErrorTypeDataParsingFailed,
			MaxSeverity:     SeverityMedium,
			Action:          ActionUseFallbackData,
			MaxRetries:      2,
			BackoffInterval: 1 * time.Second,
		},
		{
			ErrorType:       ErrorTypeValidationFailed,
			MaxSeverity:     SeverityLow,
			Action:          ActionSkipAndContinue,
			MaxRetries:      0,
			BackoffInterval: 0,
		},
		{
			ErrorType:       ErrorTypeValidationFailed,
			MaxSeverity:     SeverityHigh,
			Action:          ActionRetryWithBackoff,
			MaxRetries:      1,
			BackoffInterval: 500 * time.Millisecond,
		},
		{
			ErrorType:       ErrorTypeTimeoutError,
			MaxSeverity:     SeverityMedium,
			Action:          ActionRetryWithBackoff,
			MaxRetries:      3,
			BackoffInterval: 3 * time.Second,
		},
	}
}

// initializeAlertThresholds sets up alert thresholds for different error
// types. A threshold is the number of errors of that type within the last
// hour at which checkAlertThresholds logs an alert.
func (eh *ErrorHandler) initializeAlertThresholds() {
	eh.alertThresholds[ErrorTypeAddressCorruption] = 5
	eh.alertThresholds[ErrorTypeContractCallFailed] = 20
	eh.alertThresholds[ErrorTypeRPCConnectionFailed] = 10
	eh.alertThresholds[ErrorTypeDataParsingFailed] = 15
	eh.alertThresholds[ErrorTypeValidationFailed] = 25
	eh.alertThresholds[ErrorTypeTimeoutError] = 30
}

// HandleError processes an error and determines the appropriate recovery
// action.
//
// When the handler is disabled it returns ActionSkipAndContinue without
// recording anything. Otherwise it appends the event to the history, updates
// the component statistics, and then:
//
//   - returns ActionCircuitBreaker (after opening the component's breaker) if
//     shouldTriggerCircuitBreaker reports true; recovery rules are not
//     consulted in that case;
//   - returns ActionSkipAndContinue if no recovery rule matches;
//   - otherwise logs the error, checks the alert threshold for the error
//     type, and returns the matching rule's action.
//
// ctx is accepted for API symmetry but is not used by this method.
func (eh *ErrorHandler) HandleError(ctx context.Context, errorType ErrorType, severity ErrorSeverity, component string, address common.Address, message string, context map[string]interface{}) RecoveryAction {
	if !eh.enabled {
		return ActionSkipAndContinue
	}

	eh.mu.Lock()
	defer eh.mu.Unlock()

	// One timestamp for both fields so a first-attempt event is internally
	// consistent (Timestamp == LastAttempt).
	now := time.Now()
	event := ErrorEvent{
		Timestamp:    now,
		Type:         errorType,
		Severity:     severity,
		Component:    component,
		Address:      address,
		Message:      message,
		Context:      context,
		AttemptCount: 1,
		LastAttempt:  now,
	}

	eh.addToHistory(event)
	eh.updateComponentStats(component, errorType, severity)

	// The circuit breaker takes precedence over the rule table.
	if eh.shouldTriggerCircuitBreaker(component, errorType) {
		eh.triggerCircuitBreaker(component)
		return ActionCircuitBreaker
	}

	rule := eh.findRecoveryRule(errorType, severity, context)
	if rule == nil {
		// Default action for unmatched errors.
		return ActionSkipAndContinue
	}

	eh.logger.Error("Error handled by recovery system",
		"type", errorType.String(),
		"severity", severity.String(),
		"component", component,
		"address", address.Hex(),
		"message", message,
		"action", rule.Action.String())

	eh.checkAlertThresholds(errorType)

	return rule.Action
}

// addToHistory adds an error event to the history buffer and drops the oldest
// events once the buffer holds more than maxHistorySize entries. The caller
// must hold eh.mu for writing.
func (eh *ErrorHandler) addToHistory(event ErrorEvent) {
	eh.errorHistory = append(eh.errorHistory, event)

	if excess := len(eh.errorHistory) - eh.maxHistorySize; excess > 0 {
		eh.errorHistory = eh.errorHistory[excess:]
	}
}

// updateComponentStats updates statistics for a component, creating the entry
// (initially healthy) on first use. A component is marked unhealthy once it
// has more than 10 consecutive failures. The caller must hold eh.mu for
// writing.
func (eh *ErrorHandler) updateComponentStats(component string, errorType ErrorType, severity ErrorSeverity) {
	stats, exists := eh.componentStats[component]
	if !exists {
		stats = &ComponentStats{
			Component:        component,
			ErrorsByType:     make(map[ErrorType]int),
			ErrorsBySeverity: make(map[ErrorSeverity]int),
			IsHealthy:        true,
		}
		eh.componentStats[component] = stats
	}

	stats.mu.Lock()
	defer stats.mu.Unlock()

	stats.TotalErrors++
	stats.ErrorsByType[errorType]++
	stats.ErrorsBySeverity[severity]++
	stats.LastError = time.Now()
	stats.ConsecutiveFailures++

	if stats.ConsecutiveFailures > 10 {
		stats.IsHealthy = false
	}
}

// findRecoveryRule returns the first recovery rule whose ErrorType equals
// errorType, whose MaxSeverity is at least severity, and whose
// ContextMatchers (if any) all match context. It returns nil when no rule
// applies. The returned pointer refers to a copy, so callers cannot modify
// the handler's rule table through it.
func (eh *ErrorHandler) findRecoveryRule(errorType ErrorType, severity ErrorSeverity, context map[string]interface{}) *RecoveryRule {
	for i := range eh.recoveryRules {
		candidate := &eh.recoveryRules[i]
		if candidate.ErrorType != errorType || severity > candidate.MaxSeverity {
			continue
		}
		if len(candidate.ContextMatchers) > 0 && !eh.matchesContext(context, candidate.ContextMatchers) {
			continue
		}
		match := *candidate
		return &match
	}
	return nil
}

// matchesContext checks if the error context matches the rule's context
// matchers: every key in ruleMatchers must be present in errorContext with an
// equal value.
//
// Values are compared with ==, except when both hold the same uncomparable
// dynamic type (slice, map, func); == would panic at run time for those, so
// they are compared with reflect.DeepEqual instead.
func (eh *ErrorHandler) matchesContext(errorContext, ruleMatchers map[string]interface{}) bool {
	for key, expectedValue := range ruleMatchers {
		actualValue, exists := errorContext[key]
		if !exists {
			return false
		}

		var equal bool
		if t := reflect.TypeOf(actualValue); t != nil && t == reflect.TypeOf(expectedValue) && !t.Comparable() {
			equal = reflect.DeepEqual(actualValue, expectedValue)
		} else {
			equal = actualValue == expectedValue
		}
		if !equal {
			return false
		}
	}
	return true
}

// shouldTriggerCircuitBreaker determines if a circuit breaker should be
// triggered. It reports true for ErrorTypeRPCConnectionFailed once the
// component has at least 5 consecutive failures, and for
// ErrorTypeAddressCorruption once it has at least 3; all other error types
// never trigger. The caller must hold eh.mu.
func (eh *ErrorHandler) shouldTriggerCircuitBreaker(component string, errorType ErrorType) bool {
	stats, exists := eh.componentStats[component]
	if !exists {
		return false
	}

	stats.mu.RLock()
	defer stats.mu.RUnlock()

	switch errorType {
	case ErrorTypeRPCConnectionFailed:
		return stats.ConsecutiveFailures >= 5
	case ErrorTypeAddressCorruption:
		return stats.ConsecutiveFailures >= 3
	default:
		return false
	}
}

// triggerCircuitBreaker activates a circuit breaker for a component by
// installing a fresh breaker in the open state with a 30 second timeout,
// replacing any breaker already registered under that name. The caller must
// hold eh.mu for writing.
func (eh *ErrorHandler) triggerCircuitBreaker(component string) {
	breaker := &CircuitBreaker{
		Name:         component,
		State:        CircuitOpen,
		FailureCount: 0,
		Threshold:    5,
		Timeout:      30 * time.Second,
		LastFailure:  time.Now(),
	}

	eh.circuitBreakers[component] = breaker

	eh.logger.Warn("Circuit breaker triggered",
		"component", component,
		"timeout", breaker.Timeout)
}

// checkAlertThresholds checks if error counts have reached alert thresholds.
// It counts events of errorType recorded within the last hour and logs a
// warning when the count is at or above the configured threshold. Error types
// without a threshold are ignored. The caller must hold eh.mu.
func (eh *ErrorHandler) checkAlertThresholds(errorType ErrorType) {
	threshold, exists := eh.alertThresholds[errorType]
	if !exists {
		return
	}

	cutoff := time.Now().Add(-1 * time.Hour)
	recentCount := 0
	for i := range eh.errorHistory {
		event := &eh.errorHistory[i]
		if event.Type == errorType && event.Timestamp.After(cutoff) {
			recentCount++
		}
	}

	if recentCount >= threshold {
		eh.logger.Warn("Error threshold reached - alert triggered",
			"error_type", errorType.String(),
			"count", recentCount,
			"threshold", threshold)
		// Here you would trigger your alerting system
	}
}

// GetComponentHealth returns the health status of all components. The result
// is a deep copy keyed by component name, so callers may modify it without
// affecting the handler's own statistics.
func (eh *ErrorHandler) GetComponentHealth() map[string]*ComponentStats {
	eh.mu.RLock()
	defer eh.mu.RUnlock()

	result := make(map[string]*ComponentStats, len(eh.componentStats))
	for name, stats := range eh.componentStats {
		// Read each entry under its own lock, consistent with the other
		// readers of ComponentStats in this file. Unlock explicitly rather
		// than with defer so the lock is released per iteration.
		stats.mu.RLock()
		snapshot := &ComponentStats{
			Component:           stats.Component,
			TotalErrors:         stats.TotalErrors,
			ErrorsByType:        make(map[ErrorType]int, len(stats.ErrorsByType)),
			ErrorsBySeverity:    make(map[ErrorSeverity]int, len(stats.ErrorsBySeverity)),
			LastError:           stats.LastError,
			ConsecutiveFailures: stats.ConsecutiveFailures,
			SuccessCount:        stats.SuccessCount,
			IsHealthy:           stats.IsHealthy,
			LastHealthCheck:     stats.LastHealthCheck,
		}
		for k, v := range stats.ErrorsByType {
			snapshot.ErrorsByType[k] = v
		}
		for k, v := range stats.ErrorsBySeverity {
			snapshot.ErrorsBySeverity[k] = v
		}
		stats.mu.RUnlock()

		result[name] = snapshot
	}

	return result
}

// RecordSuccess records a successful operation for a component: it increments
// the success count, clears the consecutive-failure counter, marks the
// component healthy, and closes the component's circuit breaker if one
// exists. A statistics entry is created if the component is not yet known.
func (eh *ErrorHandler) RecordSuccess(component string) {
	eh.mu.Lock()
	defer eh.mu.Unlock()

	stats, exists := eh.componentStats[component]
	if !exists {
		stats = &ComponentStats{
			Component:        component,
			ErrorsByType:     make(map[ErrorType]int),
			ErrorsBySeverity: make(map[ErrorSeverity]int),
			IsHealthy:        true,
		}
		eh.componentStats[component] = stats
	}

	stats.mu.Lock()
	defer stats.mu.Unlock()

	stats.SuccessCount++
	stats.ConsecutiveFailures = 0
	stats.IsHealthy = true
	stats.LastHealthCheck = time.Now()

	if breaker, exists := eh.circuitBreakers[component]; exists {
		breaker.mu.Lock()
		breaker.State = CircuitClosed
		breaker.FailureCount = 0
		breaker.LastSuccess = time.Now()
		breaker.mu.Unlock()
	}
}

// IsCircuitOpen checks if a circuit breaker is open for a component. It
// returns false when no breaker exists or the breaker is not open. When the
// breaker is open but its timeout has elapsed since the last failure, the
// breaker is moved to the half-open state and false is returned.
func (eh *ErrorHandler) IsCircuitOpen(component string) bool {
	eh.mu.RLock()
	defer eh.mu.RUnlock()

	breaker, exists := eh.circuitBreakers[component]
	if !exists {
		return false
	}

	// Exclusive lock, not RLock: the open -> half-open transition below
	// writes to the breaker, and several callers may be here concurrently
	// because eh.mu is only read-locked.
	breaker.mu.Lock()
	defer breaker.mu.Unlock()

	if breaker.State != CircuitOpen {
		return false
	}

	if time.Since(breaker.LastFailure) > breaker.Timeout {
		breaker.State = CircuitHalfOpen
		breaker.HalfOpenAllowed = true
		return false
	}

	return true
}

// SetFallbackProvider sets the fallback data provider.
func (eh *ErrorHandler) SetFallbackProvider(provider FallbackDataProvider) {
	eh.mu.Lock()
	defer eh.mu.Unlock()
	eh.fallbackProvider = provider
}

// GetErrorSummary returns a summary of recent errors: those whose timestamp
// is later than now minus duration. The result has the keys "total_errors"
// (int), "errors_by_type", "errors_by_severity" and "errors_by_component"
// (each map[string]int), and "time_range" (duration.String()).
func (eh *ErrorHandler) GetErrorSummary(duration time.Duration) map[string]interface{} {
	eh.mu.RLock()
	defer eh.mu.RUnlock()

	cutoff := time.Now().Add(-duration)

	total := 0
	byType := make(map[string]int)
	bySeverity := make(map[string]int)
	byComponent := make(map[string]int)

	for i := range eh.errorHistory {
		event := &eh.errorHistory[i]
		if !event.Timestamp.After(cutoff) {
			continue
		}
		total++
		byType[event.Type.String()]++
		bySeverity[event.Severity.String()]++
		byComponent[event.Component]++
	}

	return map[string]interface{}{
		"total_errors":        total,
		"errors_by_type":      byType,
		"errors_by_severity":  bySeverity,
		"errors_by_component": byComponent,
		"time_range":          duration.String(),
	}
}