Files
mev-beta/internal/monitoring/alert_handlers.go
Krypto Kajun 850223a953 fix(multicall): resolve critical multicall parsing corruption issues
- Added comprehensive bounds checking to prevent buffer overruns in multicall parsing
- Implemented graduated validation system (Strict/Moderate/Permissive) to reduce false positives
- Added LRU caching system for address validation with 10-minute TTL
- Enhanced ABI decoder with missing Universal Router and Arbitrum-specific DEX signatures
- Fixed duplicate function declarations and import conflicts across multiple files
- Added error recovery mechanisms with multiple fallback strategies
- Updated tests to handle new validation behavior for suspicious addresses
- Fixed parser test expectations for improved validation system
- Applied gofmt formatting fixes to ensure code style compliance
- Fixed mutex copying issues in monitoring package by introducing MetricsSnapshot
- Resolved critical security vulnerabilities in heuristic address extraction
- Progress: Updated TODO audit from 10% to 35% complete

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-17 00:12:55 -05:00

401 lines
10 KiB
Go
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package monitoring
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"strings"
"sync"
"time"
"github.com/fraktal/mev-beta/internal/logger"
)
// LogAlertHandler is an alert handler that forwards every alert it receives
// to the application logger.
type LogAlertHandler struct {
	// logger receives one log entry per handled alert.
	logger *logger.Logger
}
// NewLogAlertHandler returns a LogAlertHandler that writes alerts to the
// supplied logger.
func NewLogAlertHandler(logger *logger.Logger) *LogAlertHandler {
	handler := new(LogAlertHandler)
	handler.logger = logger
	return handler
}
// HandleAlert writes the alert to the application logger with key/value
// fields for message, severity, timestamp and context. The log level follows
// the alert severity: emergency and critical alerts are logged as errors,
// warnings as warnings, and every other severity as info. It always returns
// nil.
func (lah *LogAlertHandler) HandleAlert(alert CorruptionAlert) error {
	severity := alert.Severity.String()

	switch alert.Severity {
	case AlertSeverityWarning:
		lah.logger.Warn("🟡 WARNING ALERT",
			"message", alert.Message,
			"severity", severity,
			"timestamp", alert.Timestamp,
			"context", alert.Context)
	case AlertSeverityCritical:
		lah.logger.Error("🔴 CRITICAL ALERT",
			"message", alert.Message,
			"severity", severity,
			"timestamp", alert.Timestamp,
			"context", alert.Context)
	case AlertSeverityEmergency:
		lah.logger.Error("🚨 EMERGENCY ALERT",
			"message", alert.Message,
			"severity", severity,
			"timestamp", alert.Timestamp,
			"context", alert.Context)
	default:
		lah.logger.Info(" INFO ALERT",
			"message", alert.Message,
			"severity", severity,
			"timestamp", alert.Timestamp,
			"context", alert.Context)
	}

	return nil
}
// FileAlertHandler is an alert handler that appends each alert to a file as
// a single JSON object per line.
type FileAlertHandler struct {
	// mu serializes HandleAlert calls so concurrent writes do not interleave.
	mu sync.Mutex
	// filePath is the file that alerts are appended to.
	filePath string
	// logger receives a debug entry after each successful write.
	logger *logger.Logger
}
// NewFileAlertHandler returns a FileAlertHandler that appends alerts to
// filePath and reports its own activity through logger. The file is not
// touched until the first alert is handled.
func NewFileAlertHandler(filePath string, logger *logger.Logger) *FileAlertHandler {
	handler := &FileAlertHandler{}
	handler.filePath = filePath
	handler.logger = logger
	return handler
}
// HandleAlert serializes the alert to JSON and appends it, followed by a
// newline, to the configured file. The file is opened (and created with mode
// 0644 if missing) on every call and closed before returning. Calls are
// serialized by the handler's mutex.
//
// It returns an error if the alert cannot be marshaled, the file cannot be
// opened, or the write fails.
func (fah *FileAlertHandler) HandleAlert(alert CorruptionAlert) error {
	fah.mu.Lock()
	defer fah.mu.Unlock()

	// Flatten the alert into the record that is persisted.
	record := map[string]interface{}{
		"timestamp":        alert.Timestamp.Format(time.RFC3339),
		"severity":         alert.Severity.String(),
		"message":          alert.Message,
		"address":          alert.Address.Hex(),
		"corruption_score": alert.CorruptionScore,
		"source":           alert.Source,
		"context":          alert.Context,
	}

	encoded, marshalErr := json.Marshal(record)
	if marshalErr != nil {
		return fmt.Errorf("failed to marshal alert: %w", marshalErr)
	}

	const openFlags = os.O_CREATE | os.O_WRONLY | os.O_APPEND
	out, openErr := os.OpenFile(fah.filePath, openFlags, 0644)
	if openErr != nil {
		return fmt.Errorf("failed to open alert file: %w", openErr)
	}
	defer out.Close()

	// One alert per line.
	line := append(encoded, '\n')
	_, writeErr := out.Write(line)
	if writeErr != nil {
		return fmt.Errorf("failed to write alert to file: %w", writeErr)
	}

	fah.logger.Debug("Alert written to file",
		"file", fah.filePath,
		"severity", alert.Severity.String())

	return nil
}
// HTTPAlertHandler is an alert handler that POSTs each alert as JSON to a
// webhook URL. The payload shape is adapted for Slack and Discord URLs;
// any other URL receives a generic payload.
type HTTPAlertHandler struct {
	// mu is not referenced by the HTTPAlertHandler methods in this file.
	mu sync.Mutex
	// webhookURL is the endpoint alerts are POSTed to.
	webhookURL string
	// client performs the HTTP requests.
	client *http.Client
	// logger receives delivery diagnostics.
	logger *logger.Logger
	// retryCount is the maximum number of delivery attempts per alert.
	retryCount int
}
// NewHTTPAlertHandler returns an HTTPAlertHandler that delivers alerts to
// webhookURL. The handler uses its own HTTP client with a 10 second timeout
// and makes up to 3 delivery attempts per alert.
func NewHTTPAlertHandler(webhookURL string, logger *logger.Logger) *HTTPAlertHandler {
	httpClient := &http.Client{Timeout: 10 * time.Second}

	handler := &HTTPAlertHandler{}
	handler.webhookURL = webhookURL
	handler.client = httpClient
	handler.logger = logger
	handler.retryCount = 3
	return handler
}
// HandleAlert sends the alert to the configured HTTP endpoint.
//
// The alert is converted to a webhook payload, marshaled to JSON and POSTed
// to the webhook URL. Delivery is attempted up to retryCount times (at least
// once, even if retryCount is below 1), sleeping attempt-number seconds
// between consecutive attempts. A transport error or a non-2xx response
// triggers a retry.
//
// It returns nil once a 2xx response is received. It returns an error
// immediately if no webhook URL is configured, the payload cannot be
// marshaled, or the request cannot be constructed; otherwise it returns the
// error describing the final failed attempt.
func (hah *HTTPAlertHandler) HandleAlert(alert CorruptionAlert) error {
	if hah.webhookURL == "" {
		return fmt.Errorf("webhook URL not configured")
	}

	// Create payload based on webhook type
	payload := hah.createPayload(alert)

	// Convert payload to JSON
	payloadJSON, err := json.Marshal(payload)
	if err != nil {
		return fmt.Errorf("failed to marshal webhook payload: %w", err)
	}

	// A handler with a retry count below 1 must still try once; otherwise
	// the alert would be dropped while reporting success.
	attempts := hah.retryCount
	if attempts < 1 {
		attempts = 1
	}

	var lastErr error
	for attempt := 1; attempt <= attempts; attempt++ {
		if attempt > 1 {
			// Linear backoff: wait 1s before the 2nd attempt, 2s before the 3rd, ...
			time.Sleep(time.Duration(attempt-1) * time.Second)
		}

		// A fresh request (and body reader) is needed for every attempt.
		// Construction failures are deterministic, so they are not retried.
		req, err := http.NewRequest(http.MethodPost, hah.webhookURL, strings.NewReader(string(payloadJSON)))
		if err != nil {
			return fmt.Errorf("failed to create HTTP request: %w", err)
		}
		req.Header.Set("Content-Type", "application/json")
		req.Header.Set("User-Agent", "MEV-Bot-AlertHandler/1.0")

		statusCode, body, err := hah.sendWithTimeout(req)
		if err != nil {
			hah.logger.Warn("Failed to send alert to webhook",
				"attempt", attempt,
				"error", err)
			lastErr = fmt.Errorf("failed to send alert after %d attempts: %w", attempts, err)
			continue
		}

		if statusCode >= 200 && statusCode < 300 {
			hah.logger.Debug("Alert sent successfully",
				"webhook_url", hah.webhookURL,
				"status_code", statusCode,
				"response", string(body))
			return nil
		}

		hah.logger.Warn("Webhook returned error status",
			"attempt", attempt,
			"status_code", statusCode,
			"response", string(body))
		lastErr = fmt.Errorf("webhook returned status %d after %d attempts", statusCode, attempts)
	}

	return lastErr
}

// sendWithTimeout performs a single delivery attempt for req under a 10
// second deadline and returns the response status code and body. The
// deadline context stays alive until the response body has been read, and
// the body is closed before returning so repeated attempts do not accumulate
// open responses.
func (hah *HTTPAlertHandler) sendWithTimeout(req *http.Request) (int, []byte, error) {
	ctx, cancel := context.WithTimeout(req.Context(), 10*time.Second)
	defer cancel()

	resp, err := hah.client.Do(req.WithContext(ctx))
	if err != nil {
		return 0, nil, err
	}
	defer resp.Body.Close()

	// The body is only used for diagnostics, so a read error is deliberately
	// ignored and whatever was read is returned alongside the status code.
	body, _ := io.ReadAll(resp.Body)
	return resp.StatusCode, body, nil
}
// createPayload builds the webhook payload for the alert. The payload format
// is selected by substring match on the webhook URL: URLs containing
// "slack.com" get a Slack payload, URLs containing "discord.com" get a
// Discord payload, and everything else gets a generic flat payload.
func (hah *HTTPAlertHandler) createPayload(alert CorruptionAlert) map[string]interface{} {
	switch {
	case strings.Contains(hah.webhookURL, "slack.com"):
		return hah.createSlackPayload(alert)
	case strings.Contains(hah.webhookURL, "discord.com"):
		return hah.createDiscordPayload(alert)
	default:
		// Generic webhook payload
		generic := map[string]interface{}{
			"timestamp":        alert.Timestamp.Format(time.RFC3339),
			"severity":         alert.Severity.String(),
			"message":          alert.Message,
			"address":          alert.Address.Hex(),
			"corruption_score": alert.CorruptionScore,
			"source":           alert.Source,
			"context":          alert.Context,
		}
		return generic
	}
}
// createSlackPayload builds a payload with a top-level "text" and a single
// entry in "attachments". The attachment carries the alert message, a Unix
// timestamp, a severity-dependent color, and short fields for the address,
// corruption score and source.
func (hah *HTTPAlertHandler) createSlackPayload(alert CorruptionAlert) map[string]interface{} {
	var color string
	switch alert.Severity {
	case AlertSeverityEmergency:
		color = "#FF0000" // Bright red for emergency
	case AlertSeverityCritical:
		color = "danger"
	case AlertSeverityWarning:
		color = "warning"
	default:
		color = "good"
	}

	// shortField builds one side-by-side attachment field.
	shortField := func(title string, value interface{}) map[string]interface{} {
		return map[string]interface{}{
			"title": title,
			"value": value,
			"short": true,
		}
	}

	severity := alert.Severity.String()

	attachment := map[string]interface{}{
		"color":     color,
		"title":     fmt.Sprintf("%s Alert - MEV Bot", severity),
		"text":      alert.Message,
		"timestamp": alert.Timestamp.Unix(),
		"fields": []map[string]interface{}{
			shortField("Address", alert.Address.Hex()),
			shortField("Corruption Score", fmt.Sprintf("%d", alert.CorruptionScore)),
			shortField("Source", alert.Source),
		},
	}

	return map[string]interface{}{
		"text":        fmt.Sprintf("MEV Bot Alert: %s", severity),
		"attachments": []map[string]interface{}{attachment},
	}
}
// createDiscordPayload builds a payload with a single entry in "embeds". The
// embed carries the alert message as its description, an RFC 3339 timestamp,
// a severity-dependent color, inline fields for the address, corruption
// score and source, and a fixed footer.
func (hah *HTTPAlertHandler) createDiscordPayload(alert CorruptionAlert) map[string]interface{} {
	var color int
	switch alert.Severity {
	case AlertSeverityEmergency:
		color = 0xFF0000 // Red
	case AlertSeverityCritical:
		color = 0xFF8000 // Orange
	case AlertSeverityWarning:
		color = 0xFFFF00 // Yellow
	default:
		color = 0x00FF00 // Green
	}

	// inlineField builds one inline embed field.
	inlineField := func(name string, value interface{}) map[string]interface{} {
		return map[string]interface{}{
			"name":   name,
			"value":  value,
			"inline": true,
		}
	}

	embed := map[string]interface{}{
		"title":       fmt.Sprintf("%s Alert - MEV Bot", alert.Severity.String()),
		"description": alert.Message,
		"color":       color,
		"timestamp":   alert.Timestamp.Format(time.RFC3339),
		"fields": []map[string]interface{}{
			inlineField("Address", alert.Address.Hex()),
			inlineField("Corruption Score", fmt.Sprintf("%d", alert.CorruptionScore)),
			inlineField("Source", alert.Source),
		},
		"footer": map[string]interface{}{
			"text": "MEV Bot Integrity Monitor",
		},
	}

	payload := map[string]interface{}{
		"embeds": []map[string]interface{}{embed},
	}
	return payload
}
// MetricsAlertHandler is an alert handler that keeps in-memory counters of
// the alerts it has handled. The counters can be read with GetCounters.
// NOTE(review): presumably intended to feed an external metrics system
// (Prometheus, etc.); no export happens in this file.
type MetricsAlertHandler struct {
	// mu guards counters.
	mu sync.Mutex
	// logger receives a debug entry after each counter update.
	logger *logger.Logger
	// counters maps a counter name to the number of matching alerts.
	counters map[string]int64
}
// NewMetricsAlertHandler returns a MetricsAlertHandler with an empty counter
// set that reports its updates through logger.
func NewMetricsAlertHandler(logger *logger.Logger) *MetricsAlertHandler {
	handler := &MetricsAlertHandler{}
	handler.logger = logger
	handler.counters = make(map[string]int64)
	return handler
}
// HandleAlert records the alert in the handler's counters: "total_alerts" is
// always incremented, "alerts_<severity>" (severity lowercased) is
// incremented for the alert's severity, and "corruption_alerts" is
// incremented when the corruption score is positive. It always returns nil.
func (mah *MetricsAlertHandler) HandleAlert(alert CorruptionAlert) error {
	mah.mu.Lock()
	defer mah.mu.Unlock()

	severity := alert.Severity.String()
	severityKey := "alerts_" + strings.ToLower(severity)

	mah.counters["total_alerts"] += 1
	mah.counters[severityKey] += 1
	if alert.CorruptionScore > 0 {
		mah.counters["corruption_alerts"] += 1
	}

	mah.logger.Debug("Metrics updated for alert",
		"severity", severity,
		"total_alerts", mah.counters["total_alerts"])

	return nil
}
// GetCounters returns a snapshot of the alert counters. The returned map is
// a copy, so callers may modify it without affecting the handler.
func (mah *MetricsAlertHandler) GetCounters() map[string]int64 {
	mah.mu.Lock()
	defer mah.mu.Unlock()

	snapshot := make(map[string]int64, len(mah.counters))
	for name := range mah.counters {
		snapshot[name] = mah.counters[name]
	}
	return snapshot
}
// CompositeAlertHandler is an alert handler that fans each alert out to a
// list of other handlers.
type CompositeAlertHandler struct {
	// handlers are invoked in order for every alert.
	handlers []AlertSubscriber
	// logger receives an error entry for each handler that fails.
	logger *logger.Logger
}
// NewCompositeAlertHandler returns a CompositeAlertHandler that dispatches
// alerts to the given handlers, in the order supplied, and logs handler
// failures through logger.
func NewCompositeAlertHandler(logger *logger.Logger, handlers ...AlertSubscriber) *CompositeAlertHandler {
	composite := &CompositeAlertHandler{}
	composite.handlers = handlers
	composite.logger = logger
	return composite
}
// HandleAlert passes the alert to every configured handler in order. A
// failing handler does not stop the remaining handlers from running; each
// failure is logged and collected. It returns nil when all handlers succeed,
// otherwise a single error whose message lists every collected failure.
func (cah *CompositeAlertHandler) HandleAlert(alert CorruptionAlert) error {
	var failures []error

	for idx := range cah.handlers {
		handler := cah.handlers[idx]

		err := handler.HandleAlert(alert)
		if err == nil {
			continue
		}

		cah.logger.Error("Alert handler failed",
			"handler_index", idx,
			"handler_type", fmt.Sprintf("%T", handler),
			"error", err)
		failures = append(failures, fmt.Errorf("handler %d (%T): %w", idx, handler, err))
	}

	if len(failures) == 0 {
		return nil
	}
	return fmt.Errorf("alert handler errors: %v", failures)
}
// AddHandler appends handler to the end of the composite's handler list, so
// it is invoked after the handlers already registered.
func (cah *CompositeAlertHandler) AddHandler(handler AlertSubscriber) {
	extended := append(cah.handlers, handler)
	cah.handlers = extended
}