Add retries for schema registry errors #683

Closed

@gordon-rennie gordon-rennie commented Sep 5, 2021

Fixes #663. Updated from abandoned PR #673: rebased to target 3.x, with an improved retry interface. The PR description has also been restructured for readability.

Goal

Prevent transient schema registry API errors from blowing up Vulcan serialisation/deserialisation by adding retries with sensible default config.

Implementation

  • add a new trait SchemaRegistryClientRetry[F[_]] that lets users specify a retry strategy for an arbitrary computation F[A] that uses the schema registry.
  • add a default implementation that retries on schema registry exceptions identified in the underlying Java code (see "What Throwables should we retry?" below).
    • the default strategy backs off between retries, which requires upgrading Sync to Async on some AvroSettings constructors; this change is therefore breaking and targets 3.x

Retry Interface Design

Feedback is very welcome here. I took care in designing this, drawing significant influence from discussions on issue #623 about commit retry.

Proposed Interface

trait SchemaRegistryClientRetry[F[_]] {

  def withRetry[A](action: F[A]): F[A]
}

// AvroSettings.scala
sealed abstract class AvroSettings[F[_]] {
  ...
  def schemaRegistryClientRetry: SchemaRegistryClientRetry[F]
}

Benefits: very flexible and composable; users can bring their own F to perform logging etc. Easy interop with e.g. cats-retry library. Robust against future implementation changes in Vulcan or the underlying Java libraries; the retry can be applied to any computation thanks to the type param.
Cons: requires an intermediate trait to carry the type parameter [A], because an anonymous function can't have a type param; e.g. we can't write def schemaRegistryClientRetry: F[A] => F[A].

To try to remove the need for the type param and the new trait, I also considered a variation of this proposal in which each of the three Vulcan call sites that use the schema registry (directly or indirectly) takes one non-type-parameterised anonymous function as its retry strategy (e.g. def schemaRegistryGetSchemaRetry: F[ParsedSchema] => F[ParsedSchema]). However, this would leave the retry config tightly coupled to the implementation details of the existing ser/des.
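As an illustration of the flexibility claimed above, here is a minimal sketch of a user-supplied strategy that logs before each retry. It assumes cats-effect 3 is on the classpath; LoggingRetry, maxRetries, delay and log are hypothetical names invented for this example and are not part of the PR.

```scala
import cats.effect.Temporal
import cats.syntax.all._
import scala.concurrent.duration.FiniteDuration

// The trait as proposed in this PR, repeated so the sketch is self-contained.
trait SchemaRegistryClientRetry[F[_]] {
  def withRetry[A](action: F[A]): F[A]
}

// Hypothetical user-defined strategy (not part of the PR).
object LoggingRetry {
  // Retries any failure up to `maxRetries` times with a fixed `delay`,
  // running the caller's `log` effect before each retry.
  def apply[F[_]](maxRetries: Int, delay: FiniteDuration)(log: Throwable => F[Unit])(
    implicit F: Temporal[F]
  ): SchemaRegistryClientRetry[F] =
    new SchemaRegistryClientRetry[F] {
      def withRetry[A](action: F[A]): F[A] = {
        def loop(retriesLeft: Int): F[A] =
          action.handleErrorWith { err =>
            if (retriesLeft > 0) log(err) >> F.sleep(delay) >> loop(retriesLeft - 1)
            else F.raiseError[A](err)
          }
        loop(maxRetries)
      }
    }
}
```

Because withRetry is polymorphic in A, the same instance can wrap any of the call sites without knowing their result types. A real strategy would presumably also filter on the error type rather than retrying every failure.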

Alternative 1

(this was my first iteration)

// AvroSettings.scala
sealed abstract class AvroSettings[F[_]] {
  ...
  def schemaRegistryRetryMaxAttempts: Int
  def schemaRegistryRetryBackoffStrategy: Int => FiniteDuration
}

Benefits: simple interface with no new traits.
Cons: inflexible; no provision to add logging etc. No cats-retry interop.
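For comparison, the two settings of Alternative 1 can be expressed through the proposed trait, which suggests the trait is at least as expressive. The following is only a sketch, assuming cats-effect 3; FromSimpleSettings is a hypothetical adapter name, and the attempt numbering (starting at 1) is an assumption rather than something this PR specifies.

```scala
import cats.effect.Temporal
import cats.syntax.all._
import scala.concurrent.duration.FiniteDuration

// The trait as proposed in this PR, repeated so the sketch is self-contained.
trait SchemaRegistryClientRetry[F[_]] {
  def withRetry[A](action: F[A]): F[A]
}

// Hypothetical adapter (not part of the PR): builds a retry strategy from
// the two values that Alternative 1 would have put on AvroSettings.
object FromSimpleSettings {
  def apply[F[_]](maxAttempts: Int, backoff: Int => FiniteDuration)(
    implicit F: Temporal[F]
  ): SchemaRegistryClientRetry[F] =
    new SchemaRegistryClientRetry[F] {
      def withRetry[A](action: F[A]): F[A] = {
        // `attempt` is the 1-based number of the call that has just failed.
        // Note: this sketch retries every error, with no filtering.
        def go(attempt: Int): F[A] =
          action.handleErrorWith { err =>
            if (attempt < maxAttempts) F.sleep(backoff(attempt)) >> go(attempt + 1)
            else F.raiseError[A](err)
          }
        go(1)
      }
    }
}
```

The reverse direction does not hold: a pair of Int and Int => FiniteDuration values has no place to attach logging or a custom error filter.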

Alternative 2

Implement tagless final traits as facades over the relevant Java schema registry classes, allow retries to be specified on their implementations.

Cons: this requires a huge change.

What Throwables should we retry?

Some code analysis was required to determine where the schema registry is used and what exceptions it can throw. I wanted to avoid retrying on every error, which would include deterministic failures.

  • in general, schema registry client operations can throw IOException or RestClientException
  • for serialization, the relevant Java KafkaAvroSerializer error handling maps both IOException and RestClientException (if the HTTP error code is not a client error) to SerializationException
  • for deserialization, the Vulcan code makes a call using the schema registry client itself, and further calls are made by the underlying Java code (1, 2, 3); errors from those calls are again mapped to SerializationException

TL;DR this new helper function catches the relevant exceptions:

  val isRetriable: Throwable => Boolean = {
    case _: SerializationException     => true
    case _: IOException                => true
    case apiError: RestClientException => apiError.getErrorCode >= 500
    case _                             => false
  }

I made the function public so users of cats-retry etc. have it available to them for easy re-use.

TODO

Once the interface is agreed, I need to add unit tests and docs.

Comment on lines +38 to +51
def Default[F[_]](implicit F: Async[F]): SchemaRegistryClientRetry[F] =
  new SchemaRegistryClientRetry[F] {
    override def withRetry[A](action: F[A]): F[A] = {
      def retry(attempt: Int, action: F[A]): F[A] =
        action.handleErrorWith(
          err =>
            if ((attempt + 1) <= 10 && isRetriable(err))
              F.sleep((10 * Math.pow(2, attempt.toDouble)).millis) >> retry(attempt + 1, action)
            else
              F.raiseError(err)
        )

      retry(attempt = 1, action)
    }
Author

@gordon-rennie gordon-rennie Sep 5, 2021

The Java SchemaRegistryClient uses an underlying RestService with 60-second timeouts, so ten attempts is probably far too many if timeouts are occurring. I'm thinking of mitigating this by reducing to ~3 attempts with a more aggressive backoff, e.g. 100 millis and then 1000 millis.
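To put rough numbers on this concern, the schedule produced by the loop in the diff can be tabulated with the standard library alone. This is a back-of-the-envelope sketch; DefaultRetryBudget is a hypothetical name, and the worst case assumes that every call blocks for the full 60-second timeout.

```scala
import scala.concurrent.duration._

// Hypothetical helper (not part of the PR) that tabulates the default schedule.
object DefaultRetryBudget {
  // The loop in the diff starts at attempt = 1 and retries while (attempt + 1) <= 10,
  // so a sleep follows failed attempts 1 through 9, giving 10 calls in total.
  val delays: List[FiniteDuration] =
    (1 to 9).toList.map(attempt => (10 * math.pow(2, attempt.toDouble)).millis)

  // Sum of all sleeps between calls: 20 ms + 40 ms + ... + 5120 ms.
  val totalBackoff: FiniteDuration = delays.foldLeft(Duration.Zero)(_ + _)

  // Assumption: each of the 10 calls blocks for the full 60-second client timeout.
  val worstCase: FiniteDuration = 60.seconds * 10 + totalBackoff
}
```

Under these assumptions the sleeps add up to roughly 10 seconds, while the timeouts themselves contribute 10 minutes, so the attempt count dominates the worst case far more than the backoff does.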

@gordon-rennie
Author

Hi @bplommer, we spoke about this issue in #663 - I'm curious to know how you feel the PR turned out? It sprawled a little due to adding configuration of the retries -- figuring out what that interface should look like has easily been the most time-consuming part.

@bplommer
Member

bplommer commented Oct 7, 2021

Hi @gordon-rennie - so sorry for dropping the ball on this! I'll do my best to look at this properly over the weekend.

@gordon-rennie
Author

Hi @gordon-rennie - so sorry for dropping the ball on this! I'll do my best to look at this properly over the weekend.

No worries at all, thanks!

@bplommer
Member

This looks great! I think I'm happy with the approach - any thoughts on this @vlovgr?

gordon-rennie and others added 4 commits October 18, 2021 17:25
Co-authored-by: Ben Plommer <[email protected]>
Co-authored-by: Ben Plommer <[email protected]>
Co-authored-by: Ben Plommer <[email protected]>
Co-authored-by: Ben Plommer <[email protected]>
@vlovgr
Contributor

vlovgr commented Oct 19, 2021

Agree with @bplommer -- this looks great! Thanks @gordon-rennie!

@gordon-rennie
Author

Closing as master has drifted significantly. The last time I reviewed the situation, new changes on master had made this approach unattractive; it would be better implemented by adding a pure interface that wraps the Java schema registry client and then using retry combinators at the call sites.

Development

Successfully merging this pull request may close these issues.

Improve resilience of ser/des using Vulcan and Schema Registry
3 participants