diff --git a/pioreactor/background_jobs/od_reading.py b/pioreactor/background_jobs/od_reading.py index 252d4c5d..b5d43211 100644 --- a/pioreactor/background_jobs/od_reading.py +++ b/pioreactor/background_jobs/od_reading.py @@ -672,6 +672,29 @@ def hydate_models_from_disk(self, channel_angle_map: dict[pt.PdChannel, pt.PdAng return +def closest_point_to_domain(P, D): + # Unpack the domain D into its lower and upper bounds + a, b = D + + # Initialize the closest point and minimum distance + closest_point = None + min_distance = float("inf") + + for p in P: + if a <= p <= b: # Check if p is within the domain D + return p # If p is within D, it's the closest point with distance 0 + + # Calculate the distance to the closest boundary of D + distance = min(abs(p - a), abs(p - b)) + + # Update the closest point if this distance is smaller than the current min_distance + if distance < min_distance: + min_distance = distance + closest_point = p + + return closest_point + + class CachedCalibrationTransformer(CalibrationTransformer): def __init__(self) -> None: super().__init__() @@ -719,7 +742,7 @@ def _hydrate_model(self, calibration_data: structs.ODCalibration) -> Callable[[f this procedure effectively ignores it. """ - from numpy import roots, zeros_like, real + from numpy import roots, zeros_like, real, imag def calibration(observed_voltage: pt.Voltage) -> pt.OD: poly = calibration_data.curve_data_ @@ -733,34 +756,41 @@ def calibration(observed_voltage: pt.Voltage) -> pt.OD: coef_shift[-1] = observed_voltage solve_for_poly = poly - coef_shift roots_ = roots(solve_for_poly) - plausible_roots_ = sorted( - [real(r) for r in roots_ if (r.imag == 0) and (min_OD <= real(r) <= max_OD)] - ) + plausible_ODs_ = sorted([real(r) for r in roots_ if (imag(r) == 0)]) - try: - # Q: when do I pick the second root? (for unimodal calibration curves) - ideal_root = float(plausible_roots_[0]) - return ideal_root - - except IndexError: + if len(plausible_ODs_) == 0: if observed_voltage <= min_voltage: - # voltage less than the blank recorded during the calibration and the calibration curve doesn't have solutions (ex even-deg poly) - # this isn't great, as there is nil noise in the signal. - - if not self.has_logged_warning: - self.logger.warning( - f"Signal below suggested calibration range. Trimming signal. Calibrated for OD=[{min_OD:0.3g}, {max_OD:0.3g}], V=[{min_voltage:0.3g}, {max_voltage:0.3g}]. Observed {observed_voltage:0.3f}V." - ) - self.has_logged_warning = True return min_OD - - else: - if not self.has_logged_warning: - self.logger.warning( - f"Signal outside suggested calibration range. Trimming signal. Calibrated for OD=[{min_OD:0.3g}, {max_OD:0.3g}], V=[{min_voltage:0.3g}, {max_voltage:0.3g}]. Observed {observed_voltage:0.3f}V." - ) - self.has_logged_warning = True + elif observed_voltage > max_voltage: return max_OD + else: + raise ValueError( + f"Can't compute calibrated data from curve {poly} and point {observed_voltage}" + ) + + # find the closest root to our OD domain + ideal_OD = float(closest_point_to_domain(plausible_ODs_, [min_OD, max_OD])) + + if ideal_OD < min_OD: + # voltage less than the blank recorded during the calibration and the calibration curve doesn't have solutions (ex even-deg poly) + # this isn't great, as there is nil noise in the signal. + + if not self.has_logged_warning: + self.logger.warning( + f"Signal outside suggested calibration range. Trimming signal. Calibrated for OD=[{min_OD:0.3g}, {max_OD:0.3g}], V=[{min_voltage:0.3g}, {max_voltage:0.3g}]. Observed {observed_voltage:0.3f}V." + ) + self.has_logged_warning = True + return min_OD + + elif ideal_OD > max_OD: + if not self.has_logged_warning: + self.logger.warning( + f"Signal outside suggested calibration range. Trimming signal. Calibrated for OD=[{min_OD:0.3g}, {max_OD:0.3g}], V=[{min_voltage:0.3g}, {max_voltage:0.3g}]. Observed {observed_voltage:0.3f}V." + ) + self.has_logged_warning = True + return max_OD + else: + return ideal_OD else: diff --git a/pioreactor/tests/test_od_reading.py b/pioreactor/tests/test_od_reading.py index d45d5038..f95bae43 100644 --- a/pioreactor/tests/test_od_reading.py +++ b/pioreactor/tests/test_od_reading.py @@ -620,8 +620,8 @@ def test_calibration_not_present() -> None: assert len(od.calibration_transformer.models) == 0 -def test_calibration_simple_linear_calibration() -> None: - experiment = "test_calibration_simple_linear_calibration" +def test_calibration_simple_linear_calibration_positive_slope() -> None: + experiment = "test_calibration_simple_linear_calibration_positive_slope" with local_persistant_storage("current_od_calibration") as c: c["90"] = encode( @@ -671,7 +671,7 @@ def test_calibration_simple_linear_calibration() -> None: pause() pause() pause() - assert "suggested" in bucket[0]["message"] + assert "Signal outside" in bucket[0]["message"] with local_persistant_storage("current_od_calibration") as c: del c["90"] @@ -679,7 +679,7 @@ def test_calibration_simple_linear_calibration() -> None: def test_calibration_simple_linear_calibration_negative_slope() -> None: experiment = "test_calibration_simple_linear_calibration_negative_slope" - + maximum_voltage = 2.0 with local_persistant_storage("current_od_calibration") as c: c["90"] = encode( structs.OD90Calibration( @@ -692,7 +692,7 @@ def test_calibration_simple_linear_calibration_negative_slope() -> None: ir_led_intensity=90.0, angle="90", minimum_voltage=0.0, - maximum_voltage=2.0, + maximum_voltage=maximum_voltage, voltages=[], od600s=[], pd_channel="2", @@ -713,13 +713,14 @@ def test_calibration_simple_linear_calibration_negative_slope() -> None: with collect_all_logs_of_level("warning", unit=get_unit_name(), experiment="+") as bucket: voltage = 12.0 - assert voltage > 2.0 + assert voltage > maximum_voltage pause() - assert od.calibration_transformer.models["2"](voltage) == 20.0 + assert od.calibration_transformer.models["2"](voltage) == 0.0 pause() pause() assert "suggested" in bucket[0]["message"] + with local_persistant_storage("current_od_calibration") as c: del c["90"] @@ -861,6 +862,54 @@ def test_calibration_errors_when_pd_channel_differs() -> None: del c["90"] +def test_calibration_with_irl_data1() -> None: + with local_persistant_storage("current_od_calibration") as c: + c["90"] = encode( + structs.OD90Calibration( + created_at=current_utc_datetime(), + curve_type="poly", + curve_data_=[ + 0.13015369282405273, + -0.49893265063642067, + 0.6953041334198933, + 0.45652927538964966, + 0.0024870149666305712, + ], + name="quad_test", + maximum_od600=1.131, + minimum_od600=0.0, + ir_led_intensity=70.0, + angle="90", + minimum_voltage=0.001996680972202709, + maximum_voltage=0.8995772568778957, + voltages=[ + 0.030373011520747333, + 0.0678711757682291, + 0.12972798681328354, + 0.2663836655898364, + 0.4248479170421593, + 0.5921451667865667, + 0.8995772568778957, + 0.001996680972202709, + ], + od600s=[0.042, 0.108, 0.237, 0.392, 0.585, 0.781, 1.131, 0.0], + pd_channel="2", + pioreactor_unit=get_unit_name(), + ) + ) + + cc = CachedCalibrationTransformer() + cc.hydate_models_from_disk({"2": "90"}) + assert cc({"2": 0.001})["2"] == 0 + assert cc({"2": 0.002})["2"] == 0 + assert cc({"2": 0.004})["2"] == 0.0032975807375385234 + assert cc({"2": 0.020})["2"] == 0.03639585015289039 + assert cc({"2": 1.0})["2"] == 1.131 + + with local_persistant_storage("current_od_calibration") as c: + del c["90"] + + def test_PhotodiodeIrLedReferenceTrackerStaticInit() -> None: tracker = PhotodiodeIrLedReferenceTrackerStaticInit(channel="1")