|
| 1 | +# coding: utf-8 |
| 2 | + |
| 3 | +import numpy as np |
| 4 | +from typing import Optional, Union, Iterable, Dict |
| 5 | +from collections import defaultdict |
| 6 | +import logging |
| 7 | + |
| 8 | +from .meta_block import Block |
| 9 | + |
| 10 | + |
| 11 | +class Synchronizer(Block): |
| 12 | + """This Block takes data from upstream Blocks as input and interpolates it to |
| 13 | + output all the labels on the same timestamps as a reference label. |
| 14 | +
|
| 15 | + This Block is very similar to the :class:`~crappy.blocks.Multiplexer` Block, |
| 16 | + but the `Multiplexer` interpolates data in a time base independent of the |
| 17 | + labels whereas this one takes one label as a reference. |
| 18 | +
|
| 19 | + It can take any number of inputs, provided that they all share a common time |
| 20 | + label. It is also possible to choose which labels are considered for |
| 21 | + interpolation and which are dropped. The interpolation is performed using the |
| 22 | + :obj:`numpy.interp` method. |
| 23 | +
|
| 24 | + This Block is useful for synchronizing data acquired from different sensors, |
| 25 | + in the context when one label should be treated as a reference. This is for |
| 26 | + example the case when synchronizing signals with the output of an image |
| 27 | + processing, to be able to compare all the signals in the time base of the |
| 28 | + image acquisition. |
| 29 | +
|
| 30 | + .. versionadded:: 2.0.5 |
| 31 | + """ |
| 32 | + |
| 33 | + def __init__(self, |
| 34 | + reference_label: str, |
| 35 | + time_label: str = 't(s)', |
| 36 | + labels_to_sync: Optional[Union[str, Iterable[str]]] = None, |
| 37 | + freq: Optional[float] = 50, |
| 38 | + display_freq: bool = False, |
| 39 | + debug: Optional[bool] = False) -> None: |
| 40 | + """Sets the arguments and initializes the parent class. |
| 41 | +
|
| 42 | + Args: |
| 43 | + reference_label: The label whose timestamps will be taken as a time base |
| 44 | + for performing the interpolation. |
| 45 | + time_label: The label carrying the time information. Should be common to |
| 46 | + all the input Blocks. |
| 47 | + labels_to_sync: An iterable (like a :obj:`list` or a :obj:`tuple`) |
| 48 | + containing the labels to interpolate on the reference label's time |
| 49 | + base, except for the time label that is given separately in the |
| 50 | + ``time_label`` argument. The Block also doesn't output anything until |
| 51 | + data has been received on all these labels. If left to :obj:`None`, all |
| 52 | + the received labels are considered. **It is recommended to always set |
| 53 | + this argument !** It is also possible to give this argument as a single |
| 54 | + :obj:`str` (i.e. not in an iterable). |
| 55 | + freq: The target looping frequency for the Block. If :obj:`None`, loops |
| 56 | + as fast as possible. |
| 57 | + display_freq: If :obj:`True`, displays the looping frequency of the |
| 58 | + Block. |
| 59 | + debug: If :obj:`True`, displays all the log messages including the |
| 60 | + :obj:`~logging.DEBUG` ones. If :obj:`False`, only displays the log |
| 61 | + messages with :obj:`~logging.INFO` level or higher. If :obj:`None`, |
| 62 | + disables logging for this Block. |
| 63 | + """ |
| 64 | + |
| 65 | + super().__init__() |
| 66 | + self.freq = freq |
| 67 | + self.display_freq = display_freq |
| 68 | + self.debug = debug |
| 69 | + |
| 70 | + # Initializing the attributes |
| 71 | + self._ref_label = reference_label |
| 72 | + self._time_label = time_label |
| 73 | + self._data: Dict[str, np.ndarray] = defaultdict(self._default_array) |
| 74 | + |
| 75 | + # Forcing the labels_to_sync into a list |
| 76 | + if labels_to_sync is not None and isinstance(labels_to_sync, str): |
| 77 | + self._to_sync = [labels_to_sync] |
| 78 | + elif labels_to_sync is not None: |
| 79 | + self._to_sync = list(labels_to_sync) |
| 80 | + else: |
| 81 | + self._to_sync = None |
| 82 | + |
| 83 | + def loop(self) -> None: |
| 84 | + """Receives data, interpolates it, and sends it to the downstream |
| 85 | + Blocks.""" |
| 86 | + |
| 87 | + # Receiving all the upcoming data |
| 88 | + data = self.recv_all_data_raw() |
| 89 | + |
| 90 | + # Iterating over all the links |
| 91 | + for link_data in data: |
| 92 | + # Only data associated with a time label can be synchronized |
| 93 | + if self._time_label not in link_data: |
| 94 | + continue |
| 95 | + # Extracting the time information from the data |
| 96 | + timestamps = link_data.pop(self._time_label) |
| 97 | + |
| 98 | + # Adding data from each label in the buffer |
| 99 | + for label, values in link_data.items(): |
| 100 | + # Only the labels specified in out_labels is considered |
| 101 | + if (self._to_sync is not None and label not in self._to_sync |
| 102 | + and label != self._ref_label): |
| 103 | + continue |
| 104 | + |
| 105 | + # Adding the received values to the buffered ones |
| 106 | + self._data[label] = np.concatenate((self._data[label], |
| 107 | + np.array((timestamps, values))), |
| 108 | + axis=1) |
| 109 | + # Sorting the buffered data, if a same label comes from multiple Links |
| 110 | + self._data[label] = self._data[label][ |
| 111 | + :, self._data[label][0].argsort()] |
| 112 | + |
| 113 | + # Aborting if there's no data to process |
| 114 | + if not self._data: |
| 115 | + self.log(logging.DEBUG, "No data in the buffer to process") |
| 116 | + return |
| 117 | + |
| 118 | + # Aborting if there's no data for the reference label |
| 119 | + if self._ref_label not in self._data: |
| 120 | + self.log(logging.DEBUG, "No value for the reference label found in " |
| 121 | + "the buffer") |
| 122 | + return |
| 123 | + |
| 124 | + # Making sure there's data for all the requested labels |
| 125 | + if (self._to_sync is not None and |
| 126 | + any(label not in self._data for label in self._to_sync)): |
| 127 | + self.log(logging.DEBUG, "Not all the requested labels received yet") |
| 128 | + return |
| 129 | + |
| 130 | + # There should also be at least two values for each label |
| 131 | + if any(len(self._data[label][0]) < 2 for label in self._data): |
| 132 | + self.log(logging.DEBUG, "Not at least 2 values for each label in buffer") |
| 133 | + return |
| 134 | + |
| 135 | + # Getting the minimum time for the interpolation (maximin over all labels) |
| 136 | + min_t = max(data[0, 0] for data in self._data.values()) |
| 137 | + |
| 138 | + # Getting the maximum time for the interpolation (minimax over all labels) |
| 139 | + max_t = min(data[0, -1] for data in self._data.values()) |
| 140 | + |
| 141 | + # Checking if there's a valid time range for interpolation |
| 142 | + if max_t < min_t: |
| 143 | + self.log(logging.DEBUG, "Ranges not matching for interpolation") |
| 144 | + return |
| 145 | + |
| 146 | + # The array containing the timestamps for interpolating |
| 147 | + interp_times = self._data[self._ref_label][0, |
| 148 | + (self._data[self._ref_label][0] >= min_t) & |
| 149 | + (self._data[self._ref_label][0] <= max_t)] |
| 150 | + |
| 151 | + # Checking if there are values for the target label in the valid time range |
| 152 | + if not np.any(interp_times): |
| 153 | + self.log(logging.DEBUG, |
| 154 | + "No value of the target label found between the minimum and " |
| 155 | + "maximum possible interpolation times") |
| 156 | + return |
| 157 | + |
| 158 | + to_send = dict() |
| 159 | + |
| 160 | + # Building the dict of values to send |
| 161 | + for label, values in self._data.items(): |
| 162 | + to_send[label] = list(np.interp(interp_times, values[0], values[1])) |
| 163 | + # Keeping the last data point before max_t to pass this information on |
| 164 | + last = values[:, values[0] <= max_t][:, -1] |
| 165 | + # Removing the used values from the buffer, except the last data point |
| 166 | + self._data[label] = np.column_stack((last, values[:, values[0] > max_t])) |
| 167 | + |
| 168 | + if to_send: |
| 169 | + # Adding the time values to the dict of values to send |
| 170 | + to_send[self._time_label] = list(interp_times) |
| 171 | + |
| 172 | + # Sending the values |
| 173 | + for i, _ in enumerate(interp_times): |
| 174 | + self.send({label: values[i] for label, values in to_send.items()}) |
| 175 | + |
| 176 | + @staticmethod |
| 177 | + def _default_array() -> np.ndarray: |
| 178 | + """Helper function for the default dict.""" |
| 179 | + |
| 180 | + return np.array(([], [])) |
0 commit comments