From 857fa030530549e955245b7380e65a5bc30b75e6 Mon Sep 17 00:00:00 2001 From: Ben Carter Date: Fri, 10 Jan 2025 14:26:46 +0000 Subject: [PATCH 1/5] Add Scheduler3 module --- .scalafix3.conf | 14 +++ aliases.sbt | 10 ++ build.sbt | 69 ++++++++----- project/Aliases.scala | 13 --- project/Dependencies.scala | 61 ++++++++++-- project/build.properties | 2 +- project/plugins.sbt | 2 +- .../uk/sky/scheduler/EventSubscriber.scala | 10 ++ .../scala/uk/sky/scheduler/Repository.scala | 7 ++ .../uk/sky/scheduler/SchedulePublisher.scala | 8 ++ .../uk/sky/scheduler/ScheduleQueue.scala | 10 ++ .../scala/uk/sky/scheduler/Scheduler.scala | 25 +++++ .../sky/scheduler/domain/ScheduleEvent.scala | 44 +++++++++ .../sky/scheduler/error/ScheduleError.scala | 15 +++ .../uk/sky/scheduler/message/Message.scala | 30 ++++++ .../uk/sky/scheduler/message/Metadata.scala | 35 +++++++ .../uk/sky/scheduler/syntax/ClockSyntax.scala | 19 ++++ .../scala/uk/sky/scheduler/syntax/all.scala | 3 + .../scheduler/message/MessageLawsSpec.scala | 9 ++ .../scheduler/message/MetadataLawsSpec.scala | 9 ++ .../uk/sky/scheduler/util/Generator.scala | 97 +++++++++++++++++++ 21 files changed, 442 insertions(+), 50 deletions(-) create mode 100644 .scalafix3.conf create mode 100644 aliases.sbt delete mode 100644 project/Aliases.scala create mode 100644 scheduler-3/src/main/scala/uk/sky/scheduler/EventSubscriber.scala create mode 100644 scheduler-3/src/main/scala/uk/sky/scheduler/Repository.scala create mode 100644 scheduler-3/src/main/scala/uk/sky/scheduler/SchedulePublisher.scala create mode 100644 scheduler-3/src/main/scala/uk/sky/scheduler/ScheduleQueue.scala create mode 100644 scheduler-3/src/main/scala/uk/sky/scheduler/Scheduler.scala create mode 100644 scheduler-3/src/main/scala/uk/sky/scheduler/domain/ScheduleEvent.scala create mode 100644 scheduler-3/src/main/scala/uk/sky/scheduler/error/ScheduleError.scala create mode 100644 scheduler-3/src/main/scala/uk/sky/scheduler/message/Message.scala create mode 100644 scheduler-3/src/main/scala/uk/sky/scheduler/message/Metadata.scala create mode 100644 scheduler-3/src/main/scala/uk/sky/scheduler/syntax/ClockSyntax.scala create mode 100644 scheduler-3/src/main/scala/uk/sky/scheduler/syntax/all.scala create mode 100644 scheduler-3/src/test/scala/uk/sky/scheduler/message/MessageLawsSpec.scala create mode 100644 scheduler-3/src/test/scala/uk/sky/scheduler/message/MetadataLawsSpec.scala create mode 100644 scheduler-3/src/test/scala/uk/sky/scheduler/util/Generator.scala diff --git a/.scalafix3.conf b/.scalafix3.conf new file mode 100644 index 00000000..a556341f --- /dev/null +++ b/.scalafix3.conf @@ -0,0 +1,14 @@ +rules = [ + NoAutoTupling, + DisableSyntax, + LeakingImplicitClassVal, + NoValInForComprehension +] + +OrganizeImports { + coalesceToWildcardImportThreshold = 6 + groupedImports = AggressiveMerge + importSelectorsOrder = SymbolsFirst + removeUnused = false + targetDialect = Scala3 +} diff --git a/aliases.sbt b/aliases.sbt new file mode 100644 index 00000000..7f078634 --- /dev/null +++ b/aliases.sbt @@ -0,0 +1,10 @@ +addCommandAlias("checkFix", "scalafixAll --check OrganizeImports; scalafixAll --check") +addCommandAlias("runFix", "scalafixAll OrganizeImports; scalafixAll") +addCommandAlias("checkFmt", "scalafmtCheckAll; scalafmtSbtCheck") +addCommandAlias("runFmt", "scalafmtAll; scalafmtSbt") + +addCommandAlias("checkLint", "checkFmt; checkFix") +addCommandAlias("runLint", "runFmt; runFix") + +addCommandAlias("ciBuild", "checkFmt; checkFix; test; schema") +addCommandAlias("ciRelease", "clean; schema; project scheduler; release with-defaults") diff --git a/build.sbt b/build.sbt index 695626ab..255bdc44 100644 --- a/build.sbt +++ b/build.sbt @@ -1,25 +1,40 @@ -import Aliases.* import Release.* import DockerPublish.* import org.typelevel.scalacoptions.ScalacOptions -ThisBuild / scalafmtOnCompile := true -ThisBuild / semanticdbEnabled := true -ThisBuild / semanticdbVersion := scalafixSemanticdb.revision -ThisBuild / scalafixDependencies += "com.github.liancheng" %% "organize-imports" % "0.6.0" +ThisBuild / organization := "com.sky" -Global / onChangedBuildSource := ReloadOnSourceChanges +ThisBuild / scalafmtOnCompile := true +ThisBuild / semanticdbEnabled := true +ThisBuild / semanticdbVersion := scalafixSemanticdb.revision -Test / testOptions += Tests.Argument(TestFrameworks.ScalaTest, "-oF") +Global / onChangedBuildSource := ReloadOnSourceChanges -val commonSettings = Seq( - organization := "com.sky", - scalaVersion := "2.13.10" +val scala2Settings = Seq( + scalaVersion := "2.13.10", + tpolecatScalacOptions ++= Set( + ScalacOptions.other("-Ymacro-annotations"), + ScalacOptions.source3 + ), + tpolecatExcludeOptions ++= Set( + ScalacOptions.warnNonUnitStatement, + ScalacOptions.warnValueDiscard + ), + run / fork := true, + Test / fork := true, + Test / parallelExecution := false ) -val compilerSettings = Seq( - tpolecatScalacOptions ++= Set(ScalacOptions.other("-Ymacro-annotations"), ScalacOptions.source3), - tpolecatExcludeOptions ++= Set(ScalacOptions.warnNonUnitStatement, ScalacOptions.warnValueDiscard) +val scala3Settings = Seq( + scalaVersion := "3.6.2", + tpolecatScalacOptions ++= Set( + ScalacOptions.other("-no-indent"), + ScalacOptions.other("-old-syntax") + ), + Test / tpolecatExcludeOptions += ScalacOptions.warnNonUnitStatement, + run / fork := true, + Test / fork := true, + Test / parallelExecution := false ) val buildInfoSettings = Seq( @@ -29,25 +44,29 @@ val buildInfoSettings = Seq( lazy val scheduler = (project in file("scheduler")) .enablePlugins(BuildInfoPlugin, JavaAppPackaging, UniversalDeployPlugin, JavaAgent, DockerPlugin) - .settings(commonSettings) - .settings(compilerSettings) + .settings(scala2Settings) .settings( - libraryDependencies ++= Dependencies.all, + libraryDependencies ++= Dependencies.scheduler, addCompilerPlugin("org.typelevel" % "kind-projector" % "0.13.3" cross CrossVersion.full), - run / fork := true, - Test / fork := true, - javaAgents += "io.kamon" % "kanela-agent" % "1.0.18", + javaAgents += "io.kamon" % "kanela-agent" % "1.0.18", buildInfoSettings, dockerSettings, - releaseSettings, - Test / parallelExecution := false + releaseSettings + ) + +lazy val scheduler3 = (project in file("scheduler-3")) + .enablePlugins(JavaAgent, DockerPlugin, JavaAppPackaging, BuildInfoPlugin) + .settings(scala3Settings) + .settings( + libraryDependencies ++= Dependencies.scheduler3, + buildInfoSettings, + scalafixConfig := Some((ThisBuild / baseDirectory).value / ".scalafix3.conf") ) val schema = inputKey[Unit]("Generate the Avro schema file for the Schedule schema.") lazy val avro = (project in file("avro")) - .settings(commonSettings) - .settings(compilerSettings) + .settings(scala2Settings) .settings(libraryDependencies += Dependencies.avro4s) .settings(schema := (Compile / run).toTask("").value) .dependsOn(scheduler % "compile->compile") @@ -55,9 +74,7 @@ lazy val avro = (project in file("avro")) lazy val root = (project in file(".")) .withId("kafka-message-scheduler") - .settings(commonSettings) - .settings(defineCommandAliases) .settings(dockerImageCreationTask := (scheduler / Docker / publishLocal).value) - .aggregate(scheduler, avro) + .aggregate(scheduler, scheduler3, avro) .enablePlugins(DockerComposePlugin) .disablePlugins(ReleasePlugin) diff --git a/project/Aliases.scala b/project/Aliases.scala deleted file mode 100644 index b2d9a88a..00000000 --- a/project/Aliases.scala +++ /dev/null @@ -1,13 +0,0 @@ -import sbt.* - -object Aliases { - - lazy val defineCommandAliases = - addCommandAlias("ciBuild", "checkFmt; checkFix; test; schema") ++ - addCommandAlias("ciRelease", "clean; schema; project scheduler; release with-defaults") ++ - addCommandAlias("checkFix", "scalafixAll --check OrganizeImports; scalafixAll --check") ++ - addCommandAlias("runFix", "scalafixAll OrganizeImports; scalafixAll") ++ - addCommandAlias("checkFmt", "scalafmtCheckAll; scalafmtSbtCheck") ++ - addCommandAlias("runFmt", "scalafmtAll; scalafmtSbt") - -} diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 954f8048..797436c2 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -15,12 +15,28 @@ object Dependencies { } object Cats { - private val version = "2.7.0" - val core = "org.typelevel" %% "cats-core" % version - val testKit = "org.typelevel" %% "cats-testkit" % version % Test - val scalatest = "com.ironcorelabs" %% "cats-scalatest" % "3.1.1" % Test - val base = Seq(core) - val test = Seq(testKit, scalatest) + private val version = "2.7.0" + private val catsEffectVersion = "3.5.7" + + lazy val effectTestKit = "org.typelevel" %% "cats-effect-testkit" % catsEffectVersion % Test + lazy val effectTesting = "org.typelevel" %% "cats-effect-testing-scalatest" % "1.6.0" % Test + lazy val effectTestkitScalatest = "org.typelevel" %% "cats-testkit-scalatest" % "2.1.5" % Test + lazy val caseInsensitive = "org.typelevel" %% "case-insensitive" % "1.4.2" + lazy val caseInsensitiveTesting = "org.typelevel" %% "case-insensitive-testing" % "1.4.2" + lazy val core = "org.typelevel" %% "cats-core" % version + lazy val effect = "org.typelevel" %% "cats-effect" % catsEffectVersion + lazy val scalatest = "com.ironcorelabs" %% "cats-scalatest" % "3.1.1" % Test + lazy val testKit = "org.typelevel" %% "cats-testkit" % version % Test + lazy val base = Seq(core) + lazy val test = Seq(testKit, scalatest) + } + + object Fs2 { + private lazy val version = "3.11.0" + private lazy val kafkaVersion = "3.6.0" + + lazy val core = "co.fs2" %% "fs2-core" % version + lazy val kafka = "com.github.fd4s" %% "fs2-kafka" % kafkaVersion } object Kafka { @@ -39,6 +55,10 @@ object Dependencies { val all = Seq(core, akka, prometheus) } + object Monocle { + lazy val core = "dev.optics" %% "monocle-core" % "3.3.0" + } + object PureConfig { private val version = "0.17.1" val pureconfig = "com.github.pureconfig" %% "pureconfig" % version @@ -55,6 +75,13 @@ object Dependencies { val test = Seq(scalaCheck) } + object Vulcan { + private lazy val version = "1.11.1" + + val core = "com.github.fd4s" %% "vulcan" % version + val generic = "com.github.fd4s" %% "vulcan-generic" % version % Test + } + val avro4s = "com.sksamuel.avro4s" %% "avro4s-core" % "4.1.2" val kafkaTopicLoader = "uk.sky" %% "kafka-topic-loader" % "1.5.6" val monix = "io.monix" %% "monix-execution" % "3.4.1" @@ -72,18 +99,20 @@ object Dependencies { val scalaTest = "org.scalatest" %% "scalatest" % "3.2.18" % Test val scalaTestPlusMockito = "org.scalatestplus" %% "mockito-3-12" % "3.2.10.0" % Test - val core: Seq[ModuleID] = Akka.base ++ Cats.base ++ Kafka.base ++ Kamon.all ++ PureConfig.all ++ Refined.base ++ Seq( + val core: Seq[ModuleID] = Akka.base ++ Cats.base ++ Kafka.base ++ Kamon.all ++ PureConfig.all ++ Refined.base ++ Seq( avro4s, kafkaTopicLoader, monix, scalaLogging ) + val runtime: Seq[ModuleID] = Seq( janino, logbackClassic, logbackEncoder ) - val test: Seq[ModuleID] = Akka.test ++ Cats.test ++ Kafka.test ++ Refined.test ++ Seq( + + val test: Seq[ModuleID] = Akka.test ++ Cats.test ++ Kafka.test ++ Refined.test ++ Seq( embeddedKafka, mockito, randomDataGenerator, @@ -92,5 +121,19 @@ object Dependencies { scalaTest, scalaTestPlusMockito ) - val all: Seq[sbt.ModuleID] = core ++ runtime ++ test + val scheduler: Seq[ModuleID] = core ++ runtime ++ test + + val scheduler3: Seq[ModuleID] = Seq( + Cats.caseInsensitive, + Cats.caseInsensitiveTesting, + Cats.effect, + Cats.effectTestKit, + Cats.effectTesting, + Cats.effectTestkitScalatest, + Fs2.core, + Fs2.kafka, + Monocle.core, + Vulcan.core, + Vulcan.generic + ) } diff --git a/project/build.properties b/project/build.properties index cb409aac..0a832a2d 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.10.1 \ No newline at end of file +sbt.version=1.10.7 \ No newline at end of file diff --git a/project/plugins.sbt b/project/plugins.sbt index 9663c7a4..17b4f296 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -6,4 +6,4 @@ addSbtPlugin("com.tapad" % "sbt-docker-compose" % "1.0.35") addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.2") addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.12.1") addSbtPlugin("io.kamon" % "sbt-kanela-runner" % "2.0.14") -addSbtPlugin("org.typelevel" % "sbt-tpolecat" % "0.5.1") +addSbtPlugin("org.typelevel" % "sbt-tpolecat" % "0.5.2") diff --git a/scheduler-3/src/main/scala/uk/sky/scheduler/EventSubscriber.scala b/scheduler-3/src/main/scala/uk/sky/scheduler/EventSubscriber.scala new file mode 100644 index 00000000..de1a9f7f --- /dev/null +++ b/scheduler-3/src/main/scala/uk/sky/scheduler/EventSubscriber.scala @@ -0,0 +1,10 @@ +package uk.sky.scheduler + +import fs2.Stream +import uk.sky.scheduler.domain.ScheduleEvent +import uk.sky.scheduler.error.ScheduleError +import uk.sky.scheduler.message.Message + +trait EventSubscriber[F[_]] { + def messages: Stream[F, Message[Either[ScheduleError, Option[ScheduleEvent]]]] +} diff --git a/scheduler-3/src/main/scala/uk/sky/scheduler/Repository.scala b/scheduler-3/src/main/scala/uk/sky/scheduler/Repository.scala new file mode 100644 index 00000000..a031bc44 --- /dev/null +++ b/scheduler-3/src/main/scala/uk/sky/scheduler/Repository.scala @@ -0,0 +1,7 @@ +package uk.sky.scheduler + +trait Repository[F[_], K, V] { + def set(key: K, value: V): F[Unit] + def get(key: K): F[Option[V]] + def delete(key: K): F[Unit] +} diff --git a/scheduler-3/src/main/scala/uk/sky/scheduler/SchedulePublisher.scala b/scheduler-3/src/main/scala/uk/sky/scheduler/SchedulePublisher.scala new file mode 100644 index 00000000..3ad27ea7 --- /dev/null +++ b/scheduler-3/src/main/scala/uk/sky/scheduler/SchedulePublisher.scala @@ -0,0 +1,8 @@ +package uk.sky.scheduler + +import fs2.Pipe +import uk.sky.scheduler.domain.ScheduleEvent + +trait SchedulePublisher[F[_], O] { + def publish: Pipe[F, ScheduleEvent, O] +} diff --git a/scheduler-3/src/main/scala/uk/sky/scheduler/ScheduleQueue.scala b/scheduler-3/src/main/scala/uk/sky/scheduler/ScheduleQueue.scala new file mode 100644 index 00000000..7bb95076 --- /dev/null +++ b/scheduler-3/src/main/scala/uk/sky/scheduler/ScheduleQueue.scala @@ -0,0 +1,10 @@ +package uk.sky.scheduler + +import fs2.Stream +import uk.sky.scheduler.domain.ScheduleEvent + +trait ScheduleQueue[F[_]] { + def schedule(key: String, scheduleEvent: ScheduleEvent): F[Unit] + def cancel(key: String): F[Unit] + def schedules: Stream[F, ScheduleEvent] +} diff --git a/scheduler-3/src/main/scala/uk/sky/scheduler/Scheduler.scala b/scheduler-3/src/main/scala/uk/sky/scheduler/Scheduler.scala new file mode 100644 index 00000000..bd2a5b9a --- /dev/null +++ b/scheduler-3/src/main/scala/uk/sky/scheduler/Scheduler.scala @@ -0,0 +1,25 @@ +package uk.sky.scheduler + +import cats.effect.Concurrent +import fs2.Stream +import uk.sky.scheduler.domain.ScheduleEvent +import uk.sky.scheduler.message.Message +import uk.sky.scheduler.message.Metadata.* + +class Scheduler[F[_] : Concurrent, O]( + eventSubscriber: EventSubscriber[F], + scheduleQueue: ScheduleQueue[F], + schedulePublisher: SchedulePublisher[F, O] +) { + private val scheduleEvents = eventSubscriber.messages.evalTapChunk { case Message(key, source, value, metadata) => + value match { + case Left(_) => scheduleQueue.cancel(key) + case Right(None) => Concurrent[F].unlessA(metadata.isExpired)(scheduleQueue.cancel(key)) + case Right(Some(schedule)) => scheduleQueue.schedule(key, schedule) + } + } + + def stream: Stream[F, O] = + scheduleEvents.drain + .merge(scheduleQueue.schedules.through(schedulePublisher.publish)) +} diff --git a/scheduler-3/src/main/scala/uk/sky/scheduler/domain/ScheduleEvent.scala b/scheduler-3/src/main/scala/uk/sky/scheduler/domain/ScheduleEvent.scala new file mode 100644 index 00000000..50a74a02 --- /dev/null +++ b/scheduler-3/src/main/scala/uk/sky/scheduler/domain/ScheduleEvent.scala @@ -0,0 +1,44 @@ +package uk.sky.scheduler.domain + +/** A ScheduleEvent represents the metadata that created the Schedule, and the Schedule to be suspended. + * @param metadata + * Information about the Schedule's source. + * @param schedule + * The Schedule's destination and payload. + */ +case class ScheduleEvent( + metadata: Metadata, + schedule: Schedule +) + +/** Information about a Schedule's source. + * @param id + * The ID of the Schedule's message. + * @param scheduleTopic + * The topic the Schedule arrived on. + */ +case class Metadata( + id: String, + scheduleTopic: String +) + +/** The Schedule's destination and payload. + * + * @param time + * The time to execute the Schedule, in epoch milliseconds. + * @param topic + * The topic to send the Schedule to. + * @param key + * The key identifying the payload. + * @param value + * The payload to be sent. + * @param headers + * Extra metadata to send with the payload. + */ +case class Schedule( + time: Long, + topic: String, + key: Array[Byte], + value: Option[Array[Byte]], + headers: Map[String, Array[Byte]] +) diff --git a/scheduler-3/src/main/scala/uk/sky/scheduler/error/ScheduleError.scala b/scheduler-3/src/main/scala/uk/sky/scheduler/error/ScheduleError.scala new file mode 100644 index 00000000..df507c4a --- /dev/null +++ b/scheduler-3/src/main/scala/uk/sky/scheduler/error/ScheduleError.scala @@ -0,0 +1,15 @@ +package uk.sky.scheduler.error + +import cats.syntax.all.* +import cats.{Eq, Show} +import org.apache.avro.Schema + +enum ScheduleError(val message: String, val cause: Throwable) extends Throwable(message, cause) { + case InvalidAvroError(schema: Schema, error: Throwable) + extends ScheduleError(s"Avro message did not conform to Schema: ${schema.getFullName}: $schema", error) +} + +object ScheduleError { + given Eq[ScheduleError] = Eq.fromUniversalEquals + given Show[ScheduleError] = _.message +} \ No newline at end of file diff --git a/scheduler-3/src/main/scala/uk/sky/scheduler/message/Message.scala b/scheduler-3/src/main/scala/uk/sky/scheduler/message/Message.scala new file mode 100644 index 00000000..13a07610 --- /dev/null +++ b/scheduler-3/src/main/scala/uk/sky/scheduler/message/Message.scala @@ -0,0 +1,30 @@ +package uk.sky.scheduler.message + +import cats.syntax.all.* +import cats.{Eq, Functor, Show} +import monocle.syntax.all.* + +final case class Message[V](key: String, source: String, value: V, metadata: Metadata) { + def map[B](f: V => B): Message[B] = this.copy(value = f(this.value)) +} + +object Message { + extension [T](message: Message[T]) { + def isExpired: Boolean = message.metadata.isExpired + def expire: Message[T] = + message.focus(_.metadata).modify(_.transform(_ + (Metadata.expiredKey -> Metadata.expiredValue))) + } + + given [V : Eq]: Eq[Message[V]] = Eq.by { case Message(key, source, value, metadata) => + (key, source, value, metadata) + } + + given [V : Show]: Show[Message[V]] = Show.show { case Message(key, source, value, metadata) => + show"""Message(key=$key, source=$source, value=$value, metadata=$metadata)""" + } + + given Functor[Message] = new Functor[Message] { + override def map[A, B](fa: Message[A])(f: A => B): Message[B] = + fa.map(f) + } +} \ No newline at end of file diff --git a/scheduler-3/src/main/scala/uk/sky/scheduler/message/Metadata.scala b/scheduler-3/src/main/scala/uk/sky/scheduler/message/Metadata.scala new file mode 100644 index 00000000..b4c36b7e --- /dev/null +++ b/scheduler-3/src/main/scala/uk/sky/scheduler/message/Metadata.scala @@ -0,0 +1,35 @@ +package uk.sky.scheduler.message + +import cats.{Eq, Monoid, Show} +import org.typelevel.ci.CIString + +opaque type Metadata = Map[CIString, String] + +object Metadata { + def apply(value: Iterable[(CIString, String)]): Metadata = value match { + case map: Map[CIString, String] => map + case other => other.toMap + } + + extension (metadata: Metadata) { + inline def value: Map[CIString, String] = metadata + inline def toMap: Map[String, String] = metadata.map(_.toString -> _) + inline def transform(f: Map[CIString, String] => Map[CIString, String]): Metadata = f(metadata) + inline def combine(other: Metadata): Metadata = metadata.concat(other) + + inline def isExpired: Boolean = metadata.get(expiredKey).exists(_.equalsIgnoreCase(expiredValue)) + } + + val expiredKey = CIString("schedule:expired") + val expiredValue = "true" + + val empty: Metadata = Map.empty[CIString, String] + + given Monoid[Metadata] = new Monoid[Metadata] { + override def empty: Metadata = Metadata.empty + override def combine(x: Metadata, y: Metadata): Metadata = x.combine(y) + } + + given Show[Metadata] = Show.catsShowForMap[CIString, String] + given Eq[Metadata] = Eq.catsKernelEqForMap[CIString, String] +} \ No newline at end of file diff --git a/scheduler-3/src/main/scala/uk/sky/scheduler/syntax/ClockSyntax.scala b/scheduler-3/src/main/scala/uk/sky/scheduler/syntax/ClockSyntax.scala new file mode 100644 index 00000000..e386b7cc --- /dev/null +++ b/scheduler-3/src/main/scala/uk/sky/scheduler/syntax/ClockSyntax.scala @@ -0,0 +1,19 @@ +package uk.sky.scheduler.syntax + +import cats.Applicative +import cats.effect.Clock +import cats.syntax.all.* + +import java.time.Instant + +trait ClockSyntax { + extension [F[_] : Applicative](c: Clock[F]) { + def epochMilli: F[Long] = + c.realTimeInstant.map(_.toEpochMilli) + + def epochMilli(f: Instant => Instant): F[Long] = + c.realTimeInstant.map(f andThen (_.toEpochMilli)) + } +} + +object clock extends ClockSyntax \ No newline at end of file diff --git a/scheduler-3/src/main/scala/uk/sky/scheduler/syntax/all.scala b/scheduler-3/src/main/scala/uk/sky/scheduler/syntax/all.scala new file mode 100644 index 00000000..b5ebee6c --- /dev/null +++ b/scheduler-3/src/main/scala/uk/sky/scheduler/syntax/all.scala @@ -0,0 +1,3 @@ +package uk.sky.scheduler.syntax + +object all extends ClockSyntax diff --git a/scheduler-3/src/test/scala/uk/sky/scheduler/message/MessageLawsSpec.scala b/scheduler-3/src/test/scala/uk/sky/scheduler/message/MessageLawsSpec.scala new file mode 100644 index 00000000..1adf175f --- /dev/null +++ b/scheduler-3/src/test/scala/uk/sky/scheduler/message/MessageLawsSpec.scala @@ -0,0 +1,9 @@ +package uk.sky.scheduler.message + +import cats.laws.discipline.FunctorTests +import cats.tests.CatsSuite +import uk.sky.scheduler.util.Generator.given + +final class MessageLawsSpec extends CatsSuite { + checkAll("Message.MonoidLaws", FunctorTests[Message].functor[String, String, String]) +} diff --git a/scheduler-3/src/test/scala/uk/sky/scheduler/message/MetadataLawsSpec.scala b/scheduler-3/src/test/scala/uk/sky/scheduler/message/MetadataLawsSpec.scala new file mode 100644 index 00000000..032c4d29 --- /dev/null +++ b/scheduler-3/src/test/scala/uk/sky/scheduler/message/MetadataLawsSpec.scala @@ -0,0 +1,9 @@ +package uk.sky.scheduler.message + +import cats.kernel.laws.discipline.MonoidTests +import cats.tests.CatsSuite +import uk.sky.scheduler.util.Generator.given + +final class MetadataLawsSpec extends CatsSuite { + checkAll("Headers.MonoidLaws", MonoidTests[Metadata].monoid) +} diff --git a/scheduler-3/src/test/scala/uk/sky/scheduler/util/Generator.scala b/scheduler-3/src/test/scala/uk/sky/scheduler/util/Generator.scala new file mode 100644 index 00000000..4251af66 --- /dev/null +++ b/scheduler-3/src/test/scala/uk/sky/scheduler/util/Generator.scala @@ -0,0 +1,97 @@ +package uk.sky.scheduler.util + +import cats.effect.Sync +import cats.syntax.all.* +import fs2.kafka.{ConsumerRecord, ProducerRecord} +import monocle.syntax.all.* +import org.scalacheck.{Arbitrary, Gen} +import org.scalatest.exceptions.TestFailedException +import org.typelevel.ci.CIString +import org.typelevel.ci.testing.arbitraries.* +import uk.sky.scheduler.domain.* +import uk.sky.scheduler.error.ScheduleError +import uk.sky.scheduler.message.{Message, Metadata as MessageMetadata} +import uk.sky.scheduler.syntax.all.* + +import java.time.Instant + +object Generator { + given Arbitrary[Metadata] = Arbitrary(Gen.resultOf(Metadata.apply)) + given Arbitrary[Schedule] = Arbitrary(Gen.resultOf(Schedule.apply)) + + given Arbitrary[MessageMetadata] = Arbitrary { + for { + raw <- Arbitrary.arbitrary[Map[CIString, String]] + } yield MessageMetadata(raw) + } + + given [T : Arbitrary]: Arbitrary[Message[T]] = Arbitrary(Gen.resultOf(Message.apply[T])) + + given [K : Arbitrary, V : Arbitrary]: Arbitrary[ProducerRecord[K, V]] = Arbitrary( + Gen.resultOf(ProducerRecord.apply[K, V]) + ) + + given [K : Arbitrary, V : Arbitrary]: Arbitrary[ConsumerRecord[K, V]] = Arbitrary( + Gen.resultOf(ConsumerRecord.apply[K, V]) + ) + + val scheduleEventArb: Gen[ScheduleEvent] = Gen.resultOf(ScheduleEvent.apply) + given Arbitrary[ScheduleEvent] = Arbitrary(scheduleEventArb) + + def generateSchedule[F[_] : Sync]: F[ScheduleEvent] = + for { + maybeSchedule <- Sync[F].delay(scheduleEventArb.sample) + schedule <- maybeSchedule.liftTo(TestFailedException("Could not generate a schedule", 0)) + now <- Sync[F].epochMilli + } yield schedule.focus(_.schedule.time).replace(now) + + def generateSchedule[F[_] : Sync](f: Instant => Instant): F[ScheduleEvent] = + generateSchedule[F].map(_.focus(_.schedule.time).modify { l => + f(Instant.ofEpochMilli(l)).toEpochMilli + }) + + def generateSchedule[F[_] : Sync](time: Long): F[ScheduleEvent] = + generateSchedule[F].map(_.focus(_.schedule.time).replace(time)) + + private def message( + key: String, + source: String, + scheduleEvent: Option[ScheduleEvent], + metadata: MessageMetadata, + expire: Boolean + ): Message[Either[ScheduleError, Option[ScheduleEvent]]] = { + val m = Message( + key = key, + source = source, + value = scheduleEvent.asRight[ScheduleError], + metadata = metadata + ) + + if (expire) m.expire else m + } + + extension (scheduleEvent: ScheduleEvent) { + def update( + metadata: MessageMetadata = MessageMetadata.empty + ): Message[Either[ScheduleError, Option[ScheduleEvent]]] = + message( + key = scheduleEvent.metadata.id, + source = scheduleEvent.metadata.scheduleTopic, + scheduleEvent = scheduleEvent.some, + metadata = metadata, + expire = false + ) + + def delete( + metadata: MessageMetadata = MessageMetadata.empty, + expire: Boolean = false + ): Message[Either[ScheduleError, Option[ScheduleEvent]]] = + message( + key = scheduleEvent.metadata.id, + source = scheduleEvent.metadata.scheduleTopic, + scheduleEvent = none[ScheduleEvent], + metadata = metadata, + expire = expire + ) + } +} \ No newline at end of file From d6cee97a8491045c20fda485616cd64c405c820b Mon Sep 17 00:00:00 2001 From: Ben Carter Date: Fri, 10 Jan 2025 14:30:24 +0000 Subject: [PATCH 2/5] Add scalafmt3 config --- .scalafmt3.conf | 18 ++++++++++++ build.sbt | 3 +- .../sky/scheduler/error/ScheduleError.scala | 4 +-- .../uk/sky/scheduler/message/Message.scala | 2 +- .../uk/sky/scheduler/message/Metadata.scala | 2 +- .../uk/sky/scheduler/syntax/ClockSyntax.scala | 2 +- .../uk/sky/scheduler/util/Generator.scala | 28 +++++++++---------- 7 files changed, 39 insertions(+), 20 deletions(-) create mode 100644 .scalafmt3.conf diff --git a/.scalafmt3.conf b/.scalafmt3.conf new file mode 100644 index 00000000..c9d02d0b --- /dev/null +++ b/.scalafmt3.conf @@ -0,0 +1,18 @@ +version = 3.8.3 + +align.preset = most +includeCurlyBraceInSelectChains = false +maxColumn = 120 +project.git = true +rewrite.rules = [SortImports, RedundantBraces, RedundantParens, PreferCurlyFors] +runner.dialect = scala3 +spaces.beforeContextBoundColon = Always + +fileOverride { + "glob:**/project/*.scala" { + runner.dialect = scala212source3 + } + "glob:**/*.sbt" { + runner.dialect = scala212source3 + } +} \ No newline at end of file diff --git a/build.sbt b/build.sbt index 255bdc44..6abe416a 100644 --- a/build.sbt +++ b/build.sbt @@ -60,7 +60,8 @@ lazy val scheduler3 = (project in file("scheduler-3")) .settings( libraryDependencies ++= Dependencies.scheduler3, buildInfoSettings, - scalafixConfig := Some((ThisBuild / baseDirectory).value / ".scalafix3.conf") + scalafixConfig := Some((ThisBuild / baseDirectory).value / ".scalafix3.conf"), + scalafmtConfig := (ThisBuild / baseDirectory).value / ".scalafmt3.conf" ) val schema = inputKey[Unit]("Generate the Avro schema file for the Schedule schema.") diff --git a/scheduler-3/src/main/scala/uk/sky/scheduler/error/ScheduleError.scala b/scheduler-3/src/main/scala/uk/sky/scheduler/error/ScheduleError.scala index df507c4a..ca3a2849 100644 --- a/scheduler-3/src/main/scala/uk/sky/scheduler/error/ScheduleError.scala +++ b/scheduler-3/src/main/scala/uk/sky/scheduler/error/ScheduleError.scala @@ -6,10 +6,10 @@ import org.apache.avro.Schema enum ScheduleError(val message: String, val cause: Throwable) extends Throwable(message, cause) { case InvalidAvroError(schema: Schema, error: Throwable) - extends ScheduleError(s"Avro message did not conform to Schema: ${schema.getFullName}: $schema", error) + extends ScheduleError(s"Avro message did not conform to Schema: ${schema.getFullName}: $schema", error) } object ScheduleError { given Eq[ScheduleError] = Eq.fromUniversalEquals given Show[ScheduleError] = _.message -} \ No newline at end of file +} diff --git a/scheduler-3/src/main/scala/uk/sky/scheduler/message/Message.scala b/scheduler-3/src/main/scala/uk/sky/scheduler/message/Message.scala index 13a07610..b9d64419 100644 --- a/scheduler-3/src/main/scala/uk/sky/scheduler/message/Message.scala +++ b/scheduler-3/src/main/scala/uk/sky/scheduler/message/Message.scala @@ -27,4 +27,4 @@ object Message { override def map[A, B](fa: Message[A])(f: A => B): Message[B] = fa.map(f) } -} \ No newline at end of file +} diff --git a/scheduler-3/src/main/scala/uk/sky/scheduler/message/Metadata.scala b/scheduler-3/src/main/scala/uk/sky/scheduler/message/Metadata.scala index b4c36b7e..37cee1bc 100644 --- a/scheduler-3/src/main/scala/uk/sky/scheduler/message/Metadata.scala +++ b/scheduler-3/src/main/scala/uk/sky/scheduler/message/Metadata.scala @@ -32,4 +32,4 @@ object Metadata { given Show[Metadata] = Show.catsShowForMap[CIString, String] given Eq[Metadata] = Eq.catsKernelEqForMap[CIString, String] -} \ No newline at end of file +} diff --git a/scheduler-3/src/main/scala/uk/sky/scheduler/syntax/ClockSyntax.scala b/scheduler-3/src/main/scala/uk/sky/scheduler/syntax/ClockSyntax.scala index e386b7cc..48e47089 100644 --- a/scheduler-3/src/main/scala/uk/sky/scheduler/syntax/ClockSyntax.scala +++ b/scheduler-3/src/main/scala/uk/sky/scheduler/syntax/ClockSyntax.scala @@ -16,4 +16,4 @@ trait ClockSyntax { } } -object clock extends ClockSyntax \ No newline at end of file +object clock extends ClockSyntax diff --git a/scheduler-3/src/test/scala/uk/sky/scheduler/util/Generator.scala b/scheduler-3/src/test/scala/uk/sky/scheduler/util/Generator.scala index 4251af66..1f02dfeb 100644 --- a/scheduler-3/src/test/scala/uk/sky/scheduler/util/Generator.scala +++ b/scheduler-3/src/test/scala/uk/sky/scheduler/util/Generator.scala @@ -16,8 +16,8 @@ import uk.sky.scheduler.syntax.all.* import java.time.Instant object Generator { - given Arbitrary[Metadata] = Arbitrary(Gen.resultOf(Metadata.apply)) - given Arbitrary[Schedule] = Arbitrary(Gen.resultOf(Schedule.apply)) + given Arbitrary[Metadata] = Arbitrary(Gen.resultOf(Metadata.apply)) + given Arbitrary[Schedule] = Arbitrary(Gen.resultOf(Schedule.apply)) given Arbitrary[MessageMetadata] = Arbitrary { for { @@ -54,12 +54,12 @@ object Generator { generateSchedule[F].map(_.focus(_.schedule.time).replace(time)) private def message( - key: String, - source: String, - scheduleEvent: Option[ScheduleEvent], - metadata: MessageMetadata, - expire: Boolean - ): Message[Either[ScheduleError, Option[ScheduleEvent]]] = { + key: String, + source: String, + scheduleEvent: Option[ScheduleEvent], + metadata: MessageMetadata, + expire: Boolean + ): Message[Either[ScheduleError, Option[ScheduleEvent]]] = { val m = Message( key = key, source = source, @@ -72,8 +72,8 @@ object Generator { extension (scheduleEvent: ScheduleEvent) { def update( - metadata: MessageMetadata = MessageMetadata.empty - ): Message[Either[ScheduleError, Option[ScheduleEvent]]] = + metadata: MessageMetadata = MessageMetadata.empty + ): Message[Either[ScheduleError, Option[ScheduleEvent]]] = message( key = scheduleEvent.metadata.id, source = scheduleEvent.metadata.scheduleTopic, @@ -83,9 +83,9 @@ object Generator { ) def delete( - metadata: MessageMetadata = MessageMetadata.empty, - expire: Boolean = false - ): Message[Either[ScheduleError, Option[ScheduleEvent]]] = + metadata: MessageMetadata = MessageMetadata.empty, + expire: Boolean = false + ): Message[Either[ScheduleError, Option[ScheduleEvent]]] = message( key = scheduleEvent.metadata.id, source = scheduleEvent.metadata.scheduleTopic, @@ -94,4 +94,4 @@ object Generator { expire = expire ) } -} \ No newline at end of file +} From 11ab9e2ca68b6a5a4c71df99bfbfaabe67c0c517 Mon Sep 17 00:00:00 2001 From: Ben Carter Date: Fri, 10 Jan 2025 14:30:50 +0000 Subject: [PATCH 3/5] newline --- .scalafmt3.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.scalafmt3.conf b/.scalafmt3.conf index c9d02d0b..476b1767 100644 --- a/.scalafmt3.conf +++ b/.scalafmt3.conf @@ -15,4 +15,4 @@ fileOverride { "glob:**/*.sbt" { runner.dialect = scala212source3 } -} \ No newline at end of file +} From c75452af382d0f71eacddbe468bed483ec9dbe83 Mon Sep 17 00:00:00 2001 From: Ben Carter Date: Sat, 11 Jan 2025 16:45:35 +0000 Subject: [PATCH 4/5] Add main, clean up some domain classes --- .../main/scala/uk/sky/scheduler/Main.scala | 16 ++++++++++++++++ .../scala/uk/sky/scheduler/Scheduler.scala | 12 +++++++++++- .../sky/scheduler/error/ScheduleError.scala | 4 +++- .../uk/sky/scheduler/message/Message.scala | 11 +++-------- .../uk/sky/scheduler/message/Metadata.scala | 19 ++++++++++++------- 5 files changed, 45 insertions(+), 17 deletions(-) create mode 100644 scheduler-3/src/main/scala/uk/sky/scheduler/Main.scala diff --git a/scheduler-3/src/main/scala/uk/sky/scheduler/Main.scala b/scheduler-3/src/main/scala/uk/sky/scheduler/Main.scala new file mode 100644 index 00000000..c7987cd0 --- /dev/null +++ b/scheduler-3/src/main/scala/uk/sky/scheduler/Main.scala @@ -0,0 +1,16 @@ +package uk.sky.scheduler + +import cats.effect.* +import fs2.Stream + +object Main extends IOApp.Simple { + + def stream[F[_] : Concurrent]: Stream[F, Unit] = + for { + scheduler <- Stream.resource(Scheduler.live[F]) + message <- scheduler.stream + } yield message + + override def run: IO[Unit] = stream[IO].compile.drain + +} diff --git a/scheduler-3/src/main/scala/uk/sky/scheduler/Scheduler.scala b/scheduler-3/src/main/scala/uk/sky/scheduler/Scheduler.scala index bd2a5b9a..3f95a9d2 100644 --- a/scheduler-3/src/main/scala/uk/sky/scheduler/Scheduler.scala +++ b/scheduler-3/src/main/scala/uk/sky/scheduler/Scheduler.scala @@ -1,6 +1,6 @@ package uk.sky.scheduler -import cats.effect.Concurrent +import cats.effect.* import fs2.Stream import uk.sky.scheduler.domain.ScheduleEvent import uk.sky.scheduler.message.Message @@ -23,3 +23,13 @@ class Scheduler[F[_] : Concurrent, O]( scheduleEvents.drain .merge(scheduleQueue.schedules.through(schedulePublisher.publish)) } + +object Scheduler { + def live[F[_] : Concurrent]: Resource[F, Scheduler[F, Unit]] = + for { + eventSubscriber <- Resource.pure(??? : EventSubscriber[F]) + scheduleQueue <- Resource.pure(??? : ScheduleQueue[F]) + schedulePublisher <- Resource.pure(??? : SchedulePublisher[F, Unit]) + } yield Scheduler(eventSubscriber, scheduleQueue, schedulePublisher) + +} diff --git a/scheduler-3/src/main/scala/uk/sky/scheduler/error/ScheduleError.scala b/scheduler-3/src/main/scala/uk/sky/scheduler/error/ScheduleError.scala index ca3a2849..dc499c17 100644 --- a/scheduler-3/src/main/scala/uk/sky/scheduler/error/ScheduleError.scala +++ b/scheduler-3/src/main/scala/uk/sky/scheduler/error/ScheduleError.scala @@ -6,10 +6,12 @@ import org.apache.avro.Schema enum ScheduleError(val message: String, val cause: Throwable) extends Throwable(message, cause) { case InvalidAvroError(schema: Schema, error: Throwable) - extends ScheduleError(s"Avro message did not conform to Schema: ${schema.getFullName}: $schema", error) + extends ScheduleError(show"Avro message did not conform to Schema: ${schema.getFullName}: $schema", error) } object ScheduleError { given Eq[ScheduleError] = Eq.fromUniversalEquals given Show[ScheduleError] = _.message + + private given Show[Schema] = _.toString() } diff --git a/scheduler-3/src/main/scala/uk/sky/scheduler/message/Message.scala b/scheduler-3/src/main/scala/uk/sky/scheduler/message/Message.scala index b9d64419..0943070d 100644 --- a/scheduler-3/src/main/scala/uk/sky/scheduler/message/Message.scala +++ b/scheduler-3/src/main/scala/uk/sky/scheduler/message/Message.scala @@ -5,15 +5,11 @@ import cats.{Eq, Functor, Show} import monocle.syntax.all.* final case class Message[V](key: String, source: String, value: V, metadata: Metadata) { - def map[B](f: V => B): Message[B] = this.copy(value = f(this.value)) + def transform[B](f: V => B): Message[B] = this.copy(value = f(this.value)) } object Message { - extension [T](message: Message[T]) { - def isExpired: Boolean = message.metadata.isExpired - def expire: Message[T] = - message.focus(_.metadata).modify(_.transform(_ + (Metadata.expiredKey -> Metadata.expiredValue))) - } + extension [T](message: Message[T]) def expire: Message[T] = message.focus(_.metadata).modify(_.expire) given [V : Eq]: Eq[Message[V]] = Eq.by { case Message(key, source, value, metadata) => (key, source, value, metadata) @@ -24,7 +20,6 @@ object Message { } given Functor[Message] = new Functor[Message] { - override def map[A, B](fa: Message[A])(f: A => B): Message[B] = - fa.map(f) + override def map[A, B](fa: Message[A])(f: A => B): Message[B] = fa.transform(f) } } diff --git a/scheduler-3/src/main/scala/uk/sky/scheduler/message/Metadata.scala b/scheduler-3/src/main/scala/uk/sky/scheduler/message/Metadata.scala index 37cee1bc..05092a1a 100644 --- a/scheduler-3/src/main/scala/uk/sky/scheduler/message/Metadata.scala +++ b/scheduler-3/src/main/scala/uk/sky/scheduler/message/Metadata.scala @@ -1,5 +1,6 @@ package uk.sky.scheduler.message +import cats.syntax.all.* import cats.{Eq, Monoid, Show} import org.typelevel.ci.CIString @@ -11,6 +12,8 @@ object Metadata { case other => other.toMap } + val empty: Metadata = Map.empty[CIString, String] + extension (metadata: Metadata) { inline def value: Map[CIString, String] = metadata inline def toMap: Map[String, String] = metadata.map(_.toString -> _) @@ -18,18 +21,20 @@ object Metadata { inline def combine(other: Metadata): Metadata = metadata.concat(other) inline def isExpired: Boolean = metadata.get(expiredKey).exists(_.equalsIgnoreCase(expiredValue)) + inline def expire: Metadata = metadata + (expiredKey -> expiredValue) } - val expiredKey = CIString("schedule:expired") - val expiredValue = "true" - - val empty: Metadata = Map.empty[CIString, String] - given Monoid[Metadata] = new Monoid[Metadata] { override def empty: Metadata = Metadata.empty override def combine(x: Metadata, y: Metadata): Metadata = x.combine(y) } - given Show[Metadata] = Show.catsShowForMap[CIString, String] - given Eq[Metadata] = Eq.catsKernelEqForMap[CIString, String] + given Show[Metadata] = + _.map((k, v) => show"$k: $v") + .mkString("Metadata(", ", ", ")") + + given Eq[Metadata] = Eq.catsKernelEqForMap[CIString, String] + + private val expiredKey = CIString("schedule:expired") + private val expiredValue = "true" } From 00aa18279f64e565ac157becf4bb0342407504ec Mon Sep 17 00:00:00 2001 From: Luke Smith Date: Fri, 24 Jan 2025 14:15:47 +0000 Subject: [PATCH 5/5] fixup! Add main, clean up some domain classes --- scheduler-3/src/main/scala/uk/sky/scheduler/Main.scala | 4 ++-- .../main/scala/uk/sky/scheduler/domain/ScheduleEvent.scala | 6 +++--- .../scala/uk/sky/scheduler/message/MessageLawsSpec.scala | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/scheduler-3/src/main/scala/uk/sky/scheduler/Main.scala b/scheduler-3/src/main/scala/uk/sky/scheduler/Main.scala index c7987cd0..dc4eb021 100644 --- a/scheduler-3/src/main/scala/uk/sky/scheduler/Main.scala +++ b/scheduler-3/src/main/scala/uk/sky/scheduler/Main.scala @@ -5,9 +5,9 @@ import fs2.Stream object Main extends IOApp.Simple { - def stream[F[_] : Concurrent]: Stream[F, Unit] = + def stream[IO[_] : Concurrent]: Stream[IO, Unit] = for { - scheduler <- Stream.resource(Scheduler.live[F]) + scheduler <- Stream.resource(Scheduler.live[IO]) message <- scheduler.stream } yield message diff --git a/scheduler-3/src/main/scala/uk/sky/scheduler/domain/ScheduleEvent.scala b/scheduler-3/src/main/scala/uk/sky/scheduler/domain/ScheduleEvent.scala index 50a74a02..1de2f88a 100644 --- a/scheduler-3/src/main/scala/uk/sky/scheduler/domain/ScheduleEvent.scala +++ b/scheduler-3/src/main/scala/uk/sky/scheduler/domain/ScheduleEvent.scala @@ -6,7 +6,7 @@ package uk.sky.scheduler.domain * @param schedule * The Schedule's destination and payload. */ -case class ScheduleEvent( +final case class ScheduleEvent( metadata: Metadata, schedule: Schedule ) @@ -17,7 +17,7 @@ case class ScheduleEvent( * @param scheduleTopic * The topic the Schedule arrived on. */ -case class Metadata( +final case class Metadata( id: String, scheduleTopic: String ) @@ -35,7 +35,7 @@ case class Metadata( * @param headers * Extra metadata to send with the payload. */ -case class Schedule( +final case class Schedule( time: Long, topic: String, key: Array[Byte], diff --git a/scheduler-3/src/test/scala/uk/sky/scheduler/message/MessageLawsSpec.scala b/scheduler-3/src/test/scala/uk/sky/scheduler/message/MessageLawsSpec.scala index 1adf175f..0d9a1e4e 100644 --- a/scheduler-3/src/test/scala/uk/sky/scheduler/message/MessageLawsSpec.scala +++ b/scheduler-3/src/test/scala/uk/sky/scheduler/message/MessageLawsSpec.scala @@ -5,5 +5,5 @@ import cats.tests.CatsSuite import uk.sky.scheduler.util.Generator.given final class MessageLawsSpec extends CatsSuite { - checkAll("Message.MonoidLaws", FunctorTests[Message].functor[String, String, String]) + checkAll("Message.FunctorLaws", FunctorTests[Message].functor[String, String, String]) }