Support async job processing in EBS SQSD middleware #168

Merged
12 commits merged on Dec 5, 2024
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
@@ -57,6 +57,7 @@ jobs:
with:
ruby-version: ${{ matrix.ruby }}
bundler-cache: true
bundler: latest

- name: Test
run: bundle exec rake spec
3 changes: 3 additions & 0 deletions .rubocop.yml
@@ -24,6 +24,9 @@ Metrics/ModuleLength:
Exclude:
- 'spec/**/*.rb'

Metrics/ClassLength:
Max: 150

Naming/FileName:
Exclude:
- 'lib/aws-sdk-rails.rb'
23 changes: 22 additions & 1 deletion README.md
@@ -126,14 +126,35 @@ the `AWS_PROCESS_BEANSTALK_WORKER_REQUESTS` environment variable to `true` in
the worker environment configuration. The
[SQS Daemon](https://docs.aws.amazon.com/elasticbeanstalk/latest/dg/using-features-managing-env-tiers.html#worker-daemon)
running on the worker sends messages as a POST request to `http://localhost/`.
The ElasticBeanstalkSQSD middleware will forward each request and its parameters to the
appropriate job. The middleware will only process requests from the SQS daemon
and will pass all others through, so it will not interfere with other routes in your
application.

To protect against forgeries, daemon requests will only be processed if they
originate from localhost or the Docker host.
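
For illustration, the pass-through behavior can be pictured as a plain Rack
middleware that only handles daemon traffic and delegates everything else to the
app. This is a simplified sketch, not the actual implementation; the class name
and the user-agent check below are illustrative placeholders.

```ruby
require 'rack'

# Sketch of a middleware that handles SQS daemon requests and passes all
# other requests straight through to the Rails application.
class SqsdPassThroughSketch
  def initialize(app)
    @app = app
  end

  def call(env)
    request = ::Rack::Request.new(env)
    # Hand anything that is not a daemon POST to the rest of the stack.
    return @app.call(env) unless from_sqs_daemon?(request)

    [200, { 'Content-Type' => 'text/plain' }, ['Handled by the worker middleware.']]
  end

  private

  def from_sqs_daemon?(request)
    # Illustrative check only; the real middleware also verifies that the
    # request originates from localhost or the Docker host.
    request.post? && request.get_header('HTTP_USER_AGENT').to_s.include?('aws-sqsd')
  end
end
```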

#### Running Jobs Async
By default the ElasticBeanstalkSQSD middleware will process jobs synchronously
and will not complete the request until the job has finished executing. For
long-running jobs (exceeding the configured nginx timeout on the worker), this
may cause timeouts and incomplete executions.

To run jobs asynchronously, set the `AWS_PROCESS_BEANSTALK_WORKER_JOBS_ASYNC`
environment variable to `true` in your worker environment. Jobs will be queued
in a ThreadPoolExecutor, the request will immediately return 200 OK, the SQS
message will be deleted, and the job will be executed in the background.

By default the executor will use the available processor count as
`max_threads`. You can configure the maximum number of threads for the executor
by setting the `AWS_PROCESS_BEANSTALK_WORKER_THREADS` environment variable.

When there is no additional capacity to execute a task, the middleware
returns a 429 (Too Many Requests) response, which results in the
SQS daemon NOT deleting the message. The message will be retried once its
visibility timeout expires.
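
The queueing behavior described above can be sketched with concurrent-ruby
directly. This is a minimal, self-contained illustration, not the middleware's
actual code; the default thread count, method name, and simulated work are
assumptions.

```ruby
require 'concurrent'

# Small queue plus an :abort fallback policy: when the pool is saturated,
# Concurrent::RejectedExecutionError is raised instead of buffering more work.
executor = Concurrent::ThreadPoolExecutor.new(
  max_threads: Integer(ENV.fetch('AWS_PROCESS_BEANSTALK_WORKER_THREADS', 4)),
  max_queue: 1,
  fallback_policy: :abort
)

def respond_to_daemon(executor, job_data)
  executor.post(job_data) do |job|
    # The middleware runs ActiveJob::Base.execute(job) here; we just simulate work.
    sleep(1)
    puts "ran #{job['job_class']}"
  end
  [200, { 'Content-Type' => 'text/plain' }, ["Successfully queued job #{job_data['job_class']}"]]
rescue Concurrent::RejectedExecutionError
  # Returning 429 tells the daemon to keep the message; it will be retried
  # after the visibility timeout expires.
  [429, { 'Content-Type' => 'text/plain' }, ['No capacity to execute job.']]
end

puts respond_to_daemon(executor, { 'job_class' => 'ExampleJob' }).inspect
```

Keeping `max_queue` at 1 means the daemon only ever gets one job ahead of the
pool; anything beyond that is pushed back to SQS rather than buffered in memory.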

#### Periodic (scheduled) jobs
Periodic (scheduled) jobs are also supported with this approach. Elastic
Beanstalk workers support the addition of a `cron.yaml` file in the application
root to configure this. You can call your jobs from your controller actions
57 changes: 49 additions & 8 deletions lib/aws/rails/middleware/elastic_beanstalk_sqsd.rb
@@ -8,6 +8,8 @@ class ElasticBeanstalkSQSD
def initialize(app)
@app = app
@logger = ::Rails.logger

init_executor if ENV['AWS_PROCESS_BEANSTALK_WORKER_JOBS_ASYNC']
end

def call(env)
@@ -28,25 +30,64 @@ def call(env)
periodic_task?(request) ? execute_periodic_task(request) : execute_job(request)
end

def shutdown(timeout = nil)
return unless @executor

@logger.info("Shutting down SQS EBS background job executor. Timeout: #{timeout}")
@executor.shutdown
clean_shutdown = @executor.wait_for_termination(timeout)
@logger.info("SQS EBS background executor shutdown complete. Clean: #{clean_shutdown}")
end

private

def init_executor
threads = Integer(ENV.fetch('AWS_PROCESS_BEANSTALK_WORKER_THREADS',
Concurrent.available_processor_count || Concurrent.processor_count))
puts "Running with threads: #{threads}"
options = {
max_threads: threads,
max_queue: 1,
fallback_policy: :abort # Concurrent::RejectedExecutionError must be handled
}
@executor = Concurrent::ThreadPoolExecutor.new(options)
at_exit { shutdown }
end

def execute_job(request)
if @executor
_execute_job_parallel(request)
else
_execute_job_now(request)
end
end

# Execute a job in the current thread
def _execute_job_now(request)
# Jobs queued from the SQS adapter contain the JSON message in the request body.
job = ::ActiveSupport::JSON.decode(request.body.string)
job_name = job['job_class']
@logger.debug("Executing job: #{job_name}")
_execute_job(job, job_name)
[200, { 'Content-Type' => 'text/plain' }, ["Successfully ran job #{job_name}."]]
rescue NameError
internal_error_response
end

def _execute_job(job, job_name)
::ActiveJob::Base.execute(job)
rescue NameError => e
@logger.error("Job #{job_name} could not resolve to a class that inherits from Active Job.")
@logger.error("Error: #{e}")
raise e
end

# Execute a job using the thread pool executor
def _execute_job_parallel(request)
job_data = ::ActiveSupport::JSON.decode(request.body.string)
@executor.post(job_data) do |job|
@logger.debug("Executing job [in thread]: #{job['job_class']}")
::ActiveJob::Base.execute(job)
end
[200, { 'Content-Type' => 'text/plain' }, ["Successfully queued job #{job_data['job_class']}"]]
rescue Concurrent::RejectedExecutionError
msg = 'No capacity to execute job.'
@logger.info(msg)
[429, { 'Content-Type' => 'text/plain' }, [msg]]
end

def execute_periodic_task(request)
25 changes: 25 additions & 0 deletions spec/aws/rails/middleware/elastic_beanstalk_sqsd_spec.rb
@@ -219,6 +219,31 @@ module Middleware
include_examples 'is valid in either cgroup1 or cgroup2'
end

context 'when AWS_PROCESS_BEANSTALK_WORKER_JOBS_ASYNC' do
before(:each) do
ENV['AWS_PROCESS_BEANSTALK_WORKER_JOBS_ASYNC'] = 'true'
end

after(:each) do
ENV.delete('AWS_PROCESS_BEANSTALK_WORKER_JOBS_ASYNC')
end

it 'queues job' do
expect_any_instance_of(Concurrent::ThreadPoolExecutor).to receive(:post)
expect(response[0]).to eq(200)
expect(response[2]).to eq(['Successfully queued job ElasticBeanstalkJob'])
end

context 'no capacity' do
it 'returns too many requests error' do
allow_any_instance_of(Concurrent::ThreadPoolExecutor).to receive(:post)
.and_raise Concurrent::RejectedExecutionError

expect(response[0]).to eq(429)
end
end
end

def stub_runs_in_neither_docker_container
proc_1_cgroup = <<~CONTENT
0::/
2 changes: 1 addition & 1 deletion tasks/release