Distributed jobs #143

Open
wants to merge 3 commits into
base: master
1 change: 1 addition & 0 deletions .gitignore
@@ -18,6 +18,7 @@ coverage
rdoc
pkg
.rvmrc
.byebug_history

## PROJECT::SPECIFIC
doc
2 changes: 2 additions & 0 deletions .rubocop.yml
@@ -0,0 +1,2 @@
Style/RescueStandardError:
Enabled: false
2 changes: 2 additions & 0 deletions Gemfile
@@ -3,6 +3,8 @@ source 'https://rubygems.org'
gem 'resque', '~>1.19'

group :test do
gem 'byebug'
gem 'timecop'
gem 'mocha', '~>0.9'
gem 'minitest', '~> 5.5'
end
4 changes: 4 additions & 0 deletions Gemfile.lock
@@ -3,6 +3,7 @@ GEM
specs:
addressable (2.3.5)
builder (3.2.2)
byebug (11.1.3)
faraday (0.7.6)
addressable (~> 2.2)
multipart-post (~> 1.1)
@@ -56,14 +57,17 @@ GEM
rack-protection (~> 1.2)
tilt (~> 1.3, >= 1.3.3)
tilt (1.3.3)
timecop (0.9.1)
vegas (0.1.11)
rack (>= 1.0.0)

PLATFORMS
ruby

DEPENDENCIES
byebug
jeweler
minitest (~> 5.5)
mocha (~> 0.9)
resque (~> 1.19)
timecop
48 changes: 48 additions & 0 deletions README.rdoc
@@ -123,6 +123,54 @@ is raised if the job is killed.

at(steps_completed, total_steps, "#{steps_completed} of #{total_steps} steps completed!")

=== Distributed job

With Status you can create a distributed parent/child job that runs part of the processing
in multiple, potentially parallel, child jobs and then does something once all child jobs have finished.
To do this, first define a <tt>perform_child</tt> method. It is called in child jobs
instead of <tt>perform</tt>. Then, in the parent's <tt>perform</tt>, initiate the parent with <tt>init_parent(total)</tt>,
and finally queue the children with <tt>enqueue_child(options)</tt>.
Progress is tracked automatically in the regular "num" and "total" fields. "total" is set to
the value passed to <tt>init_parent</tt> and "num" is increased by 1 for each finished child job, so DO NOT change them manually.
When all child jobs have finished, the last one calls <tt>on_success</tt> in its own context.

Example:

class VeryHeavyJob
include Resque::Plugins::Status

def perform
init_parent(options['items'].size)
options['items'].each do |item|
enqueue_child('item' => item)
end
end

def perform_child
process_item(options['item'])
end

def on_success
send_email "yey! all items finished"
end
end
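
The "last child calls <tt>on_success</tt>" rule can be illustrated without Redis or Resque. The following is only a sketch under stated assumptions: <tt>ParentTracker</tt> is a hypothetical in-memory stand-in for the parent's "num"/"total" fields (guarded by a Mutex instead of a Redis lock), and the threads stand in for child jobs.

```ruby
# Hypothetical in-memory stand-in for the parent's "num"/"total" fields.
# It is NOT part of resque-status; it only mirrors the counting rule.
class ParentTracker
  attr_reader :num, :total

  def initialize(total)
    @num = 0
    @total = total
    @lock = Mutex.new
  end

  # Records one finished child. Returns true only for the child whose
  # completion makes num equal to total (the "last" child).
  def child_complete
    @lock.synchronize do
      @num += 1
      @num == @total
    end
  end
end

items = %w[a b c d]
tracker = ParentTracker.new(items.size) # plays the role of init_parent(total)
finishers = Queue.new                   # children that would call on_success

threads = items.map do |item|
  Thread.new do
    # ... the perform_child work for this item would happen here ...
    finishers << item if tracker.child_complete
  end
end
threads.each(&:join)

puts "#{tracker.num}/#{tracker.total} children done, on_success fired by #{finishers.size} child"
```

Whichever child finishes last is the one that fires the callback, which is why <tt>on_success</tt> runs in a child's context rather than the parent's.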

If you need the status/options of the parent, use the <tt>parent_status</tt> method.

If you need to add something to the parent status, use the <tt>update_parent</tt> method.
Note: do not override the "num" and "total" fields, as they are responsible for tracking parent job progress.

For example, assuming your child job loads a batch of data and wants to record the total size of the loaded data in the parent status:

update_parent { |parent| parent["loaded"] = parent["loaded"].to_i + data.size }
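
The <tt>.to_i</tt> in that block matters because the first child to report finds no "loaded" key yet, and <tt>nil.to_i</tt> is 0. A minimal sketch, assuming a plain Hash as a hypothetical stand-in for the parent status and a lambda in place of <tt>update_parent</tt>:

```ruby
# Hypothetical stand-in for the parent status hash (no Redis involved).
parent = { 'num' => 0, 'total' => 2 }

# Mirrors the block passed to update_parent in the example above.
record_loaded = lambda do |data|
  # parent['loaded'] is nil on the first call; nil.to_i is 0.
  parent['loaded'] = parent['loaded'].to_i + data.size
end

record_loaded.call([1, 2, 3]) # first child loaded 3 records
record_loaded.call([4, 5])    # second child loaded 2 records

puts parent.inspect
```

Only the custom "loaded" key changes; "num" and "total" are left to the plugin.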

The status of each child job is deleted when the job finishes, unless it failed. If a child fails, the parent stays "working"
indefinitely and the child status is preserved, so that you can investigate the issue and retry the child. Once the retried child
succeeds and all other children are done, the parent is marked as completed.
Callbacks are not called for each child job, only once when all of them have finished.
When the parent job is killed using <tt>Resque::Plugins::Status::Hash.kill</tt>, all child jobs that have not started yet are skipped,
and once all child jobs are either finished or skipped, the parent job's status is set to "killed" and <tt>on_killed</tt> is called.
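
The three outcomes described above (completed, killed, stuck in "working") can be sketched as a small sequential simulation. This is an illustration only: <tt>run_children</tt>, <tt>kill_after</tt> and <tt>fail_at</tt> are hypothetical names, and the real plugin tracks this state in Redis rather than in a local Hash.

```ruby
# Hypothetical simulation of the parent bookkeeping.
# kill_after: index from which the parent counts as killed (nil = never).
# fail_at:    index of a child that raises (nil = none).
def run_children(total:, kill_after: nil, fail_at: nil)
  parent = { 'status' => 'working', 'num' => 0, 'total' => total }
  ran = []
  killed = false

  total.times do |i|
    killed = true if kill_after && i >= kill_after
    # A failed child is not counted, so num can never reach total.
    next if i == fail_at

    ran << i unless killed # skipped children do no work...
    parent['num'] += 1     # ...but they are still counted
    next unless parent['num'] == parent['total']

    parent['status'] = killed ? 'killed' : 'completed'
  end

  [parent, ran]
end

ok_parent, ok_ran = run_children(total: 3)
killed_parent, killed_ran = run_children(total: 3, kill_after: 1)
stuck_parent, stuck_ran = run_children(total: 3, fail_at: 1)

puts ok_parent['status'], killed_parent['status'], stuck_parent['status']
```

In the failure case the count stops one short of "total", which matches the note that the parent stays "working" until the failed child is retried.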

=== Expiration

Since Redis is RAM based, we probably don't want to keep these statuses around forever
90 changes: 83 additions & 7 deletions lib/resque/plugins/status.rb
@@ -1,3 +1,5 @@
# frozen_string_literal: true

module Resque
module Plugins

@@ -49,7 +51,7 @@ module Status
class Killed < RuntimeError; end
class NotANumber < RuntimeError; end

attr_reader :uuid, :options
attr_reader :uuid, :options, :parent_uuid

def self.included(base)
base.extend(ClassMethods)
@@ -131,10 +133,15 @@ def dequeue(klass, uuid)
# options.
#
# You should not override this method, rather the <tt>perform</tt> instance method.
def perform(uuid=nil, options = {})
uuid ||= Resque::Plugins::Status::Hash.generate_uuid
def perform(uuid = nil, options = {})
if (!uuid || uuid.is_a?(::Hash)) && options == {}
options = uuid || {}
uuid = Resque::Plugins::Status::Hash.generate_uuid
Resque::Plugins::Status::Hash.create uuid, options: options
end

instance = new(uuid, options)
instance.safe_perform!
instance.parent_uuid ? instance.child_safe_perform! : instance.safe_perform!
instance
end
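
For readers of this hunk: child jobs are enqueued with a single options hash and no UUID, so the class-level <tt>perform</tt> has to tell the two call shapes apart. A minimal sketch of that argument handling, assuming <tt>normalize_args</tt> as a hypothetical helper name and <tt>SecureRandom.hex(16)</tt> as a stand-in for <tt>generate_uuid</tt>:

```ruby
require 'securerandom'

# Hypothetical helper mirroring the argument handling in the new `perform`.
# Accepts either (uuid, options) or just (options) and always returns both.
def normalize_args(uuid = nil, options = {})
  if (uuid.nil? || uuid.is_a?(Hash)) && options == {}
    options = uuid || {}
    uuid = SecureRandom.hex(16) # stand-in for generate_uuid (assumption)
  end
  [uuid, options]
end

regular = normalize_args('abc123', 'num' => 100)  # regular job: uuid first
child = normalize_args('_parent_uuid' => 'abc123') # child job: options only
bare = normalize_args                              # no arguments at all

puts regular.inspect
puts child.last.inspect
puts bare.last.inspect
```

A child therefore gets a fresh UUID of its own, while its link to the parent travels in the options under '_parent_uuid'.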

@@ -150,6 +157,7 @@ def scheduled(queue, klass, *args)
def initialize(uuid, options = {})
@uuid = uuid
@options = options
@parent_uuid = options['_parent_uuid']
end

# Run by the Resque::Worker when processing this job. It wraps the <tt>perform</tt>
@@ -159,10 +167,13 @@ def initialize(uuid, options = {})
def safe_perform!
set_status({'status' => STATUS_WORKING})
perform
if status && status.failed?
on_failure(status.message) if respond_to?(:on_failure)
job_status = status
if job_status&.failed?
on_failure(job_status.message) if respond_to?(:on_failure)
return
elsif @is_parent_job
return
elsif status && !status.completed?
elsif job_status && !job_status.completed?
completed
end
on_success if respond_to?(:on_success)
@@ -178,6 +189,16 @@ def safe_perform!
end
end

def child_safe_perform!
set_status('status' => STATUS_WORKING, 'started_at' => Time.now.to_i)
perform_child unless parent_should_kill?
completed if status&.working?
child_complete unless status&.failed?
rescue => e
failed("The task failed because of an error: #{e}")
raise e
end

# Set the jobs status. Can take an array of strings or hashes that are merged
# (in order) into a final status hash.
def status=(new_status)
@@ -199,6 +220,10 @@ def should_kill?
Resque::Plugins::Status::Hash.should_kill?(uuid)
end

def parent_should_kill?
Resque::Plugins::Status::Hash.should_kill?(parent_uuid)
end

# set the status of the job for the current iteration. <tt>num</tt> and
# <tt>total</tt> are passed to the status as well as any messages.
# This will kill the job if it has been added to the kill list with
@@ -244,7 +269,58 @@ def kill!
raise Killed
end

# Initiates the parent once the total number of child jobs is known.
# This step is essential to prevent a race condition in which all currently queued children finish
# while the parent still plans to enqueue more.
# When a child job completes, it increments the `num` of the parent job by 1.
# When `num` equals `total`, the parent job is marked as complete and `on_success` is called on the last child job.
# If the parent is killed, all children are prevented from running.
# Child job statuses are deleted when complete or killed to avoid garbage in redis. They are preserved on error.
def init_parent(total)
at(0, total, "Queuing #{total} subjobs")
@is_parent_job = true
end

# Enqueues the same class with `options` as a child of the current job
def enqueue_child(options)
raise 'Parent not initiated' unless @is_parent_job

Resque.enqueue(self.class, options.merge('_parent_uuid' => uuid))
end

private

def parent_status
Resque::Plugins::Status::Hash.get(parent_uuid)
end

def update_parent(&block)
Resque::Plugins::Status::Hash.update(parent_uuid, &block)
end

def child_complete
parent = update_parent do |st|
st['num'] += 1
st['message'] = "Working #{st['num']}/#{st['total']}"
st['time'] = Time.now.to_i
end
Resque::Plugins::Status::Hash.remove(uuid)
return if parent.num != parent.total

if parent_should_kill?
Resque::Plugins::Status::Hash.set(parent_uuid, parent, 'status' => STATUS_KILLED)
on_killed if respond_to?(:on_killed)
else
begin
on_success if respond_to?(:on_success)
Resque::Plugins::Status::Hash.set(parent_uuid, parent, 'status' => STATUS_COMPLETED, 'message' => "Finished in #{parent['num']} jobs")
rescue => e
Resque::Plugins::Status::Hash.set(parent_uuid, parent, 'status' => STATUS_FAILED, 'message' => "on_success failed with #{e.class}/#{e}")
raise
end
end
end

def set_status(*args)
self.status = [status, {'name' => self.name}, args].flatten
end
10 changes: 10 additions & 0 deletions lib/resque/plugins/status/hash.rb
@@ -46,6 +46,16 @@ def self.set(uuid, *messages)
val
end

# Atomically update the status in a block. Example:
# Resque::Plugins::Status::Hash.update(uuid) { |st| st['num'] += 1; st['message'] = "finished #{st['num']}" }
# Note: updates for this UUID are serialized with a redis lock that expires after 10s, so make sure the block returns quickly
def self.update(uuid, &block)
sleep(0.01) until redis.set("lock:update:#{uuid}", 1, nx: true, ex: 10)
set(uuid, get(uuid).tap(&block))
ensure
redis.del("lock:update:#{uuid}")
end
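
The locking pattern in <tt>update</tt> can be tried without a Redis server. The sketch below rests on stated assumptions: <tt>FakeRedis</tt> and <tt>LockedStore</tt> are hypothetical in-memory stand-ins, only <tt>SET ... NX</tt> and <tt>DEL</tt> are modelled, and the expiry is ignored. One deliberate difference from the hunk above: the lock is acquired before the <tt>begin</tt>, so the <tt>ensure</tt> only releases a lock this caller actually holds.

```ruby
# Hypothetical in-memory stand-in for the two Redis calls used by `update`.
# Only SET with NX and DEL are modelled; the `ex` expiry is ignored.
class FakeRedis
  def initialize
    @data = {}
    @mutex = Mutex.new
  end

  # Returns true if the key was written, false if NX refused the write.
  def set(key, value, nx: false, ex: nil)
    @mutex.synchronize do
      return false if nx && @data.key?(key)

      @data[key] = value
      true
    end
  end

  def del(key)
    @mutex.synchronize { @data.delete(key) }
  end
end

# Hypothetical status store with a lock-protected read-modify-write.
class LockedStore
  def initialize(redis)
    @redis = redis
    @statuses = {}
  end

  def set(uuid, status)
    @statuses[uuid] = status
  end

  def get(uuid)
    @statuses[uuid].dup
  end

  def update(uuid)
    key = "lock:update:#{uuid}"
    # Spin until this caller owns the lock.
    sleep(0.001) until @redis.set(key, 1, nx: true, ex: 10)
    begin
      status = get(uuid)
      yield status
      set(uuid, status)
    ensure
      @redis.del(key) # released only after it was acquired above
    end
  end
end

store = LockedStore.new(FakeRedis.new)
store.set('abc', 'num' => 10)

threads = Array.new(20) { Thread.new { store.update('abc') { |st| st['num'] += 1 } } }
threads.each(&:join)

puts store.get('abc')['num']
```

Because every read-modify-write happens while the lock is held, no increment is lost, which is the property the 'is threadsafe' test in this PR checks against real Redis.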

# clear statuses from redis passing an optional range. See `statuses` for info
# about ranges
def self.clear(range_start = nil, range_end = nil)
22 changes: 22 additions & 0 deletions test/test_helper.rb
@@ -3,6 +3,7 @@

require 'minitest/autorun'
require 'mocha/setup'
require 'timecop'

#
# make sure we can run redis
@@ -50,6 +51,27 @@ def perform

end

class WorkingParentJob
include Resque::Plugins::Status

def perform
init_parent(3)
# This is a workaround to test killing job while it enqueues children with inline resque.
# The real use case would be that the `kill` would happen somewhere in the middle of subjob runs,
# long after the main job finished.
Resque::Plugins::Status::Hash.kill(@uuid) if options['self_kill']
3.times { |i| enqueue_child('job_num' => i) }
end

def perform_child
Resque.redis.sadd('child_jobs_done', options['job_num'])
end

def on_success
Resque.redis.sadd('child_on_success', options['job_num'] || 'parent')
end
end

class ErrorJob

include Resque::Plugins::Status
38 changes: 35 additions & 3 deletions test/test_resque_plugins_status.rb
@@ -250,8 +250,10 @@ class TestResquePluginsStatus < Minitest::Test

describe "invoking killall jobs to kill a range" do
before do
@uuid1 = KillableJob.create(:num => 100)
@uuid2 = KillableJob.create(:num => 100)
Timecop.travel(-1) do
@uuid1 = KillableJob.create(:num => 100)
end
@uuid2 = KillableJob.create(:num => 100)

Resque::Plugins::Status::Hash.killall(0,0) # only @uuid2 should be killed

@@ -353,6 +355,36 @@ class TestResquePluginsStatus < Minitest::Test

end

end
describe 'with WorkingParentJob' do
before do
Resque.stubs(:inline?).returns(true)
@uuid = WorkingParentJob.create('num' => 100)
end

it 'should have run all the children' do
assert_equal(%w[0 1 2], Resque.redis.smembers('child_jobs_done').sort)
onsuccess_jobs = Resque.redis.smembers('child_on_success')
assert_equal 1, onsuccess_jobs.size
assert_includes(%w[0 1 2], onsuccess_jobs[0])
status = Resque::Plugins::Status::Hash.get(@uuid)
assert_equal 3, status.num
assert_equal 'completed', status.status
end
end

describe 'with self killing WorkingParentJob' do
before do
Resque.stubs(:inline?).returns(true)
@uuid = WorkingParentJob.create('num' => 100, 'self_kill' => true)
end

it 'should not have run any children' do
assert_equal([], Resque.redis.smembers('child_jobs_done'))
assert_equal([], Resque.redis.smembers('child_on_success'))
status = Resque::Plugins::Status::Hash.get(@uuid)
assert_equal 3, status.num
assert_equal 'killed', status.status
end
end
end
end
19 changes: 19 additions & 0 deletions test/test_resque_plugins_status_hash.rb
@@ -162,6 +162,25 @@ class TestResquePluginsStatusHash < Minitest::Test
end
end

describe '.update' do
before do
@uuid = Resque::Plugins::Status::Hash.generate_uuid
Resque::Plugins::Status::Hash.set(@uuid, 'num' => 10)
end

it 'updates the hash' do
Resque::Plugins::Status::Hash.update(@uuid) { |h| h['num'] += 5; h['message'] = 'hello' }
hash = Resque::Plugins::Status::Hash.get(@uuid)
assert_equal 15, hash.num
assert_equal 'hello', hash.message
end

it 'is threadsafe' do
20.times.map { Thread.new { Resque::Plugins::Status::Hash.update(@uuid) { |h| h['num'] += 1 } } }.each(&:join)
assert_equal 30, Resque::Plugins::Status::Hash.get(@uuid).num
end
end

describe ".status_ids" do

before do