package datafetcher

import (
	"context"
	"fmt"
	"math/big"
	"time"

	"github.com/ethereum/go-ethereum/accounts/abi/bind"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/ethclient"
	"github.com/holiman/uint256"

	"github.com/fraktal/mev-beta/bindings/datafetcher"
	"github.com/fraktal/mev-beta/internal/logger"
)

// PoolData represents unified pool data for both V2 and V3 pools.
type PoolData struct {
	Address      common.Address
	Token0       common.Address
	Token1       common.Address
	Fee          int64        // V3 only (0 for V2)
	Liquidity    *uint256.Int // V3 liquidity or derived from V2 reserves
	SqrtPriceX96 *uint256.Int // V3 price or calculated from V2 reserves
	Tick         int          // V3 only (0 for V2)
	Reserve0     *big.Int     // V2 only (nil for V3)
	Reserve1     *big.Int     // V2 only (nil for V3)
	IsV3         bool
	BlockNumber  *big.Int
	Timestamp    *big.Int
}

// BatchFetcher provides efficient batch pool data fetching using the
// DataFetcher contract. Pools that repeatedly fail are tracked in a
// persistent blacklist so they can be skipped on later calls.
type BatchFetcher struct {
	client         *ethclient.Client
	contract       *datafetcher.DataFetcher
	contractAddr   common.Address
	logger         *logger.Logger
	maxBatchSize   int
	requestTimeout time.Duration
	blacklist      *PoolBlacklist
}

// NewBatchFetcher creates a new batch fetcher instance bound to the
// DataFetcher contract at contractAddr.
//
// Returns an error if client or logger is nil, or if the contract binding
// cannot be instantiated.
func NewBatchFetcher(
	client *ethclient.Client,
	contractAddr common.Address,
	logger *logger.Logger,
) (*BatchFetcher, error) {
	if client == nil {
		return nil, fmt.Errorf("client cannot be nil")
	}
	// FIXED: also reject a nil logger — every fetch path logs through
	// bf.logger, and a nil logger would otherwise panic far from the
	// construction site instead of failing fast here.
	if logger == nil {
		return nil, fmt.Errorf("logger cannot be nil")
	}

	contract, err := datafetcher.NewDataFetcher(contractAddr, client)
	if err != nil {
		return nil, fmt.Errorf("failed to instantiate DataFetcher contract: %w", err)
	}

	// Initialize blacklist with persistence
	blacklistPath := "logs/pool_blacklist.json"
	blacklist := NewPoolBlacklist(blacklistPath)

	return &BatchFetcher{
		client:         client,
		contract:       contract,
		contractAddr:   contractAddr,
		logger:         logger,
		maxBatchSize:   100,              // Fetch up to 100 pools per batch
		requestTimeout: 30 * time.Second, // FIXED (2025-11-03): Increased from 10s to 30s to handle RPC latency
		blacklist:      blacklist,
	}, nil
}

// FetchPoolsBatch fetches data for multiple
pools in a single RPC call // Automatically detects V2 vs V3 and fetches appropriate data func (bf *BatchFetcher) FetchPoolsBatch(ctx context.Context, poolAddresses []common.Address) (map[common.Address]*PoolData, error) { if len(poolAddresses) == 0 { return make(map[common.Address]*PoolData), nil } // Validate and filter pool addresses validPools := make([]common.Address, 0, len(poolAddresses)) invalidCount := 0 blacklistedCount := 0 for _, addr := range poolAddresses { // Check if address is valid if err := ValidatePoolAddress(addr); err != nil { invalidCount++ bf.blacklist.RecordFailure(addr, FailureInvalidAddress) continue } // Check if blacklisted if bf.blacklist.IsBlacklisted(addr) { blacklistedCount++ continue } validPools = append(validPools, addr) } if invalidCount > 0 || blacklistedCount > 0 { bf.logger.Debug(fmt.Sprintf("🔍 Filtered pools: %d invalid, %d blacklisted, %d valid", invalidCount, blacklistedCount, len(validPools))) } if len(validPools) == 0 { return make(map[common.Address]*PoolData), nil } // Split into batches if needed results := make(map[common.Address]*PoolData) for i := 0; i < len(validPools); i += bf.maxBatchSize { end := i + bf.maxBatchSize if end > len(validPools) { end = len(validPools) } batch := validPools[i:end] batchResults, err := bf.fetchSingleBatch(ctx, batch) // If batch fetch failed with execution revert, try individual fetches as fallback if err != nil && bf.isExecutionRevert(err) && len(batch) > 1 { bf.logger.Debug(fmt.Sprintf("Batch %d-%d failed with revert, trying individual fetches for %d pools", i, end, len(batch))) // Fetch each pool individually to identify which ones are bad for _, poolAddr := range batch { individualResult, individualErr := bf.fetchSingleBatch(ctx, []common.Address{poolAddr}) if individualErr != nil { // This specific pool is bad bf.blacklist.RecordFailure(poolAddr, bf.categorizeError(individualErr)) bf.logger.Debug(fmt.Sprintf("Pool %s failed individual fetch: %v", poolAddr.Hex()[:10], 
individualErr)) } else if len(individualResult) > 0 { // This pool is good! Merge its data for addr, data := range individualResult { results[addr] = data bf.blacklist.RecordSuccess(addr) } } else { // Pool returned no data bf.blacklist.RecordFailure(poolAddr, FailureNoData) } } continue } // If batch fetch failed for other reasons, record failures if err != nil { bf.logger.Warn(fmt.Sprintf("Failed to fetch batch %d-%d: %v", i, end, err)) // Record failures for all pools in this batch for _, addr := range batch { bf.blacklist.RecordFailure(addr, bf.categorizeError(err)) } continue } // Batch succeeded - merge results and record successes for addr, data := range batchResults { results[addr] = data bf.blacklist.RecordSuccess(addr) } // Record failures for pools that didn't return data for _, addr := range batch { if _, exists := batchResults[addr]; !exists { bf.blacklist.RecordFailure(addr, FailureNoData) } } } total, blacklisted, expired := bf.blacklist.GetStats() bf.logger.Debug(fmt.Sprintf("✅ Batch fetched %d/%d pools successfully (Blacklist: %d total, %d active, %d expired)", len(results), len(poolAddresses), total, blacklisted, expired)) return results, nil } // fetchSingleBatch fetches a single batch (max 100 pools) with retry logic func (bf *BatchFetcher) fetchSingleBatch(ctx context.Context, pools []common.Address) (map[common.Address]*PoolData, error) { // FIXED (2025-11-03): Added exponential backoff retry logic for transient RPC failures // This handles temporary network issues, rate limiting, and RPC overload gracefully maxRetries := 3 baseDelay := time.Second var lastErr error for attempt := 0; attempt < maxRetries; attempt++ { timeoutCtx, cancel := context.WithTimeout(ctx, bf.requestTimeout) defer cancel() // FIXED: Use batchFetchV3Data instead of batchFetchAllData // Reason: batchFetchAllData is missing 'view' modifier in Solidity, causing ABI issues // batchFetchV3Data is properly marked as 'view' and works correctly // Call batchFetchV3Data using the 
		// generated bindings
		opts := &bind.CallOpts{
			Context: timeoutCtx, // per-attempt timeout bounds each eth_call
		}
		v3Data, err := bf.contract.BatchFetchV3Data(opts, pools)
		if err == nil {
			// Success on first or retry attempt
			return bf.parseV3Data(timeoutCtx, v3Data)
		}

		lastErr = err

		// Check if this is a transient error worth retrying
		if bf.isTransientError(err) && attempt < maxRetries-1 {
			// Exponential backoff: 1s, 2s, 4s
			// NOTE(review): the source text is CORRUPTED from here through the
			// body of SetMaxBatchSize below — the backoff shift (presumably
			// `1<<attempt`), the remainder of the retry loop, the parseV3Data
			// helper, and the SetMaxBatchSize doc comment/signature appear to
			// have been lost in transit. Preserved verbatim; recover the
			// missing span from version control before building.
			delayDuration := baseDelay * time.Duration(1< 0 && size <= 1000 { bf.maxBatchSize = size } }

// SetRequestTimeout sets the timeout for batch requests
// (applied per attempt inside fetchSingleBatch).
func (bf *BatchFetcher) SetRequestTimeout(timeout time.Duration) {
	bf.requestTimeout = timeout
}

// categorizeError determines the type of error for blacklist tracking by
// substring-matching the raw RPC error text. Falls through to FailureOther
// when no known pattern matches.
func (bf *BatchFetcher) categorizeError(err error) FailureReason {
	if err == nil {
		return FailureOther
	}
	errStr := err.Error()

	// Check for common error patterns
	if contains(errStr, "execution reverted") {
		return FailureExecutionRevert
	}
	if contains(errStr, "no data returned") || contains(errStr, "no data") {
		return FailureNoData
	}
	if contains(errStr, "429") || contains(errStr, "too many requests") || contains(errStr, "rate limit") {
		return FailureRateLimit
	}
	if contains(errStr, "timeout") || contains(errStr, "deadline exceeded") {
		return FailureTimeout
	}
	return FailureOther
}

// contains reports whether s contains substr.
// NOTE(review): the original comment claimed "(case-insensitive)", but
// hasSubstring compares bytes exactly, so this match is case-SENSITIVE.
// The lowercase patterns used by categorizeError/isTransientError will miss
// mixed-case provider errors (e.g. "Too Many Requests") — confirm, and
// consider strings.Contains(strings.ToLower(s), substr) instead.
func contains(s, substr string) bool {
	return len(s) >= len(substr) && (s == substr || len(s) > len(substr) && (hasSubstring(s, substr)))
}

// hasSubstring performs a naive exact-byte scan for substr inside s
// (semantically equivalent to strings.Contains).
func hasSubstring(s, substr string) bool {
	for i := 0; i <= len(s)-len(substr); i++ {
		if s[i:i+len(substr)] == substr {
			return true
		}
	}
	return false
}

// GetBlacklist returns the blacklist for external access
func (bf *BatchFetcher) GetBlacklist() *PoolBlacklist {
	return bf.blacklist
}

// PersistBlacklist forces immediate persistence of the blacklist
func (bf *BatchFetcher) PersistBlacklist() error {
	return bf.blacklist.Persist()
}

// isExecutionRevert checks if an error is an execution revert
func (bf
*BatchFetcher) isExecutionRevert(err error) bool {
	if err == nil {
		return false
	}
	msg := err.Error()
	// Any of these markers in the RPC error text indicates a contract-level
	// revert rather than a transport problem.
	for _, marker := range []string{"execution reverted", "revert", "reverted"} {
		if contains(msg, marker) {
			return true
		}
	}
	return false
}

// isTransientError checks if an error is transient (worth retrying)
// ADDED (2025-11-03): Distinguishes between transient (network/timeout) and permanent (contract) errors
func (bf *BatchFetcher) isTransientError(err error) bool {
	if err == nil {
		return false
	}
	msg := err.Error()

	// Errors worth retrying: timeouts/deadline expiry (network latency),
	// rate limiting (HTTP 429), and temporary network-level failures
	// (connection refused/reset, i/o timeout, etc.).
	for _, p := range []string{
		"context deadline exceeded",
		"timeout",
		"429",
		"too many requests",
		"rate limit",
		"connection refused",
		"connection reset",
		"i/o timeout",
		"temporary failure",
		"temporarily unavailable",
		"econnrefused",
		"econnreset",
		"etimedout",
	} {
		if contains(msg, p) {
			return true
		}
	}

	// Permanent failures — execution reverts, missing methods, bad addresses,
	// ABI mismatches — must never be retried. Anything else unrecognized is
	// also treated as non-transient so we fail fast.
	if bf.isExecutionRevert(err) || contains(msg, "no method") || contains(msg, "invalid address") || contains(msg, "ABI") {
		return false
	}
	return false
}