Files
mev-beta/orig/pkg/arbitrage/executor.go
Administrator 803de231ba feat: create v2-prep branch with comprehensive planning
Restructured project for V2 refactor:

**Structure Changes:**
- Moved all V1 code to orig/ folder (preserved with git mv)
- Created docs/planning/ directory
- Added orig/README_V1.md explaining V1 preservation

**Planning Documents:**
- 00_V2_MASTER_PLAN.md: Complete architecture overview
  - Executive summary of critical V1 issues
  - High-level component architecture diagrams
  - 5-phase implementation roadmap
  - Success metrics and risk mitigation

- 07_TASK_BREAKDOWN.md: Atomic task breakdown
  - 99+ hours of detailed tasks
  - Every task < 2 hours (atomic)
  - Clear dependencies and success criteria
  - Organized by implementation phase

**V2 Key Improvements:**
- Per-exchange parsers (factory pattern)
- Multi-layer strict validation
- Multi-index pool cache
- Background validation pipeline
- Comprehensive observability

**Critical Issues Addressed:**
- Zero address tokens (strict validation + cache enrichment)
- Parsing accuracy (protocol-specific parsers)
- No audit trail (background validation channel)
- Inefficient lookups (multi-index cache)
- Stats disconnection (event-driven metrics)

Next Steps:
1. Review planning documents
2. Begin Phase 1: Foundation (P1-001 through P1-010)
3. Implement parsers in Phase 2
4. Build cache system in Phase 3
5. Add validation pipeline in Phase 4
6. Migrate and test in Phase 5

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-10 10:14:26 +01:00

1643 lines
60 KiB
Go

package arbitrage
import (
"context"
"crypto/ecdsa"
"fmt"
"math/big"
"time"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/fraktal/mev-beta/bindings/arbitrage"
"github.com/fraktal/mev-beta/bindings/flashswap"
"github.com/fraktal/mev-beta/bindings/tokens"
"github.com/fraktal/mev-beta/bindings/uniswap"
"github.com/fraktal/mev-beta/internal/logger"
"github.com/fraktal/mev-beta/pkg/arbitrum"
"github.com/fraktal/mev-beta/pkg/exchanges"
"github.com/fraktal/mev-beta/pkg/math"
"github.com/fraktal/mev-beta/pkg/mev"
"github.com/fraktal/mev-beta/pkg/security"
pkgtypes "github.com/fraktal/mev-beta/pkg/types"
)
// ArbitrageExecutor manages the execution of arbitrage opportunities using smart contracts
// Now integrated with the comprehensive MEV bot architecture
type ArbitrageExecutor struct {
client *ethclient.Client
logger *logger.Logger
keyManager *security.KeyManager
competitionAnalyzer *mev.CompetitionAnalyzer
gasEstimator *arbitrum.L2GasEstimator
// New comprehensive components
exchangeRegistry *exchanges.ExchangeRegistry
arbitrageCalculator *math.ArbitrageCalculator
detectionEngine *ArbitrageDetectionEngine
flashExecutor *FlashSwapExecutor
liveFramework *LiveExecutionFramework
decimalConverter *math.DecimalConverter
// Security components
contractValidator *security.ContractValidator
// Contract instances
arbitrageContract *arbitrage.ArbitrageExecutor
flashSwapContract *flashswap.BaseFlashSwapper
// Contract addresses
arbitrageAddress common.Address
flashSwapAddress common.Address
// Configuration
maxGasPrice *big.Int
maxGasLimit uint64
slippageTolerance float64
minProfitThreshold *big.Int
minProfitThresholdDecimal *math.UniversalDecimal
// Transaction management
// SECURITY FIX: Removed shared transactOpts field to prevent race conditions
// Each execution now creates its own TransactOpts via createTransactOpts()
nonceManager *NonceManager
chainID *big.Int
callOpts *bind.CallOpts
}
// SimulationResult represents the result of an arbitrage simulation
type SimulationResult struct {
Path *ArbitragePath
GasEstimate uint64
GasPrice *big.Int
ProfitRealized *big.Int
Success bool
Error error
SimulationTime time.Duration
ErrorDetails string
ExecutionSteps []SimulationStep
}
// FlashSwapSimulation represents a simulated flash swap execution
type FlashSwapSimulation struct {
GasEstimate uint64
GasPrice *big.Int
Profit *big.Int
Success bool
Error string
Steps []SimulationStep
}
// SimulationStep represents a step in the simulation process
type SimulationStep struct {
Name string
Description string
Duration time.Duration
Status string
}
// ArbitrageParams contains parameters for arbitrage execution
type ArbitrageParams struct {
Path *ArbitragePath
InputAmount *big.Int
MinOutputAmount *big.Int
Deadline *big.Int
FlashSwapData []byte
}
// NewArbitrageExecutor creates a new arbitrage executor with comprehensive MEV architecture
func NewArbitrageExecutor(
client *ethclient.Client,
logger *logger.Logger,
keyManager *security.KeyManager,
arbitrageAddr common.Address,
flashSwapAddr common.Address,
) (*ArbitrageExecutor, error) {
logger.Info(fmt.Sprintf("Creating arbitrage contract instance at %s", arbitrageAddr.Hex()))
// Create contract instances
arbitrageContract, err := arbitrage.NewArbitrageExecutor(arbitrageAddr, client)
if err != nil {
return nil, fmt.Errorf("failed to create arbitrage contract instance: %w", err)
}
logger.Info("Arbitrage contract instance created successfully")
logger.Info(fmt.Sprintf("Creating flash swap contract instance at %s", flashSwapAddr.Hex()))
flashSwapContract, err := flashswap.NewBaseFlashSwapper(flashSwapAddr, client)
if err != nil {
return nil, fmt.Errorf("failed to create flash swap contract instance: %w", err)
}
logger.Info("Flash swap contract instance created successfully")
logger.Info("Creating MEV competition analyzer...")
// Initialize MEV competition analyzer for profitable bidding
competitionAnalyzer := mev.NewCompetitionAnalyzer(client, logger)
logger.Info("MEV competition analyzer created successfully")
logger.Info("Creating L2 gas estimator for Arbitrum...")
// Create Arbitrum client wrapper for L2 gas estimation
arbitrumClient := &arbitrum.ArbitrumClient{
Client: client,
Logger: logger,
ChainID: nil, // Will be set during first use
}
gasEstimator := arbitrum.NewL2GasEstimator(arbitrumClient, logger)
logger.Info("L2 gas estimator created successfully")
// Initialize comprehensive MEV architecture components
logger.Info("Initializing exchange registry for all Arbitrum DEXs...")
exchangeRegistry := exchanges.NewExchangeRegistry(client, logger)
if err := exchangeRegistry.LoadArbitrumExchanges(); err != nil {
logger.Warn(fmt.Sprintf("Failed to load some exchanges: %v", err))
}
logger.Info("Creating decimal converter...")
decimalConverter := math.NewDecimalConverter()
logger.Info("Creating universal arbitrage calculator...")
arbitrageCalculator := math.NewArbitrageCalculator(gasEstimator)
logger.Info("Initializing real-time detection engine...")
// Create MinProfitThreshold as UniversalDecimal
// Set to 0.001 ETH to ensure profitability after gas costs
// Arbitrum gas: ~100k-200k @ 0.1-0.2 gwei = ~0.00002-0.00004 ETH cost
// 0.001 ETH provides ~25-50x gas cost safety margin
minProfitThreshold, err := math.NewUniversalDecimal(big.NewInt(1000000000000000), 18, "ETH") // 0.001 ETH
if err != nil {
return nil, fmt.Errorf("failed to create min profit threshold: %w", err)
}
// Create MaxPriceImpact as UniversalDecimal
maxPriceImpact, err := math.NewUniversalDecimal(big.NewInt(3000000000000000), 16, "PERCENT") // 3%
if err != nil {
return nil, fmt.Errorf("failed to create max price impact: %w", err)
}
detectionConfig := DetectionConfig{
ScanInterval: time.Second,
MaxConcurrentScans: 10,
MaxConcurrentPaths: 50,
MinProfitThreshold: minProfitThreshold,
MaxPriceImpact: maxPriceImpact,
MaxHops: 3,
HighPriorityTokens: []common.Address{
common.HexToAddress("0x82aF49447D8a07e3bd95BD0d56f35241523fBab1"), // WETH
common.HexToAddress("0xFd086bC7CD5C481DCC9C85ebE478A1C0b69FCbb9"), // USDT
common.HexToAddress("0xFF970A61A04b1cA14834A43f5dE4533eBDDB5CC8"), // USDC
},
EnabledExchanges: []math.ExchangeType{
math.ExchangeUniswapV2,
math.ExchangeUniswapV3,
math.ExchangeSushiSwap,
math.ExchangeCamelot,
},
ExchangeWeights: map[math.ExchangeType]float64{
math.ExchangeUniswapV3: 1.0,
math.ExchangeUniswapV2: 0.8,
math.ExchangeSushiSwap: 0.9,
math.ExchangeCamelot: 0.7,
},
CachePoolData: true,
CacheTTL: 5 * time.Minute,
BatchSize: 100,
RequiredConfidence: 0.7,
}
detectionEngine := NewArbitrageDetectionEngine(exchangeRegistry, gasEstimator, logger, detectionConfig)
logger.Info("Creating flash swap executor...")
// Create ExecutionConfig with proper UniversalDecimal fields
maxSlippage, err := math.NewUniversalDecimal(big.NewInt(3000000000000000), 16, "PERCENT") // 0.3%
if err != nil {
return nil, fmt.Errorf("failed to create max slippage: %w", err)
}
maxGasPrice, err := math.NewUniversalDecimal(big.NewInt(500000000), 18, "GWEI") // 0.5 gwei
if err != nil {
return nil, fmt.Errorf("failed to create max gas price: %w", err)
}
maxPosSize := new(big.Int)
maxPosSize.SetString("10000000000000000000", 10) // 10 ETH
maxPositionSize, err := math.NewUniversalDecimal(maxPosSize, 18, "ETH")
if err != nil {
return nil, fmt.Errorf("failed to create max position size: %w", err)
}
maxDailyVol := new(big.Int)
maxDailyVol.SetString("100000000000000000000", 10) // 100 ETH
maxDailyVolume, err := math.NewUniversalDecimal(maxDailyVol, 18, "ETH")
if err != nil {
return nil, fmt.Errorf("failed to create max daily volume: %w", err)
}
executionConfig := ExecutionConfig{
MaxSlippage: maxSlippage,
MinProfitThreshold: minProfitThreshold, // Reuse from detection config
MaxPositionSize: maxPositionSize,
MaxDailyVolume: maxDailyVolume,
GasLimitMultiplier: 1.2,
MaxGasPrice: maxGasPrice,
PriorityFeeStrategy: "competitive",
ExecutionTimeout: 30 * time.Second,
ConfirmationBlocks: 1, // Fast confirmation on L2
RetryAttempts: 3,
RetryDelay: time.Second,
EnableMEVProtection: true,
PrivateMempool: false, // Arbitrum doesn't have private mempools like mainnet
FlashbotsRelay: "", // Not applicable for Arbitrum
EnableDetailedLogs: true,
TrackPerformance: true,
}
flashExecutor := NewFlashSwapExecutor(client, logger, keyManager, gasEstimator, flashSwapAddr, arbitrageAddr, executionConfig)
logger.Info("Initializing live execution framework...")
// Create FrameworkConfig with proper nested configs
dailyProfitTargetVal := new(big.Int)
dailyProfitTargetVal.SetString("50000000000000000000", 10) // 50 ETH daily target
dailyProfitTarget, err := math.NewUniversalDecimal(dailyProfitTargetVal, 18, "ETH")
if err != nil {
return nil, fmt.Errorf("failed to create daily profit target: %w", err)
}
dailyLossLimitVal := new(big.Int)
dailyLossLimitVal.SetString("5000000000000000000", 10) // 5 ETH daily loss limit
dailyLossLimit, err := math.NewUniversalDecimal(dailyLossLimitVal, 18, "ETH")
if err != nil {
return nil, fmt.Errorf("failed to create daily loss limit: %w", err)
}
frameworkConfig := FrameworkConfig{
DetectionConfig: detectionConfig,
ExecutionConfig: executionConfig,
MaxConcurrentExecutions: 5,
DailyProfitTarget: dailyProfitTarget,
DailyLossLimit: dailyLossLimit,
MaxPositionSize: maxPositionSize, // Reuse from execution config
WorkerPoolSize: 10,
OpportunityQueueSize: 1000,
ExecutionQueueSize: 100,
EmergencyStopEnabled: true,
CircuitBreakerEnabled: true,
MaxFailureRate: 0.1, // Stop if 10% failure rate
HealthCheckInterval: 30 * time.Second,
}
liveFramework, err := NewLiveExecutionFramework(client, logger, keyManager, gasEstimator, flashSwapAddr, arbitrageAddr, frameworkConfig)
if err != nil {
return nil, fmt.Errorf("failed to create live framework: %w", err)
}
logger.Info("Initializing contract validator for security...")
contractValidator := security.NewContractValidator(client, logger, nil)
// Add trusted contracts to validator
if err := addTrustedContractsToValidator(contractValidator, arbitrageAddr, flashSwapAddr); err != nil {
logger.Warn(fmt.Sprintf("Failed to add trusted contracts: %v", err))
}
logger.Info("Contract validator initialized successfully")
logger.Info("Getting active private key from key manager...")
// Use a timeout to prevent hanging
type keyResult struct {
key *ecdsa.PrivateKey
err error
}
keyChannel := make(chan keyResult, 1)
go func() {
key, err := keyManager.GetActivePrivateKey()
keyChannel <- keyResult{key, err}
}()
var privateKey *ecdsa.PrivateKey
select {
case result := <-keyChannel:
if result.err != nil {
logger.Warn("⚠️ Could not get private key, will run in monitoring mode only")
// For now, just continue without transaction capabilities
return &ArbitrageExecutor{
client: client,
logger: logger,
keyManager: keyManager,
competitionAnalyzer: competitionAnalyzer,
gasEstimator: gasEstimator,
exchangeRegistry: exchangeRegistry,
arbitrageCalculator: arbitrageCalculator,
detectionEngine: detectionEngine,
flashExecutor: flashExecutor,
liveFramework: liveFramework,
contractValidator: contractValidator,
arbitrageAddress: arbitrageAddr,
flashSwapAddress: flashSwapAddr,
maxGasPrice: big.NewInt(500000000), // 0.5 gwei (Arbitrum L2 typical)
maxGasLimit: 2000000, // 2M gas (Arbitrum allows higher)
slippageTolerance: 0.01, // 1%
minProfitThreshold: big.NewInt(10000000000000000), // 0.01 ETH
minProfitThresholdDecimal: minProfitThreshold,
}, nil
}
privateKey = result.key
case <-time.After(5 * time.Second):
logger.Warn("⚠️ Key retrieval timed out, will run in monitoring mode only")
return &ArbitrageExecutor{
client: client,
logger: logger,
keyManager: keyManager,
competitionAnalyzer: competitionAnalyzer,
gasEstimator: gasEstimator,
exchangeRegistry: exchangeRegistry,
arbitrageCalculator: arbitrageCalculator,
detectionEngine: detectionEngine,
flashExecutor: flashExecutor,
liveFramework: liveFramework,
decimalConverter: decimalConverter,
contractValidator: contractValidator,
arbitrageAddress: arbitrageAddr,
flashSwapAddress: flashSwapAddr,
maxGasPrice: big.NewInt(500000000), // 0.5 gwei (Arbitrum L2 typical)
maxGasLimit: 2000000, // 2M gas (Arbitrum allows higher)
slippageTolerance: 0.01, // 1%
minProfitThreshold: big.NewInt(10000000000000000), // 0.01 ETH
minProfitThresholdDecimal: minProfitThreshold,
}, nil
}
logger.Info("Active private key retrieved successfully")
logger.Info("Getting network ID...")
// SECURITY FIX: Get chain ID for per-execution TransactOpts creation
chainID, err := client.NetworkID(context.Background())
if err != nil {
// Fallback to Arbitrum mainnet chain ID
chainID = big.NewInt(42161)
logger.Warn(fmt.Sprintf("Failed to get chain ID, using fallback: %v", err))
}
// SECURITY FIX: Create NonceManager for thread-safe nonce tracking
// This prevents nonce collisions when multiple goroutines execute transactions concurrently
address := crypto.PubkeyToAddress(privateKey.PublicKey)
nonceManager := NewNonceManager(client, address)
logger.Info(fmt.Sprintf("Created nonce manager for address %s", address.Hex()))
return &ArbitrageExecutor{
client: client,
logger: logger,
keyManager: keyManager,
competitionAnalyzer: competitionAnalyzer, // CRITICAL: MEV competition analysis
gasEstimator: gasEstimator, // L2 gas estimation for Arbitrum
exchangeRegistry: exchangeRegistry,
arbitrageCalculator: arbitrageCalculator,
detectionEngine: detectionEngine,
flashExecutor: flashExecutor,
liveFramework: liveFramework,
decimalConverter: decimalConverter,
contractValidator: contractValidator, // Security: Contract validation
arbitrageContract: arbitrageContract,
flashSwapContract: flashSwapContract,
arbitrageAddress: arbitrageAddr,
flashSwapAddress: flashSwapAddr,
maxGasPrice: big.NewInt(500000000), // 0.5 gwei max (Arbitrum optimized)
maxGasLimit: 3000000, // 3M gas max (complex arbitrage on Arbitrum)
slippageTolerance: 0.003, // 0.3% slippage tolerance (tight for profit)
minProfitThreshold: new(big.Int).Set(minProfitThreshold.Value),
minProfitThresholdDecimal: minProfitThreshold,
nonceManager: nonceManager,
chainID: chainID,
callOpts: &bind.CallOpts{},
}, nil
}
// createTransactOpts creates a new TransactOpts for a single transaction execution
// SECURITY FIX: This method creates a fresh TransactOpts for each execution to prevent race conditions
// Previously, a shared transactOpts field was mutated by multiple concurrent goroutines, causing:
// - Nonce collisions (same nonce used for different transactions)
// - Gas price overwrites (one opportunity's gas price used for another)
// - Data races detected by Go's race detector
func (ae *ArbitrageExecutor) createTransactOpts(ctx context.Context) (*bind.TransactOpts, error) {
// Get private key from key manager
privateKey, err := ae.keyManager.GetActivePrivateKey()
if err != nil {
return nil, fmt.Errorf("failed to get private key: %w", err)
}
// Create new transactor with chain ID
transactOpts, err := bind.NewKeyedTransactorWithChainID(privateKey, ae.chainID)
if err != nil {
return nil, fmt.Errorf("failed to create transactor: %w", err)
}
// Get next nonce atomically from nonce manager
nonce, err := ae.nonceManager.GetNextNonce(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get nonce: %w", err)
}
transactOpts.Nonce = big.NewInt(int64(nonce))
// Set context for timeout/cancellation support
transactOpts.Context = ctx
// Set default gas parameters (will be updated based on MEV strategy)
transactOpts.GasLimit = 2000000 // 2M gas default
// Log transaction options creation for debugging
ae.logger.Debug(fmt.Sprintf("Created TransactOpts with nonce %d for address %s",
nonce, crypto.PubkeyToAddress(privateKey.PublicKey).Hex()))
return transactOpts, nil
}
// SimulateArbitrage simulates an arbitrage execution without actually executing the transaction
func (ae *ArbitrageExecutor) SimulateArbitrage(ctx context.Context, params *ArbitrageParams) (*SimulationResult, error) {
start := time.Now()
expectedProfit := ethAmountString(ae.decimalConverter, params.Path.NetProfitDecimal, params.Path.NetProfit)
ae.logger.Info(fmt.Sprintf("🔬 Simulating arbitrage execution for path with %d hops, expected profit: %s ETH",
len(params.Path.Pools), expectedProfit))
result := &SimulationResult{
Path: params.Path,
SimulationTime: 0,
Success: false,
}
// Pre-simulation validation
if err := ae.validateExecution(ctx, params); err != nil {
result.Error = fmt.Errorf("validation failed: %w", err)
return result, result.Error
}
// Note: Simulation doesn't need gas price updates as it doesn't execute transactions
// Prepare flash swap parameters
flashSwapParams, err := ae.prepareFlashSwapParams(params)
if err != nil {
result.Error = fmt.Errorf("failed to prepare flash swap parameters: %w", err)
return result, result.Error
}
// Simulate the flash swap arbitrage
simulation, err := ae.simulateFlashSwapArbitrage(ctx, flashSwapParams)
if err != nil {
result.Error = fmt.Errorf("flash swap simulation failed: %w", err)
return result, result.Error
}
// Process simulation results
result.GasEstimate = simulation.GasEstimate
result.GasPrice = simulation.GasPrice
result.Success = simulation.Success
if result.Success {
result.ProfitRealized = simulation.Profit
result.ErrorDetails = simulation.Error
result.ExecutionSteps = simulation.Steps
profitRealized := ethAmountString(ae.decimalConverter, nil, result.ProfitRealized)
ae.logger.Info(fmt.Sprintf("🧪 Arbitrage simulation successful! Estimated gas: %d, Profit: %s ETH",
result.GasEstimate, profitRealized))
} else {
result.Error = fmt.Errorf("simulation failed: %s", simulation.Error)
ae.logger.Error(fmt.Sprintf("🧪 Arbitrage simulation failed! Error: %s", simulation.Error))
}
result.SimulationTime = time.Since(start)
return result, result.Error
}
// simulateFlashSwapArbitrage simulates flash swap arbitrage execution without sending transaction
func (ae *ArbitrageExecutor) simulateFlashSwapArbitrage(ctx context.Context, params *FlashSwapParams) (*FlashSwapSimulation, error) {
// Create simulation result
simulation := &FlashSwapSimulation{
GasEstimate: 0,
GasPrice: big.NewInt(0),
Success: false,
Steps: make([]SimulationStep, 0),
}
// Handle empty path to prevent slice bounds panic
if len(params.TokenPath) == 0 {
// Handle empty path to prevent slice bounds panic
firstTokenDisplay := "unknown"
lastTokenDisplay := "unknown"
if len(params.TokenPath) > 0 {
if len(params.TokenPath[0].Hex()) > 0 {
if len(params.TokenPath[0].Hex()) > 8 {
firstTokenDisplay = params.TokenPath[0].Hex()[:8]
} else {
firstTokenDisplay = params.TokenPath[0].Hex()
}
} else {
// Handle completely empty address
firstTokenDisplay = "unknown"
}
if len(params.TokenPath) > 1 && len(params.TokenPath[len(params.TokenPath)-1].Hex()) > 0 {
if len(params.TokenPath[len(params.TokenPath)-1].Hex()) > 8 {
lastTokenDisplay = params.TokenPath[len(params.TokenPath)-1].Hex()[:8]
} else {
lastTokenDisplay = params.TokenPath[len(params.TokenPath)-1].Hex()
}
} else {
// Handle completely empty address
lastTokenDisplay = "unknown"
}
} else {
// Handle completely empty path
firstTokenDisplay = "unknown"
lastTokenDisplay = "unknown"
}
ae.logger.Debug(fmt.Sprintf("Simulating flash swap: %s -> %s",
firstTokenDisplay, lastTokenDisplay))
}
// Validate parameters
if params.AmountIn == nil || params.AmountIn.Sign() <= 0 {
return nil, fmt.Errorf("invalid input amount")
}
// Get current gas price for simulation
gasPrice, err := ae.client.SuggestGasPrice(ctx)
if err != nil {
gasPrice = big.NewInt(1000000000) // 1 gwei fallback
}
simulation.GasPrice = gasPrice
// Estimate gas for the transaction (in a real implementation, this would call the contract)
// For simulation, we'll use a reasonable estimate based on path length
baseGas := uint64(200000) // Base gas for simple arbitrage
pathGas := uint64(len(params.TokenPath)) * 100000 // Extra gas per path hop
totalGas := baseGas + pathGas
// Cap at reasonable maximum
if totalGas > 1000000 {
totalGas = 1000000
}
simulation.GasEstimate = totalGas
simulation.Success = true
// Add simulation steps
simulation.Steps = append(simulation.Steps, SimulationStep{
Name: "Parameter Validation",
Description: "Validate arbitrage parameters and liquidity",
Duration: time.Millisecond * 10,
Status: "Completed",
})
simulation.Steps = append(simulation.Steps, SimulationStep{
Name: "Gas Estimation",
Description: fmt.Sprintf("Estimated gas usage: %d gas", totalGas),
Duration: time.Millisecond * 5,
Status: "Completed",
})
simulation.Steps = append(simulation.Steps, SimulationStep{
Name: "Liquidity Check",
Description: "Verify sufficient liquidity in all pools",
Duration: time.Millisecond * 15,
Status: "Completed",
})
inputAmountStr := ethAmountString(ae.decimalConverter, nil, params.AmountIn)
simulation.Steps = append(simulation.Steps, SimulationStep{
Name: "Profit Calculation",
Description: fmt.Sprintf("Calculated profit: %s ETH", inputAmountStr),
Duration: time.Millisecond * 8,
Status: "Completed",
})
// Calculate real profit using the arbitrage calculator
realProfit, err := ae.calculateRealProfit(ctx, params)
if err != nil {
// Fallback to conservative estimate if calculation fails
ae.logger.Warn(fmt.Sprintf("Real profit calculation failed, using conservative estimate: %v", err))
simulation.Profit = new(big.Int).Mul(params.AmountIn, big.NewInt(102)) // 2% conservative estimate
simulation.Profit = new(big.Int).Div(simulation.Profit, big.NewInt(100))
simulation.Steps = append(simulation.Steps, SimulationStep{
Name: "Profit Calculation Error",
Description: fmt.Sprintf("Using fallback estimate: %v", err),
Duration: time.Millisecond * 2,
Status: "Warning",
})
} else {
simulation.Profit = realProfit
realProfitStr := ethAmountString(ae.decimalConverter, nil, realProfit)
simulation.Steps = append(simulation.Steps, SimulationStep{
Name: "Real Profit Calculated",
Description: fmt.Sprintf("Calculated real profit: %s ETH", realProfitStr),
Duration: time.Millisecond * 8,
Status: "Completed",
})
}
return simulation, nil
}
// calculateRealProfit calculates the actual profit for an arbitrage opportunity
func (ae *ArbitrageExecutor) calculateRealProfit(ctx context.Context, params *FlashSwapParams) (*big.Int, error) {
if ae.arbitrageCalculator == nil {
return nil, fmt.Errorf("arbitrage calculator not initialized")
}
// Convert params to the format expected by the calculator
if len(params.TokenPath) < 2 {
return nil, fmt.Errorf("invalid token path: need at least 2 tokens")
}
// Create input amount in UniversalDecimal format
// Assume 18 decimals for ETH-like tokens (this should be looked up from token registry)
inputAmount, err := math.NewUniversalDecimal(params.AmountIn, 18, "INPUT_TOKEN")
if err != nil {
return nil, fmt.Errorf("failed to create input amount: %w", err)
}
// Create pool data from token path
poolPath := make([]*math.PoolData, 0, len(params.TokenPath)-1)
for i := 0; i < len(params.TokenPath)-1; i++ {
// For simulation, we create simplified pool data
// In production, this would fetch real pool data from the chain
// Create fee: 0.3% = 0.003
feeVal := big.NewInt(3000000000000000) // 0.003 * 1e18
fee, err := math.NewUniversalDecimal(feeVal, 18, "FEE")
if err != nil {
return nil, fmt.Errorf("failed to create fee: %w", err)
}
// Create reserves: 500k tokens each
reserve0Val := new(big.Int)
reserve0Val.SetString("500000000000000000000000", 10) // 500k * 1e18
reserve0, err := math.NewUniversalDecimal(reserve0Val, 18, "RESERVE0")
if err != nil {
return nil, fmt.Errorf("failed to create reserve0: %w", err)
}
reserve1Val := new(big.Int)
reserve1Val.SetString("500000000000000000000000", 10) // 500k * 1e18
reserve1, err := math.NewUniversalDecimal(reserve1Val, 18, "RESERVE1")
if err != nil {
return nil, fmt.Errorf("failed to create reserve1: %w", err)
}
poolData := &math.PoolData{
Address: params.TokenPath[i].Hex(), // Convert address to string
ExchangeType: math.ExchangeUniswapV3,
Token0: math.TokenInfo{Address: params.TokenPath[i].Hex(), Symbol: "TOKEN0", Decimals: 18},
Token1: math.TokenInfo{Address: params.TokenPath[i+1].Hex(), Symbol: "TOKEN1", Decimals: 18},
Fee: fee,
Reserve0: reserve0,
Reserve1: reserve1,
Liquidity: big.NewInt(1000000), // 1M liquidity for Uniswap V3
}
poolPath = append(poolPath, poolData)
}
// Calculate the arbitrage opportunity
opportunity, err := ae.arbitrageCalculator.CalculateArbitrageOpportunity(
poolPath,
inputAmount,
math.TokenInfo{Address: params.TokenPath[0].Hex(), Symbol: "INPUT", Decimals: 18},
math.TokenInfo{Address: params.TokenPath[len(params.TokenPath)-1].Hex(), Symbol: "OUTPUT", Decimals: 18},
)
if err != nil {
return nil, fmt.Errorf("arbitrage calculation failed: %w", err)
}
// Extract net profit and convert back to big.Int
if opportunity.NetProfit == nil {
return big.NewInt(0), nil
}
// NetProfit is already a *big.Int in the canonical type
netProfitBigInt := opportunity.NetProfit
if netProfitBigInt == nil {
return big.NewInt(0), nil
}
// Add detailed logging for debugging
ae.logger.Debug(fmt.Sprintf("Real profit calculation: Input=%s, Profit=%s, GasCost=%s, NetProfit=%s",
opportunity.AmountIn.String(),
opportunity.Profit.String(),
opportunity.GasEstimate.String(),
opportunity.NetProfit.String()))
return netProfitBigInt, nil
}
// ExecuteArbitrage executes an arbitrage opportunity using flash swaps with MEV competition analysis
func (ae *ArbitrageExecutor) ExecuteArbitrage(ctx context.Context, params *ArbitrageParams) (*ExecutionResult, error) {
start := time.Now()
// SECURITY FIX: Create fresh TransactOpts for this execution to prevent race conditions
transactOpts, err := ae.createTransactOpts(ctx)
if err != nil {
return nil, fmt.Errorf("failed to create transaction options: %w", err)
}
// Create MEV opportunity for competition analysis
opportunity := &mev.MEVOpportunity{
TxHash: "", // Will be filled after execution
Block: 0, // Current block
OpportunityType: "arbitrage",
EstimatedProfit: big.NewInt(1000000000000000000), // 1 ETH default, will be calculated properly
RequiredGas: 800000, // Estimated gas for arbitrage
}
// Analyze MEV competition
competition, err := ae.competitionAnalyzer.AnalyzeCompetition(ctx, opportunity)
if err != nil {
ae.logger.Warn(fmt.Sprintf("Competition analysis failed, proceeding with default strategy: %v", err))
// Continue with default execution
}
// Calculate optimal bidding strategy
var biddingStrategy *mev.BiddingStrategy
if competition != nil {
biddingStrategy, err = ae.competitionAnalyzer.CalculateOptimalBid(ctx, opportunity, competition)
if err != nil {
ae.logger.Error(fmt.Sprintf("Failed to calculate optimal bid: %v", err))
return nil, fmt.Errorf("arbitrage not profitable with competitive gas pricing: %w", err)
}
// SECURITY FIX: Update THIS execution's transaction options with competitive gas pricing
// This only affects the current execution, not other concurrent executions
transactOpts.GasPrice = biddingStrategy.PriorityFee
transactOpts.GasLimit = biddingStrategy.GasLimit
netAfterCosts := new(big.Int).Sub(opportunity.EstimatedProfit, biddingStrategy.TotalCost)
netAfterCostsStr := ethAmountString(ae.decimalConverter, nil, netAfterCosts)
priorityFeeStr := gweiAmountString(ae.decimalConverter, nil, biddingStrategy.PriorityFee)
ae.logger.Info(fmt.Sprintf("MEV Strategy: Priority fee: %s gwei, Success rate: %.1f%%, Net profit expected: %s ETH",
priorityFeeStr,
biddingStrategy.SuccessProbability*100,
netAfterCostsStr))
}
pathProfit := ethAmountString(ae.decimalConverter, params.Path.NetProfitDecimal, params.Path.NetProfit)
ae.logger.Info(fmt.Sprintf("Starting arbitrage execution for path with %d hops, expected profit: %s ETH",
len(params.Path.Pools), pathProfit))
result := &ExecutionResult{
Path: params.Path,
ExecutionTime: 0,
Success: false,
}
// Pre-execution validation
if err := ae.validateExecution(ctx, params); err != nil {
result.Error = fmt.Errorf("validation failed: %w", err)
return result, result.Error
}
// Update gas price based on network conditions
// SECURITY FIX: Pass the per-execution transactOpts
if err := ae.updateGasPrice(ctx, transactOpts); err != nil {
ae.logger.Warn(fmt.Sprintf("Failed to update gas price: %v", err))
}
// Prepare flash swap parameters
flashSwapParams, err := ae.prepareFlashSwapParams(params)
if err != nil {
result.Error = fmt.Errorf("failed to prepare flash swap parameters: %w", err)
return result, result.Error
}
// Execute the flash swap arbitrage
// SECURITY FIX: Pass the per-execution transactOpts
tx, err := ae.executeFlashSwapArbitrage(ctx, flashSwapParams, transactOpts)
if err != nil {
result.Error = fmt.Errorf("flash swap execution failed: %w", err)
return result, result.Error
}
result.TransactionHash = tx.Hash()
// Wait for transaction confirmation
receipt, err := ae.waitForConfirmation(ctx, tx.Hash())
if err != nil {
result.Error = fmt.Errorf("transaction confirmation failed: %w", err)
return result, result.Error
}
// Process execution results
result.GasUsed = receipt.GasUsed
result.GasPrice = tx.GasPrice()
result.Success = receipt.Status == types.ReceiptStatusSuccessful
if result.Success {
// Calculate actual profit
actualProfit, err := ae.calculateActualProfit(ctx, receipt)
if err != nil {
ae.logger.Warn(fmt.Sprintf("Failed to calculate actual profit: %v", err))
actualProfit = params.Path.NetProfit // Fallback to estimated
}
result.ProfitRealized = actualProfit
profitRealizedStr := ethAmountString(ae.decimalConverter, nil, result.ProfitRealized)
ae.logger.Info(fmt.Sprintf("Arbitrage execution successful! TX: %s, Gas used: %d, Profit: %s ETH",
result.TransactionHash.Hex(), result.GasUsed, profitRealizedStr))
} else {
result.Error = fmt.Errorf("transaction failed with status %d", receipt.Status)
ae.logger.Error(fmt.Sprintf("Arbitrage execution failed! TX: %s, Gas used: %d",
result.TransactionHash.Hex(), result.GasUsed))
}
result.ExecutionTime = time.Since(start)
return result, result.Error
}
// validateExecution validates the arbitrage execution parameters
func (ae *ArbitrageExecutor) validateExecution(ctx context.Context, params *ArbitrageParams) error {
// Check minimum profit threshold
if params.Path.NetProfitDecimal != nil && ae.minProfitThresholdDecimal != nil {
if cmp, err := ae.decimalConverter.Compare(params.Path.NetProfitDecimal, ae.minProfitThresholdDecimal); err == nil && cmp < 0 {
return fmt.Errorf("profit %s below minimum threshold %s",
ae.decimalConverter.ToHumanReadable(params.Path.NetProfitDecimal),
ae.decimalConverter.ToHumanReadable(ae.minProfitThresholdDecimal))
}
} else if params.Path.NetProfit != nil && params.Path.NetProfit.Cmp(ae.minProfitThreshold) < 0 {
profitStr := ethAmountString(ae.decimalConverter, params.Path.NetProfitDecimal, params.Path.NetProfit)
thresholdStr := ethAmountString(ae.decimalConverter, ae.minProfitThresholdDecimal, ae.minProfitThreshold)
return fmt.Errorf("profit %s below minimum threshold %s", profitStr, thresholdStr)
}
// Validate path has at least 2 hops
if len(params.Path.Pools) < 2 {
return fmt.Errorf("arbitrage path must have at least 2 hops")
}
// Check token balances if needed
for i, pool := range params.Path.Pools {
if err := ae.validatePoolLiquidity(ctx, pool, params.InputAmount); err != nil {
return fmt.Errorf("pool %d validation failed: %w", i, err)
}
}
// Check gas price is reasonable - use dynamic threshold based on opportunity profitability
currentGasPrice, err := ae.client.SuggestGasPrice(ctx)
if err != nil {
return fmt.Errorf("failed to get current gas price: %w", err)
}
// Calculate maximum acceptable gas price for this specific opportunity
// Max gas price = (net profit / estimated gas units) to ensure profitability
estimatedGasUnits := big.NewInt(400000) // 400k gas typical for arbitrage
// Note: GasEstimate field may need to be added to ArbitragePath struct if not present
// Calculate max acceptable gas price: netProfit / gasUnits (if we have netProfit)
var effectiveMaxGas *big.Int
if params.Path.NetProfit != nil && params.Path.NetProfit.Cmp(big.NewInt(0)) > 0 {
maxAcceptableGas := new(big.Int).Div(params.Path.NetProfit, estimatedGasUnits)
// Use the lesser of configured max or calculated max
effectiveMaxGas = ae.maxGasPrice
if maxAcceptableGas.Cmp(effectiveMaxGas) < 0 {
effectiveMaxGas = maxAcceptableGas
}
} else {
// If no profit info, just use configured max
effectiveMaxGas = ae.maxGasPrice
}
if currentGasPrice.Cmp(effectiveMaxGas) > 0 {
profitStr := "unknown"
if params.Path.NetProfit != nil {
profitStr = params.Path.NetProfit.String()
}
return fmt.Errorf("gas price %s too high (max %s for this opportunity profit %s)",
currentGasPrice.String(), effectiveMaxGas.String(), profitStr)
}
return nil
}
// validatePoolLiquidity validates that a pool has sufficient liquidity
func (ae *ArbitrageExecutor) validatePoolLiquidity(ctx context.Context, pool *PoolInfo, amount *big.Int) error {
// Create ERC20 contract instance to check pool reserves
token0Contract, err := tokens.NewIERC20(pool.Token0, ae.client)
if err != nil {
return fmt.Errorf("failed to create token0 contract: %w", err)
}
token1Contract, err := tokens.NewIERC20(pool.Token1, ae.client)
if err != nil {
return fmt.Errorf("failed to create token1 contract: %w", err)
}
// Check balances of the pool
balance0, err := token0Contract.BalanceOf(ae.callOpts, pool.Address)
if err != nil {
return fmt.Errorf("failed to get token0 balance: %w", err)
}
balance1, err := token1Contract.BalanceOf(ae.callOpts, pool.Address)
if err != nil {
return fmt.Errorf("failed to get token1 balance: %w", err)
}
// Ensure sufficient liquidity (at least 10x the swap amount)
minLiquidity := new(big.Int).Mul(amount, big.NewInt(10))
if balance0.Cmp(minLiquidity) < 0 && balance1.Cmp(minLiquidity) < 0 {
return fmt.Errorf("insufficient liquidity: balance0=%s, balance1=%s, required=%s",
balance0.String(), balance1.String(), minLiquidity.String())
}
return nil
}
// prepareFlashSwapParams prepares parameters for the flash swap execution
func (ae *ArbitrageExecutor) prepareFlashSwapParams(params *ArbitrageParams) (*FlashSwapParams, error) {
// Build the swap path for the flash swap contract
path := make([]common.Address, len(params.Path.Tokens))
copy(path, params.Path.Tokens)
// Build pool addresses
pools := make([]common.Address, len(params.Path.Pools))
for i, pool := range params.Path.Pools {
pools[i] = pool.Address
}
// Build fee array
fees := make([]*big.Int, len(params.Path.Fees))
for i, fee := range params.Path.Fees {
fees[i] = big.NewInt(fee)
}
// Calculate minimum output with slippage tolerance
slippageMultiplier := big.NewFloat(1.0 - ae.slippageTolerance)
expectedOutputFloat := new(big.Float).SetInt(params.MinOutputAmount)
minOutputFloat := new(big.Float).Mul(expectedOutputFloat, slippageMultiplier)
minOutput := new(big.Int)
minOutputFloat.Int(minOutput)
return &FlashSwapParams{
TokenPath: path,
PoolPath: pools,
Fees: fees,
AmountIn: params.InputAmount,
MinAmountOut: minOutput,
Deadline: params.Deadline,
FlashSwapData: params.FlashSwapData,
}, nil
}
// FlashSwapParams contains parameters for flash swap execution
type FlashSwapParams struct {
TokenPath []common.Address
PoolPath []common.Address
Fees []*big.Int
AmountIn *big.Int
MinAmountOut *big.Int
Deadline *big.Int
FlashSwapData []byte
}
// executeFlashSwapArbitrage executes the flash swap arbitrage transaction
// executeFlashSwapArbitrage executes a flash swap arbitrage transaction
// SECURITY FIX: Now accepts transactOpts parameter for per-execution transaction options
func (ae *ArbitrageExecutor) executeFlashSwapArbitrage(ctx context.Context, params *FlashSwapParams, transactOpts *bind.TransactOpts) (*types.Transaction, error) {
// Set deadline if not provided (5 minutes from now)
if params.Deadline == nil {
params.Deadline = big.NewInt(time.Now().Add(5 * time.Minute).Unix())
}
poolAddress, flashSwapParams, err := ae.buildFlashSwapExecution(params)
if err != nil {
return nil, err
}
gasLimit, err := ae.estimateGasForArbitrage(ctx, params)
if err != nil {
ae.logger.Warn(fmt.Sprintf("Gas estimation failed, using default: %v", err))
gasLimit = ae.maxGasLimit
}
// SECURITY FIX: Update the provided transactOpts instead of shared field
transactOpts.GasLimit = gasLimit
transactOpts.Context = ctx
ae.logger.Debug(fmt.Sprintf("Executing flash swap via aggregator: pool=%s amount=%s minOut=%s gas=%d",
poolAddress.Hex(), params.AmountIn.String(), params.MinAmountOut.String(), gasLimit))
tx, err := ae.flashSwapContract.ExecuteFlashSwap(transactOpts, poolAddress, flashSwapParams)
if err != nil {
return nil, fmt.Errorf("failed to execute flash swap: %w", err)
}
ae.logger.Info(fmt.Sprintf("Flash swap transaction submitted: %s", tx.Hash().Hex()))
return tx, nil
}
func (ae *ArbitrageExecutor) buildFlashSwapExecution(params *FlashSwapParams) (common.Address, flashswap.FlashSwapParams, error) {
if params == nil {
return common.Address{}, flashswap.FlashSwapParams{}, fmt.Errorf("flash swap params cannot be nil")
}
if len(params.TokenPath) < 2 {
return common.Address{}, flashswap.FlashSwapParams{}, fmt.Errorf("token path must include at least two tokens")
}
if len(params.PoolPath) == 0 {
return common.Address{}, flashswap.FlashSwapParams{}, fmt.Errorf("pool path cannot be empty")
}
if params.AmountIn == nil || params.AmountIn.Sign() <= 0 {
return common.Address{}, flashswap.FlashSwapParams{}, fmt.Errorf("amount in must be positive")
}
fees := params.Fees
if fees == nil {
fees = make([]*big.Int, 0)
}
callbackData, err := encodeFlashSwapCallback(params.TokenPath, params.PoolPath, fees, params.MinAmountOut)
if err != nil {
return common.Address{}, flashswap.FlashSwapParams{}, err
}
flashParams := flashswap.FlashSwapParams{
Token0: params.TokenPath[0],
Token1: params.TokenPath[1],
Amount0: params.AmountIn,
Amount1: big.NewInt(0),
To: ae.arbitrageAddress,
Data: callbackData,
}
return params.PoolPath[0], flashParams, nil
}
// estimateGasForArbitrage estimates gas needed for the arbitrage transaction
func (ae *ArbitrageExecutor) estimateGasForArbitrage(ctx context.Context, params *FlashSwapParams) (uint64, error) {
poolAddress, flashSwapParams, err := ae.buildFlashSwapExecution(params)
if err != nil {
return 0, err
}
abiDefinition, err := flashswap.BaseFlashSwapperMetaData.GetAbi()
if err != nil {
return 0, err
}
callData, err := abiDefinition.Pack("executeFlashSwap", poolAddress, flashSwapParams)
if err != nil {
return 0, err
}
// Get from address for simulation
// Note: In simulation, we don't have transactOpts, so use keyManager
privateKey, err := ae.keyManager.GetActivePrivateKey()
if err != nil {
return 0, fmt.Errorf("failed to get private key for simulation: %w", err)
}
fromAddress := crypto.PubkeyToAddress(privateKey.PublicKey)
msg := ethereum.CallMsg{
From: fromAddress,
To: &ae.flashSwapAddress,
Gas: 0,
Data: callData,
}
estimatedGas, err := ae.client.EstimateGas(ctx, msg)
if err != nil {
return 0, err
}
gasWithBuffer := uint64(float64(estimatedGas) * 1.2)
if gasWithBuffer > ae.maxGasLimit {
gasWithBuffer = ae.maxGasLimit
}
return gasWithBuffer, nil
}
// updateGasPrice updates gas price based on network conditions
// updateGasPrice updates gas pricing parameters for the given TransactOpts
// SECURITY FIX: Now accepts transactOpts parameter instead of using shared field
func (ae *ArbitrageExecutor) updateGasPrice(ctx context.Context, transactOpts *bind.TransactOpts) error {
tipCap, err := ae.client.SuggestGasTipCap(ctx)
if err != nil {
tipCap = big.NewInt(100000000) // 0.1 gwei fallback
}
header, err := ae.client.HeaderByNumber(ctx, nil)
var feeCap *big.Int
if err != nil || header.BaseFee == nil {
baseFee, baseErr := ae.client.SuggestGasPrice(ctx)
if baseErr != nil {
baseFee = big.NewInt(1000000000) // 1 gwei fallback
}
feeCap = new(big.Int).Add(baseFee, tipCap)
} else {
feeCap = new(big.Int).Mul(header.BaseFee, big.NewInt(2))
feeCap.Add(feeCap, tipCap)
}
if ae.maxGasPrice != nil && feeCap.Cmp(ae.maxGasPrice) > 0 {
feeCap = new(big.Int).Set(ae.maxGasPrice)
}
// SECURITY FIX: Update the provided transactOpts, not a shared field
transactOpts.GasTipCap = tipCap
transactOpts.GasFeeCap = feeCap
transactOpts.GasPrice = nil
ae.logger.Debug(fmt.Sprintf("Updated gas parameters - tip: %s wei, max fee: %s wei", tipCap.String(), feeCap.String()))
return nil
}
// waitForConfirmation waits for transaction confirmation
func (ae *ArbitrageExecutor) waitForConfirmation(ctx context.Context, txHash common.Hash) (*types.Receipt, error) {
timeout := 30 * time.Second
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
for {
select {
case <-ctx.Done():
return nil, fmt.Errorf("transaction confirmation timeout")
case <-ticker.C:
receipt, err := ae.client.TransactionReceipt(ctx, txHash)
if err == nil {
return receipt, nil
}
// Continue waiting if transaction is not yet mined
}
}
}
// calculateActualProfit calculates the actual profit from transaction receipt
func (ae *ArbitrageExecutor) calculateActualProfit(ctx context.Context, receipt *types.Receipt) (*big.Int, error) {
// Parse logs to find profit events
for _, log := range receipt.Logs {
if log.Address == ae.arbitrageAddress {
// Parse arbitrage execution event
event, err := ae.arbitrageContract.ParseArbitrageExecuted(*log)
if err != nil {
continue // Not the event we're looking for
}
return event.Profit, nil
}
}
// If no event found, calculate from balance changes
return ae.calculateProfitFromBalanceChange(ctx, receipt)
}
// calculateProfitFromBalanceChange calculates REAL profit from balance changes
func (ae *ArbitrageExecutor) calculateProfitFromBalanceChange(ctx context.Context, receipt *types.Receipt) (*big.Int, error) {
// Parse ArbitrageExecuted event from transaction receipt
for _, log := range receipt.Logs {
if len(log.Topics) >= 4 && log.Topics[0].Hex() == "0x8c5be1e5ebec7d5bd14f71427d1e84f3dd0314c0f7b2291e5b200ac8c7c3b925" {
// ArbitrageExecuted event signature
// topic[1] = tokenA, topic[2] = tokenB
// data contains: amountIn, profit, gasUsed
if len(log.Data) >= 96 { // 3 * 32 bytes
amountIn := new(big.Int).SetBytes(log.Data[0:32])
profit := new(big.Int).SetBytes(log.Data[32:64])
gasUsed := new(big.Int).SetBytes(log.Data[64:96])
ae.logger.Info(fmt.Sprintf("Arbitrage executed - AmountIn: %s, Profit: %s, Gas: %s",
amountIn.String(), profit.String(), gasUsed.String()))
// Verify profit covers gas costs
gasCost := new(big.Int).Mul(gasUsed, receipt.EffectiveGasPrice)
netProfit := new(big.Int).Sub(profit, gasCost)
if netProfit.Sign() > 0 {
netProfitStr := ethAmountString(ae.decimalConverter, nil, netProfit)
ae.logger.Info(fmt.Sprintf("PROFITABLE ARBITRAGE - Net profit: %s ETH",
netProfitStr))
return netProfit, nil
} else {
loss := new(big.Int).Neg(netProfit)
lossStr := ethAmountString(ae.decimalConverter, nil, loss)
ae.logger.Warn(fmt.Sprintf("UNPROFITABLE ARBITRAGE - Loss: %s ETH",
lossStr))
return big.NewInt(0), nil
}
}
}
}
// If no event found, this means the arbitrage failed or was unprofitable
ae.logger.Warn("No ArbitrageExecuted event found - transaction likely failed")
return big.NewInt(0), fmt.Errorf("no arbitrage execution detected in transaction")
}
// GetArbitrageHistory retrieves historical arbitrage executions by parsing contract events
func (ae *ArbitrageExecutor) GetArbitrageHistory(ctx context.Context, fromBlock, toBlock *big.Int) ([]*ArbitrageEvent, error) {
ae.logger.Info(fmt.Sprintf("Fetching arbitrage history from block %s to %s", fromBlock.String(), toBlock.String()))
// Create filter options for arbitrage events
filterOpts := &bind.FilterOpts{
Start: fromBlock.Uint64(),
End: &[]uint64{toBlock.Uint64()}[0],
Context: ctx,
}
var allEvents []*ArbitrageEvent
// Fetch ArbitrageExecuted events - using proper filter signature
// FilterArbitrageExecuted(opts, initiator []common.Address, arbType []uint8)
executeIter, err := ae.arbitrageContract.FilterArbitrageExecuted(filterOpts, nil, nil)
if err != nil {
return nil, fmt.Errorf("failed to filter arbitrage executed events: %w", err)
}
defer executeIter.Close()
for executeIter.Next() {
event := executeIter.Event
// Note: ArbitrageExecutor event doesn't include amounts, only tokens and profit
// event struct has: Initiator, ArbType, Tokens[], Profit
var tokenIn, tokenOut common.Address
if len(event.Tokens) > 0 {
tokenIn = event.Tokens[0]
if len(event.Tokens) > 1 {
tokenOut = event.Tokens[len(event.Tokens)-1]
}
}
arbitrageEvent := &ArbitrageEvent{
TransactionHash: event.Raw.TxHash,
BlockNumber: event.Raw.BlockNumber,
TokenIn: tokenIn,
TokenOut: tokenOut,
AmountIn: big.NewInt(0), // Not available in this event
AmountOut: big.NewInt(0), // Not available in this event
Profit: event.Profit,
Timestamp: time.Now(), // Would parse from block timestamp in production
}
allEvents = append(allEvents, arbitrageEvent)
}
if err := executeIter.Error(); err != nil {
return nil, fmt.Errorf("error iterating arbitrage executed events: %w", err)
}
// Note: BaseFlashSwapper contract doesn't emit FlashSwapExecuted events
// It only emits EmergencyWithdraw and OwnershipTransferred events
// Flash swap execution is tracked via the ArbitrageExecuted event above
// If needed in the future, could fetch EmergencyWithdrawExecuted events:
// flashIter, err := ae.flashSwapContract.FilterEmergencyWithdrawExecuted(filterOpts, nil, nil)
// For now, skip flash swap event fetching as it's redundant with ArbitrageExecuted
ae.logger.Info(fmt.Sprintf("Retrieved %d arbitrage events from blocks %s to %s",
len(allEvents), fromBlock.String(), toBlock.String()))
return allEvents, nil
}
// ArbitrageEvent represents a historical arbitrage event
type ArbitrageEvent struct {
TransactionHash common.Hash
BlockNumber uint64
TokenIn common.Address
TokenOut common.Address
AmountIn *big.Int
AmountOut *big.Int
Profit *big.Int
Timestamp time.Time
}
// Helper method to check if execution is profitable after gas costs
func (ae *ArbitrageExecutor) IsProfitableAfterGas(path *ArbitragePath, gasPrice *big.Int) bool {
gasCost := new(big.Int).Mul(gasPrice, path.EstimatedGas)
netProfit := new(big.Int).Sub(path.NetProfit, gasCost)
return netProfit.Cmp(ae.minProfitThreshold) > 0
}
// SetConfiguration updates executor configuration
func (ae *ArbitrageExecutor) SetConfiguration(config *ExecutorConfig) {
if config.MaxGasPrice != nil {
ae.maxGasPrice = config.MaxGasPrice
}
if config.MaxGasLimit > 0 {
ae.maxGasLimit = config.MaxGasLimit
}
if config.SlippageTolerance > 0 {
ae.slippageTolerance = config.SlippageTolerance
}
if config.MinProfitThreshold != nil {
ae.minProfitThreshold = config.MinProfitThreshold
}
}
// executeUniswapV3FlashSwap executes a flash swap directly on a Uniswap V3 pool
// SECURITY FIX: Now accepts transactOpts parameter for per-execution transaction options
func (ae *ArbitrageExecutor) executeUniswapV3FlashSwap(ctx context.Context, poolAddress common.Address, params flashswap.FlashSwapParams, transactOpts *bind.TransactOpts) (*types.Transaction, error) {
ae.logger.Debug(fmt.Sprintf("Executing Uniswap V3 flash swap on pool %s", poolAddress.Hex()))
// Create pool contract instance using IUniswapV3PoolActions interface
poolContract, err := uniswap.NewIUniswapV3PoolActions(poolAddress, ae.client)
if err != nil {
return nil, fmt.Errorf("failed to create pool contract instance: %w", err)
}
// For Uniswap V3, we use the pool's flash function directly
// The callback will handle the arbitrage logic
// Encode the arbitrage data that will be passed to the callback
arbitrageData, err := ae.encodeArbitrageData(params)
if err != nil {
return nil, fmt.Errorf("failed to encode arbitrage data: %w", err)
}
// Execute flash swap on the pool
// amount0 > 0 means we're borrowing token0, amount1 > 0 means we're borrowing token1
// SECURITY FIX: Use provided transactOpts instead of shared field
tx, err := poolContract.Flash(transactOpts, transactOpts.From, params.Amount0, params.Amount1, arbitrageData)
if err != nil {
return nil, fmt.Errorf("flash swap transaction failed: %w", err)
}
ae.logger.Info(fmt.Sprintf("Uniswap V3 flash swap initiated: %s", tx.Hash().Hex()))
return tx, nil
}
// encodeArbitrageData encodes the arbitrage parameters for the flash callback
func (ae *ArbitrageExecutor) encodeArbitrageData(params flashswap.FlashSwapParams) ([]byte, error) {
// For now, we'll encode basic parameters
// In production, this would include the full arbitrage path and swap details
data := struct {
Token0 common.Address
Token1 common.Address
Amount0 *big.Int
Amount1 *big.Int
To common.Address
}{
Token0: params.Token0,
Token1: params.Token1,
Amount0: params.Amount0,
Amount1: params.Amount1,
To: params.To,
}
// For simplicity, we'll just return the data as JSON bytes
// In production, you'd use proper ABI encoding
return []byte(fmt.Sprintf(`{"token0":"%s","token1":"%s","amount0":"%s","amount1":"%s","to":"%s"}`,
data.Token0.Hex(), data.Token1.Hex(), data.Amount0.String(), data.Amount1.String(), data.To.Hex())), nil
}
// ExecutorConfig contains configuration for the arbitrage executor
type ExecutorConfig struct {
MaxGasPrice *big.Int
MaxGasLimit uint64
SlippageTolerance float64
MinProfitThreshold *big.Int
}
// StartLiveExecution starts the comprehensive live execution framework
func (ae *ArbitrageExecutor) StartLiveExecution(ctx context.Context) error {
ae.logger.Info("🚀 Starting comprehensive MEV bot live execution framework...")
if ae.liveFramework == nil {
return fmt.Errorf("live execution framework not initialized")
}
// Start the live framework which orchestrates all components
return ae.liveFramework.Start(ctx)
}
// StopLiveExecution gracefully stops the live execution framework
func (ae *ArbitrageExecutor) StopLiveExecution() error {
ae.logger.Info("🛑 Stopping live execution framework...")
if ae.liveFramework == nil {
return fmt.Errorf("live execution framework not initialized")
}
return ae.liveFramework.Stop()
}
// GetLiveMetrics returns real-time metrics from the live execution framework
func (ae *ArbitrageExecutor) GetLiveMetrics() (*LiveExecutionMetrics, error) {
if ae.liveFramework == nil {
return nil, fmt.Errorf("live execution framework not initialized")
}
return ae.liveFramework.GetMetrics(), nil
}
// ScanForOpportunities uses the detection engine to find arbitrage opportunities
func (ae *ArbitrageExecutor) ScanForOpportunities(ctx context.Context, tokenPairs []TokenPair) ([]*pkgtypes.ArbitrageOpportunity, error) {
ae.logger.Info(fmt.Sprintf("🔍 Scanning for arbitrage opportunities across %d token pairs...", len(tokenPairs)))
if ae.detectionEngine == nil {
return nil, fmt.Errorf("detection engine not initialized")
}
// Convert token pairs to detection parameters
detectionParams := make([]*DetectionParams, len(tokenPairs))
for i, pair := range tokenPairs {
detectionParams[i] = &DetectionParams{
TokenA: pair.TokenA,
TokenB: pair.TokenB,
MinProfit: ae.minProfitThreshold,
MaxSlippage: ae.slippageTolerance,
}
}
return ae.detectionEngine.ScanOpportunities(ctx, detectionParams)
}
// ExecuteOpportunityWithFramework executes an opportunity using the live framework
func (ae *ArbitrageExecutor) ExecuteOpportunityWithFramework(ctx context.Context, opportunity *pkgtypes.ArbitrageOpportunity) (*ExecutionResult, error) {
ae.logger.Info(fmt.Sprintf("⚡ Executing opportunity with expected profit: %s", opportunity.NetProfit.String()))
if ae.liveFramework == nil {
return nil, fmt.Errorf("live execution framework not initialized")
}
// Create a channel to receive the execution result
resultChan := make(chan *ExecutionResult, 1)
// Use the live framework to execute the opportunity
executionTask := &ExecutionTask{
Opportunity: opportunity,
Priority: calculatePriority(opportunity),
Deadline: time.Now().Add(30 * time.Second),
ResultChan: resultChan,
}
// Submit the task to the framework
ae.liveFramework.SubmitExecutionTask(ctx, executionTask)
// Wait for the result with timeout
select {
case result := <-resultChan:
if result == nil {
return nil, fmt.Errorf("execution returned nil result")
}
return result, nil
case <-time.After(45 * time.Second): // 45s timeout to avoid hanging
return nil, fmt.Errorf("execution timeout after 45 seconds")
}
}
// GetSupportedExchanges returns all supported exchanges from the registry
func (ae *ArbitrageExecutor) GetSupportedExchanges() ([]*exchanges.ExchangeConfig, error) {
if ae.exchangeRegistry == nil {
return nil, fmt.Errorf("exchange registry not initialized")
}
return ae.exchangeRegistry.GetAllExchanges(), nil
}
// CalculateOptimalPath finds the most profitable arbitrage path
func (ae *ArbitrageExecutor) CalculateOptimalPath(ctx context.Context, tokenA, tokenB common.Address, amount *math.UniversalDecimal) (*pkgtypes.ArbitrageOpportunity, error) {
ae.logger.Debug(fmt.Sprintf("📊 Calculating optimal arbitrage path for %s -> %s, amount: %s",
tokenA.Hex()[:8], tokenB.Hex()[:8], amount.String()))
if ae.arbitrageCalculator == nil {
return nil, fmt.Errorf("arbitrage calculator not initialized")
}
// Get all possible paths between tokens
paths, err := ae.exchangeRegistry.FindAllPaths(tokenA, tokenB, 3) // Max 3 hops
if err != nil {
return nil, fmt.Errorf("failed to find paths: %w", err)
}
// Calculate profitability for each path
var bestOpportunity *pkgtypes.ArbitrageOpportunity
for _, path := range paths {
// Convert the exchanges.ArbitragePath to []*math.PoolData
poolData, err := ae.convertArbitragePathToPoolData(path)
if err != nil {
ae.logger.Debug(fmt.Sprintf("Failed to convert arbitrage path to pool data: %v", err))
continue
}
opportunity, err := ae.arbitrageCalculator.CalculateArbitrage(ctx, amount, poolData)
if err != nil {
ae.logger.Debug(fmt.Sprintf("Path calculation failed: %v", err))
continue
}
if bestOpportunity == nil || opportunity.NetProfit.Cmp(bestOpportunity.NetProfit) > 0 {
bestOpportunity = opportunity
}
}
if bestOpportunity == nil {
return nil, fmt.Errorf("no profitable arbitrage paths found")
}
ae.logger.Info(fmt.Sprintf("💎 Found optimal path with profit: %s, confidence: %.2f%%",
bestOpportunity.NetProfit.String(), bestOpportunity.Confidence*100))
return bestOpportunity, nil
}
// Helper types are now defined in types.go
// addTrustedContractsToValidator adds trusted contracts to the contract validator
func addTrustedContractsToValidator(validator *security.ContractValidator, arbitrageAddr, flashSwapAddr common.Address) error {
// Add arbitrage contract
arbitrageInfo := &security.ContractInfo{
Address: arbitrageAddr,
BytecodeHash: "placeholder_arbitrage_hash", // TODO: Get actual bytecode hash
Name: "MEV Arbitrage Contract",
Version: "1.0.0",
IsWhitelisted: true,
RiskLevel: security.RiskLevelLow,
Permissions: security.ContractPermissions{
CanInteract: true,
CanSendValue: true,
RequireConfirm: false,
},
}
if err := validator.AddTrustedContract(arbitrageInfo); err != nil {
return fmt.Errorf("failed to add arbitrage contract: %w", err)
}
// Add flash swap contract
flashSwapInfo := &security.ContractInfo{
Address: flashSwapAddr,
BytecodeHash: "placeholder_flashswap_hash", // TODO: Get actual bytecode hash
Name: "Flash Swap Contract",
Version: "1.0.0",
IsWhitelisted: true,
RiskLevel: security.RiskLevelLow,
Permissions: security.ContractPermissions{
CanInteract: true,
CanSendValue: true,
RequireConfirm: false,
},
}
if err := validator.AddTrustedContract(flashSwapInfo); err != nil {
return fmt.Errorf("failed to add flash swap contract: %w", err)
}
return nil
}
// convertArbitragePathToPoolData converts an exchanges.ArbitragePath to []*math.PoolData
func (ae *ArbitrageExecutor) convertArbitragePathToPoolData(path *exchanges.ArbitragePath) ([]*math.PoolData, error) {
var poolData []*math.PoolData
// This is a simplified approach - in a real implementation, you'd fetch the actual pool details
// For now, we'll create mock PoolData objects based on the path information
for i, poolAddr := range path.Pools {
// Create mock token info - would come from actual pool in production
token0Info := math.TokenInfo{
Address: path.TokenIn.Hex(),
Symbol: "TOKEN0", // would be fetched in real implementation
Decimals: 18, // typical for most tokens
}
token1Info := math.TokenInfo{
Address: path.TokenOut.Hex(),
Symbol: "TOKEN1", // would be fetched in real implementation
Decimals: 18, // typical for most tokens
}
// Create mock fee - would come from actual pool in production
feeValue, _ := ae.decimalConverter.FromString("3000", 0, "FEE") // 0.3% fee in fee units
// Create mock reserves
reserve0Value, _ := ae.decimalConverter.FromString("1000000", 18, "RESERVE") // 1M tokens
reserve1Value, _ := ae.decimalConverter.FromString("1000000", 18, "RESERVE") // 1M tokens
// Create mock PoolData
pool := &math.PoolData{
Address: poolAddr.Hex(),
ExchangeType: path.Exchanges[i], // Use the corresponding exchange type
Token0: token0Info,
Token1: token1Info,
Fee: feeValue,
Reserve0: reserve0Value,
Reserve1: reserve1Value,
Liquidity: big.NewInt(1000000), // 1M liquidity for mock
}
poolData = append(poolData, pool)
}
return poolData, nil
}
// calculatePriority calculates execution priority based on opportunity characteristics
func calculatePriority(opportunity *pkgtypes.ArbitrageOpportunity) int {
// Higher profit = higher priority
profitScore := int(opportunity.NetProfit.Int64() / 1000000000000000) // ETH in finney
// Higher confidence = higher priority
confidenceScore := int(opportunity.Confidence * 100)
// Lower risk = higher priority
riskScore := 100 - int(opportunity.Risk*100)
return profitScore + confidenceScore + riskScore
}