Files
mev-beta/pkg/lifecycle/state_machine.go
Krypto Kajun 850223a953 fix(multicall): resolve critical multicall parsing corruption issues
- Added comprehensive bounds checking to prevent buffer overruns in multicall parsing
- Implemented graduated validation system (Strict/Moderate/Permissive) to reduce false positives
- Added LRU caching system for address validation with 10-minute TTL
- Enhanced ABI decoder with missing Universal Router and Arbitrum-specific DEX signatures
- Fixed duplicate function declarations and import conflicts across multiple files
- Added error recovery mechanisms with multiple fallback strategies
- Updated tests to handle new validation behavior for suspicious addresses
- Fixed parser test expectations for improved validation system
- Applied gofmt formatting fixes to ensure code style compliance
- Fixed mutex copying issues in monitoring package by introducing MetricsSnapshot
- Resolved critical security vulnerabilities in heuristic address extraction
- Progress: Updated TODO audit from 10% to 35% complete

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-17 00:12:55 -05:00

661 lines
19 KiB
Go

package lifecycle
import (
"context"
"fmt"
"sync"
"time"
)
// StateMachine manages module state transitions and enforces valid state changes.
// Methods that read or write the mutable fields take mu; the unexported helpers
// (performTransition and the functions it calls) do no locking of their own and
// rely on the caller holding mu.
type StateMachine struct {
// currentState is the state the machine is in right now.
currentState ModuleState
// transitions maps a source state to the target states reachable from it.
transitions map[ModuleState][]ModuleState
// stateHandlers holds the handler run after entering each state.
stateHandlers map[ModuleState]StateHandler
// transitionHooks holds the registered hooks, keyed by name.
transitionHooks map[string]TransitionHook
// history records completed and failed transitions, oldest first.
history []StateTransition
// module is the module whose lifecycle methods are driven by transitions.
module Module
// config holds the settings supplied at construction, with defaults applied.
config StateMachineConfig
// mu guards the mutable fields of this struct.
mu sync.RWMutex
// metrics accumulates transition statistics.
metrics StateMachineMetrics
}
// StateHandler is invoked by performTransition after the machine has entered
// the state the handler is registered for. The returned error is discarded, so
// a failing handler does not undo the transition. The handler runs while the
// machine's mutex is held, so calling a method that takes the write lock (for
// example SetStateHandler) on machine from inside the handler will deadlock.
type StateHandler func(ctx context.Context, machine *StateMachine) error
// TransitionHook is called around state transitions. performTransition runs
// every registered hook twice per transition with the same from/to arguments:
// once before the state change, where a non-nil error aborts the transition,
// and once after it, where the error is ignored. Hooks run while the machine's
// mutex is held, so calling a method that takes the write lock on machine from
// inside a hook will deadlock.
type TransitionHook func(ctx context.Context, from, to ModuleState, machine *StateMachine) error
// StateTransition represents a state change event as stored in the machine's
// history. Both successful and failed attempts are recorded.
type StateTransition struct {
// From is the state the machine was in when the transition began.
From ModuleState `json:"from"`
// To is the requested target state.
To ModuleState `json:"to"`
// Timestamp is the time at which the transition started.
Timestamp time.Time `json:"timestamp"`
// Duration is the time elapsed between the start and the point of recording.
Duration time.Duration `json:"duration"`
// Success reports whether the state change was carried out.
Success bool `json:"success"`
// Error holds the failure cause; it is left nil for successful transitions.
Error error `json:"error,omitempty"`
// Trigger is the caller-supplied label describing why the transition was requested.
Trigger string `json:"trigger"`
// Context carries the "trigger", "start_time" and "module_id" values
// assembled by performTransition.
Context map[string]interface{} `json:"context"`
}
// StateMachineConfig configures state machine behavior. NewStateMachine
// replaces zero values of TransitionTimeout, MaxHistorySize, MaxRetries and
// RetryDelay with defaults.
type StateMachineConfig struct {
// InitialState is the state a new machine starts in and the target of Reset.
InitialState ModuleState `json:"initial_state"`
// TransitionTimeout bounds each hook, handler and module call made during a
// transition. Zero selects 30 seconds.
TransitionTimeout time.Duration `json:"transition_timeout"`
// MaxHistorySize caps the number of retained history entries. Zero selects 100.
MaxHistorySize int `json:"max_history_size"`
// EnableMetrics turns on metric updates for successful and failed transitions.
EnableMetrics bool `json:"enable_metrics"`
// EnableValidation makes performTransition reject targets that are not
// listed in the transition table for the current state.
EnableValidation bool `json:"enable_validation"`
// AllowConcurrent makes Transition take the read lock instead of the write lock.
AllowConcurrent bool `json:"allow_concurrent"`
// RetryFailedTransitions must be true for TransitionWithRetry to retry at all.
RetryFailedTransitions bool `json:"retry_failed_transitions"`
// MaxRetries is the number of retries after the first attempt. Zero selects 3.
MaxRetries int `json:"max_retries"`
// RetryDelay is the pause between attempts. Zero selects one second.
RetryDelay time.Duration `json:"retry_delay"`
}
// StateMachineMetrics tracks state machine performance. The counters are only
// updated when StateMachineConfig.EnableMetrics is set.
type StateMachineMetrics struct {
// TotalTransitions is incremented by updateMetrics, which performTransition
// calls only after a successful state change.
TotalTransitions int64 `json:"total_transitions"`
// SuccessfulTransitions counts transitions that completed.
SuccessfulTransitions int64 `json:"successful_transitions"`
// FailedTransitions counts transitions recorded by recordFailedTransition.
FailedTransitions int64 `json:"failed_transitions"`
// StateDistribution counts how many times each state has been entered.
StateDistribution map[ModuleState]int64 `json:"state_distribution"`
// TransitionTimes stores the most recent duration per "from->to" key.
TransitionTimes map[string]time.Duration `json:"transition_times"`
// AverageTransitionTime is maintained by updateMetrics.
AverageTransitionTime time.Duration `json:"average_transition_time"`
// LongestTransition is the largest duration seen so far.
LongestTransition time.Duration `json:"longest_transition"`
// LastTransition is the time of the most recent successful transition.
LastTransition time.Time `json:"last_transition"`
// CurrentStateDuration is filled in by GetMetrics from stateEnterTime.
CurrentStateDuration time.Duration `json:"current_state_duration"`
// stateEnterTime is set at construction and reset, and by updateMetrics.
// It is unexported and therefore not part of the JSON encoding.
stateEnterTime time.Time
}
// NewStateMachine builds a state machine bound to module. The machine starts
// in config.InitialState with the default transition table and the default
// (no-op) state handlers installed.
//
// Zero-valued tunables in config are replaced with defaults: a 30 second
// transition timeout, 100 history entries, 3 retries and a one second retry
// delay.
func NewStateMachine(module Module, config StateMachineConfig) *StateMachine {
	// config is a private copy, so the defaults can be applied in place
	// before it is stored on the machine.
	if config.TransitionTimeout == 0 {
		config.TransitionTimeout = 30 * time.Second
	}
	if config.MaxHistorySize == 0 {
		config.MaxHistorySize = 100
	}
	if config.MaxRetries == 0 {
		config.MaxRetries = 3
	}
	if config.RetryDelay == 0 {
		config.RetryDelay = time.Second
	}

	machine := &StateMachine{
		currentState:    config.InitialState,
		transitions:     createDefaultTransitions(),
		stateHandlers:   make(map[ModuleState]StateHandler),
		transitionHooks: make(map[string]TransitionHook),
		history:         make([]StateTransition, 0),
		module:          module,
		config:          config,
		metrics: StateMachineMetrics{
			StateDistribution: make(map[ModuleState]int64),
			TransitionTimes:   make(map[string]time.Duration),
			stateEnterTime:    time.Now(),
		},
	}
	machine.setupDefaultHandlers()
	return machine
}
// GetCurrentState reports the state the machine is in, read under the read lock.
func (sm *StateMachine) GetCurrentState() ModuleState {
	sm.mu.RLock()
	state := sm.currentState
	sm.mu.RUnlock()
	return state
}
// CanTransition reports whether the transition table lists `to` as a valid
// target of the current state. It only consults the table; it does not attempt
// the transition.
func (sm *StateMachine) CanTransition(to ModuleState) bool {
	sm.mu.RLock()
	defer sm.mu.RUnlock()
	// Same lookup the transition path uses, performed under the read lock.
	return sm.canTransitionUnsafe(to)
}
// Transition moves the machine to state `to`, labelling the attempt with
// trigger. The work is done by performTransition while sm.mu is held: the
// write lock by default, or only the read lock when config.AllowConcurrent is
// set.
//
// NOTE(review): performTransition writes currentState, history and metrics, so
// with AllowConcurrent two overlapping calls mutate shared state while holding
// only read locks. Confirm whether that mode is actually used before relying
// on it.
func (sm *StateMachine) Transition(ctx context.Context, to ModuleState, trigger string) error {
	if sm.config.AllowConcurrent {
		sm.mu.RLock()
		defer sm.mu.RUnlock()
	} else {
		sm.mu.Lock()
		defer sm.mu.Unlock()
	}
	return sm.performTransition(ctx, to, trigger)
}
// TransitionWithRetry attempts the transition to `to` and, when
// config.RetryFailedTransitions is set, retries it up to config.MaxRetries
// more times, waiting config.RetryDelay between attempts.
//
// At least one attempt is always made, even if MaxRetries is negative. With
// RetryFailedTransitions unset the first failure is final. If ctx is done
// while waiting for the next attempt, ctx.Err() is returned unwrapped.
// Otherwise the error reports the number of attempts actually made and wraps
// the last transition error.
func (sm *StateMachine) TransitionWithRetry(ctx context.Context, to ModuleState, trigger string) error {
	maxRetries := sm.config.MaxRetries
	if maxRetries < 0 {
		// A negative retry budget still gets the initial attempt.
		maxRetries = 0
	}

	var (
		lastErr  error
		attempts int
	)
	for attempt := 0; attempt <= maxRetries; attempt++ {
		if attempt > 0 {
			// Wait before retrying; stop the timer on cancellation so it
			// does not linger until RetryDelay elapses.
			timer := time.NewTimer(sm.config.RetryDelay)
			select {
			case <-timer.C:
			case <-ctx.Done():
				timer.Stop()
				return ctx.Err()
			}
		}

		attempts++
		err := sm.Transition(ctx, to, trigger)
		if err == nil {
			return nil
		}
		lastErr = err

		// Retrying is opt-in: without it the first failure ends the loop.
		if !sm.config.RetryFailedTransitions {
			break
		}
	}
	return fmt.Errorf("transition failed after %d attempts: %w", attempts, lastErr)
}
// Initialize requests a transition to StateInitialized using the trigger
// "initialize" and returns whatever Transition returns.
func (sm *StateMachine) Initialize(ctx context.Context) error {
return sm.Transition(ctx, StateInitialized, "initialize")
}
// Start requests a transition to StateRunning using the trigger "start" and
// returns whatever Transition returns.
func (sm *StateMachine) Start(ctx context.Context) error {
return sm.Transition(ctx, StateRunning, "start")
}
// Stop requests a transition to StateStopped using the trigger "stop" and
// returns whatever Transition returns.
func (sm *StateMachine) Stop(ctx context.Context) error {
return sm.Transition(ctx, StateStopped, "stop")
}
// Pause requests a transition to StatePaused using the trigger "pause" and
// returns whatever Transition returns.
func (sm *StateMachine) Pause(ctx context.Context) error {
return sm.Transition(ctx, StatePaused, "pause")
}
// Resume requests a transition to StateRunning using the trigger "resume".
// It does not itself check that the machine is paused: executeStateTransition
// calls the module's Resume only when the current state is StatePaused and
// its Start otherwise.
func (sm *StateMachine) Resume(ctx context.Context) error {
return sm.Transition(ctx, StateRunning, "resume")
}
// Fail requests a transition to StateFailed. The trigger recorded for the
// attempt is "fail: " followed by reason.
func (sm *StateMachine) Fail(ctx context.Context, reason string) error {
	trigger := "fail: " + reason
	return sm.Transition(ctx, StateFailed, trigger)
}
// SetStateHandler registers handler as the handler for state, replacing any
// handler previously stored for that state (including the defaults installed
// by setupDefaultHandlers). It takes the write lock.
func (sm *StateMachine) SetStateHandler(state ModuleState, handler StateHandler) {
sm.mu.Lock()
defer sm.mu.Unlock()
sm.stateHandlers[state] = handler
}
// SetTransitionHook registers hook under name, replacing any hook already
// stored under that name. It takes the write lock.
func (sm *StateMachine) SetTransitionHook(name string, hook TransitionHook) {
sm.mu.Lock()
defer sm.mu.Unlock()
sm.transitionHooks[name] = hook
}
// GetHistory returns a copy of the recorded transitions, oldest first. The
// returned slice is never nil and is independent of the machine's own slice.
func (sm *StateMachine) GetHistory() []StateTransition {
	sm.mu.RLock()
	defer sm.mu.RUnlock()

	snapshot := make([]StateTransition, 0, len(sm.history))
	return append(snapshot, sm.history...)
}
// GetMetrics returns a snapshot of the state machine metrics with
// CurrentStateDuration computed from the time the current state was entered.
//
// The StateDistribution and TransitionTimes maps in the result are copies, so
// the caller can read them without racing against later transitions that
// update the machine's own maps.
func (sm *StateMachine) GetMetrics() StateMachineMetrics {
	sm.mu.RLock()
	defer sm.mu.RUnlock()

	snapshot := sm.metrics
	snapshot.CurrentStateDuration = time.Since(sm.metrics.stateEnterTime)

	// A plain struct copy would still share the maps with sm.metrics.
	snapshot.StateDistribution = make(map[ModuleState]int64, len(sm.metrics.StateDistribution))
	for state, count := range sm.metrics.StateDistribution {
		snapshot.StateDistribution[state] = count
	}
	snapshot.TransitionTimes = make(map[string]time.Duration, len(sm.metrics.TransitionTimes))
	for key, elapsed := range sm.metrics.TransitionTimes {
		snapshot.TransitionTimes[key] = elapsed
	}
	return snapshot
}
// AddCustomTransition allows transitions from `from` to `to`. Adding a rule
// that is already present is a no-op.
func (sm *StateMachine) AddCustomTransition(from, to ModuleState) {
	sm.mu.Lock()
	defer sm.mu.Unlock()

	targets := sm.transitions[from]
	for _, target := range targets {
		if target == to {
			// Rule already present; nothing to add.
			return
		}
	}
	// Appending to a missing (nil) entry creates it.
	sm.transitions[from] = append(targets, to)
}
// RemoveTransition disallows transitions from `from` to `to`. It removes the
// first matching rule and does nothing if no such rule exists.
func (sm *StateMachine) RemoveTransition(from, to ModuleState) {
	sm.mu.Lock()
	defer sm.mu.Unlock()

	targets, ok := sm.transitions[from]
	if !ok {
		return
	}
	for idx := range targets {
		if targets[idx] != to {
			continue
		}
		// Close the gap left by the removed entry.
		sm.transitions[from] = append(targets[:idx], targets[idx+1:]...)
		return
	}
}
// GetValidTransitions returns a copy of the target states the transition table
// allows from the current state. The result is empty, never nil, when the
// current state has no entry in the table.
func (sm *StateMachine) GetValidTransitions() []ModuleState {
	sm.mu.RLock()
	defer sm.mu.RUnlock()

	targets, ok := sm.transitions[sm.currentState]
	if !ok {
		return []ModuleState{}
	}
	return append(make([]ModuleState, 0, len(targets)), targets...)
}
// IsInState reports whether the machine is currently in state.
func (sm *StateMachine) IsInState(state ModuleState) bool {
	sm.mu.RLock()
	matches := sm.currentState == state
	sm.mu.RUnlock()
	return matches
}
// IsInAnyState reports whether the machine's current state equals any of the
// given states. With no arguments it returns false.
func (sm *StateMachine) IsInAnyState(states ...ModuleState) bool {
	// Snapshot the state once under the lock, then compare without it.
	sm.mu.RLock()
	current := sm.currentState
	sm.mu.RUnlock()

	for _, candidate := range states {
		if candidate == current {
			return true
		}
	}
	return false
}
// WaitForState blocks until the machine is in state, polling every 100ms.
//
// It returns nil as soon as the state is observed. It returns an error when
// timeout elapses or ctx is done first; that error wraps the context error, so
// callers can tell context.DeadlineExceeded from context.Canceled with
// errors.Is. The state is checked one last time when the wait ends, so a
// timeout shorter than the poll interval still notices a state reached in the
// meantime.
func (sm *StateMachine) WaitForState(ctx context.Context, state ModuleState, timeout time.Duration) error {
	if sm.IsInState(state) {
		return nil
	}

	timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	for {
		select {
		case <-timeoutCtx.Done():
			// The state may have been reached since the last tick.
			if sm.IsInState(state) {
				return nil
			}
			return fmt.Errorf("timeout waiting for state %s: %w", state, timeoutCtx.Err())
		case <-ticker.C:
			if sm.IsInState(state) {
				return nil
			}
		}
	}
}
// Reset discards the history and metrics and then transitions the machine to
// config.InitialState with the trigger "reset", all under the write lock.
//
// The transition goes through performTransition like any other, so it is
// subject to validation, hooks and the module call for the target state.
// NOTE(review): history and metrics are cleared before the transition is
// attempted, so they stay cleared even when the transition returns an error.
func (sm *StateMachine) Reset(ctx context.Context) error {
	sm.mu.Lock()
	defer sm.mu.Unlock()

	sm.history = []StateTransition{}
	sm.metrics = StateMachineMetrics{
		StateDistribution: map[ModuleState]int64{},
		TransitionTimes:   map[string]time.Duration{},
		stateEnterTime:    time.Now(),
	}

	return sm.performTransition(ctx, sm.config.InitialState, "reset")
}
// Private methods
// performTransition carries out a single transition from the current state to
// `to`. It does no locking; Transition and Reset call it with sm.mu held.
//
// Steps, in order:
//  1. If validation is enabled, reject targets not listed for the current
//     state. This rejection is not written to history or metrics.
//  2. Run every registered hook; the first error aborts the transition and is
//     recorded as a failed transition.
//  3. Call the module method that corresponds to `to` (executeStateTransition).
//  4. Commit the new state, update metrics if enabled, and append to history.
//  5. Run every registered hook again, then the handler registered for `to`;
//     errors from both are dropped.
//
// It returns nil on success, or the validation, hook, or module error.
func (sm *StateMachine) performTransition(ctx context.Context, to ModuleState, trigger string) error {
startTime := time.Now()
from := sm.currentState
// Validate the transition against the table before doing any work.
if sm.config.EnableValidation && !sm.canTransitionUnsafe(to) {
return fmt.Errorf("invalid transition from %s to %s", from, to)
}
// Metadata stored alongside the history entry for this attempt.
transitionCtx := map[string]interface{}{
"trigger": trigger,
"start_time": startTime,
"module_id": sm.module.GetID(),
}
// Pre-transition pass over the hooks. This ranges over a map, so the order
// in which hooks run is unspecified. Each hook gets its own timeout context.
for name, hook := range sm.transitionHooks {
hookCtx, cancel := context.WithTimeout(ctx, sm.config.TransitionTimeout)
// The closure scopes the deferred cancel to this iteration.
err := func() error {
defer cancel()
if err := hook(hookCtx, from, to, sm); err != nil {
return fmt.Errorf("pre-transition hook %s failed: %w", name, err)
}
return nil
}()
if err != nil {
sm.recordFailedTransition(from, to, startTime, trigger, err, transitionCtx)
return err
}
}
// Drive the module into the target state; the state is not changed on failure.
if err := sm.executeStateTransition(ctx, from, to); err != nil {
sm.recordFailedTransition(from, to, startTime, trigger, err, transitionCtx)
return fmt.Errorf("state transition failed: %w", err)
}
// Commit: from here on the transition counts as successful.
sm.currentState = to
duration := time.Since(startTime)
// Update metrics
if sm.config.EnableMetrics {
sm.updateMetrics(from, to, duration)
}
// Record successful transition
sm.recordSuccessfulTransition(from, to, startTime, duration, trigger, transitionCtx)
// Post-transition pass over the same hooks, again in unspecified order.
for _, hook := range sm.transitionHooks {
hookCtx, cancel := context.WithTimeout(ctx, sm.config.TransitionTimeout)
func() {
defer cancel()
if err := hook(hookCtx, from, to, sm); err != nil {
// The error is dropped: the state change is already committed.
return
}
}()
}
// Run the handler registered for the new state, if any.
if handler, exists := sm.stateHandlers[to]; exists {
handlerCtx, cancel := context.WithTimeout(ctx, sm.config.TransitionTimeout)
func() {
defer cancel()
if err := handler(handlerCtx, sm); err != nil {
// The error is dropped: a failing handler does not undo the transition.
}
}()
}
return nil
}
// executeStateTransition calls the module method that corresponds to the
// target state, bounded by config.TransitionTimeout:
//
//	StateInitialized -> module.Initialize (with an empty ModuleConfig)
//	StateRunning     -> module.Resume when coming from StatePaused, else module.Start
//	StateStopped     -> module.Stop
//	StatePaused      -> module.Pause
//	StateFailed      -> no module call
//
// Any other target yields an "unknown target state" error.
func (sm *StateMachine) executeStateTransition(ctx context.Context, from, to ModuleState) error {
	opCtx, cancel := context.WithTimeout(ctx, sm.config.TransitionTimeout)
	defer cancel()

	switch {
	case to == StateInitialized:
		return sm.module.Initialize(opCtx, ModuleConfig{})
	case to == StateRunning && from == StatePaused:
		return sm.module.Resume(opCtx)
	case to == StateRunning:
		return sm.module.Start(opCtx)
	case to == StateStopped:
		return sm.module.Stop(opCtx)
	case to == StatePaused:
		return sm.module.Pause(opCtx)
	case to == StateFailed:
		// Entering the failed state needs no module action.
		return nil
	}
	return fmt.Errorf("unknown target state: %s", to)
}
// canTransitionUnsafe reports whether the transition table lists `to` as a
// target of the current state. It takes no lock; the caller must hold sm.mu.
func (sm *StateMachine) canTransitionUnsafe(to ModuleState) bool {
	// A missing table entry yields a nil slice, so the loop simply does not run.
	for _, candidate := range sm.transitions[sm.currentState] {
		if candidate == to {
			return true
		}
	}
	return false
}
// recordSuccessfulTransition appends a successful entry for from -> to to the
// history. startTime becomes the entry's timestamp and duration its elapsed
// time. The caller must hold sm.mu.
func (sm *StateMachine) recordSuccessfulTransition(from, to ModuleState, startTime time.Time, duration time.Duration, trigger string, context map[string]interface{}) {
	sm.addToHistory(StateTransition{
		From:      from,
		To:        to,
		Timestamp: startTime,
		Duration:  duration,
		Success:   true,
		Trigger:   trigger,
		Context:   context,
	})
}
// recordFailedTransition appends a failed entry for from -> to to the history,
// with err as the cause and the time elapsed since startTime as the duration.
// When metrics are enabled it also increments FailedTransitions. The caller
// must hold sm.mu.
func (sm *StateMachine) recordFailedTransition(from, to ModuleState, startTime time.Time, trigger string, err error, context map[string]interface{}) {
	elapsed := time.Since(startTime)
	sm.addToHistory(StateTransition{
		From:      from,
		To:        to,
		Timestamp: startTime,
		Duration:  elapsed,
		Success:   false,
		Error:     err,
		Trigger:   trigger,
		Context:   context,
	})

	if !sm.config.EnableMetrics {
		return
	}
	sm.metrics.FailedTransitions++
}
// addToHistory appends transition to the history and, if that pushes the
// history past config.MaxHistorySize, drops the oldest entry. The caller must
// hold sm.mu.
func (sm *StateMachine) addToHistory(transition StateTransition) {
	sm.history = append(sm.history, transition)

	limit := sm.config.MaxHistorySize
	if len(sm.history) <= limit {
		return
	}
	// Entries arrive one at a time, so discarding a single one restores the cap.
	sm.history = sm.history[1:]
}
// updateMetrics folds one successful from -> to transition that took duration
// into the metrics. The caller must hold sm.mu.
//
// It bumps the total and successful counters and the entry count for `to`,
// stores duration as the latest time for the "from->to" pair, and updates the
// longest and average transition times. The average is a running mean over
// all successful transitions, so repeated transitions between the same pair of
// states are weighted by how often they occur. Finally it restarts the clock
// used for CurrentStateDuration.
func (sm *StateMachine) updateMetrics(from, to ModuleState, duration time.Duration) {
	// One timestamp for both fields keeps them consistent with each other.
	now := time.Now()

	sm.metrics.TotalTransitions++
	sm.metrics.SuccessfulTransitions++
	sm.metrics.StateDistribution[to]++
	sm.metrics.LastTransition = now

	transitionKey := fmt.Sprintf("%s->%s", from, to)
	sm.metrics.TransitionTimes[transitionKey] = duration

	// Incremental mean: avg += (x - avg) / n. SuccessfulTransitions was just
	// incremented, so n is at least 1.
	samples := time.Duration(sm.metrics.SuccessfulTransitions)
	sm.metrics.AverageTransitionTime += (duration - sm.metrics.AverageTransitionTime) / samples

	if duration > sm.metrics.LongestTransition {
		sm.metrics.LongestTransition = duration
	}

	sm.metrics.stateEnterTime = now
}
// setupDefaultHandlers installs a handler that does nothing and returns nil
// for each of the initialized, running, stopped, paused and failed states.
// SetStateHandler can replace any of them later.
func (sm *StateMachine) setupDefaultHandlers() {
	noop := func(ctx context.Context, machine *StateMachine) error {
		return nil
	}

	defaults := []ModuleState{
		StateInitialized,
		StateRunning,
		StateStopped,
		StatePaused,
		StateFailed,
	}
	for _, state := range defaults {
		sm.stateHandlers[state] = noop
	}
}
// createDefaultTransitions creates the standard state transition rules.
//
// Besides the routes through the intermediate states (starting, pausing,
// resuming, stopping), the table allows the direct transitions that Start,
// Pause, Resume and Stop request: initialized/stopped -> running,
// running -> paused, paused -> running, and running/paused -> stopped.
// Without these direct edges, those methods are rejected whenever validation
// is enabled.
func createDefaultTransitions() map[ModuleState][]ModuleState {
	return map[ModuleState][]ModuleState{
		StateUninitialized: {StateInitialized, StateFailed},
		StateInitialized:   {StateStarting, StateRunning, StateStopped, StateFailed},
		StateStarting:      {StateRunning, StateFailed},
		StateRunning:       {StatePausing, StatePaused, StateStopping, StateStopped, StateFailed},
		StatePausing:       {StatePaused, StateFailed},
		StatePaused:        {StateResuming, StateRunning, StateStopping, StateStopped, StateFailed},
		StateResuming:      {StateRunning, StateFailed},
		StateStopping:      {StateStopped, StateFailed},
		StateStopped:       {StateInitialized, StateStarting, StateRunning, StateFailed},
		StateFailed:        {StateInitialized, StateStopped}, // Recovery paths
	}
}
// StateMachineBuilder provides a fluent interface for building state machines.
// It collects settings through its With* methods and applies them in Build.
type StateMachineBuilder struct {
// config is passed to NewStateMachine by Build.
config StateMachineConfig
// stateHandlers are installed on the built machine via SetStateHandler.
stateHandlers map[ModuleState]StateHandler
// transitionHooks are installed on the built machine via SetTransitionHook.
transitionHooks map[string]TransitionHook
// customTransitions are added to the built machine via AddCustomTransition.
customTransitions map[ModuleState][]ModuleState
}
// NewStateMachineBuilder returns a builder preloaded with a default
// configuration: initial state StateUninitialized, a 30 second transition
// timeout, 100 history entries, and metrics and validation both enabled.
func NewStateMachineBuilder() *StateMachineBuilder {
	defaults := StateMachineConfig{
		InitialState:      StateUninitialized,
		TransitionTimeout: 30 * time.Second,
		MaxHistorySize:    100,
		EnableMetrics:     true,
		EnableValidation:  true,
	}

	builder := new(StateMachineBuilder)
	builder.config = defaults
	builder.stateHandlers = map[ModuleState]StateHandler{}
	builder.transitionHooks = map[string]TransitionHook{}
	builder.customTransitions = map[ModuleState][]ModuleState{}
	return builder
}
// WithConfig replaces the builder's entire configuration with config,
// including the defaults set by NewStateMachineBuilder, and returns the
// builder for chaining.
func (smb *StateMachineBuilder) WithConfig(config StateMachineConfig) *StateMachineBuilder {
smb.config = config
return smb
}
// WithStateHandler registers handler for state, replacing any handler the
// builder already holds for that state, and returns the builder for chaining.
func (smb *StateMachineBuilder) WithStateHandler(state ModuleState, handler StateHandler) *StateMachineBuilder {
smb.stateHandlers[state] = handler
return smb
}
// WithTransitionHook registers hook under name, replacing any hook the builder
// already holds under that name, and returns the builder for chaining.
func (smb *StateMachineBuilder) WithTransitionHook(name string, hook TransitionHook) *StateMachineBuilder {
smb.transitionHooks[name] = hook
return smb
}
// WithCustomTransition queues an extra from -> to rule to be added to the
// built machine and returns the builder for chaining. The builder itself does
// not de-duplicate rules.
func (smb *StateMachineBuilder) WithCustomTransition(from, to ModuleState) *StateMachineBuilder {
	// Appending to a missing (nil) entry creates it.
	targets := smb.customTransitions[from]
	smb.customTransitions[from] = append(targets, to)
	return smb
}
// Build constructs a state machine for module from the builder's
// configuration, then installs the collected state handlers, transition hooks
// and custom transition rules on it.
func (smb *StateMachineBuilder) Build(module Module) *StateMachine {
	machine := NewStateMachine(module, smb.config)

	for state := range smb.stateHandlers {
		machine.SetStateHandler(state, smb.stateHandlers[state])
	}
	for name := range smb.transitionHooks {
		machine.SetTransitionHook(name, smb.transitionHooks[name])
	}
	for from, targets := range smb.customTransitions {
		for idx := range targets {
			machine.AddCustomTransition(from, targets[idx])
		}
	}
	return machine
}