Files
mev-beta/pkg/transport/dlq.go
Krypto Kajun c0ec08468c feat(transport): implement comprehensive universal message bus
🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-19 16:39:14 -05:00

592 lines
14 KiB
Go

package transport
import (
	"context"
	"fmt"
	"math"
	"sort"
	"sync"
	"time"
)
// DeadLetterQueue handles failed messages with retry and reprocessing capabilities.
//
// Messages are stored per topic. The methods in this file access messages,
// metrics and reprocessor while holding mu; config is only written during
// construction in NewDeadLetterQueue.
type DeadLetterQueue struct {
	// messages maps a topic to its queued messages in insertion order; a topic
	// whose slice becomes empty is deleted from the map.
	messages map[string][]*DLQMessage
	// config is the effective configuration, with defaults already applied.
	config DLQConfig
	// metrics holds the counters returned by GetMetrics.
	metrics DLQMetrics
	// reprocessor retries messages; it stays nil until SetReprocessor is called.
	reprocessor MessageReprocessor
	// mu guards messages, metrics and reprocessor.
	mu sync.RWMutex
	// cleanupTicker drives the periodic Cleanup goroutine and is stopped by Stop.
	cleanupTicker *time.Ticker
	// ctx is cancelled by Stop to end the background goroutines; it is also the
	// context handed to MessageReprocessor.Reprocess.
	ctx context.Context
	// cancel cancels ctx.
	cancel context.CancelFunc
}
// DLQMessage represents a message in the dead letter queue together with its
// failure and retry bookkeeping.
type DLQMessage struct {
	// ID identifies the entry; AddMessageWithReason derives it from the topic
	// and the time the message was added.
	ID string
	// OriginalMessage is the failed message as passed to AddMessage or
	// AddMessageWithReason.
	OriginalMessage *Message
	// Topic is the topic the message is queued under.
	Topic string
	// FirstFailed is when the message entered the queue; it decides which
	// message is evicted at capacity and when Cleanup expires it.
	FirstFailed time.Time
	// LastAttempt is when the message was added or last failed reprocessing.
	LastAttempt time.Time
	// AttemptCount starts at 1 when the message is added and is incremented
	// after each failed reprocessing attempt.
	AttemptCount int
	// MaxRetries is copied from DLQConfig.MaxRetries when the message is added;
	// a failed attempt that brings AttemptCount up to it marks the message
	// Permanent.
	MaxRetries int
	// FailureReason is the reason given when the message was added, replaced by
	// the error text of the latest failed reprocessing attempt.
	FailureReason string
	// RetryDelay is the backoff delay computed for the next retry (left zero for
	// messages that are Permanent on arrival).
	RetryDelay time.Duration
	// NextRetry is the earliest time the automatic reprocessing routine will
	// retry the message.
	NextRetry time.Time
	// Metadata is initialised to an empty map; nothing in this file reads it.
	Metadata map[string]interface{}
	// Permanent marks a message that will not be retried, either because its
	// failure reason matched DLQConfig.PermanentFailures or because it ran out
	// of retries.
	Permanent bool
}
// DLQConfig configures dead letter queue behavior.
//
// NewDeadLetterQueue replaces zero-valued fields with the defaults noted on
// each field.
type DLQConfig struct {
	// MaxMessages caps the total number of queued messages across all topics;
	// adding a message at the cap first evicts the one with the earliest
	// FirstFailed. Default 10000.
	MaxMessages int
	// MaxRetries is the attempt limit copied into each message. Default 3.
	MaxRetries int
	// RetentionTime is the maximum age (measured from FirstFailed) that the
	// periodic cleanup keeps. Default 24h.
	RetentionTime time.Duration
	// AutoReprocess starts a background goroutine that retries due messages.
	AutoReprocess bool
	// ReprocessInterval is the tick period of both the automatic reprocessing
	// routine and the periodic cleanup. Default 5m.
	ReprocessInterval time.Duration
	// BackoffStrategy selects how retry delays grow. Default BackoffExponential.
	BackoffStrategy BackoffStrategy
	// InitialRetryDelay is the base retry delay. Default 1m.
	InitialRetryDelay time.Duration
	// MaxRetryDelay caps linear and exponential retry delays. Default 1h.
	MaxRetryDelay time.Duration
	// BackoffMultiplier is the growth factor for exponential backoff. Default 2.0.
	BackoffMultiplier float64
	PermanentFailures []string // Failure reasons that mark messages as permanently failed: exact match, or prefix match for patterns ending in '*'
	// ReprocessBatchSize limits how many messages one automatic reprocessing
	// pass retries. Default 10.
	ReprocessBatchSize int
}
// BackoffStrategy defines retry delay calculation methods.
type BackoffStrategy string

const (
	// BackoffFixed always waits InitialRetryDelay.
	BackoffFixed BackoffStrategy = "fixed"
	// BackoffLinear waits AttemptCount × InitialRetryDelay, capped at MaxRetryDelay.
	BackoffLinear BackoffStrategy = "linear"
	// BackoffExponential waits InitialRetryDelay × BackoffMultiplier^(AttemptCount-1),
	// capped at MaxRetryDelay.
	BackoffExponential BackoffStrategy = "exponential"
	// BackoffCustom is declared but has no dedicated handling in this file;
	// delay calculation falls back to InitialRetryDelay.
	BackoffCustom BackoffStrategy = "custom"
)
// DLQMetrics tracks dead letter queue statistics.
type DLQMetrics struct {
	// MessagesAdded counts every message ever added (never decremented, so it
	// includes messages that were later evicted, expired or purged).
	MessagesAdded int64
	// MessagesReprocessed is not incremented anywhere in this file.
	// NOTE(review): confirm whether another file updates it; reprocess outcomes
	// are counted in ReprocessSuccesses and ReprocessFailures.
	MessagesReprocessed int64
	// MessagesExpired counts messages removed by Cleanup for exceeding their age.
	MessagesExpired int64
	// MessagesPermanent counts messages marked Permanent, either on arrival or
	// after exhausting their retries.
	MessagesPermanent int64
	// ReprocessSuccesses counts successful reprocessing attempts.
	ReprocessSuccesses int64
	// ReprocessFailures counts failed reprocessing attempts.
	ReprocessFailures int64
	// QueueSize is the number of currently queued messages, maintained
	// incrementally as messages are added and removed.
	QueueSize int64
	// OldestMessage is the FirstFailed time of the oldest queued message as of
	// its last recomputation; it is the zero time when the queue is empty.
	OldestMessage time.Time
}
// MessageReprocessor handles message reprocessing logic for a DeadLetterQueue.
type MessageReprocessor interface {
	// Reprocess retries msg; a nil error causes the queue to drop the message.
	Reprocess(ctx context.Context, msg *DLQMessage) error
	// CanReprocess is consulted before Reprocess; returning false is recorded
	// as a failed attempt.
	CanReprocess(msg *DLQMessage) bool
	// ShouldRetry reports whether msg should be retried after err.
	// NOTE(review): not called by the DeadLetterQueue code in this file.
	ShouldRetry(msg *DLQMessage, err error) bool
}
// DefaultMessageReprocessor implements basic reprocessing logic by
// republishing the original message through a MessagePublisher.
type DefaultMessageReprocessor struct {
	// publisher receives republished messages; Reprocess fails if it is nil.
	publisher MessagePublisher
}

// MessagePublisher interface for republishing messages.
type MessagePublisher interface {
	// Publish sends msg; DefaultMessageReprocessor.Reprocess returns its error
	// unchanged.
	Publish(ctx context.Context, msg *Message) error
}
// NewDeadLetterQueue creates a new dead letter queue and starts its background
// goroutines.
//
// Zero-valued fields of config are replaced with defaults: MaxMessages 10000,
// MaxRetries 3, RetentionTime 24h, ReprocessInterval 5m, InitialRetryDelay 1m,
// MaxRetryDelay 1h, BackoffMultiplier 2.0, BackoffStrategy exponential and
// ReprocessBatchSize 10. Negative durations are replaced with their defaults as
// well. A non-positive ReprocessInterval would make time.NewTicker panic, and a
// negative RetentionTime would expire every message on each cleanup pass.
//
// A cleanup goroutine expiring messages older than RetentionTime runs every
// ReprocessInterval. When AutoReprocess is set, a second goroutine retries due
// messages on the same interval. Call Stop to terminate both goroutines.
func NewDeadLetterQueue(config DLQConfig) *DeadLetterQueue {
	ctx, cancel := context.WithCancel(context.Background())
	dlq := &DeadLetterQueue{
		messages: make(map[string][]*DLQMessage),
		config:   config,
		metrics:  DLQMetrics{},
		ctx:      ctx,
		cancel:   cancel,
	}

	// Count, multiplier and strategy defaults: only the zero value is replaced.
	if dlq.config.MaxMessages == 0 {
		dlq.config.MaxMessages = 10000
	}
	if dlq.config.MaxRetries == 0 {
		dlq.config.MaxRetries = 3
	}
	if dlq.config.BackoffMultiplier == 0 {
		dlq.config.BackoffMultiplier = 2.0
	}
	if dlq.config.BackoffStrategy == "" {
		dlq.config.BackoffStrategy = BackoffExponential
	}
	if dlq.config.ReprocessBatchSize == 0 {
		dlq.config.ReprocessBatchSize = 10
	}

	// Duration defaults: any non-positive value is replaced.
	if dlq.config.RetentionTime <= 0 {
		dlq.config.RetentionTime = 24 * time.Hour
	}
	if dlq.config.ReprocessInterval <= 0 {
		dlq.config.ReprocessInterval = 5 * time.Minute
	}
	if dlq.config.InitialRetryDelay <= 0 {
		dlq.config.InitialRetryDelay = time.Minute
	}
	if dlq.config.MaxRetryDelay <= 0 {
		dlq.config.MaxRetryDelay = time.Hour
	}

	// Start cleanup routine.
	dlq.startCleanupRoutine()
	// Start reprocessing routine if enabled.
	if dlq.config.AutoReprocess {
		dlq.startReprocessRoutine()
	}
	return dlq
}
// AddMessage enqueues a failed message for topic using the generic failure
// reason "unknown failure"; it is shorthand for AddMessageWithReason.
func (dlq *DeadLetterQueue) AddMessage(topic string, msg *Message) error {
	const defaultReason = "unknown failure"
	return dlq.AddMessageWithReason(topic, msg, defaultReason)
}
// AddMessageWithReason adds a failed message with a specific failure reason.
//
// When the queue already holds config.MaxMessages messages, the message with
// the earliest FirstFailed time is evicted first. If reason matches one of
// config.PermanentFailures the message is marked Permanent and no retry is
// scheduled. Otherwise RetryDelay and NextRetry are set from the configured
// backoff. The returned error is currently always nil.
//
// Message IDs have the form "dlq_<topic>_<unixnano>". Wall-clock resolution
// is coarse on some platforms, so two messages added in quick succession can
// share a timestamp. In that case a "-<n>" suffix is appended, so every queued
// message keeps a distinct ID that ReprocessMessage can address.
func (dlq *DeadLetterQueue) AddMessageWithReason(topic string, msg *Message, reason string) error {
	dlq.mu.Lock()
	defer dlq.mu.Unlock()

	// Evict the oldest message to make room once the capacity limit is reached.
	if dlq.getTotalMessageCount() >= dlq.config.MaxMessages {
		dlq.removeOldestMessage()
	}

	// Use a single timestamp so FirstFailed, LastAttempt and NextRetry agree.
	now := time.Now()
	permanent := dlq.isPermanentFailure(reason)

	idInUse := func(id string) bool {
		for _, queued := range dlq.messages {
			for _, m := range queued {
				if m.ID == id {
					return true
				}
			}
		}
		return false
	}
	baseID := fmt.Sprintf("dlq_%s_%d", topic, now.UnixNano())
	id := baseID
	for n := 1; idInUse(id); n++ {
		id = fmt.Sprintf("%s-%d", baseID, n)
	}

	dlqMsg := &DLQMessage{
		ID:              id,
		OriginalMessage: msg,
		Topic:           topic,
		FirstFailed:     now,
		LastAttempt:     now,
		AttemptCount:    1,
		MaxRetries:      dlq.config.MaxRetries,
		FailureReason:   reason,
		Metadata:        make(map[string]interface{}),
		Permanent:       permanent,
	}
	if !permanent {
		dlqMsg.RetryDelay = dlq.calculateRetryDelay(dlqMsg)
		dlqMsg.NextRetry = now.Add(dlqMsg.RetryDelay)
	}

	// append on a missing key starts a fresh slice, so no explicit init is needed.
	dlq.messages[topic] = append(dlq.messages[topic], dlqMsg)

	dlq.metrics.MessagesAdded++
	dlq.metrics.QueueSize++
	if permanent {
		dlq.metrics.MessagesPermanent++
	}
	dlq.updateOldestMessage()
	return nil
}
// GetMessages returns a snapshot of the messages queued for topic.
//
// The returned slice is a fresh copy that callers may reorder or truncate
// freely; the *DLQMessage elements themselves are shared with the queue. An
// unknown topic yields an empty, non-nil slice. The error is currently always
// nil.
func (dlq *DeadLetterQueue) GetMessages(topic string) ([]*DLQMessage, error) {
	dlq.mu.RLock()
	defer dlq.mu.RUnlock()

	queued := dlq.messages[topic]
	snapshot := make([]*DLQMessage, len(queued))
	copy(snapshot, queued)
	return snapshot, nil
}
// GetAllMessages returns a snapshot of every queued message, keyed by topic.
//
// Both the map and each per-topic slice are fresh copies; the *DLQMessage
// elements are shared with the queue.
func (dlq *DeadLetterQueue) GetAllMessages() map[string][]*DLQMessage {
	dlq.mu.RLock()
	defer dlq.mu.RUnlock()

	snapshot := make(map[string][]*DLQMessage, len(dlq.messages))
	for topic, queued := range dlq.messages {
		snapshot[topic] = append(make([]*DLQMessage, 0, len(queued)), queued...)
	}
	return snapshot
}
// ReprocessMessage attempts to reprocess the queued message with the given ID.
//
// The eligibility checks (reprocessor configured, CanReprocess) run under the
// queue lock, but MessageReprocessor.Reprocess itself runs WITHOUT the lock.
// A slow Reprocess, such as a network publish, therefore does not stall other
// queue operations. A reprocessor that calls back into this queue, for example
// by adding a message, also cannot deadlock. As a consequence, two concurrent
// calls for the same ID may both invoke Reprocess, so downstream consumers
// should tolerate duplicates.
//
// On success the message is removed from the queue and OldestMessage is
// recomputed. On failure its AttemptCount, LastAttempt and FailureReason are
// updated. Once AttemptCount reaches MaxRetries the message is marked
// Permanent; otherwise its next retry is scheduled with the configured backoff.
// Errors are returned for unknown IDs, permanently failed messages, and failed
// reprocessing (wrapped with %w).
func (dlq *DeadLetterQueue) ReprocessMessage(messageID string) error {
	// Phase 1: locate the message and evaluate eligibility under the lock.
	dlq.mu.Lock()
	var dlqMsg *DLQMessage
search:
	for _, messages := range dlq.messages {
		for _, msg := range messages {
			if msg.ID == messageID {
				dlqMsg = msg
				break search
			}
		}
	}
	if dlqMsg == nil {
		dlq.mu.Unlock()
		return fmt.Errorf("message not found: %s", messageID)
	}
	if dlqMsg.Permanent {
		dlq.mu.Unlock()
		return fmt.Errorf("message marked as permanent failure: %s", messageID)
	}
	reprocessor := dlq.reprocessor
	var err error
	switch {
	case reprocessor == nil:
		err = fmt.Errorf("no reprocessor configured")
	case !reprocessor.CanReprocess(dlqMsg):
		err = fmt.Errorf("message cannot be reprocessed")
	}
	dlq.mu.Unlock()

	// Phase 2: run the (possibly slow, possibly re-entrant) reprocessor unlocked.
	if err == nil {
		err = reprocessor.Reprocess(dlq.ctx, dlqMsg)
	}

	// Phase 3: record the outcome. The queue may have changed while unlocked
	// (cleanup, purge, eviction or a concurrent reprocess), so locate the
	// message again by identity instead of trusting a stale index.
	dlq.mu.Lock()
	defer dlq.mu.Unlock()

	topic, index := "", -1
locate:
	for t, messages := range dlq.messages {
		for i, msg := range messages {
			if msg == dlqMsg {
				topic, index = t, i
				break locate
			}
		}
	}
	stillQueued := index >= 0

	if err == nil {
		if stillQueued {
			dlq.removeMessageByIndex(topic, index)
			dlq.metrics.QueueSize--
			dlq.updateOldestMessage()
		}
		dlq.metrics.ReprocessSuccesses++
		return nil
	}

	// Only update retry bookkeeping for a message that is still queued and was
	// not meanwhile marked permanent by a concurrent call, so MessagesPermanent
	// is never double-counted.
	if stillQueued && !dlqMsg.Permanent {
		dlqMsg.AttemptCount++
		dlqMsg.LastAttempt = time.Now()
		dlqMsg.FailureReason = err.Error()
		if dlqMsg.AttemptCount >= dlqMsg.MaxRetries {
			dlqMsg.Permanent = true
			dlq.metrics.MessagesPermanent++
		} else {
			dlqMsg.RetryDelay = dlq.calculateRetryDelay(dlqMsg)
			dlqMsg.NextRetry = time.Now().Add(dlqMsg.RetryDelay)
		}
	}
	dlq.metrics.ReprocessFailures++
	return fmt.Errorf("reprocessing failed: %w", err)
}
// PurgeMessages drops every message queued for topic and updates QueueSize and
// OldestMessage accordingly. Purging an unknown topic is a no-op. The error is
// currently always nil.
func (dlq *DeadLetterQueue) PurgeMessages(topic string) error {
	dlq.mu.Lock()
	defer dlq.mu.Unlock()

	queued, ok := dlq.messages[topic]
	if !ok {
		return nil
	}
	delete(dlq.messages, topic)
	dlq.metrics.QueueSize -= int64(len(queued))
	dlq.updateOldestMessage()
	return nil
}
// PurgeAllMessages empties the queue across all topics and resets QueueSize and
// OldestMessage; cumulative counters such as MessagesAdded are left untouched.
// The error is currently always nil.
func (dlq *DeadLetterQueue) PurgeAllMessages() error {
	dlq.mu.Lock()
	defer dlq.mu.Unlock()

	dlq.messages = make(map[string][]*DLQMessage)
	dlq.metrics.QueueSize, dlq.metrics.OldestMessage = 0, time.Time{}
	return nil
}
// GetMessageCount reports how many messages are currently queued across all
// topics, counted directly from the stored per-topic slices.
func (dlq *DeadLetterQueue) GetMessageCount() int {
	dlq.mu.RLock()
	count := dlq.getTotalMessageCount()
	dlq.mu.RUnlock()
	return count
}
// GetMetrics returns a copy of the current DLQ metrics, taken under the read
// lock.
func (dlq *DeadLetterQueue) GetMetrics() DLQMetrics {
	dlq.mu.RLock()
	snapshot := dlq.metrics
	dlq.mu.RUnlock()
	return snapshot
}
// SetReprocessor installs the MessageReprocessor used to retry queued messages,
// replacing any previously configured one.
func (dlq *DeadLetterQueue) SetReprocessor(reprocessor MessageReprocessor) {
	dlq.mu.Lock()
	dlq.reprocessor = reprocessor
	dlq.mu.Unlock()
}
// Cleanup removes every message whose FirstFailed time is not after
// time.Now() minus maxAge. Removed messages are added to MessagesExpired and
// deducted from QueueSize, topics left without messages are deleted, and
// OldestMessage is recomputed. The error is currently always nil.
func (dlq *DeadLetterQueue) Cleanup(maxAge time.Duration) error {
	dlq.mu.Lock()
	defer dlq.mu.Unlock()

	cutoff := time.Now().Add(-maxAge)
	var expired int64
	for topic, queued := range dlq.messages {
		var kept []*DLQMessage
		for _, msg := range queued {
			if !msg.FirstFailed.After(cutoff) {
				expired++
				continue
			}
			kept = append(kept, msg)
		}
		// Deleting or overwriting existing keys during range is permitted.
		if len(kept) == 0 {
			delete(dlq.messages, topic)
			continue
		}
		dlq.messages[topic] = kept
	}

	dlq.metrics.MessagesExpired += expired
	dlq.metrics.QueueSize -= expired
	dlq.updateOldestMessage()
	return nil
}
// Stop cancels the queue's context, which ends the background cleanup and
// reprocessing goroutines, and then stops the cleanup ticker. It does not wait
// for an in-progress cleanup or reprocessing pass to finish. The error is
// currently always nil.
func (dlq *DeadLetterQueue) Stop() error {
	dlq.cancel()
	if ticker := dlq.cleanupTicker; ticker != nil {
		ticker.Stop()
	}
	return nil
}
// Private helper methods

// getTotalMessageCount sums the lengths of all per-topic slices. Callers in
// this file hold dlq.mu while calling it.
func (dlq *DeadLetterQueue) getTotalMessageCount() int {
	total := 0
	for topic := range dlq.messages {
		total += len(dlq.messages[topic])
	}
	return total
}
// removeOldestMessage evicts the queued message with the earliest FirstFailed
// time and decrements QueueSize. It does not recompute OldestMessage. Callers
// in this file hold dlq.mu.
func (dlq *DeadLetterQueue) removeOldestMessage() {
	var (
		victimTopic string
		victimIndex int
		victimTime  time.Time
	)
	for topic, queued := range dlq.messages {
		for i, msg := range queued {
			// A zero victimTime doubles as the "nothing selected yet" marker.
			if !victimTime.IsZero() && !msg.FirstFailed.Before(victimTime) {
				continue
			}
			victimTopic, victimIndex, victimTime = topic, i, msg.FirstFailed
		}
	}
	if victimTime.IsZero() {
		return
	}
	dlq.removeMessageByIndex(victimTopic, victimIndex)
	dlq.metrics.QueueSize--
}
// removeMessageByIndex deletes the message at index from topic's slice while
// preserving the order of the remaining messages. The topic is deleted from
// the map once its slice is empty. It does not touch metrics. index must be in
// range for the topic's slice (it panics otherwise). Callers in this file hold
// dlq.mu.
func (dlq *DeadLetterQueue) removeMessageByIndex(topic string, index int) {
	messages := dlq.messages[topic]
	last := len(messages) - 1
	copy(messages[index:], messages[index+1:])
	// Clear the vacated tail slot. The backing array outlives this call, and a
	// stale pointer there would keep a removed message (and its payload)
	// reachable until a later append overwrites the slot.
	messages[last] = nil
	messages = messages[:last]
	if len(messages) == 0 {
		delete(dlq.messages, topic)
		return
	}
	dlq.messages[topic] = messages
}
// isPermanentFailure reports whether reason matches one of
// config.PermanentFailures. A pattern matches when it equals reason exactly,
// or when it ends in '*' and reason starts with the text before the '*'. A
// lone "*" therefore matches every reason.
func (dlq *DeadLetterQueue) isPermanentFailure(reason string) bool {
	for _, pattern := range dlq.config.PermanentFailures {
		switch n := len(pattern); {
		case pattern == reason:
			return true
		case n > 0 && pattern[n-1] == '*':
			prefix := pattern[:n-1]
			if len(reason) >= len(prefix) && reason[:len(prefix)] == prefix {
				return true
			}
		}
	}
	return false
}
// calculateRetryDelay returns the delay before msg's next retry according to
// config.BackoffStrategy:
//
//   - BackoffLinear: AttemptCount × InitialRetryDelay, capped at MaxRetryDelay.
//   - BackoffExponential: InitialRetryDelay × BackoffMultiplier^(AttemptCount-1),
//     capped at MaxRetryDelay.
//   - BackoffFixed and anything else (including BackoffCustom): InitialRetryDelay.
//
// The linear and exponential products are computed in float64 and compared with
// MaxRetryDelay before being converted to a time.Duration. Converting an
// out-of-range float64 to int64 is implementation-defined in Go, so large attempt
// counts must not be able to wrap into a negative delay and slip past the cap.
func (dlq *DeadLetterQueue) calculateRetryDelay(msg *DLQMessage) time.Duration {
	initial := float64(dlq.config.InitialRetryDelay)
	limit := dlq.config.MaxRetryDelay

	var scaled float64
	switch dlq.config.BackoffStrategy {
	case BackoffLinear:
		scaled = float64(msg.AttemptCount) * initial
	case BackoffExponential:
		scaled = initial * math.Pow(dlq.config.BackoffMultiplier, float64(msg.AttemptCount-1))
	default:
		// BackoffFixed, BackoffCustom and unknown strategies: uncapped base delay.
		return dlq.config.InitialRetryDelay
	}

	// The comparison also catches +Inf; NaN (e.g. 0 × Inf) is clamped too so the
	// conversion below only ever sees in-range values.
	if math.IsNaN(scaled) || scaled > float64(limit) {
		return limit
	}
	return time.Duration(scaled)
}
// attemptReprocess hands msg to the configured reprocessor using dlq.ctx. It
// returns an error, without calling Reprocess, when no reprocessor is
// configured or when CanReprocess rejects the message. dlq.reprocessor is read
// without taking the lock.
func (dlq *DeadLetterQueue) attemptReprocess(msg *DLQMessage) error {
	switch r := dlq.reprocessor; {
	case r == nil:
		return fmt.Errorf("no reprocessor configured")
	case !r.CanReprocess(msg):
		return fmt.Errorf("message cannot be reprocessed")
	default:
		return r.Reprocess(dlq.ctx, msg)
	}
}
// updateOldestMessage recomputes metrics.OldestMessage as the earliest
// FirstFailed among all queued messages, or the zero time when the queue is
// empty. Callers in this file hold dlq.mu.
func (dlq *DeadLetterQueue) updateOldestMessage() {
	earliest := time.Time{}
	for _, queued := range dlq.messages {
		for _, msg := range queued {
			if t := msg.FirstFailed; earliest.IsZero() || t.Before(earliest) {
				earliest = t
			}
		}
	}
	dlq.metrics.OldestMessage = earliest
}
// startCleanupRoutine creates cleanupTicker and launches a goroutine that
// calls Cleanup with config.RetentionTime on every tick until dlq.ctx is
// cancelled. The ticker fires every ReprocessInterval. DLQConfig has no
// separate cleanup interval, so cleanup shares the reprocessing cadence.
func (dlq *DeadLetterQueue) startCleanupRoutine() {
	dlq.cleanupTicker = time.NewTicker(dlq.config.ReprocessInterval)
	ticks := dlq.cleanupTicker.C
	go func() {
		for {
			select {
			case <-dlq.ctx.Done():
				return
			case <-ticks:
				// Cleanup's error is ignored (it currently always returns nil).
				_ = dlq.Cleanup(dlq.config.RetentionTime)
			}
		}
	}()
}
// startReprocessRoutine launches a goroutine that runs processRetryableMessages
// every config.ReprocessInterval until dlq.ctx is cancelled. The goroutine owns
// its ticker and stops it on exit.
func (dlq *DeadLetterQueue) startReprocessRoutine() {
	ticker := time.NewTicker(dlq.config.ReprocessInterval)
	go func() {
		defer ticker.Stop()
		done := dlq.ctx.Done()
		for {
			select {
			case <-done:
				return
			case <-ticker.C:
				dlq.processRetryableMessages()
			}
		}
	}()
}
// processRetryableMessages runs one automatic reprocessing pass.
//
// It collects every message that is neither Permanent nor out of retries and
// orders them by NextRetry, earliest first. It then calls ReprocessMessage for
// those whose NextRetry has passed, among the first config.ReprocessBatchSize
// candidates. ReprocessMessage records each outcome on the message and in the
// metrics, so its returned error is deliberately ignored here.
//
// IDs and retry times are snapshotted while the lock is held, because
// ReprocessMessage mutates those fields under the lock; reading them after
// unlocking would be a data race. The pass also stops as soon as the queue's
// context is cancelled. After Stop, a reprocess attempt would run against the
// cancelled context and most likely fail, and that failure would be charged to
// the message as a spent retry.
func (dlq *DeadLetterQueue) processRetryableMessages() {
	type candidate struct {
		id        string
		nextRetry time.Time
	}

	dlq.mu.RLock()
	retryable := dlq.getRetryableMessages()
	candidates := make([]candidate, 0, len(retryable))
	for _, msg := range retryable {
		candidates = append(candidates, candidate{id: msg.ID, nextRetry: msg.NextRetry})
	}
	dlq.mu.RUnlock()

	// Sort by next retry time so the most overdue messages go first.
	sort.Slice(candidates, func(i, j int) bool {
		return candidates[i].nextRetry.Before(candidates[j].nextRetry)
	})

	// Process at most one batch; a non-positive batch size processes nothing.
	limit := dlq.config.ReprocessBatchSize
	if len(candidates) < limit {
		limit = len(candidates)
	}
	for i := 0; i < limit; i++ {
		if dlq.ctx.Err() != nil {
			return
		}
		if c := candidates[i]; time.Now().After(c.nextRetry) {
			_ = dlq.ReprocessMessage(c.id)
		}
	}
}
// getRetryableMessages returns every queued message that is not Permanent and
// whose AttemptCount is still below its MaxRetries, in unspecified (map
// iteration) order. The result is nil when nothing qualifies. Callers in this
// file hold dlq.mu.
func (dlq *DeadLetterQueue) getRetryableMessages() []*DLQMessage {
	var eligible []*DLQMessage
	for _, queued := range dlq.messages {
		for _, msg := range queued {
			if msg.Permanent || msg.AttemptCount >= msg.MaxRetries {
				continue
			}
			eligible = append(eligible, msg)
		}
	}
	return eligible
}
// Implementation of DefaultMessageReprocessor

// NewDefaultMessageReprocessor returns a DefaultMessageReprocessor that
// republishes messages through publisher. A nil publisher is accepted, but
// Reprocess then reports an error.
func NewDefaultMessageReprocessor(publisher MessagePublisher) *DefaultMessageReprocessor {
	r := new(DefaultMessageReprocessor)
	r.publisher = publisher
	return r
}
// Reprocess republishes msg.OriginalMessage through the configured publisher
// using ctx, returning the publisher's error unchanged. It returns an error
// without publishing when no publisher is configured. It does the same when msg
// or its OriginalMessage is nil; the queue accepts nil messages because
// AddMessage does not validate its argument.
func (r *DefaultMessageReprocessor) Reprocess(ctx context.Context, msg *DLQMessage) error {
	if r.publisher == nil {
		return fmt.Errorf("no publisher configured")
	}
	if msg == nil || msg.OriginalMessage == nil {
		return fmt.Errorf("no original message to republish")
	}
	return r.publisher.Publish(ctx, msg.OriginalMessage)
}
// CanReprocess reports whether msg is still eligible for a retry: it must not
// be marked Permanent and must not have reached its MaxRetries.
func (r *DefaultMessageReprocessor) CanReprocess(msg *DLQMessage) bool {
	exhausted := msg.AttemptCount >= msg.MaxRetries
	return !(msg.Permanent || exhausted)
}
// ShouldRetry reports whether msg still has attempts left, i.e. whether its
// AttemptCount is below MaxRetries. The error is not inspected; error-specific
// policies can be layered on top.
func (r *DefaultMessageReprocessor) ShouldRetry(msg *DLQMessage, err error) bool {
	if msg.AttemptCount >= msg.MaxRetries {
		return false
	}
	return true
}
// Helper function for power calculation

// pow returns base raised to the power exp. It is a thin wrapper over
// math.Pow, which handles zero, negative and fractional exponents correctly. A
// hand-rolled repeated-multiplication loop cannot: it would return base for
// negative exponents and ignore any fractional part.
func pow(base, exp float64) float64 {
	return math.Pow(base, exp)
}