Sequencer is working (minimal parsing)
pkg/patterns/pipeline.go (new file, 541 lines)
@@ -0,0 +1,541 @@
package patterns

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/fraktal/mev-beta/internal/logger"
)

// AdvancedPipeline implements a multi-stage, channel-based processing
// pipeline with per-stage worker pools, buffered hand-offs, metrics,
// and cancellation.
type AdvancedPipeline struct {
	stages     []PipelineStage
	errorChan  chan error
	metrics    *PipelineMetrics
	logger     *logger.Logger
	bufferSize int
	workers    int
	ctx        context.Context
	cancel     context.CancelFunc
	wg         sync.WaitGroup
}

// PipelineStage represents a single processing stage in the pipeline.
type PipelineStage interface {
	Process(ctx context.Context, input <-chan interface{}, output chan<- interface{}) error
	Name() string
	GetMetrics() StageMetrics
}
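
// identityStage is a minimal illustrative PipelineStage (not part of the
// original commit). It forwards items unchanged and shows the contract a
// stage must honor: drain input until it closes, respect ctx
// cancellation, and never close the output channel (the pipeline owns
// it and closes it after Process returns).
type identityStage struct{ name string }

func (s *identityStage) Name() string { return s.name }

func (s *identityStage) GetMetrics() StageMetrics { return StageMetrics{Name: s.name} }

func (s *identityStage) Process(ctx context.Context, input <-chan interface{}, output chan<- interface{}) error {
	for {
		select {
		case item, ok := <-input:
			if !ok {
				return nil
			}
			select {
			case output <- item:
			case <-ctx.Done():
				return nil
			}
		case <-ctx.Done():
			return nil
		}
	}
}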

// PipelineMetrics tracks pipeline performance.
type PipelineMetrics struct {
	TotalProcessed    int64
	TotalErrors       int64
	AverageLatency    time.Duration
	ThroughputPerSec  float64
	BackpressureCount int64
	StartTime         time.Time
	mu                sync.RWMutex
}

// StageMetrics tracks individual stage performance.
type StageMetrics struct {
	Name           string
	Processed      int64
	Errors         int64
	AverageLatency time.Duration
	InputBuffer    int
	OutputBuffer   int
	WorkerCount    int
}

// WorkerPoolStage implements a PipelineStage backed by a pool of workers.
type WorkerPoolStage struct {
	name        string
	workerCount int
	processor   func(interface{}) (interface{}, error)
	metrics     StageMetrics
	mu          sync.RWMutex
}

// NewAdvancedPipeline creates a new advanced pipeline.
func NewAdvancedPipeline(bufferSize, workers int, logger *logger.Logger) *AdvancedPipeline {
	ctx, cancel := context.WithCancel(context.Background())

	return &AdvancedPipeline{
		stages:     make([]PipelineStage, 0),
		errorChan:  make(chan error, 100),
		bufferSize: bufferSize,
		workers:    workers,
		logger:     logger,
		ctx:        ctx,
		cancel:     cancel,
		metrics: &PipelineMetrics{
			StartTime: time.Now(),
		},
	}
}

// AddStage appends a stage to the pipeline. It must be called before Start.
func (p *AdvancedPipeline) AddStage(stage PipelineStage) {
	p.stages = append(p.stages, stage)
	p.logger.Info(fmt.Sprintf("Added pipeline stage: %s", stage.Name()))
}

// Start wires the stages together and begins processing. It returns the
// pipeline's output channel, or nil if no stages are configured.
func (p *AdvancedPipeline) Start(input <-chan interface{}) <-chan interface{} {
	if len(p.stages) == 0 {
		p.logger.Error("No stages configured in pipeline")
		return nil
	}

	// Create the channels that connect the stages.
	channels := make([]chan interface{}, len(p.stages)+1)
	for i := range channels {
		channels[i] = make(chan interface{}, p.bufferSize)
	}

	// Start the input feeder.
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		defer close(channels[0])

		for {
			select {
			case item, ok := <-input:
				if !ok {
					return
				}
				select {
				case channels[0] <- item:
				case <-p.ctx.Done():
					return
				}
			case <-p.ctx.Done():
				return
			}
		}
	}()

	// Start each stage. The pipeline owns the inter-stage channels, so it
	// closes a stage's output once that stage's Process returns.
	for i, stage := range p.stages {
		p.wg.Add(1)
		go func(stageIndex int, s PipelineStage) {
			defer p.wg.Done()
			defer close(channels[stageIndex+1])

			if err := s.Process(p.ctx, channels[stageIndex], channels[stageIndex+1]); err != nil {
				select {
				case p.errorChan <- fmt.Errorf("stage %s error: %w", s.Name(), err):
				default: // Error channel full; drop rather than block the stage.
				}
			}
		}(i, stage)
	}

	// Start metrics collection.
	go p.collectMetrics()

	// Count items as they leave the last stage so TotalProcessed, and the
	// throughput derived from it, reflect completed work.
	out := make(chan interface{}, p.bufferSize)
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		defer close(out)
		for item := range channels[len(p.stages)] {
			p.metrics.mu.Lock()
			p.metrics.TotalProcessed++
			p.metrics.mu.Unlock()
			select {
			case out <- item:
			case <-p.ctx.Done():
				return
			}
		}
	}()
	return out
}

// collectMetrics collects pipeline metrics.
func (p *AdvancedPipeline) collectMetrics() {
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			p.updateMetrics()
		case <-p.ctx.Done():
			return
		}
	}
}

// updateMetrics recomputes throughput from the running totals.
func (p *AdvancedPipeline) updateMetrics() {
	p.metrics.mu.Lock()
	defer p.metrics.mu.Unlock()

	elapsed := time.Since(p.metrics.StartTime).Seconds()
	if elapsed > 0 {
		p.metrics.ThroughputPerSec = float64(p.metrics.TotalProcessed) / elapsed
	}
}

// Stop cancels the pipeline and waits for all goroutines to finish.
func (p *AdvancedPipeline) Stop() {
	p.cancel()
	p.wg.Wait()
	close(p.errorChan)
}

// GetErrors returns the pipeline's error channel.
func (p *AdvancedPipeline) GetErrors() <-chan error {
	return p.errorChan
}

// GetMetrics returns current pipeline metrics.
// The returned pointer should not be modified.
func (p *AdvancedPipeline) GetMetrics() *PipelineMetrics {
	return p.metrics
}
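
// examplePipelineUsage is an illustrative sketch (not part of the
// original commit) showing how the pieces compose end to end; the
// doubling processor and buffer sizes are placeholders.
func examplePipelineUsage(log *logger.Logger) {
	input := make(chan interface{}, 16)

	p := NewAdvancedPipeline(16, 4, log)
	p.AddStage(NewWorkerPoolStage("double", 4, func(v interface{}) (interface{}, error) {
		n, ok := v.(int)
		if !ok {
			return nil, fmt.Errorf("expected int, got %T", v)
		}
		return n * 2, nil
	}))

	output := p.Start(input)

	go func() {
		defer close(input)
		for i := 0; i < 100; i++ {
			input <- i
		}
	}()

	// Output closes once the input is drained through every stage.
	for v := range output {
		fmt.Println(v)
	}
	p.Stop()
}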

// NewWorkerPoolStage creates a new worker pool stage.
func NewWorkerPoolStage(name string, workerCount int, processor func(interface{}) (interface{}, error)) *WorkerPoolStage {
	return &WorkerPoolStage{
		name:        name,
		workerCount: workerCount,
		processor:   processor,
		metrics: StageMetrics{
			Name:        name,
			WorkerCount: workerCount,
		},
	}
}

// Process implements the PipelineStage interface: it fans the input out
// to workerCount goroutines and returns once the input channel closes or
// the context is cancelled.
func (wps *WorkerPoolStage) Process(ctx context.Context, input <-chan interface{}, output chan<- interface{}) error {
	var wg sync.WaitGroup

	// Start workers.
	for i := 0; i < wps.workerCount; i++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()

			for {
				select {
				case item, ok := <-input:
					if !ok {
						return
					}

					start := time.Now()
					result, err := wps.processor(item)
					latency := time.Since(start)

					wps.updateMetrics(latency, err == nil)

					if err != nil {
						continue // Skip failed items; the failure is reflected in stage metrics.
					}

					select {
					case output <- result:
					case <-ctx.Done():
						return
					}

				case <-ctx.Done():
					return
				}
			}
		}(i)
	}

	wg.Wait()
	return nil
}

// updateMetrics updates stage metrics.
func (wps *WorkerPoolStage) updateMetrics(latency time.Duration, success bool) {
	wps.mu.Lock()
	defer wps.mu.Unlock()

	wps.metrics.Processed++
	if !success {
		wps.metrics.Errors++
	}

	// Update average latency with exponential smoothing (alpha = 0.5):
	// each new sample carries half the weight, so the figure tracks
	// recent latency rather than a true lifetime mean.
	if wps.metrics.AverageLatency == 0 {
		wps.metrics.AverageLatency = latency
	} else {
		wps.metrics.AverageLatency = (wps.metrics.AverageLatency + latency) / 2
	}
}

// Name returns the stage name.
func (wps *WorkerPoolStage) Name() string {
	return wps.name
}

// GetMetrics returns stage metrics.
func (wps *WorkerPoolStage) GetMetrics() StageMetrics {
	wps.mu.RLock()
	defer wps.mu.RUnlock()
	return wps.metrics
}

// FanOutFanIn implements the fan-out/fan-in pattern: items are
// distributed round-robin across workers and the results are merged
// back into a single channel.
type FanOutFanIn struct {
	workers    int
	bufferSize int
	logger     *logger.Logger
}

// NewFanOutFanIn creates a new fan-out/fan-in processor.
func NewFanOutFanIn(workers, bufferSize int, logger *logger.Logger) *FanOutFanIn {
	return &FanOutFanIn{
		workers:    workers,
		bufferSize: bufferSize,
		logger:     logger,
	}
}

// Process processes items using the fan-out/fan-in pattern.
func (fofi *FanOutFanIn) Process(ctx context.Context, input <-chan interface{}, processor func(interface{}) (interface{}, error)) <-chan interface{} {
	output := make(chan interface{}, fofi.bufferSize)

	// Fan-out: one buffered input channel per worker.
	workerInputs := make([]chan interface{}, fofi.workers)
	for i := 0; i < fofi.workers; i++ {
		workerInputs[i] = make(chan interface{}, fofi.bufferSize)
	}

	// Distributor: round-robin items across the worker inputs.
	go func() {
		defer func() {
			for _, ch := range workerInputs {
				close(ch)
			}
		}()

		workerIndex := 0
		for {
			select {
			case item, ok := <-input:
				if !ok {
					return
				}

				select {
				case workerInputs[workerIndex] <- item:
					workerIndex = (workerIndex + 1) % fofi.workers
				case <-ctx.Done():
					return
				}

			case <-ctx.Done():
				return
			}
		}
	}()

	// Workers: apply the processor, dropping items that fail.
	workerOutputs := make([]<-chan interface{}, fofi.workers)
	for i := 0; i < fofi.workers; i++ {
		workerOutput := make(chan interface{}, fofi.bufferSize)
		workerOutputs[i] = workerOutput

		go func(input <-chan interface{}, output chan<- interface{}) {
			defer close(output)

			for {
				select {
				case item, ok := <-input:
					if !ok {
						return
					}

					result, err := processor(item)
					if err != nil {
						fofi.logger.Error(fmt.Sprintf("Worker processing error: %v", err))
						continue
					}

					select {
					case output <- result:
					case <-ctx.Done():
						return
					}

				case <-ctx.Done():
					return
				}
			}
		}(workerInputs[i], workerOutput)
	}

	// Fan-in: merge worker outputs into the single output channel.
	go func() {
		defer close(output)

		var wg sync.WaitGroup
		for _, workerOutput := range workerOutputs {
			wg.Add(1)
			go func(input <-chan interface{}) {
				defer wg.Done()
				for {
					select {
					case item, ok := <-input:
						if !ok {
							return
						}

						select {
						case output <- item:
						case <-ctx.Done():
							return
						}

					case <-ctx.Done():
						return
					}
				}
			}(workerOutput)
		}
		wg.Wait()
	}()

	return output
}
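
// exampleFanOutFanIn is a minimal usage sketch (not part of the original
// commit); the processor and sizes are placeholders. Note that ordering
// is not preserved: results from different workers interleave.
func exampleFanOutFanIn(ctx context.Context, log *logger.Logger) {
	input := make(chan interface{}, 8)
	fofi := NewFanOutFanIn(4, 8, log)

	out := fofi.Process(ctx, input, func(v interface{}) (interface{}, error) {
		return fmt.Sprintf("processed: %v", v), nil
	})

	go func() {
		defer close(input)
		for i := 0; i < 20; i++ {
			input <- i
		}
	}()

	// out closes once input is drained and all workers have finished.
	for v := range out {
		fmt.Println(v)
	}
}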

// BackpressureHandler handles backpressure in pipeline stages.
type BackpressureHandler struct {
	threshold int
	strategy  BackpressureStrategy
	metrics   *BackpressureMetrics
	logger    *logger.Logger
}

// BackpressureStrategy defines how the handler behaves once its internal
// buffer reaches the threshold.
type BackpressureStrategy int

const (
	DropOldest BackpressureStrategy = iota // evict the oldest buffered item
	DropNewest                             // discard the incoming item
	Block                                  // block until the consumer catches up
	Sample                                 // keep only a fraction of incoming items
)
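
// String is an illustrative convenience (not part of the original
// commit) so strategies print readably in logs.
func (s BackpressureStrategy) String() string {
	switch s {
	case DropOldest:
		return "DropOldest"
	case DropNewest:
		return "DropNewest"
	case Block:
		return "Block"
	case Sample:
		return "Sample"
	default:
		return fmt.Sprintf("BackpressureStrategy(%d)", int(s))
	}
}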

// BackpressureMetrics tracks backpressure events.
type BackpressureMetrics struct {
	DroppedItems int64
	BlockedCount int64
	SampledItems int64
	TotalItems   int64
	mu           sync.RWMutex
}

// NewBackpressureHandler creates a new backpressure handler.
func NewBackpressureHandler(threshold int, strategy BackpressureStrategy, logger *logger.Logger) *BackpressureHandler {
	return &BackpressureHandler{
		threshold: threshold,
		strategy:  strategy,
		metrics:   &BackpressureMetrics{},
		logger:    logger,
	}
}

// HandleBackpressure applies the configured strategy while relaying items
// from input to output, buffering up to threshold items in between.
func (bh *BackpressureHandler) HandleBackpressure(ctx context.Context, input <-chan interface{}, output chan interface{}) {
	buffer := make([]interface{}, 0, bh.threshold*2)

	for {
		select {
		case item, ok := <-input:
			if !ok {
				// Input closed: flush the remaining buffered items.
				for _, bufferedItem := range buffer {
					select {
					case output <- bufferedItem:
					case <-ctx.Done():
						return
					}
				}
				return
			}

			bh.metrics.mu.Lock()
			bh.metrics.TotalItems++
			bh.metrics.mu.Unlock()

			// Apply the backpressure strategy once the buffer is full.
			if len(buffer) >= bh.threshold {
				switch bh.strategy {
				case DropOldest:
					if len(buffer) > 0 {
						buffer = buffer[1:]
						bh.metrics.mu.Lock()
						bh.metrics.DroppedItems++
						bh.metrics.mu.Unlock()
					}
					buffer = append(buffer, item)

				case DropNewest:
					bh.metrics.mu.Lock()
					bh.metrics.DroppedItems++
					bh.metrics.mu.Unlock()
					continue // Drop the new item.

				case Block:
					bh.metrics.mu.Lock()
					bh.metrics.BlockedCount++
					bh.metrics.mu.Unlock()
					// Block until the oldest item can be sent.
					if len(buffer) > 0 {
						select {
						case output <- buffer[0]:
							buffer = buffer[1:]
						case <-ctx.Done():
							return
						}
					}
					buffer = append(buffer, item)

				case Sample:
					// Under pressure, keep only every 5th item.
					bh.metrics.mu.Lock()
					keep := bh.metrics.TotalItems%5 == 0
					bh.metrics.mu.Unlock()

					if keep {
						if len(buffer) > 0 {
							buffer = buffer[1:]
						}
						buffer = append(buffer, item)
					} else {
						bh.metrics.mu.Lock()
						bh.metrics.SampledItems++
						bh.metrics.mu.Unlock()
					}
				}
			} else {
				buffer = append(buffer, item)
			}

		case <-ctx.Done():
			return
		}

		// Opportunistically drain the buffer without blocking. A labeled
		// break is required: a bare break would only exit the select and
		// spin in this loop whenever the consumer is not ready.
	drain:
		for len(buffer) > 0 {
			select {
			case output <- buffer[0]:
				buffer = buffer[1:]
			case <-ctx.Done():
				return
			default:
				break drain // Consumer not ready; stop draining for now.
			}
		}
	}
}

// GetMetrics returns backpressure metrics.
// The returned pointer should not be modified.
func (bh *BackpressureHandler) GetMetrics() *BackpressureMetrics {
	return bh.metrics
}
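
// exampleBackpressure is a minimal usage sketch (not part of the
// original commit), wiring the handler between a fast producer and a
// slow consumer; sizes and timings are placeholders.
func exampleBackpressure(ctx context.Context, log *logger.Logger) {
	input := make(chan interface{})
	output := make(chan interface{}, 4)

	bh := NewBackpressureHandler(32, DropOldest, log)
	go func() {
		bh.HandleBackpressure(ctx, input, output)
		close(output) // The handler is the only sender, so close when it returns.
	}()

	go func() {
		defer close(input)
		for i := 0; i < 1000; i++ {
			input <- i // Fast producer.
		}
	}()

	for v := range output {
		time.Sleep(10 * time.Millisecond) // Slow consumer forces backpressure.
		_ = v
	}
}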