diff --git a/buffer.go b/buffer.go index bfe5f9d..400ffd4 100644 --- a/buffer.go +++ b/buffer.go @@ -56,6 +56,7 @@ func (bfs *buffers) set(key string, value *buffer) { func (bfs *buffers) remove(key string) { bfs.Lock() defer bfs.Unlock() + close(bfs.bufferMap[key].in) delete(bfs.bufferMap, key) } diff --git a/examples/advanced/main.go b/examples/advanced/main.go index eff6e5c..392b5aa 100644 --- a/examples/advanced/main.go +++ b/examples/advanced/main.go @@ -75,7 +75,7 @@ func (d *downloadStep) Cancel() error { return nil } -func readPipeline(pipe *pipeline.Pipeline) { +func readPipeline(ctx context.Context, pipe *pipeline.Pipeline) { out, err := pipe.Out() if err != nil { return @@ -92,6 +92,8 @@ func readPipeline(pipe *pipeline.Pipeline) { fmt.Println(line) case p := <-progress: fmt.Println("percent done: ", p) + case <-ctx.Done(): + return //结束当前的函数,以防止goroutine泄漏 } } } @@ -124,16 +126,16 @@ func main() { // add all stages workflow.AddStage(stage, concurrentStage, concurrentErrStage) - + ctx, cancelFunc := context.WithCancel(context.Background()) // start a routine to read out and progress - go readPipeline(workflow) + go readPipeline(ctx, workflow) // execute pipeline result := workflow.Run() if result.Error != nil { fmt.Println(result.Error) } - + cancelFunc() // 停止读取流水线输出的goroutine // one would persist the time taken duration to use as progress scale for the next workflow build fmt.Println("timeTaken:", workflow.GetDuration())