From 6c7968bdc41205e14f95253cbca7f5c167655e75 Mon Sep 17 00:00:00 2001 From: Thomas Leonard Date: Thu, 9 May 2024 09:57:06 +0100 Subject: [PATCH] Simplify message writing system Instead of spawning a new flush fiber if one isn't running, just keep one around at all times. This is simpler, makes the traces cleaner, and will probably help with buffering later. --- capnp-rpc-net/capTP_capnp.ml | 44 ++++++++++++++++-------------------- capnp-rpc-net/endpoint.ml | 6 ++++- 2 files changed, 24 insertions(+), 26 deletions(-) diff --git a/capnp-rpc-net/capTP_capnp.ml b/capnp-rpc-net/capTP_capnp.ml index cd815f97..2f58d7eb 100644 --- a/capnp-rpc-net/capTP_capnp.ml +++ b/capnp-rpc-net/capTP_capnp.ml @@ -46,7 +46,7 @@ module Make (Network : S.NETWORK) = struct sw : Switch.t; endpoint : Endpoint.t; conn : Conn.t; - xmit_queue : Capnp.Message.rw Capnp.BytesMessage.Message.t Queue.t; + xmit_queue : Capnp.Message.rw Capnp.BytesMessage.Message.t Eio.Stream.t; mutable disconnecting : bool; } @@ -62,54 +62,47 @@ module Make (Network : S.NETWORK) = struct let tags t = Conn.tags t.conn let drop_queue q = - Prometheus.Counter.inc Metrics.messages_outbound_dropped_total (float_of_int (Queue.length q)); - Queue.clear q + let len = Eio.Stream.length q in + Prometheus.Counter.inc Metrics.messages_outbound_dropped_total (float_of_int len) + (* Queue.clear q -- could close stream here instead *) - (* [flush ~xmit_queue endpoint] writes each message in the queue until it is empty. - Invariant: - Whenever Eio blocks or switches threads, a flush thread is running iff the - queue is non-empty. *) + (* [flush ~xmit_queue endpoint] writes each message in [xmit_queue] to [endpoint]. *) let rec flush ~xmit_queue endpoint = - (* We keep the item on the queue until it is transmitted, as the queue state - tells us whether there is a [flush] currently running. *) - let next = Queue.peek xmit_queue in + let next = Eio.Stream.take xmit_queue in match Endpoint.send endpoint next with | Error `Closed -> Endpoint.disconnect endpoint; (* We'll read a close soon *) - drop_queue xmit_queue + drop_queue xmit_queue; + `Stop_daemon | Error (`Msg msg) -> Log.warn (fun f -> f "Error sending messages: %s (will shutdown connection)" msg); Endpoint.disconnect endpoint; - drop_queue xmit_queue + drop_queue xmit_queue; + `Stop_daemon | Ok () -> Prometheus.Counter.inc_one Metrics.messages_outbound_sent_total; - ignore (Queue.pop xmit_queue); - if not (Queue.is_empty xmit_queue) then - flush ~xmit_queue endpoint - (* else queue is empty and flush thread is done *) + flush ~xmit_queue endpoint | exception ex -> drop_queue xmit_queue; raise ex (* Enqueue [message] in [xmit_queue] and ensure the flush thread is running. *) - let queue_send ~sw ~xmit_queue endpoint message = + let queue_send ~xmit_queue message = Log.debug (fun f -> let module M = Capnp_rpc_lwt.Private.Schema.MessageWrapper.Message in f "queue_send: %d/%d allocated bytes in %d segs" (M.total_size message) (M.total_alloc_size message) (M.num_segments message)); - let was_idle = Queue.is_empty xmit_queue in - Queue.add message xmit_queue; - Prometheus.Counter.inc_one Metrics.messages_outbound_enqueued_total; - if was_idle then Eio.Fiber.fork ~sw (fun () -> flush ~xmit_queue endpoint) + Eio.Stream.add xmit_queue message; + Prometheus.Counter.inc_one Metrics.messages_outbound_enqueued_total let return_not_implemented t x = Log.debug (fun f -> f ~tags:(tags t) "Returning Unimplemented"); let open Builder in let m = Message.init_root () in let _ : Builder.Message.t = Message.unimplemented_set_reader m x in - queue_send ~sw:t.sw ~xmit_queue:t.xmit_queue t.endpoint (Message.to_message m) + queue_send ~xmit_queue:t.xmit_queue (Message.to_message m) let listen t = let rec loop () = @@ -148,7 +141,7 @@ module Make (Network : S.NETWORK) = struct loop () let send_abort t ex = - queue_send ~sw:t.sw ~xmit_queue:t.xmit_queue t.endpoint (Serialise.message (`Abort ex)) + queue_send ~xmit_queue:t.xmit_queue (Serialise.message (`Abort ex)) let disconnect t ex = if not t.disconnecting then ( @@ -161,8 +154,9 @@ module Make (Network : S.NETWORK) = struct let disconnecting t = t.disconnecting let connect ~sw ~restore ?(tags=Logs.Tag.empty) endpoint = - let xmit_queue = Queue.create () in - let queue_send msg = queue_send ~sw ~xmit_queue endpoint (Serialise.message msg) in + let xmit_queue = Eio.Stream.create 100 in (* todo: tune this? make it configurable? *) + Fiber.fork_daemon ~sw (fun () -> flush ~xmit_queue endpoint); + let queue_send msg = Eio.Stream.add xmit_queue (Serialise.message msg) in let restore = Restorer.fn restore in let fork = Fiber.fork ~sw in let conn = Conn.create ~restore ~tags ~fork ~queue_send in diff --git a/capnp-rpc-net/endpoint.ml b/capnp-rpc-net/endpoint.ml index d54c15de..845644ff 100644 --- a/capnp-rpc-net/endpoint.ml +++ b/capnp-rpc-net/endpoint.ml @@ -64,4 +64,8 @@ let rec recv t = Error `Closed let disconnect t = - Eio.Flow.shutdown t.flow `All + try + Eio.Flow.shutdown t.flow `All + with Eio.Io (Eio.Net.E Connection_reset _, _) -> + (* TCP connection already shut down, so TLS shutdown failed. Ignore. *) + ()