Restructured project for V2 refactor:

**Structure Changes:**
- Moved all V1 code to orig/ folder (preserved with git mv)
- Created docs/planning/ directory
- Added orig/README_V1.md explaining V1 preservation

**Planning Documents:**
- 00_V2_MASTER_PLAN.md: Complete architecture overview
  - Executive summary of critical V1 issues
  - High-level component architecture diagrams
  - 5-phase implementation roadmap
  - Success metrics and risk mitigation
- 07_TASK_BREAKDOWN.md: Atomic task breakdown
  - 99+ hours of detailed tasks
  - Every task < 2 hours (atomic)
  - Clear dependencies and success criteria
  - Organized by implementation phase

**V2 Key Improvements:**
- Per-exchange parsers (factory pattern; see the sketch after this message)
- Multi-layer strict validation
- Multi-index pool cache
- Background validation pipeline
- Comprehensive observability

**Critical Issues Addressed:**
- Zero address tokens (strict validation + cache enrichment)
- Parsing accuracy (protocol-specific parsers)
- No audit trail (background validation channel)
- Inefficient lookups (multi-index cache)
- Stats disconnection (event-driven metrics)

Next Steps:
1. Review planning documents
2. Begin Phase 1: Foundation (P1-001 through P1-010)
3. Implement parsers in Phase 2
4. Build cache system in Phase 3
5. Add validation pipeline in Phase 4
6. Migrate and test in Phase 5

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
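
The "per-exchange parsers (factory pattern)" improvement is only described at a high level in the plan; a minimal sketch of what it could look like follows. Everything in it (the parsers package, SwapParser, SwapEvent, NewParser, the protocol string) is an illustrative assumption, not code that exists in this commit.

package parsers // hypothetical package name for the planned V2 parser layer

import (
	"fmt"

	"github.com/ethereum/go-ethereum/core/types"
)

// SwapEvent is a normalized swap, whatever exchange produced it.
type SwapEvent struct {
	Pool     string
	TokenIn  string
	TokenOut string
}

// SwapParser is implemented once per exchange/protocol.
type SwapParser interface {
	ParseSwap(log types.Log) (*SwapEvent, error)
}

type uniswapV2Parser struct{}

func (p *uniswapV2Parser) ParseSwap(log types.Log) (*SwapEvent, error) {
	// Protocol-specific ABI decoding would go here.
	return &SwapEvent{Pool: log.Address.Hex()}, nil
}

// NewParser is the factory: callers ask for a parser by protocol name.
func NewParser(protocol string) (SwapParser, error) {
	switch protocol {
	case "uniswap_v2":
		return &uniswapV2Parser{}, nil
	default:
		return nil, fmt.Errorf("no parser registered for protocol %q", protocol)
	}
}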
552 lines
13 KiB
Go
package arbitrum

import (
	"fmt"
	"sync"
	"time"

	"github.com/ethereum/go-ethereum/common"

	arbcommon "github.com/fraktal/mev-beta/pkg/arbitrum/common"
)

// PoolCache provides fast access to pool information with TTL-based caching
type PoolCache struct {
	pools         map[common.Address]*CachedPoolInfo
	poolsByTokens map[string][]*CachedPoolInfo // Key: "token0-token1" (sorted)
	cacheLock     sync.RWMutex
	maxSize       int
	ttl           time.Duration

	// Metrics
	hits        uint64
	misses      uint64
	evictions   uint64
	lastCleanup time.Time

	// Cleanup management
	cleanupTicker *time.Ticker
	stopCleanup   chan struct{}
}

// CachedPoolInfo wraps PoolInfo with cache metadata
type CachedPoolInfo struct {
	*arbcommon.PoolInfo
	CachedAt    time.Time `json:"cached_at"`
	AccessedAt  time.Time `json:"accessed_at"`
	AccessCount uint64    `json:"access_count"`
}

// NewPoolCache creates a new pool cache
func NewPoolCache(maxSize int, ttl time.Duration) *PoolCache {
	cache := &PoolCache{
		pools:         make(map[common.Address]*CachedPoolInfo),
		poolsByTokens: make(map[string][]*CachedPoolInfo),
		maxSize:       maxSize,
		ttl:           ttl,
		lastCleanup:   time.Now(),
		cleanupTicker: time.NewTicker(ttl / 2), // Cleanup twice per TTL period
		stopCleanup:   make(chan struct{}),
	}

	// Start background cleanup goroutine
	go cache.cleanupLoop()

	return cache
}
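
// Usage sketch (illustrative, not part of the original file): how a caller
// might wire up the cache. Only identifiers already defined in this file are
// relied on; the concrete size and TTL values are assumptions.
func examplePoolCacheUsage(pool *arbcommon.PoolInfo) []*arbcommon.PoolInfo {
	cache := NewPoolCache(1024, 5*time.Minute) // cache up to 1024 pools for 5 minutes
	defer cache.Close()                        // stop the background cleanup goroutine

	cache.AddPool(pool)
	return cache.GetPoolsByTokenPair(pool.Token0, pool.Token1)
}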

// GetPool retrieves pool information from cache
func (c *PoolCache) GetPool(address common.Address) *arbcommon.PoolInfo {
	// Write lock: access metadata and hit/miss counters are mutated below
	c.cacheLock.Lock()
	defer c.cacheLock.Unlock()

	if cached, exists := c.pools[address]; exists {
		// Check if cache entry is still valid
		if time.Since(cached.CachedAt) <= c.ttl {
			cached.AccessedAt = time.Now()
			cached.AccessCount++
			c.hits++
			return cached.PoolInfo
		}
		// Cache entry expired, will be cleaned up later
	}

	c.misses++
	return nil
}

// GetPoolsByTokenPair retrieves pools for a specific token pair
func (c *PoolCache) GetPoolsByTokenPair(token0, token1 common.Address) []*arbcommon.PoolInfo {
	// Write lock: access metadata and hit/miss counters are mutated below
	c.cacheLock.Lock()
	defer c.cacheLock.Unlock()

	key := createTokenPairKey(token0, token1)

	if cached, exists := c.poolsByTokens[key]; exists {
		var validPools []*arbcommon.PoolInfo
		now := time.Now()

		for _, pool := range cached {
			// Check if cache entry is still valid
			if now.Sub(pool.CachedAt) <= c.ttl {
				pool.AccessedAt = now
				pool.AccessCount++
				validPools = append(validPools, pool.PoolInfo)
			}
		}

		if len(validPools) > 0 {
			c.hits++
			return validPools
		}
	}

	c.misses++
	return nil
}

// AddPool adds or updates pool information in cache
func (c *PoolCache) AddPool(pool *arbcommon.PoolInfo) {
	c.cacheLock.Lock()
	defer c.cacheLock.Unlock()

	// If the pool is already cached, refresh it in place so the token pair
	// index does not accumulate duplicate entries
	if existing, exists := c.pools[pool.Address]; exists {
		existing.PoolInfo = pool
		existing.CachedAt = time.Now()
		return
	}

	// Check if we need to evict entries to make space
	if len(c.pools) >= c.maxSize {
		c.evictLRU()
	}

	now := time.Now()
	cached := &CachedPoolInfo{
		PoolInfo:    pool,
		CachedAt:    now,
		AccessedAt:  now,
		AccessCount: 1,
	}

	// Add to main cache
	c.pools[pool.Address] = cached

	// Add to token pair index
	key := createTokenPairKey(pool.Token0, pool.Token1)
	c.poolsByTokens[key] = append(c.poolsByTokens[key], cached)
}
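
// Illustrative sketch (not part of the original file): once maxSize is
// reached, AddPool evicts the least-recently-accessed entry, so a tiny cache
// keeps only the pools touched most recently. poolA, poolB and poolC are
// assumed to be fully populated, distinct PoolInfo values.
func exampleEvictionBehaviour(poolA, poolB, poolC *arbcommon.PoolInfo) {
	cache := NewPoolCache(2, time.Minute)
	defer cache.Close()

	cache.AddPool(poolA)
	cache.AddPool(poolB)
	cache.GetPool(poolB.Address)     // refreshes poolB's AccessedAt
	cache.AddPool(poolC)             // cache full: poolA (least recently used) is evicted
	_ = cache.GetPool(poolA.Address) // returns nil after eviction
}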

// UpdatePool updates existing pool information
func (c *PoolCache) UpdatePool(pool *arbcommon.PoolInfo) bool {
	c.cacheLock.Lock()
	defer c.cacheLock.Unlock()

	if cached, exists := c.pools[pool.Address]; exists {
		// Update pool info but keep cache metadata
		cached.PoolInfo = pool
		cached.CachedAt = time.Now()
		return true
	}

	return false
}

// RemovePool removes a pool from cache
func (c *PoolCache) RemovePool(address common.Address) bool {
	c.cacheLock.Lock()
	defer c.cacheLock.Unlock()

	if cached, exists := c.pools[address]; exists {
		// Remove from main cache
		delete(c.pools, address)

		// Remove from token pair index
		key := createTokenPairKey(cached.Token0, cached.Token1)
		if pools, exists := c.poolsByTokens[key]; exists {
			for i, pool := range pools {
				if pool.Address == address {
					c.poolsByTokens[key] = append(pools[:i], pools[i+1:]...)
					break
				}
			}
			// Clean up empty token pair entries
			if len(c.poolsByTokens[key]) == 0 {
				delete(c.poolsByTokens, key)
			}
		}

		return true
	}

	return false
}

// AddPoolIfNotExists adds a pool to the cache if it doesn't already exist
func (c *PoolCache) AddPoolIfNotExists(address common.Address, protocol arbcommon.Protocol) {
	c.cacheLock.Lock()
	defer c.cacheLock.Unlock()

	// Check if pool already exists
	if _, exists := c.pools[address]; exists {
		return
	}

	// Add new pool; token addresses are unknown at this point, so the entry
	// is not added to the token pair index
	c.pools[address] = &CachedPoolInfo{
		PoolInfo: &arbcommon.PoolInfo{
			Address:  address,
			Protocol: protocol,
		},
		CachedAt:   time.Now(),
		AccessedAt: time.Now(),
	}
}

// GetPoolsByProtocol returns all pools for a specific protocol
func (c *PoolCache) GetPoolsByProtocol(protocol arbcommon.Protocol) []*arbcommon.PoolInfo {
	// Write lock: access metadata is mutated below
	c.cacheLock.Lock()
	defer c.cacheLock.Unlock()

	var pools []*arbcommon.PoolInfo
	now := time.Now()

	for _, cached := range c.pools {
		if cached.Protocol == protocol && now.Sub(cached.CachedAt) <= c.ttl {
			cached.AccessedAt = now
			cached.AccessCount++
			pools = append(pools, cached.PoolInfo)
		}
	}

	return pools
}

// GetPoolAddressesByProtocol returns all pool addresses for a specific protocol
func (c *PoolCache) GetPoolAddressesByProtocol(protocol arbcommon.Protocol) []common.Address {
	// Write lock: access metadata is mutated below
	c.cacheLock.Lock()
	defer c.cacheLock.Unlock()

	var addresses []common.Address
	now := time.Now()

	for addr, cached := range c.pools {
		if cached.Protocol == protocol && now.Sub(cached.CachedAt) <= c.ttl {
			cached.AccessedAt = now
			cached.AccessCount++
			addresses = append(addresses, addr)
		}
	}

	return addresses
}

// GetTopPools returns the most accessed pools
func (c *PoolCache) GetTopPools(limit int) []*arbcommon.PoolInfo {
	c.cacheLock.RLock()
	defer c.cacheLock.RUnlock()

	type poolAccess struct {
		pool        *arbcommon.PoolInfo
		accessCount uint64
	}

	var poolAccesses []poolAccess
	now := time.Now()

	for _, cached := range c.pools {
		if now.Sub(cached.CachedAt) <= c.ttl {
			poolAccesses = append(poolAccesses, poolAccess{
				pool:        cached.PoolInfo,
				accessCount: cached.AccessCount,
			})
		}
	}

	// Sort by access count (simple bubble sort for small datasets)
	for i := 0; i < len(poolAccesses)-1; i++ {
		for j := 0; j < len(poolAccesses)-i-1; j++ {
			if poolAccesses[j].accessCount < poolAccesses[j+1].accessCount {
				poolAccesses[j], poolAccesses[j+1] = poolAccesses[j+1], poolAccesses[j]
			}
		}
	}

	var result []*arbcommon.PoolInfo
	maxResults := limit
	if maxResults > len(poolAccesses) {
		maxResults = len(poolAccesses)
	}

	for i := 0; i < maxResults; i++ {
		result = append(result, poolAccesses[i].pool)
	}

	return result
}

// GetCacheStats returns cache performance statistics
func (c *PoolCache) GetCacheStats() *CacheStats {
	c.cacheLock.RLock()
	defer c.cacheLock.RUnlock()

	total := c.hits + c.misses
	hitRate := float64(0)
	if total > 0 {
		hitRate = float64(c.hits) / float64(total) * 100
	}

	return &CacheStats{
		Size:        len(c.pools),
		MaxSize:     c.maxSize,
		Hits:        c.hits,
		Misses:      c.misses,
		HitRate:     hitRate,
		Evictions:   c.evictions,
		TTL:         c.ttl,
		LastCleanup: c.lastCleanup,
	}
}

// CacheStats represents cache performance statistics
type CacheStats struct {
	Size        int           `json:"size"`
	MaxSize     int           `json:"max_size"`
	Hits        uint64        `json:"hits"`
	Misses      uint64        `json:"misses"`
	HitRate     float64       `json:"hit_rate_percent"`
	Evictions   uint64        `json:"evictions"`
	TTL         time.Duration `json:"ttl"`
	LastCleanup time.Time     `json:"last_cleanup"`
}
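
// Illustrative sketch (not part of the original file): HitRate is reported as
// a percentage, so 90 hits and 10 misses yield HitRate == 90.0. A caller might
// poll GetCacheStats to watch cache effectiveness; the logging here is an
// assumption, not something the cache itself does.
func exampleLogCacheStats(cache *PoolCache) {
	stats := cache.GetCacheStats()
	fmt.Printf("pool cache: %d/%d entries, %.1f%% hit rate, %d evictions\n",
		stats.Size, stats.MaxSize, stats.HitRate, stats.Evictions)
}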

// Flush clears all cached data
func (c *PoolCache) Flush() {
	c.cacheLock.Lock()
	defer c.cacheLock.Unlock()

	c.pools = make(map[common.Address]*CachedPoolInfo)
	c.poolsByTokens = make(map[string][]*CachedPoolInfo)
	c.hits = 0
	c.misses = 0
	c.evictions = 0
}

// Close stops the background cleanup and releases resources
func (c *PoolCache) Close() {
	if c.cleanupTicker != nil {
		c.cleanupTicker.Stop()
	}
	close(c.stopCleanup)
}

// Internal methods

// evictLRU removes the least recently used cache entry
func (c *PoolCache) evictLRU() {
	var oldestAddress common.Address
	var oldestTime time.Time = time.Now()

	// Find the least recently accessed entry
	for address, cached := range c.pools {
		if cached.AccessedAt.Before(oldestTime) {
			oldestTime = cached.AccessedAt
			oldestAddress = address
		}
	}

	if oldestAddress != (common.Address{}) {
		// Remove the oldest entry
		if cached, exists := c.pools[oldestAddress]; exists {
			delete(c.pools, oldestAddress)

			// Also remove from token pair index
			key := createTokenPairKey(cached.Token0, cached.Token1)
			if pools, exists := c.poolsByTokens[key]; exists {
				for i, pool := range pools {
					if pool.Address == oldestAddress {
						c.poolsByTokens[key] = append(pools[:i], pools[i+1:]...)
						break
					}
				}
				if len(c.poolsByTokens[key]) == 0 {
					delete(c.poolsByTokens, key)
				}
			}

			c.evictions++
		}
	}
}

// cleanupExpired removes expired cache entries
func (c *PoolCache) cleanupExpired() {
	c.cacheLock.Lock()
	defer c.cacheLock.Unlock()

	now := time.Now()
	var expiredAddresses []common.Address

	// Find expired entries
	for address, cached := range c.pools {
		if now.Sub(cached.CachedAt) > c.ttl {
			expiredAddresses = append(expiredAddresses, address)
		}
	}

	// Remove expired entries
	for _, address := range expiredAddresses {
		if cached, exists := c.pools[address]; exists {
			delete(c.pools, address)

			// Also remove from token pair index
			key := createTokenPairKey(cached.Token0, cached.Token1)
			if pools, exists := c.poolsByTokens[key]; exists {
				for i, pool := range pools {
					if pool.Address == address {
						c.poolsByTokens[key] = append(pools[:i], pools[i+1:]...)
						break
					}
				}
				if len(c.poolsByTokens[key]) == 0 {
					delete(c.poolsByTokens, key)
				}
			}
		}
	}

	c.lastCleanup = now
}

// cleanupLoop runs periodic cleanup of expired entries
func (c *PoolCache) cleanupLoop() {
	for {
		select {
		case <-c.cleanupTicker.C:
			c.cleanupExpired()
		case <-c.stopCleanup:
			return
		}
	}
}

// createTokenPairKey creates a consistent key for token pairs (sorted)
func createTokenPairKey(token0, token1 common.Address) string {
	// Ensure consistent ordering regardless of input order
	if token0.Hex() < token1.Hex() {
		return fmt.Sprintf("%s-%s", token0.Hex(), token1.Hex())
	}
	return fmt.Sprintf("%s-%s", token1.Hex(), token0.Hex())
}
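
// Illustrative sketch (not part of the original file): because the key is
// sorted, token pair lookups are order-independent. tokenA and tokenB are
// assumed placeholder addresses.
func exampleTokenPairKeySymmetry(tokenA, tokenB common.Address) bool {
	return createTokenPairKey(tokenA, tokenB) == createTokenPairKey(tokenB, tokenA) // always true
}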

// Advanced cache operations

// WarmUp pre-loads commonly used pools into cache
func (c *PoolCache) WarmUp(pools []*arbcommon.PoolInfo) {
	for _, pool := range pools {
		c.AddPool(pool)
	}
}

// GetPoolCount returns the number of cached pools
func (c *PoolCache) GetPoolCount() int {
	c.cacheLock.RLock()
	defer c.cacheLock.RUnlock()

	return len(c.pools)
}

// GetValidPoolCount returns the number of non-expired cached pools
func (c *PoolCache) GetValidPoolCount() int {
	c.cacheLock.RLock()
	defer c.cacheLock.RUnlock()

	count := 0
	now := time.Now()

	for _, cached := range c.pools {
		if now.Sub(cached.CachedAt) <= c.ttl {
			count++
		}
	}

	return count
}

// GetPoolAddresses returns all cached pool addresses
func (c *PoolCache) GetPoolAddresses() []common.Address {
	c.cacheLock.RLock()
	defer c.cacheLock.RUnlock()

	var addresses []common.Address
	now := time.Now()

	for address, cached := range c.pools {
		if now.Sub(cached.CachedAt) <= c.ttl {
			addresses = append(addresses, address)
		}
	}

	return addresses
}

// SetTTL updates the cache TTL
func (c *PoolCache) SetTTL(ttl time.Duration) {
	c.cacheLock.Lock()
	defer c.cacheLock.Unlock()

	c.ttl = ttl

	// Reset the existing ticker in place so the cleanup loop, which is
	// blocked on its channel, picks up the new period
	if c.cleanupTicker != nil {
		c.cleanupTicker.Reset(ttl / 2)
	}
}

// GetTTL returns the current cache TTL
func (c *PoolCache) GetTTL() time.Duration {
	c.cacheLock.RLock()
	defer c.cacheLock.RUnlock()

	return c.ttl
}

// BulkUpdate updates multiple pools atomically
func (c *PoolCache) BulkUpdate(pools []*arbcommon.PoolInfo) {
	c.cacheLock.Lock()
	defer c.cacheLock.Unlock()

	now := time.Now()

	for _, pool := range pools {
		if cached, exists := c.pools[pool.Address]; exists {
			// Update existing pool
			cached.PoolInfo = pool
			cached.CachedAt = now
		} else {
			// Add new pool if there's space
			if len(c.pools) < c.maxSize {
				cached := &CachedPoolInfo{
					PoolInfo:    pool,
					CachedAt:    now,
					AccessedAt:  now,
					AccessCount: 1,
				}

				c.pools[pool.Address] = cached

				// Add to token pair index
				key := createTokenPairKey(pool.Token0, pool.Token1)
				c.poolsByTokens[key] = append(c.poolsByTokens[key], cached)
			}
		}
	}
}

// Contains checks if a pool is in cache (without affecting access stats)
func (c *PoolCache) Contains(address common.Address) bool {
	c.cacheLock.RLock()
	defer c.cacheLock.RUnlock()

	if cached, exists := c.pools[address]; exists {
		return time.Since(cached.CachedAt) <= c.ttl
	}

	return false
}