Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Od cal fix #519

Merged
merged 2 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 2 additions & 49 deletions pioreactor/background_jobs/leader/watchdog.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,62 +46,15 @@ def announce_new_workers(self) -> None:
)

def watch_for_lost_state(self, state_message: MQTTMessage) -> None:
# generally, I hate this code below...

unit = state_message.topic.split("/")[1]

# don't check workers that aren't part of the cluster
current_workers = get_workers_in_inventory()

# ignore if leader is "lost"
if (
(state_message.payload.decode() == self.LOST)
and (unit != self.unit)
and (unit in current_workers)
and (unit in get_workers_in_inventory())
):
# TODO: this song-and-dance works for monitor, why not extend it to other jobs...

self.logger.warning(f"{unit} seems to be lost. Trying to re-establish connection...")
time.sleep(5)

if self.state != self.READY:
# when the entire Rpi shuts down, ex via sudo reboot, monitor can publish a lost. This code will halt the shutdown.
# let's return early.
return

# this is a hack! If the monitor job is in state READY, it will no op any transition.
# so we set to sleeping for a second, and the back to ready.
self.pub_client.publish(
f"pioreactor/{unit}/{UNIVERSAL_EXPERIMENT}/monitor/$state/set", self.SLEEPING
)
time.sleep(1)
self.pub_client.publish(
f"pioreactor/{unit}/{UNIVERSAL_EXPERIMENT}/monitor/$state/set", self.READY
)
###
time.sleep(10)

if self.state != self.READY:
# when the entire Rpi shuts down, ex via sudo reboot, monitor can publish a lost. This code will halt the shutdown.
# let's return early.
return

msg = subscribe( # I don't think this can be self.sub_client because we are in a callback.
f"pioreactor/{unit}/{UNIVERSAL_EXPERIMENT}/monitor/$state",
timeout=15,
name=self.job_name,
retries=1,
)
if msg is None:
return

current_state = msg.payload.decode()

if current_state == self.LOST:
# failed, let's confirm to user
self.logger.error(f"{unit} was lost.")
else:
self.logger.info(f"Update: {unit} is connected. All is well.")
self.logger.warning(f"{unit} seems to be lost.")

def start_passive_listeners(self) -> None:
self.subscribe_and_callback(
Expand Down
80 changes: 55 additions & 25 deletions pioreactor/background_jobs/od_reading.py
Original file line number Diff line number Diff line change
Expand Up @@ -672,6 +672,29 @@ def hydate_models_from_disk(self, channel_angle_map: dict[pt.PdChannel, pt.PdAng
return


def closest_point_to_domain(P, D):
# Unpack the domain D into its lower and upper bounds
a, b = D

# Initialize the closest point and minimum distance
closest_point = None
min_distance = float("inf")

for p in P:
if a <= p <= b: # Check if p is within the domain D
return p # If p is within D, it's the closest point with distance 0

# Calculate the distance to the closest boundary of D
distance = min(abs(p - a), abs(p - b))

# Update the closest point if this distance is smaller than the current min_distance
if distance < min_distance:
min_distance = distance
closest_point = p

return closest_point


class CachedCalibrationTransformer(CalibrationTransformer):
def __init__(self) -> None:
super().__init__()
Expand Down Expand Up @@ -719,7 +742,7 @@ def _hydrate_model(self, calibration_data: structs.ODCalibration) -> Callable[[f
this procedure effectively ignores it.

"""
from numpy import roots, zeros_like, real
from numpy import roots, zeros_like, real, imag

def calibration(observed_voltage: pt.Voltage) -> pt.OD:
poly = calibration_data.curve_data_
Expand All @@ -733,34 +756,41 @@ def calibration(observed_voltage: pt.Voltage) -> pt.OD:
coef_shift[-1] = observed_voltage
solve_for_poly = poly - coef_shift
roots_ = roots(solve_for_poly)
plausible_roots_ = sorted(
[real(r) for r in roots_ if (r.imag == 0) and (min_OD <= real(r) <= max_OD)]
)
plausible_ODs_ = sorted([real(r) for r in roots_ if (imag(r) == 0)])

try:
# Q: when do I pick the second root? (for unimodal calibration curves)
ideal_root = float(plausible_roots_[0])
return ideal_root

except IndexError:
if len(plausible_ODs_) == 0:
if observed_voltage <= min_voltage:
# voltage less than the blank recorded during the calibration and the calibration curve doesn't have solutions (ex even-deg poly)
# this isn't great, as there is nil noise in the signal.

if not self.has_logged_warning:
self.logger.warning(
f"Signal below suggested calibration range. Trimming signal. Calibrated for OD=[{min_OD:0.3g}, {max_OD:0.3g}], V=[{min_voltage:0.3g}, {max_voltage:0.3g}]. Observed {observed_voltage:0.3f}V."
)
self.has_logged_warning = True
return min_OD

else:
if not self.has_logged_warning:
self.logger.warning(
f"Signal outside suggested calibration range. Trimming signal. Calibrated for OD=[{min_OD:0.3g}, {max_OD:0.3g}], V=[{min_voltage:0.3g}, {max_voltage:0.3g}]. Observed {observed_voltage:0.3f}V."
)
self.has_logged_warning = True
elif observed_voltage > max_voltage:
return max_OD
else:
raise ValueError(
f"Can't compute calibrated data from curve {poly} and point {observed_voltage}"
)

# find the closest root to our OD domain
ideal_OD = float(closest_point_to_domain(plausible_ODs_, [min_OD, max_OD]))

if ideal_OD < min_OD:
# voltage less than the blank recorded during the calibration and the calibration curve doesn't have solutions (ex even-deg poly)
# this isn't great, as there is nil noise in the signal.

if not self.has_logged_warning:
self.logger.warning(
f"Signal outside suggested calibration range. Trimming signal. Calibrated for OD=[{min_OD:0.3g}, {max_OD:0.3g}], V=[{min_voltage:0.3g}, {max_voltage:0.3g}]. Observed {observed_voltage:0.3f}V."
)
self.has_logged_warning = True
return min_OD

elif ideal_OD > max_OD:
if not self.has_logged_warning:
self.logger.warning(
f"Signal outside suggested calibration range. Trimming signal. Calibrated for OD=[{min_OD:0.3g}, {max_OD:0.3g}], V=[{min_voltage:0.3g}, {max_voltage:0.3g}]. Observed {observed_voltage:0.3f}V."
)
self.has_logged_warning = True
return max_OD
else:
return ideal_OD

else:

Expand Down
63 changes: 56 additions & 7 deletions pioreactor/tests/test_od_reading.py
Original file line number Diff line number Diff line change
Expand Up @@ -620,8 +620,8 @@ def test_calibration_not_present() -> None:
assert len(od.calibration_transformer.models) == 0


def test_calibration_simple_linear_calibration() -> None:
experiment = "test_calibration_simple_linear_calibration"
def test_calibration_simple_linear_calibration_positive_slope() -> None:
experiment = "test_calibration_simple_linear_calibration_positive_slope"

with local_persistant_storage("current_od_calibration") as c:
c["90"] = encode(
Expand Down Expand Up @@ -671,15 +671,15 @@ def test_calibration_simple_linear_calibration() -> None:
pause()
pause()
pause()
assert "suggested" in bucket[0]["message"]
assert "Signal outside" in bucket[0]["message"]

with local_persistant_storage("current_od_calibration") as c:
del c["90"]


def test_calibration_simple_linear_calibration_negative_slope() -> None:
experiment = "test_calibration_simple_linear_calibration_negative_slope"

maximum_voltage = 2.0
with local_persistant_storage("current_od_calibration") as c:
c["90"] = encode(
structs.OD90Calibration(
Expand All @@ -692,7 +692,7 @@ def test_calibration_simple_linear_calibration_negative_slope() -> None:
ir_led_intensity=90.0,
angle="90",
minimum_voltage=0.0,
maximum_voltage=2.0,
maximum_voltage=maximum_voltage,
voltages=[],
od600s=[],
pd_channel="2",
Expand All @@ -713,13 +713,14 @@ def test_calibration_simple_linear_calibration_negative_slope() -> None:

with collect_all_logs_of_level("warning", unit=get_unit_name(), experiment="+") as bucket:
voltage = 12.0
assert voltage > 2.0
assert voltage > maximum_voltage

pause()
assert od.calibration_transformer.models["2"](voltage) == 20.0
assert od.calibration_transformer.models["2"](voltage) == 0.0
pause()
pause()
assert "suggested" in bucket[0]["message"]

with local_persistant_storage("current_od_calibration") as c:
del c["90"]

Expand Down Expand Up @@ -861,6 +862,54 @@ def test_calibration_errors_when_pd_channel_differs() -> None:
del c["90"]


def test_calibration_with_irl_data1() -> None:
with local_persistant_storage("current_od_calibration") as c:
c["90"] = encode(
structs.OD90Calibration(
created_at=current_utc_datetime(),
curve_type="poly",
curve_data_=[
0.13015369282405273,
-0.49893265063642067,
0.6953041334198933,
0.45652927538964966,
0.0024870149666305712,
],
name="quad_test",
maximum_od600=1.131,
minimum_od600=0.0,
ir_led_intensity=70.0,
angle="90",
minimum_voltage=0.001996680972202709,
maximum_voltage=0.8995772568778957,
voltages=[
0.030373011520747333,
0.0678711757682291,
0.12972798681328354,
0.2663836655898364,
0.4248479170421593,
0.5921451667865667,
0.8995772568778957,
0.001996680972202709,
],
od600s=[0.042, 0.108, 0.237, 0.392, 0.585, 0.781, 1.131, 0.0],
pd_channel="2",
pioreactor_unit=get_unit_name(),
)
)

cc = CachedCalibrationTransformer()
cc.hydate_models_from_disk({"2": "90"})
assert cc({"2": 0.001})["2"] == 0
assert cc({"2": 0.002})["2"] == 0
assert cc({"2": 0.004})["2"] == 0.0032975807375385234
assert cc({"2": 0.020})["2"] == 0.03639585015289039
assert cc({"2": 1.0})["2"] == 1.131

with local_persistant_storage("current_od_calibration") as c:
del c["90"]


def test_PhotodiodeIrLedReferenceTrackerStaticInit() -> None:
tracker = PhotodiodeIrLedReferenceTrackerStaticInit(channel="1")

Expand Down
Loading