-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathclient.rb
More file actions
300 lines (263 loc) · 10.1 KB
/
client.rb
File metadata and controls
300 lines (263 loc) · 10.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
require "digest"
require "time"
module River
# Default number of maximum attempts for a job. Used when neither the insert
# opts passed to the client nor the job args' own insert opts set
# `max_attempts`.
MAX_ATTEMPTS_DEFAULT = 25
# Default priority for a job. Used when neither the insert opts passed to the
# client nor the job args' own insert opts set `priority`.
PRIORITY_DEFAULT = 1
# Default queue for a job. Used when neither the insert opts passed to the
# client nor the job args' own insert opts set `queue`.
QUEUE_DEFAULT = "default"
# Provides a client for River that inserts jobs. Unlike the Go version of the
# River client, this one can insert jobs only. Jobs can only be worked from Go
# code, so job arg kinds and JSON encoding details must be shared between Ruby
# and Go code.
#
# Used in conjunction with a River driver like:
#
# DB = Sequel.connect(...)
# client = River::Client.new(River::Driver::Sequel.new(DB))
#
# River drivers are found in separate gems like `riverqueue-sequel` to help
# minimize transitive dependencies.
class Client
def initialize(driver)
  # Driver that performs the actual database inserts.
  @driver = driver

  # Clock indirection: returns the current UTC time by default, and can be
  # swapped out in tests to pin "now".
  @time_now_utc = lambda { Time.now.utc }
end
# Inserts a new job for work given a job args implementation and insertion
# options (which may be omitted).
#
# With job args only:
#
# insert_res = client.insert(SimpleArgs.new(job_num: 1))
# insert_res.job # inserted job row
#
# With insert opts:
#
# insert_res = client.insert(SimpleArgs.new(job_num: 1), insert_opts: InsertOpts.new(queue: "high_priority"))
# insert_res.job # inserted job row
#
# Job arg implementations are expected to respond to:
#
# * `#kind`: A string that uniquely identifies the job in the database.
# * `#to_json`: Encodes the args to JSON for persistence in the database.
# Must match encoding an args struct on the Go side to be workable.
#
# They may also respond to `#insert_opts` which is expected to return an
# `InsertOpts` that contains options that will apply to all jobs of this
# kind. Insertion options provided as an argument to `#insert` override
# those returned by job args.
#
# For example:
#
# class SimpleArgs
# attr_accessor :job_num
#
# def initialize(job_num:)
# self.job_num = job_num
# end
#
# def kind = "simple"
#
# def to_json = JSON.dump({job_num: job_num})
# end
#
# See also JobArgsHash for an easy way to insert a job from a hash.
#
# Returns an instance of InsertResult.
def insert(args, insert_opts: EMPTY_INSERT_OPTS)
  # Resolve args and opts into driver-level insert params, then hand them to
  # the driver and wrap what it returns in an InsertResult.
  insert_and_check_unique_job(make_insert_params(args, insert_opts))
end
# Inserts a batch of jobs through a single driver call, which is more
# efficient than inserting them one at a time.
#
# Each element of the given array is either plain job args, or an
# InsertManyParams that pairs job args with an InsertOpts for that job. An
# InsertManyParams without insert opts is treated like plain job args.
#
# With job args:
#
#   insert_results = client.insert_many([
#     SimpleArgs.new(job_num: 1),
#     SimpleArgs.new(job_num: 2)
#   ])
#
# With InsertManyParams:
#
#   insert_results = client.insert_many([
#     River::InsertManyParams.new(SimpleArgs.new(job_num: 1), insert_opts: InsertOpts.new(max_attempts: 5)),
#     River::InsertManyParams.new(SimpleArgs.new(job_num: 2), insert_opts: InsertOpts.new(queue: "high_priority"))
#   ])
#
# Job arg implementations are expected to respond to:
#
#   * `#kind`: A string that uniquely identifies the job in the database.
#   * `#to_json`: Encodes the args to JSON for persistence in the database.
#     Must match encoding an args struct on the Go side to be workable.
#
# For example:
#
#   class SimpleArgs
#     attr_accessor :job_num
#
#     def initialize(job_num:)
#       self.job_num = job_num
#     end
#
#     def kind = "simple"
#
#     def to_json = JSON.dump({job_num: job_num})
#   end
#
# See also JobArgsHash for an easy way to insert a job from a hash.
#
# Returns an array of InsertResult, one for each (job, skipped-as-duplicate)
# pair handed back by the driver.
def insert_many(args)
  batch = args.map do |entry|
    job_args, opts =
      case entry
      when InsertManyParams then [entry.args, entry.insert_opts || EMPTY_INSERT_OPTS]
      else [entry, EMPTY_INSERT_OPTS] # plain job args
      end

    make_insert_params(job_args, opts)
  end

  inserted = @driver.job_insert_many(batch)
  inserted.map do |row, skipped_as_duplicate|
    InsertResult.new(row, unique_skipped_as_duplicated: skipped_as_duplicate)
  end
end
# Default states that are used during a unique insert. Can be overridden by
# setting UniqueOpts#by_state.
DEFAULT_UNIQUE_STATES = [
JOB_STATE_AVAILABLE,
JOB_STATE_COMPLETED,
JOB_STATE_PENDING,
JOB_STATE_RETRYABLE,
JOB_STATE_RUNNING,
JOB_STATE_SCHEDULED
].freeze
private_constant :DEFAULT_UNIQUE_STATES
# States that every unique state list must contain. validate_unique_states
# raises ArgumentError when a `by_state` list is missing any of them.
REQUIRED_UNIQUE_STATES = [
JOB_STATE_AVAILABLE,
JOB_STATE_PENDING,
JOB_STATE_RUNNING,
JOB_STATE_SCHEDULED
].freeze
private_constant :REQUIRED_UNIQUE_STATES
# Shared, frozen InsertOpts constructed with no arguments. Stands in wherever
# the caller or the job args supply no insert options.
EMPTY_INSERT_OPTS = InsertOpts.new.freeze
private_constant :EMPTY_INSERT_OPTS
# Sends a single job's insert params to the driver and wraps the driver's
# response in an InsertResult.
private def insert_and_check_unique_job(insert_params)
  # The driver hands back a pair: the job row, and a flag that is forwarded as
  # InsertResult#unique_skipped_as_duplicated.
  row, skipped_as_duplicate = @driver.job_insert(insert_params)

  InsertResult.new(row, unique_skipped_as_duplicated: skipped_as_duplicate)
end
# Builds the Driver::JobInsertParams for a single job.
#
# Each option is resolved in order of precedence: the `insert_opts` passed by
# the caller, then the insert opts returned by `args.insert_opts` (when args
# respond to it), then the library default.
#
# A job with an explicit `scheduled_at` is inserted in the scheduled state;
# otherwise it's available and `scheduled_at` is set to the current time as
# reported by the client's clock.
#
# When unique opts are present, the unique key and unique states bitmask are
# computed and set on the params as well.
#
# Raises RuntimeError if args don't respond to `#kind` or return nil from
# `#to_json`. Tag validation may raise ArgumentError.
private def make_insert_params(args, insert_opts)
  raise "args should respond to `#kind`" unless args.respond_to?(:kind)

  # ~all objects in Ruby respond to `#to_json`, so check non-nil instead.
  args_json = args.to_json
  raise "args should return non-nil from `#to_json`" unless args_json

  args_insert_opts = if args.respond_to?(:insert_opts)
    args_with_insert_opts = args #: _JobArgsWithInsertOpts # rubocop:disable Layout/LeadingCommentSpace
    args_with_insert_opts.insert_opts || EMPTY_INSERT_OPTS
  else
    EMPTY_INSERT_OPTS
  end

  scheduled_at = insert_opts.scheduled_at || args_insert_opts.scheduled_at

  insert_params = Driver::JobInsertParams.new(
    encoded_args: args_json,
    kind: args.kind,
    max_attempts: insert_opts.max_attempts || args_insert_opts.max_attempts || MAX_ATTEMPTS_DEFAULT,
    priority: insert_opts.priority || args_insert_opts.priority || PRIORITY_DEFAULT,
    queue: insert_opts.queue || args_insert_opts.queue || QUEUE_DEFAULT,
    # `getutc` returns a new UTC time. `utc` would convert the caller's Time
    # object in place (and raise FrozenError on a frozen one). The fallback goes
    # through the client's clock so it's always UTC and honors time stubbing.
    scheduled_at: scheduled_at&.getutc || @time_now_utc.call,
    state: scheduled_at ? JOB_STATE_SCHEDULED : JOB_STATE_AVAILABLE,
    tags: validate_tags(insert_opts.tags || args_insert_opts.tags || [])
  )

  unique_opts = insert_opts.unique_opts || args_insert_opts.unique_opts
  if unique_opts
    unique_key, unique_states = make_unique_key_and_bitmask(insert_params, unique_opts)
    insert_params.unique_key = unique_key
    insert_params.unique_states = unique_states
  end

  insert_params
end
# Computes the unique key hash and unique states bitmask for a job from its
# insert params and unique opts.
#
# Returns a two-element array: the raw SHA256 digest of the unique key string,
# and the value of UniqueBitmask.from_states for the validated states.
private def make_unique_key_and_bitmask(insert_params, unique_opts)
  # It's extremely important here that this unique key format and algorithm
  # match the one in the main River library _exactly_. Don't change them
  # unless they're updated everywhere.
  segments = []

  segments << "&kind=#{insert_params.kind}" unless unique_opts.exclude_kind

  if unique_opts.by_args
    decoded_args = JSON.parse(insert_params.encoded_args)

    # An array narrows uniqueness to the listed keys; any other truthy value
    # uses all args.
    selected_args = unique_opts.by_args.is_a?(Array) ? decoded_args.slice(*unique_opts.by_args) : decoded_args

    # Keys are sorted so the encoding doesn't depend on the order in which the
    # args were serialized.
    segments << "&args=#{JSON.generate(selected_args.sort.to_h)}"
  end

  if unique_opts.by_period
    reference_time = insert_params.scheduled_at || @time_now_utc.call
    period_start = truncate_time(reference_time, unique_opts.by_period).utc
    segments << "&period=#{period_start.strftime("%FT%TZ")}"
  end

  segments << "&queue=#{insert_params.queue}" if unique_opts.by_queue

  states = validate_unique_states(unique_opts.by_state || DEFAULT_UNIQUE_STATES)

  [Digest::SHA256.digest(segments.join), UniqueBitmask.from_states(states)]
end
# Rounds a time down to the start of the interval (given in seconds) that
# contains it. For example, with a 15 minute interval (900 seconds):
#
#   2024-01-15 21:26:36 UTC ->
#   2024-01-15 21:15:00 UTC
#
# Returns a new Time built with Time.at.
private def truncate_time(time, interval_seconds)
  whole_intervals = (time.to_f / interval_seconds).floor
  Time.at(whole_intervals * interval_seconds)
end
# Reinterprets an integer from the uint64 range as a signed int64 by packing it
# as an unsigned 64-bit value and reading the same bytes back as signed. Values
# at or above 2**63 wrap around to negative numbers.
private def uint64_to_int64(int)
  packed = [int].pack("Q")
  packed.unpack1("q") #: Integer # rubocop:disable Layout/LeadingCommentSpace
end
# Pattern that every tag must match in full: a word character, then one or more
# word characters or hyphens, then a final word character. A tag is therefore
# at least three characters long.
TAG_RE = /\A\w[\w-]+\w\z/
private_constant :TAG_RE
# Checks that every tag is at most 255 characters long and matches TAG_RE.
#
# Returns the given tags unchanged. Raises ArgumentError on the first tag that
# fails a check, with the length check taking priority over the format check.
private def validate_tags(tags)
  tags.each do |tag|
    if tag.length > 255
      raise ArgumentError, "tags should be 255 characters or less"
    elsif !TAG_RE.match?(tag)
      raise ArgumentError, "tag should match regex #{TAG_RE.inspect}"
    end
  end
end
# Checks that the given states include every entry of REQUIRED_UNIQUE_STATES.
#
# Returns the given states unchanged. Raises ArgumentError naming the first
# required state that's missing.
private def validate_unique_states(states)
  missing_state = REQUIRED_UNIQUE_STATES.find { |required| !states.include?(required) }
  raise ArgumentError, "by_state should include required state #{missing_state}" if missing_state

  states
end
end
# One entry of an #insert_many batch. Where raw job args carry no per-job
# options, this pairs the job args with an InsertOpts for that job.
class InsertManyParams
  # args:        Job args to insert.
  # insert_opts: Insertion options to use with the insert, or nil when none
  #              were given.
  attr_reader :args, :insert_opts

  def initialize(args, insert_opts: nil)
    @args, @insert_opts = args, insert_opts
  end
end
# Outcome of inserting a single job.
class InsertResult
  # job: Inserted job row, or an existing job row if insert was skipped due to
  #      a previously existing unique job.
  # unique_skipped_as_duplicated: True if for a unique job, the insertion was
  #      skipped due to an equivalent job matching unique property already
  #      being present.
  attr_reader :job, :unique_skipped_as_duplicated

  def initialize(job, unique_skipped_as_duplicated:)
    @job, @unique_skipped_as_duplicated = job, unique_skipped_as_duplicated
  end
end
end