Files
mev-beta/orig/pkg/transport/message_bus.go
Administrator 803de231ba feat: create v2-prep branch with comprehensive planning
Restructured project for V2 refactor:

**Structure Changes:**
- Moved all V1 code to orig/ folder (preserved with git mv)
- Created docs/planning/ directory
- Added orig/README_V1.md explaining V1 preservation

**Planning Documents:**
- 00_V2_MASTER_PLAN.md: Complete architecture overview
  - Executive summary of critical V1 issues
  - High-level component architecture diagrams
  - 5-phase implementation roadmap
  - Success metrics and risk mitigation

- 07_TASK_BREAKDOWN.md: Atomic task breakdown
  - 99+ hours of detailed tasks
  - Every task < 2 hours (atomic)
  - Clear dependencies and success criteria
  - Organized by implementation phase

**V2 Key Improvements:**
- Per-exchange parsers (factory pattern)
- Multi-layer strict validation
- Multi-index pool cache
- Background validation pipeline
- Comprehensive observability

**Critical Issues Addressed:**
- Zero address tokens (strict validation + cache enrichment)
- Parsing accuracy (protocol-specific parsers)
- No audit trail (background validation channel)
- Inefficient lookups (multi-index cache)
- Stats disconnection (event-driven metrics)

Next Steps:
1. Review planning documents
2. Begin Phase 1: Foundation (P1-001 through P1-010)
3. Implement parsers in Phase 2
4. Build cache system in Phase 3
5. Add validation pipeline in Phase 4
6. Migrate and test in Phase 5

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-10 10:14:26 +01:00

411 lines
11 KiB
Go

package transport
import (
"context"
"fmt"
"sync"
"time"
)
// MessageType represents the type of message being sent
type MessageType string
const (
MessageTypeEvent MessageType = "event"
MessageTypeCommand MessageType = "command"
MessageTypeQuery MessageType = "query"
MessageTypeResponse MessageType = "response"
MessageTypeNotification MessageType = "notification"
MessageTypeHeartbeat MessageType = "heartbeat"
)
// MessagePriority defines message processing priority
type MessagePriority int
const (
PriorityLow MessagePriority = iota
PriorityNormal
PriorityHigh
PriorityCritical
)
// Message represents a universal message in the system
type Message struct {
ID string `json:"id"`
Type MessageType `json:"type"`
Topic string `json:"topic"`
Source string `json:"source"`
Target string `json:"target,omitempty"`
Priority MessagePriority `json:"priority"`
Timestamp time.Time `json:"timestamp"`
Data interface{} `json:"data"`
Headers map[string]string `json:"headers,omitempty"`
CorrelationID string `json:"correlation_id,omitempty"`
TTL time.Duration `json:"ttl,omitempty"`
Retries int `json:"retries"`
MaxRetries int `json:"max_retries"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
}
// MessageHandler processes incoming messages
type MessageHandler func(ctx context.Context, msg *Message) error
// MessageFilter determines if a message should be processed
type MessageFilter func(msg *Message) bool
// Subscription represents a topic subscription
type Subscription struct {
ID string
Topic string
Filter MessageFilter
Handler MessageHandler
Options SubscriptionOptions
created time.Time
active bool
mu sync.RWMutex
}
// SubscriptionOptions configures subscription behavior
type SubscriptionOptions struct {
QueueSize int
BatchSize int
BatchTimeout time.Duration
DLQEnabled bool
RetryEnabled bool
Persistent bool
Durable bool
}
// MessageBusInterface defines the universal message bus contract
type MessageBusInterface interface {
// Core messaging operations
Publish(ctx context.Context, msg *Message) error
Subscribe(topic string, handler MessageHandler, opts ...SubscriptionOption) (*Subscription, error)
Unsubscribe(subscriptionID string) error
// Advanced messaging patterns
Request(ctx context.Context, msg *Message, timeout time.Duration) (*Message, error)
Reply(ctx context.Context, originalMsg *Message, response *Message) error
// Topic management
CreateTopic(topic string, config TopicConfig) error
DeleteTopic(topic string) error
ListTopics() []string
GetTopicInfo(topic string) (*TopicInfo, error)
// Queue operations
QueueMessage(topic string, msg *Message) error
DequeueMessage(topic string, timeout time.Duration) (*Message, error)
PeekMessage(topic string) (*Message, error)
// Dead letter queue
GetDLQMessages(topic string) ([]*Message, error)
ReprocessDLQMessage(messageID string) error
PurgeDLQ(topic string) error
// Lifecycle management
Start(ctx context.Context) error
Stop(ctx context.Context) error
Health() HealthStatus
// Metrics and monitoring
GetMetrics() MessageBusMetrics
GetSubscriptions() []*Subscription
GetActiveConnections() int
}
// SubscriptionOption configures subscription behavior
type SubscriptionOption func(*SubscriptionOptions)
// TopicConfig defines topic configuration
type TopicConfig struct {
Persistent bool
Replicated bool
RetentionPolicy RetentionPolicy
Partitions int
MaxMessageSize int64
TTL time.Duration
}
// RetentionPolicy defines message retention behavior
type RetentionPolicy struct {
MaxMessages int
MaxAge time.Duration
MaxSize int64
}
// TopicInfo provides topic statistics
type TopicInfo struct {
Name string
Config TopicConfig
MessageCount int64
SubscriberCount int
LastActivity time.Time
SizeBytes int64
}
// HealthStatus represents system health
type HealthStatus struct {
Status string
Uptime time.Duration
LastCheck time.Time
Components map[string]ComponentHealth
Errors []HealthError
}
// ComponentHealth represents component-specific health
type ComponentHealth struct {
Status string
LastCheck time.Time
ResponseTime time.Duration
ErrorCount int64
}
// HealthError represents a health check error
type HealthError struct {
Component string
Message string
Timestamp time.Time
Severity string
}
// MessageBusMetrics provides operational metrics
type MessageBusMetrics struct {
MessagesPublished int64
MessagesConsumed int64
MessagesFailed int64
MessagesInDLQ int64
ActiveSubscriptions int
TopicCount int
AverageLatency time.Duration
ThroughputPerSec float64
ErrorRate float64
MemoryUsage int64
CPUUsage float64
}
// TransportType defines available transport mechanisms
type TransportType string
const (
TransportMemory TransportType = "memory"
TransportUnixSocket TransportType = "unix"
TransportTCP TransportType = "tcp"
TransportWebSocket TransportType = "websocket"
TransportRedis TransportType = "redis"
TransportNATS TransportType = "nats"
)
// TransportConfig configures transport layer
type TransportConfig struct {
Type TransportType
Address string
Options map[string]interface{}
RetryConfig RetryConfig
SecurityConfig SecurityConfig
}
// RetryConfig defines retry behavior
type RetryConfig struct {
MaxRetries int
InitialDelay time.Duration
MaxDelay time.Duration
BackoffFactor float64
Jitter bool
}
// SecurityConfig defines security settings
type SecurityConfig struct {
Enabled bool
TLSConfig *TLSConfig
AuthConfig *AuthConfig
Encryption bool
Compression bool
}
// TLSConfig for secure transport
type TLSConfig struct {
CertFile string
KeyFile string
CAFile string
Verify bool
}
// AuthConfig for authentication
type AuthConfig struct {
Username string
Password string
Token string
Method string
}
// UniversalMessageBus implements MessageBusInterface
type UniversalMessageBus struct {
config MessageBusConfig
transports map[TransportType]Transport
router *MessageRouter
topics map[string]*Topic
subscriptions map[string]*Subscription
dlq *DeadLetterQueue
metrics *MetricsCollector
persistence PersistenceLayer
mu sync.RWMutex
ctx context.Context
cancel context.CancelFunc
started bool
}
// MessageBusConfig configures the message bus
type MessageBusConfig struct {
DefaultTransport TransportType
EnablePersistence bool
EnableMetrics bool
EnableDLQ bool
MaxMessageSize int64
DefaultTTL time.Duration
HealthCheckInterval time.Duration
CleanupInterval time.Duration
}
// Transport interface for different transport mechanisms
type Transport interface {
Send(ctx context.Context, msg *Message) error
Receive(ctx context.Context) (<-chan *Message, error)
Connect(ctx context.Context) error
Disconnect(ctx context.Context) error
Health() ComponentHealth
GetMetrics() TransportMetrics
}
// TransportMetrics for transport-specific metrics
type TransportMetrics struct {
BytesSent int64
BytesReceived int64
MessagesSent int64
MessagesReceived int64
Connections int
Errors int64
Latency time.Duration
}
// Note: MessageRouter and RoutingRule are defined in router.go
// LoadBalancer for transport selection
type LoadBalancer interface {
SelectTransport(transports []TransportType, msg *Message) TransportType
UpdateStats(transport TransportType, latency time.Duration, success bool)
}
// Topic represents a message topic
type Topic struct {
Name string
Config TopicConfig
Messages []StoredMessage
Subscribers []*Subscription
Created time.Time
LastActivity time.Time
mu sync.RWMutex
}
// StoredMessage represents a persisted message
type StoredMessage struct {
Message *Message
Stored time.Time
Processed bool
}
// Note: DeadLetterQueue and DLQConfig are defined in dlq.go
// Note: MetricsCollector is defined in serialization.go
// PersistenceLayer handles message persistence
type PersistenceLayer interface {
Store(msg *Message) error
Retrieve(id string) (*Message, error)
Delete(id string) error
List(topic string, limit int) ([]*Message, error)
Cleanup(maxAge time.Duration) error
}
// Factory functions for common subscription options
func WithQueueSize(size int) SubscriptionOption {
return func(opts *SubscriptionOptions) {
opts.QueueSize = size
}
}
func WithBatchProcessing(size int, timeout time.Duration) SubscriptionOption {
return func(opts *SubscriptionOptions) {
opts.BatchSize = size
opts.BatchTimeout = timeout
}
}
func WithDLQ(enabled bool) SubscriptionOption {
return func(opts *SubscriptionOptions) {
opts.DLQEnabled = enabled
}
}
func WithRetry(enabled bool) SubscriptionOption {
return func(opts *SubscriptionOptions) {
opts.RetryEnabled = enabled
}
}
func WithPersistence(enabled bool) SubscriptionOption {
return func(opts *SubscriptionOptions) {
opts.Persistent = enabled
}
}
// NewUniversalMessageBus creates a new message bus instance
func NewUniversalMessageBus(config MessageBusConfig) *UniversalMessageBus {
ctx, cancel := context.WithCancel(context.Background())
return &UniversalMessageBus{
config: config,
transports: make(map[TransportType]Transport),
topics: make(map[string]*Topic),
subscriptions: make(map[string]*Subscription),
router: NewMessageRouter(),
dlq: NewDeadLetterQueue(DLQConfig{}),
metrics: NewMetricsCollector(),
ctx: ctx,
cancel: cancel,
}
}
// NewMessageRouter creates a new message router
func NewMessageRouter() *MessageRouter {
return &MessageRouter{
rules: make([]RoutingRule, 0),
fallback: TransportMemory,
}
}
// Note: NewDeadLetterQueue is defined in dlq.go
// Note: NewMetricsCollector is defined in serialization.go
// Helper function to generate message ID
func GenerateMessageID() string {
return fmt.Sprintf("msg_%d_%d", time.Now().UnixNano(), time.Now().Nanosecond())
}
// Helper function to create message with defaults
func NewMessage(msgType MessageType, topic string, source string, data interface{}) *Message {
return &Message{
ID: GenerateMessageID(),
Type: msgType,
Topic: topic,
Source: source,
Priority: PriorityNormal,
Timestamp: time.Now(),
Data: data,
Headers: make(map[string]string),
Metadata: make(map[string]interface{}),
Retries: 0,
MaxRetries: 3,
}
}