//go:build integration && legacy && forked // +build integration,legacy,forked package test_main import ( "compress/gzip" "encoding/json" "fmt" "io" "os" "path/filepath" "sort" "strings" "time" "github.com/fraktal/mev-beta/internal/logger" ) // NewTransactionStorage creates a new transaction storage system func NewTransactionStorage(config *SequencerConfig, logger *logger.Logger) (*TransactionStorage, error) { dataDir := config.DataDir if err := os.MkdirAll(dataDir, 0755); err != nil { return nil, fmt.Errorf("failed to create data directory: %w", err) } storage := &TransactionStorage{ config: config, logger: logger, dataDir: dataDir, indexFile: filepath.Join(dataDir, "transaction_index.json"), index: make(map[string]*TransactionIndex), } // Load existing index if err := storage.loadIndex(); err != nil { logger.Warn(fmt.Sprintf("Failed to load existing index: %v", err)) } return storage, nil } // StoreBatch stores a batch of transactions func (ts *TransactionStorage) StoreBatch(transactions []*RealTransactionData) error { ts.mu.Lock() defer ts.mu.Unlock() for _, tx := range transactions { if err := ts.storeTransaction(tx); err != nil { ts.logger.Error(fmt.Sprintf("Failed to store transaction %s: %v", tx.Hash.Hex(), err)) continue } } // Save updated index return ts.saveIndex() } // storeTransaction stores a single transaction func (ts *TransactionStorage) storeTransaction(tx *RealTransactionData) error { // Create filename based on block and hash filename := fmt.Sprintf("block_%d_tx_%s.json", tx.BlockNumber, tx.Hash.Hex()) if ts.config.CompressionLevel > 0 { filename += ".gz" } filePath := filepath.Join(ts.dataDir, filename) // Serialize transaction data data, err := json.MarshalIndent(tx, "", " ") if err != nil { return fmt.Errorf("failed to marshal transaction: %w", err) } // Write file if ts.config.CompressionLevel > 0 { if err := ts.writeCompressedFile(filePath, data); err != nil { return fmt.Errorf("failed to write compressed file: %w", err) } } else { if err := os.WriteFile(filePath, data, 0644); err != nil { return fmt.Errorf("failed to write file: %w", err) } } // Update index indexEntry := &TransactionIndex{ Hash: tx.Hash.Hex(), BlockNumber: tx.BlockNumber, Protocol: "", ValueUSD: tx.EstimatedValueUSD, MEVType: tx.MEVClassification, FilePath: filePath, StoredAt: time.Now(), DataSize: int64(len(data)), } if tx.ParsedDEX != nil { indexEntry.Protocol = tx.ParsedDEX.Protocol } if ts.config.CompressionLevel > 0 { indexEntry.CompressionMeta = map[string]interface{}{ "compression_level": ts.config.CompressionLevel, "original_size": len(data), } } ts.index[tx.Hash.Hex()] = indexEntry return nil } // writeCompressedFile writes data to a gzip compressed file func (ts *TransactionStorage) writeCompressedFile(filePath string, data []byte) error { file, err := os.Create(filePath) if err != nil { return err } defer file.Close() gzWriter, err := gzip.NewWriterLevel(file, ts.config.CompressionLevel) if err != nil { return err } defer gzWriter.Close() _, err = gzWriter.Write(data) return err } // LoadTransaction loads a transaction by hash func (ts *TransactionStorage) LoadTransaction(hash string) (*RealTransactionData, error) { ts.mu.RLock() indexEntry, exists := ts.index[hash] ts.mu.RUnlock() if !exists { return nil, fmt.Errorf("transaction %s not found in index", hash) } // Read file var data []byte var err error if strings.HasSuffix(indexEntry.FilePath, ".gz") { data, err = ts.readCompressedFile(indexEntry.FilePath) } else { data, err = os.ReadFile(indexEntry.FilePath) } if err != nil { return nil, fmt.Errorf("failed to read transaction file: %w", err) } // Deserialize var tx RealTransactionData if err := json.Unmarshal(data, &tx); err != nil { return nil, fmt.Errorf("failed to unmarshal transaction: %w", err) } return &tx, nil } // readCompressedFile reads data from a gzip compressed file func (ts *TransactionStorage) readCompressedFile(filePath string) ([]byte, error) { file, err := os.Open(filePath) if err != nil { return nil, err } defer file.Close() gzReader, err := gzip.NewReader(file) if err != nil { return nil, err } defer gzReader.Close() return io.ReadAll(gzReader) } // GetTransactionsByProtocol returns transactions for a specific protocol func (ts *TransactionStorage) GetTransactionsByProtocol(protocol string) ([]*TransactionIndex, error) { ts.mu.RLock() defer ts.mu.RUnlock() var results []*TransactionIndex for _, entry := range ts.index { if strings.EqualFold(entry.Protocol, protocol) { results = append(results, entry) } } // Sort by block number sort.Slice(results, func(i, j int) bool { return results[i].BlockNumber < results[j].BlockNumber }) return results, nil } // GetTransactionsByValueRange returns transactions within a value range func (ts *TransactionStorage) GetTransactionsByValueRange(minUSD, maxUSD float64) ([]*TransactionIndex, error) { ts.mu.RLock() defer ts.mu.RUnlock() var results []*TransactionIndex for _, entry := range ts.index { if entry.ValueUSD >= minUSD && entry.ValueUSD <= maxUSD { results = append(results, entry) } } // Sort by value descending sort.Slice(results, func(i, j int) bool { return results[i].ValueUSD > results[j].ValueUSD }) return results, nil } // GetTransactionsByMEVType returns transactions by MEV classification func (ts *TransactionStorage) GetTransactionsByMEVType(mevType string) ([]*TransactionIndex, error) { ts.mu.RLock() defer ts.mu.RUnlock() var results []*TransactionIndex for _, entry := range ts.index { if strings.EqualFold(entry.MEVType, mevType) { results = append(results, entry) } } // Sort by block number sort.Slice(results, func(i, j int) bool { return results[i].BlockNumber < results[j].BlockNumber }) return results, nil } // GetTransactionsByBlockRange returns transactions within a block range func (ts *TransactionStorage) GetTransactionsByBlockRange(startBlock, endBlock uint64) ([]*TransactionIndex, error) { ts.mu.RLock() defer ts.mu.RUnlock() var results []*TransactionIndex for _, entry := range ts.index { if entry.BlockNumber >= startBlock && entry.BlockNumber <= endBlock { results = append(results, entry) } } // Sort by block number sort.Slice(results, func(i, j int) bool { return results[i].BlockNumber < results[j].BlockNumber }) return results, nil } // GetStorageStats returns storage statistics func (ts *TransactionStorage) GetStorageStats() *StorageStats { ts.mu.RLock() defer ts.mu.RUnlock() stats := &StorageStats{ TotalTransactions: len(ts.index), ProtocolStats: make(map[string]int), MEVTypeStats: make(map[string]int), BlockRange: [2]uint64{^uint64(0), 0}, // min, max } var totalSize int64 var totalValue float64 for _, entry := range ts.index { // Size totalSize += entry.DataSize // Value totalValue += entry.ValueUSD // Protocol stats stats.ProtocolStats[entry.Protocol]++ // MEV type stats stats.MEVTypeStats[entry.MEVType]++ // Block range if entry.BlockNumber < stats.BlockRange[0] { stats.BlockRange[0] = entry.BlockNumber } if entry.BlockNumber > stats.BlockRange[1] { stats.BlockRange[1] = entry.BlockNumber } } stats.TotalSizeBytes = totalSize stats.TotalValueUSD = totalValue if stats.TotalTransactions > 0 { stats.AverageValueUSD = totalValue / float64(stats.TotalTransactions) } return stats } // StorageStats contains storage statistics type StorageStats struct { TotalTransactions int `json:"total_transactions"` TotalSizeBytes int64 `json:"total_size_bytes"` TotalValueUSD float64 `json:"total_value_usd"` AverageValueUSD float64 `json:"average_value_usd"` ProtocolStats map[string]int `json:"protocol_stats"` MEVTypeStats map[string]int `json:"mev_type_stats"` BlockRange [2]uint64 `json:"block_range"` // [min, max] } // loadIndex loads the transaction index from disk func (ts *TransactionStorage) loadIndex() error { if _, err := os.Stat(ts.indexFile); os.IsNotExist(err) { return nil // No existing index } data, err := os.ReadFile(ts.indexFile) if err != nil { return err } return json.Unmarshal(data, &ts.index) } // saveIndex saves the transaction index to disk func (ts *TransactionStorage) saveIndex() error { data, err := json.MarshalIndent(ts.index, "", " ") if err != nil { return err } return os.WriteFile(ts.indexFile, data, 0644) } // ExportDataset exports transactions matching criteria to a dataset func (ts *TransactionStorage) ExportDataset(criteria *DatasetCriteria) (*Dataset, error) { ts.mu.RLock() defer ts.mu.RUnlock() var transactions []*RealTransactionData for _, indexEntry := range ts.index { // Apply filters if !ts.matchesCriteria(indexEntry, criteria) { continue } // Load transaction data tx, err := ts.LoadTransaction(indexEntry.Hash) if err != nil { ts.logger.Warn(fmt.Sprintf("Failed to load transaction %s: %v", indexEntry.Hash, err)) continue } transactions = append(transactions, tx) // Check limit if criteria.MaxTransactions > 0 && len(transactions) >= criteria.MaxTransactions { break } } return &Dataset{ Transactions: transactions, Criteria: criteria, GeneratedAt: time.Now(), Stats: ts.calculateDatasetStats(transactions), }, nil } // DatasetCriteria defines criteria for dataset export type DatasetCriteria struct { Protocols []string `json:"protocols,omitempty"` MEVTypes []string `json:"mev_types,omitempty"` MinValueUSD float64 `json:"min_value_usd,omitempty"` MaxValueUSD float64 `json:"max_value_usd,omitempty"` StartBlock uint64 `json:"start_block,omitempty"` EndBlock uint64 `json:"end_block,omitempty"` MaxTransactions int `json:"max_transactions,omitempty"` SortBy string `json:"sort_by,omitempty"` // "value", "block", "time" SortDesc bool `json:"sort_desc,omitempty"` } // Dataset represents an exported dataset type Dataset struct { Transactions []*RealTransactionData `json:"transactions"` Criteria *DatasetCriteria `json:"criteria"` GeneratedAt time.Time `json:"generated_at"` Stats *DatasetStats `json:"stats"` } // DatasetStats contains statistics about a dataset type DatasetStats struct { Count int `json:"count"` TotalValueUSD float64 `json:"total_value_usd"` AverageValueUSD float64 `json:"average_value_usd"` ProtocolCounts map[string]int `json:"protocol_counts"` MEVTypeCounts map[string]int `json:"mev_type_counts"` BlockRange [2]uint64 `json:"block_range"` TimeRange [2]time.Time `json:"time_range"` } // matchesCriteria checks if a transaction index entry matches the criteria func (ts *TransactionStorage) matchesCriteria(entry *TransactionIndex, criteria *DatasetCriteria) bool { // Protocol filter if len(criteria.Protocols) > 0 { found := false for _, protocol := range criteria.Protocols { if strings.EqualFold(entry.Protocol, protocol) { found = true break } } if !found { return false } } // MEV type filter if len(criteria.MEVTypes) > 0 { found := false for _, mevType := range criteria.MEVTypes { if strings.EqualFold(entry.MEVType, mevType) { found = true break } } if !found { return false } } // Value range filter if criteria.MinValueUSD > 0 && entry.ValueUSD < criteria.MinValueUSD { return false } if criteria.MaxValueUSD > 0 && entry.ValueUSD > criteria.MaxValueUSD { return false } // Block range filter if criteria.StartBlock > 0 && entry.BlockNumber < criteria.StartBlock { return false } if criteria.EndBlock > 0 && entry.BlockNumber > criteria.EndBlock { return false } return true } // calculateDatasetStats calculates statistics for a dataset func (ts *TransactionStorage) calculateDatasetStats(transactions []*RealTransactionData) *DatasetStats { if len(transactions) == 0 { return &DatasetStats{} } stats := &DatasetStats{ Count: len(transactions), ProtocolCounts: make(map[string]int), MEVTypeCounts: make(map[string]int), BlockRange: [2]uint64{^uint64(0), 0}, TimeRange: [2]time.Time{time.Now(), time.Time{}}, } var totalValue float64 for _, tx := range transactions { // Value totalValue += tx.EstimatedValueUSD // Protocol if tx.ParsedDEX != nil { stats.ProtocolCounts[tx.ParsedDEX.Protocol]++ } // MEV type stats.MEVTypeCounts[tx.MEVClassification]++ // Block range if tx.BlockNumber < stats.BlockRange[0] { stats.BlockRange[0] = tx.BlockNumber } if tx.BlockNumber > stats.BlockRange[1] { stats.BlockRange[1] = tx.BlockNumber } // Time range if tx.SequencerTimestamp.Before(stats.TimeRange[0]) { stats.TimeRange[0] = tx.SequencerTimestamp } if tx.SequencerTimestamp.After(stats.TimeRange[1]) { stats.TimeRange[1] = tx.SequencerTimestamp } } stats.TotalValueUSD = totalValue if stats.Count > 0 { stats.AverageValueUSD = totalValue / float64(stats.Count) } return stats } // SaveDataset saves a dataset to a file func (ts *TransactionStorage) SaveDataset(dataset *Dataset, filename string) error { data, err := json.MarshalIndent(dataset, "", " ") if err != nil { return fmt.Errorf("failed to marshal dataset: %w", err) } filePath := filepath.Join(ts.dataDir, filename) if err := os.WriteFile(filePath, data, 0644); err != nil { return fmt.Errorf("failed to write dataset file: %w", err) } ts.logger.Info(fmt.Sprintf("Saved dataset with %d transactions to %s", len(dataset.Transactions), filePath)) return nil } // LoadDataset loads a dataset from a file func (ts *TransactionStorage) LoadDataset(filename string) (*Dataset, error) { filePath := filepath.Join(ts.dataDir, filename) data, err := os.ReadFile(filePath) if err != nil { return nil, fmt.Errorf("failed to read dataset file: %w", err) } var dataset Dataset if err := json.Unmarshal(data, &dataset); err != nil { return nil, fmt.Errorf("failed to unmarshal dataset: %w", err) } return &dataset, nil } // CleanupOldData removes old transaction data beyond retention period func (ts *TransactionStorage) CleanupOldData(retentionDays int) error { if retentionDays <= 0 { return nil // No cleanup } ts.mu.Lock() defer ts.mu.Unlock() cutoffTime := time.Now().AddDate(0, 0, -retentionDays) var removedCount int for hash, entry := range ts.index { if entry.StoredAt.Before(cutoffTime) { // Remove file if err := os.Remove(entry.FilePath); err != nil && !os.IsNotExist(err) { ts.logger.Warn(fmt.Sprintf("Failed to remove file %s: %v", entry.FilePath, err)) } // Remove from index delete(ts.index, hash) removedCount++ } } if removedCount > 0 { ts.logger.Info(fmt.Sprintf("Cleaned up %d old transactions", removedCount)) return ts.saveIndex() } return nil }