Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add retries for schema registry errors #673

Closed

Conversation

gordon-rennie
Copy link

Initial PR for feedback - aims to fix #663

Goal

Prevent transient schema registry API errors from blowing up Vulcan ser/des by adding retries with sensible default config.

Implementation

  • some code analysis was require to determine when to retry:
    • schema registry client operations can throw IOException or RestClientException
    • for serialization, the relevant Java KafkaAvroSerializer error handling maps both IOException and RestClientException (if non-client error code) to SerializationException
    • for deserialization, the vulcan code makes a call using the schema registry client itself and further calls are made by the underlying Java code (1, 2, 3) which are mapped again to SerializationException
    • tl;dr: retry on IOException, RestClientException, SerializationException
  • added retry logic for the above exceptions
  • added config for number of retries and backoff duration
    • default config is 5 retries with exponential backoff starting at 50 millis.

Notes

  • Adding backoff to retries requires Temporal[F], which requires upgrading from F: Sync to F: Async. Presumably this is considered a breaking change and so this feature needs to start targetting 3.x instead? I'm new to library code, just let me know 😄
  • There is no provision for logging in fs2-kafka-vulcan (there's only a private, sealed interface in fs2-kafka core) so I have not added any
  • Retry logic is hand-crafted rather than using a dependency like cats-retry
  • I was influenced by discussions in issue Rework or remove CommitRecovery #623 not to create any new retry interfaces and not to use Jitter[F], which we seem to want to move away from
    • from the discussions in this issue, I think we would prefer a more generic interface like def schemaRegistryRetryPolicy[A]: (F[A], Throwable) => F[A] - this would give good flexibility for clients and allow interop with libraries like cats-retry. I really like this idea. Unfortunately I can't see a way to make this work: deserialization will return an arbitrary F[A] and A is basically a wildcard type that needs to accommodate any arbitrary vulcan.Codec[A]. Wildcard types aren't supported for anonymous functions, e.g. I can't do:
  private[this] final case class AvroSettingsImpl[F[_]](
    ...
    schemaRegistryRetryPolicy: (F[*], SerializationException) => F[*]

feedback on the interface very welcome!

TODO

  • agree/finalize interface changes
  • unit test

@gordon-rennie gordon-rennie changed the base branch from series/2.x to series/3.x August 31, 2021 20:55
@gordon-rennie
Copy link
Author

gordon-rennie commented Aug 31, 2021

(rebasing from 2.x to 3.x had issues and conflicts, I just applied the diff as a patch onto 3.x instead before force-pushing but GitHub hasn't liked that. I will re-raise the PR targetting 3.x)

@gordon-rennie gordon-rennie deleted the add-schema-registry-retry branch September 4, 2021 22:46
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

Successfully merging this pull request may close these issues.

Improve resilience of ser/des using Vulcan amd Schema Registry
1 participant