package dex

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/ethclient"
)

// PoolCache caches pool reserves to reduce RPC calls.
//
// Entries are keyed by protocol and pool address (see cacheKey) and are
// treated as fresh for ttl after they were fetched. Every access to the cache
// map is guarded by mu. Because the struct embeds a mutex it must not be
// copied; use the pointer returned by NewPoolCache.
type PoolCache struct {
	cache    map[string]*CachedPoolData
	mu       sync.RWMutex
	ttl      time.Duration
	registry *Registry
	client   *ethclient.Client
}

// CachedPoolData represents cached pool data: the reserves that were fetched,
// the time they were stored, and the protocol they were fetched for.
type CachedPoolData struct {
	Reserves  *PoolReserves
	Timestamp time.Time
	Protocol  DEXProtocol
}

// NewPoolCache creates a new, empty pool cache.
//
// registry is used to look up the DEX for a protocol, client is handed to the
// DEX decoder when reserves are fetched, and ttl is how long an entry is served
// from cache before it is fetched again.
func NewPoolCache(registry *Registry, client *ethclient.Client, ttl time.Duration) *PoolCache {
	return &PoolCache{
		cache:    make(map[string]*CachedPoolData),
		ttl:      ttl,
		registry: registry,
		client:   client,
	}
}

// Get retrieves pool reserves from cache, or fetches them if the entry is
// missing or expired.
//
// The returned pointer is the same one stored in the cache; it is not copied.
// With a non-positive ttl no entry is ever considered fresh, so every call
// fetches.
func (pc *PoolCache) Get(ctx context.Context, protocol DEXProtocol, poolAddress common.Address) (*PoolReserves, error) {
	key := pc.cacheKey(protocol, poolAddress)

	// Only the map lookup needs the lock. Entries are replaced wholesale on
	// refresh and never modified in place by this type, so the entry can be
	// inspected after the lock is released.
	pc.mu.RLock()
	cached, exists := pc.cache[key]
	pc.mu.RUnlock()

	if exists && time.Since(cached.Timestamp) < pc.ttl {
		return cached.Reserves, nil
	}

	// Cache miss or expired: fetch fresh data.
	return pc.fetchAndCache(ctx, protocol, poolAddress, key)
}

// fetchAndCache fetches reserves through the protocol's decoder and stores the
// result under key.
//
// Nothing is written to the cache when the registry lookup or the fetch fails.
// The lock is not held during the fetch, so concurrent callers for the same
// key may each perform their own fetch; the last one to finish wins.
func (pc *PoolCache) fetchAndCache(ctx context.Context, protocol DEXProtocol, poolAddress common.Address, key string) (*PoolReserves, error) {
	dex, err := pc.registry.Get(protocol)
	if err != nil {
		return nil, fmt.Errorf("failed to get DEX: %w", err)
	}

	reserves, err := dex.Decoder.GetPoolReserves(ctx, pc.client, poolAddress)
	if err != nil {
		return nil, fmt.Errorf("failed to fetch reserves: %w", err)
	}

	entry := &CachedPoolData{
		Reserves:  reserves,
		Timestamp: time.Now(),
		Protocol:  protocol,
	}

	pc.mu.Lock()
	pc.cache[key] = entry
	pc.mu.Unlock()

	return reserves, nil
}

// Invalidate removes a pool from cache. It is a no-op if the pool is not
// cached.
func (pc *PoolCache) Invalidate(protocol DEXProtocol, poolAddress common.Address) {
	key := pc.cacheKey(protocol, poolAddress)

	pc.mu.Lock()
	defer pc.mu.Unlock()
	delete(pc.cache, key)
}

// Clear removes all cached data by replacing the map with a fresh one.
func (pc *PoolCache) Clear() {
	pc.mu.Lock()
	defer pc.mu.Unlock()
	pc.cache = make(map[string]*CachedPoolData)
}

// cacheKey generates the cache key for a pool: the protocol formatted with %d,
// a colon, and the hex form of the pool address.
func (pc *PoolCache) cacheKey(protocol DEXProtocol, poolAddress common.Address) string {
	return fmt.Sprintf("%d:%s", protocol, poolAddress.Hex())
}

// GetCacheSize returns the number of cached pools, including entries that have
// expired but have not been removed yet.
func (pc *PoolCache) GetCacheSize() int {
	pc.mu.RLock()
	defer pc.mu.RUnlock()
	return len(pc.cache)
}

// CleanExpired removes expired entries from cache and returns how many were
// removed. An entry is expired once its age is at least ttl.
func (pc *PoolCache) CleanExpired() int {
	pc.mu.Lock()
	defer pc.mu.Unlock()

	// Read the clock once so every entry is judged against the same instant
	// and the write lock is held for as short a time as possible.
	now := time.Now()

	removed := 0
	for key, cached := range pc.cache {
		if now.Sub(cached.Timestamp) >= pc.ttl {
			// Deleting the current key while ranging over a map is safe in Go.
			delete(pc.cache, key)
			removed++
		}
	}
	return removed
}

// StartCleanupRoutine starts a background goroutine that calls CleanExpired
// every interval until ctx is done.
//
// A non-positive interval starts nothing: time.NewTicker panics for such
// values, so the call is treated as "no periodic cleanup" rather than crashing
// the caller.
func (pc *PoolCache) StartCleanupRoutine(ctx context.Context, interval time.Duration) {
	if interval <= 0 {
		return
	}

	ticker := time.NewTicker(interval)
	go func() {
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				// The removed count is discarded; log it here if a logger
				// becomes available.
				pc.CleanExpired()
			case <-ctx.Done():
				return
			}
		}
	}()
}