Files
mev-beta/orig/test/sequencer_storage.go
Administrator c54c569f30 refactor: move all remaining files to orig/ directory
Completed clean root directory structure:
- Root now contains only: .git, .env, docs/, orig/
- Moved all remaining files and directories to orig/:
  - Config files (.claude, .dockerignore, .drone.yml, etc.)
  - All .env variants (except active .env)
  - Git config (.gitconfig, .github, .gitignore, etc.)
  - Tool configs (.golangci.yml, .revive.toml, etc.)
  - Documentation (*.md files, @prompts)
  - Build files (Dockerfiles, Makefile, go.mod, go.sum)
  - Docker compose files
  - All source directories (scripts, tests, tools, etc.)
  - Runtime directories (logs, monitoring, reports)
  - Dependency files (node_modules, lib, cache)
  - Special files (--delete)

- Removed empty runtime directories (bin/, data/)

V2 structure is now clean:
- docs/planning/ - V2 planning documents
- orig/ - Complete V1 codebase preserved
- .env - Active environment config (not in git)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-10 10:53:05 +01:00

578 lines
15 KiB
Go

//go:build integration && legacy && forked
// +build integration,legacy,forked
package test_main
import (
"compress/gzip"
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
"sort"
"strings"
"time"
"github.com/fraktal/mev-beta/internal/logger"
)
// NewTransactionStorage builds a TransactionStorage rooted at config.DataDir.
//
// The data directory (and any missing parents) is created with mode 0755 if
// needed; failure to create it is returned as an error. A previously persisted
// index in transaction_index.json is then loaded. A failure to load that index
// is only logged as a warning, and construction still succeeds.
func NewTransactionStorage(config *SequencerConfig, logger *logger.Logger) (*TransactionStorage, error) {
	dir := config.DataDir
	if mkErr := os.MkdirAll(dir, 0755); mkErr != nil {
		return nil, fmt.Errorf("failed to create data directory: %w", mkErr)
	}

	ts := &TransactionStorage{
		config:    config,
		logger:    logger,
		dataDir:   dir,
		indexFile: filepath.Join(dir, "transaction_index.json"),
		index:     make(map[string]*TransactionIndex),
	}

	// An unreadable index is not fatal; the storage keeps whatever index
	// state loadIndex left in place.
	if loadErr := ts.loadIndex(); loadErr != nil {
		logger.Warn(fmt.Sprintf("Failed to load existing index: %v", loadErr))
	}
	return ts, nil
}
// StoreBatch persists every transaction in transactions and then rewrites the
// on-disk index, holding the write lock for the whole batch.
//
// Storage is best-effort per transaction: one that fails to store is logged
// and skipped so the rest of the batch still lands. The returned error comes
// only from saving the index.
func (ts *TransactionStorage) StoreBatch(transactions []*RealTransactionData) error {
	ts.mu.Lock()
	defer ts.mu.Unlock()

	for i := range transactions {
		current := transactions[i]
		if storeErr := ts.storeTransaction(current); storeErr != nil {
			ts.logger.Error(fmt.Sprintf("Failed to store transaction %s: %v", current.Hash.Hex(), storeErr))
		}
	}

	return ts.saveIndex()
}
// storeTransaction writes one transaction to its own JSON file in the data
// directory and records it in the in-memory index.
//
// The file is named block_<number>_tx_<hash>.json. When
// config.CompressionLevel is positive it gets a ".gz" suffix and is
// gzip-encoded. The index entry records the file path, the current time as
// StoredAt, and DataSize as the length of the serialized (pre-compression)
// JSON. The caller must hold ts.mu for writing. The index file on disk is not
// updated here.
func (ts *TransactionStorage) storeTransaction(tx *RealTransactionData) error {
	hashHex := tx.Hash.Hex()
	compressed := ts.config.CompressionLevel > 0

	name := fmt.Sprintf("block_%d_tx_%s.json", tx.BlockNumber, hashHex)
	if compressed {
		name += ".gz"
	}
	fullPath := filepath.Join(ts.dataDir, name)

	payload, err := json.MarshalIndent(tx, "", " ")
	if err != nil {
		return fmt.Errorf("failed to marshal transaction: %w", err)
	}

	if compressed {
		if err = ts.writeCompressedFile(fullPath, payload); err != nil {
			return fmt.Errorf("failed to write compressed file: %w", err)
		}
	} else if err = os.WriteFile(fullPath, payload, 0644); err != nil {
		return fmt.Errorf("failed to write file: %w", err)
	}

	// Transactions without parsed DEX data are indexed with an empty protocol.
	protocol := ""
	if tx.ParsedDEX != nil {
		protocol = tx.ParsedDEX.Protocol
	}

	entry := &TransactionIndex{
		Hash:        hashHex,
		BlockNumber: tx.BlockNumber,
		Protocol:    protocol,
		ValueUSD:    tx.EstimatedValueUSD,
		MEVType:     tx.MEVClassification,
		FilePath:    fullPath,
		StoredAt:    time.Now(),
		DataSize:    int64(len(payload)),
	}
	if compressed {
		entry.CompressionMeta = map[string]interface{}{
			"compression_level": ts.config.CompressionLevel,
			"original_size":     len(payload),
		}
	}

	ts.index[hashHex] = entry
	return nil
}
// writeCompressedFile writes data to filePath as a gzip stream at
// ts.config.CompressionLevel, creating or truncating the file.
//
// The gzip writer and the file are closed explicitly and their errors are
// returned. gzip.Writer.Close flushes buffered compressed data and writes the
// gzip footer, so discarding its error (as a deferred Close would) can leave a
// truncated, unreadable file while reporting success. If writing fails, that
// error takes precedence over any close error.
func (ts *TransactionStorage) writeCompressedFile(filePath string, data []byte) (err error) {
	file, err := os.Create(filePath)
	if err != nil {
		return err
	}
	// Always close the file; surface its Close error only if nothing failed earlier.
	defer func() {
		if cerr := file.Close(); cerr != nil && err == nil {
			err = cerr
		}
	}()

	gzWriter, err := gzip.NewWriterLevel(file, ts.config.CompressionLevel)
	if err != nil {
		return err
	}
	if _, err = gzWriter.Write(data); err != nil {
		_ = gzWriter.Close() // best-effort cleanup; the Write error is what matters
		return err
	}
	return gzWriter.Close()
}
// LoadTransaction reads the stored transaction indexed under hash.
//
// Only the index lookup happens under the read lock; the file read and JSON
// decode happen after the lock is released. Files whose recorded path ends in
// ".gz" are gunzipped before decoding. An error is returned if hash is not
// indexed, the file cannot be read, or its contents fail to decode.
func (ts *TransactionStorage) LoadTransaction(hash string) (*RealTransactionData, error) {
	ts.mu.RLock()
	entry, ok := ts.index[hash]
	ts.mu.RUnlock()

	if !ok {
		return nil, fmt.Errorf("transaction %s not found in index", hash)
	}

	// Pick the reader by file suffix: plain JSON or gzip-wrapped JSON.
	readFile := os.ReadFile
	if strings.HasSuffix(entry.FilePath, ".gz") {
		readFile = ts.readCompressedFile
	}

	raw, readErr := readFile(entry.FilePath)
	if readErr != nil {
		return nil, fmt.Errorf("failed to read transaction file: %w", readErr)
	}

	decoded := new(RealTransactionData)
	if decodeErr := json.Unmarshal(raw, decoded); decodeErr != nil {
		return nil, fmt.Errorf("failed to unmarshal transaction: %w", decodeErr)
	}
	return decoded, nil
}
// readCompressedFile opens filePath, decodes it as a gzip stream and returns
// the fully decompressed contents. The gzip reader and the file are both
// closed before returning.
func (ts *TransactionStorage) readCompressedFile(filePath string) ([]byte, error) {
	f, err := os.Open(filePath)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	var zr *gzip.Reader
	if zr, err = gzip.NewReader(f); err != nil {
		return nil, err
	}
	defer zr.Close()

	return io.ReadAll(zr)
}
// GetTransactionsByProtocol returns the index entries whose Protocol equals
// protocol under case-insensitive comparison, ordered by ascending block
// number.
//
// The order among entries in the same block is unspecified. The returned
// pointers are the index's own entries. A nil slice is returned when nothing
// matches, and the error is always nil.
func (ts *TransactionStorage) GetTransactionsByProtocol(protocol string) ([]*TransactionIndex, error) {
	ts.mu.RLock()
	defer ts.mu.RUnlock()

	var matches []*TransactionIndex
	for _, candidate := range ts.index {
		if !strings.EqualFold(candidate.Protocol, protocol) {
			continue
		}
		matches = append(matches, candidate)
	}

	byBlock := func(a, b int) bool { return matches[a].BlockNumber < matches[b].BlockNumber }
	sort.Slice(matches, byBlock)
	return matches, nil
}
// GetTransactionsByValueRange returns the index entries whose ValueUSD lies in
// the inclusive range [minUSD, maxUSD], ordered by value, highest first.
//
// The order among equal values is unspecified. The returned pointers are the
// index's own entries. A nil slice is returned when nothing matches, and the
// error is always nil.
func (ts *TransactionStorage) GetTransactionsByValueRange(minUSD, maxUSD float64) ([]*TransactionIndex, error) {
	ts.mu.RLock()
	defer ts.mu.RUnlock()

	var matches []*TransactionIndex
	for _, candidate := range ts.index {
		// Positive form on purpose: a NaN value must never count as in range.
		inRange := candidate.ValueUSD >= minUSD && candidate.ValueUSD <= maxUSD
		if inRange {
			matches = append(matches, candidate)
		}
	}

	byValueDesc := func(a, b int) bool { return matches[a].ValueUSD > matches[b].ValueUSD }
	sort.Slice(matches, byValueDesc)
	return matches, nil
}
// GetTransactionsByMEVType returns the index entries whose MEVType equals
// mevType under case-insensitive comparison, ordered by ascending block
// number.
//
// The order among entries in the same block is unspecified. The returned
// pointers are the index's own entries. A nil slice is returned when nothing
// matches, and the error is always nil.
func (ts *TransactionStorage) GetTransactionsByMEVType(mevType string) ([]*TransactionIndex, error) {
	ts.mu.RLock()
	defer ts.mu.RUnlock()

	var matches []*TransactionIndex
	for _, candidate := range ts.index {
		if !strings.EqualFold(candidate.MEVType, mevType) {
			continue
		}
		matches = append(matches, candidate)
	}

	byBlock := func(a, b int) bool { return matches[a].BlockNumber < matches[b].BlockNumber }
	sort.Slice(matches, byBlock)
	return matches, nil
}
// GetTransactionsByBlockRange returns the index entries whose BlockNumber lies
// in the inclusive range [startBlock, endBlock], ordered by ascending block
// number.
//
// The order among entries in the same block is unspecified. The returned
// pointers are the index's own entries. A nil slice is returned when nothing
// matches, and the error is always nil.
func (ts *TransactionStorage) GetTransactionsByBlockRange(startBlock, endBlock uint64) ([]*TransactionIndex, error) {
	ts.mu.RLock()
	defer ts.mu.RUnlock()

	var matches []*TransactionIndex
	for _, candidate := range ts.index {
		if candidate.BlockNumber < startBlock || candidate.BlockNumber > endBlock {
			continue
		}
		matches = append(matches, candidate)
	}

	byBlock := func(a, b int) bool { return matches[a].BlockNumber < matches[b].BlockNumber }
	sort.Slice(matches, byBlock)
	return matches, nil
}
// GetStorageStats summarizes the current in-memory index under the read lock.
//
// The fields are computed as follows:
//   - TotalSizeBytes sums each entry's recorded DataSize.
//   - TotalValueUSD sums ValueUSD, and AverageValueUSD is their mean.
//   - ProtocolStats and MEVTypeStats count entries per exact Protocol /
//     MEVType string; the empty string is a key like any other.
//   - BlockRange is [min, max] of the block numbers seen.
//
// For an empty index BlockRange is [0, 0] rather than the internal
// [MaxUint64, 0] search sentinel, and the average stays 0.
func (ts *TransactionStorage) GetStorageStats() *StorageStats {
	ts.mu.RLock()
	defer ts.mu.RUnlock()

	stats := &StorageStats{
		TotalTransactions: len(ts.index),
		ProtocolStats:     make(map[string]int),
		MEVTypeStats:      make(map[string]int),
	}
	if stats.TotalTransactions == 0 {
		return stats
	}

	// Seed for the min/max search; overwritten because the index is non-empty.
	stats.BlockRange = [2]uint64{^uint64(0), 0}
	for _, entry := range ts.index {
		stats.TotalSizeBytes += entry.DataSize
		stats.TotalValueUSD += entry.ValueUSD
		stats.ProtocolStats[entry.Protocol]++
		stats.MEVTypeStats[entry.MEVType]++

		if entry.BlockNumber < stats.BlockRange[0] {
			stats.BlockRange[0] = entry.BlockNumber
		}
		if entry.BlockNumber > stats.BlockRange[1] {
			stats.BlockRange[1] = entry.BlockNumber
		}
	}

	stats.AverageValueUSD = stats.TotalValueUSD / float64(stats.TotalTransactions)
	return stats
}
// StorageStats is the aggregate summary of the in-memory transaction index
// returned by GetStorageStats. The struct tags define its JSON form.
type StorageStats struct {
// TotalTransactions is the number of entries in the index.
TotalTransactions int `json:"total_transactions"`
// TotalSizeBytes sums each entry's DataSize. storeTransaction sets DataSize to
// the length of the serialized JSON before any gzip compression.
TotalSizeBytes int64 `json:"total_size_bytes"`
// TotalValueUSD sums each entry's ValueUSD.
TotalValueUSD float64 `json:"total_value_usd"`
// AverageValueUSD is TotalValueUSD divided by TotalTransactions. It is left at 0
// when the index is empty.
AverageValueUSD float64 `json:"average_value_usd"`
// ProtocolStats counts entries per Protocol string; the empty string is a key
// like any other.
ProtocolStats map[string]int `json:"protocol_stats"`
// MEVTypeStats counts entries per MEVType string; the empty string is a key
// like any other.
MEVTypeStats map[string]int `json:"mev_type_stats"`
// BlockRange holds the lowest and highest block numbers found in the index.
BlockRange [2]uint64 `json:"block_range"` // [min, max]
}
// loadIndex merges the index persisted in ts.indexFile into ts.index.
//
// A missing index file is not an error. The file is read directly rather than
// Stat-ed first, which avoids a check-then-use race. It is decoded into a
// fresh map before touching ts.index, which gives three guarantees:
//   - A decode error leaves the in-memory index unchanged instead of
//     partially merged.
//   - A file containing JSON null cannot install a nil map, which would make
//     later index writes panic.
//   - null entries are dropped rather than stored as nil pointers.
//
// loadIndex does not take ts.mu.
func (ts *TransactionStorage) loadIndex() error {
	data, err := os.ReadFile(ts.indexFile)
	if err != nil {
		if os.IsNotExist(err) {
			return nil // No existing index
		}
		return err
	}

	var loaded map[string]*TransactionIndex
	if err := json.Unmarshal(data, &loaded); err != nil {
		return err
	}

	if ts.index == nil {
		ts.index = make(map[string]*TransactionIndex, len(loaded))
	}
	for hash, entry := range loaded {
		if entry != nil {
			ts.index[hash] = entry
		}
	}
	return nil
}
// saveIndex writes the in-memory index to ts.indexFile as indented JSON.
//
// The JSON is first written to a sibling "<index>.tmp" file and then renamed
// over the index file. A failed or interrupted write therefore leaves the
// previous index intact instead of a truncated file that loadIndex could no
// longer parse. On failure the temporary file is removed on a best-effort
// basis. The caller must hold ts.mu; StoreBatch and CleanupOldData hold the
// write lock when calling it.
func (ts *TransactionStorage) saveIndex() error {
	data, err := json.MarshalIndent(ts.index, "", " ")
	if err != nil {
		return err
	}

	tmpPath := ts.indexFile + ".tmp"
	if err := os.WriteFile(tmpPath, data, 0644); err != nil {
		_ = os.Remove(tmpPath)
		return err
	}
	if err := os.Rename(tmpPath, ts.indexFile); err != nil {
		_ = os.Remove(tmpPath)
		return err
	}
	return nil
}
// ExportDataset builds a Dataset from the stored transactions whose index
// entries satisfy criteria.
//
// Matching entries are selected and ordered under the read lock, and only
// their hashes are carried out. The transactions are then loaded from disk
// after the lock is released. Loading must happen outside the lock because
// LoadTransaction takes the read lock itself. Recursive RLock on a
// sync.RWMutex deadlocks as soon as a writer (e.g. StoreBatch) queues between
// the two acquisitions. Releasing the lock also keeps slow disk reads from
// blocking writers.
//
// Ordering follows criteria.SortBy (case-insensitive):
//   - "value" orders by ValueUSD.
//   - "time" orders by the index's StoredAt, the only timestamp the index
//     records.
//   - Anything else, including "" and "block", orders by BlockNumber.
//
// Order is ascending unless SortDesc is set. Ties are broken by hash so the
// result is deterministic. MaxTransactions > 0 caps the number of
// successfully loaded transactions and is applied after ordering. Entries
// whose file cannot be loaded are logged and skipped. A nil criteria matches
// everything. The error is currently always nil.
func (ts *TransactionStorage) ExportDataset(criteria *DatasetCriteria) (*Dataset, error) {
	if criteria == nil {
		criteria = &DatasetCriteria{}
	}

	sortKey := strings.ToLower(criteria.SortBy)
	less := func(a, b *TransactionIndex) bool {
		switch sortKey {
		case "value":
			if a.ValueUSD != b.ValueUSD {
				return a.ValueUSD < b.ValueUSD
			}
		case "time":
			if !a.StoredAt.Equal(b.StoredAt) {
				return a.StoredAt.Before(b.StoredAt)
			}
		default: // "block", "" or unrecognized
			if a.BlockNumber != b.BlockNumber {
				return a.BlockNumber < b.BlockNumber
			}
		}
		return a.Hash < b.Hash // deterministic tie-break
	}

	// Select and order under the read lock; copy out hashes only.
	hashes := func() []string {
		ts.mu.RLock()
		defer ts.mu.RUnlock()

		var matched []*TransactionIndex
		for _, entry := range ts.index {
			if ts.matchesCriteria(entry, criteria) {
				matched = append(matched, entry)
			}
		}
		sort.Slice(matched, func(i, j int) bool {
			if criteria.SortDesc {
				return less(matched[j], matched[i])
			}
			return less(matched[i], matched[j])
		})

		out := make([]string, len(matched))
		for i, entry := range matched {
			out[i] = entry.Hash
		}
		return out
	}()

	// Load with the lock released (LoadTransaction locks on its own).
	var transactions []*RealTransactionData
	for _, hash := range hashes {
		if criteria.MaxTransactions > 0 && len(transactions) >= criteria.MaxTransactions {
			break
		}
		tx, err := ts.LoadTransaction(hash)
		if err != nil {
			ts.logger.Warn(fmt.Sprintf("Failed to load transaction %s: %v", hash, err))
			continue
		}
		transactions = append(transactions, tx)
	}

	return &Dataset{
		Transactions: transactions,
		Criteria:     criteria,
		GeneratedAt:  time.Now(),
		Stats:        ts.calculateDatasetStats(transactions),
	}, nil
}
// DatasetCriteria describes which stored transactions a dataset export should
// include and how many. A zero value in any filter field means "no
// constraint" for that field.
type DatasetCriteria struct {
// Protocols, if non-empty, keeps only entries whose Protocol matches one of
// these names case-insensitively.
Protocols []string `json:"protocols,omitempty"`
// MEVTypes, if non-empty, keeps only entries whose MEVType matches one of
// these values case-insensitively.
MEVTypes []string `json:"mev_types,omitempty"`
// MinValueUSD and MaxValueUSD are inclusive bounds on an entry's ValueUSD.
// Each bound is enforced only when it is greater than zero.
MinValueUSD float64 `json:"min_value_usd,omitempty"`
MaxValueUSD float64 `json:"max_value_usd,omitempty"`
// StartBlock and EndBlock are inclusive bounds on an entry's block number.
// Each bound is enforced only when it is greater than zero.
StartBlock uint64 `json:"start_block,omitempty"`
EndBlock uint64 `json:"end_block,omitempty"`
// MaxTransactions, when greater than zero, caps how many transactions an
// export returns.
MaxTransactions int `json:"max_transactions,omitempty"`
// SortBy names the requested ordering key; SortDesc requests descending order.
SortBy string `json:"sort_by,omitempty"` // "value", "block", "time"
SortDesc bool `json:"sort_desc,omitempty"`
}
// Dataset is a set of stored transactions produced by ExportDataset, together
// with the criteria that selected them and summary statistics. SaveDataset and
// LoadDataset persist it as JSON using the struct tags below.
type Dataset struct {
// Transactions are the fully loaded transactions selected for the dataset.
Transactions []*RealTransactionData `json:"transactions"`
// Criteria is the filter the dataset was built with.
Criteria *DatasetCriteria `json:"criteria"`
// GeneratedAt is the wall-clock time at which the dataset was assembled.
GeneratedAt time.Time `json:"generated_at"`
// Stats summarizes Transactions.
Stats *DatasetStats `json:"stats"`
}
// DatasetStats summarizes the transactions in a Dataset and is computed by
// calculateDatasetStats. For an empty dataset every field is left at its zero
// value, including nil maps.
type DatasetStats struct {
// Count is the number of transactions.
Count int `json:"count"`
// TotalValueUSD sums the transactions' EstimatedValueUSD.
TotalValueUSD float64 `json:"total_value_usd"`
// AverageValueUSD is TotalValueUSD divided by Count.
AverageValueUSD float64 `json:"average_value_usd"`
// ProtocolCounts counts transactions per ParsedDEX.Protocol. Transactions
// without parsed DEX data are not counted here.
ProtocolCounts map[string]int `json:"protocol_counts"`
// MEVTypeCounts counts transactions per MEVClassification.
MEVTypeCounts map[string]int `json:"mev_type_counts"`
// BlockRange tracks the [lowest, highest] BlockNumber.
BlockRange [2]uint64 `json:"block_range"`
// TimeRange tracks the [earliest, latest] SequencerTimestamp.
TimeRange [2]time.Time `json:"time_range"`
}
// matchesCriteria reports whether entry passes every filter in criteria.
//
// The protocol and MEV-type filters are case-insensitive "any of" lists and
// are skipped when empty. The value and block bounds are inclusive. Each
// bound is enforced only when it is greater than zero, so zero means
// "unbounded" on that side. criteria must be non-nil.
func (ts *TransactionStorage) matchesCriteria(entry *TransactionIndex, criteria *DatasetCriteria) bool {
	anyFold := func(target string, options []string) bool {
		for _, opt := range options {
			if strings.EqualFold(target, opt) {
				return true
			}
		}
		return false
	}

	switch {
	case len(criteria.Protocols) > 0 && !anyFold(entry.Protocol, criteria.Protocols):
		return false
	case len(criteria.MEVTypes) > 0 && !anyFold(entry.MEVType, criteria.MEVTypes):
		return false
	case criteria.MinValueUSD > 0 && entry.ValueUSD < criteria.MinValueUSD:
		return false
	case criteria.MaxValueUSD > 0 && entry.ValueUSD > criteria.MaxValueUSD:
		return false
	case criteria.StartBlock > 0 && entry.BlockNumber < criteria.StartBlock:
		return false
	case criteria.EndBlock > 0 && entry.BlockNumber > criteria.EndBlock:
		return false
	default:
		return true
	}
}
// calculateDatasetStats summarizes transactions for a Dataset.
//
// The fields are computed as follows:
//   - Values are summed from EstimatedValueUSD.
//   - ProtocolCounts only counts transactions with parsed DEX data.
//   - MEVTypeCounts counts every transaction by MEVClassification.
//   - BlockRange and TimeRange hold [min, max] of BlockNumber and
//     SequencerTimestamp.
//
// Both ranges are seeded from the first transaction. Seeding the lower time
// bound with time.Now() would report "now" as the earliest time whenever every
// timestamp lies in the future, for example under clock skew. An empty input
// yields a zero-valued DatasetStats (nil maps).
func (ts *TransactionStorage) calculateDatasetStats(transactions []*RealTransactionData) *DatasetStats {
	if len(transactions) == 0 {
		return &DatasetStats{}
	}

	first := transactions[0]
	stats := &DatasetStats{
		Count:          len(transactions),
		ProtocolCounts: make(map[string]int),
		MEVTypeCounts:  make(map[string]int),
		BlockRange:     [2]uint64{first.BlockNumber, first.BlockNumber},
		TimeRange:      [2]time.Time{first.SequencerTimestamp, first.SequencerTimestamp},
	}

	for _, tx := range transactions {
		stats.TotalValueUSD += tx.EstimatedValueUSD

		if tx.ParsedDEX != nil {
			stats.ProtocolCounts[tx.ParsedDEX.Protocol]++
		}
		stats.MEVTypeCounts[tx.MEVClassification]++

		if tx.BlockNumber < stats.BlockRange[0] {
			stats.BlockRange[0] = tx.BlockNumber
		}
		if tx.BlockNumber > stats.BlockRange[1] {
			stats.BlockRange[1] = tx.BlockNumber
		}

		if tx.SequencerTimestamp.Before(stats.TimeRange[0]) {
			stats.TimeRange[0] = tx.SequencerTimestamp
		}
		if tx.SequencerTimestamp.After(stats.TimeRange[1]) {
			stats.TimeRange[1] = tx.SequencerTimestamp
		}
	}

	// Count > 0 is guaranteed by the early return above.
	stats.AverageValueUSD = stats.TotalValueUSD / float64(stats.Count)
	return stats
}
// SaveDataset writes dataset as indented JSON to filename inside the storage
// data directory and logs how many transactions were saved. The file is
// created or truncated with mode 0644.
//
// NOTE(review): filename is passed to filepath.Join unvalidated, so a name
// containing ".." can resolve outside the data directory. Callers should pass
// trusted names only.
func (ts *TransactionStorage) SaveDataset(dataset *Dataset, filename string) error {
	encoded, marshalErr := json.MarshalIndent(dataset, "", " ")
	if marshalErr != nil {
		return fmt.Errorf("failed to marshal dataset: %w", marshalErr)
	}

	target := filepath.Join(ts.dataDir, filename)
	if writeErr := os.WriteFile(target, encoded, 0644); writeErr != nil {
		return fmt.Errorf("failed to write dataset file: %w", writeErr)
	}

	ts.logger.Info(fmt.Sprintf("Saved dataset with %d transactions to %s", len(dataset.Transactions), target))
	return nil
}
// LoadDataset reads filename from the storage data directory and decodes it as
// a JSON-encoded Dataset, the format SaveDataset writes. Read and decode
// failures are returned wrapped.
func (ts *TransactionStorage) LoadDataset(filename string) (*Dataset, error) {
	raw, err := os.ReadFile(filepath.Join(ts.dataDir, filename))
	if err != nil {
		return nil, fmt.Errorf("failed to read dataset file: %w", err)
	}

	out := new(Dataset)
	if err = json.Unmarshal(raw, out); err != nil {
		return nil, fmt.Errorf("failed to unmarshal dataset: %w", err)
	}
	return out, nil
}
// CleanupOldData deletes transactions whose index entry was stored more than
// retentionDays days ago. A non-positive retentionDays disables cleanup.
//
// For each expired entry the transaction file is removed and the entry is
// dropped from the index. The index is re-saved if anything was dropped. A
// file that is already gone counts as removed. If removal fails for any other
// reason, the failure is logged and the entry is kept so the file stays
// tracked and a later cleanup can retry; dropping the entry would orphan the
// file on disk permanently. The write lock is held throughout.
func (ts *TransactionStorage) CleanupOldData(retentionDays int) error {
	if retentionDays <= 0 {
		return nil // No cleanup
	}

	ts.mu.Lock()
	defer ts.mu.Unlock()

	cutoffTime := time.Now().AddDate(0, 0, -retentionDays)
	removedCount := 0
	for hash, entry := range ts.index {
		if !entry.StoredAt.Before(cutoffTime) {
			continue
		}
		if err := os.Remove(entry.FilePath); err != nil && !os.IsNotExist(err) {
			ts.logger.Warn(fmt.Sprintf("Failed to remove file %s: %v", entry.FilePath, err))
			continue // keep the entry so the file is not orphaned
		}
		// Deleting the current key while ranging over a map is safe in Go.
		delete(ts.index, hash)
		removedCount++
	}

	if removedCount == 0 {
		return nil
	}
	ts.logger.Info(fmt.Sprintf("Cleaned up %d old transactions", removedCount))
	return ts.saveIndex()
}