// Source: mev-beta/pkg/datafetcher/pool_blacklist.go (345 lines, 8.9 KiB, Go)

package datafetcher
import (
	"encoding/json"
	"fmt"
	"os"
	"sort"
	"sync"
	"time"

	"github.com/ethereum/go-ethereum/common"
)
// FailureReason is a short, machine-readable label describing why a pool
// fetch failed. The string value is what ends up in the persisted JSON
// (see PoolFailureRecord.LastReason).
type FailureReason string

// Known failure reasons accepted by RecordFailure.
const (
	// FailureExecutionRevert is stored as "execution_reverted".
	FailureExecutionRevert FailureReason = "execution_reverted"

	// FailureNoData is stored as "no_data_returned".
	FailureNoData FailureReason = "no_data_returned"

	// FailureInvalidAddress is stored as "invalid_address".
	FailureInvalidAddress FailureReason = "invalid_address"

	// FailureTimeout is stored as "timeout".
	FailureTimeout FailureReason = "timeout"

	// FailureRateLimit is stored as "rate_limit". RecordFailure treats it
	// specially: repeated rate-limit failures inside the ignore window are
	// not counted.
	FailureRateLimit FailureReason = "rate_limit"

	// FailureOther is stored as "other".
	FailureOther FailureReason = "other"
)
// PoolFailureRecord is the per-pool bookkeeping entry kept by PoolBlacklist
// and serialized to the persistence file as one JSON object.
type PoolFailureRecord struct {
	// Address is the pool the record belongs to; it is also the map key.
	Address common.Address `json:"address"`

	// FailureCount is the lifetime number of counted failures.
	FailureCount int `json:"failure_count"`

	// ConsecutiveFails is the number of counted failures since the last
	// recorded success; RecordSuccess resets it to zero.
	ConsecutiveFails int `json:"consecutive_fails"`

	// LastFailure is the time of the most recent counted failure.
	LastFailure time.Time `json:"last_failure"`

	// LastReason is the reason given for the most recent counted failure.
	LastReason FailureReason `json:"last_reason"`

	// FirstSeen is the time the record was created (first counted failure).
	FirstSeen time.Time `json:"first_seen"`

	// IsBlacklisted reports whether the pool has been blacklisted. Expiry is
	// evaluated separately against BlacklistedAt by the query methods.
	IsBlacklisted bool `json:"is_blacklisted"`

	// BlacklistedAt is when the pool was blacklisted; RecordSuccess resets it
	// to the zero time when it lifts the blacklist.
	//
	// NOTE: encoding/json never treats a struct value as empty, so
	// "omitempty" has no effect on a time.Time field; the zero time is still
	// written out.
	BlacklistedAt time.Time `json:"blacklisted_at,omitempty"`
}
// PoolBlacklist tracks per-pool fetch failures and decides which pools
// should be skipped. All methods take mu, so a single instance can be shared
// between goroutines; the struct must not be copied after first use because
// it embeds a mutex by value.
type PoolBlacklist struct {
	// mu guards every field below.
	mu sync.RWMutex

	// failures holds one record per pool that has failed at least once.
	failures map[common.Address]*PoolFailureRecord

	// Blacklisting policy.
	blacklistThreshold    int           // consecutive failures required to blacklist a pool
	blacklistExpiry       time.Duration // how long a blacklist entry stays in force before a retry
	rateLimitIgnoreWindow time.Duration // rate-limit failures this soon after the last failure are ignored

	// Persistence.
	persistPath     string        // JSON file backing the blacklist; empty disables persistence
	lastPersist     time.Time     // time of the last successful write to persistPath
	persistInterval time.Duration // minimum spacing between periodic writes from RecordFailure
}
// NewPoolBlacklist returns a PoolBlacklist configured with the default
// policy: blacklist after 5 consecutive failures, retry after 24 hours,
// ignore rate-limit bursts for 5 minutes and persist at most every 5 minutes.
//
// When persistPath is non-empty the previously saved state is loaded from
// it. A load error is not fatal: a warning is printed to stdout and the
// returned blacklist starts out empty.
func NewPoolBlacklist(persistPath string) *PoolBlacklist {
	const (
		defaultThreshold       = 5
		defaultExpiry          = 24 * time.Hour
		defaultRateLimitWindow = 5 * time.Minute
		defaultPersistInterval = 5 * time.Minute
	)

	blacklist := &PoolBlacklist{
		failures:              make(map[common.Address]*PoolFailureRecord),
		blacklistThreshold:    defaultThreshold,
		blacklistExpiry:       defaultExpiry,
		rateLimitIgnoreWindow: defaultRateLimitWindow,
		persistPath:           persistPath,
		persistInterval:       defaultPersistInterval,
	}

	// Without a path there is nothing to restore.
	if persistPath == "" {
		return blacklist
	}

	if loadErr := blacklist.Load(); loadErr != nil {
		// Losing the saved state only costs us a few redundant fetches.
		fmt.Printf("[WARN] Failed to load blacklist from %s: %v\n", persistPath, loadErr)
	}
	return blacklist
}
// RecordFailure registers a failed fetch of the pool at addr.
//
// A FailureRateLimit that arrives within rateLimitIgnoreWindow of the pool's
// previous counted failure is dropped entirely, because rate limits are
// transient and should not push a pool towards the blacklist. Every other
// failure bumps the lifetime and consecutive counters and updates the
// last-failure metadata.
//
// Blacklisting rules:
//   - a pool that is not blacklisted becomes blacklisted once its consecutive
//     failure count reaches blacklistThreshold;
//   - a pool whose blacklist entry has expired and that fails again (i.e. the
//     retry did not succeed) has its blacklist renewed from now. Previously
//     the expired entry was never refreshed, so a permanently broken pool
//     was retried forever after the first expiry.
//
// When persistence is enabled the state is written to disk at most once per
// persistInterval. A write error is reported as a warning on stdout and the
// next attempt is deferred by one interval, so a broken disk does not turn
// every call into a failed write.
func (pb *PoolBlacklist) RecordFailure(addr common.Address, reason FailureReason) {
	pb.mu.Lock()
	defer pb.mu.Unlock()

	now := time.Now()
	record, exists := pb.failures[addr]

	// Rate-limit bursts right after another failure are noise; skip them.
	if exists && reason == FailureRateLimit && now.Sub(record.LastFailure) < pb.rateLimitIgnoreWindow {
		return
	}

	if !exists {
		record = &PoolFailureRecord{
			Address:   addr,
			FirstSeen: now,
		}
		pb.failures[addr] = record
	}

	record.FailureCount++
	record.ConsecutiveFails++
	record.LastFailure = now
	record.LastReason = reason

	switch {
	case !record.IsBlacklisted:
		if record.ConsecutiveFails >= pb.blacklistThreshold {
			record.IsBlacklisted = true
			record.BlacklistedAt = now
		}
	case now.Sub(record.BlacklistedAt) > pb.blacklistExpiry:
		// The entry had expired and the retry failed: start a new
		// blacklist period instead of leaving the pool permanently
		// "expired".
		record.BlacklistedAt = now
	}

	if pb.persistPath != "" && now.Sub(pb.lastPersist) > pb.persistInterval {
		if err := pb.persistUnsafe(); err != nil {
			fmt.Printf("[WARN] Failed to persist blacklist to %s: %v\n", pb.persistPath, err)
			// Back off until the next interval rather than retrying (and
			// logging) on every subsequent failure.
			pb.lastPersist = now
		}
	}
}
// RecordSuccess registers a successful fetch of the pool at addr.
//
// Pools without a failure record are ignored. For known pools the
// consecutive-failure counter is reset and any blacklist entry is lifted.
// The lifetime FailureCount is left untouched.
//
// When persistence is enabled the new state is written immediately, but only
// if this call actually changed the record: a pool that is already clean no
// longer triggers a full rewrite of the file on every successful fetch. A
// write error is reported as a warning on stdout.
func (pb *PoolBlacklist) RecordSuccess(addr common.Address) {
	pb.mu.Lock()
	defer pb.mu.Unlock()

	record, exists := pb.failures[addr]
	if !exists {
		// No failures recorded, nothing to do.
		return
	}

	// Already clean: skip the disk write, nothing would change.
	if record.ConsecutiveFails == 0 && !record.IsBlacklisted {
		return
	}

	record.ConsecutiveFails = 0
	if record.IsBlacklisted {
		record.IsBlacklisted = false
		record.BlacklistedAt = time.Time{}
	}

	if pb.persistPath == "" {
		return
	}
	if err := pb.persistUnsafe(); err != nil {
		fmt.Printf("[WARN] Failed to persist blacklist to %s: %v\n", pb.persistPath, err)
	}
}
// IsBlacklisted reports whether addr is blacklisted right now, i.e. it has a
// record flagged as blacklisted whose blacklist period (blacklistExpiry,
// counted from BlacklistedAt) has not yet run out.
func (pb *PoolBlacklist) IsBlacklisted(addr common.Address) bool {
	pb.mu.RLock()
	defer pb.mu.RUnlock()

	entry, found := pb.failures[addr]
	if !found || !entry.IsBlacklisted {
		return false
	}

	// Still in force only while the elapsed time has not exceeded the expiry.
	return time.Since(entry.BlacklistedAt) <= pb.blacklistExpiry
}
// FilterBlacklisted returns the pools from the input that may be fetched:
// those without a record, those not flagged as blacklisted, and those whose
// blacklist period has expired. Input order is preserved and the input slice
// is not modified.
//
// Only the read lock is held, so expired records are not altered here.
func (pb *PoolBlacklist) FilterBlacklisted(pools []common.Address) []common.Address {
	pb.mu.RLock()
	defer pb.mu.RUnlock()

	kept := make([]common.Address, 0, len(pools))
	now := time.Now()

	for _, addr := range pools {
		entry, found := pb.failures[addr]

		// Drop the pool only when its blacklist entry is still in force.
		active := found && entry.IsBlacklisted &&
			now.Sub(entry.BlacklistedAt) <= pb.blacklistExpiry
		if active {
			continue
		}
		kept = append(kept, addr)
	}
	return kept
}
// GetStats summarizes the current state:
//   - total: number of pools that have a failure record;
//   - blacklisted: records flagged as blacklisted whose period is still running;
//   - expired: records flagged as blacklisted whose period has run out.
func (pb *PoolBlacklist) GetStats() (total, blacklisted, expired int) {
	pb.mu.RLock()
	defer pb.mu.RUnlock()

	now := time.Now()
	for _, entry := range pb.failures {
		switch {
		case !entry.IsBlacklisted:
			// Counted in total only.
		case now.Sub(entry.BlacklistedAt) > pb.blacklistExpiry:
			expired++
		default:
			blacklisted++
		}
	}

	total = len(pb.failures)
	return total, blacklisted, expired
}
// GetTopFailures returns up to n records ordered by lifetime FailureCount,
// highest first. The relative order of records with equal counts is
// unspecified.
//
// The returned records are copies taken under the lock, so callers can read
// them freely while other goroutines keep recording failures. A negative n
// is treated as 0; an n larger than the number of records returns them all.
// The result is never nil.
func (pb *PoolBlacklist) GetTopFailures(n int) []*PoolFailureRecord {
	pb.mu.RLock()
	defer pb.mu.RUnlock()

	records := make([]*PoolFailureRecord, 0, len(pb.failures))
	for _, record := range pb.failures {
		records = append(records, record)
	}

	sort.Slice(records, func(i, j int) bool {
		return records[i].FailureCount > records[j].FailureCount
	})

	// Clamp n into [0, len(records)] so slicing below cannot panic.
	if n < 0 {
		n = 0
	}
	if n > len(records) {
		n = len(records)
	}

	// Hand out snapshots rather than pointers into the live map.
	top := make([]*PoolFailureRecord, n)
	for i, record := range records[:n] {
		snapshot := *record
		top[i] = &snapshot
	}
	return top
}
// Persist writes the current blacklist state to persistPath while holding
// the write lock. It returns nil when persistence is disabled (empty path)
// and otherwise whatever error the write produced.
func (pb *PoolBlacklist) Persist() error {
	pb.mu.Lock()
	defer pb.mu.Unlock()

	if err := pb.persistUnsafe(); err != nil {
		return err
	}
	return nil
}
// persistUnsafe serializes every failure record as an indented JSON array
// and stores it at persistPath. The caller must hold pb.mu for writing; the
// function itself does no locking.
//
// The data is first written to a sibling "<persistPath>.tmp" file and then
// renamed over the target, so a crash or full disk in the middle of a write
// cannot leave a truncated blacklist behind. lastPersist is updated only
// after the rename succeeds.
//
// It returns nil when persistence is disabled, and a wrapped error when
// marshaling, writing or renaming fails.
func (pb *PoolBlacklist) persistUnsafe() error {
	if pb.persistPath == "" {
		return nil
	}

	// JSON has no notion of a map keyed by address; store a flat list.
	records := make([]*PoolFailureRecord, 0, len(pb.failures))
	for _, record := range pb.failures {
		records = append(records, record)
	}

	data, err := json.MarshalIndent(records, "", " ")
	if err != nil {
		return fmt.Errorf("failed to marshal blacklist: %w", err)
	}

	tmpPath := pb.persistPath + ".tmp"
	if err := os.WriteFile(tmpPath, data, 0644); err != nil {
		return fmt.Errorf("failed to write blacklist: %w", err)
	}
	if err := os.Rename(tmpPath, pb.persistPath); err != nil {
		// Best-effort cleanup; the rename error is the one worth reporting.
		_ = os.Remove(tmpPath)
		return fmt.Errorf("failed to replace blacklist file: %w", err)
	}

	pb.lastPersist = time.Now()
	return nil
}
// Load replaces the in-memory state with the records stored at persistPath.
//
// It is a no-op returning nil when persistence is disabled or the file does
// not exist yet. Read and JSON errors are returned wrapped, and in that case
// the in-memory state is left as it was. Null entries in the stored array
// are skipped instead of causing a nil-pointer dereference. If the file
// contains several records for the same address, the last one wins.
func (pb *PoolBlacklist) Load() error {
	pb.mu.Lock()
	defer pb.mu.Unlock()

	if pb.persistPath == "" {
		return nil
	}

	data, err := os.ReadFile(pb.persistPath)
	if err != nil {
		if os.IsNotExist(err) {
			return nil // File doesn't exist yet, not an error.
		}
		return fmt.Errorf("failed to read blacklist: %w", err)
	}

	var records []*PoolFailureRecord
	if err := json.Unmarshal(data, &records); err != nil {
		return fmt.Errorf("failed to unmarshal blacklist: %w", err)
	}

	// Build the new map completely before swapping it in.
	loaded := make(map[common.Address]*PoolFailureRecord, len(records))
	for _, record := range records {
		if record == nil {
			// A literal null in the JSON array decodes to a nil pointer.
			continue
		}
		loaded[record.Address] = record
	}
	pb.failures = loaded
	return nil
}
// SetBlacklistThreshold changes how many consecutive failures are needed
// before a pool is blacklisted. Values of zero or below are ignored and the
// current threshold is kept.
func (pb *PoolBlacklist) SetBlacklistThreshold(threshold int) {
	pb.mu.Lock()
	defer pb.mu.Unlock()

	if threshold <= 0 {
		return
	}
	pb.blacklistThreshold = threshold
}
// SetBlacklistExpiry changes how long a blacklist entry stays in force.
// Durations of zero or below are ignored and the current expiry is kept.
func (pb *PoolBlacklist) SetBlacklistExpiry(expiry time.Duration) {
	pb.mu.Lock()
	defer pb.mu.Unlock()

	if expiry <= 0 {
		return
	}
	pb.blacklistExpiry = expiry
}
// Clear drops every failure record, blacklisted or not, by installing a
// fresh empty map. It does not write to the persistence file.
func (pb *PoolBlacklist) Clear() {
	pb.mu.Lock()
	defer pb.mu.Unlock()

	pb.failures = map[common.Address]*PoolFailureRecord{}
}
// ValidatePoolAddress performs the only check that is possible without an
// RPC call: it rejects the zero address and accepts everything else.
//
// It returns a non-nil error for the zero address and nil otherwise. A nil
// result does not mean the address is an actual pool contract.
func ValidatePoolAddress(addr common.Address) error {
	// Comparing against the zero value covers the all-zero hex string too,
	// so no separate string comparison is needed.
	if addr == (common.Address{}) {
		return fmt.Errorf("zero address not allowed")
	}
	return nil
}