Files
mev-beta/pkg/database/database.go
Krypto Kajun 850223a953 fix(multicall): resolve critical multicall parsing corruption issues
- Added comprehensive bounds checking to prevent buffer overruns in multicall parsing
- Implemented graduated validation system (Strict/Moderate/Permissive) to reduce false positives
- Added LRU caching system for address validation with 10-minute TTL
- Enhanced ABI decoder with missing Universal Router and Arbitrum-specific DEX signatures
- Fixed duplicate function declarations and import conflicts across multiple files
- Added error recovery mechanisms with multiple fallback strategies
- Updated tests to handle new validation behavior for suspicious addresses
- Fixed parser test expectations for improved validation system
- Applied gofmt formatting fixes to ensure code style compliance
- Fixed mutex copying issues in monitoring package by introducing MetricsSnapshot
- Resolved critical security vulnerabilities in heuristic address extraction
- Progress: Updated TODO audit from 10% to 35% complete

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-17 00:12:55 -05:00

592 lines
18 KiB
Go

// Package database provides database functionality for storing MEV bot data
package database
import (
"database/sql"
"fmt"
"math/big"
"time"
"github.com/ethereum/go-ethereum/common"
_ "github.com/mattn/go-sqlite3"
"github.com/fraktal/mev-beta/internal/config"
"github.com/fraktal/mev-beta/internal/logger"
)
// Database represents the database connection and operations
type Database struct {
db *sql.DB
logger *logger.Logger
config *config.DatabaseConfig
}
// SwapEvent represents a swap event stored in the database
type SwapEvent struct {
ID int64 `json:"id"`
Timestamp time.Time `json:"timestamp"`
BlockNumber uint64 `json:"block_number"`
TxHash common.Hash `json:"tx_hash"`
LogIndex uint `json:"log_index"`
// Pool and protocol info
PoolAddress common.Address `json:"pool_address"`
Factory common.Address `json:"factory"`
Router common.Address `json:"router"`
Protocol string `json:"protocol"`
// Token and amount details
Token0 common.Address `json:"token0"`
Token1 common.Address `json:"token1"`
Amount0In *big.Int `json:"amount0_in"`
Amount1In *big.Int `json:"amount1_in"`
Amount0Out *big.Int `json:"amount0_out"`
Amount1Out *big.Int `json:"amount1_out"`
// Swap execution details
Sender common.Address `json:"sender"`
Recipient common.Address `json:"recipient"`
SqrtPriceX96 *big.Int `json:"sqrt_price_x96"`
Liquidity *big.Int `json:"liquidity"`
Tick int32 `json:"tick"`
// Fee and pricing information
Fee uint32 `json:"fee"`
AmountInUSD float64 `json:"amount_in_usd"`
AmountOutUSD float64 `json:"amount_out_usd"`
FeeUSD float64 `json:"fee_usd"`
PriceImpact float64 `json:"price_impact"`
}
// LiquidityEvent represents a liquidity event stored in the database
type LiquidityEvent struct {
ID int64 `json:"id"`
Timestamp time.Time `json:"timestamp"`
BlockNumber uint64 `json:"block_number"`
TxHash common.Hash `json:"tx_hash"`
LogIndex uint `json:"log_index"`
EventType string `json:"event_type"` // "mint", "burn", "collect"
// Pool and protocol info
PoolAddress common.Address `json:"pool_address"`
Factory common.Address `json:"factory"`
Router common.Address `json:"router"`
Protocol string `json:"protocol"`
// Token and amount details
Token0 common.Address `json:"token0"`
Token1 common.Address `json:"token1"`
Amount0 *big.Int `json:"amount0"`
Amount1 *big.Int `json:"amount1"`
Liquidity *big.Int `json:"liquidity"`
// Position details (for V3)
TokenId *big.Int `json:"token_id"`
TickLower int32 `json:"tick_lower"`
TickUpper int32 `json:"tick_upper"`
// User details
Owner common.Address `json:"owner"`
Recipient common.Address `json:"recipient"`
// Calculated values
Amount0USD float64 `json:"amount0_usd"`
Amount1USD float64 `json:"amount1_usd"`
TotalUSD float64 `json:"total_usd"`
}
// PoolData represents pool data stored in the database
type PoolData struct {
ID int64 `json:"id"`
Address common.Address `json:"address"`
Token0 common.Address `json:"token0"`
Token1 common.Address `json:"token1"`
Fee int64 `json:"fee"`
Liquidity *big.Int `json:"liquidity"`
SqrtPriceX96 *big.Int `json:"sqrt_price_x96"`
Tick int64 `json:"tick"`
LastUpdated time.Time `json:"last_updated"`
Protocol string `json:"protocol"`
}
// NewDatabase creates a new database connection
func NewDatabase(cfg *config.DatabaseConfig, logger *logger.Logger) (*Database, error) {
// Open database connection
db, err := sql.Open("sqlite3", cfg.File)
if err != nil {
return nil, fmt.Errorf("failed to open database: %w", err)
}
// Set connection limits
db.SetMaxOpenConns(cfg.MaxOpenConnections)
db.SetMaxIdleConns(cfg.MaxIdleConnections)
// Create database instance
database := &Database{
db: db,
logger: logger,
config: cfg,
}
// Initialize database schema
if err := database.initSchema(); err != nil {
return nil, fmt.Errorf("failed to initialize database schema: %w", err)
}
logger.Info("Database initialized successfully")
return database, nil
}
// initSchema initializes the database schema
func (d *Database) initSchema() error {
// Create swap events table
if _, err := d.db.Exec(`CREATE TABLE IF NOT EXISTS swap_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp DATETIME NOT NULL,
block_number INTEGER NOT NULL,
tx_hash TEXT NOT NULL,
log_index INTEGER NOT NULL,
pool_address TEXT NOT NULL,
factory TEXT NOT NULL,
router TEXT NOT NULL,
protocol TEXT NOT NULL,
token0 TEXT NOT NULL,
token1 TEXT NOT NULL,
amount0_in TEXT NOT NULL,
amount1_in TEXT NOT NULL,
amount0_out TEXT NOT NULL,
amount1_out TEXT NOT NULL,
sender TEXT NOT NULL,
recipient TEXT NOT NULL,
amount_in_usd REAL NOT NULL,
amount_out_usd REAL NOT NULL,
fee_usd REAL NOT NULL,
price_impact REAL NOT NULL,
sqrt_price_x96 TEXT NOT NULL,
liquidity TEXT NOT NULL,
tick INTEGER NOT NULL,
fee INTEGER NOT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)`); err != nil {
return fmt.Errorf("failed to create swap_events table: %w", err)
}
// Create liquidity events table
if _, err := d.db.Exec(`CREATE TABLE IF NOT EXISTS liquidity_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp DATETIME NOT NULL,
block_number INTEGER NOT NULL,
tx_hash TEXT NOT NULL,
log_index INTEGER NOT NULL,
event_type TEXT NOT NULL,
pool_address TEXT NOT NULL,
factory TEXT NOT NULL,
router TEXT NOT NULL,
protocol TEXT NOT NULL,
token0 TEXT NOT NULL,
token1 TEXT NOT NULL,
token_id TEXT,
amount0 TEXT NOT NULL,
amount1 TEXT NOT NULL,
liquidity TEXT NOT NULL,
tick_lower INTEGER,
tick_upper INTEGER,
owner TEXT NOT NULL,
recipient TEXT NOT NULL,
amount0_usd REAL NOT NULL,
amount1_usd REAL NOT NULL,
total_usd REAL NOT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)`); err != nil {
return fmt.Errorf("failed to create liquidity_events table: %w", err)
}
// Create pool data table
if _, err := d.db.Exec(`CREATE TABLE IF NOT EXISTS pool_data (
id INTEGER PRIMARY KEY AUTOINCREMENT,
address TEXT NOT NULL UNIQUE,
token0 TEXT NOT NULL,
token1 TEXT NOT NULL,
fee INTEGER NOT NULL,
liquidity TEXT NOT NULL,
sqrt_price_x96 TEXT NOT NULL,
tick INTEGER NOT NULL,
last_updated DATETIME NOT NULL,
protocol TEXT NOT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)`); err != nil {
return fmt.Errorf("failed to create pool_data table: %w", err)
}
// Create tokens table for metadata storage
if _, err := d.db.Exec(`CREATE TABLE IF NOT EXISTS tokens (
id INTEGER PRIMARY KEY AUTOINCREMENT,
address TEXT NOT NULL UNIQUE,
symbol TEXT NOT NULL,
name TEXT NOT NULL,
decimals INTEGER NOT NULL,
total_supply TEXT,
is_stablecoin BOOLEAN NOT NULL DEFAULT FALSE,
is_wrapped BOOLEAN NOT NULL DEFAULT FALSE,
category TEXT NOT NULL DEFAULT 'unknown',
price_usd REAL,
price_eth REAL,
last_updated DATETIME NOT NULL,
risk_score REAL NOT NULL DEFAULT 0.5,
is_verified BOOLEAN NOT NULL DEFAULT FALSE,
contract_verified BOOLEAN NOT NULL DEFAULT FALSE,
implementation TEXT,
total_liquidity_usd REAL,
main_pool TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)`); err != nil {
return fmt.Errorf("failed to create tokens table: %w", err)
}
// Create indices for better performance
indices := []string{
`CREATE INDEX IF NOT EXISTS idx_swap_events_timestamp ON swap_events(timestamp)`,
`CREATE INDEX IF NOT EXISTS idx_swap_events_block_number ON swap_events(block_number)`,
`CREATE INDEX IF NOT EXISTS idx_swap_events_tx_hash ON swap_events(tx_hash)`,
`CREATE INDEX IF NOT EXISTS idx_swap_events_pool_address ON swap_events(pool_address)`,
`CREATE INDEX IF NOT EXISTS idx_liquidity_events_timestamp ON liquidity_events(timestamp)`,
`CREATE INDEX IF NOT EXISTS idx_liquidity_events_block_number ON liquidity_events(block_number)`,
`CREATE INDEX IF NOT EXISTS idx_liquidity_events_tx_hash ON liquidity_events(tx_hash)`,
`CREATE INDEX IF NOT EXISTS idx_liquidity_events_pool_address ON liquidity_events(pool_address)`,
`CREATE INDEX IF NOT EXISTS idx_pool_data_last_updated ON pool_data(last_updated)`,
`CREATE INDEX IF NOT EXISTS idx_pool_data_protocol ON pool_data(protocol)`,
`CREATE INDEX IF NOT EXISTS idx_tokens_symbol ON tokens(symbol)`,
`CREATE INDEX IF NOT EXISTS idx_tokens_category ON tokens(category)`,
`CREATE INDEX IF NOT EXISTS idx_tokens_address ON tokens(address)`,
`CREATE INDEX IF NOT EXISTS idx_tokens_last_updated ON tokens(last_updated)`,
}
// Execute all index creation statements
for _, stmt := range indices {
_, err := d.db.Exec(stmt)
if err != nil {
return fmt.Errorf("failed to execute index statement: %s, error: %w", stmt, err)
}
}
return nil
}
// InsertSwapEvent inserts a swap event into the database
func (d *Database) InsertSwapEvent(event *SwapEvent) error {
stmt, err := d.db.Prepare(`
INSERT OR IGNORE INTO swap_events (
timestamp, block_number, tx_hash, log_index, pool_address, factory, router, protocol,
token0, token1, amount0_in, amount1_in, amount0_out, amount1_out,
sender, recipient, sqrt_price_x96, liquidity, tick, fee,
amount_in_usd, amount_out_usd, fee_usd, price_impact
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`)
if err != nil {
return fmt.Errorf("failed to prepare insert statement: %w", err)
}
defer stmt.Close()
// Handle nil values safely
sqrtPrice := "0"
if event.SqrtPriceX96 != nil {
sqrtPrice = event.SqrtPriceX96.String()
}
liquidity := "0"
if event.Liquidity != nil {
liquidity = event.Liquidity.String()
}
_, err = stmt.Exec(
event.Timestamp.UTC().Format(time.RFC3339),
event.BlockNumber,
event.TxHash.Hex(),
event.LogIndex,
event.PoolAddress.Hex(),
event.Factory.Hex(),
event.Router.Hex(),
event.Protocol,
event.Token0.Hex(),
event.Token1.Hex(),
event.Amount0In.String(),
event.Amount1In.String(),
event.Amount0Out.String(),
event.Amount1Out.String(),
event.Sender.Hex(),
event.Recipient.Hex(),
sqrtPrice,
liquidity,
event.Tick,
event.Fee,
event.AmountInUSD,
event.AmountOutUSD,
event.FeeUSD,
event.PriceImpact,
)
if err != nil {
return fmt.Errorf("failed to insert swap event: %w", err)
}
d.logger.Debug(fmt.Sprintf("Inserted swap event for pool %s (tx: %s)", event.PoolAddress.Hex(), event.TxHash.Hex()))
return nil
}
// InsertLiquidityEvent inserts a liquidity event into the database
func (d *Database) InsertLiquidityEvent(event *LiquidityEvent) error {
stmt, err := d.db.Prepare(`
INSERT OR IGNORE INTO liquidity_events (
timestamp, block_number, tx_hash, log_index, event_type,
pool_address, factory, router, protocol, token0, token1,
amount0, amount1, liquidity, token_id, tick_lower, tick_upper,
owner, recipient, amount0_usd, amount1_usd, total_usd
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`)
if err != nil {
return fmt.Errorf("failed to prepare insert statement: %w", err)
}
defer stmt.Close()
// Handle nil values safely
tokenId := ""
if event.TokenId != nil {
tokenId = event.TokenId.String()
}
_, err = stmt.Exec(
event.Timestamp.UTC().Format(time.RFC3339),
event.BlockNumber,
event.TxHash.Hex(),
event.LogIndex,
event.EventType,
event.PoolAddress.Hex(),
event.Factory.Hex(),
event.Router.Hex(),
event.Protocol,
event.Token0.Hex(),
event.Token1.Hex(),
event.Amount0.String(),
event.Amount1.String(),
event.Liquidity.String(),
tokenId,
event.TickLower,
event.TickUpper,
event.Owner.Hex(),
event.Recipient.Hex(),
event.Amount0USD,
event.Amount1USD,
event.TotalUSD,
)
if err != nil {
return fmt.Errorf("failed to insert liquidity event: %w", err)
}
d.logger.Debug(fmt.Sprintf("Inserted %s liquidity event for pool %s (tx: %s)", event.EventType, event.PoolAddress.Hex(), event.TxHash.Hex()))
return nil
}
// InsertPoolData inserts or updates pool data in the database
func (d *Database) InsertPoolData(pool *PoolData) error {
stmt, err := d.db.Prepare(`
INSERT OR REPLACE INTO pool_data (
address, token0, token1, fee, liquidity, sqrt_price_x96, tick, last_updated, protocol
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
`)
if err != nil {
return fmt.Errorf("failed to prepare insert statement: %w", err)
}
defer stmt.Close()
_, err = stmt.Exec(
pool.Address.Hex(),
pool.Token0.Hex(),
pool.Token1.Hex(),
pool.Fee,
pool.Liquidity.String(),
pool.SqrtPriceX96.String(),
pool.Tick,
pool.LastUpdated.UTC().Format(time.RFC3339), // Store in UTC RFC3339 format
pool.Protocol,
)
if err != nil {
return fmt.Errorf("failed to insert pool data: %w", err)
}
d.logger.Debug(fmt.Sprintf("Inserted/updated pool data for pool %s", pool.Address.Hex()))
return nil
}
// GetRecentSwapEvents retrieves recent swap events from the database
func (d *Database) GetRecentSwapEvents(limit int) ([]*SwapEvent, error) {
rows, err := d.db.Query(`
SELECT id, timestamp, block_number, tx_hash, log_index, pool_address, factory, router, protocol,
token0, token1, amount0_in, amount1_in, amount0_out, amount1_out,
sender, recipient, sqrt_price_x96, liquidity, tick, fee,
amount_in_usd, amount_out_usd, fee_usd, price_impact
FROM swap_events
ORDER BY timestamp DESC
LIMIT ?
`, limit)
if err != nil {
return nil, fmt.Errorf("failed to query swap events: %w", err)
}
defer rows.Close()
events := make([]*SwapEvent, 0)
for rows.Next() {
event := &SwapEvent{}
var txHash, poolAddr, factory, router, token0, token1, sender, recipient string
var amount0In, amount1In, amount0Out, amount1Out, sqrtPrice, liquidity string
err := rows.Scan(
&event.ID, &event.Timestamp, &event.BlockNumber, &txHash, &event.LogIndex, &poolAddr, &factory, &router, &event.Protocol,
&token0, &token1, &amount0In, &amount1In, &amount0Out, &amount1Out,
&sender, &recipient, &sqrtPrice, &liquidity, &event.Tick, &event.Fee,
&event.AmountInUSD, &event.AmountOutUSD, &event.FeeUSD, &event.PriceImpact,
)
if err != nil {
return nil, fmt.Errorf("failed to scan swap event: %w", err)
}
// Convert string values to proper types
event.TxHash = common.HexToHash(txHash)
event.PoolAddress = common.HexToAddress(poolAddr)
event.Factory = common.HexToAddress(factory)
event.Router = common.HexToAddress(router)
event.Token0 = common.HexToAddress(token0)
event.Token1 = common.HexToAddress(token1)
event.Sender = common.HexToAddress(sender)
event.Recipient = common.HexToAddress(recipient)
// Convert string amounts to big.Int
event.Amount0In = new(big.Int)
event.Amount0In.SetString(amount0In, 10)
event.Amount1In = new(big.Int)
event.Amount1In.SetString(amount1In, 10)
event.Amount0Out = new(big.Int)
event.Amount0Out.SetString(amount0Out, 10)
event.Amount1Out = new(big.Int)
event.Amount1Out.SetString(amount1Out, 10)
event.SqrtPriceX96 = new(big.Int)
event.SqrtPriceX96.SetString(sqrtPrice, 10)
event.Liquidity = new(big.Int)
event.Liquidity.SetString(liquidity, 10)
events = append(events, event)
}
return events, nil
}
// GetRecentLiquidityEvents retrieves recent liquidity events from the database
func (d *Database) GetRecentLiquidityEvents(limit int) ([]*LiquidityEvent, error) {
rows, err := d.db.Query(`
SELECT id, timestamp, block_number, tx_hash, log_index, event_type,
pool_address, factory, router, protocol, token0, token1,
amount0, amount1, liquidity, token_id, tick_lower, tick_upper,
owner, recipient, amount0_usd, amount1_usd, total_usd
FROM liquidity_events
ORDER BY timestamp DESC
LIMIT ?
`, limit)
if err != nil {
return nil, fmt.Errorf("failed to query liquidity events: %w", err)
}
defer rows.Close()
events := make([]*LiquidityEvent, 0)
for rows.Next() {
event := &LiquidityEvent{}
var txHash, poolAddr, factory, router, token0, token1, owner, recipient, tokenId string
var liquidity, amount0, amount1 string
err := rows.Scan(
&event.ID, &event.Timestamp, &event.BlockNumber, &txHash, &event.LogIndex, &event.EventType,
&poolAddr, &factory, &router, &event.Protocol, &token0, &token1,
&amount0, &amount1, &liquidity, &tokenId, &event.TickLower, &event.TickUpper,
&owner, &recipient, &event.Amount0USD, &event.Amount1USD, &event.TotalUSD,
)
if err != nil {
return nil, fmt.Errorf("failed to scan liquidity event: %w", err)
}
// Convert string values to proper types
event.TxHash = common.HexToHash(txHash)
event.PoolAddress = common.HexToAddress(poolAddr)
event.Factory = common.HexToAddress(factory)
event.Router = common.HexToAddress(router)
event.Token0 = common.HexToAddress(token0)
event.Token1 = common.HexToAddress(token1)
event.Owner = common.HexToAddress(owner)
event.Recipient = common.HexToAddress(recipient)
// Convert string amounts to big.Int
event.Liquidity = new(big.Int)
event.Liquidity.SetString(liquidity, 10)
event.Amount0 = new(big.Int)
event.Amount0.SetString(amount0, 10)
event.Amount1 = new(big.Int)
event.Amount1.SetString(amount1, 10)
// Convert TokenId if present
if tokenId != "" {
event.TokenId = new(big.Int)
event.TokenId.SetString(tokenId, 10)
}
events = append(events, event)
}
return events, nil
}
// GetPoolData retrieves pool data from the database
func (d *Database) GetPoolData(poolAddress common.Address) (*PoolData, error) {
row := d.db.QueryRow(`
SELECT id, address, token0, token1, fee, liquidity, sqrt_price_x96, tick, last_updated, protocol
FROM pool_data
WHERE address = ?
`, poolAddress.Hex())
pool := &PoolData{}
var addr, token0, token1, liquidity, sqrtPrice string
var lastUpdated string
err := row.Scan(
&pool.ID, &addr, &token0, &token1, &pool.Fee, &liquidity, &sqrtPrice, &pool.Tick, &lastUpdated, &pool.Protocol,
)
if err != nil {
if err == sql.ErrNoRows {
return nil, fmt.Errorf("pool data not found for address: %s", poolAddress.Hex())
}
return nil, fmt.Errorf("failed to scan pool data: %w", err)
}
// Convert string values to proper types
pool.Address = common.HexToAddress(addr)
pool.Token0 = common.HexToAddress(token0)
pool.Token1 = common.HexToAddress(token1)
// Convert string amounts to big.Int
pool.Liquidity = new(big.Int)
pool.Liquidity.SetString(liquidity, 10)
pool.SqrtPriceX96 = new(big.Int)
pool.SqrtPriceX96.SetString(sqrtPrice, 10)
// Parse timestamp
pool.LastUpdated, err = time.Parse(time.RFC3339, lastUpdated)
if err != nil {
return nil, fmt.Errorf("failed to parse timestamp: %w", err)
}
return pool, nil
}
// Close closes the database connection
func (d *Database) Close() error {
if d.db != nil {
return d.db.Close()
}
return nil
}