From 639c47109e329c9a411153b1415a3d2bb44ded7a Mon Sep 17 00:00:00 2001 From: Mark Salpeter Date: Mon, 5 Sep 2022 11:45:49 +0200 Subject: [PATCH] feat(join): joins two processes together where the output of one is the input of the other --- join.go | 28 +++++++++++++++++++++++++++ join_test.go | 50 ++++++++++++++++++++++++++++++++++++++++++++++++ sequence_test.go | 22 +++++++++++++++++++++ 3 files changed, 100 insertions(+) create mode 100644 join.go create mode 100644 join_test.go diff --git a/join.go b/join.go new file mode 100644 index 0000000..96a369b --- /dev/null +++ b/join.go @@ -0,0 +1,28 @@ +package pipeline + +import "context" + +type join[A, B, C any] struct { + a Processor[A, B] + b Processor[B, C] +} + +func (j *join[A, B, C]) Process(ctx context.Context, a A) (C, error) { + var zero C + if b, err := j.a.Process(ctx, a); err != nil { + j.a.Cancel(a, err) + return zero, err + } else if c, err := j.b.Process(ctx, b); err != nil { + j.b.Cancel(b, err) + return zero, err + } else { + return c, nil + } +} + +func (j *join[A, B, C]) Cancel(_ A, _ error) {} + +// Join connects two processes where the output of the first is the input of the second +func Join[A, B, C any](a Processor[A, B], b Processor[B, C]) Processor[A, C] { + return &join[A, B, C]{a, b} +} diff --git a/join_test.go b/join_test.go new file mode 100644 index 0000000..088a287 --- /dev/null +++ b/join_test.go @@ -0,0 +1,50 @@ +package pipeline + +import ( + "context" + "fmt" + "strconv" + "testing" +) + +func TestJoin(t *testing.T) { + // Emit 10 numbers + want := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9} + ins := Emit(want...) + // Join two steps, one that converts the number to a string, the other that converts it back to a number + join := Join(NewProcessor(func(_ context.Context, i int) (string, error) { + return strconv.Itoa(i), nil + }, nil), NewProcessor(func(_ context.Context, i string) (int, error) { + return strconv.Atoi(i) + }, nil)) + // Compare inputs and outputs + var idx int + for got := range Process(context.Background(), join, ins) { + if want[idx] != got { + t.Fatalf("[%d] = %d, want = %d", idx, got, want[idx]) + } + idx++ + } +} + +func JoinExample() { + // Emit 10 numbers + inputs := Emit(0, 1, 2, 3, 4, 5) + // Join two steps, one that converts the number to a string, the other that converts it back to a number + convertToStringThenBackToInt := Process(context.Background(), Join(NewProcessor(func(_ context.Context, i int) (string, error) { + return strconv.Itoa(i), nil + }, nil), NewProcessor(func(_ context.Context, i string) (int, error) { + return strconv.Atoi(i) + }, nil)), inputs) + // Print the output + for o := range convertToStringThenBackToInt { + fmt.Println(o) + } + // Output + // 0 + // 1 + // 2 + // 3 + // 4 + // 5 +} diff --git a/sequence_test.go b/sequence_test.go index 197c5f6..afd920e 100644 --- a/sequence_test.go +++ b/sequence_test.go @@ -2,6 +2,7 @@ package pipeline import ( "context" + "fmt" "testing" ) @@ -23,3 +24,24 @@ func TestSequence(t *testing.T) { i++ } } + +func SequenceExample() { + // Create a step that increments the integer by 1 + inc := NewProcessor(func(_ context.Context, i int) (int, error) { + i++ + return i, nil + }, nil) + // Add 5 to every input + inputs := Emit(0, 1, 2, 3, 4) + addFive := Process(context.Background(), Sequence(inc, inc, inc, inc, inc), inputs) + // Print the output + for o := range addFive { + fmt.Println(o) + } + // Output + // 5 + // 6 + // 7 + // 8 + // 9 +}