Skip to content

Commit

Permalink
Handle network errors and signal closed connection
Browse files Browse the repository at this point in the history
  • Loading branch information
andersfugmann committed Nov 4, 2018
1 parent a42c0be commit 21f0fc6
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 18 deletions.
15 changes: 10 additions & 5 deletions async/src/connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ let open_connection { framing; virtual_host; _ } =

let connection_closed t _s =
match t with
| { closed = Some ivar; _ } when Ivar.is_full ivar ->
return ()
| { closed = Some ivar; _ } ->
Ivar.fill ivar ();
Framing.close t.framing
Expand All @@ -143,13 +145,17 @@ let on_closed t =

let connect ~id ?(virtual_host="/") ?(port=5672) ?(credentials=("guest", "guest")) ?heartbeat host =

Tcp.connect ~nodelay:() host port >>= fun (input, output) ->
let tcp_error_handler = ref (fun exn -> raise exn) in

Tcp.connect ~exn_handler:(fun exn -> !tcp_error_handler exn) ~nodelay:() host port >>= fun (input, output) ->

let framing = Framing.init ~id input output in
let t =
{ framing; virtual_host; channel = 0; closing = false;
closed = None }
in
let exn_handler exn = connection_closed t (Printexc.to_string exn) in
tcp_error_handler := exn_handler;
Framing.start framing (connection_closed t) >>= fun () ->
reply_start framing credentials >>= fun () ->
reply_tune framing >>= fun server_heartbeat ->
Expand All @@ -158,16 +164,15 @@ let connect ~id ?(virtual_host="/") ?(port=5672) ?(credentials=("guest", "guest"
| None, `Disabled -> ()
| Some hb, `Disabled
| None, `Heartbeat hb ->
spawn (send_heartbeat hb t);
spawn ~exn_handler (send_heartbeat hb t);
| Some hb, `Heartbeat hb' ->
spawn (send_heartbeat (min hb hb') t);
spawn ~exn_handler (send_heartbeat (min hb hb') t);
end;
open_connection t >>= fun () ->
register_blocked_handler framing;
spawn (reply_close t framing);
spawn ~exn_handler (reply_close t framing);
return t


let connect_uri ~id uri =
let u = Uri.of_string uri in
let () = match Uri.scheme u with
Expand Down
7 changes: 5 additions & 2 deletions async/src/framing.ml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type channel = { mutable state: channel_state;
type close_handler = string -> unit Deferred.t
type t = { input: Reader.t; output: Writer.t;
multiplex: String.t Pipe.Reader.t Pipe.Writer.t;
multiplex_reader: String.t Pipe.Reader.t Pipe.Reader.t;
mutable channels: channel option array;
mutable max_length: int;
id: string;
Expand Down Expand Up @@ -284,19 +285,21 @@ let id {id; _} = id
let init ~id input output =
let id = Printf.sprintf "%s.%s.%s.%s" id (Unix.gethostname ()) (Unix.getpid () |> string_of_int) (Sys.executable_name |> Filename.basename) in
let reader, writer = Pipe.create () in
spawn (start_writer output (Pipe.interleave_pipe reader));
{ input;
output;
max_length = 1024;
channels = Array.make 256 None;
multiplex = writer;
multiplex_reader = reader;
id;
flow = false;
}

let start t close_handler =
let exn_handler exn = close_handler (Printexc.to_string exn) in
spawn ~exn_handler (start_writer t.output (Pipe.interleave_pipe t.multiplex_reader));
Writer.write t.output protocol_header;
spawn (read_frame t close_handler);
spawn ~exn_handler (read_frame t close_handler);
open_channel t 0

let set_max_length t max_length =
Expand Down
18 changes: 15 additions & 3 deletions async/src/thread.ml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,17 @@ let (>>=) = (>>=)
let (>>|) = (>>|)
let return a = return a
let after ms = after (Core.Time.Span.of_ms ms)
let spawn t = don't_wait_for t
let spawn ?exn_handler t =
don't_wait_for (
match exn_handler with
| Some handler ->
begin
Monitor.try_with (fun () -> t) >>= function
| Ok () -> return ()
| Error exn -> handler exn
end
| None -> t
)

let with_timeout seconds deferred =
let duration = Core.Time.Span.of_sec (float_of_int seconds) in
Expand All @@ -43,11 +53,13 @@ module Writer = struct
end

module Tcp = struct
let connect ?nodelay host port =
let connect ~exn_handler ?nodelay host port =
let addr = Core.Host_and_port.create ~host ~port
|> Tcp.Where_to_connect.of_host_and_port
in
Tcp.connect ~buffer_age_limit:`Unlimited addr >>= fun (s, r, w) ->
let monitor = Monitor.create ~name:"Network" () in
Monitor.Exported_for_scheduler.within' ~monitor(fun () -> Tcp.connect ~buffer_age_limit:`Unlimited addr) >>= fun (s, r, w) ->
spawn (Monitor.detach_and_get_next_error monitor >>= exn_handler);
(match nodelay with
| Some () -> Socket.setopt s Socket.Opt.nodelay true
| None -> ());
Expand Down
16 changes: 13 additions & 3 deletions examples/on_closed.ml
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,27 @@ open Thread

let _ =
let _ =
Connection.connect ~id:"fugmann" "localhost" >>= fun connection ->
Connection.connect ~id:"fugmann" ~heartbeat:10 "localhost" >>= fun connection ->
spawn (Connection.on_closed connection >>= fun () ->
Log.info "Connection closed";
return ());

Log.info "Connection started";
Log.info "Connection started.";
Connection.open_channel Channel.no_confirm ~id:"test" connection >>= fun channel ->
Log.info "Channel opened";
spawn (Channel.on_closed channel >>= fun () ->
Log.info "Channel closed";
Log.info "Channel closed - Handler";
return ());

spawn (Connection.on_closed connection >>= fun () ->
Log.info "Connection closed - Handler";
return ());
Unix.sleep 300;
(*
Connection.close connection >>= fun () ->
Log.info "Connection closed ";
*)
Log.info "Done";
return ();
in
Scheduler.go ()
4 changes: 2 additions & 2 deletions lib/thread.ml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ module type T = sig
val ( >>| ) : 'a Deferred.t -> ('a -> 'b) -> 'b Deferred.t
val return : 'a -> 'a Deferred.t
val after : float -> unit Deferred.t
val spawn : unit Deferred.t -> unit
val spawn : ?exn_handler:(exn -> unit Deferred.t) -> unit Deferred.t -> unit
val with_timeout : int -> 'a Deferred.t -> [ `Result of 'a | `Timeout ] Deferred.t

module Ivar : sig
Expand All @@ -39,7 +39,7 @@ module type T = sig
end

module Tcp : sig
val connect : ?nodelay:unit -> string -> int ->
val connect : exn_handler:(exn -> unit Deferred.t) -> ?nodelay:unit -> string -> int ->
(Reader.t * Writer.t) Deferred.t
end

Expand Down
10 changes: 7 additions & 3 deletions lwt/src/thread.ml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@ let (>>=) = Lwt.(>>=)
let (>>|) = Lwt.(>|=)
let return = Lwt.return
let after ms = Lwt_unix.sleep (ms /. 1000.0)
let spawn t = Lwt.async (fun () -> t)
let spawn ?exn_handler t = Lwt.async (fun () ->
match exn_handler with
| Some handler -> Lwt.catch (fun () -> t) handler
| None -> t
)

let with_timeout seconds deferred =
Lwt.pick [
Expand Down Expand Up @@ -217,14 +221,14 @@ end

module Tcp = struct

let connect ?nodelay host port =
let connect ~exn_handler ?nodelay host port =
let fd = Lwt_unix.(socket PF_INET SOCK_STREAM 0) in
Lwt_unix.gethostbyname host >>= fun entry ->
let sock_addr = (Lwt_unix.ADDR_INET (entry.Lwt_unix.h_addr_list.(0), port)) in
Lwt_io.open_connection ~fd sock_addr >>= fun (ic, oc) ->
(* Start a process that writes *)
let (reader, writer) = Pipe.create () in
spawn (Pipe.iter ~f:(fun str ->
spawn ~exn_handler (Pipe.iter ~f:(fun str ->
Lwt_io.write oc str) reader);

(match nodelay with
Expand Down

0 comments on commit 21f0fc6

Please sign in to comment.