542 lines
12 KiB
Go
542 lines
12 KiB
Go
package patterns
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/fraktal/mev-beta/internal/logger"
|
|
)
|
|
|
|
// AdvancedPipeline implements sophisticated pipeline patterns for high-performance processing
|
|
type AdvancedPipeline struct {
|
|
stages []PipelineStage
|
|
errorChan chan error
|
|
metrics *PipelineMetrics
|
|
logger *logger.Logger
|
|
bufferSize int
|
|
workers int
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
// PipelineStage represents a processing stage in the pipeline
|
|
type PipelineStage interface {
|
|
Process(ctx context.Context, input <-chan interface{}, output chan<- interface{}) error
|
|
Name() string
|
|
GetMetrics() StageMetrics
|
|
}
|
|
|
|
// PipelineMetrics is the aggregate metrics record of an AdvancedPipeline.
//
// Within this file only StartTime (set by the constructor) and
// ThroughputPerSec (recomputed once per second) are written; the remaining
// fields are presumably maintained elsewhere — verify against callers.
type PipelineMetrics struct {
	// TotalProcessed feeds the throughput calculation; TotalErrors is its
	// failure counterpart.
	TotalProcessed, TotalErrors int64
	AverageLatency              time.Duration
	// ThroughputPerSec is TotalProcessed divided by the seconds elapsed
	// since StartTime.
	ThroughputPerSec  float64
	BackpressureCount int64
	// StartTime is the reference instant for the throughput calculation.
	StartTime time.Time
	// mu is held while ThroughputPerSec is being recomputed.
	mu sync.RWMutex
}
|
|
|
|
// StageMetrics is a value snapshot of one stage's counters.
type StageMetrics struct {
	// Name is the stage name the snapshot belongs to.
	Name string
	// Processed counts every handled item; Errors counts the subset whose
	// processing failed.
	Processed, Errors int64
	// AverageLatency summarises the observed per-item processing time.
	AverageLatency time.Duration
	// InputBuffer and OutputBuffer are not written anywhere in this file;
	// WorkerCount is filled in by the WorkerPoolStage constructor.
	InputBuffer, OutputBuffer, WorkerCount int
}
|
|
|
|
// WorkerPoolStage implements a stage with worker pool
|
|
type WorkerPoolStage struct {
|
|
name string
|
|
workerCount int
|
|
processor func(interface{}) (interface{}, error)
|
|
metrics StageMetrics
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
// NewAdvancedPipeline creates a new advanced pipeline
|
|
func NewAdvancedPipeline(bufferSize, workers int, logger *logger.Logger) *AdvancedPipeline {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
return &AdvancedPipeline{
|
|
stages: make([]PipelineStage, 0),
|
|
errorChan: make(chan error, 100),
|
|
bufferSize: bufferSize,
|
|
workers: workers,
|
|
logger: logger,
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
metrics: &PipelineMetrics{
|
|
StartTime: time.Now(),
|
|
},
|
|
}
|
|
}
|
|
|
|
// AddStage adds a stage to the pipeline
|
|
func (p *AdvancedPipeline) AddStage(stage PipelineStage) {
|
|
p.stages = append(p.stages, stage)
|
|
p.logger.Info(fmt.Sprintf("Added pipeline stage: %s", stage.Name()))
|
|
}
|
|
|
|
// Start starts the pipeline processing
|
|
func (p *AdvancedPipeline) Start(input <-chan interface{}) <-chan interface{} {
|
|
if len(p.stages) == 0 {
|
|
p.logger.Error("No stages configured in pipeline")
|
|
return nil
|
|
}
|
|
|
|
// Create channels between stages
|
|
channels := make([]chan interface{}, len(p.stages)+1)
|
|
channels[0] = make(chan interface{}, p.bufferSize)
|
|
|
|
for i := 1; i <= len(p.stages); i++ {
|
|
channels[i] = make(chan interface{}, p.bufferSize)
|
|
}
|
|
|
|
// Start input feeder
|
|
p.wg.Add(1)
|
|
go func() {
|
|
defer p.wg.Done()
|
|
defer close(channels[0])
|
|
|
|
for {
|
|
select {
|
|
case item, ok := <-input:
|
|
if !ok {
|
|
return
|
|
}
|
|
select {
|
|
case channels[0] <- item:
|
|
case <-p.ctx.Done():
|
|
return
|
|
}
|
|
case <-p.ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Start each stage
|
|
for i, stage := range p.stages {
|
|
p.wg.Add(1)
|
|
go func(stageIndex int, s PipelineStage) {
|
|
defer p.wg.Done()
|
|
defer close(channels[stageIndex+1])
|
|
|
|
err := s.Process(p.ctx, channels[stageIndex], channels[stageIndex+1])
|
|
if err != nil {
|
|
select {
|
|
case p.errorChan <- fmt.Errorf("stage %s error: %v", s.Name(), err):
|
|
default:
|
|
}
|
|
}
|
|
}(i, stage)
|
|
}
|
|
|
|
// Start metrics collection
|
|
go p.collectMetrics()
|
|
|
|
// Return output channel
|
|
return channels[len(p.stages)]
|
|
}
|
|
|
|
// collectMetrics collects pipeline metrics
|
|
func (p *AdvancedPipeline) collectMetrics() {
|
|
ticker := time.NewTicker(1 * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
p.updateMetrics()
|
|
case <-p.ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// updateMetrics updates pipeline metrics
|
|
func (p *AdvancedPipeline) updateMetrics() {
|
|
p.metrics.mu.Lock()
|
|
defer p.metrics.mu.Unlock()
|
|
|
|
elapsed := time.Since(p.metrics.StartTime).Seconds()
|
|
if elapsed > 0 {
|
|
p.metrics.ThroughputPerSec = float64(p.metrics.TotalProcessed) / elapsed
|
|
}
|
|
}
|
|
|
|
// Stop stops the pipeline
|
|
func (p *AdvancedPipeline) Stop() {
|
|
p.cancel()
|
|
p.wg.Wait()
|
|
close(p.errorChan)
|
|
}
|
|
|
|
// GetErrors returns error channel
|
|
func (p *AdvancedPipeline) GetErrors() <-chan error {
|
|
return p.errorChan
|
|
}
|
|
|
|
// GetMetrics returns current pipeline metrics.
|
|
// The returned pointer should not be modified.
|
|
func (p *AdvancedPipeline) GetMetrics() *PipelineMetrics {
|
|
return p.metrics
|
|
}
|
|
|
|
// NewWorkerPoolStage creates a new worker pool stage
|
|
func NewWorkerPoolStage(name string, workerCount int, processor func(interface{}) (interface{}, error)) *WorkerPoolStage {
|
|
return &WorkerPoolStage{
|
|
name: name,
|
|
workerCount: workerCount,
|
|
processor: processor,
|
|
metrics: StageMetrics{
|
|
Name: name,
|
|
WorkerCount: workerCount,
|
|
},
|
|
}
|
|
}
|
|
|
|
// Process implements PipelineStage interface
|
|
func (wps *WorkerPoolStage) Process(ctx context.Context, input <-chan interface{}, output chan<- interface{}) error {
|
|
var wg sync.WaitGroup
|
|
|
|
// Start workers
|
|
for i := 0; i < wps.workerCount; i++ {
|
|
wg.Add(1)
|
|
go func(workerID int) {
|
|
defer wg.Done()
|
|
|
|
for {
|
|
select {
|
|
case item, ok := <-input:
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
start := time.Now()
|
|
result, err := wps.processor(item)
|
|
latency := time.Since(start)
|
|
|
|
wps.updateMetrics(latency, err == nil)
|
|
|
|
if err != nil {
|
|
continue // Skip failed items
|
|
}
|
|
|
|
select {
|
|
case output <- result:
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}(i)
|
|
}
|
|
|
|
wg.Wait()
|
|
return nil
|
|
}
|
|
|
|
// updateMetrics updates stage metrics
|
|
func (wps *WorkerPoolStage) updateMetrics(latency time.Duration, success bool) {
|
|
wps.mu.Lock()
|
|
defer wps.mu.Unlock()
|
|
|
|
wps.metrics.Processed++
|
|
if !success {
|
|
wps.metrics.Errors++
|
|
}
|
|
|
|
// Update average latency (simple moving average)
|
|
if wps.metrics.AverageLatency == 0 {
|
|
wps.metrics.AverageLatency = latency
|
|
} else {
|
|
wps.metrics.AverageLatency = (wps.metrics.AverageLatency + latency) / 2
|
|
}
|
|
}
|
|
|
|
// Name returns stage name
|
|
func (wps *WorkerPoolStage) Name() string {
|
|
return wps.name
|
|
}
|
|
|
|
// GetMetrics returns stage metrics
|
|
func (wps *WorkerPoolStage) GetMetrics() StageMetrics {
|
|
wps.mu.RLock()
|
|
defer wps.mu.RUnlock()
|
|
return wps.metrics
|
|
}
|
|
|
|
// FanOutFanIn implements fan-out/fan-in pattern
|
|
type FanOutFanIn struct {
|
|
workers int
|
|
bufferSize int
|
|
logger *logger.Logger
|
|
}
|
|
|
|
// NewFanOutFanIn creates a new fan-out/fan-in processor
|
|
func NewFanOutFanIn(workers, bufferSize int, logger *logger.Logger) *FanOutFanIn {
|
|
return &FanOutFanIn{
|
|
workers: workers,
|
|
bufferSize: bufferSize,
|
|
logger: logger,
|
|
}
|
|
}
|
|
|
|
// Process processes items using fan-out/fan-in pattern
|
|
func (fofi *FanOutFanIn) Process(ctx context.Context, input <-chan interface{}, processor func(interface{}) (interface{}, error)) <-chan interface{} {
|
|
output := make(chan interface{}, fofi.bufferSize)
|
|
|
|
// Fan-out: distribute work to multiple workers
|
|
workerInputs := make([]chan interface{}, fofi.workers)
|
|
for i := 0; i < fofi.workers; i++ {
|
|
workerInputs[i] = make(chan interface{}, fofi.bufferSize)
|
|
}
|
|
|
|
// Start distributor
|
|
go func() {
|
|
defer func() {
|
|
for _, ch := range workerInputs {
|
|
close(ch)
|
|
}
|
|
}()
|
|
|
|
workerIndex := 0
|
|
for {
|
|
select {
|
|
case item, ok := <-input:
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
select {
|
|
case workerInputs[workerIndex] <- item:
|
|
workerIndex = (workerIndex + 1) % fofi.workers
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Start workers
|
|
workerOutputs := make([]<-chan interface{}, fofi.workers)
|
|
for i := 0; i < fofi.workers; i++ {
|
|
workerOutput := make(chan interface{}, fofi.bufferSize)
|
|
workerOutputs[i] = workerOutput
|
|
|
|
go func(input <-chan interface{}, output chan<- interface{}) {
|
|
defer close(output)
|
|
|
|
for {
|
|
select {
|
|
case item, ok := <-input:
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
result, err := processor(item)
|
|
if err != nil {
|
|
fofi.logger.Error(fmt.Sprintf("Worker processing error: %v", err))
|
|
continue
|
|
}
|
|
|
|
select {
|
|
case output <- result:
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}(workerInputs[i], workerOutput)
|
|
}
|
|
|
|
// Fan-in: merge worker outputs
|
|
go func() {
|
|
defer close(output)
|
|
|
|
var wg sync.WaitGroup
|
|
for _, workerOutput := range workerOutputs {
|
|
wg.Add(1)
|
|
go func(input <-chan interface{}) {
|
|
defer wg.Done()
|
|
for {
|
|
select {
|
|
case item, ok := <-input:
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
select {
|
|
case output <- item:
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}(workerOutput)
|
|
}
|
|
wg.Wait()
|
|
}()
|
|
|
|
return output
|
|
}
|
|
|
|
// BackpressureHandler handles backpressure in pipeline stages
|
|
type BackpressureHandler struct {
|
|
threshold int
|
|
strategy BackpressureStrategy
|
|
metrics *BackpressureMetrics
|
|
logger *logger.Logger
|
|
}
|
|
|
|
// BackpressureStrategy selects how a BackpressureHandler treats an incoming
// item once its buffer has reached the configured threshold.
type BackpressureStrategy int

// The available strategies. The numeric values are assigned in declaration
// order starting at zero, so DropOldest is the zero value.
const (
	// DropOldest discards the oldest buffered item to make room for the
	// new one.
	DropOldest BackpressureStrategy = iota
	// DropNewest discards the incoming item and keeps the buffer as is.
	DropNewest
	// Block waits until the oldest buffered item has been delivered before
	// buffering the new one.
	Block
	// Sample keeps only the incoming items whose running item count is a
	// multiple of five, replacing the oldest buffered item; all other
	// incoming items are discarded.
	Sample
)
|
|
|
|
// BackpressureMetrics counts what a BackpressureHandler did with the items
// it received. All counters are updated while holding mu.
type BackpressureMetrics struct {
	// DroppedItems counts items discarded by the DropOldest and DropNewest
	// strategies; BlockedCount counts items that arrived while the Block
	// strategy was in effect.
	DroppedItems, BlockedCount int64
	// SampledItems counts items discarded by the Sample strategy;
	// TotalItems counts every item received from the input channel.
	SampledItems, TotalItems int64

	mu sync.RWMutex
}
|
|
|
|
// NewBackpressureHandler creates a new backpressure handler
|
|
func NewBackpressureHandler(threshold int, strategy BackpressureStrategy, logger *logger.Logger) *BackpressureHandler {
|
|
return &BackpressureHandler{
|
|
threshold: threshold,
|
|
strategy: strategy,
|
|
metrics: &BackpressureMetrics{},
|
|
logger: logger,
|
|
}
|
|
}
|
|
|
|
// HandleBackpressure applies backpressure strategy to a channel
|
|
func (bh *BackpressureHandler) HandleBackpressure(ctx context.Context, input <-chan interface{}, output chan interface{}) {
|
|
buffer := make([]interface{}, 0, bh.threshold*2)
|
|
|
|
for {
|
|
select {
|
|
case item, ok := <-input:
|
|
if !ok {
|
|
// Flush remaining items
|
|
for _, bufferedItem := range buffer {
|
|
select {
|
|
case output <- bufferedItem:
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
bh.metrics.mu.Lock()
|
|
bh.metrics.TotalItems++
|
|
bh.metrics.mu.Unlock()
|
|
|
|
// Check if we need to apply backpressure
|
|
if len(buffer) >= bh.threshold {
|
|
switch bh.strategy {
|
|
case DropOldest:
|
|
if len(buffer) > 0 {
|
|
buffer = buffer[1:]
|
|
bh.metrics.mu.Lock()
|
|
bh.metrics.DroppedItems++
|
|
bh.metrics.mu.Unlock()
|
|
}
|
|
buffer = append(buffer, item)
|
|
|
|
case DropNewest:
|
|
bh.metrics.mu.Lock()
|
|
bh.metrics.DroppedItems++
|
|
bh.metrics.mu.Unlock()
|
|
continue // Drop the new item
|
|
|
|
case Block:
|
|
bh.metrics.mu.Lock()
|
|
bh.metrics.BlockedCount++
|
|
bh.metrics.mu.Unlock()
|
|
// Try to send oldest item (blocking)
|
|
if len(buffer) > 0 {
|
|
select {
|
|
case output <- buffer[0]:
|
|
buffer = buffer[1:]
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
buffer = append(buffer, item)
|
|
|
|
case Sample:
|
|
// Keep every nth item when under pressure
|
|
bh.metrics.mu.Lock()
|
|
sampleRate := bh.metrics.TotalItems % 5 // Keep every 5th item
|
|
bh.metrics.mu.Unlock()
|
|
|
|
if sampleRate == 0 {
|
|
if len(buffer) > 0 {
|
|
buffer = buffer[1:]
|
|
}
|
|
buffer = append(buffer, item)
|
|
} else {
|
|
bh.metrics.mu.Lock()
|
|
bh.metrics.SampledItems++
|
|
bh.metrics.mu.Unlock()
|
|
}
|
|
}
|
|
} else {
|
|
buffer = append(buffer, item)
|
|
}
|
|
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
|
|
// Try to drain buffer
|
|
for len(buffer) > 0 {
|
|
select {
|
|
case output <- buffer[0]:
|
|
buffer = buffer[1:]
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
// Can't send more, break out of drain loop
|
|
break
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// GetMetrics returns backpressure metrics.
|
|
// The returned pointer should not be modified.
|
|
func (bh *BackpressureHandler) GetMetrics() *BackpressureMetrics {
|
|
return bh.metrics
|
|
}
|