package pools import ( "encoding/json" "fmt" "os" "sync" "time" "github.com/ethereum/go-ethereum/common" "github.com/fraktal/mev-beta/internal/logger" ) // PoolBlacklist manages a list of pools that consistently fail type PoolBlacklist struct { mu sync.RWMutex logger *logger.Logger blacklist map[common.Address]*BlacklistEntry failureThreshold int failureWindow time.Duration persistFile string } // BlacklistEntry represents a blacklisted pool type BlacklistEntry struct { Address common.Address `json:"address"` FailureCount int `json:"failure_count"` LastFailure time.Time `json:"last_failure"` FirstFailure time.Time `json:"first_failure"` FailureReason string `json:"failure_reason"` Protocol string `json:"protocol"` TokenPair [2]common.Address `json:"token_pair"` Permanent bool `json:"permanent"` AddedAt time.Time `json:"added_at"` } // NewPoolBlacklist creates a new pool blacklist manager func NewPoolBlacklist(logger *logger.Logger) *PoolBlacklist { pb := &PoolBlacklist{ logger: logger, blacklist: make(map[common.Address]*BlacklistEntry), failureThreshold: 5, // Blacklist after 5 failures failureWindow: time.Hour, // Within 1 hour persistFile: "logs/pool_blacklist.json", } // Load existing blacklist from file pb.loadFromFile() // Start periodic cleanup of old entries go pb.periodicCleanup() return pb } // RecordFailure records a pool failure and checks if it should be blacklisted func (pb *PoolBlacklist) RecordFailure(poolAddress common.Address, reason string, protocol string, token0, token1 common.Address) { pb.mu.Lock() defer pb.mu.Unlock() entry, exists := pb.blacklist[poolAddress] now := time.Now() if !exists { // First failure - create new entry but don't blacklist yet entry = &BlacklistEntry{ Address: poolAddress, FailureCount: 1, FirstFailure: now, LastFailure: now, FailureReason: reason, Protocol: protocol, TokenPair: [2]common.Address{token0, token1}, Permanent: false, AddedAt: now, } pb.blacklist[poolAddress] = entry pb.logger.Warn(fmt.Sprintf("๐Ÿšจ POOL FAILURE [1/%d]: Pool %s (%s) - %s | Tokens: %s/%s", pb.failureThreshold, poolAddress.Hex()[:10], protocol, reason, token0.Hex()[:10], token1.Hex()[:10])) pb.logger.Info(fmt.Sprintf("๐Ÿ“Š Pool Blacklist Status: %d pools blacklisted, %d monitoring", pb.countPermanentlyBlacklisted(), len(pb.blacklist))) return } // Update existing entry entry.FailureCount++ entry.LastFailure = now entry.FailureReason = reason // Check if we should permanently blacklist if entry.FailureCount >= pb.failureThreshold { if !entry.Permanent { entry.Permanent = true pb.logger.Error(fmt.Sprintf("โ›” POOL BLACKLISTED: %s (%s) after %d failures", poolAddress.Hex(), protocol, entry.FailureCount)) pb.logger.Error(fmt.Sprintf("๐Ÿ“ Blacklist Details:\n"+ " - Pool: %s\n"+ " - Protocol: %s\n"+ " - Tokens: %s / %s\n"+ " - Failures: %d\n"+ " - First Failure: %s\n"+ " - Last Failure: %s\n"+ " - Reason: %s\n"+ " - Duration: %s", poolAddress.Hex(), protocol, token0.Hex(), token1.Hex(), entry.FailureCount, entry.FirstFailure.Format("2006-01-02 15:04:05"), entry.LastFailure.Format("2006-01-02 15:04:05"), reason, now.Sub(entry.FirstFailure).String())) // Persist to file pb.saveToFile() } } else { pb.logger.Warn(fmt.Sprintf("๐Ÿšจ POOL FAILURE [%d/%d]: Pool %s (%s) - %s | Tokens: %s/%s", entry.FailureCount, pb.failureThreshold, poolAddress.Hex()[:10], protocol, reason, token0.Hex()[:10], token1.Hex()[:10])) } // Log current blacklist statistics if entry.FailureCount%2 == 0 { // Log stats every 2 failures pb.logStatistics() } } // IsBlacklisted checks if a pool is blacklisted func (pb *PoolBlacklist) IsBlacklisted(poolAddress common.Address) bool { pb.mu.RLock() defer pb.mu.RUnlock() entry, exists := pb.blacklist[poolAddress] if !exists { return false } // Check if permanently blacklisted if entry.Permanent { // Log access attempt to blacklisted pool (throttled) if time.Since(entry.LastFailure) > time.Minute { pb.logger.Debug(fmt.Sprintf("โš ๏ธ Skipping blacklisted pool %s (failed %d times, reason: %s)", poolAddress.Hex()[:10], entry.FailureCount, entry.FailureReason)) entry.LastFailure = time.Now() // Update to throttle logging } return true } // Check if within failure window if time.Since(entry.FirstFailure) > pb.failureWindow { // Outside window - reset the entry delete(pb.blacklist, poolAddress) pb.logger.Debug(fmt.Sprintf("๐Ÿ”„ Pool %s removed from monitoring (failure window expired)", poolAddress.Hex()[:10])) return false } return false } // GetBlacklistStats returns statistics about the blacklist func (pb *PoolBlacklist) GetBlacklistStats() map[string]interface{} { pb.mu.RLock() defer pb.mu.RUnlock() permanentCount := 0 temporaryCount := 0 totalFailures := 0 reasonCounts := make(map[string]int) protocolCounts := make(map[string]int) for _, entry := range pb.blacklist { if entry.Permanent { permanentCount++ } else { temporaryCount++ } totalFailures += entry.FailureCount reasonCounts[entry.FailureReason]++ protocolCounts[entry.Protocol]++ } return map[string]interface{}{ "total_entries": len(pb.blacklist), "permanent_blacklist": permanentCount, "temporary_monitor": temporaryCount, "total_failures": totalFailures, "failure_reasons": reasonCounts, "protocols_affected": protocolCounts, } } // ClearBlacklist clears the entire blacklist (for testing/recovery) func (pb *PoolBlacklist) ClearBlacklist() { pb.mu.Lock() defer pb.mu.Unlock() oldCount := len(pb.blacklist) pb.blacklist = make(map[common.Address]*BlacklistEntry) pb.logger.Info(fmt.Sprintf("๐Ÿ”„ Pool blacklist cleared: %d entries removed", oldCount)) pb.saveToFile() } // RemoveFromBlacklist removes a specific pool from the blacklist func (pb *PoolBlacklist) RemoveFromBlacklist(poolAddress common.Address) { pb.mu.Lock() defer pb.mu.Unlock() if entry, exists := pb.blacklist[poolAddress]; exists { delete(pb.blacklist, poolAddress) pb.logger.Info(fmt.Sprintf("โœ… Pool %s removed from blacklist (was %s, %d failures)", poolAddress.Hex()[:10], entry.FailureReason, entry.FailureCount)) pb.saveToFile() } } // periodicCleanup removes old non-permanent entries func (pb *PoolBlacklist) periodicCleanup() { ticker := time.NewTicker(5 * time.Minute) defer ticker.Stop() for range ticker.C { pb.mu.Lock() now := time.Now() removed := 0 for addr, entry := range pb.blacklist { // Remove non-permanent entries older than failure window if !entry.Permanent && now.Sub(entry.FirstFailure) > pb.failureWindow { delete(pb.blacklist, addr) removed++ } } if removed > 0 { pb.logger.Info(fmt.Sprintf("๐Ÿงน Pool blacklist cleanup: %d temporary entries removed", removed)) pb.saveToFile() } pb.mu.Unlock() } } // saveToFile persists the blacklist to disk func (pb *PoolBlacklist) saveToFile() { data, err := json.MarshalIndent(pb.blacklist, "", " ") if err != nil { pb.logger.Error(fmt.Sprintf("Failed to marshal blacklist: %v", err)) return } err = os.WriteFile(pb.persistFile, data, 0644) if err != nil { pb.logger.Error(fmt.Sprintf("Failed to save blacklist to file: %v", err)) return } pb.logger.Debug(fmt.Sprintf("๐Ÿ’พ Pool blacklist saved: %d entries", len(pb.blacklist))) } // loadFromFile loads the blacklist from disk func (pb *PoolBlacklist) loadFromFile() { data, err := os.ReadFile(pb.persistFile) if err != nil { if !os.IsNotExist(err) { pb.logger.Error(fmt.Sprintf("Failed to read blacklist file: %v", err)) } return } // Try to unmarshal as map first (new format) var blacklistMap map[common.Address]*BlacklistEntry err = json.Unmarshal(data, &blacklistMap) if err == nil { pb.blacklist = blacklistMap pb.logger.Info(fmt.Sprintf("๐Ÿ“‚ Pool blacklist loaded (map format): %d entries (%d permanent)", len(pb.blacklist), pb.countPermanentlyBlacklisted())) return } // If that fails, try array format (legacy DataFetcher format) type LegacyBlacklistEntry struct { Address string `json:"address"` FailureCount int `json:"failure_count"` LastFailure time.Time `json:"last_failure"` LastReason string `json:"last_reason"` FirstSeen time.Time `json:"first_seen"` IsBlacklisted bool `json:"is_blacklisted"` BlacklistedAt time.Time `json:"blacklisted_at"` ConsecutiveFails int `json:"consecutive_fails"` } var legacyEntries []LegacyBlacklistEntry err = json.Unmarshal(data, &legacyEntries) if err != nil { pb.logger.Error(fmt.Sprintf("Failed to unmarshal blacklist (tried both formats): %v", err)) return } // Convert legacy format to new format pb.blacklist = make(map[common.Address]*BlacklistEntry) permanentCount := 0 for _, legacy := range legacyEntries { if legacy.IsBlacklisted { addr := common.HexToAddress(legacy.Address) pb.blacklist[addr] = &BlacklistEntry{ Address: addr, FailureCount: legacy.FailureCount, LastFailure: legacy.LastFailure, FirstFailure: legacy.FirstSeen, FailureReason: legacy.LastReason, Protocol: "Unknown", TokenPair: [2]common.Address{}, Permanent: legacy.IsBlacklisted, AddedAt: legacy.BlacklistedAt, } permanentCount++ } } pb.logger.Info(fmt.Sprintf("๐Ÿ“‚ Pool blacklist loaded (legacy format): %d blacklisted from %d total entries", permanentCount, len(legacyEntries))) } // countPermanentlyBlacklisted returns the number of permanently blacklisted pools func (pb *PoolBlacklist) countPermanentlyBlacklisted() int { count := 0 for _, entry := range pb.blacklist { if entry.Permanent { count++ } } return count } // logStatistics logs detailed statistics about the blacklist func (pb *PoolBlacklist) logStatistics() { stats := pb.GetBlacklistStats() pb.logger.Info(fmt.Sprintf("๐Ÿ“Š Pool Blacklist Statistics:\n"+ " - Total Entries: %d\n"+ " - Permanent Blacklist: %d\n"+ " - Temporary Monitor: %d\n"+ " - Total Failures Recorded: %d", stats["total_entries"], stats["permanent_blacklist"], stats["temporary_monitor"], stats["total_failures"])) // Log failure reasons if reasons, ok := stats["failure_reasons"].(map[string]int); ok && len(reasons) > 0 { pb.logger.Info("๐Ÿ“ˆ Failure Reasons:") for reason, count := range reasons { pb.logger.Info(fmt.Sprintf(" - %s: %d pools", reason, count)) } } // Log affected protocols if protocols, ok := stats["protocols_affected"].(map[string]int); ok && len(protocols) > 0 { pb.logger.Info("๐Ÿ”— Affected Protocols:") for protocol, count := range protocols { pb.logger.Info(fmt.Sprintf(" - %s: %d pools", protocol, count)) } } } // GetBlacklistedPools returns a list of all blacklisted pool addresses func (pb *PoolBlacklist) GetBlacklistedPools() []common.Address { pb.mu.RLock() defer pb.mu.RUnlock() pools := make([]common.Address, 0, len(pb.blacklist)) for addr, entry := range pb.blacklist { if entry.Permanent { pools = append(pools, addr) } } return pools }