Skip to content

Commit 18bdfc2

Browse files
Updated GroupedDataSubscriber to process only one buffer publication at a time, and on a separate thread
1 parent d360f21 commit 18bdfc2

File tree

1 file changed

+68
-4
lines changed
  • examples/groupeddatasubscribe

1 file changed

+68
-4
lines changed

examples/groupeddatasubscribe/main.py

Lines changed: 68 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,10 @@ class GroupedDataSubscriber(Subscriber):
5555
the subsecond distribution, some data will be downsampled. Downsampled data count is tracked and reported through the
5656
downsampled_count property.
5757
58+
Only a single one-second data buffer will be published at a time. If data cannot be processed within the one-second
59+
window, a warning message will be displayed and the data will be skipped. The number of skipped data sets is tracked
60+
and reported through the process_missed_count property.
61+
5862
This example depends on a semi-accurate system clock to group data by timestamp. If the system clock is not accurate,
5963
data may not be grouped as expected.
6064
"""
@@ -96,6 +100,11 @@ def __init__(self):
96100
self._downsampled_count_lock = threading.Lock()
97101
self._downsampled_count = 0
98102

103+
self._process_lock = threading.Lock()
104+
105+
self._process_missed_count_lock = threading.Lock()
106+
self._process_missed_count = 0
107+
99108
# Set up event handlers for STTP API
100109
self.set_subscriptionupdated_receiver(self._subscription_updated)
101110
self.set_newmeasurements_receiver(self._new_measurements)
@@ -118,7 +127,25 @@ def downsampled_count(self, value: np.int32):
118127

119128
with self._downsampled_count_lock:
120129
self._downsampled_count = value
130+
131+
@property
def process_missed_count(self) -> int:
    """
    Gets the number of one-second data buffers that were skipped because a
    previous buffer publication was still being processed.
    """

    # Guard the counter read so it is consistent with concurrent updates
    self._process_missed_count_lock.acquire()
    try:
        return self._process_missed_count
    finally:
        self._process_missed_count_lock.release()

@process_missed_count.setter
def process_missed_count(self, value: np.int32):
    """
    Sets the number of skipped one-second data buffer publications.
    """

    # Guard the counter write so readers never observe a torn update
    self._process_missed_count_lock.acquire()
    try:
        self._process_missed_count = value
    finally:
        self._process_missed_count_lock.release()
148+
122149
def set_grouped_data_receiver(self, callback: Optional[Callable[[GroupedDataSubscriber, np.uint64, Dict[np.uint64, Dict[UUID, Measurement]]], None]]):
123150
"""
124151
Defines the callback function that handles grouped data that has been received.
@@ -209,9 +236,8 @@ def _new_measurements(self, measurements: List[Measurement]):
209236
if current_time - timestamp >= window_size:
210237
grouped_data = self._grouped_data.pop(timestamp)
211238

212-
# Call user defined data function handler with grouped data
213-
if self._grouped_data_receiver is not None:
214-
self._grouped_data_receiver(self, timestamp, grouped_data)
239+
# Call user defined data function handler with one-second grouped data buffer on a separate thread
240+
threading.Thread(target=self._publish_data, args=(timestamp, grouped_data), name="PublishDataThread").start()
215241

216242
# Provide user feedback on data reception
217243
if time() - self._lastmessage < 5.0:
@@ -236,6 +262,40 @@ def _new_measurements(self, measurements: List[Measurement]):
236262
finally:
237263
self._lastmessage = time()
238264

265+
def _publish_data(self, timestamp: np.uint64, data_buffer: Dict[np.uint64, Dict[UUID, Measurement]]):
    """
    Hands a one-second grouped data buffer to the user-defined receiver callback.

    Only a single buffer is processed at a time: when a previous publication is
    still running, this buffer is dropped, the miss counter is incremented, and
    a warning is reported.
    """

    # Keep only the whole-second portion of the timestamp for display
    time_str = Ticks.to_shortstring(timestamp).split(".")[0]

    # Non-blocking attempt: a failure means a prior buffer is still processing
    if not self._process_lock.acquire(False):
        with self._process_missed_count_lock:
            self._process_missed_count += 1
            self.errormessage(f"WARNING: Data publication missed for buffer at {time_str}, a previous data buffer is still processing. {self._process_missed_count:,} data sets missed so far...\n")
        return

    try:
        process_started = time()

        if self._grouped_data_receiver is not None:
            self._grouped_data_receiver(self, timestamp, data_buffer)

        self.statusmessage(f"Data publication for buffer at {time_str} processed in {self._get_elapsed_time_str(time() - process_started)}.\n")
    finally:
        self._process_lock.release()
282+
283+
def _get_elapsed_time_str(self, elapsed: float) -> str:
284+
hours, rem = divmod(elapsed, 3600)
285+
minutes, seconds = divmod(rem, 60)
286+
milliseconds = (elapsed - int(elapsed)) * 1000
287+
288+
if hours < 1.0:
289+
if minutes < 1.0:
290+
if seconds < 1.0:
291+
return f"{int(milliseconds):03} ms"
292+
293+
return f"{int(seconds):02}.{int(milliseconds):03} sec"
294+
295+
return f"{int(minutes):02}:{int(seconds):02}.{int(milliseconds):03}"
296+
297+
return f"{int(hours):02}:{int(minutes):02}:{int(seconds):02}.{int(milliseconds):03}"
298+
239299
def _connection_terminated(self):
240300
# Call default implementation which will display a connection terminated message to stderr
241301
self.default_connectionterminated_receiver()
@@ -277,6 +337,10 @@ def process_data(subscriber: GroupedDataSubscriber, timestamp: np.uint64, data_b
277337
"""
278338
User defined callback function that handles grouped data that has been received.
279339
340+
Note: This function is called by the subscriber when grouped data is available for processing.
341+
Normally the function is called once per second with a buffer of grouped data for the second.
342+
The call frequency can be higher if the processing of the data takes longer than a second.
343+
280344
Parameters:
281345
timestamp: The timestamp, at top of second, for the grouped data
282346
data_buffer: The grouped one second data buffer:
@@ -328,7 +392,7 @@ def process_data(subscriber: GroupedDataSubscriber, timestamp: np.uint64, data_b
328392

329393
average_frequency = frequency_sum / frequency_count
330394

331-
print(f"Average frequency for {frequency_count:,} values in second {Ticks.to_shortstring(timestamp)}: {average_frequency:.6f} Hz")
395+
print(f"Average frequency for {frequency_count:,} values in second {Ticks.to_datetime(timestamp).second}: {average_frequency:.6f} Hz")
332396

333397
if subscriber.downsampled_count > 0:
334398
print(f" Downsampled {subscriber.downsampled_count:,} measurements in last measurement set...")

0 commit comments

Comments
 (0)