package transport import ( "context" "fmt" "time" ) // Publish sends a message to the specified topic func (mb *UniversalMessageBus) Publish(ctx context.Context, msg *Message) error { if !mb.started { return fmt.Errorf("message bus not started") } // Validate message if err := mb.validateMessage(msg); err != nil { return fmt.Errorf("invalid message: %w", err) } // Set timestamp if not set if msg.Timestamp.IsZero() { msg.Timestamp = time.Now() } // Set ID if not set if msg.ID == "" { msg.ID = GenerateMessageID() } // Update metrics mb.metrics.IncrementCounter("messages_published_total") mb.metrics.RecordLatency("publish_latency", time.Since(msg.Timestamp)) // Route message to appropriate transport transport, err := mb.router.RouteMessage(msg, mb.transports) if err != nil { mb.metrics.IncrementCounter("routing_errors_total") return fmt.Errorf("routing failed: %w", err) } // Send via transport if err := transport.Send(ctx, msg); err != nil { mb.metrics.IncrementCounter("send_errors_total") // Try dead letter queue if enabled if mb.config.EnableDLQ { if dlqErr := mb.dlq.AddMessage(msg.Topic, msg); dlqErr != nil { return fmt.Errorf("send failed and DLQ failed: %v, original error: %w", dlqErr, err) } } return fmt.Errorf("send failed: %w", err) } // Store in topic if persistence enabled if mb.config.EnablePersistence { if err := mb.addMessageToTopic(msg); err != nil { // Log error but don't fail the publish mb.metrics.IncrementCounter("persistence_errors_total") } } // Deliver to local subscribers go mb.deliverToSubscribers(ctx, msg) return nil } // Subscribe creates a subscription to a topic func (mb *UniversalMessageBus) Subscribe(topic string, handler MessageHandler, opts ...SubscriptionOption) (*Subscription, error) { if !mb.started { return nil, fmt.Errorf("message bus not started") } // Apply subscription options options := SubscriptionOptions{ QueueSize: 1000, BatchSize: 1, BatchTimeout: time.Second, DLQEnabled: mb.config.EnableDLQ, RetryEnabled: true, Persistent: false, Durable: false, } for _, opt := range opts { opt(&options) } // Create subscription subscription := &Subscription{ ID: fmt.Sprintf("sub_%s_%d", topic, time.Now().UnixNano()), Topic: topic, Handler: handler, Options: options, created: time.Now(), active: true, } mb.mu.Lock() mb.subscriptions[subscription.ID] = subscription mb.mu.Unlock() // Add to topic subscribers mb.addSubscriberToTopic(topic, subscription) mb.metrics.IncrementCounter("subscriptions_created_total") return subscription, nil } // Unsubscribe removes a subscription func (mb *UniversalMessageBus) Unsubscribe(subscriptionID string) error { mb.mu.Lock() defer mb.mu.Unlock() subscription, exists := mb.subscriptions[subscriptionID] if !exists { return fmt.Errorf("subscription not found: %s", subscriptionID) } // Mark as inactive subscription.mu.Lock() subscription.active = false subscription.mu.Unlock() // Remove from subscriptions map delete(mb.subscriptions, subscriptionID) // Remove from topic subscribers mb.removeSubscriberFromTopic(subscription.Topic, subscriptionID) mb.metrics.IncrementCounter("subscriptions_removed_total") return nil } // Request sends a request and waits for a response func (mb *UniversalMessageBus) Request(ctx context.Context, msg *Message, timeout time.Duration) (*Message, error) { if !mb.started { return nil, fmt.Errorf("message bus not started") } // Set correlation ID for request-response if msg.CorrelationID == "" { msg.CorrelationID = GenerateMessageID() } // Create response channel responseChannel := make(chan *Message, 1) defer close(responseChannel) // Subscribe to response topic responseTopic := fmt.Sprintf("response.%s", msg.CorrelationID) subscription, err := mb.Subscribe(responseTopic, func(ctx context.Context, response *Message) error { select { case responseChannel <- response: default: // Channel full, ignore } return nil }) if err != nil { return nil, fmt.Errorf("failed to subscribe to response topic: %w", err) } defer mb.Unsubscribe(subscription.ID) // Send request if err := mb.Publish(ctx, msg); err != nil { return nil, fmt.Errorf("failed to publish request: %w", err) } // Wait for response with timeout select { case response := <-responseChannel: return response, nil case <-time.After(timeout): return nil, fmt.Errorf("request timeout after %v", timeout) case <-ctx.Done(): return nil, ctx.Err() } } // Reply sends a response to a request func (mb *UniversalMessageBus) Reply(ctx context.Context, originalMsg *Message, response *Message) error { if originalMsg.CorrelationID == "" { return fmt.Errorf("original message has no correlation ID") } // Set response properties response.Type = MessageTypeResponse response.Topic = fmt.Sprintf("response.%s", originalMsg.CorrelationID) response.CorrelationID = originalMsg.CorrelationID response.Target = originalMsg.Source return mb.Publish(ctx, response) } // CreateTopic creates a new topic with configuration func (mb *UniversalMessageBus) CreateTopic(topicName string, config TopicConfig) error { mb.mu.Lock() defer mb.mu.Unlock() if _, exists := mb.topics[topicName]; exists { return fmt.Errorf("topic already exists: %s", topicName) } topic := &Topic{ Name: topicName, Config: config, Messages: make([]StoredMessage, 0), Subscribers: make([]*Subscription, 0), Created: time.Now(), LastActivity: time.Now(), } mb.topics[topicName] = topic mb.metrics.IncrementCounter("topics_created_total") return nil } // DeleteTopic removes a topic func (mb *UniversalMessageBus) DeleteTopic(topicName string) error { mb.mu.Lock() defer mb.mu.Unlock() topic, exists := mb.topics[topicName] if !exists { return fmt.Errorf("topic not found: %s", topicName) } // Remove all subscribers for _, sub := range topic.Subscribers { mb.Unsubscribe(sub.ID) } delete(mb.topics, topicName) mb.metrics.IncrementCounter("topics_deleted_total") return nil } // ListTopics returns all topic names func (mb *UniversalMessageBus) ListTopics() []string { mb.mu.RLock() defer mb.mu.RUnlock() topics := make([]string, 0, len(mb.topics)) for name := range mb.topics { topics = append(topics, name) } return topics } // GetTopicInfo returns topic information func (mb *UniversalMessageBus) GetTopicInfo(topicName string) (*TopicInfo, error) { mb.mu.RLock() defer mb.mu.RUnlock() topic, exists := mb.topics[topicName] if !exists { return nil, fmt.Errorf("topic not found: %s", topicName) } topic.mu.RLock() defer topic.mu.RUnlock() // Calculate size var sizeBytes int64 for _, stored := range topic.Messages { // Rough estimation of message size sizeBytes += int64(len(fmt.Sprintf("%+v", stored.Message))) } return &TopicInfo{ Name: topic.Name, Config: topic.Config, MessageCount: int64(len(topic.Messages)), SubscriberCount: len(topic.Subscribers), LastActivity: topic.LastActivity, SizeBytes: sizeBytes, }, nil } // QueueMessage adds a message to a topic queue func (mb *UniversalMessageBus) QueueMessage(topic string, msg *Message) error { return mb.addMessageToTopic(msg) } // DequeueMessage removes a message from a topic queue func (mb *UniversalMessageBus) DequeueMessage(topic string, timeout time.Duration) (*Message, error) { start := time.Now() for time.Since(start) < timeout { mb.mu.RLock() topicObj, exists := mb.topics[topic] mb.mu.RUnlock() if !exists { return nil, fmt.Errorf("topic not found: %s", topic) } topicObj.mu.Lock() if len(topicObj.Messages) > 0 { // Get first unprocessed message for i, stored := range topicObj.Messages { if !stored.Processed { topicObj.Messages[i].Processed = true topicObj.mu.Unlock() return stored.Message, nil } } } topicObj.mu.Unlock() // Wait a bit before trying again time.Sleep(10 * time.Millisecond) } return nil, fmt.Errorf("no message available within timeout") } // PeekMessage returns the next message without removing it func (mb *UniversalMessageBus) PeekMessage(topic string) (*Message, error) { mb.mu.RLock() topicObj, exists := mb.topics[topic] mb.mu.RUnlock() if !exists { return nil, fmt.Errorf("topic not found: %s", topic) } topicObj.mu.RLock() defer topicObj.mu.RUnlock() for _, stored := range topicObj.Messages { if !stored.Processed { return stored.Message, nil } } return nil, fmt.Errorf("no messages available") } // Start initializes and starts the message bus func (mb *UniversalMessageBus) Start(ctx context.Context) error { mb.mu.Lock() defer mb.mu.Unlock() if mb.started { return fmt.Errorf("message bus already started") } // Initialize default transport if none configured if len(mb.transports) == 0 { memTransport := NewMemoryTransport() mb.transports[TransportMemory] = memTransport if err := memTransport.Connect(ctx); err != nil { return fmt.Errorf("failed to connect default transport: %w", err) } } // Start background routines go mb.healthCheckLoop() go mb.cleanupLoop() go mb.metricsLoop() mb.started = true mb.metrics.RecordEvent("message_bus_started") return nil } // Stop gracefully shuts down the message bus func (mb *UniversalMessageBus) Stop(ctx context.Context) error { mb.mu.Lock() defer mb.mu.Unlock() if !mb.started { return nil } // Cancel context to stop background routines mb.cancel() // Disconnect all transports for _, transport := range mb.transports { if err := transport.Disconnect(ctx); err != nil { // Log error but continue shutdown } } mb.started = false mb.metrics.RecordEvent("message_bus_stopped") return nil } // Health returns the current health status func (mb *UniversalMessageBus) Health() HealthStatus { components := make(map[string]ComponentHealth) // Check transport health for transportType, transport := range mb.transports { components[string(transportType)] = transport.Health() } // Overall status status := "healthy" var errors []HealthError for name, component := range components { if component.Status != "healthy" { status = "degraded" errors = append(errors, HealthError{ Component: name, Message: fmt.Sprintf("Component %s is %s", name, component.Status), Timestamp: time.Now(), Severity: "warning", }) } } return HealthStatus{ Status: status, Uptime: time.Since(time.Now()), // Would track actual uptime LastCheck: time.Now(), Components: components, Errors: errors, } } // GetMetrics returns current operational metrics func (mb *UniversalMessageBus) GetMetrics() MessageBusMetrics { _ = mb.metrics.GetAll() // metrics not used return MessageBusMetrics{ MessagesPublished: mb.getMetricInt64("messages_published_total"), MessagesConsumed: mb.getMetricInt64("messages_consumed_total"), MessagesFailed: mb.getMetricInt64("send_errors_total"), MessagesInDLQ: int64(mb.dlq.GetMessageCount()), ActiveSubscriptions: len(mb.subscriptions), TopicCount: len(mb.topics), AverageLatency: mb.getMetricDuration("average_latency"), ThroughputPerSec: mb.getMetricFloat64("throughput_per_second"), ErrorRate: mb.getMetricFloat64("error_rate"), MemoryUsage: mb.getMetricInt64("memory_usage_bytes"), CPUUsage: mb.getMetricFloat64("cpu_usage_percent"), } } // GetSubscriptions returns all active subscriptions func (mb *UniversalMessageBus) GetSubscriptions() []*Subscription { mb.mu.RLock() defer mb.mu.RUnlock() subscriptions := make([]*Subscription, 0, len(mb.subscriptions)) for _, sub := range mb.subscriptions { subscriptions = append(subscriptions, sub) } return subscriptions } // GetActiveConnections returns the number of active connections func (mb *UniversalMessageBus) GetActiveConnections() int { count := 0 for _, transport := range mb.transports { metrics := transport.GetMetrics() count += metrics.Connections } return count } // Helper methods func (mb *UniversalMessageBus) validateMessage(msg *Message) error { if msg == nil { return fmt.Errorf("message is nil") } if msg.Topic == "" { return fmt.Errorf("message topic is empty") } if msg.Source == "" { return fmt.Errorf("message source is empty") } if msg.Data == nil { return fmt.Errorf("message data is nil") } return nil } func (mb *UniversalMessageBus) addMessageToTopic(msg *Message) error { mb.mu.RLock() topic, exists := mb.topics[msg.Topic] mb.mu.RUnlock() if !exists { // Create topic automatically config := TopicConfig{ Persistent: true, RetentionPolicy: RetentionPolicy{MaxMessages: 10000, MaxAge: 24 * time.Hour}, } if err := mb.CreateTopic(msg.Topic, config); err != nil { return err } topic = mb.topics[msg.Topic] } topic.mu.Lock() defer topic.mu.Unlock() stored := StoredMessage{ Message: msg, Stored: time.Now(), Processed: false, } topic.Messages = append(topic.Messages, stored) topic.LastActivity = time.Now() // Apply retention policy mb.applyRetentionPolicy(topic) return nil } func (mb *UniversalMessageBus) addSubscriberToTopic(topicName string, subscription *Subscription) { mb.mu.RLock() topic, exists := mb.topics[topicName] mb.mu.RUnlock() if !exists { // Create topic automatically config := TopicConfig{Persistent: false} mb.CreateTopic(topicName, config) topic = mb.topics[topicName] } topic.mu.Lock() topic.Subscribers = append(topic.Subscribers, subscription) topic.mu.Unlock() } func (mb *UniversalMessageBus) removeSubscriberFromTopic(topicName, subscriptionID string) { mb.mu.RLock() topic, exists := mb.topics[topicName] mb.mu.RUnlock() if !exists { return } topic.mu.Lock() defer topic.mu.Unlock() for i, sub := range topic.Subscribers { if sub.ID == subscriptionID { topic.Subscribers = append(topic.Subscribers[:i], topic.Subscribers[i+1:]...) break } } } func (mb *UniversalMessageBus) deliverToSubscribers(ctx context.Context, msg *Message) { mb.mu.RLock() topic, exists := mb.topics[msg.Topic] mb.mu.RUnlock() if !exists { return } topic.mu.RLock() subscribers := make([]*Subscription, len(topic.Subscribers)) copy(subscribers, topic.Subscribers) topic.mu.RUnlock() for _, sub := range subscribers { sub.mu.RLock() if !sub.active { sub.mu.RUnlock() continue } // Apply filter if present if sub.Filter != nil && !sub.Filter(msg) { sub.mu.RUnlock() continue } handler := sub.Handler sub.mu.RUnlock() // Deliver message in goroutine go func(subscription *Subscription, message *Message) { defer func() { if r := recover(); r != nil { mb.metrics.IncrementCounter("handler_panics_total") } }() if err := handler(ctx, message); err != nil { mb.metrics.IncrementCounter("handler_errors_total") if mb.config.EnableDLQ { mb.dlq.AddMessage(message.Topic, message) } } else { mb.metrics.IncrementCounter("messages_consumed_total") } }(sub, msg) } } func (mb *UniversalMessageBus) applyRetentionPolicy(topic *Topic) { policy := topic.Config.RetentionPolicy // Remove old messages if policy.MaxAge > 0 { cutoff := time.Now().Add(-policy.MaxAge) filtered := make([]StoredMessage, 0) for _, stored := range topic.Messages { if stored.Stored.After(cutoff) { filtered = append(filtered, stored) } } topic.Messages = filtered } // Limit number of messages if policy.MaxMessages > 0 && len(topic.Messages) > policy.MaxMessages { topic.Messages = topic.Messages[len(topic.Messages)-policy.MaxMessages:] } } func (mb *UniversalMessageBus) healthCheckLoop() { ticker := time.NewTicker(mb.config.HealthCheckInterval) defer ticker.Stop() for { select { case <-ticker.C: mb.performHealthCheck() case <-mb.ctx.Done(): return } } } func (mb *UniversalMessageBus) cleanupLoop() { ticker := time.NewTicker(mb.config.CleanupInterval) defer ticker.Stop() for { select { case <-ticker.C: mb.performCleanup() case <-mb.ctx.Done(): return } } } func (mb *UniversalMessageBus) metricsLoop() { ticker := time.NewTicker(time.Minute) defer ticker.Stop() for { select { case <-ticker.C: mb.updateMetrics() case <-mb.ctx.Done(): return } } } func (mb *UniversalMessageBus) performHealthCheck() { // Check all transports for transportType, transport := range mb.transports { health := transport.Health() mb.metrics.RecordGauge(fmt.Sprintf("transport_%s_healthy", transportType), map[string]float64{"healthy": 1, "unhealthy": 0, "degraded": 0.5}[health.Status]) } } func (mb *UniversalMessageBus) performCleanup() { // Clean up processed messages in topics mb.mu.RLock() topics := make([]*Topic, 0, len(mb.topics)) for _, topic := range mb.topics { topics = append(topics, topic) } mb.mu.RUnlock() for _, topic := range topics { topic.mu.Lock() mb.applyRetentionPolicy(topic) topic.mu.Unlock() } // Clean up DLQ mb.dlq.Cleanup(time.Hour * 24) // Clean messages older than 24 hours } func (mb *UniversalMessageBus) updateMetrics() { // Update throughput metrics publishedCount := mb.getMetricInt64("messages_published_total") if publishedCount > 0 { // Calculate per-second rate (simplified) mb.metrics.RecordGauge("throughput_per_second", float64(publishedCount)/60.0) } // Update error rate errorCount := mb.getMetricInt64("send_errors_total") totalCount := publishedCount if totalCount > 0 { errorRate := float64(errorCount) / float64(totalCount) mb.metrics.RecordGauge("error_rate", errorRate) } } func (mb *UniversalMessageBus) getMetricInt64(key string) int64 { if val, ok := mb.metrics.Get(key).(int64); ok { return val } return 0 } func (mb *UniversalMessageBus) getMetricFloat64(key string) float64 { if val, ok := mb.metrics.Get(key).(float64); ok { return val } return 0 } func (mb *UniversalMessageBus) getMetricDuration(key string) time.Duration { if val, ok := mb.metrics.Get(key).(time.Duration); ok { return val } return 0 }