- Add GetPoolsByToken method to cache interface and implementation - Fix interface pointer types (use interface not *interface) - Fix SwapEvent.TokenIn/TokenOut usage to use GetInputToken/GetOutputToken methods - Fix ethereum.CallMsg import and usage - Fix parser factory and validator initialization in main.go - Remove unused variables and imports WIP: Still fixing main.go config struct field mismatches 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
524 lines
13 KiB
Go
524 lines
13 KiB
Go
package execution
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"math/big"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/ethereum/go-ethereum/common"
|
|
"github.com/ethereum/go-ethereum/core/types"
|
|
"github.com/ethereum/go-ethereum/ethclient"
|
|
|
|
"github.com/your-org/mev-bot/pkg/arbitrage"
|
|
)
|
|
|
|
// ExecutorConfig contains configuration for the executor
|
|
type ExecutorConfig struct {
|
|
// Wallet
|
|
PrivateKey []byte
|
|
WalletAddress common.Address
|
|
|
|
// RPC configuration
|
|
RPCEndpoint string
|
|
PrivateRPCEndpoint string // Optional private RPC (e.g., Flashbots)
|
|
UsePrivateRPC bool
|
|
|
|
// Transaction settings
|
|
ConfirmationBlocks uint64
|
|
TimeoutPerTx time.Duration
|
|
MaxRetries int
|
|
RetryDelay time.Duration
|
|
|
|
// Nonce management
|
|
NonceMargin uint64 // Number of nonces to keep ahead
|
|
|
|
// Gas price strategy
|
|
GasPriceStrategy string // "fast", "market", "aggressive"
|
|
GasPriceMultiplier float64 // Multiplier for gas price
|
|
MaxGasPriceIncrement float64 // Max increase for replacement txs
|
|
|
|
// Monitoring
|
|
MonitorInterval time.Duration
|
|
CleanupInterval time.Duration
|
|
}
|
|
|
|
// DefaultExecutorConfig returns default executor configuration
|
|
func DefaultExecutorConfig() *ExecutorConfig {
|
|
return &ExecutorConfig{
|
|
ConfirmationBlocks: 1,
|
|
TimeoutPerTx: 5 * time.Minute,
|
|
MaxRetries: 3,
|
|
RetryDelay: 5 * time.Second,
|
|
NonceMargin: 2,
|
|
GasPriceStrategy: "fast",
|
|
GasPriceMultiplier: 1.1, // 10% above market
|
|
MaxGasPriceIncrement: 1.5, // 50% max increase
|
|
MonitorInterval: 1 * time.Second,
|
|
CleanupInterval: 1 * time.Minute,
|
|
}
|
|
}
|
|
|
|
// Executor executes arbitrage transactions
|
|
type Executor struct {
|
|
config *ExecutorConfig
|
|
logger *slog.Logger
|
|
|
|
// Clients
|
|
client *ethclient.Client
|
|
privateClient *ethclient.Client // Optional
|
|
|
|
// Components
|
|
builder *TransactionBuilder
|
|
riskManager *RiskManager
|
|
flashloanMgr *FlashloanManager
|
|
|
|
// Nonce management
|
|
mu sync.Mutex
|
|
currentNonce uint64
|
|
nonceCache map[uint64]*PendingTransaction
|
|
|
|
// Monitoring
|
|
stopCh chan struct{}
|
|
stopped bool
|
|
}
|
|
|
|
// PendingTransaction tracks a pending transaction
|
|
type PendingTransaction struct {
|
|
Hash common.Hash
|
|
Nonce uint64
|
|
Opportunity *arbitrage.Opportunity
|
|
SubmittedAt time.Time
|
|
LastChecked time.Time
|
|
Confirmed bool
|
|
Failed bool
|
|
FailReason string
|
|
Receipt *types.Receipt
|
|
Retries int
|
|
}
|
|
|
|
// NewExecutor creates a new executor
|
|
func NewExecutor(
|
|
config *ExecutorConfig,
|
|
builder *TransactionBuilder,
|
|
riskManager *RiskManager,
|
|
flashloanMgr *FlashloanManager,
|
|
logger *slog.Logger,
|
|
) (*Executor, error) {
|
|
if config == nil {
|
|
config = DefaultExecutorConfig()
|
|
}
|
|
|
|
// Connect to RPC
|
|
client, err := ethclient.Dial(config.RPCEndpoint)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to connect to RPC: %w", err)
|
|
}
|
|
|
|
var privateClient *ethclient.Client
|
|
if config.UsePrivateRPC && config.PrivateRPCEndpoint != "" {
|
|
privateClient, err = ethclient.Dial(config.PrivateRPCEndpoint)
|
|
if err != nil {
|
|
logger.Warn("failed to connect to private RPC", "error", err)
|
|
}
|
|
}
|
|
|
|
executor := &Executor{
|
|
config: config,
|
|
logger: logger.With("component", "executor"),
|
|
client: client,
|
|
privateClient: privateClient,
|
|
builder: builder,
|
|
riskManager: riskManager,
|
|
flashloanMgr: flashloanMgr,
|
|
nonceCache: make(map[uint64]*PendingTransaction),
|
|
stopCh: make(chan struct{}),
|
|
}
|
|
|
|
// Initialize nonce
|
|
err = executor.initializeNonce(context.Background())
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to initialize nonce: %w", err)
|
|
}
|
|
|
|
// Start monitoring
|
|
go executor.monitorTransactions()
|
|
go executor.cleanupOldTransactions()
|
|
|
|
return executor, nil
|
|
}
|
|
|
|
// ExecutionResult contains the result of an execution
|
|
type ExecutionResult struct {
|
|
Success bool
|
|
TxHash common.Hash
|
|
Receipt *types.Receipt
|
|
ActualProfit *big.Int
|
|
GasCost *big.Int
|
|
Error error
|
|
Duration time.Duration
|
|
}
|
|
|
|
// Execute executes an arbitrage opportunity
|
|
func (e *Executor) Execute(ctx context.Context, opp *arbitrage.Opportunity) (*ExecutionResult, error) {
|
|
startTime := time.Now()
|
|
|
|
e.logger.Info("executing opportunity",
|
|
"opportunityID", opp.ID,
|
|
"type", opp.Type,
|
|
"expectedProfit", opp.NetProfit.String(),
|
|
)
|
|
|
|
// Build transaction
|
|
tx, err := e.builder.BuildTransaction(ctx, opp, e.config.WalletAddress)
|
|
if err != nil {
|
|
return &ExecutionResult{
|
|
Success: false,
|
|
Error: fmt.Errorf("failed to build transaction: %w", err),
|
|
Duration: time.Since(startTime),
|
|
}, nil
|
|
}
|
|
|
|
// Risk assessment
|
|
assessment, err := e.riskManager.AssessRisk(ctx, opp, tx)
|
|
if err != nil {
|
|
return &ExecutionResult{
|
|
Success: false,
|
|
Error: fmt.Errorf("failed to assess risk: %w", err),
|
|
Duration: time.Since(startTime),
|
|
}, nil
|
|
}
|
|
|
|
if !assessment.Approved {
|
|
return &ExecutionResult{
|
|
Success: false,
|
|
Error: fmt.Errorf("risk assessment failed: %s", assessment.Reason),
|
|
Duration: time.Since(startTime),
|
|
}, nil
|
|
}
|
|
|
|
// Log warnings if any
|
|
for _, warning := range assessment.Warnings {
|
|
e.logger.Warn("risk warning", "warning", warning)
|
|
}
|
|
|
|
// Submit transaction
|
|
hash, err := e.submitTransaction(ctx, tx, opp)
|
|
if err != nil {
|
|
return &ExecutionResult{
|
|
Success: false,
|
|
Error: fmt.Errorf("failed to submit transaction: %w", err),
|
|
Duration: time.Since(startTime),
|
|
}, nil
|
|
}
|
|
|
|
e.logger.Info("transaction submitted",
|
|
"hash", hash.Hex(),
|
|
"opportunityID", opp.ID,
|
|
)
|
|
|
|
// Wait for confirmation
|
|
receipt, err := e.waitForConfirmation(ctx, hash)
|
|
if err != nil {
|
|
return &ExecutionResult{
|
|
Success: false,
|
|
TxHash: hash,
|
|
Error: fmt.Errorf("transaction failed: %w", err),
|
|
Duration: time.Since(startTime),
|
|
}, nil
|
|
}
|
|
|
|
// Calculate actual profit
|
|
actualProfit := e.calculateActualProfit(receipt, opp)
|
|
gasCost := new(big.Int).Mul(big.NewInt(int64(receipt.GasUsed)), receipt.EffectiveGasPrice)
|
|
|
|
result := &ExecutionResult{
|
|
Success: receipt.Status == types.ReceiptStatusSuccessful,
|
|
TxHash: hash,
|
|
Receipt: receipt,
|
|
ActualProfit: actualProfit,
|
|
GasCost: gasCost,
|
|
Duration: time.Since(startTime),
|
|
}
|
|
|
|
if result.Success {
|
|
e.logger.Info("execution succeeded",
|
|
"hash", hash.Hex(),
|
|
"actualProfit", actualProfit.String(),
|
|
"gasCost", gasCost.String(),
|
|
"duration", result.Duration,
|
|
)
|
|
e.riskManager.RecordSuccess(hash, actualProfit)
|
|
} else {
|
|
e.logger.Error("execution failed",
|
|
"hash", hash.Hex(),
|
|
"status", receipt.Status,
|
|
)
|
|
e.riskManager.RecordFailure(hash, "transaction reverted")
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// submitTransaction submits a transaction to the network
|
|
func (e *Executor) submitTransaction(ctx context.Context, tx *SwapTransaction, opp *arbitrage.Opportunity) (common.Hash, error) {
|
|
// Get nonce
|
|
nonce := e.getNextNonce()
|
|
|
|
// Sign transaction
|
|
signedTx, err := e.builder.SignTransaction(tx, nonce, e.config.PrivateKey)
|
|
if err != nil {
|
|
e.releaseNonce(nonce)
|
|
return common.Hash{}, fmt.Errorf("failed to sign transaction: %w", err)
|
|
}
|
|
|
|
// Choose client (private or public)
|
|
client := e.client
|
|
if e.config.UsePrivateRPC && e.privateClient != nil {
|
|
client = e.privateClient
|
|
e.logger.Debug("using private RPC")
|
|
}
|
|
|
|
// Send transaction
|
|
err = client.SendTransaction(ctx, signedTx)
|
|
if err != nil {
|
|
e.releaseNonce(nonce)
|
|
return common.Hash{}, fmt.Errorf("failed to send transaction: %w", err)
|
|
}
|
|
|
|
hash := signedTx.Hash()
|
|
|
|
// Track transaction
|
|
e.trackPendingTransaction(nonce, hash, opp)
|
|
e.riskManager.TrackTransaction(hash, opp, tx.MaxFeePerGas)
|
|
|
|
return hash, nil
|
|
}
|
|
|
|
// waitForConfirmation waits for transaction confirmation
|
|
func (e *Executor) waitForConfirmation(ctx context.Context, hash common.Hash) (*types.Receipt, error) {
|
|
timeoutCtx, cancel := context.WithTimeout(ctx, e.config.TimeoutPerTx)
|
|
defer cancel()
|
|
|
|
ticker := time.NewTicker(e.config.MonitorInterval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-timeoutCtx.Done():
|
|
return nil, fmt.Errorf("transaction timeout")
|
|
|
|
case <-ticker.C:
|
|
receipt, err := e.client.TransactionReceipt(ctx, hash)
|
|
if err != nil {
|
|
// Transaction not yet mined
|
|
continue
|
|
}
|
|
|
|
// Check confirmations
|
|
currentBlock, err := e.client.BlockNumber(ctx)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
confirmations := currentBlock - receipt.BlockNumber.Uint64()
|
|
if confirmations >= e.config.ConfirmationBlocks {
|
|
return receipt, nil
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// monitorTransactions monitors pending transactions
|
|
func (e *Executor) monitorTransactions() {
|
|
ticker := time.NewTicker(e.config.MonitorInterval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-e.stopCh:
|
|
return
|
|
|
|
case <-ticker.C:
|
|
e.checkPendingTransactions()
|
|
}
|
|
}
|
|
}
|
|
|
|
// checkPendingTransactions checks status of pending transactions
|
|
func (e *Executor) checkPendingTransactions() {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
|
|
ctx := context.Background()
|
|
|
|
for nonce, pending := range e.nonceCache {
|
|
if pending.Confirmed || pending.Failed {
|
|
continue
|
|
}
|
|
|
|
// Check transaction status
|
|
receipt, err := e.client.TransactionReceipt(ctx, pending.Hash)
|
|
if err != nil {
|
|
// Still pending
|
|
pending.LastChecked = time.Now()
|
|
|
|
// Check for timeout
|
|
if time.Since(pending.SubmittedAt) > e.config.TimeoutPerTx {
|
|
e.logger.Warn("transaction timeout",
|
|
"hash", pending.Hash.Hex(),
|
|
"nonce", nonce,
|
|
)
|
|
|
|
// Attempt replacement
|
|
if pending.Retries < e.config.MaxRetries {
|
|
e.logger.Info("attempting transaction replacement",
|
|
"hash", pending.Hash.Hex(),
|
|
"retry", pending.Retries+1,
|
|
)
|
|
// In production, implement transaction replacement logic
|
|
pending.Retries++
|
|
} else {
|
|
pending.Failed = true
|
|
pending.FailReason = "timeout after retries"
|
|
e.riskManager.RecordFailure(pending.Hash, "timeout")
|
|
e.riskManager.UntrackTransaction(pending.Hash)
|
|
}
|
|
}
|
|
continue
|
|
}
|
|
|
|
// Transaction mined
|
|
pending.Receipt = receipt
|
|
pending.Confirmed = true
|
|
pending.LastChecked = time.Now()
|
|
|
|
if receipt.Status == types.ReceiptStatusFailed {
|
|
pending.Failed = true
|
|
pending.FailReason = "transaction reverted"
|
|
e.riskManager.RecordFailure(pending.Hash, "reverted")
|
|
}
|
|
|
|
e.riskManager.UntrackTransaction(pending.Hash)
|
|
|
|
e.logger.Debug("transaction confirmed",
|
|
"hash", pending.Hash.Hex(),
|
|
"nonce", nonce,
|
|
"status", receipt.Status,
|
|
)
|
|
}
|
|
}
|
|
|
|
// cleanupOldTransactions removes old completed transactions
|
|
func (e *Executor) cleanupOldTransactions() {
|
|
ticker := time.NewTicker(e.config.CleanupInterval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-e.stopCh:
|
|
return
|
|
|
|
case <-ticker.C:
|
|
e.mu.Lock()
|
|
|
|
cutoff := time.Now().Add(-1 * time.Hour)
|
|
for nonce, pending := range e.nonceCache {
|
|
if (pending.Confirmed || pending.Failed) && pending.LastChecked.Before(cutoff) {
|
|
delete(e.nonceCache, nonce)
|
|
}
|
|
}
|
|
|
|
e.mu.Unlock()
|
|
}
|
|
}
|
|
}
|
|
|
|
// initializeNonce initializes the nonce from the network
|
|
func (e *Executor) initializeNonce(ctx context.Context) error {
|
|
nonce, err := e.client.PendingNonceAt(ctx, e.config.WalletAddress)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get pending nonce: %w", err)
|
|
}
|
|
|
|
e.currentNonce = nonce
|
|
e.logger.Info("initialized nonce", "nonce", nonce)
|
|
|
|
return nil
|
|
}
|
|
|
|
// getNextNonce gets the next available nonce
|
|
func (e *Executor) getNextNonce() uint64 {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
|
|
nonce := e.currentNonce
|
|
e.currentNonce++
|
|
|
|
return nonce
|
|
}
|
|
|
|
// releaseNonce releases a nonce back to the pool
|
|
func (e *Executor) releaseNonce(nonce uint64) {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
|
|
// Only release if it's the current nonce - 1
|
|
if nonce == e.currentNonce-1 {
|
|
e.currentNonce = nonce
|
|
}
|
|
}
|
|
|
|
// trackPendingTransaction tracks a pending transaction
|
|
func (e *Executor) trackPendingTransaction(nonce uint64, hash common.Hash, opp *arbitrage.Opportunity) {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
|
|
e.nonceCache[nonce] = &PendingTransaction{
|
|
Hash: hash,
|
|
Nonce: nonce,
|
|
Opportunity: opp,
|
|
SubmittedAt: time.Now(),
|
|
LastChecked: time.Now(),
|
|
Confirmed: false,
|
|
Failed: false,
|
|
}
|
|
}
|
|
|
|
// calculateActualProfit calculates the actual profit from a receipt
|
|
func (e *Executor) calculateActualProfit(receipt *types.Receipt, opp *arbitrage.Opportunity) *big.Int {
|
|
// In production, parse logs to get actual output amounts
|
|
// For now, estimate based on expected profit and gas cost
|
|
|
|
gasCost := new(big.Int).Mul(new(big.Int).SetUint64(receipt.GasUsed), receipt.EffectiveGasPrice)
|
|
estimatedProfit := new(big.Int).Sub(opp.GrossProfit, gasCost)
|
|
|
|
return estimatedProfit
|
|
}
|
|
|
|
// GetPendingTransactions returns all pending transactions
|
|
func (e *Executor) GetPendingTransactions() []*PendingTransaction {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
|
|
txs := make([]*PendingTransaction, 0, len(e.nonceCache))
|
|
for _, tx := range e.nonceCache {
|
|
if !tx.Confirmed && !tx.Failed {
|
|
txs = append(txs, tx)
|
|
}
|
|
}
|
|
|
|
return txs
|
|
}
|
|
|
|
// Stop stops the executor
|
|
func (e *Executor) Stop() {
|
|
if !e.stopped {
|
|
close(e.stopCh)
|
|
e.stopped = true
|
|
e.logger.Info("executor stopped")
|
|
}
|
|
}
|