diff --git a/.gitignore b/.gitignore index b814b61..968c6ee 100644 --- a/.gitignore +++ b/.gitignore @@ -18,6 +18,7 @@ coverage rdoc pkg .rvmrc +.byebug_history ## PROJECT::SPECIFIC doc diff --git a/.rubocop.yml b/.rubocop.yml new file mode 100644 index 0000000..8609172 --- /dev/null +++ b/.rubocop.yml @@ -0,0 +1,2 @@ +Style/RescueStandardError: + Enabled: false diff --git a/Gemfile b/Gemfile index c8388af..1a23c6c 100644 --- a/Gemfile +++ b/Gemfile @@ -3,6 +3,8 @@ source 'https://rubygems.org' gem 'resque', '~>1.19' group :test do + gem 'byebug' + gem 'timecop' gem 'mocha', '~>0.9' gem 'minitest', '~> 5.5' end diff --git a/Gemfile.lock b/Gemfile.lock index 69eaa04..fde6d85 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -3,6 +3,7 @@ GEM specs: addressable (2.3.5) builder (3.2.2) + byebug (11.1.3) faraday (0.7.6) addressable (~> 2.2) multipart-post (~> 1.1) @@ -56,6 +57,7 @@ GEM rack-protection (~> 1.2) tilt (~> 1.3, >= 1.3.3) tilt (1.3.3) + timecop (0.9.1) vegas (0.1.11) rack (>= 1.0.0) @@ -63,7 +65,9 @@ PLATFORMS ruby DEPENDENCIES + byebug jeweler minitest (~> 5.5) mocha (~> 0.9) resque (~> 1.19) + timecop diff --git a/README.rdoc b/README.rdoc index b03a6ee..26b52d4 100644 --- a/README.rdoc +++ b/README.rdoc @@ -123,6 +123,54 @@ is raised if the job is killed. at(steps_completed, total_steps, "${steps_completed} of #{total_steps} steps completed!") +=== Distributed job + +With Status you can create a distributed parent/child job, that will run some part of the processing, +in multiple, potentially parallel child jobs and then do something when all child jobs have finished. +To do this you first need to define a perform_child method. This method will be called in child jobs +instead of perform. Then, in the parent's perform, you need to initiate the parent using init_parent(total), +and finally queue children using enqueue_child(options). +The progress is tracked automatically using regular "num" and "total" fields. 
The "total" will be set to +the "total" from init_parent and num will be increased by 1 for each finished job, so DO NOT change them manually. +When all child jobs have finished, the last one will call on_success in its own context. + +Example: + + class VeryHeavyJob + include Resque::Plugins::Status + + def perform + init_parent(options['items'].size) + options['items'].each do |item| + enqueue_child('item' => item) + end + end + + def perform_child + process_item(options['item']) + end + + def on_success + send_email "yey! all items finished" + end + end + +If you need to get the status/options of the parent, you can use the parent_status method. + +If you need to add something to the parent status you can use the update_parent method. +Note: do not override the "num" and "total" fields, as they are responsible for tracking parent job progress. + +Example, assuming your child job loads some batch of data and wants to record the total size of loaded data in the parent status: + + update_parent { |parent| parent["loaded"] = parent["loaded"].to_i + data.size } + +The status of each child job is deleted once it finishes, unless it failed. If a child fails, the parent will stay "working" +forever, and the child status will be preserved, so that you can investigate the issue and retry the child. Once it succeeds, +it will mark the parent as completed. +The child jobs will not call callbacks for each job, only once when all are finished. 
# frozen_string_literal: true

# lib/resque/plugins/status.rb -- parent/child ("distributed job") entry points.
module Resque
  module Plugins
    module Status
      # +parent_uuid+ is read from the '_parent_uuid' option in #initialize; it
      # is nil unless the job was enqueued as a child.
      attr_reader :uuid, :options, :parent_uuid

      # NOTE(review): +perform+ is placed in ClassMethods because it calls +new+
      # and the mixin does +base.extend(ClassMethods)+ on include -- confirm
      # against the full file.
      module ClassMethods
        # Builds a job instance and runs it, returning the instance.
        #
        # Accepted call shapes:
        #   perform(uuid, options)  # regular status-tracked job
        #   perform(options_hash)   # a lone Hash is treated as the options
        #   perform / perform(nil, options)
        #
        # Whenever no uuid is supplied, one is generated and a status record is
        # created for it. Jobs whose options carry '_parent_uuid' run through
        # child_safe_perform!; all others run through safe_perform!.
        #
        # You should not override this method, rather the perform instance method.
        #
        # @param uuid [String, Hash, nil] job uuid, or the options hash itself
        # @param options [Hash, nil] job options
        # @return [Object] the job instance that was run
        def perform(uuid = nil, options = {})
          options ||= {}

          # A single Hash argument carries the options and no uuid.
          if uuid.is_a?(::Hash) && options.empty?
            options = uuid
            uuid = nil
          end

          # Generate a uuid whenever none was given, including the
          # perform(nil, non_empty_options) shape, which would otherwise run
          # the job with a nil uuid.
          unless uuid
            uuid = Resque::Plugins::Status::Hash.generate_uuid
            Resque::Plugins::Status::Hash.create uuid, options: options
          end

          instance = new(uuid, options)
          if instance.parent_uuid
            instance.child_safe_perform!
          else
            instance.safe_perform!
          end
          instance
        end
      end

      # @param uuid [String] uuid of this job's status
      # @param options [Hash, nil] job options; a '_parent_uuid' entry marks
      #   the job as a child of that parent
      def initialize(uuid, options = {})
        @uuid = uuid
        @options = options
        # Tolerate nil options instead of raising NoMethodError on nil.
        @parent_uuid = options && options['_parent_uuid']
      end

      # Child-job counterpart of safe_perform!.
      #
      # Marks the child as working (recording 'started_at'), runs perform_child
      # unless the parent is on the kill list, marks the child completed if it
      # is still working, and then reports to the parent through child_complete
      # unless the child's status is failed. Per the README, once all children
      # of a killed parent are finished or skipped the parent is set to
      # "killed" and on_killed is called.
      #
      # Any StandardError is recorded on the child via +failed+ and re-raised.
      def child_safe_perform!
        set_status('status' => STATUS_WORKING, 'started_at' => Time.now.to_i)
        perform_child unless parent_should_kill?
        # The status is re-read on purpose: +completed+ changes it.
        completed if status&.working?
        child_complete unless status&.failed?
      rescue => e
        failed("The task failed because of an error: #{e}")
        raise
      end

      # @return [Boolean] whether the parent job's uuid is on the kill list
      def parent_should_kill?
        Resque::Plugins::Status::Hash.should_kill?(parent_uuid)
      end
    end
  end
end
# lib/resque/plugins/status.rb -- parent/child bookkeeping helpers.
module Resque
  module Plugins
    module Status
      # Marks this job as a parent once the total number of child jobs is
      # known. Must be called before enqueue_child: fixing the total up front
      # prevents the already-queued children from all finishing (and
      # completing the parent) while the parent is still enqueuing more.
      #
      # Each finished child increments the parent's "num" by 1; when "num"
      # equals "total" the last child finalizes the parent (see child_complete).
      #
      # @param total [Integer] number of child jobs that will be enqueued
      def init_parent(total)
        at(0, total, "Queuing #{total} subjobs")
        @is_parent_job = true
      end

      # Enqueues the same job class as a child of the current job.
      #
      # @param options [Hash] options for the child; '_parent_uuid' is added
      # @raise [RuntimeError] if init_parent has not been called
      def enqueue_child(options = {})
        raise 'Parent not initiated' unless @is_parent_job

        child_options = options.merge('_parent_uuid' => uuid)
        Resque.enqueue(self.class, child_options)
      end

      private

      # @return the parent's status as returned by Status::Hash.get
      def parent_status
        Resque::Plugins::Status::Hash.get(parent_uuid)
      end

      # Updates the parent's status through Status::Hash.update; the block
      # receives the parent status and mutates it in place. Do not change
      # "num" or "total" there: they track parent progress.
      def update_parent(&block)
        Resque::Plugins::Status::Hash.update(parent_uuid, &block)
      end

      # Records this child as finished on the parent, removes the child's own
      # status and, if this was the last child ("num" == "total"), finalizes
      # the parent:
      # * parent on the kill list -> parent status "killed", on_killed called;
      # * otherwise on_success is called and the parent is marked "completed";
      #   if on_success raises, the parent is marked "failed" and the error is
      #   re-raised.
      #
      # @raise [RuntimeError] if the parent status no longer exists
      def child_complete
        parent = update_parent do |st|
          # Statuses can expire or be removed; fail with a clear message
          # rather than a NoMethodError on nil.
          raise "Parent status #{parent_uuid} not found" unless st

          # to_i keeps the counter working even if "num" is missing.
          st['num'] = st['num'].to_i + 1
          st['message'] = "Working #{st['num']}/#{st['total']}"
          st['time'] = Time.now.to_i
        end
        Resque::Plugins::Status::Hash.remove(uuid)
        return if parent.num != parent.total

        if parent_should_kill?
          Resque::Plugins::Status::Hash.set(parent_uuid, parent, 'status' => STATUS_KILLED)
          on_killed if respond_to?(:on_killed)
          return
        end

        begin
          on_success if respond_to?(:on_success)
          Resque::Plugins::Status::Hash.set(
            parent_uuid, parent,
            'status' => STATUS_COMPLETED, 'message' => "Finished in #{parent['num']} jobs"
          )
        rescue => e
          Resque::Plugins::Status::Hash.set(
            parent_uuid, parent,
            'status' => STATUS_FAILED, 'message' => "on_success failed with #{e.class}/#{e}"
          )
          raise
        end
      end

      # Merges the current status, the job name and +args+ (in that order)
      # into the job's status via +status=+.
      def set_status(*args)
        self.status = [status, { 'name' => name }, args].flatten
      end
    end
  end
end
# lib/resque/plugins/status/hash.rb
module Resque
  module Plugins
    module Status
      class Hash
        # Read-modify-write of the status stored under +uuid+, serialized
        # through a Redis lock ("lock:update:<uuid>", SET NX with a 10 second
        # expiry). The block receives the value returned by +get(uuid)+ and
        # mutates it in place; the result is written back with +set+.
        #
        # Example:
        #   Hash.update(uuid) { |st| st['num'] += 1; st['message'] = "finished #{st['num']}" }
        #
        # The lock expires after 10s, so the block must be quick. The lock is
        # only released by the caller that acquired it: it is left alone if
        # acquiring it failed with an error, or if it expired and was taken
        # over by someone else while the block ran.
        # NOTE(review): the ownership check (GET then DEL) is not atomic; a
        # server-side compare-and-delete would close the remaining window.
        #
        # @param uuid [String] uuid of the status to update
        # @yield [status] whatever +get(uuid)+ returned (nil if there is none)
        # @return the value returned by +set+
        def self.update(uuid, &block)
          acquired = false
          lock_key = "lock:update:#{uuid}"
          token = generate_uuid.to_s
          sleep(0.01) until (acquired = redis.set(lock_key, token, nx: true, ex: 10))
          set(uuid, get(uuid).tap(&block))
        ensure
          redis.del(lock_key) if acquired && redis.get(lock_key) == token
        end
      end
    end
  end
end
# test/test_helper.rb
#
# Test job exercising the parent/child flow: the parent enqueues three
# children, each child records its 'job_num' in the 'child_jobs_done' set and
# whichever job runs on_success records itself in 'child_on_success'.
class WorkingParentJob
  include Resque::Plugins::Status

  def perform
    init_parent(3)
    # This is a workaround to test killing job while it enqueues children with inline resque.
    # The real use case would be that the `kill` would happen somewhere in the middle of subjob runs,
    # long after the main job finished.
    Resque::Plugins::Status::Hash.kill(@uuid) if options['self_kill']
    3.times { |i| enqueue_child('job_num' => i) }
  end

  def perform_child
    Resque.redis.sadd('child_jobs_done', options['job_num'])
  end

  def on_success
    # The parent has no 'job_num'; 'parent' marks on_success running in the parent.
    Resque.redis.sadd('child_on_success', options['job_num'] || 'parent')
  end
end

# test/test_resque_plugins_status.rb (inside TestResquePluginsStatus)
describe 'with WorkingParentJob' do
  before do
    Resque.stubs(:inline?).returns(true)
    @uuid = WorkingParentJob.create('num' => 100)
  end

  it 'should have run all the children' do
    assert_equal(%w[0 1 2], Resque.redis.smembers('child_jobs_done').sort)
    # on_success must fire exactly once, from one of the children.
    onsuccess_jobs = Resque.redis.smembers('child_on_success')
    assert_equal 1, onsuccess_jobs.size
    assert_includes(%w[0 1 2], onsuccess_jobs[0])
    status = Resque::Plugins::Status::Hash.get(@uuid)
    assert_equal 3, status.num
    assert_equal 'completed', status.status
  end
end

describe 'with self killing WorkingParentJob' do
  before do
    Resque.stubs(:inline?).returns(true)
    @uuid = WorkingParentJob.create('num' => 100, 'self_kill' => true)
  end

  it 'should skip all the children and mark the parent as killed' do
    assert_equal([], Resque.redis.smembers('child_jobs_done'))
    assert_equal([], Resque.redis.smembers('child_on_success'))
    status = Resque::Plugins::Status::Hash.get(@uuid)
    # Skipped children still count towards "num".
    assert_equal 3, status.num
    assert_equal 'killed', status.status
  end
end

# test/test_resque_plugins_status_hash.rb (inside TestResquePluginsStatusHash)
describe '.update' do
  before do
    @uuid = Resque::Plugins::Status::Hash.generate_uuid
    Resque::Plugins::Status::Hash.set(@uuid, 'num' => 10)
  end

  it 'updates the hash' do
    Resque::Plugins::Status::Hash.update(@uuid) do |h|
      h['num'] += 5
      h['message'] = 'hello'
    end
    hash = Resque::Plugins::Status::Hash.get(@uuid)
    assert_equal 15, hash.num
    assert_equal 'hello', hash.message
  end

  it 'is threadsafe' do
    threads = Array.new(20) do
      Thread.new { Resque::Plugins::Status::Hash.update(@uuid) { |h| h['num'] += 1 } }
    end
    threads.each(&:join)
    assert_equal 30, Resque::Plugins::Status::Hash.get(@uuid).num
  end
end