
Improve resilience of ser/des using Vulcan and Schema Registry #663

Open
gordon-rennie opened this issue Aug 20, 2021 · 3 comments

@gordon-rennie

Serialisation / deserialisation using vulcan codecs in fs2-kafka-vulcan requires communication with a Schema Registry REST API, implemented in Confluent's Java code, which fs2-kafka-vulcan wraps. Transient problems communicating with the registry's API (e.g. network problems, timeouts, unexpected server-side errors) bubble up from the schema registry client's Java code to fs2-kafka-vulcan and cause fatal exceptions unless deliberately handled in application code; fs2-kafka-vulcan makes no attempt to retry. This makes Avro ser/des a weak point for fs2-kafka application resiliency.

For example, I recently experienced an unhealthy Kafka cluster affecting both broker and schema registry. Interactions with the broker were robust and self-recovered from various errors, but a SocketTimeoutException communicating with the Schema Registry during serialisation blew up the application (stacktrace).

Is there appetite to include Schema Registry retries in fs2-kafka-vulcan? As I see it, ideally the library would have some default retry config - similar to the situation with the Kafka broker publisher/consumer. Having reviewed the Confluent Java code, there's no allowance for retries in the client or related classes, so this would have to be added in fs2-kafka-vulcan.
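To make the proposal concrete, here is a minimal sketch of the kind of generic retry combinator that could wrap the schema-registry-backed call. All names (`retryWithBackoff`, `maxRetries`, `baseDelay`) are illustrative assumptions, not part of the fs2-kafka-vulcan API:

```scala
import scala.concurrent.duration._
import cats.effect.Temporal
import cats.syntax.all._

// Hypothetical sketch: retry an effectful call (e.g. one that hits the
// Schema Registry REST API) with exponential backoff.
def retryWithBackoff[F[_]: Temporal, A](
    fa: F[A],
    maxRetries: Int = 3,
    baseDelay: FiniteDuration = 100.millis
): F[A] =
  fa.handleErrorWith { error =>
    if (maxRetries > 0)
      Temporal[F].sleep(baseDelay) *>
        retryWithBackoff(fa, maxRetries - 1, baseDelay * 2)
    else
      Temporal[F].raiseError(error)
  }
```

A real implementation would presumably also expose the retry policy through the library's settings types, mirroring how broker retry configuration is surfaced.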

@gordon-rennie
Author

gordon-rennie commented Aug 20, 2021

I have done some code analysis for serialisation:

  • fs2-kafka-vulcan interfaces with the Java code here:
    case Right(value) => F.pure(serializer.serialize(topic, value))
  • F.pure(serializer.serialize(topic, value)) doesn't seem quite right to me: serializer.serialize calls into the Java code, which is not pure and performs I/O, including checking a schema cache and (on a cache miss) making a schema registry API call. This seems like the call site that should be enhanced with retry logic.
  • serializer.serialize(...) invokes Java code in KafkaAvroSerializer which defers to the AbstractKafkaAvroSerializer here.
  • the API calls invoked from here (possibly, depending on the cache) can return IOException or RestClientException; these are caught and mapped here to either SerializationException (as observed in my linked stack trace) or an org.apache.kafka.common.KafkaException respectively. Information about the API response's statusCode: int and errorCode: int is preserved in the causing RestClientException and could be used to be selective about which cases to retry.
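The selectivity described above could be sketched as a predicate over the mapped exceptions. The choice of retriable status codes below is an assumption for illustration, not a policy established anywhere in the thread:

```scala
import org.apache.kafka.common.errors.SerializationException
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException

// Sketch of a predicate deciding whether a failed serialize call is
// worth retrying, by inspecting the cause chain described above.
def isRetriable(error: Throwable): Boolean =
  error match {
    case e: SerializationException =>
      e.getCause match {
        case _: java.net.SocketTimeoutException => true
        case rest: RestClientException =>
          // Assumption: 5xx responses from the registry are plausibly
          // transient; 4xx (e.g. an incompatible schema) are not.
          rest.getStatus >= 500
        case _ => false
      }
    case _ => false
  }
```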

@bplommer
Member

  • F.pure(serializer.serialize(topic, value)) doesn't seem quite right to me - serializer.serialize calls into the Java code, which is not pure and performs IO including checking a schema cache and (on cache miss) making a schema registry API call.

This is wrapped in a defer block, so it does end up getting suspended. But I agree it's not the clearest way to do it.
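As a small illustration of the point about suspension (function names here are hypothetical, not fs2-kafka-vulcan's internals):

```scala
import cats.effect.Sync

// Wrapping F.pure(sideEffect()) in Sync[F].defer delays evaluation of
// the side effect until the resulting F is run...
def suspended[F[_]: Sync](serialize: () => Array[Byte]): F[Array[Byte]] =
  Sync[F].defer(Sync[F].pure(serialize()))

// ...which is equivalent to, and arguably clearer as, a plain delay:
def clearer[F[_]: Sync](serialize: () => Array[Byte]): F[Array[Byte]] =
  Sync[F].delay(serialize())
```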

This seems like the call site that should be enhanced with retry logic.

I agree - a PR would be very welcome!

@gordon-rennie
Author

I agree - a PR would be very welcome!

👍 thanks for the feedback! I am happy to take a crack at this (I will also review deserialisation and include it in scope).

@gordon-rennie gordon-rennie changed the title Improve resilience of serialisation using Vulcan and Schema Registry Improve resilience of ser/des using Vulcan and Schema Registry Aug 20, 2021