Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add retries for schema registry errors #683

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ package fs2.kafka.vulcan

import _root_.vulcan.Codec
import cats.effect.Sync
import cats.implicits._
import cats.syntax.functor._
import fs2.kafka.{Deserializer, RecordDeserializer}
import io.confluent.kafka.schemaregistry.avro.AvroSchema

import java.nio.ByteBuffer

final class AvroDeserializer[A] private[vulcan] (
Expand All @@ -25,23 +26,26 @@ final class AvroDeserializer[A] private[vulcan] (
settings.createAvroDeserializer(_).map {
case (deserializer, schemaRegistryClient) =>
Deserializer.instance { (topic, _, bytes) =>
F.defer {
val writerSchemaId =
ByteBuffer.wrap(bytes).getInt(1) // skip magic byte
settings.schemaRegistryClientRetry.withRetry(
F.defer {
val writerSchemaId =
ByteBuffer.wrap(bytes).getInt(1) // skip magic byte

val writerSchema = {
val schema = schemaRegistryClient.getSchemaById(writerSchemaId)
if (schema.isInstanceOf[AvroSchema])
schema.asInstanceOf[AvroSchema].rawSchema()
else
null
}
val writerSchema = {
val schema = schemaRegistryClient.getSchemaById(writerSchemaId)
if (schema.isInstanceOf[AvroSchema])
schema.asInstanceOf[AvroSchema].rawSchema()
else
null
}

codec.decode(deserializer.deserialize(topic, bytes, schema), writerSchema) match {
case Right(a) => F.pure(a)
case Left(error) => F.raiseError(error.throwable)
codec
.decode(deserializer.deserialize(topic, bytes, schema), writerSchema) match {
case Right(a) => F.pure(a)
case Left(error) => F.raiseError(error.throwable)
}
}
}
)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ package fs2.kafka.vulcan

import _root_.vulcan.Codec
import cats.effect.Sync
import cats.implicits._
import cats.syntax.all._
import fs2.kafka.{RecordSerializer, Serializer}

final class AvroSerializer[A] private[vulcan] (
Expand All @@ -23,8 +23,10 @@ final class AvroSerializer[A] private[vulcan] (
Serializer.instance { (topic, _, a) =>
F.defer {
codec.encode(a) match {
case Right(value) => F.pure(serializer.serialize(topic, value))
case Left(error) => F.raiseError(error.throwable)
case Right(value) =>
settings.schemaRegistryClientRetry
.withRetry(F.delay(serializer.serialize(topic, value)))
case Left(error) => F.raiseError(error.throwable)
}
}
}
Expand Down
35 changes: 29 additions & 6 deletions modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@

package fs2.kafka.vulcan

import cats.effect.Sync
import cats.effect.Async
import cats.implicits._

import scala.jdk.CollectionConverters._
import fs2.kafka.internal.syntax._

Expand Down Expand Up @@ -79,6 +80,22 @@ sealed abstract class AvroSettings[F[_]] {
*/
def withProperties(properties: Map[String, String]): AvroSettings[F]

/**
* TODO
*/
def schemaRegistryClientRetry: SchemaRegistryClientRetry[F]

/**
* TODO
*
* Creates a new `AvroSettings` instance with the specified
* retry strategy when recoverable Schema Registry client
* errors are experienced during serialization/deserialization.
*/
def withSchemaRegistryClientRetry(
schemaRegistryClientRetry: SchemaRegistryClientRetry[F]
): AvroSettings[F]

/**
* Creates a new `KafkaAvroDeserializer` using the settings
* contained within this [[AvroSettings]] instance, and the
Expand Down Expand Up @@ -126,8 +143,9 @@ object AvroSettings {
override val properties: Map[String, String],
// format: off
val createAvroDeserializerWith: (F[SchemaRegistryClient], Boolean, Map[String, String]) => F[(KafkaAvroDeserializer, SchemaRegistryClient)],
val createAvroSerializerWith: (F[SchemaRegistryClient], Boolean, Map[String, String]) => F[(KafkaAvroSerializer, SchemaRegistryClient)]
val createAvroSerializerWith: (F[SchemaRegistryClient], Boolean, Map[String, String]) => F[(KafkaAvroSerializer, SchemaRegistryClient)],
// format: on
override val schemaRegistryClientRetry: SchemaRegistryClientRetry[F]
) extends AvroSettings[F] {
override def withAutoRegisterSchemas(autoRegisterSchemas: Boolean): AvroSettings[F] =
withProperty("auto.register.schemas", autoRegisterSchemas.toString)
Expand All @@ -151,6 +169,10 @@ object AvroSettings {
override def withProperties(properties: Map[String, String]): AvroSettings[F] =
copy(properties = this.properties ++ properties)

override def withSchemaRegistryClientRetry(
schemaRegistryClientRetry: SchemaRegistryClientRetry[F]
): AvroSettings[F] = copy(schemaRegistryClientRetry = schemaRegistryClientRetry)

override def createAvroDeserializer(
isKey: Boolean
): F[(KafkaAvroDeserializer, SchemaRegistryClient)] =
Expand Down Expand Up @@ -184,7 +206,7 @@ object AvroSettings {

private[this] def create[F[_]](
schemaRegistryClient: F[SchemaRegistryClient]
)(implicit F: Sync[F]): AvroSettings[F] =
)(implicit F: Async[F]): AvroSettings[F] =
bplommer marked this conversation as resolved.
Show resolved Hide resolved
AvroSettingsImpl(
schemaRegistryClient = schemaRegistryClient,
properties = Map.empty,
Expand All @@ -203,16 +225,17 @@ object AvroSettings {
serializer.configure(withDefaults(properties), isKey)
(serializer, schemaRegistryClient)
}
}
},
schemaRegistryClientRetry = SchemaRegistryClientRetry.Default[F]
)

def apply[F[_]](
schemaRegistryClientSettings: SchemaRegistryClientSettings[F]
)(implicit F: Sync[F]): AvroSettings[F] =
)(implicit F: Async[F]): AvroSettings[F] =
create(schemaRegistryClientSettings.createSchemaRegistryClient)

def apply[F[_]](
schemaRegistryClient: SchemaRegistryClient
)(implicit F: Sync[F]): AvroSettings[F] =
)(implicit F: Async[F]): AvroSettings[F] =
create(F.pure(schemaRegistryClient))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package fs2.kafka.vulcan

import cats.effect.Async
import cats.syntax.applicativeError._
import cats.syntax.flatMap._
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException
import org.apache.kafka.common.errors.SerializationException

import java.io.IOException

import scala.concurrent.duration._

/**
* [[SchemaRegistryClientRetry]] describes how to recover from
* exceptions raised while communicating with a schema registry.
* See [[SchemaRegistryClientRetry#Default]] for the default
* recovery strategy. If you do not wish to recover from any
* exceptions, you can use [[SchemaRegistryClientRetry#None]].<br>
* <br>
* To create a new [[SchemaRegistryClientRetry]], simply create a
* new instance and implement the [[withRetry]] function with the
* wanted recovery strategy. You can use [[isRetriable]] to
* identify the exceptions thrown by schema registry failures.
*/
trait SchemaRegistryClientRetry[F[_]] {

def withRetry[A](action: F[A]): F[A]
}

object SchemaRegistryClientRetry {
def isRetriable(error: Throwable): Boolean = {
case _: SerializationException => true
case _: IOException => true
case apiError: RestClientException => apiError.getErrorCode >= 500
case _ => false
}

def Default[F[_]](implicit F: Async[F]): SchemaRegistryClientRetry[F] =
new SchemaRegistryClientRetry[F] {
override def withRetry[A](action: F[A]): F[A] = {
def retry(attempt: Int, action: F[A]): F[A] =
action.handleErrorWith(
err =>
if ((attempt + 1) <= 10 && isRetriable(err))
F.sleep((10 * Math.pow(2, attempt.toDouble)).millis) >> retry(attempt + 1, action)
else
F.raiseError(err)
)

retry(attempt = 1, action)
}
Comment on lines +38 to +51
Copy link
Author

@gordon-rennie gordon-rennie Sep 5, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Java SchemaRegistryClient uses an underlying RestService with 60 second timeouts. Ten attempts is probably far too many if timeouts are occurring. I'm thinking of mitigating by reducing to ~3 attempts with a more aggressive backoff, 100 millis and 1000 millis.


override def toString: String = "RetryPolicy.Default"
}

def None[F[_]]: SchemaRegistryClientRetry[F] =
new SchemaRegistryClientRetry[F] {
override def withRetry[A](action: F[A]): F[A] = action

override def toString: String = "RetryPolicy.None"
}
}