package transport import ( "bytes" "compress/gzip" "crypto/aes" "crypto/cipher" "crypto/rand" "encoding/json" "fmt" "io" "os" "path/filepath" "sort" "sync" "time" ) // FilePersistenceLayer implements file-based message persistence type FilePersistenceLayer struct { basePath string maxFileSize int64 maxFiles int compression bool encryption EncryptionConfig mu sync.RWMutex } // EncryptionConfig configures message encryption at rest type EncryptionConfig struct { Enabled bool Algorithm string Key []byte } // PersistedMessage represents a message stored on disk type PersistedMessage struct { ID string `json:"id"` Topic string `json:"topic"` Message *Message `json:"message"` Stored time.Time `json:"stored"` Metadata map[string]interface{} `json:"metadata"` Encrypted bool `json:"encrypted"` } // PersistenceMetrics tracks persistence layer statistics type PersistenceMetrics struct { MessagesStored int64 MessagesRetrieved int64 MessagesDeleted int64 StorageSize int64 FileCount int LastCleanup time.Time Errors int64 } // NewFilePersistenceLayer creates a new file-based persistence layer func NewFilePersistenceLayer(basePath string) *FilePersistenceLayer { return &FilePersistenceLayer{ basePath: basePath, maxFileSize: 100 * 1024 * 1024, // 100MB default maxFiles: 1000, compression: false, } } // SetMaxFileSize configures the maximum file size func (fpl *FilePersistenceLayer) SetMaxFileSize(size int64) { fpl.mu.Lock() defer fpl.mu.Unlock() fpl.maxFileSize = size } // SetMaxFiles configures the maximum number of files func (fpl *FilePersistenceLayer) SetMaxFiles(count int) { fpl.mu.Lock() defer fpl.mu.Unlock() fpl.maxFiles = count } // EnableCompression enables/disables compression func (fpl *FilePersistenceLayer) EnableCompression(enabled bool) { fpl.mu.Lock() defer fpl.mu.Unlock() fpl.compression = enabled } // SetEncryption configures encryption settings func (fpl *FilePersistenceLayer) SetEncryption(config EncryptionConfig) { fpl.mu.Lock() defer fpl.mu.Unlock() fpl.encryption = config } // Store persists a message to disk func (fpl *FilePersistenceLayer) Store(msg *Message) error { fpl.mu.Lock() defer fpl.mu.Unlock() // Create directory if it doesn't exist topicDir := filepath.Join(fpl.basePath, msg.Topic) if err := os.MkdirAll(topicDir, 0750); err != nil { return fmt.Errorf("failed to create topic directory: %w", err) } // Create persisted message persistedMsg := &PersistedMessage{ ID: msg.ID, Topic: msg.Topic, Message: msg, Stored: time.Now(), Metadata: make(map[string]interface{}), } // Serialize message data, err := json.Marshal(persistedMsg) if err != nil { return fmt.Errorf("failed to marshal message: %w", err) } // Apply encryption if enabled if fpl.encryption.Enabled { encryptedData, err := fpl.encrypt(data) if err != nil { return fmt.Errorf("encryption failed: %w", err) } data = encryptedData persistedMsg.Encrypted = true } // Apply compression if enabled if fpl.compression { compressedData, err := fpl.compress(data) if err != nil { return fmt.Errorf("compression failed: %w", err) } data = compressedData } // Find appropriate file to write to filename, err := fpl.getWritableFile(topicDir, len(data)) if err != nil { return fmt.Errorf("failed to get writable file: %w", err) } // Write to file file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600) if err != nil { return fmt.Errorf("failed to open file: %w", err) } defer file.Close() // Write length prefix and data lengthPrefix := fmt.Sprintf("%d\n", len(data)) if _, err := file.WriteString(lengthPrefix); err != nil { return fmt.Errorf("failed to write length prefix: %w", err) } if _, err := file.Write(data); err != nil { return fmt.Errorf("failed to write data: %w", err) } return nil } // Retrieve loads a message from disk by ID func (fpl *FilePersistenceLayer) Retrieve(id string) (*Message, error) { fpl.mu.RLock() defer fpl.mu.RUnlock() // Search all topic directories topicDirs, err := fpl.getTopicDirectories() if err != nil { return nil, fmt.Errorf("failed to get topic directories: %w", err) } for _, topicDir := range topicDirs { files, err := fpl.getTopicFiles(topicDir) if err != nil { continue } for _, file := range files { msg, err := fpl.findMessageInFile(file, id) if err != nil { continue } if msg != nil { return msg, nil } } } return nil, fmt.Errorf("message not found: %s", id) } // Delete removes a message from disk by ID func (fpl *FilePersistenceLayer) Delete(id string) error { fpl.mu.Lock() defer fpl.mu.Unlock() // Search all topic directories to find the message topicDirs, err := fpl.getTopicDirectories() if err != nil { return fmt.Errorf("failed to get topic directories: %w", err) } for _, topicDir := range topicDirs { files, err := fpl.getTopicFiles(topicDir) if err != nil { continue } for _, file := range files { if err := fpl.deleteMessageFromFile(file, id); err == nil { return nil // Successfully deleted } } } return fmt.Errorf("message not found: %s", id) } // List returns messages for a topic with optional limit func (fpl *FilePersistenceLayer) List(topic string, limit int) ([]*Message, error) { fpl.mu.RLock() defer fpl.mu.RUnlock() topicDir := filepath.Join(fpl.basePath, topic) if _, err := os.Stat(topicDir); os.IsNotExist(err) { return []*Message{}, nil } files, err := fpl.getTopicFiles(topicDir) if err != nil { return nil, fmt.Errorf("failed to get topic files: %w", err) } var messages []*Message count := 0 // Read files in chronological order (newest first) sort.Slice(files, func(i, j int) bool { infoI, _ := os.Stat(files[i]) infoJ, _ := os.Stat(files[j]) return infoI.ModTime().After(infoJ.ModTime()) }) for _, file := range files { fileMessages, err := fpl.readMessagesFromFile(file) if err != nil { continue } for _, msg := range fileMessages { messages = append(messages, msg) count++ if limit > 0 && count >= limit { break } } if limit > 0 && count >= limit { break } } return messages, nil } // Cleanup removes messages older than maxAge func (fpl *FilePersistenceLayer) Cleanup(maxAge time.Duration) error { fpl.mu.Lock() defer fpl.mu.Unlock() cutoff := time.Now().Add(-maxAge) topicDirs, err := fpl.getTopicDirectories() if err != nil { return fmt.Errorf("failed to get topic directories: %w", err) } for _, topicDir := range topicDirs { files, err := fpl.getTopicFiles(topicDir) if err != nil { continue } for _, file := range files { // Check file modification time info, err := os.Stat(file) if err != nil { continue } if info.ModTime().Before(cutoff) { os.Remove(file) } } // Remove empty topic directories if isEmpty, _ := fpl.isDirectoryEmpty(topicDir); isEmpty { os.Remove(topicDir) } } return nil } // GetMetrics returns persistence layer metrics func (fpl *FilePersistenceLayer) GetMetrics() (PersistenceMetrics, error) { fpl.mu.RLock() defer fpl.mu.RUnlock() metrics := PersistenceMetrics{} // Calculate storage size and file count err := filepath.Walk(fpl.basePath, func(path string, info os.FileInfo, err error) error { if err != nil { return err } if !info.IsDir() { metrics.FileCount++ metrics.StorageSize += info.Size() } return nil }) return metrics, err } // Private helper methods func (fpl *FilePersistenceLayer) getWritableFile(topicDir string, dataSize int) (string, error) { files, err := fpl.getTopicFiles(topicDir) if err != nil { return "", err } // Find a file with enough space for _, file := range files { info, err := os.Stat(file) if err != nil { continue } if info.Size()+int64(dataSize) <= fpl.maxFileSize { return file, nil } } // Create new file timestamp := time.Now().Format("20060102_150405") filename := filepath.Join(topicDir, fmt.Sprintf("messages_%s.dat", timestamp)) return filename, nil } func (fpl *FilePersistenceLayer) getTopicDirectories() ([]string, error) { entries, err := os.ReadDir(fpl.basePath) if err != nil { return nil, err } var dirs []string for _, entry := range entries { if entry.IsDir() { dirs = append(dirs, filepath.Join(fpl.basePath, entry.Name())) } } return dirs, nil } func (fpl *FilePersistenceLayer) getTopicFiles(topicDir string) ([]string, error) { entries, err := os.ReadDir(topicDir) if err != nil { return nil, err } var files []string for _, entry := range entries { if !entry.IsDir() && filepath.Ext(entry.Name()) == ".dat" { files = append(files, filepath.Join(topicDir, entry.Name())) } } return files, nil } func (fpl *FilePersistenceLayer) findMessageInFile(filename, messageID string) (*Message, error) { file, err := os.Open(filename) if err != nil { return nil, err } defer file.Close() data, err := io.ReadAll(file) if err != nil { return nil, err } // Parse messages from file data messages, err := fpl.parseFileData(data) if err != nil { return nil, err } for _, msg := range messages { if msg.ID == messageID { return msg, nil } } return nil, nil } func (fpl *FilePersistenceLayer) readMessagesFromFile(filename string) ([]*Message, error) { file, err := os.Open(filename) if err != nil { return nil, err } defer file.Close() data, err := io.ReadAll(file) if err != nil { return nil, err } return fpl.parseFileData(data) } func (fpl *FilePersistenceLayer) parseFileData(data []byte) ([]*Message, error) { var messages []*Message offset := 0 for offset < len(data) { // Read length prefix lengthEnd := -1 for i := offset; i < len(data); i++ { if data[i] == '\n' { lengthEnd = i break } } if lengthEnd == -1 { break // No more complete messages } lengthStr := string(data[offset:lengthEnd]) var messageLength int if _, err := fmt.Sscanf(lengthStr, "%d", &messageLength); err != nil { break // Invalid length prefix } messageStart := lengthEnd + 1 messageEnd := messageStart + messageLength if messageEnd > len(data) { break // Incomplete message } messageData := data[messageStart:messageEnd] // Apply decompression if needed if fpl.compression { decompressed, err := fpl.decompress(messageData) if err != nil { offset = messageEnd continue } messageData = decompressed } // Apply decryption if needed if fpl.encryption.Enabled { decrypted, err := fpl.decrypt(messageData) if err != nil { offset = messageEnd continue } messageData = decrypted } // Parse message var persistedMsg PersistedMessage if err := json.Unmarshal(messageData, &persistedMsg); err != nil { offset = messageEnd continue } messages = append(messages, persistedMsg.Message) offset = messageEnd } return messages, nil } func (fpl *FilePersistenceLayer) isDirectoryEmpty(dir string) (bool, error) { entries, err := os.ReadDir(dir) if err != nil { return false, err } return len(entries) == 0, nil } func (fpl *FilePersistenceLayer) encrypt(data []byte) ([]byte, error) { if !fpl.encryption.Enabled || len(fpl.encryption.Key) == 0 { return data, nil } // Create cipher block block, err := aes.NewCipher(fpl.encryption.Key) if err != nil { return nil, fmt.Errorf("failed to create cipher: %w", err) } // Generate random nonce nonce := make([]byte, 12) // GCM standard nonce size if _, err := io.ReadFull(rand.Reader, nonce); err != nil { return nil, fmt.Errorf("failed to generate nonce: %w", err) } // Create GCM mode gcm, err := cipher.NewGCM(block) if err != nil { return nil, fmt.Errorf("failed to create GCM: %w", err) } // Encrypt and authenticate ciphertext := gcm.Seal(nil, nonce, data, nil) // Prepend nonce to ciphertext result := make([]byte, len(nonce)+len(ciphertext)) copy(result, nonce) copy(result[len(nonce):], ciphertext) return result, nil } func (fpl *FilePersistenceLayer) decrypt(data []byte) ([]byte, error) { if !fpl.encryption.Enabled || len(fpl.encryption.Key) == 0 { return data, nil } if len(data) < 12 { return nil, fmt.Errorf("encrypted data too short") } // Create cipher block block, err := aes.NewCipher(fpl.encryption.Key) if err != nil { return nil, fmt.Errorf("failed to create cipher: %w", err) } // Create GCM mode gcm, err := cipher.NewGCM(block) if err != nil { return nil, fmt.Errorf("failed to create GCM: %w", err) } // Extract nonce and ciphertext nonce := data[:12] ciphertext := data[12:] // Decrypt and verify plaintext, err := gcm.Open(nil, nonce, ciphertext, nil) if err != nil { return nil, fmt.Errorf("decryption failed: %w", err) } return plaintext, nil } func (fpl *FilePersistenceLayer) compress(data []byte) ([]byte, error) { var buf bytes.Buffer gzWriter := gzip.NewWriter(&buf) if _, err := gzWriter.Write(data); err != nil { gzWriter.Close() return nil, fmt.Errorf("compression failed: %w", err) } if err := gzWriter.Close(); err != nil { return nil, fmt.Errorf("failed to close gzip writer: %w", err) } return buf.Bytes(), nil } func (fpl *FilePersistenceLayer) decompress(data []byte) ([]byte, error) { buf := bytes.NewReader(data) gzReader, err := gzip.NewReader(buf) if err != nil { return nil, fmt.Errorf("failed to create gzip reader: %w", err) } defer gzReader.Close() decompressed, err := io.ReadAll(gzReader) if err != nil { return nil, fmt.Errorf("decompression failed: %w", err) } return decompressed, nil } // deleteMessageFromFile removes a specific message from a file func (fpl *FilePersistenceLayer) deleteMessageFromFile(filename, messageID string) error { // Read all messages from file messages, err := fpl.readMessagesFromFile(filename) if err != nil { return fmt.Errorf("failed to read messages from file: %w", err) } // Check if message exists in this file found := false var filteredMessages []*Message for _, msg := range messages { if msg.ID != messageID { filteredMessages = append(filteredMessages, msg) } else { found = true } } if !found { return fmt.Errorf("message not found in file") } // If no messages remain, delete the file if len(filteredMessages) == 0 { return os.Remove(filename) } // Rewrite file with remaining messages return fpl.rewriteFileWithMessages(filename, filteredMessages) } // rewriteFileWithMessages rewrites a file with the given messages func (fpl *FilePersistenceLayer) rewriteFileWithMessages(filename string, messages []*Message) error { // Create temporary file tempFile := filename + ".tmp" file, err := os.Create(tempFile) if err != nil { return fmt.Errorf("failed to create temp file: %w", err) } defer file.Close() // Write each message to temp file for _, msg := range messages { // Create persisted message persistedMsg := &PersistedMessage{ ID: msg.ID, Topic: msg.Topic, Message: msg, Stored: time.Now(), Metadata: make(map[string]interface{}), } // Serialize message data, err := json.Marshal(persistedMsg) if err != nil { os.Remove(tempFile) return fmt.Errorf("failed to marshal message: %w", err) } // Apply encryption if enabled if fpl.encryption.Enabled { encryptedData, err := fpl.encrypt(data) if err != nil { os.Remove(tempFile) return fmt.Errorf("encryption failed: %w", err) } data = encryptedData persistedMsg.Encrypted = true } // Apply compression if enabled if fpl.compression { compressedData, err := fpl.compress(data) if err != nil { os.Remove(tempFile) return fmt.Errorf("compression failed: %w", err) } data = compressedData } // Write length prefix and data lengthPrefix := fmt.Sprintf("%d\n", len(data)) if _, err := file.WriteString(lengthPrefix); err != nil { os.Remove(tempFile) return fmt.Errorf("failed to write length prefix: %w", err) } if _, err := file.Write(data); err != nil { os.Remove(tempFile) return fmt.Errorf("failed to write data: %w", err) } } // Close temp file before rename file.Close() // Replace original file with temp file return os.Rename(tempFile, filename) } // InMemoryPersistenceLayer implements in-memory persistence for testing/development type InMemoryPersistenceLayer struct { messages map[string]*Message topics map[string][]string mu sync.RWMutex } // NewInMemoryPersistenceLayer creates a new in-memory persistence layer func NewInMemoryPersistenceLayer() *InMemoryPersistenceLayer { return &InMemoryPersistenceLayer{ messages: make(map[string]*Message), topics: make(map[string][]string), } } // Store stores a message in memory func (impl *InMemoryPersistenceLayer) Store(msg *Message) error { impl.mu.Lock() defer impl.mu.Unlock() impl.messages[msg.ID] = msg if _, exists := impl.topics[msg.Topic]; !exists { impl.topics[msg.Topic] = make([]string, 0) } impl.topics[msg.Topic] = append(impl.topics[msg.Topic], msg.ID) return nil } // Retrieve retrieves a message from memory func (impl *InMemoryPersistenceLayer) Retrieve(id string) (*Message, error) { impl.mu.RLock() defer impl.mu.RUnlock() msg, exists := impl.messages[id] if !exists { return nil, fmt.Errorf("message not found: %s", id) } return msg, nil } // Delete removes a message from memory func (impl *InMemoryPersistenceLayer) Delete(id string) error { impl.mu.Lock() defer impl.mu.Unlock() msg, exists := impl.messages[id] if !exists { return fmt.Errorf("message not found: %s", id) } delete(impl.messages, id) // Remove from topic index if messageIDs, exists := impl.topics[msg.Topic]; exists { for i, msgID := range messageIDs { if msgID == id { impl.topics[msg.Topic] = append(messageIDs[:i], messageIDs[i+1:]...) break } } } return nil } // List returns messages for a topic func (impl *InMemoryPersistenceLayer) List(topic string, limit int) ([]*Message, error) { impl.mu.RLock() defer impl.mu.RUnlock() messageIDs, exists := impl.topics[topic] if !exists { return []*Message{}, nil } var messages []*Message count := 0 for _, msgID := range messageIDs { if limit > 0 && count >= limit { break } if msg, exists := impl.messages[msgID]; exists { messages = append(messages, msg) count++ } } return messages, nil } // Cleanup removes messages older than maxAge func (impl *InMemoryPersistenceLayer) Cleanup(maxAge time.Duration) error { impl.mu.Lock() defer impl.mu.Unlock() cutoff := time.Now().Add(-maxAge) var toDelete []string for id, msg := range impl.messages { if msg.Timestamp.Before(cutoff) { toDelete = append(toDelete, id) } } for _, id := range toDelete { impl.Delete(id) } return nil }