-
Notifications
You must be signed in to change notification settings - Fork 0
/
MessagePayload.py
75 lines (63 loc) · 2.93 KB
/
MessagePayload.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# MessagePayload
#
# Takes dict of keys/vals and makes a formatted payload message.
# Implemented as factory pattern that allows for variations in message formatting.
#
from abc import ABC, abstractmethod
class MessagePayload(ABC):
# pass array of keys to remove from message BEFORE or AFTER formatting
# allows for subclasses to use data and then remove it
#
# Typically, the caller will only supply preDropKeys if any and
# subclasses would set the postDropKeys as needed.
#
def __init__(self, d, config={'preDropKeys':[], 'postDropKeys':[]}) -> None:
self.payload = {}
self.preDropKeys = config.get('preDropKeys', [])
self.preDropKeys.append('')
self.postDropKeys = config.get('postDropKeys', [])
self._prepare_message(d)
def _prepare_message(self, d):
[ d.pop(k) for k in (set(self.preDropKeys) & set(d.keys())) ]
self.payload = d.copy()
self.make_message(d)
[ self.payload.pop(k) for k in (set(self.postDropKeys) & set(self.payload.keys())) ]
def message(self, formatter=None):
return self.payload if formatter == None else formatter(self.payload)
@abstractmethod
def make_message(self, d):
raise NotImplementedError("MessagePayload must be subclassed with an implementation of #prepare_message")
# SimpleLabelled Strategy just returns the dict
# the dict is assumed to be structured with 'key': value
# so no changes.
class SimpleLabelledPayload(MessagePayload):
def make_message(self, d):
# self.payload = d.copy()
pass
# DynamicLabelledPayload takes apart the dict and builds the payload
# the dict is of the format 'name': metric, 'value': reading
# and will be reformatted to 'metric': reading
#
class DynamicLabelledPayload(MessagePayload):
def __init__(self, d, config={'metricKey':'status', 'readingKey':'value', 'value_transform_function': float}) -> None:
self.metricKey = config.get('metricKey', 'status')
self.readingKey = config.get('readingKey', 'value')
self.transform = config.get('value_transform_function', float)
pdk = config.get('postDropKeys', [])
pdk.extend([self.metricKey, self.readingKey])
config['postDropKeys'] = pdk
super().__init__(d, config)
def make_message(self, d):
try:
self.payload[d[self.metricKey]] = self.transform(d[self.readingKey])
except Exception as e:
print("key or value didn't exist")
# UntimedDynamicLabelledPayload removes the timestamp from the payload
#
class UntimedDynamicLabelledPayload(DynamicLabelledPayload):
def __init__(self, d, config={'metricKey':'status', 'readingKey':'value', 'time_col_name': 'timestamp'}) -> None:
self.time_col_name = config.get('time_col_name', 'timestamp')
pdk = config.get('postDropKeys', [])
pdk.extend([self.time_col_name])
config['postDropKeys'] = pdk
super().__init__(d, config)