Files
mev-beta/orig/pkg/arbitrum/circuit_breaker.go
Administrator 803de231ba feat: create v2-prep branch with comprehensive planning
Restructured project for V2 refactor:

**Structure Changes:**
- Moved all V1 code to orig/ folder (preserved with git mv)
- Created docs/planning/ directory
- Added orig/README_V1.md explaining V1 preservation

**Planning Documents:**
- 00_V2_MASTER_PLAN.md: Complete architecture overview
  - Executive summary of critical V1 issues
  - High-level component architecture diagrams
  - 5-phase implementation roadmap
  - Success metrics and risk mitigation

- 07_TASK_BREAKDOWN.md: Atomic task breakdown
  - 99+ hours of detailed tasks
  - Every task < 2 hours (atomic)
  - Clear dependencies and success criteria
  - Organized by implementation phase

**V2 Key Improvements:**
- Per-exchange parsers (factory pattern)
- Multi-layer strict validation
- Multi-index pool cache
- Background validation pipeline
- Comprehensive observability

**Critical Issues Addressed:**
- Zero address tokens (strict validation + cache enrichment)
- Parsing accuracy (protocol-specific parsers)
- No audit trail (background validation channel)
- Inefficient lookups (multi-index cache)
- Stats disconnection (event-driven metrics)

Next Steps:
1. Review planning documents
2. Begin Phase 1: Foundation (P1-001 through P1-010)
3. Implement parsers in Phase 2
4. Build cache system in Phase 3
5. Add validation pipeline in Phase 4
6. Migrate and test in Phase 5

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-10 10:14:26 +01:00

183 lines
4.4 KiB
Go

package arbitrum
import (
"context"
"fmt"
"sync"
"time"
"github.com/fraktal/mev-beta/internal/logger"
)
// CircuitBreakerState enumerates the phases a circuit breaker moves through.
type CircuitBreakerState int

const (
	// Closed is the normal phase: every request is let through.
	Closed CircuitBreakerState = iota
	// Open is the tripped phase: requests are rejected without being run.
	Open
	// HalfOpen is the probing phase: requests are let through again to find
	// out whether the protected service has recovered.
	HalfOpen
)

// String returns the name of the state, or "Unknown(n)" for a value that is
// not one of the declared constants.
func (state CircuitBreakerState) String() string {
	// Name table indexed by the constants themselves.
	names := [...]string{
		Closed:   "Closed",
		Open:     "Open",
		HalfOpen: "HalfOpen",
	}
	if state < 0 || int(state) >= len(names) {
		return fmt.Sprintf("Unknown(%d)", int(state))
	}
	return names[state]
}
// CircuitBreakerConfig holds the tunable thresholds of a CircuitBreaker.
//
// NewCircuitBreaker replaces any field that is zero or negative with a
// default: 5 failures, 30 seconds, 3 successes.
type CircuitBreakerConfig struct {
// FailureThreshold is the failure count at which the circuit trips to Open.
// The count is cleared by a success while the circuit is Closed.
FailureThreshold int
// Timeout is how long the circuit stays Open before a call is let through
// again in the HalfOpen state. It is measured from the most recent failure.
Timeout time.Duration
// SuccessThreshold is the number of consecutive successes needed in the
// HalfOpen state to close the circuit.
SuccessThreshold int
}
// CircuitBreaker implements the circuit breaker pattern for RPC connections.
//
// Calls are routed through Call, which rejects them while the circuit is Open
// and otherwise records each outcome to drive the Closed -> Open -> HalfOpen
// -> Closed state machine. Construct one with NewCircuitBreaker.
type CircuitBreaker struct {
// state is the current phase of the state machine.
state CircuitBreakerState
// failureCount is incremented on every failed call and cleared by a success
// while Closed, by a successful recovery, and by Reset.
failureCount int
// consecutiveSuccesses counts successful calls made while HalfOpen; it is
// cleared each time the circuit enters HalfOpen.
consecutiveSuccesses int
// lastFailure is the time of the most recent failed call; Call compares it
// against config.Timeout to decide when Open may move to HalfOpen.
lastFailure time.Time
// config holds the thresholds and the open timeout.
config *CircuitBreakerConfig
// logger is optional: it is nil until SetLogger is called, and every use in
// this file is nil-checked.
logger *logger.Logger
// mu is taken by Call, GetState, Reset and SetLogger before they touch the
// fields above; onFailure and onSuccess run with it already held by Call.
mu sync.RWMutex
}
// NewCircuitBreaker creates a circuit breaker in the Closed state.
//
// config may be nil, in which case the defaults are used: trip after 5
// failures, stay open for 30 seconds, close again after 3 consecutive
// successes. Any individual field that is zero or negative is replaced by its
// default.
//
// The configuration is copied: the caller's struct is never modified, and
// changes the caller makes to it afterwards do not affect the returned
// breaker.
func NewCircuitBreaker(config *CircuitBreakerConfig) *CircuitBreaker {
	// Normalize a private copy so that filling in defaults does not write
	// through the caller's pointer, and so that the breaker does not share
	// mutable state with whoever built the config.
	var cfg CircuitBreakerConfig
	if config != nil {
		cfg = *config
	}

	if cfg.FailureThreshold <= 0 {
		cfg.FailureThreshold = 5
	}
	if cfg.Timeout <= 0 {
		cfg.Timeout = 30 * time.Second
	}
	if cfg.SuccessThreshold <= 0 {
		cfg.SuccessThreshold = 3
	}

	// Counters, lastFailure, logger and the mutex start at their zero values.
	return &CircuitBreaker{
		state:  Closed,
		config: &cfg,
	}
}
// Call runs fn under the protection of the circuit breaker.
//
// If ctx is already cancelled or past its deadline, Call returns ctx.Err()
// without running fn and without recording a failure. If the circuit is Open
// and the configured timeout has not yet elapsed since the last failure, Call
// returns a "circuit breaker is open" error without running fn. Otherwise fn
// is executed and its error, if any, is returned unchanged after the outcome
// has been recorded.
//
// The breaker's mutex is held for the whole call, including while fn runs, so
// calls through the same breaker are serialized and GetState, Reset and
// SetLogger block until fn returns.
func (cb *CircuitBreaker) Call(ctx context.Context, fn func() error) error {
	cb.mu.Lock()
	defer cb.mu.Unlock()

	// Checked after acquiring the lock so that a caller whose context expired
	// while waiting behind another call does not spend a request. Cancellation
	// says nothing about the health of the service, so it is not counted as a
	// failure. A nil ctx is tolerated for callers that predate this check.
	if ctx != nil {
		if err := ctx.Err(); err != nil {
			return err
		}
	}

	// Once the open period has elapsed, start probing the service again.
	if cb.state == Open && time.Since(cb.lastFailure) > cb.config.Timeout {
		cb.state = HalfOpen
		cb.consecutiveSuccesses = 0
	}

	// Still open: fail fast without touching the service.
	if cb.state == Open {
		return fmt.Errorf("circuit breaker is open")
	}

	if err := fn(); err != nil {
		cb.onFailure()
		return err
	}
	cb.onSuccess()
	return nil
}
// onFailure records a failed call and trips the circuit when warranted.
// It must be called with cb.mu held.
func (cb *CircuitBreaker) onFailure() {
	cb.lastFailure = time.Now()
	cb.failureCount++

	// A single failure while probing re-opens the circuit; otherwise the
	// failure threshold has to be reached first.
	probing := cb.state == HalfOpen
	if !probing && cb.failureCount < cb.config.FailureThreshold {
		return
	}

	cb.state = Open
	if cb.logger == nil {
		return
	}
	cb.logger.Warn(fmt.Sprintf("Circuit breaker OPENED after %d failures", cb.failureCount))
}
// onSuccess records a successful call and closes the circuit once recovery is
// confirmed. It must be called with cb.mu held.
func (cb *CircuitBreaker) onSuccess() {
	switch cb.state {
	case Closed:
		// Any success while healthy wipes the failure tally.
		cb.failureCount = 0

	case HalfOpen:
		cb.consecutiveSuccesses++
		if cb.consecutiveSuccesses < cb.config.SuccessThreshold {
			// Not enough successful probes yet; keep probing.
			return
		}

		// Enough successful probes in a row: resume normal operation.
		cb.state = Closed
		cb.failureCount = 0
		cb.consecutiveSuccesses = 0
		if cb.logger != nil {
			cb.logger.Info("Circuit breaker CLOSED after successful recovery")
		}
	}
}
// GetState reports the state currently stored in the breaker. The value is
// read under the read lock.
func (cb *CircuitBreaker) GetState() CircuitBreakerState {
	cb.mu.RLock()
	current := cb.state
	cb.mu.RUnlock()
	return current
}
// Reset forces the breaker back to the Closed state and clears the failure
// counter, the consecutive-success counter and the last-failure timestamp.
func (cb *CircuitBreaker) Reset() {
	cb.mu.Lock()
	defer cb.mu.Unlock()

	cb.lastFailure = time.Time{}
	cb.consecutiveSuccesses = 0
	cb.failureCount = 0
	cb.state = Closed

	if cb.logger == nil {
		return
	}
	cb.logger.Info("Circuit breaker reset to closed state")
}
// SetLogger installs the logger used to report state transitions. Passing nil
// turns logging off, since every log call in this file is nil-checked.
func (cb *CircuitBreaker) SetLogger(logger *logger.Logger) {
	cb.mu.Lock()
	cb.logger = logger
	cb.mu.Unlock()
}