Files
mev-beta/pkg/marketdata/logger.go
Krypto Kajun 0358eed3a6 fix(arbitrage): critical fixes for struct initialization and price impact calculations
- Triangular arbitrage: populate all 25+ ArbitrageOpportunity fields
- Direct arbitrage: complete field initialization with gas cost calculation
- Price impact: add division-by-zero protection and validation
- Absolute value handling for swap amounts to prevent uint256 max display

Remaining issue: Some events still show uint256 max - needs investigation
of alternative parsing code path (possibly pkg/pools/discovery.go)
2025-10-25 20:12:45 -05:00

744 lines
23 KiB
Go

package marketdata
import (
"context"
"encoding/json"
"fmt"
"math/big"
"os"
"strings"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/holiman/uint256"
"github.com/fraktal/mev-beta/internal/logger"
"github.com/fraktal/mev-beta/pkg/database"
"github.com/fraktal/mev-beta/pkg/events"
)
// Helper functions to safely convert potentially nil pointers to strings
// safeStringBigInt renders n in base 10. A nil pointer is reported as "0"
// instead of being dereferenced.
func safeStringBigInt(n *big.Int) string {
	if n != nil {
		return n.String()
	}
	return "0"
}
// safeStringUint256 returns the String() form of n. A nil pointer is reported
// as "0" instead of being dereferenced.
func safeStringUint256(n *uint256.Int) string {
	if n != nil {
		return n.String()
	}
	return "0"
}
// MarketDataLogger provides comprehensive logging of swap and liquidity events.
//
// Each event passed to LogSwapEvent / LogLiquidityEvent is summarised through
// logger, appended as one JSON line to the matching log file (when open) and,
// when database is non-nil, handed to the database for insertion.
type MarketDataLogger struct {
// logger receives the Info/Warn/Debug messages emitted by this type.
logger *logger.Logger
// database is optional; a nil value disables database inserts.
database *database.Database
// tokenCache, poolCache and factoryMgr each carry their own mutex.
tokenCache *TokenCache
poolCache *PoolCache
factoryMgr *FactoryManager
// mu is taken by Initialize, LogSwapEvent, LogLiquidityEvent and Stop
// (write lock) and by GetStatistics (read lock).
mu sync.RWMutex
// initialized is set at the end of Initialize and cleared by Stop.
initialized bool
// File loggers for dedicated event logging; opened by initializeLogFiles
// and closed (then set to nil) by Stop.
swapLogFile *os.File
liquidityLogFile *os.File
// Statistics for monitoring, reported by GetStatistics.
// swapEventCount / liquidityEventCount grow by one per logged event;
// poolsDiscovered / tokensDiscovered grow when ensurePoolCached /
// ensureTokenCached insert a new cache entry.
swapEventCount int64
liquidityEventCount int64
poolsDiscovered int64
tokensDiscovered int64
}
// TokenCache manages discovered tokens with metadata.
type TokenCache struct {
// tokens maps a token contract address to its cached metadata.
tokens map[common.Address]*TokenInfo
// mu is the lock the accessors in this file take around tokens.
mu sync.RWMutex
}
// PoolCache manages discovered pools with metadata.
type PoolCache struct {
// pools maps a pool contract address to its cached metadata and statistics.
pools map[common.Address]*PoolInfo
// mu is the lock the accessors in this file take around pools.
mu sync.RWMutex
}
// FactoryManager manages DEX factory contracts and their pool discovery.
type FactoryManager struct {
// factories maps a factory contract address to its description.
factories map[common.Address]*FactoryInfo
// mu is the lock the getters in this file take around factories.
mu sync.RWMutex
}
// NewMarketDataLogger returns a logger wired to log and db with empty token,
// pool and factory caches. The result is not usable for event logging until
// Initialize has been called on it.
func NewMarketDataLogger(log *logger.Logger, db *database.Database) *MarketDataLogger {
	tokens := &TokenCache{tokens: make(map[common.Address]*TokenInfo)}
	pools := &PoolCache{pools: make(map[common.Address]*PoolInfo)}
	factories := &FactoryManager{factories: make(map[common.Address]*FactoryInfo)}

	mdl := new(MarketDataLogger)
	mdl.logger = log
	mdl.database = db
	mdl.tokenCache = tokens
	mdl.poolCache = pools
	mdl.factoryMgr = factories
	return mdl
}
// Initialize prepares the logger for use: it creates the "logs" directory,
// opens the per-day event log files, seeds the token and factory caches and
// attempts to load prior data from the database. A failure of that last step
// is only logged at debug level. Calling Initialize on an already initialized
// logger is a no-op that returns nil.
func (mdl *MarketDataLogger) Initialize(ctx context.Context) error {
	mdl.mu.Lock()
	defer mdl.mu.Unlock()

	if mdl.initialized {
		return nil
	}
	mdl.logger.Info("Initializing market data logger...")

	// The log directory must exist before the files can be opened.
	err := os.MkdirAll("logs", 0750)
	if err != nil {
		return fmt.Errorf("failed to create logs directory: %w", err)
	}

	// Dedicated JSONL files for swap and liquidity events.
	err = mdl.initializeLogFiles()
	if err != nil {
		return fmt.Errorf("failed to initialize log files: %w", err)
	}

	// Well-known Arbitrum tokens.
	err = mdl.initializeKnownTokens()
	if err != nil {
		return fmt.Errorf("failed to initialize known tokens: %w", err)
	}

	// Well-known DEX factories.
	mdl.factoryMgr.initializeKnownFactories()

	// Loading persisted data is best-effort.
	loadErr := mdl.loadFromDatabase(ctx)
	if loadErr != nil {
		mdl.logger.Debug(fmt.Sprintf("Database not configured, skipping data load: %v", loadErr))
	}

	mdl.initialized = true
	mdl.logger.Info("Market data logger initialized successfully")
	return nil
}
// LogSwapEvent logs comprehensive swap event data for market analysis.
//
// It makes sure both tokens and the pool have cache entries, updates the
// pool's swap statistics, emits a one-line summary, appends a JSON line to the
// swap log file (if open) and inserts the event into the database (if one is
// configured). Cache, file and database problems are reported as warnings and
// do not fail the call.
//
// ctx and event are accepted for interface compatibility and are not used.
// An error is returned only when the logger is not initialized or swapData is
// nil.
func (mdl *MarketDataLogger) LogSwapEvent(ctx context.Context, event events.Event, swapData *SwapEventData) error {
	mdl.mu.Lock()
	defer mdl.mu.Unlock()

	if !mdl.initialized {
		return fmt.Errorf("market data logger not initialized")
	}
	// Every step below dereferences swapData; reject nil instead of panicking.
	if swapData == nil {
		return fmt.Errorf("swap data is nil")
	}

	// Ensure tokens are cached
	if err := mdl.ensureTokenCached(swapData.Token0); err != nil {
		mdl.logger.Warn(fmt.Sprintf("Failed to cache token0 %s: %v", swapData.Token0.Hex(), err))
	}
	if err := mdl.ensureTokenCached(swapData.Token1); err != nil {
		mdl.logger.Warn(fmt.Sprintf("Failed to cache token1 %s: %v", swapData.Token1.Hex(), err))
	}

	// Ensure pool is cached
	if err := mdl.ensurePoolCached(swapData.PoolAddress, swapData.Protocol); err != nil {
		mdl.logger.Warn(fmt.Sprintf("Failed to cache pool %s: %v", swapData.PoolAddress.Hex(), err))
	}

	// Update pool statistics
	mdl.updatePoolSwapStats(swapData.PoolAddress, swapData)

	// Log the swap event details
	token0Symbol := mdl.resolveTokenSymbol(swapData.Token0)
	token1Symbol := mdl.resolveTokenSymbol(swapData.Token1)
	mdl.logger.Info(fmt.Sprintf("Swap logged: %s/%s on %s - Pool: %s, Amount: $%.2f USD",
		token0Symbol, token1Symbol, swapData.Protocol,
		swapData.PoolAddress.Hex()[:10], swapData.AmountInUSD))

	// Write to dedicated log file
	if mdl.swapLogFile != nil {
		logEntry := map[string]interface{}{
			"timestamp":     swapData.Timestamp,
			"blockNumber":   swapData.BlockNumber,
			"txHash":        swapData.TxHash.Hex(),
			"logIndex":      swapData.LogIndex,
			"poolAddress":   swapData.PoolAddress.Hex(),
			"factory":       swapData.Factory.Hex(),
			"protocol":      swapData.Protocol,
			"token0":        token0Symbol,
			"token1":        token1Symbol,
			"token0Address": swapData.Token0.Hex(),
			"token1Address": swapData.Token1.Hex(),
			"amount0In":     safeStringBigInt(swapData.Amount0In),
			"amount1In":     safeStringBigInt(swapData.Amount1In),
			"amount0Out":    safeStringBigInt(swapData.Amount0Out),
			"amount1Out":    safeStringBigInt(swapData.Amount1Out),
			"amountInUSD":   swapData.AmountInUSD,
			"amountOutUSD":  swapData.AmountOutUSD,
			"feeUSD":        swapData.FeeUSD,
			"priceImpact":   swapData.PriceImpact,
			"sqrtPriceX96":  safeStringUint256(swapData.SqrtPriceX96),
			"liquidity":     safeStringUint256(swapData.Liquidity),
			"tick":          swapData.Tick,
		}

		// json.Marshal rejects NaN/Inf floats, so a bad USD or price-impact
		// value would otherwise drop the line without any trace.
		logBytes, err := json.Marshal(logEntry)
		if err != nil {
			mdl.logger.Warn(fmt.Sprintf("Failed to encode swap event for log file: %v", err))
		} else if _, err := mdl.swapLogFile.Write(append(logBytes, '\n')); err != nil {
			mdl.logger.Warn(fmt.Sprintf("Failed to write swap event to log file: %v", err))
		}
	}

	// Store in database if available
	if mdl.database != nil {
		dbEvent := mdl.swapEventDataToDBEvent(swapData)
		if err := mdl.database.InsertSwapEvent(dbEvent); err != nil {
			mdl.logger.Warn(fmt.Sprintf("Failed to store swap event in database: %v", err))
		}
	}

	mdl.swapEventCount++
	return nil
}
// LogLiquidityEvent logs comprehensive liquidity event data for market analysis.
//
// It makes sure both tokens and the pool have cache entries, updates the
// pool's liquidity statistics, emits a one-line summary, appends a JSON line
// to the liquidity log file (if open) and inserts the event into the database
// (if one is configured). Cache, file and database problems are reported as
// warnings and do not fail the call.
//
// ctx and event are accepted for interface compatibility and are not used.
// An error is returned only when the logger is not initialized or
// liquidityData is nil.
func (mdl *MarketDataLogger) LogLiquidityEvent(ctx context.Context, event events.Event, liquidityData *LiquidityEventData) error {
	mdl.mu.Lock()
	defer mdl.mu.Unlock()

	if !mdl.initialized {
		return fmt.Errorf("market data logger not initialized")
	}
	// Every step below dereferences liquidityData; reject nil instead of panicking.
	if liquidityData == nil {
		return fmt.Errorf("liquidity data is nil")
	}

	// Ensure tokens are cached
	if err := mdl.ensureTokenCached(liquidityData.Token0); err != nil {
		mdl.logger.Warn(fmt.Sprintf("Failed to cache token0 %s: %v", liquidityData.Token0.Hex(), err))
	}
	if err := mdl.ensureTokenCached(liquidityData.Token1); err != nil {
		mdl.logger.Warn(fmt.Sprintf("Failed to cache token1 %s: %v", liquidityData.Token1.Hex(), err))
	}

	// Ensure pool is cached
	if err := mdl.ensurePoolCached(liquidityData.PoolAddress, liquidityData.Protocol); err != nil {
		mdl.logger.Warn(fmt.Sprintf("Failed to cache pool %s: %v", liquidityData.PoolAddress.Hex(), err))
	}

	// Update pool statistics
	mdl.updatePoolLiquidityStats(liquidityData.PoolAddress, liquidityData)

	// Log the liquidity event details
	token0Symbol := mdl.resolveTokenSymbol(liquidityData.Token0)
	token1Symbol := mdl.resolveTokenSymbol(liquidityData.Token1)
	mdl.logger.Info(fmt.Sprintf("Liquidity %s logged: %s/%s on %s - Pool: %s, Total: $%.2f USD",
		liquidityData.EventType, token0Symbol, token1Symbol, liquidityData.Protocol,
		liquidityData.PoolAddress.Hex()[:10], liquidityData.TotalUSD))

	// Write to dedicated log file
	if mdl.liquidityLogFile != nil {
		logEntry := map[string]interface{}{
			"timestamp":     liquidityData.Timestamp,
			"blockNumber":   liquidityData.BlockNumber,
			"txHash":        liquidityData.TxHash.Hex(),
			"logIndex":      liquidityData.LogIndex,
			"eventType":     liquidityData.EventType,
			"poolAddress":   liquidityData.PoolAddress.Hex(),
			"factory":       liquidityData.Factory.Hex(),
			"protocol":      liquidityData.Protocol,
			"token0":        token0Symbol,
			"token1":        token1Symbol,
			"token0Address": liquidityData.Token0.Hex(),
			"token1Address": liquidityData.Token1.Hex(),
			"amount0":       safeStringBigInt(liquidityData.Amount0),
			"amount1":       safeStringBigInt(liquidityData.Amount1),
			"liquidity":     safeStringUint256(liquidityData.Liquidity),
			"amount0USD":    liquidityData.Amount0USD,
			"amount1USD":    liquidityData.Amount1USD,
			"totalUSD":      liquidityData.TotalUSD,
			"owner":         liquidityData.Owner.Hex(),
			"recipient":     liquidityData.Recipient.Hex(),
		}

		// Add V3 specific fields if available
		if liquidityData.TokenId != nil {
			logEntry["tokenId"] = liquidityData.TokenId.String()
			logEntry["tickLower"] = liquidityData.TickLower
			logEntry["tickUpper"] = liquidityData.TickUpper
		}

		// json.Marshal rejects NaN/Inf floats, so a bad USD value would
		// otherwise drop the line without any trace.
		logBytes, err := json.Marshal(logEntry)
		if err != nil {
			mdl.logger.Warn(fmt.Sprintf("Failed to encode liquidity event for log file: %v", err))
		} else if _, err := mdl.liquidityLogFile.Write(append(logBytes, '\n')); err != nil {
			mdl.logger.Warn(fmt.Sprintf("Failed to write liquidity event to log file: %v", err))
		}
	}

	// Store in database if available
	if mdl.database != nil {
		dbEvent := mdl.liquidityEventDataToDBEvent(liquidityData)
		if err := mdl.database.InsertLiquidityEvent(dbEvent); err != nil {
			mdl.logger.Warn(fmt.Sprintf("Failed to store liquidity event in database: %v", err))
		}
	}

	mdl.liquidityEventCount++
	return nil
}
// GetTokenInfo looks up tokenAddr in the token cache. The boolean reports
// whether an entry was present.
func (mdl *MarketDataLogger) GetTokenInfo(tokenAddr common.Address) (*TokenInfo, bool) {
	cache := mdl.tokenCache

	cache.mu.RLock()
	info, found := cache.tokens[tokenAddr]
	cache.mu.RUnlock()

	return info, found
}
// GetTokensBySymbol returns every cached token whose Symbol equals symbol
// exactly (case-sensitive). The result is nil when nothing matches, and its
// order follows map iteration and is therefore unspecified.
func (mdl *MarketDataLogger) GetTokensBySymbol(symbol string) []*TokenInfo {
	cache := mdl.tokenCache
	cache.mu.RLock()
	defer cache.mu.RUnlock()

	var matches []*TokenInfo
	for _, candidate := range cache.tokens {
		if candidate.Symbol != symbol {
			continue
		}
		matches = append(matches, candidate)
	}
	return matches
}
// GetPoolInfo looks up poolAddr in the pool cache. The boolean reports
// whether an entry was present.
func (mdl *MarketDataLogger) GetPoolInfo(poolAddr common.Address) (*PoolInfo, bool) {
	cache := mdl.poolCache

	cache.mu.RLock()
	info, found := cache.pools[poolAddr]
	cache.mu.RUnlock()

	return info, found
}
// GetPoolsForTokenPair returns every cached pool that trades the given pair,
// regardless of which token the pool lists as Token0. The result is nil when
// no pool matches, and its order is unspecified.
func (mdl *MarketDataLogger) GetPoolsForTokenPair(token0, token1 common.Address) []*PoolInfo {
	cache := mdl.poolCache
	cache.mu.RLock()
	defer cache.mu.RUnlock()

	var matches []*PoolInfo
	for _, candidate := range cache.pools {
		sameOrder := candidate.Token0 == token0 && candidate.Token1 == token1
		swapped := candidate.Token0 == token1 && candidate.Token1 == token0
		if !sameOrder && !swapped {
			continue
		}
		matches = append(matches, candidate)
	}
	return matches
}
// GetFactoryInfo looks up factoryAddr among the known factories. The boolean
// reports whether an entry was present.
func (mdl *MarketDataLogger) GetFactoryInfo(factoryAddr common.Address) (*FactoryInfo, bool) {
	mgr := mdl.factoryMgr

	mgr.mu.RLock()
	info, found := mgr.factories[factoryAddr]
	mgr.mu.RUnlock()

	return info, found
}
// GetActiveFactories returns the known factories whose IsActive flag is set.
// The result is nil when there are none, and its order is unspecified.
func (mdl *MarketDataLogger) GetActiveFactories() []*FactoryInfo {
	mgr := mdl.factoryMgr
	mgr.mu.RLock()
	defer mgr.mu.RUnlock()

	var active []*FactoryInfo
	for _, candidate := range mgr.factories {
		if !candidate.IsActive {
			continue
		}
		active = append(active, candidate)
	}
	return active
}
// Stop gracefully stops the market data logger: it closes both event log
// files, clears their handles and marks the logger as uninitialized, so
// later LogSwapEvent / LogLiquidityEvent calls fail until Initialize runs
// again. Calling Stop repeatedly is safe.
func (mdl *MarketDataLogger) Stop() {
	mdl.mu.Lock()
	defer mdl.mu.Unlock()

	mdl.logger.Info("Stopping market data logger...")

	// Close can surface a delayed write failure, so report it instead of
	// discarding it. The handle is dropped either way.
	if mdl.swapLogFile != nil {
		if err := mdl.swapLogFile.Close(); err != nil {
			mdl.logger.Warn(fmt.Sprintf("Failed to close swap log file: %v", err))
		}
		mdl.swapLogFile = nil
	}
	if mdl.liquidityLogFile != nil {
		if err := mdl.liquidityLogFile.Close(); err != nil {
			mdl.logger.Warn(fmt.Sprintf("Failed to close liquidity log file: %v", err))
		}
		mdl.liquidityLogFile = nil
	}

	mdl.initialized = false
}
// initializeLogFiles opens (creating if needed, in append mode, permissions
// 0600) the per-day JSONL files for swap and liquidity events under "logs/".
// The file names carry the current local date.
//
// The handles are stored on mdl only when both files opened successfully. If
// the second open fails, the first file is closed again so that a failed
// initialization leaves no descriptor behind.
func (mdl *MarketDataLogger) initializeLogFiles() error {
	const openFlags = os.O_CREATE | os.O_WRONLY | os.O_APPEND

	timestamp := time.Now().Format("2006-01-02")

	// Create swap events log file
	swapLogPath := fmt.Sprintf("logs/swap_events_%s.jsonl", timestamp)
	swapFile, err := os.OpenFile(swapLogPath, openFlags, 0600)
	if err != nil {
		return fmt.Errorf("failed to create swap log file: %w", err)
	}

	// Create liquidity events log file
	liquidityLogPath := fmt.Sprintf("logs/liquidity_events_%s.jsonl", timestamp)
	liquidityFile, err := os.OpenFile(liquidityLogPath, openFlags, 0600)
	if err != nil {
		// Nothing has been written yet, so a Close error here carries no
		// information worth reporting over the open failure.
		_ = swapFile.Close()
		return fmt.Errorf("failed to create liquidity log file: %w", err)
	}

	mdl.swapLogFile = swapFile
	mdl.liquidityLogFile = liquidityFile

	mdl.logger.Info(fmt.Sprintf("Initialized log files: %s, %s", swapLogPath, liquidityLogPath))
	return nil
}
// ensureTokenCached inserts a placeholder entry for tokenAddr when the token
// cache has none. The placeholder symbol is "TOKEN_" followed by the first
// eight characters of the hex address; real metadata is not fetched here.
// The function currently always returns nil.
func (mdl *MarketDataLogger) ensureTokenCached(tokenAddr common.Address) error {
	cache := mdl.tokenCache
	cache.mu.Lock()
	defer cache.mu.Unlock()

	if _, known := cache.tokens[tokenAddr]; !known {
		// In production, this would query the blockchain for token metadata.
		placeholder := new(TokenInfo)
		placeholder.Address = tokenAddr
		placeholder.Symbol = fmt.Sprintf("TOKEN_%s", tokenAddr.Hex()[:8])
		placeholder.FirstSeen = time.Now()
		placeholder.LastSeen = time.Now()
		placeholder.Pools = make(map[common.Address]*PoolInfo)
		placeholder.Factories = make(map[string]string)

		cache.tokens[tokenAddr] = placeholder
		mdl.tokensDiscovered++
		mdl.logger.Debug(fmt.Sprintf("Token cached: %s", tokenAddr.Hex()))
	}
	return nil
}
// ensurePoolCached inserts a basic entry for poolAddr when the pool cache has
// none, tagging it with protocol. An error is returned only if the entry
// could not be constructed.
func (mdl *MarketDataLogger) ensurePoolCached(poolAddr common.Address, protocol string) error {
	cache := mdl.poolCache
	cache.mu.Lock()
	defer cache.mu.Unlock()

	if _, known := cache.pools[poolAddr]; !known {
		created, err := mdl.createBasicPoolInfo(poolAddr, protocol)
		if err != nil {
			return fmt.Errorf("failed to create pool info: %w", err)
		}

		cache.pools[poolAddr] = created
		mdl.poolsDiscovered++
		mdl.logger.Debug(fmt.Sprintf("Pool cached: %s (%s)", poolAddr.Hex(), protocol))
	}
	return nil
}
// createBasicPoolInfo builds a PoolInfo for poolAddr with the given protocol,
// the current time as first-seen/last-updated, and zero-valued liquidity,
// price, volume, fee and TVL figures. Nothing is read from the chain; the
// returned error is currently always nil.
func (mdl *MarketDataLogger) createBasicPoolInfo(poolAddr common.Address, protocol string) (*PoolInfo, error) {
	// In a production system, this would query the blockchain for actual pool data.
	info := new(PoolInfo)
	info.Address = poolAddr
	info.Protocol = protocol
	info.FirstSeen = time.Now()
	info.LastUpdated = time.Now()
	info.Liquidity = uint256.NewInt(0)
	info.SqrtPriceX96 = uint256.NewInt(0)
	info.Volume24h = big.NewInt(0)
	info.Fees24h = big.NewInt(0)
	info.TVL = big.NewInt(0)
	return info, nil
}
// UpdatePoolMetadata overwrites the cached protocol and/or factory of an
// already cached pool. An empty protocol or one equal to "unknown" (any
// letter case) is ignored, as is the zero factory address. Pools that are not
// in the cache are left untouched.
func (mdl *MarketDataLogger) UpdatePoolMetadata(poolAddr common.Address, protocol string, factory common.Address) {
	cache := mdl.poolCache
	cache.mu.Lock()
	defer cache.mu.Unlock()

	if entry, known := cache.pools[poolAddr]; known {
		placeholderProtocol := protocol == "" || strings.EqualFold(protocol, "unknown")
		if !placeholderProtocol {
			entry.Protocol = protocol
		}

		var zeroAddr common.Address
		if factory != zeroAddr {
			entry.Factory = factory
		}
	}
}
// updatePoolSwapStats updates pool statistics after a swap: it bumps the swap
// counter and timestamps, copies over the post-swap price/liquidity/tick when
// the event carries them, and adds the swap's USD input (scaled by 1e18) to
// the pool's Volume24h accumulator. Pools that are not cached are ignored.
func (mdl *MarketDataLogger) updatePoolSwapStats(poolAddr common.Address, swapData *SwapEventData) {
	mdl.poolCache.mu.Lock()
	defer mdl.poolCache.mu.Unlock()

	pool, exists := mdl.poolCache.pools[poolAddr]
	if !exists {
		return
	}

	pool.SwapCount++
	pool.LastSwapTime = swapData.Timestamp
	pool.LastUpdated = time.Now()

	// Update current pool state if available
	if swapData.SqrtPriceX96 != nil {
		pool.SqrtPriceX96 = swapData.SqrtPriceX96
	}
	if swapData.Liquidity != nil {
		pool.Liquidity = swapData.Liquidity
	}
	// NOTE(review): a tick of 0 is treated as "not provided", so a pool whose
	// real tick is 0 keeps its previous value — confirm this is intended.
	if swapData.Tick != 0 {
		pool.Tick = swapData.Tick
	}

	// Update volume (simplified calculation).
	// The scaled amount exceeds the uint64 range for anything above roughly
	// 18.45 USD, and an out-of-range float-to-integer conversion yields an
	// implementation-defined value, so convert through big.Float instead.
	// The "> 0" guard also excludes NaN, which big.NewFloat would panic on.
	if swapData.AmountInUSD > 0 {
		scaled := big.NewFloat(swapData.AmountInUSD * 1e18)
		// Int returns nil for an infinite value; skip such an amount.
		if volumeWei, _ := scaled.Int(nil); volumeWei != nil {
			if pool.Volume24h == nil {
				pool.Volume24h = new(big.Int)
			}
			pool.Volume24h.Add(pool.Volume24h, volumeWei)
		}
	}
}
// updatePoolLiquidityStats records a liquidity event against a cached pool:
// it bumps the event counter, refreshes the timestamps and, when the event
// carries a liquidity value, stores it on the pool. Pools that are not cached
// are ignored.
func (mdl *MarketDataLogger) updatePoolLiquidityStats(poolAddr common.Address, liquidityData *LiquidityEventData) {
	cache := mdl.poolCache
	cache.mu.Lock()
	defer cache.mu.Unlock()

	if entry, known := cache.pools[poolAddr]; known {
		entry.LiquidityEvents++
		entry.LastLiquidityTime = liquidityData.Timestamp
		entry.LastUpdated = time.Now()

		if reported := liquidityData.Liquidity; reported != nil {
			entry.Liquidity = reported
		}
	}
}
// initializeKnownTokens loads known Arbitrum tokens into cache, marking each
// as verified with the current time as first/last seen. An existing entry for
// one of these addresses is replaced. The returned error is currently always
// nil.
//
// The cache's own mutex is held while writing, because the getters and
// ensureTokenCached synchronize on tokenCache.mu rather than on mdl.mu.
func (mdl *MarketDataLogger) initializeKnownTokens() error {
	knownTokens := map[common.Address]string{
		common.HexToAddress("0x82af49447d8a07e3bd95bd0d56f35241523fbab1"): "WETH",
		common.HexToAddress("0xaf88d065e77c8cc2239327c5edb3a432268e5831"): "USDC",
		common.HexToAddress("0xff970a61a04b1ca14834a43f5de4533ebddb5cc8"): "USDC.e",
		common.HexToAddress("0xfd086bc7cd5c481dcc9c85ebe478a1c0b69fcbb9"): "USDT",
		common.HexToAddress("0x2f2a2543b76a4166549f7aab2e75bef0aefc5b0f"): "WBTC",
		common.HexToAddress("0x912ce59144191c1204e64559fe8253a0e49e6548"): "ARB",
		common.HexToAddress("0xfc5a1a6eb076a2c7ad06ed22c90d7e710e35ad0a"): "GMX",
		common.HexToAddress("0xf97f4df75117a78c1a5a0dbb814af92458539fb4"): "LINK",
		common.HexToAddress("0xfa7f8980b0f1e64a2062791cc3b0871572f1f7f0"): "UNI",
		common.HexToAddress("0xba5ddd1f9d7f570dc94a51479a000e3bce967196"): "AAVE",
	}

	now := time.Now()

	mdl.tokenCache.mu.Lock()
	for addr, symbol := range knownTokens {
		mdl.tokenCache.tokens[addr] = &TokenInfo{
			Address:    addr,
			Symbol:     symbol,
			IsVerified: true,
			FirstSeen:  now,
			LastSeen:   now,
			Pools:      make(map[common.Address]*PoolInfo),
			Factories:  make(map[string]string),
		}
	}
	mdl.tokenCache.mu.Unlock()

	mdl.logger.Info(fmt.Sprintf("Initialized %d known tokens", len(knownTokens)))
	return nil
}
// initializeKnownFactories initializes known DEX factories. Every entry is
// registered as active with a default fee of 3000 and the same first-seen
// timestamp; an existing entry for one of these addresses is replaced.
//
// The manager's mutex is held while writing, because the getters synchronize
// on fm.mu.
func (fm *FactoryManager) initializeKnownFactories() {
	// Each address is written once here and parsed once below, so the map key
	// and the Address field cannot drift apart.
	known := []struct {
		address  string
		protocol string
		version  string
		feeTiers []uint32
	}{
		{"0x1F98431c8aD98523631AE4a59f267346ea31F984", "UniswapV3", "1.0", []uint32{100, 500, 3000, 10000}},
		{"0xc35DADB65012eC5796536bD9864eD8773aBc74C4", "SushiSwap", "2.0", []uint32{3000}},
		{"0x6EcCab422D763aC031210895C81787E87B82A80f", "Camelot", "1.0", []uint32{500, 3000}},
		{"0xaE4EC9901c3076D0DdBe76A520F9E90a6227aCB7", "TraderJoe", "2.0", []uint32{1000, 3000, 10000}},
	}

	now := time.Now()

	fm.mu.Lock()
	defer fm.mu.Unlock()

	for _, k := range known {
		addr := common.HexToAddress(k.address)
		fm.factories[addr] = &FactoryInfo{
			Address:    addr,
			Protocol:   k.protocol,
			Version:    k.version,
			IsActive:   true,
			DefaultFee: 3000,
			FeeTiers:   k.feeTiers,
			FirstSeen:  now,
		}
	}
}
// loadFromDatabase is the hook for restoring previously stored tokens and
// pools. At present it is a placeholder: with no database configured it
// returns an error, otherwise it only logs that the logger starts with fresh
// data and returns nil. ctx is not used yet.
func (mdl *MarketDataLogger) loadFromDatabase(ctx context.Context) error {
	if db := mdl.database; db == nil {
		return fmt.Errorf("database not available")
	}

	mdl.logger.Debug("Loading existing market data from data source...")

	// No query is issued here; the messages below state that loading is
	// deferred until the database schema has been initialized.
	mdl.logger.Info("Starting with fresh market data (database loading will be enabled after schema initialization)")
	return nil
}
// resolveTokenSymbol returns the cached symbol for tokenAddr. For an address
// that is not cached it falls back to an abbreviated hex form: the first six
// and last four characters joined by "...".
func (mdl *MarketDataLogger) resolveTokenSymbol(tokenAddr common.Address) string {
	cache := mdl.tokenCache
	cache.mu.RLock()
	defer cache.mu.RUnlock()

	cached, known := cache.tokens[tokenAddr]
	if known {
		return cached.Symbol
	}

	hexAddr := tokenAddr.Hex()
	if len(hexAddr) <= 10 {
		// Too short to abbreviate meaningfully.
		return hexAddr
	}
	return hexAddr[:6] + "..." + hexAddr[len(hexAddr)-4:]
}
// GetStatistics returns a snapshot of the logger's counters and cache sizes.
// Keys: "swapEvents", "liquidityEvents", "tokensDiscovered",
// "poolsDiscovered", "totalTokens", "totalPools", "totalFactories" and
// "initialized".
func (mdl *MarketDataLogger) GetStatistics() map[string]interface{} {
	mdl.mu.RLock()
	defer mdl.mu.RUnlock()

	// Each cache is sized under its own read lock, one at a time.
	tokens := mdl.tokenCache
	tokens.mu.RLock()
	totalTokens := len(tokens.tokens)
	tokens.mu.RUnlock()

	pools := mdl.poolCache
	pools.mu.RLock()
	totalPools := len(pools.pools)
	pools.mu.RUnlock()

	factories := mdl.factoryMgr
	factories.mu.RLock()
	totalFactories := len(factories.factories)
	factories.mu.RUnlock()

	stats := make(map[string]interface{}, 8)
	stats["swapEvents"] = mdl.swapEventCount
	stats["liquidityEvents"] = mdl.liquidityEventCount
	stats["tokensDiscovered"] = mdl.tokensDiscovered
	stats["poolsDiscovered"] = mdl.poolsDiscovered
	stats["totalTokens"] = totalTokens
	stats["totalPools"] = totalPools
	stats["totalFactories"] = totalFactories
	stats["initialized"] = mdl.initialized
	return stats
}
// swapEventDataToDBEvent converts SwapEventData to database.SwapEvent.
//
// Fields are copied one-to-one, with these exceptions: Router is filled from
// the swap's Recipient, Fee is left at 0, and SqrtPriceX96 / Liquidity are
// converted to *big.Int. The rest of this file treats those two as optional
// (nil when the event does not carry them), so a nil value is stored as zero,
// matching the "0" written to the JSON log, instead of being dereferenced.
func (mdl *MarketDataLogger) swapEventDataToDBEvent(swapData *SwapEventData) *database.SwapEvent {
	// toBig converts an optional uint256 value, mapping nil to zero.
	toBig := func(v *uint256.Int) *big.Int {
		if v == nil {
			return new(big.Int)
		}
		return v.ToBig()
	}

	return &database.SwapEvent{
		Timestamp:    swapData.Timestamp,
		BlockNumber:  swapData.BlockNumber,
		TxHash:       swapData.TxHash,
		LogIndex:     swapData.LogIndex,
		PoolAddress:  swapData.PoolAddress,
		Factory:      swapData.Factory,
		Router:       swapData.Recipient, // Use recipient as router for now
		Protocol:     swapData.Protocol,
		Token0:       swapData.Token0,
		Token1:       swapData.Token1,
		Amount0In:    swapData.Amount0In,
		Amount1In:    swapData.Amount1In,
		Amount0Out:   swapData.Amount0Out,
		Amount1Out:   swapData.Amount1Out,
		Sender:       swapData.Sender,
		Recipient:    swapData.Recipient,
		SqrtPriceX96: toBig(swapData.SqrtPriceX96),
		Liquidity:    toBig(swapData.Liquidity),
		Tick:         swapData.Tick,
		Fee:          0, // Will be populated by scanner
		AmountInUSD:  swapData.AmountInUSD,
		AmountOutUSD: swapData.AmountOutUSD,
		FeeUSD:       swapData.FeeUSD,
		PriceImpact:  swapData.PriceImpact,
	}
}
// liquidityEventDataToDBEvent converts LiquidityEventData to database.LiquidityEvent.
//
// Fields are copied one-to-one, with these exceptions: Router is filled from
// the event's Recipient, and Liquidity is converted to *big.Int. The rest of
// this file treats Liquidity as optional (nil when the event does not carry
// it), so a nil value is stored as zero, matching the "0" written to the JSON
// log, instead of being dereferenced.
func (mdl *MarketDataLogger) liquidityEventDataToDBEvent(liquidityData *LiquidityEventData) *database.LiquidityEvent {
	// Map a missing liquidity value to zero.
	liquidity := new(big.Int)
	if liquidityData.Liquidity != nil {
		liquidity = liquidityData.Liquidity.ToBig()
	}

	return &database.LiquidityEvent{
		Timestamp:   liquidityData.Timestamp,
		BlockNumber: liquidityData.BlockNumber,
		TxHash:      liquidityData.TxHash,
		LogIndex:    liquidityData.LogIndex,
		EventType:   liquidityData.EventType,
		PoolAddress: liquidityData.PoolAddress,
		Factory:     liquidityData.Factory,
		Router:      liquidityData.Recipient, // Use recipient as router for now
		Protocol:    liquidityData.Protocol,
		Token0:      liquidityData.Token0,
		Token1:      liquidityData.Token1,
		Amount0:     liquidityData.Amount0,
		Amount1:     liquidityData.Amount1,
		Liquidity:   liquidity,
		TokenId:     liquidityData.TokenId,
		TickLower:   liquidityData.TickLower,
		TickUpper:   liquidityData.TickUpper,
		Owner:       liquidityData.Owner,
		Recipient:   liquidityData.Recipient,
		Amount0USD:  liquidityData.Amount0USD,
		Amount1USD:  liquidityData.Amount1USD,
		TotalUSD:    liquidityData.TotalUSD,
	}
}