From 4ce669f1c848d8a5f91d7ce2d3f0e979100b3019 Mon Sep 17 00:00:00 2001 From: CamDavidsonPilon Date: Tue, 3 Sep 2024 20:32:54 -0400 Subject: [PATCH 1/2] fix for od_calibration if _below_ min voltage, and the only root is negative --- pioreactor/background_jobs/od_reading.py | 80 ++++++++++++++++-------- pioreactor/tests/test_od_reading.py | 63 ++++++++++++++++--- 2 files changed, 111 insertions(+), 32 deletions(-) diff --git a/pioreactor/background_jobs/od_reading.py b/pioreactor/background_jobs/od_reading.py index 252d4c5d..b5d43211 100644 --- a/pioreactor/background_jobs/od_reading.py +++ b/pioreactor/background_jobs/od_reading.py @@ -672,6 +672,29 @@ def hydate_models_from_disk(self, channel_angle_map: dict[pt.PdChannel, pt.PdAng return +def closest_point_to_domain(P, D): + # Unpack the domain D into its lower and upper bounds + a, b = D + + # Initialize the closest point and minimum distance + closest_point = None + min_distance = float("inf") + + for p in P: + if a <= p <= b: # Check if p is within the domain D + return p # If p is within D, it's the closest point with distance 0 + + # Calculate the distance to the closest boundary of D + distance = min(abs(p - a), abs(p - b)) + + # Update the closest point if this distance is smaller than the current min_distance + if distance < min_distance: + min_distance = distance + closest_point = p + + return closest_point + + class CachedCalibrationTransformer(CalibrationTransformer): def __init__(self) -> None: super().__init__() @@ -719,7 +742,7 @@ def _hydrate_model(self, calibration_data: structs.ODCalibration) -> Callable[[f this procedure effectively ignores it. """ - from numpy import roots, zeros_like, real + from numpy import roots, zeros_like, real, imag def calibration(observed_voltage: pt.Voltage) -> pt.OD: poly = calibration_data.curve_data_ @@ -733,34 +756,41 @@ def calibration(observed_voltage: pt.Voltage) -> pt.OD: coef_shift[-1] = observed_voltage solve_for_poly = poly - coef_shift roots_ = roots(solve_for_poly) - plausible_roots_ = sorted( - [real(r) for r in roots_ if (r.imag == 0) and (min_OD <= real(r) <= max_OD)] - ) + plausible_ODs_ = sorted([real(r) for r in roots_ if (imag(r) == 0)]) - try: - # Q: when do I pick the second root? (for unimodal calibration curves) - ideal_root = float(plausible_roots_[0]) - return ideal_root - - except IndexError: + if len(plausible_ODs_) == 0: if observed_voltage <= min_voltage: - # voltage less than the blank recorded during the calibration and the calibration curve doesn't have solutions (ex even-deg poly) - # this isn't great, as there is nil noise in the signal. - - if not self.has_logged_warning: - self.logger.warning( - f"Signal below suggested calibration range. Trimming signal. Calibrated for OD=[{min_OD:0.3g}, {max_OD:0.3g}], V=[{min_voltage:0.3g}, {max_voltage:0.3g}]. Observed {observed_voltage:0.3f}V." - ) - self.has_logged_warning = True return min_OD - - else: - if not self.has_logged_warning: - self.logger.warning( - f"Signal outside suggested calibration range. Trimming signal. Calibrated for OD=[{min_OD:0.3g}, {max_OD:0.3g}], V=[{min_voltage:0.3g}, {max_voltage:0.3g}]. Observed {observed_voltage:0.3f}V." - ) - self.has_logged_warning = True + elif observed_voltage > max_voltage: return max_OD + else: + raise ValueError( + f"Can't compute calibrated data from curve {poly} and point {observed_voltage}" + ) + + # find the closest root to our OD domain + ideal_OD = float(closest_point_to_domain(plausible_ODs_, [min_OD, max_OD])) + + if ideal_OD < min_OD: + # voltage less than the blank recorded during the calibration and the calibration curve doesn't have solutions (ex even-deg poly) + # this isn't great, as there is nil noise in the signal. + + if not self.has_logged_warning: + self.logger.warning( + f"Signal outside suggested calibration range. Trimming signal. Calibrated for OD=[{min_OD:0.3g}, {max_OD:0.3g}], V=[{min_voltage:0.3g}, {max_voltage:0.3g}]. Observed {observed_voltage:0.3f}V." + ) + self.has_logged_warning = True + return min_OD + + elif ideal_OD > max_OD: + if not self.has_logged_warning: + self.logger.warning( + f"Signal outside suggested calibration range. Trimming signal. Calibrated for OD=[{min_OD:0.3g}, {max_OD:0.3g}], V=[{min_voltage:0.3g}, {max_voltage:0.3g}]. Observed {observed_voltage:0.3f}V." + ) + self.has_logged_warning = True + return max_OD + else: + return ideal_OD else: diff --git a/pioreactor/tests/test_od_reading.py b/pioreactor/tests/test_od_reading.py index d45d5038..f95bae43 100644 --- a/pioreactor/tests/test_od_reading.py +++ b/pioreactor/tests/test_od_reading.py @@ -620,8 +620,8 @@ def test_calibration_not_present() -> None: assert len(od.calibration_transformer.models) == 0 -def test_calibration_simple_linear_calibration() -> None: - experiment = "test_calibration_simple_linear_calibration" +def test_calibration_simple_linear_calibration_positive_slope() -> None: + experiment = "test_calibration_simple_linear_calibration_positive_slope" with local_persistant_storage("current_od_calibration") as c: c["90"] = encode( @@ -671,7 +671,7 @@ def test_calibration_simple_linear_calibration() -> None: pause() pause() pause() - assert "suggested" in bucket[0]["message"] + assert "Signal outside" in bucket[0]["message"] with local_persistant_storage("current_od_calibration") as c: del c["90"] @@ -679,7 +679,7 @@ def test_calibration_simple_linear_calibration() -> None: def test_calibration_simple_linear_calibration_negative_slope() -> None: experiment = "test_calibration_simple_linear_calibration_negative_slope" - + maximum_voltage = 2.0 with local_persistant_storage("current_od_calibration") as c: c["90"] = encode( structs.OD90Calibration( @@ -692,7 +692,7 @@ def test_calibration_simple_linear_calibration_negative_slope() -> None: ir_led_intensity=90.0, angle="90", minimum_voltage=0.0, - maximum_voltage=2.0, + maximum_voltage=maximum_voltage, voltages=[], od600s=[], pd_channel="2", @@ -713,13 +713,14 @@ def test_calibration_simple_linear_calibration_negative_slope() -> None: with collect_all_logs_of_level("warning", unit=get_unit_name(), experiment="+") as bucket: voltage = 12.0 - assert voltage > 2.0 + assert voltage > maximum_voltage pause() - assert od.calibration_transformer.models["2"](voltage) == 20.0 + assert od.calibration_transformer.models["2"](voltage) == 0.0 pause() pause() assert "suggested" in bucket[0]["message"] + with local_persistant_storage("current_od_calibration") as c: del c["90"] @@ -861,6 +862,54 @@ def test_calibration_errors_when_pd_channel_differs() -> None: del c["90"] +def test_calibration_with_irl_data1() -> None: + with local_persistant_storage("current_od_calibration") as c: + c["90"] = encode( + structs.OD90Calibration( + created_at=current_utc_datetime(), + curve_type="poly", + curve_data_=[ + 0.13015369282405273, + -0.49893265063642067, + 0.6953041334198933, + 0.45652927538964966, + 0.0024870149666305712, + ], + name="quad_test", + maximum_od600=1.131, + minimum_od600=0.0, + ir_led_intensity=70.0, + angle="90", + minimum_voltage=0.001996680972202709, + maximum_voltage=0.8995772568778957, + voltages=[ + 0.030373011520747333, + 0.0678711757682291, + 0.12972798681328354, + 0.2663836655898364, + 0.4248479170421593, + 0.5921451667865667, + 0.8995772568778957, + 0.001996680972202709, + ], + od600s=[0.042, 0.108, 0.237, 0.392, 0.585, 0.781, 1.131, 0.0], + pd_channel="2", + pioreactor_unit=get_unit_name(), + ) + ) + + cc = CachedCalibrationTransformer() + cc.hydate_models_from_disk({"2": "90"}) + assert cc({"2": 0.001})["2"] == 0 + assert cc({"2": 0.002})["2"] == 0 + assert cc({"2": 0.004})["2"] == 0.0032975807375385234 + assert cc({"2": 0.020})["2"] == 0.03639585015289039 + assert cc({"2": 1.0})["2"] == 1.131 + + with local_persistant_storage("current_od_calibration") as c: + del c["90"] + + def test_PhotodiodeIrLedReferenceTrackerStaticInit() -> None: tracker = PhotodiodeIrLedReferenceTrackerStaticInit(channel="1") From 493f80e6c23921f6b66194aad1c6f45344ae574c Mon Sep 17 00:00:00 2001 From: CamDavidsonPilon Date: Tue, 3 Sep 2024 20:36:06 -0400 Subject: [PATCH 2/2] introduce the watchdog 'fix' too --- pioreactor/background_jobs/leader/watchdog.py | 51 +------------------ 1 file changed, 2 insertions(+), 49 deletions(-) diff --git a/pioreactor/background_jobs/leader/watchdog.py b/pioreactor/background_jobs/leader/watchdog.py index ac96d2f6..2a57f23a 100644 --- a/pioreactor/background_jobs/leader/watchdog.py +++ b/pioreactor/background_jobs/leader/watchdog.py @@ -46,62 +46,15 @@ def announce_new_workers(self) -> None: ) def watch_for_lost_state(self, state_message: MQTTMessage) -> None: - # generally, I hate this code below... - unit = state_message.topic.split("/")[1] - # don't check workers that aren't part of the cluster - current_workers = get_workers_in_inventory() - # ignore if leader is "lost" if ( (state_message.payload.decode() == self.LOST) and (unit != self.unit) - and (unit in current_workers) + and (unit in get_workers_in_inventory()) ): - # TODO: this song-and-dance works for monitor, why not extend it to other jobs... - - self.logger.warning(f"{unit} seems to be lost. Trying to re-establish connection...") - time.sleep(5) - - if self.state != self.READY: - # when the entire Rpi shuts down, ex via sudo reboot, monitor can publish a lost. This code will halt the shutdown. - # let's return early. - return - - # this is a hack! If the monitor job is in state READY, it will no op any transition. - # so we set to sleeping for a second, and the back to ready. - self.pub_client.publish( - f"pioreactor/{unit}/{UNIVERSAL_EXPERIMENT}/monitor/$state/set", self.SLEEPING - ) - time.sleep(1) - self.pub_client.publish( - f"pioreactor/{unit}/{UNIVERSAL_EXPERIMENT}/monitor/$state/set", self.READY - ) - ### - time.sleep(10) - - if self.state != self.READY: - # when the entire Rpi shuts down, ex via sudo reboot, monitor can publish a lost. This code will halt the shutdown. - # let's return early. - return - - msg = subscribe( # I don't think this can be self.sub_client because we are in a callback. - f"pioreactor/{unit}/{UNIVERSAL_EXPERIMENT}/monitor/$state", - timeout=15, - name=self.job_name, - retries=1, - ) - if msg is None: - return - - current_state = msg.payload.decode() - - if current_state == self.LOST: - # failed, let's confirm to user - self.logger.error(f"{unit} was lost.") - else: - self.logger.info(f"Update: {unit} is connected. All is well.") + self.logger.warning(f"{unit} seems to be lost.") def start_passive_listeners(self) -> None: self.subscribe_and_callback(