- Added comprehensive bounds checking to prevent buffer overruns in multicall parsing - Implemented graduated validation system (Strict/Moderate/Permissive) to reduce false positives - Added LRU caching system for address validation with 10-minute TTL - Enhanced ABI decoder with missing Universal Router and Arbitrum-specific DEX signatures - Fixed duplicate function declarations and import conflicts across multiple files - Added error recovery mechanisms with multiple fallback strategies - Updated tests to handle new validation behavior for suspicious addresses - Fixed parser test expectations for improved validation system - Applied gofmt formatting fixes to ensure code style compliance - Fixed mutex copying issues in monitoring package by introducing MetricsSnapshot - Resolved critical security vulnerabilities in heuristic address extraction - Progress: Updated TODO audit from 10% to 35% complete 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
424 lines
13 KiB
Go
424 lines
13 KiB
Go
package arbitrage
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"fmt"
|
|
"math/big"
|
|
"time"
|
|
|
|
"github.com/ethereum/go-ethereum/common"
|
|
_ "github.com/mattn/go-sqlite3"
|
|
|
|
"github.com/fraktal/mev-beta/internal/logger"
|
|
pkgtypes "github.com/fraktal/mev-beta/pkg/types"
|
|
)
|
|
|
|
// SQLiteDatabase implements ArbitrageDatabase using SQLite
|
|
type SQLiteDatabase struct {
|
|
db *sql.DB
|
|
logger *logger.Logger
|
|
}
|
|
|
|
// NewSQLiteDatabase creates a new SQLite database for arbitrage data
|
|
func NewSQLiteDatabase(dbPath string, logger *logger.Logger) (*SQLiteDatabase, error) {
|
|
db, err := sql.Open("sqlite3", dbPath)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to open database: %w", err)
|
|
}
|
|
|
|
database := &SQLiteDatabase{
|
|
db: db,
|
|
logger: logger,
|
|
}
|
|
|
|
if err := database.createTables(); err != nil {
|
|
return nil, fmt.Errorf("failed to create tables: %w", err)
|
|
}
|
|
|
|
return database, nil
|
|
}
|
|
|
|
// createTables creates the necessary database tables
|
|
func (db *SQLiteDatabase) createTables() error {
|
|
queries := []string{
|
|
`CREATE TABLE IF NOT EXISTS arbitrage_opportunities (
|
|
id TEXT PRIMARY KEY,
|
|
path_json TEXT NOT NULL,
|
|
trigger_event_json TEXT NOT NULL,
|
|
detected_at INTEGER NOT NULL,
|
|
estimated_profit TEXT NOT NULL,
|
|
required_amount TEXT NOT NULL,
|
|
urgency INTEGER NOT NULL,
|
|
expires_at INTEGER NOT NULL,
|
|
created_at INTEGER DEFAULT (strftime('%s', 'now'))
|
|
)`,
|
|
|
|
`CREATE TABLE IF NOT EXISTS arbitrage_executions (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
opportunity_id TEXT,
|
|
transaction_hash TEXT NOT NULL,
|
|
gas_used INTEGER NOT NULL,
|
|
gas_price TEXT NOT NULL,
|
|
profit_realized TEXT NOT NULL,
|
|
success BOOLEAN NOT NULL,
|
|
error_message TEXT,
|
|
execution_time_ms INTEGER NOT NULL,
|
|
path_json TEXT NOT NULL,
|
|
executed_at INTEGER DEFAULT (strftime('%s', 'now')),
|
|
FOREIGN KEY(opportunity_id) REFERENCES arbitrage_opportunities(id)
|
|
)`,
|
|
|
|
`CREATE TABLE IF NOT EXISTS pool_data (
|
|
address TEXT PRIMARY KEY,
|
|
token0 TEXT NOT NULL,
|
|
token1 TEXT NOT NULL,
|
|
protocol TEXT NOT NULL,
|
|
fee INTEGER NOT NULL,
|
|
liquidity TEXT NOT NULL,
|
|
sqrt_price_x96 TEXT NOT NULL,
|
|
tick INTEGER,
|
|
block_number INTEGER NOT NULL,
|
|
transaction_hash TEXT NOT NULL,
|
|
log_index INTEGER NOT NULL,
|
|
last_updated INTEGER NOT NULL,
|
|
created_at INTEGER DEFAULT (strftime('%s', 'now'))
|
|
)`,
|
|
|
|
`CREATE TABLE IF NOT EXISTS swap_events (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
transaction_hash TEXT NOT NULL,
|
|
pool_address TEXT NOT NULL,
|
|
token0 TEXT NOT NULL,
|
|
token1 TEXT NOT NULL,
|
|
amount0 TEXT NOT NULL,
|
|
amount1 TEXT NOT NULL,
|
|
sqrt_price_x96 TEXT NOT NULL,
|
|
liquidity TEXT NOT NULL,
|
|
tick INTEGER,
|
|
block_number INTEGER NOT NULL,
|
|
log_index INTEGER NOT NULL,
|
|
timestamp INTEGER NOT NULL,
|
|
created_at INTEGER DEFAULT (strftime('%s', 'now'))
|
|
)`,
|
|
}
|
|
|
|
for _, query := range queries {
|
|
if _, err := db.db.Exec(query); err != nil {
|
|
return fmt.Errorf("failed to create table: %w", err)
|
|
}
|
|
}
|
|
|
|
// Create indexes for better performance
|
|
indexes := []string{
|
|
`CREATE INDEX IF NOT EXISTS idx_opportunities_detected_at ON arbitrage_opportunities(detected_at)`,
|
|
`CREATE INDEX IF NOT EXISTS idx_opportunities_urgency ON arbitrage_opportunities(urgency)`,
|
|
`CREATE INDEX IF NOT EXISTS idx_executions_executed_at ON arbitrage_executions(executed_at)`,
|
|
`CREATE INDEX IF NOT EXISTS idx_executions_success ON arbitrage_executions(success)`,
|
|
`CREATE INDEX IF NOT EXISTS idx_pool_data_updated ON pool_data(last_updated)`,
|
|
`CREATE INDEX IF NOT EXISTS idx_pool_data_tokens ON pool_data(token0, token1)`,
|
|
`CREATE INDEX IF NOT EXISTS idx_swap_events_pool ON swap_events(pool_address)`,
|
|
`CREATE INDEX IF NOT EXISTS idx_swap_events_block ON swap_events(block_number)`,
|
|
`CREATE INDEX IF NOT EXISTS idx_swap_events_timestamp ON swap_events(timestamp)`,
|
|
}
|
|
|
|
for _, index := range indexes {
|
|
if _, err := db.db.Exec(index); err != nil {
|
|
return fmt.Errorf("failed to create index: %w", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// SaveOpportunity saves an arbitrage opportunity to the database
|
|
func (db *SQLiteDatabase) SaveOpportunity(ctx context.Context, opportunity *pkgtypes.ArbitrageOpportunity) error {
|
|
pathJSON, err := json.Marshal(opportunity.Path)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal path: %w", err)
|
|
}
|
|
|
|
// Create empty trigger event for compatibility
|
|
triggerEvent := map[string]interface{}{
|
|
"protocol": opportunity.Protocol,
|
|
"tokenIn": opportunity.TokenIn.Hex(),
|
|
"tokenOut": opportunity.TokenOut.Hex(),
|
|
}
|
|
eventJSON, err := json.Marshal(triggerEvent)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal trigger event: %w", err)
|
|
}
|
|
|
|
// Generate a simple ID from timestamp and token addresses
|
|
opportunityID := fmt.Sprintf("%s_%s_%d",
|
|
opportunity.TokenIn.Hex()[:8],
|
|
opportunity.TokenOut.Hex()[:8],
|
|
opportunity.Timestamp)
|
|
|
|
query := `INSERT INTO arbitrage_opportunities
|
|
(id, path_json, trigger_event_json, detected_at, estimated_profit, required_amount, urgency, expires_at)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?)`
|
|
|
|
_, err = db.db.ExecContext(ctx, query,
|
|
opportunityID,
|
|
string(pathJSON),
|
|
string(eventJSON),
|
|
opportunity.Timestamp,
|
|
opportunity.Profit.String(),
|
|
opportunity.AmountIn.String(),
|
|
1, // Default urgency
|
|
opportunity.Timestamp+3600, // Expires in 1 hour
|
|
)
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("failed to save opportunity: %w", err)
|
|
}
|
|
|
|
db.logger.Debug(fmt.Sprintf("Saved arbitrage opportunity %s to database", opportunityID))
|
|
return nil
|
|
}
|
|
|
|
// SaveExecution saves an arbitrage execution result to the database
|
|
func (db *SQLiteDatabase) SaveExecution(ctx context.Context, result *ExecutionResult) error {
|
|
pathJSON, err := json.Marshal(result.Path)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal path: %w", err)
|
|
}
|
|
|
|
var opportunityID *string
|
|
if result.Path != nil {
|
|
// Try to find the opportunity ID from the path
|
|
// This is a simplified approach - in production you'd want better linking
|
|
id := generateOpportunityIDFromPath(result.Path)
|
|
opportunityID = &id
|
|
}
|
|
|
|
var errorMessage *string
|
|
if result.Error != nil {
|
|
msg := result.Error.Error()
|
|
errorMessage = &msg
|
|
}
|
|
|
|
query := `INSERT INTO arbitrage_executions
|
|
(opportunity_id, transaction_hash, gas_used, gas_price, profit_realized, success,
|
|
error_message, execution_time_ms, path_json)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`
|
|
|
|
_, err = db.db.ExecContext(ctx, query,
|
|
opportunityID,
|
|
result.TransactionHash.Hex(),
|
|
result.GasUsed,
|
|
result.GasPrice.String(),
|
|
result.ProfitRealized.String(),
|
|
result.Success,
|
|
errorMessage,
|
|
result.ExecutionTime.Milliseconds(),
|
|
string(pathJSON),
|
|
)
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("failed to save execution: %w", err)
|
|
}
|
|
|
|
db.logger.Debug(fmt.Sprintf("Saved arbitrage execution %s to database", result.TransactionHash.Hex()))
|
|
return nil
|
|
}
|
|
|
|
// GetExecutionHistory retrieves historical arbitrage executions
|
|
func (db *SQLiteDatabase) GetExecutionHistory(ctx context.Context, limit int) ([]*ExecutionResult, error) {
|
|
query := `SELECT transaction_hash, gas_used, gas_price, profit_realized, success,
|
|
error_message, execution_time_ms, path_json, executed_at
|
|
FROM arbitrage_executions
|
|
ORDER BY executed_at DESC
|
|
LIMIT ?`
|
|
|
|
rows, err := db.db.QueryContext(ctx, query, limit)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to query execution history: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var results []*ExecutionResult
|
|
for rows.Next() {
|
|
var txHashStr, gasPriceStr, profitStr, pathJSON string
|
|
var gasUsed, executionTimeMs, executedAt int64
|
|
var success bool
|
|
var errorMessage *string
|
|
|
|
err := rows.Scan(&txHashStr, &gasUsed, &gasPriceStr, &profitStr, &success,
|
|
&errorMessage, &executionTimeMs, &pathJSON, &executedAt)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to scan execution row: %w", err)
|
|
}
|
|
|
|
// Parse path JSON
|
|
var path ArbitragePath
|
|
if err := json.Unmarshal([]byte(pathJSON), &path); err != nil {
|
|
db.logger.Warn(fmt.Sprintf("Failed to unmarshal path JSON: %v", err))
|
|
continue
|
|
}
|
|
|
|
result := &ExecutionResult{
|
|
TransactionHash: common.HexToHash(txHashStr),
|
|
GasUsed: uint64(gasUsed),
|
|
Success: success,
|
|
ExecutionTime: time.Duration(executionTimeMs) * time.Millisecond,
|
|
Path: &path,
|
|
}
|
|
|
|
// Parse gas price and profit
|
|
result.GasPrice, _ = parseBigInt(gasPriceStr)
|
|
result.ProfitRealized, _ = parseBigInt(profitStr)
|
|
|
|
// Parse error message
|
|
if errorMessage != nil {
|
|
result.Error = fmt.Errorf("execution error: %s", *errorMessage)
|
|
}
|
|
|
|
results = append(results, result)
|
|
}
|
|
|
|
if err := rows.Err(); err != nil {
|
|
return nil, fmt.Errorf("rows iteration error: %w", err)
|
|
}
|
|
|
|
return results, nil
|
|
}
|
|
|
|
// SavePoolData saves pool data to the database
|
|
func (db *SQLiteDatabase) SavePoolData(ctx context.Context, poolData *SimplePoolData) error {
|
|
query := `INSERT OR REPLACE INTO pool_data
|
|
(address, token0, token1, protocol, fee, liquidity, sqrt_price_x96, tick,
|
|
block_number, transaction_hash, log_index, last_updated)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
|
|
|
|
_, err := db.db.ExecContext(ctx, query,
|
|
poolData.Address.Hex(),
|
|
poolData.Token0.Hex(),
|
|
poolData.Token1.Hex(),
|
|
"UniswapV3", // Default protocol
|
|
poolData.Fee,
|
|
poolData.Liquidity.String(),
|
|
poolData.SqrtPriceX96.String(),
|
|
poolData.Tick,
|
|
poolData.BlockNumber,
|
|
poolData.TxHash.Hex(),
|
|
poolData.LogIndex,
|
|
poolData.LastUpdated.Unix(),
|
|
)
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("failed to save pool data: %w", err)
|
|
}
|
|
|
|
db.logger.Debug(fmt.Sprintf("Saved pool data %s to database", poolData.Address.Hex()))
|
|
return nil
|
|
}
|
|
|
|
// GetPoolData retrieves pool data from the database
|
|
func (db *SQLiteDatabase) GetPoolData(ctx context.Context, poolAddress common.Address) (*SimplePoolData, error) {
|
|
query := `SELECT address, token0, token1, protocol, fee, liquidity, sqrt_price_x96, tick,
|
|
block_number, transaction_hash, log_index, last_updated
|
|
FROM pool_data WHERE address = ?`
|
|
|
|
row := db.db.QueryRowContext(ctx, query, poolAddress.Hex())
|
|
|
|
var addrStr, token0Str, token1Str, protocol, liquidityStr, sqrtPriceStr, txHashStr string
|
|
var fee, tick, blockNumber, logIndex, lastUpdated int64
|
|
|
|
err := row.Scan(&addrStr, &token0Str, &token1Str, &protocol, &fee,
|
|
&liquidityStr, &sqrtPriceStr, &tick, &blockNumber, &txHashStr, &logIndex, &lastUpdated)
|
|
|
|
if err != nil {
|
|
if err == sql.ErrNoRows {
|
|
return nil, fmt.Errorf("pool data not found")
|
|
}
|
|
return nil, fmt.Errorf("failed to query pool data: %w", err)
|
|
}
|
|
|
|
// Parse the data
|
|
liquidity, ok1 := parseBigInt(liquidityStr)
|
|
sqrtPriceX96, ok2 := parseBigInt(sqrtPriceStr)
|
|
|
|
if !ok1 || !ok2 {
|
|
return nil, fmt.Errorf("failed to parse pool numeric data")
|
|
}
|
|
|
|
poolData := &SimplePoolData{
|
|
Address: common.HexToAddress(addrStr),
|
|
Token0: common.HexToAddress(token0Str),
|
|
Token1: common.HexToAddress(token1Str),
|
|
Fee: fee,
|
|
Liquidity: liquidity,
|
|
SqrtPriceX96: sqrtPriceX96,
|
|
Tick: int32(tick),
|
|
BlockNumber: uint64(blockNumber),
|
|
TxHash: common.HexToHash(txHashStr),
|
|
LogIndex: uint(logIndex),
|
|
LastUpdated: time.Unix(lastUpdated, 0),
|
|
}
|
|
|
|
return poolData, nil
|
|
}
|
|
|
|
// GetOpportunityCount returns the total number of opportunities detected
|
|
func (db *SQLiteDatabase) GetOpportunityCount(ctx context.Context) (int64, error) {
|
|
var count int64
|
|
err := db.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM arbitrage_opportunities").Scan(&count)
|
|
return count, err
|
|
}
|
|
|
|
// GetExecutionCount returns the total number of executions attempted
|
|
func (db *SQLiteDatabase) GetExecutionCount(ctx context.Context) (int64, error) {
|
|
var count int64
|
|
err := db.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM arbitrage_executions").Scan(&count)
|
|
return count, err
|
|
}
|
|
|
|
// GetSuccessfulExecutionCount returns the number of successful executions
|
|
func (db *SQLiteDatabase) GetSuccessfulExecutionCount(ctx context.Context) (int64, error) {
|
|
var count int64
|
|
err := db.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM arbitrage_executions WHERE success = 1").Scan(&count)
|
|
return count, err
|
|
}
|
|
|
|
// GetTotalProfit returns the total profit realized from all successful executions
|
|
func (db *SQLiteDatabase) GetTotalProfit(ctx context.Context) (string, error) {
|
|
var totalProfit sql.NullString
|
|
err := db.db.QueryRowContext(ctx,
|
|
"SELECT SUM(CAST(profit_realized AS REAL)) FROM arbitrage_executions WHERE success = 1").Scan(&totalProfit)
|
|
|
|
if err != nil {
|
|
return "0", err
|
|
}
|
|
|
|
if !totalProfit.Valid {
|
|
return "0", nil
|
|
}
|
|
|
|
return totalProfit.String, nil
|
|
}
|
|
|
|
// Close closes the database connection
|
|
func (db *SQLiteDatabase) Close() error {
|
|
return db.db.Close()
|
|
}
|
|
|
|
// Helper functions
|
|
|
|
// parseBigInt interprets s as a base-10 integer of arbitrary size.
// The boolean is true when s was parsed in full; otherwise the returned
// pointer is nil and the boolean is false.
func parseBigInt(s string) (*big.Int, bool) {
	return new(big.Int).SetString(s, 10)
}
|
|
|
|
// generateOpportunityIDFromPath generates a consistent ID from path data
|
|
func generateOpportunityIDFromPath(path *ArbitragePath) string {
|
|
if path == nil || len(path.Tokens) == 0 {
|
|
return fmt.Sprintf("unknown_%d", time.Now().UnixNano())
|
|
}
|
|
return fmt.Sprintf("path_%s_%d", path.Tokens[0].Hex()[:8], time.Now().UnixNano())
|
|
}
|