Skip to content
This repository was archived by the owner on Mar 5, 2024. It is now read-only.

Commit a0faf08

Browse files
committed
Support chaining streams
1 parent 92ebb1d commit a0faf08

File tree

1 file changed

+58
-16
lines changed

1 file changed

+58
-16
lines changed

src/data/data_stream.erl

Lines changed: 58 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from_fun/1,
1111
from_list/1,
1212
to_list/1,
13+
append/2,
1314
foreach/2,
1415
fold/3,
1516
map/2, % Alias for lazy_map.
@@ -21,8 +22,6 @@
2122
sample/2
2223
]).
2324

24-
-define(T, ?MODULE).
25-
2625
-type reservoir(A) :: #{pos_integer() => A}.
2726

2827
-type filter(A, B)
@@ -32,20 +31,22 @@
3231

3332
-type next(A) :: fun(() -> none | {some, {A, next(A)}}).
3433

35-
-record(?T, {
34+
-record(stream, {
3635
next :: next(any()),
3736
filters :: [filter(any(), any())]
3837
}).
3938

40-
-opaque t(A) ::
39+
-type stream(A) ::
4140
%% XXX Record syntax does not support type parameters, so we get around it with desugaring.
4241
%% XXX Ensure the field order is the same as in the corresponding record!
4342
{
44-
?T,
43+
stream,
4544
next(A),
4645
[filter(A, any())]
4746
}.
4847

48+
-opaque t(A) :: [stream(A)].
49+
4950
-record(sched, {
5051
id :: reference(),
5152
producers :: [{pid(), reference()}],
@@ -59,18 +60,22 @@
5960

6061
-spec from_fun(next(A)) -> t(A).
6162
from_fun(Next) ->
62-
#?T{
63-
next = Next,
64-
filters = []
65-
}.
63+
[#stream{next = Next, filters = []}].
64+
65+
-spec append(t(A), t(A)) -> t(A).
66+
append(TA, TB) ->
67+
TA ++ TB.
6668

6769
-spec next(t(A)) -> none | {some, {A, t(A)}}.
68-
next(#?T{next=Next0, filters=Filters}=T0) when is_function(Next0) ->
70+
next([#stream{next=Next0, filters=Filters}=S | Streams]) when is_function(Next0) ->
6971
case Next0() of
7072
none ->
71-
none;
73+
case Streams of
74+
[] -> none;
75+
[_|_] -> next(Streams)
76+
end;
7277
{some, {X, Next1}} when is_function(Next1) ->
73-
T1 = T0#?T{next=Next1},
78+
T1 = [S#stream{next=Next1} | Streams],
7479
case filters_apply(X, Filters) of
7580
none ->
7681
next(T1);
@@ -86,12 +91,12 @@ filter(T, F) ->
8691
lazy_filter(T, F).
8792

8893
-spec lazy_map(t(A), fun((A) -> B)) -> t(B).
89-
lazy_map(#?T{filters=Filters}=T, F) ->
90-
T#?T{filters=Filters ++ [{map, F}]}.
94+
lazy_map([#stream{filters=Filters}=S | Streams], F) ->
95+
[S#stream{filters=Filters ++ [{map, F}]} | Streams].
9196

9297
-spec lazy_filter(t(A), fun((A) -> boolean())) -> t(A).
93-
lazy_filter(#?T{filters=Filters}=T, F) ->
94-
T#?T{filters=Filters ++ [{test, F}]}.
98+
lazy_filter([#stream{filters=Filters}=S | Streams], F) ->
99+
[S#stream{filters=Filters ++ [{test, F}]} | Streams].
95100

96101
-spec fold(t(A), B, fun((A, B) -> B)) -> B.
97102
fold(T0, Acc, F) ->
@@ -485,6 +490,43 @@ fold_test_() ->
485490
]
486491
].
487492

493+
append_test_() ->
494+
[
495+
?_assertEqual(
496+
[1, 2, 3, 4, 5],
497+
to_list(append(from_list([1, 2]), from_list([3, 4, 5])))
498+
),
499+
?_assertEqual(
500+
[1, 2, 3, 4, 5, 6, 7, 8],
501+
to_list(
502+
append(
503+
append(from_list([1, 2]), from_list([3, 4, 5])),
504+
from_list([6, 7, 8]))
505+
)
506+
),
507+
?_assertEqual(
508+
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
509+
to_list(
510+
append(
511+
append(
512+
from_list([1, 2]),
513+
append(
514+
append(from_list([3]), from_list([4])),
515+
from_list([5])
516+
)
517+
),
518+
append(
519+
from_list([6, 7]),
520+
append(
521+
from_list([8]),
522+
from_list([9, 10])
523+
)
524+
)
525+
)
526+
)
527+
)
528+
].
529+
488530
random_elements_test_() ->
489531
TestCases =
490532
[

0 commit comments

Comments
 (0)