-
Notifications
You must be signed in to change notification settings - Fork 0
/
transform.go2
91 lines (78 loc) · 1.73 KB
/
transform.go2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package channels
// Collect drains src and returns every value it yielded, in arrival order.
// The result is nil when src is closed without delivering any value.
//
// NOTE: Collect only returns once src has been closed; if src is never
// closed the call blocks indefinitely.
func Collect[T any](src <-chan T) []T {
	var collected []T
	for {
		item, open := <-src
		if !open {
			// src is closed and fully drained.
			return collected
		}
		collected = append(collected, item)
	}
}
// Transform returns a channel carrying f(v) for every value v received from
// src, in the same order.
//
// The work happens in a goroutine started by this call. The returned channel
// is unbuffered, so each result waits for a reader before the next value is
// taken from src. The returned channel is closed after src has been closed
// and drained.
func Transform[T, U any](src <-chan T, f func(T) U) <-chan U {
	mapped := make(chan U)
	go func() {
		for {
			in, open := <-src
			if !open {
				break
			}
			mapped <- f(in)
		}
		close(mapped)
	}()
	return mapped
}
// Split routes every value received from src to one of two channels based on
// the predicate f: values for which f returns true are sent on the first
// returned channel, all others on the second.
//
// Both returned channels are unbuffered and fed by a single goroutine, so a
// value pending on one channel stalls delivery on the other until it is read;
// callers should consume both channels concurrently.
//
// Both returned channels are closed once src has been closed and drained.
func Split[T any](src <-chan T, f func(T) (first bool)) (<-chan T, <-chan T) {
	first, second := make(chan T), make(chan T)
	go func() {
		// Close both outputs when src is exhausted so that consumers ranging
		// over them terminate instead of blocking forever.
		defer close(first)
		defer close(second)
		for v := range src {
			if f(v) {
				first <- v
			} else {
				second <- v
			}
		}
	}()
	return first, second
}
// Each calls f on every value received from src and then forwards that same
// value, unchanged, on the returned channel.
//
// It is built on Transform, so f runs in Transform's goroutine just before
// each value is sent; values only advance as the returned channel is read.
func Each[T any](src <-chan T, f func(T)) <-chan T {
	// tap applies the side effect and passes the value through untouched.
	tap := func(item T) T {
		f(item)
		return item
	}
	return Transform[T, T](src, tap)
}
// Buffer groups the values received from src into batches. Values are
// accumulated in arrival order; whenever f returns true for a value, the
// batch ending with that value is sent on the returned channel and a new
// batch is started.
//
// When src is closed, any values accumulated since the last flush are sent
// as a final batch. An empty batch is never sent, so an empty src yields no
// batches at all. The returned channel is unbuffered and is closed after the
// final batch has been delivered.
func Buffer[T any](src <-chan T, f func(T) bool) <-chan []T {
	out := make(chan []T)
	go func() {
		defer close(out)
		var batch []T
		for v := range src {
			batch = append(batch, v)
			if f(v) {
				out <- batch
				// The slice just sent now belongs to the receiver; start
				// over with a fresh one rather than reusing its storage.
				batch = nil
			}
		}
		// Flush the trailing partial batch, but do not emit a spurious
		// empty batch when nothing is pending.
		if len(batch) > 0 {
			out <- batch
		}
	}()
	return out
}
// NWise returns a channel of sliding windows over src: once at least n values
// have been received, every further value (including the n-th) produces a
// slice holding the n most recent values in arrival order. If src delivers
// fewer than n values, no window is emitted.
//
// Each emitted slice is an independent copy, so receivers may keep, modify or
// append to it without affecting other windows. Only the last n values are
// retained internally.
//
// The returned channel is unbuffered and is closed after src has been closed
// and drained. NWise panics if n is less than 1.
func NWise[T any](src <-chan T, n int) <-chan []T {
	if n < 1 {
		panic("channels: n needs to be 1 or greater than 1 for NWise")
	}
	out := make(chan []T)
	go func() {
		defer close(out)
		var window []T
		for v := range src {
			if len(window) == n {
				// Slide the window: drop the oldest value in place so the
				// buffer never holds more than n elements.
				copy(window, window[1:])
				window = window[:n-1]
			}
			window = append(window, v)
			if len(window) == n {
				// Send a copy; the working buffer is overwritten on the
				// next iteration and must not be shared with receivers.
				snapshot := make([]T, n)
				copy(snapshot, window)
				out <- snapshot
			}
		}
	}()
	return out
}