
Generic exception handling #433

Open · wants to merge 2 commits into master

Conversation

@freol35241 commented Oct 1, 2021

Work in progress: A (naive?) stab at generic exception handling within streamz.

This PR attaches an exception handler to each Stream object; the handler acts as an optional start of a new pipeline branch.

from streamz import Stream

def problem(item):
    raise ValueError

s = Stream()
mapper = s.map(problem)

# Default
s.emit(123) # <-- raises an InvalidDataError

# Explicit silencing of exceptions
mapper.on_exception().silent = True
s.emit(123) # No exception

# Do something with the exception and the data that caused it
def problem_solver(item):
    data_that_caused_the_problem, resulting_exception = item
    # Do something
mapper.on_exception().map(problem_solver)
s.emit(123) # The exception and the faulty data are pushed downstream in the on_exception branch

This PR still needs some linting, a few tests fail (nothing major, it seems), and the docs need updating. Consider it a starting point for discussion, mainly in relation to #86. I would appreciate some feedback on this before I continue.

@martindurant (Member)

I like this idea! I can't promise exactly when I can have a look, but soon.
First a couple of questions:

  • what happens if an exception is raised not by the next node, but something else downstream?
  • does this respect async backpressure? (I would expect the exception handler to not normally be async, but it could be)

@freol35241 (Author) commented Oct 2, 2021

Hmm, good questions:

  • Since there is one exception handler associated with each node, each handler only deals with exceptions caused by "its own" node, nothing further downstream. On the other hand, exceptions caused by "its own" node can come both from a user-supplied function (for example, the callable passed to map) and from internal errors in streamz. Looking at my own use case, this is exactly what I want: all exceptions caused by a node can easily be handled (or not) by the end user without interrupting a running pipeline. (A small sketch follows below this list.)
  • To be honest, I am not knowledgeable enough about Python async programming to give you an answer to this. On the other hand, the implementation does not really change the dataflow (it still just uses update of a downstream node), so I wouldn't expect it to cause any problems.
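To make the first point concrete, here is a minimal sketch using the on_exception API proposed in this PR (so it only works with this branch); each exception branch receives (data, exception) tuples only for exceptions raised by its own node:

from streamz import Stream

def parse(item):
    return int(item)      # may raise ValueError

def invert(item):
    return 1 / item       # may raise ZeroDivisionError

s = Stream()
parsed = s.map(parse)
inverted = parsed.map(invert)

parsed.on_exception().sink(print)     # only sees exceptions raised by parse
inverted.on_exception().sink(print)   # only sees exceptions raised by invert

s.emit("abc")   # ValueError -> handled by parsed's exception branch
s.emit("0")     # parse succeeds, invert fails -> handled by inverted's branch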

All existing tests now pass and the linting complaints have been taken care of.

Remaining things:

  • add tests for the new functionality
  • docs

But I won't continue further before you have had a chance to take a look.

@martindurant (Member)

I have had a chance to glance at the code, and it had not occurred to me that the exception handling needs to go into all stream nodes, and be handled upstream of the actual exception, in order to make this possible. I can see why you can't put exception handling into the node that is actually generating the exception, but might I suggest a less general change?

You could make a node for this specific functionality, something like

class ExceptionHandler(Stream):
    def __init__(self, upstream, on_error, catch=Exception, **kwargs): ...

    def update(self, x, who=None, metadata=None):
        try:
            self.emit(x, metadata=metadata)
        except self.catch as e:
            self.on_error.update((x, e), self, metadata=metadata)

(here, on_error is a streamz node, but it could be just a function maybe)
Unfortunately, the exception branch would not show up in the graph visualisation. Somewhere, probably in the checkpointing conversation, we already talked about the possibility of labelling off-track nodes (i.e., something not following upstream/downstream), but I don't think there was any progress on that.

Finally, InvalidDataError seems to me more like a testing entity, not something to put into the main core module.

cc @jsmaupin

@freol35241 (Author)

Thanks for taking a look 👍

Yes, my opinion is that a fully generic and integrated mechanism for exception handling probably needs to be present in all nodes. With that said, your suggestion is less intrusive (though it makes building the pipeline a bit more verbose and, perhaps as a purely personal preference, slightly less intuitive), so if you feel my suggestion is over the top, feel free to close this PR.

@martindurant (Member)

I think I would prefer the minimal change - would you like to code it up along the lines of my suggestion? I think your example should be easy enough to make.

@freol35241 (Author) commented Oct 12, 2021

@martindurant
I have been thinking a bit about this. The reason I put together the initial implementation in this PR was to try to get rid of my current way of "handling exceptions gracefully":

from streamz import Stream

def problem(item):
    try:
        raise ValueError
    except Exception as exc:
        return exc

s = Stream()
mapper = s.map(problem)
exception_branch = mapper.filter(lambda x: isinstance(x, Exception)) # Handle exceptions
everything_good = mapper.filter(lambda x: not isinstance(x, Exception)) # Continue as usual

After trying out your suggestion, I realized I probably wouldn't use it since it messes with my mental model of the pipeline (i.e., needing to explicitly define an exception handler upstream of where an exception might happen). With my current approach I can also be very sure where an exception originates. So, I will choose not to implement your suggestion after all. Sorry for that.

On a slight side note: This could perhaps be solved by defining some kind of "composed" node which functions according to the above logic.
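For what it's worth, a rough sketch of what such a composed helper could look like, built on the filter pattern above (map_with_errors is just a made-up name, not a streamz API; the user function can then simply raise, with no try/except inside it):

from streamz import Stream

def map_with_errors(upstream, func):
    """Apply func and return a (good, errors) pair of branches."""
    def wrapped(item):
        try:
            return func(item)
        except Exception as exc:
            return exc          # exceptions become values on the stream

    mapped = upstream.map(wrapped)
    good = mapped.filter(lambda x: not isinstance(x, Exception))
    errors = mapped.filter(lambda x: isinstance(x, Exception))
    return good, errors

def problem(item):
    raise ValueError            # just raises; no internal try/except needed

s = Stream()
good, errors = map_with_errors(s, problem)
errors.sink(print)   # handle exceptions
good.sink(print)     # continue as usual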

@martindurant (Member)

I think there is a general discussion to be had here, and perhaps @jsmaupin , @roveo and @chinmaychandak might have thoughts.

At the moment, a node can have multiple downstreams, and they are all of equal importance. There are also some streamz nodes that take multiple upstreams (e.g., combine_latest), where one of those inputs may be special, and the logic for this is contained within the node.

We did discuss the possibility of multiple output stream types for a node, and the exception case would be one of these. We could annotate them with different line styles in the graph display. Do we have multiple output types? Should there be, for example, a where which passes each event to only one of two output streams?
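A rough, purely hypothetical sketch of what such a where node could look like (nothing like this exists in streamz today; the otherwise attribute is made up for illustration):

from streamz import Stream

@Stream.register_api()
class where(Stream):
    """Route each event to exactly one of two output streams."""
    def __init__(self, upstream, predicate, **kwargs):
        self.predicate = predicate
        self.otherwise = Stream()          # the "false" branch
        super().__init__(upstream, **kwargs)

    def update(self, x, who=None, metadata=None):
        if self.predicate(x):
            return self._emit(x, metadata=metadata)
        return self.otherwise._emit(x, metadata=metadata)

source = Stream()
evens = source.where(lambda x: x % 2 == 0)
evens.sink(lambda x: print("even:", x))
evens.otherwise.sink(lambda x: print("odd:", x))
source.emit(1)   # odd: 1
source.emit(2)   # even: 2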

The main thing I have against this PR as it stands is that the logic for handling an exception lives upstream, but the downstream node holds the exception output stream. On the other hand, it's nice to only have to edit _emit to make this available for all node types, rather than the many versions of update(). After this discussion, I may be persuaded, so long as we take out the InvalidData stuff.

@roveo (Contributor) commented Oct 12, 2021

My thoughts:

  • In the general case, handlers should only apply to the stuff that happens in a particular node's logic. This is currently tough to do, because calling the downstreams is done inside _emit, which is normally called inside .update, so there is no separation between computing the result and passing it downstream (calling the downstreams). There are multiple solutions to this (like having ._emit return a callable), but none I can think of are backwards-compatible. To achieve this, update should terminate before the downstreams are called.

  • I think API-wise, which error type is handled should be more explicit, e.g.

dicts = src.map(json.loads)
dicts.on_exception(ParsingError).sink_to_file(...)

instead of checking for error type in the handler function and re-raising if it's not ParsingError.

  • Handlers should be part of the stream graph and be first-class citizens. Let's say I'm getting JSON strings from a source and want to do something if there's an error parsing them:
dicts = src.map(json.loads)
invalid_batches = dicts.on_exception(ParsingError).partition(100, timeout=30)
invalid_batches.sink_to_s3(...)
invalid_batches.map(make_slack_message).sink_to_slack(...)

So I want to wait 30 seconds to see if there are any more bad records, write them to S3 for manual inspection, and notify myself. This looks like a normal stream, just a different branch.

  • Retrying always lives somewhere close to exception handling. We should think about how this would play with a 3rd-party library like tenacity, or maybe integrate it into streamz. Ideally, I want to do something like the following (see also the tenacity-based sketch after this list):
sink = result.sink_to_db(...)
sink.on_exception(InvalidDataError).sink_to_file("rejecteddata.log")
sink.on_exception(ConnectionError).retry(wait_seconds=10, max_retries=5)
  • An idea that I'm not sure is good or in scope, but I still want to share it: is there a use case for wrapping whole sections of a pipeline in a handler?
node.try().something().something_else().except(Exception).map(handle)
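On the tenacity point above: one can already get retries today by wrapping the user function itself before handing it to streamz. A minimal sketch (write_to_db is a made-up example; wait_fixed and stop_after_attempt are real tenacity helpers):

from tenacity import retry, stop_after_attempt, wait_fixed
from streamz import Stream

@retry(wait=wait_fixed(10), stop=stop_after_attempt(5), reraise=True)
def write_to_db(item):
    ...  # may raise ConnectionError; retried up to 5 attempts, 10 s apart

source = Stream()
source.sink(write_to_db)

A native integration would still be nicer, since tenacity's waits here are blocking time.sleep calls.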

@martindurant (Member)

Note that it's OK to break the API, since almost all streamz code lives right here, and we can edit classes as needed to achieve what we want. Of course, we'd rather not change the user-called methods (a small number).

node.try().something().something_else().except(Exception).map(handle)

That is an interesting API! Now that would be something tricky to show graphically, or indeed to implement :)

@freol35241 (Author)

Stumbled upon this again after not having used streamz for a while. For what it's worth, I currently use the following for exception handling; it gives me the flexibility I need, but I wouldn't be surprised if it has some nasty side effects that I have yet to discover...

from types import MethodType
from streamz import Stream, Sink

@Stream.register_api()
class on_exception(Sink):
    def __init__(self, upstream: Stream, exception=Exception, **kwargs):
        super().__init__(upstream, **kwargs)

        original_upstream_update_method = upstream.update

        def _(upstream_self, x, who=None, metadata=None):
            try:
                return original_upstream_update_method(x, who, metadata)
            except exception as exc:
                # Pass down the branch started with this stream instead
                self._emit((x, exc), metadata)

        # Bind to upstream
        upstream.update = MethodType(_, upstream)

    def update(self, x, who=None, metadata=None):
        pass  # NO-OP
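Hypothetical usage, continuing from the snippet above (this is just user code, not a streamz API); the on_exception branch receives (data, exception) tuples:

def problem(item):
    raise ValueError

s = Stream()
mapper = s.map(problem)

errors = mapper.on_exception(ValueError)   # branch of (data, exception) tuples
errors.sink(print)

s.emit(123)   # prints (123, ValueError()) instead of raising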

@martindurant (Member)

Hah, monkey-patch :)

I still think there is merit in making this kind of operation available for everyone. I'm afraid I still like my solution, which can be fleshed out a bit:

import time
from typing import Tuple, Type

from streamz import Stream


class ExceptionHandler(Stream):
    def __init__(self, upstream, on_error: Stream,
                 catch: Tuple[Type[Exception], ...] = (Exception,),
                 retries=0, retry_wait=1, **kwargs):
        self.on_error = on_error
        self.catch = catch
        self.retries = retries
        self.retry_wait = retry_wait
        super().__init__(upstream, **kwargs)

    def update(self, x, who=None, metadata=None):
        retries = self.retries
        while True:
            try:
                # pass the event to this node's downstreams; anything they
                # raise is caught below
                return super().emit(x, metadata=metadata)
            except self.catch as e:
                if retries > 0:
                    time.sleep(self.retry_wait)  # or async?
                    retries -= 1
                    continue
                # retries exhausted: hand (data, exception) to the error stream
                return self.on_error.update((x, e), self, metadata=metadata)

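Hypothetical usage of that sketch (ExceptionHandler is not part of streamz; this assumes the class exactly as written above):

from streamz import Stream

errors = Stream()
errors.sink(print)                      # receives (data, exception) tuples

source = Stream()
guarded = ExceptionHandler(source, on_error=errors,
                           catch=(ValueError,), retries=2, retry_wait=1)
guarded.map(int).sink(print)

source.emit("7")     # -> 7
source.emit("oops")  # retried, then (data, exception) routed to `errors`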