Completed clean root directory structure: - Root now contains only: .git, .env, docs/, orig/ - Moved all remaining files and directories to orig/: - Config files (.claude, .dockerignore, .drone.yml, etc.) - All .env variants (except active .env) - Git config (.gitconfig, .github, .gitignore, etc.) - Tool configs (.golangci.yml, .revive.toml, etc.) - Documentation (*.md files, @prompts) - Build files (Dockerfiles, Makefile, go.mod, go.sum) - Docker compose files - All source directories (scripts, tests, tools, etc.) - Runtime directories (logs, monitoring, reports) - Dependency files (node_modules, lib, cache) - Special files (--delete) - Removed empty runtime directories (bin/, data/) V2 structure is now clean: - docs/planning/ - V2 planning documents - orig/ - Complete V1 codebase preserved - .env - Active environment config (not in git) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
578 lines
15 KiB
Go
578 lines
15 KiB
Go
//go:build integration && legacy && forked
|
|
// +build integration,legacy,forked
|
|
|
|
package test_main
|
|
|
|
import (
|
|
"compress/gzip"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"path/filepath"
|
|
"sort"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/fraktal/mev-beta/internal/logger"
|
|
)
|
|
|
|
// NewTransactionStorage creates a new transaction storage system
|
|
func NewTransactionStorage(config *SequencerConfig, logger *logger.Logger) (*TransactionStorage, error) {
|
|
dataDir := config.DataDir
|
|
if err := os.MkdirAll(dataDir, 0755); err != nil {
|
|
return nil, fmt.Errorf("failed to create data directory: %w", err)
|
|
}
|
|
|
|
storage := &TransactionStorage{
|
|
config: config,
|
|
logger: logger,
|
|
dataDir: dataDir,
|
|
indexFile: filepath.Join(dataDir, "transaction_index.json"),
|
|
index: make(map[string]*TransactionIndex),
|
|
}
|
|
|
|
// Load existing index
|
|
if err := storage.loadIndex(); err != nil {
|
|
logger.Warn(fmt.Sprintf("Failed to load existing index: %v", err))
|
|
}
|
|
|
|
return storage, nil
|
|
}
|
|
|
|
// StoreBatch stores a batch of transactions
|
|
func (ts *TransactionStorage) StoreBatch(transactions []*RealTransactionData) error {
|
|
ts.mu.Lock()
|
|
defer ts.mu.Unlock()
|
|
|
|
for _, tx := range transactions {
|
|
if err := ts.storeTransaction(tx); err != nil {
|
|
ts.logger.Error(fmt.Sprintf("Failed to store transaction %s: %v", tx.Hash.Hex(), err))
|
|
continue
|
|
}
|
|
}
|
|
|
|
// Save updated index
|
|
return ts.saveIndex()
|
|
}
|
|
|
|
// storeTransaction stores a single transaction
|
|
func (ts *TransactionStorage) storeTransaction(tx *RealTransactionData) error {
|
|
// Create filename based on block and hash
|
|
filename := fmt.Sprintf("block_%d_tx_%s.json", tx.BlockNumber, tx.Hash.Hex())
|
|
if ts.config.CompressionLevel > 0 {
|
|
filename += ".gz"
|
|
}
|
|
|
|
filePath := filepath.Join(ts.dataDir, filename)
|
|
|
|
// Serialize transaction data
|
|
data, err := json.MarshalIndent(tx, "", " ")
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal transaction: %w", err)
|
|
}
|
|
|
|
// Write file
|
|
if ts.config.CompressionLevel > 0 {
|
|
if err := ts.writeCompressedFile(filePath, data); err != nil {
|
|
return fmt.Errorf("failed to write compressed file: %w", err)
|
|
}
|
|
} else {
|
|
if err := os.WriteFile(filePath, data, 0644); err != nil {
|
|
return fmt.Errorf("failed to write file: %w", err)
|
|
}
|
|
}
|
|
|
|
// Update index
|
|
indexEntry := &TransactionIndex{
|
|
Hash: tx.Hash.Hex(),
|
|
BlockNumber: tx.BlockNumber,
|
|
Protocol: "",
|
|
ValueUSD: tx.EstimatedValueUSD,
|
|
MEVType: tx.MEVClassification,
|
|
FilePath: filePath,
|
|
StoredAt: time.Now(),
|
|
DataSize: int64(len(data)),
|
|
}
|
|
|
|
if tx.ParsedDEX != nil {
|
|
indexEntry.Protocol = tx.ParsedDEX.Protocol
|
|
}
|
|
|
|
if ts.config.CompressionLevel > 0 {
|
|
indexEntry.CompressionMeta = map[string]interface{}{
|
|
"compression_level": ts.config.CompressionLevel,
|
|
"original_size": len(data),
|
|
}
|
|
}
|
|
|
|
ts.index[tx.Hash.Hex()] = indexEntry
|
|
|
|
return nil
|
|
}
|
|
|
|
// writeCompressedFile writes data to a gzip compressed file
|
|
func (ts *TransactionStorage) writeCompressedFile(filePath string, data []byte) error {
|
|
file, err := os.Create(filePath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer file.Close()
|
|
|
|
gzWriter, err := gzip.NewWriterLevel(file, ts.config.CompressionLevel)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer gzWriter.Close()
|
|
|
|
_, err = gzWriter.Write(data)
|
|
return err
|
|
}
|
|
|
|
// LoadTransaction loads a transaction by hash
|
|
func (ts *TransactionStorage) LoadTransaction(hash string) (*RealTransactionData, error) {
|
|
ts.mu.RLock()
|
|
indexEntry, exists := ts.index[hash]
|
|
ts.mu.RUnlock()
|
|
|
|
if !exists {
|
|
return nil, fmt.Errorf("transaction %s not found in index", hash)
|
|
}
|
|
|
|
// Read file
|
|
var data []byte
|
|
var err error
|
|
|
|
if strings.HasSuffix(indexEntry.FilePath, ".gz") {
|
|
data, err = ts.readCompressedFile(indexEntry.FilePath)
|
|
} else {
|
|
data, err = os.ReadFile(indexEntry.FilePath)
|
|
}
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to read transaction file: %w", err)
|
|
}
|
|
|
|
// Deserialize
|
|
var tx RealTransactionData
|
|
if err := json.Unmarshal(data, &tx); err != nil {
|
|
return nil, fmt.Errorf("failed to unmarshal transaction: %w", err)
|
|
}
|
|
|
|
return &tx, nil
|
|
}
|
|
|
|
// readCompressedFile reads data from a gzip compressed file
|
|
func (ts *TransactionStorage) readCompressedFile(filePath string) ([]byte, error) {
|
|
file, err := os.Open(filePath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer file.Close()
|
|
|
|
gzReader, err := gzip.NewReader(file)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer gzReader.Close()
|
|
|
|
return io.ReadAll(gzReader)
|
|
}
|
|
|
|
// GetTransactionsByProtocol returns transactions for a specific protocol
|
|
func (ts *TransactionStorage) GetTransactionsByProtocol(protocol string) ([]*TransactionIndex, error) {
|
|
ts.mu.RLock()
|
|
defer ts.mu.RUnlock()
|
|
|
|
var results []*TransactionIndex
|
|
for _, entry := range ts.index {
|
|
if strings.EqualFold(entry.Protocol, protocol) {
|
|
results = append(results, entry)
|
|
}
|
|
}
|
|
|
|
// Sort by block number
|
|
sort.Slice(results, func(i, j int) bool {
|
|
return results[i].BlockNumber < results[j].BlockNumber
|
|
})
|
|
|
|
return results, nil
|
|
}
|
|
|
|
// GetTransactionsByValueRange returns transactions within a value range
|
|
func (ts *TransactionStorage) GetTransactionsByValueRange(minUSD, maxUSD float64) ([]*TransactionIndex, error) {
|
|
ts.mu.RLock()
|
|
defer ts.mu.RUnlock()
|
|
|
|
var results []*TransactionIndex
|
|
for _, entry := range ts.index {
|
|
if entry.ValueUSD >= minUSD && entry.ValueUSD <= maxUSD {
|
|
results = append(results, entry)
|
|
}
|
|
}
|
|
|
|
// Sort by value descending
|
|
sort.Slice(results, func(i, j int) bool {
|
|
return results[i].ValueUSD > results[j].ValueUSD
|
|
})
|
|
|
|
return results, nil
|
|
}
|
|
|
|
// GetTransactionsByMEVType returns transactions by MEV classification
|
|
func (ts *TransactionStorage) GetTransactionsByMEVType(mevType string) ([]*TransactionIndex, error) {
|
|
ts.mu.RLock()
|
|
defer ts.mu.RUnlock()
|
|
|
|
var results []*TransactionIndex
|
|
for _, entry := range ts.index {
|
|
if strings.EqualFold(entry.MEVType, mevType) {
|
|
results = append(results, entry)
|
|
}
|
|
}
|
|
|
|
// Sort by block number
|
|
sort.Slice(results, func(i, j int) bool {
|
|
return results[i].BlockNumber < results[j].BlockNumber
|
|
})
|
|
|
|
return results, nil
|
|
}
|
|
|
|
// GetTransactionsByBlockRange returns transactions within a block range
|
|
func (ts *TransactionStorage) GetTransactionsByBlockRange(startBlock, endBlock uint64) ([]*TransactionIndex, error) {
|
|
ts.mu.RLock()
|
|
defer ts.mu.RUnlock()
|
|
|
|
var results []*TransactionIndex
|
|
for _, entry := range ts.index {
|
|
if entry.BlockNumber >= startBlock && entry.BlockNumber <= endBlock {
|
|
results = append(results, entry)
|
|
}
|
|
}
|
|
|
|
// Sort by block number
|
|
sort.Slice(results, func(i, j int) bool {
|
|
return results[i].BlockNumber < results[j].BlockNumber
|
|
})
|
|
|
|
return results, nil
|
|
}
|
|
|
|
// GetStorageStats returns storage statistics
|
|
func (ts *TransactionStorage) GetStorageStats() *StorageStats {
|
|
ts.mu.RLock()
|
|
defer ts.mu.RUnlock()
|
|
|
|
stats := &StorageStats{
|
|
TotalTransactions: len(ts.index),
|
|
ProtocolStats: make(map[string]int),
|
|
MEVTypeStats: make(map[string]int),
|
|
BlockRange: [2]uint64{^uint64(0), 0}, // min, max
|
|
}
|
|
|
|
var totalSize int64
|
|
var totalValue float64
|
|
|
|
for _, entry := range ts.index {
|
|
// Size
|
|
totalSize += entry.DataSize
|
|
|
|
// Value
|
|
totalValue += entry.ValueUSD
|
|
|
|
// Protocol stats
|
|
stats.ProtocolStats[entry.Protocol]++
|
|
|
|
// MEV type stats
|
|
stats.MEVTypeStats[entry.MEVType]++
|
|
|
|
// Block range
|
|
if entry.BlockNumber < stats.BlockRange[0] {
|
|
stats.BlockRange[0] = entry.BlockNumber
|
|
}
|
|
if entry.BlockNumber > stats.BlockRange[1] {
|
|
stats.BlockRange[1] = entry.BlockNumber
|
|
}
|
|
}
|
|
|
|
stats.TotalSizeBytes = totalSize
|
|
stats.TotalValueUSD = totalValue
|
|
|
|
if stats.TotalTransactions > 0 {
|
|
stats.AverageValueUSD = totalValue / float64(stats.TotalTransactions)
|
|
}
|
|
|
|
return stats
|
|
}
|
|
|
|
// StorageStats is the aggregate summary of the transaction index produced by
// GetStorageStats.
type StorageStats struct {
	// TotalTransactions is the number of entries in the index.
	TotalTransactions int `json:"total_transactions"`
	// TotalSizeBytes is the sum of the entries' recorded data sizes.
	TotalSizeBytes int64 `json:"total_size_bytes"`
	// TotalValueUSD is the sum of the entries' USD values.
	TotalValueUSD float64 `json:"total_value_usd"`
	// AverageValueUSD is TotalValueUSD divided by TotalTransactions.
	AverageValueUSD float64 `json:"average_value_usd"`
	// ProtocolStats counts entries per protocol name.
	ProtocolStats map[string]int `json:"protocol_stats"`
	// MEVTypeStats counts entries per MEV classification.
	MEVTypeStats map[string]int `json:"mev_type_stats"`
	// BlockRange holds the lowest and highest block numbers, as [min, max].
	BlockRange [2]uint64 `json:"block_range"`
}
|
|
|
|
// loadIndex loads the transaction index from disk
|
|
func (ts *TransactionStorage) loadIndex() error {
|
|
if _, err := os.Stat(ts.indexFile); os.IsNotExist(err) {
|
|
return nil // No existing index
|
|
}
|
|
|
|
data, err := os.ReadFile(ts.indexFile)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return json.Unmarshal(data, &ts.index)
|
|
}
|
|
|
|
// saveIndex saves the transaction index to disk
|
|
func (ts *TransactionStorage) saveIndex() error {
|
|
data, err := json.MarshalIndent(ts.index, "", " ")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return os.WriteFile(ts.indexFile, data, 0644)
|
|
}
|
|
|
|
// ExportDataset exports transactions matching criteria to a dataset
|
|
func (ts *TransactionStorage) ExportDataset(criteria *DatasetCriteria) (*Dataset, error) {
|
|
ts.mu.RLock()
|
|
defer ts.mu.RUnlock()
|
|
|
|
var transactions []*RealTransactionData
|
|
|
|
for _, indexEntry := range ts.index {
|
|
// Apply filters
|
|
if !ts.matchesCriteria(indexEntry, criteria) {
|
|
continue
|
|
}
|
|
|
|
// Load transaction data
|
|
tx, err := ts.LoadTransaction(indexEntry.Hash)
|
|
if err != nil {
|
|
ts.logger.Warn(fmt.Sprintf("Failed to load transaction %s: %v", indexEntry.Hash, err))
|
|
continue
|
|
}
|
|
|
|
transactions = append(transactions, tx)
|
|
|
|
// Check limit
|
|
if criteria.MaxTransactions > 0 && len(transactions) >= criteria.MaxTransactions {
|
|
break
|
|
}
|
|
}
|
|
|
|
return &Dataset{
|
|
Transactions: transactions,
|
|
Criteria: criteria,
|
|
GeneratedAt: time.Now(),
|
|
Stats: ts.calculateDatasetStats(transactions),
|
|
}, nil
|
|
}
|
|
|
|
// DatasetCriteria selects which indexed transactions go into an exported
// dataset. A zero-valued field imposes no restriction.
type DatasetCriteria struct {
	// Protocols, when non-empty, keeps only entries whose protocol matches
	// one of these names (case-insensitive).
	Protocols []string `json:"protocols,omitempty"`
	// MEVTypes, when non-empty, keeps only entries whose MEV classification
	// matches one of these names (case-insensitive).
	MEVTypes []string `json:"mev_types,omitempty"`
	// MinValueUSD is the inclusive lower bound on USD value; applied when > 0.
	MinValueUSD float64 `json:"min_value_usd,omitempty"`
	// MaxValueUSD is the inclusive upper bound on USD value; applied when > 0.
	MaxValueUSD float64 `json:"max_value_usd,omitempty"`
	// StartBlock is the inclusive lower bound on block number; applied when > 0.
	StartBlock uint64 `json:"start_block,omitempty"`
	// EndBlock is the inclusive upper bound on block number; applied when > 0.
	EndBlock uint64 `json:"end_block,omitempty"`
	// MaxTransactions caps the number of exported transactions; applied when > 0.
	MaxTransactions int `json:"max_transactions,omitempty"`
	// SortBy names the sort key: "value", "block" or "time".
	SortBy string `json:"sort_by,omitempty"`
	// SortDesc requests descending order.
	SortDesc bool `json:"sort_desc,omitempty"`
}
|
|
|
|
// Dataset represents an exported dataset
|
|
type Dataset struct {
|
|
Transactions []*RealTransactionData `json:"transactions"`
|
|
Criteria *DatasetCriteria `json:"criteria"`
|
|
GeneratedAt time.Time `json:"generated_at"`
|
|
Stats *DatasetStats `json:"stats"`
|
|
}
|
|
|
|
// DatasetStats summarizes the transactions contained in a Dataset.
type DatasetStats struct {
	// Count is the number of transactions.
	Count int `json:"count"`
	// TotalValueUSD is the sum of the transactions' estimated USD values.
	TotalValueUSD float64 `json:"total_value_usd"`
	// AverageValueUSD is TotalValueUSD divided by Count.
	AverageValueUSD float64 `json:"average_value_usd"`
	// ProtocolCounts counts transactions per protocol name.
	ProtocolCounts map[string]int `json:"protocol_counts"`
	// MEVTypeCounts counts transactions per MEV classification.
	MEVTypeCounts map[string]int `json:"mev_type_counts"`
	// BlockRange holds the lowest and highest block numbers, in that order.
	BlockRange [2]uint64 `json:"block_range"`
	// TimeRange holds the earliest and latest timestamps, in that order.
	TimeRange [2]time.Time `json:"time_range"`
}
|
|
|
|
// matchesCriteria checks if a transaction index entry matches the criteria
|
|
func (ts *TransactionStorage) matchesCriteria(entry *TransactionIndex, criteria *DatasetCriteria) bool {
|
|
// Protocol filter
|
|
if len(criteria.Protocols) > 0 {
|
|
found := false
|
|
for _, protocol := range criteria.Protocols {
|
|
if strings.EqualFold(entry.Protocol, protocol) {
|
|
found = true
|
|
break
|
|
}
|
|
}
|
|
if !found {
|
|
return false
|
|
}
|
|
}
|
|
|
|
// MEV type filter
|
|
if len(criteria.MEVTypes) > 0 {
|
|
found := false
|
|
for _, mevType := range criteria.MEVTypes {
|
|
if strings.EqualFold(entry.MEVType, mevType) {
|
|
found = true
|
|
break
|
|
}
|
|
}
|
|
if !found {
|
|
return false
|
|
}
|
|
}
|
|
|
|
// Value range filter
|
|
if criteria.MinValueUSD > 0 && entry.ValueUSD < criteria.MinValueUSD {
|
|
return false
|
|
}
|
|
if criteria.MaxValueUSD > 0 && entry.ValueUSD > criteria.MaxValueUSD {
|
|
return false
|
|
}
|
|
|
|
// Block range filter
|
|
if criteria.StartBlock > 0 && entry.BlockNumber < criteria.StartBlock {
|
|
return false
|
|
}
|
|
if criteria.EndBlock > 0 && entry.BlockNumber > criteria.EndBlock {
|
|
return false
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
// calculateDatasetStats calculates statistics for a dataset
|
|
func (ts *TransactionStorage) calculateDatasetStats(transactions []*RealTransactionData) *DatasetStats {
|
|
if len(transactions) == 0 {
|
|
return &DatasetStats{}
|
|
}
|
|
|
|
stats := &DatasetStats{
|
|
Count: len(transactions),
|
|
ProtocolCounts: make(map[string]int),
|
|
MEVTypeCounts: make(map[string]int),
|
|
BlockRange: [2]uint64{^uint64(0), 0},
|
|
TimeRange: [2]time.Time{time.Now(), time.Time{}},
|
|
}
|
|
|
|
var totalValue float64
|
|
|
|
for _, tx := range transactions {
|
|
// Value
|
|
totalValue += tx.EstimatedValueUSD
|
|
|
|
// Protocol
|
|
if tx.ParsedDEX != nil {
|
|
stats.ProtocolCounts[tx.ParsedDEX.Protocol]++
|
|
}
|
|
|
|
// MEV type
|
|
stats.MEVTypeCounts[tx.MEVClassification]++
|
|
|
|
// Block range
|
|
if tx.BlockNumber < stats.BlockRange[0] {
|
|
stats.BlockRange[0] = tx.BlockNumber
|
|
}
|
|
if tx.BlockNumber > stats.BlockRange[1] {
|
|
stats.BlockRange[1] = tx.BlockNumber
|
|
}
|
|
|
|
// Time range
|
|
if tx.SequencerTimestamp.Before(stats.TimeRange[0]) {
|
|
stats.TimeRange[0] = tx.SequencerTimestamp
|
|
}
|
|
if tx.SequencerTimestamp.After(stats.TimeRange[1]) {
|
|
stats.TimeRange[1] = tx.SequencerTimestamp
|
|
}
|
|
}
|
|
|
|
stats.TotalValueUSD = totalValue
|
|
if stats.Count > 0 {
|
|
stats.AverageValueUSD = totalValue / float64(stats.Count)
|
|
}
|
|
|
|
return stats
|
|
}
|
|
|
|
// SaveDataset saves a dataset to a file
|
|
func (ts *TransactionStorage) SaveDataset(dataset *Dataset, filename string) error {
|
|
data, err := json.MarshalIndent(dataset, "", " ")
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal dataset: %w", err)
|
|
}
|
|
|
|
filePath := filepath.Join(ts.dataDir, filename)
|
|
if err := os.WriteFile(filePath, data, 0644); err != nil {
|
|
return fmt.Errorf("failed to write dataset file: %w", err)
|
|
}
|
|
|
|
ts.logger.Info(fmt.Sprintf("Saved dataset with %d transactions to %s", len(dataset.Transactions), filePath))
|
|
return nil
|
|
}
|
|
|
|
// LoadDataset loads a dataset from a file
|
|
func (ts *TransactionStorage) LoadDataset(filename string) (*Dataset, error) {
|
|
filePath := filepath.Join(ts.dataDir, filename)
|
|
data, err := os.ReadFile(filePath)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to read dataset file: %w", err)
|
|
}
|
|
|
|
var dataset Dataset
|
|
if err := json.Unmarshal(data, &dataset); err != nil {
|
|
return nil, fmt.Errorf("failed to unmarshal dataset: %w", err)
|
|
}
|
|
|
|
return &dataset, nil
|
|
}
|
|
|
|
// CleanupOldData removes old transaction data beyond retention period
|
|
func (ts *TransactionStorage) CleanupOldData(retentionDays int) error {
|
|
if retentionDays <= 0 {
|
|
return nil // No cleanup
|
|
}
|
|
|
|
ts.mu.Lock()
|
|
defer ts.mu.Unlock()
|
|
|
|
cutoffTime := time.Now().AddDate(0, 0, -retentionDays)
|
|
var removedCount int
|
|
|
|
for hash, entry := range ts.index {
|
|
if entry.StoredAt.Before(cutoffTime) {
|
|
// Remove file
|
|
if err := os.Remove(entry.FilePath); err != nil && !os.IsNotExist(err) {
|
|
ts.logger.Warn(fmt.Sprintf("Failed to remove file %s: %v", entry.FilePath, err))
|
|
}
|
|
|
|
// Remove from index
|
|
delete(ts.index, hash)
|
|
removedCount++
|
|
}
|
|
}
|
|
|
|
if removedCount > 0 {
|
|
ts.logger.Info(fmt.Sprintf("Cleaned up %d old transactions", removedCount))
|
|
return ts.saveIndex()
|
|
}
|
|
|
|
return nil
|
|
}
|