Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 41 additions & 2 deletions networking_nsxv3/common/synchronization.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,51 @@ def __ne__(self, other):
return (not self.__eq__(other))

def __lt__(self, other):
""" Order Runnable by their priority """
""" Order Runnable by their priority
Only the passive queue is ordered by priority.
The active queue is FiFo.
"""
return self.priority < other.priority

def __hash__(self):
return hash(self.__repr__())

class UniqFiFoQueue(eventlet.queue.Queue):
"""
A subclass of :class:`Queue` that maintains job order by insertion.

Problem with the old approach:
Jobs may starve in the active queue.

- The internal sync run adds up to 20 "outdated" objects to the passive queue.
- If space is available, jobs move from the passive to the active queue.
- The active queue prioritizes jobs based on priority while enforcing uniqueness
(determined by OpenStack ID and execution method).

Issues:
1. Job starvation – If high-priority jobs keep arriving and the agent is at full capacity,
low-priority jobs may never get processed.
2. Blocking of new high-priority jobs – Lower-priority jobs in the passive queue
can prevent new high-priority jobs from being added due to the uniqueness constraint
(see the `__eq__` method of the `Runnable` class).
3. Adding a job with the **HIGHEST** priority does not
guarantee execution in insertion order. See
https://docs.python.org/3/library/heapq.html#priority-queue-implementation-notes
for details.
"""


def _init(self, maxsize):
self.queue = collections.deque()

def _put(self, item):
if item not in self.queue:
# Add item to the right side of the deque
self.queue.append(item)
self._put_bookkeeping()

def _get(self):
return self.queue.popleft()

class UniqPriorityQueue(eventlet.queue.Queue):

Expand Down Expand Up @@ -129,7 +168,7 @@ class Runner(object):
def __init__(self, active_size=INFINITY, passive_size=INFINITY,
workers_size=1):
# if queue_size is < 0, the queue size is infinite.
self._active = UniqPriorityQueue(maxsize=active_size)
self._active = UniqFiFoQueue(active_size)
self._passive = UniqPriorityQueue(maxsize=passive_size)
self._workers = eventlet.greenpool.GreenPool(size=workers_size)
self._idle = workers_size
Expand Down
Loading