@@ -94,7 +94,8 @@ def __init__(self):
94
94
95
95
self .display_measurement_summary = False
96
96
"""
97
- Defines if the subscriber should display a summary of received measurements every few seconds.
97
+ Defines if the subscriber should display a summary of received measurements
98
+ every few seconds.
98
99
"""
99
100
100
101
self ._grouped_data : Dict [np .uint64 , Dict [UUID , Measurement ]] = {}
@@ -267,7 +268,7 @@ def _new_measurements(self, measurements: List[Measurement]):
267
268
self ._lastmessage = time ()
268
269
269
270
def _publish_data (self , timestamp : np .uint64 , data_buffer : Dict [np .uint64 , Dict [UUID , Measurement ]]):
270
- time_str = Ticks .to_shortstring (timestamp ).split ("." )[0 ]
271
+ data_buffer_time_str = Ticks .to_shortstring (timestamp ).split ("." )[0 ]
271
272
272
273
if self ._process_lock .acquire (False ):
273
274
try :
@@ -276,13 +277,13 @@ def _publish_data(self, timestamp: np.uint64, data_buffer: Dict[np.uint64, Dict[
276
277
if self ._grouped_data_receiver is not None :
277
278
self ._grouped_data_receiver (self , timestamp , data_buffer )
278
279
279
- self .statusmessage (f"Data publication for buffer at { time_str } processed in { self ._get_elapsed_time_str (time () - process_started )} ." )
280
+ self .statusmessage (f"Data publication for buffer at { data_buffer_time_str } processed in { self ._get_elapsed_time_str (time () - process_started )} ." )
280
281
finally :
281
282
self ._process_lock .release ()
282
283
else :
283
284
with self ._process_missed_count_lock :
284
285
self ._process_missed_count += 1
285
- self .errormessage (f"WARNING: Data publication missed for buffer at { time_str } , a previous data buffer is still processing. { self ._process_missed_count :,} data sets missed so far..." )
286
+ self .errormessage (f"WARNING: Data publication missed for buffer at { data_buffer_time_str } , a previous data buffer is still processing. { self ._process_missed_count :,} data sets missed so far..." )
286
287
287
288
def _get_elapsed_time_str (self , elapsed : float ) -> str :
288
289
hours , rem = divmod (elapsed , 3600 )
@@ -308,13 +309,10 @@ def _connection_terminated(self):
308
309
self ._lastmessage = 0.0
309
310
310
311
# Reset grouped data on disconnect
311
- with self ._downsampled_count_lock :
312
- self ._downsampled_count = 0
312
+ self .downsampled_count = 0
313
313
314
314
# Reset process missed count on disconnect
315
- with self ._process_missed_count_lock :
316
- self ._process_missed_count += 1
317
-
315
+ self .process_missed_count = 0
318
316
319
317
def main ():
320
318
parser = argparse .ArgumentParser ()
@@ -340,15 +338,15 @@ def main():
340
338
finally :
341
339
subscriber .dispose ()
342
340
343
-
344
-
345
341
def process_data (subscriber : GroupedDataSubscriber , timestamp : np .uint64 , data_buffer : Dict [np .uint64 , Dict [UUID , Measurement ]]):
346
342
"""
347
343
User defined callback function that handles grouped data that has been received.
348
344
349
345
Note: This function is called by the subscriber when grouped data is available for processing.
350
- Normally the function is called once per second with a buffer of grouped data for the second.
351
- The call frequency can be higher if the processing of the data takes longer than a second.
346
+ The function will only be called once per second with a buffer of grouped data for the second.
347
+ If the function processing time exceeds the one second window, a warning message will be displayed
348
+ and the data will be skipped. The number of skipped data sets is tracked and reported through the
349
+ process_missed_count property.
352
350
353
351
Parameters:
354
352
timestamp: The timestamp, at top of second, for the grouped data
@@ -357,7 +355,7 @@ def process_data(subscriber: GroupedDataSubscriber, timestamp: np.uint64, data_b
357
355
Dict[UUID, Measurement]: aligned measurements for the sub-second timestamp
358
356
"""
359
357
360
- # Calculate average frequency for all frequencies in the one second buffer
358
+ # In this example, we calculate average frequency for all frequencies in the one second buffer
361
359
frequency_sum = 0.0
362
360
frequency_count = 0
363
361
@@ -395,8 +393,10 @@ def process_data(subscriber: GroupedDataSubscriber, timestamp: np.uint64, data_b
395
393
396
394
# Ensure frequency is in reasonable range (59.95 to 60.05 Hz) and not NaN
397
395
if not np .isnan (measurement .value ) and measurement .value >= 59.95 and measurement .value <= 60.05 :
398
- frequency_sum += subscriber .adjustedvalue (measurement )
399
- #frequency_sum += measurement.value
396
+ # The following line demonstrates how to use the value of a measurement based on its
397
+ # linear adjustment factor metadata , i.e., the configured adder and multiplier:
398
+ #frequency_sum += subscriber.adjustedvalue(measurement)
399
+ frequency_sum += measurement .value # raw, unadjusted value
400
400
frequency_count += 1
401
401
402
402
average_frequency = frequency_sum / frequency_count
0 commit comments