package patterns import ( "context" "fmt" "sync" "time" "github.com/fraktal/mev-beta/internal/logger" ) // AdvancedPipeline implements sophisticated pipeline patterns for high-performance processing type AdvancedPipeline struct { stages []PipelineStage errorChan chan error metrics *PipelineMetrics logger *logger.Logger bufferSize int workers int ctx context.Context cancel context.CancelFunc wg sync.WaitGroup } // PipelineStage represents a processing stage in the pipeline type PipelineStage interface { Process(ctx context.Context, input <-chan interface{}, output chan<- interface{}) error Name() string GetMetrics() StageMetrics } // PipelineMetrics tracks pipeline performance type PipelineMetrics struct { TotalProcessed int64 TotalErrors int64 AverageLatency time.Duration ThroughputPerSec float64 BackpressureCount int64 StartTime time.Time mu sync.RWMutex } // StageMetrics tracks individual stage performance type StageMetrics struct { Name string Processed int64 Errors int64 AverageLatency time.Duration InputBuffer int OutputBuffer int WorkerCount int } // WorkerPoolStage implements a stage with worker pool type WorkerPoolStage struct { name string workerCount int processor func(interface{}) (interface{}, error) metrics StageMetrics mu sync.RWMutex } // NewAdvancedPipeline creates a new advanced pipeline func NewAdvancedPipeline(bufferSize, workers int, logger *logger.Logger) *AdvancedPipeline { ctx, cancel := context.WithCancel(context.Background()) return &AdvancedPipeline{ stages: make([]PipelineStage, 0), errorChan: make(chan error, 100), bufferSize: bufferSize, workers: workers, logger: logger, ctx: ctx, cancel: cancel, metrics: &PipelineMetrics{ StartTime: time.Now(), }, } } // AddStage adds a stage to the pipeline func (p *AdvancedPipeline) AddStage(stage PipelineStage) { p.stages = append(p.stages, stage) p.logger.Info(fmt.Sprintf("Added pipeline stage: %s", stage.Name())) } // Start starts the pipeline processing func (p *AdvancedPipeline) Start(input <-chan interface{}) <-chan interface{} { if len(p.stages) == 0 { p.logger.Error("No stages configured in pipeline") return nil } // Create channels between stages channels := make([]chan interface{}, len(p.stages)+1) channels[0] = make(chan interface{}, p.bufferSize) for i := 1; i <= len(p.stages); i++ { channels[i] = make(chan interface{}, p.bufferSize) } // Start input feeder p.wg.Add(1) go func() { defer p.wg.Done() defer close(channels[0]) for { select { case item, ok := <-input: if !ok { return } select { case channels[0] <- item: case <-p.ctx.Done(): return } case <-p.ctx.Done(): return } } }() // Start each stage for i, stage := range p.stages { p.wg.Add(1) go func(stageIndex int, s PipelineStage) { defer p.wg.Done() defer close(channels[stageIndex+1]) err := s.Process(p.ctx, channels[stageIndex], channels[stageIndex+1]) if err != nil { select { case p.errorChan <- fmt.Errorf("stage %s error: %v", s.Name(), err): default: } } }(i, stage) } // Start metrics collection go p.collectMetrics() // Return output channel return channels[len(p.stages)] } // collectMetrics collects pipeline metrics func (p *AdvancedPipeline) collectMetrics() { ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() for { select { case <-ticker.C: p.updateMetrics() case <-p.ctx.Done(): return } } } // updateMetrics updates pipeline metrics func (p *AdvancedPipeline) updateMetrics() { p.metrics.mu.Lock() defer p.metrics.mu.Unlock() elapsed := time.Since(p.metrics.StartTime).Seconds() if elapsed > 0 { p.metrics.ThroughputPerSec = float64(p.metrics.TotalProcessed) / elapsed } } // Stop stops the pipeline func (p *AdvancedPipeline) Stop() { p.cancel() p.wg.Wait() close(p.errorChan) } // GetErrors returns error channel func (p *AdvancedPipeline) GetErrors() <-chan error { return p.errorChan } // GetMetrics returns current pipeline metrics. // The returned pointer should not be modified. func (p *AdvancedPipeline) GetMetrics() *PipelineMetrics { return p.metrics } // NewWorkerPoolStage creates a new worker pool stage func NewWorkerPoolStage(name string, workerCount int, processor func(interface{}) (interface{}, error)) *WorkerPoolStage { return &WorkerPoolStage{ name: name, workerCount: workerCount, processor: processor, metrics: StageMetrics{ Name: name, WorkerCount: workerCount, }, } } // Process implements PipelineStage interface func (wps *WorkerPoolStage) Process(ctx context.Context, input <-chan interface{}, output chan<- interface{}) error { var wg sync.WaitGroup // Start workers for i := 0; i < wps.workerCount; i++ { wg.Add(1) go func(workerID int) { defer wg.Done() for { select { case item, ok := <-input: if !ok { return } start := time.Now() result, err := wps.processor(item) latency := time.Since(start) wps.updateMetrics(latency, err == nil) if err != nil { continue // Skip failed items } select { case output <- result: case <-ctx.Done(): return } case <-ctx.Done(): return } } }(i) } wg.Wait() return nil } // updateMetrics updates stage metrics func (wps *WorkerPoolStage) updateMetrics(latency time.Duration, success bool) { wps.mu.Lock() defer wps.mu.Unlock() wps.metrics.Processed++ if !success { wps.metrics.Errors++ } // Update average latency (simple moving average) if wps.metrics.AverageLatency == 0 { wps.metrics.AverageLatency = latency } else { wps.metrics.AverageLatency = (wps.metrics.AverageLatency + latency) / 2 } } // Name returns stage name func (wps *WorkerPoolStage) Name() string { return wps.name } // GetMetrics returns stage metrics func (wps *WorkerPoolStage) GetMetrics() StageMetrics { wps.mu.RLock() defer wps.mu.RUnlock() return wps.metrics } // FanOutFanIn implements fan-out/fan-in pattern type FanOutFanIn struct { workers int bufferSize int logger *logger.Logger } // NewFanOutFanIn creates a new fan-out/fan-in processor func NewFanOutFanIn(workers, bufferSize int, logger *logger.Logger) *FanOutFanIn { return &FanOutFanIn{ workers: workers, bufferSize: bufferSize, logger: logger, } } // Process processes items using fan-out/fan-in pattern func (fofi *FanOutFanIn) Process(ctx context.Context, input <-chan interface{}, processor func(interface{}) (interface{}, error)) <-chan interface{} { output := make(chan interface{}, fofi.bufferSize) // Fan-out: distribute work to multiple workers workerInputs := make([]chan interface{}, fofi.workers) for i := 0; i < fofi.workers; i++ { workerInputs[i] = make(chan interface{}, fofi.bufferSize) } // Start distributor go func() { defer func() { for _, ch := range workerInputs { close(ch) } }() workerIndex := 0 for { select { case item, ok := <-input: if !ok { return } select { case workerInputs[workerIndex] <- item: workerIndex = (workerIndex + 1) % fofi.workers case <-ctx.Done(): return } case <-ctx.Done(): return } } }() // Start workers workerOutputs := make([]<-chan interface{}, fofi.workers) for i := 0; i < fofi.workers; i++ { workerOutput := make(chan interface{}, fofi.bufferSize) workerOutputs[i] = workerOutput go func(input <-chan interface{}, output chan<- interface{}) { defer close(output) for { select { case item, ok := <-input: if !ok { return } result, err := processor(item) if err != nil { fofi.logger.Error(fmt.Sprintf("Worker processing error: %v", err)) continue } select { case output <- result: case <-ctx.Done(): return } case <-ctx.Done(): return } } }(workerInputs[i], workerOutput) } // Fan-in: merge worker outputs go func() { defer close(output) var wg sync.WaitGroup for _, workerOutput := range workerOutputs { wg.Add(1) go func(input <-chan interface{}) { defer wg.Done() for { select { case item, ok := <-input: if !ok { return } select { case output <- item: case <-ctx.Done(): return } case <-ctx.Done(): return } } }(workerOutput) } wg.Wait() }() return output } // BackpressureHandler handles backpressure in pipeline stages type BackpressureHandler struct { threshold int strategy BackpressureStrategy metrics *BackpressureMetrics logger *logger.Logger } // BackpressureStrategy defines different backpressure handling strategies type BackpressureStrategy int const ( DropOldest BackpressureStrategy = iota DropNewest Block Sample ) // BackpressureMetrics tracks backpressure events type BackpressureMetrics struct { DroppedItems int64 BlockedCount int64 SampledItems int64 TotalItems int64 mu sync.RWMutex } // NewBackpressureHandler creates a new backpressure handler func NewBackpressureHandler(threshold int, strategy BackpressureStrategy, logger *logger.Logger) *BackpressureHandler { return &BackpressureHandler{ threshold: threshold, strategy: strategy, metrics: &BackpressureMetrics{}, logger: logger, } } // HandleBackpressure applies backpressure strategy to a channel func (bh *BackpressureHandler) HandleBackpressure(ctx context.Context, input <-chan interface{}, output chan interface{}) { buffer := make([]interface{}, 0, bh.threshold*2) for { select { case item, ok := <-input: if !ok { // Flush remaining items for _, bufferedItem := range buffer { select { case output <- bufferedItem: case <-ctx.Done(): return } } return } bh.metrics.mu.Lock() bh.metrics.TotalItems++ bh.metrics.mu.Unlock() // Check if we need to apply backpressure if len(buffer) >= bh.threshold { switch bh.strategy { case DropOldest: if len(buffer) > 0 { buffer = buffer[1:] bh.metrics.mu.Lock() bh.metrics.DroppedItems++ bh.metrics.mu.Unlock() } buffer = append(buffer, item) case DropNewest: bh.metrics.mu.Lock() bh.metrics.DroppedItems++ bh.metrics.mu.Unlock() continue // Drop the new item case Block: bh.metrics.mu.Lock() bh.metrics.BlockedCount++ bh.metrics.mu.Unlock() // Try to send oldest item (blocking) if len(buffer) > 0 { select { case output <- buffer[0]: buffer = buffer[1:] case <-ctx.Done(): return } } buffer = append(buffer, item) case Sample: // Keep every nth item when under pressure bh.metrics.mu.Lock() sampleRate := bh.metrics.TotalItems % 5 // Keep every 5th item bh.metrics.mu.Unlock() if sampleRate == 0 { if len(buffer) > 0 { buffer = buffer[1:] } buffer = append(buffer, item) } else { bh.metrics.mu.Lock() bh.metrics.SampledItems++ bh.metrics.mu.Unlock() } } } else { buffer = append(buffer, item) } case <-ctx.Done(): return } // Try to drain buffer for len(buffer) > 0 { select { case output <- buffer[0]: buffer = buffer[1:] case <-ctx.Done(): return default: // Can't send more, break out of drain loop break } } } } // GetMetrics returns backpressure metrics. // The returned pointer should not be modified. func (bh *BackpressureHandler) GetMetrics() *BackpressureMetrics { return bh.metrics }