Restructured project for V2 refactor: **Structure Changes:** - Moved all V1 code to orig/ folder (preserved with git mv) - Created docs/planning/ directory - Added orig/README_V1.md explaining V1 preservation **Planning Documents:** - 00_V2_MASTER_PLAN.md: Complete architecture overview - Executive summary of critical V1 issues - High-level component architecture diagrams - 5-phase implementation roadmap - Success metrics and risk mitigation - 07_TASK_BREAKDOWN.md: Atomic task breakdown - 99+ hours of detailed tasks - Every task < 2 hours (atomic) - Clear dependencies and success criteria - Organized by implementation phase **V2 Key Improvements:** - Per-exchange parsers (factory pattern) - Multi-layer strict validation - Multi-index pool cache - Background validation pipeline - Comprehensive observability **Critical Issues Addressed:** - Zero address tokens (strict validation + cache enrichment) - Parsing accuracy (protocol-specific parsers) - No audit trail (background validation channel) - Inefficient lookups (multi-index cache) - Stats disconnection (event-driven metrics) Next Steps: 1. Review planning documents 2. Begin Phase 1: Foundation (P1-001 through P1-010) 3. Implement parsers in Phase 2 4. Build cache system in Phase 3 5. Add validation pipeline in Phase 4 6. Migrate and test in Phase 5 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
393 lines
8.5 KiB
Go
393 lines
8.5 KiB
Go
package performance
|
|
|
|
import (
	"math/big"
	"math/bits"
	"sync"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/holiman/uint256"

	"github.com/fraktal/mev-beta/pkg/events"
)
|
|
|
|
// ObjectPool bundles one sync.Pool per frequently allocated value kind, so hot
// paths can recycle scratch objects instead of allocating fresh ones and
// adding to garbage collection pressure.
type ObjectPool struct {
	bigIntPool  sync.Pool // *big.Int values
	uint256Pool sync.Pool // *uint256.Int values
	eventPool   sync.Pool // *events.Event values
	addressPool sync.Pool // []common.Address scratch slices
	slicePool   sync.Pool // []byte scratch buffers
}
|
|
|
|
// NewObjectPool creates a new object pool for performance optimization
|
|
func NewObjectPool() *ObjectPool {
|
|
return &ObjectPool{
|
|
bigIntPool: sync.Pool{
|
|
New: func() interface{} {
|
|
return new(big.Int)
|
|
},
|
|
},
|
|
uint256Pool: sync.Pool{
|
|
New: func() interface{} {
|
|
return new(uint256.Int)
|
|
},
|
|
},
|
|
eventPool: sync.Pool{
|
|
New: func() interface{} {
|
|
return &events.Event{}
|
|
},
|
|
},
|
|
addressPool: sync.Pool{
|
|
New: func() interface{} {
|
|
return make([]common.Address, 0, 8)
|
|
},
|
|
},
|
|
slicePool: sync.Pool{
|
|
New: func() interface{} {
|
|
return make([]byte, 0, 1024)
|
|
},
|
|
},
|
|
}
|
|
}
|
|
|
|
// GetBigInt returns a reusable big.Int from the pool
|
|
func (p *ObjectPool) GetBigInt() *big.Int {
|
|
bi := p.bigIntPool.Get().(*big.Int)
|
|
bi.SetInt64(0) // Reset to zero
|
|
return bi
|
|
}
|
|
|
|
// PutBigInt returns a big.Int to the pool for reuse
|
|
func (p *ObjectPool) PutBigInt(bi *big.Int) {
|
|
if bi != nil {
|
|
p.bigIntPool.Put(bi)
|
|
}
|
|
}
|
|
|
|
// GetUint256 returns a reusable uint256.Int from the pool
|
|
func (p *ObjectPool) GetUint256() *uint256.Int {
|
|
ui := p.uint256Pool.Get().(*uint256.Int)
|
|
ui.SetUint64(0) // Reset to zero
|
|
return ui
|
|
}
|
|
|
|
// PutUint256 returns a uint256.Int to the pool for reuse
|
|
func (p *ObjectPool) PutUint256(ui *uint256.Int) {
|
|
if ui != nil {
|
|
p.uint256Pool.Put(ui)
|
|
}
|
|
}
|
|
|
|
// GetEvent returns a reusable Event from the pool
|
|
func (p *ObjectPool) GetEvent() *events.Event {
|
|
event := p.eventPool.Get().(*events.Event)
|
|
// Reset event fields
|
|
*event = events.Event{}
|
|
return event
|
|
}
|
|
|
|
// PutEvent returns an Event to the pool for reuse
|
|
func (p *ObjectPool) PutEvent(event *events.Event) {
|
|
if event != nil {
|
|
p.eventPool.Put(event)
|
|
}
|
|
}
|
|
|
|
// GetAddressSlice returns a reusable address slice from the pool
|
|
func (p *ObjectPool) GetAddressSlice() []common.Address {
|
|
slice := p.addressPool.Get().([]common.Address)
|
|
return slice[:0] // Reset length to 0 but keep capacity
|
|
}
|
|
|
|
// PutAddressSlice returns an address slice to the pool for reuse
|
|
func (p *ObjectPool) PutAddressSlice(slice []common.Address) {
|
|
if slice != nil && cap(slice) > 0 {
|
|
p.addressPool.Put(slice)
|
|
}
|
|
}
|
|
|
|
// GetByteSlice returns a reusable byte slice from the pool
|
|
func (p *ObjectPool) GetByteSlice() []byte {
|
|
slice := p.slicePool.Get().([]byte)
|
|
return slice[:0] // Reset length to 0 but keep capacity
|
|
}
|
|
|
|
// PutByteSlice returns a byte slice to the pool for reuse
|
|
func (p *ObjectPool) PutByteSlice(slice []byte) {
|
|
if slice != nil && cap(slice) > 0 {
|
|
p.slicePool.Put(slice)
|
|
}
|
|
}
|
|
|
|
// LockFreeRingBuffer holds the slot storage and the two cursors of a ring
// buffer intended for high-performance message passing. Each cursor is
// followed by 56 bytes of blank padding, which places head and tail 64 bytes
// apart.
type LockFreeRingBuffer struct {
	buffer []interface{} // slot storage
	mask   uint64        // index mask; the constructor sets it to len(buffer)-1

	head uint64
	_    [7]uint64 // padding after head to prevent false sharing

	tail uint64
	_    [7]uint64 // padding after tail to prevent false sharing
}
|
|
|
|
// NewLockFreeRingBuffer creates a new lock-free ring buffer
|
|
// Size must be a power of 2
|
|
func NewLockFreeRingBuffer(size uint64) *LockFreeRingBuffer {
|
|
// Ensure size is power of 2
|
|
if size&(size-1) != 0 {
|
|
// Find next power of 2
|
|
size = 1 << (64 - countLeadingZeros(size-1))
|
|
}
|
|
|
|
return &LockFreeRingBuffer{
|
|
buffer: make([]interface{}, size),
|
|
mask: size - 1,
|
|
}
|
|
}
|
|
|
|
// countLeadingZeros returns the number of zero bits above the most significant
// set bit of x, and 64 when x is 0.
//
// It delegates to math/bits instead of a hand-written binary search; the
// results are the same for every input.
func countLeadingZeros(x uint64) int {
	return bits.LeadingZeros64(x)
}
|
|
|
|
// FastCache implements a high-performance cache with minimal locking
|
|
type FastCache struct {
|
|
shards []*CacheShard
|
|
mask uint64
|
|
}
|
|
|
|
// CacheShard represents a single cache shard to reduce lock contention
|
|
type CacheShard struct {
|
|
mu sync.RWMutex
|
|
data map[string]*CacheItem
|
|
size int
|
|
limit int
|
|
}
|
|
|
|
// CacheItem is the record stored for each cache key: the cached value plus the
// metadata kept alongside it.
type CacheItem struct {
	Value      interface{} // the cached value returned by Get
	AccessTime int64       // timestamp compared during eviction; the smallest goes first
	Cost       int         // caller-supplied cost; not consulted by any code visible in this file
}
|
|
|
|
// NewFastCache creates a new high-performance cache
|
|
func NewFastCache(shardCount, itemsPerShard int) *FastCache {
|
|
// Ensure shard count is power of 2
|
|
if shardCount&(shardCount-1) != 0 {
|
|
shardCount = 1 << (32 - countLeadingZeros32(uint32(shardCount-1)))
|
|
}
|
|
|
|
shards := make([]*CacheShard, shardCount)
|
|
for i := 0; i < shardCount; i++ {
|
|
shards[i] = &CacheShard{
|
|
data: make(map[string]*CacheItem, itemsPerShard),
|
|
limit: itemsPerShard,
|
|
}
|
|
}
|
|
|
|
return &FastCache{
|
|
shards: shards,
|
|
mask: uint64(shardCount - 1),
|
|
}
|
|
}
|
|
|
|
// countLeadingZeros32 returns the number of zero bits above the most
// significant set bit of x, and 32 when x is 0.
//
// It delegates to math/bits instead of a hand-written binary search; the
// results are the same for every input.
func countLeadingZeros32(x uint32) int {
	return bits.LeadingZeros32(x)
}
|
|
|
|
// hash computes a hash for the key
|
|
func (c *FastCache) hash(key string) uint64 {
|
|
hash := uint64(0)
|
|
for _, b := range key {
|
|
hash = hash*31 + uint64(b)
|
|
}
|
|
return hash
|
|
}
|
|
|
|
// getShard returns the shard for a given key
|
|
func (c *FastCache) getShard(key string) *CacheShard {
|
|
return c.shards[c.hash(key)&c.mask]
|
|
}
|
|
|
|
// Get retrieves an item from the cache
|
|
func (c *FastCache) Get(key string) (interface{}, bool) {
|
|
shard := c.getShard(key)
|
|
shard.mu.RLock()
|
|
item, exists := shard.data[key]
|
|
shard.mu.RUnlock()
|
|
|
|
if exists {
|
|
return item.Value, true
|
|
}
|
|
return nil, false
|
|
}
|
|
|
|
// Set stores an item in the cache
|
|
func (c *FastCache) Set(key string, value interface{}, cost int) {
|
|
shard := c.getShard(key)
|
|
shard.mu.Lock()
|
|
|
|
// Check if we need to evict items
|
|
if shard.size >= shard.limit && shard.data[key] == nil {
|
|
c.evictOldest(shard)
|
|
}
|
|
|
|
shard.data[key] = &CacheItem{
|
|
Value: value,
|
|
Cost: cost,
|
|
}
|
|
shard.size++
|
|
|
|
shard.mu.Unlock()
|
|
}
|
|
|
|
// evictOldest removes the oldest item from a shard
|
|
func (c *FastCache) evictOldest(shard *CacheShard) {
|
|
var oldestKey string
|
|
var oldestTime int64 = 1<<63 - 1
|
|
|
|
for key, item := range shard.data {
|
|
if item.AccessTime < oldestTime {
|
|
oldestTime = item.AccessTime
|
|
oldestKey = key
|
|
}
|
|
}
|
|
|
|
if oldestKey != "" {
|
|
delete(shard.data, oldestKey)
|
|
shard.size--
|
|
}
|
|
}
|
|
|
|
// BatchProcessor collects items and hands them to a callback in batches
// instead of one at a time.
type BatchProcessor struct {
	batchSize    int                       // buffered item count at which Add flushes
	flushTimeout int64                     // nanoseconds; stored by the constructor, not read by any code visible in this file
	buffer       []interface{}             // items waiting for the next flush
	processor    func([]interface{}) error // callback that receives each batch
	mu           sync.Mutex                // held by Add and Flush while they touch buffer
}
|
|
|
|
// NewBatchProcessor creates a new batch processor
|
|
func NewBatchProcessor(batchSize int, flushTimeoutNs int64, processor func([]interface{}) error) *BatchProcessor {
|
|
return &BatchProcessor{
|
|
batchSize: batchSize,
|
|
flushTimeout: flushTimeoutNs,
|
|
buffer: make([]interface{}, 0, batchSize),
|
|
processor: processor,
|
|
}
|
|
}
|
|
|
|
// Add adds an item to the batch processor
|
|
func (bp *BatchProcessor) Add(item interface{}) error {
|
|
bp.mu.Lock()
|
|
defer bp.mu.Unlock()
|
|
|
|
bp.buffer = append(bp.buffer, item)
|
|
|
|
if len(bp.buffer) >= bp.batchSize {
|
|
return bp.flushLocked()
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Flush processes all items in the buffer immediately
|
|
func (bp *BatchProcessor) Flush() error {
|
|
bp.mu.Lock()
|
|
defer bp.mu.Unlock()
|
|
|
|
return bp.flushLocked()
|
|
}
|
|
|
|
// flushLocked processes items while holding the lock
|
|
func (bp *BatchProcessor) flushLocked() error {
|
|
if len(bp.buffer) == 0 {
|
|
return nil
|
|
}
|
|
|
|
batch := make([]interface{}, len(bp.buffer))
|
|
copy(batch, bp.buffer)
|
|
bp.buffer = bp.buffer[:0] // Reset buffer
|
|
|
|
return bp.processor(batch)
|
|
}
|
|
|
|
// MemoryOptimizer provides utilities for memory optimization
|
|
type MemoryOptimizer struct {
|
|
pools *ObjectPool
|
|
}
|
|
|
|
// NewMemoryOptimizer creates a new memory optimizer
|
|
func NewMemoryOptimizer() *MemoryOptimizer {
|
|
return &MemoryOptimizer{
|
|
pools: NewObjectPool(),
|
|
}
|
|
}
|
|
|
|
// ProcessWithPools processes data using object pools to minimize allocations
|
|
func (mo *MemoryOptimizer) ProcessWithPools(data []byte, processor func(*big.Int, *uint256.Int, []byte) error) error {
|
|
bigInt := mo.pools.GetBigInt()
|
|
uint256Int := mo.pools.GetUint256()
|
|
workBuffer := mo.pools.GetByteSlice()
|
|
|
|
defer func() {
|
|
mo.pools.PutBigInt(bigInt)
|
|
mo.pools.PutUint256(uint256Int)
|
|
mo.pools.PutByteSlice(workBuffer)
|
|
}()
|
|
|
|
return processor(bigInt, uint256Int, workBuffer)
|
|
}
|