Files
mev-beta/orig/pkg/metrics/metrics.go
Administrator 803de231ba feat: create v2-prep branch with comprehensive planning
Restructured project for V2 refactor:

**Structure Changes:**
- Moved all V1 code to orig/ folder (preserved with git mv)
- Created docs/planning/ directory
- Added orig/README_V1.md explaining V1 preservation

**Planning Documents:**
- 00_V2_MASTER_PLAN.md: Complete architecture overview
  - Executive summary of critical V1 issues
  - High-level component architecture diagrams
  - 5-phase implementation roadmap
  - Success metrics and risk mitigation

- 07_TASK_BREAKDOWN.md: Atomic task breakdown
  - 99+ hours of detailed tasks
  - Every task < 2 hours (atomic)
  - Clear dependencies and success criteria
  - Organized by implementation phase

**V2 Key Improvements:**
- Per-exchange parsers (factory pattern)
- Multi-layer strict validation
- Multi-index pool cache
- Background validation pipeline
- Comprehensive observability

**Critical Issues Addressed:**
- Zero address tokens (strict validation + cache enrichment)
- Parsing accuracy (protocol-specific parsers)
- No audit trail (background validation channel)
- Inefficient lookups (multi-index cache)
- Stats disconnection (event-driven metrics)

Next Steps:
1. Review planning documents
2. Begin Phase 1: Foundation (P1-001 through P1-010)
3. Implement parsers in Phase 2
4. Build cache system in Phase 3
5. Add validation pipeline in Phase 4
6. Migrate and test in Phase 5

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-10 10:14:26 +01:00

415 lines
13 KiB
Go

package metrics
import (
"fmt"
"net/http"
"sync"
"time"
"github.com/fraktal/mev-beta/internal/auth"
"github.com/fraktal/mev-beta/internal/logger"
)
// MetricsCollector accumulates MEV bot counters, gauges and running totals.
//
// Every Record*/Update* method takes mu for writing and GetSnapshot takes it
// for reading. The fields are exported, so code that reads or writes them
// directly bypasses that locking.
type MetricsCollector struct {
// logger is stored by NewMetricsCollector.
logger *logger.Logger
// mu guards every field below.
mu sync.RWMutex
// L2 Message Metrics
// L2MessagesProcessed is incremented once per RecordL2Message call.
L2MessagesProcessed uint64
// L2MessagesPerSecond is L2MessagesProcessed divided by the seconds elapsed
// since startTime, i.e. an average over the collector's whole lifetime.
L2MessagesPerSecond float64
// L2MessageLag is the most recent value passed to RecordL2MessageLag.
L2MessageLag time.Duration
BatchesProcessed uint64
// DEX Interaction Metrics
DEXInteractionsFound uint64
SwapOpportunities uint64
ArbitrageOpportunities uint64
OpportunitiesDropped uint64
// NOTE(review): no method visible in this part of the file writes
// OpportunityQueueLength — confirm where it is maintained.
OpportunityQueueLength uint64
// Performance Metrics
// ProcessingLatency is the most recent value passed to RecordL2Message.
ProcessingLatency time.Duration
// ErrorRate is FailedTrades / (SuccessfulTrades + FailedTrades).
ErrorRate float64
SuccessfulTrades uint64
FailedTrades uint64
// Financial Metrics (reported under *_eth keys by MetricsSnapshot)
TotalProfit float64
TotalLoss float64
GasCostsSpent float64
// NetProfit is TotalProfit - TotalLoss - GasCostsSpent.
NetProfit float64
// Gas Metrics
// AverageGasPrice holds the latest gasPrice passed to RecordGasMetrics; the
// code overwrites it rather than averaging.
AverageGasPrice uint64
L1DataFeesSpent float64
L2ComputeFeesSpent float64
// ProfitFactor is (TotalProfit - TotalLoss) / GasCostsSpent, or 0 while
// GasCostsSpent is not positive.
ProfitFactor float64
// UptimeSeries receives the UptimeSeconds value recorded by each
// UpdateHealthCheck call.
UptimeSeries []float64
// Health Metrics
// UptimeSeconds is refreshed only by UpdateHealthCheck.
UptimeSeconds uint64
LastHealthCheck time.Time
// Start time for calculations
startTime time.Time
}
// NewMetricsCollector builds a collector whose start time and last health
// check are both initialised to the current time, with room preallocated for
// 1024 uptime samples. The given logger is stored on the collector.
func NewMetricsCollector(logger *logger.Logger) *MetricsCollector {
	collector := new(MetricsCollector)
	collector.logger = logger
	collector.startTime = time.Now()
	collector.LastHealthCheck = time.Now()
	collector.UptimeSeries = make([]float64, 0, 1024)
	return collector
}
// RecordL2Message counts one processed L2 message, stores processingTime as
// the latest processing latency, and refreshes the average throughput
// (messages processed divided by seconds since the collector was created).
func (m *MetricsCollector) RecordL2Message(processingTime time.Duration) {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.ProcessingLatency = processingTime
	m.L2MessagesProcessed++

	secs := time.Since(m.startTime).Seconds()
	if secs <= 0 {
		// No measurable time has elapsed; keep the previous rate rather than
		// dividing by zero.
		return
	}
	m.L2MessagesPerSecond = float64(m.L2MessagesProcessed) / secs
}
// RecordL2MessageLag stores lag as the current L2 message lag, replacing the
// previously recorded value.
func (m *MetricsCollector) RecordL2MessageLag(lag time.Duration) {
	m.mu.Lock()
	m.L2MessageLag = lag
	m.mu.Unlock()
}
// RecordBatchProcessed adds one to the count of processed batches.
func (m *MetricsCollector) RecordBatchProcessed() {
	m.mu.Lock()
	m.BatchesProcessed++
	m.mu.Unlock()
}
// RecordDEXInteraction adds one to the count of DEX interactions found.
func (m *MetricsCollector) RecordDEXInteraction() {
	m.mu.Lock()
	m.DEXInteractionsFound++
	m.mu.Unlock()
}
// RecordSwapOpportunity adds one to the count of swap opportunities found.
func (m *MetricsCollector) RecordSwapOpportunity() {
	m.mu.Lock()
	m.SwapOpportunities++
	m.mu.Unlock()
}
// RecordArbitrageOpportunity adds one to the count of arbitrage
// opportunities found.
func (m *MetricsCollector) RecordArbitrageOpportunity() {
	m.mu.Lock()
	m.ArbitrageOpportunities++
	m.mu.Unlock()
}
// RecordDroppedOpportunity adds one to the count of dropped opportunities.
func (m *MetricsCollector) RecordDroppedOpportunity() {
	m.mu.Lock()
	m.OpportunitiesDropped++
	m.mu.Unlock()
}
// RecordSuccessfulTrade counts one successful trade, adds profit to the
// gross profit total and gasCost to the gas total, then recomputes net
// profit, profit factor and the failed-trade ratio.
func (m *MetricsCollector) RecordSuccessfulTrade(profit float64, gasCost float64) {
	m.mu.Lock()
	defer m.mu.Unlock()

	// Raw totals first; everything below is derived from them.
	m.SuccessfulTrades++
	m.GasCostsSpent += gasCost
	m.TotalProfit += profit

	m.NetProfit = m.TotalProfit - m.TotalLoss - m.GasCostsSpent
	m.updateProfitFactor()

	// Error rate is the share of failed trades among all attempts.
	if attempts := m.SuccessfulTrades + m.FailedTrades; attempts != 0 {
		m.ErrorRate = float64(m.FailedTrades) / float64(attempts)
	}
}
// RecordFailedTrade counts one failed trade, adds loss to the loss total and
// gasCost to the gas total, then recomputes net profit, profit factor and
// the failed-trade ratio.
func (m *MetricsCollector) RecordFailedTrade(loss float64, gasCost float64) {
	m.mu.Lock()
	defer m.mu.Unlock()

	// Raw totals first; everything below is derived from them.
	m.FailedTrades++
	m.GasCostsSpent += gasCost
	m.TotalLoss += loss

	m.NetProfit = m.TotalProfit - m.TotalLoss - m.GasCostsSpent
	m.updateProfitFactor()

	// Error rate is the share of failed trades among all attempts.
	if attempts := m.SuccessfulTrades + m.FailedTrades; attempts != 0 {
		m.ErrorRate = float64(m.FailedTrades) / float64(attempts)
	}
}
// updateProfitFactor recomputes ProfitFactor as (TotalProfit - TotalLoss)
// divided by GasCostsSpent, or 0 while no positive gas cost has been
// recorded. It takes no lock itself; the trade-recording methods call it
// while already holding m.mu.
func (m *MetricsCollector) updateProfitFactor() {
	switch gas := m.GasCostsSpent; {
	case gas <= 0:
		m.ProfitFactor = 0
	default:
		m.ProfitFactor = (m.TotalProfit - m.TotalLoss) / gas
	}
}
// RecordGasMetrics stores gasPrice as the current gas price (the previous
// value is overwritten, not averaged) and adds l1DataFee and l2ComputeFee to
// their respective running totals.
func (m *MetricsCollector) RecordGasMetrics(gasPrice uint64, l1DataFee, l2ComputeFee float64) {
	m.mu.Lock()
	m.L2ComputeFeesSpent += l2ComputeFee
	m.L1DataFeesSpent += l1DataFee
	m.AverageGasPrice = gasPrice
	m.mu.Unlock()
}
// UpdateHealthCheck stamps LastHealthCheck with the current time, refreshes
// UptimeSeconds from the collector's start time, and appends that uptime to
// UptimeSeries.
//
// UptimeSeries is kept to at most maxUptimeSamples entries: once full, the
// oldest samples are discarded so that a long-running process does not grow
// the slice by one element per health check forever.
func (m *MetricsCollector) UpdateHealthCheck() {
	// Matches the capacity NewMetricsCollector preallocates, so a collector
	// built by the constructor never reallocates the series.
	const maxUptimeSamples = 1024

	m.mu.Lock()
	defer m.mu.Unlock()

	m.LastHealthCheck = time.Now()
	m.UptimeSeconds = uint64(time.Since(m.startTime).Seconds())

	// Make room for the new sample by shifting the newest
	// maxUptimeSamples-1 entries to the front of the existing backing array.
	if excess := len(m.UptimeSeries) - maxUptimeSamples + 1; excess > 0 {
		kept := copy(m.UptimeSeries, m.UptimeSeries[excess:])
		m.UptimeSeries = m.UptimeSeries[:kept]
	}
	m.UptimeSeries = append(m.UptimeSeries, float64(m.UptimeSeconds))
}
// GetSnapshot returns a copy of the current metric values, taken under the
// read lock so the fields are mutually consistent.
//
// Every field of MetricsSnapshot is populated, including
// OpportunitiesDropped and OpportunityQueueLength. UptimeSeries is not part
// of the snapshot type and is therefore not copied.
func (m *MetricsCollector) GetSnapshot() MetricsSnapshot {
	m.mu.RLock()
	defer m.mu.RUnlock()

	return MetricsSnapshot{
		L2MessagesProcessed:    m.L2MessagesProcessed,
		L2MessagesPerSecond:    m.L2MessagesPerSecond,
		L2MessageLag:           m.L2MessageLag,
		BatchesProcessed:       m.BatchesProcessed,
		DEXInteractionsFound:   m.DEXInteractionsFound,
		SwapOpportunities:      m.SwapOpportunities,
		ArbitrageOpportunities: m.ArbitrageOpportunities,
		OpportunitiesDropped:   m.OpportunitiesDropped,
		OpportunityQueueLength: m.OpportunityQueueLength,
		ProcessingLatency:      m.ProcessingLatency,
		ErrorRate:              m.ErrorRate,
		SuccessfulTrades:       m.SuccessfulTrades,
		FailedTrades:           m.FailedTrades,
		TotalProfit:            m.TotalProfit,
		TotalLoss:              m.TotalLoss,
		GasCostsSpent:          m.GasCostsSpent,
		NetProfit:              m.NetProfit,
		AverageGasPrice:        m.AverageGasPrice,
		L1DataFeesSpent:        m.L1DataFeesSpent,
		L2ComputeFeesSpent:     m.L2ComputeFeesSpent,
		ProfitFactor:           m.ProfitFactor,
		UptimeSeconds:          m.UptimeSeconds,
		LastHealthCheck:        m.LastHealthCheck,
	}
}
// MetricsSnapshot is a point-in-time, lock-free copy of the values held by
// MetricsCollector, as returned by GetSnapshot.
//
// The JSON tags use the same key names that handleMetrics writes by hand.
// The *_ms tags name milliseconds, and handleMetrics converts the duration
// fields to milliseconds itself via durationToString.
// NOTE(review): encoding/json encodes time.Duration as an integer number of
// nanoseconds, so marshaling this struct directly would not match the *_ms
// key names — confirm before relying on the tags.
type MetricsSnapshot struct {
L2MessagesProcessed uint64 `json:"l2_messages_processed"`
L2MessagesPerSecond float64 `json:"l2_messages_per_second"`
L2MessageLag time.Duration `json:"l2_message_lag_ms"`
BatchesProcessed uint64 `json:"batches_processed"`
DEXInteractionsFound uint64 `json:"dex_interactions_found"`
SwapOpportunities uint64 `json:"swap_opportunities"`
ArbitrageOpportunities uint64 `json:"arbitrage_opportunities"`
OpportunitiesDropped uint64 `json:"opportunities_dropped"`
OpportunityQueueLength uint64 `json:"opportunity_queue_length"`
ProcessingLatency time.Duration `json:"processing_latency_ms"`
// ErrorRate is the fraction of failed trades among all recorded trades.
ErrorRate float64 `json:"error_rate"`
SuccessfulTrades uint64 `json:"successful_trades"`
FailedTrades uint64 `json:"failed_trades"`
TotalProfit float64 `json:"total_profit_eth"`
TotalLoss float64 `json:"total_loss_eth"`
GasCostsSpent float64 `json:"gas_costs_spent_eth"`
// NetProfit is TotalProfit - TotalLoss - GasCostsSpent.
NetProfit float64 `json:"net_profit_eth"`
AverageGasPrice uint64 `json:"average_gas_price_gwei"`
L1DataFeesSpent float64 `json:"l1_data_fees_spent_eth"`
L2ComputeFeesSpent float64 `json:"l2_compute_fees_spent_eth"`
// ProfitFactor is (TotalProfit - TotalLoss) / GasCostsSpent, or 0 when no
// positive gas cost has been recorded.
ProfitFactor float64 `json:"profit_factor"`
UptimeSeconds uint64 `json:"uptime_seconds"`
LastHealthCheck time.Time `json:"last_health_check"`
}
// MetricsServer exposes a MetricsCollector over HTTP. NewMetricsServer
// registers the /metrics (JSON), /health and /metrics/prometheus endpoints,
// each wrapped by the authentication middleware.
type MetricsServer struct {
// collector is the source of the values served by the handlers.
collector *MetricsCollector
// logger receives the start and stop messages.
logger *logger.Logger
// server is the underlying HTTP server driven by Start and Stop.
server *http.Server
// middleware is the auth middleware used to wrap the handlers at
// registration time.
middleware *auth.Middleware
}
// NewMetricsServer builds a metrics HTTP server for collector that listens
// on ":"+port once Start is called. It does not start listening itself.
//
// Three endpoints are registered, each wrapped in the auth middleware's
// RequireAuthentication: /metrics (JSON), /health and /metrics/prometheus.
// The middleware is configured without an HTTPS requirement and with an
// allow-list of the IPv4 and IPv6 loopback addresses.
// NOTE(review): the listen address has no host part, so the socket accepts
// connections on every interface; restricting clients is presumably left to
// the auth middleware's AllowedIPs handling — confirm in the auth package.
//
// In addition to the header timeout, the server sets read, write and idle
// timeouts so that stalled clients and idle keep-alive connections cannot
// hold resources indefinitely.
func NewMetricsServer(collector *MetricsCollector, logger *logger.Logger, port string) *MetricsServer {
	mux := http.NewServeMux()

	// Authentication configuration.
	// RequireHTTPS: set to true in production.
	// AllowedIPs: localhost only by default.
	authConfig := &auth.AuthConfig{
		Logger:       logger,
		RequireHTTPS: false,
		AllowedIPs:   []string{"127.0.0.1", "::1"},
	}
	middleware := auth.NewMiddleware(authConfig)

	// ReadHeaderTimeout guards against Slowloris-style slow headers. The
	// remaining timeouts bound request reads, response writes and idle
	// keep-alive connections; the handlers only emit small in-memory
	// payloads, so the limits are generous.
	server := &MetricsServer{
		collector: collector,
		logger:    logger,
		server: &http.Server{
			Addr:              ":" + port,
			Handler:           mux,
			ReadHeaderTimeout: 5 * time.Second,
			ReadTimeout:       10 * time.Second,
			WriteTimeout:      10 * time.Second,
			IdleTimeout:       60 * time.Second,
		},
		middleware: middleware,
	}

	// Register endpoints with authentication.
	mux.HandleFunc("/metrics", middleware.RequireAuthentication(server.handleMetrics))
	mux.HandleFunc("/health", middleware.RequireAuthentication(server.handleHealth))
	mux.HandleFunc("/metrics/prometheus", middleware.RequireAuthentication(server.handlePrometheus))

	return server
}
// Start logs the listen address and then serves HTTP on it. The call blocks
// inside http.Server.ListenAndServe and returns whatever error that call
// returns.
func (s *MetricsServer) Start() error {
	addr := s.server.Addr
	s.logger.Info("Starting metrics server on " + addr)

	err := s.server.ListenAndServe()
	return err
}
// Stop logs the shutdown and closes the underlying HTTP server with
// http.Server.Close, returning that call's error. Close shuts listeners and
// connections down immediately; it does not wait for in-flight requests.
func (s *MetricsServer) Stop() error {
	s.logger.Info("Stopping metrics server")

	err := s.server.Close()
	return err
}
// handleMetrics writes the current metrics snapshot as a JSON object with
// status 200 and Content-Type application/json.
//
// The object is assembled by hand so that durations are emitted as
// millisecond values (via durationToString) under the *_ms keys. Every field
// of MetricsSnapshot is included; last_health_check is an RFC 3339 string.
// The request itself is not inspected.
func (s *MetricsServer) handleMetrics(w http.ResponseWriter, r *http.Request) {
	snapshot := s.collector.GetSnapshot()

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)

	// Simple JSON serialization
	response := `{
"l2_messages_processed": ` + uintToString(snapshot.L2MessagesProcessed) + `,
"l2_messages_per_second": ` + floatToString(snapshot.L2MessagesPerSecond) + `,
"l2_message_lag_ms": ` + durationToString(snapshot.L2MessageLag) + `,
"batches_processed": ` + uintToString(snapshot.BatchesProcessed) + `,
"dex_interactions_found": ` + uintToString(snapshot.DEXInteractionsFound) + `,
"swap_opportunities": ` + uintToString(snapshot.SwapOpportunities) + `,
"arbitrage_opportunities": ` + uintToString(snapshot.ArbitrageOpportunities) + `,
"opportunities_dropped": ` + uintToString(snapshot.OpportunitiesDropped) + `,
"opportunity_queue_length": ` + uintToString(snapshot.OpportunityQueueLength) + `,
"processing_latency_ms": ` + durationToString(snapshot.ProcessingLatency) + `,
"error_rate": ` + floatToString(snapshot.ErrorRate) + `,
"successful_trades": ` + uintToString(snapshot.SuccessfulTrades) + `,
"failed_trades": ` + uintToString(snapshot.FailedTrades) + `,
"total_profit_eth": ` + floatToString(snapshot.TotalProfit) + `,
"total_loss_eth": ` + floatToString(snapshot.TotalLoss) + `,
"gas_costs_spent_eth": ` + floatToString(snapshot.GasCostsSpent) + `,
"net_profit_eth": ` + floatToString(snapshot.NetProfit) + `,
"profit_factor": ` + floatToString(snapshot.ProfitFactor) + `,
"average_gas_price_gwei": ` + uintToString(snapshot.AverageGasPrice) + `,
"l1_data_fees_spent_eth": ` + floatToString(snapshot.L1DataFeesSpent) + `,
"l2_compute_fees_spent_eth": ` + floatToString(snapshot.L2ComputeFeesSpent) + `,
"uptime_seconds": ` + uintToString(snapshot.UptimeSeconds) + `,
"last_health_check": "` + snapshot.LastHealthCheck.Format(time.RFC3339) + `"
}`

	// The status line is already sent, so a failed write (typically a client
	// that went away) cannot be reported to the caller; it is deliberately
	// ignored.
	_, _ = w.Write([]byte(response))
}
// handleHealth records a health check on the collector and replies with
// status 200 and a small JSON body carrying a fixed "healthy" status plus the
// current time in RFC 3339 form. The request itself is not inspected.
func (s *MetricsServer) handleHealth(w http.ResponseWriter, r *http.Request) {
	s.collector.UpdateHealthCheck()

	header := w.Header()
	header.Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)

	stamp := time.Now().Format(time.RFC3339)
	body := `{"status": "healthy", "timestamp": "` + stamp + `"}`
	_, _ = w.Write([]byte(body))
}
// handlePrometheus writes a subset of the metrics snapshot in the Prometheus
// text exposition format with status 200. The request itself is not
// inspected.
//
// The Content-Type carries the format's version parameter, as the text
// exposition format specifies. The profit-factor HELP line describes the
// value as it is actually computed: profit minus losses, before gas, divided
// by gas costs.
func (s *MetricsServer) handlePrometheus(w http.ResponseWriter, r *http.Request) {
	snapshot := s.collector.GetSnapshot()

	w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
	w.WriteHeader(http.StatusOK)

	prometheus := `# HELP mev_bot_l2_messages_processed Total L2 messages processed
# TYPE mev_bot_l2_messages_processed counter
mev_bot_l2_messages_processed ` + uintToString(snapshot.L2MessagesProcessed) + `
# HELP mev_bot_l2_messages_per_second L2 messages processed per second
# TYPE mev_bot_l2_messages_per_second gauge
mev_bot_l2_messages_per_second ` + floatToString(snapshot.L2MessagesPerSecond) + `
# HELP mev_bot_processing_latency_ms Latest pipeline processing latency in milliseconds
# TYPE mev_bot_processing_latency_ms gauge
mev_bot_processing_latency_ms ` + durationToString(snapshot.ProcessingLatency) + `
# HELP mev_bot_successful_trades Total successful trades
# TYPE mev_bot_successful_trades counter
mev_bot_successful_trades ` + uintToString(snapshot.SuccessfulTrades) + `
# HELP mev_bot_failed_trades Total failed trades
# TYPE mev_bot_failed_trades counter
mev_bot_failed_trades ` + uintToString(snapshot.FailedTrades) + `
# HELP mev_bot_trade_error_rate Fraction of failed trades over total attempts
# TYPE mev_bot_trade_error_rate gauge
mev_bot_trade_error_rate ` + floatToString(snapshot.ErrorRate) + `
# HELP mev_bot_net_profit_eth Net profit in ETH
# TYPE mev_bot_net_profit_eth gauge
mev_bot_net_profit_eth ` + floatToString(snapshot.NetProfit) + `
# HELP mev_bot_profit_factor Profit factor (profit minus losses, before gas, divided by gas costs)
# TYPE mev_bot_profit_factor gauge
mev_bot_profit_factor ` + floatToString(snapshot.ProfitFactor) + `
# HELP mev_bot_total_profit_eth Cumulative gross profit in ETH
# TYPE mev_bot_total_profit_eth counter
mev_bot_total_profit_eth ` + floatToString(snapshot.TotalProfit) + `
# HELP mev_bot_gas_spent_eth Cumulative gas spend in ETH
# TYPE mev_bot_gas_spent_eth counter
mev_bot_gas_spent_eth ` + floatToString(snapshot.GasCostsSpent) + `
# HELP mev_bot_uptime_seconds Bot uptime in seconds
# TYPE mev_bot_uptime_seconds counter
mev_bot_uptime_seconds ` + uintToString(snapshot.UptimeSeconds) + `
`

	// The status line is already sent, so a failed write (typically a scraper
	// that disconnected) cannot be reported to the caller; it is deliberately
	// ignored.
	_, _ = w.Write([]byte(prometheus))
}
// Helper functions for string conversion.

// uintToString renders val as a base-10 integer with no padding or sign.
func uintToString(val uint64) string {
	return fmt.Sprint(val)
}
// floatToString renders val in fixed-point notation with exactly six digits
// after the decimal point.
func floatToString(val float64) string {
	const decimals = 6
	return fmt.Sprintf("%.*f", decimals, val)
}
// durationToString renders val as a number of milliseconds in fixed-point
// notation with two digits after the decimal point.
func durationToString(val time.Duration) string {
	millis := float64(val) / float64(time.Millisecond)
	return fmt.Sprintf("%.2f", millis)
}