Skip to content

Commit a21e060

Browse files
authored
Add Cron support for Quantum jobs (#699)
1 parent 30a684a commit a21e060

File tree

6 files changed

+314
-2
lines changed

6 files changed

+314
-2
lines changed

lib/sentry/application.ex

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,5 +51,9 @@ defmodule Sentry.Application do
5151
if config[:oban][:cron][:enabled] do
5252
Sentry.Cron.Oban.attach_telemetry_handler()
5353
end
54+
55+
if config[:quantum][:cron][:enabled] do
56+
Sentry.Cron.Quantum.attach_telemetry_handler()
57+
end
5458
end
5559
end

lib/sentry/config.ex

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,32 @@ defmodule Sentry.Config do
2727
]
2828
]
2929
]
30+
],
31+
quantum: [
32+
type: :keyword_list,
33+
doc: """
34+
Configuration for the [Quantum](https://github.com/quantum-elixir/quantum-core) integration.
35+
*Available since v10.2.0*.
36+
""",
37+
keys: [
38+
cron: [
39+
doc: """
40+
Configuration options for configuring [*crons*](https://docs.sentry.io/product/crons/)
41+
for Quantum.
42+
""",
43+
type: :keyword_list,
44+
keys: [
45+
enabled: [
46+
type: :boolean,
47+
default: false,
48+
doc: """
49+
Whether to enable the Quantum integration. When enabled, the Sentry SDK will
50+
capture check-ins for Quantum jobs. *Available since v10.2.0*.
51+
"""
52+
]
53+
]
54+
]
55+
]
3056
]
3157
]
3258

lib/sentry/cron/quantum.ex

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
defmodule Sentry.Cron.Quantum do
2+
@moduledoc false
3+
4+
@events [
5+
[:quantum, :job, :start],
6+
[:quantum, :job, :stop],
7+
[:quantum, :job, :exception]
8+
]
9+
10+
@spec attach_telemetry_handler() :: :ok
11+
def attach_telemetry_handler do
12+
_ = :telemetry.attach_many(__MODULE__, @events, &__MODULE__.handle_event/4, :no_config)
13+
:ok
14+
end
15+
16+
@spec handle_event([atom()], term(), term(), :no_config) :: :ok
17+
def handle_event(event, measurements, metadata, _config)
18+
19+
def handle_event(
20+
[:quantum, :job, event],
21+
measurements,
22+
%{job: %mod{schedule: schedule}} = metadata,
23+
_config
24+
)
25+
when event in [:start, :stop, :exception] and mod == Quantum.Job and not is_nil(schedule) do
26+
_ = handle_event(event, measurements, metadata)
27+
:ok
28+
end
29+
30+
def handle_event([:quantum, :job, event], _measurements, _metadata, _config)
31+
when event in [:start, :stop, :exception] do
32+
:ok
33+
end
34+
35+
## Helpers
36+
37+
defp handle_event(:start, _measurements, metadata) do
38+
if opts = check_in_opts(metadata) do
39+
opts
40+
|> Keyword.merge(status: :in_progress)
41+
|> Sentry.capture_check_in()
42+
end
43+
end
44+
45+
defp handle_event(:stop, measurements, metadata) do
46+
if opts = check_in_opts(metadata) do
47+
opts
48+
|> Keyword.merge(status: :ok, duration: duration_in_seconds(measurements))
49+
|> Sentry.capture_check_in()
50+
end
51+
end
52+
53+
defp handle_event(:exception, measurements, metadata) do
54+
if opts = check_in_opts(metadata) do
55+
opts
56+
|> Keyword.merge(status: :error, duration: duration_in_seconds(measurements))
57+
|> Sentry.capture_check_in()
58+
end
59+
end
60+
61+
defp check_in_opts(%{job: job} = metadata) when is_struct(job, Quantum.Job) do
62+
if schedule_opts = schedule_opts(job) do
63+
id = metadata.telemetry_span_context |> :erlang.phash2() |> Integer.to_string()
64+
65+
[
66+
check_in_id: "quantum-#{id}",
67+
# This is already a binary.
68+
monitor_slug: "quantum-#{inspect(job.name)}",
69+
monitor_config: [schedule: schedule_opts]
70+
]
71+
else
72+
nil
73+
end
74+
end
75+
76+
defp schedule_opts(job) when is_struct(job, Quantum.Job) do
77+
case apply(Crontab.CronExpression.Composer, :compose, [job.schedule]) do
78+
"@hourly" -> [type: :interval, value: 1, unit: :hour]
79+
"@daily" -> [type: :interval, value: 1, unit: :day]
80+
"@weekly" -> [type: :interval, value: 1, unit: :week]
81+
"@monthly" -> [type: :interval, value: 1, unit: :month]
82+
"@yearly" -> [type: :interval, value: 1, unit: :year]
83+
"@annually" -> [type: :interval, value: 1, unit: :year]
84+
"@reboot" -> nil
85+
cron_expr when is_binary(cron_expr) -> [type: :crontab, value: cron_expr]
86+
_other -> nil
87+
end
88+
end
89+
90+
defp duration_in_seconds(%{duration: duration} = _measurements) do
91+
duration
92+
|> System.convert_time_unit(:native, :millisecond)
93+
|> Kernel./(1000)
94+
end
95+
end

mix.exs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,10 @@ defmodule Sentry.Mixfile do
9797
{:excoveralls, "~> 0.17.1", only: [:test]},
9898
{:phoenix, "~> 1.5", only: [:test]},
9999
{:phoenix_html, "~> 2.0", only: [:test]}
100-
] ++ maybe_oban_optional_dependency()
100+
] ++ maybe_oban_optional_dependency() ++ maybe_quantum_optional_dependency()
101101
end
102102

103-
# TODO: Remove this once we drop support for Elixir 1.13.
103+
# TODO: Remove this once we drop support for Elixir < 1.13.
104104
defp maybe_oban_optional_dependency do
105105
if Version.match?(System.version(), "~> 1.13") do
106106
[{:oban, "~> 2.17 and >= 2.17.6", only: [:test]}]
@@ -109,6 +109,15 @@ defmodule Sentry.Mixfile do
109109
end
110110
end
111111

112+
# TODO: Remove this once we drop support for Elixir < 1.12.
113+
defp maybe_quantum_optional_dependency do
114+
if Version.match?(System.version(), "~> 1.12") do
115+
[{:quantum, "~> 3.0", only: [:test]}]
116+
else
117+
[]
118+
end
119+
end
120+
112121
defp package do
113122
[
114123
files: ["lib", "LICENSE", "mix.exs", "README.md", "CHANGELOG.md"],

mix.lock

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
"cowboy": {:hex, :cowboy, "2.8.0", "f3dc62e35797ecd9ac1b50db74611193c29815401e53bac9a5c0577bd7bc667d", [:rebar3], [{:cowlib, "~> 2.9.1", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "~> 1.7.1", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "4643e4fba74ac96d4d152c75803de6fad0b3fa5df354c71afdd6cbeeb15fac8a"},
55
"cowboy_telemetry": {:hex, :cowboy_telemetry, "0.3.1", "ebd1a1d7aff97f27c66654e78ece187abdc646992714164380d8a041eda16754", [:rebar3], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "3a6efd3366130eab84ca372cbd4a7d3c3a97bdfcfb4911233b035d117063f0af"},
66
"cowlib": {:hex, :cowlib, "2.9.1", "61a6c7c50cf07fdd24b2f45b89500bb93b6686579b069a89f88cb211e1125c78", [:rebar3], [], "hexpm", "e4175dc240a70d996156160891e1c62238ede1729e45740bdd38064dad476170"},
7+
"crontab": {:hex, :crontab, "1.1.13", "3bad04f050b9f7f1c237809e42223999c150656a6b2afbbfef597d56df2144c5", [:mix], [{:ecto, "~> 1.0 or ~> 2.0 or ~> 3.0", [hex: :ecto, repo: "hexpm", optional: true]}], "hexpm", "d67441bec989640e3afb94e123f45a2bc42d76e02988c9613885dc3d01cf7085"},
78
"db_connection": {:hex, :db_connection, "2.6.0", "77d835c472b5b67fc4f29556dee74bf511bbafecdcaf98c27d27fa5918152086", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "c2f992d15725e721ec7fbc1189d4ecdb8afef76648c746a8e1cad35e3b8a35f3"},
89
"decimal": {:hex, :decimal, "2.1.1", "5611dca5d4b2c3dd497dec8f68751f1f1a54755e8ed2a966c2633cf885973ad6", [:mix], [], "hexpm", "53cfe5f497ed0e7771ae1a475575603d77425099ba5faef9394932b35020ffcc"},
910
"dialyxir": {:hex, :dialyxir, "1.1.0", "c5aab0d6e71e5522e77beff7ba9e08f8e02bad90dfbeffae60eaf0cb47e29488", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "07ea8e49c45f15264ebe6d5b93799d4dd56a44036cf42d0ad9c960bc266c0b9a"},
@@ -13,6 +14,7 @@
1314
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
1415
"ex_doc": {:hex, :ex_doc, "0.29.4", "6257ecbb20c7396b1fe5accd55b7b0d23f44b6aa18017b415cb4c2b91d997729", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "2c6699a737ae46cb61e4ed012af931b57b699643b24dabe2400a8168414bc4f5"},
1516
"excoveralls": {:hex, :excoveralls, "0.17.1", "83fa7906ef23aa7fc8ad7ee469c357a63b1b3d55dd701ff5b9ce1f72442b2874", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "95bc6fda953e84c60f14da4a198880336205464e75383ec0f570180567985ae0"},
17+
"gen_stage": {:hex, :gen_stage, "1.2.1", "19d8b5e9a5996d813b8245338a28246307fd8b9c99d1237de199d21efc4c76a1", [:mix], [], "hexpm", "83e8be657fa05b992ffa6ac1e3af6d57aa50aace8f691fcf696ff02f8335b001"},
1618
"hackney": {:hex, :hackney, "1.17.4", "99da4674592504d3fb0cfef0db84c3ba02b4508bae2dff8c0108baa0d6e0977c", [:rebar3], [{:certifi, "~>2.6.1", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~>6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~>1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~>1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.3.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~>1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "de16ff4996556c8548d512f4dbe22dd58a587bf3332e7fd362430a7ef3986b16"},
1719
"idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"},
1820
"jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"},
@@ -33,8 +35,10 @@
3335
"plug": {:hex, :plug, "1.11.1", "f2992bac66fdae679453c9e86134a4201f6f43a687d8ff1cd1b2862d53c80259", [:mix], [{:mime, "~> 1.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "23524e4fefbb587c11f0833b3910bfb414bf2e2534d61928e920f54e3a1b881f"},
3436
"plug_cowboy": {:hex, :plug_cowboy, "2.4.1", "779ba386c0915027f22e14a48919a9545714f849505fa15af2631a0d298abf0f", [:mix], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:cowboy_telemetry, "~> 0.3", [hex: :cowboy_telemetry, repo: "hexpm", optional: false]}, {:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "d72113b6dff7b37a7d9b2a5b68892808e3a9a752f2bf7e503240945385b70507"},
3537
"plug_crypto": {:hex, :plug_crypto, "1.2.2", "05654514ac717ff3a1843204b424477d9e60c143406aa94daf2274fdd280794d", [:mix], [], "hexpm", "87631c7ad914a5a445f0a3809f99b079113ae4ed4b867348dd9eec288cecb6db"},
38+
"quantum": {:hex, :quantum, "3.5.3", "ee38838a07761663468145f489ad93e16a79440bebd7c0f90dc1ec9850776d99", [:mix], [{:crontab, "~> 1.1", [hex: :crontab, repo: "hexpm", optional: false]}, {:gen_stage, "~> 0.14 or ~> 1.0", [hex: :gen_stage, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_registry, "~> 0.2", [hex: :telemetry_registry, repo: "hexpm", optional: false]}], "hexpm", "500fd3fa77dcd723ed9f766d4a175b684919ff7b6b8cfd9d7d0564d58eba8734"},
3639
"ranch": {:hex, :ranch, "1.7.1", "6b1fab51b49196860b733a49c07604465a47bdb78aa10c1c16a3d199f7f8c881", [:rebar3], [], "hexpm", "451d8527787df716d99dc36162fca05934915db0b6141bbdac2ea8d3c7afc7d7"},
3740
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.7", "354c321cf377240c7b8716899e182ce4890c5938111a1296add3ec74cf1715df", [:make, :mix, :rebar3], [], "hexpm", "fe4c190e8f37401d30167c8c405eda19469f34577987c76dde613e838bbc67f8"},
3841
"telemetry": {:hex, :telemetry, "0.4.3", "a06428a514bdbc63293cd9a6263aad00ddeb66f608163bdec7c8995784080818", [:rebar3], [], "hexpm", "eb72b8365ffda5bed68a620d1da88525e326cb82a75ee61354fc24b844768041"},
42+
"telemetry_registry": {:hex, :telemetry_registry, "0.2.1", "fe648a691f2128e4279d993cd010994c67f282354dc061e697bf070d4b87b480", [:mix, :rebar3], [{:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "4221cefbcadd0b3e7076960339223742d973f1371bc20f3826af640257bc3690"},
3943
"unicode_util_compat": {:hex, :unicode_util_compat, "0.7.0", "bc84380c9ab48177092f43ac89e4dfa2c6d62b40b8bd132b1059ecc7232f9a78", [:rebar3], [], "hexpm", "25eee6d67df61960cf6a794239566599b09e17e668d3700247bc498638152521"},
4044
}

test/sentry/cron/quantum_test.exs

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
# TODO: Quantum requires Elixir 1.12+, remove this once we depend on that too.
2+
if Version.match?(System.version(), "~> 1.12") do
3+
defmodule Sentry.Cron.QuantumTest do
4+
use Sentry.Case, async: false
5+
6+
import Sentry.TestHelpers
7+
8+
defmodule Scheduler do
9+
use Quantum, otp_app: :sentry
10+
end
11+
12+
setup_all do
13+
Sentry.Cron.Quantum.attach_telemetry_handler()
14+
end
15+
16+
setup do
17+
bypass = Bypass.open()
18+
19+
put_test_config(
20+
dsn: "http://public:secret@localhost:#{bypass.port}/1",
21+
dedup_events: false,
22+
environment_name: "test"
23+
)
24+
25+
%{bypass: bypass}
26+
end
27+
28+
for event_type <- [:start, :stop, :exception] do
29+
test "ignores #{event_type} events without a cron meta", %{bypass: bypass} do
30+
Bypass.down(bypass)
31+
32+
:telemetry.execute([:quantum, :job, unquote(event_type)], %{}, %{
33+
job: Scheduler.new_job(name: :test_job)
34+
})
35+
end
36+
37+
test "ignores #{event_type} events with a cron expr of @reboot", %{bypass: bypass} do
38+
Bypass.down(bypass)
39+
40+
:telemetry.execute([:quantum, :job, unquote(event_type)], %{}, %{
41+
job:
42+
Scheduler.new_job(
43+
name: :reboot_job,
44+
schedule: Crontab.CronExpression.Parser.parse!("@reboot")
45+
)
46+
})
47+
end
48+
end
49+
50+
test "captures start events with monitor config", %{bypass: bypass} do
51+
test_pid = self()
52+
ref = make_ref()
53+
54+
Bypass.expect_once(bypass, "POST", "/api/1/envelope/", fn conn ->
55+
{:ok, body, conn} = Plug.Conn.read_body(conn)
56+
assert [{headers, check_in_body}] = decode_envelope!(body)
57+
58+
assert headers["type"] == "check_in"
59+
60+
assert check_in_body["check_in_id"] == "quantum-#{:erlang.phash2(ref)}"
61+
assert check_in_body["status"] == "in_progress"
62+
assert check_in_body["monitor_slug"] == "quantum-:test_job"
63+
assert check_in_body["duration"] == nil
64+
assert check_in_body["environment"] == "test"
65+
66+
assert check_in_body["monitor_config"] == %{
67+
"schedule" => %{
68+
"type" => "crontab",
69+
"value" => "0 0 * * * *"
70+
}
71+
}
72+
73+
send(test_pid, {ref, :done})
74+
75+
Plug.Conn.send_resp(conn, 200, ~s<{"id": "1923"}>)
76+
end)
77+
78+
:telemetry.execute([:quantum, :job, :start], %{}, %{
79+
job:
80+
Scheduler.new_job(
81+
name: :test_job,
82+
schedule: Crontab.CronExpression.Parser.parse!("@daily")
83+
),
84+
telemetry_span_context: ref
85+
})
86+
87+
assert_receive {^ref, :done}, 1000
88+
end
89+
90+
test "captures exception events with monitor config", %{bypass: bypass} do
91+
test_pid = self()
92+
ref = make_ref()
93+
94+
Bypass.expect_once(bypass, "POST", "/api/1/envelope/", fn conn ->
95+
{:ok, body, conn} = Plug.Conn.read_body(conn)
96+
assert [{headers, check_in_body}] = decode_envelope!(body)
97+
98+
assert headers["type"] == "check_in"
99+
100+
assert check_in_body["check_in_id"] == "quantum-#{:erlang.phash2(ref)}"
101+
assert check_in_body["status"] == "error"
102+
assert check_in_body["monitor_slug"] == "quantum-:test_job"
103+
assert check_in_body["duration"] == 12.099
104+
assert check_in_body["environment"] == "test"
105+
106+
assert check_in_body["monitor_config"] == %{
107+
"schedule" => %{
108+
"type" => "crontab",
109+
"value" => "0 0 * * * *"
110+
}
111+
}
112+
113+
send(test_pid, {ref, :done})
114+
115+
Plug.Conn.send_resp(conn, 200, ~s<{"id": "1923"}>)
116+
end)
117+
118+
duration = System.convert_time_unit(12_099, :millisecond, :native)
119+
120+
:telemetry.execute([:quantum, :job, :exception], %{duration: duration}, %{
121+
job:
122+
Scheduler.new_job(
123+
name: :test_job,
124+
schedule: Crontab.CronExpression.Parser.parse!("@daily")
125+
),
126+
telemetry_span_context: ref
127+
})
128+
129+
assert_receive {^ref, :done}, 1000
130+
end
131+
132+
test "captures stop events with monitor config", %{bypass: bypass} do
133+
test_pid = self()
134+
ref = make_ref()
135+
136+
Bypass.expect_once(bypass, "POST", "/api/1/envelope/", fn conn ->
137+
{:ok, body, conn} = Plug.Conn.read_body(conn)
138+
assert [{headers, check_in_body}] = decode_envelope!(body)
139+
140+
assert headers["type"] == "check_in"
141+
142+
assert check_in_body["check_in_id"] == "quantum-#{:erlang.phash2(ref)}"
143+
assert check_in_body["status"] == "ok"
144+
assert check_in_body["monitor_slug"] == "quantum-:test_job"
145+
assert check_in_body["duration"] == 12.099
146+
assert check_in_body["environment"] == "test"
147+
148+
assert check_in_body["monitor_config"] == %{
149+
"schedule" => %{
150+
"type" => "crontab",
151+
"value" => "0 0 * * * *"
152+
}
153+
}
154+
155+
send(test_pid, {ref, :done})
156+
157+
Plug.Conn.send_resp(conn, 200, ~s<{"id": "1923"}>)
158+
end)
159+
160+
duration = System.convert_time_unit(12_099, :millisecond, :native)
161+
162+
:telemetry.execute([:quantum, :job, :stop], %{duration: duration}, %{
163+
job:
164+
Scheduler.new_job(
165+
name: :test_job,
166+
schedule: Crontab.CronExpression.Parser.parse!("@daily")
167+
),
168+
telemetry_span_context: ref
169+
})
170+
171+
assert_receive {^ref, :done}, 1000
172+
end
173+
end
174+
end

0 commit comments

Comments
 (0)