package security

import (
	"fmt"
	"math"
	"sort"
	"sync"
	"sync/atomic"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/fraktal/mev-beta/internal/logger"
)

// anomalyMinBaselineSamples is the number of observations a baseline must hold
// before the z-score based metric and volume detectors will raise alerts.
const anomalyMinBaselineSamples = 10

// AnomalyDetector detects unusual patterns in security and transaction data.
//
// Mutable state (patterns, transactionLog, running, stopCh) is guarded by mu.
// Exported methods acquire the lock themselves. Unexported helpers assume the
// caller already holds it unless their comment says otherwise.
type AnomalyDetector struct {
	// alertCounter is updated with sync/atomic. It is the first field so that
	// it is 64-bit aligned on 32-bit platforms.
	alertCounter uint64

	logger         *logger.Logger
	config         *AnomalyConfig
	patterns       map[string]*PatternBaseline
	transactionLog []*TransactionRecord
	mu             sync.RWMutex
	alerts         chan *AnomalyAlert
	running        bool
	stopCh         chan struct{}
}

// AnomalyConfig configures the anomaly detection system.
type AnomalyConfig struct {
	// Detection thresholds
	ZScoreThreshold    float64 `json:"z_score_threshold"`   // Standard deviations for anomaly
	VolumeThreshold    float64 `json:"volume_threshold"`    // Volume change threshold
	FrequencyThreshold float64 `json:"frequency_threshold"` // Frequency change threshold
	PatternSimilarity  float64 `json:"pattern_similarity"`  // Pattern similarity threshold

	// Time windows
	BaselineWindow  time.Duration `json:"baseline_window"`  // Period for establishing baseline
	DetectionWindow time.Duration `json:"detection_window"` // Period for detecting anomalies
	AlertCooldown   time.Duration `json:"alert_cooldown"`   // Cooldown between similar alerts

	// Data retention
	MaxTransactionHistory int           `json:"max_transaction_history"` // Max transactions to keep
	MaxPatternHistory     int           `json:"max_pattern_history"`     // Max patterns to keep
	CleanupInterval       time.Duration `json:"cleanup_interval"`        // How often to clean old data

	// Feature flags
	EnableVolumeDetection  bool `json:"enable_volume_detection"`  // Enable volume anomaly detection
	EnablePatternDetection bool `json:"enable_pattern_detection"` // Enable pattern anomaly detection
	EnableTimeSeriesAD     bool `json:"enable_time_series_ad"`    // Enable time series anomaly detection
	EnableBehavioralAD     bool `json:"enable_behavioral_ad"`     // Enable behavioral anomaly detection
}

// PatternBaseline represents the baseline pattern for a specific metric.
type PatternBaseline struct {
	MetricName       string             `json:"metric_name"`
	Observations     []float64          `json:"observations"`
	Mean             float64            `json:"mean"`
	StandardDev      float64            `json:"standard_dev"`
	Variance         float64            `json:"variance"`
	Min              float64            `json:"min"`
	Max              float64            `json:"max"`
	Percentiles      map[int]float64    `json:"percentiles"` // 50th, 75th, 90th, 95th, 99th
	LastUpdated      time.Time          `json:"last_updated"`
	SampleCount      int64              `json:"sample_count"`
	SeasonalPatterns map[string]float64 `json:"seasonal_patterns"` // hour, day, week patterns
	Trend            float64            `json:"trend"`             // Linear trend coefficient
}

// TransactionRecord represents a transaction for anomaly analysis.
type TransactionRecord struct {
	Hash         common.Hash            `json:"hash"`
	From         common.Address         `json:"from"`
	To           *common.Address        `json:"to"`
	Value        float64                `json:"value"`     // In ETH
	GasPrice     float64                `json:"gas_price"` // In Gwei
	GasUsed      uint64                 `json:"gas_used"`
	Timestamp    time.Time              `json:"timestamp"`
	BlockNumber  uint64                 `json:"block_number"`
	Success      bool                   `json:"success"`
	Metadata     map[string]interface{} `json:"metadata"`
	AnomalyScore float64                `json:"anomaly_score"`
	Flags        []string               `json:"flags"`
}

// AnomalyAlert represents a detected anomaly.
type AnomalyAlert struct {
	ID              string                 `json:"id"`
	Type            AnomalyType            `json:"type"`
	Severity        AnomalySeverity        `json:"severity"`
	Confidence      float64                `json:"confidence"` // 0-1
	Score           float64                `json:"score"`      // Anomaly score
	Description     string                 `json:"description"`
	MetricName      string                 `json:"metric_name"`
	ObservedValue   float64                `json:"observed_value"`
	ExpectedValue   float64                `json:"expected_value"`
	Deviation       float64                `json:"deviation"` // Z-score or similar
	Timestamp       time.Time              `json:"timestamp"`
	Source          string                 `json:"source"` // IP, address, etc.
	Context         map[string]interface{} `json:"context"`
	Recommendations []string               `json:"recommendations"`
	RelatedAlerts   []string               `json:"related_alerts"`
}

// AnomalyType represents the type of anomaly detected.
type AnomalyType string

// Anomaly types assigned to alerts by the detectors in this file.
const (
	AnomalyTypeVolume      AnomalyType = "VOLUME"
	AnomalyTypeFrequency   AnomalyType = "FREQUENCY"
	AnomalyTypePattern     AnomalyType = "PATTERN"
	AnomalyTypeBehavioral  AnomalyType = "BEHAVIORAL"
	AnomalyTypeTemporal    AnomalyType = "TEMPORAL"
	AnomalyTypeStatistical AnomalyType = "STATISTICAL"
)

// AnomalySeverity represents the severity of an anomaly.
type AnomalySeverity string

// Severity levels, from least to most severe.
const (
	AnomalySeverityLow      AnomalySeverity = "LOW"
	AnomalySeverityMedium   AnomalySeverity = "MEDIUM"
	AnomalySeverityHigh     AnomalySeverity = "HIGH"
	AnomalySeverityCritical AnomalySeverity = "CRITICAL"
)

// NewAnomalyDetector creates a new anomaly detector.
//
// It starts from defaultAnomalyConfig. When config is non-nil, every numeric
// or duration field that is greater than zero overrides the default. The four
// Enable* flags are copied from config as-is, so a caller that passes a
// partial config must set them explicitly; flags left false disable the
// corresponding detectors.
func NewAnomalyDetector(logger *logger.Logger, config *AnomalyConfig) *AnomalyDetector {
	cfg := defaultAnomalyConfig()

	if config != nil {
		if config.ZScoreThreshold > 0 {
			cfg.ZScoreThreshold = config.ZScoreThreshold
		}
		if config.VolumeThreshold > 0 {
			cfg.VolumeThreshold = config.VolumeThreshold
		}
		if config.FrequencyThreshold > 0 {
			cfg.FrequencyThreshold = config.FrequencyThreshold
		}
		if config.PatternSimilarity > 0 {
			cfg.PatternSimilarity = config.PatternSimilarity
		}
		if config.BaselineWindow > 0 {
			cfg.BaselineWindow = config.BaselineWindow
		}
		if config.DetectionWindow > 0 {
			cfg.DetectionWindow = config.DetectionWindow
		}
		if config.AlertCooldown > 0 {
			cfg.AlertCooldown = config.AlertCooldown
		}
		if config.MaxTransactionHistory > 0 {
			cfg.MaxTransactionHistory = config.MaxTransactionHistory
		}
		if config.MaxPatternHistory > 0 {
			cfg.MaxPatternHistory = config.MaxPatternHistory
		}
		if config.CleanupInterval > 0 {
			cfg.CleanupInterval = config.CleanupInterval
		}

		// Booleans have no "unset" state, so they are taken verbatim.
		cfg.EnableVolumeDetection = config.EnableVolumeDetection
		cfg.EnablePatternDetection = config.EnablePatternDetection
		cfg.EnableTimeSeriesAD = config.EnableTimeSeriesAD
		cfg.EnableBehavioralAD = config.EnableBehavioralAD
	}

	return &AnomalyDetector{
		logger:         logger,
		config:         cfg,
		patterns:       make(map[string]*PatternBaseline),
		transactionLog: make([]*TransactionRecord, 0),
		alerts:         make(chan *AnomalyAlert, 1000),
		stopCh:         make(chan struct{}),
	}
}

// defaultAnomalyConfig returns the configuration used when the caller supplies
// none, or leaves a numeric/duration field at zero.
func defaultAnomalyConfig() *AnomalyConfig {
	return &AnomalyConfig{
		ZScoreThreshold:        2.5,
		VolumeThreshold:        3.0,
		FrequencyThreshold:     2.0,
		PatternSimilarity:      0.8,
		BaselineWindow:         24 * time.Hour,
		DetectionWindow:        time.Hour,
		AlertCooldown:          5 * time.Minute,
		MaxTransactionHistory:  10000,
		MaxPatternHistory:      1000,
		CleanupInterval:        time.Hour,
		EnableVolumeDetection:  true,
		EnablePatternDetection: true,
		EnableTimeSeriesAD:     true,
		EnableBehavioralAD:     true,
	}
}

// Start begins the anomaly detection process by launching the periodic
// detection and cleanup goroutines. Calling Start on a running detector is a
// no-op. It always returns nil.
func (ad *AnomalyDetector) Start() error {
	ad.mu.Lock()
	defer ad.mu.Unlock()

	if ad.running {
		return nil
	}

	// A channel can be closed only once, so each run gets its own stop
	// channel. Without this, a Start after Stop would launch loops that exit
	// immediately and the following Stop would panic on a double close.
	stop := make(chan struct{})
	ad.stopCh = stop
	ad.running = true

	// The loops receive the channel by value so they never read ad.stopCh
	// concurrently with a later Start.
	go ad.detectionLoop(stop)
	go ad.cleanupLoop(stop)

	ad.logger.Info("Anomaly detector started")
	return nil
}

// Stop stops the anomaly detection process by closing the current stop
// channel. It does not wait for the background goroutines to exit. Calling
// Stop on a detector that is not running is a no-op. It always returns nil.
func (ad *AnomalyDetector) Stop() error {
	ad.mu.Lock()
	defer ad.mu.Unlock()

	if !ad.running {
		return nil
	}

	ad.running = false
	close(ad.stopCh)

	ad.logger.Info("Anomaly detector stopped")
	return nil
}

// RecordTransaction records a transaction for anomaly analysis.
//
// The record is stored by pointer and mutated in place: a zero Timestamp is
// replaced with the current time and AnomalyScore is overwritten. Any alerts
// raised for the transaction are pushed to the alert channel without blocking.
// A nil record is ignored.
func (ad *AnomalyDetector) RecordTransaction(record *TransactionRecord) {
	if record == nil {
		return
	}

	ad.mu.Lock()
	defer ad.mu.Unlock()

	if record.Timestamp.IsZero() {
		record.Timestamp = time.Now()
	}

	// Score against the baselines as they were before this transaction.
	record.AnomalyScore = ad.calculateTransactionAnomalyScore(record)

	ad.transactionLog = append(ad.transactionLog, record)
	if excess := len(ad.transactionLog) - ad.config.MaxTransactionHistory; excess > 0 {
		ad.transactionLog = ad.transactionLog[excess:]
	}

	ad.updatePatternsForTransaction(record)

	for _, alert := range ad.detectTransactionAnomalies(record) {
		ad.emitAlert(alert)
	}
}

// RecordMetric records a metric value for baseline establishment and raises a
// statistical alert if the value deviates from that metric's baseline.
func (ad *AnomalyDetector) RecordMetric(metricName string, value float64) {
	ad.mu.Lock()
	defer ad.mu.Unlock()

	// updatePattern creates the baseline on first use, so the lookup below
	// always succeeds.
	ad.updatePattern(metricName, value)
	pattern := ad.patterns[metricName]

	if alert := ad.detectMetricAnomaly(metricName, value, pattern); alert != nil {
		ad.emitAlert(alert)
	}
}

// emitAlert pushes alert onto the alert channel without blocking. When the
// channel buffer is full the alert is dropped and a warning is logged.
func (ad *AnomalyDetector) emitAlert(alert *AnomalyAlert) {
	select {
	case ad.alerts <- alert:
	default:
		ad.logger.Warn("Anomaly alert channel full, dropping alert")
	}
}

// GetAlerts returns the alert channel for consuming anomaly alerts.
func (ad *AnomalyDetector) GetAlerts() <-chan *AnomalyAlert {
	return ad.alerts
}

// GetAnomalyReport generates a comprehensive anomaly report from the
// detector's current state.
func (ad *AnomalyDetector) GetAnomalyReport() *AnomalyReport {
	ad.mu.RLock()
	defer ad.mu.RUnlock()

	// The helpers below must not take ad.mu themselves: sync.RWMutex forbids
	// recursive read locking because a queued writer would deadlock it.
	return &AnomalyReport{
		Timestamp:            time.Now(),
		PatternsTracked:      len(ad.patterns),
		TransactionsAnalyzed: len(ad.transactionLog),
		AnomaliesDetected:    ad.countRecentAnomalies(),
		TopAnomalies:         ad.getTopAnomalies(10),
		PatternSummaries:     ad.getPatternSummaries(),
		SystemHealth:         ad.calculateSystemHealth(),
	}
}

// AnomalyReport provides a comprehensive view of anomaly detection status.
type AnomalyReport struct {
	Timestamp            time.Time                  `json:"timestamp"`
	PatternsTracked      int                        `json:"patterns_tracked"`
	TransactionsAnalyzed int                        `json:"transactions_analyzed"`
	AnomaliesDetected    int                        `json:"anomalies_detected"`
	TopAnomalies         []*AnomalyAlert            `json:"top_anomalies"`
	PatternSummaries     map[string]*PatternSummary `json:"pattern_summaries"`
	SystemHealth         *AnomalyDetectorHealth     `json:"system_health"`
}

// PatternSummary provides a summary of a pattern baseline.
type PatternSummary struct {
	MetricName      string    `json:"metric_name"`
	Mean            float64   `json:"mean"`
	StandardDev     float64   `json:"standard_dev"`
	SampleCount     int64     `json:"sample_count"`
	LastUpdated     time.Time `json:"last_updated"`
	RecentAnomalies int       `json:"recent_anomalies"`
	Trend           string    `json:"trend"` // INCREASING, DECREASING, STABLE
}

// AnomalyDetectorHealth represents the health of the anomaly detection system.
type AnomalyDetectorHealth struct {
	IsRunning         bool      `json:"is_running"`
	AlertChannelSize  int       `json:"alert_channel_size"`
	ProcessingLatency float64   `json:"processing_latency_ms"`
	MemoryUsage       int64     `json:"memory_usage_bytes"`
	LastProcessedTime time.Time `json:"last_processed_time"`
	ErrorRate         float64   `json:"error_rate"`
	OverallHealth     string    `json:"overall_health"`
}

// detectionLoop runs performPeriodicDetection every DetectionWindow until stop
// is closed. It is meant to run in its own goroutine.
func (ad *AnomalyDetector) detectionLoop(stop <-chan struct{}) {
	ticker := time.NewTicker(ad.config.DetectionWindow)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			ad.performPeriodicDetection()
		case <-stop:
			return
		}
	}
}

// cleanupLoop runs cleanup every CleanupInterval until stop is closed. It is
// meant to run in its own goroutine.
func (ad *AnomalyDetector) cleanupLoop(stop <-chan struct{}) {
	ticker := time.NewTicker(ad.config.CleanupInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			ad.cleanup()
		case <-stop:
			return
		}
	}
}

// calculateTransactionAnomalyScore calculates an anomaly score for a
// transaction as a weighted sum: |z| of value (0.3), |z| of gas price (0.2),
// |z| of the sender's hourly frequency (0.3) and the time-of-day score (0.2).
// Components whose baseline does not exist yet contribute nothing. A total of
// exactly zero is replaced by 0.1.
func (ad *AnomalyDetector) calculateTransactionAnomalyScore(record *TransactionRecord) float64 {
	score := 0.0

	if pattern, ok := ad.patterns["transaction_value"]; ok {
		score += math.Abs(ad.calculateZScore(record.Value, pattern)) * 0.3
	}

	if pattern, ok := ad.patterns["gas_price"]; ok {
		score += math.Abs(ad.calculateZScore(record.GasPrice, pattern)) * 0.2
	}

	senderFreq := ad.calculateSenderFrequency(record.From)
	if pattern, ok := ad.patterns["sender_frequency"]; ok {
		score += math.Abs(ad.calculateZScore(senderFreq, pattern)) * 0.3
	}

	score += ad.calculateTimeAnomalyScore(record.Timestamp) * 0.2

	if score == 0 {
		// Provide a small baseline score for initial transactions without history.
		score = 0.1
	}
	return score
}

// detectTransactionAnomalies runs every enabled per-transaction detector
// (volume, behavioral, pattern) and returns the alerts they produced.
func (ad *AnomalyDetector) detectTransactionAnomalies(record *TransactionRecord) []*AnomalyAlert {
	var anomalies []*AnomalyAlert

	if ad.config.EnableVolumeDetection {
		if alert := ad.detectVolumeAnomaly(record); alert != nil {
			anomalies = append(anomalies, alert)
		}
	}

	if ad.config.EnableBehavioralAD {
		if alert := ad.detectBehavioralAnomaly(record); alert != nil {
			anomalies = append(anomalies, alert)
		}
	}

	if ad.config.EnablePatternDetection {
		if alert := ad.detectPatternAnomaly(record); alert != nil {
			anomalies = append(anomalies, alert)
		}
	}

	return anomalies
}

// updatePatternsForTransaction feeds the transaction into the
// "transaction_value", "gas_price", "gas_used", "sender_frequency" and
// "hourly_transactions" baselines.
func (ad *AnomalyDetector) updatePatternsForTransaction(record *TransactionRecord) {
	ad.updatePattern("transaction_value", record.Value)
	ad.updatePattern("gas_price", record.GasPrice)
	ad.updatePattern("gas_used", float64(record.GasUsed))

	ad.updatePattern("sender_frequency", ad.calculateSenderFrequency(record.From))

	hour := record.Timestamp.Hour()
	ad.updatePattern("hourly_transactions", ad.calculateHourlyTransactionCount(hour))
}

// updatePattern appends value to the named baseline, creating the baseline if
// needed, trims it to the newest MaxPatternHistory observations and recomputes
// its statistics.
func (ad *AnomalyDetector) updatePattern(metricName string, value float64) {
	pattern, ok := ad.patterns[metricName]
	if !ok {
		pattern = &PatternBaseline{
			MetricName:       metricName,
			Observations:     make([]float64, 0),
			Percentiles:      make(map[int]float64),
			SeasonalPatterns: make(map[string]float64),
			LastUpdated:      time.Now(),
		}
		ad.patterns[metricName] = pattern
	}

	pattern.Observations = append(pattern.Observations, value)
	pattern.SampleCount++
	pattern.LastUpdated = time.Now()

	if excess := len(pattern.Observations) - ad.config.MaxPatternHistory; excess > 0 {
		pattern.Observations = pattern.Observations[excess:]
	}

	ad.updatePatternStatistics(pattern)
}

// updatePatternStatistics recomputes mean, population variance, standard
// deviation, min, max, the 50/75/90/95/99th percentiles and the linear trend
// from pattern.Observations. It does nothing when there are no observations.
func (ad *AnomalyDetector) updatePatternStatistics(pattern *PatternBaseline) {
	obs := pattern.Observations
	if len(obs) == 0 {
		return
	}
	n := float64(len(obs))

	sum := 0.0
	for _, v := range obs {
		sum += v
	}
	pattern.Mean = sum / n

	sqDiff := 0.0
	for _, v := range obs {
		d := v - pattern.Mean
		sqDiff += d * d
	}
	pattern.Variance = sqDiff / n
	pattern.StandardDev = math.Sqrt(pattern.Variance)

	pattern.Min, pattern.Max = obs[0], obs[0]
	for _, v := range obs {
		if v < pattern.Min {
			pattern.Min = v
		}
		if v > pattern.Max {
			pattern.Max = v
		}
	}

	// Sort a copy so the chronological order needed by the trend is kept.
	sorted := make([]float64, len(obs))
	copy(sorted, obs)
	sort.Float64s(sorted)

	// Guard against baselines constructed without a percentile map.
	if pattern.Percentiles == nil {
		pattern.Percentiles = make(map[int]float64)
	}
	for _, p := range []int{50, 75, 90, 95, 99} {
		// Nearest-rank index, rounded to the closest element.
		idx := int(float64(p)/100.0*float64(len(sorted)-1) + 0.5)
		if idx >= len(sorted) {
			idx = len(sorted) - 1
		}
		pattern.Percentiles[p] = sorted[idx]
	}

	pattern.Trend = ad.calculateTrend(obs)
}

// calculateZScore calculates the Z-score for a value against a pattern. It
// returns 0 when the pattern's standard deviation is zero.
func (ad *AnomalyDetector) calculateZScore(value float64, pattern *PatternBaseline) float64 {
	if pattern.StandardDev == 0 {
		return 0
	}
	return (value - pattern.Mean) / pattern.StandardDev
}

// detectMetricAnomaly returns a statistical alert when value's |z-score|
// against pattern reaches ZScoreThreshold, or nil otherwise. Baselines with
// fewer than anomalyMinBaselineSamples observations never alert.
func (ad *AnomalyDetector) detectMetricAnomaly(metricName string, value float64, pattern *PatternBaseline) *AnomalyAlert {
	if len(pattern.Observations) < anomalyMinBaselineSamples {
		return nil
	}

	zScore := ad.calculateZScore(value, pattern)
	absZ := math.Abs(zScore)
	if absZ < ad.config.ZScoreThreshold {
		return nil
	}

	return &AnomalyAlert{
		ID:            ad.generateAlertID(),
		Type:          AnomalyTypeStatistical,
		Severity:      ad.calculateSeverity(absZ),
		Confidence:    ad.calculateConfidence(absZ, len(pattern.Observations)),
		Score:         absZ,
		Description:   ad.generateAnomalyDescription(metricName, value, pattern, zScore),
		MetricName:    metricName,
		ObservedValue: value,
		ExpectedValue: pattern.Mean,
		Deviation:     zScore,
		Timestamp:     time.Now(),
		Context: map[string]interface{}{
			"standard_dev":  pattern.StandardDev,
			"sample_count":  pattern.SampleCount,
			"percentile_95": pattern.Percentiles[95],
		},
		Recommendations: ad.generateRecommendations(metricName, zScore),
	}
}

// Helper methods for specific anomaly types

// detectVolumeAnomaly returns a volume alert when the transaction value's
// |z-score| against the "transaction_value" baseline reaches VolumeThreshold.
// Transactions below 0.1 and baselines with fewer than
// anomalyMinBaselineSamples observations are skipped.
func (ad *AnomalyDetector) detectVolumeAnomaly(record *TransactionRecord) *AnomalyAlert {
	if record.Value < 0.1 { // Skip small transactions
		return nil
	}

	pattern, ok := ad.patterns["transaction_value"]
	if !ok || len(pattern.Observations) < anomalyMinBaselineSamples {
		return nil
	}

	zScore := ad.calculateZScore(record.Value, pattern)
	absZ := math.Abs(zScore)
	if absZ < ad.config.VolumeThreshold {
		return nil
	}

	return &AnomalyAlert{
		ID:            ad.generateAlertID(),
		Type:          AnomalyTypeVolume,
		Severity:      ad.calculateSeverity(absZ),
		Confidence:    ad.calculateConfidence(absZ, len(pattern.Observations)),
		Score:         absZ,
		Description:   "Unusual transaction volume detected",
		MetricName:    "transaction_value",
		ObservedValue: record.Value,
		ExpectedValue: pattern.Mean,
		Deviation:     zScore,
		Timestamp:     time.Now(),
		Source:        record.From.Hex(),
		Context: map[string]interface{}{
			"transaction_hash": record.Hash.Hex(),
			"gas_price":        record.GasPrice,
			"gas_used":         record.GasUsed,
		},
	}
}

// detectBehavioralAnomaly returns a behavioral alert when the record's gas
// price deviates by more than 200% from the sender's average gas price over
// the last hour. It needs at least five logged transactions from the sender
// in that hour and a positive average to compare against.
func (ad *AnomalyDetector) detectBehavioralAnomaly(record *TransactionRecord) *AnomalyAlert {
	recentTxs := ad.getRecentTransactionsFrom(record.From, time.Hour)
	if len(recentTxs) < 5 { // Need sufficient history
		return nil
	}

	avgGasPrice := ad.calculateAverageGasPrice(recentTxs)
	if avgGasPrice <= 0 {
		// Relative deviation is undefined against a zero baseline; dividing
		// would yield NaN or +Inf, and +Inf would raise a bogus alert.
		return nil
	}

	gasDeviation := math.Abs(record.GasPrice-avgGasPrice) / avgGasPrice
	if gasDeviation > 2.0 { // 200% deviation
		return &AnomalyAlert{
			ID:            ad.generateAlertID(),
			Type:          AnomalyTypeBehavioral,
			Severity:      AnomalySeverityMedium,
			Confidence:    0.8,
			Score:         gasDeviation,
			Description:   "Unusual gas price behavior for sender",
			MetricName:    "sender_gas_behavior",
			ObservedValue: record.GasPrice,
			ExpectedValue: avgGasPrice,
			Deviation:     gasDeviation,
			Timestamp:     time.Now(),
			Source:        record.From.Hex(),
		}
	}

	return nil
}

// detectPatternAnomaly returns a pattern alert when the transaction count for
// the record's hour has a |z-score| against the "hourly_transactions"
// baseline greater than twice PatternSimilarity.
func (ad *AnomalyDetector) detectPatternAnomaly(record *TransactionRecord) *AnomalyAlert {
	hour := record.Timestamp.Hour()

	hourlyPattern, ok := ad.patterns["hourly_transactions"]
	if !ok {
		return nil
	}

	hourlyCount := ad.calculateHourlyTransactionCount(hour)
	zScore := ad.calculateZScore(hourlyCount, hourlyPattern)
	absZ := math.Abs(zScore)

	if absZ > ad.config.PatternSimilarity*2 {
		return &AnomalyAlert{
			ID:            ad.generateAlertID(),
			Type:          AnomalyTypePattern,
			Severity:      ad.calculateSeverity(absZ),
			Confidence:    0.7,
			Score:         absZ,
			Description:   "Unusual transaction timing pattern",
			MetricName:    "hourly_pattern",
			ObservedValue: hourlyCount,
			ExpectedValue: hourlyPattern.Mean,
			Deviation:     zScore,
			Timestamp:     time.Now(),
			Context: map[string]interface{}{
				"hour": hour,
			},
		}
	}

	return nil
}

// Helper calculation methods

// calculateSenderFrequency returns how many logged transactions from sender
// have a timestamp within the last hour.
func (ad *AnomalyDetector) calculateSenderFrequency(sender common.Address) float64 {
	cutoff := time.Now().Add(-time.Hour)

	count := 0
	for _, tx := range ad.transactionLog {
		if tx.From == sender && tx.Timestamp.After(cutoff) {
			count++
		}
	}
	return float64(count)
}

// calculateTimeAnomalyScore scores the hour of day of timestamp: 0.0 for hours
// 9 through 17, 0.8 for hours 22 through 6, and 0.3 for the remaining hours.
// The hour is taken in the timestamp's own location.
func (ad *AnomalyDetector) calculateTimeAnomalyScore(timestamp time.Time) float64 {
	hour := timestamp.Hour()

	switch {
	case hour >= 9 && hour <= 17:
		// Business hours are treated as normal.
		return 0.0
	case hour >= 22 || hour <= 6:
		// High suspicion for very late/early hours.
		return 0.8
	default:
		// Medium suspicion for the remaining morning and evening hours.
		return 0.3
	}
}

// calculateHourlyTransactionCount returns how many logged transactions fall in
// the given hour of the current day (in the local zone of time.Now). The
// window is half-open, [hh:00, hh+1:00), so a transaction stamped exactly on
// the hour is counted once, in the hour that begins at that instant.
func (ad *AnomalyDetector) calculateHourlyTransactionCount(hour int) float64 {
	now := time.Now()
	startOfHour := time.Date(now.Year(), now.Month(), now.Day(), hour, 0, 0, 0, now.Location())
	endOfHour := startOfHour.Add(time.Hour)

	count := 0
	for _, tx := range ad.transactionLog {
		if !tx.Timestamp.Before(startOfHour) && tx.Timestamp.Before(endOfHour) {
			count++
		}
	}
	return float64(count)
}

// getRecentTransactionsFrom returns the logged transactions sent by sender
// whose timestamp is within the last duration.
func (ad *AnomalyDetector) getRecentTransactionsFrom(sender common.Address, duration time.Duration) []*TransactionRecord {
	cutoff := time.Now().Add(-duration)

	var result []*TransactionRecord
	for _, tx := range ad.transactionLog {
		if tx.From == sender && tx.Timestamp.After(cutoff) {
			result = append(result, tx)
		}
	}
	return result
}

// calculateAverageGasPrice returns the mean GasPrice of transactions, or 0 for
// an empty slice.
func (ad *AnomalyDetector) calculateAverageGasPrice(transactions []*TransactionRecord) float64 {
	if len(transactions) == 0 {
		return 0
	}

	sum := 0.0
	for _, tx := range transactions {
		sum += tx.GasPrice
	}
	return sum / float64(len(transactions))
}

// calculateTrend returns the least-squares slope of observations plotted
// against their index. It returns 0 for fewer than two observations.
func (ad *AnomalyDetector) calculateTrend(observations []float64) float64 {
	if len(observations) < 2 {
		return 0
	}

	n := float64(len(observations))
	var sumX, sumY, sumXY, sumX2 float64
	for i, y := range observations {
		x := float64(i)
		sumX += x
		sumY += y
		sumXY += x * y
		sumX2 += x * x
	}

	denominator := n*sumX2 - sumX*sumX
	if denominator == 0 {
		return 0
	}
	return (n*sumXY - sumX*sumY) / denominator
}

// calculateSeverity maps an absolute z-score to a severity: below 2 is low,
// below 3 medium, below 4 high, anything else critical.
func (ad *AnomalyDetector) calculateSeverity(zScore float64) AnomalySeverity {
	switch {
	case zScore < 2.0:
		return AnomalySeverityLow
	case zScore < 3.0:
		return AnomalySeverityMedium
	case zScore < 4.0:
		return AnomalySeverityHigh
	default:
		return AnomalySeverityCritical
	}
}

// calculateConfidence averages two components, each capped at 1: zScore/5 and
// sampleSize/100. Higher z-scores and larger samples increase confidence.
func (ad *AnomalyDetector) calculateConfidence(zScore float64, sampleSize int) float64 {
	zConfidence := math.Min(zScore/5.0, 1.0)
	sampleConfidence := math.Min(float64(sampleSize)/100.0, 1.0)
	return (zConfidence + sampleConfidence) / 2.0
}

// generateAlertID returns an ID built from the current time in nanoseconds and
// an atomically incremented counter.
func (ad *AnomalyDetector) generateAlertID() string {
	counter := atomic.AddUint64(&ad.alertCounter, 1)
	return fmt.Sprintf("anomaly_%d_%d", time.Now().UnixNano(), counter)
}

// generateAnomalyDescription formats a human-readable sentence stating how far
// value lies above or below the pattern's mean, in standard deviations.
func (ad *AnomalyDetector) generateAnomalyDescription(metricName string, value float64, pattern *PatternBaseline, zScore float64) string {
	direction := "above"
	if zScore < 0 {
		direction = "below"
	}
	return fmt.Sprintf("Metric '%s' value %.2f is %.1f standard deviations %s the expected mean of %.2f",
		metricName, value, math.Abs(zScore), direction, pattern.Mean)
}

// generateRecommendations returns follow-up suggestions chosen by the size of
// |zScore|, plus one metric-specific suggestion for "transaction_value",
// "gas_price" and "sender_frequency".
func (ad *AnomalyDetector) generateRecommendations(metricName string, zScore float64) []string {
	recommendations := []string{}

	switch absZ := math.Abs(zScore); {
	case absZ > 4.0:
		recommendations = append(recommendations,
			"Immediate investigation required",
			"Consider blocking source if malicious")
	case absZ > 3.0:
		recommendations = append(recommendations,
			"Initiate investigation and monitor closely",
			"Review transaction history")
	default:
		recommendations = append(recommendations, "Continue monitoring")
	}

	switch metricName {
	case "transaction_value":
		recommendations = append(recommendations, "Verify large transaction legitimacy")
	case "gas_price":
		recommendations = append(recommendations, "Check for gas price manipulation")
	case "sender_frequency":
		recommendations = append(recommendations, "Investigate potential automated behavior")
	}

	return recommendations
}

// Status and reporting methods

// performPeriodicDetection runs time-series analysis over every baseline (when
// EnableTimeSeriesAD is set) and then the sender-frequency analysis. It takes
// the read lock itself.
func (ad *AnomalyDetector) performPeriodicDetection() {
	ad.mu.RLock()
	defer ad.mu.RUnlock()

	if ad.config.EnableTimeSeriesAD {
		for metricName, pattern := range ad.patterns {
			ad.performTimeSeriesAnalysis(metricName, pattern)
		}
	}

	ad.analyzeTransactionPatterns()
}

// performTimeSeriesAnalysis compares the mean of the five newest observations
// with the mean of all earlier ones and emits a temporal alert when the
// difference, measured in historical standard deviations, exceeds
// ZScoreThreshold. Baselines with fewer than 20 observations are skipped.
func (ad *AnomalyDetector) performTimeSeriesAnalysis(metricName string, pattern *PatternBaseline) {
	const recentWindow = 5

	obs := pattern.Observations
	if len(obs) < 20 {
		return
	}

	split := len(obs) - recentWindow
	historical, recent := obs[:split], obs[split:]

	recentMean := ad.calculateMean(recent)
	historicalMean := ad.calculateMean(historical)
	historicalStdDev := ad.calculateStdDev(historical, historicalMean)
	if historicalStdDev <= 0 {
		return
	}

	changeScore := math.Abs(recentMean-historicalMean) / historicalStdDev
	if changeScore > ad.config.ZScoreThreshold {
		ad.emitAlert(&AnomalyAlert{
			ID:            ad.generateAlertID(),
			Type:          AnomalyTypeTemporal,
			Severity:      ad.calculateSeverity(changeScore),
			Confidence:    0.9,
			Score:         changeScore,
			Description:   "Significant change in time series pattern detected",
			MetricName:    metricName,
			ObservedValue: recentMean,
			ExpectedValue: historicalMean,
			Deviation:     changeScore,
			Timestamp:     time.Now(),
		})
	}
}

// analyzeTransactionPatterns emits a frequency alert for every sender with
// more than 50 logged transactions in the last hour.
func (ad *AnomalyDetector) analyzeTransactionPatterns() {
	senderCounts := make(map[common.Address]int)
	for _, tx := range ad.getRecentTransactions(time.Hour) {
		senderCounts[tx.From]++
	}

	for sender, count := range senderCounts {
		if count <= 50 { // Threshold for suspicious activity
			continue
		}
		ad.emitAlert(&AnomalyAlert{
			ID:            ad.generateAlertID(),
			Type:          AnomalyTypeFrequency,
			Severity:      AnomalySeverityHigh,
			Confidence:    0.95,
			Score:         float64(count),
			Description:   "High frequency transaction pattern detected",
			MetricName:    "transaction_frequency",
			ObservedValue: float64(count),
			ExpectedValue: 10.0, // Expected average
			Timestamp:     time.Now(),
			Source:        sender.Hex(),
		})
	}
}

// getRecentTransactions returns the logged transactions whose timestamp is
// within the last duration.
func (ad *AnomalyDetector) getRecentTransactions(duration time.Duration) []*TransactionRecord {
	cutoff := time.Now().Add(-duration)

	var result []*TransactionRecord
	for _, tx := range ad.transactionLog {
		if tx.Timestamp.After(cutoff) {
			result = append(result, tx)
		}
	}
	return result
}

// cleanup drops logged transactions older than BaselineWindow and trims every
// baseline to its newest MaxPatternHistory observations. It takes the write
// lock itself.
func (ad *AnomalyDetector) cleanup() {
	ad.mu.Lock()
	defer ad.mu.Unlock()

	cutoff := time.Now().Add(-ad.config.BaselineWindow)

	// Copy survivors into a fresh slice so expired records can be collected.
	kept := make([]*TransactionRecord, 0)
	for _, tx := range ad.transactionLog {
		if tx.Timestamp.After(cutoff) {
			kept = append(kept, tx)
		}
	}
	ad.transactionLog = kept

	for _, pattern := range ad.patterns {
		if excess := len(pattern.Observations) - ad.config.MaxPatternHistory; excess > 0 {
			pattern.Observations = pattern.Observations[excess:]
			ad.updatePatternStatistics(pattern)
		}
	}
}

// countRecentAnomalies is a placeholder that always returns 0. It is intended
// to count anomalies raised in the last hour.
func (ad *AnomalyDetector) countRecentAnomalies() int {
	return 0
}

// getTopAnomalies is a placeholder that always returns an empty, non-nil
// slice. It is intended to return the highest-scoring anomalies, up to limit.
func (ad *AnomalyDetector) getTopAnomalies(limit int) []*AnomalyAlert {
	return []*AnomalyAlert{}
}

// getPatternSummaries builds a summary for every tracked baseline. A trend
// slope above 0.1 is reported as INCREASING, below -0.1 as DECREASING and
// anything else as STABLE.
//
// The caller must hold ad.mu. This method must not lock it: it is called from
// GetAnomalyReport under a read lock, and recursive read locking of a
// sync.RWMutex can deadlock.
func (ad *AnomalyDetector) getPatternSummaries() map[string]*PatternSummary {
	summaries := make(map[string]*PatternSummary, len(ad.patterns))

	for name, pattern := range ad.patterns {
		trend := "STABLE"
		switch {
		case pattern.Trend > 0.1:
			trend = "INCREASING"
		case pattern.Trend < -0.1:
			trend = "DECREASING"
		}

		summaries[name] = &PatternSummary{
			MetricName:      name,
			Mean:            pattern.Mean,
			StandardDev:     pattern.StandardDev,
			SampleCount:     pattern.SampleCount,
			LastUpdated:     pattern.LastUpdated,
			RecentAnomalies: 0, // Would count recent anomalies for this pattern
			Trend:           trend,
		}
	}

	return summaries
}

// calculateSystemHealth reports whether the detector is running and how many
// alerts are buffered. The latency, memory, error-rate and overall-health
// fields are fixed placeholder values, not measurements.
//
// The caller must hold ad.mu; see getPatternSummaries for why this method
// must not lock it.
func (ad *AnomalyDetector) calculateSystemHealth() *AnomalyDetectorHealth {
	return &AnomalyDetectorHealth{
		IsRunning:         ad.running,
		AlertChannelSize:  len(ad.alerts),
		ProcessingLatency: 5.2,              // Would measure actual latency
		MemoryUsage:       1024 * 1024 * 10, // Would measure actual memory
		LastProcessedTime: time.Now(),
		ErrorRate:         0.0, // Would track actual error rate
		OverallHealth:     "HEALTHY",
	}
}

// Helper calculation functions

// calculateMean returns the arithmetic mean of values, or 0 for an empty slice.
func (ad *AnomalyDetector) calculateMean(values []float64) float64 {
	if len(values) == 0 {
		return 0
	}

	sum := 0.0
	for _, v := range values {
		sum += v
	}
	return sum / float64(len(values))
}

// calculateStdDev returns the population standard deviation of values around
// the supplied mean, or 0 for an empty slice.
func (ad *AnomalyDetector) calculateStdDev(values []float64, mean float64) float64 {
	if len(values) == 0 {
		return 0
	}

	sqDiff := 0.0
	for _, v := range values {
		d := v - mean
		sqDiff += d * d
	}
	return math.Sqrt(sqDiff / float64(len(values)))
}