Restructured project for V2 refactor: **Structure Changes:** - Moved all V1 code to orig/ folder (preserved with git mv) - Created docs/planning/ directory - Added orig/README_V1.md explaining V1 preservation **Planning Documents:** - 00_V2_MASTER_PLAN.md: Complete architecture overview - Executive summary of critical V1 issues - High-level component architecture diagrams - 5-phase implementation roadmap - Success metrics and risk mitigation - 07_TASK_BREAKDOWN.md: Atomic task breakdown - 99+ hours of detailed tasks - Every task < 2 hours (atomic) - Clear dependencies and success criteria - Organized by implementation phase **V2 Key Improvements:** - Per-exchange parsers (factory pattern) - Multi-layer strict validation - Multi-index pool cache - Background validation pipeline - Comprehensive observability **Critical Issues Addressed:** - Zero address tokens (strict validation + cache enrichment) - Parsing accuracy (protocol-specific parsers) - No audit trail (background validation channel) - Inefficient lookups (multi-index cache) - Stats disconnection (event-driven metrics) Next Steps: 1. Review planning documents 2. Begin Phase 1: Foundation (P1-001 through P1-010) 3. Implement parsers in Phase 2 4. Build cache system in Phase 3 5. Add validation pipeline in Phase 4 6. Migrate and test in Phase 5 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
411 lines
11 KiB
Go
411 lines
11 KiB
Go
package transport
|
|
|
|
import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)
|
|
|
|
// MessageType identifies the kind of a Message. Its underlying type is string,
// so the constants below are the exact values that appear in the "type" field
// when a Message is encoded as JSON.
type MessageType string

// Message kinds declared by this package.
const (
	MessageTypeEvent        MessageType = "event"
	MessageTypeCommand      MessageType = "command"
	MessageTypeQuery        MessageType = "query"
	MessageTypeResponse     MessageType = "response"
	MessageTypeNotification MessageType = "notification"
	MessageTypeHeartbeat    MessageType = "heartbeat"
)
|
|
|
|
// MessagePriority defines the processing priority of a Message. The levels
// are consecutive integers, so they can be compared with the usual ordering
// operators.
type MessagePriority int

// Priority levels in ascending order. PriorityLow is 0 and is therefore the
// priority of a zero-valued Message; NewMessage sets PriorityNormal instead.
const (
	PriorityLow MessagePriority = iota
	PriorityNormal
	PriorityHigh
	PriorityCritical
)
|
|
|
|
// Message represents a universal message in the system
|
|
type Message struct {
|
|
ID string `json:"id"`
|
|
Type MessageType `json:"type"`
|
|
Topic string `json:"topic"`
|
|
Source string `json:"source"`
|
|
Target string `json:"target,omitempty"`
|
|
Priority MessagePriority `json:"priority"`
|
|
Timestamp time.Time `json:"timestamp"`
|
|
Data interface{} `json:"data"`
|
|
Headers map[string]string `json:"headers,omitempty"`
|
|
CorrelationID string `json:"correlation_id,omitempty"`
|
|
TTL time.Duration `json:"ttl,omitempty"`
|
|
Retries int `json:"retries"`
|
|
MaxRetries int `json:"max_retries"`
|
|
Metadata map[string]interface{} `json:"metadata,omitempty"`
|
|
}
|
|
|
|
// MessageHandler processes incoming messages
|
|
type MessageHandler func(ctx context.Context, msg *Message) error
|
|
|
|
// MessageFilter determines if a message should be processed
|
|
type MessageFilter func(msg *Message) bool
|
|
|
|
// Subscription represents a topic subscription
|
|
type Subscription struct {
|
|
ID string
|
|
Topic string
|
|
Filter MessageFilter
|
|
Handler MessageHandler
|
|
Options SubscriptionOptions
|
|
created time.Time
|
|
active bool
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
// SubscriptionOptions configures subscription behavior. Values are normally
// populated through SubscriptionOption functions such as WithQueueSize,
// WithBatchProcessing, WithDLQ, WithRetry and WithPersistence.
//
// No option function in this file sets Durable; it can only be assigned
// directly. The zero value has every feature disabled and all sizes at 0.
type SubscriptionOptions struct {
	QueueSize    int
	BatchSize    int
	BatchTimeout time.Duration
	DLQEnabled   bool
	RetryEnabled bool
	Persistent   bool
	Durable      bool
}
|
|
|
|
// MessageBusInterface defines the universal message bus contract
|
|
type MessageBusInterface interface {
|
|
// Core messaging operations
|
|
Publish(ctx context.Context, msg *Message) error
|
|
Subscribe(topic string, handler MessageHandler, opts ...SubscriptionOption) (*Subscription, error)
|
|
Unsubscribe(subscriptionID string) error
|
|
|
|
// Advanced messaging patterns
|
|
Request(ctx context.Context, msg *Message, timeout time.Duration) (*Message, error)
|
|
Reply(ctx context.Context, originalMsg *Message, response *Message) error
|
|
|
|
// Topic management
|
|
CreateTopic(topic string, config TopicConfig) error
|
|
DeleteTopic(topic string) error
|
|
ListTopics() []string
|
|
GetTopicInfo(topic string) (*TopicInfo, error)
|
|
|
|
// Queue operations
|
|
QueueMessage(topic string, msg *Message) error
|
|
DequeueMessage(topic string, timeout time.Duration) (*Message, error)
|
|
PeekMessage(topic string) (*Message, error)
|
|
|
|
// Dead letter queue
|
|
GetDLQMessages(topic string) ([]*Message, error)
|
|
ReprocessDLQMessage(messageID string) error
|
|
PurgeDLQ(topic string) error
|
|
|
|
// Lifecycle management
|
|
Start(ctx context.Context) error
|
|
Stop(ctx context.Context) error
|
|
Health() HealthStatus
|
|
|
|
// Metrics and monitoring
|
|
GetMetrics() MessageBusMetrics
|
|
GetSubscriptions() []*Subscription
|
|
GetActiveConnections() int
|
|
}
|
|
|
|
// SubscriptionOption configures subscription behavior
|
|
type SubscriptionOption func(*SubscriptionOptions)
|
|
|
|
// TopicConfig defines topic configuration
|
|
type TopicConfig struct {
|
|
Persistent bool
|
|
Replicated bool
|
|
RetentionPolicy RetentionPolicy
|
|
Partitions int
|
|
MaxMessageSize int64
|
|
TTL time.Duration
|
|
}
|
|
|
|
// RetentionPolicy defines message retention behavior through three
// independent limits: a message count, a maximum age and a maximum size.
//
// NOTE(review): whether a zero limit means "unlimited" and whether MaxSize is
// in bytes is not established in this file — confirm against the code that
// applies the policy.
type RetentionPolicy struct {
	MaxMessages int
	MaxAge      time.Duration
	MaxSize     int64
}
|
|
|
|
// TopicInfo provides topic statistics
|
|
type TopicInfo struct {
|
|
Name string
|
|
Config TopicConfig
|
|
MessageCount int64
|
|
SubscriberCount int
|
|
LastActivity time.Time
|
|
SizeBytes int64
|
|
}
|
|
|
|
// HealthStatus represents system health
|
|
type HealthStatus struct {
|
|
Status string
|
|
Uptime time.Duration
|
|
LastCheck time.Time
|
|
Components map[string]ComponentHealth
|
|
Errors []HealthError
|
|
}
|
|
|
|
// ComponentHealth represents component-specific health. It is the value type
// of HealthStatus.Components and the return type of Transport.Health.
//
// NOTE(review): the set of valid Status strings is not defined in this file.
type ComponentHealth struct {
	Status       string
	LastCheck    time.Time
	ResponseTime time.Duration
	ErrorCount   int64
}
|
|
|
|
// HealthError represents a health check error: which component reported it,
// a human-readable message, when it happened and a severity label.
//
// Despite the name, HealthError does not implement the error interface in
// this file; it is a plain record stored in HealthStatus.Errors.
type HealthError struct {
	Component string
	Message   string
	Timestamp time.Time
	Severity  string
}
|
|
|
|
// MessageBusMetrics provides operational metrics for the bus as a plain
// snapshot value returned by MessageBusInterface.GetMetrics.
//
// NOTE(review): units for MemoryUsage, CPUUsage and ErrorRate (bytes,
// fraction vs. percent) are not established in this file — confirm against
// the metrics collector.
type MessageBusMetrics struct {
	MessagesPublished   int64
	MessagesConsumed    int64
	MessagesFailed      int64
	MessagesInDLQ       int64
	ActiveSubscriptions int
	TopicCount          int
	AverageLatency      time.Duration
	ThroughputPerSec    float64
	ErrorRate           float64
	MemoryUsage         int64
	CPUUsage            float64
}
|
|
|
|
// TransportType defines available transport mechanisms. Its underlying type
// is string; it is used as the key of UniversalMessageBus.transports and as
// the selector in TransportConfig and MessageBusConfig.
type TransportType string

// Transport mechanisms declared by this package.
const (
	TransportMemory     TransportType = "memory"
	TransportUnixSocket TransportType = "unix"
	TransportTCP        TransportType = "tcp"
	TransportWebSocket  TransportType = "websocket"
	TransportRedis      TransportType = "redis"
	TransportNATS       TransportType = "nats"
)
|
|
|
|
// TransportConfig configures transport layer
|
|
type TransportConfig struct {
|
|
Type TransportType
|
|
Address string
|
|
Options map[string]interface{}
|
|
RetryConfig RetryConfig
|
|
SecurityConfig SecurityConfig
|
|
}
|
|
|
|
// RetryConfig defines retry behavior: an attempt limit, the initial and
// maximum delay between attempts, a multiplicative BackoffFactor and a Jitter
// switch.
//
// NOTE(review): the exact backoff formula is implemented elsewhere; this file
// only declares the parameters.
type RetryConfig struct {
	MaxRetries    int
	InitialDelay  time.Duration
	MaxDelay      time.Duration
	BackoffFactor float64
	Jitter        bool
}
|
|
|
|
// SecurityConfig defines security settings
|
|
type SecurityConfig struct {
|
|
Enabled bool
|
|
TLSConfig *TLSConfig
|
|
AuthConfig *AuthConfig
|
|
Encryption bool
|
|
Compression bool
|
|
}
|
|
|
|
// TLSConfig holds the settings for secure transport. This is the package's
// own configuration record, not crypto/tls.Config.
//
// NOTE(review): CertFile, KeyFile and CAFile are presumably file system paths
// and Verify presumably toggles peer certificate verification — confirm
// against the transport that consumes this struct.
type TLSConfig struct {
	CertFile string
	KeyFile  string
	CAFile   string
	Verify   bool
}
|
|
|
|
// AuthConfig holds authentication settings: a username/password pair, a
// token and the name of the authentication method.
//
// The credentials are stored as plain strings, so formatting a value of this
// type with %v or %+v prints them; avoid logging it directly.
//
// NOTE(review): the accepted Method values are not defined in this file.
type AuthConfig struct {
	Username string
	Password string
	Token    string
	Method   string
}
|
|
|
|
// UniversalMessageBus implements MessageBusInterface
|
|
type UniversalMessageBus struct {
|
|
config MessageBusConfig
|
|
transports map[TransportType]Transport
|
|
router *MessageRouter
|
|
topics map[string]*Topic
|
|
subscriptions map[string]*Subscription
|
|
dlq *DeadLetterQueue
|
|
metrics *MetricsCollector
|
|
persistence PersistenceLayer
|
|
mu sync.RWMutex
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
started bool
|
|
}
|
|
|
|
// MessageBusConfig configures the message bus
|
|
type MessageBusConfig struct {
|
|
DefaultTransport TransportType
|
|
EnablePersistence bool
|
|
EnableMetrics bool
|
|
EnableDLQ bool
|
|
MaxMessageSize int64
|
|
DefaultTTL time.Duration
|
|
HealthCheckInterval time.Duration
|
|
CleanupInterval time.Duration
|
|
}
|
|
|
|
// Transport interface for different transport mechanisms
|
|
type Transport interface {
|
|
Send(ctx context.Context, msg *Message) error
|
|
Receive(ctx context.Context) (<-chan *Message, error)
|
|
Connect(ctx context.Context) error
|
|
Disconnect(ctx context.Context) error
|
|
Health() ComponentHealth
|
|
GetMetrics() TransportMetrics
|
|
}
|
|
|
|
// TransportMetrics carries transport-specific metrics: byte and message
// counters for each direction, the connection count, an error counter and a
// latency figure. It is the value returned by Transport.GetMetrics.
type TransportMetrics struct {
	BytesSent        int64
	BytesReceived    int64
	MessagesSent     int64
	MessagesReceived int64
	Connections      int
	Errors           int64
	Latency          time.Duration
}
|
|
|
|
// Note: MessageRouter and RoutingRule are defined in router.go
|
|
|
|
// LoadBalancer for transport selection
|
|
type LoadBalancer interface {
|
|
SelectTransport(transports []TransportType, msg *Message) TransportType
|
|
UpdateStats(transport TransportType, latency time.Duration, success bool)
|
|
}
|
|
|
|
// Topic represents a message topic
|
|
type Topic struct {
|
|
Name string
|
|
Config TopicConfig
|
|
Messages []StoredMessage
|
|
Subscribers []*Subscription
|
|
Created time.Time
|
|
LastActivity time.Time
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
// StoredMessage represents a persisted message
|
|
type StoredMessage struct {
|
|
Message *Message
|
|
Stored time.Time
|
|
Processed bool
|
|
}
|
|
|
|
// Note: DeadLetterQueue and DLQConfig are defined in dlq.go
|
|
|
|
// Note: MetricsCollector is defined in serialization.go
|
|
|
|
// PersistenceLayer handles message persistence
|
|
type PersistenceLayer interface {
|
|
Store(msg *Message) error
|
|
Retrieve(id string) (*Message, error)
|
|
Delete(id string) error
|
|
List(topic string, limit int) ([]*Message, error)
|
|
Cleanup(maxAge time.Duration) error
|
|
}
|
|
|
|
// Factory functions for common subscription options
|
|
func WithQueueSize(size int) SubscriptionOption {
|
|
return func(opts *SubscriptionOptions) {
|
|
opts.QueueSize = size
|
|
}
|
|
}
|
|
|
|
func WithBatchProcessing(size int, timeout time.Duration) SubscriptionOption {
|
|
return func(opts *SubscriptionOptions) {
|
|
opts.BatchSize = size
|
|
opts.BatchTimeout = timeout
|
|
}
|
|
}
|
|
|
|
func WithDLQ(enabled bool) SubscriptionOption {
|
|
return func(opts *SubscriptionOptions) {
|
|
opts.DLQEnabled = enabled
|
|
}
|
|
}
|
|
|
|
func WithRetry(enabled bool) SubscriptionOption {
|
|
return func(opts *SubscriptionOptions) {
|
|
opts.RetryEnabled = enabled
|
|
}
|
|
}
|
|
|
|
func WithPersistence(enabled bool) SubscriptionOption {
|
|
return func(opts *SubscriptionOptions) {
|
|
opts.Persistent = enabled
|
|
}
|
|
}
|
|
|
|
// NewUniversalMessageBus creates a new message bus instance
|
|
func NewUniversalMessageBus(config MessageBusConfig) *UniversalMessageBus {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
return &UniversalMessageBus{
|
|
config: config,
|
|
transports: make(map[TransportType]Transport),
|
|
topics: make(map[string]*Topic),
|
|
subscriptions: make(map[string]*Subscription),
|
|
router: NewMessageRouter(),
|
|
dlq: NewDeadLetterQueue(DLQConfig{}),
|
|
metrics: NewMetricsCollector(),
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
}
|
|
}
|
|
|
|
// NewMessageRouter creates a new message router
|
|
func NewMessageRouter() *MessageRouter {
|
|
return &MessageRouter{
|
|
rules: make([]RoutingRule, 0),
|
|
fallback: TransportMemory,
|
|
}
|
|
}
|
|
|
|
// Note: NewDeadLetterQueue is defined in dlq.go
|
|
// Note: NewMetricsCollector is defined in serialization.go
|
|
|
|
// messageIDSeq is a process-wide counter used by GenerateMessageID to keep
// IDs distinct when several are generated within the same clock tick. It is
// only accessed through sync/atomic.
var messageIDSeq uint64

// GenerateMessageID returns a new message identifier of the form
// "msg_<unix-nanoseconds>_<sequence>".
//
// The sequence component is an atomically incremented, process-wide counter.
// Previously the second component was the nanosecond part of a second clock
// reading, which carries no information beyond the first component and let
// two calls landing on the same clock tick (coarse timers, concurrent
// goroutines) produce identical IDs. With the counter, IDs are distinct
// within a process and the function is safe for concurrent use.
//
// IDs are not guaranteed unique across processes; callers should treat the
// value as an opaque string.
func GenerateMessageID() string {
	seq := atomic.AddUint64(&messageIDSeq, 1)
	return fmt.Sprintf("msg_%d_%d", time.Now().UnixNano(), seq)
}
|
|
|
|
// Helper function to create message with defaults
|
|
func NewMessage(msgType MessageType, topic string, source string, data interface{}) *Message {
|
|
return &Message{
|
|
ID: GenerateMessageID(),
|
|
Type: msgType,
|
|
Topic: topic,
|
|
Source: source,
|
|
Priority: PriorityNormal,
|
|
Timestamp: time.Now(),
|
|
Data: data,
|
|
Headers: make(map[string]string),
|
|
Metadata: make(map[string]interface{}),
|
|
Retries: 0,
|
|
MaxRetries: 3,
|
|
}
|
|
}
|