// mev-beta/pkg/arbitrum/circuit_breaker.go
// 2025-10-04 09:31:02 -05:00
// 183 lines, 4.4 KiB, Go

package arbitrum
import (
"context"
"fmt"
"sync"
"time"
"github.com/fraktal/mev-beta/internal/logger"
)
// CircuitBreakerState enumerates the states a circuit breaker can be in.
type CircuitBreakerState int

const (
	// Closed means requests are allowed through.
	Closed CircuitBreakerState = iota
	// Open means requests are rejected.
	Open
	// HalfOpen means requests are let through on a trial basis to probe
	// whether the protected service has recovered.
	HalfOpen
)

// String returns the name of the state ("Closed", "Open" or "HalfOpen").
// Any value outside the declared constants is rendered as "Unknown(<n>)",
// where <n> is the underlying integer.
func (s CircuitBreakerState) String() string {
	// Indexed by state value; the constants are consecutive starting at 0.
	names := [...]string{
		Closed:   "Closed",
		Open:     "Open",
		HalfOpen: "HalfOpen",
	}
	if s < 0 || int(s) >= len(names) {
		return fmt.Sprintf("Unknown(%d)", int(s))
	}
	return names[s]
}
// CircuitBreakerConfig holds the tunable thresholds of a CircuitBreaker.
// NewCircuitBreaker replaces any non-positive field with its default
// (FailureThreshold 5, Timeout 30 seconds, SuccessThreshold 3).
type CircuitBreakerConfig struct {
	// FailureThreshold is the failure count at which the circuit is opened.
	// The count is cleared by a successful call while the circuit is Closed.
	FailureThreshold int
	// Timeout is how long after the most recent failure an Open circuit
	// keeps rejecting calls before it moves to HalfOpen.
	Timeout time.Duration
	// SuccessThreshold is the number of consecutive successes required in
	// the HalfOpen state before the circuit is closed again.
	SuccessThreshold int
}
// CircuitBreaker implements the circuit breaker pattern for RPC connections.
// Create instances with NewCircuitBreaker.
type CircuitBreaker struct {
	// state is the current state of the circuit (Closed, Open or HalfOpen).
	state CircuitBreakerState
	// failureCount is incremented on every recorded failure and cleared by a
	// success in the Closed state, by a recovery from HalfOpen, and by Reset.
	failureCount int
	// consecutiveSuccesses counts successes recorded while HalfOpen; it is
	// zeroed each time the circuit enters HalfOpen.
	consecutiveSuccesses int
	// lastFailure is the time of the most recent recorded failure; the
	// Open -> HalfOpen transition is measured from it.
	lastFailure time.Time
	// config holds the thresholds and timeout applied by the breaker.
	config *CircuitBreakerConfig
	// logger receives state-change messages; nil disables logging.
	logger *logger.Logger
	// mu is write-locked by Call, Reset and SetLogger and read-locked by
	// GetState.
	mu sync.RWMutex
}
// NewCircuitBreaker creates a circuit breaker that starts in the Closed state.
//
// A nil config selects the defaults: FailureThreshold 5, Timeout 30 seconds
// and SuccessThreshold 3. When a config is supplied, every non-positive field
// is replaced by the corresponding default.
//
// The config is copied before defaults are applied, so the caller's struct is
// never modified and later changes to it do not affect the returned breaker.
// No logger is attached; use SetLogger to add one.
func NewCircuitBreaker(config *CircuitBreakerConfig) *CircuitBreaker {
	// Work on a private copy: normalising in place would silently rewrite a
	// struct the caller may still be using or sharing.
	var cfg CircuitBreakerConfig
	if config != nil {
		cfg = *config
	}

	// Ensure sensible defaults.
	if cfg.FailureThreshold <= 0 {
		cfg.FailureThreshold = 5
	}
	if cfg.Timeout <= 0 {
		cfg.Timeout = 30 * time.Second
	}
	if cfg.SuccessThreshold <= 0 {
		cfg.SuccessThreshold = 3
	}

	// All remaining fields (counters, lastFailure, logger, mutex) are left at
	// their zero values.
	return &CircuitBreaker{
		state:  Closed,
		config: &cfg,
	}
}
// Call runs fn under the protection of the circuit breaker.
//
// If ctx is non-nil and already cancelled or past its deadline, Call returns
// ctx.Err() without running fn and without changing the breaker's state. If
// the circuit is Open and the configured Timeout has not yet elapsed since the
// last failure, Call returns a "circuit breaker is open" error without running
// fn. Once the Timeout has elapsed, the circuit moves to HalfOpen and fn is
// allowed to run.
//
// Otherwise fn is executed and its result is recorded: a non-nil error counts
// as a failure and is returned unchanged; a nil error counts as a success.
//
// The breaker's lock is held only while state is inspected and while the
// result is recorded, never while fn runs. Calls therefore do not serialise
// behind each other, and fn may safely use GetState or Reset on the same
// breaker.
func (cb *CircuitBreaker) Call(ctx context.Context, fn func() error) error {
	// A nil ctx is tolerated because earlier versions never dereferenced it.
	if ctx != nil {
		if err := ctx.Err(); err != nil {
			return err
		}
	}

	cb.mu.Lock()
	// Check if circuit should transition from Open to HalfOpen.
	if cb.state == Open && time.Since(cb.lastFailure) > cb.config.Timeout {
		cb.state = HalfOpen
		cb.consecutiveSuccesses = 0
	}
	// If circuit is still open, reject the call.
	if cb.state == Open {
		cb.mu.Unlock()
		return fmt.Errorf("circuit breaker is open")
	}
	cb.mu.Unlock()

	// Execute the function without holding the lock: sync.RWMutex is not
	// reentrant, and fn may be a slow network call.
	err := fn()

	// Update circuit state based on the result.
	cb.mu.Lock()
	defer cb.mu.Unlock()
	if err != nil {
		cb.onFailure()
		return err
	}
	cb.onSuccess()
	return nil
}
// onFailure records a failed call: it stamps the failure time, bumps the
// failure counter, and opens the circuit when the failure happened in the
// HalfOpen state or the counter has reached FailureThreshold. A warning is
// logged when the circuit opens and a logger is set.
//
// It does not take cb.mu itself; Call invokes it while holding the lock.
func (cb *CircuitBreaker) onFailure() {
	cb.lastFailure = time.Now()
	cb.failureCount++

	shouldTrip := cb.state == HalfOpen || cb.failureCount >= cb.config.FailureThreshold
	if !shouldTrip {
		return
	}

	cb.state = Open
	if cb.logger == nil {
		return
	}
	cb.logger.Warn(fmt.Sprintf("Circuit breaker OPENED after %d failures", cb.failureCount))
}
// onSuccess records a successful call. In the Closed state it clears the
// failure counter. In the HalfOpen state it counts the success and, once
// SuccessThreshold consecutive successes have been seen, closes the circuit
// and clears both counters, logging the recovery when a logger is set. In any
// other state it does nothing.
//
// It does not take cb.mu itself; Call invokes it while holding the lock.
func (cb *CircuitBreaker) onSuccess() {
	switch cb.state {
	case Closed:
		cb.failureCount = 0

	case HalfOpen:
		cb.consecutiveSuccesses++
		if cb.consecutiveSuccesses < cb.config.SuccessThreshold {
			// Not enough evidence of recovery yet.
			return
		}
		cb.state = Closed
		cb.failureCount = 0
		cb.consecutiveSuccesses = 0
		if cb.logger != nil {
			cb.logger.Info("Circuit breaker CLOSED after successful recovery")
		}
	}
}
// GetState reports the circuit breaker's current state, read under the
// read lock.
func (cb *CircuitBreaker) GetState() CircuitBreakerState {
	cb.mu.RLock()
	current := cb.state
	cb.mu.RUnlock()
	return current
}
// Reset forces the circuit breaker back to the Closed state, clearing both
// counters and the last-failure timestamp. The reset is logged when a logger
// is set.
func (cb *CircuitBreaker) Reset() {
	cb.mu.Lock()
	defer cb.mu.Unlock()

	cb.lastFailure = time.Time{}
	cb.consecutiveSuccesses = 0
	cb.failureCount = 0
	cb.state = Closed

	if cb.logger == nil {
		return
	}
	cb.logger.Info("Circuit breaker reset to closed state")
}
// SetLogger installs l as the logger used for state-change messages. Passing
// nil disables logging. The assignment is made under the write lock.
//
// The parameter is deliberately not named "logger": that would shadow the
// imported logger package inside the method body.
func (cb *CircuitBreaker) SetLogger(l *logger.Logger) {
	cb.mu.Lock()
	defer cb.mu.Unlock()
	cb.logger = l
}