Skip to content

Commit

Permalink
introduce the watchdog 'fix' too
Browse files Browse the repository at this point in the history
  • Loading branch information
CamDavidsonPilon committed Sep 4, 2024
1 parent 4ce669f commit 493f80e
Showing 1 changed file with 2 additions and 49 deletions.
51 changes: 2 additions & 49 deletions pioreactor/background_jobs/leader/watchdog.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,62 +46,15 @@ def announce_new_workers(self) -> None:
)

def watch_for_lost_state(self, state_message: MQTTMessage) -> None:
# generally, I hate this code below...

unit = state_message.topic.split("/")[1]

# don't check workers that aren't part of the cluster
current_workers = get_workers_in_inventory()

# ignore if leader is "lost"
if (
(state_message.payload.decode() == self.LOST)
and (unit != self.unit)
and (unit in current_workers)
and (unit in get_workers_in_inventory())
):
# TODO: this song-and-dance works for monitor, why not extend it to other jobs...

self.logger.warning(f"{unit} seems to be lost. Trying to re-establish connection...")
time.sleep(5)

if self.state != self.READY:
# when the entire Rpi shuts down, ex via sudo reboot, monitor can publish a lost. This code will halt the shutdown.
# let's return early.
return

# this is a hack! If the monitor job is in state READY, it will no op any transition.
# so we set to sleeping for a second, and the back to ready.
self.pub_client.publish(
f"pioreactor/{unit}/{UNIVERSAL_EXPERIMENT}/monitor/$state/set", self.SLEEPING
)
time.sleep(1)
self.pub_client.publish(
f"pioreactor/{unit}/{UNIVERSAL_EXPERIMENT}/monitor/$state/set", self.READY
)
###
time.sleep(10)

if self.state != self.READY:
# when the entire Rpi shuts down, ex via sudo reboot, monitor can publish a lost. This code will halt the shutdown.
# let's return early.
return

msg = subscribe( # I don't think this can be self.sub_client because we are in a callback.
f"pioreactor/{unit}/{UNIVERSAL_EXPERIMENT}/monitor/$state",
timeout=15,
name=self.job_name,
retries=1,
)
if msg is None:
return

current_state = msg.payload.decode()

if current_state == self.LOST:
# failed, let's confirm to user
self.logger.error(f"{unit} was lost.")
else:
self.logger.info(f"Update: {unit} is connected. All is well.")
self.logger.warning(f"{unit} seems to be lost.")

def start_passive_listeners(self) -> None:
self.subscribe_and_callback(
Expand Down

0 comments on commit 493f80e

Please sign in to comment.