Files
mev-beta/orig/pkg/security/anomaly_detector.go
Administrator 803de231ba feat: create v2-prep branch with comprehensive planning
Restructured project for V2 refactor:

**Structure Changes:**
- Moved all V1 code to orig/ folder (preserved with git mv)
- Created docs/planning/ directory
- Added orig/README_V1.md explaining V1 preservation

**Planning Documents:**
- 00_V2_MASTER_PLAN.md: Complete architecture overview
  - Executive summary of critical V1 issues
  - High-level component architecture diagrams
  - 5-phase implementation roadmap
  - Success metrics and risk mitigation

- 07_TASK_BREAKDOWN.md: Atomic task breakdown
  - 99+ hours of detailed tasks
  - Every task < 2 hours (atomic)
  - Clear dependencies and success criteria
  - Organized by implementation phase

**V2 Key Improvements:**
- Per-exchange parsers (factory pattern)
- Multi-layer strict validation
- Multi-index pool cache
- Background validation pipeline
- Comprehensive observability

**Critical Issues Addressed:**
- Zero address tokens (strict validation + cache enrichment)
- Parsing accuracy (protocol-specific parsers)
- No audit trail (background validation channel)
- Inefficient lookups (multi-index cache)
- Stats disconnection (event-driven metrics)

Next Steps:
1. Review planning documents
2. Begin Phase 1: Foundation (P1-001 through P1-010)
3. Implement parsers in Phase 2
4. Build cache system in Phase 3
5. Add validation pipeline in Phase 4
6. Migrate and test in Phase 5

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-10 10:14:26 +01:00

1071 lines
31 KiB
Go

package security
import (
"fmt"
"math"
"sort"
"sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/fraktal/mev-beta/internal/logger"
)
// AnomalyDetector detects unusual patterns in security and transaction data.
// It maintains per-metric statistical baselines, a rolling transaction log,
// and emits AnomalyAlert values on a buffered channel.
type AnomalyDetector struct {
	logger         *logger.Logger              // operational logging
	config         *AnomalyConfig              // effective configuration (defaults merged with caller overrides)
	patterns       map[string]*PatternBaseline // statistical baselines keyed by metric name
	transactionLog []*TransactionRecord        // rolling history of recorded transactions
	mu             sync.RWMutex                // guards patterns, transactionLog and running
	alerts         chan *AnomalyAlert          // buffered alert channel; producers drop alerts when full
	running        bool                        // true while background loops are active
	stopCh         chan struct{}               // closed by Stop to terminate detectionLoop/cleanupLoop
	alertCounter   uint64                      // incremented atomically to build unique alert IDs
}
// AnomalyConfig configures the anomaly detection system.
//
// When passed to NewAnomalyDetector, numeric and duration fields left at
// their zero value fall back to the defaults from defaultAnomalyConfig;
// the boolean feature flags are copied verbatim from the supplied config.
type AnomalyConfig struct {
	// Detection thresholds
	ZScoreThreshold    float64 `json:"z_score_threshold"`   // Standard deviations for anomaly
	VolumeThreshold    float64 `json:"volume_threshold"`    // Volume change threshold
	FrequencyThreshold float64 `json:"frequency_threshold"` // Frequency change threshold
	PatternSimilarity  float64 `json:"pattern_similarity"`  // Pattern similarity threshold
	// Time windows
	BaselineWindow  time.Duration `json:"baseline_window"`  // Period for establishing baseline
	DetectionWindow time.Duration `json:"detection_window"` // Period for detecting anomalies
	AlertCooldown   time.Duration `json:"alert_cooldown"`   // Cooldown between similar alerts
	// Data retention
	MaxTransactionHistory int           `json:"max_transaction_history"` // Max transactions to keep
	MaxPatternHistory     int           `json:"max_pattern_history"`     // Max patterns to keep
	CleanupInterval       time.Duration `json:"cleanup_interval"`        // How often to clean old data
	// Feature flags
	EnableVolumeDetection  bool `json:"enable_volume_detection"`  // Enable volume anomaly detection
	EnablePatternDetection bool `json:"enable_pattern_detection"` // Enable pattern anomaly detection
	EnableTimeSeriesAD     bool `json:"enable_time_series_ad"`    // Enable time series anomaly detection
	EnableBehavioralAD     bool `json:"enable_behavioral_ad"`     // Enable behavioral anomaly detection
}
// PatternBaseline represents the baseline pattern for a specific metric.
// Statistics (mean, variance, percentiles, trend) are recomputed over the
// retained Observations window by updatePatternStatistics.
type PatternBaseline struct {
	MetricName       string             `json:"metric_name"`       // metric this baseline tracks
	Observations     []float64          `json:"observations"`      // rolling window of raw samples (bounded by MaxPatternHistory)
	Mean             float64            `json:"mean"`              // arithmetic mean of Observations
	StandardDev      float64            `json:"standard_dev"`      // population standard deviation
	Variance         float64            `json:"variance"`          // population variance
	Min              float64            `json:"min"`               // minimum observed value in the window
	Max              float64            `json:"max"`               // maximum observed value in the window
	Percentiles      map[int]float64    `json:"percentiles"`       // 50th, 75th, 90th, 95th, 99th
	LastUpdated      time.Time          `json:"last_updated"`      // time of the most recent update
	SampleCount      int64              `json:"sample_count"`      // total samples ever recorded (not truncated with Observations)
	SeasonalPatterns map[string]float64 `json:"seasonal_patterns"` // hour, day, week patterns
	Trend            float64            `json:"trend"`             // Linear trend coefficient (least-squares slope)
}
// TransactionRecord represents a transaction for anomaly analysis.
// AnomalyScore is populated by the detector when the record is ingested.
type TransactionRecord struct {
	Hash         common.Hash            `json:"hash"`
	From         common.Address         `json:"from"`
	To           *common.Address        `json:"to"` // nil for contract-creation transactions — TODO confirm with callers
	Value        float64                `json:"value"`     // In ETH
	GasPrice     float64                `json:"gas_price"` // In Gwei
	GasUsed      uint64                 `json:"gas_used"`
	Timestamp    time.Time              `json:"timestamp"` // defaulted to time.Now() at ingestion if zero
	BlockNumber  uint64                 `json:"block_number"`
	Success      bool                   `json:"success"`
	Metadata     map[string]interface{} `json:"metadata"`
	AnomalyScore float64                `json:"anomaly_score"` // computed by calculateTransactionAnomalyScore
	Flags        []string               `json:"flags"`
}
// AnomalyAlert represents a detected anomaly.
type AnomalyAlert struct {
	ID              string                 `json:"id"`             // unique, built from timestamp + atomic counter
	Type            AnomalyType            `json:"type"`           // which detector produced the alert
	Severity        AnomalySeverity        `json:"severity"`       // LOW..CRITICAL, derived from the deviation magnitude
	Confidence      float64                `json:"confidence"`     // 0-1
	Score           float64                `json:"score"`          // Anomaly score
	Description     string                 `json:"description"`    // human-readable summary
	MetricName      string                 `json:"metric_name"`    // metric that triggered the alert
	ObservedValue   float64                `json:"observed_value"` // value that tripped the detector
	ExpectedValue   float64                `json:"expected_value"` // baseline expectation (typically the mean)
	Deviation       float64                `json:"deviation"`      // Z-score or similar
	Timestamp       time.Time              `json:"timestamp"`
	Source          string                 `json:"source"` // IP, address, etc.
	Context         map[string]interface{} `json:"context"`
	Recommendations []string               `json:"recommendations"`
	RelatedAlerts   []string               `json:"related_alerts"`
}
// AnomalyType represents the type of anomaly detected.
type AnomalyType string

const (
	AnomalyTypeVolume      AnomalyType = "VOLUME"      // unusual transaction value
	AnomalyTypeFrequency   AnomalyType = "FREQUENCY"   // unusually chatty sender
	AnomalyTypePattern     AnomalyType = "PATTERN"     // unusual timing/activity pattern
	AnomalyTypeBehavioral  AnomalyType = "BEHAVIORAL"  // deviation from a sender's own history
	AnomalyTypeTemporal    AnomalyType = "TEMPORAL"    // shift in a metric's time series
	AnomalyTypeStatistical AnomalyType = "STATISTICAL" // generic Z-score threshold breach
)
// AnomalySeverity represents the severity of an anomaly,
// ordered LOW < MEDIUM < HIGH < CRITICAL.
type AnomalySeverity string

const (
	AnomalySeverityLow      AnomalySeverity = "LOW"
	AnomalySeverityMedium   AnomalySeverity = "MEDIUM"
	AnomalySeverityHigh     AnomalySeverity = "HIGH"
	AnomalySeverityCritical AnomalySeverity = "CRITICAL"
)
// NewAnomalyDetector creates a new anomaly detector.
//
// A nil config selects the built-in defaults. Otherwise, every positive
// numeric/duration field of config overrides the corresponding default,
// while the boolean feature flags are taken directly from config.
func NewAnomalyDetector(logger *logger.Logger, config *AnomalyConfig) *AnomalyDetector {
	cfg := defaultAnomalyConfig()
	if config != nil {
		// Positive overrides win; zero values keep the defaults.
		if v := config.ZScoreThreshold; v > 0 {
			cfg.ZScoreThreshold = v
		}
		if v := config.VolumeThreshold; v > 0 {
			cfg.VolumeThreshold = v
		}
		if v := config.FrequencyThreshold; v > 0 {
			cfg.FrequencyThreshold = v
		}
		if v := config.PatternSimilarity; v > 0 {
			cfg.PatternSimilarity = v
		}
		if v := config.BaselineWindow; v > 0 {
			cfg.BaselineWindow = v
		}
		if v := config.DetectionWindow; v > 0 {
			cfg.DetectionWindow = v
		}
		if v := config.AlertCooldown; v > 0 {
			cfg.AlertCooldown = v
		}
		if v := config.MaxTransactionHistory; v > 0 {
			cfg.MaxTransactionHistory = v
		}
		if v := config.MaxPatternHistory; v > 0 {
			cfg.MaxPatternHistory = v
		}
		if v := config.CleanupInterval; v > 0 {
			cfg.CleanupInterval = v
		}
		// Feature flags are copied verbatim (no zero-value fallback).
		cfg.EnableVolumeDetection = config.EnableVolumeDetection
		cfg.EnablePatternDetection = config.EnablePatternDetection
		cfg.EnableTimeSeriesAD = config.EnableTimeSeriesAD
		cfg.EnableBehavioralAD = config.EnableBehavioralAD
	}
	return &AnomalyDetector{
		logger:         logger,
		config:         cfg,
		patterns:       make(map[string]*PatternBaseline),
		transactionLog: make([]*TransactionRecord, 0),
		alerts:         make(chan *AnomalyAlert, 1000),
		stopCh:         make(chan struct{}),
	}
}
// defaultAnomalyConfig returns the baseline configuration used when the
// caller supplies nil or leaves fields at their zero value.
func defaultAnomalyConfig() *AnomalyConfig {
	cfg := &AnomalyConfig{
		// Thresholds.
		ZScoreThreshold:    2.5,
		VolumeThreshold:    3.0,
		FrequencyThreshold: 2.0,
		PatternSimilarity:  0.8,
		// Windows.
		BaselineWindow:  24 * time.Hour,
		DetectionWindow: time.Hour,
		AlertCooldown:   5 * time.Minute,
		// Retention.
		MaxTransactionHistory: 10000,
		MaxPatternHistory:     1000,
		CleanupInterval:       time.Hour,
	}
	// All detectors enabled by default.
	cfg.EnableVolumeDetection = true
	cfg.EnablePatternDetection = true
	cfg.EnableTimeSeriesAD = true
	cfg.EnableBehavioralAD = true
	return cfg
}
// Start begins the anomaly detection process. It is idempotent: calling
// Start on an already-running detector is a no-op. Two background goroutines
// are launched; both exit when Stop closes the stop channel.
func (ad *AnomalyDetector) Start() error {
	ad.mu.Lock()
	defer ad.mu.Unlock()
	if ad.running {
		return nil // already started
	}
	ad.running = true
	go ad.cleanupLoop()
	go ad.detectionLoop()
	ad.logger.Info("Anomaly detector started")
	return nil
}
// Stop stops the anomaly detection process. It is idempotent: calling Stop
// on an already-stopped detector is a no-op, which also guarantees the stop
// channel is closed at most once.
func (ad *AnomalyDetector) Stop() error {
	ad.mu.Lock()
	defer ad.mu.Unlock()
	if !ad.running {
		return nil // already stopped
	}
	ad.running = false
	close(ad.stopCh) // signals both background loops to return
	ad.logger.Info("Anomaly detector stopped")
	return nil
}
// RecordTransaction records a transaction for anomaly analysis.
// The record is scored against current baselines, appended to the rolling
// log, folded into the pattern baselines, and any immediately detected
// anomalies are emitted on the alert channel (dropped if the channel is full).
func (ad *AnomalyDetector) RecordTransaction(record *TransactionRecord) {
	ad.mu.Lock()
	defer ad.mu.Unlock()

	// Stamp the record if the caller did not.
	if record.Timestamp.IsZero() {
		record.Timestamp = time.Now()
	}

	// Score against the baselines as they stood before this record.
	record.AnomalyScore = ad.calculateTransactionAnomalyScore(record)

	// Append and cap the rolling history.
	ad.transactionLog = append(ad.transactionLog, record)
	if excess := len(ad.transactionLog) - ad.config.MaxTransactionHistory; excess > 0 {
		ad.transactionLog = ad.transactionLog[excess:]
	}

	// Fold the record into the per-metric baselines.
	ad.updatePatternsForTransaction(record)

	// Emit immediate anomalies without blocking on a full channel.
	for _, alert := range ad.detectTransactionAnomalies(record) {
		select {
		case ad.alerts <- alert:
		default:
			ad.logger.Warn("Anomaly alert channel full, dropping alert")
		}
	}
}
// RecordMetric records a metric value for baseline establishment and checks
// the new value against the metric's baseline, emitting an alert on the
// (non-blocking) alert channel when it deviates.
//
// Fix: the previous implementation duplicated updatePattern's
// append/trim/recompute logic but never refreshed pattern.LastUpdated after
// the baseline was first created. Delegating to updatePattern removes the
// duplication and keeps LastUpdated consistent with the transaction-driven
// update path.
func (ad *AnomalyDetector) RecordMetric(metricName string, value float64) {
	ad.mu.Lock()
	defer ad.mu.Unlock()

	// Create-or-update the baseline via the shared path: appends the
	// observation, trims history to MaxPatternHistory, refreshes
	// LastUpdated and recomputes statistics.
	ad.updatePattern(metricName, value)
	pattern := ad.patterns[metricName]

	// Alert if the value deviates from the (now updated) baseline.
	if anomaly := ad.detectMetricAnomaly(metricName, value, pattern); anomaly != nil {
		select {
		case ad.alerts <- anomaly:
		default:
			ad.logger.Warn("Anomaly alert channel full, dropping alert")
		}
	}
}
// GetAlerts returns the alert channel for consuming anomaly alerts.
// The channel is buffered (capacity 1000) and producers drop alerts rather
// than block when it is full, so consumers should drain it promptly.
func (ad *AnomalyDetector) GetAlerts() <-chan *AnomalyAlert {
	return ad.alerts
}
// GetAnomalyReport generates a comprehensive anomaly report, snapshotting
// the detector state under the read lock.
func (ad *AnomalyDetector) GetAnomalyReport() *AnomalyReport {
	ad.mu.RLock()
	defer ad.mu.RUnlock()
	return &AnomalyReport{
		Timestamp:            time.Now(),
		PatternsTracked:      len(ad.patterns),
		TransactionsAnalyzed: len(ad.transactionLog),
		AnomaliesDetected:    ad.countRecentAnomalies(),
		TopAnomalies:         ad.getTopAnomalies(10),
		PatternSummaries:     ad.getPatternSummaries(),
		SystemHealth:         ad.calculateSystemHealth(),
	}
}
// AnomalyReport provides a comprehensive view of anomaly detection status,
// produced by GetAnomalyReport as a point-in-time snapshot.
type AnomalyReport struct {
	Timestamp            time.Time                  `json:"timestamp"`             // when the snapshot was taken
	PatternsTracked      int                        `json:"patterns_tracked"`      // number of metric baselines
	TransactionsAnalyzed int                        `json:"transactions_analyzed"` // size of the rolling transaction log
	AnomaliesDetected    int                        `json:"anomalies_detected"`    // recent anomaly count (currently placeholder 0)
	TopAnomalies         []*AnomalyAlert            `json:"top_anomalies"`         // highest-scoring alerts (currently placeholder empty)
	PatternSummaries     map[string]*PatternSummary `json:"pattern_summaries"`     // per-metric baseline summaries
	SystemHealth         *AnomalyDetectorHealth     `json:"system_health"`         // detector health indicators
}
// PatternSummary provides a summary of a pattern baseline.
type PatternSummary struct {
	MetricName      string    `json:"metric_name"`
	Mean            float64   `json:"mean"`
	StandardDev     float64   `json:"standard_dev"`
	SampleCount     int64     `json:"sample_count"`
	LastUpdated     time.Time `json:"last_updated"`
	RecentAnomalies int       `json:"recent_anomalies"` // currently always 0 (not yet tracked per pattern)
	Trend           string    `json:"trend"`            // INCREASING, DECREASING, STABLE (from the baseline's slope)
}
// AnomalyDetectorHealth represents the health of the anomaly detection system.
// Note: latency, memory usage and error rate are currently hard-coded
// placeholders in calculateSystemHealth, not measured values.
type AnomalyDetectorHealth struct {
	IsRunning         bool      `json:"is_running"`
	AlertChannelSize  int       `json:"alert_channel_size"` // current number of queued alerts
	ProcessingLatency float64   `json:"processing_latency_ms"`
	MemoryUsage       int64     `json:"memory_usage_bytes"`
	LastProcessedTime time.Time `json:"last_processed_time"`
	ErrorRate         float64   `json:"error_rate"`
	OverallHealth     string    `json:"overall_health"`
}
// detectionLoop runs the main anomaly detection loop, firing a periodic
// detection pass every DetectionWindow until the stop channel is closed.
func (ad *AnomalyDetector) detectionLoop() {
	ticker := time.NewTicker(ad.config.DetectionWindow)
	defer ticker.Stop()
	for {
		select {
		case <-ad.stopCh:
			return
		case <-ticker.C:
			ad.performPeriodicDetection()
		}
	}
}
// cleanupLoop periodically cleans up old data at CleanupInterval until the
// stop channel is closed.
func (ad *AnomalyDetector) cleanupLoop() {
	ticker := time.NewTicker(ad.config.CleanupInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ad.stopCh:
			return
		case <-ticker.C:
			ad.cleanup()
		}
	}
}
// calculateTransactionAnomalyScore calculates an anomaly score for a
// transaction as a weighted sum of absolute Z-scores against the value,
// gas-price and sender-frequency baselines, plus a time-of-day component.
// Transactions with no baseline history receive a small floor score of 0.1.
func (ad *AnomalyDetector) calculateTransactionAnomalyScore(record *TransactionRecord) float64 {
	total := 0.0

	// addWeighted folds |z-score| * weight into the total when a baseline
	// for the metric exists.
	addWeighted := func(metric string, value, weight float64) {
		if pattern, ok := ad.patterns[metric]; ok {
			total += math.Abs(ad.calculateZScore(value, pattern)) * weight
		}
	}

	addWeighted("transaction_value", record.Value, 0.3)
	addWeighted("gas_price", record.GasPrice, 0.2)
	addWeighted("sender_frequency", ad.calculateSenderFrequency(record.From), 0.3)

	// Time-of-day suspicion is always available.
	total += ad.calculateTimeAnomalyScore(record.Timestamp) * 0.2

	if total == 0 {
		// Small baseline score for initial transactions without history.
		return 0.1
	}
	return total
}
// detectTransactionAnomalies detects anomalies in a transaction by running
// each enabled detector (volume, behavioral, pattern) and collecting any
// non-nil alerts.
func (ad *AnomalyDetector) detectTransactionAnomalies(record *TransactionRecord) []*AnomalyAlert {
	detectors := []struct {
		enabled bool
		run     func(*TransactionRecord) *AnomalyAlert
	}{
		{ad.config.EnableVolumeDetection, ad.detectVolumeAnomaly},
		{ad.config.EnableBehavioralAD, ad.detectBehavioralAnomaly},
		{ad.config.EnablePatternDetection, ad.detectPatternAnomaly},
	}

	var anomalies []*AnomalyAlert
	for _, d := range detectors {
		if !d.enabled {
			continue
		}
		if alert := d.run(record); alert != nil {
			anomalies = append(anomalies, alert)
		}
	}
	return anomalies
}
// updatePatternsForTransaction feeds each metric derived from the
// transaction into its rolling baseline.
func (ad *AnomalyDetector) updatePatternsForTransaction(record *TransactionRecord) {
	ad.updatePattern("transaction_value", record.Value)
	ad.updatePattern("gas_price", record.GasPrice)
	ad.updatePattern("gas_used", float64(record.GasUsed))
	// Derived metrics computed from the transaction log.
	ad.updatePattern("sender_frequency", ad.calculateSenderFrequency(record.From))
	ad.updatePattern("hourly_transactions", ad.calculateHourlyTransactionCount(record.Timestamp.Hour()))
}
// updatePattern updates a pattern baseline with a new observation,
// creating the baseline on first use, capping the observation window at
// MaxPatternHistory, and recomputing the derived statistics.
func (ad *AnomalyDetector) updatePattern(metricName string, value float64) {
	p := ad.patterns[metricName]
	if p == nil {
		p = &PatternBaseline{
			MetricName:       metricName,
			Observations:     make([]float64, 0),
			Percentiles:      make(map[int]float64),
			SeasonalPatterns: make(map[string]float64),
			LastUpdated:      time.Now(),
		}
		ad.patterns[metricName] = p
	}

	p.Observations = append(p.Observations, value)
	p.SampleCount++
	p.LastUpdated = time.Now()

	// Keep only the newest MaxPatternHistory observations.
	if excess := len(p.Observations) - ad.config.MaxPatternHistory; excess > 0 {
		p.Observations = p.Observations[excess:]
	}

	ad.updatePatternStatistics(p)
}
// updatePatternStatistics recomputes all derived statistics (mean,
// population variance/stddev, min/max, nearest-rank percentiles, linear
// trend) over the baseline's current observation window.
func (ad *AnomalyDetector) updatePatternStatistics(pattern *PatternBaseline) {
	obs := pattern.Observations
	n := len(obs)
	if n == 0 {
		return
	}

	// Mean.
	var sum float64
	for _, v := range obs {
		sum += v
	}
	pattern.Mean = sum / float64(n)

	// Population variance and standard deviation.
	var sq float64
	for _, v := range obs {
		d := v - pattern.Mean
		sq += d * d
	}
	pattern.Variance = sq / float64(n)
	pattern.StandardDev = math.Sqrt(pattern.Variance)

	// Min and max.
	lo, hi := obs[0], obs[0]
	for _, v := range obs[1:] {
		if v < lo {
			lo = v
		}
		if v > hi {
			hi = v
		}
	}
	pattern.Min, pattern.Max = lo, hi

	// Percentiles via nearest-rank with rounding.
	sorted := append([]float64(nil), obs...)
	sort.Float64s(sorted)
	for _, p := range []int{50, 75, 90, 95, 99} {
		idx := int(float64(p)/100.0*float64(n-1) + 0.5)
		if idx >= n {
			idx = n - 1
		}
		pattern.Percentiles[p] = sorted[idx]
	}

	// Linear trend (least-squares slope).
	pattern.Trend = ad.calculateTrend(obs)
}
// calculateZScore calculates the Z-score for a value against a pattern.
// A zero standard deviation (constant or empty baseline) yields 0 rather
// than dividing by zero.
func (ad *AnomalyDetector) calculateZScore(value float64, pattern *PatternBaseline) float64 {
	sd := pattern.StandardDev
	if sd == 0 {
		return 0
	}
	return (value - pattern.Mean) / sd
}
// detectMetricAnomaly detects a statistical anomaly in a metric value by
// comparing its Z-score against the configured threshold. Returns nil when
// the baseline is too small (<10 samples) or the deviation is acceptable.
func (ad *AnomalyDetector) detectMetricAnomaly(metricName string, value float64, pattern *PatternBaseline) *AnomalyAlert {
	const minBaseline = 10 // need sufficient history before flagging anything
	sampleSize := len(pattern.Observations)
	if sampleSize < minBaseline {
		return nil
	}

	z := ad.calculateZScore(value, pattern)
	magnitude := math.Abs(z)
	if magnitude < ad.config.ZScoreThreshold {
		return nil
	}

	return &AnomalyAlert{
		ID:            ad.generateAlertID(),
		Type:          AnomalyTypeStatistical,
		Severity:      ad.calculateSeverity(magnitude),
		Confidence:    ad.calculateConfidence(magnitude, sampleSize),
		Score:         magnitude,
		Description:   ad.generateAnomalyDescription(metricName, value, pattern, z),
		MetricName:    metricName,
		ObservedValue: value,
		ExpectedValue: pattern.Mean,
		Deviation:     z,
		Timestamp:     time.Now(),
		Context: map[string]interface{}{
			"standard_dev":  pattern.StandardDev,
			"sample_count":  pattern.SampleCount,
			"percentile_95": pattern.Percentiles[95],
		},
		Recommendations: ad.generateRecommendations(metricName, z),
	}
}
// Helper methods for specific anomaly types

// detectVolumeAnomaly flags transactions whose value deviates from the
// transaction_value baseline by more than VolumeThreshold standard
// deviations. Small transactions (< 0.1 ETH) and immature baselines
// (< 10 samples) are ignored.
func (ad *AnomalyDetector) detectVolumeAnomaly(record *TransactionRecord) *AnomalyAlert {
	if record.Value < 0.1 {
		return nil // skip small transactions
	}
	pattern := ad.patterns["transaction_value"]
	if pattern == nil || len(pattern.Observations) < 10 {
		return nil
	}

	z := ad.calculateZScore(record.Value, pattern)
	magnitude := math.Abs(z)
	if magnitude < ad.config.VolumeThreshold {
		return nil
	}

	return &AnomalyAlert{
		ID:            ad.generateAlertID(),
		Type:          AnomalyTypeVolume,
		Severity:      ad.calculateSeverity(magnitude),
		Confidence:    ad.calculateConfidence(magnitude, len(pattern.Observations)),
		Score:         magnitude,
		Description:   "Unusual transaction volume detected",
		MetricName:    "transaction_value",
		ObservedValue: record.Value,
		ExpectedValue: pattern.Mean,
		Deviation:     z,
		Timestamp:     time.Now(),
		Source:        record.From.Hex(),
		Context: map[string]interface{}{
			"transaction_hash": record.Hash.Hex(),
			"gas_price":        record.GasPrice,
			"gas_used":         record.GasUsed,
		},
	}
}
// detectBehavioralAnomaly compares a transaction's gas price against the
// sender's own recent (last hour) average and flags deviations above 200%.
// Requires at least 5 recent transactions from the sender.
//
// Fix: the previous implementation divided by the sender's average gas
// price without checking for zero, producing NaN/Inf deviations (and bogus
// alerts) when all of the sender's recent transactions carried a zero gas
// price.
func (ad *AnomalyDetector) detectBehavioralAnomaly(record *TransactionRecord) *AnomalyAlert {
	recentTxs := ad.getRecentTransactionsFrom(record.From, time.Hour)
	if len(recentTxs) < 5 { // need sufficient per-sender history
		return nil
	}

	avgGasPrice := ad.calculateAverageGasPrice(recentTxs)
	if avgGasPrice == 0 {
		// Guard against division by zero: no meaningful relative
		// deviation can be computed from a zero baseline.
		return nil
	}

	gasDeviation := math.Abs(record.GasPrice-avgGasPrice) / avgGasPrice
	if gasDeviation <= 2.0 { // within 200% of the sender's norm
		return nil
	}

	return &AnomalyAlert{
		ID:            ad.generateAlertID(),
		Type:          AnomalyTypeBehavioral,
		Severity:      AnomalySeverityMedium,
		Confidence:    0.8,
		Score:         gasDeviation,
		Description:   "Unusual gas price behavior for sender",
		MetricName:    "sender_gas_behavior",
		ObservedValue: record.GasPrice,
		ExpectedValue: avgGasPrice,
		Deviation:     gasDeviation,
		Timestamp:     time.Now(),
		Source:        record.From.Hex(),
	}
}
// detectPatternAnomaly checks whether the transaction count for the
// record's hour of day deviates from the hourly_transactions baseline by
// more than twice the pattern-similarity threshold.
func (ad *AnomalyDetector) detectPatternAnomaly(record *TransactionRecord) *AnomalyAlert {
	hourlyPattern := ad.patterns["hourly_transactions"]
	if hourlyPattern == nil {
		return nil
	}

	hour := record.Timestamp.Hour()
	observedCount := ad.calculateHourlyTransactionCount(hour)
	z := ad.calculateZScore(observedCount, hourlyPattern)
	magnitude := math.Abs(z)
	if magnitude <= ad.config.PatternSimilarity*2 {
		return nil
	}

	return &AnomalyAlert{
		ID:            ad.generateAlertID(),
		Type:          AnomalyTypePattern,
		Severity:      ad.calculateSeverity(magnitude),
		Confidence:    0.7,
		Score:         magnitude,
		Description:   "Unusual transaction timing pattern",
		MetricName:    "hourly_pattern",
		ObservedValue: observedCount,
		ExpectedValue: hourlyPattern.Mean,
		Deviation:     z,
		Timestamp:     time.Now(),
		Context: map[string]interface{}{
			"hour": hour,
		},
	}
}
// Helper calculation methods

// calculateSenderFrequency counts how many transactions the given sender
// produced within the last hour of the retained log.
func (ad *AnomalyDetector) calculateSenderFrequency(sender common.Address) float64 {
	cutoff := time.Now().Add(-time.Hour)
	var count float64
	for _, tx := range ad.transactionLog {
		if tx.From == sender && tx.Timestamp.After(cutoff) {
			count++
		}
	}
	return count
}
// calculateTimeAnomalyScore assigns a suspicion score based on hour of day:
// business hours (9-17) are normal, late-night/early-morning hours (22-6)
// are most suspicious, everything else is mildly suspicious.
func (ad *AnomalyDetector) calculateTimeAnomalyScore(timestamp time.Time) float64 {
	switch hour := timestamp.Hour(); {
	case hour >= 9 && hour <= 17:
		return 0.0 // business hours
	case hour >= 22 || hour <= 6:
		return 0.8 // very late/early hours
	default:
		return 0.3 // evening hours
	}
}
// calculateHourlyTransactionCount counts retained transactions whose
// timestamp falls inside the given hour of today's date (local time).
func (ad *AnomalyDetector) calculateHourlyTransactionCount(hour int) float64 {
	now := time.Now()
	windowStart := time.Date(now.Year(), now.Month(), now.Day(), hour, 0, 0, 0, now.Location())
	windowEnd := windowStart.Add(time.Hour)

	var count float64
	for _, tx := range ad.transactionLog {
		if tx.Timestamp.After(windowStart) && tx.Timestamp.Before(windowEnd) {
			count++
		}
	}
	return count
}
// getRecentTransactionsFrom returns the retained transactions sent by the
// given address within the trailing duration window.
func (ad *AnomalyDetector) getRecentTransactionsFrom(sender common.Address, duration time.Duration) []*TransactionRecord {
	cutoff := time.Now().Add(-duration)
	var matched []*TransactionRecord
	for _, tx := range ad.transactionLog {
		if tx.From != sender {
			continue
		}
		if tx.Timestamp.After(cutoff) {
			matched = append(matched, tx)
		}
	}
	return matched
}
// calculateAverageGasPrice returns the mean gas price over the given
// transactions, or 0 for an empty slice.
func (ad *AnomalyDetector) calculateAverageGasPrice(transactions []*TransactionRecord) float64 {
	n := len(transactions)
	if n == 0 {
		return 0
	}
	var total float64
	for _, tx := range transactions {
		total += tx.GasPrice
	}
	return total / float64(n)
}
// calculateTrend fits a simple least-squares line to the observations
// (index as x, value as y) and returns its slope. Returns 0 for fewer than
// two observations or a degenerate denominator.
func (ad *AnomalyDetector) calculateTrend(observations []float64) float64 {
	count := len(observations)
	if count < 2 {
		return 0
	}

	var sumX, sumY, sumXY, sumXX float64
	for i, y := range observations {
		x := float64(i)
		sumX += x
		sumY += y
		sumXY += x * y
		sumXX += x * x
	}

	n := float64(count)
	denominator := n*sumXX - sumX*sumX
	if denominator == 0 {
		return 0
	}
	return (n*sumXY - sumX*sumY) / denominator
}
// calculateSeverity maps an absolute Z-score magnitude onto a severity
// band: <2 LOW, <3 MEDIUM, <4 HIGH, otherwise CRITICAL.
func (ad *AnomalyDetector) calculateSeverity(zScore float64) AnomalySeverity {
	switch {
	case zScore < 2.0:
		return AnomalySeverityLow
	case zScore < 3.0:
		return AnomalySeverityMedium
	case zScore < 4.0:
		return AnomalySeverityHigh
	default:
		return AnomalySeverityCritical
	}
}
// calculateConfidence averages two capped components: deviation magnitude
// (saturating at z=5) and sample support (saturating at 100 samples).
// The result is in [0, 1].
func (ad *AnomalyDetector) calculateConfidence(zScore float64, sampleSize int) float64 {
	fromScore := math.Min(zScore/5.0, 1.0)
	fromSamples := math.Min(float64(sampleSize)/100.0, 1.0)
	return (fromScore + fromSamples) / 2.0
}
// generateAlertID builds a unique alert ID from the current nanosecond
// timestamp and an atomically incremented sequence number.
func (ad *AnomalyDetector) generateAlertID() string {
	seq := atomic.AddUint64(&ad.alertCounter, 1)
	return fmt.Sprintf("anomaly_%d_%d", time.Now().UnixNano(), seq)
}
// generateAnomalyDescription renders a human-readable summary of how far a
// metric value sits from its baseline mean, in standard deviations.
func (ad *AnomalyDetector) generateAnomalyDescription(metricName string, value float64, pattern *PatternBaseline, zScore float64) string {
	side := "above"
	if zScore < 0 {
		side = "below"
	}
	return fmt.Sprintf("Metric '%s' value %.2f is %.1f standard deviations %s the expected mean of %.2f",
		metricName, value, math.Abs(zScore), side, pattern.Mean)
}
// generateRecommendations produces remediation suggestions based on the
// deviation magnitude, plus a metric-specific hint where one applies.
func (ad *AnomalyDetector) generateRecommendations(metricName string, zScore float64) []string {
	var recs []string

	// Severity-band advice.
	switch magnitude := math.Abs(zScore); {
	case magnitude > 4.0:
		recs = append(recs,
			"Immediate investigation required",
			"Consider blocking source if malicious")
	case magnitude > 3.0:
		recs = append(recs,
			"Initiate investigation and monitor closely",
			"Review transaction history")
	default:
		recs = append(recs, "Continue monitoring")
	}

	// Metric-specific advice.
	switch metricName {
	case "transaction_value":
		recs = append(recs, "Verify large transaction legitimacy")
	case "gas_price":
		recs = append(recs, "Check for gas price manipulation")
	case "sender_frequency":
		recs = append(recs, "Investigate potential automated behavior")
	}

	return recs
}
// Status and reporting methods

// performPeriodicDetection runs the scheduled analyses under the read lock:
// time-series analysis on every pattern (when enabled) and transaction
// frequency analysis.
//
// Fix: the EnableTimeSeriesAD flag was previously re-checked inside the
// pattern loop even though it is loop-invariant; the check is hoisted so a
// disabled detector skips iterating patterns entirely.
func (ad *AnomalyDetector) performPeriodicDetection() {
	ad.mu.RLock()
	defer ad.mu.RUnlock()

	if ad.config.EnableTimeSeriesAD {
		for metricName, pattern := range ad.patterns {
			ad.performTimeSeriesAnalysis(metricName, pattern)
		}
	}

	ad.analyzeTransactionPatterns()
}
// performTimeSeriesAnalysis compares the mean of the most recent
// observations against the historical mean and emits a temporal anomaly
// alert (non-blocking; silently dropped when the channel is full) when the
// shift exceeds the configured Z-score threshold.
//
// Fix: removed the unreachable second length guard — the original checked
// len < recentWindow (5) after already returning for len < 20, so that
// branch could never fire.
func (ad *AnomalyDetector) performTimeSeriesAnalysis(metricName string, pattern *PatternBaseline) {
	const recentWindow = 5
	// Need enough history for a meaningful recent-vs-historical split.
	if len(pattern.Observations) < 20 {
		return
	}

	split := len(pattern.Observations) - recentWindow
	recent := pattern.Observations[split:]
	historical := pattern.Observations[:split]

	recentMean := ad.calculateMean(recent)
	historicalMean := ad.calculateMean(historical)
	historicalStdDev := ad.calculateStdDev(historical, historicalMean)
	if historicalStdDev == 0 {
		return // constant history; no normalized change score possible
	}

	changeScore := math.Abs(recentMean-historicalMean) / historicalStdDev
	if changeScore <= ad.config.ZScoreThreshold {
		return
	}

	alert := &AnomalyAlert{
		ID:            ad.generateAlertID(),
		Type:          AnomalyTypeTemporal,
		Severity:      ad.calculateSeverity(changeScore),
		Confidence:    0.9,
		Score:         changeScore,
		Description:   "Significant change in time series pattern detected",
		MetricName:    metricName,
		ObservedValue: recentMean,
		ExpectedValue: historicalMean,
		Deviation:     changeScore,
		Timestamp:     time.Now(),
	}
	select {
	case ad.alerts <- alert:
	default:
	}
}
// analyzeTransactionPatterns scans the last hour of transactions and
// raises a frequency alert for any sender exceeding 50 transactions.
// Sends are non-blocking; alerts are dropped when the channel is full.
func (ad *AnomalyDetector) analyzeTransactionPatterns() {
	// Count transactions per sender over the trailing hour.
	perSender := make(map[common.Address]int)
	for _, tx := range ad.getRecentTransactions(time.Hour) {
		perSender[tx.From]++
	}

	const suspiciousThreshold = 50
	for sender, count := range perSender {
		if count <= suspiciousThreshold {
			continue
		}
		alert := &AnomalyAlert{
			ID:            ad.generateAlertID(),
			Type:          AnomalyTypeFrequency,
			Severity:      AnomalySeverityHigh,
			Confidence:    0.95,
			Score:         float64(count),
			Description:   "High frequency transaction pattern detected",
			MetricName:    "transaction_frequency",
			ObservedValue: float64(count),
			ExpectedValue: 10.0, // expected average
			Timestamp:     time.Now(),
			Source:        sender.Hex(),
		}
		select {
		case ad.alerts <- alert:
		default:
		}
	}
}
// getRecentTransactions returns all retained transactions within the
// trailing duration window.
func (ad *AnomalyDetector) getRecentTransactions(duration time.Duration) []*TransactionRecord {
	cutoff := time.Now().Add(-duration)
	var recent []*TransactionRecord
	for _, tx := range ad.transactionLog {
		if tx.Timestamp.After(cutoff) {
			recent = append(recent, tx)
		}
	}
	return recent
}
// cleanup drops transactions older than the baseline window and trims
// oversized pattern observation histories, refreshing their statistics.
func (ad *AnomalyDetector) cleanup() {
	ad.mu.Lock()
	defer ad.mu.Unlock()

	// Keep only transactions newer than the baseline window.
	cutoff := time.Now().Add(-ad.config.BaselineWindow)
	survivors := make([]*TransactionRecord, 0, len(ad.transactionLog))
	for _, tx := range ad.transactionLog {
		if tx.Timestamp.After(cutoff) {
			survivors = append(survivors, tx)
		}
	}
	ad.transactionLog = survivors

	// Trim observation windows that have grown past the retention limit.
	limit := ad.config.MaxPatternHistory
	for _, pattern := range ad.patterns {
		if excess := len(pattern.Observations) - limit; excess > 0 {
			pattern.Observations = pattern.Observations[excess:]
			ad.updatePatternStatistics(pattern)
		}
	}
}
// countRecentAnomalies returns the number of anomalies detected recently.
// Placeholder: emitted alerts are not retained anywhere in this type, so
// there is nothing to count yet — always returns 0.
// TODO: implement once alert history is persisted.
func (ad *AnomalyDetector) countRecentAnomalies() int {
	// This would count anomalies in the last hour
	// For now, return a placeholder
	return 0
}
// getTopAnomalies returns up to limit anomalies ranked by score.
// Placeholder: alert history is not retained, so the limit parameter is
// currently unused and an empty slice is always returned.
// TODO: implement once alert history is persisted.
func (ad *AnomalyDetector) getTopAnomalies(limit int) []*AnomalyAlert {
	// This would return the top anomalies by score
	// For now, return empty slice
	return []*AnomalyAlert{}
}
// getPatternSummaries builds a summary for every tracked pattern baseline.
//
// Locking: this method is called from GetAnomalyReport, which already holds
// ad.mu.RLock. The previous implementation re-acquired the read lock here;
// with sync.RWMutex a recursive RLock on the same goroutine can deadlock
// when a writer is queued between the two acquisitions, so this method now
// performs no locking of its own and must only be called with ad.mu held.
func (ad *AnomalyDetector) getPatternSummaries() map[string]*PatternSummary {
	summaries := make(map[string]*PatternSummary, len(ad.patterns))
	for name, pattern := range ad.patterns {
		// Classify the least-squares slope into a coarse trend label.
		trend := "STABLE"
		switch {
		case pattern.Trend > 0.1:
			trend = "INCREASING"
		case pattern.Trend < -0.1:
			trend = "DECREASING"
		}
		summaries[name] = &PatternSummary{
			MetricName:      name,
			Mean:            pattern.Mean,
			StandardDev:     pattern.StandardDev,
			SampleCount:     pattern.SampleCount,
			LastUpdated:     pattern.LastUpdated,
			RecentAnomalies: 0, // would count recent anomalies for this pattern
			Trend:           trend,
		}
	}
	return summaries
}
// calculateSystemHealth reports the detector's operational health snapshot.
// Latency, memory usage and error rate are hard-coded placeholders.
//
// Locking: this method is called from GetAnomalyReport, which already holds
// ad.mu.RLock. The previous implementation re-acquired the read lock here;
// with sync.RWMutex a recursive RLock on the same goroutine can deadlock
// when a writer is queued between the two acquisitions, so this method now
// performs no locking of its own and must only be called with ad.mu held.
func (ad *AnomalyDetector) calculateSystemHealth() *AnomalyDetectorHealth {
	return &AnomalyDetectorHealth{
		IsRunning:         ad.running,
		AlertChannelSize:  len(ad.alerts),
		ProcessingLatency: 5.2,              // placeholder; would measure actual latency
		MemoryUsage:       1024 * 1024 * 10, // placeholder; would measure actual memory
		LastProcessedTime: time.Now(),
		ErrorRate:         0.0, // placeholder; would track actual error rate
		OverallHealth:     "HEALTHY",
	}
}
// Helper calculation functions

// calculateMean returns the arithmetic mean of values, or 0 for an empty
// slice.
func (ad *AnomalyDetector) calculateMean(values []float64) float64 {
	if len(values) == 0 {
		return 0
	}
	var total float64
	for _, v := range values {
		total += v
	}
	return total / float64(len(values))
}
// calculateStdDev returns the population standard deviation of values
// around the supplied mean, or 0 for an empty slice.
func (ad *AnomalyDetector) calculateStdDev(values []float64, mean float64) float64 {
	n := len(values)
	if n == 0 {
		return 0
	}
	var sumSquares float64
	for _, v := range values {
		d := v - mean
		sumSquares += d * d
	}
	return math.Sqrt(sumSquares / float64(n))
}