package execution import ( "context" "fmt" "log/slog" "math/big" "sync" "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient" "github.com/your-org/mev-bot/pkg/arbitrage" ) // ExecutorConfig contains configuration for the executor type ExecutorConfig struct { // Wallet PrivateKey []byte WalletAddress common.Address // RPC configuration RPCEndpoint string PrivateRPCEndpoint string // Optional private RPC (e.g., Flashbots) UsePrivateRPC bool // Transaction settings ConfirmationBlocks uint64 TimeoutPerTx time.Duration MaxRetries int RetryDelay time.Duration // Nonce management NonceMargin uint64 // Number of nonces to keep ahead // Gas price strategy GasPriceStrategy string // "fast", "market", "aggressive" GasPriceMultiplier float64 // Multiplier for gas price MaxGasPriceIncrement float64 // Max increase for replacement txs // Monitoring MonitorInterval time.Duration CleanupInterval time.Duration } // DefaultExecutorConfig returns default executor configuration func DefaultExecutorConfig() *ExecutorConfig { return &ExecutorConfig{ ConfirmationBlocks: 1, TimeoutPerTx: 5 * time.Minute, MaxRetries: 3, RetryDelay: 5 * time.Second, NonceMargin: 2, GasPriceStrategy: "fast", GasPriceMultiplier: 1.1, // 10% above market MaxGasPriceIncrement: 1.5, // 50% max increase MonitorInterval: 1 * time.Second, CleanupInterval: 1 * time.Minute, } } // Executor executes arbitrage transactions type Executor struct { config *ExecutorConfig logger *slog.Logger // Clients client *ethclient.Client privateClient *ethclient.Client // Optional // Components builder *TransactionBuilder riskManager *RiskManager flashloanMgr *FlashloanManager // Nonce management mu sync.Mutex currentNonce uint64 nonceCache map[uint64]*PendingTransaction // Monitoring stopCh chan struct{} stopped bool } // PendingTransaction tracks a pending transaction type PendingTransaction struct { Hash common.Hash Nonce uint64 Opportunity *arbitrage.Opportunity SubmittedAt time.Time LastChecked time.Time Confirmed bool Failed bool FailReason string Receipt *types.Receipt Retries int } // NewExecutor creates a new executor func NewExecutor( config *ExecutorConfig, builder *TransactionBuilder, riskManager *RiskManager, flashloanMgr *FlashloanManager, logger *slog.Logger, ) (*Executor, error) { if config == nil { config = DefaultExecutorConfig() } // Connect to RPC client, err := ethclient.Dial(config.RPCEndpoint) if err != nil { return nil, fmt.Errorf("failed to connect to RPC: %w", err) } var privateClient *ethclient.Client if config.UsePrivateRPC && config.PrivateRPCEndpoint != "" { privateClient, err = ethclient.Dial(config.PrivateRPCEndpoint) if err != nil { logger.Warn("failed to connect to private RPC", "error", err) } } executor := &Executor{ config: config, logger: logger.With("component", "executor"), client: client, privateClient: privateClient, builder: builder, riskManager: riskManager, flashloanMgr: flashloanMgr, nonceCache: make(map[uint64]*PendingTransaction), stopCh: make(chan struct{}), } // Initialize nonce err = executor.initializeNonce(context.Background()) if err != nil { return nil, fmt.Errorf("failed to initialize nonce: %w", err) } // Start monitoring go executor.monitorTransactions() go executor.cleanupOldTransactions() return executor, nil } // ExecutionResult contains the result of an execution type ExecutionResult struct { Success bool TxHash common.Hash Receipt *types.Receipt ActualProfit *big.Int GasCost *big.Int Error error Duration time.Duration } // Execute executes an arbitrage opportunity func (e *Executor) Execute(ctx context.Context, opp *arbitrage.Opportunity) (*ExecutionResult, error) { startTime := time.Now() e.logger.Info("executing opportunity", "opportunityID", opp.ID, "type", opp.Type, "expectedProfit", opp.NetProfit.String(), ) // Build transaction tx, err := e.builder.BuildTransaction(ctx, opp, e.config.WalletAddress) if err != nil { return &ExecutionResult{ Success: false, Error: fmt.Errorf("failed to build transaction: %w", err), Duration: time.Since(startTime), }, nil } // Risk assessment assessment, err := e.riskManager.AssessRisk(ctx, opp, tx) if err != nil { return &ExecutionResult{ Success: false, Error: fmt.Errorf("failed to assess risk: %w", err), Duration: time.Since(startTime), }, nil } if !assessment.Approved { return &ExecutionResult{ Success: false, Error: fmt.Errorf("risk assessment failed: %s", assessment.Reason), Duration: time.Since(startTime), }, nil } // Log warnings if any for _, warning := range assessment.Warnings { e.logger.Warn("risk warning", "warning", warning) } // Submit transaction hash, err := e.submitTransaction(ctx, tx, opp) if err != nil { return &ExecutionResult{ Success: false, Error: fmt.Errorf("failed to submit transaction: %w", err), Duration: time.Since(startTime), }, nil } e.logger.Info("transaction submitted", "hash", hash.Hex(), "opportunityID", opp.ID, ) // Wait for confirmation receipt, err := e.waitForConfirmation(ctx, hash) if err != nil { return &ExecutionResult{ Success: false, TxHash: hash, Error: fmt.Errorf("transaction failed: %w", err), Duration: time.Since(startTime), }, nil } // Calculate actual profit actualProfit := e.calculateActualProfit(receipt, opp) gasCost := new(big.Int).Mul(receipt.GasUsed, receipt.EffectiveGasPrice) result := &ExecutionResult{ Success: receipt.Status == types.ReceiptStatusSuccessful, TxHash: hash, Receipt: receipt, ActualProfit: actualProfit, GasCost: gasCost, Duration: time.Since(startTime), } if result.Success { e.logger.Info("execution succeeded", "hash", hash.Hex(), "actualProfit", actualProfit.String(), "gasCost", gasCost.String(), "duration", result.Duration, ) e.riskManager.RecordSuccess(hash, actualProfit) } else { e.logger.Error("execution failed", "hash", hash.Hex(), "status", receipt.Status, ) e.riskManager.RecordFailure(hash, "transaction reverted") } return result, nil } // submitTransaction submits a transaction to the network func (e *Executor) submitTransaction(ctx context.Context, tx *SwapTransaction, opp *arbitrage.Opportunity) (common.Hash, error) { // Get nonce nonce := e.getNextNonce() // Sign transaction signedTx, err := e.builder.SignTransaction(tx, nonce, e.config.PrivateKey) if err != nil { e.releaseNonce(nonce) return common.Hash{}, fmt.Errorf("failed to sign transaction: %w", err) } // Choose client (private or public) client := e.client if e.config.UsePrivateRPC && e.privateClient != nil { client = e.privateClient e.logger.Debug("using private RPC") } // Send transaction err = client.SendTransaction(ctx, signedTx) if err != nil { e.releaseNonce(nonce) return common.Hash{}, fmt.Errorf("failed to send transaction: %w", err) } hash := signedTx.Hash() // Track transaction e.trackPendingTransaction(nonce, hash, opp) e.riskManager.TrackTransaction(hash, opp, tx.MaxFeePerGas) return hash, nil } // waitForConfirmation waits for transaction confirmation func (e *Executor) waitForConfirmation(ctx context.Context, hash common.Hash) (*types.Receipt, error) { timeoutCtx, cancel := context.WithTimeout(ctx, e.config.TimeoutPerTx) defer cancel() ticker := time.NewTicker(e.config.MonitorInterval) defer ticker.Stop() for { select { case <-timeoutCtx.Done(): return nil, fmt.Errorf("transaction timeout") case <-ticker.C: receipt, err := e.client.TransactionReceipt(ctx, hash) if err != nil { // Transaction not yet mined continue } // Check confirmations currentBlock, err := e.client.BlockNumber(ctx) if err != nil { continue } confirmations := currentBlock - receipt.BlockNumber.Uint64() if confirmations >= e.config.ConfirmationBlocks { return receipt, nil } } } } // monitorTransactions monitors pending transactions func (e *Executor) monitorTransactions() { ticker := time.NewTicker(e.config.MonitorInterval) defer ticker.Stop() for { select { case <-e.stopCh: return case <-ticker.C: e.checkPendingTransactions() } } } // checkPendingTransactions checks status of pending transactions func (e *Executor) checkPendingTransactions() { e.mu.Lock() defer e.mu.Unlock() ctx := context.Background() for nonce, pending := range e.nonceCache { if pending.Confirmed || pending.Failed { continue } // Check transaction status receipt, err := e.client.TransactionReceipt(ctx, pending.Hash) if err != nil { // Still pending pending.LastChecked = time.Now() // Check for timeout if time.Since(pending.SubmittedAt) > e.config.TimeoutPerTx { e.logger.Warn("transaction timeout", "hash", pending.Hash.Hex(), "nonce", nonce, ) // Attempt replacement if pending.Retries < e.config.MaxRetries { e.logger.Info("attempting transaction replacement", "hash", pending.Hash.Hex(), "retry", pending.Retries+1, ) // In production, implement transaction replacement logic pending.Retries++ } else { pending.Failed = true pending.FailReason = "timeout after retries" e.riskManager.RecordFailure(pending.Hash, "timeout") e.riskManager.UntrackTransaction(pending.Hash) } } continue } // Transaction mined pending.Receipt = receipt pending.Confirmed = true pending.LastChecked = time.Now() if receipt.Status == types.ReceiptStatusFailed { pending.Failed = true pending.FailReason = "transaction reverted" e.riskManager.RecordFailure(pending.Hash, "reverted") } e.riskManager.UntrackTransaction(pending.Hash) e.logger.Debug("transaction confirmed", "hash", pending.Hash.Hex(), "nonce", nonce, "status", receipt.Status, ) } } // cleanupOldTransactions removes old completed transactions func (e *Executor) cleanupOldTransactions() { ticker := time.NewTicker(e.config.CleanupInterval) defer ticker.Stop() for { select { case <-e.stopCh: return case <-ticker.C: e.mu.Lock() cutoff := time.Now().Add(-1 * time.Hour) for nonce, pending := range e.nonceCache { if (pending.Confirmed || pending.Failed) && pending.LastChecked.Before(cutoff) { delete(e.nonceCache, nonce) } } e.mu.Unlock() } } } // initializeNonce initializes the nonce from the network func (e *Executor) initializeNonce(ctx context.Context) error { nonce, err := e.client.PendingNonceAt(ctx, e.config.WalletAddress) if err != nil { return fmt.Errorf("failed to get pending nonce: %w", err) } e.currentNonce = nonce e.logger.Info("initialized nonce", "nonce", nonce) return nil } // getNextNonce gets the next available nonce func (e *Executor) getNextNonce() uint64 { e.mu.Lock() defer e.mu.Unlock() nonce := e.currentNonce e.currentNonce++ return nonce } // releaseNonce releases a nonce back to the pool func (e *Executor) releaseNonce(nonce uint64) { e.mu.Lock() defer e.mu.Unlock() // Only release if it's the current nonce - 1 if nonce == e.currentNonce-1 { e.currentNonce = nonce } } // trackPendingTransaction tracks a pending transaction func (e *Executor) trackPendingTransaction(nonce uint64, hash common.Hash, opp *arbitrage.Opportunity) { e.mu.Lock() defer e.mu.Unlock() e.nonceCache[nonce] = &PendingTransaction{ Hash: hash, Nonce: nonce, Opportunity: opp, SubmittedAt: time.Now(), LastChecked: time.Now(), Confirmed: false, Failed: false, } } // calculateActualProfit calculates the actual profit from a receipt func (e *Executor) calculateActualProfit(receipt *types.Receipt, opp *arbitrage.Opportunity) *big.Int { // In production, parse logs to get actual output amounts // For now, estimate based on expected profit and gas cost gasCost := new(big.Int).Mul(new(big.Int).SetUint64(receipt.GasUsed), receipt.EffectiveGasPrice) estimatedProfit := new(big.Int).Sub(opp.GrossProfit, gasCost) return estimatedProfit } // GetPendingTransactions returns all pending transactions func (e *Executor) GetPendingTransactions() []*PendingTransaction { e.mu.Lock() defer e.mu.Unlock() txs := make([]*PendingTransaction, 0, len(e.nonceCache)) for _, tx := range e.nonceCache { if !tx.Confirmed && !tx.Failed { txs = append(txs, tx) } } return txs } // Stop stops the executor func (e *Executor) Stop() { if !e.stopped { close(e.stopCh) e.stopped = true e.logger.Info("executor stopped") } }