package lifecycle

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"reflect"
	"sync"
	"time"
)

// ModuleRegistry manages the registration, discovery, and lifecycle of system
// modules.
//
// The exported methods in this file take mu before touching the module maps,
// the start/stop ordering, the registry state, or the optional collaborators
// (event bus, health monitor, logger). The recorded-error slices are guarded
// separately by errMu.
type ModuleRegistry struct {
	modules              map[string]*RegisteredModule
	modulesByType        map[reflect.Type][]*RegisteredModule
	dependencies         map[string][]string
	startOrder           []string
	stopOrder            []string
	state                RegistryState
	eventBus             EventBus
	healthMonitor        HealthMonitor
	config               RegistryConfig
	logger               *slog.Logger
	registryErrors       []error
	registryErrorDetails []RecordedError
	errMu                sync.Mutex
	mu                   sync.RWMutex
	ctx                  context.Context
	cancel               context.CancelFunc
}

// RegisteredModule represents a module in the registry together with the
// bookkeeping the registry keeps about it.
//
// NOTE(review): mu is declared but not used by any code visible in this file;
// the registry mutates these fields while holding ModuleRegistry.mu instead.
type RegisteredModule struct {
	ID           string
	Name         string
	Type         reflect.Type
	Instance     Module
	Config       ModuleConfig
	Dependencies []string
	State        ModuleState
	Metadata     map[string]any
	StartTime    time.Time
	StopTime     time.Time
	HealthStatus ModuleHealth
	Metrics      ModuleMetrics
	Created      time.Time
	Version      string
	mu           sync.RWMutex
}

// Module is the interface that all modules must implement.
type Module interface {
	// Core lifecycle methods.
	Initialize(ctx context.Context, config ModuleConfig) error
	Start(ctx context.Context) error
	Stop(ctx context.Context) error
	Pause(ctx context.Context) error
	Resume(ctx context.Context) error

	// Module information.
	GetID() string
	GetName() string
	GetVersion() string
	GetDependencies() []string

	// Health and status.
	GetHealth() ModuleHealth
	GetState() ModuleState
	GetMetrics() ModuleMetrics
}

// ModuleState represents the current state of a module.
type ModuleState string

const (
	StateUninitialized ModuleState = "uninitialized"
	StateInitialized   ModuleState = "initialized"
	StateStarting      ModuleState = "starting"
	StateRunning       ModuleState = "running"
	StatePausing       ModuleState = "pausing"
	StatePaused        ModuleState = "paused"
	StateResuming      ModuleState = "resuming"
	StateStopping      ModuleState = "stopping"
	StateStopped       ModuleState = "stopped"
	StateFailed        ModuleState = "failed"
)

// RegistryState represents the state of the entire registry.
type RegistryState string

const (
	RegistryUninitialized RegistryState = "uninitialized"
	RegistryInitialized   RegistryState = "initialized"
	RegistryStarting      RegistryState = "starting"
	RegistryRunning       RegistryState = "running"
	RegistryStopping      RegistryState = "stopping"
	RegistryStopped       RegistryState = "stopped"
	RegistryFailed        RegistryState = "failed"
)

// ModuleConfig contains configuration for a module.
type ModuleConfig struct {
	Settings            map[string]any `json:"settings"`
	Enabled             bool           `json:"enabled"`
	StartTimeout        time.Duration  `json:"start_timeout"`
	StopTimeout         time.Duration  `json:"stop_timeout"`
	HealthCheckInterval time.Duration  `json:"health_check_interval"`
	MaxRestarts         int            `json:"max_restarts"`
	RestartDelay        time.Duration  `json:"restart_delay"`
	CriticalModule      bool           `json:"critical_module"`
}

// ModuleHealth represents the health status of a module.
type ModuleHealth struct {
	Status       HealthStatus   `json:"status"`
	LastCheck    time.Time      `json:"last_check"`
	Message      string         `json:"message"`
	Details      map[string]any `json:"details"`
	Uptime       time.Duration  `json:"uptime"`
	RestartCount int            `json:"restart_count"`
}

// HealthStatus represents health check results.
type HealthStatus string

const (
	HealthHealthy   HealthStatus = "healthy"
	HealthDegraded  HealthStatus = "degraded"
	HealthUnhealthy HealthStatus = "unhealthy"
	HealthUnknown   HealthStatus = "unknown"
)

// ModuleMetrics contains performance metrics for a module.
type ModuleMetrics struct {
	StartupTime   time.Duration  `json:"startup_time"`
	ShutdownTime  time.Duration  `json:"shutdown_time"`
	MemoryUsage   int64          `json:"memory_usage"`
	CPUUsage      float64        `json:"cpu_usage"`
	RequestCount  int64          `json:"request_count"`
	ErrorCount    int64          `json:"error_count"`
	LastActivity  time.Time      `json:"last_activity"`
	CustomMetrics map[string]any `json:"custom_metrics"`
}

// RegistryConfig configures the module registry.
//
// NewModuleRegistry fills in defaults for StartTimeout, StopTimeout,
// HealthCheckInterval, EventPublishRetries and EventPublishDelay when they are
// left at zero.
type RegistryConfig struct {
	StartTimeout        time.Duration `json:"start_timeout"`
	StopTimeout         time.Duration `json:"stop_timeout"`
	HealthCheckInterval time.Duration `json:"health_check_interval"`
	EnableMetrics       bool          `json:"enable_metrics"`
	EnableHealthMonitor bool          `json:"enable_health_monitor"`
	ParallelStartup     bool          `json:"parallel_startup"`
	ParallelShutdown    bool          `json:"parallel_shutdown"`
	FailureRecovery     bool          `json:"failure_recovery"`
	AutoRestart         bool          `json:"auto_restart"`
	MaxRestartAttempts  int           `json:"max_restart_attempts"`
	EventPublishRetries int           `json:"event_publish_retries"`
	EventPublishDelay   time.Duration `json:"event_publish_delay"`
}

// EventBus is the interface used to publish and subscribe to module events.
type EventBus interface {
	Publish(event ModuleEvent) error
	Subscribe(eventType EventType, handler EventHandler) error
}

// HealthMonitor is the interface used for module health monitoring.
type HealthMonitor interface {
	CheckHealth(module *RegisteredModule) ModuleHealth
	StartMonitoring(module *RegisteredModule) error
	StopMonitoring(moduleID string) error
	GetHealthStatus() map[string]ModuleHealth
}

// ModuleEvent represents an event in the module lifecycle.
type ModuleEvent struct {
	Type      EventType      `json:"type"`
	ModuleID  string         `json:"module_id"`
	Timestamp time.Time      `json:"timestamp"`
	Data      map[string]any `json:"data"`
	Error     error          `json:"error,omitempty"`
}

// EventType defines types of module events.
type EventType string

const (
	EventModuleRegistered   EventType = "module_registered"
	EventModuleUnregistered EventType = "module_unregistered"
	EventModuleInitialized  EventType = "module_initialized"
	EventModuleStarted      EventType = "module_started"
	EventModuleStopped      EventType = "module_stopped"
	EventModulePaused       EventType = "module_paused"
	EventModuleResumed      EventType = "module_resumed"
	EventModuleFailed       EventType = "module_failed"
	EventModuleRestarted    EventType = "module_restarted"
	EventHealthCheck        EventType = "health_check"
)

// EventHandler handles module events.
type EventHandler func(event ModuleEvent) error

// NewModuleRegistry creates a new, empty module registry in the
// RegistryUninitialized state.
//
// Zero-valued timeout, health-check and event-publish settings in config are
// replaced with defaults. The registry logs to slog.Default() until SetLogger
// is called.
func NewModuleRegistry(config RegistryConfig) *ModuleRegistry {
	ctx, cancel := context.WithCancel(context.Background())

	registry := &ModuleRegistry{
		modules:              make(map[string]*RegisteredModule),
		modulesByType:        make(map[reflect.Type][]*RegisteredModule),
		dependencies:         make(map[string][]string),
		config:               config,
		state:                RegistryUninitialized,
		logger:               slog.Default(),
		ctx:                  ctx,
		cancel:               cancel,
		registryErrors:       make([]error, 0),
		registryErrorDetails: make([]RecordedError, 0),
	}

	// Set default configuration.
	if registry.config.StartTimeout == 0 {
		registry.config.StartTimeout = 30 * time.Second
	}
	if registry.config.StopTimeout == 0 {
		registry.config.StopTimeout = 15 * time.Second
	}
	if registry.config.HealthCheckInterval == 0 {
		registry.config.HealthCheckInterval = 30 * time.Second
	}
	if registry.config.EventPublishRetries == 0 {
		registry.config.EventPublishRetries = 3
	}
	if registry.config.EventPublishDelay == 0 {
		registry.config.EventPublishDelay = 200 * time.Millisecond
	}

	return registry
}

// Register registers a new module with the registry.
//
// It returns an error if module is nil or if a module with the same ID is
// already registered. A failure to publish the registration event is logged
// but does not fail the registration. The event is published while the
// registry lock is held.
//
// Register does not recompute the start/stop order; that is done by
// Initialize.
func (mr *ModuleRegistry) Register(module Module, config ModuleConfig) error {
	if module == nil {
		return errors.New("cannot register nil module")
	}

	mr.mu.Lock()
	defer mr.mu.Unlock()

	id := module.GetID()
	if _, exists := mr.modules[id]; exists {
		return fmt.Errorf("module already registered: %s", id)
	}

	name := module.GetName()
	version := module.GetVersion()
	// Copy the dependency list so a module that later mutates the slice it
	// returned cannot change the registry's view of the dependency graph.
	deps := append([]string(nil), module.GetDependencies()...)
	moduleType := reflect.TypeOf(module)

	registered := &RegisteredModule{
		ID:           id,
		Name:         name,
		Type:         moduleType,
		Instance:     module,
		Config:       config,
		Dependencies: deps,
		State:        StateUninitialized,
		Metadata:     make(map[string]any),
		HealthStatus: ModuleHealth{Status: HealthUnknown},
		Created:      time.Now(),
		Version:      version,
	}

	mr.modules[id] = registered
	mr.modulesByType[moduleType] = append(mr.modulesByType[moduleType], registered)
	mr.dependencies[id] = deps

	// Publish event.
	err := mr.publishEventWithRetry(ModuleEvent{
		Type:      EventModuleRegistered,
		ModuleID:  id,
		Timestamp: time.Now(),
		Data: map[string]any{
			"name":    name,
			"version": version,
		},
	}, "Module registration event publish failed")
	if err != nil {
		// Log the error but don't fail the registration since this is a
		// non-critical notification.
		mr.logger.Warn("Failed to publish module registration event", "module_id", id, "error", err)
	}

	return nil
}

// Unregister removes a module from the registry.
//
// A running or paused module is stopped first; if stopping fails the module
// stays registered and the error is returned. The module is also removed from
// the computed start/stop order so later StartAll/StopAll calls do not look up
// an ID that no longer exists. A failure to publish the unregistration event
// is logged but does not fail the call.
func (mr *ModuleRegistry) Unregister(moduleID string) error {
	mr.mu.Lock()
	defer mr.mu.Unlock()

	registered, exists := mr.modules[moduleID]
	if !exists {
		return fmt.Errorf("module not found: %s", moduleID)
	}

	// Stop the module if it is still active. Paused modules count as active:
	// stopModule accepts both states.
	if registered.State == StateRunning || registered.State == StatePaused {
		if err := mr.stopModule(registered); err != nil {
			return fmt.Errorf("failed to stop module before unregistering: %w", err)
		}
	}

	// Remove from the type index, clearing the vacated tail slot so the
	// backing array does not keep the module alive.
	bucket := mr.modulesByType[registered.Type]
	for i, mod := range bucket {
		if mod.ID != moduleID {
			continue
		}
		copy(bucket[i:], bucket[i+1:])
		bucket[len(bucket)-1] = nil
		bucket = bucket[:len(bucket)-1]
		break
	}
	if len(bucket) == 0 {
		delete(mr.modulesByType, registered.Type)
	} else {
		mr.modulesByType[registered.Type] = bucket
	}

	// Remove from maps.
	delete(mr.modules, moduleID)
	delete(mr.dependencies, moduleID)

	// Drop the ID from the precomputed orderings (filtered in place).
	prune := func(ids []string) []string {
		kept := ids[:0]
		for _, id := range ids {
			if id != moduleID {
				kept = append(kept, id)
			}
		}
		return kept
	}
	mr.startOrder = prune(mr.startOrder)
	mr.stopOrder = prune(mr.stopOrder)

	// Publish event.
	err := mr.publishEventWithRetry(ModuleEvent{
		Type:      EventModuleUnregistered,
		ModuleID:  moduleID,
		Timestamp: time.Now(),
	}, "Module unregistration event publish failed")
	if err != nil {
		// Log the error but don't fail the unregistration since this is a
		// non-critical notification.
		mr.logger.Warn("Failed to publish module unregistration event", "module_id", moduleID, "error", err)
	}

	return nil
}

// Get retrieves a module by ID, or an error if no such module is registered.
func (mr *ModuleRegistry) Get(moduleID string) (Module, error) {
	mr.mu.RLock()
	defer mr.mu.RUnlock()

	registered, exists := mr.modules[moduleID]
	if !exists {
		return nil, fmt.Errorf("module not found: %s", moduleID)
	}
	return registered.Instance, nil
}

// GetByType retrieves all modules registered with the given dynamic type.
// The result is a non-nil, possibly empty slice.
func (mr *ModuleRegistry) GetByType(moduleType reflect.Type) []Module {
	mr.mu.RLock()
	defer mr.mu.RUnlock()

	registeredModules := mr.modulesByType[moduleType]
	modules := make([]Module, len(registeredModules))
	for i, registered := range registeredModules {
		modules[i] = registered.Instance
	}
	return modules
}

// List returns all registered module IDs. The order follows map iteration and
// is therefore unspecified.
func (mr *ModuleRegistry) List() []string {
	mr.mu.RLock()
	defer mr.mu.RUnlock()

	ids := make([]string, 0, len(mr.modules))
	for id := range mr.modules {
		ids = append(ids, id)
	}
	return ids
}

// GetState returns the registry's recorded state for a module, or an error if
// the module is not registered.
func (mr *ModuleRegistry) GetState(moduleID string) (ModuleState, error) {
	mr.mu.RLock()
	defer mr.mu.RUnlock()

	registered, exists := mr.modules[moduleID]
	if !exists {
		return "", fmt.Errorf("module not found: %s", moduleID)
	}
	return registered.State, nil
}

// GetRegistryState returns the current state of the registry.
func (mr *ModuleRegistry) GetRegistryState() RegistryState {
	mr.mu.RLock()
	defer mr.mu.RUnlock()

	return mr.state
}

// Initialize computes the dependency-based start order (and its reverse, the
// stop order) and then initializes every registered module in start order.
//
// It may only be called while the registry is RegistryUninitialized. On any
// failure the registry is moved to RegistryFailed and the error is returned.
func (mr *ModuleRegistry) Initialize(ctx context.Context) error {
	mr.mu.Lock()
	defer mr.mu.Unlock()

	if mr.state != RegistryUninitialized {
		return fmt.Errorf("registry already initialized")
	}
	mr.state = RegistryInitialized

	// Calculate start order based on dependencies.
	startOrder, err := mr.calculateStartOrder()
	if err != nil {
		mr.state = RegistryFailed
		return fmt.Errorf("failed to calculate start order: %w", err)
	}
	mr.startOrder = startOrder

	// Calculate stop order (reverse of start order).
	mr.stopOrder = make([]string, len(startOrder))
	for i, id := range startOrder {
		mr.stopOrder[len(startOrder)-1-i] = id
	}

	// Initialize all modules.
	for _, moduleID := range mr.startOrder {
		if err := mr.initializeModule(ctx, mr.modules[moduleID]); err != nil {
			mr.state = RegistryFailed
			return fmt.Errorf("failed to initialize module %s: %w", moduleID, err)
		}
	}

	return nil
}

// StartAll starts all registered modules in dependency order.
//
// The registry must be RegistryInitialized or RegistryStopped. Startup runs in
// parallel or sequentially depending on RegistryConfig.ParallelStartup.
func (mr *ModuleRegistry) StartAll(ctx context.Context) error {
	mr.mu.Lock()
	defer mr.mu.Unlock()

	if mr.state != RegistryInitialized && mr.state != RegistryStopped {
		return fmt.Errorf("invalid registry state for start: %s", mr.state)
	}
	mr.state = RegistryStarting

	if mr.config.ParallelStartup {
		return mr.startAllParallel(ctx)
	}
	return mr.startAllSequential(ctx)
}

// StopAll stops all modules in reverse dependency order.
//
// The registry must be RegistryRunning. Shutdown runs in parallel or
// sequentially depending on RegistryConfig.ParallelShutdown.
func (mr *ModuleRegistry) StopAll(ctx context.Context) error {
	mr.mu.Lock()
	defer mr.mu.Unlock()

	if mr.state != RegistryRunning {
		return fmt.Errorf("invalid registry state for stop: %s", mr.state)
	}
	mr.state = RegistryStopping

	if mr.config.ParallelShutdown {
		return mr.stopAllParallel(ctx)
	}
	return mr.stopAllSequential(ctx)
}

// Start starts a specific module.
func (mr *ModuleRegistry) Start(ctx context.Context, moduleID string) error {
	mr.mu.Lock()
	defer mr.mu.Unlock()

	registered, exists := mr.modules[moduleID]
	if !exists {
		return fmt.Errorf("module not found: %s", moduleID)
	}
	return mr.startModule(ctx, registered)
}

// Stop stops a specific module.
//
// NOTE(review): ctx is not forwarded; stopModule takes no context argument and
// builds its own from the registry's internal context.
func (mr *ModuleRegistry) Stop(ctx context.Context, moduleID string) error {
	mr.mu.Lock()
	defer mr.mu.Unlock()

	registered, exists := mr.modules[moduleID]
	if !exists {
		return fmt.Errorf("module not found: %s", moduleID)
	}
	return mr.stopModule(registered)
}

// Pause pauses a specific module.
func (mr *ModuleRegistry) Pause(ctx context.Context, moduleID string) error {
	mr.mu.Lock()
	defer mr.mu.Unlock()

	registered, exists := mr.modules[moduleID]
	if !exists {
		return fmt.Errorf("module not found: %s", moduleID)
	}
	return mr.pauseModule(ctx, registered)
}

// Resume resumes a paused module.
func (mr *ModuleRegistry) Resume(ctx context.Context, moduleID string) error {
	mr.mu.Lock()
	defer mr.mu.Unlock()

	registered, exists := mr.modules[moduleID]
	if !exists {
		return fmt.Errorf("module not found: %s", moduleID)
	}
	return mr.resumeModule(ctx, registered)
}

// SetEventBus sets the event bus for the registry. With no event bus set,
// event publishing is a no-op.
func (mr *ModuleRegistry) SetEventBus(eventBus EventBus) {
	mr.mu.Lock()
	defer mr.mu.Unlock()

	mr.eventBus = eventBus
}

// SetHealthMonitor sets the health monitor for the registry.
func (mr *ModuleRegistry) SetHealthMonitor(healthMonitor HealthMonitor) {
	mr.mu.Lock()
	defer mr.mu.Unlock()

	mr.healthMonitor = healthMonitor
}

// SetLogger overrides the default logger for the registry. A nil logger is
// ignored so the registry always has a usable logger.
func (mr *ModuleRegistry) SetLogger(logger *slog.Logger) {
	mr.mu.Lock()
	defer mr.mu.Unlock()

	if logger != nil {
		mr.logger = logger
	}
}

// GetHealth returns the registry's recorded health status of all modules,
// keyed by module ID.
func (mr *ModuleRegistry) GetHealth() map[string]ModuleHealth {
	mr.mu.RLock()
	defer mr.mu.RUnlock()

	health := make(map[string]ModuleHealth, len(mr.modules))
	for id, registered := range mr.modules {
		health[id] = registered.HealthStatus
	}
	return health
}

// GetMetrics returns the registry's recorded metrics for all modules, keyed by
// module ID.
func (mr *ModuleRegistry) GetMetrics() map[string]ModuleMetrics {
	mr.mu.RLock()
	defer mr.mu.RUnlock()

	metrics := make(map[string]ModuleMetrics, len(mr.modules))
	for id, registered := range mr.modules {
		metrics[id] = registered.Metrics
	}
	return metrics
}

// recordRegistryError stores err (enriched via enrichErrorWithTxHash) in the
// registry's error log and emits it through the logger. A nil err is ignored.
// attrs are alternating key/value logging attributes; they are copied before
// use so the caller's slice is never modified.
func (mr *ModuleRegistry) recordRegistryError(message string, err error, attrs ...any) {
	if err == nil {
		return
	}

	attrCopy := append([]any{}, attrs...)
	wrapped, txHash, attrsWithTx := enrichErrorWithTxHash(message, err, attrCopy)

	mr.errMu.Lock()
	mr.registryErrors = append(mr.registryErrors, wrapped)
	mr.registryErrorDetails = append(mr.registryErrorDetails, RecordedError{
		Err:    wrapped,
		TxHash: txHash,
	})
	mr.errMu.Unlock()

	if mr.logger != nil {
		kv := append([]any{}, attrsWithTx...)
		kv = append(kv, "error", err)
		mr.logger.Error(message, kv...)
	}
}

// publishEventWithRetry publishes event on the configured event bus, retrying
// up to EventPublishRetries times with EventPublishDelay between attempts.
//
// It returns nil when no event bus is configured or when any attempt succeeds.
// When every attempt fails, the joined errors are recorded via
// recordRegistryError and returned wrapped with failureMessage.
//
// The callers visible in this file invoke it while holding mr.mu, so the
// sleeps between retries happen with the registry lock held.
func (mr *ModuleRegistry) publishEventWithRetry(event ModuleEvent, failureMessage string) error {
	if mr.eventBus == nil {
		return nil
	}

	retries := mr.config.EventPublishRetries
	if retries <= 0 {
		retries = 1
	}
	delay := mr.config.EventPublishDelay
	if delay <= 0 {
		delay = 200 * time.Millisecond
	}

	var errs []error
	for attempt := 1; attempt <= retries; attempt++ {
		err := mr.eventBus.Publish(event)
		if err == nil {
			if attempt > 1 && mr.logger != nil {
				mr.logger.Warn("Module event publish succeeded after retry", "module_id", event.ModuleID, "event_type", event.Type, "attempts", attempt)
			}
			return nil
		}

		errs = append(errs, err)
		if attempt < retries {
			time.Sleep(delay)
			continue
		}

		// Final attempt failed: record and report everything we saw.
		joined := errors.Join(errs...)
		mr.recordRegistryError(
			failureMessage,
			joined,
			"module_id", event.ModuleID,
			"event_type", event.Type,
			"attempts", attempt,
		)
		return fmt.Errorf("%s after %d attempts: %w", failureMessage, attempt, joined)
	}

	return nil
}

// aggregatedErrors returns all recorded registry errors joined into a single
// error, or nil when nothing has been recorded.
func (mr *ModuleRegistry) aggregatedErrors() error {
	mr.errMu.Lock()
	defer mr.errMu.Unlock()

	if len(mr.registryErrors) == 0 {
		return nil
	}
	errs := make([]error, len(mr.registryErrors))
	copy(errs, mr.registryErrors)
	return errors.Join(errs...)
}

// RegistryErrors exposes a copy of aggregated registry errors for diagnostics.
// It returns nil when nothing has been recorded.
func (mr *ModuleRegistry) RegistryErrors() []error {
	mr.errMu.Lock()
	defer mr.errMu.Unlock()

	if len(mr.registryErrors) == 0 {
		return nil
	}
	errs := make([]error, len(mr.registryErrors))
	copy(errs, mr.registryErrors)
	return errs
}

// RegistryErrorDetails returns recorded registry errors with tx hash metadata.
func (mr *ModuleRegistry) RegistryErrorDetails() []RecordedError { mr.errMu.Lock() defer mr.errMu.Unlock() if len(mr.registryErrorDetails) == 0 { return nil } details := make([]RecordedError, len(mr.registryErrorDetails)) copy(details, mr.registryErrorDetails) return details } // Shutdown gracefully shuts down the registry func (mr *ModuleRegistry) Shutdown(ctx context.Context) error { if mr.state == RegistryRunning { if err := mr.StopAll(ctx); err != nil { return fmt.Errorf("failed to stop all modules: %w", err) } } mr.cancel() mr.state = RegistryStopped return nil } // Private methods func (mr *ModuleRegistry) calculateStartOrder() ([]string, error) { // Topological sort based on dependencies visited := make(map[string]bool) temp := make(map[string]bool) var order []string var visit func(string) error visit = func(moduleID string) error { if temp[moduleID] { return fmt.Errorf("circular dependency detected involving module: %s", moduleID) } if visited[moduleID] { return nil } temp[moduleID] = true for _, depID := range mr.dependencies[moduleID] { if _, exists := mr.modules[depID]; !exists { return fmt.Errorf("dependency not found: %s (required by %s)", depID, moduleID) } if err := visit(depID); err != nil { return err } } temp[moduleID] = false visited[moduleID] = true order = append(order, moduleID) return nil } for moduleID := range mr.modules { if !visited[moduleID] { if err := visit(moduleID); err != nil { return nil, err } } } return order, nil } func (mr *ModuleRegistry) initializeModule(ctx context.Context, registered *RegisteredModule) error { registered.State = StateInitialized if err := registered.Instance.Initialize(ctx, registered.Config); err != nil { if publishErr := mr.publishEventWithRetry(ModuleEvent{ Type: EventModuleFailed, ModuleID: registered.ID, Timestamp: time.Now(), Data: map[string]interface{}{ "error": err.Error(), "phase": "initialization", }, }, "Module initialization failed event publish failed"); publishErr != nil { // CRITICAL FIX: 
Log event publishing failure but don't fail the operation mr.logger.Warn("Failed to publish module initialization failure event", "module_id", registered.ID, "publish_error", publishErr, "init_error", err) } return err } if publishErr := mr.publishEventWithRetry(ModuleEvent{ Type: EventModuleInitialized, ModuleID: registered.ID, Timestamp: time.Now(), }, "Module initialization event publish failed"); publishErr != nil { // CRITICAL FIX: Log event publishing failure but don't fail the module initialization mr.logger.Warn("Failed to publish module initialization success event", "module_id", registered.ID, "error", publishErr) } return nil } func (mr *ModuleRegistry) startModule(ctx context.Context, registered *RegisteredModule) error { if registered.State != StateInitialized && registered.State != StateStopped { return fmt.Errorf("invalid state for start: %s", registered.State) } startTime := time.Now() registered.State = StateStarting registered.StartTime = startTime // Create timeout context timeoutCtx, cancel := context.WithTimeout(ctx, registered.Config.StartTimeout) defer cancel() if err := registered.Instance.Start(timeoutCtx); err != nil { registered.State = StateFailed return err } registered.State = StateRunning registered.Metrics.StartupTime = time.Since(startTime) // Start health monitoring if mr.healthMonitor != nil { if err := mr.healthMonitor.StartMonitoring(registered); err != nil { mr.recordRegistryError("Failed to start health monitoring", err, "module_id", registered.ID) } } if publishErr := mr.publishEventWithRetry(ModuleEvent{ Type: EventModuleStarted, ModuleID: registered.ID, Timestamp: time.Now(), Data: map[string]interface{}{ "startup_time": registered.Metrics.StartupTime, }, }, "Module started event publish failed"); publishErr != nil { // CRITICAL FIX: Log event publishing failure but don't fail the module startup mr.logger.Warn("Failed to publish module started event", "module_id", registered.ID, "error", publishErr) } return nil } func (mr 
*ModuleRegistry) stopModule(registered *RegisteredModule) error { if registered.State != StateRunning && registered.State != StatePaused { return fmt.Errorf("invalid state for stop: %s", registered.State) } stopTime := time.Now() registered.State = StateStopping // Create timeout context ctx, cancel := context.WithTimeout(mr.ctx, registered.Config.StopTimeout) defer cancel() if err := registered.Instance.Stop(ctx); err != nil { registered.State = StateFailed return err } registered.State = StateStopped registered.StopTime = stopTime registered.Metrics.ShutdownTime = time.Since(stopTime) // Stop health monitoring if mr.healthMonitor != nil { if err := mr.healthMonitor.StopMonitoring(registered.ID); err != nil { mr.recordRegistryError("Failed to stop health monitoring", err, "module_id", registered.ID) } } if err := mr.publishEventWithRetry(ModuleEvent{ Type: EventModuleStopped, ModuleID: registered.ID, Timestamp: time.Now(), Data: map[string]interface{}{ "shutdown_time": registered.Metrics.ShutdownTime, }, }, "Module stopped event publish failed"); err != nil { // Log the error but don't fail the module stop since this is a non-critical notification mr.logger.Warn("Failed to publish module stopped event", "module_id", registered.ID, "error", err) } return nil } func (mr *ModuleRegistry) transitionModuleState( ctx context.Context, registered *RegisteredModule, actionName string, expectedInitialState ModuleState, intermediateState ModuleState, finalState ModuleState, eventType EventType, actionFunc func(context.Context) error, ) error { if registered.State != expectedInitialState { return fmt.Errorf("invalid state for %s: %s", actionName, registered.State) } registered.State = intermediateState if err := actionFunc(ctx); err != nil { registered.State = StateFailed return err } registered.State = finalState // Publish event if err := mr.publishEventWithRetry(ModuleEvent{ Type: eventType, ModuleID: registered.ID, Timestamp: time.Now(), }, "Module state transition event 
publish failed"); err != nil { // Log the error but don't fail the state transition since this is a non-critical notification mr.logger.Warn("Failed to publish module state transition event", "module_id", registered.ID, "event_type", eventType, "error", err) } return nil } func (mr *ModuleRegistry) pauseModule(ctx context.Context, registered *RegisteredModule) error { return mr.transitionModuleState( ctx, registered, "pause", StateRunning, StatePausing, StatePaused, EventModulePaused, registered.Instance.Pause, ) } func (mr *ModuleRegistry) resumeModule(ctx context.Context, registered *RegisteredModule) error { return mr.transitionModuleState( ctx, registered, "resume", StatePaused, StateResuming, StateRunning, EventModuleResumed, registered.Instance.Resume, ) } func (mr *ModuleRegistry) startAllSequential(ctx context.Context) error { for _, moduleID := range mr.startOrder { registered := mr.modules[moduleID] if registered.Config.Enabled { if err := mr.startModule(ctx, registered); err != nil { mr.state = RegistryFailed return fmt.Errorf("failed to start module %s: %w", moduleID, err) } } } mr.state = RegistryRunning return nil } func (mr *ModuleRegistry) startAllParallel(ctx context.Context) error { // Start modules in parallel, respecting dependencies // This is a simplified implementation - in production you'd want more sophisticated parallel startup var wg sync.WaitGroup errors := make(chan error, len(mr.modules)) for _, moduleID := range mr.startOrder { registered := mr.modules[moduleID] if registered.Config.Enabled { wg.Add(1) go func(reg *RegisteredModule) { defer wg.Done() if err := mr.startModule(ctx, reg); err != nil { errors <- fmt.Errorf("failed to start module %s: %w", reg.ID, err) } }(registered) } } wg.Wait() close(errors) // Check for errors for err := range errors { mr.state = RegistryFailed return err } mr.state = RegistryRunning return nil } func (mr *ModuleRegistry) stopAllSequential(ctx context.Context) error { for _, moduleID := range 
mr.stopOrder { registered := mr.modules[moduleID] if registered.State == StateRunning || registered.State == StatePaused { if err := mr.stopModule(registered); err != nil { mr.state = RegistryFailed return fmt.Errorf("failed to stop module %s: %w", moduleID, err) } } } mr.state = RegistryStopped return nil } func (mr *ModuleRegistry) stopAllParallel(ctx context.Context) error { var wg sync.WaitGroup errors := make(chan error, len(mr.modules)) for _, moduleID := range mr.stopOrder { registered := mr.modules[moduleID] if registered.State == StateRunning || registered.State == StatePaused { wg.Add(1) go func(reg *RegisteredModule) { defer wg.Done() if err := mr.stopModule(reg); err != nil { errors <- fmt.Errorf("failed to stop module %s: %w", reg.ID, err) } }(registered) } } wg.Wait() close(errors) // Check for errors for err := range errors { mr.state = RegistryFailed return err } mr.state = RegistryStopped return nil }