Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clarity support with pipeline for data processing #124

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 52 additions & 28 deletions microscope/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,10 @@ def initialize(self):

def shutdown(self):
"""Shutdown the device for a prolonged period of inactivity."""
self.disable()
try:
self.disable()
except Exception as e:
_logger.warning("Exception in disable() during shutdown: %s" % e)
_logger.info("Shutting down ... ... ...")
self._on_shutdown()
_logger.info("... ... ... ... shut down completed.")
Expand Down Expand Up @@ -304,11 +307,15 @@ def get_setting(self, name):

def get_all_settings(self):
"""Return ordered settings as a list of dicts."""
try:
return {k: v.get() for k, v in self._settings.items()}
except Exception as err:
_logger.error("in get_all_settings:", exc_info=err)
raise
# Fetching some settings may fail depending on device state.
# Report these values as 'None' and continue fetching other settings.
def catch(f):
try:
return f()
except Exception as err:
_logger.error("getting %s: %s" % (f.__self__.name, err))
return None
return {k: catch(v.get) for k, v in self._settings.items()}

def set_setting(self, name, value):
"""Set a setting."""
Expand Down Expand Up @@ -411,6 +418,8 @@ def __init__(self, buffer_length=0, **kwargs):
self._acquiring = False
# A condition to signal arrival of a new data and unblock grab_next_data
self._new_data_condition = threading.Condition()
# A data processing pipeline: a list of f(data) -> data.
self.pipeline = []
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we not make this a private attribute? Also, would be nice to also have type annotations for it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it should be private, but then we'd need methods to allow other devices to add their processing steps.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We talked about this at length.

  • Most (all?) classes derived from DataDevice should probably add to the pipeline rather than override _process_data.
  • Devices that override _process_data must call super()._process_data().
  • The docstring on DataDevice._process_data should indicate this clearly.
  • We could make pipeline private, but that would require methods to add/remove steps from it.

Copy link
Collaborator Author

@mickp mickp Dec 19, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_process_data docstring made more explicit in 1c16359.
CameraDevice made to use pipeline instead of override in 0361a02.
_process_data is now only defined in one place, on the DataDevice class.


def __del__(self):
self.disable()
Expand All @@ -431,18 +440,6 @@ def enable(self):
Implement device-specific code in _on_enable .
"""
_logger.debug("Enabling ...")
if self._using_callback:
if self._fetch_thread:
self._fetch_thread_run = False
else:
if not self._fetch_thread or not self._fetch_thread.is_alive():
self._fetch_thread = Thread(target=self._fetch_loop)
self._fetch_thread.daemon = True
self._fetch_thread.start()
if not self._dispatch_thread or not self._dispatch_thread.is_alive():
self._dispatch_thread = Thread(target=self._dispatch_loop)
self._dispatch_thread.daemon = True
self._dispatch_thread.start()
# Call device-specific code.
try:
result = self._on_enable()
Expand All @@ -454,7 +451,20 @@ def enable(self):
self.enabled = False
else:
self.enabled = True
_logger.debug("... enabled.")
# Set up data fetching
if self._using_callback:
if self._fetch_thread:
self._fetch_thread_run = False
else:
if not self._fetch_thread or not self._fetch_thread.is_alive():
self._fetch_thread = Thread(target=self._fetch_loop)
self._fetch_thread.daemon = True
self._fetch_thread.start()
if not self._dispatch_thread or not self._dispatch_thread.is_alive():
self._dispatch_thread = Thread(target=self._dispatch_loop)
self._dispatch_thread.daemon = True
self._dispatch_thread.start()
_logger.debug("... enabled.")
return self.enabled


Expand Down Expand Up @@ -483,8 +493,14 @@ def _fetch_data(self):
return None

def _process_data(self, data):
"""Do any data processing and return data."""
return data
"""Do any data processing and return data.

Subclasses should add their processing to self.pipeline in preference
to overriding this method. Anything that overrides this method must
call super()_process_data(data) either before or after its own
processing."""
import functools
return functools.reduce(lambda x, f: f(x), self.pipeline, data)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we have an attribute which is a list of processing steps, why should subclasses do their own processing first instead of just appending to self.pipeline?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes - I think we should do one or the other. This requires some thought, though.

Originally _process_data was a method on DataDevices to implement the processing step, and I think that was meant to be pure virtual.

Now we have Devices that are not DataDevices that need to add processing steps, and I addressed that with the pipeline, but that needs super()._process_data() to ensure that the pipeline is executed after the DataDevice does anything it needs to do (e.g. running it through a LUT, type conversions, reshaping arrays).

The DataDevice could just add it's processing to the start of the pipeline, instead, and _process_data would be implemented on DataDevice and just process the pipeline.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See comments in review immediately below this one.


def _send_data(self, client, data, timestamp):
"""Dispatch data to the client."""
Expand Down Expand Up @@ -611,6 +627,8 @@ def grab_next_data(self, soft_trigger=True):
:param soft_trigger: calls soft_trigger if True,
waits for hardware trigger if False.
"""
if not self.enabled:
raise Exception("Camera not enabled.")
self._new_data_condition.acquire()
# Push self onto client stack.
self.set_client(self)
Expand Down Expand Up @@ -652,6 +670,8 @@ def __init__(self, **kwargs):
self._client_transform = (False, False, False)
# Result of combining client and readout transforms
self._transform = (False, False, False)
# Add _apply_transform to pipeline.
self.pipeline.insert(0, self._apply_transform)
# A transform provided by the client.
self.add_setting('transform', 'enum',
lambda: CameraDevice.ALLOWED_TRANSFORMS.index(self._transform),
Expand All @@ -669,18 +689,22 @@ def __init__(self, **kwargs):
self.get_roi,
self.set_roi,
None)
def _process_data(self, data):

def _apply_transform(self, data):
"""Apply self._transform to data."""
flips = (self._transform[0], self._transform[1])
rot = self._transform[2]

# Choose appropriate transform based on (flips, rot).
return {(0, 0): numpy.rot90(data, rot),
(0, 1): numpy.flipud(numpy.rot90(data, rot)),
(1, 0): numpy.fliplr(numpy.rot90(data, rot)),
(1, 1): numpy.fliplr(numpy.flipud(numpy.rot90(data, rot)))
}[flips]

# Do rotation
data = numpy.rot90(data, rot)
# Flip
data = {(0, 0): lambda d: d,
(0, 1): numpy.flipud,
(1, 0): numpy.fliplr,
(1, 1): lambda d: numpy.fliplr(numpy.flipud(d))
}[flips](data)
return data

def set_readout_mode(self, description):
"""Set the readout mode and _readout_transform."""
Expand Down
139 changes: 122 additions & 17 deletions microscope/filterwheels/aurox.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,38 @@

"""Adds support for Aurox devices

Requires package hidapi."""
Requires package hidapi.

Config sample:

device(microscope.filterwheels.aurox.Clarity,
{'camera': 'microscope.Cameras.cameramodule.SomeCamera',
'camera.someSetting': value})

Deconvolving data requires:
* availability of clarity_process and cv2
* successful completion of a calibration step
+ set_mode(Modes.calibrate)
+ trigger the camera to generate an image
+ when the camera returns the image, calibration is complete
"""
import hid
import logging
import microscope.devices
from enum import Enum
from typing import Mapping
from enum import IntEnum

_logger = logging.getLogger(__name__)

try:
# Currently, clarity_process is a module that is not packaged, so needs
# to be put on the python path somewhere manually.
from clarity_process import ClarityProcessor
except:
_logger.warning("Could not import clarity_process module: no processing available.")


Mode = IntEnum("Mode", "difference, raw, calibrate")

## Clarity constants. These may differ across products, so mangle names.
# USB IDs
Expand Down Expand Up @@ -72,7 +99,11 @@
_Clarity__SETSVCMODE1 = 0xe0 #1 byte for service mode. SLEEP activates service mode. RUN returns to normal mode.


class Clarity(microscope.devices.FilterWheelBase):
class Clarity(microscope.devices.ControllerDevice, microscope.devices.FilterWheelBase):
"""Adds support for Aurox Clarity

Acts as a ControllerDevice providing the camera attached to the Clarity.
"""
_slide_to_sectioning = {__SLDPOS0: 'bypass',
__SLDPOS1: 'low',
__SLDPOS2: 'mid',
Expand All @@ -86,20 +117,94 @@ class Clarity(microscope.devices.FilterWheelBase):
__GETSERIAL: 4,
__FULLSTAT: 10}

def __init__(self, **kwargs):

def __init__(self, camera=None, **kwargs) -> None:
"""Create a Clarity instance controlling an optional Camera device.

:param camera: a class to control the connected camera
:param kwargs: Provide camera parameters as keyword arguments:
'camera.some_parameter'
The 'camera.' prefix will be stripped, and
'some_parameter' passed to the camera's constructor.
"""
# Extract kwargs for camera device.
cam_kw_keys = [k for k in kwargs if k.startswith("camera.")]
cam_kwargs = {}
for key in cam_kw_keys:
cam_kwargs[key.replace("camera.", "")] = kwargs[key]
del kwargs[key]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would a camera_kwargs argument with the arguments to construct the camera make this cleaner?

def __init__(self, camera=None, camera_kwargs={}, **kwargs):
    super().__init__(**kwargs)
    if camera is not None:
        self._camera = camera(**camera_kwargs)
        ...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might make the code a bit cleaner, but then you'd have nested { {} } in the config, and maybe that's not cleaner / more error-prone.

super().__init__(**kwargs)
from threading import Lock
# A comms lock.
self._lock = Lock()
# The hid connection object
self._hid = None
if camera is None:
self._cam = None
_logger.warning("No camera specified.")
else:
self._cam = camera(**cam_kwargs)
self._cam.pipeline.append(self._c_process_data)
# Is processing available?
self._can_process = 'ClarityProcessor' in globals()
# Data processor object, created after calibration
self._processor = None
# Acquisition mode
self._mode = Mode.raw
# Add device settings
self.add_setting("sectioning", "enum",
self.get_slide_position,
lambda val: self.set_slide_position(val),
self._slide_to_sectioning)
self.add_setting("mode", "enum",
lambda: self._mode.name,
self.set_mode,
Mode)

def _c_process_data(self, data):
"""A function to insert into the camera's processing pipeline.

Depending on the mode, this function will pass through, perform
calibration, or deconvolve the camera data."""
if self._mode == Mode.raw:
return data
elif self._mode == Mode.difference:
if self._processor is None:
raise Exception("Not calibrated yet - can not process image")
return self._processor.process(data).get() # get converts UMat to ndarray.
elif self._mode == Mode.calibrate:
# This will introduce a significant delay, but returning the
# image indicates that the calibration step is complete.
self._processor = ClarityProcessor(data)
return data
else:
raise Exception("Unrecognised mode: %s", self._mode)

@property
def devices(self) -> Mapping[str, microscope.devices.Device]:
"""Devices property, required by ControllerDevice interface."""
if self._cam:
return {'camera': self._cam}
else:
return {}

def set_mode(self, mode: Mode) -> None:
"""Set the operation mode"""
if mode in [Mode.calibrate, Mode.difference] and not self._can_process:
raise Exception ("Processing not available")
if mode == Mode.calibrate:
self._set_calibration(True)
else:
self._set_calibration(False)
self._mode = mode

def _send_command(self, command, param=0, max_length=16, timeout_ms=100):
"""Send a command to the Clarity and return its response"""
if not self._hid:
self.open()
try:
self.open()
except:
raise Exception("Connection error")
with self._lock:
# The device expects a list of 16 integers
buffer = [0x00] * max_length # The 0th element must be 0.
Expand Down Expand Up @@ -153,15 +258,13 @@ def get_id(self):
return self._send_command(__GETSERIAL)

def _on_enable(self):
if not self.is_connected:
self.open()
self._send_command(__SETONOFF, __RUN)
return self._send_command(__GETONOFF) == __RUN

def _on_disable(self):
self._send_command(__SETONOFF, __SLEEP)

def set_calibration(self, state):
def _set_calibration(self, state):
if state:
result = self._send_command(__SETCAL, __CALON)
else:
Expand All @@ -188,12 +291,17 @@ def get_slides(self):
return (self._slide_to_sectioning)

def get_status(self):
# Fetch 10 bytes VERSION[3],ONOFF,SHUTTER,SLIDE,FILT,CAL,??,??
result = self._send_command(__FULLSTAT)
if result is None:
return
# A status dict to populate and return
status = {}
status = dict.fromkeys(['connected','on','door open','slide',
'filter','calibration','busy', 'mode'])
status['mode'] = self._mode.name
# Fetch 10 bytes VERSION[3],ONOFF,SHUTTER,SLIDE,FILT,CAL,??,??
try:
result = self._send_command(__FULLSTAT)
status['connected'] = True
except:
status['connected'] = False
return status
# A list to track states, any one of which mean the device is busy.
busy = []
# Disk running
Expand Down Expand Up @@ -263,8 +371,5 @@ def set_position(self, pos, blocking=True):
pass
return result

def _on_shutdown(self):
pass

def initialize(self):
pass
super().initialize()
Loading