Files
mev-beta/orig/pkg/lifecycle/interfaces.go
Administrator 803de231ba feat: create v2-prep branch with comprehensive planning
Restructured project for V2 refactor:

**Structure Changes:**
- Moved all V1 code to orig/ folder (preserved with git mv)
- Created docs/planning/ directory
- Added orig/README_V1.md explaining V1 preservation

**Planning Documents:**
- 00_V2_MASTER_PLAN.md: Complete architecture overview
  - Executive summary of critical V1 issues
  - High-level component architecture diagrams
  - 5-phase implementation roadmap
  - Success metrics and risk mitigation

- 07_TASK_BREAKDOWN.md: Atomic task breakdown
  - 99+ hours of detailed tasks
  - Every task < 2 hours (atomic)
  - Clear dependencies and success criteria
  - Organized by implementation phase

**V2 Key Improvements:**
- Per-exchange parsers (factory pattern)
- Multi-layer strict validation
- Multi-index pool cache
- Background validation pipeline
- Comprehensive observability

**Critical Issues Addressed:**
- Zero address tokens (strict validation + cache enrichment)
- Parsing accuracy (protocol-specific parsers)
- No audit trail (background validation channel)
- Inefficient lookups (multi-index cache)
- Stats disconnection (event-driven metrics)

Next Steps:
1. Review planning documents
2. Begin Phase 1: Foundation (P1-001 through P1-010)
3. Implement parsers in Phase 2
4. Build cache system in Phase 3
5. Add validation pipeline in Phase 4
6. Migrate and test in Phase 5

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-10 10:14:26 +01:00

418 lines
10 KiB
Go

package lifecycle
import (
"context"
"fmt"
"sync"
"time"
)
// BaseModule provides a default implementation of the Module interface
type BaseModule struct {
id string
name string
version string
dependencies []string
state ModuleState
health ModuleHealth
metrics ModuleMetrics
config ModuleConfig
}
// NewBaseModule creates a new base module
func NewBaseModule(id, name, version string, dependencies []string) *BaseModule {
return &BaseModule{
id: id,
name: name,
version: version,
dependencies: dependencies,
state: StateUninitialized,
health: ModuleHealth{
Status: HealthUnknown,
},
metrics: ModuleMetrics{
CustomMetrics: make(map[string]interface{}),
},
}
}
// Core lifecycle methods
func (bm *BaseModule) Initialize(ctx context.Context, config ModuleConfig) error {
bm.config = config
bm.state = StateInitialized
bm.health.Status = HealthHealthy
return nil
}
func (bm *BaseModule) Start(ctx context.Context) error {
if bm.state != StateInitialized && bm.state != StateStopped {
return fmt.Errorf("invalid state for start: %s", bm.state)
}
startTime := time.Now()
bm.state = StateRunning
bm.metrics.StartupTime = time.Since(startTime)
bm.metrics.LastActivity = time.Now()
bm.health.Status = HealthHealthy
return nil
}
func (bm *BaseModule) Stop(ctx context.Context) error {
if bm.state != StateRunning && bm.state != StatePaused {
return fmt.Errorf("invalid state for stop: %s", bm.state)
}
stopTime := time.Now()
bm.state = StateStopped
bm.metrics.ShutdownTime = time.Since(stopTime)
bm.health.Status = HealthUnknown
return nil
}
func (bm *BaseModule) Pause(ctx context.Context) error {
if bm.state != StateRunning {
return fmt.Errorf("invalid state for pause: %s", bm.state)
}
bm.state = StatePaused
return nil
}
func (bm *BaseModule) Resume(ctx context.Context) error {
if bm.state != StatePaused {
return fmt.Errorf("invalid state for resume: %s", bm.state)
}
bm.state = StateRunning
bm.metrics.LastActivity = time.Now()
return nil
}
// Module information
func (bm *BaseModule) GetID() string {
return bm.id
}
func (bm *BaseModule) GetName() string {
return bm.name
}
func (bm *BaseModule) GetVersion() string {
return bm.version
}
func (bm *BaseModule) GetDependencies() []string {
return bm.dependencies
}
// Health and status
func (bm *BaseModule) GetHealth() ModuleHealth {
bm.health.LastCheck = time.Now()
return bm.health
}
func (bm *BaseModule) GetState() ModuleState {
return bm.state
}
func (bm *BaseModule) GetMetrics() ModuleMetrics {
return bm.metrics
}
// Protected methods for subclasses
func (bm *BaseModule) SetHealth(status HealthStatus, message string) {
bm.health.Status = status
bm.health.Message = message
bm.health.LastCheck = time.Now()
}
func (bm *BaseModule) SetState(state ModuleState) {
bm.state = state
}
func (bm *BaseModule) UpdateMetrics(updates map[string]interface{}) {
for key, value := range updates {
bm.metrics.CustomMetrics[key] = value
}
bm.metrics.LastActivity = time.Now()
}
func (bm *BaseModule) IncrementMetric(name string, value int64) {
if current, exists := bm.metrics.CustomMetrics[name]; exists {
if currentVal, ok := current.(int64); ok {
bm.metrics.CustomMetrics[name] = currentVal + value
} else {
bm.metrics.CustomMetrics[name] = value
}
} else {
bm.metrics.CustomMetrics[name] = value
}
}
// SimpleEventBus provides a basic implementation of EventBus
type SimpleEventBus struct {
handlers map[EventType][]EventHandler
mu sync.RWMutex
}
func NewSimpleEventBus() *SimpleEventBus {
return &SimpleEventBus{
handlers: make(map[EventType][]EventHandler),
}
}
func (seb *SimpleEventBus) Publish(event ModuleEvent) error {
seb.mu.RLock()
defer seb.mu.RUnlock()
handlers, exists := seb.handlers[event.Type]
if !exists {
return nil
}
for _, handler := range handlers {
go func(h EventHandler) {
if err := h(event); err != nil {
// Log error but don't fail the publish
}
}(handler)
}
return nil
}
func (seb *SimpleEventBus) Subscribe(eventType EventType, handler EventHandler) error {
seb.mu.Lock()
defer seb.mu.Unlock()
if _, exists := seb.handlers[eventType]; !exists {
seb.handlers[eventType] = make([]EventHandler, 0)
}
seb.handlers[eventType] = append(seb.handlers[eventType], handler)
return nil
}
// LifecycleManager coordinates all lifecycle components
type LifecycleManager struct {
registry *ModuleRegistry
healthMonitor *HealthMonitorImpl
shutdownManager *ShutdownManager
container *Container
eventBus *SimpleEventBus
config LifecycleConfig
mu sync.RWMutex
}
// LifecycleConfig configures the lifecycle manager
type LifecycleConfig struct {
RegistryConfig RegistryConfig `json:"registry_config"`
HealthMonitorConfig HealthMonitorConfig `json:"health_monitor_config"`
ShutdownConfig ShutdownConfig `json:"shutdown_config"`
ContainerConfig ContainerConfig `json:"container_config"`
EnableEventBus bool `json:"enable_event_bus"`
EnableHealthMonitor bool `json:"enable_health_monitor"`
EnableShutdownManager bool `json:"enable_shutdown_manager"`
EnableDependencyInjection bool `json:"enable_dependency_injection"`
}
// NewLifecycleManager creates a new lifecycle manager
func NewLifecycleManager(config LifecycleConfig) *LifecycleManager {
lm := &LifecycleManager{
config: config,
}
// Create event bus
if config.EnableEventBus {
lm.eventBus = NewSimpleEventBus()
}
// Create dependency injection container
if config.EnableDependencyInjection {
lm.container = NewContainer(config.ContainerConfig)
}
// Create module registry
lm.registry = NewModuleRegistry(config.RegistryConfig)
if lm.eventBus != nil {
lm.registry.SetEventBus(lm.eventBus)
}
// Create health monitor
if config.EnableHealthMonitor {
lm.healthMonitor = NewHealthMonitor(config.HealthMonitorConfig)
lm.registry.SetHealthMonitor(lm.healthMonitor)
}
// Create shutdown manager
if config.EnableShutdownManager {
lm.shutdownManager = NewShutdownManager(lm.registry, config.ShutdownConfig)
}
return lm
}
// Initialize initializes the lifecycle manager
func (lm *LifecycleManager) Initialize(ctx context.Context) error {
lm.mu.Lock()
defer lm.mu.Unlock()
// Validate container if enabled
if lm.container != nil {
if err := lm.container.Validate(); err != nil {
return fmt.Errorf("container validation failed: %w", err)
}
}
// Initialize registry
if err := lm.registry.Initialize(ctx); err != nil {
return fmt.Errorf("registry initialization failed: %w", err)
}
// Start health monitor
if lm.healthMonitor != nil {
if err := lm.healthMonitor.Start(); err != nil {
return fmt.Errorf("health monitor start failed: %w", err)
}
}
// Start shutdown manager
if lm.shutdownManager != nil {
if err := lm.shutdownManager.Start(); err != nil {
return fmt.Errorf("shutdown manager start failed: %w", err)
}
}
return nil
}
// Start starts all modules
func (lm *LifecycleManager) Start(ctx context.Context) error {
return lm.registry.StartAll(ctx)
}
// Stop stops all modules
func (lm *LifecycleManager) Stop(ctx context.Context) error {
return lm.registry.StopAll(ctx)
}
// Shutdown gracefully shuts down the entire system
func (lm *LifecycleManager) Shutdown(ctx context.Context) error {
if lm.shutdownManager != nil {
return lm.shutdownManager.Shutdown(ctx)
}
return lm.registry.Shutdown(ctx)
}
// RegisterModule registers a new module
func (lm *LifecycleManager) RegisterModule(module Module, config ModuleConfig) error {
return lm.registry.Register(module, config)
}
// GetModule retrieves a module by ID
func (lm *LifecycleManager) GetModule(moduleID string) (Module, error) {
return lm.registry.Get(moduleID)
}
// GetRegistry returns the module registry
func (lm *LifecycleManager) GetRegistry() *ModuleRegistry {
lm.mu.RLock()
defer lm.mu.RUnlock()
return lm.registry
}
// GetHealthMonitor returns the health monitor
func (lm *LifecycleManager) GetHealthMonitor() *HealthMonitorImpl {
lm.mu.RLock()
defer lm.mu.RUnlock()
return lm.healthMonitor
}
// GetShutdownManager returns the shutdown manager
func (lm *LifecycleManager) GetShutdownManager() *ShutdownManager {
lm.mu.RLock()
defer lm.mu.RUnlock()
return lm.shutdownManager
}
// GetContainer returns the dependency injection container
func (lm *LifecycleManager) GetContainer() *Container {
lm.mu.RLock()
defer lm.mu.RUnlock()
return lm.container
}
// GetEventBus returns the event bus
func (lm *LifecycleManager) GetEventBus() *SimpleEventBus {
lm.mu.RLock()
defer lm.mu.RUnlock()
return lm.eventBus
}
// GetOverallHealth returns the overall system health
func (lm *LifecycleManager) GetOverallHealth() (OverallHealth, error) {
if lm.healthMonitor == nil {
return OverallHealth{}, fmt.Errorf("health monitor not enabled")
}
return lm.healthMonitor.GetOverallHealth(), nil
}
// CreateDefaultConfig creates a default lifecycle configuration
func CreateDefaultConfig() LifecycleConfig {
return LifecycleConfig{
RegistryConfig: RegistryConfig{
StartTimeout: 30 * time.Second,
StopTimeout: 15 * time.Second,
HealthCheckInterval: 30 * time.Second,
EnableMetrics: true,
EnableHealthMonitor: true,
ParallelStartup: false,
ParallelShutdown: true,
FailureRecovery: true,
AutoRestart: true,
MaxRestartAttempts: 3,
},
HealthMonitorConfig: HealthMonitorConfig{
CheckInterval: 30 * time.Second,
CheckTimeout: 10 * time.Second,
HistorySize: 100,
FailureThreshold: 3,
RecoveryThreshold: 3,
EnableNotifications: true,
EnableMetrics: true,
EnableTrends: true,
ParallelChecks: true,
MaxConcurrentChecks: 10,
},
ShutdownConfig: ShutdownConfig{
GracefulTimeout: 30 * time.Second,
ForceTimeout: 60 * time.Second,
SignalBufferSize: 10,
MaxRetries: 3,
RetryDelay: time.Second,
ParallelShutdown: true,
SaveState: true,
CleanupTempFiles: true,
NotifyExternal: false,
WaitForConnections: true,
EnableMetrics: true,
},
ContainerConfig: ContainerConfig{
EnableReflection: true,
EnableCircularDetection: true,
EnableInterception: false,
EnableValidation: true,
MaxDepth: 10,
CacheInstances: true,
},
EnableEventBus: true,
EnableHealthMonitor: true,
EnableShutdownManager: true,
EnableDependencyInjection: true,
}
}