Files
mev-beta/pkg/patterns/pipeline.go

542 lines
12 KiB
Go

package patterns
import (
"context"
"fmt"
"sync"
"time"
"github.com/fraktal/mev-beta/internal/logger"
)
// AdvancedPipeline chains PipelineStage values together with channels and runs
// each stage in its own goroutine. Create one with NewAdvancedPipeline, register
// stages with AddStage, call Start to begin processing and Stop to shut down.
type AdvancedPipeline struct {
// stages holds the registered stages in the order they were added.
stages []PipelineStage
// errorChan carries errors returned by stage Process calls; exposed via GetErrors.
errorChan chan error
// metrics is the pipeline-wide metrics record refreshed by collectMetrics.
metrics *PipelineMetrics
logger *logger.Logger
// bufferSize is the capacity of every channel Start creates between stages.
bufferSize int
// workers is stored by the constructor; NOTE(review): nothing in the visible
// part of this file reads it.
workers int
// ctx is handed to every stage and is cancelled by Stop through cancel.
ctx context.Context
cancel context.CancelFunc
// wg tracks the feeder and stage goroutines so Stop can wait for them.
wg sync.WaitGroup
}
// PipelineStage represents a processing stage in the pipeline.
type PipelineStage interface {
// Process reads items from input and writes results to output. The pipeline
// calls it once per stage from a dedicated goroutine and closes output itself
// after Process returns, so implementations must not close output. A non-nil
// return value is offered to the pipeline's error channel.
Process(ctx context.Context, input <-chan interface{}, output chan<- interface{}) error
// Name returns the stage name used in log and error messages.
Name() string
// GetMetrics returns the stage's current metrics.
GetMetrics() StageMetrics
}
// PipelineMetrics tracks pipeline performance.
type PipelineMetrics struct {
// TotalProcessed is the numerator of the throughput calculation.
// NOTE(review): nothing in the visible part of this file increments it.
TotalProcessed int64
TotalErrors int64
AverageLatency time.Duration
// ThroughputPerSec is TotalProcessed divided by the seconds elapsed since
// StartTime; it is recomputed on every metrics tick.
ThroughputPerSec float64
BackpressureCount int64
// StartTime is set when the pipeline is constructed.
StartTime time.Time
// mu is held while the pipeline recomputes ThroughputPerSec.
mu sync.RWMutex
}
// StageMetrics tracks individual stage performance.
type StageMetrics struct {
Name string
// Processed counts every item a WorkerPoolStage handed to its processor,
// including the ones that failed.
Processed int64
// Errors counts the items whose processor call returned an error.
Errors int64
AverageLatency time.Duration
// InputBuffer and OutputBuffer are not written anywhere in the visible part
// of this file. NOTE(review): confirm whether they are populated elsewhere.
InputBuffer int
OutputBuffer int
// WorkerCount is the worker count the stage was constructed with.
WorkerCount int
}
// WorkerPoolStage implements a PipelineStage that runs a processor function on
// a pool of goroutines, all reading from the same input channel.
type WorkerPoolStage struct {
name string
// workerCount is the number of goroutines Process starts.
workerCount int
// processor is applied to every item; items for which it returns an error
// are counted as errors and not forwarded.
processor func(interface{}) (interface{}, error)
metrics StageMetrics
// mu guards metrics.
mu sync.RWMutex
}
// NewAdvancedPipeline returns an empty pipeline with no stages registered.
//
// bufferSize is the capacity used for every inter-stage channel, workers is
// stored on the pipeline, and logger receives the pipeline's log output. The
// pipeline owns a cancellable context derived from context.Background, an
// error channel with room for 100 errors, and a metrics record whose StartTime
// is the moment of construction.
func NewAdvancedPipeline(bufferSize, workers int, logger *logger.Logger) *AdvancedPipeline {
	pipeline := &AdvancedPipeline{
		stages:     []PipelineStage{},
		errorChan:  make(chan error, 100),
		metrics:    &PipelineMetrics{StartTime: time.Now()},
		logger:     logger,
		bufferSize: bufferSize,
		workers:    workers,
	}
	pipeline.ctx, pipeline.cancel = context.WithCancel(context.Background())
	return pipeline
}
// AddStage appends stage to the end of the pipeline and logs its name.
// Stages run in the order in which they were added.
func (p *AdvancedPipeline) AddStage(stage PipelineStage) {
	p.stages = append(p.stages, stage)

	msg := fmt.Sprintf("Added pipeline stage: %s", stage.Name())
	p.logger.Info(msg)
}
// Start wires the registered stages together and begins processing.
//
// One channel is created in front of every stage plus one behind the last
// stage. A feeder goroutine copies items from input into the first channel,
// and each stage runs Process in its own goroutine, reading from the channel
// in front of it and writing to the one behind it. Each channel is closed by
// the goroutine that writes to it once that goroutine finishes. A metrics
// goroutine is started as well.
//
// The returned channel carries the output of the last stage. If no stages are
// registered an error is logged and nil is returned; receiving from a nil
// channel blocks forever, so callers must check for nil.
//
// A non-nil error returned by a stage is wrapped with the stage name (the
// original error stays reachable through errors.Is / errors.As) and offered to
// the error channel. If that channel is full the error is logged instead of
// being discarded silently.
func (p *AdvancedPipeline) Start(input <-chan interface{}) <-chan interface{} {
	if len(p.stages) == 0 {
		p.logger.Error("No stages configured in pipeline")
		return nil
	}

	// make panics on a negative channel capacity; treat it as unbuffered.
	bufSize := p.bufferSize
	if bufSize < 0 {
		bufSize = 0
	}

	// links[i] feeds stage i and links[i+1] carries its output.
	links := make([]chan interface{}, len(p.stages)+1)
	for i := range links {
		links[i] = make(chan interface{}, bufSize)
	}

	// Feeder: copy the caller's input into the first link until the input is
	// closed or the pipeline is cancelled.
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		defer close(links[0])
		for {
			select {
			case item, ok := <-input:
				if !ok {
					return
				}
				select {
				case links[0] <- item:
				case <-p.ctx.Done():
					return
				}
			case <-p.ctx.Done():
				return
			}
		}
	}()

	// One goroutine per stage.
	for i, stage := range p.stages {
		p.wg.Add(1)
		go func(in <-chan interface{}, out chan interface{}, s PipelineStage) {
			defer p.wg.Done()
			defer close(out)

			err := s.Process(p.ctx, in, out)
			if err == nil {
				return
			}
			wrapped := fmt.Errorf("stage %s error: %w", s.Name(), err)
			select {
			case p.errorChan <- wrapped:
			default:
				// Never block a stage goroutine on a slow error consumer,
				// but do not lose the error without a trace either.
				p.logger.Error(fmt.Sprintf("Error channel full, dropping error: %v", wrapped))
			}
		}(links[i], links[i+1], stage)
	}

	// The metrics goroutine exits when the pipeline context is cancelled.
	go p.collectMetrics()

	return links[len(p.stages)]
}
// collectMetrics refreshes the pipeline metrics once per second until the
// pipeline context is cancelled. It blocks, so it is meant to be run in its
// own goroutine.
func (p *AdvancedPipeline) collectMetrics() {
	tick := time.NewTicker(time.Second)
	defer tick.Stop()

	done := p.ctx.Done()
	for {
		select {
		case <-done:
			return
		case <-tick.C:
			p.updateMetrics()
		}
	}
}
// updateMetrics recomputes ThroughputPerSec as TotalProcessed divided by the
// number of seconds elapsed since StartTime. The value is left untouched when
// no time has elapsed. The metrics mutex is held for the whole update.
func (p *AdvancedPipeline) updateMetrics() {
	m := p.metrics
	m.mu.Lock()
	defer m.mu.Unlock()

	seconds := time.Since(m.StartTime).Seconds()
	if seconds <= 0 {
		return
	}
	m.ThroughputPerSec = float64(m.TotalProcessed) / seconds
}
// Stop shuts the pipeline down: it cancels the pipeline context, waits for the
// feeder and stage goroutines tracked by the wait group to finish, and then
// closes the error channel so that readers of GetErrors see the end of stream.
//
// Stop must be called at most once; a second call would close the error
// channel again, which panics.
func (p *AdvancedPipeline) Stop() {
// Cancel first so goroutines blocked on channel operations can exit.
p.cancel()
p.wg.Wait()
// Safe only after Wait: stage goroutines send on errorChan before finishing.
close(p.errorChan)
}
// GetErrors returns the receive-only view of the pipeline's error channel.
// Stage errors are delivered on it, and Stop closes it.
func (p *AdvancedPipeline) GetErrors() <-chan error {
return p.errorChan
}
// GetMetrics returns current pipeline metrics.
// The returned pointer refers to the pipeline's live metrics record, not a
// copy, and should not be modified.
// NOTE(review): the record is updated under its unexported mutex, so reads of
// its fields through this pointer are not synchronized with those updates.
func (p *AdvancedPipeline) GetMetrics() *PipelineMetrics {
return p.metrics
}
// NewWorkerPoolStage returns a stage called name that applies processor to
// every item using workerCount goroutines. The stage's metrics start out
// carrying the same name and worker count.
func NewWorkerPoolStage(name string, workerCount int, processor func(interface{}) (interface{}, error)) *WorkerPoolStage {
	stage := new(WorkerPoolStage)
	stage.name = name
	stage.workerCount = workerCount
	stage.processor = processor
	stage.metrics.Name = name
	stage.metrics.WorkerCount = workerCount
	return stage
}
// Process implements the PipelineStage interface.
//
// It starts the configured number of worker goroutines, all reading from
// input. Each worker times the processor call, records the outcome in the
// stage metrics and forwards successful results to output. Items whose
// processor call fails are counted as errors and skipped. Process blocks until
// every worker has stopped, which happens when input is closed or ctx is
// cancelled, and always returns nil. It does not close output.
//
// A non-positive worker count falls back to a single worker: with no workers
// Process would return immediately without draining input, stalling whatever
// feeds this stage.
func (wps *WorkerPoolStage) Process(ctx context.Context, input <-chan interface{}, output chan<- interface{}) error {
	workers := wps.workerCount
	if workers < 1 {
		workers = 1
	}

	// work is the body of a single pool member.
	work := func() {
		for {
			select {
			case <-ctx.Done():
				return
			case item, ok := <-input:
				if !ok {
					return
				}

				start := time.Now()
				result, err := wps.processor(item)
				wps.updateMetrics(time.Since(start), err == nil)
				if err != nil {
					continue // failed items are counted but not forwarded
				}

				select {
				case output <- result:
				case <-ctx.Done():
					return
				}
			}
		}
	}

	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			work()
		}()
	}
	wg.Wait()
	return nil
}
// updateMetrics records the outcome of one processed item under the stage
// mutex. Processed is incremented for every item and Errors for every failed
// one.
//
// AverageLatency is seeded with the first sample (or any sample seen while the
// stored value is zero); afterwards each new sample is blended 50/50 with the
// stored value, so recent samples dominate.
func (wps *WorkerPoolStage) updateMetrics(latency time.Duration, success bool) {
	wps.mu.Lock()
	defer wps.mu.Unlock()

	m := &wps.metrics
	m.Processed++
	if !success {
		m.Errors++
	}

	if prev := m.AverageLatency; prev != 0 {
		m.AverageLatency = (prev + latency) / 2
		return
	}
	m.AverageLatency = latency
}
// Name returns the name the stage was constructed with.
func (wps *WorkerPoolStage) Name() string {
return wps.name
}
// GetMetrics returns a copy of the stage metrics taken under the read lock, so
// the caller may keep and inspect it without further synchronization.
func (wps *WorkerPoolStage) GetMetrics() StageMetrics {
	wps.mu.RLock()
	snapshot := wps.metrics
	wps.mu.RUnlock()
	return snapshot
}
// FanOutFanIn implements the fan-out/fan-in pattern: items from one input
// channel are spread over several workers and their results are merged back
// into a single output channel.
type FanOutFanIn struct {
// workers is the number of worker goroutines Process starts.
workers int
// bufferSize is the capacity of the output channel and of every per-worker
// channel created by Process.
bufferSize int
// logger receives a message for every item whose processing fails.
logger *logger.Logger
}
// NewFanOutFanIn returns a fan-out/fan-in processor that uses the given number
// of workers, the given channel buffer size and the given logger.
func NewFanOutFanIn(workers, bufferSize int, logger *logger.Logger) *FanOutFanIn {
	f := new(FanOutFanIn)
	f.workers, f.bufferSize, f.logger = workers, bufferSize, logger
	return f
}
// Process runs processor over every item received from input using the
// fan-out/fan-in pattern and returns the channel on which results arrive.
//
// A distributor goroutine hands items to the workers in round-robin order
// through per-worker channels. Each worker applies processor and writes
// successful results to its own output channel; failures are logged and the
// item is skipped. A merger forwards all worker outputs to the returned
// channel and closes it once every worker output has been drained or ctx has
// been cancelled. The relative order of results across workers is not
// preserved.
//
// A worker count below one is treated as one and a negative buffer size as
// zero; the unclamped values would panic (index out of range in the
// distributor, or make with a negative size).
func (fofi *FanOutFanIn) Process(ctx context.Context, input <-chan interface{}, processor func(interface{}) (interface{}, error)) <-chan interface{} {
	workers := fofi.workers
	if workers < 1 {
		workers = 1
	}
	bufSize := fofi.bufferSize
	if bufSize < 0 {
		bufSize = 0
	}

	output := make(chan interface{}, bufSize)

	// One input and one output channel per worker.
	workerInputs := make([]chan interface{}, workers)
	workerOutputs := make([]chan interface{}, workers)
	for i := 0; i < workers; i++ {
		workerInputs[i] = make(chan interface{}, bufSize)
		workerOutputs[i] = make(chan interface{}, bufSize)
	}

	// Fan-out: distribute incoming items round-robin.
	go func() {
		// Closing the worker inputs lets the workers finish once they have
		// drained what was already handed to them.
		defer func() {
			for _, ch := range workerInputs {
				close(ch)
			}
		}()

		next := 0
		for {
			select {
			case item, ok := <-input:
				if !ok {
					return
				}
				select {
				case workerInputs[next] <- item:
					next = (next + 1) % workers
				case <-ctx.Done():
					return
				}
			case <-ctx.Done():
				return
			}
		}
	}()

	// Workers: apply processor and forward successful results.
	for i := 0; i < workers; i++ {
		go func(in <-chan interface{}, out chan<- interface{}) {
			defer close(out)
			for {
				select {
				case item, ok := <-in:
					if !ok {
						return
					}
					result, err := processor(item)
					if err != nil {
						fofi.logger.Error(fmt.Sprintf("Worker processing error: %v", err))
						continue
					}
					select {
					case out <- result:
					case <-ctx.Done():
						return
					}
				case <-ctx.Done():
					return
				}
			}
		}(workerInputs[i], workerOutputs[i])
	}

	// Fan-in: merge every worker output into the single result channel.
	go func() {
		defer close(output)

		var wg sync.WaitGroup
		wg.Add(workers)
		for _, results := range workerOutputs {
			go func(in <-chan interface{}) {
				defer wg.Done()
				for {
					select {
					case item, ok := <-in:
						if !ok {
							return
						}
						select {
						case output <- item:
						case <-ctx.Done():
							return
						}
					case <-ctx.Done():
						return
					}
				}
			}(results)
		}
		wg.Wait()
	}()

	return output
}
// BackpressureHandler sits between an input and an output channel and applies
// a BackpressureStrategy once its internal buffer has filled up to threshold.
type BackpressureHandler struct {
// threshold is the buffered-item count at which the strategy kicks in.
threshold int
// strategy selects what happens to items that arrive at or above threshold.
strategy BackpressureStrategy
// metrics counts received, dropped, blocked and sampled items.
metrics *BackpressureMetrics
logger *logger.Logger
}
// BackpressureStrategy defines different backpressure handling strategies.
// A strategy only applies to an item that arrives while the handler's buffer
// already holds at least threshold items.
type BackpressureStrategy int
const (
// DropOldest discards the oldest buffered item and buffers the new one.
DropOldest BackpressureStrategy = iota
// DropNewest discards the incoming item and leaves the buffer untouched.
DropNewest
// Block waits until the oldest buffered item has been sent to the output
// (or the context is cancelled) before buffering the new one.
Block
// Sample keeps only the items whose running received-item count is a
// multiple of 5, replacing the oldest buffered item; all others are discarded.
Sample
)
// BackpressureMetrics tracks backpressure events.
type BackpressureMetrics struct {
// DroppedItems counts items discarded by the DropOldest and DropNewest strategies.
DroppedItems int64
// BlockedCount counts how often the Block strategy was applied.
BlockedCount int64
// SampledItems counts the items the Sample strategy discarded.
SampledItems int64
// TotalItems counts every item received from the input channel.
TotalItems int64
// mu guards the counters above.
mu sync.RWMutex
}
// NewBackpressureHandler returns a handler that applies strategy once
// threshold items are buffered. It starts with zeroed metrics and logs through
// logger.
func NewBackpressureHandler(threshold int, strategy BackpressureStrategy, logger *logger.Logger) *BackpressureHandler {
	handler := new(BackpressureHandler)
	handler.threshold = threshold
	handler.strategy = strategy
	handler.metrics = new(BackpressureMetrics)
	handler.logger = logger
	return handler
}
// HandleBackpressure relays items from input to output through an internal
// buffer and applies the configured strategy to items that arrive while the
// buffer already holds at least threshold items.
//
// It blocks until input is closed or ctx is cancelled. When input is closed
// the remaining buffered items are flushed to output with blocking sends
// before returning. On cancellation it returns immediately and buffered items
// are discarded. It never closes output.
//
// Each iteration waits for whichever happens first: a new input item, the
// output accepting the oldest buffered item, or cancellation. After every
// event the buffer is drained as far as output accepts items without blocking.
// A bare break in the drain loop would only leave the select and spin until
// output became ready, so the loop is left through a labeled break.
func (bh *BackpressureHandler) HandleBackpressure(ctx context.Context, input <-chan interface{}, output chan interface{}) {
	// make panics on a negative capacity, so clamp the initial capacity.
	capacity := bh.threshold * 2
	if capacity < 0 {
		capacity = 0
	}
	buffer := make([]interface{}, 0, capacity)

	// bump increments one metrics counter under the metrics lock.
	bump := func(counter *int64) {
		bh.metrics.mu.Lock()
		*counter++
		bh.metrics.mu.Unlock()
	}

	for {
		// A send on a nil channel never becomes ready, so the send case below
		// only participates in the select while something is buffered.
		var pending chan interface{}
		var head interface{}
		if len(buffer) > 0 {
			pending = output
			head = buffer[0]
		}

		select {
		case item, ok := <-input:
			if !ok {
				// Input is finished: flush what is left, then stop.
				for _, buffered := range buffer {
					select {
					case output <- buffered:
					case <-ctx.Done():
						return
					}
				}
				return
			}

			bh.metrics.mu.Lock()
			bh.metrics.TotalItems++
			total := bh.metrics.TotalItems
			bh.metrics.mu.Unlock()

			if len(buffer) < bh.threshold {
				buffer = append(buffer, item)
			} else {
				switch bh.strategy {
				case DropOldest:
					if len(buffer) > 0 {
						buffer = buffer[1:]
						bump(&bh.metrics.DroppedItems)
					}
					buffer = append(buffer, item)
				case DropNewest:
					// The incoming item is the one that gets discarded.
					bump(&bh.metrics.DroppedItems)
				case Block:
					bump(&bh.metrics.BlockedCount)
					// Make room by waiting for the oldest item to be taken.
					if len(buffer) > 0 {
						select {
						case output <- buffer[0]:
							buffer = buffer[1:]
						case <-ctx.Done():
							return
						}
					}
					buffer = append(buffer, item)
				case Sample:
					// Under pressure only every 5th received item is kept;
					// it replaces the oldest buffered item.
					if total%5 == 0 {
						if len(buffer) > 0 {
							buffer = buffer[1:]
						}
						buffer = append(buffer, item)
					} else {
						bump(&bh.metrics.SampledItems)
					}
				}
			}
		case pending <- head:
			buffer = buffer[1:]
		case <-ctx.Done():
			return
		}

		// Opportunistically drain as much as output accepts right now.
	drain:
		for len(buffer) > 0 {
			select {
			case output <- buffer[0]:
				buffer = buffer[1:]
			case <-ctx.Done():
				return
			default:
				// Output is not ready; go back to waiting in the main select.
				break drain
			}
		}
	}
}
// GetMetrics returns backpressure metrics.
// The returned pointer refers to the handler's live metrics record, not a
// copy, and should not be modified.
// NOTE(review): the counters are updated under the record's unexported mutex,
// so reads of its fields through this pointer are not synchronized with those
// updates.
func (bh *BackpressureHandler) GetMetrics() *BackpressureMetrics {
return bh.metrics
}