package datafetcher

import (
	"encoding/json"
	"fmt"
	"os"
	"sort"
	"sync"
	"time"

	"github.com/ethereum/go-ethereum/common"
)

// FailureReason represents why a pool failed
type FailureReason string

const (
	FailureExecutionRevert FailureReason = "execution_reverted"
	FailureNoData          FailureReason = "no_data_returned"
	FailureInvalidAddress  FailureReason = "invalid_address"
	FailureTimeout         FailureReason = "timeout"
	FailureRateLimit       FailureReason = "rate_limit"
	FailureOther           FailureReason = "other"
)

// PoolFailureRecord tracks failures for a specific pool
type PoolFailureRecord struct {
	Address          common.Address `json:"address"`
	FailureCount     int            `json:"failure_count"`
	ConsecutiveFails int            `json:"consecutive_fails"`
	LastFailure      time.Time      `json:"last_failure"`
	LastReason       FailureReason  `json:"last_reason"`
	FirstSeen        time.Time      `json:"first_seen"`
	IsBlacklisted    bool           `json:"is_blacklisted"`
	BlacklistedAt    time.Time      `json:"blacklisted_at,omitempty"`
}

// PoolBlacklist manages a list of pools that consistently fail
type PoolBlacklist struct {
	mu                    sync.RWMutex
	failures              map[common.Address]*PoolFailureRecord
	blacklistThreshold    int           // Number of consecutive failures before blacklisting
	blacklistExpiry       time.Duration // How long to keep in blacklist before retry
	rateLimitIgnoreWindow time.Duration // Ignore rate limit errors within this window
	persistPath           string
	lastPersist           time.Time
	persistInterval       time.Duration
}

// NewPoolBlacklist creates a new pool blacklist manager
func NewPoolBlacklist(persistPath string) *PoolBlacklist {
	pb := &PoolBlacklist{
		failures:              make(map[common.Address]*PoolFailureRecord),
		blacklistThreshold:    5,               // Blacklist after 5 consecutive failures
		blacklistExpiry:       24 * time.Hour,  // Retry blacklisted pools after 24 hours
		rateLimitIgnoreWindow: 5 * time.Minute, // Ignore rate limit errors for 5 minutes
		persistPath:           persistPath,
		persistInterval:       5 * time.Minute, // Persist every 5 minutes
	}

	// Load existing blacklist from disk
	if persistPath != "" {
		if err := pb.Load(); err != nil {
			// Not critical, just log and continue
			fmt.Printf("[WARN] Failed to load blacklist from %s: %v\n", persistPath, err)
		}
	}

	return pb
}
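
// newStrictBlacklist is an illustrative sketch, not part of the original
// file: it shows how a caller might construct the blacklist with persistence
// enabled and then tighten the defaults via the setters defined later in this
// file. The file name and the chosen threshold/expiry values are assumptions,
// not recommendations.
func newStrictBlacklist() *PoolBlacklist {
	// Persist to a local JSON file; an empty path would disable persistence
	pb := NewPoolBlacklist("pool_blacklist.json")

	// Blacklist sooner and retry more often than the defaults
	pb.SetBlacklistThreshold(3)
	pb.SetBlacklistExpiry(6 * time.Hour)

	return pb
}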

// RecordFailure records a failure for a pool
func (pb *PoolBlacklist) RecordFailure(addr common.Address, reason FailureReason) {
	pb.mu.Lock()
	defer pb.mu.Unlock()

	// Ignore rate limit errors during the ignore window
	// (rate limits are temporary and shouldn't blacklist pools)
	if reason == FailureRateLimit {
		if record, exists := pb.failures[addr]; exists {
			if time.Since(record.LastFailure) < pb.rateLimitIgnoreWindow {
				return
			}
		}
	}

	now := time.Now()
	record, exists := pb.failures[addr]

	if !exists {
		record = &PoolFailureRecord{
			Address:   addr,
			FirstSeen: now,
		}
		pb.failures[addr] = record
	}

	// Update failure counts
	record.FailureCount++
	record.ConsecutiveFails++
	record.LastFailure = now
	record.LastReason = reason

	// Check if should be blacklisted
	if !record.IsBlacklisted && record.ConsecutiveFails >= pb.blacklistThreshold {
		record.IsBlacklisted = true
		record.BlacklistedAt = now
	}

	// Persist periodically
	if pb.persistPath != "" && time.Since(pb.lastPersist) > pb.persistInterval {
		pb.persistUnsafe()
	}
}

// RecordSuccess records a successful fetch for a pool
func (pb *PoolBlacklist) RecordSuccess(addr common.Address) {
	pb.mu.Lock()
	defer pb.mu.Unlock()

	record, exists := pb.failures[addr]
	if !exists {
		// No failures recorded, nothing to do
		return
	}

	// Reset consecutive failures
	record.ConsecutiveFails = 0

	// If it was blacklisted and now works, remove from blacklist
	if record.IsBlacklisted {
		record.IsBlacklisted = false
		record.BlacklistedAt = time.Time{}
	}

	// Persist on success to ensure blacklist is updated
	if pb.persistPath != "" {
		pb.persistUnsafe()
	}
}

// IsBlacklisted checks if a pool is currently blacklisted
func (pb *PoolBlacklist) IsBlacklisted(addr common.Address) bool {
	pb.mu.RLock()
	defer pb.mu.RUnlock()

	record, exists := pb.failures[addr]
	if !exists {
		return false
	}

	if !record.IsBlacklisted {
		return false
	}

	// Check if blacklist has expired
	if time.Since(record.BlacklistedAt) > pb.blacklistExpiry {
		return false
	}

	return true
}

// FilterBlacklisted removes blacklisted pools from a list
func (pb *PoolBlacklist) FilterBlacklisted(pools []common.Address) []common.Address {
	pb.mu.RLock()
	defer pb.mu.RUnlock()

	filtered := make([]common.Address, 0, len(pools))
	now := time.Now()

	for _, pool := range pools {
		record, exists := pb.failures[pool]

		// If no record or not blacklisted, include it
		if !exists || !record.IsBlacklisted {
			filtered = append(filtered, pool)
			continue
		}

		// Check if blacklist has expired
		if now.Sub(record.BlacklistedAt) > pb.blacklistExpiry {
			// Expired, include it and mark for retry
			filtered = append(filtered, pool)
			// Note: We don't modify the record here (would need write lock)
			// It will be updated on next fetch attempt
		}
		// Otherwise skip (blacklisted)
	}

	return filtered
}
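
// fetchAllWithBlacklist is an illustrative sketch, not part of the original
// file: it shows one way the blacklist is meant to wrap a fetch loop — filter
// out blacklisted pools first, then record each failure or success so the
// blacklist stays current. The fetch callback and the blanket use of
// FailureOther are assumptions; a real caller would classify errors into the
// more specific FailureReason values.
func fetchAllWithBlacklist(pb *PoolBlacklist, pools []common.Address, fetch func(common.Address) error) {
	for _, pool := range pb.FilterBlacklisted(pools) {
		if err := fetch(pool); err != nil {
			pb.RecordFailure(pool, FailureOther)
			continue
		}
		pb.RecordSuccess(pool)
	}
}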

// GetStats returns blacklist statistics
func (pb *PoolBlacklist) GetStats() (total, blacklisted, expired int) {
	pb.mu.RLock()
	defer pb.mu.RUnlock()

	now := time.Now()
	total = len(pb.failures)

	for _, record := range pb.failures {
		if record.IsBlacklisted {
			if now.Sub(record.BlacklistedAt) > pb.blacklistExpiry {
				expired++
			} else {
				blacklisted++
			}
		}
	}

	return total, blacklisted, expired
}

// GetTopFailures returns the top N pools by failure count
func (pb *PoolBlacklist) GetTopFailures(n int) []*PoolFailureRecord {
	pb.mu.RLock()
	defer pb.mu.RUnlock()

	// Copy records by value so callers can read them after the lock is
	// released without racing with concurrent updates
	records := make([]*PoolFailureRecord, 0, len(pb.failures))
	for _, record := range pb.failures {
		rc := *record
		records = append(records, &rc)
	}

	// Sort by failure count, descending
	sort.Slice(records, func(i, j int) bool {
		return records[i].FailureCount > records[j].FailureCount
	})

	// Return top n
	if n > len(records) {
		n = len(records)
	}
	return records[:n]
}
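
// logBlacklistStats is an illustrative sketch, not part of the original file:
// it shows how GetStats and GetTopFailures might be surfaced for periodic
// monitoring. The log format and the choice of the top five entries are
// assumptions.
func logBlacklistStats(pb *PoolBlacklist) {
	total, blacklisted, expired := pb.GetStats()
	fmt.Printf("[BLACKLIST] tracked=%d blacklisted=%d expired=%d\n", total, blacklisted, expired)

	for _, rec := range pb.GetTopFailures(5) {
		fmt.Printf("[BLACKLIST] %s failures=%d last_reason=%s\n",
			rec.Address.Hex(), rec.FailureCount, rec.LastReason)
	}
}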

// Persist saves the blacklist to disk
func (pb *PoolBlacklist) Persist() error {
	pb.mu.Lock()
	defer pb.mu.Unlock()
	return pb.persistUnsafe()
}

// persistUnsafe persists without acquiring lock (caller must hold lock)
func (pb *PoolBlacklist) persistUnsafe() error {
	if pb.persistPath == "" {
		return nil
	}

	// Convert map to slice for JSON
	records := make([]*PoolFailureRecord, 0, len(pb.failures))
	for _, record := range pb.failures {
		records = append(records, record)
	}

	data, err := json.MarshalIndent(records, "", " ")
	if err != nil {
		return fmt.Errorf("failed to marshal blacklist: %w", err)
	}

	if err := os.WriteFile(pb.persistPath, data, 0644); err != nil {
		return fmt.Errorf("failed to write blacklist: %w", err)
	}

	pb.lastPersist = time.Now()
	return nil
}

// Load loads the blacklist from disk
func (pb *PoolBlacklist) Load() error {
	pb.mu.Lock()
	defer pb.mu.Unlock()

	if pb.persistPath == "" {
		return nil
	}

	data, err := os.ReadFile(pb.persistPath)
	if err != nil {
		if os.IsNotExist(err) {
			return nil // File doesn't exist yet, not an error
		}
		return fmt.Errorf("failed to read blacklist: %w", err)
	}

	var records []*PoolFailureRecord
	if err := json.Unmarshal(data, &records); err != nil {
		return fmt.Errorf("failed to unmarshal blacklist: %w", err)
	}

	// Load records into map
	pb.failures = make(map[common.Address]*PoolFailureRecord)
	for _, record := range records {
		pb.failures[record.Address] = record
	}

	return nil
}
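
// persistOnShutdown is an illustrative sketch, not part of the original file:
// because RecordFailure only persists periodically, a final explicit Persist
// call at shutdown keeps failures recorded since the last write from being
// lost. The warning log format is an assumption.
func persistOnShutdown(pb *PoolBlacklist) {
	if err := pb.Persist(); err != nil {
		fmt.Printf("[WARN] Failed to persist blacklist on shutdown: %v\n", err)
	}
}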

// SetBlacklistThreshold sets the number of consecutive failures before blacklisting
func (pb *PoolBlacklist) SetBlacklistThreshold(threshold int) {
	pb.mu.Lock()
	defer pb.mu.Unlock()
	if threshold > 0 {
		pb.blacklistThreshold = threshold
	}
}

// SetBlacklistExpiry sets how long pools stay blacklisted
func (pb *PoolBlacklist) SetBlacklistExpiry(expiry time.Duration) {
	pb.mu.Lock()
	defer pb.mu.Unlock()
	if expiry > 0 {
		pb.blacklistExpiry = expiry
	}
}

// Clear removes all entries from the blacklist
func (pb *PoolBlacklist) Clear() {
	pb.mu.Lock()
	defer pb.mu.Unlock()
	pb.failures = make(map[common.Address]*PoolFailureRecord)
}

// ValidatePoolAddress checks if a pool address is valid for fetching
func ValidatePoolAddress(addr common.Address) error {
	// Check for the zero address (this single comparison covers the
	// all-zero hex form as well)
	if addr == (common.Address{}) {
		return fmt.Errorf("zero address not allowed")
	}

	// All other addresses are potentially valid
	// (we can't pre-validate if they're actual pool contracts without RPC calls)
	return nil
}
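
// shouldFetchPool is an illustrative sketch, not part of the original file:
// it combines the static ValidatePoolAddress check with the runtime
// IsBlacklisted check into a single gate a caller might apply before issuing
// an RPC call for a pool. Classifying a validation failure as
// FailureInvalidAddress is an assumption about how callers map errors.
func shouldFetchPool(pb *PoolBlacklist, addr common.Address) bool {
	if err := ValidatePoolAddress(addr); err != nil {
		pb.RecordFailure(addr, FailureInvalidAddress)
		return false
	}
	return !pb.IsBlacklisted(addr)
}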