Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't report tags with empty values for traffic optimization #101

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 118 additions & 6 deletions lib/telemetry_metrics_statsd.ex
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ defmodule TelemetryMetricsStatsd do
require Record

alias Telemetry.Metrics
alias TelemetryMetricsStatsd.{EventHandler, Options, UDP}
alias TelemetryMetricsStatsd.{EventHandler, Options, UDP, CounterOk, CounterError}

@type prefix :: String.t() | atom() | nil
@type host :: String.t() | :inet.ip_address()
Expand Down Expand Up @@ -402,6 +402,17 @@ defmodule TelemetryMetricsStatsd do
end
end

# Picks one UDP worker at random from the pool table.
#
# Reads every object stored under the `:udp_worker` key of `pool_id` and
# returns `{:ok, worker}` for a randomly chosen entry, or `:error` when the
# table holds no worker entries.
@spec get_udp_worker(:ets.tid()) :: {:ok, pid()} | :error
def get_udp_worker(pool_id) do
  workers = :ets.lookup(pool_id, :udp_worker)

  if workers == [] do
    :error
  else
    {:udp_worker, chosen} = Enum.random(workers)
    {:ok, chosen}
  end
end

@doc false
@spec get_pool_id(pid()) :: :ets.tid()
def get_pool_id(reporter) do
Expand All @@ -417,6 +428,10 @@ defmodule TelemetryMetricsStatsd do
@impl true
def init(options) do
Process.flag(:trap_exit, true)

CounterOk.init()
CounterError.init()

metrics = Map.fetch!(options, :metrics)

udp_config =
Expand All @@ -432,7 +447,22 @@ defmodule TelemetryMetricsStatsd do
end

pool_id = :ets.new(__MODULE__, [:bag, :protected, read_concurrency: true])

udp_workers =
for _ <- 1..options.pool_size do
{:ok, worker_pid} =
TelemetryMetricsStatsd.UDPWorker.start_link(
reporter: self(),
pool_id: pool_id,
max_datagram_size: options.mtu,
buffer_flush_ms: options.buffer_flush_ms
)

{:udp_worker, worker_pid}
end

:ets.insert(pool_id, udps)
:ets.insert(pool_id, udp_workers)

handler_ids =
EventHandler.attach(
Expand All @@ -445,14 +475,17 @@ defmodule TelemetryMetricsStatsd do
options.global_tags
)

schedule_metrics_report(options.diagnostic_metrics_report_interval)

{:ok,
%{
udp_config: udp_config,
handler_ids: handler_ids,
pool_id: pool_id,
host: options.host,
port: options.port,
host_resolution_interval: options.host_resolution_interval
host_resolution_interval: options.host_resolution_interval,
diagnostic_metrics_report_interval: options.diagnostic_metrics_report_interval
}}
end

Expand Down Expand Up @@ -480,11 +513,37 @@ defmodule TelemetryMetricsStatsd do
end
end

# Handles the `:close` cast (sent by `close/1`, which is only used by tests).
#
# Every `{:udp, udp}` entry in the pool table is removed, its socket is closed
# through `UDP.close/1`, and the very same entry is then put back. Entries
# with any other shape are left untouched.
@impl true
def handle_cast(:close, %{pool_id: pool_id} = state) do
  for {:udp, udp} = entry <- :ets.tab2list(pool_id) do
    :ets.delete_object(pool_id, entry)
    UDP.close(udp)
    :ets.insert(pool_id, entry)
  end

  {:noreply, state}
end

# Replies with the ETS table id of the pool; the state is left unchanged.
@impl true
def handle_call(:get_pool_id, _from, state = %{pool_id: id}), do: {:reply, id, state}

# Returns the number of messages waiting in `target`'s mailbox, or `nil` when
# `Process.info/2` yields anything other than a `{:message_queue_len, len}`
# tuple (it returns `nil` for a process that is no longer alive).
defp message_queue_len(target) when is_pid(target) do
  with {:message_queue_len, queued} <- Process.info(target, :message_queue_len) do
    queued
  else
    _ -> nil
  end
end

@impl true
def handle_info({:EXIT, _pid, reason}, state) do
{:stop, reason, state}
Expand Down Expand Up @@ -520,6 +579,45 @@ defmodule TelemetryMetricsStatsd do
{:noreply, new_state}
end

# Emits the reporter's diagnostic metrics and re-arms the report timer.
#
# Two kinds of `[:telemetry_metrics_statsd, :udp_metrics]` events are executed:
#
#   * one carrying `ok_count` / `error_count`, read from `CounterOk` and
#     `CounterError`;
#   * one per `{:udp_worker, pid}` entry of the pool table, carrying that
#     worker's `message_queue_len` (`nil` when the worker is not alive).
#
# Reporting is best-effort: an exception raised while emitting is logged and
# does not crash the reporter.
@impl true
def handle_info(:report_metrics, state) do
  try do
    :telemetry.execute(
      [:telemetry_metrics_statsd, :udp_metrics],
      %{ok_count: CounterOk.get(), error_count: CounterError.get()},
      %{}
    )

    for {:udp_worker, udp_worker} <- :ets.tab2list(state.pool_id) do
      :telemetry.execute(
        [:telemetry_metrics_statsd, :udp_metrics],
        %{message_queue_len: message_queue_len(udp_worker)},
        %{}
      )
    end
  rescue
    exception ->
      # Include the exception and stacktrace so the failure can be diagnosed.
      Logger.error(
        "Failed to report diagnostic metrics: " <>
          Exception.format(:error, exception, __STACKTRACE__)
      )
  end

  # Re-arm outside the rescued section: a single failed report must not stop
  # every future report.
  schedule_metrics_report(state.diagnostic_metrics_report_interval)

  {:noreply, state}
end

@impl true
def terminate(_reason, state) do
EventHandler.detach(state.handler_ids)
Expand Down Expand Up @@ -567,10 +665,24 @@ defmodule TelemetryMetricsStatsd do
# Re-targets every UDP socket in the pool to `new_host`/`new_port`.
#
# Each `{:udp, udp}` entry is removed from the table, passed through
# `UDP.update/3`, and the updated value is inserted in its place. The table
# also holds entries of other shapes (e.g. `{:udp_worker, pid}`), which are
# skipped. Returns `:ok`.
defp update_pool(pool_id, new_host, new_port) do
  pool_id
  |> :ets.tab2list()
  |> Enum.each(fn
    {:udp, udp} ->
      :ets.delete_object(pool_id, {:udp, udp})
      updated_udp = UDP.update(udp, new_host, new_port)
      :ets.insert(pool_id, {:udp, updated_udp})

    _other ->
      :ok
  end)
end

# Asks the reporter process `pid` to close its UDP sockets by sending it an
# asynchronous `:close` cast. Currently used only for testing purposes.
@spec close(GenServer.server()) :: :ok
def close(pid), do: GenServer.cast(pid, :close)

# Arms a timer that delivers `:report_metrics` to the calling process after
# `interval_ms` milliseconds and returns the timer reference.
defp schedule_metrics_report(interval_ms) when is_integer(interval_ms) do
  Process.send_after(self(), :report_metrics, interval_ms)
end
end
33 changes: 33 additions & 0 deletions lib/telemetry_metrics_statsd/counter_error.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
defmodule TelemetryMetricsStatsd.CounterError do
  @moduledoc """
  A single global integer counter backed by `:counters`.

  The counter reference is stored in `:persistent_term`, so any process can
  update or read it without going through another process. `init/0` must be
  called before any other function here; they all fetch the reference with
  `:persistent_term.get/1`, which raises when the key is missing.
  """

  @counter_ref "telemetry_metrics_statsd_counter_error_ref"

  # The counters array has exactly one slot, addressed by index 1.
  @size 1
  @index 1

  @doc """
  Creates a fresh counter (value `0`) and publishes it under the module's key,
  replacing any previously stored one.
  """
  @spec init :: :ok
  def init do
    fresh = :counters.new(@size, [:write_concurrency])
    :persistent_term.put(@counter_ref, fresh)
  end

  @doc "Sets the counter back to `0`."
  @spec reset :: :ok
  def reset, do: :counters.put(counter(), @index, 0)

  @doc "Removes the counter reference from `:persistent_term`."
  @spec destroy :: boolean()
  def destroy, do: :persistent_term.erase(@counter_ref)

  @doc "Adds `incr` (a positive integer, default `1`) to the counter."
  @spec increment(pos_integer()) :: :ok
  def increment(incr \\ 1) when is_integer(incr) and incr > 0 do
    :counters.add(counter(), @index, incr)
  end

  @doc "Subtracts `decr` (a positive integer, default `1`) from the counter."
  @spec decrement(pos_integer()) :: :ok
  def decrement(decr \\ 1) when is_integer(decr) and decr > 0 do
    :counters.sub(counter(), @index, decr)
  end

  @doc "Returns the current counter value."
  @spec get :: integer()
  def get, do: :counters.get(counter(), @index)

  # Fetches the shared counter reference; raises if `init/0` has not run.
  defp counter, do: :persistent_term.get(@counter_ref)
end
33 changes: 33 additions & 0 deletions lib/telemetry_metrics_statsd/counter_ok.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
defmodule TelemetryMetricsStatsd.CounterOk do
  @moduledoc """
  A single global integer counter backed by `:counters`.

  The counter reference is stored in `:persistent_term`, so any process can
  update or read it without going through another process. `init/0` must be
  called before any other function here; they all fetch the reference with
  `:persistent_term.get/1`, which raises when the key is missing.
  """

  @counter_ref "telemetry_metrics_statsd_counter_ok_ref"

  # The counters array has exactly one slot, addressed by index 1.
  @size 1
  @index 1

  @doc """
  Creates a fresh counter (value `0`) and publishes it under the module's key,
  replacing any previously stored one.
  """
  @spec init :: :ok
  def init do
    fresh = :counters.new(@size, [:write_concurrency])
    :persistent_term.put(@counter_ref, fresh)
  end

  @doc "Sets the counter back to `0`."
  @spec reset :: :ok
  def reset, do: :counters.put(counter(), @index, 0)

  @doc "Removes the counter reference from `:persistent_term`."
  @spec destroy :: boolean()
  def destroy, do: :persistent_term.erase(@counter_ref)

  @doc "Adds `incr` (a positive integer, default `1`) to the counter."
  @spec increment(pos_integer()) :: :ok
  def increment(incr \\ 1) when is_integer(incr) and incr > 0 do
    :counters.add(counter(), @index, incr)
  end

  @doc "Subtracts `decr` (a positive integer, default `1`) from the counter."
  @spec decrement(pos_integer()) :: :ok
  def decrement(decr \\ 1) when is_integer(decr) and decr > 0 do
    :counters.sub(counter(), @index, decr)
  end

  @doc "Returns the current counter value."
  @spec get :: integer()
  def get, do: :counters.get(counter(), @index)

  # Fetches the shared counter reference; raises if `init/0` has not run.
  defp counter, do: :persistent_term.get(@counter_ref)
end
33 changes: 14 additions & 19 deletions lib/telemetry_metrics_statsd/event_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ defmodule TelemetryMetricsStatsd.EventHandler do
@moduledoc false

alias Telemetry.Metrics
alias TelemetryMetricsStatsd.{Formatter, Packet, UDP}
alias TelemetryMetricsStatsd.{Formatter}

require Logger

@spec attach(
[Metrics.t()],
Expand Down Expand Up @@ -46,10 +48,8 @@ defmodule TelemetryMetricsStatsd.EventHandler do
end

def handle_event(_event, measurements, metadata, %{
reporter: reporter,
pool_id: pool_id,
metrics: metrics,
mtu: mtu,
prefix: prefix,
formatter: formatter_mod,
global_tags: global_tags
Expand All @@ -63,7 +63,10 @@ defmodule TelemetryMetricsStatsd.EventHandler do
|> Map.new()
|> Map.merge(metric.tag_values.(metadata))

tags = Enum.map(metric.tags, &{&1, Map.get(tag_values, &1, "")})
tags =
Enum.map(metric.tags, &{&1, Map.get(tag_values, &1, "")})
|> Enum.filter(fn {_, tag_value} -> tag_value != "" end)

Formatter.format(formatter_mod, metric, prefix, value, tags)
else
:nopublish
Expand All @@ -76,7 +79,7 @@ defmodule TelemetryMetricsStatsd.EventHandler do
:ok

packets ->
publish_metrics(reporter, pool_id, Packet.build_packets(packets, mtu, "\n"))
schedule_metrics_publish(pool_id, packets)
end
end

Expand Down Expand Up @@ -125,22 +128,14 @@ defmodule TelemetryMetricsStatsd.EventHandler do
end
end

# Hands the formatted packets to a UDP worker for publishing.
#
# A worker is picked from the pool via `TelemetryMetricsStatsd.get_udp_worker/1`
# and given the packets through `UDPWorker.publish_datagrams/2`. When the pool
# has no workers, the packets are dropped, an error is logged and `:ok` is
# returned.
defp schedule_metrics_publish(pool_id, packets) do
  case TelemetryMetricsStatsd.get_udp_worker(pool_id) do
    {:ok, udp_worker_pid} ->
      TelemetryMetricsStatsd.UDPWorker.publish_datagrams(udp_worker_pid, packets)

    :error ->
      Logger.error("Failed to schedule metrics publishing over UDP: no workers available.")

      :ok
  end
end
Expand Down
16 changes: 15 additions & 1 deletion lib/telemetry_metrics_statsd/options.ex
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,24 @@ defmodule TelemetryMetricsStatsd.Options do
],
mtu: [
type: :non_neg_integer,
default: 512,
# https://github.com/DataDog/datadog-go/blob/3255e6186e83fad1e447573c9fa03dd13c023394/statsd/statsd.go#L35-L41
# IPv4 networks are usually configured with an MTU of 1500 bytes, so the packet size should be <= MTU
default: 1432,
doc:
"Maximum Transmission Unit of the link between your application and the StastD server in bytes. " <>
"If this value is greater than the actual MTU of the link, UDP packets with published metrics will be dropped."
],
buffer_flush_ms: [
type: :non_neg_integer,
default: 0,
doc:
"The maximum time in milliseconds to wait before flushing the buffer. If the buffer is not full, it will be flushed after this time."
],
diagnostic_metrics_report_interval: [
type: :non_neg_integer,
# every 15 seconds
default: 15_000,
doc: "The interval in milliseconds when diagnostic metrics are published"
]
]

Expand Down
Loading