diff --git a/pkg/sequencer/decoder.go b/pkg/sequencer/decoder.go index 15d2203..d7ac881 100644 --- a/pkg/sequencer/decoder.go +++ b/pkg/sequencer/decoder.go @@ -43,6 +43,7 @@ type DecodedTransaction struct { Nonce uint64 GasPrice *big.Int GasLimit uint64 + RawBytes []byte // RLP-encoded transaction bytes for reconstruction } // DecodeArbitrumMessage decodes an Arbitrum sequencer feed message @@ -145,11 +146,27 @@ func DecodeL2Transaction(l2MsgBase64 string) (*DecodedTransaction, error) { Nonce: tx.Nonce(), GasPrice: tx.GasPrice(), GasLimit: tx.Gas(), + RawBytes: txBytes, // Store for later reconstruction } return result, nil } +// ToEthereumTransaction converts a DecodedTransaction back to *types.Transaction +// This is a reusable utility for converting our decoded format to go-ethereum format +func (dt *DecodedTransaction) ToEthereumTransaction() (*types.Transaction, error) { + if len(dt.RawBytes) == 0 { + return nil, fmt.Errorf("no raw transaction bytes available") + } + + tx := new(types.Transaction) + if err := rlp.DecodeBytes(dt.RawBytes, tx); err != nil { + return nil, fmt.Errorf("failed to decode transaction: %w", err) + } + + return tx, nil +} + // IsSwapTransaction checks if the transaction data is a DEX swap func IsSwapTransaction(data []byte) bool { if len(data) < 4 { diff --git a/pkg/sequencer/reader.go b/pkg/sequencer/reader.go index 4881cec..f283d91 100644 --- a/pkg/sequencer/reader.go +++ b/pkg/sequencer/reader.go @@ -9,7 +9,6 @@ import ( "sync/atomic" "time" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/log" @@ -79,9 +78,9 @@ type Reader struct { rpcClient *ethclient.Client // Channels - txHashes chan string - stopCh chan struct{} - wg sync.WaitGroup + swapEvents chan *SwapEvent // Changed from txHashes to pass full swap events + stopCh chan struct{} + wg sync.WaitGroup // State (protected by RWMutex) mu sync.RWMutex @@ -140,7 +139,7 @@ func NewReader( executor: executor, swapFilter: swapFilter, rpcClient: rpcClient, - txHashes: make(chan string, config.BufferSize), + swapEvents: make(chan *SwapEvent, config.BufferSize), stopCh: make(chan struct{}), }, nil } @@ -174,9 +173,9 @@ func (r *Reader) Start(ctx context.Context) error { "block", swap.BlockNumber, ) - // Send to existing arbitrage detection pipeline + // Send full swap event to arbitrage detection pipeline select { - case r.txHashes <- swap.TxHash: + case r.swapEvents <- swap: // Successfully queued for arbitrage detection default: r.logger.Warn("arbitrage queue full", "tx", swap.TxHash) @@ -337,30 +336,28 @@ func (r *Reader) worker(ctx context.Context, id int) { return case <-r.stopCh: return - case txHash := <-r.txHashes: - if err := r.processTxHash(ctx, txHash); err != nil { - logger.Debug("processing error", "tx", txHash, "error", err) + case swapEvent := <-r.swapEvents: + if err := r.processSwapEvent(ctx, swapEvent); err != nil { + logger.Debug("processing error", "tx", swapEvent.TxHash, "error", err) } } } } -// processTxHash processes a transaction hash -func (r *Reader) processTxHash(ctx context.Context, txHash string) error { +// processSwapEvent processes a swap event with transaction data already decoded +func (r *Reader) processSwapEvent(ctx context.Context, swapEvent *SwapEvent) error { startTime := time.Now() // Enforce max processing time procCtx, cancel := context.WithTimeout(ctx, r.config.MaxProcessingTime) defer cancel() - // Fetch full transaction - tx, isPending, err := r.rpcClient.TransactionByHash(procCtx, common.HexToHash(txHash)) + // Convert decoded transaction to *types.Transaction + // This uses the transaction data we already received from the sequencer feed + // NO BLOCKING RPC CALL - transaction is already decoded! + tx, err := swapEvent.Transaction.ToEthereumTransaction() if err != nil { - return fmt.Errorf("fetch tx failed: %w", err) - } - - if !isPending { - return nil // Skip already mined transactions + return fmt.Errorf("convert tx failed: %w", err) } parseStart := time.Now()