From c6d79ab34dce2debadd71e5ab55eca20d035e048 Mon Sep 17 00:00:00 2001 From: Mark Salpeter Date: Tue, 19 Apr 2022 16:37:48 +0200 Subject: [PATCH] feat(generics)!: Refactored the entire library to support generics - For Go 1.17 < support, continue to use `v1` - Added better examples to the documentation closed #4 --- .github/workflows/ci.yml | 50 +-- .golangci.yml | 2 +- Makefile | 8 + README.md | 611 +++++++++++++++++-------------- buffer.go | 4 +- cancel.go | 8 +- cancel_example_test.go | 4 +- cancel_test.go | 8 +- collect.go | 27 +- collect_test.go | 42 +-- delay.go | 4 +- delay_test.go | 26 +- drain.go | 7 + emit.go | 26 +- example/db/db.go | 81 ---- example/processors/processors.go | 62 ---- go.mod | 6 +- go.sum | 2 + merge.go | 14 +- merge_example_test.go | 77 +--- merge_test.go | 8 +- mocks_test.go | 21 +- pipeline.go | 7 + pipeline_example_test.go | 144 ++++++++ process.go | 24 +- process_batch.go | 36 +- process_batch_example_test.go | 46 ++- process_batch_test.go | 80 ++-- process_example_test.go | 30 +- process_test.go | 101 +++-- processor.go | 28 +- split.go | 7 +- split_test.go | 16 +- 33 files changed, 837 insertions(+), 780 deletions(-) create mode 100644 Makefile create mode 100644 drain.go delete mode 100644 example/db/db.go delete mode 100644 example/processors/processors.go create mode 100644 go.sum create mode 100644 pipeline_example_test.go diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a87174d..bc0598f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -13,13 +13,13 @@ jobs: - name: Set up Go uses: actions/setup-go@v2 with: - go-version: 1.16 + go-version: 1.18 - name: Lint if: always() - uses: golangci/golangci-lint-action@v2 + uses: golangci/golangci-lint-action@v3 with: - version: v1.29 + version: v1.45 test: runs-on: ubuntu-latest @@ -30,7 +30,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v2 with: - go-version: 1.16 + go-version: 1.18 - name: Test run: go test -coverprofile=coverage.txt 
-json ./... > test.json @@ -57,33 +57,33 @@ jobs: - name: Set up Go uses: actions/setup-go@v2 with: - go-version: 1.16 + go-version: 1.18 - name: Build run: go build -v ./... - docs: - needs: build - runs-on: ubuntu-latest - steps: - - name: Check out repository - uses: actions/checkout@v2 - - name: Update readme according to Go doc - uses: posener/goreadme@e713475bae0c17d5083f4378c5e53d6c66d74c74 - with: - email: 'marksalpeter@gmail.com' - badge-codecov: true - badge-godoc: true - badge-github: "ci.yml" - badge-goreportcard: true - badge-travisci: false - commit-message: 'improvement(docs): updated docs' - types: true - functions: true - github-token: '${{ secrets.GITHUB_TOKEN }}' + # docs: + # needs: build + # runs-on: ubuntu-latest + # steps: + # - name: Check out repository + # uses: actions/checkout@v2 + # - name: Update readme according to Go doc + # uses: posener/goreadme + # with: + # email: 'marksalpeter@gmail.com' + # badge-codecov: true + # badge-godoc: true + # badge-github: "ci.yml" + # badge-goreportcard: true + # badge-travisci: false + # commit-message: 'improvement(docs): updated docs' + # types: true + # functions: true + # github-token: '${{ secrets.GITHUB_TOKEN }}' version: - needs: docs + # needs: docs if: github.ref == 'refs/heads/main' # only version on push to master runs-on: ubuntu-latest steps: diff --git a/.golangci.yml b/.golangci.yml index 8af0c74..531fbd2 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -2,7 +2,7 @@ linters: disable-all: true enable: - gofmt - - golint + - revive - govet - misspell - deadcode diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..67f3744 --- /dev/null +++ b/Makefile @@ -0,0 +1,8 @@ +phony: readme lint + +readme: #generates the README.md file + @goreadme -functions -badge-godoc -badge-github=ci.yaml -badge-goreportcard -credit=false -skip-sub-packages > README.md + +lint: #runs the go linter + @go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.45.2 + @golangci-lint run \ 
No newline at end of file diff --git a/README.md b/README.md index 4647ffd..6e4b919 100644 --- a/README.md +++ b/README.md @@ -1,394 +1,451 @@ # pipeline -[![GitHub Workflow Status](https://github.com/deliveryhero/pipeline/actions/workflows/ci.yml/badge.svg?branch=main)](https://github.com/deliveryhero/pipeline/actions/workflows/ci.yml?query=branch:main) -[![codecov](https://codecov.io/gh/deliveryhero/pipeline/branch/main/graph/badge.svg)](https://codecov.io/gh/deliveryhero/pipeline) -[![GoDoc](https://img.shields.io/badge/pkg.go.dev-doc-blue)](http://pkg.go.dev/github.com/deliveryhero/pipeline) -[![Go Report Card](https://goreportcard.com/badge/github.com/deliveryhero/pipeline)](https://goreportcard.com/report/github.com/deliveryhero/pipeline) +[![build](https://github.com/deliveryhero/pipeline/actions/workflows/ci.yml/badge.svg)](https://github.com/deliveryhero/pipeline/actions/workflows/ci.yml) +[![GoDoc](https://img.shields.io/badge/pkg.go.dev-doc-blue)](http://pkg.go.dev/deliveryhero/pipeline) +[![Go Report Card](https://goreportcard.com/badge/deliveryhero/pipeline)](https://goreportcard.com/report/deliveryhero/pipeline) Pipeline is a go library that helps you build pipelines without worrying about channel management and concurrency. It contains common fan-in and fan-out operations as well as useful utility funcs for batch processing and scaling. If you have another common use case you would like to see covered by this package, please [open a feature request](https://github.com/deliveryhero/pipeline/issues). 
+## Cookbook + +* [How to run a pipeline until the container is killed](https://github.com/deliveryhero/pipeline#PipelineShutsDownWhenContainerIsKilled) +* [How to shut down a pipeline when there is a error](https://github.com/deliveryhero/pipeline#PipelineShutsDownOnError) +* [How to shut down a pipeline after it has finished processing a batch of data](https://github.com/deliveryhero/pipeline#PipelineShutsDownWhenInputChannelIsClosed) + ## Functions ### func [Buffer](/buffer.go#L5) -`func Buffer(size int, in <-chan interface{}) <-chan interface{}` +`func Buffer[Item any](size int, in <-chan Item) <-chan Item` Buffer creates a buffered channel that will close after the input is closed and the buffer is fully drained ### func [Cancel](/cancel.go#L9) -`func Cancel(ctx context.Context, cancel func(interface{}, error), in <-chan interface{}) <-chan interface{}` +`func Cancel[Item any](ctx context.Context, cancel func(Item, error), in <-chan Item) <-chan Item` -Cancel passes an `interface{}` from the `in <-chan interface{}` directly to the out `<-chan interface{}` until the `Context` is canceled. -After the context is canceled, everything from `in <-chan interface{}` is sent to the `cancel` func instead with the `ctx.Err()`. +Cancel passes an `Item any` from the `in <-chan Item` directly to the out `<-chan Item` until the `Context` is canceled. +After the context is canceled, everything from `in <-chan Item` is sent to the `cancel` func instead with the `ctx.Err()`. 
```golang -package main -import ( - "context" - "github.com/deliveryhero/pipeline" - "log" - "time" +// Create a context that lasts for 1 second +ctx, cancel := context.WithTimeout(context.Background(), time.Second) +defer cancel() + +// Create a basic pipeline that emits one int every 250ms +p := pipeline.Delay(ctx, time.Second/4, + pipeline.Emit(1, 2, 3, 4, 5), ) -func main() { - // Create a context that lasts for 1 second - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - - // Create a basic pipeline that emits one int every 250ms - p := pipeline.Delay(ctx, time.Second/4, - pipeline.Emit(1, 2, 3, 4, 5), - ) - - // If the context is canceled, pass the ints to the cancel func for teardown - p = pipeline.Cancel(ctx, func(i interface{}, err error) { - log.Printf("%+v could not be processed, %s", i, err) - }, p) - - // Otherwise, process the inputs - for out := range p { - log.Printf("process: %+v", out) - } - - // Output - // process: 1 - // process: 2 - // process: 3 - // process: 4 - // 5 could not be processed, context deadline exceeded +// If the context is canceled, pass the ints to the cancel func for teardown +p = pipeline.Cancel(ctx, func(i int, err error) { + log.Printf("%+v could not be processed, %s", i, err) +}, p) + +// Otherwise, process the inputs +for out := range p { + log.Printf("process: %+v", out) } +// Output +// process: 1 +// process: 2 +// process: 3 +// process: 4 +// 5 could not be processed, context deadline exceeded + ``` ### func [Collect](/collect.go#L13) -`func Collect(ctx context.Context, maxSize int, maxDuration time.Duration, in <-chan interface{}) <-chan interface{}` +`func Collect[Item any](ctx context.Context, maxSize int, maxDuration time.Duration, in <-chan Item) <-chan []Item` -Collect collects `interface{}`s from its in channel and returns `[]interface{}` from its out channel. 
-It will collect up to `maxSize` inputs from the `in <-chan interface{}` over up to `maxDuration` before returning them as `[]interface{}`. -That means when `maxSize` is reached before `maxDuration`, `[maxSize]interface{}` will be passed to the out channel. -But if `maxDuration` is reached before `maxSize` inputs are collected, `[< maxSize]interface{}` will be passed to the out channel. +Collect collects `[Item any]`s from its in channel and returns `[]Item` from its out channel. +It will collect up to `maxSize` inputs from the `in <-chan Item` over up to `maxDuration` before returning them as `[]Item`. +That means when `maxSize` is reached before `maxDuration`, `[maxSize]Item` will be passed to the out channel. +But if `maxDuration` is reached before `maxSize` inputs are collected, `[< maxSize]Item` will be passed to the out channel. When the `context` is canceled, everything in the buffer will be flushed to the out channel. ### func [Delay](/delay.go#L10) -`func Delay(ctx context.Context, duration time.Duration, in <-chan interface{}) <-chan interface{}` +`func Delay[Item any](ctx context.Context, duration time.Duration, in <-chan Item) <-chan Item` Delay delays reading each input by `duration`. If the context is canceled, the delay will not be applied. 
-### func [Emit](/emit.go#L4) +### func [Drain](/drain.go#L4) + +`func Drain[Item any](in <-chan Item)` + +Drain empties the input and blocks until the channel is closed + +### func [Emit](/emit.go#L6) -`func Emit(is ...interface{}) <-chan interface{}` +`func Emit[Item any](is ...Item) <-chan Item` -Emit fans `is ...interface{}`` out to a `<-chan interface{}` +Emit fans `is ...Item`` out to a `<-chan Item` + +### func [Emitter](/emit.go#L19) + +`func Emitter[Item any](ctx context.Context, next func() Item) <-chan Item` + +Emitter continuously emits new items generated by the next func +until the context is canceled ### func [Merge](/merge.go#L6) -`func Merge(ins ...<-chan interface{}) <-chan interface{}` +`func Merge[Item any](ins ...<-chan Item) <-chan Item` Merge fans multiple channels in to a single channel ```golang -package main - -import ( - "context" - "encoding/json" - "log" - "net/http" +one := pipeline.Emit(1) +two := pipeline.Emit(2, 2) +three := pipeline.Emit(3, 3, 3) - "github.com/deliveryhero/pipeline" - "github.com/deliveryhero/pipeline/example/db" -) - -// SearchResults returns many types of search results at once -type SearchResults struct { - Advertisements []db.Result `json:"advertisements"` - Images []db.Result `json:"images"` - Products []db.Result `json:"products"` - Websites []db.Result `json:"websites"` +for i := range pipeline.Merge(one, two, three) { + fmt.Printf("output: %d\n", i) } -func main() { - r := http.NewServeMux() - - // `GET /search?q=` is an endpoint that merges concurrently fetched - // search results into a single search response using `pipeline.Merge` - r.HandleFunc("/search", func(w http.ResponseWriter, r *http.Request) { - query := r.URL.Query().Get("q") - if len(query) < 1 { - w.WriteHeader(http.StatusBadRequest) - return - } - - // If the request times out, or we receive an error from our `db` - // the context will stop all pending db queries for this request - ctx, cancel := context.WithCancel(r.Context()) - defer 
cancel() - - // Fetch all of the different search results concurrently - var results SearchResults - for err := range pipeline.Merge( - db.GetAdvertisements(ctx, query, &results.Advertisements), - db.GetImages(ctx, query, &results.Images), - db.GetProducts(ctx, query, &results.Products), - db.GetWebsites(ctx, query, &results.Websites), - ) { - // Stop all pending db requests if theres an error - if err != nil { - log.Print(err) - w.WriteHeader(http.StatusInternalServerError) - return - } - } - - // Return the search results - if bs, err := json.Marshal(&results); err != nil { - log.Print(err) - w.WriteHeader(http.StatusInternalServerError) - } else if _, err := w.Write(bs); err != nil { - log.Print(err) - w.WriteHeader(http.StatusInternalServerError) - } else { - w.WriteHeader(http.StatusOK) - } - }) -} +fmt.Println("done") +// Output ``` ### func [Process](/process.go#L13) -`func Process(ctx context.Context, processor Processor, in <-chan interface{}) <-chan interface{}` +`func Process[Input, Output any](ctx context.Context, processor Processor[Input, Output], in <-chan Input) <-chan Output` -Process takes each input from the `in <-chan interface{}` and calls `Processor.Process` on it. -When `Processor.Process` returns an `interface{}`, it will be sent to the output `<-chan interface{}`. +Process takes each input from the `in <-chan Input` and calls `Processor.Process` on it. +When `Processor.Process` returns an `Output`, it will be sent to the output `<-chan Output`. If `Processor.Process` returns an error, `Processor.Cancel` will be called with the corresponding input and error message. -Finally, if the `Context` is canceled, all inputs remaining in the `in <-chan interface{}` will go directly to `Processor.Cancel`. +Finally, if the `Context` is canceled, all inputs remaining in the `in <-chan Input` will go directly to `Processor.Cancel`. 
```golang -package main - -import ( - "context" - "github.com/deliveryhero/pipeline" - "github.com/deliveryhero/pipeline/example/processors" - "log" - "time" -) -func main() { - // Create a context that times out after 5 seconds - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - // Create a pipeline that emits 1-6 at a rate of one int per second - p := pipeline.Delay(ctx, time.Second, pipeline.Emit(1, 2, 3, 4, 5, 6)) - - // Use the Multiplier to multiply each int by 10 - p = pipeline.Process(ctx, &processors.Multiplier{ - Factor: 10, - }, p) - - // Finally, lets print the results and see what happened - for result := range p { - log.Printf("result: %d\n", result) - } - - // Output - // result: 10 - // result: 20 - // result: 30 - // result: 40 - // result: 50 - // error: could not multiply 6, context deadline exceeded +// Create a context that times out after 5 seconds +ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) +defer cancel() + +// Create a pipeline that emits 1-6 at a rate of one int per second +p := pipeline.Delay(ctx, time.Second, pipeline.Emit(1, 2, 3, 4, 5, 6)) + +// Multiply each number by 10 +p = pipeline.Process(ctx, pipeline.NewProcessor(func(ctx context.Context, in int) (int, error) { + return in * 10, nil +}, func(i int, err error) { + fmt.Printf("error: could not multiply %v, %s\n", i, err) +}), p) + +// Finally, lets print the results and see what happened +for result := range p { + fmt.Printf("result: %d\n", result) } +// Output +// result: 10 +// result: 20 +// result: 30 +// result: 40 +// result: 50 +// error: could not multiply 6, context deadline exceeded + ``` ### func [ProcessBatch](/process_batch.go#L14) -`func ProcessBatch( +`func ProcessBatch[Input, Output any]( ctx context.Context, maxSize int, maxDuration time.Duration, - processor Processor, - in <-chan interface{}, -) <-chan interface{}` + processor Processor[[]Input, []Output], + in <-chan Input, +) <-chan 
Output` -ProcessBatch collects up to maxSize elements over maxDuration and processes them together as a slice of `interface{}`s. -It passed an []interface{} to the `Processor.Process` method and expects a []interface{} back. -It passes []interface{} batches of inputs to the `Processor.Cancel` method. +ProcessBatch collects up to maxSize elements over maxDuration and processes them together as a slice of `Input`s. +It passed an []Output to the `Processor.Process` method and expects a []Input back. +It passes []Input batches of inputs to the `Processor.Cancel` method. If the receiver is backed up, ProcessBatch can holds up to 2x maxSize. ```golang -package main - -import ( - "context" - "github.com/deliveryhero/pipeline" - "github.com/deliveryhero/pipeline/example/processors" - "log" - "time" -) - -func main() { - // Create a context that times out after 5 seconds - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - // Create a pipeline that emits 1-6 at a rate of one int per second - p := pipeline.Delay(ctx, time.Second, pipeline.Emit(1, 2, 3, 4, 5, 6)) - - // Use the BatchMultiplier to multiply 2 adjacent numbers together - p = pipeline.ProcessBatch(ctx, 2, time.Minute, &processors.BatchMultiplier{}, p) - // Finally, lets print the results and see what happened - for result := range p { - log.Printf("result: %d\n", result) - } - - // Output - // result: 2 - // result: 12 - // error: could not multiply [5], context deadline exceeded - // error: could not multiply [6], context deadline exceeded +// Create a context that times out after 5 seconds +ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) +defer cancel() + +// Create a pipeline that emits 1-6 at a rate of one int per second +p := pipeline.Delay(ctx, time.Second, pipeline.Emit(1, 2, 3, 4, 5, 6)) + +// Multiply every 2 adjacent numbers together +p = pipeline.ProcessBatch(ctx, 2, time.Minute, pipeline.NewProcessor(func(ctx context.Context, is []int) 
([]int, error) { + o := 1 + for _, i := range is { + o *= i + } + return []int{o}, nil +}, func(is []int, err error) { + fmt.Printf("error: could not multiply %v, %s\n", is, err) +}), p) + +// Finally, lets print the results and see what happened +for result := range p { + fmt.Printf("result: %d\n", result) } +// Output +// result: 2 +// result: 12 +// error: could not multiply [5 6], context deadline exceeded + ``` ### func [ProcessBatchConcurrently](/process_batch.go#L35) -`func ProcessBatchConcurrently( +`func ProcessBatchConcurrently[Input, Output any]( ctx context.Context, concurrently, maxSize int, maxDuration time.Duration, - processor Processor, - in <-chan interface{}, -) <-chan interface{}` + processor Processor[[]Input, []Output], + in <-chan Input, +) <-chan Output` ProcessBatchConcurrently fans the in channel out to multiple batch Processors running concurrently, then it fans the out channels of the batch Processors back into a single out chan ```golang -package main - -import ( - "context" - "github.com/deliveryhero/pipeline" - "github.com/deliveryhero/pipeline/example/processors" - "log" - "time" -) -func main() { - // Create a context that times out after 5 seconds - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - // Create a pipeline that emits 1-9 - p := pipeline.Emit(1, 2, 3, 4, 5, 6, 7, 8, 9) - - // Wait 4 seconds to pass 2 numbers through the pipe - // * 2 concurrent Processors - p = pipeline.ProcessBatchConcurrently(ctx, 2, 2, time.Minute, &processors.Waiter{ - Duration: 4 * time.Second, - }, p) - - // Finally, lets print the results and see what happened - for result := range p { - log.Printf("result: %d\n", result) - } - - // Output - // result: 3 - // result: 4 - // result: 1 - // result: 2 - // error: could not process [5 6], process was canceled - // error: could not process [7 8], process was canceled - // error: could not process [9], context deadline exceeded +// Create a context that times 
out after 5 seconds +ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) +defer cancel() + +// Create a pipeline that emits 1-9 +p := pipeline.Emit(1, 2, 3, 4, 5, 6, 7, 8, 9) + +// Add a 1 second delay to each number +p = pipeline.Delay(ctx, time.Second, p) + +// Group two inputs at a time +p = pipeline.ProcessBatchConcurrently(ctx, 2, 2, time.Minute, pipeline.NewProcessor(func(ctx context.Context, ins []int) ([]int, error) { + return ins, nil +}, func(i []int, err error) { + fmt.Printf("error: could not process %v, %s\n", i, err) +}), p) + +// Finally, lets print the results and see what happened +for result := range p { + fmt.Printf("result: %d\n", result) } +// Output +// result: 1 +// result: 2 +// result: 3 +// result: 5 +// error: could not process [7 8], context deadline exceeded +// error: could not process [4 6], context deadline exceeded +// error: could not process [9], context deadline exceeded + ``` ### func [ProcessConcurrently](/process.go#L26) -`func ProcessConcurrently(ctx context.Context, concurrently int, p Processor, in <-chan interface{}) <-chan interface{}` +`func ProcessConcurrently[Input, Output any](ctx context.Context, concurrently int, p Processor[Input, Output], in <-chan Input) <-chan Output` ProcessConcurrently fans the in channel out to multiple Processors running concurrently, then it fans the out channels of the Processors back into a single out chan ```golang -package main - -import ( - "context" - "github.com/deliveryhero/pipeline" - "github.com/deliveryhero/pipeline/example/processors" - "log" - "time" -) -func main() { - // Create a context that times out after 5 seconds - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - // Create a pipeline that emits 1-7 - p := pipeline.Emit(1, 2, 3, 4, 5, 6, 7) - - // Wait 2 seconds to pass each number through the pipe - // * 2 concurrent Processors - p = pipeline.ProcessConcurrently(ctx, 2, &processors.Waiter{ - Duration: 2 * 
time.Second, - }, p) - - // Finally, lets print the results and see what happened - for result := range p { - log.Printf("result: %d\n", result) - } - - // Output - // result: 2 - // result: 1 - // result: 4 - // result: 3 - // error: could not process 6, process was canceled - // error: could not process 5, process was canceled - // error: could not process 7, context deadline exceeded +// Create a context that times out after 5 seconds +ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) +defer cancel() + +// Create a pipeline that emits 1-7 +p := pipeline.Emit(1, 2, 3, 4, 5, 6, 7) + +// Add a two second delay to each number +p = pipeline.Delay(ctx, 2*time.Second, p) + +// Add two concurrent processors that pass input numbers to the output +p = pipeline.ProcessConcurrently(ctx, 2, pipeline.NewProcessor(func(ctx context.Context, in int) (int, error) { + return in, nil +}, func(i int, err error) { + fmt.Printf("error: could not process %v, %s\n", i, err) +}), p) + +// Finally, lets print the results and see what happened +for result := range p { + log.Printf("result: %d\n", result) } +// Output +// result: 2 +// result: 1 +// result: 4 +// result: 3 +// error: could not process 6, process was canceled +// error: could not process 5, process was canceled +// error: could not process 7, context deadline exceeded + ``` -### func [Split](/split.go#L5) +### func [Split](/split.go#L4) -`func Split(in <-chan interface{}) <-chan interface{}` +`func Split[Item any](in <-chan []Item) <-chan Item` Split takes an interface from Collect and splits it back out into individual elements -Useful for batch processing pipelines (`input chan -> Collect -> Process -> Split -> Cancel -> output chan`). 
-## Types +## Examples + +### PipelineShutsDownOnError + +The following example shows how you can shutdown a pipeline +gracefully when it receives an error message -### type [Processor](/processor.go#L8) +```golang + +// Create a context that can be canceled +ctx, cancel := context.WithCancel(context.Background()) +defer cancel() + +// Create a pipeline that emits 1-10 +p := pipeline.Emit(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + +// A step that will shutdown the pipeline if the number is greater than 1 +p = pipeline.Process(ctx, pipeline.NewProcessor(func(ctx context.Context, i int) (int, error) { + // Shut down the pipeline by canceling the context + if i != 1 { + cancel() + return i, fmt.Errorf("%d caused the shutdown", i) + } + return i, nil +}, func(i int, err error) { + // The cancel func is called when an error is returned by the process func or the context is canceled + fmt.Printf("could not process %d: %s\n", i, err) +}), p) + +// Finally, lets print the results and see what happened +for result := range p { + fmt.Printf("result: %d\n", result) +} + +fmt.Println("exiting the pipeline after all data is processed") + +// Output +// could not process 2: 2 caused the shutdown +// result: 1 +// could not process 3: context canceled +// could not process 4: context canceled +// could not process 5: context canceled +// could not process 6: context canceled +// could not process 7: context canceled +// could not process 8: context canceled +// could not process 9: context canceled +// could not process 10: context canceled +// exiting the pipeline after all data is processed -`type Processor interface { ... }` +``` + +### PipelineShutsDownWhenContainerIsKilled -Processor represents a blocking operation in a pipeline. Implementing `Processor` will allow you to add -business logic to your pipelines without directly managing channels. This simplifies your unit tests -and eliminates channel management related bugs. 
+This example demonstrates a pipline +that runs until the os / container the pipline is running in kills it -## Sub Packages +```golang -* [semaphore](./semaphore): package semaphore is like a sync.WaitGroup with an upper limit. +// Gracefully shutdown the pipeline when the the system is shutting down +// by canceling the context when os.Kill or os.Interrupt signal is sent +ctx, cancel := signal.NotifyContext(context.Background(), os.Kill, os.Interrupt) +defer cancel() + +// Create a pipeline that keeps emitting numbers sequentially until the context is canceled +var count int +p := pipeline.Emitter(ctx, func() int { + count++ + return count +}) + +// Filter out only even numbers +p = pipeline.Process(ctx, pipeline.NewProcessor(func(ctx context.Context, i int) (int, error) { + if i%2 == 0 { + return i, nil + } + return i, fmt.Errorf("'%d' is an odd number", i) +}, func(i int, err error) { + fmt.Printf("error processing '%v': %s\n", i, err) +}), p) + +// Wait a few nanoseconds an simulate the os.Interrupt signal +go func() { + time.Sleep(time.Millisecond / 10) + fmt.Print("\n--- os kills the app ---\n\n") + syscall.Kill(syscall.Getpid(), syscall.SIGINT) +}() + +// Finally, lets print the results and see what happened +for result := range p { + fmt.Printf("result: %d\n", result) +} ---- -Readme created from Go doc with [goreadme](https://github.com/posener/goreadme) +fmt.Println("exiting after the input channel is closed") + +// Output +// error processing '1': '1' is an odd number +// result: 2 +// +// --- os kills the app --- +// +// error processing '3': '3' is an odd number +// error processing '4': context canceled +// exiting after the input channel is closed + +``` + +### PipelineShutsDownWhenInputChannelIsClosed + +The following example demonstrates a pipeline +that naturally finishes its run when the input channel is closed + +```golang + +// Create a context that can be canceled +ctx, cancel := context.WithCancel(context.Background()) +defer cancel() + +// 
Create a pipeline that emits 1-10 and then closes its output channel +p := pipeline.Emit(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + +// Multiply every number by 2 +p = pipeline.Process(ctx, pipeline.NewProcessor(func(ctx context.Context, i int) (int, error) { + return i * 2, nil +}, func(i int, err error) { + fmt.Printf("could not multiply %d: %s\n", i, err) +}), p) + +// Finally, lets print the results and see what happened +for result := range p { + fmt.Printf("result: %d\n", result) +} + +fmt.Println("exiting after the input channel is closed") + +// Output +// result: 2 +// result: 4 +// result: 6 +// result: 8 +// result: 10 +// result: 12 +// result: 14 +// result: 16 +// result: 18 +// result: 20 +// exiting after the input channel is closed + +``` diff --git a/buffer.go b/buffer.go index 29d7785..2a5d534 100644 --- a/buffer.go +++ b/buffer.go @@ -2,8 +2,8 @@ package pipeline // Buffer creates a buffered channel that will close after the input // is closed and the buffer is fully drained -func Buffer(size int, in <-chan interface{}) <-chan interface{} { - buffer := make(chan interface{}, size) +func Buffer[Item any](size int, in <-chan Item) <-chan Item { + buffer := make(chan Item, size) go func() { for i := range in { buffer <- i diff --git a/cancel.go b/cancel.go index a3a940d..1b18465 100644 --- a/cancel.go +++ b/cancel.go @@ -4,10 +4,10 @@ import ( "context" ) -// Cancel passes an `interface{}` from the `in <-chan interface{}` directly to the out `<-chan interface{}` until the `Context` is canceled. -// After the context is canceled, everything from `in <-chan interface{}` is sent to the `cancel` func instead with the `ctx.Err()`. -func Cancel(ctx context.Context, cancel func(interface{}, error), in <-chan interface{}) <-chan interface{} { - out := make(chan interface{}) +// Cancel passes an `Item any` from the `in <-chan Item` directly to the out `<-chan Item` until the `Context` is canceled. 
+// After the context is canceled, everything from `in <-chan Item` is sent to the `cancel` func instead with the `ctx.Err()`. +func Cancel[Item any](ctx context.Context, cancel func(Item, error), in <-chan Item) <-chan Item { + out := make(chan Item) go func() { defer close(out) for { diff --git a/cancel_example_test.go b/cancel_example_test.go index 489fb72..9b804fa 100644 --- a/cancel_example_test.go +++ b/cancel_example_test.go @@ -5,7 +5,7 @@ import ( "log" "time" - "github.com/deliveryhero/pipeline" + "github.com/deliveryhero/pipeline/v2" ) func ExampleCancel() { @@ -19,7 +19,7 @@ func ExampleCancel() { ) // If the context is canceled, pass the ints to the cancel func for teardown - p = pipeline.Cancel(ctx, func(i interface{}, err error) { + p = pipeline.Cancel(ctx, func(i int, err error) { log.Printf("%+v could not be processed, %s", i, err) }, p) diff --git a/cancel_test.go b/cancel_test.go index 2ef36a0..59f5e13 100644 --- a/cancel_test.go +++ b/cancel_test.go @@ -20,7 +20,7 @@ func TestCancel(t *testing.T) { } // Send a stream of ints through the in chan - in := make(chan interface{}) + in := make(chan int) go func() { defer close(in) i := 0 @@ -34,15 +34,15 @@ func TestCancel(t *testing.T) { }() // Create a logger for the cancel func - canceled := func(i interface{}, err error) { + canceled := func(i int, err error) { logf("canceled: %d because %s\n", i, err) } // Start canceling the pipeline about half way through the test ctx, cancel := context.WithTimeout(context.Background(), testDuration/2) defer cancel() - for o := range Cancel(ctx, canceled, in) { - logf("%d", o) + for i := range Cancel[int](ctx, canceled, in) { + logf("%d", i) } // There should be some logs diff --git a/collect.go b/collect.go index 930050d..7218e00 100644 --- a/collect.go +++ b/collect.go @@ -5,16 +5,16 @@ import ( "time" ) -// Collect collects `interface{}`s from its in channel and returns `[]interface{}` from its out channel. 
-// It will collect up to `maxSize` inputs from the `in <-chan interface{}` over up to `maxDuration` before returning them as `[]interface{}`. -// That means when `maxSize` is reached before `maxDuration`, `[maxSize]interface{}` will be passed to the out channel. -// But if `maxDuration` is reached before `maxSize` inputs are collected, `[< maxSize]interface{}` will be passed to the out channel. +// Collect collects `[Item any]`s from its in channel and returns `[]Item` from its out channel. +// It will collect up to `maxSize` inputs from the `in <-chan Item` over up to `maxDuration` before returning them as `[]Item`. +// That means when `maxSize` is reached before `maxDuration`, `[maxSize]Item` will be passed to the out channel. +// But if `maxDuration` is reached before `maxSize` inputs are collected, `[< maxSize]Item` will be passed to the out channel. // When the `context` is canceled, everything in the buffer will be flushed to the out channel. -func Collect(ctx context.Context, maxSize int, maxDuration time.Duration, in <-chan interface{}) <-chan interface{} { - out := make(chan interface{}) +func Collect[Item any](ctx context.Context, maxSize int, maxDuration time.Duration, in <-chan Item) <-chan []Item { + out := make(chan []Item) go func() { for { - is, open := collect(ctx, maxSize, maxDuration, in) + is, open := collect[Item](ctx, maxSize, maxDuration, in) if is != nil { out <- is } @@ -27,28 +27,27 @@ func Collect(ctx context.Context, maxSize int, maxDuration time.Duration, in <-c return out } -func collect(ctx context.Context, maxSize int, maxDuration time.Duration, in <-chan interface{}) ([]interface{}, bool) { - var buffer []interface{} +func collect[Item any](ctx context.Context, maxSize int, maxDuration time.Duration, in <-chan Item) ([]Item, bool) { + var buffer []Item timeout := time.After(maxDuration) for { lenBuffer := len(buffer) select { case <-ctx.Done(): // Reduce the timeout to 1/10th of a second - bs, open := collect(context.Background(), 
maxSize, 100*time.Millisecond, in) + bs, open := collect(context.Background(), maxSize-lenBuffer, 100*time.Millisecond, in) return append(buffer, bs...), open case <-timeout: return buffer, true case i, open := <-in: if !open { return buffer, false - } else if lenBuffer < maxSize-1 { - // There is still room in the buffer - buffer = append(buffer, i) - } else { + } else if lenBuffer == maxSize-1 { // There is no room left in the buffer return append(buffer, i), true } + // There is still room in the buffer + buffer = append(buffer, i) } } } diff --git a/collect_test.go b/collect_test.go index 482947e..023ee87 100644 --- a/collect_test.go +++ b/collect_test.go @@ -19,12 +19,12 @@ func TestCollect(t *testing.T) { type args struct { maxSize int maxDuration time.Duration - in []interface{} + in []int inDelay time.Duration ctxTimeout time.Duration } type want struct { - out []interface{} + out [][]int open bool } for _, test := range []struct { @@ -49,12 +49,12 @@ func TestCollect(t *testing.T) { args: args{ maxSize: 2, maxDuration: maxTestDuration, - in: []interface{}{1, 2, 3}, + in: []int{1, 2, 3}, inDelay: (maxTestDuration / 2) - (10 * time.Millisecond), ctxTimeout: maxTestDuration, }, want: want{ - out: []interface{}{[]interface{}{1, 2}}, + out: [][]int{{1, 2}}, open: true, }, }, { @@ -63,14 +63,14 @@ func TestCollect(t *testing.T) { maxSize: 2, maxDuration: maxTestDuration / 10 * 9, inDelay: maxTestDuration / 10, - in: []interface{}{1, 2, 3, 4, 5}, + in: []int{1, 2, 3, 4, 5}, ctxTimeout: maxTestDuration / 10 * 9, }, want: want{ - out: []interface{}{ - []interface{}{1, 2}, - []interface{}{3, 4}, - []interface{}{5}, + out: [][]int{ + {1, 2}, + {3, 4}, + {5}, }, open: false, }, @@ -80,15 +80,15 @@ func TestCollect(t *testing.T) { maxSize: 10, maxDuration: maxTestDuration / 4, inDelay: (maxTestDuration / 4) - (25 * time.Millisecond), - in: []interface{}{1, 2, 3, 4, 5}, + in: []int{1, 2, 3, 4, 5}, ctxTimeout: maxTestDuration / 4, }, want: want{ - out: []interface{}{ - 
[]interface{}{1}, - []interface{}{2}, - []interface{}{3}, - []interface{}{4}, + out: [][]int{ + {1}, + {2}, + {3}, + {4}, }, open: true, }, @@ -98,19 +98,19 @@ func TestCollect(t *testing.T) { maxSize: 10, maxDuration: maxTestDuration, inDelay: 0, - in: []interface{}{1, 2, 3, 4, 5}, + in: []int{1, 2, 3, 4, 5}, ctxTimeout: 0, }, want: want{ - out: []interface{}{ - []interface{}{1, 2, 3, 4, 5}, + out: [][]int{ + {1, 2, 3, 4, 5}, }, open: false, }, }} { t.Run(test.name, func(t *testing.T) { // Create the in channel - in := make(chan interface{}) + in := make(chan int) go func() { defer close(in) for _, i := range test.args.in { @@ -124,9 +124,9 @@ func TestCollect(t *testing.T) { defer cancel() // Collect responses - collect := Collect(ctx, test.args.maxSize, test.args.maxDuration, in) + collect := Collect[int](ctx, test.args.maxSize, test.args.maxDuration, in) timeout := time.After(maxTestDuration) - var outs []interface{} + var outs [][]int var isOpen bool loop: for { diff --git a/delay.go b/delay.go index 8bcba50..1b9d7a5 100644 --- a/delay.go +++ b/delay.go @@ -7,8 +7,8 @@ import ( // Delay delays reading each input by `duration`. // If the context is canceled, the delay will not be applied. 
-func Delay(ctx context.Context, duration time.Duration, in <-chan interface{}) <-chan interface{} { - out := make(chan interface{}) +func Delay[Item any](ctx context.Context, duration time.Duration, in <-chan Item) <-chan Item { + out := make(chan Item) go func() { defer close(out) // Keep reading from in until its closed diff --git a/delay_test.go b/delay_test.go index b3cf89a..14c52d6 100644 --- a/delay_test.go +++ b/delay_test.go @@ -12,10 +12,10 @@ func TestDelay(t *testing.T) { type args struct { ctxTimeout time.Duration duration time.Duration - in []interface{} + in []int } type want struct { - out []interface{} + out []int open bool } for _, test := range []struct { @@ -27,10 +27,10 @@ func TestDelay(t *testing.T) { args: args{ ctxTimeout: maxTestDuration, duration: maxTestDuration - 100*time.Millisecond, - in: []interface{}{1}, + in: []int{1}, }, want: want{ - out: []interface{}{1}, + out: []int{1}, open: false, }, }, { @@ -38,10 +38,10 @@ func TestDelay(t *testing.T) { args: args{ ctxTimeout: 10 * time.Millisecond, duration: maxTestDuration, - in: []interface{}{1, 2, 3, 4, 5}, + in: []int{1, 2, 3, 4, 5}, }, want: want{ - out: []interface{}{1, 2, 3, 4, 5}, + out: []int{1, 2, 3, 4, 5}, open: false, }, }, { @@ -49,22 +49,16 @@ func TestDelay(t *testing.T) { args: args{ ctxTimeout: maxTestDuration, duration: maxTestDuration / 4, - in: []interface{}{1, 2, 3, 4, 5}, + in: []int{1, 2, 3, 4, 5}, }, want: want{ - out: []interface{}{1, 2, 3, 4}, + out: []int{1, 2, 3, 4}, open: true, }, }} { t.Run(test.name, func(t *testing.T) { // Create in channel - in := make(chan interface{}) - go func() { - defer close(in) - for _, i := range test.args.in { - in <- i - } - }() + in := Emit(test.args.in...) 
// Create a context with a timeut ctx, cancel := context.WithTimeout(context.Background(), test.args.ctxTimeout) @@ -74,7 +68,7 @@ func TestDelay(t *testing.T) { delay := Delay(ctx, test.args.duration, in) timeout := time.After(maxTestDuration) var isOpen bool - var outs []interface{} + var outs []int loop: for { select { diff --git a/drain.go b/drain.go new file mode 100644 index 0000000..77077c9 --- /dev/null +++ b/drain.go @@ -0,0 +1,7 @@ +package pipeline + +// Drain empties the input and blocks until the channel is closed +func Drain[Item any](in <-chan Item) { + for range in { + } +} diff --git a/emit.go b/emit.go index 370a291..e8ec452 100644 --- a/emit.go +++ b/emit.go @@ -1,8 +1,10 @@ package pipeline -// Emit fans `is ...interface{}`` out to a `<-chan interface{}` -func Emit(is ...interface{}) <-chan interface{} { - out := make(chan interface{}) +import "context" + +// Emit fans `is ...Item`` out to a `<-chan Item` +func Emit[Item any](is ...Item) <-chan Item { + out := make(chan Item) go func() { defer close(out) for _, i := range is { @@ -11,3 +13,21 @@ func Emit(is ...interface{}) <-chan interface{} { }() return out } + +// Emitter continuously emits new items generated by the next func +// until the context is canceled +func Emitter[Item any](ctx context.Context, next func() Item) <-chan Item { + out := make(chan Item) + go func() { + defer close(out) + for { + select { + case <-ctx.Done(): + return + default: + out <- next() + } + } + }() + return out +} diff --git a/example/db/db.go b/example/db/db.go deleted file mode 100644 index b17167d..0000000 --- a/example/db/db.go +++ /dev/null @@ -1,81 +0,0 @@ -// db is a fake db package for use with some of the examples -package db - -import ( - "context" - "fmt" - "math/rand" - "time" -) - -// Result is returned from the 'database' -type Result struct { - Title string `json:"title"` - Description string `json:"description"` - URL string `json:"url"` -} - -func GetAdvertisements(ctx context.Context, query 
string, rs *[]Result) <-chan interface{} { - out := make(chan interface{}) - go func() { - defer close(out) - if err := simulatedDbRequest(ctx, "advertisement", rs); err != nil { - out <- err - } - }() - return out -} - -func GetImages(ctx context.Context, query string, rs *[]Result) <-chan interface{} { - out := make(chan interface{}) - go func() { - defer close(out) - if err := simulatedDbRequest(ctx, "image", rs); err != nil { - out <- err - } - }() - return out -} - -func GetProducts(ctx context.Context, query string, rs *[]Result) <-chan interface{} { - out := make(chan interface{}) - go func() { - defer close(out) - if err := simulatedDbRequest(ctx, "product", rs); err != nil { - out <- err - } - }() - return out -} - -func GetWebsites(ctx context.Context, query string, rs *[]Result) <-chan interface{} { - out := make(chan interface{}) - go func() { - defer close(out) - if err := simulatedDbRequest(ctx, "website", rs); err != nil { - out <- err - } - }() - return out -} - -func simulatedDbRequest(ctx context.Context, resultPrefix string, rs *[]Result) error { - select { - // Return right away if the - case <-ctx.Done(): - return ctx.Err() - // Simulate 25-200ms of latency from a db request - case <-time.After(time.Millisecond * time.Duration((25 + (rand.Int() % 175)))): // #nosec - break - } - // Return between 1 and 5 results - total := 1 + (rand.Int() % 5) // #nosec - for i := 0; i < total; i++ { - *rs = append(*rs, Result{ - Title: fmt.Sprintf("%s %d title", resultPrefix, i), - Description: fmt.Sprintf("%s %d description", resultPrefix, i), - URL: fmt.Sprintf("http://%s-%d.com", resultPrefix, i), - }) - } - return nil -} diff --git a/example/processors/processors.go b/example/processors/processors.go deleted file mode 100644 index f534b41..0000000 --- a/example/processors/processors.go +++ /dev/null @@ -1,62 +0,0 @@ -// processors are a bunch of simple processors used in the examples -package processors - -import ( - "context" - "errors" - "log" - "time" 
-) - -// Miltiplier is a simple processor that multiplies each integer it receives by some Factor -type Multiplier struct { - // Factor will change the amount each number is multiplied by - Factor int -} - -// Process multiplies a number by factor -func (m *Multiplier) Process(_ context.Context, in interface{}) (interface{}, error) { - return in.(int) * m.Factor, nil -} - -// Cancel is called when the context is canceled -func (m *Multiplier) Cancel(i interface{}, err error) { - log.Printf("error: could not multiply %d, %s\n", i, err) -} - -// BatchMultiplier is a simple batch processor that multiplies each `[]int` it receives together -type BatchMultiplier struct{} - -// Process a slice of numbers together and returns a slice of numbers with the results -func (m *BatchMultiplier) Process(_ context.Context, ins interface{}) (interface{}, error) { - result := 1 - for _, in := range ins.([]interface{}) { - result *= in.(int) - } - return []interface{}{result}, nil -} - -// Cancel is called when the context is canceled -func (m *BatchMultiplier) Cancel(i interface{}, err error) { - log.Printf("error: could not multiply %+v, %s\n", i, err) -} - -// Waiter is a Processor that waits for Duration before returning its output -type Waiter struct { - Duration time.Duration -} - -// Process waits for `Waiter.Duration` before returning the value passed in -func (w *Waiter) Process(ctx context.Context, in interface{}) (interface{}, error) { - select { - case <-time.After(w.Duration): - return in, nil - case <-ctx.Done(): - return nil, errors.New("process was canceled") - } -} - -// Cancel is called when the context is canceled -func (w *Waiter) Cancel(i interface{}, err error) { - log.Printf("error: could not process %+v, %s\n", i, err) -} diff --git a/go.mod b/go.mod index a12ef88..8e887c2 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,5 @@ -module github.com/deliveryhero/pipeline +module github.com/deliveryhero/pipeline/v2 -go 1.16 \ No newline at end of file +go 1.18 + +require 
github.com/deliveryhero/pipeline v0.4.0 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..8edc542 --- /dev/null +++ b/go.sum @@ -0,0 +1,2 @@ +github.com/deliveryhero/pipeline v0.4.0 h1:rQ6qHTApvVFouP9Y02k53KoFX+myuO6/OAxVX34iXvo= +github.com/deliveryhero/pipeline v0.4.0/go.mod h1:78CQfQT2DONSGPktr7X71xu333ZMPdrcYuV/gY/Mnkg= diff --git a/merge.go b/merge.go index 3656da6..887002e 100644 --- a/merge.go +++ b/merge.go @@ -3,16 +3,16 @@ package pipeline import "sync" // Merge fans multiple channels in to a single channel -func Merge(ins ...<-chan interface{}) <-chan interface{} { +func Merge[Item any](ins ...<-chan Item) <-chan Item { // Don't merge anything if we don't have to if l := len(ins); l == 0 { - out := make(chan interface{}) + out := make(chan Item) close(out) return out } else if l == 1 { return ins[0] } - out := make(chan interface{}) + out := make(chan Item) // Create a WaitGroup that waits for all of the ins to close var wg sync.WaitGroup wg.Add(len(ins)) @@ -22,13 +22,11 @@ func Merge(ins ...<-chan interface{}) <-chan interface{} { close(out) }() for i := range ins { - go func(in <-chan interface{}) { + go func(in <-chan Item) { // Wait for each in to close for i := range in { - if i != nil { - // Fan the contents of each in into the out - out <- i - } + // Fan the contents of each in into the out + out <- i } // Tell the WaitGroup that one of the channels is closed wg.Done() diff --git a/merge_example_test.go b/merge_example_test.go index edf64c7..f3d92c5 100644 --- a/merge_example_test.go +++ b/merge_example_test.go @@ -1,65 +1,28 @@ package pipeline_test import ( - "context" - "encoding/json" - "log" - "net/http" + "fmt" - "github.com/deliveryhero/pipeline" - "github.com/deliveryhero/pipeline/example/db" + "github.com/deliveryhero/pipeline/v2" ) -// SearchResults returns many types of search results at once -type SearchResults struct { - Advertisements []db.Result `json:"advertisements"` - Images []db.Result `json:"images"` - Products 
[]db.Result `json:"products"` - Websites []db.Result `json:"websites"` -} - func ExampleMerge() { - r := http.NewServeMux() - - // `GET /search?q=` is an endpoint that merges concurrently fetched - // search results into a single search response using `pipeline.Merge` - r.HandleFunc("/search", func(w http.ResponseWriter, r *http.Request) { - query := r.URL.Query().Get("q") - if len(query) < 1 { - w.WriteHeader(http.StatusBadRequest) - return - } - - // If the request times out, or we receive an error from our `db` - // the context will stop all pending db queries for this request - ctx, cancel := context.WithCancel(r.Context()) - defer cancel() - - // Fetch all of the different search results concurrently - var results SearchResults - for err := range pipeline.Merge( - db.GetAdvertisements(ctx, query, &results.Advertisements), - db.GetImages(ctx, query, &results.Images), - db.GetProducts(ctx, query, &results.Products), - db.GetWebsites(ctx, query, &results.Websites), - ) { - // Stop all pending db requests if theres an error - if err != nil { - log.Print(err) - w.WriteHeader(http.StatusInternalServerError) - return - } - } - - // Return the search results - if bs, err := json.Marshal(&results); err != nil { - log.Print(err) - w.WriteHeader(http.StatusInternalServerError) - } else if _, err := w.Write(bs); err != nil { - log.Print(err) - w.WriteHeader(http.StatusInternalServerError) - } else { - w.WriteHeader(http.StatusOK) - } - }) + one := pipeline.Emit(1) + two := pipeline.Emit(2, 2) + three := pipeline.Emit(3, 3, 3) + + for i := range pipeline.Merge(one, two, three) { + fmt.Printf("output: %d\n", i) + } + + fmt.Println("done") + + // Output + // output: 1 + // output: 3 + // output: 2 + // output: 2 + // output: 3 + // output: 3 + // done } diff --git a/merge_test.go b/merge_test.go index 574fddf..6d92e8c 100644 --- a/merge_test.go +++ b/merge_test.go @@ -15,8 +15,8 @@ type task struct { } // do performs the task -func (t task) do() <-chan interface{} { - out := 
make(chan interface{}) +func (t task) do() <-chan error { + out := make(chan error) go func() { defer close(out) time.Sleep(t.waitFor) @@ -116,14 +116,14 @@ func TestMerge(t *testing.T) { }} { t.Run(test.description, func(t *testing.T) { // Start doing all of the tasks - var errChans []<-chan interface{} + var errChans []<-chan error for _, task := range test.tasks { errChans = append(errChans, task.do()) } // Merge all of their error channels together var errs []error - merged := Merge(errChans...) + merged := Merge[error](errChans...) // Create the timeout timeout := time.After(maxTestDuration) diff --git a/mocks_test.go b/mocks_test.go index 6e4945b..78a2c6b 100644 --- a/mocks_test.go +++ b/mocks_test.go @@ -7,32 +7,33 @@ import ( ) // mockProcess is a mock of the Processor interface -type mockProcessor struct { +type mockProcessor[Item any] struct { processDuration time.Duration cancelDuration time.Duration processReturnsErrs bool - processed []interface{} - canceled []interface{} - errs []interface{} + processed []Item + canceled []Item + errs []string } // Process waits processDuration before returning its input as its output -func (m *mockProcessor) Process(ctx context.Context, i interface{}) (interface{}, error) { +func (m *mockProcessor[Item]) Process(ctx context.Context, i Item) (Item, error) { + var zero Item select { case <-ctx.Done(): - return nil, ctx.Err() + return zero, ctx.Err() case <-time.After(m.processDuration): break } if m.processReturnsErrs { - return nil, fmt.Errorf("process error: %d", i) + return zero, fmt.Errorf("process error: %v", i) } m.processed = append(m.processed, i) return i, nil } // Cancel collects all inputs that were canceled in m.canceled -func (m *mockProcessor) Cancel(i interface{}, err error) { +func (m *mockProcessor[Item]) Cancel(i Item, err error) { time.Sleep(m.cancelDuration) m.canceled = append(m.canceled, i) m.errs = append(m.errs, err.Error()) @@ -40,13 +41,13 @@ func (m *mockProcessor) Cancel(i interface{}, err 
error) { // containsAll returns true if a and b contain all of the same elements // in any order or if both are empty / nil -func containsAll(a, b []interface{}) bool { +func containsAll[Item comparable](a, b []Item) bool { if len(a) != len(b) { return false } else if len(a) == 0 { return true } - aMap := make(map[interface{}]bool) + aMap := make(map[Item]bool) for _, i := range a { aMap[i] = true } diff --git a/pipeline.go b/pipeline.go index b76f7c2..ca2340a 100644 --- a/pipeline.go +++ b/pipeline.go @@ -2,4 +2,11 @@ // It contains common fan-in and fan-out operations as well as useful utility funcs for batch processing and scaling. // // If you have another common use case you would like to see covered by this package, please (open a feature request) https://github.com/deliveryhero/pipeline/issues. +// +// Cookbook +// +// * (How to run a pipeline until the container is killed) https://github.com/deliveryhero/pipeline#PipelineShutsDownWhenContainerIsKilled +// * (How to shut down a pipeline when there is a error) https://github.com/deliveryhero/pipeline#PipelineShutsDownOnError +// * (How to shut down a pipeline after it has finished processing a batch of data) https://github.com/deliveryhero/pipeline#PipelineShutsDownWhenInputChannelIsClosed +// package pipeline diff --git a/pipeline_example_test.go b/pipeline_example_test.go new file mode 100644 index 0000000..7d22172 --- /dev/null +++ b/pipeline_example_test.go @@ -0,0 +1,144 @@ +package pipeline_test + +import ( + "context" + "fmt" + "os" + "os/signal" + "syscall" + "time" + + "github.com/deliveryhero/pipeline/v2" +) + +// The following example shows how you can shutdown a pipeline +// gracefully when it receives an error message +func Example_pipelineShutsDownOnError() { + // Create a context that can be canceled + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Create a pipeline that emits 1-10 + p := pipeline.Emit(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + + // A step that will 
shutdown the pipeline if the number is greater than 1 + p = pipeline.Process(ctx, pipeline.NewProcessor(func(ctx context.Context, i int) (int, error) { + // Shut down the pipeline by canceling the context + if i != 1 { + cancel() + return i, fmt.Errorf("%d caused the shutdown", i) + } + return i, nil + }, func(i int, err error) { + // The cancel func is called when an error is returned by the process func or the context is canceled + fmt.Printf("could not process %d: %s\n", i, err) + }), p) + + // Finally, lets print the results and see what happened + for result := range p { + fmt.Printf("result: %d\n", result) + } + + fmt.Println("exiting the pipeline after all data is processed") + + // Output + // could not process 2: 2 caused the shutdown + // result: 1 + // could not process 3: context canceled + // could not process 4: context canceled + // could not process 5: context canceled + // could not process 6: context canceled + // could not process 7: context canceled + // could not process 8: context canceled + // could not process 9: context canceled + // could not process 10: context canceled + // exiting the pipeline after all data is processed +} + +// The following example demonstrates a pipeline +// that naturally finishes its run when the input channel is closed +func Example_pipelineShutsDownWhenInputChannelIsClosed() { + // Create a context that can be canceled + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Create a pipeline that emits 1-10 and then closes its output channel + p := pipeline.Emit(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + + // Multiply every number by 2 + p = pipeline.Process(ctx, pipeline.NewProcessor(func(ctx context.Context, i int) (int, error) { + return i * 2, nil + }, func(i int, err error) { + fmt.Printf("could not multiply %d: %s\n", i, err) + }), p) + + // Finally, lets print the results and see what happened + for result := range p { + fmt.Printf("result: %d\n", result) + } + + fmt.Println("exiting after 
the input channel is closed")
+
+	// Output
+	// result: 2
+	// result: 4
+	// result: 6
+	// result: 8
+	// result: 10
+	// result: 12
+	// result: 14
+	// result: 16
+	// result: 18
+	// result: 20
+	// exiting after the input channel is closed
+}
+
+// This example demonstrates a pipeline
+// that runs until the os / container the pipeline is running in kills it
+func Example_pipelineShutsDownWhenContainerIsKilled() {
+	// Gracefully shutdown the pipeline when the system is shutting down
+	// by canceling the context when os.Kill or os.Interrupt signal is sent
+	ctx, cancel := signal.NotifyContext(context.Background(), os.Kill, os.Interrupt)
+	defer cancel()
+
+	// Create a pipeline that keeps emitting numbers sequentially until the context is canceled
+	var count int
+	p := pipeline.Emitter(ctx, func() int {
+		count++
+		return count
+	})
+
+	// Filter out only even numbers
+	p = pipeline.Process(ctx, pipeline.NewProcessor(func(ctx context.Context, i int) (int, error) {
+		if i%2 == 0 {
+			return i, nil
+		}
+		return i, fmt.Errorf("'%d' is an odd number", i)
+	}, func(i int, err error) {
+		fmt.Printf("error processing '%v': %s\n", i, err)
+	}), p)
+
+	// Wait a few nanoseconds and simulate the os.Interrupt signal
+	go func() {
+		time.Sleep(time.Millisecond / 10)
+		fmt.Print("\n--- os kills the app ---\n\n")
+		syscall.Kill(syscall.Getpid(), syscall.SIGINT)
+	}()
+
+	// Finally, lets print the results and see what happened
+	for result := range p {
+		fmt.Printf("result: %d\n", result)
+	}
+
+	fmt.Println("exiting after the input channel is closed")
+
+	// Output
+	// error processing '1': '1' is an odd number
+	// result: 2
+	//
+	// --- os kills the app ---
+	//
+	// error processing '3': '3' is an odd number
+	// error processing '4': context canceled
+	// exiting after the input channel is closed
+}
diff --git a/process.go b/process.go
index b71a40c..c6b4990 100644
--- a/process.go
+++ b/process.go
@@ -6,12 +6,12 @@ import (
"github.com/deliveryhero/pipeline/semaphore" ) -// Process takes each input from the `in <-chan interface{}` and calls `Processor.Process` on it. -// When `Processor.Process` returns an `interface{}`, it will be sent to the output `<-chan interface{}`. +// Process takes each input from the `in <-chan Input` and calls `Processor.Process` on it. +// When `Processor.Process` returns an `Output`, it will be sent to the output `<-chan Output`. // If `Processor.Process` returns an error, `Processor.Cancel` will be called with the corresponding input and error message. -// Finally, if the `Context` is canceled, all inputs remaining in the `in <-chan interface{}` will go directly to `Processor.Cancel`. -func Process(ctx context.Context, processor Processor, in <-chan interface{}) <-chan interface{} { - out := make(chan interface{}) +// Finally, if the `Context` is canceled, all inputs remaining in the `in <-chan Input` will go directly to `Processor.Cancel`. +func Process[Input, Output any](ctx context.Context, processor Processor[Input, Output], in <-chan Input) <-chan Output { + out := make(chan Output) go func() { for i := range in { process(ctx, processor, i, out) @@ -23,15 +23,15 @@ func Process(ctx context.Context, processor Processor, in <-chan interface{}) <- // ProcessConcurrently fans the in channel out to multiple Processors running concurrently, // then it fans the out channels of the Processors back into a single out chan -func ProcessConcurrently(ctx context.Context, concurrently int, p Processor, in <-chan interface{}) <-chan interface{} { +func ProcessConcurrently[Input, Output any](ctx context.Context, concurrently int, p Processor[Input, Output], in <-chan Input) <-chan Output { // Create the out chan - out := make(chan interface{}) + out := make(chan Output) go func() { // Perform Process concurrently times sem := semaphore.New(concurrently) for i := range in { sem.Add(1) - go func(i interface{}) { + go func(i Input) { process(ctx, p, i, out) sem.Done() 
 		}(i)
@@ -43,11 +43,11 @@ func ProcessConcurrently(ctx context.Context, concurrently int, p Processor, in
 	return out
 }
 
-func process(
+func process[A, B any](
 	ctx context.Context,
-	processor Processor,
-	i interface{},
-	out chan<- interface{},
+	processor Processor[A, B],
+	i A,
+	out chan<- B,
 ) {
 	select {
 	// When the context is canceled, Cancel all inputs
diff --git a/process_batch.go b/process_batch.go
index d8a29fb..f133023 100644
--- a/process_batch.go
+++ b/process_batch.go
@@ -7,18 +7,18 @@ import (
 	"github.com/deliveryhero/pipeline/semaphore"
 )
 
-// ProcessBatch collects up to maxSize elements over maxDuration and processes them together as a slice of `interface{}`s.
-// It passed an []interface{} to the `Processor.Process` method and expects a []interface{} back.
-// It passes []interface{} batches of inputs to the `Processor.Cancel` method.
+// ProcessBatch collects up to maxSize elements over maxDuration and processes them together as a slice of `Input`s.
+// It passes a []Input to the `Processor.Process` method and expects a []Output back.
+// It passes []Input batches of inputs to the `Processor.Cancel` method.
 // If the receiver is backed up, ProcessBatch can holds up to 2x maxSize.
-func ProcessBatch( +func ProcessBatch[Input, Output any]( ctx context.Context, maxSize int, maxDuration time.Duration, - processor Processor, - in <-chan interface{}, -) <-chan interface{} { - out := make(chan interface{}) + processor Processor[[]Input, []Output], + in <-chan Input, +) <-chan Output { + out := make(chan Output) go func() { for { if !processOneBatch(ctx, maxSize, maxDuration, processor, in, out) { @@ -32,16 +32,16 @@ func ProcessBatch( // ProcessBatchConcurrently fans the in channel out to multiple batch Processors running concurrently, // then it fans the out channels of the batch Processors back into a single out chan -func ProcessBatchConcurrently( +func ProcessBatchConcurrently[Input, Output any]( ctx context.Context, concurrently, maxSize int, maxDuration time.Duration, - processor Processor, - in <-chan interface{}, -) <-chan interface{} { + processor Processor[[]Input, []Output], + in <-chan Input, +) <-chan Output { // Create the out chan - out := make(chan interface{}) + out := make(chan Output) go func() { // Perform Process concurrently times sem := semaphore.New(concurrently) @@ -75,13 +75,13 @@ func isDone(ctx context.Context) bool { // processOneBatch processes one batch of inputs from the in chan. // It returns true if the in chan is still open. 
-func processOneBatch( +func processOneBatch[Input, Output any]( ctx context.Context, maxSize int, maxDuration time.Duration, - processor Processor, - in <-chan interface{}, - out chan<- interface{}, + processor Processor[[]Input, []Output], + in <-chan Input, + out chan<- Output, ) (open bool) { // Collect interfaces for batch processing is, open := collect(ctx, maxSize, maxDuration, in) @@ -98,7 +98,7 @@ func processOneBatch( return open } // Split the results back into interfaces - for _, result := range results.([]interface{}) { + for _, result := range results { out <- result } } diff --git a/process_batch_example_test.go b/process_batch_example_test.go index 6af5c6b..a08db4c 100644 --- a/process_batch_example_test.go +++ b/process_batch_example_test.go @@ -2,11 +2,10 @@ package pipeline_test import ( "context" - "log" + "fmt" "time" - "github.com/deliveryhero/pipeline" - "github.com/deliveryhero/pipeline/example/processors" + "github.com/deliveryhero/pipeline/v2" ) func ExampleProcessBatch() { @@ -17,19 +16,26 @@ func ExampleProcessBatch() { // Create a pipeline that emits 1-6 at a rate of one int per second p := pipeline.Delay(ctx, time.Second, pipeline.Emit(1, 2, 3, 4, 5, 6)) - // Use the BatchMultiplier to multiply 2 adjacent numbers together - p = pipeline.ProcessBatch(ctx, 2, time.Minute, &processors.BatchMultiplier{}, p) + // Multiply every 2 adjacent numbers together + p = pipeline.ProcessBatch(ctx, 2, time.Minute, pipeline.NewProcessor(func(ctx context.Context, is []int) ([]int, error) { + o := 1 + for _, i := range is { + o *= i + } + return []int{o}, nil + }, func(is []int, err error) { + fmt.Printf("error: could not multiply %v, %s\n", is, err) + }), p) // Finally, lets print the results and see what happened for result := range p { - log.Printf("result: %d\n", result) + fmt.Printf("result: %d\n", result) } // Output // result: 2 // result: 12 - // error: could not multiply [5], context deadline exceeded - // error: could not multiply [6], context 
deadline exceeded + // error: could not multiply [5 6], context deadline exceeded } func ExampleProcessBatchConcurrently() { @@ -40,23 +46,27 @@ func ExampleProcessBatchConcurrently() { // Create a pipeline that emits 1-9 p := pipeline.Emit(1, 2, 3, 4, 5, 6, 7, 8, 9) - // Wait 4 seconds to pass 2 numbers through the pipe - // * 2 concurrent Processors - p = pipeline.ProcessBatchConcurrently(ctx, 2, 2, time.Minute, &processors.Waiter{ - Duration: 4 * time.Second, - }, p) + // Add a 1 second delay to each number + p = pipeline.Delay(ctx, time.Second, p) + + // Group two inputs at a time + p = pipeline.ProcessBatchConcurrently(ctx, 2, 2, time.Minute, pipeline.NewProcessor(func(ctx context.Context, ins []int) ([]int, error) { + return ins, nil + }, func(i []int, err error) { + fmt.Printf("error: could not process %v, %s\n", i, err) + }), p) // Finally, lets print the results and see what happened for result := range p { - log.Printf("result: %d\n", result) + fmt.Printf("result: %d\n", result) } // Output - // result: 3 - // result: 4 // result: 1 // result: 2 - // error: could not process [5 6], process was canceled - // error: could not process [7 8], process was canceled + // result: 3 + // result: 5 + // error: could not process [7 8], context deadline exceeded + // error: could not process [4 6], context deadline exceeded // error: could not process [9], context deadline exceeded } diff --git a/process_batch_test.go b/process_batch_test.go index e3a7d33..25cfc33 100644 --- a/process_batch_test.go +++ b/process_batch_test.go @@ -13,8 +13,8 @@ func TestProcessBatch(t *testing.T) { ctxTimeout time.Duration maxSize int maxDuration time.Duration - processor *mockProcessor - in <-chan interface{} + processor *mockProcessor[[]int] + in <-chan int } tests := []struct { name string @@ -28,7 +28,7 @@ func TestProcessBatch(t *testing.T) { maxDuration: maxTestDuration, // Process 2 elements 33% of the total test duration maxSize: 2, - processor: &mockProcessor{ + processor: 
&mockProcessor[[]int]{ processDuration: maxTestDuration / 3, cancelDuration: maxTestDuration / 3, }, @@ -45,7 +45,7 @@ func TestProcessBatch(t *testing.T) { maxDuration: maxTestDuration, // Process 5 elements 33% of the total test duration maxSize: 5, - processor: &mockProcessor{ + processor: &mockProcessor[[]int]{ processDuration: maxTestDuration / 3, cancelDuration: maxTestDuration / 3, }, @@ -63,7 +63,7 @@ func TestProcessBatch(t *testing.T) { // Process the batch with a timeout of maxTestDuration open := true - outChan := ProcessBatch(ctx, tt.args.maxSize, tt.args.maxDuration, tt.args.processor, tt.args.in) + outChan := ProcessBatch[int, int](ctx, tt.args.maxSize, tt.args.maxDuration, tt.args.processor, tt.args.in) timeout := time.After(maxTestDuration) loop: for { @@ -92,8 +92,8 @@ func TestProcessBatchConcurrently(t *testing.T) { concurrently int maxSize int maxDuration time.Duration - processor *mockProcessor - in <-chan interface{} + processor *mockProcessor[[]int] + in <-chan int } tests := []struct { name string @@ -108,7 +108,7 @@ func TestProcessBatchConcurrently(t *testing.T) { maxSize: 1, // * 2x concurrently concurrently: 2, - processor: &mockProcessor{ + processor: &mockProcessor[[]int]{ processDuration: maxTestDuration / 3, cancelDuration: maxTestDuration / 3, }, @@ -126,7 +126,7 @@ func TestProcessBatchConcurrently(t *testing.T) { maxSize: 1, // * 5x concurrently concurrently: 5, - processor: &mockProcessor{ + processor: &mockProcessor[[]int]{ processDuration: maxTestDuration / 3, cancelDuration: maxTestDuration / 3, }, @@ -144,7 +144,7 @@ func TestProcessBatchConcurrently(t *testing.T) { // Process the batch with a timeout of maxTestDuration open := true - out := ProcessBatchConcurrently(ctx, tt.args.concurrently, tt.args.maxSize, tt.args.maxDuration, tt.args.processor, tt.args.in) + out := ProcessBatchConcurrently[int, int](ctx, tt.args.concurrently, tt.args.maxSize, tt.args.maxDuration, tt.args.processor, tt.args.in) timeout := 
time.After(maxTestDuration) loop: for { @@ -167,21 +167,21 @@ func TestProcessBatchConcurrently(t *testing.T) { } func Test_processBatch(t *testing.T) { - drain := make(chan interface{}, 10000) + drain := make(chan int, 10000) const maxTestDuration = time.Second type args struct { ctxTimeout time.Duration maxSize int maxDuration time.Duration - processor *mockProcessor - in <-chan interface{} - out chan<- interface{} + processor *mockProcessor[[]int] + in <-chan int + out chan<- int } type want struct { open bool - processed []interface{} - canceled []interface{} - errs []interface{} + processed [][]int + canceled [][]int + errs []string } tests := []struct { name string @@ -193,9 +193,9 @@ func Test_processBatch(t *testing.T) { ctxTimeout: maxTestDuration, maxSize: 20, maxDuration: maxTestDuration, - processor: new(mockProcessor), - in: func() <-chan interface{} { - in := make(chan interface{}) + processor: new(mockProcessor[[]int]), + in: func() <-chan int { + in := make(chan int) close(in) return in }(), @@ -210,21 +210,21 @@ func Test_processBatch(t *testing.T) { ctxTimeout: maxTestDuration, maxSize: 2, maxDuration: maxTestDuration, - processor: new(mockProcessor), + processor: new(mockProcessor[[]int]), in: Emit(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), out: drain, }, want: want{ open: false, - processed: []interface{}{[]interface{}{ + processed: [][]int{{ 1, 2, - }, []interface{}{ + }, { 3, 4, - }, []interface{}{ + }, { 5, 6, - }, []interface{}{ + }, { 7, 8, - }, []interface{}{ + }, { 9, 10, }}, }, @@ -234,7 +234,7 @@ func Test_processBatch(t *testing.T) { ctxTimeout: maxTestDuration / 2, maxSize: 5, maxDuration: maxTestDuration, - processor: &mockProcessor{ + processor: &mockProcessor[[]int]{ processReturnsErrs: true, }, in: Emit(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), @@ -242,12 +242,12 @@ func Test_processBatch(t *testing.T) { }, want: want{ open: false, - canceled: []interface{}{[]interface{}{ + canceled: [][]int{{ 1, 2, 3, 4, 5, - }, []interface{}{ + }, { 6, 7, 8, 9, 10, 
}}, - errs: []interface{}{ + errs: []string{ "process error: [1 2 3 4 5]", "process error: [6 7 8 9 10]", }, @@ -258,7 +258,7 @@ func Test_processBatch(t *testing.T) { ctxTimeout: maxTestDuration / 2, maxSize: 1, maxDuration: maxTestDuration, - processor: &mockProcessor{ + processor: &mockProcessor[[]int]{ // this will take longer to complete than the maxTestDuration by a few micro seconds processDuration: maxTestDuration / 10, // 5 calls to Process > maxTestDuration / 2 cancelDuration: maxTestDuration/10 + 25*time.Millisecond, // 5 calls to Cancel > maxTestDuration / 2 @@ -268,19 +268,13 @@ func Test_processBatch(t *testing.T) { }, want: want{ open: true, - processed: []interface{}{ - []interface{}{1}, - []interface{}{2}, - []interface{}{3}, - []interface{}{4}, + processed: [][]int{ + {1}, {2}, {3}, {4}, }, - canceled: []interface{}{ - []interface{}{5}, - []interface{}{6}, - []interface{}{7}, - []interface{}{8}, + canceled: [][]int{ + {5}, {6}, {7}, {8}, }, - errs: []interface{}{ + errs: []string{ "context deadline exceeded", "context deadline exceeded", "context deadline exceeded", @@ -302,7 +296,7 @@ func Test_processBatch(t *testing.T) { case <-timeout: break loop default: - open = processOneBatch(ctx, tt.args.maxSize, tt.args.maxDuration, tt.args.processor, tt.args.in, tt.args.out) + open = processOneBatch[int, int](ctx, tt.args.maxSize, tt.args.maxDuration, tt.args.processor, tt.args.in, tt.args.out) if !open { break loop } diff --git a/process_example_test.go b/process_example_test.go index fee74a0..9eb32dd 100644 --- a/process_example_test.go +++ b/process_example_test.go @@ -2,11 +2,11 @@ package pipeline_test import ( "context" + "fmt" "log" "time" - "github.com/deliveryhero/pipeline" - "github.com/deliveryhero/pipeline/example/processors" + "github.com/deliveryhero/pipeline/v2" ) func ExampleProcess() { @@ -17,14 +17,16 @@ func ExampleProcess() { // Create a pipeline that emits 1-6 at a rate of one int per second p := pipeline.Delay(ctx, time.Second, 
pipeline.Emit(1, 2, 3, 4, 5, 6)) - // Use the Multiplier to multiply each int by 10 - p = pipeline.Process(ctx, &processors.Multiplier{ - Factor: 10, - }, p) + // Multiply each number by 10 + p = pipeline.Process(ctx, pipeline.NewProcessor(func(ctx context.Context, in int) (int, error) { + return in * 10, nil + }, func(i int, err error) { + fmt.Printf("error: could not multiply %v, %s\n", i, err) + }), p) // Finally, lets print the results and see what happened for result := range p { - log.Printf("result: %d\n", result) + fmt.Printf("result: %d\n", result) } // Output @@ -44,11 +46,15 @@ func ExampleProcessConcurrently() { // Create a pipeline that emits 1-7 p := pipeline.Emit(1, 2, 3, 4, 5, 6, 7) - // Wait 2 seconds to pass each number through the pipe - // * 2 concurrent Processors - p = pipeline.ProcessConcurrently(ctx, 2, &processors.Waiter{ - Duration: 2 * time.Second, - }, p) + // Add a two second delay to each number + p = pipeline.Delay(ctx, 2*time.Second, p) + + // Add two concurrent processors that pass input numbers to the output + p = pipeline.ProcessConcurrently(ctx, 2, pipeline.NewProcessor(func(ctx context.Context, in int) (int, error) { + return in, nil + }, func(i int, err error) { + fmt.Printf("error: could not process %v, %s\n", i, err) + }), p) // Finally, lets print the results and see what happened for result := range p { diff --git a/process_test.go b/process_test.go index 92d88e1..8d06021 100644 --- a/process_test.go +++ b/process_test.go @@ -14,13 +14,13 @@ func TestProcess(t *testing.T) { processDuration time.Duration processReturnsErrors bool cancelDuration time.Duration - in []interface{} + in []int } type want struct { open bool - out []interface{} - canceled []interface{} - canceledErrs []interface{} + out []int + canceled []int + canceledErrs []string } tests := []struct { name string @@ -32,11 +32,11 @@ func TestProcess(t *testing.T) { args: args{ ctxTimeout: 2 * maxTestDuration, processDuration: 0, - in: []interface{}{1, 2, 3}, + 
in: []int{1, 2, 3}, }, want: want{ open: false, - out: []interface{}{1, 2, 3}, + out: []int{1, 2, 3}, canceled: nil, }, }, { @@ -44,13 +44,13 @@ func TestProcess(t *testing.T) { args: args{ ctxTimeout: maxTestDuration / 2, processDuration: maxTestDuration / 11, - in: []interface{}{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + in: []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, }, want: want{ open: false, - out: []interface{}{1, 2, 3, 4, 5}, - canceled: []interface{}{6, 7, 8, 9, 10}, - canceledErrs: []interface{}{ + out: []int{1, 2, 3, 4, 5}, + canceled: []int{6, 7, 8, 9, 10}, + canceledErrs: []string{ "context deadline exceeded", "context deadline exceeded", "context deadline exceeded", @@ -64,13 +64,13 @@ func TestProcess(t *testing.T) { ctxTimeout: maxTestDuration / 2, processDuration: (maxTestDuration / 2) - (100 * time.Millisecond), cancelDuration: (maxTestDuration / 2) - (100 * time.Millisecond), - in: []interface{}{1, 2, 3}, + in: []int{1, 2, 3}, }, want: want{ open: true, - out: []interface{}{1}, - canceled: []interface{}{2}, - canceledErrs: []interface{}{ + out: []int{1}, + canceled: []int{2}, + canceledErrs: []string{ "context deadline exceeded", }, }, @@ -81,13 +81,13 @@ func TestProcess(t *testing.T) { processDuration: (maxTestDuration - 200*time.Millisecond) / 2, processReturnsErrors: true, cancelDuration: 0, - in: []interface{}{1, 2, 3}, + in: []int{1, 2, 3}, }, want: want{ open: false, out: nil, - canceled: []interface{}{1, 2, 3}, - canceledErrs: []interface{}{ + canceled: []int{1, 2, 3}, + canceledErrs: []string{ "process error: 1", "process error: 2", "context deadline exceeded", @@ -98,7 +98,7 @@ func TestProcess(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { // Create the in channel - in := make(chan interface{}) + in := make(chan int) go func() { defer close(in) for _, i := range test.args.in { @@ -109,16 +109,16 @@ func TestProcess(t *testing.T) { // Setup the Processor ctx, cancel := context.WithTimeout(context.Background(), 
test.args.ctxTimeout) defer cancel() - processor := &mockProcessor{ + processor := &mockProcessor[int]{ processDuration: test.args.processDuration, processReturnsErrs: test.args.processReturnsErrors, cancelDuration: test.args.cancelDuration, } - out := Process(ctx, processor, in) + out := Process[int, int](ctx, processor, in) // Collect the outputs timeout := time.After(maxTestDuration) - var outs []interface{} + var outs []int var isOpen bool loop: for { @@ -166,13 +166,13 @@ func TestProcessConcurrently(t *testing.T) { processReturnsErrors bool cancelDuration time.Duration concurrently int - in []interface{} + in []int } type want struct { open bool - out []interface{} - canceled []interface{} - canceledErrs []interface{} + out []int + canceled []int + canceledErrs []string } tests := []struct { name string @@ -185,11 +185,11 @@ func TestProcessConcurrently(t *testing.T) { ctxTimeout: 2 * maxTestDuration, // context never times out processDuration: maxTestDuration/3 - (100 * time.Millisecond), // 3 processed per processor concurrently: 2, // * 2 processors = 6 processed, pipe closes - in: []interface{}{1, 2, 3, 4, 5, 6}, + in: []int{1, 2, 3, 4, 5, 6}, }, want: want{ open: false, - out: []interface{}{1, 2, 3, 4, 5, 6}, + out: []int{1, 2, 3, 4, 5, 6}, canceled: nil, }, }, { @@ -198,13 +198,13 @@ func TestProcessConcurrently(t *testing.T) { ctxTimeout: maxTestDuration / 2, // context times out before the test ends processDuration: (maxTestDuration / 4) - (10 * time.Millisecond), // 2 processed per processor before timeout concurrently: 3, // * 3 processors = 6 processed, 4 canceled, pipe closes - in: []interface{}{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + in: []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, }, want: want{ open: false, - out: []interface{}{1, 2, 3, 4, 5, 6}, - canceled: []interface{}{7, 8, 9, 10}, - canceledErrs: []interface{}{ + out: []int{1, 2, 3, 4, 5, 6}, + canceled: []int{7, 8, 9, 10}, + canceledErrs: []string{ "context deadline exceeded", "context deadline 
exceeded", "context deadline exceeded", @@ -218,13 +218,13 @@ func TestProcessConcurrently(t *testing.T) { processDuration: (maxTestDuration / 2) - (100 * time.Millisecond), // process fires onces per processor cancelDuration: (maxTestDuration / 2) - (100 * time.Millisecond), // cancel fires once per process concurrently: 3, // * 3 proceses = 3 canceled, 3 processed, 1 still in the pipe - in: []interface{}{1, 2, 3, 4, 5, 6, 7}, + in: []int{1, 2, 3, 4, 5, 6, 7}, }, want: want{ open: true, - out: []interface{}{1, 2, 3}, - canceled: []interface{}{4, 5, 6}, - canceledErrs: []interface{}{ + out: []int{1, 2, 3}, + canceled: []int{4, 5, 6}, + canceledErrs: []string{ "context deadline exceeded", "context deadline exceeded", "context deadline exceeded", @@ -238,13 +238,13 @@ func TestProcessConcurrently(t *testing.T) { processReturnsErrors: true, cancelDuration: 0, concurrently: 1, - in: []interface{}{1, 2, 3}, + in: []int{1, 2, 3}, }, want: want{ open: false, out: nil, - canceled: []interface{}{1, 2, 3}, - canceledErrs: []interface{}{ + canceled: []int{1, 2, 3}, + canceledErrs: []string{ "process error: 1", "process error: 2", "context deadline exceeded", @@ -255,38 +255,31 @@ func TestProcessConcurrently(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { // Create the in channel - in := make(chan interface{}) - go func() { - defer close(in) - for _, i := range test.args.in { - in <- i - } - }() + in := Emit(test.args.in...) 
// Setup the Processor ctx, cancel := context.WithTimeout(context.Background(), test.args.ctxTimeout) defer cancel() - processor := &mockProcessor{ + processor := &mockProcessor[int]{ processDuration: test.args.processDuration, processReturnsErrs: test.args.processReturnsErrors, cancelDuration: test.args.cancelDuration, } - out := ProcessConcurrently(ctx, test.args.concurrently, processor, in) + out := ProcessConcurrently[int, int](ctx, test.args.concurrently, processor, in) - // Collect the outputs - timeout := time.After(maxTestDuration) - var outs []interface{} + var outs []int var isOpen bool + timeout := time.After(maxTestDuration) loop: for { select { - case o, open := <-out: + case i, open := <-out: + isOpen = open if !open { - isOpen = false break loop } - isOpen = true - outs = append(outs, o) + outs = append(outs, i) + case <-timeout: break loop } diff --git a/processor.go b/processor.go index c846e3c..0f07d55 100644 --- a/processor.go +++ b/processor.go @@ -5,33 +5,33 @@ import "context" // Processor represents a blocking operation in a pipeline. Implementing `Processor` will allow you to add // business logic to your pipelines without directly managing channels. This simplifies your unit tests // and eliminates channel management related bugs. -type Processor interface { +type Processor[Input, Output any] interface { // Process processes an input and returns an output or an error, if the output could not be processed. // When the context is canceled, process should stop all blocking operations and return the `Context.Err()`. - Process(ctx context.Context, i interface{}) (interface{}, error) + Process(ctx context.Context, i Input) (Output, error) - // Cancel is called if process returns an error or if the context is canceled while there are still items in the `in <-chan interface{}`. - Cancel(i interface{}, err error) + // Cancel is called if process returns an error or if the context is canceled while there are still items in the `in <-chan Input`. 
+ Cancel(i Input, err error) } // NewProcessor creates a process and cancel func -func NewProcessor( - process func(ctx context.Context, i interface{}) (interface{}, error), - cancel func(i interface{}, err error), -) Processor { - return &processor{process, cancel} +func NewProcessor[Input, Output any]( + process func(ctx context.Context, i Input) (Output, error), + cancel func(i Input, err error), +) Processor[Input, Output] { + return &processor[Input, Output]{process, cancel} } // processor implements Processor -type processor struct { - process func(ctx context.Context, i interface{}) (interface{}, error) - cancel func(i interface{}, err error) +type processor[Input, Output any] struct { + process func(ctx context.Context, i Input) (Output, error) + cancel func(i Input, err error) } -func (p *processor) Process(ctx context.Context, i interface{}) (interface{}, error) { +func (p *processor[Input, Output]) Process(ctx context.Context, i Input) (Output, error) { return p.process(ctx, i) } -func (p *processor) Cancel(i interface{}, err error) { +func (p *processor[Input, Output]) Cancel(i Input, err error) { p.cancel(i, err) } diff --git a/split.go b/split.go index 61187c0..be3b7fa 100644 --- a/split.go +++ b/split.go @@ -1,13 +1,12 @@ package pipeline // Split takes an interface from Collect and splits it back out into individual elements -// Useful for batch processing pipelines (`input chan -> Collect -> Process -> Split -> Cancel -> output chan`). 
-func Split(in <-chan interface{}) <-chan interface{} { - out := make(chan interface{}) +func Split[Item any](in <-chan []Item) <-chan Item { + out := make(chan Item) go func() { defer close(out) for is := range in { - for _, i := range is.([]interface{}) { + for _, i := range is { out <- i } } diff --git a/split_test.go b/split_test.go index 4b48272..8f5e8cb 100644 --- a/split_test.go +++ b/split_test.go @@ -7,10 +7,10 @@ import ( func TestSplit(t *testing.T) { type args struct { - in interface{} + in []int } type want struct { - out []interface{} + out []int } tests := []struct { name string @@ -20,24 +20,20 @@ func TestSplit(t *testing.T) { { "splits slices if interfaces into individual interfaces", args{ - in: []interface{}{1, 2, 3, 4, 5}, + in: []int{1, 2, 3, 4, 5}, }, want{ - out: []interface{}{1, 2, 3, 4, 5}, + out: []int{1, 2, 3, 4, 5}, }, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { // Create the in channel - in := make(chan interface{}) - go func() { - defer close(in) - in <- test.args.in - }() + in := Emit(test.args.in) // Collect the output - var outs []interface{} + var outs []int for o := range Split(in) { outs = append(outs, o) }