diff --git a/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroDeserializer.scala b/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroDeserializer.scala
index 286950035..969ed5b7d 100644
--- a/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroDeserializer.scala
+++ b/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroDeserializer.scala
@@ -8,9 +8,10 @@ package fs2.kafka.vulcan
 
 import _root_.vulcan.Codec
 import cats.effect.Sync
-import cats.implicits._
+import cats.syntax.functor._
 import fs2.kafka.{Deserializer, RecordDeserializer}
 import io.confluent.kafka.schemaregistry.avro.AvroSchema
+
 import java.nio.ByteBuffer
 
 final class AvroDeserializer[A] private[vulcan] (
@@ -25,23 +26,26 @@ final class AvroDeserializer[A] private[vulcan] (
           settings.createAvroDeserializer(_).map {
             case (deserializer, schemaRegistryClient) =>
               Deserializer.instance { (topic, _, bytes) =>
-                F.defer {
-                  val writerSchemaId =
-                    ByteBuffer.wrap(bytes).getInt(1) // skip magic byte
+                settings.schemaRegistryClientRetry.withRetry(
+                  F.defer {
+                    val writerSchemaId =
+                      ByteBuffer.wrap(bytes).getInt(1) // skip magic byte
 
-                  val writerSchema = {
-                    val schema = schemaRegistryClient.getSchemaById(writerSchemaId)
-                    if (schema.isInstanceOf[AvroSchema])
-                      schema.asInstanceOf[AvroSchema].rawSchema()
-                    else
-                      null
-                  }
+                    val writerSchema = {
+                      val schema = schemaRegistryClient.getSchemaById(writerSchemaId)
+                      if (schema.isInstanceOf[AvroSchema])
+                        schema.asInstanceOf[AvroSchema].rawSchema()
+                      else
+                        null
+                    }
 
-                  codec.decode(deserializer.deserialize(topic, bytes, schema), writerSchema) match {
-                    case Right(a)    => F.pure(a)
-                    case Left(error) => F.raiseError(error.throwable)
+                    codec
+                      .decode(deserializer.deserialize(topic, bytes, schema), writerSchema) match {
+                      case Right(a)    => F.pure(a)
+                      case Left(error) => F.raiseError(error.throwable)
+                    }
                   }
-                }
+                )
               }
           }
 
diff --git a/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroSerializer.scala b/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroSerializer.scala
index a42440341..8e93d35c8 100644
--- a/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroSerializer.scala
+++ b/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroSerializer.scala
@@ -8,7 +8,7 @@ package fs2.kafka.vulcan
 
 import _root_.vulcan.Codec
 import cats.effect.Sync
-import cats.implicits._
+import cats.syntax.all._
 import fs2.kafka.{RecordSerializer, Serializer}
 
 final class AvroSerializer[A] private[vulcan] (
@@ -23,8 +23,10 @@ final class AvroSerializer[A] private[vulcan] (
          Serializer.instance { (topic, _, a) =>
            F.defer {
              codec.encode(a) match {
-                case Right(value) => F.pure(serializer.serialize(topic, value))
-                case Left(error)  => F.raiseError(error.throwable)
+                case Right(value) =>
+                  settings.schemaRegistryClientRetry
+                    .withRetry(F.delay(serializer.serialize(topic, value)))
+                case Left(error) => F.raiseError(error.throwable)
              }
            }
          }
diff --git a/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroSettings.scala b/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroSettings.scala
index 95904d1b3..1358c84ce 100644
--- a/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroSettings.scala
+++ b/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroSettings.scala
@@ -6,8 +6,9 @@
 
 package fs2.kafka.vulcan
 
-import cats.effect.Sync
+import cats.effect.Async
 import cats.implicits._
+
 import scala.jdk.CollectionConverters._
 import fs2.kafka.internal.syntax._
 
@@ -79,6 +80,22 @@ sealed abstract class AvroSettings[F[_]] {
    */
  def withProperties(properties: Map[String, String]): AvroSettings[F]
 
+  /**
+    * The retry strategy used for recoverable Schema Registry client errors.
+    */
+  def schemaRegistryClientRetry: SchemaRegistryClientRetry[F]
+
+  /**
+    * Creates a new `AvroSettings` instance with the specified
+    * retry strategy, used when recoverable Schema Registry client
+    * errors occur during serialization and deserialization.
+    *
+    * The default is [[SchemaRegistryClientRetry#Default]].
+    */
+  def withSchemaRegistryClientRetry(
+    schemaRegistryClientRetry: SchemaRegistryClientRetry[F]
+  ): AvroSettings[F]
+
  /**
    * Creates a new `KafkaAvroDeserializer` using the settings
    * contained within this [[AvroSettings]] instance, and the
@@ -126,8 +143,9 @@ object AvroSettings {
    override val properties: Map[String, String],
    // format: off
    val createAvroDeserializerWith: (F[SchemaRegistryClient], Boolean, Map[String, String]) => F[(KafkaAvroDeserializer, SchemaRegistryClient)],
-    val createAvroSerializerWith: (F[SchemaRegistryClient], Boolean, Map[String, String]) => F[(KafkaAvroSerializer, SchemaRegistryClient)]
+    val createAvroSerializerWith: (F[SchemaRegistryClient], Boolean, Map[String, String]) => F[(KafkaAvroSerializer, SchemaRegistryClient)],
    // format: on
+    override val schemaRegistryClientRetry: SchemaRegistryClientRetry[F]
  ) extends AvroSettings[F] {
    override def withAutoRegisterSchemas(autoRegisterSchemas: Boolean): AvroSettings[F] =
      withProperty("auto.register.schemas", autoRegisterSchemas.toString)
@@ -151,6 +169,10 @@ object AvroSettings {
    override def withProperties(properties: Map[String, String]): AvroSettings[F] =
      copy(properties = this.properties ++ properties)
 
+    override def withSchemaRegistryClientRetry(
+      schemaRegistryClientRetry: SchemaRegistryClientRetry[F]
+    ): AvroSettings[F] = copy(schemaRegistryClientRetry = schemaRegistryClientRetry)
+
    override def createAvroDeserializer(
      isKey: Boolean
    ): F[(KafkaAvroDeserializer, SchemaRegistryClient)] =
@@ -184,7 +206,7 @@ object AvroSettings {
 
  private[this] def create[F[_]](
    schemaRegistryClient: F[SchemaRegistryClient]
-  )(implicit F: Sync[F]): AvroSettings[F] =
+  )(implicit F: Async[F]): AvroSettings[F] =
    AvroSettingsImpl(
      schemaRegistryClient = schemaRegistryClient,
      properties = Map.empty,
@@ -203,16 +225,17 @@ object AvroSettings {
            serializer.configure(withDefaults(properties), isKey)
            (serializer, schemaRegistryClient)
          }
-        }
+        },
+      schemaRegistryClientRetry = SchemaRegistryClientRetry.Default[F]
    )
 
  def apply[F[_]](
    schemaRegistryClientSettings: SchemaRegistryClientSettings[F]
-  )(implicit F: Sync[F]): AvroSettings[F] =
+  )(implicit F: Async[F]): AvroSettings[F] =
    create(schemaRegistryClientSettings.createSchemaRegistryClient)
 
  def apply[F[_]](
    schemaRegistryClient: SchemaRegistryClient
-  )(implicit F: Sync[F]): AvroSettings[F] =
+  )(implicit F: Async[F]): AvroSettings[F] =
    create(F.pure(schemaRegistryClient))
 }
diff --git a/modules/vulcan/src/main/scala/fs2/kafka/vulcan/SchemaRegistryClientRetry.scala b/modules/vulcan/src/main/scala/fs2/kafka/vulcan/SchemaRegistryClientRetry.scala
new file mode 100644
index 000000000..5cda842ff
--- /dev/null
+++ b/modules/vulcan/src/main/scala/fs2/kafka/vulcan/SchemaRegistryClientRetry.scala
@@ -0,0 +1,62 @@
+package fs2.kafka.vulcan
+
+import cats.effect.Async
+import cats.syntax.applicativeError._
+import cats.syntax.flatMap._
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException
+import org.apache.kafka.common.errors.SerializationException
+
+import java.io.IOException
+
+import scala.concurrent.duration._
+
+/**
+  * [[SchemaRegistryClientRetry]] describes how to recover from
+  * exceptions raised while communicating with a schema registry.
+  * See [[SchemaRegistryClientRetry#Default]] for the default
+  * recovery strategy. If you do not wish to recover from any
+  * exceptions, you can use [[SchemaRegistryClientRetry#None]].
+  *
+  * To create a new [[SchemaRegistryClientRetry]], implement the
+  * [[withRetry]] function with the desired recovery strategy.
+  * You can use [[isRetriable]] to identify which exceptions
+  * raised by schema registry failures are worth retrying.
+  */
+trait SchemaRegistryClientRetry[F[_]] {
+
+  def withRetry[A](action: F[A]): F[A]
+}
+
+object SchemaRegistryClientRetry {
+  def isRetriable(error: Throwable): Boolean = error match {
+    case _: SerializationException => true
+    case _: IOException => true
+    case apiError: RestClientException => apiError.getErrorCode >= 500
+    case _ => false
+  }
+
+  def Default[F[_]](implicit F: Async[F]): SchemaRegistryClientRetry[F] =
+    new SchemaRegistryClientRetry[F] {
+      override def withRetry[A](action: F[A]): F[A] = {
+        def retry(attempt: Int, action: F[A]): F[A] =
+          action.handleErrorWith(
+            err =>
+              if ((attempt + 1) <= 10 && isRetriable(err))
+                F.sleep((10 * Math.pow(2, attempt.toDouble)).millis) >> retry(attempt + 1, action)
+              else
+                F.raiseError(err)
+          )
+
+        retry(attempt = 1, action)
+      }
+
+      override def toString: String = "SchemaRegistryClientRetry.Default"
+    }
+
+  def None[F[_]]: SchemaRegistryClientRetry[F] =
+    new SchemaRegistryClientRetry[F] {
+      override def withRetry[A](action: F[A]): F[A] = action
+
+      override def toString: String = "SchemaRegistryClientRetry.None"
+    }
+}
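
Usage sketch (not part of the patch): a minimal example of how the new retry hook might be wired up from application code. It relies only on the API added above (SchemaRegistryClientRetry, SchemaRegistryClientRetry.isRetriable, AvroSettings#withSchemaRegistryClientRetry) plus the existing SchemaRegistryClientSettings constructor; the object name RetryExample, the method name fixedDelayRetry, the retry count, and the registry URL are illustrative only.

import cats.effect.{Async, IO}
import cats.syntax.applicativeError._
import cats.syntax.flatMap._
import fs2.kafka.vulcan.{AvroSettings, SchemaRegistryClientRetry, SchemaRegistryClientSettings}

import scala.concurrent.duration._

object RetryExample {

  // A custom strategy: retry up to three times with a fixed 50 ms delay,
  // but only for errors that SchemaRegistryClientRetry.isRetriable
  // classifies as recoverable.
  def fixedDelayRetry[F[_]](implicit F: Async[F]): SchemaRegistryClientRetry[F] =
    new SchemaRegistryClientRetry[F] {
      override def withRetry[A](action: F[A]): F[A] = {
        def go(remaining: Int): F[A] =
          action.handleErrorWith { err =>
            if (remaining > 0 && SchemaRegistryClientRetry.isRetriable(err))
              F.sleep(50.millis) >> go(remaining - 1)
            else
              F.raiseError(err)
          }

        go(remaining = 3)
      }
    }

  // Replaces SchemaRegistryClientRetry.Default, which AvroSettings uses
  // unless withSchemaRegistryClientRetry is called.
  val avroSettings: AvroSettings[IO] =
    AvroSettings(SchemaRegistryClientSettings[IO]("http://localhost:8081"))
      .withSchemaRegistryClientRetry(fixedDelayRetry[IO])
}

For comparison, the Default strategy as written in the patch performs up to nine retries (ten attempts in total), sleeping 10 * 2^attempt milliseconds between attempts: 20 ms after the first failure, doubling up to roughly 5.1 s before the last retry, for about 10 s of cumulative delay before the error is rethrown.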