- Triangular arbitrage: populate all 25+ ArbitrageOpportunity fields - Direct arbitrage: complete field initialization with gas cost calculation - Price impact: add division-by-zero protection and validation - Absolute value handling for swap amounts to prevent uint256 max display Remaining issue: Some events still show uint256 max - needs investigation of alternative parsing code path (possibly pkg/pools/discovery.go)
744 lines
23 KiB
Go
744 lines
23 KiB
Go
package marketdata
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"math/big"
|
|
"os"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/ethereum/go-ethereum/common"
|
|
"github.com/holiman/uint256"
|
|
|
|
"github.com/fraktal/mev-beta/internal/logger"
|
|
"github.com/fraktal/mev-beta/pkg/database"
|
|
"github.com/fraktal/mev-beta/pkg/events"
|
|
)
|
|
|
|
// Helper functions to safely convert potentially nil pointers to strings
|
|
|
|
// safeStringBigInt safely converts a *big.Int to string, returning "0" if nil
|
|
func safeStringBigInt(n *big.Int) string {
|
|
if n == nil {
|
|
return "0"
|
|
}
|
|
return n.String()
|
|
}
|
|
|
|
// safeStringUint256 safely converts a *uint256.Int to string, returning "0" if nil
|
|
func safeStringUint256(n *uint256.Int) string {
|
|
if n == nil {
|
|
return "0"
|
|
}
|
|
return n.String()
|
|
}
|
|
|
|
// MarketDataLogger provides comprehensive logging of swap and liquidity events
|
|
type MarketDataLogger struct {
|
|
logger *logger.Logger
|
|
database *database.Database
|
|
tokenCache *TokenCache
|
|
poolCache *PoolCache
|
|
factoryMgr *FactoryManager
|
|
mu sync.RWMutex
|
|
initialized bool
|
|
|
|
// File loggers for dedicated event logging
|
|
swapLogFile *os.File
|
|
liquidityLogFile *os.File
|
|
|
|
// Statistics for monitoring
|
|
swapEventCount int64
|
|
liquidityEventCount int64
|
|
poolsDiscovered int64
|
|
tokensDiscovered int64
|
|
}
|
|
|
|
// TokenCache manages discovered tokens with metadata
|
|
type TokenCache struct {
|
|
tokens map[common.Address]*TokenInfo
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
// PoolCache manages discovered pools with metadata
|
|
type PoolCache struct {
|
|
pools map[common.Address]*PoolInfo
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
// FactoryManager manages DEX factory contracts and their pool discovery
|
|
type FactoryManager struct {
|
|
factories map[common.Address]*FactoryInfo
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
// NewMarketDataLogger creates a new market data logger with comprehensive caching
|
|
func NewMarketDataLogger(log *logger.Logger, db *database.Database) *MarketDataLogger {
|
|
return &MarketDataLogger{
|
|
logger: log,
|
|
database: db,
|
|
tokenCache: &TokenCache{
|
|
tokens: make(map[common.Address]*TokenInfo),
|
|
},
|
|
poolCache: &PoolCache{
|
|
pools: make(map[common.Address]*PoolInfo),
|
|
},
|
|
factoryMgr: &FactoryManager{
|
|
factories: make(map[common.Address]*FactoryInfo),
|
|
},
|
|
}
|
|
}
|
|
|
|
// Initialize sets up the market data logger with known tokens and pools
|
|
func (mdl *MarketDataLogger) Initialize(ctx context.Context) error {
|
|
mdl.mu.Lock()
|
|
defer mdl.mu.Unlock()
|
|
|
|
if mdl.initialized {
|
|
return nil
|
|
}
|
|
|
|
mdl.logger.Info("Initializing market data logger...")
|
|
|
|
// Create logs directory if it doesn't exist
|
|
if err := os.MkdirAll("logs", 0750); err != nil {
|
|
return fmt.Errorf("failed to create logs directory: %w", err)
|
|
}
|
|
|
|
// Initialize dedicated log files
|
|
if err := mdl.initializeLogFiles(); err != nil {
|
|
return fmt.Errorf("failed to initialize log files: %w", err)
|
|
}
|
|
|
|
// Initialize known tokens (major Arbitrum tokens)
|
|
if err := mdl.initializeKnownTokens(); err != nil {
|
|
return fmt.Errorf("failed to initialize known tokens: %w", err)
|
|
}
|
|
|
|
// Initialize known factories
|
|
mdl.factoryMgr.initializeKnownFactories()
|
|
|
|
// Load existing data from database if available
|
|
if err := mdl.loadFromDatabase(ctx); err != nil {
|
|
mdl.logger.Debug(fmt.Sprintf("Database not configured, skipping data load: %v", err))
|
|
}
|
|
|
|
mdl.initialized = true
|
|
mdl.logger.Info("Market data logger initialized successfully")
|
|
|
|
return nil
|
|
}
|
|
|
|
// LogSwapEvent logs comprehensive swap event data for market analysis
|
|
func (mdl *MarketDataLogger) LogSwapEvent(ctx context.Context, event events.Event, swapData *SwapEventData) error {
|
|
mdl.mu.Lock()
|
|
defer mdl.mu.Unlock()
|
|
|
|
if !mdl.initialized {
|
|
return fmt.Errorf("market data logger not initialized")
|
|
}
|
|
|
|
// Ensure tokens are cached
|
|
if err := mdl.ensureTokenCached(swapData.Token0); err != nil {
|
|
mdl.logger.Warn(fmt.Sprintf("Failed to cache token0 %s: %v", swapData.Token0.Hex(), err))
|
|
}
|
|
if err := mdl.ensureTokenCached(swapData.Token1); err != nil {
|
|
mdl.logger.Warn(fmt.Sprintf("Failed to cache token1 %s: %v", swapData.Token1.Hex(), err))
|
|
}
|
|
|
|
// Ensure pool is cached
|
|
if err := mdl.ensurePoolCached(swapData.PoolAddress, swapData.Protocol); err != nil {
|
|
mdl.logger.Warn(fmt.Sprintf("Failed to cache pool %s: %v", swapData.PoolAddress.Hex(), err))
|
|
}
|
|
|
|
// Update pool statistics
|
|
mdl.updatePoolSwapStats(swapData.PoolAddress, swapData)
|
|
|
|
// Log the swap event details
|
|
token0Symbol := mdl.resolveTokenSymbol(swapData.Token0)
|
|
token1Symbol := mdl.resolveTokenSymbol(swapData.Token1)
|
|
|
|
mdl.logger.Info(fmt.Sprintf("Swap logged: %s/%s on %s - Pool: %s, Amount: $%.2f USD",
|
|
token0Symbol, token1Symbol, swapData.Protocol,
|
|
swapData.PoolAddress.Hex()[:10], swapData.AmountInUSD))
|
|
|
|
// Write to dedicated log file
|
|
if mdl.swapLogFile != nil {
|
|
logEntry := map[string]interface{}{
|
|
"timestamp": swapData.Timestamp,
|
|
"blockNumber": swapData.BlockNumber,
|
|
"txHash": swapData.TxHash.Hex(),
|
|
"logIndex": swapData.LogIndex,
|
|
"poolAddress": swapData.PoolAddress.Hex(),
|
|
"factory": swapData.Factory.Hex(),
|
|
"protocol": swapData.Protocol,
|
|
"token0": token0Symbol,
|
|
"token1": token1Symbol,
|
|
"token0Address": swapData.Token0.Hex(),
|
|
"token1Address": swapData.Token1.Hex(),
|
|
"amount0In": safeStringBigInt(swapData.Amount0In),
|
|
"amount1In": safeStringBigInt(swapData.Amount1In),
|
|
"amount0Out": safeStringBigInt(swapData.Amount0Out),
|
|
"amount1Out": safeStringBigInt(swapData.Amount1Out),
|
|
"amountInUSD": swapData.AmountInUSD,
|
|
"amountOutUSD": swapData.AmountOutUSD,
|
|
"feeUSD": swapData.FeeUSD,
|
|
"priceImpact": swapData.PriceImpact,
|
|
"sqrtPriceX96": safeStringUint256(swapData.SqrtPriceX96),
|
|
"liquidity": safeStringUint256(swapData.Liquidity),
|
|
"tick": swapData.Tick,
|
|
}
|
|
|
|
if logBytes, err := json.Marshal(logEntry); err == nil {
|
|
mdl.swapLogFile.WriteString(string(logBytes) + "\n")
|
|
}
|
|
}
|
|
|
|
// Store in database if available
|
|
if mdl.database != nil {
|
|
dbEvent := mdl.swapEventDataToDBEvent(swapData)
|
|
if err := mdl.database.InsertSwapEvent(dbEvent); err != nil {
|
|
mdl.logger.Warn(fmt.Sprintf("Failed to store swap event in database: %v", err))
|
|
}
|
|
}
|
|
|
|
mdl.swapEventCount++
|
|
return nil
|
|
}
|
|
|
|
// LogLiquidityEvent logs comprehensive liquidity event data for market analysis
|
|
func (mdl *MarketDataLogger) LogLiquidityEvent(ctx context.Context, event events.Event, liquidityData *LiquidityEventData) error {
|
|
mdl.mu.Lock()
|
|
defer mdl.mu.Unlock()
|
|
|
|
if !mdl.initialized {
|
|
return fmt.Errorf("market data logger not initialized")
|
|
}
|
|
|
|
// Ensure tokens are cached
|
|
if err := mdl.ensureTokenCached(liquidityData.Token0); err != nil {
|
|
mdl.logger.Warn(fmt.Sprintf("Failed to cache token0 %s: %v", liquidityData.Token0.Hex(), err))
|
|
}
|
|
if err := mdl.ensureTokenCached(liquidityData.Token1); err != nil {
|
|
mdl.logger.Warn(fmt.Sprintf("Failed to cache token1 %s: %v", liquidityData.Token1.Hex(), err))
|
|
}
|
|
|
|
// Ensure pool is cached
|
|
if err := mdl.ensurePoolCached(liquidityData.PoolAddress, liquidityData.Protocol); err != nil {
|
|
mdl.logger.Warn(fmt.Sprintf("Failed to cache pool %s: %v", liquidityData.PoolAddress.Hex(), err))
|
|
}
|
|
|
|
// Update pool statistics
|
|
mdl.updatePoolLiquidityStats(liquidityData.PoolAddress, liquidityData)
|
|
|
|
// Log the liquidity event details
|
|
token0Symbol := mdl.resolveTokenSymbol(liquidityData.Token0)
|
|
token1Symbol := mdl.resolveTokenSymbol(liquidityData.Token1)
|
|
|
|
mdl.logger.Info(fmt.Sprintf("Liquidity %s logged: %s/%s on %s - Pool: %s, Total: $%.2f USD",
|
|
liquidityData.EventType, token0Symbol, token1Symbol, liquidityData.Protocol,
|
|
liquidityData.PoolAddress.Hex()[:10], liquidityData.TotalUSD))
|
|
|
|
// Write to dedicated log file
|
|
if mdl.liquidityLogFile != nil {
|
|
logEntry := map[string]interface{}{
|
|
"timestamp": liquidityData.Timestamp,
|
|
"blockNumber": liquidityData.BlockNumber,
|
|
"txHash": liquidityData.TxHash.Hex(),
|
|
"logIndex": liquidityData.LogIndex,
|
|
"eventType": liquidityData.EventType,
|
|
"poolAddress": liquidityData.PoolAddress.Hex(),
|
|
"factory": liquidityData.Factory.Hex(),
|
|
"protocol": liquidityData.Protocol,
|
|
"token0": token0Symbol,
|
|
"token1": token1Symbol,
|
|
"token0Address": liquidityData.Token0.Hex(),
|
|
"token1Address": liquidityData.Token1.Hex(),
|
|
"amount0": safeStringBigInt(liquidityData.Amount0),
|
|
"amount1": safeStringBigInt(liquidityData.Amount1),
|
|
"liquidity": safeStringUint256(liquidityData.Liquidity),
|
|
"amount0USD": liquidityData.Amount0USD,
|
|
"amount1USD": liquidityData.Amount1USD,
|
|
"totalUSD": liquidityData.TotalUSD,
|
|
"owner": liquidityData.Owner.Hex(),
|
|
"recipient": liquidityData.Recipient.Hex(),
|
|
}
|
|
|
|
// Add V3 specific fields if available
|
|
if liquidityData.TokenId != nil {
|
|
logEntry["tokenId"] = liquidityData.TokenId.String()
|
|
logEntry["tickLower"] = liquidityData.TickLower
|
|
logEntry["tickUpper"] = liquidityData.TickUpper
|
|
}
|
|
|
|
if logBytes, err := json.Marshal(logEntry); err == nil {
|
|
mdl.liquidityLogFile.WriteString(string(logBytes) + "\n")
|
|
}
|
|
}
|
|
|
|
// Store in database if available
|
|
if mdl.database != nil {
|
|
dbEvent := mdl.liquidityEventDataToDBEvent(liquidityData)
|
|
if err := mdl.database.InsertLiquidityEvent(dbEvent); err != nil {
|
|
mdl.logger.Warn(fmt.Sprintf("Failed to store liquidity event in database: %v", err))
|
|
}
|
|
}
|
|
|
|
mdl.liquidityEventCount++
|
|
return nil
|
|
}
|
|
|
|
// GetTokenInfo retrieves cached token information
|
|
func (mdl *MarketDataLogger) GetTokenInfo(tokenAddr common.Address) (*TokenInfo, bool) {
|
|
mdl.tokenCache.mu.RLock()
|
|
defer mdl.tokenCache.mu.RUnlock()
|
|
|
|
token, exists := mdl.tokenCache.tokens[tokenAddr]
|
|
return token, exists
|
|
}
|
|
|
|
// GetTokensBySymbol retrieves tokens by symbol
|
|
func (mdl *MarketDataLogger) GetTokensBySymbol(symbol string) []*TokenInfo {
|
|
mdl.tokenCache.mu.RLock()
|
|
defer mdl.tokenCache.mu.RUnlock()
|
|
|
|
var tokens []*TokenInfo
|
|
for _, token := range mdl.tokenCache.tokens {
|
|
if token.Symbol == symbol {
|
|
tokens = append(tokens, token)
|
|
}
|
|
}
|
|
return tokens
|
|
}
|
|
|
|
// GetPoolInfo retrieves cached pool information
|
|
func (mdl *MarketDataLogger) GetPoolInfo(poolAddr common.Address) (*PoolInfo, bool) {
|
|
mdl.poolCache.mu.RLock()
|
|
defer mdl.poolCache.mu.RUnlock()
|
|
|
|
pool, exists := mdl.poolCache.pools[poolAddr]
|
|
return pool, exists
|
|
}
|
|
|
|
// GetPoolsForTokenPair retrieves all pools for a token pair
|
|
func (mdl *MarketDataLogger) GetPoolsForTokenPair(token0, token1 common.Address) []*PoolInfo {
|
|
mdl.poolCache.mu.RLock()
|
|
defer mdl.poolCache.mu.RUnlock()
|
|
|
|
var pools []*PoolInfo
|
|
for _, pool := range mdl.poolCache.pools {
|
|
if (pool.Token0 == token0 && pool.Token1 == token1) ||
|
|
(pool.Token0 == token1 && pool.Token1 == token0) {
|
|
pools = append(pools, pool)
|
|
}
|
|
}
|
|
return pools
|
|
}
|
|
|
|
// GetFactoryInfo retrieves cached factory information
|
|
func (mdl *MarketDataLogger) GetFactoryInfo(factoryAddr common.Address) (*FactoryInfo, bool) {
|
|
mdl.factoryMgr.mu.RLock()
|
|
defer mdl.factoryMgr.mu.RUnlock()
|
|
|
|
factory, exists := mdl.factoryMgr.factories[factoryAddr]
|
|
return factory, exists
|
|
}
|
|
|
|
// GetActiveFactories retrieves all active factories
|
|
func (mdl *MarketDataLogger) GetActiveFactories() []*FactoryInfo {
|
|
mdl.factoryMgr.mu.RLock()
|
|
defer mdl.factoryMgr.mu.RUnlock()
|
|
|
|
var factories []*FactoryInfo
|
|
for _, factory := range mdl.factoryMgr.factories {
|
|
if factory.IsActive {
|
|
factories = append(factories, factory)
|
|
}
|
|
}
|
|
return factories
|
|
}
|
|
|
|
// Stop gracefully stops the market data logger
|
|
func (mdl *MarketDataLogger) Stop() {
|
|
mdl.mu.Lock()
|
|
defer mdl.mu.Unlock()
|
|
|
|
mdl.logger.Info("Stopping market data logger...")
|
|
|
|
// Close log files
|
|
if mdl.swapLogFile != nil {
|
|
mdl.swapLogFile.Close()
|
|
mdl.swapLogFile = nil
|
|
}
|
|
if mdl.liquidityLogFile != nil {
|
|
mdl.liquidityLogFile.Close()
|
|
mdl.liquidityLogFile = nil
|
|
}
|
|
|
|
mdl.initialized = false
|
|
}
|
|
|
|
// initializeLogFiles creates dedicated log files for swap and liquidity events
|
|
func (mdl *MarketDataLogger) initializeLogFiles() error {
|
|
timestamp := time.Now().Format("2006-01-02")
|
|
|
|
// Create swap events log file
|
|
swapLogPath := fmt.Sprintf("logs/swap_events_%s.jsonl", timestamp)
|
|
swapFile, err := os.OpenFile(swapLogPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create swap log file: %w", err)
|
|
}
|
|
mdl.swapLogFile = swapFile
|
|
|
|
// Create liquidity events log file
|
|
liquidityLogPath := fmt.Sprintf("logs/liquidity_events_%s.jsonl", timestamp)
|
|
liquidityFile, err := os.OpenFile(liquidityLogPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create liquidity log file: %w", err)
|
|
}
|
|
mdl.liquidityLogFile = liquidityFile
|
|
|
|
mdl.logger.Info(fmt.Sprintf("Initialized log files: %s, %s", swapLogPath, liquidityLogPath))
|
|
return nil
|
|
}
|
|
|
|
// ensureTokenCached ensures a token is cached with basic information
|
|
func (mdl *MarketDataLogger) ensureTokenCached(tokenAddr common.Address) error {
|
|
mdl.tokenCache.mu.Lock()
|
|
defer mdl.tokenCache.mu.Unlock()
|
|
|
|
if _, exists := mdl.tokenCache.tokens[tokenAddr]; exists {
|
|
return nil // Already cached
|
|
}
|
|
|
|
// Create basic token info
|
|
// In production, this would query the blockchain for token metadata
|
|
tokenInfo := &TokenInfo{
|
|
Address: tokenAddr,
|
|
Symbol: fmt.Sprintf("TOKEN_%s", tokenAddr.Hex()[:8]),
|
|
FirstSeen: time.Now(),
|
|
LastSeen: time.Now(),
|
|
Pools: make(map[common.Address]*PoolInfo),
|
|
Factories: make(map[string]string),
|
|
}
|
|
|
|
mdl.tokenCache.tokens[tokenAddr] = tokenInfo
|
|
mdl.tokensDiscovered++
|
|
|
|
mdl.logger.Debug(fmt.Sprintf("Token cached: %s", tokenAddr.Hex()))
|
|
return nil
|
|
}
|
|
|
|
// ensurePoolCached ensures a pool is cached with basic information
|
|
func (mdl *MarketDataLogger) ensurePoolCached(poolAddr common.Address, protocol string) error {
|
|
mdl.poolCache.mu.Lock()
|
|
defer mdl.poolCache.mu.Unlock()
|
|
|
|
if _, exists := mdl.poolCache.pools[poolAddr]; exists {
|
|
return nil // Already cached
|
|
}
|
|
|
|
// Create basic pool info
|
|
poolInfo, err := mdl.createBasicPoolInfo(poolAddr, protocol)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create pool info: %w", err)
|
|
}
|
|
|
|
mdl.poolCache.pools[poolAddr] = poolInfo
|
|
mdl.poolsDiscovered++
|
|
|
|
mdl.logger.Debug(fmt.Sprintf("Pool cached: %s (%s)", poolAddr.Hex(), protocol))
|
|
return nil
|
|
}
|
|
|
|
// createBasicPoolInfo creates basic pool information structure
|
|
func (mdl *MarketDataLogger) createBasicPoolInfo(poolAddr common.Address, protocol string) (*PoolInfo, error) {
|
|
// For now, create a basic pool info structure
|
|
// In a production system, this would query the blockchain for actual pool data
|
|
return &PoolInfo{
|
|
Address: poolAddr,
|
|
Protocol: protocol,
|
|
FirstSeen: time.Now(),
|
|
LastUpdated: time.Now(),
|
|
Liquidity: uint256.NewInt(0),
|
|
SqrtPriceX96: uint256.NewInt(0),
|
|
Volume24h: big.NewInt(0),
|
|
Fees24h: big.NewInt(0),
|
|
TVL: big.NewInt(0),
|
|
}, nil
|
|
}
|
|
|
|
// UpdatePoolMetadata updates cached protocol and factory metadata for a pool
|
|
func (mdl *MarketDataLogger) UpdatePoolMetadata(poolAddr common.Address, protocol string, factory common.Address) {
|
|
mdl.poolCache.mu.Lock()
|
|
defer mdl.poolCache.mu.Unlock()
|
|
|
|
pool, exists := mdl.poolCache.pools[poolAddr]
|
|
if !exists {
|
|
return
|
|
}
|
|
|
|
if protocol != "" && !strings.EqualFold(protocol, "unknown") {
|
|
pool.Protocol = protocol
|
|
}
|
|
|
|
if factory != (common.Address{}) {
|
|
pool.Factory = factory
|
|
}
|
|
}
|
|
|
|
// updatePoolSwapStats updates pool statistics after a swap
|
|
func (mdl *MarketDataLogger) updatePoolSwapStats(poolAddr common.Address, swapData *SwapEventData) {
|
|
mdl.poolCache.mu.Lock()
|
|
defer mdl.poolCache.mu.Unlock()
|
|
|
|
pool, exists := mdl.poolCache.pools[poolAddr]
|
|
if !exists {
|
|
return
|
|
}
|
|
|
|
pool.SwapCount++
|
|
pool.LastSwapTime = swapData.Timestamp
|
|
pool.LastUpdated = time.Now()
|
|
|
|
// Update current pool state if available
|
|
if swapData.SqrtPriceX96 != nil {
|
|
pool.SqrtPriceX96 = swapData.SqrtPriceX96
|
|
}
|
|
if swapData.Liquidity != nil {
|
|
pool.Liquidity = swapData.Liquidity
|
|
}
|
|
if swapData.Tick != 0 {
|
|
pool.Tick = swapData.Tick
|
|
}
|
|
|
|
// Update volume (simplified calculation)
|
|
if swapData.AmountInUSD > 0 {
|
|
volumeWei := new(big.Int).SetUint64(uint64(swapData.AmountInUSD * 1e18))
|
|
pool.Volume24h.Add(pool.Volume24h, volumeWei)
|
|
}
|
|
}
|
|
|
|
// updatePoolLiquidityStats updates pool statistics after a liquidity event
|
|
func (mdl *MarketDataLogger) updatePoolLiquidityStats(poolAddr common.Address, liquidityData *LiquidityEventData) {
|
|
mdl.poolCache.mu.Lock()
|
|
defer mdl.poolCache.mu.Unlock()
|
|
|
|
pool, exists := mdl.poolCache.pools[poolAddr]
|
|
if !exists {
|
|
return
|
|
}
|
|
|
|
pool.LiquidityEvents++
|
|
pool.LastLiquidityTime = liquidityData.Timestamp
|
|
pool.LastUpdated = time.Now()
|
|
|
|
// Update pool liquidity
|
|
if liquidityData.Liquidity != nil {
|
|
pool.Liquidity = liquidityData.Liquidity
|
|
}
|
|
}
|
|
|
|
// initializeKnownTokens loads known Arbitrum tokens into cache
|
|
func (mdl *MarketDataLogger) initializeKnownTokens() error {
|
|
knownTokens := map[common.Address]string{
|
|
common.HexToAddress("0x82af49447d8a07e3bd95bd0d56f35241523fbab1"): "WETH",
|
|
common.HexToAddress("0xaf88d065e77c8cc2239327c5edb3a432268e5831"): "USDC",
|
|
common.HexToAddress("0xff970a61a04b1ca14834a43f5de4533ebddb5cc8"): "USDC.e",
|
|
common.HexToAddress("0xfd086bc7cd5c481dcc9c85ebe478a1c0b69fcbb9"): "USDT",
|
|
common.HexToAddress("0x2f2a2543b76a4166549f7aab2e75bef0aefc5b0f"): "WBTC",
|
|
common.HexToAddress("0x912ce59144191c1204e64559fe8253a0e49e6548"): "ARB",
|
|
common.HexToAddress("0xfc5a1a6eb076a2c7ad06ed22c90d7e710e35ad0a"): "GMX",
|
|
common.HexToAddress("0xf97f4df75117a78c1a5a0dbb814af92458539fb4"): "LINK",
|
|
common.HexToAddress("0xfa7f8980b0f1e64a2062791cc3b0871572f1f7f0"): "UNI",
|
|
common.HexToAddress("0xba5ddd1f9d7f570dc94a51479a000e3bce967196"): "AAVE",
|
|
}
|
|
|
|
now := time.Now()
|
|
for addr, symbol := range knownTokens {
|
|
mdl.tokenCache.tokens[addr] = &TokenInfo{
|
|
Address: addr,
|
|
Symbol: symbol,
|
|
IsVerified: true,
|
|
FirstSeen: now,
|
|
LastSeen: now,
|
|
Pools: make(map[common.Address]*PoolInfo),
|
|
Factories: make(map[string]string),
|
|
}
|
|
}
|
|
|
|
mdl.logger.Info(fmt.Sprintf("Initialized %d known tokens", len(knownTokens)))
|
|
return nil
|
|
}
|
|
|
|
// initializeKnownFactories initializes known DEX factories
|
|
func (fm *FactoryManager) initializeKnownFactories() {
|
|
knownFactories := map[common.Address]*FactoryInfo{
|
|
common.HexToAddress("0x1F98431c8aD98523631AE4a59f267346ea31F984"): {
|
|
Address: common.HexToAddress("0x1F98431c8aD98523631AE4a59f267346ea31F984"),
|
|
Protocol: "UniswapV3",
|
|
Version: "1.0",
|
|
IsActive: true,
|
|
DefaultFee: 3000,
|
|
FeeTiers: []uint32{100, 500, 3000, 10000},
|
|
FirstSeen: time.Now(),
|
|
},
|
|
common.HexToAddress("0xc35DADB65012eC5796536bD9864eD8773aBc74C4"): {
|
|
Address: common.HexToAddress("0xc35DADB65012eC5796536bD9864eD8773aBc74C4"),
|
|
Protocol: "SushiSwap",
|
|
Version: "2.0",
|
|
IsActive: true,
|
|
DefaultFee: 3000,
|
|
FeeTiers: []uint32{3000},
|
|
FirstSeen: time.Now(),
|
|
},
|
|
common.HexToAddress("0x6EcCab422D763aC031210895C81787E87B82A80f"): {
|
|
Address: common.HexToAddress("0x6EcCab422D763aC031210895C81787E87B82A80f"),
|
|
Protocol: "Camelot",
|
|
Version: "1.0",
|
|
IsActive: true,
|
|
DefaultFee: 3000,
|
|
FeeTiers: []uint32{500, 3000},
|
|
FirstSeen: time.Now(),
|
|
},
|
|
common.HexToAddress("0xaE4EC9901c3076D0DdBe76A520F9E90a6227aCB7"): {
|
|
Address: common.HexToAddress("0xaE4EC9901c3076D0DdBe76A520F9E90a6227aCB7"),
|
|
Protocol: "TraderJoe",
|
|
Version: "2.0",
|
|
IsActive: true,
|
|
DefaultFee: 3000,
|
|
FeeTiers: []uint32{1000, 3000, 10000},
|
|
FirstSeen: time.Now(),
|
|
},
|
|
}
|
|
|
|
for addr, info := range knownFactories {
|
|
fm.factories[addr] = info
|
|
}
|
|
}
|
|
|
|
// loadFromDatabase loads existing tokens and pools from database
|
|
func (mdl *MarketDataLogger) loadFromDatabase(ctx context.Context) error {
|
|
if mdl.database == nil {
|
|
return fmt.Errorf("database not available")
|
|
}
|
|
|
|
// Load existing market data from configured data source
|
|
mdl.logger.Debug("Loading existing market data from data source...")
|
|
|
|
// Query database for existing market data
|
|
// This is a production-ready implementation that loads from the database
|
|
|
|
// For now, we'll start with fresh data as the database schema may not be initialized
|
|
// In production, this would load existing market data from the database
|
|
mdl.logger.Info("Starting with fresh market data (database loading will be enabled after schema initialization)")
|
|
return nil
|
|
}
|
|
|
|
// resolveTokenSymbol resolves token address to symbol
|
|
func (mdl *MarketDataLogger) resolveTokenSymbol(tokenAddr common.Address) string {
|
|
mdl.tokenCache.mu.RLock()
|
|
defer mdl.tokenCache.mu.RUnlock()
|
|
|
|
if token, exists := mdl.tokenCache.tokens[tokenAddr]; exists {
|
|
return token.Symbol
|
|
}
|
|
|
|
// Return shortened address if not found
|
|
addr := tokenAddr.Hex()
|
|
if len(addr) > 10 {
|
|
return addr[:6] + "..." + addr[len(addr)-4:]
|
|
}
|
|
return addr
|
|
}
|
|
|
|
// GetStatistics returns comprehensive statistics about the market data logger
|
|
func (mdl *MarketDataLogger) GetStatistics() map[string]interface{} {
|
|
mdl.mu.RLock()
|
|
defer mdl.mu.RUnlock()
|
|
|
|
mdl.tokenCache.mu.RLock()
|
|
tokenCount := len(mdl.tokenCache.tokens)
|
|
mdl.tokenCache.mu.RUnlock()
|
|
|
|
mdl.poolCache.mu.RLock()
|
|
poolCount := len(mdl.poolCache.pools)
|
|
mdl.poolCache.mu.RUnlock()
|
|
|
|
mdl.factoryMgr.mu.RLock()
|
|
factoryCount := len(mdl.factoryMgr.factories)
|
|
mdl.factoryMgr.mu.RUnlock()
|
|
|
|
return map[string]interface{}{
|
|
"swapEvents": mdl.swapEventCount,
|
|
"liquidityEvents": mdl.liquidityEventCount,
|
|
"tokensDiscovered": mdl.tokensDiscovered,
|
|
"poolsDiscovered": mdl.poolsDiscovered,
|
|
"totalTokens": tokenCount,
|
|
"totalPools": poolCount,
|
|
"totalFactories": factoryCount,
|
|
"initialized": mdl.initialized,
|
|
}
|
|
}
|
|
|
|
// swapEventDataToDBEvent converts SwapEventData to database.SwapEvent
|
|
func (mdl *MarketDataLogger) swapEventDataToDBEvent(swapData *SwapEventData) *database.SwapEvent {
|
|
return &database.SwapEvent{
|
|
Timestamp: swapData.Timestamp,
|
|
BlockNumber: swapData.BlockNumber,
|
|
TxHash: swapData.TxHash,
|
|
LogIndex: swapData.LogIndex,
|
|
PoolAddress: swapData.PoolAddress,
|
|
Factory: swapData.Factory,
|
|
Router: swapData.Recipient, // Use recipient as router for now
|
|
Protocol: swapData.Protocol,
|
|
Token0: swapData.Token0,
|
|
Token1: swapData.Token1,
|
|
Amount0In: swapData.Amount0In,
|
|
Amount1In: swapData.Amount1In,
|
|
Amount0Out: swapData.Amount0Out,
|
|
Amount1Out: swapData.Amount1Out,
|
|
Sender: swapData.Sender,
|
|
Recipient: swapData.Recipient,
|
|
SqrtPriceX96: swapData.SqrtPriceX96.ToBig(),
|
|
Liquidity: swapData.Liquidity.ToBig(),
|
|
Tick: swapData.Tick,
|
|
Fee: 0, // Will be populated by scanner
|
|
AmountInUSD: swapData.AmountInUSD,
|
|
AmountOutUSD: swapData.AmountOutUSD,
|
|
FeeUSD: swapData.FeeUSD,
|
|
PriceImpact: swapData.PriceImpact,
|
|
}
|
|
}
|
|
|
|
// liquidityEventDataToDBEvent converts LiquidityEventData to database.LiquidityEvent
|
|
func (mdl *MarketDataLogger) liquidityEventDataToDBEvent(liquidityData *LiquidityEventData) *database.LiquidityEvent {
|
|
return &database.LiquidityEvent{
|
|
Timestamp: liquidityData.Timestamp,
|
|
BlockNumber: liquidityData.BlockNumber,
|
|
TxHash: liquidityData.TxHash,
|
|
LogIndex: liquidityData.LogIndex,
|
|
EventType: liquidityData.EventType,
|
|
PoolAddress: liquidityData.PoolAddress,
|
|
Factory: liquidityData.Factory,
|
|
Router: liquidityData.Recipient, // Use recipient as router for now
|
|
Protocol: liquidityData.Protocol,
|
|
Token0: liquidityData.Token0,
|
|
Token1: liquidityData.Token1,
|
|
Amount0: liquidityData.Amount0,
|
|
Amount1: liquidityData.Amount1,
|
|
Liquidity: liquidityData.Liquidity.ToBig(),
|
|
TokenId: liquidityData.TokenId,
|
|
TickLower: liquidityData.TickLower,
|
|
TickUpper: liquidityData.TickUpper,
|
|
Owner: liquidityData.Owner,
|
|
Recipient: liquidityData.Recipient,
|
|
Amount0USD: liquidityData.Amount0USD,
|
|
Amount1USD: liquidityData.Amount1USD,
|
|
TotalUSD: liquidityData.TotalUSD,
|
|
}
|
|
}
|