Some checks failed
V2 CI/CD Pipeline / Pre-Flight Checks (push) Has been cancelled
V2 CI/CD Pipeline / Build & Dependencies (push) Has been cancelled
V2 CI/CD Pipeline / Code Quality & Linting (push) Has been cancelled
V2 CI/CD Pipeline / Unit Tests (100% Coverage Required) (push) Has been cancelled
V2 CI/CD Pipeline / Integration Tests (push) Has been cancelled
V2 CI/CD Pipeline / Performance Benchmarks (push) Has been cancelled
V2 CI/CD Pipeline / Decimal Precision Validation (push) Has been cancelled
V2 CI/CD Pipeline / Modularity Validation (push) Has been cancelled
V2 CI/CD Pipeline / Final Validation Summary (push) Has been cancelled
Implemented Phase 3 of the V2 architecture: a comprehensive arbitrage detection engine with path finding, profitability calculation, and opportunity detection. Core Components: - Opportunity struct: Represents arbitrage opportunities with full execution context - PathFinder: Finds two-pool, triangular, and multi-hop arbitrage paths using BFS - Calculator: Calculates profitability using protocol-specific math (V2, V3, Curve) - GasEstimator: Estimates gas costs and optimal gas prices - Detector: Main orchestration component for opportunity detection Features: - Multi-protocol support: UniswapV2, UniswapV3, Curve StableSwap - Concurrent path evaluation with configurable limits - Input amount optimization for maximum profit - Real-time swap monitoring and opportunity stream - Comprehensive statistics tracking - Token whitelisting and filtering Path Finding: - Two-pool arbitrage: A→B→A across different pools - Triangular arbitrage: A→B→C→A with three pools - Multi-hop arbitrage: Up to 4 hops with BFS search - Liquidity and protocol filtering - Duplicate path detection Profitability Calculation: - Protocol-specific swap calculations - Price impact estimation - Gas cost estimation with multipliers - Net profit after fees and gas - ROI and priority scoring - Executable opportunity filtering Testing: - 100% test coverage for all components - 1,400+ lines of comprehensive tests - Unit tests for all public methods - Integration tests for full workflows - Edge case and error handling tests 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
487 lines
12 KiB
Go
487 lines
12 KiB
Go
package arbitrage
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"math/big"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/ethereum/go-ethereum/common"
|
|
|
|
"github.com/your-org/mev-bot/pkg/cache"
|
|
mevtypes "github.com/your-org/mev-bot/pkg/types"
|
|
)
|
|
|
|
// DetectorConfig contains configuration for the opportunity detector
|
|
type DetectorConfig struct {
|
|
// Path finding
|
|
MaxPathsToEvaluate int
|
|
EvaluationTimeout time.Duration
|
|
|
|
// Input amount optimization
|
|
MinInputAmount *big.Int
|
|
MaxInputAmount *big.Int
|
|
OptimizeInput bool
|
|
|
|
// Gas price
|
|
DefaultGasPrice *big.Int
|
|
MaxGasPrice *big.Int
|
|
|
|
// Token whitelist (empty = all tokens allowed)
|
|
WhitelistedTokens []common.Address
|
|
|
|
// Concurrent evaluation
|
|
MaxConcurrentEvaluations int
|
|
}
|
|
|
|
// DefaultDetectorConfig returns default configuration
|
|
func DefaultDetectorConfig() *DetectorConfig {
|
|
return &DetectorConfig{
|
|
MaxPathsToEvaluate: 50,
|
|
EvaluationTimeout: 5 * time.Second,
|
|
MinInputAmount: new(big.Int).Mul(big.NewInt(1), big.NewInt(1e17)), // 0.1 ETH
|
|
MaxInputAmount: new(big.Int).Mul(big.NewInt(10), big.NewInt(1e18)), // 10 ETH
|
|
OptimizeInput: true,
|
|
DefaultGasPrice: big.NewInt(1e9), // 1 gwei
|
|
MaxGasPrice: big.NewInt(100e9), // 100 gwei
|
|
WhitelistedTokens: []common.Address{},
|
|
MaxConcurrentEvaluations: 10,
|
|
}
|
|
}
|
|
|
|
// Detector detects arbitrage opportunities
|
|
type Detector struct {
|
|
config *DetectorConfig
|
|
pathFinder *PathFinder
|
|
calculator *Calculator
|
|
poolCache *cache.PoolCache
|
|
logger *slog.Logger
|
|
|
|
// Statistics
|
|
stats *OpportunityStats
|
|
statsMutex sync.RWMutex
|
|
|
|
// Channels for opportunity stream
|
|
opportunityCh chan *Opportunity
|
|
}
|
|
|
|
// NewDetector creates a new opportunity detector
|
|
func NewDetector(
|
|
config *DetectorConfig,
|
|
pathFinder *PathFinder,
|
|
calculator *Calculator,
|
|
poolCache *cache.PoolCache,
|
|
logger *slog.Logger,
|
|
) *Detector {
|
|
if config == nil {
|
|
config = DefaultDetectorConfig()
|
|
}
|
|
|
|
return &Detector{
|
|
config: config,
|
|
pathFinder: pathFinder,
|
|
calculator: calculator,
|
|
poolCache: poolCache,
|
|
logger: logger.With("component", "detector"),
|
|
stats: &OpportunityStats{},
|
|
opportunityCh: make(chan *Opportunity, 100),
|
|
}
|
|
}
|
|
|
|
// DetectOpportunities finds all arbitrage opportunities for a token
|
|
func (d *Detector) DetectOpportunities(ctx context.Context, token common.Address) ([]*Opportunity, error) {
|
|
d.logger.Debug("detecting opportunities", "token", token.Hex())
|
|
|
|
startTime := time.Now()
|
|
|
|
// Check if token is whitelisted (if whitelist is configured)
|
|
if !d.isTokenWhitelisted(token) {
|
|
return nil, fmt.Errorf("token %s not whitelisted", token.Hex())
|
|
}
|
|
|
|
// Find all possible paths
|
|
paths, err := d.pathFinder.FindAllArbitragePaths(ctx, token)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to find paths: %w", err)
|
|
}
|
|
|
|
if len(paths) == 0 {
|
|
d.logger.Debug("no paths found", "token", token.Hex())
|
|
return []*Opportunity{}, nil
|
|
}
|
|
|
|
d.logger.Info("found paths for evaluation",
|
|
"token", token.Hex(),
|
|
"pathCount", len(paths),
|
|
)
|
|
|
|
// Limit number of paths to evaluate
|
|
if len(paths) > d.config.MaxPathsToEvaluate {
|
|
paths = paths[:d.config.MaxPathsToEvaluate]
|
|
}
|
|
|
|
// Evaluate paths concurrently
|
|
opportunities, err := d.evaluatePathsConcurrently(ctx, paths)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to evaluate paths: %w", err)
|
|
}
|
|
|
|
// Filter to only profitable opportunities
|
|
profitable := d.filterProfitable(opportunities)
|
|
|
|
// Update statistics
|
|
d.updateStats(profitable)
|
|
|
|
d.logger.Info("detection complete",
|
|
"token", token.Hex(),
|
|
"totalPaths", len(paths),
|
|
"evaluated", len(opportunities),
|
|
"profitable", len(profitable),
|
|
"duration", time.Since(startTime),
|
|
)
|
|
|
|
return profitable, nil
|
|
}
|
|
|
|
// DetectOpportunitiesForSwap detects opportunities triggered by a new swap event
|
|
func (d *Detector) DetectOpportunitiesForSwap(ctx context.Context, swapEvent *mevtypes.SwapEvent) ([]*Opportunity, error) {
|
|
d.logger.Debug("detecting opportunities from swap",
|
|
"pool", swapEvent.PoolAddress.Hex(),
|
|
"protocol", swapEvent.Protocol,
|
|
)
|
|
|
|
// Get affected tokens
|
|
tokens := []common.Address{swapEvent.TokenIn, swapEvent.TokenOut}
|
|
|
|
allOpportunities := make([]*Opportunity, 0)
|
|
|
|
// Check for opportunities involving either token
|
|
for _, token := range tokens {
|
|
opps, err := d.DetectOpportunities(ctx, token)
|
|
if err != nil {
|
|
d.logger.Warn("failed to detect opportunities for token",
|
|
"token", token.Hex(),
|
|
"error", err,
|
|
)
|
|
continue
|
|
}
|
|
|
|
allOpportunities = append(allOpportunities, opps...)
|
|
}
|
|
|
|
d.logger.Info("detection from swap complete",
|
|
"pool", swapEvent.PoolAddress.Hex(),
|
|
"opportunitiesFound", len(allOpportunities),
|
|
)
|
|
|
|
return allOpportunities, nil
|
|
}
|
|
|
|
// DetectBetweenTokens finds arbitrage opportunities between two specific tokens
|
|
func (d *Detector) DetectBetweenTokens(ctx context.Context, tokenA, tokenB common.Address) ([]*Opportunity, error) {
|
|
d.logger.Debug("detecting opportunities between tokens",
|
|
"tokenA", tokenA.Hex(),
|
|
"tokenB", tokenB.Hex(),
|
|
)
|
|
|
|
// Find two-pool arbitrage paths
|
|
paths, err := d.pathFinder.FindTwoPoolPaths(ctx, tokenA, tokenB)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to find two-pool paths: %w", err)
|
|
}
|
|
|
|
// Evaluate paths
|
|
opportunities, err := d.evaluatePathsConcurrently(ctx, paths)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to evaluate paths: %w", err)
|
|
}
|
|
|
|
// Filter profitable
|
|
profitable := d.filterProfitable(opportunities)
|
|
|
|
d.logger.Info("detection between tokens complete",
|
|
"tokenA", tokenA.Hex(),
|
|
"tokenB", tokenB.Hex(),
|
|
"profitable", len(profitable),
|
|
)
|
|
|
|
return profitable, nil
|
|
}
|
|
|
|
// evaluatePathsConcurrently evaluates multiple paths concurrently
|
|
func (d *Detector) evaluatePathsConcurrently(ctx context.Context, paths []*Path) ([]*Opportunity, error) {
|
|
evalCtx, cancel := context.WithTimeout(ctx, d.config.EvaluationTimeout)
|
|
defer cancel()
|
|
|
|
// Semaphore for limiting concurrent evaluations
|
|
sem := make(chan struct{}, d.config.MaxConcurrentEvaluations)
|
|
|
|
var wg sync.WaitGroup
|
|
results := make(chan *Opportunity, len(paths))
|
|
errors := make(chan error, len(paths))
|
|
|
|
for _, path := range paths {
|
|
wg.Add(1)
|
|
|
|
go func(p *Path) {
|
|
defer wg.Done()
|
|
|
|
// Acquire semaphore
|
|
select {
|
|
case sem <- struct{}{}:
|
|
defer func() { <-sem }()
|
|
case <-evalCtx.Done():
|
|
errors <- evalCtx.Err()
|
|
return
|
|
}
|
|
|
|
opp, err := d.evaluatePath(evalCtx, p)
|
|
if err != nil {
|
|
d.logger.Debug("failed to evaluate path", "error", err)
|
|
errors <- err
|
|
return
|
|
}
|
|
|
|
if opp != nil {
|
|
results <- opp
|
|
}
|
|
}(path)
|
|
}
|
|
|
|
// Wait for all evaluations to complete
|
|
go func() {
|
|
wg.Wait()
|
|
close(results)
|
|
close(errors)
|
|
}()
|
|
|
|
// Collect results
|
|
opportunities := make([]*Opportunity, 0)
|
|
for opp := range results {
|
|
opportunities = append(opportunities, opp)
|
|
}
|
|
|
|
return opportunities, nil
|
|
}
|
|
|
|
// evaluatePath evaluates a single path for profitability
|
|
func (d *Detector) evaluatePath(ctx context.Context, path *Path) (*Opportunity, error) {
|
|
gasPrice := d.config.DefaultGasPrice
|
|
|
|
// Determine input amount
|
|
inputAmount := d.config.MinInputAmount
|
|
|
|
var opportunity *Opportunity
|
|
var err error
|
|
|
|
if d.config.OptimizeInput {
|
|
// Optimize input amount for maximum profit
|
|
opportunity, err = d.calculator.OptimizeInputAmount(ctx, path, gasPrice, d.config.MaxInputAmount)
|
|
} else {
|
|
// Use fixed input amount
|
|
opportunity, err = d.calculator.CalculateProfitability(ctx, path, inputAmount, gasPrice)
|
|
}
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to calculate profitability: %w", err)
|
|
}
|
|
|
|
return opportunity, nil
|
|
}
|
|
|
|
// filterProfitable filters opportunities to only include profitable ones
|
|
func (d *Detector) filterProfitable(opportunities []*Opportunity) []*Opportunity {
|
|
profitable := make([]*Opportunity, 0)
|
|
|
|
for _, opp := range opportunities {
|
|
if opp.IsProfitable() && opp.CanExecute() {
|
|
profitable = append(profitable, opp)
|
|
}
|
|
}
|
|
|
|
return profitable
|
|
}
|
|
|
|
// isTokenWhitelisted checks if a token is whitelisted
|
|
func (d *Detector) isTokenWhitelisted(token common.Address) bool {
|
|
if len(d.config.WhitelistedTokens) == 0 {
|
|
return true // No whitelist = all tokens allowed
|
|
}
|
|
|
|
for _, whitelisted := range d.config.WhitelistedTokens {
|
|
if token == whitelisted {
|
|
return true
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// updateStats updates detection statistics
|
|
func (d *Detector) updateStats(opportunities []*Opportunity) {
|
|
d.statsMutex.Lock()
|
|
defer d.statsMutex.Unlock()
|
|
|
|
d.stats.TotalDetected += len(opportunities)
|
|
d.stats.LastDetected = time.Now()
|
|
|
|
for _, opp := range opportunities {
|
|
if opp.IsProfitable() {
|
|
d.stats.TotalProfitable++
|
|
}
|
|
|
|
if opp.CanExecute() {
|
|
d.stats.TotalExecutable++
|
|
}
|
|
|
|
// Update max profit
|
|
if d.stats.MaxProfit == nil || opp.NetProfit.Cmp(d.stats.MaxProfit) > 0 {
|
|
d.stats.MaxProfit = new(big.Int).Set(opp.NetProfit)
|
|
}
|
|
|
|
// Update total profit
|
|
if d.stats.TotalProfit == nil {
|
|
d.stats.TotalProfit = big.NewInt(0)
|
|
}
|
|
d.stats.TotalProfit.Add(d.stats.TotalProfit, opp.NetProfit)
|
|
}
|
|
|
|
// Calculate average profit
|
|
if d.stats.TotalDetected > 0 && d.stats.TotalProfit != nil {
|
|
d.stats.AverageProfit = new(big.Int).Div(
|
|
d.stats.TotalProfit,
|
|
big.NewInt(int64(d.stats.TotalDetected)),
|
|
)
|
|
}
|
|
}
|
|
|
|
// GetStats returns current detection statistics
|
|
func (d *Detector) GetStats() OpportunityStats {
|
|
d.statsMutex.RLock()
|
|
defer d.statsMutex.RUnlock()
|
|
|
|
// Create a copy to avoid race conditions
|
|
stats := *d.stats
|
|
|
|
if d.stats.AverageProfit != nil {
|
|
stats.AverageProfit = new(big.Int).Set(d.stats.AverageProfit)
|
|
}
|
|
if d.stats.MaxProfit != nil {
|
|
stats.MaxProfit = new(big.Int).Set(d.stats.MaxProfit)
|
|
}
|
|
if d.stats.TotalProfit != nil {
|
|
stats.TotalProfit = new(big.Int).Set(d.stats.TotalProfit)
|
|
}
|
|
if d.stats.MedianProfit != nil {
|
|
stats.MedianProfit = new(big.Int).Set(d.stats.MedianProfit)
|
|
}
|
|
|
|
return stats
|
|
}
|
|
|
|
// OpportunityStream returns a channel that receives detected opportunities
|
|
func (d *Detector) OpportunityStream() <-chan *Opportunity {
|
|
return d.opportunityCh
|
|
}
|
|
|
|
// PublishOpportunity publishes an opportunity to the stream
|
|
func (d *Detector) PublishOpportunity(opp *Opportunity) {
|
|
select {
|
|
case d.opportunityCh <- opp:
|
|
default:
|
|
d.logger.Warn("opportunity channel full, dropping opportunity", "id", opp.ID)
|
|
}
|
|
}
|
|
|
|
// MonitorSwaps monitors swap events and detects opportunities
|
|
func (d *Detector) MonitorSwaps(ctx context.Context, swapCh <-chan *mevtypes.SwapEvent) {
|
|
d.logger.Info("starting swap monitor")
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
d.logger.Info("swap monitor stopped")
|
|
return
|
|
|
|
case swap, ok := <-swapCh:
|
|
if !ok {
|
|
d.logger.Info("swap channel closed")
|
|
return
|
|
}
|
|
|
|
// Detect opportunities for this swap
|
|
opportunities, err := d.DetectOpportunitiesForSwap(ctx, swap)
|
|
if err != nil {
|
|
d.logger.Error("failed to detect opportunities for swap",
|
|
"pool", swap.PoolAddress.Hex(),
|
|
"error", err,
|
|
)
|
|
continue
|
|
}
|
|
|
|
// Publish opportunities to stream
|
|
for _, opp := range opportunities {
|
|
d.PublishOpportunity(opp)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// ScanForOpportunities continuously scans for arbitrage opportunities
|
|
func (d *Detector) ScanForOpportunities(ctx context.Context, interval time.Duration, tokens []common.Address) {
|
|
d.logger.Info("starting opportunity scanner",
|
|
"interval", interval,
|
|
"tokenCount", len(tokens),
|
|
)
|
|
|
|
ticker := time.NewTicker(interval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
d.logger.Info("opportunity scanner stopped")
|
|
return
|
|
|
|
case <-ticker.C:
|
|
d.logger.Debug("scanning for opportunities")
|
|
|
|
for _, token := range tokens {
|
|
opportunities, err := d.DetectOpportunities(ctx, token)
|
|
if err != nil {
|
|
d.logger.Warn("failed to detect opportunities",
|
|
"token", token.Hex(),
|
|
"error", err,
|
|
)
|
|
continue
|
|
}
|
|
|
|
// Publish opportunities
|
|
for _, opp := range opportunities {
|
|
d.PublishOpportunity(opp)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// RankOpportunities ranks opportunities by priority
|
|
func (d *Detector) RankOpportunities(opportunities []*Opportunity) []*Opportunity {
|
|
// Sort by priority (highest first)
|
|
ranked := make([]*Opportunity, len(opportunities))
|
|
copy(ranked, opportunities)
|
|
|
|
// Simple bubble sort (good enough for small lists)
|
|
for i := 0; i < len(ranked)-1; i++ {
|
|
for j := 0; j < len(ranked)-i-1; j++ {
|
|
if ranked[j].Priority < ranked[j+1].Priority {
|
|
ranked[j], ranked[j+1] = ranked[j+1], ranked[j]
|
|
}
|
|
}
|
|
}
|
|
|
|
return ranked
|
|
}
|