//go:build integration && forked // +build integration,forked package sequencer import ( "context" "encoding/json" "fmt" "math/big" "sort" "sync" "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient" "github.com/fraktal/mev-beta/internal/logger" ) // ArbitrumSequencerSimulator simulates the Arbitrum sequencer for comprehensive parser testing type ArbitrumSequencerSimulator struct { logger *logger.Logger client *ethclient.Client isRunning bool mutex sync.RWMutex subscribers []chan *SequencerBlock // Real transaction data storage realBlocks map[uint64]*SequencerBlock blocksMutex sync.RWMutex // Simulation parameters replaySpeed float64 // 1.0 = real-time, 10.0 = 10x speed startBlock uint64 currentBlock uint64 batchSize int // Performance metrics blocksProcessed uint64 txProcessed uint64 startTime time.Time } // SequencerBlock represents a block as it appears in the Arbitrum sequencer type SequencerBlock struct { Number uint64 `json:"number"` Hash common.Hash `json:"hash"` ParentHash common.Hash `json:"parentHash"` Timestamp uint64 `json:"timestamp"` SequencerTime time.Time `json:"sequencerTime"` Transactions []*SequencerTransaction `json:"transactions"` GasUsed uint64 `json:"gasUsed"` GasLimit uint64 `json:"gasLimit"` Size uint64 `json:"size"` L1BlockNumber uint64 `json:"l1BlockNumber"` BatchIndex uint64 `json:"batchIndex"` } // SequencerTransaction represents a transaction as it appears in the sequencer type SequencerTransaction struct { Hash common.Hash `json:"hash"` BlockNumber uint64 `json:"blockNumber"` TransactionIndex uint64 `json:"transactionIndex"` From common.Address `json:"from"` To *common.Address `json:"to"` Value *big.Int `json:"value"` Gas uint64 `json:"gas"` GasPrice *big.Int `json:"gasPrice"` MaxFeePerGas *big.Int `json:"maxFeePerGas"` MaxPriorityFeePerGas *big.Int `json:"maxPriorityFeePerGas"` Input []byte `json:"input"` Nonce uint64 `json:"nonce"` Type uint8 `json:"type"` // Arbitrum-specific fields L1BlockNumber uint64 `json:"l1BlockNumber"` SequencerOrderIndex uint64 `json:"sequencerOrderIndex"` BatchIndex uint64 `json:"batchIndex"` L2BlockTimestamp uint64 `json:"l2BlockTimestamp"` // Execution results Receipt *SequencerReceipt `json:"receipt"` Status uint64 `json:"status"` GasUsed uint64 `json:"gasUsed"` EffectiveGasPrice *big.Int `json:"effectiveGasPrice"` // DEX classification IsDEXTransaction bool `json:"isDEXTransaction"` DEXProtocol string `json:"dexProtocol"` SwapValue *big.Int `json:"swapValue"` IsMEVTransaction bool `json:"isMEVTransaction"` MEVType string `json:"mevType"` } // SequencerReceipt represents a transaction receipt from the sequencer type SequencerReceipt struct { TransactionHash common.Hash `json:"transactionHash"` TransactionIndex uint64 `json:"transactionIndex"` BlockHash common.Hash `json:"blockHash"` BlockNumber uint64 `json:"blockNumber"` From common.Address `json:"from"` To *common.Address `json:"to"` GasUsed uint64 `json:"gasUsed"` EffectiveGasPrice *big.Int `json:"effectiveGasPrice"` Status uint64 `json:"status"` Logs []*SequencerLog `json:"logs"` // Arbitrum-specific receipt fields L1BlockNumber uint64 `json:"l1BlockNumber"` L1InboxBatchInfo string `json:"l1InboxBatchInfo"` L2ToL1Messages []string `json:"l2ToL1Messages"` } // SequencerLog represents an event log from the sequencer type SequencerLog struct { Address common.Address `json:"address"` Topics []common.Hash `json:"topics"` Data []byte `json:"data"` BlockNumber uint64 `json:"blockNumber"` TransactionHash common.Hash `json:"transactionHash"` TransactionIndex uint64 `json:"transactionIndex"` BlockHash common.Hash `json:"blockHash"` LogIndex uint64 `json:"logIndex"` Removed bool `json:"removed"` // Parsed event information (for testing validation) EventSignature string `json:"eventSignature"` EventName string `json:"eventName"` Protocol string `json:"protocol"` ParsedArgs map[string]interface{} `json:"parsedArgs"` } // NewArbitrumSequencerSimulator creates a new sequencer simulator func NewArbitrumSequencerSimulator(logger *logger.Logger, client *ethclient.Client, config *SimulatorConfig) *ArbitrumSequencerSimulator { return &ArbitrumSequencerSimulator{ logger: logger, client: client, realBlocks: make(map[uint64]*SequencerBlock), subscribers: make([]chan *SequencerBlock, 0), replaySpeed: config.ReplaySpeed, startBlock: config.StartBlock, batchSize: config.BatchSize, startTime: time.Now(), } } // SimulatorConfig configures the sequencer simulator type SimulatorConfig struct { ReplaySpeed float64 // Replay speed multiplier (1.0 = real-time) StartBlock uint64 // Starting block number EndBlock uint64 // Ending block number (0 = continuous) BatchSize int // Number of blocks to process in batch EnableMetrics bool // Enable performance metrics DataSource string // "live" or "cached" or "fixture" FixturePath string // Path to fixture data if using fixtures } // LoadRealBlockData loads real Arbitrum block data for simulation func (sim *ArbitrumSequencerSimulator) LoadRealBlockData(startBlock, endBlock uint64) error { sim.logger.Info(fmt.Sprintf("Loading real Arbitrum block data from %d to %d", startBlock, endBlock)) // Process blocks in batches for memory efficiency batchSize := uint64(sim.batchSize) for blockNum := startBlock; blockNum <= endBlock; blockNum += batchSize { batchEnd := blockNum + batchSize - 1 if batchEnd > endBlock { batchEnd = endBlock } if err := sim.loadBlockBatch(blockNum, batchEnd); err != nil { return fmt.Errorf("failed to load block batch %d-%d: %w", blockNum, batchEnd, err) } sim.logger.Info(fmt.Sprintf("Loaded blocks %d-%d (%d total)", blockNum, batchEnd, batchEnd-startBlock+1)) } sim.logger.Info(fmt.Sprintf("Successfully loaded %d blocks of real Arbitrum data", endBlock-startBlock+1)) return nil } // loadBlockBatch loads a batch of blocks with detailed transaction and receipt data func (sim *ArbitrumSequencerSimulator) loadBlockBatch(startBlock, endBlock uint64) error { for blockNum := startBlock; blockNum <= endBlock; blockNum++ { // Get block with full transaction data block, err := sim.client.BlockByNumber(context.Background(), big.NewInt(int64(blockNum))) if err != nil { return fmt.Errorf("failed to get block %d: %w", blockNum, err) } sequencerBlock := &SequencerBlock{ Number: blockNum, Hash: block.Hash(), ParentHash: block.ParentHash(), Timestamp: block.Time(), SequencerTime: time.Unix(int64(block.Time()), 0), GasUsed: block.GasUsed(), GasLimit: block.GasLimit(), Size: block.Size(), L1BlockNumber: blockNum, // Simplified for testing BatchIndex: blockNum / 100, // Simplified batching Transactions: make([]*SequencerTransaction, 0), } // Process all transactions in the block for i, tx := range block.Transactions() { sequencerTx, err := sim.convertToSequencerTransaction(tx, blockNum, uint64(i)) if err != nil { sim.logger.Warn(fmt.Sprintf("Failed to convert transaction %s: %v", tx.Hash().Hex(), err)) continue } // Get transaction receipt for complete data receipt, err := sim.client.TransactionReceipt(context.Background(), tx.Hash()) if err != nil { sim.logger.Warn(fmt.Sprintf("Failed to get receipt for transaction %s: %v", tx.Hash().Hex(), err)) continue } sequencerTx.Receipt = sim.convertToSequencerReceipt(receipt) sequencerTx.Status = receipt.Status sequencerTx.GasUsed = receipt.GasUsed sequencerTx.EffectiveGasPrice = receipt.EffectiveGasPrice // Classify DEX and MEV transactions sim.classifyTransaction(sequencerTx) sequencerBlock.Transactions = append(sequencerBlock.Transactions, sequencerTx) } // Store the sequencer block sim.blocksMutex.Lock() sim.realBlocks[blockNum] = sequencerBlock sim.blocksMutex.Unlock() sim.logger.Debug(fmt.Sprintf("Loaded block %d with %d transactions (%d DEX, %d MEV)", blockNum, len(sequencerBlock.Transactions), sim.countDEXTransactions(sequencerBlock), sim.countMEVTransactions(sequencerBlock))) } return nil } // convertToSequencerTransaction converts a geth transaction to sequencer format func (sim *ArbitrumSequencerSimulator) convertToSequencerTransaction(tx *types.Transaction, blockNumber, txIndex uint64) (*SequencerTransaction, error) { sequencerTx := &SequencerTransaction{ Hash: tx.Hash(), BlockNumber: blockNumber, TransactionIndex: txIndex, Value: tx.Value(), Gas: tx.Gas(), GasPrice: tx.GasPrice(), Input: tx.Data(), Nonce: tx.Nonce(), Type: tx.Type(), L1BlockNumber: blockNumber, // Simplified SequencerOrderIndex: txIndex, BatchIndex: blockNumber / 100, L2BlockTimestamp: uint64(time.Now().Unix()), } // Handle different transaction types if tx.Type() == types.DynamicFeeTxType { sequencerTx.MaxFeePerGas = tx.GasFeeCap() sequencerTx.MaxPriorityFeePerGas = tx.GasTipCap() } // Extract from/to addresses signer := types.NewEIP155Signer(tx.ChainId()) from, err := signer.Sender(tx) if err != nil { return nil, fmt.Errorf("failed to extract sender: %w", err) } sequencerTx.From = from sequencerTx.To = tx.To() return sequencerTx, nil } // convertToSequencerReceipt converts a geth receipt to sequencer format func (sim *ArbitrumSequencerSimulator) convertToSequencerReceipt(receipt *types.Receipt) *SequencerReceipt { sequencerReceipt := &SequencerReceipt{ TransactionHash: receipt.TxHash, TransactionIndex: uint64(receipt.TransactionIndex), BlockHash: receipt.BlockHash, BlockNumber: receipt.BlockNumber.Uint64(), From: common.Address{}, // Receipt doesn't contain From field - would need to get from transaction To: &receipt.ContractAddress, // Use ContractAddress if available GasUsed: receipt.GasUsed, EffectiveGasPrice: receipt.EffectiveGasPrice, Status: receipt.Status, Logs: make([]*SequencerLog, 0), L1BlockNumber: receipt.BlockNumber.Uint64(), // Simplified } // Convert logs for _, log := range receipt.Logs { sequencerLog := &SequencerLog{ Address: log.Address, Topics: log.Topics, Data: log.Data, BlockNumber: log.BlockNumber, TransactionHash: log.TxHash, TransactionIndex: uint64(log.TxIndex), BlockHash: log.BlockHash, LogIndex: uint64(log.Index), Removed: log.Removed, } // Parse event signature and classify sim.parseEventLog(sequencerLog) sequencerReceipt.Logs = append(sequencerReceipt.Logs, sequencerLog) } return sequencerReceipt } // parseEventLog parses event logs to extract protocol information func (sim *ArbitrumSequencerSimulator) parseEventLog(log *SequencerLog) { if len(log.Topics) == 0 { return } // Known event signatures for major DEXs eventSignatures := map[common.Hash]EventInfo{ // Uniswap V3 Swap common.HexToHash("0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67"): { Name: "Swap", Protocol: "UniswapV3", Signature: "Swap(indexed address,indexed address,int256,int256,uint160,uint128,int24)", }, // Uniswap V2 Swap common.HexToHash("0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822"): { Name: "Swap", Protocol: "UniswapV2", Signature: "Swap(indexed address,uint256,uint256,uint256,uint256,indexed address)", }, // Camelot Swap common.HexToHash("0xb3e2773606abfd36b5bd91394b3a54d1398336c65005baf7bf7a05efeffaf75b"): { Name: "Swap", Protocol: "Camelot", Signature: "Swap(indexed address,indexed address,int256,int256,uint160,uint128,int24)", }, // More protocols can be added here } if eventInfo, exists := eventSignatures[log.Topics[0]]; exists { log.EventSignature = eventInfo.Signature log.EventName = eventInfo.Name log.Protocol = eventInfo.Protocol log.ParsedArgs = make(map[string]interface{}) // Parse specific event arguments based on protocol sim.parseEventArguments(log, eventInfo) } } // EventInfo contains information about a known event type EventInfo struct { Name string Protocol string Signature string } // parseEventArguments parses event arguments based on the protocol func (sim *ArbitrumSequencerSimulator) parseEventArguments(log *SequencerLog, eventInfo EventInfo) { switch eventInfo.Protocol { case "UniswapV3": if eventInfo.Name == "Swap" && len(log.Topics) >= 3 && len(log.Data) >= 160 { // Parse Uniswap V3 swap event log.ParsedArgs["sender"] = common.BytesToAddress(log.Topics[1].Bytes()) log.ParsedArgs["recipient"] = common.BytesToAddress(log.Topics[2].Bytes()) log.ParsedArgs["amount0"] = new(big.Int).SetBytes(log.Data[0:32]) log.ParsedArgs["amount1"] = new(big.Int).SetBytes(log.Data[32:64]) log.ParsedArgs["sqrtPriceX96"] = new(big.Int).SetBytes(log.Data[64:96]) log.ParsedArgs["liquidity"] = new(big.Int).SetBytes(log.Data[96:128]) log.ParsedArgs["tick"] = new(big.Int).SetBytes(log.Data[128:160]) } case "UniswapV2": if eventInfo.Name == "Swap" && len(log.Topics) >= 3 && len(log.Data) >= 128 { // Parse Uniswap V2 swap event log.ParsedArgs["sender"] = common.BytesToAddress(log.Topics[1].Bytes()) log.ParsedArgs["to"] = common.BytesToAddress(log.Topics[2].Bytes()) log.ParsedArgs["amount0In"] = new(big.Int).SetBytes(log.Data[0:32]) log.ParsedArgs["amount1In"] = new(big.Int).SetBytes(log.Data[32:64]) log.ParsedArgs["amount0Out"] = new(big.Int).SetBytes(log.Data[64:96]) log.ParsedArgs["amount1Out"] = new(big.Int).SetBytes(log.Data[96:128]) } } } // classifyTransaction classifies transactions as DEX or MEV transactions func (sim *ArbitrumSequencerSimulator) classifyTransaction(tx *SequencerTransaction) { if tx.Receipt == nil { return } // Known DEX router addresses on Arbitrum dexRouters := map[common.Address]string{ common.HexToAddress("0xE592427A0AEce92De3Edee1F18E0157C05861564"): "UniswapV3", common.HexToAddress("0x68b3465833fb72A70ecDF485E0e4C7bD8665Fc45"): "UniswapV3", common.HexToAddress("0x1b02dA8Cb0d097eB8D57A175b88c7D8b47997506"): "SushiSwap", common.HexToAddress("0xc873fEcbd354f5A56E00E710B90EF4201db2448d"): "Camelot", common.HexToAddress("0x60aE616a2155Ee3d9A68541Ba4544862310933d4"): "TraderJoe", } // Check if transaction is to a known DEX router if tx.To != nil { if protocol, isDEX := dexRouters[*tx.To]; isDEX { tx.IsDEXTransaction = true tx.DEXProtocol = protocol } } // Check for DEX interactions in logs for _, log := range tx.Receipt.Logs { if log.Protocol != "" { tx.IsDEXTransaction = true if tx.DEXProtocol == "" { tx.DEXProtocol = log.Protocol } } } // Classify MEV transactions if tx.IsDEXTransaction { tx.IsMEVTransaction, tx.MEVType = sim.classifyMEVTransaction(tx) } // Calculate swap value for DEX transactions if tx.IsDEXTransaction { tx.SwapValue = sim.calculateSwapValue(tx) } } // classifyMEVTransaction classifies MEV transaction types func (sim *ArbitrumSequencerSimulator) classifyMEVTransaction(tx *SequencerTransaction) (bool, string) { // Simple heuristics for MEV classification (can be enhanced) // High-value transactions are more likely to be MEV // Create big.Int for 100 ETH equivalent (1e20 wei) threshold := new(big.Int) threshold.SetString("100000000000000000000", 10) // 1e20 in decimal if tx.SwapValue != nil && tx.SwapValue.Cmp(threshold) > 0 { // > 100 ETH equivalent return true, "arbitrage" } // Check for sandwich attack patterns (simplified) if tx.GasPrice != nil && tx.GasPrice.Cmp(big.NewInt(1e11)) > 0 { // High gas price return true, "sandwich" } // Check for flash loan patterns in input data if len(tx.Input) > 100 { inputStr := string(tx.Input) if len(inputStr) > 1000 && (contains(inputStr, "flashloan") || contains(inputStr, "multicall")) { return true, "arbitrage" } } return false, "" } // calculateSwapValue estimates the USD value of a swap transaction func (sim *ArbitrumSequencerSimulator) calculateSwapValue(tx *SequencerTransaction) *big.Int { // Simplified calculation based on ETH value transferred if tx.Value != nil && tx.Value.Sign() > 0 { return tx.Value } // For complex swaps, estimate from gas used (very rough approximation) gasValue := new(big.Int).Mul(big.NewInt(int64(tx.GasUsed)), tx.EffectiveGasPrice) return new(big.Int).Mul(gasValue, big.NewInt(100)) // Estimate swap is 100x gas cost } // StartSimulation starts the sequencer simulation func (sim *ArbitrumSequencerSimulator) StartSimulation(ctx context.Context) error { sim.mutex.Lock() if sim.isRunning { sim.mutex.Unlock() return fmt.Errorf("simulation is already running") } sim.isRunning = true sim.currentBlock = sim.startBlock sim.mutex.Unlock() sim.logger.Info(fmt.Sprintf("Starting Arbitrum sequencer simulation from block %d at %fx speed", sim.startBlock, sim.replaySpeed)) go sim.runSimulation(ctx) return nil } // runSimulation runs the main simulation loop func (sim *ArbitrumSequencerSimulator) runSimulation(ctx context.Context) { defer func() { sim.mutex.Lock() sim.isRunning = false sim.mutex.Unlock() sim.logger.Info("Sequencer simulation stopped") }() // Calculate timing for block replay blockInterval := time.Duration(float64(12*time.Second) / sim.replaySpeed) // Arbitrum ~12s blocks ticker := time.NewTicker(blockInterval) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: sim.processNextBlock() } } } // processNextBlock processes the next block in sequence func (sim *ArbitrumSequencerSimulator) processNextBlock() { sim.blocksMutex.RLock() block, exists := sim.realBlocks[sim.currentBlock] sim.blocksMutex.RUnlock() if !exists { sim.logger.Warn(fmt.Sprintf("Block %d not found in real data", sim.currentBlock)) sim.currentBlock++ return } // Send block to all subscribers sim.mutex.RLock() subscribers := make([]chan *SequencerBlock, len(sim.subscribers)) copy(subscribers, sim.subscribers) sim.mutex.RUnlock() for _, subscriber := range subscribers { select { case subscriber <- block: default: sim.logger.Warn("Subscriber channel full, dropping block") } } // Update metrics sim.blocksProcessed++ sim.txProcessed += uint64(len(block.Transactions)) sim.logger.Debug(fmt.Sprintf("Processed block %d with %d transactions (DEX: %d, MEV: %d)", block.Number, len(block.Transactions), sim.countDEXTransactions(block), sim.countMEVTransactions(block))) sim.currentBlock++ } // Subscribe adds a subscriber to receive sequencer blocks func (sim *ArbitrumSequencerSimulator) Subscribe() chan *SequencerBlock { sim.mutex.Lock() defer sim.mutex.Unlock() subscriber := make(chan *SequencerBlock, 100) // Buffered channel sim.subscribers = append(sim.subscribers, subscriber) return subscriber } // GetMetrics returns simulation performance metrics func (sim *ArbitrumSequencerSimulator) GetMetrics() *SimulatorMetrics { sim.mutex.RLock() defer sim.mutex.RUnlock() elapsed := time.Since(sim.startTime) var blocksPerSecond, txPerSecond float64 if elapsed.Seconds() > 0 { blocksPerSecond = float64(sim.blocksProcessed) / elapsed.Seconds() txPerSecond = float64(sim.txProcessed) / elapsed.Seconds() } return &SimulatorMetrics{ BlocksProcessed: sim.blocksProcessed, TxProcessed: sim.txProcessed, Elapsed: elapsed, BlocksPerSecond: blocksPerSecond, TxPerSecond: txPerSecond, CurrentBlock: sim.currentBlock, IsRunning: sim.isRunning, } } // SimulatorMetrics contains simulation performance metrics type SimulatorMetrics struct { BlocksProcessed uint64 `json:"blocksProcessed"` TxProcessed uint64 `json:"txProcessed"` Elapsed time.Duration `json:"elapsed"` BlocksPerSecond float64 `json:"blocksPerSecond"` TxPerSecond float64 `json:"txPerSecond"` CurrentBlock uint64 `json:"currentBlock"` IsRunning bool `json:"isRunning"` } // Helper functions func (sim *ArbitrumSequencerSimulator) countDEXTransactions(block *SequencerBlock) int { count := 0 for _, tx := range block.Transactions { if tx.IsDEXTransaction { count++ } } return count } func (sim *ArbitrumSequencerSimulator) countMEVTransactions(block *SequencerBlock) int { count := 0 for _, tx := range block.Transactions { if tx.IsMEVTransaction { count++ } } return count } func contains(s, substr string) bool { return len(s) > 0 && len(substr) > 0 && s != substr && len(s) >= len(substr) && s[:len(substr)] == substr } // Stop stops the sequencer simulation func (sim *ArbitrumSequencerSimulator) Stop() { sim.mutex.Lock() defer sim.mutex.Unlock() if !sim.isRunning { return } // Close all subscriber channels for _, subscriber := range sim.subscribers { close(subscriber) } sim.subscribers = nil sim.logger.Info("Sequencer simulation stopped") } // SaveBlockData saves loaded block data to a file for later use func (sim *ArbitrumSequencerSimulator) SaveBlockData(filename string) error { sim.blocksMutex.RLock() defer sim.blocksMutex.RUnlock() // Sort blocks by number for consistent output var blockNumbers []uint64 for blockNum := range sim.realBlocks { blockNumbers = append(blockNumbers, blockNum) } sort.Slice(blockNumbers, func(i, j int) bool { return blockNumbers[i] < blockNumbers[j] }) // Create sorted block data blockData := make([]*SequencerBlock, 0, len(sim.realBlocks)) for _, blockNum := range blockNumbers { blockData = append(blockData, sim.realBlocks[blockNum]) } // Save to JSON file data, err := json.MarshalIndent(blockData, "", " ") if err != nil { return fmt.Errorf("failed to marshal block data: %w", err) } return sim.writeFile(filename, data) } // writeFile is a helper function to write data to file func (sim *ArbitrumSequencerSimulator) writeFile(filename string, data []byte) error { // In a real implementation, this would write to a file // For this example, we'll just log the action sim.logger.Info(fmt.Sprintf("Would save %d bytes of block data to %s", len(data), filename)) return nil }