diff --git a/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroDeserializer.scala b/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroDeserializer.scala
index 286950035..969ed5b7d 100644
--- a/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroDeserializer.scala
+++ b/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroDeserializer.scala
@@ -8,9 +8,10 @@ package fs2.kafka.vulcan
import _root_.vulcan.Codec
import cats.effect.Sync
-import cats.implicits._
+import cats.syntax.functor._
import fs2.kafka.{Deserializer, RecordDeserializer}
import io.confluent.kafka.schemaregistry.avro.AvroSchema
+
import java.nio.ByteBuffer
final class AvroDeserializer[A] private[vulcan] (
@@ -25,23 +26,26 @@ final class AvroDeserializer[A] private[vulcan] (
settings.createAvroDeserializer(_).map {
case (deserializer, schemaRegistryClient) =>
Deserializer.instance { (topic, _, bytes) =>
- F.defer {
- val writerSchemaId =
- ByteBuffer.wrap(bytes).getInt(1) // skip magic byte
+ settings.schemaRegistryClientRetry.withRetry(
+ F.defer {
+ val writerSchemaId =
+ ByteBuffer.wrap(bytes).getInt(1) // skip magic byte
- val writerSchema = {
- val schema = schemaRegistryClient.getSchemaById(writerSchemaId)
- if (schema.isInstanceOf[AvroSchema])
- schema.asInstanceOf[AvroSchema].rawSchema()
- else
- null
- }
+ val writerSchema = {
+ val schema = schemaRegistryClient.getSchemaById(writerSchemaId)
+ if (schema.isInstanceOf[AvroSchema])
+ schema.asInstanceOf[AvroSchema].rawSchema()
+ else
+ null
+ }
- codec.decode(deserializer.deserialize(topic, bytes, schema), writerSchema) match {
- case Right(a) => F.pure(a)
- case Left(error) => F.raiseError(error.throwable)
+ codec
+ .decode(deserializer.deserialize(topic, bytes, schema), writerSchema) match {
+ case Right(a) => F.pure(a)
+ case Left(error) => F.raiseError(error.throwable)
+ }
}
- }
+ )
}
}
diff --git a/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroSerializer.scala b/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroSerializer.scala
index a42440341..8e93d35c8 100644
--- a/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroSerializer.scala
+++ b/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroSerializer.scala
@@ -8,7 +8,7 @@ package fs2.kafka.vulcan
import _root_.vulcan.Codec
import cats.effect.Sync
-import cats.implicits._
+import cats.syntax.all._
import fs2.kafka.{RecordSerializer, Serializer}
final class AvroSerializer[A] private[vulcan] (
@@ -23,8 +23,10 @@ final class AvroSerializer[A] private[vulcan] (
Serializer.instance { (topic, _, a) =>
F.defer {
codec.encode(a) match {
- case Right(value) => F.pure(serializer.serialize(topic, value))
- case Left(error) => F.raiseError(error.throwable)
+ case Right(value) =>
+ settings.schemaRegistryClientRetry
+ .withRetry(F.delay(serializer.serialize(topic, value)))
+ case Left(error) => F.raiseError(error.throwable)
}
}
}
diff --git a/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroSettings.scala b/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroSettings.scala
index 95904d1b3..1358c84ce 100644
--- a/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroSettings.scala
+++ b/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroSettings.scala
@@ -6,8 +6,9 @@
package fs2.kafka.vulcan
-import cats.effect.Sync
+import cats.effect.Async
import cats.implicits._
+
import scala.jdk.CollectionConverters._
import fs2.kafka.internal.syntax._
@@ -79,6 +80,22 @@ sealed abstract class AvroSettings[F[_]] {
*/
def withProperties(properties: Map[String, String]): AvroSettings[F]
+ /**
+ * TODO
+ */
+ def schemaRegistryClientRetry: SchemaRegistryClientRetry[F]
+
+ /**
+ * TODO
+ *
+ * Creates a new `AvroSettings` instance with the specified
+ * retry strategy when recoverable Schema Registry client
+ * errors are experienced during serialization/deserialization.
+ */
+ def withSchemaRegistryClientRetry(
+ schemaRegistryClientRetry: SchemaRegistryClientRetry[F]
+ ): AvroSettings[F]
+
/**
* Creates a new `KafkaAvroDeserializer` using the settings
* contained within this [[AvroSettings]] instance, and the
@@ -126,8 +143,9 @@ object AvroSettings {
override val properties: Map[String, String],
// format: off
val createAvroDeserializerWith: (F[SchemaRegistryClient], Boolean, Map[String, String]) => F[(KafkaAvroDeserializer, SchemaRegistryClient)],
- val createAvroSerializerWith: (F[SchemaRegistryClient], Boolean, Map[String, String]) => F[(KafkaAvroSerializer, SchemaRegistryClient)]
+ val createAvroSerializerWith: (F[SchemaRegistryClient], Boolean, Map[String, String]) => F[(KafkaAvroSerializer, SchemaRegistryClient)],
// format: on
+ override val schemaRegistryClientRetry: SchemaRegistryClientRetry[F]
) extends AvroSettings[F] {
override def withAutoRegisterSchemas(autoRegisterSchemas: Boolean): AvroSettings[F] =
withProperty("auto.register.schemas", autoRegisterSchemas.toString)
@@ -151,6 +169,10 @@ object AvroSettings {
override def withProperties(properties: Map[String, String]): AvroSettings[F] =
copy(properties = this.properties ++ properties)
+ override def withSchemaRegistryClientRetry(
+ schemaRegistryClientRetry: SchemaRegistryClientRetry[F]
+ ): AvroSettings[F] = copy(schemaRegistryClientRetry = schemaRegistryClientRetry)
+
override def createAvroDeserializer(
isKey: Boolean
): F[(KafkaAvroDeserializer, SchemaRegistryClient)] =
@@ -184,7 +206,7 @@ object AvroSettings {
private[this] def create[F[_]](
schemaRegistryClient: F[SchemaRegistryClient]
- )(implicit F: Sync[F]): AvroSettings[F] =
+ )(implicit F: Async[F]): AvroSettings[F] =
AvroSettingsImpl(
schemaRegistryClient = schemaRegistryClient,
properties = Map.empty,
@@ -203,16 +225,17 @@ object AvroSettings {
serializer.configure(withDefaults(properties), isKey)
(serializer, schemaRegistryClient)
}
- }
+ },
+ schemaRegistryClientRetry = SchemaRegistryClientRetry.Default[F]
)
def apply[F[_]](
schemaRegistryClientSettings: SchemaRegistryClientSettings[F]
- )(implicit F: Sync[F]): AvroSettings[F] =
+ )(implicit F: Async[F]): AvroSettings[F] =
create(schemaRegistryClientSettings.createSchemaRegistryClient)
def apply[F[_]](
schemaRegistryClient: SchemaRegistryClient
- )(implicit F: Sync[F]): AvroSettings[F] =
+ )(implicit F: Async[F]): AvroSettings[F] =
create(F.pure(schemaRegistryClient))
}
diff --git a/modules/vulcan/src/main/scala/fs2/kafka/vulcan/SchemaRegistryClientRetry.scala b/modules/vulcan/src/main/scala/fs2/kafka/vulcan/SchemaRegistryClientRetry.scala
new file mode 100644
index 000000000..5cda842ff
--- /dev/null
+++ b/modules/vulcan/src/main/scala/fs2/kafka/vulcan/SchemaRegistryClientRetry.scala
@@ -0,0 +1,62 @@
+package fs2.kafka.vulcan
+
+import cats.effect.Async
+import cats.syntax.applicativeError._
+import cats.syntax.flatMap._
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException
+import org.apache.kafka.common.errors.SerializationException
+
+import java.io.IOException
+
+import scala.concurrent.duration._
+
+/**
+ * [[SchemaRegistryClientRetry]] describes how to recover from
+ * exceptions raised while communicating with a schema registry.
+ * See [[SchemaRegistryClientRetry#Default]] for the default
+ * recovery strategy. If you do not wish to recover from any
+ * exceptions, you can use [[SchemaRegistryClientRetry#None]].
+ *
+ * To create a new [[SchemaRegistryClientRetry]], simply create a
+ * new instance and implement the [[withRetry]] function with the
+ * wanted recovery strategy. You can use [[isRetriable]] to
+ * identify the exceptions thrown by schema registry failures.
+ */
+trait SchemaRegistryClientRetry[F[_]] {
+
+ def withRetry[A](action: F[A]): F[A]
+}
+
+object SchemaRegistryClientRetry {
+ def isRetriable(error: Throwable): Boolean = {
+ case _: SerializationException => true
+ case _: IOException => true
+ case apiError: RestClientException => apiError.getErrorCode >= 500
+ case _ => false
+ }
+
+ def Default[F[_]](implicit F: Async[F]): SchemaRegistryClientRetry[F] =
+ new SchemaRegistryClientRetry[F] {
+ override def withRetry[A](action: F[A]): F[A] = {
+ def retry(attempt: Int, action: F[A]): F[A] =
+ action.handleErrorWith(
+ err =>
+ if ((attempt + 1) <= 10 && isRetriable(err))
+ F.sleep((10 * Math.pow(2, attempt.toDouble)).millis) >> retry(attempt + 1, action)
+ else
+ F.raiseError(err)
+ )
+
+ retry(attempt = 1, action)
+ }
+
+ override def toString: String = "RetryPolicy.Default"
+ }
+
+ def None[F[_]]: SchemaRegistryClientRetry[F] =
+ new SchemaRegistryClientRetry[F] {
+ override def withRetry[A](action: F[A]): F[A] = action
+
+ override def toString: String = "RetryPolicy.None"
+ }
+}