package transport

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// defaultHealthCheckInterval is used when FailoverPolicy.HealthCheckInterval is
// not positive, because time.NewTicker panics on a non-positive interval.
// The value is an arbitrary fallback, not a tuned recommendation.
const defaultHealthCheckInterval = 10 * time.Second

// monitorInterval is how often the failover monitor re-evaluates the primary.
const monitorInterval = time.Second

// ErrCircuitOpen is returned by CircuitBreaker.Execute while the breaker is open.
var ErrCircuitOpen = errors.New("circuit breaker is open")

// FailoverManager handles transport failover and redundancy.
type FailoverManager struct {
	transports        map[string]*ManagedTransport
	primaryTransport  string
	backupTransports  []string
	failoverPolicy    FailoverPolicy // immutable after construction
	healthChecker     HealthChecker
	circuitBreaker    *CircuitBreaker
	mu                sync.RWMutex // guards transports, primary/backups, healthChecker, metrics, stopped
	ctx               context.Context
	cancel            context.CancelFunc
	wg                sync.WaitGroup // tracks the background loops
	metrics           FailoverMetrics
	notifications     chan FailoverEvent
	stopped           bool  // true once notifications has been closed
	failoverAttempts  int64 // inputs for metrics.FailoverSuccessRate
	failoverSuccesses int64
}

// ManagedTransport wraps a transport with management metadata.
type ManagedTransport struct {
	Transport       Transport
	ID              string
	Name            string
	Priority        int
	Status          TransportStatus
	LastHealthCheck time.Time
	FailureCount    int
	LastFailure     time.Time
	Config          TransportConfig
	Metrics         TransportMetrics
}

// TransportStatus represents the current status of a transport.
type TransportStatus string

const (
	StatusHealthy   TransportStatus = "healthy"
	StatusDegraded  TransportStatus = "degraded"
	StatusUnhealthy TransportStatus = "unhealthy"
	StatusDisabled  TransportStatus = "disabled"
)

// isUsable reports whether a transport in the given status may carry traffic.
func isUsable(s TransportStatus) bool {
	return s == StatusHealthy || s == StatusDegraded
}

// FailoverPolicy defines when and how to fail over.
type FailoverPolicy struct {
	FailureThreshold    int           // Number of failures before marking unhealthy
	HealthCheckInterval time.Duration // How often to check health
	FailoverTimeout     time.Duration // Timeout for failover operations
	RetryInterval       time.Duration // Interval between retry attempts
	MaxRetries          int           // Maximum retry attempts
	AutoFailback        bool          // Whether to automatically fail back to a higher-priority transport
	FailbackDelay       time.Duration // Delay before attempting failback
	RequireAllHealthy   bool          // Whether all transports must be healthy
}

// FailoverMetrics tracks failover statistics.
type FailoverMetrics struct {
	TotalFailovers      int64         `json:"total_failovers"`
	TotalFailbacks      int64         `json:"total_failbacks"`
	CurrentTransport    string        `json:"current_transport"`
	LastFailover        time.Time     `json:"last_failover"`
	LastFailback        time.Time     `json:"last_failback"`
	FailoverDuration    time.Duration `json:"failover_duration"`
	FailoverSuccessRate float64       `json:"failover_success_rate"`
	HealthCheckFailures int64         `json:"health_check_failures"`
	CircuitBreakerTrips int64         `json:"circuit_breaker_trips"`
}

// FailoverEvent represents a failover-related event.
type FailoverEvent struct {
	Type          FailoverEventType `json:"type"`
	FromTransport string            `json:"from_transport"`
	ToTransport   string            `json:"to_transport"`
	Reason        string            `json:"reason"`
	Timestamp     time.Time         `json:"timestamp"`
	Success       bool              `json:"success"`
	Duration      time.Duration     `json:"duration"`
}

// FailoverEventType defines types of failover events.
type FailoverEventType string

const (
	EventFailover     FailoverEventType = "failover"
	EventFailback     FailoverEventType = "failback"
	EventHealthCheck  FailoverEventType = "health_check"
	EventCircuitBreak FailoverEventType = "circuit_break"
	EventRecovery     FailoverEventType = "recovery"
)

// HealthChecker is the interface for custom health-checking logic.
type HealthChecker interface {
	CheckHealth(ctx context.Context, transport Transport) (bool, error)
	GetHealthScore(transport Transport) float64
}

// NewFailoverManager creates a failover manager and starts its background
// health-check and monitor loops. Call Stop to release them.
func NewFailoverManager(policy FailoverPolicy) *FailoverManager {
	if policy.HealthCheckInterval <= 0 {
		policy.HealthCheckInterval = defaultHealthCheckInterval
	}

	ctx, cancel := context.WithCancel(context.Background())
	fm := &FailoverManager{
		transports:     make(map[string]*ManagedTransport),
		failoverPolicy: policy,
		healthChecker:  NewDefaultHealthChecker(),
		circuitBreaker: NewCircuitBreaker(CircuitBreakerConfig{
			FailureThreshold: policy.FailureThreshold,
			RecoveryTimeout:  policy.RetryInterval,
			MaxRetries:       policy.MaxRetries,
		}),
		ctx:           ctx,
		cancel:        cancel,
		notifications: make(chan FailoverEvent, 100),
	}

	// Start background routines.
	fm.wg.Add(2)
	go fm.healthCheckLoop()
	go fm.failoverMonitorLoop()

	return fm
}

// RegisterTransport adds a transport to the failover manager. The transport
// becomes primary if it is the first one or has a higher priority than the
// current primary; a displaced primary is kept as a backup.
func (fm *FailoverManager) RegisterTransport(id, name string, transport Transport, priority int, config TransportConfig) error {
	if id == "" {
		return errors.New("transport ID must not be empty")
	}
	if transport == nil {
		return errors.New("transport must not be nil")
	}

	fm.mu.Lock()
	defer fm.mu.Unlock()

	if _, exists := fm.transports[id]; exists {
		return fmt.Errorf("transport already registered: %s", id)
	}

	fm.transports[id] = &ManagedTransport{
		Transport:       transport,
		ID:              id,
		Name:            name,
		Priority:        priority,
		Status:          StatusHealthy,
		LastHealthCheck: time.Now(),
		Config:          config,
	}

	current, hasPrimary := fm.transports[fm.primaryTransport]
	switch {
	case !hasPrimary:
		fm.primaryTransport = id
		fm.metrics.CurrentTransport = id
	case priority > current.Priority:
		// Demote the old primary instead of dropping it from the rotation.
		fm.backupTransports = append(fm.backupTransports, fm.primaryTransport)
		fm.primaryTransport = id
		fm.metrics.CurrentTransport = id
	default:
		fm.backupTransports = append(fm.backupTransports, id)
	}
	return nil
}

// UnregisterTransport removes a transport from the failover manager.
func (fm *FailoverManager) UnregisterTransport(id string) error {
	fm.mu.Lock()
	defer fm.mu.Unlock()

	if _, exists := fm.transports[id]; !exists {
		return fmt.Errorf("transport not found: %s", id)
	}
	delete(fm.transports, id)

	// Promote a replacement if the primary was removed, then rebuild the
	// backup list so it contains neither the removed ID nor the new primary.
	if fm.primaryTransport == id {
		fm.selectNewPrimary()
	}
	fm.rebuildBackups()
	return nil
}

// GetActiveTransport returns the currently active transport, failing over to
// a backup first if the primary is missing or not usable.
func (fm *FailoverManager) GetActiveTransport() (Transport, error) {
	// Fast path: the primary is usable, so a read lock is enough.
	fm.mu.RLock()
	if primary, ok := fm.transports[fm.primaryTransport]; ok && isUsable(primary.Status) {
		t := primary.Transport
		fm.mu.RUnlock()
		return t, nil
	}
	fm.mu.RUnlock()

	// Slow path: sync.RWMutex cannot be upgraded, so take the write lock
	// and re-check, because the state may have changed in between.
	fm.mu.Lock()
	defer fm.mu.Unlock()

	if len(fm.transports) == 0 {
		return nil, errors.New("no active transport available")
	}
	if primary, ok := fm.transports[fm.primaryTransport]; ok && isUsable(primary.Status) {
		return primary.Transport, nil
	}
	if err := fm.performFailoverLocked(); err != nil {
		return nil, fmt.Errorf("failover failed: %w", err)
	}
	return fm.transports[fm.primaryTransport].Transport, nil
}

// Send sends a message through the active transport, guarded by the circuit
// breaker. Failover happens when the active transport is selected; a failed
// send is not retried on another transport.
func (fm *FailoverManager) Send(ctx context.Context, msg *Message) error {
	transport, err := fm.GetActiveTransport()
	if err != nil {
		return fmt.Errorf("no available transport: %w", err)
	}
	return fm.circuitBreaker.Execute(func() error {
		return transport.Send(ctx, msg)
	})
}

// Receive receives messages from the active transport.
func (fm *FailoverManager) Receive(ctx context.Context) (<-chan *Message, error) {
	transport, err := fm.GetActiveTransport()
	if err != nil {
		return nil, fmt.Errorf("no available transport: %w", err)
	}
	return transport.Receive(ctx)
}

// ForceFailover manually triggers a failover to a specific transport.
func (fm *FailoverManager) ForceFailover(targetTransportID string) error {
	fm.mu.Lock()
	defer fm.mu.Unlock()

	target, exists := fm.transports[targetTransportID]
	if !exists {
		return fmt.Errorf("target transport not found: %s", targetTransportID)
	}
	if !isUsable(target.Status) {
		return fmt.Errorf("target transport is not healthy: %s", target.Status)
	}
	if targetTransportID == fm.primaryTransport {
		return nil // already primary
	}
	fm.switchPrimary(targetTransportID, "manual failover", EventFailover)
	return nil
}

// GetTransportStatus returns the status of all transports.
func (fm *FailoverManager) GetTransportStatus() map[string]TransportStatus {
	fm.mu.RLock()
	defer fm.mu.RUnlock()

	status := make(map[string]TransportStatus, len(fm.transports))
	for id, transport := range fm.transports {
		status[id] = transport.Status
	}
	return status
}

// GetMetrics returns a copy of the failover metrics.
func (fm *FailoverManager) GetMetrics() FailoverMetrics {
	fm.mu.RLock()
	defer fm.mu.RUnlock()
	return fm.metrics
}

// GetNotifications returns a channel of failover events. The channel is
// closed by Stop. Events are dropped when the buffer is full.
func (fm *FailoverManager) GetNotifications() <-chan FailoverEvent {
	return fm.notifications
}

// SetHealthChecker sets a custom health checker. A nil checker restores the
// default one.
func (fm *FailoverManager) SetHealthChecker(checker HealthChecker) {
	if checker == nil {
		checker = NewDefaultHealthChecker()
	}
	fm.mu.Lock()
	defer fm.mu.Unlock()
	fm.healthChecker = checker
}

// Stop gracefully stops the failover manager. It waits for the background
// loops to exit before closing the notification channel, and is safe to call
// more than once.
func (fm *FailoverManager) Stop() error {
	fm.cancel()
	fm.wg.Wait()

	fm.mu.Lock()
	defer fm.mu.Unlock()
	if !fm.stopped {
		fm.stopped = true
		close(fm.notifications)
	}
	return nil
}

// Private methods

func (fm *FailoverManager) healthCheckLoop() {
	defer fm.wg.Done()

	ticker := time.NewTicker(fm.failoverPolicy.HealthCheckInterval)
	defer ticker.Stop()

	for {
		select {
		case <-fm.ctx.Done():
			return
		case <-ticker.C:
			fm.performHealthChecks()
		}
	}
}

func (fm *FailoverManager) failoverMonitorLoop() {
	defer fm.wg.Done()

	ticker := time.NewTicker(monitorInterval)
	defer ticker.Stop()

	for {
		select {
		case <-fm.ctx.Done():
			return
		case <-ticker.C:
			fm.evaluatePrimary()
		}
	}
}

// evaluatePrimary fails over when the primary is unhealthy and fails back
// when a higher-priority transport has recovered. Both decisions are made
// under one write lock so the state cannot change between check and switch.
func (fm *FailoverManager) evaluatePrimary() {
	fm.mu.Lock()
	defer fm.mu.Unlock()

	if primary, ok := fm.transports[fm.primaryTransport]; ok && primary.Status == StatusUnhealthy {
		// A failed attempt is reflected in metrics.FailoverSuccessRate.
		_ = fm.performFailoverLocked()
	}
	fm.performFailbackLocked()
}

// performHealthChecks probes every enabled transport. The probes run without
// holding fm.mu so a slow checker cannot block Send, Receive, or registration.
func (fm *FailoverManager) performHealthChecks() {
	fm.mu.RLock()
	checker := fm.healthChecker
	targets := make(map[string]Transport, len(fm.transports))
	for id, mt := range fm.transports {
		if mt.Status != StatusDisabled {
			targets[id] = mt.Transport
		}
	}
	fm.mu.RUnlock()

	results := make(map[string]bool, len(targets))
	for id, t := range targets {
		healthy, err := checker.CheckHealth(fm.ctx, t)
		results[id] = err == nil && healthy
	}

	fm.mu.Lock()
	defer fm.mu.Unlock()

	now := time.Now()
	for id, healthy := range results {
		mt, ok := fm.transports[id]
		if !ok || mt.Status == StatusDisabled {
			continue // unregistered or disabled while the probe was running
		}

		mt.LastHealthCheck = now
		previousStatus := mt.Status

		if healthy {
			// Reset failure count on successful health check.
			mt.FailureCount = 0
			mt.Status = StatusHealthy
		} else {
			mt.FailureCount++
			mt.LastFailure = now
			fm.metrics.HealthCheckFailures++
			if mt.FailureCount >= fm.failoverPolicy.FailureThreshold {
				mt.Status = StatusUnhealthy
			} else {
				mt.Status = StatusDegraded
			}
		}

		// Notify status change.
		if previousStatus != mt.Status {
			fm.notifyEvent(FailoverEvent{
				Type:        EventHealthCheck,
				ToTransport: id,
				Reason:      fmt.Sprintf("status changed from %s to %s", previousStatus, mt.Status),
				Timestamp:   now,
				Success:     mt.Status == StatusHealthy,
			})
		}
	}
}

// bestCandidate returns the ID of the highest-priority transport accepted by
// ok, or "" if there is none. Ties are broken by ID so the result does not
// depend on map iteration order. Callers must hold fm.mu.
func (fm *FailoverManager) bestCandidate(ok func(id string, t *ManagedTransport) bool) string {
	var bestID string
	var best *ManagedTransport
	for id, t := range fm.transports {
		if !ok(id, t) {
			continue
		}
		if best == nil || t.Priority > best.Priority || (t.Priority == best.Priority && id < bestID) {
			bestID, best = id, t
		}
	}
	return bestID
}

// performFailoverLocked switches to the best usable transport other than the
// current primary. Callers must hold fm.mu for writing.
func (fm *FailoverManager) performFailoverLocked() error {
	best := fm.bestCandidate(func(id string, t *ManagedTransport) bool {
		return id != fm.primaryTransport && isUsable(t.Status)
	})
	fm.recordFailoverAttempt(best != "")
	if best == "" {
		return errors.New("no healthy backup transport available")
	}
	fm.switchPrimary(best, "automatic failover", EventFailover)
	return nil
}

// performFailbackLocked switches to a healthy transport with a strictly
// higher priority than the current primary, once FailbackDelay has elapsed
// since the last failover. Callers must hold fm.mu for writing.
func (fm *FailoverManager) performFailbackLocked() {
	if !fm.failoverPolicy.AutoFailback {
		return
	}
	if time.Since(fm.metrics.LastFailover) < fm.failoverPolicy.FailbackDelay {
		return
	}

	current, hasPrimary := fm.transports[fm.primaryTransport]
	best := fm.bestCandidate(func(_ string, t *ManagedTransport) bool {
		return t.Status == StatusHealthy && (!hasPrimary || t.Priority > current.Priority)
	})
	if best == "" {
		return // no failback needed
	}
	fm.switchPrimary(best, "automatic failback", EventFailback)
}

// recordFailoverAttempt updates FailoverSuccessRate. Callers must hold fm.mu
// for writing.
func (fm *FailoverManager) recordFailoverAttempt(success bool) {
	fm.failoverAttempts++
	if success {
		fm.failoverSuccesses++
	}
	fm.metrics.FailoverSuccessRate = float64(fm.failoverSuccesses) / float64(fm.failoverAttempts)
}

// switchPrimary makes newPrimaryID the primary, rebuilds the backup list,
// updates metrics, and emits an event of the given type. Callers must hold
// fm.mu for writing.
func (fm *FailoverManager) switchPrimary(newPrimaryID, reason string, eventType FailoverEventType) {
	start := time.Now()
	oldPrimary := fm.primaryTransport

	fm.primaryTransport = newPrimaryID
	fm.rebuildBackups()

	now := time.Now()
	duration := now.Sub(start)
	if oldPrimary != newPrimaryID {
		if eventType == EventFailback {
			fm.metrics.TotalFailbacks++
			fm.metrics.LastFailback = now
		} else {
			fm.metrics.TotalFailovers++
			fm.metrics.LastFailover = now
		}
		fm.metrics.FailoverDuration = duration
		fm.metrics.CurrentTransport = newPrimaryID
	}

	fm.notifyEvent(FailoverEvent{
		Type:          eventType,
		FromTransport: oldPrimary,
		ToTransport:   newPrimaryID,
		Reason:        reason,
		Timestamp:     now,
		Success:       true,
		Duration:      duration,
	})
}

// rebuildBackups lists every registered transport except the primary.
// Callers must hold fm.mu for writing.
func (fm *FailoverManager) rebuildBackups() {
	fm.backupTransports = make([]string, 0, len(fm.transports))
	for id := range fm.transports {
		if id != fm.primaryTransport {
			fm.backupTransports = append(fm.backupTransports, id)
		}
	}
}

// selectNewPrimary promotes the best usable transport, or clears the primary
// if none is usable. Callers must hold fm.mu for writing.
func (fm *FailoverManager) selectNewPrimary() {
	fm.primaryTransport = fm.bestCandidate(func(_ string, t *ManagedTransport) bool {
		return isUsable(t.Status)
	})
	fm.metrics.CurrentTransport = fm.primaryTransport
}

// notifyEvent publishes an event without blocking. Callers must hold fm.mu
// for writing, which keeps the send from racing with the close in Stop.
func (fm *FailoverManager) notifyEvent(event FailoverEvent) {
	if fm.stopped {
		return
	}
	select {
	case fm.notifications <- event:
	default:
		// Channel full, drop event.
	}
}

// DefaultHealthChecker implements basic health checking.
type DefaultHealthChecker struct{}

// Compile-time check that DefaultHealthChecker satisfies HealthChecker.
var _ HealthChecker = (*DefaultHealthChecker)(nil)

// NewDefaultHealthChecker returns a health checker based on Transport.Health.
func NewDefaultHealthChecker() *DefaultHealthChecker {
	return &DefaultHealthChecker{}
}

// CheckHealth reports whether the transport describes itself as healthy.
// The context is unused because Transport.Health takes none.
func (dhc *DefaultHealthChecker) CheckHealth(ctx context.Context, transport Transport) (bool, error) {
	health := transport.Health()
	return health.Status == "healthy", nil
}

// GetHealthScore maps the transport's reported status to a score in [0, 1].
func (dhc *DefaultHealthChecker) GetHealthScore(transport Transport) float64 {
	health := transport.Health()
	switch health.Status {
	case "healthy":
		return 1.0
	case "degraded":
		return 0.5
	default:
		return 0.0
	}
}

// CircuitBreaker implements the circuit breaker pattern for transport operations.
type CircuitBreaker struct {
	config       CircuitBreakerConfig
	state        CircuitBreakerState
	failureCount int
	lastFailure  time.Time
	mu           sync.Mutex
}

// CircuitBreakerConfig configures a CircuitBreaker.
type CircuitBreakerConfig struct {
	FailureThreshold int
	RecoveryTimeout  time.Duration
	MaxRetries       int
}

// CircuitBreakerState is the state of a CircuitBreaker.
type CircuitBreakerState string

const (
	StateClosed   CircuitBreakerState = "closed"
	StateOpen     CircuitBreakerState = "open"
	StateHalfOpen CircuitBreakerState = "half_open"
)

// NewCircuitBreaker creates a circuit breaker in the closed state.
func NewCircuitBreaker(config CircuitBreakerConfig) *CircuitBreaker {
	return &CircuitBreaker{
		config: config,
		state:  StateClosed,
	}
}

// Execute runs operation unless the breaker is open, in which case it returns
// ErrCircuitOpen. The mutex is not held while operation runs, so concurrent
// calls are not serialized; several trial calls may run while half-open.
func (cb *CircuitBreaker) Execute(operation func() error) error {
	if err := cb.beforeCall(); err != nil {
		return err
	}

	err := operation()

	cb.mu.Lock()
	defer cb.mu.Unlock()
	if err != nil {
		cb.onFailure()
		return err
	}
	cb.onSuccess()
	return nil
}

// beforeCall rejects the call while the breaker is open and moves it to
// half-open once RecoveryTimeout has elapsed since the last failure.
func (cb *CircuitBreaker) beforeCall() error {
	cb.mu.Lock()
	defer cb.mu.Unlock()

	if cb.state == StateOpen {
		if time.Since(cb.lastFailure) < cb.config.RecoveryTimeout {
			return ErrCircuitOpen
		}
		cb.state = StateHalfOpen
	}
	return nil
}

// onFailure records a failure. Callers must hold cb.mu.
func (cb *CircuitBreaker) onFailure() {
	cb.failureCount++
	cb.lastFailure = time.Now()
	// A failed trial call in the half-open state reopens the breaker at once.
	if cb.state == StateHalfOpen || cb.failureCount >= cb.config.FailureThreshold {
		cb.state = StateOpen
	}
}

// onSuccess resets the breaker. Callers must hold cb.mu.
func (cb *CircuitBreaker) onSuccess() {
	cb.failureCount = 0
	cb.state = StateClosed
}

// GetState returns the current breaker state.
func (cb *CircuitBreaker) GetState() CircuitBreakerState {
	cb.mu.Lock()
	defer cb.mu.Unlock()
	return cb.state
}