diff --git a/lib/telemetry_metrics_statsd.ex b/lib/telemetry_metrics_statsd.ex index 9ca8821..4502d58 100644 --- a/lib/telemetry_metrics_statsd.ex +++ b/lib/telemetry_metrics_statsd.ex @@ -317,7 +317,7 @@ defmodule TelemetryMetricsStatsd do require Record alias Telemetry.Metrics - alias TelemetryMetricsStatsd.{EventHandler, Options, UDP} + alias TelemetryMetricsStatsd.{EventHandler, Options, UDP, CounterOk, CounterError} @type prefix :: String.t() | atom() | nil @type host :: String.t() | :inet.ip_address() @@ -402,6 +402,17 @@ defmodule TelemetryMetricsStatsd do end end + def get_udp_worker(pool_id) do + case :ets.lookup(pool_id, :udp_worker) do + [] -> + :error + + udp_workers -> + {:udp_worker, udp_worker} = Enum.random(udp_workers) + {:ok, udp_worker} + end + end + @doc false @spec get_pool_id(pid()) :: :ets.tid() def get_pool_id(reporter) do @@ -417,6 +428,10 @@ defmodule TelemetryMetricsStatsd do @impl true def init(options) do Process.flag(:trap_exit, true) + + CounterOk.init() + CounterError.init() + metrics = Map.fetch!(options, :metrics) udp_config = @@ -432,7 +447,22 @@ defmodule TelemetryMetricsStatsd do end pool_id = :ets.new(__MODULE__, [:bag, :protected, read_concurrency: true]) + + udp_workers = + for _ <- 1..options.pool_size do + {:ok, worker_pid} = + TelemetryMetricsStatsd.UDPWorker.start_link( + reporter: self(), + pool_id: pool_id, + max_datagram_size: options.mtu, + buffer_flush_ms: options.buffer_flush_ms + ) + + {:udp_worker, worker_pid} + end + :ets.insert(pool_id, udps) + :ets.insert(pool_id, udp_workers) handler_ids = EventHandler.attach( @@ -445,6 +475,8 @@ defmodule TelemetryMetricsStatsd do options.global_tags ) + schedule_metrics_report(options.diagnostic_metrics_report_interval) + {:ok, %{ udp_config: udp_config, @@ -452,7 +484,8 @@ defmodule TelemetryMetricsStatsd do pool_id: pool_id, host: options.host, port: options.port, - host_resolution_interval: options.host_resolution_interval + host_resolution_interval: options.host_resolution_interval, + diagnostic_metrics_report_interval: options.diagnostic_metrics_report_interval }} end @@ -480,11 +513,37 @@ defmodule TelemetryMetricsStatsd do end end + @impl true + def handle_cast(:close, %{pool_id: pool_id} = state) do + pool_id + |> :ets.tab2list() + |> Enum.each(fn item -> + case item do + {:udp, udp} -> + :ets.delete_object(pool_id, {:udp, udp}) + UDP.close(udp) + :ets.insert(pool_id, {:udp, udp}) + + _ -> + :ok + end + end) + + {:noreply, state} + end + @impl true def handle_call(:get_pool_id, _from, %{pool_id: pool_id} = state) do {:reply, pool_id, state} end + defp message_queue_len(target) when is_pid(target) do + case Process.info(target, :message_queue_len) do + {:message_queue_len, len} -> len + _ -> nil + end + end + @impl true def handle_info({:EXIT, _pid, reason}, state) do {:stop, reason, state} @@ -520,6 +579,45 @@ defmodule TelemetryMetricsStatsd do {:noreply, new_state} end + @impl true + def handle_info(:report_metrics, state) do + :telemetry.execute( + [:telemetry_metrics_statsd, :udp_metrics], + %{ + ok_count: CounterOk.get(), + error_count: CounterError.get() + }, + %{} + ) + + state.pool_id + |> :ets.tab2list() + |> Enum.each(fn item -> + case item do + {:udp_worker, udp_worker} -> + :telemetry.execute( + [:telemetry_metrics_statsd, :udp_metrics], + %{ + message_queue_len: message_queue_len(udp_worker), + }, + %{} + ) + + _ -> + :ok + end + end) + + schedule_metrics_report(state.diagnostic_metrics_report_interval) + + {:noreply, state} + rescue + _ -> + Logger.error("Failed to report diagnostic metrics") + + {:noreply, state} + end + @impl true def terminate(_reason, state) do EventHandler.detach(state.handler_ids) @@ -567,10 +665,24 @@ defmodule TelemetryMetricsStatsd do defp update_pool(pool_id, new_host, new_port) do pool_id |> :ets.tab2list() - |> Enum.each(fn {:udp, udp} -> - :ets.delete_object(pool_id, {:udp, udp}) - updated_udp = UDP.update(udp, new_host, new_port) - :ets.insert(pool_id, {:udp, updated_udp}) + |> Enum.each(fn item -> + case item do + {:udp, udp} -> + :ets.delete_object(pool_id, {:udp, udp}) + updated_udp = UDP.update(udp, new_host, new_port) + :ets.insert(pool_id, {:udp, updated_udp}) + + _ -> + :ok + end end) end + + # closes udp sockets, currently used only for testing purpose + def close(pid) do + GenServer.cast(pid, :close) + end + + defp schedule_metrics_report(interval_ms) when is_integer(interval_ms), + do: Process.send_after(self(), :report_metrics, interval_ms) end diff --git a/lib/telemetry_metrics_statsd/counter_error.ex b/lib/telemetry_metrics_statsd/counter_error.ex new file mode 100644 index 0000000..301e091 --- /dev/null +++ b/lib/telemetry_metrics_statsd/counter_error.ex @@ -0,0 +1,33 @@ +defmodule TelemetryMetricsStatsd.CounterError do + @counter_ref "telemetry_metrics_statsd_counter_error_ref" + + @spec init :: any() + def init do + :persistent_term.put(@counter_ref, :counters.new(1, [:write_concurrency])) + end + + @spec reset :: :ok + def reset do + :counters.put(:persistent_term.get(@counter_ref), 1, 0) + end + + @spec destroy :: any() + def destroy do + :persistent_term.erase(@counter_ref) + end + + @spec increment(integer()) :: :ok + def increment(incr \\ 1) when is_integer(incr) and incr > 0 do + :counters.add(:persistent_term.get(@counter_ref), 1, incr) + end + + @spec decrement(integer()) :: :ok + def decrement(decr \\ 1) when is_integer(decr) and decr > 0 do + :counters.sub(:persistent_term.get(@counter_ref), 1, decr) + end + + @spec get :: any() + def get do + :counters.get(:persistent_term.get(@counter_ref), 1) + end +end diff --git a/lib/telemetry_metrics_statsd/counter_ok.ex b/lib/telemetry_metrics_statsd/counter_ok.ex new file mode 100644 index 0000000..5cc4acf --- /dev/null +++ b/lib/telemetry_metrics_statsd/counter_ok.ex @@ -0,0 +1,33 @@ +defmodule TelemetryMetricsStatsd.CounterOk do + @counter_ref "telemetry_metrics_statsd_counter_ok_ref" + + @spec init :: any() + def init do + :persistent_term.put(@counter_ref, :counters.new(1, [:write_concurrency])) + end + + @spec reset :: :ok + def reset do + :counters.put(:persistent_term.get(@counter_ref), 1, 0) + end + + @spec destroy :: any() + def destroy do + :persistent_term.erase(@counter_ref) + end + + @spec increment(integer()) :: :ok + def increment(incr \\ 1) when is_integer(incr) and incr > 0 do + :counters.add(:persistent_term.get(@counter_ref), 1, incr) + end + + @spec decrement(integer()) :: :ok + def decrement(decr \\ 1) when is_integer(decr) and decr > 0 do + :counters.sub(:persistent_term.get(@counter_ref), 1, decr) + end + + @spec get :: any() + def get do + :counters.get(:persistent_term.get(@counter_ref), 1) + end +end diff --git a/lib/telemetry_metrics_statsd/event_handler.ex b/lib/telemetry_metrics_statsd/event_handler.ex index e2934c5..892a665 100644 --- a/lib/telemetry_metrics_statsd/event_handler.ex +++ b/lib/telemetry_metrics_statsd/event_handler.ex @@ -2,7 +2,9 @@ defmodule TelemetryMetricsStatsd.EventHandler do @moduledoc false alias Telemetry.Metrics - alias TelemetryMetricsStatsd.{Formatter, Packet, UDP} + alias TelemetryMetricsStatsd.{Formatter} + + require Logger @spec attach( [Metrics.t()], @@ -46,10 +48,8 @@ defmodule TelemetryMetricsStatsd.EventHandler do end def handle_event(_event, measurements, metadata, %{ - reporter: reporter, pool_id: pool_id, metrics: metrics, - mtu: mtu, prefix: prefix, formatter: formatter_mod, global_tags: global_tags @@ -63,7 +63,10 @@ defmodule TelemetryMetricsStatsd.EventHandler do |> Map.new() |> Map.merge(metric.tag_values.(metadata)) - tags = Enum.map(metric.tags, &{&1, Map.get(tag_values, &1, "")}) + tags = + Enum.map(metric.tags, &{&1, Map.get(tag_values, &1, "")}) + |> Enum.filter(fn {_, tag_value} -> tag_value != "" end) + Formatter.format(formatter_mod, metric, prefix, value, tags) else :nopublish @@ -76,7 +79,7 @@ defmodule TelemetryMetricsStatsd.EventHandler do :ok packets -> - publish_metrics(reporter, pool_id, Packet.build_packets(packets, mtu, "\n")) + schedule_metrics_publish(pool_id, packets) end end @@ -125,22 +128,14 @@ defmodule TelemetryMetricsStatsd.EventHandler do end end - @spec publish_metrics(pid(), :ets.tid(), [binary()]) :: :ok - defp publish_metrics(reporter, pool_id, packets) do - case TelemetryMetricsStatsd.get_udp(pool_id) do - {:ok, udp} -> - Enum.reduce_while(packets, :cont, fn packet, :cont -> - case UDP.send(udp, packet) do - :ok -> - {:cont, :cont} - - {:error, reason} -> - TelemetryMetricsStatsd.udp_error(reporter, udp, reason) - {:halt, :halt} - end - end) + defp schedule_metrics_publish(pool_id, packets) do + case TelemetryMetricsStatsd.get_udp_worker(pool_id) do + {:ok, udp_worker_pid} -> + TelemetryMetricsStatsd.UDPWorker.publish_datagrams(udp_worker_pid, packets) :error -> + Logger.error("Failed to schedule metrics publishing over UDP: no workers available.") + :ok end end diff --git a/lib/telemetry_metrics_statsd/options.ex b/lib/telemetry_metrics_statsd/options.ex index b950355..ef9d29a 100644 --- a/lib/telemetry_metrics_statsd/options.ex +++ b/lib/telemetry_metrics_statsd/options.ex @@ -61,10 +61,24 @@ defmodule TelemetryMetricsStatsd.Options do ], mtu: [ type: :non_neg_integer, - default: 512, + # https://github.com/DataDog/datadog-go/blob/3255e6186e83fad1e447573c9fa03dd13c023394/statsd/statsd.go#L35-L41 + # usually IPv4 networks are configured with a MTU of 1500 bytes, so packet size should <= MTU + default: 1432, doc: "Maximum Transmission Unit of the link between your application and the StastD server in bytes. " <> "If this value is greater than the actual MTU of the link, UDP packets with published metrics will be dropped." + ], + buffer_flush_ms: [ + type: :non_neg_integer, + default: 0, + doc: + "The maximum time in milliseconds to wait before flushing the buffer. If the buffer is not full, it will be flushed after this time." + ], + diagnostic_metrics_report_interval: [ + type: :non_neg_integer, + # every 15 seconds + default: 15_000, + doc: "The interval in milliseconds when diagnostic metrics are published" ] ] diff --git a/lib/telemetry_metrics_statsd/udp_worker.ex b/lib/telemetry_metrics_statsd/udp_worker.ex new file mode 100644 index 0000000..a18c021 --- /dev/null +++ b/lib/telemetry_metrics_statsd/udp_worker.ex @@ -0,0 +1,116 @@ +defmodule TelemetryMetricsStatsd.UDPWorker do + @moduledoc false + + use GenServer + + alias TelemetryMetricsStatsd.{UDP, Packet, CounterOk, CounterError} + + @default_buffer_flush_ms 1000 + @default_max_datagram_size 1432 + + defstruct [ + :reporter, + :pool_id, + :buffered_datagram, + :buffer_flush_ms, + :max_datagram_size + ] + + def start_link(opts) do + GenServer.start_link(__MODULE__, opts) + end + + @impl true + def init(opts) do + buffer_flush_ms = opts[:buffer_flush_ms] || @default_buffer_flush_ms + + state = %__MODULE__{ + reporter: opts[:reporter], + pool_id: opts[:pool_id], + buffered_datagram: [], + buffer_flush_ms: buffer_flush_ms, + max_datagram_size: opts[:max_datagram_size] || @default_max_datagram_size + } + + schedule_flush(state) + + {:ok, state} + end + + def publish_datagrams(pid, datagrams) do + GenServer.call(pid, {:publish_datagrams, datagrams}) + end + + @impl true + def handle_call({:publish_datagrams, datagrams}, _from, state) do + new_buffered_datagrams = + Enum.reduce(datagrams, state.buffered_datagram, &do_append_datagram/2) + + cond do + state.buffer_flush_ms == 0 -> + for packet <- Packet.build_packets(datagrams, state.max_datagram_size, "\n") do + maybe_send_udp_datagrams(state, packet) + end + + {:reply, :ok, %{state | buffered_datagram: []}} + + Enum.any?(datagrams, fn datagram -> IO.iodata_length(datagram) > state.max_datagram_size end) -> + {:reply, + {:error, "Payload is too big (more than #{state.max_datagram_size} bytes), dropped."}, + state} + + IO.iodata_length(new_buffered_datagrams) > state.max_datagram_size -> + maybe_send_udp_datagrams(state) + + {:reply, :ok, %{state | buffered_datagram: datagrams}} + + true -> + {:reply, :ok, %{state | buffered_datagram: new_buffered_datagrams}} + end + end + + defp maybe_send_udp_datagrams(state, datagrams \\ nil) do + datagrams_to_send = datagrams || state.buffered_datagram + + if datagrams_to_send != [] do + case TelemetryMetricsStatsd.get_udp(state.pool_id) do + {:ok, udp} -> + case UDP.send(udp, datagrams_to_send) do + :ok -> + CounterOk.increment() + + :ok + + {:error, reason} -> + CounterError.increment() + + TelemetryMetricsStatsd.udp_error(state.reporter, udp, reason) + end + + :error -> + :ok + end + else + :ok + end + end + + @impl true + def handle_info(:buffer_flush, state) do + if state.buffered_datagram != [] do + maybe_send_udp_datagrams(state) + end + + schedule_flush(state) + + {:noreply, %{state | buffered_datagram: []}} + end + + defp schedule_flush(%{buffer_flush_ms: buffer_flush_ms}) when buffer_flush_ms > 0, + do: Process.send_after(self(), :buffer_flush, buffer_flush_ms) + + defp schedule_flush(_), do: :ok + + defp do_append_datagram([], first_datagram), do: first_datagram + defp do_append_datagram(current_buffer, datagram), do: [current_buffer, "\n", datagram] +end diff --git a/test/support/helpers.ex b/test/support/helpers.ex index a65e73b..12f4383 100644 --- a/test/support/helpers.ex +++ b/test/support/helpers.ex @@ -1,6 +1,8 @@ defmodule TelemetryMetricsStatsd.Test.Helpers do @moduledoc false + use ExUnit.Case + def given_counter(event_name, opts \\ []) do Telemetry.Metrics.counter(event_name, opts) end @@ -20,4 +22,24 @@ defmodule TelemetryMetricsStatsd.Test.Helpers do def given_distribution(event_name, opts \\ []) do Telemetry.Metrics.distribution(event_name, opts) end + + def setup_telemetry(events, config \\ []) do + telemetry_handle_id = "test-telemetry-handler-#{inspect(self())}" + + :ok = + :telemetry.attach_many( + telemetry_handle_id, + events, + &send_to_pid/4, + config + ) + + :ok = on_exit(fn -> :telemetry.detach(telemetry_handle_id) end) + end + + defp send_to_pid(event, measurements, metadata, config) do + pid = config[:pid] || self() + + send(pid, {:telemetry_event, {event, measurements, metadata, config}}) + end end diff --git a/test/telemetry_metrics_statsd_test.exs b/test/telemetry_metrics_statsd_test.exs index ca80d08..a871a3e 100644 --- a/test/telemetry_metrics_statsd_test.exs +++ b/test/telemetry_metrics_statsd_test.exs @@ -6,6 +6,18 @@ defmodule TelemetryMetricsStatsdTest do import Liveness import Mock + alias TelemetryMetricsStatsd.{CounterOk, CounterError} + alias TelemetryMetricsStatsd.Test.Helpers + + setup do + on_exit(fn -> + CounterOk.reset() + CounterError.reset() + end) + + :ok + end + test "counter metric is reported as StatsD counter with 1 as a value" do {socket, port} = given_udp_port_opened() counter = given_counter("http.requests", event_name: "http.request") @@ -157,7 +169,8 @@ defmodule TelemetryMetricsStatsdTest do handlers_before = :telemetry.list_handlers([]) :telemetry.execute([:http, :request], %{latency: 172}, %{method: "GET"}) - assert_reported(socket, "") + + assert_reported(socket, "http.request.count.GET:1|c") handlers_after = :telemetry.list_handlers([]) assert handlers_after == handlers_before @@ -196,7 +209,7 @@ defmodule TelemetryMetricsStatsdTest do handlers_before = :telemetry.list_handlers([]) :telemetry.execute([:http, :request], %{latency: 172}, %{method: "GET"}) - assert_reported(socket, "http.request.count:1|c|#method:GET,status:") + assert_reported(socket, "http.request.count:1|c|#method:GET") handlers_after = :telemetry.list_handlers([]) assert handlers_after == handlers_before @@ -741,6 +754,110 @@ defmodule TelemetryMetricsStatsdTest do assert udp.host == {127, 0, 0, 1} end) end + + test "buffers packets when buffer_flush_ms is non-zero" do + {socket, port} = given_udp_port_opened() + counter = given_counter("http.requests", event_name: "http.request") + + start_reporter(metrics: [counter], port: port, buffer_flush_ms: 10, pool_size: 1) + + :telemetry.execute([:http, :request], %{latency: 211}) + :telemetry.execute([:http, :request], %{latency: 200}) + :telemetry.execute([:http, :request], %{latency: 198}) + + assert_reported(socket, "http.requests:1|c\nhttp.requests:1|c\nhttp.requests:1|c\n") + end + + test "splits packets into multiple ones when number exceeds mtu" do + {socket, port} = given_udp_port_opened() + counter = given_counter("http.requests", event_name: "http.request") + counter2 = given_counter("db.query", event_name: "db.query") + + start_reporter( + metrics: [counter, counter2], + port: port, + buffer_flush_ms: 10, + pool_size: 1, + mtu: 17 + ) + + :telemetry.execute([:http, :request], %{latency: 211}) + :telemetry.execute([:db, :query], %{latency: 211}) + + eventually(fn -> + assert CounterOk.get() == 2 + assert CounterError.get() == 0 + end) + + assert_reported(socket, "http.requests:1|c") + assert_reported(socket, "db.query:1|c") + end + + test "counts udp ok packets" do + {socket, port} = given_udp_port_opened() + counter = given_counter("http.requests", event_name: "http.request") + + start_reporter(metrics: [counter], port: port, buffer_flush_ms: 10, pool_size: 1) + + :telemetry.execute([:http, :request], %{latency: 211}) + :telemetry.execute([:http, :request], %{latency: 200}) + :telemetry.execute([:http, :request], %{latency: 198}) + + assert_reported(socket, "http.requests:1|c\nhttp.requests:1|c\nhttp.requests:1|c\n") + + assert CounterOk.get() == 1 + assert CounterError.get() == 0 + end + + @tag :capture_log + test "counts udp error packets" do + {_socket, port} = given_udp_port_opened() + counter = given_counter("http.requests", event_name: "http.request") + + reporter = start_reporter(port: port, metrics: [counter], buffer_flush_ms: 10, pool_size: 1) + + # closes udp connections to simulate an error + TelemetryMetricsStatsd.close(reporter) + + :telemetry.execute([:http, :request], %{latency: 211}) + + eventually(fn -> + assert CounterOk.get() == 0 + assert CounterError.get() == 1 + end) + end + + @tag :only + test "sends diagnostic telemetry periodically" do + {socket, port} = given_udp_port_opened() + counter = given_counter("http.requests", event_name: "http.request") + + Helpers.setup_telemetry( + [ + [:telemetry_metrics_statsd, :udp_metrics], + [:telemetry_metrics_statsd, :udp_worker_metrics] + ], + pid: self() + ) + + start_reporter( + metrics: [counter], + port: port, + pool_size: 1, + buffer_flush_ms: 10, + diagnostic_metrics_report_interval: 50 + ) + + :telemetry.execute([:http, :request], %{latency: 211}) + + assert_receive {:telemetry_event, + {[:telemetry_metrics_statsd, :udp_metrics], %{ok_count: 1, error_count: 0}, + %{}, _}} + + assert_receive {:telemetry_event, + {[:telemetry_metrics_statsd, :udp_metrics], %{message_queue_len: 0}, + %{}, _}} + end end defp given_udp_port_opened(inet_address_family \\ :inet) do