Skip to content

Commit

Permalink
feat(join): joins two processes together where the output of one is t…
Browse files Browse the repository at this point in the history
…he input of the other
  • Loading branch information
marksalpeter committed Sep 5, 2022
1 parent 4863fba commit 639c471
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 0 deletions.
28 changes: 28 additions & 0 deletions join.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package pipeline

import "context"

type join[A, B, C any] struct {
a Processor[A, B]
b Processor[B, C]
}

func (j *join[A, B, C]) Process(ctx context.Context, a A) (C, error) {
var zero C
if b, err := j.a.Process(ctx, a); err != nil {
j.a.Cancel(a, err)
return zero, err
} else if c, err := j.b.Process(ctx, b); err != nil {
j.b.Cancel(b, err)
return zero, err
} else {
return c, nil
}
}

func (j *join[A, B, C]) Cancel(_ A, _ error) {}

// Join connects two processes where the output of the first is the input of the second
func Join[A, B, C any](a Processor[A, B], b Processor[B, C]) Processor[A, C] {
return &join[A, B, C]{a, b}
}
50 changes: 50 additions & 0 deletions join_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package pipeline

import (
"context"
"fmt"
"strconv"
"testing"
)

func TestJoin(t *testing.T) {
// Emit 10 numbers
want := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
ins := Emit(want...)
// Join two steps, one that converts the number to a string, the other that converts it back to a number
join := Join(NewProcessor(func(_ context.Context, i int) (string, error) {
return strconv.Itoa(i), nil
}, nil), NewProcessor(func(_ context.Context, i string) (int, error) {
return strconv.Atoi(i)
}, nil))
// Compare inputs and outputs
var idx int
for got := range Process(context.Background(), join, ins) {
if want[idx] != got {
t.Fatalf("[%d] = %d, want = %d", idx, got, want[idx])
}
idx++
}
}

func JoinExample() {
// Emit 10 numbers
inputs := Emit(0, 1, 2, 3, 4, 5)
// Join two steps, one that converts the number to a string, the other that converts it back to a number
convertToStringThenBackToInt := Process(context.Background(), Join(NewProcessor(func(_ context.Context, i int) (string, error) {
return strconv.Itoa(i), nil
}, nil), NewProcessor(func(_ context.Context, i string) (int, error) {
return strconv.Atoi(i)
}, nil)), inputs)
// Print the output
for o := range convertToStringThenBackToInt {
fmt.Println(o)
}
// Output
// 0
// 1
// 2
// 3
// 4
// 5
}
22 changes: 22 additions & 0 deletions sequence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pipeline

import (
"context"
"fmt"
"testing"
)

Expand All @@ -23,3 +24,24 @@ func TestSequence(t *testing.T) {
i++
}
}

func SequenceExample() {
// Create a step that increments the integer by 1
inc := NewProcessor(func(_ context.Context, i int) (int, error) {
i++
return i, nil
}, nil)
// Add 5 to every input
inputs := Emit(0, 1, 2, 3, 4)
addFive := Process(context.Background(), Sequence(inc, inc, inc, inc, inc), inputs)
// Print the output
for o := range addFive {
fmt.Println(o)
}
// Output
// 5
// 6
// 7
// 8
// 9
}

0 comments on commit 639c471

Please sign in to comment.