|
1 | 1 | # Copyright (c) 2025 Cumulocity GmbH |
2 | 2 |
|
3 | | -import logging |
4 | | -import threading |
5 | | -import time |
6 | | -from concurrent.futures import wait, Future |
7 | | -from concurrent.futures.thread import ThreadPoolExecutor |
8 | | -from typing import Callable, Union |
9 | 3 |
|
10 | | -from c8y_api.app import MultiTenantCumulocityApp |
| 4 | +from c8y_tk.app.interactive import * |
| 5 | +from c8y_tk.app.subscription_listener import * |
11 | 6 |
|
12 | | - |
13 | | -class SubscriptionListener: |
14 | | - """Multi-tenant subscription listener. |
15 | | -
|
16 | | - When implementing a multi-tenant microservice it is sometimes required to |
17 | | - keep track of the tenants which subscribe to the microservice. |
18 | | - Effectively, this needs to be done via polling the `get_subscribers` |
19 | | - function of the MultiTenantCumulocityApp class. |
20 | | -
|
21 | | - The `SubscriptionListener` class provides a default implementation of |
22 | | - such a polling mechanism which can be easily integrated using callbacks. |
23 | | - """ |
24 | | - |
25 | | - # instance counter to ensure unique loggers |
26 | | - _n = 0 |
27 | | - |
28 | | - def __init__( |
29 | | - self, |
30 | | - app: MultiTenantCumulocityApp, |
31 | | - callback: Callable[[list[str]], None] = None, |
32 | | - max_threads: int = 5, |
33 | | - blocking: bool = True, |
34 | | - polling_interval: float = 3600, |
35 | | - startup_delay: float = 60, |
36 | | - ): |
37 | | - """Create and initialize a new instance. |
38 | | -
|
39 | | - See also the `add_callback` function which can be used to add callbacks |
40 | | - in a more fine-granular fashion. |
41 | | -
|
42 | | - Args: |
43 | | - app (MultiTenantCumulocityApp): The microservice app instance |
44 | | - callback (Callable): A callback to be invoked when another tenant |
45 | | - subscribes or unsubscribes from the microservice; The function |
46 | | - will be invoked with the current list of subscribers. |
47 | | - blocking (bool): Whether the `callback` function will be invoked |
48 | | - in blocking fashion (True, default) or detached in a thread |
49 | | - (False). |
50 | | - polling_interval (float): The polling interval |
51 | | - startup_delay (float): A minimum delay before a newly added |
52 | | - microservice is considered to be "added" (the callback |
53 | | - invocation will be delayed by this). |
54 | | - """ |
55 | | - self._n = self._n + 1 |
56 | | - self._instance_name = f"{__name__}.{type(self).__name__}[{self._n}]"\ |
57 | | - if self._n > 1 else f"{__name__}.{type(self).__name__}" |
58 | | - self.app = app |
59 | | - self.max_threads = max_threads |
60 | | - self.startup_delay = startup_delay |
61 | | - self.polling_interval = polling_interval |
62 | | - self.callbacks = [(callback, blocking)] if callback else [] |
63 | | - self.callbacks_on_add = [] |
64 | | - self.callbacks_on_remove = [] |
65 | | - self._log = logging.getLogger(self._instance_name) |
66 | | - self._executor = None |
67 | | - self._callback_futures = set() |
68 | | - self._listen_thread = None |
69 | | - self._is_closed = False |
70 | | - |
71 | | - def _cleanup_future(self, future): |
72 | | - """Remove a finished future from the internal list.""" |
73 | | - self._callback_futures.remove(future) |
74 | | - |
75 | | - def add_callback( |
76 | | - self, |
77 | | - callback: Callable[[Union[str ,list[str]]], None], |
78 | | - blocking: bool = True, |
79 | | - when: str = 'any', |
80 | | - ) -> "SubscriptionListener": |
81 | | - """Add a callback function to be invoked if a tenant subscribes |
82 | | - to/unsubscribes from the monitored multi-tenant microservice. |
83 | | -
|
84 | | - Note: multiple callbacks (even listening to the same event) can |
85 | | - be defined. The `add_callback` function supports a fluent interface, |
86 | | - i.e. it can be chained, to ease configuration. |
87 | | -
|
88 | | - Args: |
89 | | - callback (Callable): A callback function to invoke in case |
90 | | - of a change in subscribers. If parameter `when` is either |
91 | | - "added" or "removed" the function is invoked with a single |
92 | | - tenant ID for every added/removed subscriber respectively. |
93 | | - Otherwise (or if "always/any"), the callback function is |
94 | | - invoked with a list of the current subscriber's tenant IDs. |
95 | | - blocking (bool): Whether to invoke the callback function in a |
96 | | - blocking fashion (default) or not. If False, a thread is |
97 | | - spawned for each invocation. |
98 | | - when (str): When to invoke this particular callback function. |
99 | | - If "added" or "removed" the callback function is invoked with |
100 | | - a single tenant ID for every added/removed subscriber |
101 | | - respectively. Otherwise (or if "always/any"), the callback |
102 | | - function is invoked with a list of the current subscriber's |
103 | | - tenant IDs. |
104 | | - """ |
105 | | - if when in {'always', 'any'}: |
106 | | - self.callbacks.append((callback, blocking)) |
107 | | - return self |
108 | | - if when == 'added': |
109 | | - self.callbacks_on_add.append((callback, blocking)) |
110 | | - return self |
111 | | - if when == 'removed': |
112 | | - self.callbacks_on_remove.append((callback, blocking)) |
113 | | - return self |
114 | | - raise ValueError(f"Invalid activation mode: {when}") |
115 | | - |
116 | | - def listen(self): |
117 | | - """Start the listener. |
118 | | -
|
119 | | - This is blocking. |
120 | | - """ |
121 | | - # safely invoke a callback function blocking or non-blocking |
122 | | - def invoke_callback(callback, is_blocking, _, arg): |
123 | | - def safe_invoke(a): |
124 | | - # pylint: disable=broad-exception-caught |
125 | | - try: |
126 | | - self._log.debug(f"Invoking callback: {callback}") |
127 | | - callback(a) |
128 | | - except Exception as error: |
129 | | - self._log.error(f"Uncaught exception in callback: {error}", exc_info=error) |
130 | | - if is_blocking: |
131 | | - safe_invoke(arg) |
132 | | - else: |
133 | | - future = self._executor.submit(safe_invoke, arg) |
134 | | - self._callback_futures.add(future) |
135 | | - future.add_done_callback(self._cleanup_future) |
136 | | - |
137 | | - # create an executor if there are non-blocking callbacks |
138 | | - if any(not x[1] for x in (*self.callbacks, *self.callbacks_on_add, *self.callbacks_on_remove)): |
139 | | - self._executor = ThreadPoolExecutor( |
140 | | - max_workers=self.max_threads, |
141 | | - thread_name_prefix=self._instance_name) |
142 | | - |
143 | | - last_subscribers = set() |
144 | | - next_run = 0 |
145 | | - while not self._is_closed: |
146 | | - # sleep until next poll |
147 | | - now = time.monotonic() |
148 | | - if not now > next_run: |
149 | | - time.sleep(next_run - now) |
150 | | - # read & check current subscribers |
151 | | - current_subscribers = set(self.app.get_subscribers()) |
152 | | - added = current_subscribers - last_subscribers |
153 | | - removed = last_subscribers - current_subscribers |
154 | | - # run 'removed' callbacks |
155 | | - for tenant_id in removed: |
156 | | - self._log.info(f"Tenant subscription removed: {tenant_id}.") |
157 | | - for fun, blocking in self.callbacks_on_remove: |
158 | | - invoke_callback(fun, blocking, 'Removed', tenant_id) |
159 | | - # wait remaining time for startup delay |
160 | | - if added and self.startup_delay: |
161 | | - min_startup_delay = self.startup_delay - (time.monotonic() - now) |
162 | | - if min_startup_delay > 0: |
163 | | - time.sleep(min_startup_delay) |
164 | | - # run 'added' callbacks |
165 | | - for tenant_id in added: |
166 | | - self._log.info(f"Tenant subscription added: {tenant_id}.") |
167 | | - for fun, blocking in self.callbacks_on_add: |
168 | | - invoke_callback(fun, blocking, 'Added', tenant_id) |
169 | | - # run 'any' callbacks |
170 | | - if added or removed: |
171 | | - self._log.info(f"Tenant subscriptions changed: {current_subscribers}.") |
172 | | - for fun, blocking in self.callbacks: |
173 | | - invoke_callback(fun, blocking, None, current_subscribers) |
174 | | - # set new baseline |
175 | | - last_subscribers = current_subscribers |
176 | | - # schedule next run, skip if already exceeded |
177 | | - next_run = time.monotonic() + self.polling_interval |
178 | | - self._log.debug(f"Next run: ${next_run}.") |
179 | | - # release GIL |
180 | | - time.sleep(0.1) |
181 | | - |
182 | | - # shutdown executor, but don't wait for the callbacks |
183 | | - if self._executor: |
184 | | - self._executor.shutdown(wait=False, cancel_futures=False) |
185 | | - |
186 | | - def start(self) -> threading.Thread: |
187 | | - """Start the listener in a separate thread. |
188 | | -
|
189 | | - This function will return immediately. The listening can be stopped |
190 | | - using the `shutdown` function. |
191 | | -
|
192 | | - Returns: |
193 | | - The created Thread. |
194 | | - """ |
195 | | - self._listen_thread = threading.Thread(target=self.listen, name=f"{self._instance_name}Main") |
196 | | - self._listen_thread.start() |
197 | | - return self._listen_thread |
198 | | - |
199 | | - def stop(self): |
200 | | - """Signal to stop the listening thread. |
201 | | -
|
202 | | - This function returns immediately; neither the completion of the |
203 | | - `listen` function, nor potentially running callbacks are awaited. |
204 | | - Use this, if the `listen` function is running in a thread managed |
205 | | - by your code. |
206 | | -
|
207 | | - See also: |
208 | | - Function `await_callbacks`, to await the completion of potentially |
209 | | - running callback functions. |
210 | | - Functions `start` and `shutdown` if you don't want to manage the |
211 | | - listening thread on your own. |
212 | | - """ |
213 | | - self._is_closed = True |
214 | | - |
215 | | - def shutdown(self, timeout: float = None): |
216 | | - """Shutdown the listener thread and wait for it to finish. |
217 | | -
|
218 | | - This function can only be invoked if the listener thread was started |
219 | | - using the `start` function (i.e. the thread is managed by this class). |
220 | | - Otherwise, the `stop` function should be used. |
221 | | -
|
222 | | - Args: |
223 | | - timeout (float): Maximum wait time (None to wait indefinitely). |
224 | | -
|
225 | | - Raises: |
226 | | - TimeoutError, if the shutdown could not complete within the |
227 | | - specified timeout. The shutdown procedure is not interrupted |
228 | | - by this and will complete eventually. |
229 | | - """ |
230 | | - if not self._listen_thread: |
231 | | - raise RuntimeError("Listener thread is maintained elsewhere. Nothing to do.") |
232 | | - self.stop() |
233 | | - # wait for listen thread |
234 | | - start = time.monotonic() |
235 | | - self._listen_thread.join(timeout=timeout) |
236 | | - # wait for callbacks if there is time |
237 | | - if not timeout: |
238 | | - self.await_callbacks() |
239 | | - else: |
240 | | - remaining = timeout - (time.monotonic() - start) |
241 | | - if remaining > 0: |
242 | | - self.await_callbacks(timeout=remaining) |
243 | | - # raise timeout error if not complete |
244 | | - if self._listen_thread.is_alive() or ( |
245 | | - self._executor and self.get_callbacks()): |
246 | | - raise TimeoutError(f"Listener thread did not close within the specified timeout ({timeout}s).") |
247 | | - |
248 | | - def get_callbacks(self) -> list[Future]: |
249 | | - """Get currently running callbacks. |
250 | | -
|
251 | | - This function can be used to gain direct access to the currently |
252 | | - running callback threads. Usually, this is not necessary. |
253 | | -
|
254 | | - See also: |
255 | | - Function `await_callbacks` to await the completion of all |
256 | | - currently running callback threads. |
257 | | - """ |
258 | | - return [f for f in self._callback_futures if f.running()] |
259 | | - |
260 | | - def await_callbacks(self, timeout: float = None): |
261 | | - """Await running callbacks. |
262 | | -
|
263 | | - Args: |
264 | | - timeout (float): Maximum wait time (None to wait indefinitely) |
265 | | -
|
266 | | - Raises: |
267 | | - TimeoutError, if there are still running callbacks after the |
268 | | - specified timeout. |
269 | | - """ |
270 | | - wait(self._callback_futures, timeout=timeout) |
271 | | - if self.get_callbacks(): |
272 | | - raise TimeoutError(f"Callback functions did not complete within the specified timeout ({timeout}s).") |
| 7 | +__all__ = ['CumulocityApp', 'SubscriptionListener'] |
0 commit comments