Files
mev-beta/pkg/transport/router.go
Krypto Kajun c0ec08468c feat(transport): implement comprehensive universal message bus
🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-19 16:39:14 -05:00

479 lines
12 KiB
Go

package transport
import (
"fmt"
"math/rand"
"sort"
"sync"
"time"
)
// MessageRouter handles intelligent message routing and transport selection.
//
// RouteMessage consults the routing rules first, then the load balancer (when
// one is set), and finally the fallback transport type.
type MessageRouter struct {
// rules holds the routing rules; AddRule and UpdateRule re-sort it by
// descending Priority.
rules []RoutingRule
// fallback is the transport type RouteMessage tries last.
fallback TransportType
// loadBalancer, when non-nil, picks among the healthy transports if no rule
// produced one.
loadBalancer LoadBalancer
// mu is locked by the exported methods before they touch the fields above.
mu sync.RWMutex
}
// RoutingRule defines message routing logic: messages accepted by Condition
// are sent over Transport.
type RoutingRule struct {
// ID identifies the rule for RemoveRule/UpdateRule; AddRule generates one
// when it is empty.
ID string
// Name is a human-readable label; the router itself never reads it.
Name string
// Condition decides whether the rule applies to a message. A nil Condition
// matches every message.
Condition MessageFilter
// Transport is the transport type selected when the rule matches.
Transport TransportType
// Priority orders the rules; higher values are tried first.
Priority int
// Enabled gates the rule; disabled rules are skipped. AddRule always sets
// it to true.
Enabled bool
// Created is stamped by AddRule.
Created time.Time
// LastUsed is the time the rule last selected a transport.
LastUsed time.Time
// UsageCount is the number of times the rule selected a transport.
UsageCount int64
}
// RouteMessage selects the transport that should carry msg.
//
// Selection order:
//  1. The first enabled rule (highest priority first) whose condition matches
//     msg and whose transport is present in transports and reports "healthy".
//     The rule's usage statistics are updated.
//  2. If a load balancer is configured, the transport it picks among those
//     reporting "healthy".
//  3. The fallback transport, unless it reports "unhealthy".
//
// An error is returned when none of these steps yields a transport.
//
// Locking: router state is snapshotted under the read lock, which is released
// before any Transport.Health or load-balancer call. Rule usage statistics are
// then updated under the write lock; mutating them while holding only the read
// lock would be a data race between concurrent RouteMessage and GetRules
// calls.
func (mr *MessageRouter) RouteMessage(msg *Message, transports map[TransportType]Transport) (Transport, error) {
	// findMatchingRules returns rule copies, so the snapshot stays valid after
	// the lock is released.
	mr.mu.RLock()
	matchingRules := mr.findMatchingRules(msg)
	loadBalancer := mr.loadBalancer
	fallback := mr.fallback
	mr.mu.RUnlock()

	// Try each matching rule in priority order.
	for _, rule := range matchingRules {
		transport, exists := transports[rule.Transport]
		if !exists {
			continue
		}
		if transport.Health().Status != "healthy" {
			continue
		}
		// Usage bookkeeping writes to mr.rules, so it needs the write lock.
		mr.mu.Lock()
		mr.updateRuleUsage(rule.ID)
		mr.mu.Unlock()
		return transport, nil
	}

	// No rule produced a transport: let the load balancer choose among the
	// healthy ones.
	if loadBalancer != nil {
		if available := mr.getHealthyTransports(transports); len(available) > 0 {
			selectedType := loadBalancer.SelectTransport(available, msg)
			if transport, exists := transports[selectedType]; exists {
				return transport, nil
			}
		}
	}

	// Last resort: the fallback transport. Anything other than "unhealthy" is
	// accepted here, which is more lenient than the "healthy" check above.
	if fallbackTransport, exists := transports[fallback]; exists {
		if fallbackTransport.Health().Status != "unhealthy" {
			return fallbackTransport, nil
		}
	}
	return nil, fmt.Errorf("no available transport for message")
}
// AddRule registers rule with the router and re-sorts the rule list by
// priority.
//
// An empty rule.ID is replaced with a generated "rule_<unix-nanos>" value. The
// stored rule always has Created set to the current time and Enabled set to
// true, whatever the caller passed in.
func (mr *MessageRouter) AddRule(rule RoutingRule) {
	mr.mu.Lock()
	defer mr.mu.Unlock()

	if len(rule.ID) == 0 {
		nanos := time.Now().UnixNano()
		rule.ID = fmt.Sprintf("rule_%d", nanos)
	}
	rule.Enabled, rule.Created = true, time.Now()

	mr.rules = append(mr.rules, rule)
	mr.sortRulesByPriority()
}
// RemoveRule deletes the first rule whose ID equals ruleID and reports whether
// such a rule was found. The relative order of the remaining rules is kept.
//
// The slot vacated at the end of the backing array is zeroed so the removed
// rule's Condition closure (and anything it captures) does not stay reachable
// through the slice's spare capacity.
func (mr *MessageRouter) RemoveRule(ruleID string) bool {
	mr.mu.Lock()
	defer mr.mu.Unlock()
	for i := range mr.rules {
		if mr.rules[i].ID != ruleID {
			continue
		}
		last := len(mr.rules) - 1
		// Shift the tail left by one, then clear the now-duplicated last slot.
		copy(mr.rules[i:], mr.rules[i+1:])
		mr.rules[last] = RoutingRule{}
		mr.rules = mr.rules[:last]
		return true
	}
	return false
}
// UpdateRule applies updates to the first rule whose ID equals ruleID and then
// re-sorts the rules by priority. It reports whether a rule was found; when
// none is, updates is not called and the rule list is left alone.
//
// updates runs while the router's write lock is held and receives a pointer
// to the stored rule, so its changes are applied in place.
func (mr *MessageRouter) UpdateRule(ruleID string, updates func(*RoutingRule)) bool {
	mr.mu.Lock()
	defer mr.mu.Unlock()

	idx := -1
	for i := range mr.rules {
		if mr.rules[i].ID == ruleID {
			idx = i
			break
		}
	}
	if idx < 0 {
		return false
	}

	updates(&mr.rules[idx])
	mr.sortRulesByPriority()
	return true
}
// GetRules returns a snapshot of the routing rules in their current order.
// The returned slice has its own backing array, so callers may modify it
// without affecting the router.
func (mr *MessageRouter) GetRules() []RoutingRule {
	mr.mu.RLock()
	defer mr.mu.RUnlock()

	snapshot := make([]RoutingRule, 0, len(mr.rules))
	return append(snapshot, mr.rules...)
}
// EnableRule marks the rule identified by ruleID as enabled, going through
// UpdateRule. It reports whether the rule exists.
func (mr *MessageRouter) EnableRule(ruleID string) bool {
	enable := func(r *RoutingRule) { r.Enabled = true }
	return mr.UpdateRule(ruleID, enable)
}
// DisableRule marks the rule identified by ruleID as disabled, going through
// UpdateRule. It reports whether the rule exists.
func (mr *MessageRouter) DisableRule(ruleID string) bool {
	disable := func(r *RoutingRule) { r.Enabled = false }
	return mr.UpdateRule(ruleID, disable)
}
// SetFallbackTransport replaces the transport type that RouteMessage tries
// last, after the rules and the load balancer.
func (mr *MessageRouter) SetFallbackTransport(transportType TransportType) {
	mr.mu.Lock()
	mr.fallback = transportType
	mr.mu.Unlock()
}
// SetLoadBalancer installs lb as the router's load balancer. Passing nil
// removes it, in which case RouteMessage skips load balancing.
func (mr *MessageRouter) SetLoadBalancer(lb LoadBalancer) {
	mr.mu.Lock()
	mr.loadBalancer = lb
	mr.mu.Unlock()
}
// Private helper methods
// findMatchingRules returns copies of the enabled rules that apply to msg, in
// the order they are stored in mr.rules. A rule with a nil Condition applies
// to every message. The result is nil when nothing matches.
//
// It takes no lock itself.
func (mr *MessageRouter) findMatchingRules(msg *Message) []RoutingRule {
	var selected []RoutingRule
	for i := range mr.rules {
		candidate := mr.rules[i]
		if !candidate.Enabled {
			continue
		}
		if candidate.Condition != nil && !candidate.Condition(msg) {
			continue
		}
		selected = append(selected, candidate)
	}
	return selected
}
// sortRulesByPriority orders mr.rules by descending Priority.
//
// The sort is stable: rules with equal priority keep their existing relative
// order, so the rule chosen among ties does not change from one re-sort to the
// next. It takes no lock itself.
func (mr *MessageRouter) sortRulesByPriority() {
	sort.SliceStable(mr.rules, func(i, j int) bool {
		return mr.rules[i].Priority > mr.rules[j].Priority
	})
}
// updateRuleUsage records one use of the first rule whose ID equals ruleID:
// LastUsed becomes the current time and UsageCount is incremented. Unknown
// IDs are ignored.
//
// It writes to mr.rules in place and takes no lock itself.
func (mr *MessageRouter) updateRuleUsage(ruleID string) {
	for i := range mr.rules {
		rule := &mr.rules[i]
		if rule.ID != ruleID {
			continue
		}
		rule.LastUsed = time.Now()
		rule.UsageCount++
		return
	}
}
// getHealthyTransports returns the types of all transports whose Health
// status is "healthy", sorted in ascending order. The result is nil when no
// transport is healthy.
//
// Map iteration order is random in Go, so the result is sorted to give load
// balancers that depend on slice position (round-robin indexing, first-wins
// tie-breaking) the same candidate order on every call.
func (mr *MessageRouter) getHealthyTransports(transports map[TransportType]Transport) []TransportType {
	var healthy []TransportType
	for transportType, transport := range transports {
		if transport.Health().Status == "healthy" {
			healthy = append(healthy, transportType)
		}
	}
	sort.Slice(healthy, func(i, j int) bool { return healthy[i] < healthy[j] })
	return healthy
}
// LoadBalancer implementations
// RoundRobinLoadBalancer implements round-robin load balancing: each
// selection advances an internal counter that picks the position within the
// candidate slice.
type RoundRobinLoadBalancer struct {
// counter is incremented once for every selection made from a non-empty
// candidate list.
counter int64
// mu serializes access to counter.
mu sync.Mutex
}
// NewRoundRobinLoadBalancer returns a round-robin load balancer whose counter
// starts at zero.
func NewRoundRobinLoadBalancer() *RoundRobinLoadBalancer {
	return new(RoundRobinLoadBalancer)
}
// SelectTransport returns the next transport from transports in rotation, or
// "" when transports is empty. msg is not consulted.
//
// The position is the counter modulo len(transports), so the rotation is only
// even while callers pass the same candidates in the same order.
func (lb *RoundRobinLoadBalancer) SelectTransport(transports []TransportType, msg *Message) TransportType {
	if len(transports) == 0 {
		return ""
	}
	lb.mu.Lock()
	defer lb.mu.Unlock()
	// Index in unsigned arithmetic: once the int64 counter wraps to a negative
	// value, a signed modulo would yield a negative index and panic.
	selected := transports[uint64(lb.counter)%uint64(len(transports))]
	lb.counter++
	return selected
}
// UpdateStats is a no-op: round-robin selection does not depend on latency or
// success statistics, so all three arguments are ignored.
// NOTE(review): presumably this exists to satisfy the LoadBalancer interface,
// which is declared outside this file — confirm.
func (lb *RoundRobinLoadBalancer) UpdateStats(transport TransportType, latency time.Duration, success bool) {
// Round-robin doesn't use stats
}
// WeightedLoadBalancer implements weighted load balancing based on performance:
// transports are picked at random with probability proportional to a weight
// derived from their recorded success rate and average latency.
type WeightedLoadBalancer struct {
// stats holds the per-transport counters recorded by UpdateStats.
stats map[TransportType]*TransportStats
// mu guards stats: SelectTransport takes the read lock, UpdateStats the
// write lock.
mu sync.RWMutex
}
// TransportStats accumulates the request outcomes WeightedLoadBalancer has
// recorded for one transport.
type TransportStats struct {
// TotalRequests counts every UpdateStats call for the transport.
TotalRequests int64
// SuccessRequests counts the UpdateStats calls reported as successful.
SuccessRequests int64
// TotalLatency is the sum of all reported latencies, failed requests
// included.
TotalLatency time.Duration
// LastUpdate is the time of the most recent UpdateStats call.
LastUpdate time.Time
// Weight caches the weight computed at the last UpdateStats call (1.0 for a
// freshly created entry). SelectTransport recomputes weights itself rather
// than reading this field.
Weight float64
}
// NewWeightedLoadBalancer returns a weighted load balancer with an empty,
// ready-to-use statistics table.
func NewWeightedLoadBalancer() *WeightedLoadBalancer {
	lb := new(WeightedLoadBalancer)
	lb.stats = map[TransportType]*TransportStats{}
	return lb
}
// SelectTransport picks one of transports at random, with probability
// proportional to each candidate's current weight as reported by
// calculateWeight. It returns "" when transports is empty. msg is not
// consulted.
//
// If every weight is zero the choice is uniformly random instead.
func (lb *WeightedLoadBalancer) SelectTransport(transports []TransportType, msg *Message) TransportType {
	if len(transports) == 0 {
		return ""
	}
	lb.mu.RLock()
	defer lb.mu.RUnlock()

	// One weight per candidate position, plus their running total.
	perCandidate := make([]float64, len(transports))
	var sum float64
	for i, candidate := range transports {
		perCandidate[i] = lb.calculateWeight(candidate)
		sum += perCandidate[i]
	}
	if sum == 0 {
		// No usable weights: plain random selection.
		return transports[rand.Intn(len(transports))]
	}

	// Draw a point in [0, sum) and return the candidate whose cumulative
	// weight interval contains it.
	threshold := rand.Float64() * sum
	var running float64
	for i, candidate := range transports {
		running += perCandidate[i]
		if running >= threshold {
			return candidate
		}
	}
	// Only reachable through floating-point rounding.
	return transports[0]
}
// UpdateStats records the outcome of one request sent over transport: the
// request and latency totals always grow, the success counter only when
// success is true. The entry is created on first use, and its cached Weight is
// refreshed from calculateWeight.
func (lb *WeightedLoadBalancer) UpdateStats(transport TransportType, latency time.Duration, success bool) {
	lb.mu.Lock()
	defer lb.mu.Unlock()

	entry, known := lb.stats[transport]
	if !known {
		// First sample for this transport: start from the default weight.
		entry = &TransportStats{Weight: 1.0}
		lb.stats[transport] = entry
	}

	if success {
		entry.SuccessRequests++
	}
	entry.TotalRequests++
	entry.TotalLatency += latency
	entry.LastUpdate = time.Now()

	// The weight depends on the counters updated above.
	entry.Weight = lb.calculateWeight(transport)
}
// calculateWeight derives a selection weight for transport from its recorded
// statistics: the success rate divided by the average latency in
// milliseconds. Average latencies under 1ms count as 1ms, and the result is
// never lower than 0.1. Transports without any recorded request weigh 1.0.
//
// It reads lb.stats and takes no lock itself.
func (lb *WeightedLoadBalancer) calculateWeight(transport TransportType) float64 {
	const (
		defaultWeight = 1.0 // used when nothing has been recorded yet
		minWeight     = 0.1 // floor for the computed weight
	)

	entry, known := lb.stats[transport]
	if !known || entry.TotalRequests == 0 {
		return defaultWeight
	}

	requests := entry.TotalRequests
	meanLatency := entry.TotalLatency / time.Duration(requests)
	latencyMs := float64(meanLatency) / float64(time.Millisecond)
	if latencyMs < 1 {
		latencyMs = 1
	}

	// Higher success rate and lower latency both raise the score.
	score := (float64(entry.SuccessRequests) / float64(requests)) / latencyMs
	if score < minWeight {
		return minWeight
	}
	return score
}
// LeastLatencyLoadBalancer selects the transport with the lowest latency,
// judged by the average of each transport's most recent successful samples.
type LeastLatencyLoadBalancer struct {
// stats holds the recent latency samples per transport.
stats map[TransportType]*LatencyStats
// mu guards stats: SelectTransport takes the read lock, UpdateStats the
// write lock.
mu sync.RWMutex
}
// LatencyStats is the sliding window of latency samples kept for one
// transport by LeastLatencyLoadBalancer.
type LatencyStats struct {
// RecentLatencies holds the retained samples, oldest first.
RecentLatencies []time.Duration
// MaxSamples is the window size; LeastLatencyLoadBalancer.UpdateStats
// creates entries with a window of 10.
MaxSamples int
// LastUpdate is the time the most recent sample was recorded.
LastUpdate time.Time
}
// NewLeastLatencyLoadBalancer returns a least-latency load balancer with an
// empty, ready-to-use sample table.
func NewLeastLatencyLoadBalancer() *LeastLatencyLoadBalancer {
	lb := new(LeastLatencyLoadBalancer)
	lb.stats = map[TransportType]*LatencyStats{}
	return lb
}
// SelectTransport returns the candidate with the lowest average latency as
// reported by getAverageLatency, or "" when transports is empty. msg is not
// consulted.
//
// On a tie the earliest candidate wins. If no candidate averages below one
// hour, the first candidate is returned.
func (lb *LeastLatencyLoadBalancer) SelectTransport(transports []TransportType, msg *Message) TransportType {
	if len(transports) == 0 {
		return ""
	}
	lb.mu.RLock()
	defer lb.mu.RUnlock()

	winner := transports[0]
	lowest := time.Hour // sentinel that any realistic average undercuts
	for i := 0; i < len(transports); i++ {
		if avg := lb.getAverageLatency(transports[i]); avg < lowest {
			winner, lowest = transports[i], avg
		}
	}
	return winner
}
// UpdateStats records latency as a new sample for transport. Failed requests
// are ignored entirely. Each transport keeps a sliding window of its most
// recent samples (10 for entries created here); the oldest sample is dropped
// once the window is exceeded.
func (lb *LeastLatencyLoadBalancer) UpdateStats(transport TransportType, latency time.Duration, success bool) {
	if !success {
		// Only successful requests contribute latency samples.
		return
	}
	lb.mu.Lock()
	defer lb.mu.Unlock()

	entry, known := lb.stats[transport]
	if !known {
		entry = &LatencyStats{
			RecentLatencies: []time.Duration{},
			MaxSamples:      10,
		}
		lb.stats[transport] = entry
	}

	// Append the sample, then trim the window from the front if it overflowed.
	samples := append(entry.RecentLatencies, latency)
	if len(samples) > entry.MaxSamples {
		samples = samples[1:]
	}
	entry.RecentLatencies = samples
	entry.LastUpdate = time.Now()
}
// getAverageLatency returns the mean of the samples currently retained for
// transport. Transports with no samples yet are estimated at 100ms.
//
// It reads lb.stats and takes no lock itself.
func (lb *LeastLatencyLoadBalancer) getAverageLatency(transport TransportType) time.Duration {
	entry, known := lb.stats[transport]
	if !known || len(entry.RecentLatencies) == 0 {
		return 100 * time.Millisecond
	}

	var sum time.Duration
	for i := range entry.RecentLatencies {
		sum += entry.RecentLatencies[i]
	}
	return sum / time.Duration(len(entry.RecentLatencies))
}
// Common routing rule factory functions
// CreateTopicRule builds a rule that routes messages whose Topic equals topic
// exactly to transport. ID, Enabled and the timestamps are left at their zero
// values.
func CreateTopicRule(name string, topic string, transport TransportType, priority int) RoutingRule {
	matchesTopic := func(m *Message) bool { return m.Topic == topic }
	return RoutingRule{
		Name:      name,
		Transport: transport,
		Priority:  priority,
		Condition: matchesTopic,
	}
}
// CreateTopicPatternRule builds a rule that routes messages to transport when
// their Topic matches pattern. A topic matches if it equals pattern exactly,
// or if pattern ends in '*' and the topic starts with everything before that
// '*'. No other wildcard forms are recognised.
func CreateTopicPatternRule(name string, pattern string, transport TransportType, priority int) RoutingRule {
	matchesPattern := func(m *Message) bool {
		if m.Topic == pattern {
			return true
		}
		n := len(pattern)
		if n == 0 || pattern[n-1] != '*' {
			return false
		}
		// Trailing '*': compare the topic against the literal prefix.
		prefix := pattern[:n-1]
		return len(m.Topic) >= len(prefix) && m.Topic[:len(prefix)] == prefix
	}
	return RoutingRule{
		Name:      name,
		Transport: transport,
		Priority:  priority,
		Condition: matchesPattern,
	}
}
// CreatePriorityRule builds a rule that routes messages whose Priority equals
// msgPriority to transport. priority is the rule's own ordering priority, not
// a message priority.
func CreatePriorityRule(name string, msgPriority MessagePriority, transport TransportType, priority int) RoutingRule {
	matchesPriority := func(m *Message) bool { return m.Priority == msgPriority }
	return RoutingRule{
		Name:      name,
		Transport: transport,
		Priority:  priority,
		Condition: matchesPriority,
	}
}
// CreateTypeRule builds a rule that routes messages whose Type equals msgType
// to transport.
func CreateTypeRule(name string, msgType MessageType, transport TransportType, priority int) RoutingRule {
	matchesType := func(m *Message) bool { return m.Type == msgType }
	return RoutingRule{
		Name:      name,
		Transport: transport,
		Priority:  priority,
		Condition: matchesType,
	}
}
// CreateSourceRule builds a rule that routes messages whose Source equals
// source exactly to transport.
func CreateSourceRule(name string, source string, transport TransportType, priority int) RoutingRule {
	matchesSource := func(m *Message) bool { return m.Source == source }
	return RoutingRule{
		Name:      name,
		Transport: transport,
		Priority:  priority,
		Condition: matchesSource,
	}
}