package arbitrum

import (
	"context"
	"fmt"
	"os"
	"strings"
	"time"

	"github.com/ethereum/go-ethereum/ethclient"
	"golang.org/x/time/rate"

	"github.com/fraktal/mev-beta/internal/config"
	"github.com/fraktal/mev-beta/internal/logger"
)

// RateLimitedClient wraps ethclient.Client with a token-bucket rate limiter
// and a circuit breaker. The embedded *ethclient.Client is exposed directly,
// so only calls routed through CallWithRateLimit are throttled and guarded.
type RateLimitedClient struct {
	*ethclient.Client
	limiter        *rate.Limiter
	circuitBreaker *CircuitBreaker
	logger         *logger.Logger
}

// RateLimitConfig represents the configuration for rate limiting.
type RateLimitConfig struct {
	RequestsPerSecond float64 `yaml:"requests_per_second"`
	MaxConcurrent     int     `yaml:"max_concurrent"`
	Burst             int     `yaml:"burst"`
}

// NewRateLimitedClient wraps client with a limiter that allows
// requestsPerSecond sustained requests and a burst of twice that rate
// (at least 1), plus a circuit breaker configured with a failure threshold
// of 5, a 30 second timeout and a success threshold of 3.
func NewRateLimitedClient(client *ethclient.Client, requestsPerSecond float64, logger *logger.Logger) *RateLimitedClient {
	// A burst of 0 would make every limiter.Wait call fail, which is what
	// int(requestsPerSecond*2) yields for rates below 0.5 req/s, so clamp it.
	burst := int(requestsPerSecond * 2)
	if burst < 1 {
		burst = 1
	}
	limiter := rate.NewLimiter(rate.Limit(requestsPerSecond), burst)

	// Circuit breaker with the default configuration.
	circuitBreaker := NewCircuitBreaker(&CircuitBreakerConfig{
		FailureThreshold: 5,
		Timeout:          30 * time.Second,
		SuccessThreshold: 3,
	})
	circuitBreaker.SetLogger(logger)

	return &RateLimitedClient{
		Client:         client,
		limiter:        limiter,
		circuitBreaker: circuitBreaker,
		logger:         logger,
	}
}

// CallWithRateLimit executes call with rate limiting and circuit breaker
// protection. It returns an error without invoking call when the breaker is
// open or when waiting for a limiter token fails (for example because ctx is
// done); otherwise it returns whatever the circuit breaker returns for call.
func (rlc *RateLimitedClient) CallWithRateLimit(ctx context.Context, call func() error) error {
	// Fail fast while the breaker is open.
	if rlc.circuitBreaker.GetState() == Open {
		return fmt.Errorf("circuit breaker is open")
	}

	// Block until the limiter grants a token or ctx is done.
	if err := rlc.limiter.Wait(ctx); err != nil {
		return fmt.Errorf("rate limiter wait error: %w", err)
	}

	// Execute the call through the circuit breaker.
	err := rlc.circuitBreaker.Call(ctx, call)

	// Report when the breaker is open after this call.
	if rlc.circuitBreaker.GetState() == Open {
		rlc.logger.Warn("🚨 Circuit breaker OPENED due to failed RPC calls")
	}

	return err
}

// GetCircuitBreaker returns the circuit breaker for external access.
func (rlc *RateLimitedClient) GetCircuitBreaker() *CircuitBreaker {
	return rlc.circuitBreaker
}

// ResetCircuitBreaker resets the circuit breaker and logs the reset.
func (rlc *RateLimitedClient) ResetCircuitBreaker() {
	rlc.circuitBreaker.Reset()
	rlc.logger.Info("✅ Circuit breaker reset to closed state")
}

// ConnectionManager manages Arbitrum RPC connections with fallback support.
// It holds no lock around its fields.
type ConnectionManager struct {
	config             *config.ArbitrumConfig
	primaryClient      *RateLimitedClient
	fallbackClients    []*RateLimitedClient
	currentClientIndex int
	logger             *logger.Logger
}

// NewConnectionManager creates a connection manager that reads endpoints and
// rate limits from cfg (which may be nil) and logs through logger.
func NewConnectionManager(cfg *config.ArbitrumConfig, logger *logger.Logger) *ConnectionManager {
	return &ConnectionManager{
		config: cfg,
		logger: logger,
	}
}

// GetClient returns a connected client, preferring the primary endpoint and
// falling back to the fallback endpoints in order. A cached primary client is
// health-checked first and closed and dropped if the check fails. It returns
// an error only when no endpoint could be connected.
func (cm *ConnectionManager) GetClient(ctx context.Context) (*RateLimitedClient, error) {
	if cm.primaryClient == nil {
		// No cached primary: try to establish one. A failure here is not
		// reported; the fallback endpoints are tried instead.
		client, err := cm.connectWithTimeout(ctx, cm.getPrimaryEndpoint())
		if err == nil {
			cm.primaryClient = client
			return client, nil
		}
	} else {
		// Reuse the cached primary while it still answers.
		if cm.testConnection(ctx, cm.primaryClient.Client) == nil {
			return cm.primaryClient, nil
		}

		// Primary client failed, close it.
		cm.primaryClient.Client.Close()
		cm.primaryClient = nil
	}

	// Try fallback endpoints in order; the first that connects wins.
	for i, endpoint := range cm.getFallbackEndpoints() {
		client, err := cm.connectWithTimeout(ctx, endpoint)
		if err != nil {
			continue
		}

		// Store the successful fallback client, closing any client that
		// previously occupied the same slot.
		if i < len(cm.fallbackClients) {
			if cm.fallbackClients[i] != nil {
				cm.fallbackClients[i].Client.Close()
			}
			cm.fallbackClients[i] = client
		} else {
			cm.fallbackClients = append(cm.fallbackClients, client)
		}
		cm.currentClientIndex = i
		return client, nil
	}

	return nil, fmt.Errorf("all RPC endpoints failed to connect")
}

// getPrimaryEndpoint returns the primary RPC endpoint: the
// ARBITRUM_RPC_ENDPOINT environment variable if set, else the configured
// RPCEndpoint, else a built-in default.
func (cm *ConnectionManager) getPrimaryEndpoint() string {
	// Environment variable takes precedence.
	if endpoint := os.Getenv("ARBITRUM_RPC_ENDPOINT"); endpoint != "" {
		return endpoint
	}

	// Then the config value.
	if cm.config != nil && cm.config.RPCEndpoint != "" {
		return cm.config.RPCEndpoint
	}

	// NOTE(review): this default URL appears to embed a provider access
	// token in source control — confirm, and consider rotating it and
	// requiring the endpoint from env/config instead.
	return "wss://arbitrum-mainnet.core.chainstack.com/53c30e7a941160679fdcc396c894fc57"
}

// getFallbackEndpoints returns the fallback RPC endpoints. When
// ARBITRUM_FALLBACK_ENDPOINTS is set, only its comma-separated, trimmed,
// non-empty entries are used (possibly none). Otherwise the configured
// reading and execution endpoint URLs are used, and if there are none, a
// built-in list of public endpoints.
func (cm *ConnectionManager) getFallbackEndpoints() []string {
	var endpoints []string

	// Environment variable takes precedence and is exclusive.
	if envEndpoints := os.Getenv("ARBITRUM_FALLBACK_ENDPOINTS"); envEndpoints != "" {
		for _, endpoint := range strings.Split(envEndpoints, ",") {
			if endpoint = strings.TrimSpace(endpoint); endpoint != "" {
				endpoints = append(endpoints, endpoint)
			}
		}
		// If the environment variable is set, use only it and return.
		return endpoints
	}

	// Add configured reading and execution endpoints.
	if cm.config != nil {
		for _, endpoint := range cm.config.ReadingEndpoints {
			if endpoint.URL != "" {
				endpoints = append(endpoints, endpoint.URL)
			}
		}
		for _, endpoint := range cm.config.ExecutionEndpoints {
			if endpoint.URL != "" {
				endpoints = append(endpoints, endpoint.URL)
			}
		}
	}

	// Default fallbacks if none configured.
	if len(endpoints) == 0 {
		endpoints = []string{
			"https://arb1.arbitrum.io/rpc",
			"https://arbitrum.llamarpc.com",
			"https://arbitrum-one.publicnode.com",
			"https://arbitrum-one.public.blastapi.io",
		}
	}

	return endpoints
}

// connectWithTimeout dials endpoint and health-checks the connection, all
// within 30 seconds of ctx. On success the client is wrapped with the
// configured rate limit (default 10 requests per second); on a failed health
// check the dialed client is closed before the error is returned.
func (cm *ConnectionManager) connectWithTimeout(ctx context.Context, endpoint string) (*RateLimitedClient, error) {
	// Extended timeout for production stability: increased from 10s to 30s
	// to handle network congestion and slow RPC responses.
	connectCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	// NOTE(review): the endpoint URL is logged verbatim; if URLs carry
	// credentials in the path they end up in the logs.
	cm.logger.Info(fmt.Sprintf("🔌 Attempting connection to endpoint: %s (timeout: 30s)", endpoint))

	client, err := ethclient.DialContext(connectCtx, endpoint)
	if err != nil {
		return nil, fmt.Errorf("failed to connect to %s: %w", endpoint, err)
	}

	cm.logger.Info("✅ Client connected, testing connection health...")

	// Test the connection with a simple call; do not leak the client on failure.
	if err := cm.testConnection(connectCtx, client); err != nil {
		client.Close()
		return nil, fmt.Errorf("connection test failed for %s: %w", endpoint, err)
	}

	cm.logger.Info("✅ Connection health check passed")

	// Rate limit from config, or the default of 10 requests per second.
	requestsPerSecond := 10.0
	if cm.config != nil && cm.config.RateLimit.RequestsPerSecond > 0 {
		requestsPerSecond = float64(cm.config.RateLimit.RequestsPerSecond)
	}

	return NewRateLimitedClient(client, requestsPerSecond, cm.logger), nil
}

// testConnection checks that client is usable by requesting the chain ID
// with a 15 second timeout derived from ctx. It returns the request error,
// or nil after logging the chain ID.
func (cm *ConnectionManager) testConnection(ctx context.Context, client *ethclient.Client) error {
	// Increased timeout from 5s to 15s for production stability.
	testCtx, cancel := context.WithTimeout(ctx, 15*time.Second)
	defer cancel()

	chainID, err := client.ChainID(testCtx)
	if err != nil {
		return err
	}

	cm.logger.Info(fmt.Sprintf("✅ Connected to chain ID: %s", chainID.String()))
	return nil
}

// Close closes the primary client and every stored fallback client, then
// clears the manager's references to them. Clients previously returned by
// GetClient share these connections and must not be used afterwards.
func (cm *ConnectionManager) Close() {
	if cm.primaryClient != nil {
		cm.primaryClient.Client.Close()
		cm.primaryClient = nil
	}

	for _, client := range cm.fallbackClients {
		if client != nil {
			client.Client.Close()
		}
	}
	cm.fallbackClients = nil
}

// GetClientWithRetry calls GetClient up to maxRetries times, waiting between
// attempts with exponential backoff (1s, 2s, 4s, then capped at 8s). It
// returns early if ctx is done during a wait, and otherwise an error wrapping
// the last connection failure once all attempts are used up.
func (cm *ConnectionManager) GetClientWithRetry(ctx context.Context, maxRetries int) (*RateLimitedClient, error) {
	// Start from a real error so the final message stays well-formed when
	// maxRetries <= 0 and the loop body never runs.
	lastErr := fmt.Errorf("no connection attempts were made")

	cm.logger.Info(fmt.Sprintf("🔄 Starting connection attempts (max retries: %d)", maxRetries))

	for attempt := 0; attempt < maxRetries; attempt++ {
		cm.logger.Info(fmt.Sprintf("📡 Connection attempt %d/%d", attempt+1, maxRetries))

		client, err := cm.GetClient(ctx)
		if err == nil {
			cm.logger.Info("✅ Successfully connected to RPC endpoint")
			return client, nil
		}

		lastErr = err
		cm.logger.Warn(fmt.Sprintf("❌ Connection attempt %d failed: %v", attempt+1, err))

		// No wait after the final attempt.
		if attempt >= maxRetries-1 {
			continue
		}

		// Exponential backoff with cap at 8 seconds. Only shift for the
		// attempts below the cap so a large attempt count cannot overflow.
		waitTime := 8 * time.Second
		if attempt < 3 {
			waitTime = time.Duration(1<<uint(attempt)) * time.Second
		}

		cm.logger.Info(fmt.Sprintf("⏳ Waiting %v before retry...", waitTime))

		select {
		case <-ctx.Done():
			return nil, fmt.Errorf("context cancelled during retry: %w", ctx.Err())
		case <-time.After(waitTime):
			// Continue to next attempt.
		}
	}

	return nil, fmt.Errorf("failed to connect after %d attempts (last error: %w)", maxRetries, lastErr)
}

// GetHealthyClient connects using a default (empty) configuration with up to
// 3 attempts and returns a client that passed the health check. The caller
// owns the returned client and is responsible for closing it.
func GetHealthyClient(ctx context.Context, logger *logger.Logger) (*RateLimitedClient, error) {
	cfg := &config.ArbitrumConfig{} // Use default config
	cm := NewConnectionManager(cfg, logger)

	client, err := cm.GetClientWithRetry(ctx, 3)
	if err != nil {
		// Nothing is handed to the caller, so release whatever was opened.
		cm.Close()
		return nil, err
	}

	// Do not Close the manager here: Close would close the connection
	// underlying the client being returned.
	return client, nil
}