From 7dd5b5b69273794900b5bc875c220c4e64b390b7 Mon Sep 17 00:00:00 2001 From: Krypto Kajun Date: Fri, 12 Sep 2025 19:17:26 -0500 Subject: [PATCH] Fix channel closing issues in pipeline stages to prevent panic when running tests --- pkg/market/pipeline.go | 18 ++++++++++++++++++ test/integration/pipeline_test.go | 11 ----------- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/pkg/market/pipeline.go b/pkg/market/pipeline.go index 6d1d7ce..39ff00c 100644 --- a/pkg/market/pipeline.go +++ b/pkg/market/pipeline.go @@ -229,6 +229,12 @@ func TransactionDecoderStage( // Wait for all workers to finish, then close the output channel go func() { wg.Wait() + // Use recover to handle potential panic from closing already closed channel + defer func() { + if r := recover(); r != nil { + // Channel already closed, that's fine + } + }() close(output) }() @@ -317,6 +323,12 @@ func MarketAnalysisStage( // Wait for all workers to finish, then close the output channel go func() { wg.Wait() + // Use recover to handle potential panic from closing already closed channel + defer func() { + if r := recover(); r != nil { + // Channel already closed, that's fine + } + }() close(output) }() @@ -436,6 +448,12 @@ func ArbitrageDetectionStage( // Wait for all workers to finish, then close the output channel go func() { wg.Wait() + // Use recover to handle potential panic from closing already closed channel + defer func() { + if r := recover(); r != nil { + // Channel already closed, that's fine + } + }() close(output) }() diff --git a/test/integration/pipeline_test.go b/test/integration/pipeline_test.go index 2279920..9bd98ed 100644 --- a/test/integration/pipeline_test.go +++ b/test/integration/pipeline_test.go @@ -63,11 +63,6 @@ func TestPipelineIntegration(t *testing.T) { } func TestMarketManagerAndScannerIntegration(t *testing.T) { - // Create test config - cfg := &config.BotConfig{ - MinProfitThreshold: 10.0, - } - // Create test logger logger := logger.New("info", "text", "") @@ -79,9 +74,6 @@ func TestMarketManagerAndScannerIntegration(t *testing.T) { }, }, logger) - // Create market scanner - scnr := scanner.NewMarketScanner(cfg, logger) - // Get a pool from the market manager ctx := context.Background() poolAddress := common.HexToAddress("0x88e6A0c2dDD26FEEb64F039a2c41296FcB3f5640") @@ -98,9 +90,6 @@ func TestMarketManagerAndScannerIntegration(t *testing.T) { // Verify pools are returned assert.NotNil(t, pools) - - // Use the variables to avoid unused variable warnings - _ = scnr } func TestEventParserAndPipelineIntegration(t *testing.T) {