Skip to content

Commit

Permalink
Restart worker patch (#237)
Browse files Browse the repository at this point in the history
* restart worker patch with fixed commit history

* fixed black error for CI/CD

* fixed pylint error for CI/CD

* fixed pylint error for CI/CD

* added submodules

* added data checking to communication worker

* fixed code that was commented on during review
  • Loading branch information
Adityya-K authored Jan 22, 2025
1 parent ff42e24 commit def8f2d
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 4 deletions.
13 changes: 13 additions & 0 deletions modules/cluster_estimation/cluster_estimation_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import os
import pathlib

from modules import detection_in_world
from utilities.workers import queue_proxy_wrapper
from utilities.workers import worker_controller
from . import cluster_estimation
Expand Down Expand Up @@ -76,6 +77,18 @@ def cluster_estimation_worker(

input_data = input_queue.queue.get()
if input_data is None:
local_logger.info("Recieved type None, exiting.")
break

is_invalid = False

for single_input in input_data:
if not isinstance(single_input, detection_in_world.DetectionInWorld):
local_logger.warning(f"Skipping unexpected input: {input}")
is_invalid = True
break

if is_invalid:
continue

# TODO: When to override
Expand Down
20 changes: 19 additions & 1 deletion modules/communications/communications_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import pathlib
import queue

from modules import object_in_world
from . import communications
from utilities.workers import queue_proxy_wrapper
from utilities.workers import worker_controller
Expand Down Expand Up @@ -59,7 +60,24 @@ def communications_worker(
while not controller.is_exit_requested():
controller.check_pause()

result, value = comm.run(input_queue.queue.get())
input_data = input_queue.queue.get()

if input_data is None:
local_logger.info("Recieved type None, exiting.")
break

is_invalid = False

for single_input in input_data:
if not isinstance(single_input, object_in_world.ObjectInWorld):
local_logger.warning(f"Skipping unexpected input: {input}")
is_invalid = True
break

if is_invalid:
continue

result, value = comm.run(input_data)
if not result:
continue

Expand Down
6 changes: 6 additions & 0 deletions modules/detect_target/detect_target_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import os
import pathlib

from modules import image_and_time
from utilities.workers import queue_proxy_wrapper
from utilities.workers import worker_controller
from . import detect_target_factory
Expand Down Expand Up @@ -64,8 +65,13 @@ def detect_target_worker(

input_data = input_queue.queue.get()
if input_data is None:
local_logger.info("Recieved type None, exiting.")
break

if not isinstance(input_data, image_and_time.ImageAndTime):
local_logger.warning(f"Skipping unexpected input: {input}")
continue

result, value = detector.run(input_data)
if not result:
continue
Expand Down
6 changes: 6 additions & 0 deletions modules/geolocation/geolocation_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import os
import pathlib

from modules import merged_odometry_detections
from utilities.workers import queue_proxy_wrapper
from utilities.workers import worker_controller
from . import camera_properties
Expand Down Expand Up @@ -55,8 +56,13 @@ def geolocation_worker(

input_data = input_queue.queue.get()
if input_data is None:
local_logger.info("Recieved type None, exiting.")
break

if not isinstance(input_data, merged_odometry_detections.MergedOdometryDetections):
local_logger.warning(f"Skipping unexpected input: {input}")
continue

result, value = locator.run(input_data)
if not result:
continue
Expand Down
6 changes: 3 additions & 3 deletions utilities/workers/worker_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,8 @@ def check_and_restart_dead_workers(self) -> bool:
# Draining the preceding queue ensures that the preceding queue data wasn't what
# caused the worker to fail. Draining the succeeding queues is not needed
# because a worker that died would not have put bad data into the queue.
input_queues = self.__worker_properties.get_input_queues()
for queue in input_queues:
queue.drain_queue()
# input_queues = self.__worker_properties.get_input_queues()
# for queue in input_queues:
# queue.drain_queue()

return True

0 comments on commit def8f2d

Please sign in to comment.