Skip to content

Commit

Permalink
Merge pull request huginn#3478 from huginn/delay_agent_fix_race_condi…
Browse files Browse the repository at this point in the history
…tion

Fix race condition in DelayAgent
  • Loading branch information
knu authored Dec 30, 2024
2 parents ea040bd + c7c1980 commit a7d9279
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 21 deletions.
2 changes: 1 addition & 1 deletion app/models/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ def credential(name)
end
end

def reload
def reload(...)
@credential_cache = {}
super
end
Expand Down
54 changes: 34 additions & 20 deletions app/models/agents/delay_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -71,41 +71,55 @@ def working?
end

def receive(incoming_events)
incoming_events.each do |event|
event_ids = memory['event_ids'] || []
event_ids << event.id
if event_ids.length > interpolated['max_events'].to_i
if options['keep'] == 'newest'
event_ids.shift
else
event_ids.pop
save!

with_lock do
incoming_events.each do |event|
event_ids = memory['event_ids'] || []
event_ids << event.id
if event_ids.length > interpolated['max_events'].to_i
if options['keep'] == 'newest'
event_ids.shift
else
event_ids.pop
end
end
memory['event_ids'] = event_ids
end
memory['event_ids'] = event_ids
end
end

def check
return if memory['event_ids'].blank?
private def extract_emitted_events!
save!

events = received_events.where(id: memory['event_ids']).reorder(:id).to_a
with_lock do
emitted_events = received_events.where(id: memory['event_ids']).reorder(:id).to_a

if interpolated[SortableEvents::EVENTS_ORDER_KEY].present?
events = sort_events(events)
end
if interpolated[SortableEvents::EVENTS_ORDER_KEY].present?
emitted_events = sort_events(emitted_events)
end

max_emitted_events = interpolated['max_emitted_events'].presence&.to_i

max_emitted_events = interpolated['max_emitted_events'].presence&.to_i
if max_emitted_events&.< emitted_events.length
emitted_events[max_emitted_events..] = []
end

if max_emitted_events&.< events.length
events[max_emitted_events..] = []
memory['event_ids'] -= emitted_events.map(&:id)
save!

emitted_events
end
end

def check
return if memory['event_ids'].blank?

interval = (options['emit_interval'].presence&.to_f || 0).clamp(0..)

events.each_with_index do |event, i|
extract_emitted_events!.each_with_index do |event, i|
sleep interval unless i.zero?
create_event payload: event.payload
memory['event_ids'].delete(event.id)
end
end
end
Expand Down

0 comments on commit a7d9279

Please sign in to comment.