// Package database provides database functionality for storing MEV bot data package database import ( "database/sql" "fmt" "math/big" "time" "github.com/ethereum/go-ethereum/common" _ "github.com/mattn/go-sqlite3" "github.com/fraktal/mev-beta/internal/config" "github.com/fraktal/mev-beta/internal/logger" ) // Database represents the database connection and operations type Database struct { db *sql.DB logger *logger.Logger config *config.DatabaseConfig } // SwapEvent represents a swap event stored in the database type SwapEvent struct { ID int64 `json:"id"` Timestamp time.Time `json:"timestamp"` BlockNumber uint64 `json:"block_number"` TxHash common.Hash `json:"tx_hash"` LogIndex uint `json:"log_index"` // Pool and protocol info PoolAddress common.Address `json:"pool_address"` Factory common.Address `json:"factory"` Router common.Address `json:"router"` Protocol string `json:"protocol"` // Token and amount details Token0 common.Address `json:"token0"` Token1 common.Address `json:"token1"` Amount0In *big.Int `json:"amount0_in"` Amount1In *big.Int `json:"amount1_in"` Amount0Out *big.Int `json:"amount0_out"` Amount1Out *big.Int `json:"amount1_out"` // Swap execution details Sender common.Address `json:"sender"` Recipient common.Address `json:"recipient"` SqrtPriceX96 *big.Int `json:"sqrt_price_x96"` Liquidity *big.Int `json:"liquidity"` Tick int32 `json:"tick"` // Fee and pricing information Fee uint32 `json:"fee"` AmountInUSD float64 `json:"amount_in_usd"` AmountOutUSD float64 `json:"amount_out_usd"` FeeUSD float64 `json:"fee_usd"` PriceImpact float64 `json:"price_impact"` } // LiquidityEvent represents a liquidity event stored in the database type LiquidityEvent struct { ID int64 `json:"id"` Timestamp time.Time `json:"timestamp"` BlockNumber uint64 `json:"block_number"` TxHash common.Hash `json:"tx_hash"` LogIndex uint `json:"log_index"` EventType string `json:"event_type"` // "mint", "burn", "collect" // Pool and protocol info PoolAddress common.Address `json:"pool_address"` Factory common.Address `json:"factory"` Router common.Address `json:"router"` Protocol string `json:"protocol"` // Token and amount details Token0 common.Address `json:"token0"` Token1 common.Address `json:"token1"` Amount0 *big.Int `json:"amount0"` Amount1 *big.Int `json:"amount1"` Liquidity *big.Int `json:"liquidity"` // Position details (for V3) TokenId *big.Int `json:"token_id"` TickLower int32 `json:"tick_lower"` TickUpper int32 `json:"tick_upper"` // User details Owner common.Address `json:"owner"` Recipient common.Address `json:"recipient"` // Calculated values Amount0USD float64 `json:"amount0_usd"` Amount1USD float64 `json:"amount1_usd"` TotalUSD float64 `json:"total_usd"` } // PoolData represents pool data stored in the database type PoolData struct { ID int64 `json:"id"` Address common.Address `json:"address"` Token0 common.Address `json:"token0"` Token1 common.Address `json:"token1"` Fee int64 `json:"fee"` Liquidity *big.Int `json:"liquidity"` SqrtPriceX96 *big.Int `json:"sqrt_price_x96"` Tick int64 `json:"tick"` LastUpdated time.Time `json:"last_updated"` Protocol string `json:"protocol"` } // NewDatabase creates a new database connection func NewDatabase(cfg *config.DatabaseConfig, logger *logger.Logger) (*Database, error) { // Open database connection db, err := sql.Open("sqlite3", cfg.File) if err != nil { return nil, fmt.Errorf("failed to open database: %w", err) } // Set connection limits db.SetMaxOpenConns(cfg.MaxOpenConnections) db.SetMaxIdleConns(cfg.MaxIdleConnections) // Create database instance database := &Database{ db: db, logger: logger, config: cfg, } // Initialize database schema if err := database.initSchema(); err != nil { return nil, fmt.Errorf("failed to initialize database schema: %w", err) } logger.Info("Database initialized successfully") return database, nil } // initSchema initializes the database schema func (d *Database) initSchema() error { // Create swap events table if _, err := d.db.Exec(`CREATE TABLE IF NOT EXISTS swap_events ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp DATETIME NOT NULL, block_number INTEGER NOT NULL, tx_hash TEXT NOT NULL, log_index INTEGER NOT NULL, pool_address TEXT NOT NULL, factory TEXT NOT NULL, router TEXT NOT NULL, protocol TEXT NOT NULL, token0 TEXT NOT NULL, token1 TEXT NOT NULL, amount0_in TEXT NOT NULL, amount1_in TEXT NOT NULL, amount0_out TEXT NOT NULL, amount1_out TEXT NOT NULL, sender TEXT NOT NULL, recipient TEXT NOT NULL, amount_in_usd REAL NOT NULL, amount_out_usd REAL NOT NULL, fee_usd REAL NOT NULL, price_impact REAL NOT NULL, sqrt_price_x96 TEXT NOT NULL, liquidity TEXT NOT NULL, tick INTEGER NOT NULL, fee INTEGER NOT NULL, created_at DATETIME DEFAULT CURRENT_TIMESTAMP )`); err != nil { return fmt.Errorf("failed to create swap_events table: %w", err) } // Create liquidity events table if _, err := d.db.Exec(`CREATE TABLE IF NOT EXISTS liquidity_events ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp DATETIME NOT NULL, block_number INTEGER NOT NULL, tx_hash TEXT NOT NULL, log_index INTEGER NOT NULL, event_type TEXT NOT NULL, pool_address TEXT NOT NULL, factory TEXT NOT NULL, router TEXT NOT NULL, protocol TEXT NOT NULL, token0 TEXT NOT NULL, token1 TEXT NOT NULL, token_id TEXT, amount0 TEXT NOT NULL, amount1 TEXT NOT NULL, liquidity TEXT NOT NULL, tick_lower INTEGER, tick_upper INTEGER, owner TEXT NOT NULL, recipient TEXT NOT NULL, amount0_usd REAL NOT NULL, amount1_usd REAL NOT NULL, total_usd REAL NOT NULL, created_at DATETIME DEFAULT CURRENT_TIMESTAMP )`); err != nil { return fmt.Errorf("failed to create liquidity_events table: %w", err) } // Create pool data table if _, err := d.db.Exec(`CREATE TABLE IF NOT EXISTS pool_data ( id INTEGER PRIMARY KEY AUTOINCREMENT, address TEXT NOT NULL UNIQUE, token0 TEXT NOT NULL, token1 TEXT NOT NULL, fee INTEGER NOT NULL, liquidity TEXT NOT NULL, sqrt_price_x96 TEXT NOT NULL, tick INTEGER NOT NULL, last_updated DATETIME NOT NULL, protocol TEXT NOT NULL, created_at DATETIME DEFAULT CURRENT_TIMESTAMP )`); err != nil { return fmt.Errorf("failed to create pool_data table: %w", err) } // Create tokens table for metadata storage if _, err := d.db.Exec(`CREATE TABLE IF NOT EXISTS tokens ( id INTEGER PRIMARY KEY AUTOINCREMENT, address TEXT NOT NULL UNIQUE, symbol TEXT NOT NULL, name TEXT NOT NULL, decimals INTEGER NOT NULL, total_supply TEXT, is_stablecoin BOOLEAN NOT NULL DEFAULT FALSE, is_wrapped BOOLEAN NOT NULL DEFAULT FALSE, category TEXT NOT NULL DEFAULT 'unknown', price_usd REAL, price_eth REAL, last_updated DATETIME NOT NULL, risk_score REAL NOT NULL DEFAULT 0.5, is_verified BOOLEAN NOT NULL DEFAULT FALSE, contract_verified BOOLEAN NOT NULL DEFAULT FALSE, implementation TEXT, total_liquidity_usd REAL, main_pool TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP )`); err != nil { return fmt.Errorf("failed to create tokens table: %w", err) } // Create indices for better performance indices := []string{ `CREATE INDEX IF NOT EXISTS idx_swap_events_timestamp ON swap_events(timestamp)`, `CREATE INDEX IF NOT EXISTS idx_swap_events_block_number ON swap_events(block_number)`, `CREATE INDEX IF NOT EXISTS idx_swap_events_tx_hash ON swap_events(tx_hash)`, `CREATE INDEX IF NOT EXISTS idx_swap_events_pool_address ON swap_events(pool_address)`, `CREATE INDEX IF NOT EXISTS idx_liquidity_events_timestamp ON liquidity_events(timestamp)`, `CREATE INDEX IF NOT EXISTS idx_liquidity_events_block_number ON liquidity_events(block_number)`, `CREATE INDEX IF NOT EXISTS idx_liquidity_events_tx_hash ON liquidity_events(tx_hash)`, `CREATE INDEX IF NOT EXISTS idx_liquidity_events_pool_address ON liquidity_events(pool_address)`, `CREATE INDEX IF NOT EXISTS idx_pool_data_last_updated ON pool_data(last_updated)`, `CREATE INDEX IF NOT EXISTS idx_pool_data_protocol ON pool_data(protocol)`, `CREATE INDEX IF NOT EXISTS idx_tokens_symbol ON tokens(symbol)`, `CREATE INDEX IF NOT EXISTS idx_tokens_category ON tokens(category)`, `CREATE INDEX IF NOT EXISTS idx_tokens_address ON tokens(address)`, `CREATE INDEX IF NOT EXISTS idx_tokens_last_updated ON tokens(last_updated)`, } // Execute all index creation statements for _, stmt := range indices { _, err := d.db.Exec(stmt) if err != nil { return fmt.Errorf("failed to execute index statement: %s, error: %w", stmt, err) } } return nil } // InsertSwapEvent inserts a swap event into the database func (d *Database) InsertSwapEvent(event *SwapEvent) error { stmt, err := d.db.Prepare(` INSERT OR IGNORE INTO swap_events ( timestamp, block_number, tx_hash, log_index, pool_address, factory, router, protocol, token0, token1, amount0_in, amount1_in, amount0_out, amount1_out, sender, recipient, sqrt_price_x96, liquidity, tick, fee, amount_in_usd, amount_out_usd, fee_usd, price_impact ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) `) if err != nil { return fmt.Errorf("failed to prepare insert statement: %w", err) } defer stmt.Close() // Handle nil values safely sqrtPrice := "0" if event.SqrtPriceX96 != nil { sqrtPrice = event.SqrtPriceX96.String() } liquidity := "0" if event.Liquidity != nil { liquidity = event.Liquidity.String() } _, err = stmt.Exec( event.Timestamp.UTC().Format(time.RFC3339), event.BlockNumber, event.TxHash.Hex(), event.LogIndex, event.PoolAddress.Hex(), event.Factory.Hex(), event.Router.Hex(), event.Protocol, event.Token0.Hex(), event.Token1.Hex(), event.Amount0In.String(), event.Amount1In.String(), event.Amount0Out.String(), event.Amount1Out.String(), event.Sender.Hex(), event.Recipient.Hex(), sqrtPrice, liquidity, event.Tick, event.Fee, event.AmountInUSD, event.AmountOutUSD, event.FeeUSD, event.PriceImpact, ) if err != nil { return fmt.Errorf("failed to insert swap event: %w", err) } d.logger.Debug(fmt.Sprintf("Inserted swap event for pool %s (tx: %s)", event.PoolAddress.Hex(), event.TxHash.Hex())) return nil } // InsertLiquidityEvent inserts a liquidity event into the database func (d *Database) InsertLiquidityEvent(event *LiquidityEvent) error { stmt, err := d.db.Prepare(` INSERT OR IGNORE INTO liquidity_events ( timestamp, block_number, tx_hash, log_index, event_type, pool_address, factory, router, protocol, token0, token1, amount0, amount1, liquidity, token_id, tick_lower, tick_upper, owner, recipient, amount0_usd, amount1_usd, total_usd ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) `) if err != nil { return fmt.Errorf("failed to prepare insert statement: %w", err) } defer stmt.Close() // Handle nil values safely tokenId := "" if event.TokenId != nil { tokenId = event.TokenId.String() } _, err = stmt.Exec( event.Timestamp.UTC().Format(time.RFC3339), event.BlockNumber, event.TxHash.Hex(), event.LogIndex, event.EventType, event.PoolAddress.Hex(), event.Factory.Hex(), event.Router.Hex(), event.Protocol, event.Token0.Hex(), event.Token1.Hex(), event.Amount0.String(), event.Amount1.String(), event.Liquidity.String(), tokenId, event.TickLower, event.TickUpper, event.Owner.Hex(), event.Recipient.Hex(), event.Amount0USD, event.Amount1USD, event.TotalUSD, ) if err != nil { return fmt.Errorf("failed to insert liquidity event: %w", err) } d.logger.Debug(fmt.Sprintf("Inserted %s liquidity event for pool %s (tx: %s)", event.EventType, event.PoolAddress.Hex(), event.TxHash.Hex())) return nil } // InsertPoolData inserts or updates pool data in the database func (d *Database) InsertPoolData(pool *PoolData) error { stmt, err := d.db.Prepare(` INSERT OR REPLACE INTO pool_data ( address, token0, token1, fee, liquidity, sqrt_price_x96, tick, last_updated, protocol ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) `) if err != nil { return fmt.Errorf("failed to prepare insert statement: %w", err) } defer stmt.Close() _, err = stmt.Exec( pool.Address.Hex(), pool.Token0.Hex(), pool.Token1.Hex(), pool.Fee, pool.Liquidity.String(), pool.SqrtPriceX96.String(), pool.Tick, pool.LastUpdated.UTC().Format(time.RFC3339), // Store in UTC RFC3339 format pool.Protocol, ) if err != nil { return fmt.Errorf("failed to insert pool data: %w", err) } d.logger.Debug(fmt.Sprintf("Inserted/updated pool data for pool %s", pool.Address.Hex())) return nil } // GetRecentSwapEvents retrieves recent swap events from the database func (d *Database) GetRecentSwapEvents(limit int) ([]*SwapEvent, error) { rows, err := d.db.Query(` SELECT id, timestamp, block_number, tx_hash, log_index, pool_address, factory, router, protocol, token0, token1, amount0_in, amount1_in, amount0_out, amount1_out, sender, recipient, sqrt_price_x96, liquidity, tick, fee, amount_in_usd, amount_out_usd, fee_usd, price_impact FROM swap_events ORDER BY timestamp DESC LIMIT ? `, limit) if err != nil { return nil, fmt.Errorf("failed to query swap events: %w", err) } defer rows.Close() events := make([]*SwapEvent, 0) for rows.Next() { event := &SwapEvent{} var txHash, poolAddr, factory, router, token0, token1, sender, recipient string var amount0In, amount1In, amount0Out, amount1Out, sqrtPrice, liquidity string err := rows.Scan( &event.ID, &event.Timestamp, &event.BlockNumber, &txHash, &event.LogIndex, &poolAddr, &factory, &router, &event.Protocol, &token0, &token1, &amount0In, &amount1In, &amount0Out, &amount1Out, &sender, &recipient, &sqrtPrice, &liquidity, &event.Tick, &event.Fee, &event.AmountInUSD, &event.AmountOutUSD, &event.FeeUSD, &event.PriceImpact, ) if err != nil { return nil, fmt.Errorf("failed to scan swap event: %w", err) } // Convert string values to proper types event.TxHash = common.HexToHash(txHash) event.PoolAddress = common.HexToAddress(poolAddr) event.Factory = common.HexToAddress(factory) event.Router = common.HexToAddress(router) event.Token0 = common.HexToAddress(token0) event.Token1 = common.HexToAddress(token1) event.Sender = common.HexToAddress(sender) event.Recipient = common.HexToAddress(recipient) // Convert string amounts to big.Int event.Amount0In = new(big.Int) event.Amount0In.SetString(amount0In, 10) event.Amount1In = new(big.Int) event.Amount1In.SetString(amount1In, 10) event.Amount0Out = new(big.Int) event.Amount0Out.SetString(amount0Out, 10) event.Amount1Out = new(big.Int) event.Amount1Out.SetString(amount1Out, 10) event.SqrtPriceX96 = new(big.Int) event.SqrtPriceX96.SetString(sqrtPrice, 10) event.Liquidity = new(big.Int) event.Liquidity.SetString(liquidity, 10) events = append(events, event) } return events, nil } // GetRecentLiquidityEvents retrieves recent liquidity events from the database func (d *Database) GetRecentLiquidityEvents(limit int) ([]*LiquidityEvent, error) { rows, err := d.db.Query(` SELECT id, timestamp, block_number, tx_hash, log_index, event_type, pool_address, factory, router, protocol, token0, token1, amount0, amount1, liquidity, token_id, tick_lower, tick_upper, owner, recipient, amount0_usd, amount1_usd, total_usd FROM liquidity_events ORDER BY timestamp DESC LIMIT ? `, limit) if err != nil { return nil, fmt.Errorf("failed to query liquidity events: %w", err) } defer rows.Close() events := make([]*LiquidityEvent, 0) for rows.Next() { event := &LiquidityEvent{} var txHash, poolAddr, factory, router, token0, token1, owner, recipient, tokenId string var liquidity, amount0, amount1 string err := rows.Scan( &event.ID, &event.Timestamp, &event.BlockNumber, &txHash, &event.LogIndex, &event.EventType, &poolAddr, &factory, &router, &event.Protocol, &token0, &token1, &amount0, &amount1, &liquidity, &tokenId, &event.TickLower, &event.TickUpper, &owner, &recipient, &event.Amount0USD, &event.Amount1USD, &event.TotalUSD, ) if err != nil { return nil, fmt.Errorf("failed to scan liquidity event: %w", err) } // Convert string values to proper types event.TxHash = common.HexToHash(txHash) event.PoolAddress = common.HexToAddress(poolAddr) event.Factory = common.HexToAddress(factory) event.Router = common.HexToAddress(router) event.Token0 = common.HexToAddress(token0) event.Token1 = common.HexToAddress(token1) event.Owner = common.HexToAddress(owner) event.Recipient = common.HexToAddress(recipient) // Convert string amounts to big.Int event.Liquidity = new(big.Int) event.Liquidity.SetString(liquidity, 10) event.Amount0 = new(big.Int) event.Amount0.SetString(amount0, 10) event.Amount1 = new(big.Int) event.Amount1.SetString(amount1, 10) // Convert TokenId if present if tokenId != "" { event.TokenId = new(big.Int) event.TokenId.SetString(tokenId, 10) } events = append(events, event) } return events, nil } // GetPoolData retrieves pool data from the database func (d *Database) GetPoolData(poolAddress common.Address) (*PoolData, error) { row := d.db.QueryRow(` SELECT id, address, token0, token1, fee, liquidity, sqrt_price_x96, tick, last_updated, protocol FROM pool_data WHERE address = ? `, poolAddress.Hex()) pool := &PoolData{} var addr, token0, token1, liquidity, sqrtPrice string var lastUpdated string err := row.Scan( &pool.ID, &addr, &token0, &token1, &pool.Fee, &liquidity, &sqrtPrice, &pool.Tick, &lastUpdated, &pool.Protocol, ) if err != nil { if err == sql.ErrNoRows { return nil, fmt.Errorf("pool data not found for address: %s", poolAddress.Hex()) } return nil, fmt.Errorf("failed to scan pool data: %w", err) } // Convert string values to proper types pool.Address = common.HexToAddress(addr) pool.Token0 = common.HexToAddress(token0) pool.Token1 = common.HexToAddress(token1) // Convert string amounts to big.Int pool.Liquidity = new(big.Int) pool.Liquidity.SetString(liquidity, 10) pool.SqrtPriceX96 = new(big.Int) pool.SqrtPriceX96.SetString(sqrtPrice, 10) // Parse timestamp pool.LastUpdated, err = time.Parse(time.RFC3339, lastUpdated) if err != nil { return nil, fmt.Errorf("failed to parse timestamp: %w", err) } return pool, nil } // Close closes the database connection func (d *Database) Close() error { if d.db != nil { return d.db.Close() } return nil }