-
Notifications
You must be signed in to change notification settings - Fork 0
/
join.go2
132 lines (111 loc) · 2.24 KB
/
join.go2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package channels
// Merge fans every input channel into a single output channel. Values from
// different inputs are interleaved in no particular order.
//
// The returned channel is closed once every input channel has been closed and
// drained. When called with no inputs it returns an already-closed channel.
//
// Use Concat instead when the inputs must be consumed in the order given.
func Merge[T any](in ...<-chan T) <-chan T {
	merged := make(chan T)
	if len(in) == 0 {
		close(merged)
		return merged
	}

	// Each forwarder reports here exactly once, after its input is exhausted.
	finished := make(chan struct{})
	forward := func(src <-chan T) {
		for item := range src {
			merged <- item
		}
		finished <- struct{}{}
	}
	for idx := range in {
		go forward(in[idx])
	}

	// Close the output only after every forwarder has reported completion.
	go func() {
		for left := len(in); left > 0; left-- {
			<-finished
		}
		close(merged)
	}()
	return merged
}
// Concat behaves like Merge but preserves order: the inputs are drained one at
// a time, in the order they were passed, and an input is only read once the
// previous one has been closed.
//
// The returned channel is closed after the last input has been drained. When
// called with no inputs it returns an already-closed channel.
//
// Note: only use this when every input is guaranteed to be closed eventually;
// an input that never closes prevents all later inputs from being read.
func Concat[T any](in ...<-chan T) <-chan T {
	joined := make(chan T)
	if len(in) == 0 {
		close(joined)
		return joined
	}

	go func() {
		defer close(joined)
		for idx := 0; idx < len(in); idx++ {
			src := in[idx]
			for {
				item, open := <-src
				if !open {
					break
				}
				joined <- item
			}
		}
	}()
	return joined
}
// Copy forwards every value received from src to dst in a background
// goroutine.
//
// The returned channel is closed (nothing is ever sent on it) once src has
// been closed and all of its values have been handed to dst. Copy never closes
// dst; that remains the caller's responsibility.
func Copy[T any](dst chan<- T, src <-chan T) <-chan Signal {
	finished := make(chan Signal)
	go func() {
		defer close(finished)
		for {
			item, open := <-src
			if !open {
				return
			}
			dst <- item
		}
	}()
	return finished
}
// StartWith returns a channel that first emits the values vv, in the order
// given, and then every value received from src.
//
// The returned channel is closed once src has been closed and drained. src is
// not read until all of vv has been delivered.
func StartWith[T any](src <-chan T, vv ...T) <-chan T {
	prefixed := make(chan T)
	go func() {
		defer close(prefixed)
		// Deliver the fixed prefix first.
		for idx := range vv {
			prefixed <- vv[idx]
		}
		// Then relay the source until it is closed.
		for {
			item, open := <-src
			if !open {
				return
			}
			prefixed <- item
		}
	}()
	return prefixed
}
// Race returns a channel equivalent to whichever input channel is the first to
// deliver a value.
//
// All inputs are watched concurrently. The first input to deliver a value
// wins: that value, and everything the winner emits afterwards, is forwarded
// to the returned channel, which is closed when the winner is closed.
//
// An input that is closed before delivering anything cannot win. If every
// input closes without emitting, the returned channel is closed without
// emitting. When called with no inputs it returns an already-closed channel.
//
// Note: a receive cannot be undone, so a losing input that becomes ready at
// the same moment as the winner may have one value consumed and discarded.
func Race[T any](in ...chan T) <-chan T {
	out := make(chan T)
	if len(in) == 0 {
		close(out)
		return out
	}

	// result is what a watcher reports about the first receive on its input.
	type result struct {
		value T
		ch    <-chan T
		ok    bool // false when the input was closed before emitting
	}
	// done is closed only by the coordinator below, so it is closed at most
	// once no matter how many inputs become ready simultaneously.
	done := make(chan struct{})
	results := make(chan result)

	for _, c := range in {
		go func(c <-chan T) {
			var r result
			select {
			case <-done:
				return
			case v, ok := <-c:
				r = result{value: v, ch: c, ok: ok}
			}
			// Report the outcome, but give up as soon as a winner has been
			// chosen so that losing watchers never block forever.
			select {
			case results <- r:
			case <-done:
			}
		}(c)
	}

	go func() {
		defer close(out)
		// Each watcher reports at most once, so at most len(in) results
		// arrive before either a winner is found or every input has closed.
		for pending := len(in); pending > 0; pending-- {
			r := <-results
			if !r.ok {
				continue
			}
			close(done)
			out <- r.value
			for v := range r.ch {
				out <- v
			}
			return
		}
	}()
	return out
}