Support async job processing in EBS SQSD middleware #168

Merged (12 commits, Dec 5, 2024)
4 changes: 4 additions & 0 deletions .rubocop.yml
@@ -17,13 +17,17 @@ Metrics/BlockLength:
- 'spec/**/*.rb'

Metrics/MethodLength:
Max: 15
Exclude:
- 'spec/**/*.rb'

Metrics/ModuleLength:
Exclude:
- 'spec/**/*.rb'

Metrics/ClassLength:
Max: 150

Naming/FileName:
Exclude:
- 'lib/aws-sdk-rails.rb'
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -1,6 +1,8 @@
Unreleased Changes
------------------

* Feature - Support async job processing in Elastic Beanstalk middleware. (#167)

5.0.0 (2024-11-21)
------------------

31 changes: 29 additions & 2 deletions README.md
@@ -126,15 +126,42 @@ the `AWS_PROCESS_BEANSTALK_WORKER_REQUESTS` environment variable to `true` in
the worker environment configuration. The
[SQS Daemon](https://docs.aws.amazon.com/elasticbeanstalk/latest/dg/using-features-managing-env-tiers.html#worker-daemon)
running on the worker sends messages as a POST request to `http://localhost/`.
The aws-sdk-rails middleware will forward each request and parameters to their
The ElasticBeanstalkSQSD middleware will forward each request and its parameters to the
appropriate job. The middleware will only process requests from the SQS daemon
and will pass on all others, so it will not interfere with other routes in your
application.
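
For context, the POST body is the serialized Active Job payload produced by the
SQS adapter; the following is a minimal sketch of how the middleware hands it
back to Active Job (the job class and payload fields are illustrative,
following standard Active Job serialization):

```ruby
# Illustrative payload only; ProcessOrderJob is a hypothetical job class.
body = '{"job_class":"ProcessOrderJob","job_id":"abc-123","arguments":[42]}'

# The middleware decodes the JSON body and delegates execution to Active Job,
# which resolves "job_class" and calls #perform with the deserialized arguments.
job = ::ActiveSupport::JSON.decode(body)
::ActiveJob::Base.execute(job)
```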

To protect against forgeries, daemon requests will only be processed if they
originate from localhost or the Docker host.

Periodic (scheduled) jobs are also supported with this approach. Elastic
#### Running Jobs Async
By default, the ElasticBeanstalkSQSD middleware processes jobs synchronously
and does not complete the request until the job has finished executing. For
long-running jobs (exceeding the configured nginx timeout on the worker), this
may cause timeouts and incomplete executions.

To run jobs asynchronously, set the `AWS_PROCESS_BEANSTALK_WORKER_JOBS_ASYNC`
environment variable to `true` in your worker environment. Jobs will be queued
in a ThreadPoolExecutor, the request will immediately return 200 OK (so the
SQS daemon deletes the message), and the job will be executed in the background.

By default, the executor uses the available processor count as `max_threads`.
You can configure the maximum number of threads for the executor by setting
the `AWS_PROCESS_BEANSTALK_WORKER_THREADS` environment variable.
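
The executor in this diff resolves the thread count roughly as follows (a
simplified excerpt of `init_executor` below, shown here for illustration):

```ruby
# The env var wins; otherwise fall back to the processor count reported
# by concurrent-ruby.
threads = Integer(
  ENV.fetch('AWS_PROCESS_BEANSTALK_WORKER_THREADS',
            Concurrent.available_processor_count || Concurrent.processor_count)
)
```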

When there is no additional capacity to execute a task, the middleware
returns a 429 (Too Many Requests) response, which causes sqsd NOT to delete
the message. The message will be retried once its visibility timeout is
reached.
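
The backpressure comes from the executor's bounded queue; the following is a
minimal sketch of that behavior, with arbitrary pool sizes and a hypothetical
job body:

```ruby
require 'concurrent'

# A bounded pool with the :abort fallback raises once its threads and queue
# are both full; the middleware maps that error to a 429 response.
executor = Concurrent::ThreadPoolExecutor.new(
  max_threads: 2, # arbitrary for this sketch
  max_queue: 1,
  fallback_policy: :abort
)

begin
  executor.post { perform_long_running_job } # hypothetical job body
rescue Concurrent::RejectedExecutionError
  # The middleware returns 429 here; sqsd leaves the message in the queue.
end
```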

Periodic (scheduled) tasks are also run asynchronously in the same way.
Elastic Beanstalk queues a message for the periodic task, and if there is
no capacity to execute it, the task will be retried once the message's
visibility timeout is reached.

#### Periodic (scheduled) jobs
[Periodic (scheduled) tasks](https://docs.aws.amazon.com/elasticbeanstalk/latest/dg/using-features-managing-env-tiers.html#worker-periodictasks)
are also supported with this approach. Elastic
Beanstalk workers support the addition of a `cron.yaml` file in the application
root to configure this. You can call your jobs from your controller actions
or if you name your cron job the same as your job class and set the URL to
102 changes: 83 additions & 19 deletions lib/aws/rails/middleware/elastic_beanstalk_sqsd.rb
@@ -8,6 +8,10 @@ class ElasticBeanstalkSQSD
def initialize(app)
@app = app
@logger = ::Rails.logger

return unless ENV['AWS_PROCESS_BEANSTALK_WORKER_JOBS_ASYNC']

@executor = init_executor
end

def call(env)
@@ -25,48 +29,108 @@ def call(env)
end

# Execute job or periodic task based on HTTP request context
periodic_task?(request) ? execute_periodic_task(request) : execute_job(request)
execute(request)
end

def shutdown(timeout = nil)
return unless @executor

@logger.info("Shutting down SQS EBS background job executor. Timeout: #{timeout}")
@executor.shutdown
clean_shutdown = @executor.wait_for_termination(timeout)
@logger.info("SQS EBS background executor shutdown complete. Clean: #{clean_shutdown}")
end

private

def init_executor
threads = Integer(ENV.fetch('AWS_PROCESS_BEANSTALK_WORKER_THREADS',
Concurrent.available_processor_count || Concurrent.processor_count))
options = {
max_threads: threads,
max_queue: 1,
auto_terminate: false, # register our own at_exit to gracefully shutdown
fallback_policy: :abort # Concurrent::RejectedExecutionError must be handled
}
at_exit { shutdown }

Concurrent::ThreadPoolExecutor.new(options)
end

def execute(request)
if periodic_task?(request)
execute_periodic_task(request)
else
execute_job(request)
end
end

def execute_job(request)
if @executor
_execute_job_background(request)
else
_execute_job_now(request)
end
end

# Execute a job in the current thread
def _execute_job_now(request)
# Jobs queued from the SQS adapter contain the JSON message in the request body.
job = ::ActiveSupport::JSON.decode(request.body.string)
job_name = job['job_class']
@logger.debug("Executing job: #{job_name}")
_execute_job(job, job_name)
[200, { 'Content-Type' => 'text/plain' }, ["Successfully ran job #{job_name}."]]
rescue NameError
internal_error_response
end

def _execute_job(job, job_name)
::ActiveJob::Base.execute(job)
[200, { 'Content-Type' => 'text/plain' }, ["Successfully ran job #{job_name}."]]
rescue NameError => e
@logger.error("Job #{job_name} could not resolve to a class that inherits from Active Job.")
@logger.error("Error: #{e}")
raise e
internal_error_response
end

# Execute a job using the thread pool executor
def _execute_job_background(request)
job_data = ::ActiveSupport::JSON.decode(request.body.string)
@logger.debug("Queuing background job: #{job_data['job_class']}")
@executor.post(job_data) do |job|
::ActiveJob::Base.execute(job)
end
[200, { 'Content-Type' => 'text/plain' }, ["Successfully queued job #{job_data['job_class']}"]]
rescue Concurrent::RejectedExecutionError
msg = 'No capacity to execute job.'
@logger.info(msg)
[429, { 'Content-Type' => 'text/plain' }, [msg]]
end

def execute_periodic_task(request)
# The beanstalk worker SQS Daemon will add the 'X-Aws-Sqsd-Taskname' for periodic tasks set in cron.yaml.
job_name = request.headers['X-Aws-Sqsd-Taskname']
@logger.debug("Creating and executing periodic task: #{job_name}")
_execute_periodic_task(job_name)
[200, { 'Content-Type' => 'text/plain' }, ["Successfully ran periodic task #{job_name}."]]
rescue NameError
internal_error_response
end

def _execute_periodic_task(job_name)
job = job_name.constantize.new
job.perform_now
if @executor
_execute_periodic_task_background(job)
else
_execute_periodic_task_now(job)
end
rescue NameError => e
@logger.error("Periodic task #{job_name} could not resolve to an Active Job class " \
'- check the cron name spelling and set the path as / in cron.yaml.')
@logger.error("Error: #{e}.")
raise e
internal_error_response
end

def _execute_periodic_task_now(job)
@logger.debug("Executing periodic task: #{job.class}")
job.perform_now
[200, { 'Content-Type' => 'text/plain' }, ["Successfully ran periodic task #{job.class}."]]
end

def _execute_periodic_task_background(job)
@logger.debug("Queuing background periodic task: #{job.class}")
@executor.post(job, &:perform_now)
[200, { 'Content-Type' => 'text/plain' }, ["Successfully queued periodic task #{job.class}"]]
rescue Concurrent::RejectedExecutionError
msg = 'No capacity to execute periodic task.'
@logger.info(msg)
[429, { 'Content-Type' => 'text/plain' }, [msg]]
end

def internal_error_response
44 changes: 44 additions & 0 deletions spec/aws/rails/middleware/elastic_beanstalk_sqsd_spec.rb
@@ -219,6 +219,50 @@ module Middleware
include_examples 'is valid in either cgroup1 or cgroup2'
end

context 'when AWS_PROCESS_BEANSTALK_WORKER_JOBS_ASYNC' do
before(:each) do
ENV['AWS_PROCESS_BEANSTALK_WORKER_JOBS_ASYNC'] = 'true'
end

after(:each) do
ENV.delete('AWS_PROCESS_BEANSTALK_WORKER_JOBS_ASYNC')
end

it 'queues job' do
expect_any_instance_of(Concurrent::ThreadPoolExecutor).to receive(:post)
expect(response[0]).to eq(200)
expect(response[2]).to eq(['Successfully queued job ElasticBeanstalkJob'])
end

context 'no capacity' do
it 'returns too many requests error' do
allow_any_instance_of(Concurrent::ThreadPoolExecutor).to receive(:post)
.and_raise Concurrent::RejectedExecutionError

expect(response[0]).to eq(429)
end
end

context 'periodic task' do
let(:is_periodic_task) { true }

it 'queues job' do
expect_any_instance_of(Concurrent::ThreadPoolExecutor).to receive(:post)
expect(response[0]).to eq(200)
expect(response[2]).to eq(['Successfully queued periodic task ElasticBeanstalkPeriodicTask'])
end

context 'no capacity' do
it 'returns too many requests error' do
allow_any_instance_of(Concurrent::ThreadPoolExecutor).to receive(:post)
.and_raise Concurrent::RejectedExecutionError

expect(response[0]).to eq(429)
end
end
end
end

def stub_runs_in_neither_docker_container
proc_1_cgroup = <<~CONTENT
0::/
2 changes: 1 addition & 1 deletion tasks/release