Add retries for schema registry errors #683
def Default[F[_]](implicit F: Async[F]): SchemaRegistryClientRetry[F] =
  new SchemaRegistryClientRetry[F] {
    override def withRetry[A](action: F[A]): F[A] = {
      def retry(attempt: Int, action: F[A]): F[A] =
        action.handleErrorWith(
          err =>
            if ((attempt + 1) <= 10 && isRetriable(err))
              F.sleep((10 * Math.pow(2, attempt.toDouble)).millis) >> retry(attempt + 1, action)
            else
              F.raiseError(err)
        )

      retry(attempt = 1, action)
    }
The Java SchemaRegistryClient uses an underlying RestService with 60-second timeouts. Ten attempts is probably far too many if timeouts are occurring. I'm thinking of mitigating by reducing to ~3 attempts with a more aggressive backoff, 100 millis and 1000 millis.
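To put rough numbers on this, here is a minimal sketch using only the Scala standard library. It tallies the delays the quoted loop sleeps for and the worst-case wall-clock time if every attempt runs into the 60-second timeout. The object, value, and method names are illustrative and not part of the PR.

```scala
import scala.concurrent.duration._

object BackoffSchedule {
  // The quoted loop starts at attempt = 1 and retries while (attempt + 1) <= 10,
  // sleeping 10 * 2^attempt millis before each retry: 9 sleeps, 10 attempts.
  val currentDelays: List[FiniteDuration] =
    (1 to 9).toList.map(attempt => (10 * math.pow(2, attempt.toDouble)).toLong.millis)

  // The schedule suggested in the comment: 3 attempts, sleeping 100 ms then 1000 ms.
  val proposedDelays: List[FiniteDuration] = List(100.millis, 1000.millis)

  // Worst case: every attempt (one more than the number of sleeps) times out,
  // and every sleep in between is taken in full.
  def worstCase(delays: List[FiniteDuration], timeout: FiniteDuration): FiniteDuration =
    delays.foldLeft(timeout * (delays.size + 1).toLong)((acc, delay) => acc + delay)
}
```

With a 60-second timeout, BackoffSchedule.worstCase gives roughly 610 seconds for the current loop and roughly 181 seconds for the three-attempt schedule, so the timeouts dominate the backoff delays in both cases.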
Hi @gordon-rennie - so sorry for dropping the ball on this! I'll do my best to look at this properly over the weekend.
No worries at all, thanks!
This looks great! I think I'm happy with the approach - any thoughts on this @vlovgr?
Co-authored-by: Ben Plommer <[email protected]>
Agree with @bplommer -- this looks great! Thanks @gordon-rennie!
Closing as master has drifted significantly - the last time I reviewed the situation, new changes on master made this approach unattractive and it would be better implemented by having a pure interface wrapping the Java schema registry client, then using retry combinators on the call sites.
Fixes #663. Updated from abandoned PR #673: rebased to target 3.x and improved the retry interface. Restructured the PR description for readability.
Goal
Prevent transient schema registry API errors from blowing up Vulcan serialisation/deserialisation by adding retries with sensible default config.
Implementation
- trait SchemaRegistryClientRetry[F[_]], allowing a retry strategy to be specified on an arbitrary computation that uses the schema registry, F[A].
- […] (see "What Throwables should we retry?" below).
- […] Sync to be upgraded to Async on some AvroSettings constructors; therefore this change is breaking and targets 3.x.
Retry Interface Design
Feedback is very welcome here. I took care in designing this, drawing significant influence from discussions on issue #623 about commit retry.
Proposed Interface
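The interface definition itself is not reproduced on this page. Going by the diff excerpt in the review thread (SchemaRegistryClientRetry[F] with withRetry[A](action: F[A]): F[A]), its shape is presumably along the lines of the sketch below. Thunk and upTo are hypothetical stand-ins added here so the block compiles with the standard library alone; the PR itself works with a cats-effect F[_].

```scala
import scala.util.{Failure, Success, Try}

object RetryInterfaceSketch {
  // Shape taken from the diff excerpt: the method carries its own type parameter A,
  // so one strategy can wrap computations of any result type.
  trait SchemaRegistryClientRetry[F[_]] {
    def withRetry[A](action: F[A]): F[A]
  }

  // Assumption: a lazy thunk stands in for a real effect type such as IO.
  type Thunk[A] = () => A

  // Hypothetical strategy: re-run the thunk until it succeeds or maxAttempts is reached.
  def upTo(maxAttempts: Int): SchemaRegistryClientRetry[Thunk] =
    new SchemaRegistryClientRetry[Thunk] {
      def withRetry[A](action: Thunk[A]): Thunk[A] = () => {
        @annotation.tailrec
        def loop(attempt: Int): A =
          Try(action()) match {
            case Success(value)                      => value
            case Failure(_) if attempt < maxAttempts => loop(attempt + 1)
            case Failure(error)                      => throw error
          }
        loop(1)
      }
    }
}
```

The same upTo(3) value can wrap a Thunk[String] and a Thunk[Int] alike, which is the flexibility the type parameter on withRetry buys.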
Benefits: very flexible and composable; users can bring their own F to perform logging etc. Easy interop with e.g. the cats-retry library. Robust against future implementation changes in Vulcan or the underlying Java libraries; the retry can be applied to any computation thanks to the type param.
Cons: required an intermediate trait to get the type parameter [A] because an anonymous function can't have a type param, e.g. we can't do def schemaRegistryClientRetry: F[A] => F[A].
To try to get rid of the need for the type param and the new trait, I also considered a variation of this proposal where the three Vulcan call sites that use the schema registry (directly or indirectly) could each take one non-type-parameterised anonymous function as a retry strategy (e.g. def schemaRegistryGetSchemaRetry: F[ParsedSchema] => F[ParsedSchema]), but this would leave the retry config extremely coupled to the implementation details of the existing ser/des.
Alternative 1
(this was my first iteration)
Benefits: simple interface with no new traits.
Cons: inflexible; no provision to add logging etc. No cats-retry interop.
Alternative 2
Implement tagless final traits as facades over the relevant Java schema registry classes, allow retries to be specified on their implementations.
Cons: this requires huge change.
What Throwables should we retry?
Some code analysis was required to determine where the schema registry is used and what exceptions it can throw. I wanted to avoid retrying on every error, which would include deterministic failures.
- […] IOException or RestClientException
- […] IOException and RestClientException (if non-client HTTP error code) to SerializationException
- […] SerializationException
TL;DR this new helper function catches the relevant exceptions:
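The helper's body is not preserved on this page. The sketch below is one reading of the list above, not the PR's actual code: it treats IOException as retriable, a REST error as retriable only when its status is outside the 4xx range, and a serialization error as retriable only when it wraps one of those two. StubRestClientException and StubSerializationException are hypothetical stand-ins for the Confluent and Kafka exception classes, used so the block compiles without those jars; the name isRetriable comes from the diff excerpt.

```scala
import java.io.IOException

object RetriableErrors {
  // Assumption: stand-in for the schema registry client's REST exception, carrying an HTTP status.
  final class StubRestClientException(message: String, val status: Int)
      extends Exception(message)

  // Assumption: stand-in for the Kafka serialization exception that wraps an underlying cause.
  final class StubSerializationException(message: String, cause: Throwable)
      extends RuntimeException(message, cause)

  // 4xx responses are client errors: deterministic, so not worth retrying.
  private def isClientError(status: Int): Boolean = status >= 400 && status < 500

  def isRetriable(error: Throwable): Boolean = error match {
    case _: IOException                => true
    case e: StubRestClientException    => !isClientError(e.status)
    case e: StubSerializationException =>
      // Only retry a wrapped error when the underlying cause is itself transient.
      e.getCause match {
        case _: IOException             => true
        case c: StubRestClientException => !isClientError(c.status)
        case _                          => false
      }
    case _ => false
  }
}
```

Keeping the predicate separate from any retry loop is what lets it be reused as the error filter of another retry library.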
I made the function public so users of cats-retry etc. have it available to them for easy re-use.
TODO
Once the interface is agreed, I need to add unit tests and docs.