package transport

import (
	"context"
	"fmt"
	"sort"
	"sync"
	"time"
)

// DeadLetterQueue handles failed messages with retry and reprocessing capabilities.
//
// Failed messages are grouped by topic. The mutable state (messages, metrics,
// reprocessor) is accessed under mu by the methods of this type. config, ctx
// and cancel are set once by NewDeadLetterQueue and read by the background
// goroutines without locking.
type DeadLetterQueue struct {
	messages      map[string][]*DLQMessage // queued messages keyed by topic
	config        DLQConfig                // effective config (defaults applied by NewDeadLetterQueue)
	metrics       DLQMetrics               // counters returned by GetMetrics
	reprocessor   MessageReprocessor       // set via SetReprocessor; nil means reprocessing always fails
	mu            sync.RWMutex
	cleanupTicker *time.Ticker       // drives the periodic Cleanup goroutine; stopped by Stop
	ctx           context.Context    // cancelled by Stop; passed to MessageReprocessor.Reprocess
	cancel        context.CancelFunc // cancels ctx
}

// DLQMessage represents a message in the dead letter queue.
type DLQMessage struct {
	// ID is assigned by the queue when the message is added and is the key
	// used by ReprocessMessage to look the message up.
	ID string
	// OriginalMessage is the failed message as passed to AddMessage /
	// AddMessageWithReason; DefaultMessageReprocessor republishes it.
	OriginalMessage *Message
	Topic           string
	// FirstFailed is when the message entered the queue. Retention cleanup,
	// capacity eviction and the OldestMessage metric all use it.
	FirstFailed time.Time
	// LastAttempt is the time of the most recent failure.
	LastAttempt time.Time
	// AttemptCount starts at 1 (the original failure) and is incremented on
	// each failed reprocessing attempt.
	AttemptCount int
	// MaxRetries is copied from DLQConfig.MaxRetries when the message is added;
	// the message is marked Permanent once AttemptCount reaches it.
	MaxRetries int
	// FailureReason is the reason given on add, replaced by the error text of
	// the latest failed reprocessing attempt.
	FailureReason string
	// RetryDelay and NextRetry come from the configured backoff strategy. They
	// are not set for messages that are permanent on arrival.
	RetryDelay time.Duration
	NextRetry  time.Time
	// Metadata is initialised empty on add; the queue itself does not read it.
	Metadata map[string]interface{}
	// Permanent messages are never reprocessed (ReprocessMessage rejects them).
	Permanent bool
}

// DLQConfig configures dead letter queue behavior. Unset (zero) values are
// replaced with defaults by NewDeadLetterQueue.
type DLQConfig struct {
	// MaxMessages caps the total number of queued messages across all topics;
	// when full, adding a message first evicts the one with the oldest FirstFailed.
	MaxMessages int
	// MaxRetries is the attempt limit copied into each new DLQMessage.
	MaxRetries int
	// RetentionTime is the maximum age (by FirstFailed) kept by background cleanup.
	RetentionTime time.Duration
	// AutoReprocess starts a background goroutine that retries due messages.
	AutoReprocess bool
	// ReprocessInterval is the period of the reprocessing pass and also of the
	// background cleanup pass.
	ReprocessInterval time.Duration
	// BackoffStrategy selects how retry delays grow between attempts.
	BackoffStrategy BackoffStrategy
	// InitialRetryDelay is the base retry delay.
	InitialRetryDelay time.Duration
	// MaxRetryDelay caps the linear and exponential strategies.
	MaxRetryDelay time.Duration
	// BackoffMultiplier is the per-attempt growth factor of the exponential strategy.
	BackoffMultiplier float64
	// PermanentFailures lists failure reasons that mark a message as permanent
	// on add: an entry matches the reason exactly, or, if it ends with '*',
	// matches any reason starting with the text before the '*'.
	PermanentFailures []string // Error patterns that mark messages as permanently failed
	// ReprocessBatchSize is the maximum number of messages retried per pass.
	ReprocessBatchSize int
}

// BackoffStrategy defines retry delay calculation methods.
type BackoffStrategy string

const (
	// BackoffFixed always waits InitialRetryDelay.
	BackoffFixed BackoffStrategy = "fixed"
	// BackoffLinear waits AttemptCount * InitialRetryDelay, capped at MaxRetryDelay.
	BackoffLinear BackoffStrategy = "linear"
	// BackoffExponential waits InitialRetryDelay * BackoffMultiplier^(AttemptCount-1),
	// capped at MaxRetryDelay.
	BackoffExponential BackoffStrategy = "exponential"
	// BackoffCustom is accepted, but calculateRetryDelay currently handles it
	// like BackoffFixed (it falls through to the default branch).
	BackoffCustom BackoffStrategy = "custom"
)

// DLQMetrics tracks dead letter queue statistics.
type DLQMetrics struct {
	MessagesAdded       int64 // messages accepted by AddMessage/AddMessageWithReason
	MessagesReprocessed int64
	MessagesExpired     int64 // messages removed by Cleanup
	MessagesPermanent   int64 // messages marked permanent (on add or after retries)
	ReprocessSuccesses  int64
	ReprocessFailures   int64
	QueueSize           int64     // current number of queued messages, maintained incrementally
	OldestMessage       time.Time // FirstFailed of the oldest queued message; zero when empty
}

// MessageReprocessor handles message reprocessing logic.
type MessageReprocessor interface {
	// Reprocess retries delivery; it receives the queue's own context, which
	// is cancelled by Stop.
	Reprocess(ctx context.Context, msg *DLQMessage) error
	// CanReprocess is consulted before Reprocess; returning false counts as a
	// failed attempt.
	CanReprocess(msg *DLQMessage) bool
	// ShouldRetry reports whether a failed message should be retried.
	// NOTE(review): DeadLetterQueue does not appear to call this in this file.
	ShouldRetry(msg *DLQMessage, err error) bool
}

// DefaultMessageReprocessor implements basic reprocessing logic by
// republishing the original message through a MessagePublisher.
type DefaultMessageReprocessor struct {
	publisher MessagePublisher
}
// MessagePublisher interface for republishing messages type MessagePublisher interface { Publish(ctx context.Context, msg *Message) error } // NewDeadLetterQueue creates a new dead letter queue func NewDeadLetterQueue(config DLQConfig) *DeadLetterQueue { ctx, cancel := context.WithCancel(context.Background()) dlq := &DeadLetterQueue{ messages: make(map[string][]*DLQMessage), config: config, metrics: DLQMetrics{}, ctx: ctx, cancel: cancel, } // Set default configuration values if dlq.config.MaxMessages == 0 { dlq.config.MaxMessages = 10000 } if dlq.config.MaxRetries == 0 { dlq.config.MaxRetries = 3 } if dlq.config.RetentionTime == 0 { dlq.config.RetentionTime = 24 * time.Hour } if dlq.config.ReprocessInterval == 0 { dlq.config.ReprocessInterval = 5 * time.Minute } if dlq.config.InitialRetryDelay == 0 { dlq.config.InitialRetryDelay = time.Minute } if dlq.config.MaxRetryDelay == 0 { dlq.config.MaxRetryDelay = time.Hour } if dlq.config.BackoffMultiplier == 0 { dlq.config.BackoffMultiplier = 2.0 } if dlq.config.BackoffStrategy == "" { dlq.config.BackoffStrategy = BackoffExponential } if dlq.config.ReprocessBatchSize == 0 { dlq.config.ReprocessBatchSize = 10 } // Start cleanup routine dlq.startCleanupRoutine() // Start reprocessing routine if enabled if dlq.config.AutoReprocess { dlq.startReprocessRoutine() } return dlq } // AddMessage adds a failed message to the dead letter queue func (dlq *DeadLetterQueue) AddMessage(topic string, msg *Message) error { return dlq.AddMessageWithReason(topic, msg, "unknown failure") } // AddMessageWithReason adds a failed message with a specific failure reason func (dlq *DeadLetterQueue) AddMessageWithReason(topic string, msg *Message, reason string) error { dlq.mu.Lock() defer dlq.mu.Unlock() // Check if we've exceeded max messages totalMessages := dlq.getTotalMessageCount() if totalMessages >= dlq.config.MaxMessages { // Remove oldest message to make room dlq.removeOldestMessage() } // Check if this is a permanent failure permanent 
:= dlq.isPermanentFailure(reason) dlqMsg := &DLQMessage{ ID: fmt.Sprintf("dlq_%s_%d", topic, time.Now().UnixNano()), OriginalMessage: msg, Topic: topic, FirstFailed: time.Now(), LastAttempt: time.Now(), AttemptCount: 1, MaxRetries: dlq.config.MaxRetries, FailureReason: reason, Metadata: make(map[string]interface{}), Permanent: permanent, } if !permanent { dlqMsg.RetryDelay = dlq.calculateRetryDelay(dlqMsg) dlqMsg.NextRetry = time.Now().Add(dlqMsg.RetryDelay) } // Add to queue if _, exists := dlq.messages[topic]; !exists { dlq.messages[topic] = make([]*DLQMessage, 0) } dlq.messages[topic] = append(dlq.messages[topic], dlqMsg) // Update metrics dlq.metrics.MessagesAdded++ dlq.metrics.QueueSize++ if permanent { dlq.metrics.MessagesPermanent++ } dlq.updateOldestMessage() return nil } // GetMessages returns all messages for a topic func (dlq *DeadLetterQueue) GetMessages(topic string) ([]*DLQMessage, error) { dlq.mu.RLock() defer dlq.mu.RUnlock() messages, exists := dlq.messages[topic] if !exists { return []*DLQMessage{}, nil } // Return a copy to avoid race conditions result := make([]*DLQMessage, len(messages)) copy(result, messages) return result, nil } // GetAllMessages returns all messages across all topics func (dlq *DeadLetterQueue) GetAllMessages() map[string][]*DLQMessage { dlq.mu.RLock() defer dlq.mu.RUnlock() result := make(map[string][]*DLQMessage) for topic, messages := range dlq.messages { result[topic] = make([]*DLQMessage, len(messages)) copy(result[topic], messages) } return result } // ReprocessMessage attempts to reprocess a specific message func (dlq *DeadLetterQueue) ReprocessMessage(messageID string) error { dlq.mu.Lock() defer dlq.mu.Unlock() // Find message var dlqMsg *DLQMessage var topic string var index int for t, messages := range dlq.messages { for i, msg := range messages { if msg.ID == messageID { dlqMsg = msg topic = t index = i break } } if dlqMsg != nil { break } } if dlqMsg == nil { return fmt.Errorf("message not found: %s", messageID) 
} if dlqMsg.Permanent { return fmt.Errorf("message marked as permanent failure: %s", messageID) } // Attempt reprocessing err := dlq.attemptReprocess(dlqMsg) if err == nil { // Success - remove from queue dlq.removeMessageByIndex(topic, index) dlq.metrics.ReprocessSuccesses++ dlq.metrics.QueueSize-- return nil } // Failed - update retry information dlqMsg.AttemptCount++ dlqMsg.LastAttempt = time.Now() dlqMsg.FailureReason = err.Error() if dlqMsg.AttemptCount >= dlqMsg.MaxRetries { dlqMsg.Permanent = true dlq.metrics.MessagesPermanent++ } else { dlqMsg.RetryDelay = dlq.calculateRetryDelay(dlqMsg) dlqMsg.NextRetry = time.Now().Add(dlqMsg.RetryDelay) } dlq.metrics.ReprocessFailures++ return fmt.Errorf("reprocessing failed: %w", err) } // PurgeMessages removes all messages for a topic func (dlq *DeadLetterQueue) PurgeMessages(topic string) error { dlq.mu.Lock() defer dlq.mu.Unlock() if messages, exists := dlq.messages[topic]; exists { count := len(messages) delete(dlq.messages, topic) dlq.metrics.QueueSize -= int64(count) dlq.updateOldestMessage() } return nil } // PurgeAllMessages removes all messages from the queue func (dlq *DeadLetterQueue) PurgeAllMessages() error { dlq.mu.Lock() defer dlq.mu.Unlock() dlq.messages = make(map[string][]*DLQMessage) dlq.metrics.QueueSize = 0 dlq.metrics.OldestMessage = time.Time{} return nil } // GetMessageCount returns the total number of messages in the queue func (dlq *DeadLetterQueue) GetMessageCount() int { dlq.mu.RLock() defer dlq.mu.RUnlock() return dlq.getTotalMessageCount() } // GetMetrics returns current DLQ metrics func (dlq *DeadLetterQueue) GetMetrics() DLQMetrics { dlq.mu.RLock() defer dlq.mu.RUnlock() return dlq.metrics } // SetReprocessor sets the message reprocessor func (dlq *DeadLetterQueue) SetReprocessor(reprocessor MessageReprocessor) { dlq.mu.Lock() defer dlq.mu.Unlock() dlq.reprocessor = reprocessor } // Cleanup removes expired messages func (dlq *DeadLetterQueue) Cleanup(maxAge time.Duration) error { 
dlq.mu.Lock() defer dlq.mu.Unlock() cutoff := time.Now().Add(-maxAge) expiredCount := 0 for topic, messages := range dlq.messages { filtered := make([]*DLQMessage, 0) for _, msg := range messages { if msg.FirstFailed.After(cutoff) { filtered = append(filtered, msg) } else { expiredCount++ } } dlq.messages[topic] = filtered // Remove empty topics if len(filtered) == 0 { delete(dlq.messages, topic) } } dlq.metrics.MessagesExpired += int64(expiredCount) dlq.metrics.QueueSize -= int64(expiredCount) dlq.updateOldestMessage() return nil } // Stop gracefully shuts down the dead letter queue func (dlq *DeadLetterQueue) Stop() error { dlq.cancel() if dlq.cleanupTicker != nil { dlq.cleanupTicker.Stop() } return nil } // Private helper methods func (dlq *DeadLetterQueue) getTotalMessageCount() int { count := 0 for _, messages := range dlq.messages { count += len(messages) } return count } func (dlq *DeadLetterQueue) removeOldestMessage() { var oldestTime time.Time var oldestTopic string var oldestIndex int for topic, messages := range dlq.messages { for i, msg := range messages { if oldestTime.IsZero() || msg.FirstFailed.Before(oldestTime) { oldestTime = msg.FirstFailed oldestTopic = topic oldestIndex = i } } } if !oldestTime.IsZero() { dlq.removeMessageByIndex(oldestTopic, oldestIndex) dlq.metrics.QueueSize-- } } func (dlq *DeadLetterQueue) removeMessageByIndex(topic string, index int) { messages := dlq.messages[topic] dlq.messages[topic] = append(messages[:index], messages[index+1:]...) 
if len(dlq.messages[topic]) == 0 { delete(dlq.messages, topic) } } func (dlq *DeadLetterQueue) isPermanentFailure(reason string) bool { for _, pattern := range dlq.config.PermanentFailures { if pattern == reason { return true } // Simple pattern matching (can be enhanced with regex) if len(pattern) > 0 && pattern[len(pattern)-1] == '*' { prefix := pattern[:len(pattern)-1] if len(reason) >= len(prefix) && reason[:len(prefix)] == prefix { return true } } } return false } func (dlq *DeadLetterQueue) calculateRetryDelay(msg *DLQMessage) time.Duration { switch dlq.config.BackoffStrategy { case BackoffFixed: return dlq.config.InitialRetryDelay case BackoffLinear: delay := time.Duration(msg.AttemptCount) * dlq.config.InitialRetryDelay if delay > dlq.config.MaxRetryDelay { return dlq.config.MaxRetryDelay } return delay case BackoffExponential: delay := time.Duration(float64(dlq.config.InitialRetryDelay) * pow(dlq.config.BackoffMultiplier, float64(msg.AttemptCount-1))) if delay > dlq.config.MaxRetryDelay { return dlq.config.MaxRetryDelay } return delay default: return dlq.config.InitialRetryDelay } } func (dlq *DeadLetterQueue) attemptReprocess(msg *DLQMessage) error { if dlq.reprocessor == nil { return fmt.Errorf("no reprocessor configured") } if !dlq.reprocessor.CanReprocess(msg) { return fmt.Errorf("message cannot be reprocessed") } return dlq.reprocessor.Reprocess(dlq.ctx, msg) } func (dlq *DeadLetterQueue) updateOldestMessage() { var oldest time.Time for _, messages := range dlq.messages { for _, msg := range messages { if oldest.IsZero() || msg.FirstFailed.Before(oldest) { oldest = msg.FirstFailed } } } dlq.metrics.OldestMessage = oldest } func (dlq *DeadLetterQueue) startCleanupRoutine() { dlq.cleanupTicker = time.NewTicker(dlq.config.ReprocessInterval) go func() { for { select { case <-dlq.cleanupTicker.C: dlq.Cleanup(dlq.config.RetentionTime) case <-dlq.ctx.Done(): return } } }() } func (dlq *DeadLetterQueue) startReprocessRoutine() { ticker := 
time.NewTicker(dlq.config.ReprocessInterval) go func() { defer ticker.Stop() for { select { case <-ticker.C: dlq.processRetryableMessages() case <-dlq.ctx.Done(): return } } }() } func (dlq *DeadLetterQueue) processRetryableMessages() { dlq.mu.Lock() retryable := dlq.getRetryableMessages() dlq.mu.Unlock() // Sort by next retry time sort.Slice(retryable, func(i, j int) bool { return retryable[i].NextRetry.Before(retryable[j].NextRetry) }) // Process batch batchSize := dlq.config.ReprocessBatchSize if len(retryable) < batchSize { batchSize = len(retryable) } for i := 0; i < batchSize; i++ { msg := retryable[i] if time.Now().After(msg.NextRetry) { dlq.ReprocessMessage(msg.ID) } } } func (dlq *DeadLetterQueue) getRetryableMessages() []*DLQMessage { var retryable []*DLQMessage for _, messages := range dlq.messages { for _, msg := range messages { if !msg.Permanent && msg.AttemptCount < msg.MaxRetries { retryable = append(retryable, msg) } } } return retryable } // Implementation of DefaultMessageReprocessor func NewDefaultMessageReprocessor(publisher MessagePublisher) *DefaultMessageReprocessor { return &DefaultMessageReprocessor{ publisher: publisher, } } func (r *DefaultMessageReprocessor) Reprocess(ctx context.Context, msg *DLQMessage) error { if r.publisher == nil { return fmt.Errorf("no publisher configured") } return r.publisher.Publish(ctx, msg.OriginalMessage) } func (r *DefaultMessageReprocessor) CanReprocess(msg *DLQMessage) bool { return !msg.Permanent && msg.AttemptCount < msg.MaxRetries } func (r *DefaultMessageReprocessor) ShouldRetry(msg *DLQMessage, err error) bool { // Simple retry logic - can be enhanced based on error types return msg.AttemptCount < msg.MaxRetries } // Helper function for power calculation func pow(base, exp float64) float64 { if exp == 0 { return 1 } result := base for i := 1; i < int(exp); i++ { result *= base } return result }