@@ -8,6 +8,10 @@ class ElasticBeanstalkSQSD
8
8
def initialize ( app )
9
9
@app = app
10
10
@logger = ::Rails . logger
11
+
12
+ return unless ENV [ 'AWS_PROCESS_BEANSTALK_WORKER_JOBS_ASYNC' ]
13
+
14
+ @executor = init_executor
11
15
end
12
16
13
17
def call ( env )
@@ -25,48 +29,108 @@ def call(env)
25
29
end
26
30
27
31
# Execute job or periodic task based on HTTP request context
28
- periodic_task? ( request ) ? execute_periodic_task ( request ) : execute_job ( request )
32
+ execute ( request )
33
+ end
34
+
35
+ def shutdown ( timeout = nil )
36
+ return unless @executor
37
+
38
+ @logger . info ( "Shutting down SQS EBS background job executor. Timeout: #{ timeout } " )
39
+ @executor . shutdown
40
+ clean_shutdown = @executor . wait_for_termination ( timeout )
41
+ @logger . info ( "SQS EBS background executor shutdown complete. Clean: #{ clean_shutdown } " )
29
42
end
30
43
31
44
private
32
45
46
+ def init_executor
47
+ threads = Integer ( ENV . fetch ( 'AWS_PROCESS_BEANSTALK_WORKER_THREADS' ,
48
+ Concurrent . available_processor_count || Concurrent . processor_count ) )
49
+ options = {
50
+ max_threads : threads ,
51
+ max_queue : 1 ,
52
+ auto_terminate : false , # register our own at_exit to gracefully shutdown
53
+ fallback_policy : :abort # Concurrent::RejectedExecutionError must be handled
54
+ }
55
+ at_exit { shutdown }
56
+
57
+ Concurrent ::ThreadPoolExecutor . new ( options )
58
+ end
59
+
60
+ def execute ( request )
61
+ if periodic_task? ( request )
62
+ execute_periodic_task ( request )
63
+ else
64
+ execute_job ( request )
65
+ end
66
+ end
67
+
33
68
def execute_job ( request )
69
+ if @executor
70
+ _execute_job_background ( request )
71
+ else
72
+ _execute_job_now ( request )
73
+ end
74
+ end
75
+
76
+ # Execute a job in the current thread
77
+ def _execute_job_now ( request )
34
78
# Jobs queued from the SQS adapter contain the JSON message in the request body.
35
79
job = ::ActiveSupport ::JSON . decode ( request . body . string )
36
80
job_name = job [ 'job_class' ]
37
81
@logger . debug ( "Executing job: #{ job_name } " )
38
- _execute_job ( job , job_name )
39
- [ 200 , { 'Content-Type' => 'text/plain' } , [ "Successfully ran job #{ job_name } ." ] ]
40
- rescue NameError
41
- internal_error_response
42
- end
43
-
44
- def _execute_job ( job , job_name )
45
82
::ActiveJob ::Base . execute ( job )
83
+ [ 200 , { 'Content-Type' => 'text/plain' } , [ "Successfully ran job #{ job_name } ." ] ]
46
84
rescue NameError => e
47
85
@logger . error ( "Job #{ job_name } could not resolve to a class that inherits from Active Job." )
48
86
@logger . error ( "Error: #{ e } " )
49
- raise e
87
+ internal_error_response
88
+ end
89
+
90
+ # Execute a job using the thread pool executor
91
+ def _execute_job_background ( request )
92
+ job_data = ::ActiveSupport ::JSON . decode ( request . body . string )
93
+ @logger . debug ( "Queuing background job: #{ job_data [ 'job_class' ] } " )
94
+ @executor . post ( job_data ) do |job |
95
+ ::ActiveJob ::Base . execute ( job )
96
+ end
97
+ [ 200 , { 'Content-Type' => 'text/plain' } , [ "Successfully queued job #{ job_data [ 'job_class' ] } " ] ]
98
+ rescue Concurrent ::RejectedExecutionError
99
+ msg = 'No capacity to execute job.'
100
+ @logger . info ( msg )
101
+ [ 429 , { 'Content-Type' => 'text/plain' } , [ msg ] ]
50
102
end
51
103
52
104
def execute_periodic_task ( request )
53
105
# The beanstalk worker SQS Daemon will add the 'X-Aws-Sqsd-Taskname' for periodic tasks set in cron.yaml.
54
106
job_name = request . headers [ 'X-Aws-Sqsd-Taskname' ]
55
- @logger . debug ( "Creating and executing periodic task: #{ job_name } " )
56
- _execute_periodic_task ( job_name )
57
- [ 200 , { 'Content-Type' => 'text/plain' } , [ "Successfully ran periodic task #{ job_name } ." ] ]
58
- rescue NameError
59
- internal_error_response
60
- end
61
-
62
- def _execute_periodic_task ( job_name )
63
107
job = job_name . constantize . new
64
- job . perform_now
108
+ if @executor
109
+ _execute_periodic_task_background ( job )
110
+ else
111
+ _execute_periodic_task_now ( job )
112
+ end
65
113
rescue NameError => e
66
114
@logger . error ( "Periodic task #{ job_name } could not resolve to an Active Job class " \
67
115
'- check the cron name spelling and set the path as / in cron.yaml.' )
68
116
@logger . error ( "Error: #{ e } ." )
69
- raise e
117
+ internal_error_response
118
+ end
119
+
120
+ def _execute_periodic_task_now ( job )
121
+ @logger . debug ( "Executing periodic task: #{ job . class } " )
122
+ job . perform_now
123
+ [ 200 , { 'Content-Type' => 'text/plain' } , [ "Successfully ran periodic task #{ job . class } ." ] ]
124
+ end
125
+
126
+ def _execute_periodic_task_background ( job )
127
+ @logger . debug ( "Queuing bakground periodic task: #{ job . class } " )
128
+ @executor . post ( job , &:perform_now )
129
+ [ 200 , { 'Content-Type' => 'text/plain' } , [ "Successfully queued periodic task #{ job . class } " ] ]
130
+ rescue Concurrent ::RejectedExecutionError
131
+ msg = 'No capacity to execute periodic task.'
132
+ @logger . info ( msg )
133
+ [ 429 , { 'Content-Type' => 'text/plain' } , [ msg ] ]
70
134
end
71
135
72
136
def internal_error_response
0 commit comments