Files
mev-beta/orig/pkg/arbitrum/rpc_client_helper.go
Administrator 803de231ba feat: create v2-prep branch with comprehensive planning
Restructured project for V2 refactor:

**Structure Changes:**
- Moved all V1 code to orig/ folder (preserved with git mv)
- Created docs/planning/ directory
- Added orig/README_V1.md explaining V1 preservation

**Planning Documents:**
- 00_V2_MASTER_PLAN.md: Complete architecture overview
  - Executive summary of critical V1 issues
  - High-level component architecture diagrams
  - 5-phase implementation roadmap
  - Success metrics and risk mitigation

- 07_TASK_BREAKDOWN.md: Atomic task breakdown
  - 99+ hours of detailed tasks
  - Every task < 2 hours (atomic)
  - Clear dependencies and success criteria
  - Organized by implementation phase

**V2 Key Improvements:**
- Per-exchange parsers (factory pattern)
- Multi-layer strict validation
- Multi-index pool cache
- Background validation pipeline
- Comprehensive observability

**Critical Issues Addressed:**
- Zero address tokens (strict validation + cache enrichment)
- Parsing accuracy (protocol-specific parsers)
- No audit trail (background validation channel)
- Inefficient lookups (multi-index cache)
- Stats disconnection (event-driven metrics)

Next Steps:
1. Review planning documents
2. Begin Phase 1: Foundation (P1-001 through P1-010)
3. Implement parsers in Phase 2
4. Build cache system in Phase 3
5. Add validation pipeline in Phase 4
6. Migrate and test in Phase 5

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-10 10:14:26 +01:00

199 lines
5.4 KiB
Go

package arbitrum
import (
"context"
"fmt"
"time"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/fraktal/mev-beta/internal/config"
"github.com/fraktal/mev-beta/internal/logger"
)
// RoundRobinClient hands out RPC clients obtained from an RPCManager and keeps
// simple usage counters for reads and writes.
//
// NOTE(review): no locking is visible on these fields in this file; confirm
// that callers serialize access before sharing one instance across goroutines.
type RoundRobinClient struct {
	// manager selects the next endpoint and receives success/failure reports.
	manager *RPCManager
	// ctx is passed to manager.GetNextClient on every selection.
	ctx context.Context
	// logger is stored by the constructor.
	logger *logger.Logger
	// lastIdx is the endpoint index returned by the most recent selection;
	// the constructor sets it to -1 to mean "nothing selected yet".
	lastIdx int
	// readCalls and writeCalls count selections made through
	// GetClientForRead and GetClientForWrite respectively.
	readCalls, writeCalls int64
}
// NewRoundRobinClient builds a RoundRobinClient around the given manager,
// context and logger. The last-selected index starts at -1 so that the
// Record* methods are no-ops until a client has actually been handed out.
func NewRoundRobinClient(manager *RPCManager, ctx context.Context, logger *logger.Logger) *RoundRobinClient {
	rr := new(RoundRobinClient)
	rr.manager = manager
	rr.ctx = ctx
	rr.logger = logger
	rr.lastIdx = -1
	return rr
}
// GetClientForRead asks the manager for its next client and returns the
// underlying ethclient for a read operation.
//
// It returns an error when the manager is nil, when the manager itself fails
// to supply a client, or when the supplied client (or its inner ethclient) is
// nil. The selected index is remembered for the Record* methods.
func (rr *RoundRobinClient) GetClientForRead() (*ethclient.Client, error) {
	if rr.manager == nil {
		return nil, fmt.Errorf("RPC manager not initialized")
	}

	selected, index, err := rr.manager.GetNextClient(rr.ctx)
	if err != nil {
		return nil, err
	}

	// Bookkeeping is updated before the nil check, so a nil client still
	// counts as a read and its index is still remembered.
	rr.lastIdx = index
	rr.readCalls++

	if selected != nil && selected.Client != nil {
		return selected.Client, nil
	}
	return nil, fmt.Errorf("client at index %d is nil", index)
}
// RecordReadSuccess reports a successful read, with its response time, against
// the most recently selected endpoint. It does nothing when no endpoint has
// been selected yet or when the manager is nil.
func (rr *RoundRobinClient) RecordReadSuccess(responseTime time.Duration) {
	if rr.lastIdx < 0 || rr.manager == nil {
		return
	}
	rr.manager.RecordSuccess(rr.lastIdx, responseTime)
}
// RecordReadFailure reports a failed read against the most recently selected
// endpoint. It does nothing when no endpoint has been selected yet or when the
// manager is nil.
func (rr *RoundRobinClient) RecordReadFailure() {
	if rr.lastIdx < 0 || rr.manager == nil {
		return
	}
	rr.manager.RecordFailure(rr.lastIdx)
}
// GetClientForWrite returns a client for a write operation, selected while the
// manager's rotation policy is temporarily set to LeastFailures. The previous
// policy is restored when the function returns.
//
// It returns an error when the manager is nil, when the manager fails to
// supply a client, or when the supplied client (or its inner ethclient) is nil.
//
// NOTE(review): the policy swap is applied to the shared manager, and the
// current policy is read directly from its field. Whether concurrent callers
// can observe the temporary policy depends on RPCManager's locking — confirm.
func (rr *RoundRobinClient) GetClientForWrite() (*ethclient.Client, error) {
	mgr := rr.manager
	if mgr == nil {
		return nil, fmt.Errorf("RPC manager not initialized")
	}

	// Remember the active policy, switch to LeastFailures for this one
	// selection, and put the old policy back on the way out.
	previous := mgr.rotationPolicy
	mgr.SetRotationPolicy(LeastFailures)
	defer mgr.SetRotationPolicy(previous)

	selected, index, err := mgr.GetNextClient(rr.ctx)
	if err != nil {
		return nil, err
	}

	// Bookkeeping is updated before the nil check, so a nil client still
	// counts as a write and its index is still remembered.
	rr.lastIdx = index
	rr.writeCalls++

	if selected != nil && selected.Client != nil {
		return selected.Client, nil
	}
	return nil, fmt.Errorf("client at index %d is nil", index)
}
// RecordWriteSuccess reports a successful write, with its response time,
// against the most recently selected endpoint. It does nothing when no
// endpoint has been selected yet or when the manager is nil.
func (rr *RoundRobinClient) RecordWriteSuccess(responseTime time.Duration) {
	if rr.lastIdx < 0 || rr.manager == nil {
		return
	}
	rr.manager.RecordSuccess(rr.lastIdx, responseTime)
}
// RecordWriteFailure reports a failed write against the most recently selected
// endpoint. It does nothing when no endpoint has been selected yet or when the
// manager is nil.
func (rr *RoundRobinClient) RecordWriteFailure() {
	if rr.lastIdx < 0 || rr.manager == nil {
		return
	}
	rr.manager.RecordFailure(rr.lastIdx)
}
// GetLoadBalancingStats returns a snapshot of this wrapper's counters together
// with the manager's own statistics.
//
// With a nil manager the map holds a single "error" entry. Otherwise it holds
// "total_reads", "total_writes" and "rpc_stats" (the result of
// manager.GetStats()).
func (rr *RoundRobinClient) GetLoadBalancingStats() map[string]interface{} {
	stats := make(map[string]interface{})

	if rr.manager == nil {
		stats["error"] = "RPC manager not initialized"
		return stats
	}

	stats["total_reads"] = rr.readCalls
	stats["total_writes"] = rr.writeCalls
	stats["rpc_stats"] = rr.manager.GetStats()
	return stats
}
// InitializeRPCRoundRobin connects to each endpoint, registers the resulting
// client with the connection manager's RPC manager, and then runs one health
// check over the RPC manager with a 30-second timeout.
//
// Endpoints that fail to connect or fail to register are logged and skipped;
// they do not produce an error return. A failed health check is likewise only
// logged. An empty endpoints slice logs a warning and returns nil.
//
// An error is returned only when cm is nil or when cm has no RPC manager.
func InitializeRPCRoundRobin(cm *ConnectionManager, endpoints []string) error {
	if cm == nil {
		return fmt.Errorf("connection manager is nil")
	}
	if len(endpoints) == 0 {
		cm.logger.Warn("⚠️ No additional endpoints provided for round-robin initialization")
		return nil
	}
	// Fail with an error instead of invoking AddEndpoint / HealthCheckAll on a
	// nil RPC manager, matching the nil-manager guards used by RoundRobinClient.
	if cm.rpcManager == nil {
		return fmt.Errorf("RPC manager not initialized")
	}

	// Connect to each endpoint and add it to the RPC manager.
	connectedCount := 0
	for _, endpoint := range endpoints {
		client, err := cm.connectWithTimeout(context.Background(), endpoint)
		if err != nil {
			cm.logger.Warn(fmt.Sprintf("Failed to connect to endpoint %s: %v", endpoint, err))
			continue
		}
		if err := cm.rpcManager.AddEndpoint(client, endpoint); err != nil {
			cm.logger.Warn(fmt.Sprintf("Failed to add endpoint to RPC manager: %v", err))
			// The connection will not be tracked by the manager, so release it
			// here. Guard against a nil wrapper or nil inner client so the
			// cleanup path itself cannot panic.
			if client != nil && client.Client != nil {
				client.Client.Close()
			}
			continue
		}
		connectedCount++
	}
	cm.logger.Info(fmt.Sprintf("✅ Initialized round-robin with %d/%d endpoints", connectedCount, len(endpoints)))

	// Perform the initial health check; a failure is reported but not fatal.
	healthCheckCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	if err := cm.rpcManager.HealthCheckAll(healthCheckCtx); err != nil {
		cm.logger.Warn(fmt.Sprintf("⚠️ Health check failed: %v", err))
	}
	return nil
}
// ConfigureRPCLoadBalancing applies the given rotation policy to the
// connection manager and logs the chosen strategy. It returns an error only
// when cm is nil.
func ConfigureRPCLoadBalancing(cm *ConnectionManager, strategy RotationPolicy) error {
	if cm == nil {
		return fmt.Errorf("connection manager is nil")
	}

	cm.SetRPCRotationPolicy(strategy)

	msg := fmt.Sprintf("📊 RPC load balancing configured with strategy: %s", strategy)
	cm.logger.Info(msg)
	return nil
}
// GetConnectionManagerWithRoundRobin creates a ConnectionManager from cfg and
// logger, registers the given endpoints for round-robin use when any are
// supplied, and finally selects the HealthAware rotation policy.
//
// A nil cfg is the only error condition. A failure while initializing the
// endpoints is logged as a warning and the manager is still returned.
func GetConnectionManagerWithRoundRobin(cfg *config.ArbitrumConfig, logger *logger.Logger, endpoints []string) (*ConnectionManager, error) {
	if cfg == nil {
		return nil, fmt.Errorf("config must not be nil")
	}

	manager := NewConnectionManager(cfg, logger)

	// Endpoint registration is best-effort: report problems, keep going.
	if len(endpoints) != 0 {
		initErr := InitializeRPCRoundRobin(manager, endpoints)
		if initErr != nil {
			logger.Warn(fmt.Sprintf("Failed to initialize round-robin: %v", initErr))
		}
	}

	// Health-aware rotation is the default strategy for managers built here.
	manager.SetRPCRotationPolicy(HealthAware)
	return manager, nil
}