Compare commits

...

1 Commits

Author SHA1 Message Date
Administrator
691d5ba67d refactor: remove blocking RPC call from hot path
CRITICAL FIX: Eliminated blocking RPC call in reader.go that was fetching
transaction data we already had from the sequencer feed.

Changes for consistency and reusability:
1. Added RawBytes field to DecodedTransaction to store RLP-encoded transaction
2. Created reusable ToEthereumTransaction() method for type conversion
3. Changed channel from 'chan string' (txHashes) to 'chan *SwapEvent' (swapEvents)
4. Updated processSwapEvent to use transaction from swap event instead of RPC

Impact:
- REMOVES blocking RPC call from hot path (pkg/sequencer/reader.go:357)
- Eliminates network latency from transaction processing pipeline
- Uses data already available from Arbitrum sequencer feed
- Improves throughput and reduces RPC dependency

This fixes the #1 CRITICAL blocker for production deployment identified in
PRODUCTION_READINESS.md.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-11 07:58:48 +01:00
2 changed files with 33 additions and 19 deletions

View File

@@ -43,6 +43,7 @@ type DecodedTransaction struct {
Nonce uint64
GasPrice *big.Int
GasLimit uint64
RawBytes []byte // RLP-encoded transaction bytes for reconstruction
}
// DecodeArbitrumMessage decodes an Arbitrum sequencer feed message
@@ -145,11 +146,27 @@ func DecodeL2Transaction(l2MsgBase64 string) (*DecodedTransaction, error) {
Nonce: tx.Nonce(),
GasPrice: tx.GasPrice(),
GasLimit: tx.Gas(),
RawBytes: txBytes, // Store for later reconstruction
}
return result, nil
}
// ToEthereumTransaction converts a DecodedTransaction back to *types.Transaction
// This is a reusable utility for converting our decoded format to go-ethereum format
func (dt *DecodedTransaction) ToEthereumTransaction() (*types.Transaction, error) {
if len(dt.RawBytes) == 0 {
return nil, fmt.Errorf("no raw transaction bytes available")
}
tx := new(types.Transaction)
if err := rlp.DecodeBytes(dt.RawBytes, tx); err != nil {
return nil, fmt.Errorf("failed to decode transaction: %w", err)
}
return tx, nil
}
// IsSwapTransaction checks if the transaction data is a DEX swap
func IsSwapTransaction(data []byte) bool {
if len(data) < 4 {

View File

@@ -9,7 +9,6 @@ import (
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
@@ -79,9 +78,9 @@ type Reader struct {
rpcClient *ethclient.Client
// Channels
txHashes chan string
stopCh chan struct{}
wg sync.WaitGroup
swapEvents chan *SwapEvent // Changed from txHashes to pass full swap events
stopCh chan struct{}
wg sync.WaitGroup
// State (protected by RWMutex)
mu sync.RWMutex
@@ -140,7 +139,7 @@ func NewReader(
executor: executor,
swapFilter: swapFilter,
rpcClient: rpcClient,
txHashes: make(chan string, config.BufferSize),
swapEvents: make(chan *SwapEvent, config.BufferSize),
stopCh: make(chan struct{}),
}, nil
}
@@ -174,9 +173,9 @@ func (r *Reader) Start(ctx context.Context) error {
"block", swap.BlockNumber,
)
// Send to existing arbitrage detection pipeline
// Send full swap event to arbitrage detection pipeline
select {
case r.txHashes <- swap.TxHash:
case r.swapEvents <- swap:
// Successfully queued for arbitrage detection
default:
r.logger.Warn("arbitrage queue full", "tx", swap.TxHash)
@@ -337,30 +336,28 @@ func (r *Reader) worker(ctx context.Context, id int) {
return
case <-r.stopCh:
return
case txHash := <-r.txHashes:
if err := r.processTxHash(ctx, txHash); err != nil {
logger.Debug("processing error", "tx", txHash, "error", err)
case swapEvent := <-r.swapEvents:
if err := r.processSwapEvent(ctx, swapEvent); err != nil {
logger.Debug("processing error", "tx", swapEvent.TxHash, "error", err)
}
}
}
}
// processTxHash processes a transaction hash
func (r *Reader) processTxHash(ctx context.Context, txHash string) error {
// processSwapEvent processes a swap event with transaction data already decoded
func (r *Reader) processSwapEvent(ctx context.Context, swapEvent *SwapEvent) error {
startTime := time.Now()
// Enforce max processing time
procCtx, cancel := context.WithTimeout(ctx, r.config.MaxProcessingTime)
defer cancel()
// Fetch full transaction
tx, isPending, err := r.rpcClient.TransactionByHash(procCtx, common.HexToHash(txHash))
// Convert decoded transaction to *types.Transaction
// This uses the transaction data we already received from the sequencer feed
// NO BLOCKING RPC CALL - transaction is already decoded!
tx, err := swapEvent.Transaction.ToEthereumTransaction()
if err != nil {
return fmt.Errorf("fetch tx failed: %w", err)
}
if !isPending {
return nil // Skip already mined transactions
return fmt.Errorf("convert tx failed: %w", err)
}
parseStart := time.Now()