package monitoring import ( "encoding/json" "fmt" "net/http" "sync" "time" "github.com/fraktal/mev-beta/internal/logger" "github.com/fraktal/mev-beta/pkg/execution" ) // Dashboard provides real-time monitoring of MEV bot performance type Dashboard struct { logger *logger.Logger port int server *http.Server // Metrics stats *BotStats statsMu sync.RWMutex // Execution queue reference executionQueue *execution.ExecutionQueue } // BotStats holds comprehensive bot performance metrics type BotStats struct { // Runtime metrics StartTime time.Time `json:"start_time"` Uptime string `json:"uptime"` // Detection metrics BlocksProcessed int64 `json:"blocks_processed"` TransactionsAnalyzed int64 `json:"transactions_analyzed"` SwapsDetected int64 `json:"swaps_detected"` ArbitrageOpportunities int64 `json:"arbitrage_opportunities"` // Pool metrics PoolsTracked int `json:"pools_tracked"` NewPoolsDiscovered int64 `json:"new_pools_discovered"` CrossFactoryMatches int64 `json:"cross_factory_matches"` // Profitability metrics TotalProfitUSD float64 `json:"total_profit_usd"` SuccessfulArbitrages int64 `json:"successful_arbitrages"` FailedExecutions int64 `json:"failed_executions"` AverageProfitPerTrade float64 `json:"average_profit_per_trade"` // Performance metrics AverageBlockProcessTime time.Duration `json:"average_block_process_time"` AverageGasUsed uint64 `json:"average_gas_used"` AverageGasPriceGwei float64 `json:"average_gas_price_gwei"` // Top opportunities TopOpportunities []OpportunitySummary `json:"top_opportunities"` RecentActivity []ActivityItem `json:"recent_activity"` // System health SystemHealth HealthStatus `json:"system_health"` LastUpdated time.Time `json:"last_updated"` } type OpportunitySummary struct { ID string `json:"id"` ProfitUSD float64 `json:"profit_usd"` ProfitMargin float64 `json:"profit_margin"` ExchangeA string `json:"exchange_a"` ExchangeB string `json:"exchange_b"` Timestamp time.Time `json:"timestamp"` Executed bool `json:"executed"` } type ActivityItem struct { Type string `json:"type"` // "swap", "arbitrage", "execution", "error" Message string `json:"message"` Value float64 `json:"value,omitempty"` Timestamp time.Time `json:"timestamp"` Level string `json:"level"` // "info", "warn", "error", "success" } type HealthStatus struct { Overall string `json:"overall"` // "healthy", "warning", "critical" Components map[string]string `json:"components"` Issues []string `json:"issues"` Recommendations []string `json:"recommendations"` } // NewDashboard creates a new monitoring dashboard func NewDashboard(logger *logger.Logger, port int, executionQueue *execution.ExecutionQueue) *Dashboard { dashboard := &Dashboard{ logger: logger, port: port, executionQueue: executionQueue, stats: &BotStats{ StartTime: time.Now(), TopOpportunities: make([]OpportunitySummary, 0), RecentActivity: make([]ActivityItem, 0), SystemHealth: HealthStatus{ Overall: "healthy", Components: make(map[string]string), Issues: make([]string, 0), Recommendations: make([]string, 0), }, }, } // Initialize health components dashboard.stats.SystemHealth.Components["rpc_connection"] = "healthy" dashboard.stats.SystemHealth.Components["execution_queue"] = "healthy" dashboard.stats.SystemHealth.Components["pool_discovery"] = "healthy" dashboard.stats.SystemHealth.Components["arbitrage_detection"] = "healthy" return dashboard } // Start starts the dashboard web server func (d *Dashboard) Start() error { mux := http.NewServeMux() // API endpoints mux.HandleFunc("/api/stats", d.handleStats) mux.HandleFunc("/api/health", d.handleHealth) mux.HandleFunc("/api/opportunities", d.handleOpportunities) mux.HandleFunc("/api/activity", d.handleActivity) // Static dashboard page mux.HandleFunc("/", d.handleDashboard) d.server = &http.Server{ Addr: fmt.Sprintf(":%d", d.port), Handler: mux, ReadTimeout: 10 * time.Second, WriteTimeout: 10 * time.Second, } d.logger.Info(fmt.Sprintf("🖥️ Starting monitoring dashboard on port %d", d.port)) d.logger.Info(fmt.Sprintf("🌐 Dashboard available at: http://localhost:%d", d.port)) return d.server.ListenAndServe() } // Stop stops the dashboard server func (d *Dashboard) Stop() error { if d.server != nil { return d.server.Close() } return nil } // UpdateStats updates the dashboard statistics func (d *Dashboard) UpdateStats(update StatsUpdate) { d.statsMu.Lock() defer d.statsMu.Unlock() switch update.Type { case "block_processed": d.stats.BlocksProcessed++ case "transaction_analyzed": d.stats.TransactionsAnalyzed++ case "swap_detected": d.stats.SwapsDetected++ case "arbitrage_opportunity": d.stats.ArbitrageOpportunities++ if update.Opportunity != nil { d.addOpportunity(*update.Opportunity) } case "pool_discovered": d.stats.NewPoolsDiscovered++ case "execution_success": d.stats.SuccessfulArbitrages++ d.stats.TotalProfitUSD += update.Value case "execution_failure": d.stats.FailedExecutions++ } // Add to recent activity if update.ActivityItem != nil { d.addActivity(*update.ActivityItem) } // Update calculated fields if d.stats.SuccessfulArbitrages > 0 { d.stats.AverageProfitPerTrade = d.stats.TotalProfitUSD / float64(d.stats.SuccessfulArbitrages) } // Update uptime d.stats.Uptime = time.Since(d.stats.StartTime).Round(time.Second).String() d.stats.LastUpdated = time.Now() // Update system health d.updateSystemHealth() } type StatsUpdate struct { Type string Value float64 Opportunity *OpportunitySummary ActivityItem *ActivityItem } // addOpportunity adds an opportunity to the top opportunities list func (d *Dashboard) addOpportunity(opp OpportunitySummary) { d.stats.TopOpportunities = append(d.stats.TopOpportunities, opp) // Keep only top 20 opportunities by profit if len(d.stats.TopOpportunities) > 20 { // Sort by profit descending for i := 0; i < len(d.stats.TopOpportunities)-1; i++ { for j := i + 1; j < len(d.stats.TopOpportunities); j++ { if d.stats.TopOpportunities[i].ProfitUSD < d.stats.TopOpportunities[j].ProfitUSD { d.stats.TopOpportunities[i], d.stats.TopOpportunities[j] = d.stats.TopOpportunities[j], d.stats.TopOpportunities[i] } } } d.stats.TopOpportunities = d.stats.TopOpportunities[:20] } } // addActivity adds an activity item to recent activity func (d *Dashboard) addActivity(activity ActivityItem) { d.stats.RecentActivity = append(d.stats.RecentActivity, activity) // Keep only last 50 activities if len(d.stats.RecentActivity) > 50 { d.stats.RecentActivity = d.stats.RecentActivity[len(d.stats.RecentActivity)-50:] } } // updateSystemHealth updates the overall system health status func (d *Dashboard) updateSystemHealth() { issues := make([]string, 0) recommendations := make([]string, 0) // Check success rate if d.stats.SuccessfulArbitrages+d.stats.FailedExecutions > 0 { successRate := float64(d.stats.SuccessfulArbitrages) / float64(d.stats.SuccessfulArbitrages+d.stats.FailedExecutions) if successRate < 0.5 { issues = append(issues, "Low execution success rate") recommendations = append(recommendations, "Review execution strategy and gas pricing") } } // Check execution queue health if d.executionQueue != nil { queueStats := d.executionQueue.GetStats() if circuitBreakerOpen, ok := queueStats["circuit_breaker_open"].(bool); ok && circuitBreakerOpen { issues = append(issues, "Circuit breaker is open") recommendations = append(recommendations, "Check execution failures and network conditions") } } // Check opportunity detection rate if d.stats.BlocksProcessed > 100 && d.stats.ArbitrageOpportunities == 0 { issues = append(issues, "No arbitrage opportunities detected") recommendations = append(recommendations, "Review price impact thresholds and market conditions") } // Determine overall health d.stats.SystemHealth.Issues = issues d.stats.SystemHealth.Recommendations = recommendations if len(issues) == 0 { d.stats.SystemHealth.Overall = "healthy" } else if len(issues) <= 2 { d.stats.SystemHealth.Overall = "warning" } else { d.stats.SystemHealth.Overall = "critical" } } // HTTP handlers func (d *Dashboard) handleStats(w http.ResponseWriter, r *http.Request) { d.statsMu.RLock() defer d.statsMu.RUnlock() w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(d.stats) } func (d *Dashboard) handleHealth(w http.ResponseWriter, r *http.Request) { d.statsMu.RLock() defer d.statsMu.RUnlock() w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(d.stats.SystemHealth) } func (d *Dashboard) handleOpportunities(w http.ResponseWriter, r *http.Request) { d.statsMu.RLock() defer d.statsMu.RUnlock() w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(d.stats.TopOpportunities) } func (d *Dashboard) handleActivity(w http.ResponseWriter, r *http.Request) { d.statsMu.RLock() defer d.statsMu.RUnlock() w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(d.stats.RecentActivity) } func (d *Dashboard) handleDashboard(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "text/html") // Simple HTML dashboard html := `
Real-time monitoring of arbitrage opportunities on Arbitrum
All systems operational. Monitoring 8 DEX protocols on Arbitrum.