Files
mev-beta/orig/internal/monitoring/alert_handlers.go
Administrator 803de231ba feat: create v2-prep branch with comprehensive planning
Restructured project for V2 refactor:

**Structure Changes:**
- Moved all V1 code to orig/ folder (preserved with git mv)
- Created docs/planning/ directory
- Added orig/README_V1.md explaining V1 preservation

**Planning Documents:**
- 00_V2_MASTER_PLAN.md: Complete architecture overview
  - Executive summary of critical V1 issues
  - High-level component architecture diagrams
  - 5-phase implementation roadmap
  - Success metrics and risk mitigation

- 07_TASK_BREAKDOWN.md: Atomic task breakdown
  - 99+ hours of detailed tasks
  - Every task < 2 hours (atomic)
  - Clear dependencies and success criteria
  - Organized by implementation phase

**V2 Key Improvements:**
- Per-exchange parsers (factory pattern)
- Multi-layer strict validation
- Multi-index pool cache
- Background validation pipeline
- Comprehensive observability

**Critical Issues Addressed:**
- Zero address tokens (strict validation + cache enrichment)
- Parsing accuracy (protocol-specific parsers)
- No audit trail (background validation channel)
- Inefficient lookups (multi-index cache)
- Stats disconnection (event-driven metrics)

Next Steps:
1. Review planning documents
2. Begin Phase 1: Foundation (P1-001 through P1-010)
3. Implement parsers in Phase 2
4. Build cache system in Phase 3
5. Add validation pipeline in Phase 4
6. Migrate and test in Phase 5

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-10 10:14:26 +01:00

401 lines
10 KiB
Go
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package monitoring
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"strings"
"sync"
"time"
"github.com/fraktal/mev-beta/internal/logger"
)
// LogAlertHandler logs alerts to the application logger
type LogAlertHandler struct {
logger *logger.Logger
}
// NewLogAlertHandler creates a new log-based alert handler
func NewLogAlertHandler(logger *logger.Logger) *LogAlertHandler {
return &LogAlertHandler{
logger: logger,
}
}
// HandleAlert logs the alert using structured logging
func (lah *LogAlertHandler) HandleAlert(alert CorruptionAlert) error {
switch alert.Severity {
case AlertSeverityEmergency:
lah.logger.Error("🚨 EMERGENCY ALERT",
"message", alert.Message,
"severity", alert.Severity.String(),
"timestamp", alert.Timestamp,
"context", alert.Context)
case AlertSeverityCritical:
lah.logger.Error("🔴 CRITICAL ALERT",
"message", alert.Message,
"severity", alert.Severity.String(),
"timestamp", alert.Timestamp,
"context", alert.Context)
case AlertSeverityWarning:
lah.logger.Warn("🟡 WARNING ALERT",
"message", alert.Message,
"severity", alert.Severity.String(),
"timestamp", alert.Timestamp,
"context", alert.Context)
default:
lah.logger.Info(" INFO ALERT",
"message", alert.Message,
"severity", alert.Severity.String(),
"timestamp", alert.Timestamp,
"context", alert.Context)
}
return nil
}
// FileAlertHandler writes alerts to a file in JSON format
type FileAlertHandler struct {
mu sync.Mutex
filePath string
logger *logger.Logger
}
// NewFileAlertHandler creates a new file-based alert handler
func NewFileAlertHandler(filePath string, logger *logger.Logger) *FileAlertHandler {
return &FileAlertHandler{
filePath: filePath,
logger: logger,
}
}
// HandleAlert writes the alert to a file
func (fah *FileAlertHandler) HandleAlert(alert CorruptionAlert) error {
fah.mu.Lock()
defer fah.mu.Unlock()
// Create alert record for file
alertRecord := map[string]interface{}{
"timestamp": alert.Timestamp.Format(time.RFC3339),
"severity": alert.Severity.String(),
"message": alert.Message,
"address": alert.Address.Hex(),
"corruption_score": alert.CorruptionScore,
"source": alert.Source,
"context": alert.Context,
}
// Convert to JSON
alertJSON, err := json.Marshal(alertRecord)
if err != nil {
return fmt.Errorf("failed to marshal alert: %w", err)
}
// Open file for appending
file, err := os.OpenFile(fah.filePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
return fmt.Errorf("failed to open alert file: %w", err)
}
defer file.Close()
// Write alert with newline
if _, err := file.Write(append(alertJSON, '\n')); err != nil {
return fmt.Errorf("failed to write alert to file: %w", err)
}
fah.logger.Debug("Alert written to file",
"file", fah.filePath,
"severity", alert.Severity.String())
return nil
}
// HTTPAlertHandler sends alerts to an HTTP endpoint (e.g., Slack, Discord, PagerDuty)
type HTTPAlertHandler struct {
mu sync.Mutex
webhookURL string
client *http.Client
logger *logger.Logger
retryCount int
}
// NewHTTPAlertHandler creates a new HTTP-based alert handler
func NewHTTPAlertHandler(webhookURL string, logger *logger.Logger) *HTTPAlertHandler {
return &HTTPAlertHandler{
webhookURL: webhookURL,
client: &http.Client{
Timeout: 10 * time.Second,
},
logger: logger,
retryCount: 3,
}
}
// HandleAlert sends the alert to the configured HTTP endpoint
func (hah *HTTPAlertHandler) HandleAlert(alert CorruptionAlert) error {
if hah.webhookURL == "" {
return fmt.Errorf("webhook URL not configured")
}
// Create payload based on webhook type
payload := hah.createPayload(alert)
// Convert payload to JSON
payloadJSON, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("failed to marshal webhook payload: %w", err)
}
// Send with retries
for attempt := 1; attempt <= hah.retryCount; attempt++ {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
req, err := http.NewRequestWithContext(ctx, "POST", hah.webhookURL, strings.NewReader(string(payloadJSON)))
cancel()
if err != nil {
return fmt.Errorf("failed to create HTTP request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("User-Agent", "MEV-Bot-AlertHandler/1.0")
resp, err := hah.client.Do(req)
if err != nil {
hah.logger.Warn("Failed to send alert to webhook",
"attempt", attempt,
"error", err)
if attempt == hah.retryCount {
return fmt.Errorf("failed to send alert after %d attempts: %w", hah.retryCount, err)
}
time.Sleep(time.Duration(attempt) * time.Second)
continue
}
defer resp.Body.Close()
// Read response body for debugging
body, _ := io.ReadAll(resp.Body)
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
hah.logger.Debug("Alert sent successfully",
"webhook_url", hah.webhookURL,
"status_code", resp.StatusCode,
"response", string(body))
return nil
}
hah.logger.Warn("Webhook returned error status",
"attempt", attempt,
"status_code", resp.StatusCode,
"response", string(body))
if attempt == hah.retryCount {
return fmt.Errorf("webhook returned status %d after %d attempts", resp.StatusCode, hah.retryCount)
}
time.Sleep(time.Duration(attempt) * time.Second)
}
return nil
}
// createPayload creates the webhook payload based on the webhook type
func (hah *HTTPAlertHandler) createPayload(alert CorruptionAlert) map[string]interface{} {
// Detect webhook type based on URL
if strings.Contains(hah.webhookURL, "slack.com") {
return hah.createSlackPayload(alert)
} else if strings.Contains(hah.webhookURL, "discord.com") {
return hah.createDiscordPayload(alert)
}
// Generic webhook payload
return map[string]interface{}{
"timestamp": alert.Timestamp.Format(time.RFC3339),
"severity": alert.Severity.String(),
"message": alert.Message,
"address": alert.Address.Hex(),
"corruption_score": alert.CorruptionScore,
"source": alert.Source,
"context": alert.Context,
}
}
// createSlackPayload creates a Slack-compatible webhook payload
func (hah *HTTPAlertHandler) createSlackPayload(alert CorruptionAlert) map[string]interface{} {
color := "good"
switch alert.Severity {
case AlertSeverityWarning:
color = "warning"
case AlertSeverityCritical:
color = "danger"
case AlertSeverityEmergency:
color = "#FF0000" // Bright red for emergency
}
attachment := map[string]interface{}{
"color": color,
"title": fmt.Sprintf("%s Alert - MEV Bot", alert.Severity.String()),
"text": alert.Message,
"timestamp": alert.Timestamp.Unix(),
"fields": []map[string]interface{}{
{
"title": "Address",
"value": alert.Address.Hex(),
"short": true,
},
{
"title": "Corruption Score",
"value": fmt.Sprintf("%d", alert.CorruptionScore),
"short": true,
},
{
"title": "Source",
"value": alert.Source,
"short": true,
},
},
}
return map[string]interface{}{
"text": fmt.Sprintf("MEV Bot Alert: %s", alert.Severity.String()),
"attachments": []map[string]interface{}{attachment},
}
}
// createDiscordPayload creates a Discord-compatible webhook payload
func (hah *HTTPAlertHandler) createDiscordPayload(alert CorruptionAlert) map[string]interface{} {
color := 0x00FF00 // Green
switch alert.Severity {
case AlertSeverityWarning:
color = 0xFFFF00 // Yellow
case AlertSeverityCritical:
color = 0xFF8000 // Orange
case AlertSeverityEmergency:
color = 0xFF0000 // Red
}
embed := map[string]interface{}{
"title": fmt.Sprintf("%s Alert - MEV Bot", alert.Severity.String()),
"description": alert.Message,
"color": color,
"timestamp": alert.Timestamp.Format(time.RFC3339),
"fields": []map[string]interface{}{
{
"name": "Address",
"value": alert.Address.Hex(),
"inline": true,
},
{
"name": "Corruption Score",
"value": fmt.Sprintf("%d", alert.CorruptionScore),
"inline": true,
},
{
"name": "Source",
"value": alert.Source,
"inline": true,
},
},
"footer": map[string]interface{}{
"text": "MEV Bot Integrity Monitor",
},
}
return map[string]interface{}{
"embeds": []map[string]interface{}{embed},
}
}
// MetricsAlertHandler integrates with metrics systems (Prometheus, etc.)
type MetricsAlertHandler struct {
mu sync.Mutex
logger *logger.Logger
counters map[string]int64
}
// NewMetricsAlertHandler creates a new metrics-based alert handler
func NewMetricsAlertHandler(logger *logger.Logger) *MetricsAlertHandler {
return &MetricsAlertHandler{
logger: logger,
counters: make(map[string]int64),
}
}
// HandleAlert updates metrics counters based on alert
func (mah *MetricsAlertHandler) HandleAlert(alert CorruptionAlert) error {
mah.mu.Lock()
defer mah.mu.Unlock()
// Increment counters
mah.counters["total_alerts"]++
mah.counters[fmt.Sprintf("alerts_%s", strings.ToLower(alert.Severity.String()))]++
if alert.CorruptionScore > 0 {
mah.counters["corruption_alerts"]++
}
mah.logger.Debug("Metrics updated for alert",
"severity", alert.Severity.String(),
"total_alerts", mah.counters["total_alerts"])
return nil
}
// GetCounters returns the current alert counters
func (mah *MetricsAlertHandler) GetCounters() map[string]int64 {
mah.mu.Lock()
defer mah.mu.Unlock()
// Return a copy
counters := make(map[string]int64)
for k, v := range mah.counters {
counters[k] = v
}
return counters
}
// CompositeAlertHandler combines multiple alert handlers
type CompositeAlertHandler struct {
handlers []AlertSubscriber
logger *logger.Logger
}
// NewCompositeAlertHandler creates a composite alert handler
func NewCompositeAlertHandler(logger *logger.Logger, handlers ...AlertSubscriber) *CompositeAlertHandler {
return &CompositeAlertHandler{
handlers: handlers,
logger: logger,
}
}
// HandleAlert sends the alert to all configured handlers
func (cah *CompositeAlertHandler) HandleAlert(alert CorruptionAlert) error {
errors := make([]error, 0)
for i, handler := range cah.handlers {
if err := handler.HandleAlert(alert); err != nil {
cah.logger.Error("Alert handler failed",
"handler_index", i,
"handler_type", fmt.Sprintf("%T", handler),
"error", err)
errors = append(errors, fmt.Errorf("handler %d (%T): %w", i, handler, err))
}
}
if len(errors) > 0 {
return fmt.Errorf("alert handler errors: %v", errors)
}
return nil
}
// AddHandler adds a new handler to the composite
func (cah *CompositeAlertHandler) AddHandler(handler AlertSubscriber) {
cah.handlers = append(cah.handlers, handler)
}