Restructured project for V2 refactor:

**Structure Changes:**
- Moved all V1 code to orig/ folder (preserved with git mv)
- Created docs/planning/ directory
- Added orig/README_V1.md explaining V1 preservation

**Planning Documents:**
- 00_V2_MASTER_PLAN.md: Complete architecture overview
  - Executive summary of critical V1 issues
  - High-level component architecture diagrams
  - 5-phase implementation roadmap
  - Success metrics and risk mitigation
- 07_TASK_BREAKDOWN.md: Atomic task breakdown
  - 99+ hours of detailed tasks
  - Every task < 2 hours (atomic)
  - Clear dependencies and success criteria
  - Organized by implementation phase

**V2 Key Improvements:**
- Per-exchange parsers (factory pattern; see the sketch below)
- Multi-layer strict validation
- Multi-index pool cache
- Background validation pipeline
- Comprehensive observability

**Critical Issues Addressed:**
- Zero-address tokens (strict validation + cache enrichment)
- Parsing accuracy (protocol-specific parsers)
- No audit trail (background validation channel)
- Inefficient lookups (multi-index cache)
- Stats disconnection (event-driven metrics)

**Next Steps:**
1. Review planning documents
2. Begin Phase 1: Foundation (P1-001 through P1-010)
3. Implement parsers in Phase 2
4. Build cache system in Phase 3
5. Add validation pipeline in Phase 4
6. Migrate and test in Phase 5

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
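The per-exchange parser bullet above is the central architectural change, so here is a minimal sketch of what a factory-style parser registry could look like. All identifiers (`Parser`, `SwapEvent`, `Register`, `New`) are illustrative assumptions, not names taken from the repository:

```go
package parser

import "fmt"

// SwapEvent is a normalized swap record; the fields are illustrative.
type SwapEvent struct {
	Pool, TokenIn, TokenOut string
	AmountIn, AmountOut     string
}

// Parser decodes protocol-specific log payloads into normalized events.
type Parser interface {
	Protocol() string                     // e.g. "uniswap-v2"
	Parse(raw []byte) (*SwapEvent, error) // fail loudly instead of guessing
}

// registry maps protocol names to parser factories; each parser package
// registers itself from an init function.
var registry = map[string]func() Parser{}

func Register(protocol string, factory func() Parser) {
	registry[protocol] = factory
}

// New returns a parser for the given protocol; unknown protocols are an
// explicit error rather than a silent fallback (the "strict validation" goal).
func New(protocol string) (Parser, error) {
	factory, ok := registry[protocol]
	if !ok {
		return nil, fmt.Errorf("parser: no parser registered for protocol %q", protocol)
	}
	return factory(), nil
}
```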
package health

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"sync"
	"sync/atomic"
	"time"

	"github.com/fraktal/mev-beta/internal/logger"
)

// ProbeStatus represents the status of a health probe.
type ProbeStatus string

const (
	ProbeStatusHealthy   ProbeStatus = "healthy"
	ProbeStatusUnhealthy ProbeStatus = "unhealthy"
	ProbeStatusDegraded  ProbeStatus = "degraded"
)
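
// The HTTP handlers below map these statuses onto response codes: healthy and
// degraded results return 200, unhealthy results return 503, so only
// critical-check failures cause Kubernetes to act on a probe.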

// ProbeResult contains the result of a health check probe.
type ProbeResult struct {
	Status    ProbeStatus       `json:"status"`
	Timestamp time.Time         `json:"timestamp"`
	Checks    map[string]string `json:"checks"`
	Message   string            `json:"message,omitempty"`
}

// KubernetesProbeHandler provides Kubernetes-compatible health check endpoints.
type KubernetesProbeHandler struct {
	logger *logger.Logger

	// Probe state flags; atomic so handlers can read them without locking.
	startupComplete atomic.Bool
	ready           atomic.Bool
	healthy         atomic.Bool

	// mu guards the check slices and the last-transition timestamps.
	mu              sync.RWMutex
	startupChecks   []HealthCheck
	readinessChecks []HealthCheck
	livenessChecks  []HealthCheck
	lastStartupTime time.Time
	lastReadyTime   time.Time
	lastHealthyTime time.Time

	// startupTimeout is stored for a startup deadline; it is not yet
	// consulted by the checks in this file.
	startupTimeout time.Duration
}

// HealthCheck represents a single health check function.
type HealthCheck struct {
	Name        string
	Description string
	Check       func(ctx context.Context) error
	Critical    bool // If true, failure marks the entire probe as unhealthy.
}
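
// An illustrative (hypothetical, not from this repository) check registration:
//
//	probes.RegisterReadinessCheck("rpc", "primary RPC endpoint reachable",
//		func(ctx context.Context) error {
//			return rpcClient.Ping(ctx) // rpcClient is a hypothetical dependency
//		}, true)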

// NewKubernetesProbeHandler creates a new Kubernetes probe handler.
func NewKubernetesProbeHandler(logger *logger.Logger, startupTimeout time.Duration) *KubernetesProbeHandler {
	handler := &KubernetesProbeHandler{
		logger:         logger,
		startupTimeout: startupTimeout,
	}

	// Initially not started and not ready; assume healthy until proven otherwise.
	handler.startupComplete.Store(false)
	handler.ready.Store(false)
	handler.healthy.Store(true)

	return handler
}

// RegisterStartupCheck adds a startup health check.
func (h *KubernetesProbeHandler) RegisterStartupCheck(name, description string, check func(ctx context.Context) error, critical bool) {
	h.mu.Lock()
	defer h.mu.Unlock()

	h.startupChecks = append(h.startupChecks, HealthCheck{
		Name:        name,
		Description: description,
		Check:       check,
		Critical:    critical,
	})
}

// RegisterReadinessCheck adds a readiness health check.
func (h *KubernetesProbeHandler) RegisterReadinessCheck(name, description string, check func(ctx context.Context) error, critical bool) {
	h.mu.Lock()
	defer h.mu.Unlock()

	h.readinessChecks = append(h.readinessChecks, HealthCheck{
		Name:        name,
		Description: description,
		Check:       check,
		Critical:    critical,
	})
}

// RegisterLivenessCheck adds a liveness health check.
func (h *KubernetesProbeHandler) RegisterLivenessCheck(name, description string, check func(ctx context.Context) error, critical bool) {
	h.mu.Lock()
	defer h.mu.Unlock()

	h.livenessChecks = append(h.livenessChecks, HealthCheck{
		Name:        name,
		Description: description,
		Check:       check,
		Critical:    critical,
	})
}

// MarkStartupComplete marks the startup phase as complete.
func (h *KubernetesProbeHandler) MarkStartupComplete() {
	h.startupComplete.Store(true)
	h.mu.Lock() // guard the timestamp; unlike the flags, it is not atomic
	h.lastStartupTime = time.Now()
	h.mu.Unlock()
	h.logger.Info("✅ Startup phase complete")
}

// MarkReady marks the application as ready to serve traffic.
func (h *KubernetesProbeHandler) MarkReady() {
	h.ready.Store(true)
	h.mu.Lock()
	h.lastReadyTime = time.Now()
	h.mu.Unlock()
	h.logger.Info("✅ Application ready to serve traffic")
}

// MarkUnready marks the application as not ready to serve traffic.
func (h *KubernetesProbeHandler) MarkUnready() {
	h.ready.Store(false)
	h.logger.Warn("⚠️ Application marked as not ready")
}

// MarkHealthy marks the application as healthy.
func (h *KubernetesProbeHandler) MarkHealthy() {
	h.healthy.Store(true)
	h.mu.Lock()
	h.lastHealthyTime = time.Now()
	h.mu.Unlock()
}

// MarkUnhealthy marks the application as unhealthy.
func (h *KubernetesProbeHandler) MarkUnhealthy() {
	h.healthy.Store(false)
	h.logger.Error("❌ Application marked as unhealthy")
}

// LivenessHandler handles Kubernetes liveness probe requests.
// GET /health/live
func (h *KubernetesProbeHandler) LivenessHandler(w http.ResponseWriter, r *http.Request) {
	ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
	defer cancel()

	result := h.checkLiveness(ctx)

	w.Header().Set("Content-Type", "application/json")
	// A degraded result means only non-critical checks failed, so the probe
	// still passes; only an unhealthy result returns 503. This keeps the
	// Critical flag meaningful.
	if result.Status == ProbeStatusUnhealthy {
		w.WriteHeader(http.StatusServiceUnavailable)
	} else {
		w.WriteHeader(http.StatusOK)
	}

	_ = json.NewEncoder(w).Encode(result) // best-effort write; status already sent
}

// ReadinessHandler handles Kubernetes readiness probe requests.
// GET /health/ready
func (h *KubernetesProbeHandler) ReadinessHandler(w http.ResponseWriter, r *http.Request) {
	ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
	defer cancel()

	result := h.checkReadiness(ctx)

	w.Header().Set("Content-Type", "application/json")
	if result.Status == ProbeStatusUnhealthy {
		w.WriteHeader(http.StatusServiceUnavailable)
	} else {
		w.WriteHeader(http.StatusOK)
	}

	_ = json.NewEncoder(w).Encode(result)
}

// StartupHandler handles Kubernetes startup probe requests.
// GET /health/startup
func (h *KubernetesProbeHandler) StartupHandler(w http.ResponseWriter, r *http.Request) {
	ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
	defer cancel()

	result := h.checkStartup(ctx)

	w.Header().Set("Content-Type", "application/json")
	if result.Status == ProbeStatusUnhealthy {
		w.WriteHeader(http.StatusServiceUnavailable)
	} else {
		w.WriteHeader(http.StatusOK)
	}

	_ = json.NewEncoder(w).Encode(result)
}

// checkLiveness performs liveness checks.
func (h *KubernetesProbeHandler) checkLiveness(ctx context.Context) ProbeResult {
	h.mu.RLock()
	checks := h.livenessChecks
	h.mu.RUnlock()

	result := ProbeResult{
		Status:    ProbeStatusHealthy,
		Timestamp: time.Now(),
		Checks:    make(map[string]string),
	}

	// If manually marked unhealthy, return immediately.
	if !h.healthy.Load() {
		result.Status = ProbeStatusUnhealthy
		result.Message = "Application manually marked as unhealthy"
		return result
	}

	// Run all liveness checks.
	hasFailure := false
	hasCriticalFailure := false

	for _, check := range checks {
		if err := check.Check(ctx); err != nil {
			result.Checks[check.Name] = fmt.Sprintf("FAIL: %v", err)
			hasFailure = true
			if check.Critical {
				hasCriticalFailure = true
			}
		} else {
			result.Checks[check.Name] = "OK"
		}
	}

	if hasCriticalFailure {
		result.Status = ProbeStatusUnhealthy
		result.Message = "Critical liveness check failed"
	} else if hasFailure {
		result.Status = ProbeStatusDegraded
		result.Message = "Non-critical liveness check failed"
	}

	return result
}

// checkReadiness performs readiness checks.
func (h *KubernetesProbeHandler) checkReadiness(ctx context.Context) ProbeResult {
	h.mu.RLock()
	checks := h.readinessChecks
	h.mu.RUnlock()

	result := ProbeResult{
		Status:    ProbeStatusHealthy,
		Timestamp: time.Now(),
		Checks:    make(map[string]string),
	}

	// The application must have been marked ready.
	if !h.ready.Load() {
		result.Status = ProbeStatusUnhealthy
		result.Message = "Application not ready"
		return result
	}

	// Run all readiness checks.
	hasFailure := false
	hasCriticalFailure := false

	for _, check := range checks {
		if err := check.Check(ctx); err != nil {
			result.Checks[check.Name] = fmt.Sprintf("FAIL: %v", err)
			hasFailure = true
			if check.Critical {
				hasCriticalFailure = true
			}
		} else {
			result.Checks[check.Name] = "OK"
		}
	}

	if hasCriticalFailure {
		result.Status = ProbeStatusUnhealthy
		result.Message = "Critical readiness check failed"
	} else if hasFailure {
		result.Status = ProbeStatusDegraded
		result.Message = "Non-critical readiness check failed"
	}

	return result
}

// checkStartup performs startup checks.
func (h *KubernetesProbeHandler) checkStartup(ctx context.Context) ProbeResult {
	h.mu.RLock()
	checks := h.startupChecks
	h.mu.RUnlock()

	result := ProbeResult{
		Status:    ProbeStatusHealthy,
		Timestamp: time.Now(),
		Checks:    make(map[string]string),
	}

	// Once startup is complete, always report healthy.
	if h.startupComplete.Load() {
		result.Message = "Startup complete"
		return result
	}

	// Run all startup checks.
	hasFailure := false
	hasCriticalFailure := false

	for _, check := range checks {
		if err := check.Check(ctx); err != nil {
			result.Checks[check.Name] = fmt.Sprintf("FAIL: %v", err)
			hasFailure = true
			if check.Critical {
				hasCriticalFailure = true
			}
		} else {
			result.Checks[check.Name] = "OK"
		}
	}

	if hasCriticalFailure {
		result.Status = ProbeStatusUnhealthy
		result.Message = "Critical startup check failed"
	} else if hasFailure {
		result.Status = ProbeStatusDegraded
		result.Message = "Non-critical startup check failed"
	} else {
		result.Message = "Startup checks passing, awaiting completion signal"
	}

	return result
}

// RegisterHTTPHandlers registers all probe handlers on the provided mux.
func (h *KubernetesProbeHandler) RegisterHTTPHandlers(mux *http.ServeMux) {
	mux.HandleFunc("/health/live", h.LivenessHandler)
	mux.HandleFunc("/health/ready", h.ReadinessHandler)
	mux.HandleFunc("/health/startup", h.StartupHandler)

	// Also register aliases for convenience.
	mux.HandleFunc("/healthz", h.LivenessHandler)
	mux.HandleFunc("/readyz", h.ReadinessHandler)

	h.logger.Info("✅ Kubernetes health probe endpoints registered")
}
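
For context, a minimal wiring sketch for this handler follows. The import paths mirror the package's own module (`github.com/fraktal/mev-beta/internal/...`), but `logger.New` and `pingUpstream` are assumptions made for illustration, not APIs confirmed by the source:

```go
package main

import (
	"context"
	"net/http"
	"time"

	"github.com/fraktal/mev-beta/internal/health"
	"github.com/fraktal/mev-beta/internal/logger"
)

func main() {
	log := logger.New() // assumed constructor; adjust to the real logger API

	probes := health.NewKubernetesProbeHandler(log, 2*time.Minute)

	// A critical readiness check: readiness fails until the (hypothetical)
	// upstream dependency answers.
	probes.RegisterReadinessCheck("upstream", "upstream RPC reachable",
		func(ctx context.Context) error {
			return pingUpstream(ctx) // hypothetical helper, stubbed below
		}, true)

	mux := http.NewServeMux()
	probes.RegisterHTTPHandlers(mux)

	// After initialization succeeds, flip the probe state. Kubernetes probes
	// would point at /health/startup, /health/live, and /health/ready.
	probes.MarkStartupComplete()
	probes.MarkReady()

	_ = http.ListenAndServe(":8080", mux)
}

func pingUpstream(ctx context.Context) error { return nil } // stub for the sketch
```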