diff --git a/docs/source/crappy_docs/blocks.rst b/docs/source/crappy_docs/blocks.rst index ec044c45..fe3cd7a8 100644 --- a/docs/source/crappy_docs/blocks.rst +++ b/docs/source/crappy_docs/blocks.rst @@ -149,6 +149,12 @@ Stop Button :members: prepare, loop, finish :special-members: __init__ +Synchronizer +++++++++++++ +.. autoclass:: crappy.blocks.Synchronizer + :members: loop + :special-members: __init__ + UController +++++++++++ .. autoclass:: crappy.blocks.UController diff --git a/docs/source/features.rst b/docs/source/features.rst index 396335f7..8ebd08ee 100644 --- a/docs/source/features.rst +++ b/docs/source/features.rst @@ -121,6 +121,15 @@ Data processing `_. +- :ref:`Synchronizer` + + Allows putting labels emitted at different frequencies on the same time base + as a reference label. Very similar to the :ref:`Multiplexer` Block, except + the :ref:`Multiplexer` takes an independent time base for interpolation. Used + when the original values of a label need to be preserved while the other + labels can be interpolated, for example when the reference is the output of a + low-frequency image-processing. + Real-time image correlation +++++++++++++++++++++++++++ diff --git a/examples/blocks/synchronizer.py b/examples/blocks/synchronizer.py new file mode 100644 index 00000000..7d08ead6 --- /dev/null +++ b/examples/blocks/synchronizer.py @@ -0,0 +1,136 @@ +# coding: utf-8 + +""" +This example demonstrates the use of the Synchronizer Block. It does not +require any specific hardware to run, but necessitates the matplotlib Python +module to be installed. + +The Synchronizer Block takes inputs from several Blocks, and interpolates the +target labels on the timestamps of another label taken as a reference. This +behavior is very similar to that of the Multiplexer Block, except the +Multiplexer uses a time basis independent of the received labels. The +Synchronizer is useful when signals need to be synchronized with a reference +label, for example with data derived from image processing. + +Here, the idea is to visually demonstrate how several signals with different +data rates can be interpolated by the Synchronizer Block on a target time base. +Two labels are generated by two Generators at different frequencies, as well as +one random label generated by another Generator and considered as a reference. +They are all sent to a Synchronizer for interpolation, and the interpolated +data is sent to a Grapher for display. + +After starting this script, just watch how the two interpolated labels are now +synchronized on the time base of the reference labels. They are then all +displayed at the same frequency by the Grapher, whereas they have originally +very different frequencies. You can try to change the different frequencies and +see what the result is. This demo ends after 22s. You can also hit CTRL+C to +stop it earlier, but it is not a clean way to stop Crappy. +""" + +import crappy +from typing import Tuple, Dict, Optional +from time import sleep +import random + + +class RandomPath(crappy.blocks.generator_path.meta_path.Path): + """This custom Generator Path outputs a random value within given bounds, and + waits for a given delay before returning this value. + + Both the values and the timestamps are therefore random. This class is used + to demonstrate that the Synchronizer works as expected in this example, and + not because of some regularity in the input data. + """ + + def __init__(self, + time_range: Tuple[float, float], + value_range: Tuple[float, float]) -> None: + """Sets the arguments and initializes the parent class. + + Args: + time_range: The possible seconds range for sleeping before returning the + generated value. + value_range: The possible range for the randomly generated values. + """ + + super().__init__() + + self._time_range: Tuple[float, float] = time_range + self._value_range: Tuple[float, float] = value_range + + def get_cmd(self, data: Dict[str, list]) -> Optional[float]: + """Returns a randomly generated value after sleeping a random number of + seconds.""" + + sleep(random.uniform(*self._time_range)) + return random.uniform(*self._value_range) + + +if __name__ == '__main__': + + # This Generator generates a random signal considered as a reference, and + # sends it to the Synchronizer Block for interpolation + gen_ref = crappy.blocks.Generator( + # Generating a random signal to take as a reference time base + ({'type': 'RandomPath', 'time_range': (0.50, 0.150), + 'value_range': (-1, 1)},), + cmd_label='ref', # The label carrying the generated signal + freq=3, # Frequency is a prime number to ensure signals from several + # Blocks will be desynchronized + + # Sticking to default for the other arguments + ) + + # This Generator generates a sine signal to send to the Synchronizer Block + # for interpolation + gen_sig_1 = crappy.blocks.Generator( + # Generating a sine signal of period 20s and amplitude 2 + ({'type': 'Sine', 'freq': 0.05, 'amplitude': 2, 'condition': None},), + cmd_label='sig_1', # The label carrying the generated signal + freq=7, # Frequency is a prime number to ensure signals from several + # Blocks will be desynchronized + + # Sticking to default for the other arguments + ) + + # This Generator generates a sine signal to send to the Synchronizer Block + # for interpolation + gen_sig_2 = crappy.blocks.Generator( + # Generating a constant signal of value 0.5 + ({'type': 'Constant', 'value': 0.5, 'condition': 'delay=20'},), + cmd_label='sig_2', # The label carrying the generated signal + spam=True, # Spamming enabled, otherwise only one value would get sent + freq=5, # Frequency is a prime number to ensure signals from several + # Blocks will be desynchronized + + # Sticking to default for the other arguments + ) + + sync = crappy.blocks.Synchronizer(reference_label='ref', + time_label='t(s)', + labels_to_sync=('sig_1', 'sig_2'), + freq=20) + + # This Grapher displays the interpolated data transmitted by the Synchronizer + # Block. The 'sig_1' and 'sig_2' signals are now synchronized with the 'ref' + # signal, and the 'ref' is preserved + graph = crappy.blocks.Grapher( + # The names of the labels to plot on the graph + ('t(s)', 'ref'), ('t(s)', 'sig_1'), ('t(s)', 'sig_2'), + interp=False, # Displaying the data points only, no lines, to clearly + # distinguish them + length=40, # Only displaying the last 40 received values, so that the + # data points remain always clearly visible + freq=2, # Updating the graph twice every second + + # Sticking to default for the other arguments + ) + + # Linking the Block so that the information is correctly sent and received + crappy.link(gen_ref, sync) + crappy.link(gen_sig_1, sync) + crappy.link(gen_sig_2, sync) + crappy.link(sync, graph) + + # Mandatory line for starting the test, this call is blocking + crappy.start() diff --git a/src/crappy/blocks/__init__.py b/src/crappy/blocks/__init__.py index 130cc3c1..76883864 100644 --- a/src/crappy/blocks/__init__.py +++ b/src/crappy/blocks/__init__.py @@ -25,6 +25,7 @@ from .sink import Sink from .stop_block import StopBlock from .stop_button import StopButton +from .synchronizer import Synchronizer from .ucontroller import UController from .video_extenso import VideoExtenso diff --git a/src/crappy/blocks/multiplexer.py b/src/crappy/blocks/multiplexer.py index f90846fb..589fbb8f 100644 --- a/src/crappy/blocks/multiplexer.py +++ b/src/crappy/blocks/multiplexer.py @@ -12,6 +12,11 @@ class Multiplexer(Block): """This Block takes data from upstream Blocks as input and interpolates it to output all the labels in a common time basis. + This Block is very similar to the :class:`~crappy.blocks.Synchronizer` Block, + but the `Synchronizer` takes the timestamps of a reference label as a time + base whereas this one performs the interpolation on a time base independent + of the received labels. + It can take any number of inputs, provided that they all share a common time label. It is also possible to choose which labels are considered for multiplexing and which are dropped. The interpolation is performed using the diff --git a/src/crappy/blocks/synchronizer.py b/src/crappy/blocks/synchronizer.py new file mode 100644 index 00000000..42f1977d --- /dev/null +++ b/src/crappy/blocks/synchronizer.py @@ -0,0 +1,180 @@ +# coding: utf-8 + +import numpy as np +from typing import Optional, Union, Iterable, Dict +from collections import defaultdict +import logging + +from .meta_block import Block + + +class Synchronizer(Block): + """This Block takes data from upstream Blocks as input and interpolates it to + output all the labels on the same timestamps as a reference label. + + This Block is very similar to the :class:`~crappy.blocks.Multiplexer` Block, + but the `Multiplexer` interpolates data in a time base independent of the + labels whereas this one takes one label as a reference. + + It can take any number of inputs, provided that they all share a common time + label. It is also possible to choose which labels are considered for + interpolation and which are dropped. The interpolation is performed using the + :obj:`numpy.interp` method. + + This Block is useful for synchronizing data acquired from different sensors, + in the context when one label should be treated as a reference. This is for + example the case when synchronizing signals with the output of an image + processing, to be able to compare all the signals in the time base of the + image acquisition. + + .. versionadded:: 2.0.5 + """ + + def __init__(self, + reference_label: str, + time_label: str = 't(s)', + labels_to_sync: Optional[Union[str, Iterable[str]]] = None, + freq: Optional[float] = 50, + display_freq: bool = False, + debug: Optional[bool] = False) -> None: + """Sets the arguments and initializes the parent class. + + Args: + reference_label: The label whose timestamps will be taken as a time base + for performing the interpolation. + time_label: The label carrying the time information. Should be common to + all the input Blocks. + labels_to_sync: An iterable (like a :obj:`list` or a :obj:`tuple`) + containing the labels to interpolate on the reference label's time + base, except for the time label that is given separately in the + ``time_label`` argument. The Block also doesn't output anything until + data has been received on all these labels. If left to :obj:`None`, all + the received labels are considered. **It is recommended to always set + this argument !** It is also possible to give this argument as a single + :obj:`str` (i.e. not in an iterable). + freq: The target looping frequency for the Block. If :obj:`None`, loops + as fast as possible. + display_freq: If :obj:`True`, displays the looping frequency of the + Block. + debug: If :obj:`True`, displays all the log messages including the + :obj:`~logging.DEBUG` ones. If :obj:`False`, only displays the log + messages with :obj:`~logging.INFO` level or higher. If :obj:`None`, + disables logging for this Block. + """ + + super().__init__() + self.freq = freq + self.display_freq = display_freq + self.debug = debug + + # Initializing the attributes + self._ref_label = reference_label + self._time_label = time_label + self._data: Dict[str, np.ndarray] = defaultdict(self._default_array) + + # Forcing the labels_to_sync into a list + if labels_to_sync is not None and isinstance(labels_to_sync, str): + self._to_sync = [labels_to_sync] + elif labels_to_sync is not None: + self._to_sync = list(labels_to_sync) + else: + self._to_sync = None + + def loop(self) -> None: + """Receives data, interpolates it, and sends it to the downstream + Blocks.""" + + # Receiving all the upcoming data + data = self.recv_all_data_raw() + + # Iterating over all the links + for link_data in data: + # Only data associated with a time label can be synchronized + if self._time_label not in link_data: + continue + # Extracting the time information from the data + timestamps = link_data.pop(self._time_label) + + # Adding data from each label in the buffer + for label, values in link_data.items(): + # Only the labels specified in out_labels is considered + if (self._to_sync is not None and label not in self._to_sync + and label != self._ref_label): + continue + + # Adding the received values to the buffered ones + self._data[label] = np.concatenate((self._data[label], + np.array((timestamps, values))), + axis=1) + # Sorting the buffered data, if a same label comes from multiple Links + self._data[label] = self._data[label][ + :, self._data[label][0].argsort()] + + # Aborting if there's no data to process + if not self._data: + self.log(logging.DEBUG, "No data in the buffer to process") + return + + # Aborting if there's no data for the reference label + if self._ref_label not in self._data: + self.log(logging.DEBUG, "No value for the reference label found in " + "the buffer") + return + + # Making sure there's data for all the requested labels + if (self._to_sync is not None and + any(label not in self._data for label in self._to_sync)): + self.log(logging.DEBUG, "Not all the requested labels received yet") + return + + # There should also be at least two values for each label + if any(len(self._data[label][0]) < 2 for label in self._data): + self.log(logging.DEBUG, "Not at least 2 values for each label in buffer") + return + + # Getting the minimum time for the interpolation (maximin over all labels) + min_t = max(data[0, 0] for data in self._data.values()) + + # Getting the maximum time for the interpolation (minimax over all labels) + max_t = min(data[0, -1] for data in self._data.values()) + + # Checking if there's a valid time range for interpolation + if max_t < min_t: + self.log(logging.DEBUG, "Ranges not matching for interpolation") + return + + # The array containing the timestamps for interpolating + interp_times = self._data[self._ref_label][0, + (self._data[self._ref_label][0] >= min_t) & + (self._data[self._ref_label][0] <= max_t)] + + # Checking if there are values for the target label in the valid time range + if not np.any(interp_times): + self.log(logging.DEBUG, + "No value of the target label found between the minimum and " + "maximum possible interpolation times") + return + + to_send = dict() + + # Building the dict of values to send + for label, values in self._data.items(): + to_send[label] = list(np.interp(interp_times, values[0], values[1])) + # Keeping the last data point before max_t to pass this information on + last = values[:, values[0] <= max_t][:, -1] + # Removing the used values from the buffer, except the last data point + self._data[label] = np.column_stack((last, values[:, values[0] > max_t])) + + if to_send: + # Adding the time values to the dict of values to send + to_send[self._time_label] = list(interp_times) + + # Sending the values + for i, _ in enumerate(interp_times): + self.send({label: values[i] for label, values in to_send.items()}) + + @staticmethod + def _default_array() -> np.ndarray: + """Helper function for the default dict.""" + + return np.array(([], []))