Skip to content

Commit 4b14065

Browse files
committed
added init, debugged drain function, edited docstrings
1 parent 66beec3 commit 4b14065

File tree

3 files changed

+13
-11
lines changed

3 files changed

+13
-11
lines changed

worker/__init__.py

Whitespace-only changes.

worker/queue_wrapper.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
"""
2-
Worker Queue
2+
Worker Queue.
33
"""
44

55
import multiprocessing.managers
@@ -21,7 +21,7 @@ def __init__(self, mp_manager: multiprocessing.managers.SyncManager, max_size: i
2121

2222
def fill_queue_with_sentinel(self, timeout: float = 0.0) -> None:
2323
"""
24-
Fills the queue with sentinel (None)
24+
Fills the queue with sentinel (None).
2525
"""
2626
if timeout <= 0.0:
2727
timeout = self.__QUEUE_TIMEOUT
@@ -35,19 +35,21 @@ def fill_queue_with_sentinel(self, timeout: float = 0.0) -> None:
3535

3636
def drain_queue(self, timeout: float = 0.0) -> None:
3737
"""
38-
Drains the queue
38+
Drains the queue.
3939
"""
4040
if timeout <= 0.0:
4141
timeout = self.__QUEUE_TIMEOUT
4242

4343
try:
4444
self.queue.get(timeout=timeout)
45+
for _ in range(1, self.max_size):
46+
self.queue.get(timeout=timeout)
4547
except queue.Empty:
4648
return
4749

4850
def fill_and_drain_queue(self) -> None:
4951
"""
50-
Fills the queue with sentinel and then drains it
52+
Fills the queue with sentinel and then drains it.
5153
"""
5254
self.fill_queue_with_sentinel()
5355
time.sleep(self.__QUEUE_DELAY)

worker/worker_controller.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,53 +15,53 @@ class WorkerController:
1515

1616
def __init__(self) -> None:
1717
"""
18-
Initializes semaphore and queue
18+
Initializes semaphore and queue.
1919
"""
2020
self.__process_limiter = mp.BoundedSemaphore(1)
2121
self.__is_paused = False
2222
self.__exit_queue = mp.Queue(1)
2323

2424
def request_pause(self) -> None:
2525
"""
26-
Request a worker to pause its process
26+
Request a worker to pause its process.
2727
"""
2828
if not self.__is_paused:
2929
self.__process_limiter.acquire()
3030
self.__is_paused = True
3131

3232
def request_resume(self) -> None:
3333
"""
34-
Request a worker to resume its process
34+
Request a worker to resume its process.
3535
"""
3636
if self.__is_paused:
3737
self.__process_limiter.release()
3838
self.__is_paused = False
3939

4040
def check_pause(self) -> None:
4141
"""
42-
If pause requested by main, worker is blocked, otherwise worker continues
42+
If pause requested by main, worker is blocked, otherwise worker continues.
4343
"""
4444
self.__process_limiter.acquire()
4545
self.__process_limiter.release()
4646

4747
def request_exit(self) -> None:
4848
"""
49-
Requests worker to exit its process
49+
Requests worker to exit its process.
5050
"""
5151
time.sleep(self.__QUEUE_DELAY)
5252
if not self.__exit_queue.empty():
5353
self.__exit_queue.put(None)
5454

5555
def clear_exit(self) -> None:
5656
"""
57-
Clears exit request
57+
Clears exit request.
5858
"""
5959
time.sleep(self.__QUEUE_DELAY)
6060
if not self.__exit_queue.empty():
6161
_ = self.__exit_queue.get()
6262

6363
def is_exit_requested(self) -> bool:
6464
"""
65-
Returns whether main has requested a worker to exit
65+
Returns whether main has requested a worker to exit.
6666
"""
6767
return not self.__exit_queue.empty()

0 commit comments

Comments
 (0)