package marketmanager

import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"math/big"
	"time"

	"github.com/ethereum/go-ethereum/common"
	_ "github.com/lib/pq" // PostgreSQL driver
)

// DatabaseAdapter handles persistence of market data.
//
// It wraps a *sql.DB opened with the "postgres" driver; all methods issue
// their statements through that handle.
type DatabaseAdapter struct {
	db *sql.DB
}

// NewDatabaseAdapter opens a PostgreSQL connection using connectionString and
// verifies it with a ping bounded by a 5 second timeout.
//
// It returns an error if the handle cannot be opened or the ping fails. On a
// failed ping the handle is closed before returning so it is not leaked.
func NewDatabaseAdapter(connectionString string) (*DatabaseAdapter, error) {
	db, err := sql.Open("postgres", connectionString)
	if err != nil {
		return nil, fmt.Errorf("failed to open database connection: %w", err)
	}

	// Test the connection.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	if err := db.PingContext(ctx); err != nil {
		// The handle is unusable to the caller; release it. The ping error is
		// the one worth reporting, so the Close error is intentionally dropped.
		_ = db.Close()
		return nil, fmt.Errorf("failed to ping database: %w", err)
	}

	return &DatabaseAdapter{db: db}, nil
}

// InitializeSchema creates the markets, market_data, arbitrage_opportunities
// and market_events tables and their indexes if they don't exist.
//
// Every statement uses IF NOT EXISTS, and the whole script is sent in a single
// Exec call. The error from that call is returned unchanged.
func (da *DatabaseAdapter) InitializeSchema() error {
	schema := `
		CREATE TABLE IF NOT EXISTS markets (
			key VARCHAR(66) PRIMARY KEY,
			factory_address VARCHAR(42) NOT NULL,
			pool_address VARCHAR(42) NOT NULL,
			token0_address VARCHAR(42) NOT NULL,
			token1_address VARCHAR(42) NOT NULL,
			fee INTEGER NOT NULL,
			ticker VARCHAR(50) NOT NULL,
			raw_ticker VARCHAR(90) NOT NULL,
			protocol VARCHAR(20) NOT NULL,
			created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
			updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
		);

		CREATE TABLE IF NOT EXISTS market_data (
			id SERIAL PRIMARY KEY,
			market_key VARCHAR(66) NOT NULL REFERENCES markets(key) ON DELETE CASCADE,
			price NUMERIC NOT NULL,
			liquidity NUMERIC NOT NULL,
			sqrt_price_x96 NUMERIC,
			tick INTEGER,
			status VARCHAR(20) NOT NULL,
			timestamp BIGINT NOT NULL,
			block_number BIGINT NOT NULL,
			tx_hash VARCHAR(66) NOT NULL,
			source VARCHAR(10) NOT NULL,
			created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
			updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
		);

		CREATE INDEX IF NOT EXISTS idx_market_data_market_key_timestamp ON market_data(market_key, timestamp);
		CREATE INDEX IF NOT EXISTS idx_market_data_status ON market_data(status);
		CREATE INDEX IF NOT EXISTS idx_market_data_block_number ON market_data(block_number);

		CREATE TABLE IF NOT EXISTS arbitrage_opportunities (
			id SERIAL PRIMARY KEY,
			market_key_1 VARCHAR(66) NOT NULL REFERENCES markets(key) ON DELETE CASCADE,
			market_key_2 VARCHAR(66) NOT NULL REFERENCES markets(key) ON DELETE CASCADE,
			path TEXT NOT NULL,
			profit NUMERIC NOT NULL,
			gas_estimate NUMERIC NOT NULL,
			roi DECIMAL(10, 6) NOT NULL,
			status VARCHAR(20) NOT NULL,
			detection_timestamp BIGINT NOT NULL,
			execution_timestamp BIGINT,
			tx_hash VARCHAR(66),
			created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
			updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
		);

		CREATE INDEX IF NOT EXISTS idx_arbitrage_opportunities_detection_timestamp ON arbitrage_opportunities(detection_timestamp);
		CREATE INDEX IF NOT EXISTS idx_arbitrage_opportunities_status ON arbitrage_opportunities(status);

		CREATE TABLE IF NOT EXISTS market_events (
			id SERIAL PRIMARY KEY,
			market_key VARCHAR(66) NOT NULL REFERENCES markets(key) ON DELETE CASCADE,
			event_type VARCHAR(20) NOT NULL,
			amount0 NUMERIC,
			amount1 NUMERIC,
			transaction_hash VARCHAR(66) NOT NULL,
			block_number BIGINT NOT NULL,
			log_index INTEGER NOT NULL,
			timestamp BIGINT NOT NULL,
			created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
		);

		CREATE INDEX IF NOT EXISTS idx_market_events_market_key_timestamp ON market_events(market_key, timestamp);
		CREATE INDEX IF NOT EXISTS idx_market_events_event_type ON market_events(event_type);
		CREATE INDEX IF NOT EXISTS idx_market_events_block_number ON market_events(block_number);
	`

	_, err := da.db.Exec(schema)
	return err
}

// SaveMarket saves a market to the database.
//
// The row is upserted on the primary key: if a row with the same key exists,
// every descriptive column is overwritten and updated_at is refreshed, while
// created_at keeps its original value. Addresses are stored in their Hex()
// form. The ExecContext error is returned unchanged.
func (da *DatabaseAdapter) SaveMarket(ctx context.Context, market *Market) error {
	query := `
		INSERT INTO markets (
			key, factory_address, pool_address, token0_address, token1_address,
			fee, ticker, raw_ticker, protocol, created_at, updated_at
		) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
		ON CONFLICT (key) DO UPDATE SET
			factory_address = EXCLUDED.factory_address,
			pool_address = EXCLUDED.pool_address,
			token0_address = EXCLUDED.token0_address,
			token1_address = EXCLUDED.token1_address,
			fee = EXCLUDED.fee,
			ticker = EXCLUDED.ticker,
			raw_ticker = EXCLUDED.raw_ticker,
			protocol = EXCLUDED.protocol,
			updated_at = CURRENT_TIMESTAMP
	`

	_, err := da.db.ExecContext(ctx, query,
		market.Key,
		market.Factory.Hex(),
		market.PoolAddress.Hex(),
		market.Token0.Hex(),
		market.Token1.Hex(),
		market.Fee,
		market.Ticker,
		market.RawTicker,
		market.Protocol,
	)
	return err
}

// SaveMarketData saves market data to the database.
//
// A new market_data row is inserted on every call (no upsert). Price,
// Liquidity and SqrtPriceX96 are sent as decimal strings; a nil value for any
// of them is stored as "0". source is written to the source column as given.
// The ExecContext error is returned unchanged.
func (da *DatabaseAdapter) SaveMarketData(ctx context.Context, market *Market, source string) error {
	query := `
		INSERT INTO market_data (
			market_key, price, liquidity, sqrt_price_x96, tick,
			status, timestamp, block_number, tx_hash, source, created_at, updated_at
		) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
	`

	// Convert big.Float to string for storage; 'f' with precision -1 yields
	// the shortest plain-decimal representation (no exponent).
	priceStr := "0"
	if market.Price != nil {
		priceStr = market.Price.Text('f', -1)
	}

	// Convert big.Int values to string for storage.
	liquidityStr := "0"
	if market.Liquidity != nil {
		liquidityStr = market.Liquidity.String()
	}

	sqrtPriceStr := "0"
	if market.SqrtPriceX96 != nil {
		sqrtPriceStr = market.SqrtPriceX96.String()
	}

	_, err := da.db.ExecContext(ctx, query,
		market.Key,
		priceStr,
		liquidityStr,
		sqrtPriceStr,
		market.Tick,
		string(market.Status),
		market.Timestamp,
		market.BlockNumber,
		market.TxHash.Hex(),
		source,
	)
	return err
}

// GetMarket retrieves a market from the database by key.
//
// Only the static columns of the markets table are loaded. Price, Liquidity
// and SqrtPriceX96 are initialized to zero values rather than read from
// market_data. When no row matches, the returned error wraps sql.ErrNoRows,
// so callers can detect it with errors.Is.
func (da *DatabaseAdapter) GetMarket(ctx context.Context, key string) (*Market, error) {
	query := `
		SELECT key, factory_address, pool_address, token0_address, token1_address,
			fee, ticker, raw_ticker, protocol
		FROM markets
		WHERE key = $1
	`

	row := da.db.QueryRowContext(ctx, query, key)

	var market Market
	var factoryAddr, poolAddr, token0Addr, token1Addr string

	err := row.Scan(
		&market.Key,
		&factoryAddr,
		&poolAddr,
		&token0Addr,
		&token1Addr,
		&market.Fee,
		&market.Ticker,
		&market.RawTicker,
		&market.Protocol,
	)
	if err != nil {
		if errors.Is(err, sql.ErrNoRows) {
			return nil, fmt.Errorf("market not found: %w", err)
		}
		return nil, fmt.Errorf("failed to query market: %w", err)
	}

	// Convert string addresses back to common.Address.
	market.Factory = common.HexToAddress(factoryAddr)
	market.PoolAddress = common.HexToAddress(poolAddr)
	market.Token0 = common.HexToAddress(token0Addr)
	market.Token1 = common.HexToAddress(token1Addr)

	// Initialize price data.
	market.Price = big.NewFloat(0)
	market.Liquidity = big.NewInt(0)
	market.SqrtPriceX96 = big.NewInt(0)

	return &market, nil
}

// GetLatestMarketData retrieves the latest market data from the database.
//
// It returns the market_data row with the highest timestamp for marketKey.
// Only the dynamic fields (Price, Liquidity, SqrtPriceX96, Tick, Status,
// Timestamp, BlockNumber, TxHash) of the returned Market are populated.
//
// When no row matches, the returned error wraps sql.ErrNoRows. An error is
// also returned when a stored numeric value cannot be parsed. A NULL
// sqrt_price_x96 leaves SqrtPriceX96 nil.
func (da *DatabaseAdapter) GetLatestMarketData(ctx context.Context, marketKey string) (*Market, error) {
	query := `
		SELECT price, liquidity, sqrt_price_x96, tick, status, timestamp, block_number, tx_hash
		FROM market_data
		WHERE market_key = $1
		ORDER BY timestamp DESC
		LIMIT 1
	`

	row := da.db.QueryRowContext(ctx, query, marketKey)

	var (
		priceStr, liquidityStr, txHashStr string
		// sqrt_price_x96 has no NOT NULL constraint, so it must be scanned
		// into a nullable type.
		sqrtPriceStr sql.NullString
		market       Market
	)

	// NOTE(review): tick is also nullable in the schema; a NULL would fail
	// Scan if Market.Tick is a plain integer type — confirm against Market.
	err := row.Scan(
		&priceStr,
		&liquidityStr,
		&sqrtPriceStr,
		&market.Tick,
		&market.Status,
		&market.Timestamp,
		&market.BlockNumber,
		&txHashStr,
	)
	if err != nil {
		if errors.Is(err, sql.ErrNoRows) {
			return nil, fmt.Errorf("no market data found: %w", err)
		}
		return nil, fmt.Errorf("failed to query market data: %w", err)
	}

	// tx_hash is persisted as hex text (see SaveMarketData), so read it back
	// as text and decode it, the same way GetMarket handles addresses.
	market.TxHash = common.HexToHash(txHashStr)

	// Convert strings back to big numbers, reporting anything unparseable
	// instead of silently leaving a nil field behind.
	price, ok := new(big.Float).SetString(priceStr)
	if !ok {
		return nil, fmt.Errorf("failed to parse price %q", priceStr)
	}
	market.Price = price

	liquidity, ok := new(big.Int).SetString(liquidityStr, 10)
	if !ok {
		return nil, fmt.Errorf("failed to parse liquidity %q", liquidityStr)
	}
	market.Liquidity = liquidity

	if sqrtPriceStr.Valid {
		sqrtPrice, ok := new(big.Int).SetString(sqrtPriceStr.String, 10)
		if !ok {
			return nil, fmt.Errorf("failed to parse sqrt_price_x96 %q", sqrtPriceStr.String)
		}
		market.SqrtPriceX96 = sqrtPrice
	}

	return &market, nil
}

// SaveArbitrageOpportunity saves an arbitrage opportunity to the database.
//
// A new arbitrage_opportunities row is inserted on every call. Path is stored
// as a JSON-encoded string; Profit and GasEstimate are sent as decimal
// strings, with nil stored as "0". ExecutionTimestamp and TxHash are written
// exactly as given, so their zero values are stored as 0 and "" rather than
// NULL.
func (da *DatabaseAdapter) SaveArbitrageOpportunity(ctx context.Context, opportunity *DatabaseArbitrageOpportunity) error {
	// Serialize path to JSON.
	pathJSON, err := json.Marshal(opportunity.Path)
	if err != nil {
		return fmt.Errorf("failed to serialize path: %w", err)
	}

	query := `
		INSERT INTO arbitrage_opportunities (
			market_key_1, market_key_2, path, profit, gas_estimate, roi, status,
			detection_timestamp, execution_timestamp, tx_hash, created_at, updated_at
		) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
	`

	profitStr := "0"
	if opportunity.Profit != nil {
		profitStr = opportunity.Profit.String()
	}

	gasEstimateStr := "0"
	if opportunity.GasEstimate != nil {
		gasEstimateStr = opportunity.GasEstimate.String()
	}

	_, err = da.db.ExecContext(ctx, query,
		opportunity.MarketKey1,
		opportunity.MarketKey2,
		string(pathJSON),
		profitStr,
		gasEstimateStr,
		opportunity.ROI,
		string(opportunity.Status),
		opportunity.DetectionTimestamp,
		opportunity.ExecutionTimestamp,
		opportunity.TxHash,
	)
	return err
}

// Close closes the database connection.
func (da *DatabaseAdapter) Close() error {
	return da.db.Close()
}

// DatabaseArbitrageOpportunity represents a detected arbitrage opportunity for
// database storage. Its fields map one-to-one onto the columns written by
// SaveArbitrageOpportunity.
type DatabaseArbitrageOpportunity struct {
	MarketKey1         string   // stored in market_key_1 (references markets.key)
	MarketKey2         string   // stored in market_key_2 (references markets.key)
	Path               []string // JSON-encoded into the path column
	Profit             *big.Int // stored as a decimal string; nil is stored as "0"
	GasEstimate        *big.Int // stored as a decimal string; nil is stored as "0"
	ROI                float64  // stored in roi, declared DECIMAL(10, 6)
	Status             string   // stored in status
	DetectionTimestamp int64    // stored in detection_timestamp
	ExecutionTimestamp int64    // stored in execution_timestamp as given
	TxHash             string   // stored in tx_hash as given
}