Skip to content

Commit

Permalink
Fix race condition in DelayAgent by using record locking
Browse files Browse the repository at this point in the history
Problems can arise if a DelayAgent is triggered while a previous
invocation is still in progress.  This is more likely to occur when
the execution interval is short and the `emit_interval` option is
configured to add more delay between events.
  • Loading branch information
knu committed Dec 30, 2024
1 parent 707ebc8 commit c7c1980
Showing 1 changed file with 34 additions and 20 deletions.
54 changes: 34 additions & 20 deletions app/models/agents/delay_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -71,41 +71,55 @@ def working?
end

def receive(incoming_events)
incoming_events.each do |event|
event_ids = memory['event_ids'] || []
event_ids << event.id
if event_ids.length > interpolated['max_events'].to_i
if options['keep'] == 'newest'
event_ids.shift
else
event_ids.pop
save!

with_lock do
incoming_events.each do |event|
event_ids = memory['event_ids'] || []
event_ids << event.id
if event_ids.length > interpolated['max_events'].to_i
if options['keep'] == 'newest'
event_ids.shift
else
event_ids.pop
end
end
memory['event_ids'] = event_ids
end
memory['event_ids'] = event_ids
end
end

def check
return if memory['event_ids'].blank?
private def extract_emitted_events!
save!

events = received_events.where(id: memory['event_ids']).reorder(:id).to_a
with_lock do
emitted_events = received_events.where(id: memory['event_ids']).reorder(:id).to_a

if interpolated[SortableEvents::EVENTS_ORDER_KEY].present?
events = sort_events(events)
end
if interpolated[SortableEvents::EVENTS_ORDER_KEY].present?
emitted_events = sort_events(emitted_events)
end

max_emitted_events = interpolated['max_emitted_events'].presence&.to_i

max_emitted_events = interpolated['max_emitted_events'].presence&.to_i
if max_emitted_events&.< emitted_events.length
emitted_events[max_emitted_events..] = []
end

if max_emitted_events&.< events.length
events[max_emitted_events..] = []
memory['event_ids'] -= emitted_events.map(&:id)
save!

emitted_events
end
end

def check
return if memory['event_ids'].blank?

interval = (options['emit_interval'].presence&.to_f || 0).clamp(0..)

events.each_with_index do |event, i|
extract_emitted_events!.each_with_index do |event, i|
sleep interval unless i.zero?
create_event payload: event.payload
memory['event_ids'].delete(event.id)
end
end
end
Expand Down

0 comments on commit c7c1980

Please sign in to comment.