Skip to content

Commit

Permalink
Simplify message writing system
Browse files Browse the repository at this point in the history
Instead of spawning a new flush fiber if one isn't running, just keep
one around at all times. This is simpler, makes the traces cleaner, and
will probably help with buffering later.
  • Loading branch information
talex5 committed Jun 25, 2024
1 parent 0db589d commit 6c7968b
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 26 deletions.
44 changes: 19 additions & 25 deletions capnp-rpc-net/capTP_capnp.ml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ module Make (Network : S.NETWORK) = struct
sw : Switch.t;
endpoint : Endpoint.t;
conn : Conn.t;
xmit_queue : Capnp.Message.rw Capnp.BytesMessage.Message.t Queue.t;
xmit_queue : Capnp.Message.rw Capnp.BytesMessage.Message.t Eio.Stream.t;
mutable disconnecting : bool;
}

Expand All @@ -62,54 +62,47 @@ module Make (Network : S.NETWORK) = struct
let tags t = Conn.tags t.conn

let drop_queue q =
Prometheus.Counter.inc Metrics.messages_outbound_dropped_total (float_of_int (Queue.length q));
Queue.clear q
let len = Eio.Stream.length q in
Prometheus.Counter.inc Metrics.messages_outbound_dropped_total (float_of_int len)
(* Queue.clear q -- could close stream here instead *)

(* [flush ~xmit_queue endpoint] writes each message in the queue until it is empty.
Invariant:
Whenever Eio blocks or switches threads, a flush thread is running iff the
queue is non-empty. *)
(* [flush ~xmit_queue endpoint] writes each message in [xmit_queue] to [endpoint]. *)
let rec flush ~xmit_queue endpoint =
(* We keep the item on the queue until it is transmitted, as the queue state
tells us whether there is a [flush] currently running. *)
let next = Queue.peek xmit_queue in
let next = Eio.Stream.take xmit_queue in
match Endpoint.send endpoint next with
| Error `Closed ->
Endpoint.disconnect endpoint; (* We'll read a close soon *)
drop_queue xmit_queue
drop_queue xmit_queue;
`Stop_daemon
| Error (`Msg msg) ->
Log.warn (fun f -> f "Error sending messages: %s (will shutdown connection)" msg);
Endpoint.disconnect endpoint;
drop_queue xmit_queue
drop_queue xmit_queue;
`Stop_daemon
| Ok () ->
Prometheus.Counter.inc_one Metrics.messages_outbound_sent_total;
ignore (Queue.pop xmit_queue);
if not (Queue.is_empty xmit_queue) then
flush ~xmit_queue endpoint
(* else queue is empty and flush thread is done *)
flush ~xmit_queue endpoint
| exception ex ->
drop_queue xmit_queue;
raise ex

(* Enqueue [message] in [xmit_queue] and ensure the flush thread is running. *)
let queue_send ~sw ~xmit_queue endpoint message =
let queue_send ~xmit_queue message =
Log.debug (fun f ->
let module M = Capnp_rpc_lwt.Private.Schema.MessageWrapper.Message in
f "queue_send: %d/%d allocated bytes in %d segs"
(M.total_size message)
(M.total_alloc_size message)
(M.num_segments message));
let was_idle = Queue.is_empty xmit_queue in
Queue.add message xmit_queue;
Prometheus.Counter.inc_one Metrics.messages_outbound_enqueued_total;
if was_idle then Eio.Fiber.fork ~sw (fun () -> flush ~xmit_queue endpoint)
Eio.Stream.add xmit_queue message;
Prometheus.Counter.inc_one Metrics.messages_outbound_enqueued_total

let return_not_implemented t x =
Log.debug (fun f -> f ~tags:(tags t) "Returning Unimplemented");
let open Builder in
let m = Message.init_root () in
let _ : Builder.Message.t = Message.unimplemented_set_reader m x in
queue_send ~sw:t.sw ~xmit_queue:t.xmit_queue t.endpoint (Message.to_message m)
queue_send ~xmit_queue:t.xmit_queue (Message.to_message m)

let listen t =
let rec loop () =
Expand Down Expand Up @@ -148,7 +141,7 @@ module Make (Network : S.NETWORK) = struct
loop ()

let send_abort t ex =
queue_send ~sw:t.sw ~xmit_queue:t.xmit_queue t.endpoint (Serialise.message (`Abort ex))
queue_send ~xmit_queue:t.xmit_queue (Serialise.message (`Abort ex))

let disconnect t ex =
if not t.disconnecting then (
Expand All @@ -161,8 +154,9 @@ module Make (Network : S.NETWORK) = struct
let disconnecting t = t.disconnecting

let connect ~sw ~restore ?(tags=Logs.Tag.empty) endpoint =
let xmit_queue = Queue.create () in
let queue_send msg = queue_send ~sw ~xmit_queue endpoint (Serialise.message msg) in
let xmit_queue = Eio.Stream.create 100 in (* todo: tune this? make it configurable? *)
Fiber.fork_daemon ~sw (fun () -> flush ~xmit_queue endpoint);
let queue_send msg = Eio.Stream.add xmit_queue (Serialise.message msg) in
let restore = Restorer.fn restore in
let fork = Fiber.fork ~sw in
let conn = Conn.create ~restore ~tags ~fork ~queue_send in
Expand Down
6 changes: 5 additions & 1 deletion capnp-rpc-net/endpoint.ml
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,8 @@ let rec recv t =
Error `Closed

let disconnect t =
Eio.Flow.shutdown t.flow `All
try
Eio.Flow.shutdown t.flow `All
with Eio.Io (Eio.Net.E Connection_reset _, _) ->
(* TCP connection already shut down, so TLS shutdown failed. Ignore. *)
()

0 comments on commit 6c7968b

Please sign in to comment.