diff --git a/.gitignore b/.gitignore
index b814b61..968c6ee 100644
--- a/.gitignore
+++ b/.gitignore
@@ -18,6 +18,7 @@ coverage
rdoc
pkg
.rvmrc
+.byebug_history
## PROJECT::SPECIFIC
doc
diff --git a/.rubocop.yml b/.rubocop.yml
new file mode 100644
index 0000000..8609172
--- /dev/null
+++ b/.rubocop.yml
@@ -0,0 +1,2 @@
+Style/RescueStandardError:
+ Enabled: false
diff --git a/Gemfile b/Gemfile
index c8388af..1a23c6c 100644
--- a/Gemfile
+++ b/Gemfile
@@ -3,6 +3,8 @@ source 'https://rubygems.org'
gem 'resque', '~>1.19'
group :test do
+ gem 'byebug'
+ gem 'timecop'
gem 'mocha', '~>0.9'
gem 'minitest', '~> 5.5'
end
diff --git a/Gemfile.lock b/Gemfile.lock
index 69eaa04..fde6d85 100644
--- a/Gemfile.lock
+++ b/Gemfile.lock
@@ -3,6 +3,7 @@ GEM
specs:
addressable (2.3.5)
builder (3.2.2)
+ byebug (11.1.3)
faraday (0.7.6)
addressable (~> 2.2)
multipart-post (~> 1.1)
@@ -56,6 +57,7 @@ GEM
rack-protection (~> 1.2)
tilt (~> 1.3, >= 1.3.3)
tilt (1.3.3)
+ timecop (0.9.1)
vegas (0.1.11)
rack (>= 1.0.0)
@@ -63,7 +65,9 @@ PLATFORMS
ruby
DEPENDENCIES
+ byebug
jeweler
minitest (~> 5.5)
mocha (~> 0.9)
resque (~> 1.19)
+ timecop
diff --git a/README.rdoc b/README.rdoc
index b03a6ee..26b52d4 100644
--- a/README.rdoc
+++ b/README.rdoc
@@ -123,6 +123,54 @@ is raised if the job is killed.
at(steps_completed, total_steps, "${steps_completed} of #{total_steps} steps completed!")
+=== Distributed job
+
+With Status you can create a distributed parent/child job that runs part of the processing
+in multiple, potentially parallel, child jobs and then does something once all child jobs have finished.
+To do this, first define a perform_child method. This method is called in child jobs
+instead of perform. Then, in the parent's perform, initialize the parent using init_parent(total),
+and finally queue the children using enqueue_child(options).
+Progress is tracked automatically using the regular "num" and "total" fields. "total" is set to
+the total passed to init_parent and "num" is increased by 1 for each finished child job, so DO NOT change them manually.
+When all child jobs have finished, the last one calls on_success in its own context.
+
+Example:
+
+ class VeryHeavyJob
+ include Resque::Plugins::Status
+
+ def perform
+ init_parent(options['items'].size)
+ options['items'].each do |item|
+ enqueue_child('item' => item)
+ end
+ end
+
+ def perform_child
+ process_item(options['item'])
+ end
+
+ def on_success
+ send_email "yey! all items finished"
+ end
+ end
+
+If you need to get status/options of the parent, you can use parent_status method.
+
+If you need to add something to the parent status you can use update_parent method.
+Note: do not override "num" and "total" fields, as they are responsible for tracking parent job progress.
+
+Example, assuming your child job loads some batch of data and wants to record in parent status total size of loaded data:
+
+ update_parent { |parent| parent["loaded"] = parent["loaded"].to_i + data.size }
+
+The status of a child job is deleted unless the child failed. If a child fails, the parent stays "working"
+forever and the child's status is preserved, so that you can investigate the issue and retry the child. Once it succeeds,
+it marks the parent as completed.
+Callbacks are not called for each child job, only once when all of them have finished.
+When the parent job is killed using Resque::Plugins::Status::Hash.kill, all child jobs that have not started yet are skipped,
+and once all child jobs are either finished or skipped, the parent job's status is set to "killed" and "on_killed" is called.
+
=== Expiration
Since Redis is RAM based, we probably don't want to keep these statuses around forever
diff --git a/lib/resque/plugins/status.rb b/lib/resque/plugins/status.rb
index bb41521..a41961e 100644
--- a/lib/resque/plugins/status.rb
+++ b/lib/resque/plugins/status.rb
@@ -1,3 +1,5 @@
+# frozen_string_literal: true
+
module Resque
module Plugins
@@ -49,7 +51,7 @@ module Status
class Killed < RuntimeError; end
class NotANumber < RuntimeError; end
- attr_reader :uuid, :options
+ attr_reader :uuid, :options, :parent_uuid
def self.included(base)
base.extend(ClassMethods)
@@ -131,10 +133,15 @@ def dequeue(klass, uuid)
# options.
#
# You should not override this method, rahter the perform instance method.
- def perform(uuid=nil, options = {})
- uuid ||= Resque::Plugins::Status::Hash.generate_uuid
+ def perform(uuid = nil, options = {})
+ if (!uuid || uuid.is_a?(::Hash)) && options == {}
+ options = uuid || {}
+ uuid = Resque::Plugins::Status::Hash.generate_uuid
+ Resque::Plugins::Status::Hash.create uuid, options: options
+ end
+
instance = new(uuid, options)
- instance.safe_perform!
+ instance.parent_uuid ? instance.child_safe_perform! : instance.safe_perform!
instance
end
@@ -150,6 +157,7 @@ def scheduled(queue, klass, *args)
def initialize(uuid, options = {})
@uuid = uuid
@options = options
+ @parent_uuid = options['_parent_uuid'] # parent's uuid as added to the options by enqueue_child; nil when the key is absent
end
# Run by the Resque::Worker when processing this job. It wraps the perform
@@ -159,10 +167,13 @@ def initialize(uuid, options = {})
def safe_perform!
set_status({'status' => STATUS_WORKING})
perform
- if status && status.failed?
- on_failure(status.message) if respond_to?(:on_failure)
+ job_status = status
+ if job_status&.failed?
+ on_failure(job_status.message) if respond_to?(:on_failure)
+ return
+ elsif @is_parent_job
return
- elsif status && !status.completed?
+ elsif job_status && !job_status.completed?
completed
end
on_success if respond_to?(:on_success)
@@ -178,6 +189,16 @@ def safe_perform!
end
end
+ # Child-job counterpart of safe_perform!.
+ # Marks this child as working, runs perform_child (skipped when the parent is
+ # flagged to be killed), marks the child completed if it is still working, and
+ # reports to the parent through child_complete unless the child ended up failed.
+ # Any StandardError marks the child as failed and is re-raised.
+ def child_safe_perform!
+   started = { 'status' => STATUS_WORKING, 'started_at' => Time.now.to_i }
+   set_status(started)
+   perform_child unless parent_should_kill?
+
+   after_run = status
+   completed if after_run && after_run.working?
+
+   final = status
+   child_complete unless final && final.failed?
+ rescue StandardError => e
+   failed("The task failed because of an error: #{e}")
+   raise e
+ end
+
# Set the jobs status. Can take an array of strings or hashes that are merged
# (in order) into a final status hash.
def status=(new_status)
@@ -199,6 +220,10 @@ def should_kill?
Resque::Plugins::Status::Hash.should_kill?(uuid)
end
+ def parent_should_kill? # same check as should_kill?, but made against the parent's uuid
+ Resque::Plugins::Status::Hash.should_kill?(parent_uuid)
+ end
+
# set the status of the job for the current itteration. num and
# total are passed to the status as well as any messages.
# This will kill the job if it has been added to the kill list with
@@ -244,7 +269,58 @@ def kill!
raise Killed
end
+ # Marks this job as a parent once the total number of child jobs is known.
+ # Declaring the total up front is essential to prevent a race condition in which all currently
+ # queued children finish while the parent still plans to enqueue more.
+ # When a child job completes, it increments the `num` of the parent job by 1.
+ # When `num` == `total` the parent job is marked as complete and `on_success` is called on the last child job.
+ # If the parent is killed, all children are prevented from running.
+ # Child job statuses are deleted when complete or killed to avoid garbage in redis; they are preserved on error.
+ def init_parent(total)
+ at(0, total, "Queuing #{total} subjobs")
+ @is_parent_job = true # checked by enqueue_child and by safe_perform!
+ end
+
+ # Enqueues the same class with `options` as a child of the current job
+ def enqueue_child(options)
+ raise 'Parent not initiated' unless @is_parent_job
+
+ Resque.enqueue(self.class, options.merge('_parent_uuid' => uuid))
+ end
+
private
+
+ def parent_status # looks up the parent's status by parent_uuid
+ Resque::Plugins::Status::Hash.get(parent_uuid)
+ end
+
+ def update_parent(&block) # hands the block to Status::Hash.update for the parent's uuid
+ Resque::Plugins::Status::Hash.update(parent_uuid, &block)
+ end
+
+ def child_complete
+ parent = update_parent do |st|
+ st['num'] += 1
+ st['message'] = "Working #{st['num']}/#{st['total']}"
+ st['time'] = Time.now.to_i
+ end
+ Resque::Plugins::Status::Hash.remove(uuid)
+ return if parent.num != parent.total
+
+ if parent_should_kill?
+ Resque::Plugins::Status::Hash.set(parent_uuid, parent, 'status' => STATUS_KILLED)
+ on_killed if respond_to?(:on_killed)
+ else
+ begin
+ on_success if respond_to?(:on_success)
+ Resque::Plugins::Status::Hash.set(parent_uuid, parent, 'status' => STATUS_COMPLETED, 'message' => "Finished in #{parent['num']} jobs")
+ rescue => e
+ Resque::Plugins::Status::Hash.set(parent_uuid, parent, 'status' => STATUS_FAILED, 'message' => "on_success failed with #{e.class}/#{e}")
+ raise
+ end
+ end
+ end
+
def set_status(*args)
self.status = [status, {'name' => self.name}, args].flatten
end
diff --git a/lib/resque/plugins/status/hash.rb b/lib/resque/plugins/status/hash.rb
index c0a3015..210127e 100644
--- a/lib/resque/plugins/status/hash.rb
+++ b/lib/resque/plugins/status/hash.rb
@@ -46,6 +46,16 @@ def self.set(uuid, *messages)
val
end
+ # Atomically update the status in block. Example:
+ # status.update { |st| st['num'] += 1; st['message'] = "finished #{st['num']}" }
+ # Note: the updating on this UUID is locking using redis for 10s, so make sure the update is immediate
+ def self.update(uuid, &block)
+ sleep(0.01) until redis.set("lock:update:#{uuid}", 1, nx: true, ex: 10)
+ set(uuid, get(uuid).tap(&block))
+ ensure
+ redis.del("lock:update:#{uuid}")
+ end
+
# clear statuses from redis passing an optional range. See `statuses` for info
# about ranges
def self.clear(range_start = nil, range_end = nil)
diff --git a/test/test_helper.rb b/test/test_helper.rb
index 5ec5783..e4b074b 100644
--- a/test/test_helper.rb
+++ b/test/test_helper.rb
@@ -3,6 +3,7 @@
require 'minitest/autorun'
require 'mocha/setup'
+require 'timecop'
#
# make sure we can run redis
@@ -50,6 +51,27 @@ def perform
end
+class WorkingParentJob
+ include Resque::Plugins::Status
+
+ def perform
+ init_parent(3)
+ # This is a workaround to test killing job while it enqueues children with inline resque.
+ # The real use case would be that the `kill` would happen somewhere in the middle of subjob runs,
+ # long after the main job finished.
+ Resque::Plugins::Status::Hash.kill(@uuid) if options['self_kill']
+ 3.times { |i| enqueue_child('job_num' => i) }
+ end
+
+ def perform_child
+ Resque.redis.sadd('child_jobs_done', options['job_num'])
+ end
+
+ def on_success
+ Resque.redis.sadd('child_on_success', options['job_num'] || 'parent')
+ end
+end
+
class ErrorJob
include Resque::Plugins::Status
diff --git a/test/test_resque_plugins_status.rb b/test/test_resque_plugins_status.rb
index 5c14a79..94bdece 100644
--- a/test/test_resque_plugins_status.rb
+++ b/test/test_resque_plugins_status.rb
@@ -250,8 +250,10 @@ class TestResquePluginsStatus < Minitest::Test
describe "invoking killall jobs to kill a range" do
before do
- @uuid1 = KillableJob.create(:num => 100)
- @uuid2 = KillableJob.create(:num => 100)
+ Timecop.travel(-1) do
+ @uuid1 = KillableJob.create(:num => 100)
+ end
+ @uuid2 = KillableJob.create(:num => 100)
Resque::Plugins::Status::Hash.killall(0,0) # only @uuid2 it be killed
@@ -353,6 +355,36 @@ class TestResquePluginsStatus < Minitest::Test
end
- end
+ describe 'with WorkingParentJob' do
+ before do
+ Resque.stubs(:inline?).returns(true)
+ @uuid = WorkingParentJob.create('num' => 100)
+ end
+ it 'should have ran all the children' do
+ assert_equal(%w[0 1 2], Resque.redis.smembers('child_jobs_done').sort)
+ onsuccess_jobs = Resque.redis.smembers('child_on_success')
+ assert_equal 1, onsuccess_jobs.size
+ assert_includes(%w[0 1 2], onsuccess_jobs[0])
+ status = Resque::Plugins::Status::Hash.get(@uuid)
+ assert_equal 3, status.num
+ assert_equal 'completed', status.status
+ end
+ end
+
+ describe 'with self killing WorkingParentJob' do
+ before do
+ Resque.stubs(:inline?).returns(true)
+ @uuid = WorkingParentJob.create('num' => 100, 'self_kill' => true)
+ end
+
+ it 'should have ran all the children' do
+ assert_equal([], Resque.redis.smembers('child_jobs_done'))
+ assert_equal([], Resque.redis.smembers('child_on_success'))
+ status = Resque::Plugins::Status::Hash.get(@uuid)
+ assert_equal 3, status.num
+ assert_equal 'killed', status.status
+ end
+ end
+ end
end
diff --git a/test/test_resque_plugins_status_hash.rb b/test/test_resque_plugins_status_hash.rb
index 70bc157..05f02e4 100644
--- a/test/test_resque_plugins_status_hash.rb
+++ b/test/test_resque_plugins_status_hash.rb
@@ -162,6 +162,25 @@ class TestResquePluginsStatusHash < Minitest::Test
end
end
+ describe '.update' do
+ before do
+ @uuid = Resque::Plugins::Status::Hash.generate_uuid
+ Resque::Plugins::Status::Hash.set(@uuid, 'num' => 10)
+ end
+
+ it 'updates the hash' do
+ Resque::Plugins::Status::Hash.update(@uuid) { |h| h['num'] += 5; h['message'] = 'hello' }
+ hash = Resque::Plugins::Status::Hash.get(@uuid)
+ assert_equal 15, hash.num
+ assert_equal 'hello', hash.message
+ end
+
+ it 'is threadsafe' do
+ 20.times.map { Thread.new { Resque::Plugins::Status::Hash.update(@uuid) { |h| h['num'] += 1 } } }.each(&:join)
+ assert_equal 30, Resque::Plugins::Status::Hash.get(@uuid).num
+ end
+ end
+
describe ".status_ids" do
before do