Skip to content

Commit

Permalink
update pbrt_services for stream-taking handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
c-cube committed Oct 19, 2023
1 parent 3fec08d commit 8c1bd16
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 34 deletions.
5 changes: 3 additions & 2 deletions src/compilerlib/pb_codegen_services.ml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ let string_of_rpc_handler_type (req : Ot.rpc_type) (res : Ot.rpc_type) : string
| Ot.Rpc_scalar req, Ot.Rpc_stream res ->
spf "(%s, %s) Pbrt_services.Server.server_stream_handler" (f req) (f res)
| Ot.Rpc_stream req, Ot.Rpc_stream res ->
spf "(%s, %s) Pbrt_services.Server.both_stream_handler" (f req) (f res)
spf "(%s, %s) Pbrt_services.Server.bidirectional_stream_handler" (f req)
(f res)

let function_name_encode_json ~service_name ~rpc_name (ty : Ot.rpc_type) :
string =
Expand Down Expand Up @@ -193,7 +194,7 @@ let gen_service_server_struct (service : Ot.service) sc : unit =
| Rpc_scalar _, Rpc_scalar _ -> spf "(Unary %s)" f
| Rpc_scalar _, Rpc_stream _ -> spf "(Server_stream %s)" f
| Rpc_stream _, Rpc_scalar _ -> spf "(Client_stream %s)" f
| Rpc_stream _, Rpc_stream _ -> spf "(Both_stream %s)" f
| Rpc_stream _, Rpc_stream _ -> spf "(Bidirectional_stream %s)" f
in

F.linep sc " (mk_rpc ~name:%S" rpc.rpc_name;
Expand Down
35 changes: 21 additions & 14 deletions src/runtime-services/pbrt_services.ml
Original file line number Diff line number Diff line change
Expand Up @@ -53,30 +53,37 @@ end

(** Server end of services *)
module Server = struct
type ('req, 'res, 'state) client_stream_handler_with_state = {
init: unit -> 'state;
on_item: 'state -> 'req -> unit;
on_close: 'state -> 'res;
}

type ('req, 'res) client_stream_handler =
| Client_stream_handler : {
init: unit -> 'state;
on_input:
'state -> 'req -> [ `Update of 'state | `Return_early of 'res ];
on_close: 'state -> 'res;
}
| Client_stream_handler :
('req, 'res, 'state) client_stream_handler_with_state
-> ('req, 'res) client_stream_handler
[@@unboxed]

type ('req, 'res) server_stream_handler = 'req -> 'res Push_stream.t -> unit

type ('req, 'res) both_stream_handler =
| Both_stream_handler : {
init: unit -> 'res Push_stream.t -> 'state;
on_input: 'state -> 'res Push_stream.t -> 'req -> 'state;
n_close: 'state -> 'res Push_stream.t -> unit;
}
-> ('req, 'res) both_stream_handler
type ('req, 'res, 'state) bidirectional_stream_handler_with_state = {
init: unit -> 'res Push_stream.t -> 'state;
on_item: 'state -> 'req -> unit;
on_close: 'state -> unit;
}

type ('req, 'res) bidirectional_stream_handler =
| Bidirectional_stream_handler :
('req, 'res, 'state) bidirectional_stream_handler_with_state
-> ('req, 'res) bidirectional_stream_handler
[@@unboxed]

type ('req, 'res) handler =
| Unary of ('req -> 'res)
| Client_stream of ('req, 'res) client_stream_handler
| Server_stream of ('req, 'res) server_stream_handler
| Both_stream of ('req, 'res) both_stream_handler
| Bidirectional_stream of ('req, 'res) bidirectional_stream_handler

type ('req, 'res) rpc = {
name: string;
Expand Down
46 changes: 28 additions & 18 deletions src/runtime-services/pbrt_services.mli
Original file line number Diff line number Diff line change
Expand Up @@ -59,39 +59,49 @@ end

(** Service stubs, server side *)
module Server : sig
type ('req, 'res, 'state) client_stream_handler_with_state = {
init: unit -> 'state; (** When a stream starts *)
on_item: 'state -> 'req -> unit;
(** When an element of the stream is received. This can either
update the internal state by mutation, performing side effects,
or choose to return a value early and stop reading from the input stream. *)
on_close: 'state -> 'res; (** When the stream is over *)
}
(** Handler that receives a client stream *)

type ('req, 'res) client_stream_handler =
| Client_stream_handler : {
init: unit -> 'state; (** When a stream starts *)
on_input:
'state -> 'req -> [ `Update of 'state | `Return_early of 'res ];
(** When an element of the stream is received. This can either
update the internal state, or return a value early and
stop reading from the input stream. *)
on_close: 'state -> 'res; (** When the stream is over *)
}
| Client_stream_handler :
('req, 'res, 'state) client_stream_handler_with_state
-> ('req, 'res) client_stream_handler
[@@unboxed]

type ('req, 'res) server_stream_handler = 'req -> 'res Push_stream.t -> unit
(** Takes the input value and a push stream (to send items to
the caller, and then close the stream at the end).
The stream's [close] function must be called exactly once. *)

type ('req, 'res) both_stream_handler =
| Both_stream_handler : {
init: unit -> 'res Push_stream.t -> 'state;
on_input: 'state -> 'res Push_stream.t -> 'req -> 'state;
n_close: 'state -> 'res Push_stream.t -> unit;
}
-> ('req, 'res) both_stream_handler
(** Handler taking a stream of values and returning a stream as well. *)
type ('req, 'res, 'state) bidirectional_stream_handler_with_state = {
init: unit -> 'res Push_stream.t -> 'state;
on_item: 'state -> 'req -> unit;
on_close: 'state -> unit;
}
(** Handler taking a stream of values and returning a stream as well. *)

type ('req, 'res) bidirectional_stream_handler =
| Bidirectional_stream_handler :
('req, 'res, 'state) bidirectional_stream_handler_with_state
-> ('req, 'res) bidirectional_stream_handler
[@@unboxed]

(** A handler, i.e the server side implementation of a single RPC method.
Handlers come in various flavors because they make take, or return,
streams of values. *)
type ('req, 'res) handler =
| Unary of ('req -> 'res)
(** Simple unary handler, gets a value, returns a value. *)
| Client_stream of ('req, 'res) client_stream_handler
| Server_stream of ('req, 'res) server_stream_handler
| Both_stream of ('req, 'res) both_stream_handler
| Bidirectional_stream of ('req, 'res) bidirectional_stream_handler

type ('req, 'res) rpc = {
name: string;
Expand Down

0 comments on commit 8c1bd16

Please sign in to comment.