// Source: mev-beta/internal/monitoring/integrity_monitor_test.go
// (Go, 392 lines, 12 KiB)

package monitoring
import (
"fmt"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/fraktal/mev-beta/internal/logger"
"github.com/fraktal/mev-beta/internal/recovery"
)
// MockAlertSubscriber is a test double for the monitor's alert subscribers.
// It keeps every CorruptionAlert it is handed in an in-memory slice so that
// tests can inspect what was delivered. It does no locking of its own.
type MockAlertSubscriber struct {
	// alerts holds the received alerts in arrival order.
	alerts []CorruptionAlert
}
// HandleAlert appends alert to the mock's recorded history.
// It always returns nil.
func (m *MockAlertSubscriber) HandleAlert(alert CorruptionAlert) error {
	recorded := append(m.alerts, alert)
	m.alerts = recorded
	return nil
}
// GetAlerts returns the alerts recorded so far. The returned slice is the
// mock's own slice, not a copy.
func (m *MockAlertSubscriber) GetAlerts() []CorruptionAlert {
	recorded := m.alerts
	return recorded
}
// Reset discards all recorded alerts, leaving the mock with a nil slice.
func (m *MockAlertSubscriber) Reset() {
	var cleared []CorruptionAlert
	m.alerts = cleared
}
func TestIntegrityMonitor_RecordCorruptionDetected(t *testing.T) {
log := logger.New("error", "text", "")
monitor := NewIntegrityMonitor(log)
mockSubscriber := &MockAlertSubscriber{}
monitor.AddAlertSubscriber(mockSubscriber)
// Test various corruption scenarios
testCases := []struct {
name string
address string
corruptionScore int
source string
expectedSeverity AlertSeverity
}{
{
name: "Low corruption",
address: "0x1234567890123456789012345678901234567890",
corruptionScore: 30,
source: "test_source",
expectedSeverity: AlertSeverityInfo,
},
{
name: "Medium corruption",
address: "0x1234000000000000000000000000000000000000",
corruptionScore: 50,
source: "token_extraction",
expectedSeverity: AlertSeverityWarning,
},
{
name: "High corruption",
address: "0x0000001000000000000000000000000000000000",
corruptionScore: 80,
source: "abi_decoder",
expectedSeverity: AlertSeverityCritical,
},
{
name: "Critical corruption - TOKEN_0x000000",
address: "0x0000000300000000000000000000000000000000",
corruptionScore: 100,
source: "generic_extraction",
expectedSeverity: AlertSeverityEmergency,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
mockSubscriber.Reset()
addr := common.HexToAddress(tc.address)
monitor.RecordCorruptionDetected(addr, tc.corruptionScore, tc.source)
// Verify metrics were updated
metrics := monitor.GetMetrics()
assert.Greater(t, metrics.CorruptAddressesDetected, int64(0))
assert.GreaterOrEqual(t, metrics.MaxCorruptionScore, tc.corruptionScore)
// Verify alert was generated
alerts := mockSubscriber.GetAlerts()
require.Len(t, alerts, 1)
alert := alerts[0]
assert.Equal(t, tc.expectedSeverity, alert.Severity)
assert.Equal(t, addr, alert.Address)
assert.Equal(t, tc.corruptionScore, alert.CorruptionScore)
assert.Equal(t, tc.source, alert.Source)
assert.Contains(t, alert.Message, "Corruption detected")
})
}
}
func TestIntegrityMonitor_HealthScoreCalculation(t *testing.T) {
log := logger.New("error", "text", "")
monitor := NewIntegrityMonitor(log)
// Test initial health score
metrics := monitor.GetMetrics()
assert.Equal(t, 1.0, metrics.HealthScore) // Perfect health initially
// Record some activity
monitor.RecordAddressProcessed()
monitor.RecordAddressProcessed()
monitor.RecordValidationResult(true)
monitor.RecordValidationResult(true)
monitor.RecordContractCallResult(true)
monitor.RecordContractCallResult(true)
// Health should still be perfect
metrics = monitor.GetMetrics()
assert.Equal(t, 1.0, metrics.HealthScore)
// Introduce some corruption
addr := common.HexToAddress("0x0000000300000000000000000000000000000000")
monitor.RecordCorruptionDetected(addr, 80, "test")
// Health score should decrease
metrics = monitor.GetMetrics()
assert.Less(t, metrics.HealthScore, 1.0)
assert.Greater(t, metrics.HealthScore, 0.0)
// Add validation failures
monitor.RecordValidationResult(false)
monitor.RecordValidationResult(false)
// Health should decrease further
newMetrics := monitor.GetMetrics()
assert.Less(t, newMetrics.HealthScore, metrics.HealthScore)
}
func TestIntegrityMonitor_RecoveryActionTracking(t *testing.T) {
log := logger.New("error", "text", "")
monitor := NewIntegrityMonitor(log)
// Record various recovery actions
monitor.RecordRecoveryAction(recovery.ActionRetryWithBackoff)
monitor.RecordRecoveryAction(recovery.ActionRetryWithBackoff)
monitor.RecordRecoveryAction(recovery.ActionUseFallbackData)
monitor.RecordRecoveryAction(recovery.ActionCircuitBreaker)
metrics := monitor.GetMetrics()
// Verify action counts
assert.Equal(t, int64(2), metrics.RecoveryActions[recovery.ActionRetryWithBackoff])
assert.Equal(t, int64(1), metrics.RecoveryActions[recovery.ActionUseFallbackData])
assert.Equal(t, int64(1), metrics.RecoveryActions[recovery.ActionCircuitBreaker])
// Verify specific counters
assert.Equal(t, int64(2), metrics.RetryOperationsTriggered)
assert.Equal(t, int64(1), metrics.FallbackOperationsUsed)
assert.Equal(t, int64(1), metrics.CircuitBreakersTripped)
}
func TestIntegrityMonitor_ErrorTypeTracking(t *testing.T) {
log := logger.New("error", "text", "")
monitor := NewIntegrityMonitor(log)
// Record various error types
errorTypes := []recovery.ErrorType{
recovery.ErrorTypeAddressCorruption,
recovery.ErrorTypeContractCallFailed,
recovery.ErrorTypeRPCConnectionFailed,
recovery.ErrorTypeDataParsingFailed,
recovery.ErrorTypeValidationFailed,
recovery.ErrorTypeAddressCorruption, // Duplicate
}
for _, errorType := range errorTypes {
monitor.RecordErrorType(errorType)
}
metrics := monitor.GetMetrics()
// Verify error type counts
assert.Equal(t, int64(2), metrics.ErrorsByType[recovery.ErrorTypeAddressCorruption])
assert.Equal(t, int64(1), metrics.ErrorsByType[recovery.ErrorTypeContractCallFailed])
assert.Equal(t, int64(1), metrics.ErrorsByType[recovery.ErrorTypeRPCConnectionFailed])
assert.Equal(t, int64(1), metrics.ErrorsByType[recovery.ErrorTypeDataParsingFailed])
assert.Equal(t, int64(1), metrics.ErrorsByType[recovery.ErrorTypeValidationFailed])
}
func TestIntegrityMonitor_GetHealthSummary(t *testing.T) {
log := logger.New("error", "text", "")
monitor := NewIntegrityMonitor(log)
// Generate some activity
for i := 0; i < 100; i++ {
monitor.RecordAddressProcessed()
if i%10 == 0 { // 10% corruption rate
addr := common.HexToAddress(fmt.Sprintf("0x%040d", i))
monitor.RecordCorruptionDetected(addr, 50, "test")
}
monitor.RecordValidationResult(i%20 != 0) // 95% success rate
monitor.RecordContractCallResult(i%10 != 0) // 90% success rate
}
summary := monitor.GetHealthSummary()
// Verify summary structure
assert.True(t, summary["enabled"].(bool))
assert.Equal(t, int64(100), summary["total_addresses_processed"].(int64))
assert.Equal(t, int64(10), summary["corruption_detections"].(int64))
assert.InDelta(t, 0.1, summary["corruption_rate"].(float64), 0.01)
assert.InDelta(t, 0.95, summary["validation_success_rate"].(float64), 0.01)
assert.InDelta(t, 0.9, summary["contract_call_success_rate"].(float64), 0.01)
// Health score should be reasonable
healthScore := summary["health_score"].(float64)
assert.Greater(t, healthScore, 0.7) // Should be decent despite some issues
assert.Less(t, healthScore, 1.0) // Not perfect due to corruption
}
func TestIntegrityMonitor_AlertThresholds(t *testing.T) {
log := logger.New("error", "text", "")
monitor := NewIntegrityMonitor(log)
mockSubscriber := &MockAlertSubscriber{}
monitor.AddAlertSubscriber(mockSubscriber)
// Test health score threshold
monitor.SetThreshold("health_score_min", 0.8)
// Generate activity that drops health below threshold
for i := 0; i < 50; i++ {
monitor.RecordAddressProcessed()
// High corruption rate to drop health score
addr := common.HexToAddress(fmt.Sprintf("0x%040d", i))
monitor.RecordCorruptionDetected(addr, 80, "test")
}
// Should trigger health score alert
alerts := mockSubscriber.GetAlerts()
healthAlerts := 0
for _, alert := range alerts {
if alert.Severity == AlertSeverityCritical &&
alert.Context != nil &&
alert.Context["health_score"] != nil {
healthAlerts++
}
}
assert.Greater(t, healthAlerts, 0, "Should have triggered health score alerts")
}
// TestIntegrityMonitor_ConcurrentAccess hammers a single monitor from 50
// goroutines, each performing 100 rounds of mixed recording calls, and then
// checks that the aggregated metrics are consistent: the processed-address
// total is exact, corruption and retry counters are non-zero, and the health
// score lies in [0, 1]. Run with -race to also detect data races.
func TestIntegrityMonitor_ConcurrentAccess(t *testing.T) {
	log := logger.New("error", "text", "")
	monitor := NewIntegrityMonitor(log)

	const numGoroutines = 50
	const operationsPerGoroutine = 100

	// Buffered to numGoroutines so workers never block on the send, even if
	// the test has already given up waiting.
	done := make(chan struct{}, numGoroutines)

	// Launch concurrent operations.
	for i := 0; i < numGoroutines; i++ {
		go func(id int) {
			defer func() { done <- struct{}{} }()
			for j := 0; j < operationsPerGoroutine; j++ {
				monitor.RecordAddressProcessed()
				monitor.RecordValidationResult(j%10 != 0)
				monitor.RecordContractCallResult(j%5 != 0)

				if j%20 == 0 { // Occasional corruption
					addr := common.HexToAddress(fmt.Sprintf("0x%020d%020d", id, j))
					monitor.RecordCorruptionDetected(addr, 60, fmt.Sprintf("goroutine_%d", id))
				}

				// Recovery actions
				if j%15 == 0 {
					monitor.RecordRecoveryAction(recovery.ActionRetryWithBackoff)
				}
				if j%25 == 0 {
					monitor.RecordErrorType(recovery.ErrorTypeAddressCorruption)
				}
			}
		}(i)
	}

	// Wait for completion under ONE overall deadline. Creating a fresh
	// time.After inside the loop would restart the 10s window after every
	// received signal, allowing up to numGoroutines*10s before failing.
	deadline := time.NewTimer(10 * time.Second)
	defer deadline.Stop()
	for i := 0; i < numGoroutines; i++ {
		select {
		case <-done:
			// One more worker finished.
		case <-deadline.C:
			t.Fatal("Concurrent test timed out")
		}
	}

	// Verify final metrics are consistent.
	metrics := monitor.GetMetrics()
	expectedAddresses := int64(numGoroutines * operationsPerGoroutine)
	assert.Equal(t, expectedAddresses, metrics.TotalAddressesProcessed)

	// Should have some corruption detections.
	assert.Greater(t, metrics.CorruptAddressesDetected, int64(0))

	// Should have recorded recovery actions.
	assert.Greater(t, metrics.RetryOperationsTriggered, int64(0))

	// Health score should be calculated and stay within [0, 1].
	assert.GreaterOrEqual(t, metrics.HealthScore, 0.0)
	assert.LessOrEqual(t, metrics.HealthScore, 1.0)

	t.Logf("Final metrics: Processed=%d, Corrupted=%d, Health=%.3f",
		metrics.TotalAddressesProcessed,
		metrics.CorruptAddressesDetected,
		metrics.HealthScore)
}
func TestIntegrityMonitor_DisableEnable(t *testing.T) {
log := logger.New("error", "text", "")
monitor := NewIntegrityMonitor(log)
// Should be enabled by default
assert.True(t, monitor.IsEnabled())
// Record some activity
monitor.RecordAddressProcessed()
monitor.RecordValidationResult(true)
initialMetrics := monitor.GetMetrics()
assert.Greater(t, initialMetrics.TotalAddressesProcessed, int64(0))
// Disable monitor
monitor.Disable()
assert.False(t, monitor.IsEnabled())
// Activity should not be recorded when disabled
monitor.RecordAddressProcessed()
monitor.RecordValidationResult(true)
disabledMetrics := monitor.GetMetrics()
assert.Equal(t, initialMetrics.TotalAddressesProcessed, disabledMetrics.TotalAddressesProcessed)
// Re-enable
monitor.Enable()
assert.True(t, monitor.IsEnabled())
// Activity should be recorded again
monitor.RecordAddressProcessed()
enabledMetrics := monitor.GetMetrics()
assert.Greater(t, enabledMetrics.TotalAddressesProcessed, disabledMetrics.TotalAddressesProcessed)
}
func TestIntegrityMonitor_Performance(t *testing.T) {
log := logger.New("error", "text", "")
monitor := NewIntegrityMonitor(log)
const iterations = 10000
// Benchmark recording operations
start := time.Now()
for i := 0; i < iterations; i++ {
monitor.RecordAddressProcessed()
monitor.RecordValidationResult(i%10 != 0)
monitor.RecordContractCallResult(i%5 != 0)
if i%100 == 0 {
addr := common.HexToAddress(fmt.Sprintf("0x%040d", i))
monitor.RecordCorruptionDetected(addr, 50, "benchmark")
}
}
duration := time.Since(start)
avgTime := duration / iterations
t.Logf("Performance: %d operations in %v (avg: %v per operation)",
iterations, duration, avgTime)
// Should be reasonably fast (under 500 microseconds per operation is acceptable)
maxTime := 500 * time.Microsecond
assert.Less(t, avgTime.Nanoseconds(), maxTime.Nanoseconds(),
"Recording should be faster than %v per operation (got %v)", maxTime, avgTime)
// Verify metrics are accurate
metrics := monitor.GetMetrics()
assert.Equal(t, int64(iterations), metrics.TotalAddressesProcessed)
assert.Equal(t, int64(100), metrics.CorruptAddressesDetected) // Every 100th iteration
}