Skip to content

Commit

Permalink
feat(sequence): joins many processors with the same input and output
Browse files Browse the repository at this point in the history
  • Loading branch information
marksalpeter committed Sep 5, 2022
1 parent c6d79ab commit 4863fba
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 0 deletions.
26 changes: 26 additions & 0 deletions sequence.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package pipeline

import "context"

type sequence[A any] []Processor[A, A]

func (s sequence[A]) Process(ctx context.Context, a A) (A, error) {
var zero A
var in = a
for _, p := range s {
if out, err := p.Process(ctx, in); err != nil {
p.Cancel(in, err)
return zero, err
} else {
in = out
}
}
return in, nil
}

func (s sequence[A]) Cancel(_ A, _ error) {}

// Sequence connects many processors sequentially where the inputs are the same outputs
func Sequence[A any](ps ...Processor[A, A]) Processor[A, A] {
return sequence[A](ps)
}
25 changes: 25 additions & 0 deletions sequence_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package pipeline

import (
"context"
"testing"
)

func TestSequence(t *testing.T) {
// Create a step that increments the integer by 1
inc := NewProcessor(func(_ context.Context, i int) (int, error) {
i++
return i, nil
}, nil)
// Add 5 to every number
inc5 := Sequence(inc, inc, inc, inc, inc)
// Verify the sequence ads 5 to each number
var i int
want := []int{5, 6, 7, 8, 9}
for o := range Process(context.Background(), inc5, Emit(0, 1, 2, 3, 4)) {
if want[i] != o {
t.Fatalf("[%d] = %d, want %d", i, o, want[i])
}
i++
}
}

0 comments on commit 4863fba

Please sign in to comment.