Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
tlambert03 committed Jul 11, 2024
2 parents 7e57280 + e8498f9 commit a59e9b0
Show file tree
Hide file tree
Showing 2 changed files with 155 additions and 57 deletions.
171 changes: 114 additions & 57 deletions src/useq/_plate.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
from __future__ import annotations

from ast import literal_eval
from contextlib import suppress
from functools import cached_property
from typing import (
TYPE_CHECKING,
Any,
Iterable,
List,
Expand All @@ -16,38 +15,25 @@

import numpy as np
from annotated_types import Gt # noqa: TCH002
from pydantic import Field, field_validator, model_validator
from pydantic_core import core_schema
from pydantic import (
Field,
ValidationInfo,
ValidatorFunctionWrapHandler,
field_validator,
model_validator,
)
from typing_extensions import Annotated

from useq._base_model import FrozenModel
from useq._grid import RandomPoints, RelativeMultiPointPlan, Shape
from useq._plate_registry import _PLATE_REGISTRY
from useq._position import Position, PositionBase, RelativePosition

if TYPE_CHECKING:
from pydantic_core import core_schema

class _SliceType:
@classmethod
def __get_pydantic_core_schema__(
cls, source_type: Any, handler: Any
) -> core_schema.CoreSchema:
def _to_slice(value: Any) -> slice:
if isinstance(value, slice):
return value
if isinstance(value, str):
if value.startswith("slice(") and value.endswith(")"):
with suppress(Exception):
return slice(*literal_eval(value.replace("slice", "")))
raise ValueError(f"Invalid slice expression {value}")

return core_schema.no_info_before_validator_function(
_to_slice,
schema=core_schema.any_schema(),
serialization=core_schema.plain_serializer_function_ser_schema(
repr,
return_schema=core_schema.str_schema(),
),
)
Index = Union[int, List[int], slice]
IndexExpression = Union[Tuple[Index, ...], Index]


class WellPlate(FrozenModel):
Expand Down Expand Up @@ -90,6 +76,26 @@ def shape(self) -> tuple[int, int]:
"""Return the shape of the plate."""
return self.rows, self.columns

@property
def all_well_indices(self) -> np.ndarray:
"""Return the indices of all wells as array with shape (Rows, Cols, 2)."""
Y, X = np.meshgrid(np.arange(self.rows), np.arange(self.columns), indexing="ij")
return np.stack([Y, X], axis=-1)

def indices(self, expr: IndexExpression) -> np.ndarray:
"""Return the indices for any index expression as array with shape (N, 2)."""
return self.all_well_indices[expr].reshape(-1, 2).T

@property
def all_well_names(self) -> np.ndarray:
"""Return the names of all wells as array of strings with shape (Rows, Cols)."""
return np.array(
[
[f"{_index_to_row_name(r)}{c+1}" for c in range(self.columns)]
for r in range(self.rows)
]
)

@field_validator("well_spacing", "well_size", mode="before")
def _validate_well_spacing_and_size(cls, value: Any) -> Any:
return (value, value) if isinstance(value, (int, float)) else value
Expand Down Expand Up @@ -117,10 +123,6 @@ def from_str(cls, name: str) -> WellPlate:
return WellPlate.model_validate(obj)


Index = Union[int, List[int], Annotated[slice, _SliceType]]
IndexExpression = Union[Tuple[Index, ...], Index]


class WellPlatePlan(FrozenModel, Sequence[Position]):
"""A plan for acquiring images from a multi-well plate.
Expand Down Expand Up @@ -158,9 +160,17 @@ class WellPlatePlan(FrozenModel, Sequence[Position]):
plate: WellPlate
a1_center_xy: Tuple[float, float]
rotation: Union[float, None] = None
selected_wells: Union[IndexExpression, None] = None
selected_wells: Union[Tuple[Tuple[int, ...], Tuple[int, ...]], None] = None
well_points_plan: RelativeMultiPointPlan = Field(default_factory=RelativePosition)

def __repr_args__(self) -> Iterable[Tuple[str | None, Any]]:
for item in super().__repr_args__():
if item[0] == "selected_wells":
# improve repr for selected_wells
yield "selected_wells", _expression_repr(item[1])
else:
yield item

@field_validator("plate", mode="before")
@classmethod
def _validate_plate(cls, value: Any) -> Any:
Expand Down Expand Up @@ -213,18 +223,26 @@ def _validate_rotation(cls, value: Any) -> Any:
return np.degrees(np.arctan2(ary[2], ary[0]))
return value

@model_validator(mode="after")
def _validate_self(self) -> WellPlatePlan:
@field_validator("selected_wells", mode="wrap")
@classmethod
def _validate_selected_wells(
cls, value: Any, handler: ValidatorFunctionWrapHandler, info: ValidationInfo
) -> Tuple[Tuple[int, ...], Tuple[int, ...]]:
plate = info.data.get("plate")
if not isinstance(plate, WellPlate):
raise ValueError("Plate must be defined before selecting wells")

if isinstance(value, list):
value = tuple(value)
try:
# make sure we can index an array of shape (Rows, Cols)
# with the selected_wells expression
self._dummy_indexed()
except Exception as e:
selected = plate.indices(value)
except (TypeError, IndexError) as e:
raise ValueError(
f"Invalid well selection {self.selected_wells!r} for plate of "
f"shape {self.plate.shape}: {e}"
f"Invalid well selection {value!r} for plate of "
f"shape {plate.shape}: {e}"
) from e
return self

return handler(selected) # type: ignore [no-any-return]

@property
def rotation_matrix(self) -> np.ndarray:
Expand All @@ -240,11 +258,11 @@ def __iter__(self) -> Iterable[Position]: # type: ignore

def __len__(self) -> int:
"""Return the total number of points (stage positions) to be acquired."""
return self._dummy_indexed().size * self.num_points_per_well

def _dummy_indexed(self) -> np.ndarray:
dummy = np.empty((self.plate.rows, self.plate.columns))
return dummy[self.selected_wells]
if self.selected_wells is None:
n_wells = self.plate.size
else:
n_wells = len(self.selected_wells[0])
return n_wells * self.num_points_per_well

@overload
def __getitem__(self, index: int) -> Position: ...
Expand All @@ -267,20 +285,17 @@ def num_points_per_well(self) -> int:
@property
def all_well_indices(self) -> np.ndarray:
"""Return the indices of all wells as array with shape (Rows, Cols, 2)."""
Y, X = np.meshgrid(
np.arange(self.plate.rows), np.arange(self.plate.columns), indexing="ij"
)
return np.stack([Y, X], axis=-1)
return self.plate.all_well_indices

@property
def selected_well_indices(self) -> np.ndarray:
"""Return the indices of selected wells as array with shape (N, 2)."""
return self.all_well_indices[self.selected_wells].reshape(-1, 2)
return self.plate.all_well_indices[self.selected_wells].reshape(-1, 2)

@cached_property
def all_well_coordinates(self) -> np.ndarray:
"""Return the stage coordinates of all wells as array with shape (N, 2)."""
return self._transorm_coords(self.all_well_indices.reshape(-1, 2))
return self._transorm_coords(self.plate.all_well_indices.reshape(-1, 2))

@cached_property
def selected_well_coordinates(self) -> np.ndarray:
Expand All @@ -290,12 +305,7 @@ def selected_well_coordinates(self) -> np.ndarray:
@property
def all_well_names(self) -> np.ndarray:
"""Return the names of all wells as array of strings with shape (Rows, Cols)."""
return np.array(
[
[f"{_index_to_row_name(r)}{c+1}" for c in range(self.plate.columns)]
for r in range(self.plate.rows)
]
)
return self.plate.all_well_names

@property
def selected_well_names(self) -> list[str]:
Expand Down Expand Up @@ -387,3 +397,50 @@ def _index_to_row_name(index: int) -> str:
name = chr(index % 26 + 65) + name
index = index // 26 - 1
return name


def _find_pattern(seq: Sequence[int]) -> tuple[list[int] | None, int | None]:
n = len(seq)

# Try different lengths of the potential repeating pattern
for pattern_length in range(1, n // 2 + 1):
pattern = list(seq[:pattern_length])
repetitions = n // pattern_length

# Check if the pattern repeats enough times
if np.array_equal(pattern * repetitions, seq[: pattern_length * repetitions]):
return (pattern, repetitions)

return None, None


def _pattern_repr(pattern: Sequence[int]) -> str:
"""Turn pattern into a slice object if possible."""
start = pattern[0]
stop = pattern[-1] + 1
step = pattern[1] - pattern[0]
if all(pattern[i] == pattern[0] + i * step for i in range(1, len(pattern))):
if step == 1:
if start == 0:
return f"slice({stop})"
return f"slice({start}, {stop})"
return f"slice({start}, {stop}, {step})"
return repr(pattern)


class _Repr:
def __init__(self, string: str) -> None:
self._string = string

def __repr__(self) -> str:
return self._string


def _expression_repr(expr: tuple[Sequence[int], Sequence[int]]) -> _Repr:
"""Try to represent an index expression as slice objects if possible."""
e0, e1 = expr
ptrn1, repeats = _find_pattern(e1)
if ptrn1 is None:
return _Repr(str(expr))
ptrn0 = e0[:: len(ptrn1)]
return _Repr(f"({_pattern_repr(ptrn0)}, {_pattern_repr(ptrn1)})")
41 changes: 41 additions & 0 deletions tests/test_well_plate.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,44 @@ def test_plate_plan_position_order() -> None:
for i in range(0, len(names), 3):
chunk = names[i : i + 3]
assert len(set(chunk)) == 1, f"Chunk {chunk} does not have the same elements"


def test_plate_plan_equality() -> None:
"""Various ways of selecting wells should result in the same plan."""
pp = useq.WellPlatePlan(
plate=96, a1_center_xy=(0, 0), selected_wells=np.s_[1:5:2, :6:3]
)
pp2 = useq.WellPlatePlan(
plate="96-well",
a1_center_xy=(0, 0),
selected_wells=[(1, 1, 3, 3), (0, 3, 0, 3)],
)
pp3 = useq.WellPlatePlan.model_validate_json(pp.model_dump_json())

assert pp == pp2 == pp3


def test_plate_repr() -> None:
# both can be reduced
pp = useq.WellPlatePlan(
plate=96, a1_center_xy=(0, 0), selected_wells=np.s_[1:5, 3:12:2]
)
rpp = repr(pp)
assert "selected_wells=(slice(1, 5), slice(3, 12, 2))" in rpp
assert eval(rpp, vars(useq)) == pp # noqa: S307

# can't be reduced
pp = useq.WellPlatePlan(
plate=96, a1_center_xy=(0, 0), selected_wells=[(1, 1, 1, 2), (7, 3, 4, 2)]
)
rpp = repr(pp)
assert "selected_wells=((1, 1, 1, 2), (7, 3, 4, 2))" in rpp
assert eval(rpp, vars(useq)) == pp # noqa: S307

# one can be reduced
pp = useq.WellPlatePlan(
plate=96, a1_center_xy=(0, 0), selected_wells=np.s_[(1, 2, 2, 3), 1:5]
)
rpp = repr(pp)
assert "selected_wells=((1, 2, 2, 3), slice(1, 5))" in rpp
assert eval(rpp, vars(useq)) == pp # noqa: S307

0 comments on commit a59e9b0

Please sign in to comment.