package arbitrum import ( "context" "fmt" "math/big" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/rpc" "github.com/fraktal/mev-beta/internal/logger" pkgerrors "github.com/fraktal/mev-beta/pkg/errors" ) // ArbitrumClient extends the standard Ethereum client with Arbitrum-specific functionality type ArbitrumClient struct { *ethclient.Client rpcClient *rpc.Client Logger *logger.Logger ChainID *big.Int } // NewArbitrumClient creates a new Arbitrum-specific client func NewArbitrumClient(endpoint string, logger *logger.Logger) (*ArbitrumClient, error) { rpcClient, err := rpc.Dial(endpoint) if err != nil { return nil, fmt.Errorf("failed to connect to Arbitrum RPC: %v", err) } ethClient := ethclient.NewClient(rpcClient) // Get chain ID to verify we're connected to Arbitrum chainID, err := ethClient.ChainID(context.Background()) if err != nil { return nil, fmt.Errorf("failed to get chain ID: %v", err) } // Verify this is Arbitrum (42161 for mainnet, 421613 for testnet) if chainID.Uint64() != 42161 && chainID.Uint64() != 421613 { logger.Warn(fmt.Sprintf("Chain ID %d might not be Arbitrum mainnet (42161) or testnet (421613)", chainID.Uint64())) } return &ArbitrumClient{ Client: ethClient, rpcClient: rpcClient, Logger: logger, ChainID: chainID, }, nil } // SubscribeToL2Messages subscribes to L2 message events func (c *ArbitrumClient) SubscribeToL2Messages(ctx context.Context, ch chan<- *L2Message) (ethereum.Subscription, error) { // Validate inputs if ctx == nil { return nil, fmt.Errorf("context is nil") } if ch == nil { return nil, fmt.Errorf("channel is nil") } // Subscribe to new heads to get L2 blocks headers := make(chan *types.Header) sub, err := c.SubscribeNewHead(ctx, headers) if err != nil { return nil, fmt.Errorf("failed to subscribe to new heads: %v", err) } // Process headers and extract L2 messages go func() { defer func() { // Recover from potential panic when closing channel if r := recover(); r != nil { c.Logger.Error(fmt.Sprintf("Panic while closing L2 message channel: %v", r)) } // Safely close the channel defer func() { if r := recover(); r != nil { c.Logger.Debug("L2 message channel already closed") } }() select { case <-ctx.Done(): // Context cancelled, don't close channel as it might be used elsewhere default: close(ch) } }() for { select { case header := <-headers: if header != nil { if err := c.processBlockForL2Messages(ctx, header, ch); err != nil { c.Logger.Error(fmt.Sprintf("Error processing block %d for L2 messages: %v", header.Number.Uint64(), err)) } } case <-ctx.Done(): return } } }() return sub, nil } // processBlockForL2Messages processes a block to extract L2 messages func (c *ArbitrumClient) processBlockForL2Messages(ctx context.Context, header *types.Header, ch chan<- *L2Message) error { // Validate inputs if ctx == nil { return fmt.Errorf("context is nil") } if header == nil { return fmt.Errorf("header is nil") } if ch == nil { return fmt.Errorf("channel is nil") } // For Arbitrum, we create L2 messages from the block data itself // This represents the block as an L2 message containing potential transactions l2Message := &L2Message{ Type: L2Transaction, // Treat each block as containing transaction data MessageNumber: header.Number, Data: c.encodeBlockAsL2Message(header), Timestamp: header.Time, BlockNumber: header.Number.Uint64(), BlockHash: header.Hash(), } // Try to get block transactions for more detailed analysis block, err := c.BlockByHash(ctx, header.Hash()) if err != nil { c.Logger.Debug(fmt.Sprintf("Could not fetch full block %d, using header only: %v", header.Number.Uint64(), err)) } else if block != nil { // Add transaction count and basic stats to the message l2Message.TxCount = len(block.Transactions()) // For each transaction in the block, we could create separate L2 messages // but to avoid overwhelming the system, we'll process them in batches if len(block.Transactions()) > 0 { // Create a summary message with transaction data l2Message.Data = c.encodeTransactionsAsL2Message(block.Transactions()) } } select { case ch <- l2Message: case <-ctx.Done(): return pkgerrors.WrapContextError(ctx.Err(), "processBlockForL2Messages.send", map[string]interface{}{ "blockNumber": header.Number.Uint64(), "blockHash": header.Hash().Hex(), "txCount": l2Message.TxCount, "timestamp": header.Time, }) } return nil } // encodeBlockAsL2Message creates a simple L2 message encoding from a block header func (c *ArbitrumClient) encodeBlockAsL2Message(header *types.Header) []byte { // Create a simple encoding with block number and timestamp data := make([]byte, 16) // 8 bytes for block number + 8 bytes for timestamp // Encode block number (8 bytes) blockNum := header.Number.Uint64() for i := 0; i < 8; i++ { data[i] = byte(blockNum >> (8 * (7 - i))) } // Encode timestamp (8 bytes) timestamp := header.Time for i := 0; i < 8; i++ { data[8+i] = byte(timestamp >> (8 * (7 - i))) } return data } // encodeTransactionsAsL2Message creates an encoding from transaction list func (c *ArbitrumClient) encodeTransactionsAsL2Message(transactions []*types.Transaction) []byte { if len(transactions) == 0 { return []byte{} } // Create a simple encoding with transaction count and first few transaction hashes data := make([]byte, 4) // Start with 4 bytes for transaction count // Encode transaction count txCount := uint32(len(transactions)) data[0] = byte(txCount >> 24) data[1] = byte(txCount >> 16) data[2] = byte(txCount >> 8) data[3] = byte(txCount) // Add up to first 3 transaction hashes (32 bytes each) maxTxHashes := 3 if len(transactions) < maxTxHashes { maxTxHashes = len(transactions) } for i := 0; i < maxTxHashes; i++ { if transactions[i] != nil { txHash := transactions[i].Hash() data = append(data, txHash.Bytes()...) } } return data } // extractL2MessageFromTransaction extracts L2 message data from a transaction func (c *ArbitrumClient) extractL2MessageFromTransaction(tx *types.Transaction, timestamp uint64) *L2Message { // Check if this transaction contains L2 message data if len(tx.Data()) < 4 { return nil } // Create L2 message l2Message := &L2Message{ Type: L2Transaction, Sender: common.Address{}, // Would need signature recovery Data: tx.Data(), Timestamp: timestamp, TxHash: tx.Hash(), GasUsed: tx.Gas(), GasPrice: tx.GasPrice(), ParsedTx: tx, } // Check if this is a DEX interaction for more detailed processing if tx.To() != nil { // We'll add more detailed DEX detection here // For now, we mark all transactions as potential DEX interactions // The parser will filter out non-DEX transactions } return l2Message } // GetL2TransactionReceipt gets the receipt for an L2 transaction with additional data func (c *ArbitrumClient) GetL2TransactionReceipt(ctx context.Context, txHash common.Hash) (*L2TransactionReceipt, error) { receipt, err := c.TransactionReceipt(ctx, txHash) if err != nil { return nil, err } l2Receipt := &L2TransactionReceipt{ Receipt: receipt, L2BlockNumber: receipt.BlockNumber.Uint64(), L2TxIndex: uint64(receipt.TransactionIndex), } // Extract additional L2-specific data if err := c.enrichL2Receipt(ctx, l2Receipt); err != nil { c.Logger.Warn(fmt.Sprintf("Failed to enrich L2 receipt: %v", err)) } return l2Receipt, nil } // enrichL2Receipt adds L2-specific data to the receipt using real Arbitrum RPC methods func (c *ArbitrumClient) enrichL2Receipt(ctx context.Context, receipt *L2TransactionReceipt) error { // Use Arbitrum-specific RPC methods to get L1 batch information if err := c.addL1BatchInfo(ctx, receipt); err != nil { c.Logger.Debug(fmt.Sprintf("Failed to add L1 batch info: %v", err)) } // Add gas usage breakdown for Arbitrum if err := c.addGasBreakdown(ctx, receipt); err != nil { c.Logger.Debug(fmt.Sprintf("Failed to add gas breakdown: %v", err)) } // Check for retryable tickets in logs for _, log := range receipt.Logs { if c.isRetryableTicketLog(log) { ticket, err := c.parseRetryableTicket(log) if err == nil { receipt.RetryableTicket = ticket } } } return nil } // isRetryableTicketLog checks if a log represents a retryable ticket func (c *ArbitrumClient) isRetryableTicketLog(log *types.Log) bool { // Retryable ticket creation signature retryableTicketSig := common.HexToHash("0xb4df3847300f076a369cd76d2314b470a1194d9e8a6bb97f1860aee88a5f6748") return len(log.Topics) > 0 && log.Topics[0] == retryableTicketSig } // parseRetryableTicket parses retryable ticket data from a log func (c *ArbitrumClient) parseRetryableTicket(log *types.Log) (*RetryableTicket, error) { if len(log.Topics) < 3 { return nil, fmt.Errorf("insufficient topics for retryable ticket") } ticket := &RetryableTicket{ TicketID: log.Topics[1], From: common.BytesToAddress(log.Topics[2].Bytes()), } // Parse data field for additional parameters if len(log.Data) >= 96 { ticket.Value = new(big.Int).SetBytes(log.Data[:32]) ticket.MaxGas = new(big.Int).SetBytes(log.Data[32:64]).Uint64() ticket.GasPriceBid = new(big.Int).SetBytes(log.Data[64:96]) } return ticket, nil } // GetL2MessageByNumber gets an L2 message by its number func (c *ArbitrumClient) GetL2MessageByNumber(ctx context.Context, messageNumber *big.Int) (*L2Message, error) { // This would use Arbitrum-specific RPC methods var result map[string]interface{} err := c.rpcClient.CallContext(ctx, &result, "arb_getL2ToL1Msg", messageNumber) if err != nil { return nil, fmt.Errorf("failed to get L2 message: %v", err) } // Parse the result into L2Message l2Message := &L2Message{ MessageNumber: messageNumber, Type: L2Unknown, } // Extract data from result map if data, ok := result["data"].(string); ok { l2Message.Data = common.FromHex(data) } if timestamp, ok := result["timestamp"].(string); ok { ts := new(big.Int) if _, success := ts.SetString(timestamp, 0); success { l2Message.Timestamp = ts.Uint64() } } return l2Message, nil } // GetBatchByNumber gets a batch by its number func (c *ArbitrumClient) GetBatchByNumber(ctx context.Context, batchNumber *big.Int) (*BatchInfo, error) { var result map[string]interface{} err := c.rpcClient.CallContext(ctx, &result, "arb_getBatch", batchNumber) if err != nil { return nil, fmt.Errorf("failed to get batch: %v", err) } batch := &BatchInfo{ BatchNumber: batchNumber, } if batchRoot, ok := result["batchRoot"].(string); ok { batch.BatchRoot = common.HexToHash(batchRoot) } if txCount, ok := result["txCount"].(string); ok { count := new(big.Int) if _, success := count.SetString(txCount, 0); success { batch.TxCount = count.Uint64() } } return batch, nil } // SubscribeToNewBatches subscribes to new batch submissions func (c *ArbitrumClient) SubscribeToNewBatches(ctx context.Context, ch chan<- *BatchInfo) (ethereum.Subscription, error) { // Create filter for batch submission events query := ethereum.FilterQuery{ Addresses: []common.Address{ common.HexToAddress("0x1c479675ad559DC151F6Ec7ed3FbF8ceE79582B6"), // Sequencer Inbox }, Topics: [][]common.Hash{ {common.HexToHash("0x8ca1a4adb985e8dd52c4b83e8e5ffa4ad1f6fca85ad893f4f9e5b45a5c1e5e9e")}, // SequencerBatchDelivered }, } logs := make(chan types.Log) sub, err := c.SubscribeFilterLogs(ctx, query, logs) if err != nil { return nil, fmt.Errorf("failed to subscribe to batch logs: %v", err) } // Process logs and extract batch info go func() { defer close(ch) for { select { case log := <-logs: if batch := c.parseBatchFromLog(log); batch != nil { select { case ch <- batch: case <-ctx.Done(): return } } case <-ctx.Done(): return } } }() return sub, nil } // parseBatchFromLog parses batch information from a log event func (c *ArbitrumClient) parseBatchFromLog(log types.Log) *BatchInfo { if len(log.Topics) < 2 { return nil } batchNumber := new(big.Int).SetBytes(log.Topics[1].Bytes()) batch := &BatchInfo{ BatchNumber: batchNumber, L1SubmissionTx: log.TxHash, } if len(log.Data) >= 64 { batch.BatchRoot = common.BytesToHash(log.Data[:32]) batch.TxCount = new(big.Int).SetBytes(log.Data[32:64]).Uint64() } return batch } // addL1BatchInfo adds L1 batch information to the receipt func (c *ArbitrumClient) addL1BatchInfo(ctx context.Context, receipt *L2TransactionReceipt) error { // Call Arbitrum-specific RPC method to get L1 batch info var batchInfo struct { BatchNumber uint64 `json:"batchNumber"` L1BlockNum uint64 `json:"l1BlockNum"` } err := c.rpcClient.CallContext(ctx, &batchInfo, "arb_getL1BatchInfo", receipt.TxHash) if err != nil { return fmt.Errorf("failed to get L1 batch info: %w", err) } receipt.L1BatchNumber = batchInfo.BatchNumber receipt.L1BlockNumber = batchInfo.L1BlockNum return nil } // addGasBreakdown adds detailed gas usage information func (c *ArbitrumClient) addGasBreakdown(ctx context.Context, receipt *L2TransactionReceipt) error { // Call Arbitrum-specific RPC method to get gas breakdown var gasBreakdown struct { L1Gas uint64 `json:"l1Gas"` L2Gas uint64 `json:"l2Gas"` L1Fee uint64 `json:"l1Fee"` L2Fee uint64 `json:"l2Fee"` } err := c.rpcClient.CallContext(ctx, &gasBreakdown, "arb_getGasBreakdown", receipt.TxHash) if err != nil { return fmt.Errorf("failed to get gas breakdown: %w", err) } receipt.L1GasUsed = gasBreakdown.L1Gas receipt.L2GasUsed = gasBreakdown.L2Gas return nil } // Close closes the Arbitrum client func (c *ArbitrumClient) Close() { c.Client.Close() c.rpcClient.Close() }