Files
mev-beta/pkg/marketmanager/database.go
Krypto Kajun fac8a64092 feat: Implement comprehensive Market Manager with database and logging
- Add complete Market Manager package with in-memory storage and CRUD operations
- Implement arbitrage detection with profit calculations and thresholds
- Add database adapter with PostgreSQL schema for persistence
- Create comprehensive logging system with specialized log files
- Add detailed documentation and implementation plans
- Include example application and comprehensive test suite
- Update Makefile with market manager build targets
- Add check-implementations command for verification
2025-09-18 03:52:33 -05:00

354 lines
9.7 KiB
Go

package marketmanager
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"math/big"
"time"
"github.com/ethereum/go-ethereum/common"
_ "github.com/lib/pq" // PostgreSQL driver
)
// DatabaseAdapter persists market data through a database/sql handle.
//
// Its methods create the schema and read and write markets, market data
// snapshots and arbitrage opportunities. Obtain one from NewDatabaseAdapter
// and release it with Close.
type DatabaseAdapter struct {
	// db is the handle opened with the "postgres" driver by NewDatabaseAdapter.
	db *sql.DB
}
// NewDatabaseAdapter opens a PostgreSQL handle for connectionString and
// verifies that the database is reachable before returning.
//
// The reachability check is a single ping bounded by a 5-second timeout. On
// any failure the returned adapter is nil and no database handle is left open.
// On success the caller owns the adapter and should call Close when done.
func NewDatabaseAdapter(connectionString string) (*DatabaseAdapter, error) {
	db, err := sql.Open("postgres", connectionString)
	if err != nil {
		return nil, fmt.Errorf("failed to open database connection: %w", err)
	}

	// sql.Open may only validate its arguments, so ping to force a real
	// connection and surface bad credentials or an unreachable host early.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	if err := db.PingContext(ctx); err != nil {
		// The handle is never handed to the caller on this path, so it must
		// be closed here or it leaks.
		if closeErr := db.Close(); closeErr != nil {
			return nil, fmt.Errorf("failed to ping database: %w (closing the handle also failed: %v)", err, closeErr)
		}
		return nil, fmt.Errorf("failed to ping database: %w", err)
	}

	return &DatabaseAdapter{db: db}, nil
}
// InitializeSchema creates the tables and indexes used by the adapter.
//
// The DDL covers four tables (markets, market_data, arbitrage_opportunities
// and market_events) plus their indexes. Every statement is written with
// IF NOT EXISTS, and the whole script is sent to the database in a single
// Exec call. The error from that call is returned unchanged.
func (da *DatabaseAdapter) InitializeSchema() error {
	const ddl = `
CREATE TABLE IF NOT EXISTS markets (
key VARCHAR(66) PRIMARY KEY,
factory_address VARCHAR(42) NOT NULL,
pool_address VARCHAR(42) NOT NULL,
token0_address VARCHAR(42) NOT NULL,
token1_address VARCHAR(42) NOT NULL,
fee INTEGER NOT NULL,
ticker VARCHAR(50) NOT NULL,
raw_ticker VARCHAR(90) NOT NULL,
protocol VARCHAR(20) NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS market_data (
id SERIAL PRIMARY KEY,
market_key VARCHAR(66) NOT NULL REFERENCES markets(key) ON DELETE CASCADE,
price NUMERIC NOT NULL,
liquidity NUMERIC NOT NULL,
sqrt_price_x96 NUMERIC,
tick INTEGER,
status VARCHAR(20) NOT NULL,
timestamp BIGINT NOT NULL,
block_number BIGINT NOT NULL,
tx_hash VARCHAR(66) NOT NULL,
source VARCHAR(10) NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_market_data_market_key_timestamp ON market_data(market_key, timestamp);
CREATE INDEX IF NOT EXISTS idx_market_data_status ON market_data(status);
CREATE INDEX IF NOT EXISTS idx_market_data_block_number ON market_data(block_number);
CREATE TABLE IF NOT EXISTS arbitrage_opportunities (
id SERIAL PRIMARY KEY,
market_key_1 VARCHAR(66) NOT NULL REFERENCES markets(key) ON DELETE CASCADE,
market_key_2 VARCHAR(66) NOT NULL REFERENCES markets(key) ON DELETE CASCADE,
path TEXT NOT NULL,
profit NUMERIC NOT NULL,
gas_estimate NUMERIC NOT NULL,
roi DECIMAL(10, 6) NOT NULL,
status VARCHAR(20) NOT NULL,
detection_timestamp BIGINT NOT NULL,
execution_timestamp BIGINT,
tx_hash VARCHAR(66),
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_arbitrage_opportunities_detection_timestamp ON arbitrage_opportunities(detection_timestamp);
CREATE INDEX IF NOT EXISTS idx_arbitrage_opportunities_status ON arbitrage_opportunities(status);
CREATE TABLE IF NOT EXISTS market_events (
id SERIAL PRIMARY KEY,
market_key VARCHAR(66) NOT NULL REFERENCES markets(key) ON DELETE CASCADE,
event_type VARCHAR(20) NOT NULL,
amount0 NUMERIC,
amount1 NUMERIC,
transaction_hash VARCHAR(66) NOT NULL,
block_number BIGINT NOT NULL,
log_index INTEGER NOT NULL,
timestamp BIGINT NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_market_events_market_key_timestamp ON market_events(market_key, timestamp);
CREATE INDEX IF NOT EXISTS idx_market_events_event_type ON market_events(event_type);
CREATE INDEX IF NOT EXISTS idx_market_events_block_number ON market_events(block_number);
`
	if _, err := da.db.Exec(ddl); err != nil {
		return err
	}
	return nil
}
// SaveMarket upserts the static description of market into the markets table.
//
// A row is inserted keyed by market.Key; if that key already exists, every
// descriptive column is overwritten and updated_at is refreshed. Addresses
// are stored in the textual form returned by their Hex method. The error
// from the database call is returned unchanged.
func (da *DatabaseAdapter) SaveMarket(ctx context.Context, market *Market) error {
	const upsert = `
INSERT INTO markets (
key, factory_address, pool_address, token0_address, token1_address,
fee, ticker, raw_ticker, protocol, created_at, updated_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON CONFLICT (key) DO UPDATE SET
factory_address = EXCLUDED.factory_address,
pool_address = EXCLUDED.pool_address,
token0_address = EXCLUDED.token0_address,
token1_address = EXCLUDED.token1_address,
fee = EXCLUDED.fee,
ticker = EXCLUDED.ticker,
raw_ticker = EXCLUDED.raw_ticker,
protocol = EXCLUDED.protocol,
updated_at = CURRENT_TIMESTAMP
`
	// Positional arguments, in the same order as the $1..$9 placeholders.
	args := []any{
		market.Key,
		market.Factory.Hex(),
		market.PoolAddress.Hex(),
		market.Token0.Hex(),
		market.Token1.Hex(),
		market.Fee,
		market.Ticker,
		market.RawTicker,
		market.Protocol,
	}

	_, execErr := da.db.ExecContext(ctx, upsert, args...)
	return execErr
}
// SaveMarketData appends a snapshot of market's dynamic state to the
// market_data table, tagged with source.
//
// Price, Liquidity and SqrtPriceX96 are written as decimal strings; a nil
// value for any of them is stored as "0". Each call inserts a new row. The
// error from the database call is returned unchanged.
func (da *DatabaseAdapter) SaveMarketData(ctx context.Context, market *Market, source string) error {
	const insert = `
INSERT INTO market_data (
market_key, price, liquidity, sqrt_price_x96, tick,
status, timestamp, block_number, tx_hash, source, created_at, updated_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
`
	// Big numbers travel as text; all three default to "0" when unset.
	price, liquidity, sqrtPrice := "0", "0", "0"
	if p := market.Price; p != nil {
		// 'f' with precision -1: plain decimal notation, shortest exact form.
		price = p.Text('f', -1)
	}
	if l := market.Liquidity; l != nil {
		liquidity = l.String()
	}
	if s := market.SqrtPriceX96; s != nil {
		sqrtPrice = s.String()
	}

	// Positional arguments, in the same order as the $1..$10 placeholders.
	args := []any{
		market.Key,
		price,
		liquidity,
		sqrtPrice,
		market.Tick,
		string(market.Status),
		market.Timestamp,
		market.BlockNumber,
		market.TxHash.Hex(),
		source,
	}

	_, execErr := da.db.ExecContext(ctx, insert, args...)
	return execErr
}
// GetMarket loads the static description of the market stored under key.
//
// Only the columns of the markets table are populated. Price, Liquidity and
// SqrtPriceX96 are set to fresh zero values rather than read from the
// database. When no row matches, the returned error wraps sql.ErrNoRows;
// any other scan failure is wrapped as a query error.
func (da *DatabaseAdapter) GetMarket(ctx context.Context, key string) (*Market, error) {
	const selectMarket = `
SELECT key, factory_address, pool_address, token0_address, token1_address,
fee, ticker, raw_ticker, protocol
FROM markets
WHERE key = $1
`
	found := new(Market)

	// Addresses are stored as text, so read them as strings first.
	var factoryHex, poolHex, token0Hex, token1Hex string

	scanErr := da.db.QueryRowContext(ctx, selectMarket, key).Scan(
		&found.Key,
		&factoryHex,
		&poolHex,
		&token0Hex,
		&token1Hex,
		&found.Fee,
		&found.Ticker,
		&found.RawTicker,
		&found.Protocol,
	)
	switch scanErr {
	case nil:
		// Row found; continue below.
	case sql.ErrNoRows:
		return nil, fmt.Errorf("market not found: %w", scanErr)
	default:
		return nil, fmt.Errorf("failed to query market: %w", scanErr)
	}

	// Turn the textual addresses back into common.Address values.
	found.Factory = common.HexToAddress(factoryHex)
	found.PoolAddress = common.HexToAddress(poolHex)
	found.Token0 = common.HexToAddress(token0Hex)
	found.Token1 = common.HexToAddress(token1Hex)

	// Dynamic price data is not part of this query; start from zero.
	found.Price = big.NewFloat(0)
	found.Liquidity = big.NewInt(0)
	found.SqrtPriceX96 = big.NewInt(0)

	return found, nil
}
// GetLatestMarketData loads the most recent market_data row for marketKey,
// i.e. the row with the highest timestamp.
//
// Only the dynamic fields are populated on the returned Market: Price,
// Liquidity, SqrtPriceX96, Tick, Status, Timestamp, BlockNumber and TxHash.
// A numeric column whose text cannot be parsed, or a NULL sqrt_price_x96,
// leaves the corresponding field nil. When no row matches, the returned
// error wraps sql.ErrNoRows; a scan failure or a malformed tx_hash is
// reported as a wrapped error.
func (da *DatabaseAdapter) GetLatestMarketData(ctx context.Context, marketKey string) (*Market, error) {
	query := `
SELECT price, liquidity, sqrt_price_x96, tick, status, timestamp, block_number, tx_hash
FROM market_data
WHERE market_key = $1
ORDER BY timestamp DESC
LIMIT 1
`
	row := da.db.QueryRowContext(ctx, query, marketKey)

	var (
		market       Market
		priceStr     string
		liquidityStr string
		// sqrt_price_x96 is declared nullable in the schema; a plain string
		// destination would make Scan fail on NULL.
		sqrtPriceStr sql.NullString
		// tx_hash is stored as hex text (see SaveMarketData), so it is read
		// as a string and parsed below instead of being scanned directly
		// into market.TxHash.
		txHashStr string
	)
	// NOTE(review): tick is also nullable in the schema but is still scanned
	// straight into market.Tick because its Go type is not visible here.
	err := row.Scan(
		&priceStr,
		&liquidityStr,
		&sqrtPriceStr,
		&market.Tick,
		&market.Status,
		&market.Timestamp,
		&market.BlockNumber,
		&txHashStr,
	)
	if err != nil {
		if err == sql.ErrNoRows {
			return nil, fmt.Errorf("no market data found: %w", err)
		}
		return nil, fmt.Errorf("failed to query market data: %w", err)
	}

	// Assumes TxHash is a go-ethereum common.Hash (SaveMarketData calls Hex
	// on it) — TODO confirm. Its sql.Scanner only accepts raw 32-byte values,
	// so the "0x…" text written by Hex is decoded through UnmarshalText.
	if err := market.TxHash.UnmarshalText([]byte(txHashStr)); err != nil {
		return nil, fmt.Errorf("failed to parse tx hash %q: %w", txHashStr, err)
	}

	// Convert the textual numbers back to big values. SetString rejects the
	// empty string, so unparsable or empty input leaves the field nil.
	if price, ok := new(big.Float).SetString(priceStr); ok {
		market.Price = price
	}
	if liquidity, ok := new(big.Int).SetString(liquidityStr, 10); ok {
		market.Liquidity = liquidity
	}
	if sqrtPriceStr.Valid {
		if sqrtPrice, ok := new(big.Int).SetString(sqrtPriceStr.String, 10); ok {
			market.SqrtPriceX96 = sqrtPrice
		}
	}

	return &market, nil
}
// SaveArbitrageOpportunity inserts opportunity as a new row in the
// arbitrage_opportunities table.
//
// Path is stored as its JSON encoding. Profit and GasEstimate are stored as
// decimal strings, with nil written as "0". A JSON encoding failure is
// returned wrapped; the error from the database call is returned unchanged.
func (da *DatabaseAdapter) SaveArbitrageOpportunity(ctx context.Context, opportunity *DatabaseArbitrageOpportunity) error {
	const insert = `
INSERT INTO arbitrage_opportunities (
market_key_1, market_key_2, path, profit, gas_estimate, roi,
status, detection_timestamp, execution_timestamp, tx_hash,
created_at, updated_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
`
	// The path column is TEXT, so the slice is flattened to JSON first.
	encodedPath, err := json.Marshal(opportunity.Path)
	if err != nil {
		return fmt.Errorf("failed to serialize path: %w", err)
	}

	// Big integers travel as text; both default to "0" when unset.
	profit, gasEstimate := "0", "0"
	if p := opportunity.Profit; p != nil {
		profit = p.String()
	}
	if g := opportunity.GasEstimate; g != nil {
		gasEstimate = g.String()
	}

	// Positional arguments, in the same order as the $1..$10 placeholders.
	args := []any{
		opportunity.MarketKey1,
		opportunity.MarketKey2,
		string(encodedPath),
		profit,
		gasEstimate,
		opportunity.ROI,
		string(opportunity.Status),
		opportunity.DetectionTimestamp,
		opportunity.ExecutionTimestamp,
		opportunity.TxHash,
	}

	_, err = da.db.ExecContext(ctx, insert, args...)
	return err
}
// Close releases the underlying database handle and returns whatever error
// the handle's own Close reports.
func (da *DatabaseAdapter) Close() error {
	closeErr := da.db.Close()
	return closeErr
}
// DatabaseArbitrageOpportunity is the storage form of a detected arbitrage
// opportunity, as written by DatabaseAdapter.SaveArbitrageOpportunity into
// the arbitrage_opportunities table.
type DatabaseArbitrageOpportunity struct {
	// MarketKey1 and MarketKey2 go into the market_key_1 and market_key_2
	// columns, which reference markets(key).
	MarketKey1, MarketKey2 string

	// Path is persisted as its JSON encoding in the path column.
	Path []string

	// Profit and GasEstimate are persisted as decimal strings; nil is
	// written as "0".
	Profit, GasEstimate *big.Int

	// ROI is written to the roi column, declared DECIMAL(10, 6).
	ROI float64

	// Status is written to the status column.
	Status string

	// DetectionTimestamp and ExecutionTimestamp are written to the BIGINT
	// detection_timestamp and execution_timestamp columns.
	DetectionTimestamp, ExecutionTimestamp int64

	// TxHash is written to the tx_hash column.
	TxHash string
}