package transport

import (
	"fmt"
	"math/rand"
	"sort"
	"strings"
	"sync"
	"time"
)

// MessageRouter selects a transport for each message using, in order,
// priority-sorted routing rules, an optional load balancer, and a fallback
// transport type. All exported methods take mu before touching router state.
type MessageRouter struct {
	rules        []RoutingRule // kept sorted by descending Priority
	fallback     TransportType // transport type tried when nothing else applies
	loadBalancer LoadBalancer  // optional; nil skips the load-balancing stage
	mu           sync.RWMutex  // guards rules, fallback and loadBalancer
}

// RoutingRule binds a message filter to a transport type.
type RoutingRule struct {
	ID         string        // identifier; AddRule generates one when empty
	Name       string        // human-readable label
	Condition  MessageFilter // nil matches every message
	Transport  TransportType // transport type selected when the rule matches
	Priority   int           // higher values are tried first
	Enabled    bool          // disabled rules are skipped when matching
	Created    time.Time     // set by AddRule
	LastUsed   time.Time     // last time the rule selected a transport
	UsageCount int64         // number of times the rule selected a transport
}

// RouteMessage selects the transport that should carry msg.
//
// Selection proceeds in three stages:
//  1. Enabled rules matching msg are tried in priority order; the first one
//     whose transport is present in transports and reports status "healthy"
//     wins, and that rule's usage statistics are updated.
//  2. If a load balancer is configured, it chooses among the transports
//     reporting status "healthy".
//  3. The fallback transport is used unless it reports status "unhealthy".
//
// An error is returned when none of the stages yields a transport.
//
// Router state is snapshotted under the read lock and the lock is released
// before any Health call, so health checks never run with the router locked.
// Usage statistics are written under the write lock.
func (mr *MessageRouter) RouteMessage(msg *Message, transports map[TransportType]Transport) (Transport, error) {
	mr.mu.RLock()
	matchingRules := mr.findMatchingRules(msg) // value copies; safe to use after unlock
	balancer := mr.loadBalancer
	fallback := mr.fallback
	mr.mu.RUnlock()

	// Stage 1: matching rules, highest priority first.
	for _, rule := range matchingRules {
		transport, exists := transports[rule.Transport]
		if !exists {
			continue
		}
		if health := transport.Health(); health.Status != "healthy" {
			continue
		}

		// Usage counters are mutated, so the write lock is required here.
		mr.mu.Lock()
		mr.updateRuleUsage(rule.ID)
		mr.mu.Unlock()
		return transport, nil
	}

	// Stage 2: let the load balancer pick among healthy transports.
	if balancer != nil {
		availableTransports := mr.getHealthyTransports(transports)
		if len(availableTransports) > 0 {
			selectedType := balancer.SelectTransport(availableTransports, msg)
			if transport, exists := transports[selectedType]; exists {
				return transport, nil
			}
		}
	}

	// Stage 3: fallback transport. Any status other than "unhealthy" is
	// accepted here, which is looser than the "healthy" check used above.
	if fallbackTransport, exists := transports[fallback]; exists {
		if health := fallbackTransport.Health(); health.Status != "unhealthy" {
			return fallbackTransport, nil
		}
	}

	return nil, fmt.Errorf("no available transport for message")
}

// AddRule registers rule with the router. An empty ID is replaced by a
// generated "rule_<unix-nanoseconds>" identifier, Created is set to the
// current time, and the rule is always stored enabled regardless of the
// Enabled value passed in.
func (mr *MessageRouter) AddRule(rule RoutingRule) {
	mr.mu.Lock()
	defer mr.mu.Unlock()

	now := time.Now()
	if rule.ID == "" {
		rule.ID = fmt.Sprintf("rule_%d", now.UnixNano())
	}
	rule.Created = now
	rule.Enabled = true

	mr.rules = append(mr.rules, rule)
	mr.sortRulesByPriority()
}

// RemoveRule deletes the first rule whose ID equals ruleID and reports
// whether such a rule was found.
func (mr *MessageRouter) RemoveRule(ruleID string) bool {
	mr.mu.Lock()
	defer mr.mu.Unlock()

	for i := range mr.rules {
		if mr.rules[i].ID != ruleID {
			continue
		}
		last := len(mr.rules) - 1
		copy(mr.rules[i:], mr.rules[i+1:])
		// Zero the vacated slot so the removed rule's Condition closure is
		// not kept reachable through the backing array.
		mr.rules[last] = RoutingRule{}
		mr.rules = mr.rules[:last]
		return true
	}
	return false
}

// UpdateRule applies updates to the first rule whose ID equals ruleID, then
// re-sorts the rules by priority. It reports whether the rule was found.
//
// updates runs while the router's write lock is held, so it must not call
// back into the router.
func (mr *MessageRouter) UpdateRule(ruleID string, updates func(*RoutingRule)) bool {
	mr.mu.Lock()
	defer mr.mu.Unlock()

	for i := range mr.rules {
		if mr.rules[i].ID == ruleID {
			updates(&mr.rules[i])
			mr.sortRulesByPriority()
			return true
		}
	}
	return false
}

// GetRules returns a copy of the router's rules in priority order.
func (mr *MessageRouter) GetRules() []RoutingRule {
	mr.mu.RLock()
	defer mr.mu.RUnlock()

	rules := make([]RoutingRule, len(mr.rules))
	copy(rules, mr.rules)
	return rules
}

// EnableRule marks the rule with the given ID as enabled and reports whether
// the rule was found.
func (mr *MessageRouter) EnableRule(ruleID string) bool {
	return mr.UpdateRule(ruleID, func(rule *RoutingRule) {
		rule.Enabled = true
	})
}

// DisableRule marks the rule with the given ID as disabled and reports
// whether the rule was found.
func (mr *MessageRouter) DisableRule(ruleID string) bool {
	return mr.UpdateRule(ruleID, func(rule *RoutingRule) {
		rule.Enabled = false
	})
}

// SetFallbackTransport sets the transport type used when neither a rule nor
// the load balancer selects a transport.
func (mr *MessageRouter) SetFallbackTransport(transportType TransportType) {
	mr.mu.Lock()
	defer mr.mu.Unlock()
	mr.fallback = transportType
}

// SetLoadBalancer sets the load balancer consulted when no rule selects a
// transport. Passing nil disables the load-balancing stage.
func (mr *MessageRouter) SetLoadBalancer(lb LoadBalancer) {
	mr.mu.Lock()
	defer mr.mu.Unlock()
	mr.loadBalancer = lb
}

// Private helper methods

// findMatchingRules returns copies of the enabled rules whose Condition is
// nil or accepts msg, in the router's current priority order. The caller
// must hold mr.mu.
func (mr *MessageRouter) findMatchingRules(msg *Message) []RoutingRule {
	var matching []RoutingRule
	for _, rule := range mr.rules {
		if rule.Enabled && (rule.Condition == nil || rule.Condition(msg)) {
			matching = append(matching, rule)
		}
	}
	return matching
}

// sortRulesByPriority orders rules by descending Priority. The sort is
// stable, so rules with equal priority keep their relative order. The caller
// must hold mr.mu for writing.
func (mr *MessageRouter) sortRulesByPriority() {
	sort.SliceStable(mr.rules, func(i, j int) bool {
		return mr.rules[i].Priority > mr.rules[j].Priority
	})
}

// updateRuleUsage stamps LastUsed and increments UsageCount on the first
// rule whose ID equals ruleID; it does nothing if no such rule exists. The
// caller must hold mr.mu for writing.
func (mr *MessageRouter) updateRuleUsage(ruleID string) {
	for i := range mr.rules {
		if mr.rules[i].ID == ruleID {
			mr.rules[i].LastUsed = time.Now()
			mr.rules[i].UsageCount++
			break
		}
	}
}

// getHealthyTransports returns the types of all transports reporting status
// "healthy", sorted ascending. Sorting removes the randomness of map
// iteration so that position-based balancers see a stable order from call
// to call.
func (mr *MessageRouter) getHealthyTransports(transports map[TransportType]Transport) []TransportType {
	var healthy []TransportType
	for transportType, transport := range transports {
		if health := transport.Health(); health.Status == "healthy" {
			healthy = append(healthy, transportType)
		}
	}
	sort.Slice(healthy, func(i, j int) bool {
		return healthy[i] < healthy[j]
	})
	return healthy
}

// LoadBalancer implementations

// RoundRobinLoadBalancer cycles through the offered transports in order.
type RoundRobinLoadBalancer struct {
	counter int64      // number of selections made so far
	mu      sync.Mutex // guards counter
}

// NewRoundRobinLoadBalancer returns a round-robin balancer starting at the
// first transport.
func NewRoundRobinLoadBalancer() *RoundRobinLoadBalancer {
	return &RoundRobinLoadBalancer{}
}

// SelectTransport returns the transport at position counter mod
// len(transports) and advances the counter. It returns "" when transports is
// empty. msg is not used.
func (lb *RoundRobinLoadBalancer) SelectTransport(transports []TransportType, msg *Message) TransportType {
	if len(transports) == 0 {
		return ""
	}

	lb.mu.Lock()
	defer lb.mu.Unlock()

	// Unsigned arithmetic keeps the index in range even if counter wraps
	// around to a negative value.
	idx := uint64(lb.counter) % uint64(len(transports))
	lb.counter++
	return transports[idx]
}

// UpdateStats is a no-op: round-robin selection does not use statistics.
func (lb *RoundRobinLoadBalancer) UpdateStats(transport TransportType, latency time.Duration, success bool) {
	// Round-robin doesn't use stats
}

// WeightedLoadBalancer picks transports at random with probability
// proportional to a weight derived from observed success rate and latency.
type WeightedLoadBalancer struct {
	// stats holds the per-transport counters fed by UpdateStats.
	stats map[TransportType]*TransportStats
	// mu guards stats and the values it points to.
	mu sync.RWMutex
}

// TransportStats accumulates request outcomes for one transport.
type TransportStats struct {
	TotalRequests   int64         // requests recorded via UpdateStats
	SuccessRequests int64         // subset of TotalRequests that succeeded
	TotalLatency    time.Duration // sum of all recorded latencies
	LastUpdate      time.Time     // time of the most recent UpdateStats call
	Weight          float64       // weight computed at the last UpdateStats call
}

// NewWeightedLoadBalancer returns a weighted balancer with no recorded
// statistics.
func NewWeightedLoadBalancer() *WeightedLoadBalancer {
	return &WeightedLoadBalancer{
		stats: make(map[TransportType]*TransportStats),
	}
}

// SelectTransport performs a weighted random choice among transports, using
// calculateWeight for each candidate. It returns "" when transports is
// empty. msg is not used.
func (lb *WeightedLoadBalancer) SelectTransport(transports []TransportType, msg *Message) TransportType {
	if len(transports) == 0 {
		return ""
	}

	lb.mu.RLock()
	defer lb.mu.RUnlock()

	// weights[i] is the weight of transports[i].
	weights := make([]float64, len(transports))
	totalWeight := 0.0
	for i, transport := range transports {
		weights[i] = lb.calculateWeight(transport)
		totalWeight += weights[i]
	}

	if totalWeight == 0 {
		// Fall back to uniform random selection.
		return transports[rand.Intn(len(transports))]
	}

	// Walk the cumulative weights until the random target is reached.
	target := rand.Float64() * totalWeight
	current := 0.0
	for i, transport := range transports {
		current += weights[i]
		if current >= target {
			return transport
		}
	}

	// Fallback (shouldn't happen)
	return transports[0]
}

// UpdateStats records the outcome of one request on transport and refreshes
// the transport's stored Weight. Statistics for a transport are created on
// first use with a weight of 1.0.
func (lb *WeightedLoadBalancer) UpdateStats(transport TransportType, latency time.Duration, success bool) {
	lb.mu.Lock()
	defer lb.mu.Unlock()

	stats, exists := lb.stats[transport]
	if !exists {
		stats = &TransportStats{
			Weight: 1.0, // Default weight
		}
		lb.stats[transport] = stats
	}

	stats.TotalRequests++
	stats.TotalLatency += latency
	stats.LastUpdate = time.Now()
	if success {
		stats.SuccessRequests++
	}

	// Recalculate weight based on performance
	stats.Weight = lb.calculateWeight(transport)
}

// calculateWeight returns successRate / averageLatencyInMilliseconds for
// transport, with the latency divisor floored at 1 and the result floored at
// 0.1. Transports with no recorded requests get 1.0. The caller must hold
// lb.mu.
//
// NOTE(review): because the success rate is at most 1, any average latency
// above 10ms produces a raw weight below 0.1 and is clamped to the floor,
// the same weight as a transport that always fails. Confirm this is the
// intended scaling.
func (lb *WeightedLoadBalancer) calculateWeight(transport TransportType) float64 {
	stats, exists := lb.stats[transport]
	if !exists {
		return 1.0 // Default weight for unknown transports
	}
	if stats.TotalRequests == 0 {
		return 1.0
	}

	successRate := float64(stats.SuccessRequests) / float64(stats.TotalRequests)
	avgLatency := stats.TotalLatency / time.Duration(stats.TotalRequests)

	// Lower latency and higher success rate give a higher weight.
	latencyFactor := float64(avgLatency) / float64(time.Millisecond)
	if latencyFactor < 1 {
		latencyFactor = 1
	}

	weight := successRate / latencyFactor

	// Ensure minimum weight
	if weight < 0.1 {
		weight = 0.1
	}
	return weight
}

// LeastLatencyLoadBalancer selects the transport with the lowest average of
// its recently recorded latencies.
type LeastLatencyLoadBalancer struct {
	// stats holds the per-transport latency samples fed by UpdateStats.
	stats map[TransportType]*LatencyStats
	// mu guards stats and the values it points to.
	mu sync.RWMutex
}

// LatencyStats keeps a bounded window of latency samples for one transport.
type LatencyStats struct {
	RecentLatencies []time.Duration // most recent samples, oldest first
	MaxSamples      int             // maximum number of samples retained
	LastUpdate      time.Time       // time the last sample was recorded
}

// NewLeastLatencyLoadBalancer returns a least-latency balancer with no
// recorded samples.
func NewLeastLatencyLoadBalancer() *LeastLatencyLoadBalancer {
	return &LeastLatencyLoadBalancer{
		stats: make(map[TransportType]*LatencyStats),
	}
}

// SelectTransport returns the transport with the lowest average latency.
// Ties go to the earliest candidate, and the first transport is returned if
// no average is below one hour. It returns "" when transports is empty. msg
// is not used.
func (lb *LeastLatencyLoadBalancer) SelectTransport(transports []TransportType, msg *Message) TransportType {
	if len(transports) == 0 {
		return ""
	}

	lb.mu.RLock()
	defer lb.mu.RUnlock()

	bestTransport := transports[0]
	bestLatency := time.Hour // Large initial value
	for _, transport := range transports {
		avgLatency := lb.getAverageLatency(transport)
		if avgLatency < bestLatency {
			bestLatency = avgLatency
			bestTransport = transport
		}
	}
	return bestTransport
}

// UpdateStats records latency as a new sample for transport, keeping at most
// MaxSamples samples (10 for statistics created here). Failed requests are
// ignored.
func (lb *LeastLatencyLoadBalancer) UpdateStats(transport TransportType, latency time.Duration, success bool) {
	if !success {
		return // Only track successful requests
	}

	lb.mu.Lock()
	defer lb.mu.Unlock()

	stats, exists := lb.stats[transport]
	if !exists {
		stats = &LatencyStats{
			RecentLatencies: make([]time.Duration, 0),
			MaxSamples:      10, // Keep last 10 samples
		}
		lb.stats[transport] = stats
	}

	stats.RecentLatencies = append(stats.RecentLatencies, latency)

	// Drop the oldest sample once the window is full.
	if len(stats.RecentLatencies) > stats.MaxSamples {
		stats.RecentLatencies = stats.RecentLatencies[1:]
	}

	stats.LastUpdate = time.Now()
}

// getAverageLatency returns the mean of the recorded samples for transport,
// or 100ms when there are none. The caller must hold lb.mu.
func (lb *LeastLatencyLoadBalancer) getAverageLatency(transport TransportType) time.Duration {
	stats, exists := lb.stats[transport]
	if !exists || len(stats.RecentLatencies) == 0 {
		return time.Millisecond * 100 // Default estimate
	}

	total := time.Duration(0)
	for _, latency := range stats.RecentLatencies {
		total += latency
	}
	return total / time.Duration(len(stats.RecentLatencies))
}

// Common routing rule factory functions

// CreateTopicRule creates a rule that matches messages whose Topic equals
// topic exactly.
func CreateTopicRule(name string, topic string, transport TransportType, priority int) RoutingRule {
	return RoutingRule{
		Name: name,
		Condition: func(msg *Message) bool {
			return msg.Topic == topic
		},
		Transport: transport,
		Priority:  priority,
	}
}

// CreateTopicPatternRule creates a rule that matches messages whose Topic
// equals pattern exactly or, when pattern ends in '*', whose Topic starts
// with the text preceding that final '*'. For example "orders.*" matches
// "orders.created"; a lone "*" matches every topic.
func CreateTopicPatternRule(name string, pattern string, transport TransportType, priority int) RoutingRule {
	// Resolve the wildcard once instead of on every message.
	wildcard := strings.HasSuffix(pattern, "*")
	prefix := strings.TrimSuffix(pattern, "*")

	return RoutingRule{
		Name: name,
		Condition: func(msg *Message) bool {
			// Simple pattern matching (can be enhanced with regex)
			if msg.Topic == pattern {
				return true
			}
			return wildcard && strings.HasPrefix(msg.Topic, prefix)
		},
		Transport: transport,
		Priority:  priority,
	}
}

// CreatePriorityRule creates a rule that matches messages whose Priority
// equals msgPriority.
func CreatePriorityRule(name string, msgPriority MessagePriority, transport TransportType, priority int) RoutingRule {
	return RoutingRule{
		Name: name,
		Condition: func(msg *Message) bool {
			return msg.Priority == msgPriority
		},
		Transport: transport,
		Priority:  priority,
	}
}

// CreateTypeRule creates a rule that matches messages whose Type equals
// msgType.
func CreateTypeRule(name string, msgType MessageType, transport TransportType, priority int) RoutingRule {
	return RoutingRule{
		Name: name,
		Condition: func(msg *Message) bool {
			return msg.Type == msgType
		},
		Transport: transport,
		Priority:  priority,
	}
}

// CreateSourceRule creates a rule that matches messages whose Source equals
// source.
func CreateSourceRule(name string, source string, transport TransportType, priority int) RoutingRule {
	return RoutingRule{
		Name: name,
		Condition: func(msg *Message) bool {
			return msg.Source == source
		},
		Transport: transport,
		Priority:  priority,
	}
}