- Added comprehensive bounds checking to prevent buffer overruns in multicall parsing - Implemented graduated validation system (Strict/Moderate/Permissive) to reduce false positives - Added LRU caching system for address validation with 10-minute TTL - Enhanced ABI decoder with missing Universal Router and Arbitrum-specific DEX signatures - Fixed duplicate function declarations and import conflicts across multiple files - Added error recovery mechanisms with multiple fallback strategies - Updated tests to handle new validation behavior for suspicious addresses - Fixed parser test expectations for improved validation system - Applied gofmt formatting fixes to ensure code style compliance - Fixed mutex copying issues in monitoring package by introducing MetricsSnapshot - Resolved critical security vulnerabilities in heuristic address extraction - Progress: Updated TODO audit from 10% to 35% complete 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
622 lines
16 KiB
Go
622 lines
16 KiB
Go
package recovery
|
|
|
|
import (
	"context"
	"reflect"
	"sync"
	"time"

	"github.com/ethereum/go-ethereum/common"

	"github.com/fraktal/mev-beta/internal/logger"
)
|
|
|
|
// ErrorSeverity represents the severity level of an error. Larger values are
// more severe, so severities can be ordered with < and <=.
type ErrorSeverity int

// Severity levels, declared from least to most severe.
const (
	SeverityLow ErrorSeverity = iota
	SeverityMedium
	SeverityHigh
	SeverityCritical
)

// String returns the upper-case label for s, or "UNKNOWN" for any value
// outside the declared severity levels.
func (s ErrorSeverity) String() string {
	labels := [...]string{
		SeverityLow:      "LOW",
		SeverityMedium:   "MEDIUM",
		SeverityHigh:     "HIGH",
		SeverityCritical: "CRITICAL",
	}
	if s < 0 || int(s) >= len(labels) {
		return "UNKNOWN"
	}
	return labels[s]
}
|
|
|
|
// ErrorType categorizes different types of errors reported to the handler.
type ErrorType int

// Error categories, numbered by iota in declaration order.
const (
	ErrorTypeAddressCorruption ErrorType = iota
	ErrorTypeContractCallFailed
	ErrorTypeRPCConnectionFailed
	ErrorTypeDataParsingFailed
	ErrorTypeValidationFailed
	ErrorTypeTimeoutError
)

// String returns the upper-snake-case name of e, or "UNKNOWN_ERROR" for any
// value outside the declared categories.
func (e ErrorType) String() string {
	names := [...]string{
		ErrorTypeAddressCorruption:   "ADDRESS_CORRUPTION",
		ErrorTypeContractCallFailed:  "CONTRACT_CALL_FAILED",
		ErrorTypeRPCConnectionFailed: "RPC_CONNECTION_FAILED",
		ErrorTypeDataParsingFailed:   "DATA_PARSING_FAILED",
		ErrorTypeValidationFailed:    "VALIDATION_FAILED",
		ErrorTypeTimeoutError:        "TIMEOUT_ERROR",
	}
	if e < 0 || int(e) >= len(names) {
		return "UNKNOWN_ERROR"
	}
	return names[e]
}
|
|
|
|
// RecoveryAction represents an action to take when an error occurs; it is
// what ErrorHandler.HandleError returns to its caller.
type RecoveryAction int

// Recovery actions, numbered by iota in declaration order.
const (
	ActionSkipAndContinue RecoveryAction = iota
	ActionRetryWithBackoff
	ActionUseFallbackData
	ActionCircuitBreaker
	ActionEmergencyStop
)

// String returns the upper-snake-case name of a, or "UNKNOWN_ACTION" for any
// value outside the declared actions.
func (a RecoveryAction) String() string {
	names := [...]string{
		ActionSkipAndContinue:  "SKIP_AND_CONTINUE",
		ActionRetryWithBackoff: "RETRY_WITH_BACKOFF",
		ActionUseFallbackData:  "USE_FALLBACK_DATA",
		ActionCircuitBreaker:   "CIRCUIT_BREAKER",
		ActionEmergencyStop:    "EMERGENCY_STOP",
	}
	if a < 0 || int(a) >= len(names) {
		return "UNKNOWN_ACTION"
	}
	return names[a]
}
|
|
|
|
// ErrorEvent represents a specific error occurrence as recorded in the
// ErrorHandler's history.
type ErrorEvent struct {
	// Timestamp is when the event was recorded.
	Timestamp time.Time

	// Type is the error category.
	Type ErrorType

	// Severity is the severity reported by the caller.
	Severity ErrorSeverity

	// Component names the reporting subsystem; it is also the key used for
	// per-component statistics and circuit breakers.
	Component string

	// Address is the address the caller associated with the error.
	Address common.Address

	// Message is the caller-supplied description of the error.
	Message string

	// Context holds caller-supplied key/value details. It is stored by
	// reference (not copied) and may be matched against
	// RecoveryRule.ContextMatchers.
	Context map[string]interface{}

	// AttemptCount is the number of attempts recorded for this event;
	// HandleError initializes it to 1.
	AttemptCount int

	// LastAttempt is the time of the most recent attempt.
	LastAttempt time.Time

	// Resolved and ResolvedAt are not set anywhere in this file.
	// NOTE(review): confirm which code marks events resolved.
	Resolved   bool
	ResolvedAt time.Time
}
|
|
|
|
// RecoveryRule defines how to handle specific error patterns.
//
// Rules are evaluated in order and the first match wins. A rule matches when
// its ErrorType equals the error's type, the error's severity is at most
// MaxSeverity, and (if ContextMatchers is non-empty) every matcher key is
// present in the error context with an equal value.
type RecoveryRule struct {
	// ErrorType is the error category this rule applies to.
	ErrorType ErrorType

	// MaxSeverity is the highest severity (inclusive) this rule covers.
	MaxSeverity ErrorSeverity

	// Action is the RecoveryAction returned when this rule matches.
	Action RecoveryAction

	// MaxRetries and BackoffInterval describe a retry policy. They are not read
	// by the code in this file. NOTE(review): presumably consumed by callers
	// acting on ActionRetryWithBackoff; confirm.
	MaxRetries      int
	BackoffInterval time.Duration

	// CircuitBreakerThreshold is not read by the code in this file.
	// NOTE(review): confirm where, if anywhere, it is applied.
	CircuitBreakerThreshold int

	// ContextMatchers, when non-empty, restricts the rule to errors whose
	// context contains every listed key with an equal value.
	ContextMatchers map[string]interface{}
}
|
|
|
|
// ErrorHandler provides comprehensive error handling and recovery capabilities.
// It records error events, keeps per-component statistics and circuit
// breakers, and maps each error to a RecoveryAction through an ordered list of
// rules. Construct it with NewErrorHandler, which initializes the maps below.
type ErrorHandler struct {
	// mu guards the handler's mutable state: history, statistics and breaker
	// maps, rules, and the fallback provider.
	mu sync.RWMutex

	// logger receives the handler's error and warning log lines.
	logger *logger.Logger

	// errorHistory holds recorded events, oldest first, trimmed to the most
	// recent maxHistorySize entries.
	errorHistory []ErrorEvent

	// componentStats maps a component name to its error/success statistics.
	componentStats map[string]*ComponentStats

	// circuitBreakers maps a component name to its circuit breaker.
	circuitBreakers map[string]*CircuitBreaker

	// recoveryRules is scanned in order; the first matching rule wins.
	recoveryRules []RecoveryRule

	// fallbackProvider is set by SetFallbackProvider. NOTE(review): it is not
	// read anywhere in this file; confirm where it is consumed.
	fallbackProvider FallbackDataProvider

	// maxHistorySize caps the length of errorHistory.
	maxHistorySize int

	// alertThresholds maps an error type to the number of errors of that type
	// within the last hour at which an alert is logged.
	alertThresholds map[ErrorType]int

	// enabled gates HandleError: when false, errors are not recorded and
	// ActionSkipAndContinue is returned.
	enabled bool
}
|
|
|
|
// ComponentStats tracks error statistics for components.
type ComponentStats struct {
	// mu guards the fields below. Code in this file acquires it while also
	// holding the owning ErrorHandler's mu.
	mu sync.RWMutex

	// Component is the component name this entry describes.
	Component string

	// TotalErrors counts every error recorded for the component.
	TotalErrors int

	// ErrorsByType and ErrorsBySeverity break TotalErrors down by category
	// and by severity.
	ErrorsByType     map[ErrorType]int
	ErrorsBySeverity map[ErrorSeverity]int

	// LastError is when the most recent error was recorded.
	LastError time.Time

	// ConsecutiveFailures counts errors of any type since the last recorded
	// success; ErrorHandler.RecordSuccess resets it to zero.
	ConsecutiveFailures int

	// SuccessCount counts calls to ErrorHandler.RecordSuccess for the component.
	SuccessCount int

	// IsHealthy becomes false once ConsecutiveFailures exceeds 10 and is
	// restored to true by ErrorHandler.RecordSuccess.
	IsHealthy bool

	// LastHealthCheck is when a success was last recorded.
	LastHealthCheck time.Time
}
|
|
|
|
// CircuitBreaker implements the circuit breaker pattern for failing components.
type CircuitBreaker struct {
	// mu guards the fields below.
	mu sync.RWMutex

	// Name is the component the breaker protects.
	Name string

	// State is the current breaker state (closed, open, or half-open).
	State CircuitState

	// FailureCount is reset to zero by ErrorHandler.RecordSuccess.
	FailureCount int

	// Threshold is not consulted by the code in this file. NOTE(review):
	// confirm whether any caller uses it.
	Threshold int

	// Timeout is how long an open breaker waits after LastFailure before
	// ErrorHandler.IsCircuitOpen moves it to half-open.
	Timeout time.Duration

	// LastFailure is when the breaker was last tripped.
	LastFailure time.Time

	// LastSuccess is when a success last closed the breaker.
	LastSuccess time.Time

	// HalfOpenAllowed is set when an open breaker times out into half-open.
	HalfOpenAllowed bool
}
|
|
|
|
// CircuitState is the state of a CircuitBreaker.
type CircuitState int

// Circuit breaker states, numbered by iota in declaration order.
const (
	CircuitClosed CircuitState = iota
	CircuitOpen
	CircuitHalfOpen
)

// String returns the upper-case name of s, or "UNKNOWN" for any value outside
// the declared states.
func (s CircuitState) String() string {
	names := [...]string{
		CircuitClosed:   "CLOSED",
		CircuitOpen:     "OPEN",
		CircuitHalfOpen: "HALF_OPEN",
	}
	if s < 0 || int(s) >= len(names) {
		return "UNKNOWN"
	}
	return names[s]
}
|
|
|
|
// FallbackDataProvider interface for providing fallback data when primary sources fail.
// Implementations are registered with ErrorHandler.SetFallbackProvider.
// NOTE(review): nothing in this file calls these methods; presumably callers
// consult the provider after HandleError returns ActionUseFallbackData.
type FallbackDataProvider interface {
	// GetFallbackTokenInfo returns substitute token metadata for address.
	GetFallbackTokenInfo(ctx context.Context, address common.Address) (*FallbackTokenInfo, error)

	// GetFallbackPoolInfo returns substitute pool metadata for address.
	GetFallbackPoolInfo(ctx context.Context, address common.Address) (*FallbackPoolInfo, error)

	// GetFallbackContractType returns the contract type of address as a
	// string; the label format is defined by the implementation.
	GetFallbackContractType(ctx context.Context, address common.Address) (string, error)
}
|
|
|
|
// FallbackTokenInfo is token metadata returned by a FallbackDataProvider.
type FallbackTokenInfo struct {
	// Address is the token contract address.
	Address common.Address

	// Symbol and Name are the token's ticker symbol and display name.
	Symbol string
	Name   string

	// Decimals is presumably the ERC-20 decimals value. TODO confirm.
	Decimals uint8

	// IsVerified reports whether the provider considers the data verified.
	IsVerified bool

	// Source identifies where the provider obtained the data.
	Source string

	// Confidence is the provider's confidence in the data.
	// NOTE(review): range not established here (presumably 0..1); confirm.
	Confidence float64
}
|
|
|
|
// FallbackPoolInfo is liquidity-pool metadata returned by a
// FallbackDataProvider.
type FallbackPoolInfo struct {
	// Address is the pool contract address.
	Address common.Address

	// Token0 and Token1 are the pool's two token addresses.
	Token0 common.Address
	Token1 common.Address

	// Protocol names the DEX protocol the pool belongs to.
	Protocol string

	// Fee is the pool fee. NOTE(review): units are protocol-specific and not
	// established here; confirm with the provider.
	Fee uint32

	// IsVerified reports whether the provider considers the data verified.
	IsVerified bool

	// Source identifies where the provider obtained the data.
	Source string

	// Confidence is the provider's confidence in the data.
	// NOTE(review): range not established here (presumably 0..1); confirm.
	Confidence float64
}
|
|
|
|
// NewErrorHandler creates a new error handler with default configuration
|
|
func NewErrorHandler(logger *logger.Logger) *ErrorHandler {
|
|
handler := &ErrorHandler{
|
|
logger: logger,
|
|
errorHistory: make([]ErrorEvent, 0),
|
|
componentStats: make(map[string]*ComponentStats),
|
|
circuitBreakers: make(map[string]*CircuitBreaker),
|
|
maxHistorySize: 1000,
|
|
alertThresholds: make(map[ErrorType]int),
|
|
enabled: true,
|
|
}
|
|
|
|
// Initialize default recovery rules
|
|
handler.initializeDefaultRules()
|
|
|
|
// Initialize default alert thresholds
|
|
handler.initializeAlertThresholds()
|
|
|
|
return handler
|
|
}
|
|
|
|
// initializeDefaultRules sets up default recovery rules for common error scenarios
|
|
func (eh *ErrorHandler) initializeDefaultRules() {
|
|
eh.recoveryRules = []RecoveryRule{
|
|
{
|
|
ErrorType: ErrorTypeAddressCorruption,
|
|
MaxSeverity: SeverityMedium,
|
|
Action: ActionRetryWithBackoff,
|
|
MaxRetries: 2,
|
|
BackoffInterval: 500 * time.Millisecond,
|
|
},
|
|
{
|
|
ErrorType: ErrorTypeAddressCorruption,
|
|
MaxSeverity: SeverityCritical,
|
|
Action: ActionUseFallbackData,
|
|
MaxRetries: 0,
|
|
BackoffInterval: 0,
|
|
},
|
|
{
|
|
ErrorType: ErrorTypeContractCallFailed,
|
|
MaxSeverity: SeverityMedium,
|
|
Action: ActionRetryWithBackoff,
|
|
MaxRetries: 3,
|
|
BackoffInterval: 2 * time.Second,
|
|
},
|
|
{
|
|
ErrorType: ErrorTypeRPCConnectionFailed,
|
|
MaxSeverity: SeverityHigh,
|
|
Action: ActionCircuitBreaker,
|
|
MaxRetries: 5,
|
|
BackoffInterval: 5 * time.Second,
|
|
CircuitBreakerThreshold: 10,
|
|
},
|
|
{
|
|
ErrorType: ErrorTypeDataParsingFailed,
|
|
MaxSeverity: SeverityMedium,
|
|
Action: ActionUseFallbackData,
|
|
MaxRetries: 2,
|
|
BackoffInterval: 1 * time.Second,
|
|
},
|
|
{
|
|
ErrorType: ErrorTypeValidationFailed,
|
|
MaxSeverity: SeverityLow,
|
|
Action: ActionSkipAndContinue,
|
|
MaxRetries: 0,
|
|
BackoffInterval: 0,
|
|
},
|
|
{
|
|
ErrorType: ErrorTypeValidationFailed,
|
|
MaxSeverity: SeverityHigh,
|
|
Action: ActionRetryWithBackoff,
|
|
MaxRetries: 1,
|
|
BackoffInterval: 500 * time.Millisecond,
|
|
},
|
|
{
|
|
ErrorType: ErrorTypeTimeoutError,
|
|
MaxSeverity: SeverityMedium,
|
|
Action: ActionRetryWithBackoff,
|
|
MaxRetries: 3,
|
|
BackoffInterval: 3 * time.Second,
|
|
},
|
|
}
|
|
}
|
|
|
|
// initializeAlertThresholds sets up alert thresholds for different error types
|
|
func (eh *ErrorHandler) initializeAlertThresholds() {
|
|
eh.alertThresholds[ErrorTypeAddressCorruption] = 5
|
|
eh.alertThresholds[ErrorTypeContractCallFailed] = 20
|
|
eh.alertThresholds[ErrorTypeRPCConnectionFailed] = 10
|
|
eh.alertThresholds[ErrorTypeDataParsingFailed] = 15
|
|
eh.alertThresholds[ErrorTypeValidationFailed] = 25
|
|
eh.alertThresholds[ErrorTypeTimeoutError] = 30
|
|
}
|
|
|
|
// HandleError processes an error and determines the appropriate recovery action
|
|
func (eh *ErrorHandler) HandleError(ctx context.Context, errorType ErrorType, severity ErrorSeverity, component string, address common.Address, message string, context map[string]interface{}) RecoveryAction {
|
|
if !eh.enabled {
|
|
return ActionSkipAndContinue
|
|
}
|
|
|
|
eh.mu.Lock()
|
|
defer eh.mu.Unlock()
|
|
|
|
// Record the error event
|
|
event := ErrorEvent{
|
|
Timestamp: time.Now(),
|
|
Type: errorType,
|
|
Severity: severity,
|
|
Component: component,
|
|
Address: address,
|
|
Message: message,
|
|
Context: context,
|
|
AttemptCount: 1,
|
|
LastAttempt: time.Now(),
|
|
}
|
|
|
|
// Update error history
|
|
eh.addToHistory(event)
|
|
|
|
// Update component statistics
|
|
eh.updateComponentStats(component, errorType, severity)
|
|
|
|
// Check circuit breakers
|
|
if eh.shouldTriggerCircuitBreaker(component, errorType) {
|
|
eh.triggerCircuitBreaker(component)
|
|
return ActionCircuitBreaker
|
|
}
|
|
|
|
// Find matching recovery rule
|
|
rule := eh.findRecoveryRule(errorType, severity, context)
|
|
if rule == nil {
|
|
// Default action for unmatched errors
|
|
return ActionSkipAndContinue
|
|
}
|
|
|
|
// Log the error and recovery action
|
|
eh.logger.Error("Error handled by recovery system",
|
|
"type", errorType.String(),
|
|
"severity", severity.String(),
|
|
"component", component,
|
|
"address", address.Hex(),
|
|
"message", message,
|
|
"action", rule.Action.String())
|
|
|
|
// Check if alert threshold is reached
|
|
eh.checkAlertThresholds(errorType)
|
|
|
|
return rule.Action
|
|
}
|
|
|
|
// addToHistory adds an error event to the history buffer
|
|
func (eh *ErrorHandler) addToHistory(event ErrorEvent) {
|
|
eh.errorHistory = append(eh.errorHistory, event)
|
|
|
|
// Trim history if it exceeds max size
|
|
if len(eh.errorHistory) > eh.maxHistorySize {
|
|
eh.errorHistory = eh.errorHistory[len(eh.errorHistory)-eh.maxHistorySize:]
|
|
}
|
|
}
|
|
|
|
// updateComponentStats updates statistics for a component
|
|
func (eh *ErrorHandler) updateComponentStats(component string, errorType ErrorType, severity ErrorSeverity) {
|
|
stats, exists := eh.componentStats[component]
|
|
if !exists {
|
|
stats = &ComponentStats{
|
|
Component: component,
|
|
ErrorsByType: make(map[ErrorType]int),
|
|
ErrorsBySeverity: make(map[ErrorSeverity]int),
|
|
IsHealthy: true,
|
|
}
|
|
eh.componentStats[component] = stats
|
|
}
|
|
|
|
stats.mu.Lock()
|
|
defer stats.mu.Unlock()
|
|
|
|
stats.TotalErrors++
|
|
stats.ErrorsByType[errorType]++
|
|
stats.ErrorsBySeverity[severity]++
|
|
stats.LastError = time.Now()
|
|
stats.ConsecutiveFailures++
|
|
|
|
// Mark as unhealthy if too many consecutive failures
|
|
if stats.ConsecutiveFailures > 10 {
|
|
stats.IsHealthy = false
|
|
}
|
|
}
|
|
|
|
// findRecoveryRule finds the best matching recovery rule for an error
|
|
func (eh *ErrorHandler) findRecoveryRule(errorType ErrorType, severity ErrorSeverity, context map[string]interface{}) *RecoveryRule {
|
|
for _, rule := range eh.recoveryRules {
|
|
if rule.ErrorType == errorType && severity <= rule.MaxSeverity {
|
|
// Check context matchers if present
|
|
if len(rule.ContextMatchers) > 0 {
|
|
if !eh.matchesContext(context, rule.ContextMatchers) {
|
|
continue
|
|
}
|
|
}
|
|
return &rule
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// matchesContext checks if the error context matches the rule's context matchers
|
|
func (eh *ErrorHandler) matchesContext(errorContext, ruleMatchers map[string]interface{}) bool {
|
|
for key, expectedValue := range ruleMatchers {
|
|
if actualValue, exists := errorContext[key]; !exists || actualValue != expectedValue {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
// shouldTriggerCircuitBreaker determines if a circuit breaker should be triggered
|
|
func (eh *ErrorHandler) shouldTriggerCircuitBreaker(component string, errorType ErrorType) bool {
|
|
stats, exists := eh.componentStats[component]
|
|
if !exists {
|
|
return false
|
|
}
|
|
|
|
stats.mu.RLock()
|
|
defer stats.mu.RUnlock()
|
|
|
|
// Trigger if consecutive failures exceed threshold for critical errors
|
|
if errorType == ErrorTypeRPCConnectionFailed && stats.ConsecutiveFailures >= 5 {
|
|
return true
|
|
}
|
|
|
|
if errorType == ErrorTypeAddressCorruption && stats.ConsecutiveFailures >= 3 {
|
|
return true
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// triggerCircuitBreaker activates a circuit breaker for a component
|
|
func (eh *ErrorHandler) triggerCircuitBreaker(component string) {
|
|
breaker := &CircuitBreaker{
|
|
Name: component,
|
|
State: CircuitOpen,
|
|
FailureCount: 0,
|
|
Threshold: 5,
|
|
Timeout: 30 * time.Second,
|
|
LastFailure: time.Now(),
|
|
}
|
|
|
|
eh.circuitBreakers[component] = breaker
|
|
|
|
eh.logger.Warn("Circuit breaker triggered",
|
|
"component", component,
|
|
"timeout", breaker.Timeout)
|
|
}
|
|
|
|
// checkAlertThresholds checks if error counts have reached alert thresholds
|
|
func (eh *ErrorHandler) checkAlertThresholds(errorType ErrorType) {
|
|
threshold, exists := eh.alertThresholds[errorType]
|
|
if !exists {
|
|
return
|
|
}
|
|
|
|
// Count recent errors of this type (last hour)
|
|
recentCount := 0
|
|
cutoff := time.Now().Add(-1 * time.Hour)
|
|
|
|
for _, event := range eh.errorHistory {
|
|
if event.Type == errorType && event.Timestamp.After(cutoff) {
|
|
recentCount++
|
|
}
|
|
}
|
|
|
|
if recentCount >= threshold {
|
|
eh.logger.Warn("Error threshold reached - alert triggered",
|
|
"error_type", errorType.String(),
|
|
"count", recentCount,
|
|
"threshold", threshold)
|
|
// Here you would trigger your alerting system
|
|
}
|
|
}
|
|
|
|
// GetComponentHealth returns the health status of all components
|
|
func (eh *ErrorHandler) GetComponentHealth() map[string]*ComponentStats {
|
|
eh.mu.RLock()
|
|
defer eh.mu.RUnlock()
|
|
|
|
// Return a copy to prevent external modification
|
|
result := make(map[string]*ComponentStats)
|
|
for name, stats := range eh.componentStats {
|
|
result[name] = &ComponentStats{
|
|
Component: stats.Component,
|
|
TotalErrors: stats.TotalErrors,
|
|
ErrorsByType: make(map[ErrorType]int),
|
|
ErrorsBySeverity: make(map[ErrorSeverity]int),
|
|
LastError: stats.LastError,
|
|
ConsecutiveFailures: stats.ConsecutiveFailures,
|
|
SuccessCount: stats.SuccessCount,
|
|
IsHealthy: stats.IsHealthy,
|
|
LastHealthCheck: stats.LastHealthCheck,
|
|
}
|
|
|
|
// Copy maps
|
|
for k, v := range stats.ErrorsByType {
|
|
result[name].ErrorsByType[k] = v
|
|
}
|
|
for k, v := range stats.ErrorsBySeverity {
|
|
result[name].ErrorsBySeverity[k] = v
|
|
}
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
// RecordSuccess records a successful operation for a component
|
|
func (eh *ErrorHandler) RecordSuccess(component string) {
|
|
eh.mu.Lock()
|
|
defer eh.mu.Unlock()
|
|
|
|
stats, exists := eh.componentStats[component]
|
|
if !exists {
|
|
stats = &ComponentStats{
|
|
Component: component,
|
|
ErrorsByType: make(map[ErrorType]int),
|
|
ErrorsBySeverity: make(map[ErrorSeverity]int),
|
|
IsHealthy: true,
|
|
}
|
|
eh.componentStats[component] = stats
|
|
}
|
|
|
|
stats.mu.Lock()
|
|
defer stats.mu.Unlock()
|
|
|
|
stats.SuccessCount++
|
|
stats.ConsecutiveFailures = 0
|
|
stats.IsHealthy = true
|
|
stats.LastHealthCheck = time.Now()
|
|
|
|
// Reset circuit breaker if it exists
|
|
if breaker, exists := eh.circuitBreakers[component]; exists {
|
|
breaker.mu.Lock()
|
|
breaker.State = CircuitClosed
|
|
breaker.FailureCount = 0
|
|
breaker.LastSuccess = time.Now()
|
|
breaker.mu.Unlock()
|
|
}
|
|
}
|
|
|
|
// IsCircuitOpen checks if a circuit breaker is open for a component
|
|
func (eh *ErrorHandler) IsCircuitOpen(component string) bool {
|
|
eh.mu.RLock()
|
|
defer eh.mu.RUnlock()
|
|
|
|
breaker, exists := eh.circuitBreakers[component]
|
|
if !exists {
|
|
return false
|
|
}
|
|
|
|
breaker.mu.RLock()
|
|
defer breaker.mu.RUnlock()
|
|
|
|
if breaker.State == CircuitOpen {
|
|
// Check if timeout has passed
|
|
if time.Since(breaker.LastFailure) > breaker.Timeout {
|
|
breaker.State = CircuitHalfOpen
|
|
breaker.HalfOpenAllowed = true
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// SetFallbackProvider sets the fallback data provider
|
|
func (eh *ErrorHandler) SetFallbackProvider(provider FallbackDataProvider) {
|
|
eh.mu.Lock()
|
|
defer eh.mu.Unlock()
|
|
eh.fallbackProvider = provider
|
|
}
|
|
|
|
// GetErrorSummary returns a summary of recent errors
|
|
func (eh *ErrorHandler) GetErrorSummary(duration time.Duration) map[string]interface{} {
|
|
eh.mu.RLock()
|
|
defer eh.mu.RUnlock()
|
|
|
|
cutoff := time.Now().Add(-duration)
|
|
summary := map[string]interface{}{
|
|
"total_errors": 0,
|
|
"errors_by_type": make(map[string]int),
|
|
"errors_by_severity": make(map[string]int),
|
|
"errors_by_component": make(map[string]int),
|
|
"time_range": duration.String(),
|
|
}
|
|
|
|
for _, event := range eh.errorHistory {
|
|
if event.Timestamp.After(cutoff) {
|
|
summary["total_errors"] = summary["total_errors"].(int) + 1
|
|
|
|
typeKey := event.Type.String()
|
|
summary["errors_by_type"].(map[string]int)[typeKey]++
|
|
|
|
severityKey := event.Severity.String()
|
|
summary["errors_by_severity"].(map[string]int)[severityKey]++
|
|
|
|
summary["errors_by_component"].(map[string]int)[event.Component]++
|
|
}
|
|
}
|
|
|
|
return summary
|
|
}
|