Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add telemetry event span for socket dispatch #6019

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 23 additions & 3 deletions lib/phoenix/endpoint.ex
Original file line number Diff line number Diff line change
Expand Up @@ -635,10 +635,26 @@ defmodule Phoenix.Endpoint do

dispatches =
for {path, socket, socket_opts} <- sockets,
{path, plug, conn_ast, plug_opts} <- socket_paths(module, path, socket, socket_opts) do
{match_path, plug, conn_ast, plug_opts} <- socket_paths(module, path, socket, socket_opts) do
quote do
defp do_socket_dispatch(unquote(path), conn) do
halt(unquote(plug).call(unquote(conn_ast), unquote(Macro.escape(plug_opts))))
defp do_socket_dispatch(unquote(match_path), conn) do
conn = unquote(conn_ast)

metadata = %{
conn: conn,
plug: unquote(plug),
route: unquote("#{path}/#{List.last(match_path)}"),
plug_opts: unquote(Macro.escape(plug_opts)),
path_params: conn.path_params,
user_socket: unquote(socket),
log: unquote(Keyword.get(socket_opts, :log, false))
}

:telemetry.span([:phoenix, :socket_dispatch], metadata, fn ->
conn = halt(unquote(plug).call(unquote(conn_ast), unquote(Macro.escape(plug_opts))))

{conn, %{metadata | conn: conn}}
end)
end
end
end
Expand Down Expand Up @@ -980,6 +996,10 @@ defmodule Phoenix.Endpoint do

def handle_error(conn, :rate_limit), do: Plug.Conn.send_resp(conn, 429, "Too many requests")

* `:log` - the level to log the socket dispatching under, may be set to false. Defaults to
`false`. To alter the plug log level, please see
https://hexdocs.pm/phoenix/Phoenix.Logger.html#module-dynamic-log-level.

## Longpoll configuration

The following configuration applies only to `:longpoll`:
Expand Down
36 changes: 36 additions & 0 deletions lib/phoenix/logger.ex
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,24 @@ defmodule Phoenix.Logger do
* Metadata: `%{conn: Plug.Conn.t, route: binary, plug: module, plug_opts: term, path_params: map, pipe_through: [atom], log: Logger.level | false}`
* Disable logging: This event is not logged

* `[:phoenix, :socket_dispatch, :start]` - dispatched by `Phoenix.Endpoint`
before dispatching to a matched route
* Measurement: `%{system_time: System.system_time}`
* Metadata: `%{conn: Plug.Conn.t, route: binary, plug: module, plug_opts: term, path_params: map, user_socket: atom, log: Logger.level | false}`
* Disable logging: `socket "/foo", MySocket, log: false` in your endpoint

* `[:phoenix, :socket_dispatch, :exception]` - dispatched by `Phoenix.Endpoint`
after exceptions on dispatching a route
* Measurement: `%{duration: native_time}`
* Metadata: `%{conn: Plug.Conn.t, kind: :throw | :error | :exit, reason: term(), stacktrace: Exception.stacktrace()}`
* Disable logging: This event is not logged

* `[:phoenix, :socket_dispatch, :stop]` - dispatched by `Phoenix.Endpoint`
after successfully dispatching a matched route
* Measurement: `%{duration: native_time}`
* Metadata: `%{conn: Plug.Conn.t, route: binary, plug: module, plug_opts: term, path_params: map, user_socket: atom, log: Logger.level | false}`
* Disable logging: This event is not logged

* `[:phoenix, :error_rendered]` - dispatched at the end of an error view being rendered
* Measurement: `%{duration: native_time}`
* Metadata: `%{conn: Plug.Conn.t, status: Plug.Conn.status, kind: Exception.kind, reason: term, stacktrace: Exception.stacktrace}`
Expand Down Expand Up @@ -132,6 +150,7 @@ defmodule Phoenix.Logger do
[:phoenix, :endpoint, :start] => &__MODULE__.phoenix_endpoint_start/4,
[:phoenix, :endpoint, :stop] => &__MODULE__.phoenix_endpoint_stop/4,
[:phoenix, :router_dispatch, :start] => &__MODULE__.phoenix_router_dispatch_start/4,
[:phoenix, :socket_dispatch, :start] => &__MODULE__.phoenix_socket_dispatch_start/4,
[:phoenix, :error_rendered] => &__MODULE__.phoenix_error_rendered/4,
[:phoenix, :socket_connected] => &__MODULE__.phoenix_socket_connected/4,
[:phoenix, :channel_joined] => &__MODULE__.phoenix_channel_joined/4,
Expand Down Expand Up @@ -306,6 +325,23 @@ defmodule Phoenix.Logger do
defp params(%Plug.Conn.Unfetched{}), do: "[UNFETCHED]"
defp params(params), do: params |> filter_values() |> inspect()

## Event: [:phoenix, :socket_dispatch, :start]

@doc false
def phoenix_socket_dispatch_start(_, _, %{log: false}, _), do: :ok

def phoenix_socket_dispatch_start(_, _, metadata, _) do
%{log: level, conn: conn, plug: plug} = metadata
level = log_level(level, conn)

Logger.log(level, fn ->
[
"Processing with ",
inspect(plug)
]
end)
end

## Event: [:phoenix, :socket_connected]

@doc false
Expand Down
53 changes: 53 additions & 0 deletions test/phoenix/endpoint/endpoint_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ defmodule Phoenix.Endpoint.EndpointTest do
use ExUnit.Case, async: true
use RouterHelper

import ExUnit.CaptureLog

@config [
url: [host: {:system, "ENDPOINT_TEST_HOST"}, path: "/api"],
static_url: [host: "static.example.com"],
Expand Down Expand Up @@ -254,6 +256,57 @@ defmodule Phoenix.Endpoint.EndpointTest do
:code.delete(__MODULE__.AddressEndpoint)
end

@socket_dispatch_start [:phoenix, :socket_dispatch, :start]
@socket_dispatch_stop [:phoenix, :socket_dispatch, :stop]
@socket_dispatch_events [@socket_dispatch_start, @socket_dispatch_stop]

def message_pid(event, measures, metadata, test_pid) do
send(test_pid, {:telemetry_event, event, {measures, metadata}})
end

test "phoenix.socket_dispatch.start and .stop are emitted on sockets", context do
Application.put_env(:phoenix, __MODULE__.SocketEndpoint, static_url: [path: "/static"])

defmodule TestSocket do
@behaviour Phoenix.Socket.Transport
def child_spec(_), do: :ignore
def connect(_), do: {:ok, []}
def init(state), do: {:ok, state}
def handle_in(_, state), do: {:ok, state}
def handle_info(_, state), do: {:ok, state}
def terminate(_, _), do: :ok
end

defmodule SocketEndpoint do
use Phoenix.Endpoint, otp_app: :phoenix

socket "/custom/:socket_var", TestSocket, websocket: [early_validate_upgrade: false], log: :debug
end

:telemetry.attach_many(context.test, @socket_dispatch_events, &__MODULE__.message_pid/4, self())
Logger.enable(self())

SocketEndpoint.start_link()

assert capture_log(fn -> SocketEndpoint.call(conn(:get, "/custom/value/websocket"), []) end) =~ """
[debug] Processing with Phoenix.Transports.WebSocket
"""

assert_received {:telemetry_event, [:phoenix, :socket_dispatch, :start], {_, metadata}}

assert metadata.plug == Phoenix.Transports.WebSocket
assert metadata.route == "/custom/:socket_var/websocket"
assert metadata.path_params == %{"socket_var" => "value"}
assert metadata.log == :debug
assert metadata.user_socket == TestSocket

assert_received {:telemetry_event, [:phoenix, :socket_dispatch, :stop], {_, _}}
after
:telemetry.detach(context.test)
:code.purge(__MODULE__.SocketEndpoint)
:code.delete(__MODULE__.SocketEndpoint)
end

test "injects pubsub broadcast with configured server" do
Endpoint.subscribe("sometopic")
some = spawn(fn -> :ok end)
Expand Down