Files
mev-beta/pkg/marketmanager/manager.go
Krypto Kajun fac8a64092 feat: Implement comprehensive Market Manager with database and logging
- Add complete Market Manager package with in-memory storage and CRUD operations
- Implement arbitrage detection with profit calculations and thresholds
- Add database adapter with PostgreSQL schema for persistence
- Create comprehensive logging system with specialized log files
- Add detailed documentation and implementation plans
- Include example application and comprehensive test suite
- Update Makefile with market manager build targets
- Add check-implementations command for verification
2025-09-18 03:52:33 -05:00

268 lines
7.1 KiB
Go

package marketmanager
import (
"context"
"fmt"
"sync"
"time"
"github.com/ethereum/go-ethereum/ethclient"
)
// MarketManager collects market data, keeps it in memory, and serves lookups.
// It can additionally check a market's transaction on-chain through an
// Ethereum client.
type MarketManager struct {
	// markets is the in-memory store; the methods of MarketManager access it
	// while holding mutex.
	markets Markets
	mutex   sync.RWMutex

	// client is the Ethereum client used for on-chain verification.
	client *ethclient.Client

	// verificationWindow is the time window for on-chain verification.
	verificationWindow time.Duration
	// maxMarkets is the maximum number of markets to store.
	maxMarkets int
}
// MarketManagerConfig carries the settings consumed by NewMarketManager.
type MarketManagerConfig struct {
	// EthereumClient is the client the manager uses for on-chain verification.
	EthereumClient *ethclient.Client
	// VerificationWindow is the time window for on-chain verification.
	// NewMarketManager substitutes 500ms when it is left at zero.
	VerificationWindow time.Duration
	// MaxMarkets is the storage limit handed to the manager.
	// NewMarketManager substitutes 10,000 when it is left at zero.
	MaxMarkets int
}
// NewMarketManager builds a MarketManager with an empty in-memory store.
//
// config may be nil, in which case every setting takes its default and no
// Ethereum client is attached. A VerificationWindow that is zero or negative
// falls back to 500ms, and a MaxMarkets that is zero or negative falls back
// to 10,000. The caller's config is only read, never modified.
func NewMarketManager(config *MarketManagerConfig) *MarketManager {
	const (
		defaultVerificationWindow = 500 * time.Millisecond
		defaultMaxMarkets         = 10000
	)

	// Work on a copy so defaults are not written back into the caller's struct.
	var cfg MarketManagerConfig
	if config != nil {
		cfg = *config
	}
	if cfg.VerificationWindow <= 0 {
		cfg.VerificationWindow = defaultVerificationWindow
	}
	// A non-positive limit would make every AddMarket call trigger eviction.
	if cfg.MaxMarkets <= 0 {
		cfg.MaxMarkets = defaultMaxMarkets
	}

	return &MarketManager{
		markets:            make(Markets),
		client:             cfg.EthereumClient,
		verificationWindow: cfg.VerificationWindow,
		maxMarkets:         cfg.MaxMarkets,
	}
}
// AddMarket stores market under its RawTicker and Key, replacing any entry
// already stored under the same pair.
//
// When the store already holds maxMarkets markets and the call would add a
// new entry, evictOldestMarkets is invoked first to make room. Replacing an
// existing entry never triggers eviction because it does not grow the store.
//
// It returns an error if market is nil.
func (mm *MarketManager) AddMarket(market *Market) error {
	if market == nil {
		return fmt.Errorf("market cannot be nil")
	}

	mm.mutex.Lock()
	defer mm.mutex.Unlock()

	// Indexing a missing (nil) inner map is safe and reports "not present".
	if _, replacing := mm.markets[market.RawTicker][market.Key]; !replacing {
		// The limit is expressed in markets, so count markets rather than
		// the number of rawTicker buckets.
		total := 0
		for _, byKey := range mm.markets {
			total += len(byKey)
		}
		if total >= mm.maxMarkets {
			mm.evictOldestMarkets()
		}
	}

	// Look the bucket up only after eviction: eviction may have removed it.
	byKey := mm.markets[market.RawTicker]
	if byKey == nil {
		byKey = make(map[string]*Market)
		mm.markets[market.RawTicker] = byKey
	}
	byKey[market.Key] = market
	return nil
}
// GetMarket looks up the market stored under rawTicker and marketKey.
//
// The returned pointer is the stored instance itself, not a copy. An error is
// returned when no entry exists for the given pair.
func (mm *MarketManager) GetMarket(rawTicker, marketKey string) (*Market, error) {
	mm.mutex.RLock()
	defer mm.mutex.RUnlock()

	// Indexing a missing (nil) inner map is safe and simply reports not found.
	found, ok := mm.markets[rawTicker][marketKey]
	if !ok {
		return nil, fmt.Errorf("market not found for rawTicker: %s, marketKey: %s", rawTicker, marketKey)
	}
	return found, nil
}
// GetMarketsByRawTicker returns every market stored under rawTicker, keyed by
// market key.
//
// The result is a new map whose values are produced by Market.Clone, so
// callers do not receive the stored instances. An error is returned when
// nothing is stored for rawTicker.
func (mm *MarketManager) GetMarketsByRawTicker(rawTicker string) (map[string]*Market, error) {
	mm.mutex.RLock()
	defer mm.mutex.RUnlock()

	stored, ok := mm.markets[rawTicker]
	if !ok {
		return nil, fmt.Errorf("no markets found for rawTicker: %s", rawTicker)
	}

	snapshot := make(map[string]*Market, len(stored))
	for marketKey, m := range stored {
		snapshot[marketKey] = m.Clone()
	}
	return snapshot, nil
}
// GetAllMarkets returns a snapshot of the whole store.
//
// Both map levels are freshly allocated and every value is produced by
// Market.Clone, so callers do not receive the stored maps or instances.
func (mm *MarketManager) GetAllMarkets() Markets {
	mm.mutex.RLock()
	defer mm.mutex.RUnlock()

	snapshot := make(Markets, len(mm.markets))
	for ticker, byKey := range mm.markets {
		copied := make(map[string]*Market, len(byKey))
		for marketKey, m := range byKey {
			copied[marketKey] = m.Clone()
		}
		snapshot[ticker] = copied
	}
	return snapshot
}
// UpdateMarket replaces the entry stored under market's RawTicker and Key
// with market.
//
// Unlike AddMarket it never creates an entry: it returns an error when market
// is nil, when nothing is stored for the RawTicker, or when the Key is not
// present under that RawTicker.
func (mm *MarketManager) UpdateMarket(market *Market) error {
	if market == nil {
		return fmt.Errorf("market cannot be nil")
	}

	mm.mutex.Lock()
	defer mm.mutex.Unlock()

	byKey := mm.markets[market.RawTicker]
	if byKey == nil {
		return fmt.Errorf("no markets found for rawTicker: %s", market.RawTicker)
	}
	if _, exists := byKey[market.Key]; !exists {
		return fmt.Errorf("market not found for rawTicker: %s, marketKey: %s", market.RawTicker, market.Key)
	}

	byKey[market.Key] = market
	return nil
}
// RemoveMarket deletes the entry stored under rawTicker and marketKey.
//
// If that was the last entry for rawTicker, the rawTicker bucket is dropped
// as well. An error is returned when the rawTicker or the marketKey is not
// present.
func (mm *MarketManager) RemoveMarket(rawTicker, marketKey string) error {
	mm.mutex.Lock()
	defer mm.mutex.Unlock()

	byKey := mm.markets[rawTicker]
	if byKey == nil {
		return fmt.Errorf("no markets found for rawTicker: %s", rawTicker)
	}
	if _, present := byKey[marketKey]; !present {
		return fmt.Errorf("market not found for rawTicker: %s, marketKey: %s", rawTicker, marketKey)
	}

	delete(byKey, marketKey)

	// Do not leave an empty bucket behind.
	if len(byKey) == 0 {
		delete(mm.markets, rawTicker)
	}
	return nil
}
// VerifyMarket reports whether a receipt for market's transaction can be
// fetched through the configured Ethereum client.
//
// It returns (true, nil) when the receipt lookup succeeds and (false, nil)
// when the lookup fails while ctx is still live, which is treated as "receipt
// not available". It returns an error when no client is configured, when
// market is nil, or when ctx was cancelled or timed out: in that last case
// the lookup did not complete, so the outcome is unknown rather than negative.
func (mm *MarketManager) VerifyMarket(ctx context.Context, market *Market) (bool, error) {
	if mm.client == nil {
		return false, fmt.Errorf("ethereum client not configured")
	}
	if market == nil {
		return false, fmt.Errorf("market cannot be nil")
	}

	if _, err := mm.client.TransactionReceipt(ctx, market.TxHash); err != nil {
		// A cancelled or expired context must not be mistaken for a missing
		// transaction; surface it so callers do not act on a false negative.
		if ctxErr := ctx.Err(); ctxErr != nil {
			return false, fmt.Errorf("transaction receipt lookup interrupted: %w", ctxErr)
		}
		// Any other lookup failure is reported as "not confirmed", not as an error.
		return false, nil
	}

	// A receipt was returned, so the transaction exists on-chain.
	return true, nil
}
// ScheduleVerification starts a background goroutine that verifies market
// once the verification window has elapsed.
//
// The goroutine sleeps for verificationWindow, then calls VerifyMarket with a
// 10-second timeout. If VerifyMarket reports an error, the error is printed
// and nothing is changed. Otherwise:
//   - confirmed: market and the stored entry (if any) get StatusConfirmed,
//     and the stored entry's Timestamp is set to the current Unix time;
//   - not confirmed: the stored entry (if any) gets StatusInvalid.
//
// All status writes happen while holding the manager's mutex. A nil market is
// ignored. The call itself returns immediately and offers no way to cancel or
// wait for the goroutine.
func (mm *MarketManager) ScheduleVerification(market *Market) {
	if market == nil {
		// Nothing to verify; dereferencing nil inside the goroutine would
		// crash the whole process.
		return
	}

	go func() {
		// Wait for the verification window.
		time.Sleep(mm.verificationWindow)

		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()

		confirmed, err := mm.VerifyMarket(ctx, market)
		if err != nil {
			// Log error but don't fail.
			fmt.Printf("Error verifying market %s: %v\n", market.Key, err)
			return
		}

		mm.mutex.Lock()
		defer mm.mutex.Unlock()

		// market may be the very instance held in the store, so it is only
		// written while the lock is held.
		if confirmed {
			market.Status = StatusConfirmed
		}

		// Indexing a missing (nil) inner map is safe and reports not found.
		stored, ok := mm.markets[market.RawTicker][market.Key]
		if !ok {
			return
		}
		if confirmed {
			stored.Status = StatusConfirmed
			stored.Timestamp = time.Now().Unix()
			return
		}
		stored.Status = StatusInvalid
	}()
}
// GetMarketCount reports how many markets are stored in total, summed over
// every rawTicker.
func (mm *MarketManager) GetMarketCount() int {
	mm.mutex.RLock()
	defer mm.mutex.RUnlock()

	var total int
	for _, byKey := range mm.markets {
		total += len(byKey)
	}
	return total
}
// GetRawTickerCount reports how many distinct rawTickers currently have
// entries in the store.
func (mm *MarketManager) GetRawTickerCount() int {
	mm.mutex.RLock()
	tickers := len(mm.markets)
	mm.mutex.RUnlock()
	return tickers
}
// evictOldestMarkets frees space by dropping one rawTicker together with all
// markets stored under it.
//
// Despite the name, no age information is consulted: the rawTicker removed is
// whichever one map iteration yields first, and Go leaves that order
// unspecified. It does not take the mutex itself; AddMarket calls it while
// already holding the write lock.
func (mm *MarketManager) evictOldestMarkets() {
	for victim := range mm.markets {
		// Remove a single rawTicker and stop.
		delete(mm.markets, victim)
		return
	}
}
// GetValidMarketsByRawTicker returns the markets stored under rawTicker for
// which Market.IsValid reports true, keyed by market key.
//
// The lookup is delegated to GetMarketsByRawTicker, whose error is returned
// unchanged. The result may be empty when no market passes the check.
func (mm *MarketManager) GetValidMarketsByRawTicker(rawTicker string) (map[string]*Market, error) {
	candidates, err := mm.GetMarketsByRawTicker(rawTicker)
	if err != nil {
		return nil, err
	}

	valid := make(map[string]*Market)
	for marketKey, m := range candidates {
		if !m.IsValid() {
			continue
		}
		valid[marketKey] = m
	}
	return valid, nil
}