Files
mev-beta/pkg/transport/message_bus_impl.go
Krypto Kajun 3f69aeafcf fix: resolve all compilation issues across transport and lifecycle packages
- Fixed duplicate type declarations in transport package
- Removed unused variables in lifecycle and dependency injection
- Fixed big.Int arithmetic operations in uniswap contracts
- Added missing methods to MetricsCollector (IncrementCounter, RecordLatency, etc.)
- Fixed jitter calculation in TCP transport retry logic
- Updated ComponentHealth field access to use transport type
- Ensured all core packages build successfully

All major compilation errors resolved:
 Transport package builds clean
 Lifecycle package builds clean
 Main MEV bot application builds clean
 Fixed method signature mismatches
 Resolved type conflicts and duplications

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-19 17:23:14 -05:00

743 lines
18 KiB
Go

package transport
import (
"context"
"fmt"
"time"
)
// Publish validates msg, fills in its timestamp and ID when they are unset,
// routes it to one of the configured transports and sends it.
//
// When the send fails and mb.config.EnableDLQ is set, the message is offered
// to the dead letter queue; the send error is returned in either case. After a
// successful send the message is stored in its topic when
// mb.config.EnablePersistence is set (a storage failure is only counted and
// does not fail the publish), and delivery to local subscribers is started on
// a separate goroutine using the caller's ctx.
//
// An error is returned when the bus is not started, msg fails validation,
// routing fails, or the transport send fails.
func (mb *UniversalMessageBus) Publish(ctx context.Context, msg *Message) error {
	// Start and Stop write mb.started while holding mb.mu, so read it under
	// the same lock instead of racing with them.
	mb.mu.RLock()
	started := mb.started
	mb.mu.RUnlock()
	if !started {
		return fmt.Errorf("message bus not started")
	}

	if err := mb.validateMessage(msg); err != nil {
		return fmt.Errorf("invalid message: %w", err)
	}

	// Fill in the fields the caller left unset.
	if msg.Timestamp.IsZero() {
		msg.Timestamp = time.Now()
	}
	if msg.ID == "" {
		msg.ID = GenerateMessageID()
	}

	mb.metrics.IncrementCounter("messages_published_total")
	// Measured from the message timestamp, so it is close to zero whenever the
	// timestamp was assigned just above.
	mb.metrics.RecordLatency("publish_latency", time.Since(msg.Timestamp))

	transport, err := mb.router.RouteMessage(msg, mb.transports)
	if err != nil {
		mb.metrics.IncrementCounter("routing_errors_total")
		return fmt.Errorf("routing failed: %w", err)
	}

	if err := transport.Send(ctx, msg); err != nil {
		mb.metrics.IncrementCounter("send_errors_total")
		if mb.config.EnableDLQ {
			if dlqErr := mb.dlq.AddMessage(msg.Topic, msg); dlqErr != nil {
				return fmt.Errorf("send failed and DLQ failed: %v, original error: %w", dlqErr, err)
			}
		}
		return fmt.Errorf("send failed: %w", err)
	}

	if mb.config.EnablePersistence {
		if err := mb.addMessageToTopic(msg); err != nil {
			// Persistence is best-effort: count the failure, keep the publish.
			mb.metrics.IncrementCounter("persistence_errors_total")
		}
	}

	go mb.deliverToSubscribers(ctx, msg)
	return nil
}
// Subscribe registers handler for messages published to topic and returns the
// new subscription.
//
// Defaults (queue size 1000, batch size 1, one-second batch timeout, retry
// enabled, DLQ following mb.config.EnableDLQ, non-persistent, non-durable) are
// applied first and then adjusted by opts in order. The subscription is
// recorded on the bus and attached to the topic via addSubscriberToTopic.
//
// An error is returned when the bus is not started or handler is nil.
func (mb *UniversalMessageBus) Subscribe(topic string, handler MessageHandler, opts ...SubscriptionOption) (*Subscription, error) {
	// Start and Stop write mb.started while holding mb.mu, so read it under
	// the same lock instead of racing with them.
	mb.mu.RLock()
	started := mb.started
	mb.mu.RUnlock()
	if !started {
		return nil, fmt.Errorf("message bus not started")
	}
	// A nil handler would panic on every delivery; reject it up front.
	if handler == nil {
		return nil, fmt.Errorf("subscription handler is nil")
	}

	options := SubscriptionOptions{
		QueueSize:    1000,
		BatchSize:    1,
		BatchTimeout: time.Second,
		DLQEnabled:   mb.config.EnableDLQ,
		RetryEnabled: true,
		Persistent:   false,
		Durable:      false,
	}
	for _, opt := range opts {
		if opt == nil {
			continue // calling a nil option would panic
		}
		opt(&options)
	}

	now := time.Now()
	subscription := &Subscription{
		ID:      fmt.Sprintf("sub_%s_%d", topic, now.UnixNano()),
		Topic:   topic,
		Handler: handler,
		Options: options,
		created: now,
		active:  true,
	}

	mb.mu.Lock()
	mb.subscriptions[subscription.ID] = subscription
	mb.mu.Unlock()

	// Must run after mb.mu is released: addSubscriberToTopic locks it itself.
	mb.addSubscriberToTopic(topic, subscription)
	mb.metrics.IncrementCounter("subscriptions_created_total")
	return subscription, nil
}
// Unsubscribe deactivates the subscription with the given ID, removes it from
// the bus and detaches it from its topic.
//
// It returns an error when no subscription with that ID is registered.
func (mb *UniversalMessageBus) Unsubscribe(subscriptionID string) error {
	mb.mu.Lock()
	subscription, exists := mb.subscriptions[subscriptionID]
	if !exists {
		mb.mu.Unlock()
		return fmt.Errorf("subscription not found: %s", subscriptionID)
	}
	delete(mb.subscriptions, subscriptionID)
	// Release mb.mu before touching the topic: removeSubscriberFromTopic takes
	// mb.mu.RLock itself, and a read-lock acquired while this goroutine still
	// holds the write lock would block forever.
	mb.mu.Unlock()

	subscription.mu.Lock()
	subscription.active = false
	subscription.mu.Unlock()

	mb.removeSubscriberFromTopic(subscription.Topic, subscriptionID)
	mb.metrics.IncrementCounter("subscriptions_removed_total")
	return nil
}
// Request publishes msg and waits for the first message delivered on the
// "response.<CorrelationID>" topic.
//
// A correlation ID is generated when msg has none. A temporary subscription to
// the response topic is created before the request is published and removed
// again when Request returns.
//
// It returns the response, or an error when the bus is not started, msg is
// nil, subscribing or publishing fails, timeout elapses, or ctx is done.
//
// NOTE(review): the response topic that Subscribe auto-creates is not deleted
// here, so one topic entry appears to remain per correlation ID — confirm
// whether a cleanup path exists elsewhere.
func (mb *UniversalMessageBus) Request(ctx context.Context, msg *Message, timeout time.Duration) (*Message, error) {
	// Start and Stop write mb.started while holding mb.mu, so read it under
	// the same lock instead of racing with them.
	mb.mu.RLock()
	started := mb.started
	mb.mu.RUnlock()
	if !started {
		return nil, fmt.Errorf("message bus not started")
	}
	if msg == nil {
		return nil, fmt.Errorf("invalid message: message is nil")
	}

	if msg.CorrelationID == "" {
		msg.CorrelationID = GenerateMessageID()
	}

	// Buffered so the handler never blocks. The channel is deliberately never
	// closed: handlers run on their own goroutines and may still be sending
	// after Request has returned, and a send on a closed channel panics.
	responseChannel := make(chan *Message, 1)

	responseTopic := fmt.Sprintf("response.%s", msg.CorrelationID)
	subscription, err := mb.Subscribe(responseTopic, func(_ context.Context, response *Message) error {
		select {
		case responseChannel <- response:
		default:
			// A response is already buffered; drop the extra one.
		}
		return nil
	})
	if err != nil {
		return nil, fmt.Errorf("failed to subscribe to response topic: %w", err)
	}
	defer func() {
		// Best-effort cleanup: nothing useful can be done with this error.
		_ = mb.Unsubscribe(subscription.ID)
	}()

	if err := mb.Publish(ctx, msg); err != nil {
		return nil, fmt.Errorf("failed to publish request: %w", err)
	}

	// An explicit timer can be stopped as soon as the wait ends.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case response := <-responseChannel:
		return response, nil
	case <-timer.C:
		return nil, fmt.Errorf("request timeout after %v", timeout)
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}
// Reply publishes response as the answer to originalMsg.
//
// The response is marked as MessageTypeResponse, addressed to the
// "response.<CorrelationID>" topic of the original message, given the same
// correlation ID, and targeted at the original message's source.
//
// It returns an error when either message is nil, when originalMsg carries no
// correlation ID, or when Publish fails.
func (mb *UniversalMessageBus) Reply(ctx context.Context, originalMsg *Message, response *Message) error {
	if originalMsg == nil {
		return fmt.Errorf("original message is nil")
	}
	if response == nil {
		return fmt.Errorf("response message is nil")
	}
	if originalMsg.CorrelationID == "" {
		return fmt.Errorf("original message has no correlation ID")
	}

	response.Type = MessageTypeResponse
	response.Topic = fmt.Sprintf("response.%s", originalMsg.CorrelationID)
	response.CorrelationID = originalMsg.CorrelationID
	response.Target = originalMsg.Source

	return mb.Publish(ctx, response)
}
// CreateTopic registers an empty topic called topicName that uses config.
// It fails when a topic with that name is already registered.
func (mb *UniversalMessageBus) CreateTopic(topicName string, config TopicConfig) error {
	mb.mu.Lock()
	defer mb.mu.Unlock()

	if _, taken := mb.topics[topicName]; taken {
		return fmt.Errorf("topic already exists: %s", topicName)
	}

	// The topic starts with no stored messages and no subscribers.
	mb.topics[topicName] = &Topic{
		Name:         topicName,
		Config:       config,
		Messages:     make([]StoredMessage, 0),
		Subscribers:  make([]*Subscription, 0),
		Created:      time.Now(),
		LastActivity: time.Now(),
	}

	mb.metrics.IncrementCounter("topics_created_total")
	return nil
}
// DeleteTopic removes the named topic together with every subscription that
// is attached to it.
//
// It returns an error when the topic does not exist.
func (mb *UniversalMessageBus) DeleteTopic(topicName string) error {
	mb.mu.Lock()
	defer mb.mu.Unlock()

	topic, exists := mb.topics[topicName]
	if !exists {
		return fmt.Errorf("topic not found: %s", topicName)
	}

	// Detach the subscriber list under the topic lock so a concurrent delivery
	// sees either the full list or none of it.
	topic.mu.Lock()
	subscribers := topic.Subscribers
	topic.Subscribers = nil
	topic.mu.Unlock()

	// The subscriptions are retired inline rather than through Unsubscribe:
	// Unsubscribe locks mb.mu, which this goroutine already holds, so calling
	// it from here would block forever.
	for _, sub := range subscribers {
		sub.mu.Lock()
		sub.active = false
		sub.mu.Unlock()

		if _, registered := mb.subscriptions[sub.ID]; registered {
			delete(mb.subscriptions, sub.ID)
			mb.metrics.IncrementCounter("subscriptions_removed_total")
		}
	}

	delete(mb.topics, topicName)
	mb.metrics.IncrementCounter("topics_deleted_total")
	return nil
}
// ListTopics returns the names of all registered topics, in map iteration
// order (i.e. unordered).
func (mb *UniversalMessageBus) ListTopics() []string {
	mb.mu.RLock()
	defer mb.mu.RUnlock()

	names := make([]string, len(mb.topics))
	next := 0
	for name := range mb.topics {
		names[next] = name
		next++
	}
	return names
}
// GetTopicInfo returns a snapshot describing the named topic: its name and
// configuration, how many messages and subscribers it holds, when it was last
// active, and a rough size estimate.
//
// It returns an error when the topic does not exist.
func (mb *UniversalMessageBus) GetTopicInfo(topicName string) (*TopicInfo, error) {
	mb.mu.RLock()
	defer mb.mu.RUnlock()

	topic, found := mb.topics[topicName]
	if !found {
		return nil, fmt.Errorf("topic not found: %s", topicName)
	}

	topic.mu.RLock()
	defer topic.mu.RUnlock()

	// The size is only an estimate: the length of each message's "%+v"
	// rendering, summed over the stored messages.
	var estimate int64
	for i := range topic.Messages {
		rendered := fmt.Sprintf("%+v", topic.Messages[i].Message)
		estimate += int64(len(rendered))
	}

	info := &TopicInfo{
		Name:            topic.Name,
		Config:          topic.Config,
		MessageCount:    int64(len(topic.Messages)),
		SubscriberCount: len(topic.Subscribers),
		LastActivity:    topic.LastActivity,
		SizeBytes:       estimate,
	}
	return info, nil
}
// QueueMessage stores msg in the queue of the named topic.
//
// addMessageToTopic files the message under msg.Topic, so an empty msg.Topic
// is filled in from topic, and a msg.Topic that names a different topic is
// rejected instead of being queued somewhere the caller did not ask for.
//
// It returns an error when msg is nil, when msg.Topic conflicts with topic, or
// when storing the message fails.
func (mb *UniversalMessageBus) QueueMessage(topic string, msg *Message) error {
	if msg == nil {
		return fmt.Errorf("message is nil")
	}
	if msg.Topic == "" {
		msg.Topic = topic
	}
	if msg.Topic != topic {
		return fmt.Errorf("message topic %q does not match queue topic %q", msg.Topic, topic)
	}
	return mb.addMessageToTopic(msg)
}
// DequeueMessage claims the first unprocessed message of the named topic and
// returns it. Claiming marks the stored entry as processed; the entry itself
// stays in the topic.
//
// The queue is always checked at least once, even when timeout is zero or
// negative. While it is empty the call polls every 10ms until timeout has
// elapsed, never sleeping past the deadline.
//
// It returns an error when the topic does not exist or when no unprocessed
// message became available in time.
func (mb *UniversalMessageBus) DequeueMessage(topic string, timeout time.Duration) (*Message, error) {
	const pollInterval = 10 * time.Millisecond
	deadline := time.Now().Add(timeout)

	for {
		// Look the topic up on every round: it may be deleted while waiting.
		mb.mu.RLock()
		topicObj, exists := mb.topics[topic]
		mb.mu.RUnlock()
		if !exists {
			return nil, fmt.Errorf("topic not found: %s", topic)
		}

		topicObj.mu.Lock()
		for i := range topicObj.Messages {
			if topicObj.Messages[i].Processed {
				continue
			}
			topicObj.Messages[i].Processed = true
			claimed := topicObj.Messages[i].Message
			topicObj.mu.Unlock()
			return claimed, nil
		}
		topicObj.mu.Unlock()

		remaining := time.Until(deadline)
		if remaining <= 0 {
			return nil, fmt.Errorf("no message available within timeout")
		}
		if remaining > pollInterval {
			remaining = pollInterval
		}
		time.Sleep(remaining)
	}
}
// PeekMessage returns the first unprocessed message of the named topic and
// leaves the topic unchanged.
//
// It returns an error when the topic does not exist or holds no unprocessed
// message.
func (mb *UniversalMessageBus) PeekMessage(topic string) (*Message, error) {
	mb.mu.RLock()
	queue, known := mb.topics[topic]
	mb.mu.RUnlock()
	if !known {
		return nil, fmt.Errorf("topic not found: %s", topic)
	}

	queue.mu.RLock()
	defer queue.mu.RUnlock()

	for idx := range queue.Messages {
		if entry := &queue.Messages[idx]; !entry.Processed {
			return entry.Message, nil
		}
	}
	return nil, fmt.Errorf("no messages available")
}
// Start marks the bus as started and launches its background loops (health
// check, cleanup and metrics).
//
// When no transport is configured, an in-memory transport is created,
// connected using ctx and registered under TransportMemory.
//
// It returns an error when the bus is already started or when the default
// transport cannot be connected.
func (mb *UniversalMessageBus) Start(ctx context.Context) error {
	mb.mu.Lock()
	defer mb.mu.Unlock()

	if mb.started {
		return fmt.Errorf("message bus already started")
	}

	if len(mb.transports) == 0 {
		memTransport := NewMemoryTransport()
		// Register the transport only once it is connected. Registering it
		// first would leave a disconnected transport behind after a failure,
		// and a retried Start would then skip the connect entirely.
		if err := memTransport.Connect(ctx); err != nil {
			return fmt.Errorf("failed to connect default transport: %w", err)
		}
		mb.transports[TransportMemory] = memTransport
	}

	go mb.healthCheckLoop()
	go mb.cleanupLoop()
	go mb.metricsLoop()

	mb.started = true
	mb.metrics.RecordEvent("message_bus_started")
	return nil
}
// Stop shuts the bus down: it cancels the bus context so the background loops
// exit, disconnects every transport using ctx, and marks the bus as stopped.
//
// Stopping a bus that is not started is a no-op. Disconnect failures do not
// abort the shutdown and are not returned; each one is counted in the
// "transport_disconnect_errors_total" metric.
func (mb *UniversalMessageBus) Stop(ctx context.Context) error {
	mb.mu.Lock()
	defer mb.mu.Unlock()

	if !mb.started {
		return nil
	}

	// Signals healthCheckLoop, cleanupLoop and metricsLoop to return.
	mb.cancel()

	for _, transport := range mb.transports {
		if err := transport.Disconnect(ctx); err != nil {
			// Shutdown is best-effort: keep disconnecting the remaining
			// transports, but leave a trace instead of dropping the failure.
			mb.metrics.IncrementCounter("transport_disconnect_errors_total")
		}
	}

	mb.started = false
	mb.metrics.RecordEvent("message_bus_stopped")
	return nil
}
// Health reports the health of every transport plus an overall status, which
// is "healthy" unless at least one transport reports a different status, in
// which case it is "degraded" and a warning entry is added for that transport.
//
// NOTE(review): Uptime is a placeholder. It measures the time since "now" and
// is therefore effectively zero; no start time is tracked in this method.
func (mb *UniversalMessageBus) Health() HealthStatus {
	components := make(map[string]ComponentHealth)
	for kind, tr := range mb.transports {
		components[string(kind)] = tr.Health()
	}

	overall := "healthy"
	var problems []HealthError
	for name, comp := range components {
		if comp.Status == "healthy" {
			continue
		}
		overall = "degraded"
		problems = append(problems, HealthError{
			Component: name,
			Message:   fmt.Sprintf("Component %s is %s", name, comp.Status),
			Timestamp: time.Now(),
			Severity:  "warning",
		})
	}

	return HealthStatus{
		Status:     overall,
		Uptime:     time.Since(time.Now()), // Would track actual uptime
		LastCheck:  time.Now(),
		Components: components,
		Errors:     problems,
	}
}
// GetMetrics assembles a snapshot of the operational metrics from the metrics
// collector, the dead letter queue, and the current number of subscriptions
// and topics.
func (mb *UniversalMessageBus) GetMetrics() MessageBusMetrics {
	// Subscribe, Unsubscribe, CreateTopic and DeleteTopic mutate these maps
	// under mb.mu, so take their sizes under the read lock.
	mb.mu.RLock()
	activeSubscriptions := len(mb.subscriptions)
	topicCount := len(mb.topics)
	mb.mu.RUnlock()

	return MessageBusMetrics{
		MessagesPublished:   mb.getMetricInt64("messages_published_total"),
		MessagesConsumed:    mb.getMetricInt64("messages_consumed_total"),
		MessagesFailed:      mb.getMetricInt64("send_errors_total"),
		MessagesInDLQ:       int64(mb.dlq.GetMessageCount()),
		ActiveSubscriptions: activeSubscriptions,
		TopicCount:          topicCount,
		AverageLatency:      mb.getMetricDuration("average_latency"),
		ThroughputPerSec:    mb.getMetricFloat64("throughput_per_second"),
		ErrorRate:           mb.getMetricFloat64("error_rate"),
		MemoryUsage:         mb.getMetricInt64("memory_usage_bytes"),
		CPUUsage:            mb.getMetricFloat64("cpu_usage_percent"),
	}
}
// GetSubscriptions returns the subscriptions currently registered on the bus,
// in map iteration order (i.e. unordered).
func (mb *UniversalMessageBus) GetSubscriptions() []*Subscription {
	mb.mu.RLock()
	defer mb.mu.RUnlock()

	result := make([]*Subscription, len(mb.subscriptions))
	next := 0
	for id := range mb.subscriptions {
		result[next] = mb.subscriptions[id]
		next++
	}
	return result
}
// GetActiveConnections adds up the connection counts reported by the metrics
// of every transport.
func (mb *UniversalMessageBus) GetActiveConnections() int {
	total := 0
	for _, tr := range mb.transports {
		total += tr.GetMetrics().Connections
	}
	return total
}
// Helper methods
// validateMessage checks that msg is usable for publishing: it must be
// non-nil and carry a topic, a source and data. The first failed check
// determines the returned error; nil means the message is valid.
func (mb *UniversalMessageBus) validateMessage(msg *Message) error {
	switch {
	case msg == nil:
		return fmt.Errorf("message is nil")
	case msg.Topic == "":
		return fmt.Errorf("message topic is empty")
	case msg.Source == "":
		return fmt.Errorf("message source is empty")
	case msg.Data == nil:
		return fmt.Errorf("message data is nil")
	}
	return nil
}
// addMessageToTopic appends msg to the stored messages of the topic named by
// msg.Topic, refreshes the topic's last-activity time and applies the topic's
// retention policy.
//
// A missing topic is created on the fly as a persistent topic that keeps at
// most 10000 messages for at most 24 hours. An error is returned only when the
// topic is still unavailable after that attempt.
func (mb *UniversalMessageBus) addMessageToTopic(msg *Message) error {
	mb.mu.RLock()
	topic, exists := mb.topics[msg.Topic]
	mb.mu.RUnlock()

	if !exists {
		config := TopicConfig{
			Persistent:      true,
			RetentionPolicy: RetentionPolicy{MaxMessages: 10000, MaxAge: 24 * time.Hour},
		}
		// CreateTopic reports an error when another goroutine registered the
		// topic between the lookup above and this call. That outcome is fine
		// here, so the error is only judged after looking the topic up again.
		createErr := mb.CreateTopic(msg.Topic, config)

		// The map is guarded by mb.mu, so the second lookup needs the lock too.
		mb.mu.RLock()
		topic, exists = mb.topics[msg.Topic]
		mb.mu.RUnlock()
		if !exists {
			if createErr != nil {
				return createErr
			}
			return fmt.Errorf("topic not found: %s", msg.Topic)
		}
	}

	topic.mu.Lock()
	defer topic.mu.Unlock()

	now := time.Now()
	topic.Messages = append(topic.Messages, StoredMessage{
		Message:   msg,
		Stored:    now,
		Processed: false,
	})
	topic.LastActivity = now

	mb.applyRetentionPolicy(topic)
	return nil
}
// addSubscriberToTopic appends subscription to the subscriber list of the
// named topic, creating the topic as a non-persistent one when it does not
// exist yet.
//
// If the topic is still missing after the create attempt (for example because
// it was deleted concurrently), the subscription is left unattached rather
// than dereferencing a nil topic.
func (mb *UniversalMessageBus) addSubscriberToTopic(topicName string, subscription *Subscription) {
	mb.mu.RLock()
	topic, exists := mb.topics[topicName]
	mb.mu.RUnlock()

	if !exists {
		// The error is ignored on purpose: CreateTopic only fails here when
		// another goroutine created the topic first, which is just as good.
		_ = mb.CreateTopic(topicName, TopicConfig{Persistent: false})

		// The map is guarded by mb.mu, so the second lookup needs the lock too.
		mb.mu.RLock()
		topic, exists = mb.topics[topicName]
		mb.mu.RUnlock()
		if !exists {
			return
		}
	}

	topic.mu.Lock()
	topic.Subscribers = append(topic.Subscribers, subscription)
	topic.mu.Unlock()
}
// removeSubscriberFromTopic drops the first subscriber whose ID equals
// subscriptionID from the named topic. Unknown topics and unknown IDs are
// ignored.
//
// It takes mb.mu.RLock itself, so callers must not hold mb.mu when calling it.
func (mb *UniversalMessageBus) removeSubscriberFromTopic(topicName, subscriptionID string) {
	mb.mu.RLock()
	topic, known := mb.topics[topicName]
	mb.mu.RUnlock()
	if !known {
		return
	}

	topic.mu.Lock()
	defer topic.mu.Unlock()

	subs := topic.Subscribers
	for idx := range subs {
		if subs[idx].ID != subscriptionID {
			continue
		}
		// Shift the tail one slot to the left and shorten the slice by one.
		copy(subs[idx:], subs[idx+1:])
		topic.Subscribers = subs[:len(subs)-1]
		return
	}
}
// deliverToSubscribers hands msg to every active subscriber of msg.Topic
// whose filter (if any) accepts it. Nothing happens when the topic does not
// exist.
//
// Each handler runs on its own goroutine. A panicking handler is recovered and
// counted in "handler_panics_total". A handler error is counted in
// "handler_errors_total" and, when mb.config.EnableDLQ is set, the message is
// offered to the dead letter queue; a failure to enqueue it there is counted
// in "dlq_errors_total". Successful deliveries are counted in
// "messages_consumed_total".
func (mb *UniversalMessageBus) deliverToSubscribers(ctx context.Context, msg *Message) {
	mb.mu.RLock()
	topic, exists := mb.topics[msg.Topic]
	mb.mu.RUnlock()
	if !exists {
		return
	}

	// Work on a snapshot so the topic lock is not held while handlers run.
	topic.mu.RLock()
	subscribers := make([]*Subscription, len(topic.Subscribers))
	copy(subscribers, topic.Subscribers)
	topic.mu.RUnlock()

	for _, sub := range subscribers {
		sub.mu.RLock()
		if !sub.active {
			sub.mu.RUnlock()
			continue
		}
		if sub.Filter != nil && !sub.Filter(msg) {
			sub.mu.RUnlock()
			continue
		}
		// handler is declared inside the loop body, so every goroutine below
		// captures its own copy.
		handler := sub.Handler
		sub.mu.RUnlock()

		go func() {
			defer func() {
				if r := recover(); r != nil {
					mb.metrics.IncrementCounter("handler_panics_total")
				}
			}()

			if err := handler(ctx, msg); err != nil {
				mb.metrics.IncrementCounter("handler_errors_total")
				if mb.config.EnableDLQ {
					// There is no caller to report to from this goroutine,
					// so a failed enqueue is surfaced through a counter.
					if dlqErr := mb.dlq.AddMessage(msg.Topic, msg); dlqErr != nil {
						mb.metrics.IncrementCounter("dlq_errors_total")
					}
				}
				return
			}
			mb.metrics.IncrementCounter("messages_consumed_total")
		}()
	}
}
// applyRetentionPolicy trims topic.Messages according to the topic's
// retention policy: messages stored MaxAge or longer ago are dropped first,
// then only the newest MaxMessages entries are kept. A limit that is zero or
// negative is not enforced.
//
// The method does no locking of its own; the visible callers invoke it while
// holding topic.mu.
func (mb *UniversalMessageBus) applyRetentionPolicy(topic *Topic) {
	limits := topic.Config.RetentionPolicy

	if limits.MaxAge > 0 {
		oldest := time.Now().Add(-limits.MaxAge)
		kept := make([]StoredMessage, 0)
		for idx := range topic.Messages {
			if entry := topic.Messages[idx]; entry.Stored.After(oldest) {
				kept = append(kept, entry)
			}
		}
		topic.Messages = kept
	}

	if limits.MaxMessages <= 0 {
		return
	}
	if excess := len(topic.Messages) - limits.MaxMessages; excess > 0 {
		topic.Messages = topic.Messages[excess:]
	}
}
// healthCheckLoop runs performHealthCheck every
// mb.config.HealthCheckInterval until the bus context is done.
//
// A non-positive interval disables the periodic check: time.NewTicker panics
// on such a value, which would otherwise crash the process from this
// background goroutine.
func (mb *UniversalMessageBus) healthCheckLoop() {
	interval := mb.config.HealthCheckInterval
	if interval <= 0 {
		return
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			mb.performHealthCheck()
		case <-mb.ctx.Done():
			return
		}
	}
}
// cleanupLoop runs performCleanup every mb.config.CleanupInterval until the
// bus context is done.
//
// A non-positive interval disables the periodic cleanup: time.NewTicker panics
// on such a value, which would otherwise crash the process from this
// background goroutine.
func (mb *UniversalMessageBus) cleanupLoop() {
	interval := mb.config.CleanupInterval
	if interval <= 0 {
		return
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			mb.performCleanup()
		case <-mb.ctx.Done():
			return
		}
	}
}
// metricsLoop refreshes the derived metrics once a minute until the bus
// context is done.
func (mb *UniversalMessageBus) metricsLoop() {
	everyMinute := time.NewTicker(time.Minute)
	defer everyMinute.Stop()

	for {
		select {
		case <-mb.ctx.Done():
			return
		case <-everyMinute.C:
			mb.updateMetrics()
		}
	}
}
// performHealthCheck polls every transport and records a gauge named
// "transport_<type>_healthy" for it: 1 for "healthy", 0.5 for "degraded", and
// 0 for "unhealthy" or any other status.
func (mb *UniversalMessageBus) performHealthCheck() {
	for kind, tr := range mb.transports {
		var score float64
		switch tr.Health().Status {
		case "healthy":
			score = 1
		case "degraded":
			score = 0.5
		}
		mb.metrics.RecordGauge(fmt.Sprintf("transport_%s_healthy", kind), score)
	}
}
// performCleanup applies the retention policy to every topic and then asks
// the dead letter queue to clean up with a 24-hour threshold.
func (mb *UniversalMessageBus) performCleanup() {
	// Snapshot the topics so mb.mu is not held while each topic is trimmed.
	mb.mu.RLock()
	snapshot := make([]*Topic, 0, len(mb.topics))
	for name := range mb.topics {
		snapshot = append(snapshot, mb.topics[name])
	}
	mb.mu.RUnlock()

	for i := range snapshot {
		current := snapshot[i]
		current.mu.Lock()
		mb.applyRetentionPolicy(current)
		current.mu.Unlock()
	}

	mb.dlq.Cleanup(time.Hour * 24)
}
// updateMetrics recomputes the "throughput_per_second" and "error_rate"
// gauges from the published and send-error counters. Neither gauge is touched
// while nothing has been published.
//
// NOTE(review): the throughput is the published counter divided by 60. If that
// counter is cumulative, this is not a per-second rate over the last minute —
// confirm the intended meaning.
func (mb *UniversalMessageBus) updateMetrics() {
	published := mb.getMetricInt64("messages_published_total")
	if published > 0 {
		mb.metrics.RecordGauge("throughput_per_second", float64(published)/60.0)
	}

	failed := mb.getMetricInt64("send_errors_total")
	if published <= 0 {
		return
	}
	mb.metrics.RecordGauge("error_rate", float64(failed)/float64(published))
}
// getMetricInt64 returns the metric stored under key when it is an int64, and
// 0 when the metric is absent or has another type.
func (mb *UniversalMessageBus) getMetricInt64(key string) int64 {
	// A failed type assertion leaves the zero value in n.
	n, _ := mb.metrics.Get(key).(int64)
	return n
}
// getMetricFloat64 returns the metric stored under key when it is a float64,
// and 0 when the metric is absent or has another type.
func (mb *UniversalMessageBus) getMetricFloat64(key string) float64 {
	// A failed type assertion leaves the zero value in f.
	f, _ := mb.metrics.Get(key).(float64)
	return f
}
// getMetricDuration returns the metric stored under key when it is a
// time.Duration, and 0 when the metric is absent or has another type.
func (mb *UniversalMessageBus) getMetricDuration(key string) time.Duration {
	// A failed type assertion leaves the zero value in d.
	d, _ := mb.metrics.Get(key).(time.Duration)
	return d
}