From 4e53facb2891671cbfc4c011e2a8a0dae83e5baa Mon Sep 17 00:00:00 2001 From: Fredrik Olsson Date: Fri, 1 Oct 2021 09:44:47 +0200 Subject: [PATCH] First stab at generic exception handling --- streamz/core.py | 60 +++++++++++++++++++++++++++++++++---------------- 1 file changed, 41 insertions(+), 19 deletions(-) diff --git a/streamz/core.py b/streamz/core.py index debf8dac..497591ab 100644 --- a/streamz/core.py +++ b/streamz/core.py @@ -254,6 +254,9 @@ def __init__(self, upstream=None, upstreams=None, stream_name=None, else: self.upstreams = [] + # Lazily loaded exception handler to avoid recursion + self._on_exception = None + self._set_asynchronous(asynchronous) self._set_loop(loop) if ensure_io_loop and not self.loop: @@ -445,14 +448,18 @@ def _emit(self, x, metadata=None): result = [] for downstream in list(self.downstreams): - r = downstream.update(x, who=self, metadata=metadata) - - if type(r) is list: - result.extend(r) - else: - result.append(r) + try: + r = downstream.update(x, who=self, metadata=metadata) - self._release_refs(metadata) + if type(r) is list: + result.extend(r) + else: + result.append(r) + except Exception as exc: + # Push this exception to the on_exception handler on the downstream that raised + downstream.on_exception().update((x, exc) , who=self, metadata=metadata) + finally: + self._release_refs(metadata) return [element for element in result if element is not None] @@ -671,6 +678,30 @@ def _release_refs(self, metadata, n=1): if 'ref' in m: m['ref'].release(n) + def on_exception(self): + """Returns the exception handler associated with this stream + """ + self._on_exception = self._on_exception or _on_exception() + return self._on_exception + + +class InvalidDataError(Exception): + pass + +class _on_exception(Stream): + + def __init__(self, *args, **kwargs): + self.silent = False + Stream.__init__(self, *args, **kwargs) + + def update(self, x, who=None, metadata=None): + cause, exc = x + + if self.silent or len(self.downstreams) > 0: + self._emit(x, metadata=metadata) + else: + logger.exception(exc) + raise InvalidDataError(cause) from exc @Stream.register_api() class map(Stream): @@ -706,13 +737,8 @@ def __init__(self, upstream, func, *args, **kwargs): Stream.__init__(self, upstream, stream_name=stream_name) def update(self, x, who=None, metadata=None): - try: - result = self.func(x, *self.args, **self.kwargs) - except Exception as e: - logger.exception(e) - raise - else: - return self._emit(result, metadata=metadata) + result = self.func(x, *self.args, **self.kwargs) + self._emit(result, metadata=metadata) @Stream.register_api() @@ -890,11 +916,7 @@ def update(self, x, who=None, metadata=None): else: return self._emit(x, metadata=metadata) else: - try: - result = self.func(self.state, x, **self.kwargs) - except Exception as e: - logger.exception(e) - raise + result = self.func(self.state, x, **self.kwargs) if self.returns_state: state, result = result else: