Files
mev-beta/internal/ratelimit/adaptive.go
Krypto Kajun 850223a953 fix(multicall): resolve critical multicall parsing corruption issues
- Added comprehensive bounds checking to prevent buffer overruns in multicall parsing
- Implemented graduated validation system (Strict/Moderate/Permissive) to reduce false positives
- Added LRU caching system for address validation with 10-minute TTL
- Enhanced ABI decoder with missing Universal Router and Arbitrum-specific DEX signatures
- Fixed duplicate function declarations and import conflicts across multiple files
- Added error recovery mechanisms with multiple fallback strategies
- Updated tests to handle new validation behavior for suspicious addresses
- Fixed parser test expectations for improved validation system
- Applied gofmt formatting fixes to ensure code style compliance
- Fixed mutex copying issues in monitoring package by introducing MetricsSnapshot
- Resolved critical security vulnerabilities in heuristic address extraction
- Progress: Updated TODO audit from 10% to 35% complete

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-17 00:12:55 -05:00

495 lines
14 KiB
Go

package ratelimit
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
"golang.org/x/time/rate"
"github.com/fraktal/mev-beta/internal/config"
"github.com/fraktal/mev-beta/internal/logger"
)
// AdaptiveRateLimiter implements adaptive rate limiting that adjusts to endpoint capacity.
// It keeps one AdaptiveEndpoint per registered URL and re-tunes the per-endpoint
// limits from a background goroutine started by NewAdaptiveRateLimiter.
type AdaptiveRateLimiter struct {
// endpoints maps an endpoint URL to its adaptive state.
endpoints map[string]*AdaptiveEndpoint
// mu guards access to the endpoints map.
mu sync.RWMutex
// logger receives informational messages (endpoint added, limit adjusted).
logger *logger.Logger
// defaultConfig is the top-level rate limit config; its RequestsPerSecond
// is the reference for the min/max bounds applied when limits are adjusted.
defaultConfig config.RateLimitConfig
// adjustInterval is the period of the adjustment loop and also the minimum
// time between two adjustments of the same endpoint.
adjustInterval time.Duration
// stopChan is closed by Stop to terminate the adjustment loop.
stopChan chan struct{}
}
// AdaptiveEndpoint represents an endpoint with adaptive rate limiting.
// It bundles the rate limiter, circuit breaker, metrics and health checker
// that belong to a single URL.
type AdaptiveEndpoint struct {
// URL is the endpoint address; it is also the key in AdaptiveRateLimiter.endpoints.
URL string
// limiter throttles requests to this endpoint; its limit is changed by
// adjustEndpointRateLimit.
limiter *rate.Limiter
// config is the rate limit configuration the endpoint was registered with.
config config.RateLimitConfig
// circuitBreaker blocks requests after repeated failures.
circuitBreaker *CircuitBreaker
// metrics accumulates request counters and derived statistics.
metrics *EndpointMetrics
// healthChecker runs periodic health checks in its own goroutine.
healthChecker *HealthChecker
// lastAdjustment is the time the limit was last changed (zero until the
// first adjustment).
lastAdjustment time.Time
// consecutiveErrors and consecutiveSuccess are updated with sync/atomic
// in RecordResult; each is reset to 0 when the opposite outcome is recorded.
// NOTE(review): 64-bit alignment of these fields on 32-bit platforms depends
// on the size of the fields above — confirm if 32-bit targets matter.
consecutiveErrors int64
consecutiveSuccess int64
}
// EndpointMetrics tracks performance metrics for an endpoint.
// The int64 counters are placed first so that they are 64-bit aligned for
// atomic access; they are read and written only through sync/atomic.
type EndpointMetrics struct {
// TotalRequests counts every recorded request, successful or not.
TotalRequests int64
// SuccessfulRequests counts requests recorded with success == true.
SuccessfulRequests int64
// FailedRequests counts requests recorded with success == false.
FailedRequests int64
// TotalLatency is the sum of all recorded latencies.
TotalLatency int64 // nanoseconds
// LastRequestTime is when the most recent result was recorded.
LastRequestTime int64 // unix timestamp
// Non-atomic fields - must be protected by mutex when accessed
// (see updateDerivedMetrics and getCalculatedMetrics).
mu sync.RWMutex
// SuccessRate is SuccessfulRequests / TotalRequests, in the range 0-1.
SuccessRate float64
// AverageLatency is TotalLatency / TotalRequests converted to milliseconds.
AverageLatency float64 // milliseconds
}
// CircuitBreaker implements circuit breaker pattern for failed endpoints.
//
// All mutable fields are accessed with sync/atomic. The 64-bit fields are
// deliberately declared before the 32-bit state field: on 32-bit platforms
// (ARM, 386, 32-bit MIPS) only the first word of an allocated struct is
// guaranteed to be 64-bit aligned, and a misaligned 64-bit atomic operation
// panics. Keep the int64 fields at the top when adding new fields.
type CircuitBreaker struct {
	// failureCount is incremented by recordFailure and cleared by recordSuccess.
	failureCount int64
	// lastFailTime is the unix timestamp (seconds) of the most recent failure.
	lastFailTime int64
	// testRequests is the number of test requests admitted in half-open state.
	testRequests int64
	// threshold is the failureCount at which the breaker opens. It is set at
	// construction and only read afterwards.
	threshold int64
	// timeout is how long to wait in the open state before trying again.
	timeout time.Duration
	// state is one of CircuitClosed (0), CircuitOpen (1), CircuitHalfOpen (2).
	state int32
}
// Circuit breaker states, stored in CircuitBreaker.state.
const (
// CircuitClosed: requests are allowed.
CircuitClosed = 0
// CircuitOpen: requests are rejected until the breaker timeout has elapsed.
CircuitOpen = 1
// CircuitHalfOpen: a limited number of test requests are allowed.
CircuitHalfOpen = 2
)
// HealthChecker monitors endpoint health.
// Its start method runs a ticker loop that calls checkHealth until stopChan
// is closed.
type HealthChecker struct {
// endpoint is the URL being monitored.
endpoint string
// interval is the period between two health checks.
interval time.Duration
// timeout bounds a single health check (used as the context deadline).
timeout time.Duration
// isHealthy is 1 when the last check succeeded and 0 otherwise; accessed
// with sync/atomic.
isHealthy int64 // atomic bool
// lastCheck is when the last check finished; accessed with sync/atomic.
lastCheck int64 // unix timestamp
// stopChan is closed to stop the checking loop.
stopChan chan struct{}
}
// NewAdaptiveRateLimiter builds an adaptive rate limiter from cfg.
//
// One adaptive endpoint is registered for the primary RPC endpoint (using the
// top-level rate limit config), then one for every reading endpoint and one
// for every execution endpoint (each with its own rate limit config). A
// background goroutine that periodically re-tunes the limits is started
// before the limiter is returned; call Stop to terminate it.
func NewAdaptiveRateLimiter(cfg *config.ArbitrumConfig, logger *logger.Logger) *AdaptiveRateLimiter {
	limiter := new(AdaptiveRateLimiter)
	limiter.endpoints = make(map[string]*AdaptiveEndpoint)
	limiter.logger = logger
	limiter.defaultConfig = cfg.RateLimit
	limiter.adjustInterval = 30 * time.Second
	limiter.stopChan = make(chan struct{})

	// Primary endpoint first, then the read and execution pools.
	limiter.addEndpoint(cfg.RPCEndpoint, cfg.RateLimit)
	for _, reader := range cfg.ReadingEndpoints {
		limiter.addEndpoint(reader.URL, reader.RateLimit)
	}
	for _, executor := range cfg.ExecutionEndpoints {
		limiter.addEndpoint(executor.URL, executor.RateLimit)
	}

	// Periodic limit tuning runs until Stop closes stopChan.
	go limiter.adjustmentLoop()
	return limiter
}
// addEndpoint registers an adaptive endpoint for url, configured from config,
// and starts its health checker goroutine.
//
// The endpoint starts with a limiter built from config.RequestsPerSecond and
// config.Burst, a circuit breaker that opens after 10 failures and waits 60s
// before retrying, and a health checker that runs every 30s with a 5s timeout
// and is initially considered healthy.
//
// If url is already registered (for example because the same URL appears in
// several endpoint lists) the new endpoint replaces the old one. The replaced
// endpoint's health checker is stopped so that its goroutine does not leak:
// once it is removed from the map nothing else could ever close its stopChan.
func (arl *AdaptiveRateLimiter) addEndpoint(url string, config config.RateLimitConfig) {
	endpoint := &AdaptiveEndpoint{
		URL:     url,
		limiter: rate.NewLimiter(rate.Limit(config.RequestsPerSecond), config.Burst),
		config:  config,
		circuitBreaker: &CircuitBreaker{
			threshold: 10, // Break after 10 consecutive failures
			timeout:   60 * time.Second,
		},
		metrics: &EndpointMetrics{},
		healthChecker: &HealthChecker{
			endpoint:  url,
			interval:  30 * time.Second,
			timeout:   5 * time.Second,
			isHealthy: 1, // Start assuming healthy
			stopChan:  make(chan struct{}),
		},
	}

	arl.mu.Lock()
	if previous, exists := arl.endpoints[url]; exists && previous != nil {
		// Stop the replaced health checker. The select guards against a
		// channel that was already closed by Stop; holding arl.mu makes the
		// check-then-close safe with respect to Stop.
		select {
		case <-previous.healthChecker.stopChan:
		default:
			close(previous.healthChecker.stopChan)
		}
	}
	arl.endpoints[url] = endpoint
	arl.mu.Unlock()

	// Start health checker for this endpoint
	go endpoint.healthChecker.start()

	arl.logger.Info(fmt.Sprintf("Added adaptive rate limiter for endpoint: %s", url))
}
// WaitForBestEndpoint picks the best-scoring available endpoint and blocks
// until that endpoint's rate limiter admits one request or ctx is done.
//
// It returns the chosen endpoint URL. An error is returned when no endpoint
// is selectable, when the selected endpoint is missing from the map, when its
// circuit breaker refuses the request, or when waiting on the limiter fails
// (for example because ctx was cancelled).
func (arl *AdaptiveRateLimiter) WaitForBestEndpoint(ctx context.Context) (string, error) {
	chosen := arl.getBestEndpoint()
	if chosen == "" {
		return "", fmt.Errorf("no healthy endpoints available")
	}

	// Look the endpoint up under the read lock; everything after that only
	// touches the endpoint's own state.
	arl.mu.RLock()
	target := arl.endpoints[chosen]
	arl.mu.RUnlock()

	switch {
	case target == nil:
		return "", fmt.Errorf("endpoint not found: %s", chosen)
	case !target.circuitBreaker.canExecute():
		return "", fmt.Errorf("circuit breaker open for endpoint: %s", chosen)
	}

	if waitErr := target.limiter.Wait(ctx); waitErr != nil {
		return "", waitErr
	}
	return chosen, nil
}
// RecordResult feeds the outcome of one request back into the limiter.
//
// endpointURL identifies the endpoint the request was sent to; unknown URLs
// are ignored. success says whether the request succeeded and latency is how
// long it took. The call updates the endpoint's counters, its
// consecutive-success/error streaks and its circuit breaker, and then
// refreshes the derived metrics.
func (arl *AdaptiveRateLimiter) RecordResult(endpointURL string, success bool, latency time.Duration) {
	arl.mu.RLock()
	ep, known := arl.endpoints[endpointURL]
	arl.mu.RUnlock()
	if !known {
		return
	}

	stats := ep.metrics

	// Counters shared by both outcomes.
	atomic.AddInt64(&stats.TotalRequests, 1)
	atomic.AddInt64(&stats.TotalLatency, latency.Nanoseconds())
	atomic.StoreInt64(&stats.LastRequestTime, time.Now().Unix())

	switch success {
	case true:
		atomic.AddInt64(&stats.SuccessfulRequests, 1)
		atomic.AddInt64(&ep.consecutiveSuccess, 1)
		atomic.StoreInt64(&ep.consecutiveErrors, 0)
		ep.circuitBreaker.recordSuccess()
	default:
		atomic.AddInt64(&stats.FailedRequests, 1)
		atomic.AddInt64(&ep.consecutiveErrors, 1)
		atomic.StoreInt64(&ep.consecutiveSuccess, 0)
		ep.circuitBreaker.recordFailure()
	}

	// Keep success rate / average latency in step with the counters.
	arl.updateCalculatedMetrics(ep)
}
// updateCalculatedMetrics refreshes the derived metrics (success rate and
// average latency) of endpoint from its atomic counters.
//
// The derived fields are plain float64 values guarded by the metrics mutex,
// so the update is delegated to EndpointMetrics.updateDerivedMetrics, which
// takes that mutex. Writing the fields directly here would race with
// concurrent readers going through getCalculatedMetrics.
func (arl *AdaptiveRateLimiter) updateCalculatedMetrics(endpoint *AdaptiveEndpoint) {
	endpoint.metrics.updateDerivedMetrics()
}
// getBestEndpoint returns the URL of the highest-scoring endpoint that is
// marked healthy and whose circuit breaker admits a request. It returns the
// empty string when no endpoint qualifies.
func (arl *AdaptiveRateLimiter) getBestEndpoint() string {
	arl.mu.RLock()
	defer arl.mu.RUnlock()

	var winner string
	topScore := float64(-1)
	for candidate, ep := range arl.endpoints {
		// The breaker is consulted only for endpoints that are healthy.
		healthy := atomic.LoadInt64(&ep.healthChecker.isHealthy) != 0
		if !healthy || !ep.circuitBreaker.canExecute() {
			continue
		}
		// Score combines success rate, latency and load.
		if s := arl.calculateEndpointScore(ep); s > topScore {
			winner, topScore = candidate, s
		}
	}
	return winner
}
// updateDerivedMetrics recomputes SuccessRate and AverageLatency from the
// atomic counters and stores them while holding the metrics mutex. With no
// recorded requests both values are set to zero.
func (em *EndpointMetrics) updateDerivedMetrics() {
	total := atomic.LoadInt64(&em.TotalRequests)
	succeeded := atomic.LoadInt64(&em.SuccessfulRequests)
	latencyNs := atomic.LoadInt64(&em.TotalLatency)

	// Work out the new values before taking the lock; both default to 0.0
	// when nothing has been recorded yet.
	var successRate, avgLatencyMs float64
	if total > 0 {
		successRate = float64(succeeded) / float64(total)
		avgLatencyMs = float64(latencyNs) / float64(total) / 1e6 // ns to ms
	}

	em.mu.Lock()
	em.SuccessRate = successRate
	em.AverageLatency = avgLatencyMs
	em.mu.Unlock()
}
// getCalculatedMetrics returns the stored success rate (0-1) and average
// latency (milliseconds), read together under the metrics read lock.
func (em *EndpointMetrics) getCalculatedMetrics() (float64, float64) {
	em.mu.RLock()
	successRate, avgLatency := em.SuccessRate, em.AverageLatency
	em.mu.RUnlock()
	return successRate, avgLatency
}
// calculateEndpointScore returns the selection score of endpoint; a higher
// score is better. The score is a weighted sum of three components:
// success rate (weight 0.6), latency (weight 0.3) and load (weight 0.1).
// The derived metrics are refreshed before they are read.
func (arl *AdaptiveRateLimiter) calculateEndpointScore(endpoint *AdaptiveEndpoint) float64 {
	var (
		successWeight = 0.6
		latencyWeight = 0.3
		loadWeight    = 0.1
	)

	stats := endpoint.metrics
	stats.updateDerivedMetrics()
	successScore, avgLatency := stats.getCalculatedMetrics()

	// Latency maps linearly onto a score: 0ms -> 1.0, 1000ms or more -> 0.
	// Without latency data the endpoint keeps the full latency score.
	latencyScore := 1.0
	if avgLatency > 0 {
		latencyScore = 1.0 - (avgLatency / 1000.0)
		if latencyScore < 0 {
			latencyScore = 0
		}
	}

	// Load is a fixed placeholder for now; the limiter's token state is not
	// inspected.
	loadScore := 1.0

	return successScore*successWeight + latencyScore*latencyWeight + loadScore*loadWeight
}
// adjustmentLoop re-tunes all rate limits once per adjustInterval. It blocks
// until stopChan is closed and is meant to run in its own goroutine.
func (arl *AdaptiveRateLimiter) adjustmentLoop() {
	tick := time.NewTicker(arl.adjustInterval)
	defer tick.Stop()

	for {
		select {
		case <-arl.stopChan:
			return
		case <-tick.C:
			arl.adjustRateLimits()
		}
	}
}
// adjustRateLimits runs one adjustment pass over every registered endpoint
// while holding the limiter's write lock.
func (arl *AdaptiveRateLimiter) adjustRateLimits() {
	arl.mu.Lock()
	defer arl.mu.Unlock()

	for endpointURL, ep := range arl.endpoints {
		arl.adjustEndpointRateLimit(endpointURL, ep)
	}
}
// adjustEndpointRateLimit raises or lowers the rate limit of one endpoint
// based on its observed success rate and average latency.
//
// url is used for logging only. The limit is increased by 10% when the
// endpoint performs well (success rate > 95% and latency < 500ms) and
// decreased by 10% when it performs badly (success rate < 80% or latency >
// 2s). The result is clamped to 10%-300% of the default requests-per-second
// and applied only if it differs from the current limit by more than 5%.
//
// Nothing is changed when the endpoint was adjusted less than adjustInterval
// ago, or when it has not served any request yet: with no traffic the success
// rate is 0, which would otherwise be mistaken for a failing endpoint and
// throttle an idle endpoint down to the minimum.
//
// The caller must hold arl.mu (lastAdjustment is written without further
// synchronization).
func (arl *AdaptiveRateLimiter) adjustEndpointRateLimit(url string, endpoint *AdaptiveEndpoint) {
	// Don't adjust too frequently
	if time.Since(endpoint.lastAdjustment) < arl.adjustInterval {
		return
	}

	// No observations, no evidence to adjust on.
	if atomic.LoadInt64(&endpoint.metrics.TotalRequests) == 0 {
		return
	}

	// Refresh and read the derived metrics through the mutex-protected
	// accessors; the fields must not be read directly.
	endpoint.metrics.updateDerivedMetrics()
	successRate, avgLatency := endpoint.metrics.getCalculatedMetrics()

	currentLimit := float64(endpoint.limiter.Limit())
	newLimit := currentLimit
	adjustmentFactor := 0.1 // 10% adjustment

	switch {
	case successRate > 0.95 && avgLatency < 500: // 95% success, < 500ms latency
		newLimit = currentLimit * (1.0 + adjustmentFactor)
	case successRate < 0.8 || avgLatency > 2000: // < 80% success or > 2s latency
		newLimit = currentLimit * (1.0 - adjustmentFactor)
	}

	// Apply bounds
	minLimit := float64(arl.defaultConfig.RequestsPerSecond) * 0.1 // 10% of default minimum
	maxLimit := float64(arl.defaultConfig.RequestsPerSecond) * 3.0 // 300% of default maximum
	if newLimit < minLimit {
		newLimit = minLimit
	}
	if newLimit > maxLimit {
		newLimit = maxLimit
	}

	// Update if changed significantly
	if abs(newLimit-currentLimit)/currentLimit > 0.05 { // 5% change threshold
		endpoint.limiter.SetLimit(rate.Limit(newLimit))
		endpoint.lastAdjustment = time.Now()

		arl.logger.Info(fmt.Sprintf("Adjusted rate limit for %s: %.2f -> %.2f (success: %.2f%%, latency: %.2fms)",
			url, currentLimit, newLimit, successRate*100, avgLatency))
	}
}
// abs returns the magnitude of x: negative inputs are negated, everything
// else is returned unchanged.
func abs(x float64) float64 {
	switch {
	case x < 0:
		return -x
	default:
		return x
	}
}
// canExecute reports whether the circuit breaker admits a request right now.
//
//   - Closed: always true.
//   - Open: false until more than timeout has passed since the last failure;
//     the first caller after that moves the breaker to half-open, resets the
//     test request counter and is admitted.
//   - Half-open: true for at most 3 test requests, false afterwards.
//
// Note that a true result in half-open state consumes one test request slot.
// Slots are reserved with a compare-and-swap loop so that concurrent callers
// can never be granted more than the allowed number of test requests (a
// separate load followed by an add would let several callers pass the check
// at once).
func (cb *CircuitBreaker) canExecute() bool {
	const maxTestRequests = 3 // Allow up to 3 test requests

	switch atomic.LoadInt32(&cb.state) {
	case CircuitClosed:
		return true

	case CircuitOpen:
		// Check if timeout has passed
		lastFail := atomic.LoadInt64(&cb.lastFailTime)
		if time.Now().Unix()-lastFail <= int64(cb.timeout.Seconds()) {
			return false
		}
		// Try to move to half-open; only the caller that wins the swap is
		// admitted.
		if !atomic.CompareAndSwapInt32(&cb.state, CircuitOpen, CircuitHalfOpen) {
			return false
		}
		atomic.StoreInt64(&cb.testRequests, 0)
		return true

	case CircuitHalfOpen:
		for {
			used := atomic.LoadInt64(&cb.testRequests)
			if used >= maxTestRequests {
				return false
			}
			if atomic.CompareAndSwapInt64(&cb.testRequests, used, used+1) {
				return true
			}
			// Another caller took a slot in the meantime; re-check.
		}

	default:
		return false
	}
}
// recordSuccess records a successful request.
//
// In half-open state a success closes the breaker and clears the failure
// count. In closed state a success clears the failure count as well, so that
// the threshold applies to consecutive failures rather than to the total
// number of failures since start-up; without this, a long-running endpoint
// with an occasional error would eventually trip the breaker.
//
// A count that has already reached the threshold is left untouched outside
// the half-open transition: the breaker is open (or about to be opened by the
// failing caller), and a failed test request must still see a count at or
// above the threshold in order to re-open it.
func (cb *CircuitBreaker) recordSuccess() {
	// Successful test request: move back to closed.
	if atomic.CompareAndSwapInt32(&cb.state, CircuitHalfOpen, CircuitClosed) {
		atomic.StoreInt64(&cb.failureCount, 0)
		return
	}

	// Otherwise end the current failure streak, but only while it is below
	// the threshold. The compare-and-swap retries if a failure is recorded
	// concurrently, so a streak that reaches the threshold is never erased.
	for {
		streak := atomic.LoadInt64(&cb.failureCount)
		if streak == 0 || streak >= cb.threshold {
			return
		}
		if atomic.CompareAndSwapInt64(&cb.failureCount, streak, 0) {
			return
		}
	}
}
// recordFailure notes one failed request: it bumps the failure count, stamps
// the time of the failure and opens the breaker once the count has reached
// the threshold.
func (cb *CircuitBreaker) recordFailure() {
	count := atomic.AddInt64(&cb.failureCount, 1)
	atomic.StoreInt64(&cb.lastFailTime, time.Now().Unix())

	if count < cb.threshold {
		return
	}
	atomic.StoreInt32(&cb.state, CircuitOpen)
}
// start runs the health check loop: one check per interval until stopChan is
// closed. It blocks, so callers run it in its own goroutine.
func (hc *HealthChecker) start() {
	tick := time.NewTicker(hc.interval)
	defer tick.Stop()

	for {
		select {
		case <-hc.stopChan:
			return
		case <-tick.C:
			hc.checkHealth()
		}
	}
}
// checkHealth runs a single health check bounded by the checker's timeout and
// publishes the outcome: isHealthy becomes 1 or 0 and lastCheck is set to the
// current unix time.
func (hc *HealthChecker) checkHealth() {
	ctx, cancel := context.WithTimeout(context.Background(), hc.timeout)
	defer cancel()

	// Translate the boolean result into the atomic flag representation.
	var flag int64
	if hc.performHealthCheck(ctx) {
		flag = 1
	}
	atomic.StoreInt64(&hc.isHealthy, flag)
	atomic.StoreInt64(&hc.lastCheck, time.Now().Unix())
}
// performHealthCheck is the probe behind checkHealth. The current
// implementation is a placeholder: it does not contact the endpoint, does not
// consult ctx, and always reports the endpoint as healthy. A production
// version would issue a real RPC call here.
func (hc *HealthChecker) performHealthCheck(ctx context.Context) bool {
	const assumedHealthy = true // Assume healthy for demo
	return assumedHealthy
}
// Stop stops the adaptive rate limiter: it terminates the adjustment loop and
// the health checker of every registered endpoint.
//
// Stop is idempotent and safe to call from several goroutines; only the first
// call closes the channels. (Closing an already-closed channel panics, so an
// unguarded second call would crash the process.)
func (arl *AdaptiveRateLimiter) Stop() {
	// The write lock serializes concurrent Stop calls, which makes the
	// check-then-close below safe.
	arl.mu.Lock()
	defer arl.mu.Unlock()

	select {
	case <-arl.stopChan:
		return // already stopped
	default:
	}
	close(arl.stopChan)

	// Stop all health checkers
	for _, endpoint := range arl.endpoints {
		close(endpoint.healthChecker.stopChan)
	}
}
// GetMetrics returns the metrics of every endpoint, keyed by endpoint URL.
// Derived metrics are refreshed before they are handed out. The map is new,
// but its values point at the limiter's live metrics objects, not at copies.
func (arl *AdaptiveRateLimiter) GetMetrics() map[string]*EndpointMetrics {
	arl.mu.RLock()
	defer arl.mu.RUnlock()

	byURL := map[string]*EndpointMetrics{}
	for endpointURL, ep := range arl.endpoints {
		arl.updateCalculatedMetrics(ep)
		byURL[endpointURL] = ep.metrics
	}
	return byURL
}