1616]).
1717
1818-record (sched , {
19- id :: reference (),
20- ps_up :: [{pid (), reference ()}], % producers up.
21- cs_up :: [{pid (), reference ()}], % consumers up.
22- cs_free :: [pid ()], % consumers available to work.
23- xs :: [any ()], % inputs. received from producers.
24- ys :: [any ()] % outputs received from consumers.
19+ id :: reference (),
20+ producers :: [{pid (), reference ()}],
21+ consumers :: [{pid (), reference ()}],
22+ consumers_free :: [pid ()], % available to work.
23+ work :: [any ()], % received from producers.
24+ results :: [any ()] % received from consumers.
2525}).
2626
2727% % API ========================================================================
@@ -70,14 +70,14 @@ pmap_to_bag(T, F, J) when is_function(T), is_function(F), is_integer(J), J > 0 -
7070 fun () ->
7171 SchedPid = self (),
7272 Consumer =
73- fun Work () ->
73+ fun Consume () ->
7474 ConsumerPid = self (),
7575 SchedPid ! {SchedID , consumer_ready , ConsumerPid },
7676 receive
7777 {SchedID , job , X } ->
7878 Y = F (X ),
7979 SchedPid ! {SchedID , consumer_output , Y },
80- Work ();
80+ Consume ();
8181 {SchedID , done } ->
8282 ok
8383 end
@@ -88,12 +88,12 @@ pmap_to_bag(T, F, J) when is_function(T), is_function(F), is_integer(J), J > 0 -
8888 end ,
8989 Ys =
9090 sched (# sched {
91- id = SchedID ,
92- ps_up = [spawn_monitor (Producer )],
93- cs_up = [spawn_monitor (Consumer ) || _ <- lists :duplicate (J , {})],
94- cs_free = [],
95- xs = [],
96- ys = []
91+ id = SchedID ,
92+ producers = [spawn_monitor (Producer )],
93+ consumers = [spawn_monitor (Consumer ) || _ <- lists :duplicate (J , {})],
94+ consumers_free = [],
95+ work = [],
96+ results = []
9797 }),
9898 CallerPid ! {SchedID , Ys }
9999 end ,
@@ -115,19 +115,19 @@ pmap_to_bag(T, F, J) when is_function(T), is_function(F), is_integer(J), J > 0 -
115115% % Internal ===================================================================
116116
117117- spec sched (# sched {}) -> [any ()].
118- sched (# sched {id = _ , ps_up = [], cs_up = [], cs_free = [], xs = [], ys = Ys }) ->
118+ sched (# sched {id = _ , producers = [], consumers = [], consumers_free = [], work = [], results = Ys }) ->
119119 Ys ;
120- sched (# sched {id = ID , ps_up = [], cs_up = [_ |_ ], cs_free = [_ |_ ]= CsFree , xs = []}= S0 ) ->
120+ sched (# sched {id = ID , producers = [], consumers = [_ |_ ], consumers_free = [_ |_ ]= CsFree , work = []}= S0 ) ->
121121 _ = [C ! {ID , done } || C <- CsFree ],
122- sched (S0 # sched {cs_free = []});
123- sched (# sched {id = _ , ps_up = _ , cs_up = [_ |_ ], cs_free = [_ |_ ], xs = [_ |_ ]}= S0 ) ->
122+ sched (S0 # sched {consumers_free = []});
123+ sched (# sched {id = _ , producers = _ , consumers = [_ |_ ], consumers_free = [_ |_ ], work = [_ |_ ]}= S0 ) ->
124124 S1 = sched_assign (S0 ),
125125 sched (S1 );
126- sched (# sched {id = ID , ps_up = Ps , cs_up = _ , cs_free = CsFree , xs = Xs , ys = Ys }= S ) ->
126+ sched (# sched {id = ID , producers = Ps , consumers = _ , consumers_free = CsFree , work = Xs , results = Ys }= S ) ->
127127 receive
128- {ID , producer_output , X } -> sched (S # sched {xs = [X | Xs ]});
129- {ID , consumer_output , Y } -> sched (S # sched {ys = [Y | Ys ]});
130- {ID , consumer_ready , C } -> sched (S # sched {cs_free = [C | CsFree ]});
128+ {ID , producer_output , X } -> sched (S # sched {work = [X | Xs ]});
129+ {ID , consumer_output , Y } -> sched (S # sched {results = [Y | Ys ]});
130+ {ID , consumer_ready , C } -> sched (S # sched {consumers_free = [C | CsFree ]});
131131 {'DOWN' , MonRef , process , Pid , normal } ->
132132 S1 = sched_remove_worker (S , {Pid , MonRef }),
133133 sched (S1 );
@@ -139,23 +139,23 @@ sched(#sched{id=ID, ps_up=Ps, cs_up=_, cs_free=CsFree, xs=Xs, ys=Ys }=S) ->
139139 end .
140140
141141- spec sched_remove_worker (# sched {}, {pid (), reference ()}) -> # sched {}.
142- sched_remove_worker (# sched {ps_up = Ps , cs_up = Cs , cs_free = CsFree }= S , {Pid , _ }= PidRef ) ->
142+ sched_remove_worker (# sched {producers = Ps , consumers = Cs , consumers_free = CsFree }= S , {Pid , _ }= PidRef ) ->
143143 case lists :member (PidRef , Ps ) of
144144 true ->
145- S # sched {ps_up = Ps -- [PidRef ]};
145+ S # sched {producers = Ps -- [PidRef ]};
146146 false ->
147147 S # sched {
148- cs_up = Cs -- [PidRef ],
149- cs_free = CsFree -- [Pid ]
148+ consumers = Cs -- [PidRef ],
149+ consumers_free = CsFree -- [Pid ]
150150 }
151151 end .
152152
153153- spec sched_assign (# sched {}) -> # sched {}.
154- sched_assign (# sched {cs_free = [], xs = Xs }= S ) -> S # sched {cs_free = [], xs = Xs };
155- sched_assign (# sched {cs_free = Cs , xs = []}= S ) -> S # sched {cs_free = Cs , xs = []};
156- sched_assign (# sched {cs_free = [C | Cs ], xs = [X | Xs ], id = ID }= S ) ->
154+ sched_assign (# sched {consumers_free = [], work = Xs }= S ) -> S # sched {consumers_free = [], work = Xs };
155+ sched_assign (# sched {consumers_free = Cs , work = []}= S ) -> S # sched {consumers_free = Cs , work = []};
156+ sched_assign (# sched {consumers_free = [C | Cs ], work = [X | Xs ], id = ID }= S ) ->
157157 C ! {ID , job , X },
158- sched_assign (S # sched {cs_free = Cs , xs = Xs }).
158+ sched_assign (S # sched {consumers_free = Cs , work = Xs }).
159159
160160% % Tests ======================================================================
161161
0 commit comments