Skip to content

Commit

Permalink
Merge pull request #184 from hannesm/notify
Browse files Browse the repository at this point in the history
server outgoing signature changes
  • Loading branch information
hannesm authored Aug 14, 2019
2 parents e56a792 + c474bbc commit 4485675
Show file tree
Hide file tree
Showing 8 changed files with 256 additions and 174 deletions.
1 change: 1 addition & 0 deletions dns-server.opam
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ depends: [
"duration"
"alcotest" {with-test}
"nocrypto" {with-test}
"dns-tsig" {with-test}
]

build: [
Expand Down
7 changes: 7 additions & 0 deletions mirage/dns_mirage.ml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ module Make (S : Mirage_stack_lwt.V4) = struct
T.close flow >|= fun () ->
Error ()

let send_tcp_multiple flow datas =
Lwt_list.fold_left_s (fun acc d ->
match acc with
| Error () -> Lwt.return (Error ())
| Ok () -> send_tcp flow d)
(Ok ()) datas

let read_tcp flow =
read_exactly flow 2 >>= function
| Error () -> Lwt.return (Error ())
Expand Down
7 changes: 6 additions & 1 deletion mirage/dns_mirage.mli
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,12 @@ module Make (S : Mirage_stack_lwt.V4) : sig

val send_tcp : S.TCPV4.flow -> Cstruct.t -> (unit, unit) result Lwt.t
(** [send_tcp flow buf] sends the buffer, either succeeds or fails (logs
actual error). *)
actual error). *)

val send_tcp_multiple : S.TCPV4.flow -> Cstruct.t list ->
(unit, unit) result Lwt.t
(** [send_tcp_multiple flow bufs] sends the buffers, either succeeds or fails
(logs actual error). *)

val send_udp : S.t -> int -> Ipaddr.V4.t -> int -> Cstruct.t -> unit Lwt.t
(** [send_udp stack source_port dst dst_port buf] sends the [buf] as UDP
Expand Down
52 changes: 26 additions & 26 deletions mirage/server/dns_mirage_server.ml
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,18 @@ module Make (P : Mirage_clock_lwt.PCLOCK) (M : Mirage_clock_lwt.MCLOCK) (TIME :
let send_notify recv_task (ip, data) =
let connect_and_send ip =
connect recv_task ip >>= function
| Ok flow -> Dns.send_tcp flow data
| Ok flow -> Dns.send_tcp_multiple flow data
| Error () -> Lwt.return (Error ())
in
(match Dns.IM.find ip !tcp_out with
| None -> connect_and_send ip
| Some f -> Dns.send_tcp f data >>= function
| Some f -> Dns.send_tcp_multiple f data >>= function
| Ok () -> Lwt.return (Ok ())
| Error () -> drop ip ; connect_and_send ip) >>= function
| Ok () -> Lwt.return_unit
| Error () ->
drop ip ; Dns.send_udp stack port ip 53 data
drop ip;
Lwt_list.iter_p (Dns.send_udp stack port ip 53) data
in
let maybe_update_state t =
Expand Down Expand Up @@ -139,7 +140,6 @@ module Make (P : Mirage_clock_lwt.PCLOCK) (M : Mirage_clock_lwt.MCLOCK) (TIME :
let secondary ?(on_update = fun ~old:_ _trie -> Lwt.return_unit) ?(timer = 5) ?(port = 53) stack t =
let state = ref t in
let tcp_out = ref Dns.IM.empty in
let tcp_packet_transit = ref Dns.IM.empty in
let maybe_update_state t =
let old = !state in
Expand All @@ -160,17 +160,12 @@ module Make (P : Mirage_clock_lwt.PCLOCK) (M : Mirage_clock_lwt.MCLOCK) (TIME :
let elapsed = M.elapsed_ns () in
let state', out = Dns_server.Secondary.closed !state now elapsed ip in
state := state' ;
Lwt_list.iter_s request out
request (ip, out)
and read_and_handle ip f =
Dns.read_tcp f >>= function
| Error () ->
Log.debug (fun m -> m "removing %a from tcp_out" Ipaddr.V4.pp ip) ;
close ip >>= fun () ->
(* re-send once *)
begin match Dns.IM.find ip !tcp_packet_transit with
| None -> Lwt.return_unit
| Some data -> request ~record:false data
end
close ip
| Ok data ->
let now = Ptime.v (P.now_d_ps ()) in
let elapsed = M.elapsed_ns () in
Expand All @@ -186,14 +181,14 @@ module Make (P : Mirage_clock_lwt.PCLOCK) (M : Mirage_clock_lwt.MCLOCK) (TIME :
Log.debug (fun m -> m "removing %a from tcp_out" Ipaddr.V4.pp ip) ;
close ip >|= fun () -> Error ()
| Ok () -> Lwt.return (Ok ())) >>= fun r ->
Lwt_list.iter_s request out >>= fun () ->
(match out with
| None -> Lwt.return_unit
| Some (ip, data) -> request_one (ip, data)) >>= fun () ->
match r with
| Ok () -> read_and_handle ip f
| Error () -> Lwt.return_unit
and request ?(record = true) (proto, ip, data) =
and request (ip, data) =
let dport = 53 in
if record then
tcp_packet_transit := Dns.IM.add ip (proto, ip, data) !tcp_packet_transit;
match Dns.IM.find ip !tcp_out with
| None ->
begin
Expand All @@ -202,36 +197,38 @@ module Make (P : Mirage_clock_lwt.PCLOCK) (M : Mirage_clock_lwt.MCLOCK) (TIME :
| Error e ->
Log.err (fun m -> m "error %a while establishing tcp connection to %a:%d"
T.pp_error e Ipaddr.V4.pp ip dport) ;
Lwt.async (fun () ->
TIME.sleep_ns (Duration.of_sec 5) >>= fun () ->
close ip) ;
Lwt.return_unit
close ip
| Ok flow ->
tcp_out := Dns.IM.add ip flow !tcp_out ;
Dns.send_tcp flow data >>= function
Dns.send_tcp_multiple flow data >>= function
| Error () -> close ip
| Ok () ->
Lwt.async (fun () -> read_and_handle ip (Dns.of_flow flow)) ;
Lwt.return_unit
end
| Some flow ->
Dns.send_tcp flow data >>= function
Dns.send_tcp_multiple flow data >>= function
| Ok () -> Lwt.return_unit
| Error () ->
Log.warn (fun m -> m "closing tcp flow to %a:%d, retrying request"
Ipaddr.V4.pp ip dport) ;
T.close flow >>= fun () ->
tcp_out := Dns.IM.remove ip !tcp_out ;
request (proto, ip, data)
request (ip, data)
and request_one (ip, d) = request (ip, [ d ])
in
let udp_cb ~src ~dst:_ ~src_port buf =
Log.info (fun m -> m "udp frame from %a:%d" Ipaddr.V4.pp src src_port) ;
let now = Ptime.v (P.now_d_ps ()) in
let elapsed = M.elapsed_ns () in
let t, answer, out = Dns_server.Secondary.handle_buf !state now elapsed `Udp src buf in
let t, answer, out =
Dns_server.Secondary.handle_buf !state now elapsed `Udp src buf
in
maybe_update_state t >>= fun () ->
List.iter (fun x -> Lwt.async (fun () -> request x)) out ;
(match out with
| None -> ()
| Some (ip, cs) -> Lwt.async (fun () -> request_one (ip, cs))) ;
match answer with
| None -> Lwt.return_unit
| Some out -> Dns.send_udp stack port src src_port out
Expand All @@ -254,7 +251,9 @@ module Make (P : Mirage_clock_lwt.PCLOCK) (M : Mirage_clock_lwt.MCLOCK) (TIME :
Dns_server.Secondary.handle_buf !state now elapsed `Tcp dst_ip data
in
maybe_update_state t >>= fun () ->
List.iter (fun x -> Lwt.async (fun () -> request x)) out ;
(match out with
| None -> ()
| Some (ip, cs) -> Lwt.async (fun () -> request_one (ip, cs)));
match answer with
| None ->
Log.warn (fun m -> m "no TCP output") ;
Expand All @@ -274,7 +273,8 @@ module Make (P : Mirage_clock_lwt.PCLOCK) (M : Mirage_clock_lwt.MCLOCK) (TIME :
let elapsed = M.elapsed_ns () in
let t, out = Dns_server.Secondary.timer !state now elapsed in
maybe_update_state t >>= fun () ->
List.iter (fun x -> Lwt.async (fun () -> request x)) out ;
List.iter (fun (ip, cs) ->
Lwt.async (fun () -> request (ip, cs))) out ;
TIME.sleep_ns (Duration.of_sec timer) >>= fun () ->
time ()
in
Expand Down
Loading

0 comments on commit 4485675

Please sign in to comment.