Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Scheduler3 module #356

Merged
merged 5 commits into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions .scalafix3.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
rules = [
NoAutoTupling,
DisableSyntax,
LeakingImplicitClassVal,
NoValInForComprehension
]

OrganizeImports {
coalesceToWildcardImportThreshold = 6
groupedImports = AggressiveMerge
importSelectorsOrder = SymbolsFirst
removeUnused = false
targetDialect = Scala3
}
18 changes: 18 additions & 0 deletions .scalafmt3.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
version = 3.8.3

align.preset = most
includeCurlyBraceInSelectChains = false
maxColumn = 120
project.git = true
rewrite.rules = [SortImports, RedundantBraces, RedundantParens, PreferCurlyFors]
runner.dialect = scala3
spaces.beforeContextBoundColon = Always

fileOverride {
"glob:**/project/*.scala" {
runner.dialect = scala212source3
}
"glob:**/*.sbt" {
runner.dialect = scala212source3
}
}
10 changes: 10 additions & 0 deletions aliases.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
addCommandAlias("checkFix", "scalafixAll --check OrganizeImports; scalafixAll --check")
addCommandAlias("runFix", "scalafixAll OrganizeImports; scalafixAll")
addCommandAlias("checkFmt", "scalafmtCheckAll; scalafmtSbtCheck")
addCommandAlias("runFmt", "scalafmtAll; scalafmtSbt")

addCommandAlias("checkLint", "checkFmt; checkFix")
addCommandAlias("runLint", "runFmt; runFix")

addCommandAlias("ciBuild", "checkFmt; checkFix; test; schema")
addCommandAlias("ciRelease", "clean; schema; project scheduler; release with-defaults")
70 changes: 44 additions & 26 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,25 +1,40 @@
import Aliases.*
import Release.*
import DockerPublish.*
import org.typelevel.scalacoptions.ScalacOptions

ThisBuild / scalafmtOnCompile := true
ThisBuild / semanticdbEnabled := true
ThisBuild / semanticdbVersion := scalafixSemanticdb.revision
ThisBuild / scalafixDependencies += "com.github.liancheng" %% "organize-imports" % "0.6.0"
ThisBuild / organization := "com.sky"

Global / onChangedBuildSource := ReloadOnSourceChanges
ThisBuild / scalafmtOnCompile := true
ThisBuild / semanticdbEnabled := true
ThisBuild / semanticdbVersion := scalafixSemanticdb.revision

Test / testOptions += Tests.Argument(TestFrameworks.ScalaTest, "-oF")
Global / onChangedBuildSource := ReloadOnSourceChanges

val commonSettings = Seq(
organization := "com.sky",
scalaVersion := "2.13.10"
val scala2Settings = Seq(
scalaVersion := "2.13.10",
tpolecatScalacOptions ++= Set(
ScalacOptions.other("-Ymacro-annotations"),
ScalacOptions.source3
),
tpolecatExcludeOptions ++= Set(
ScalacOptions.warnNonUnitStatement,
ScalacOptions.warnValueDiscard
),
run / fork := true,
Test / fork := true,
Test / parallelExecution := false
)

val compilerSettings = Seq(
tpolecatScalacOptions ++= Set(ScalacOptions.other("-Ymacro-annotations"), ScalacOptions.source3),
tpolecatExcludeOptions ++= Set(ScalacOptions.warnNonUnitStatement, ScalacOptions.warnValueDiscard)
val scala3Settings = Seq(
scalaVersion := "3.6.2",
tpolecatScalacOptions ++= Set(
ScalacOptions.other("-no-indent"),
ScalacOptions.other("-old-syntax")
),
Test / tpolecatExcludeOptions += ScalacOptions.warnNonUnitStatement,
run / fork := true,
Test / fork := true,
Test / parallelExecution := false
)

val buildInfoSettings = Seq(
Expand All @@ -29,35 +44,38 @@ val buildInfoSettings = Seq(

lazy val scheduler = (project in file("scheduler"))
.enablePlugins(BuildInfoPlugin, JavaAppPackaging, UniversalDeployPlugin, JavaAgent, DockerPlugin)
.settings(commonSettings)
.settings(compilerSettings)
.settings(scala2Settings)
.settings(
libraryDependencies ++= Dependencies.all,
libraryDependencies ++= Dependencies.scheduler,
addCompilerPlugin("org.typelevel" % "kind-projector" % "0.13.3" cross CrossVersion.full),
run / fork := true,
Test / fork := true,
javaAgents += "io.kamon" % "kanela-agent" % "1.0.18",
javaAgents += "io.kamon" % "kanela-agent" % "1.0.18",
buildInfoSettings,
dockerSettings,
releaseSettings,
Test / parallelExecution := false
releaseSettings
)

lazy val scheduler3 = (project in file("scheduler-3"))
.enablePlugins(JavaAgent, DockerPlugin, JavaAppPackaging, BuildInfoPlugin)
.settings(scala3Settings)
.settings(
libraryDependencies ++= Dependencies.scheduler3,
buildInfoSettings,
scalafixConfig := Some((ThisBuild / baseDirectory).value / ".scalafix3.conf"),
scalafmtConfig := (ThisBuild / baseDirectory).value / ".scalafmt3.conf"
)

val schema = inputKey[Unit]("Generate the Avro schema file for the Schedule schema.")

lazy val avro = (project in file("avro"))
.settings(commonSettings)
.settings(compilerSettings)
.settings(scala2Settings)
.settings(libraryDependencies += Dependencies.avro4s)
.settings(schema := (Compile / run).toTask("").value)
.dependsOn(scheduler % "compile->compile")
.disablePlugins(ReleasePlugin)

lazy val root = (project in file("."))
.withId("kafka-message-scheduler")
.settings(commonSettings)
.settings(defineCommandAliases)
.settings(dockerImageCreationTask := (scheduler / Docker / publishLocal).value)
.aggregate(scheduler, avro)
.aggregate(scheduler, scheduler3, avro)
.enablePlugins(DockerComposePlugin)
.disablePlugins(ReleasePlugin)
13 changes: 0 additions & 13 deletions project/Aliases.scala

This file was deleted.

61 changes: 52 additions & 9 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,28 @@ object Dependencies {
}

object Cats {
private val version = "2.7.0"
val core = "org.typelevel" %% "cats-core" % version
val testKit = "org.typelevel" %% "cats-testkit" % version % Test
val scalatest = "com.ironcorelabs" %% "cats-scalatest" % "3.1.1" % Test
val base = Seq(core)
val test = Seq(testKit, scalatest)
private val version = "2.7.0"
private val catsEffectVersion = "3.5.7"

lazy val effectTestKit = "org.typelevel" %% "cats-effect-testkit" % catsEffectVersion % Test
lazy val effectTesting = "org.typelevel" %% "cats-effect-testing-scalatest" % "1.6.0" % Test
lazy val effectTestkitScalatest = "org.typelevel" %% "cats-testkit-scalatest" % "2.1.5" % Test
lazy val caseInsensitive = "org.typelevel" %% "case-insensitive" % "1.4.2"
lazy val caseInsensitiveTesting = "org.typelevel" %% "case-insensitive-testing" % "1.4.2"
lazy val core = "org.typelevel" %% "cats-core" % version
lazy val effect = "org.typelevel" %% "cats-effect" % catsEffectVersion
lazy val scalatest = "com.ironcorelabs" %% "cats-scalatest" % "3.1.1" % Test
lazy val testKit = "org.typelevel" %% "cats-testkit" % version % Test
lazy val base = Seq(core)
lazy val test = Seq(testKit, scalatest)
}

object Fs2 {
private lazy val version = "3.11.0"
private lazy val kafkaVersion = "3.6.0"

lazy val core = "co.fs2" %% "fs2-core" % version
lazy val kafka = "com.github.fd4s" %% "fs2-kafka" % kafkaVersion
}

object Kafka {
Expand All @@ -39,6 +55,10 @@ object Dependencies {
val all = Seq(core, akka, prometheus)
}

object Monocle {
lazy val core = "dev.optics" %% "monocle-core" % "3.3.0"
}

object PureConfig {
private val version = "0.17.1"
val pureconfig = "com.github.pureconfig" %% "pureconfig" % version
Expand All @@ -55,6 +75,13 @@ object Dependencies {
val test = Seq(scalaCheck)
}

object Vulcan {
private lazy val version = "1.11.1"

val core = "com.github.fd4s" %% "vulcan" % version
val generic = "com.github.fd4s" %% "vulcan-generic" % version % Test
}

val avro4s = "com.sksamuel.avro4s" %% "avro4s-core" % "4.1.2"
val kafkaTopicLoader = "uk.sky" %% "kafka-topic-loader" % "1.5.6"
val monix = "io.monix" %% "monix-execution" % "3.4.1"
Expand All @@ -72,18 +99,20 @@ object Dependencies {
val scalaTest = "org.scalatest" %% "scalatest" % "3.2.18" % Test
val scalaTestPlusMockito = "org.scalatestplus" %% "mockito-3-12" % "3.2.10.0" % Test

val core: Seq[ModuleID] = Akka.base ++ Cats.base ++ Kafka.base ++ Kamon.all ++ PureConfig.all ++ Refined.base ++ Seq(
val core: Seq[ModuleID] = Akka.base ++ Cats.base ++ Kafka.base ++ Kamon.all ++ PureConfig.all ++ Refined.base ++ Seq(
avro4s,
kafkaTopicLoader,
monix,
scalaLogging
)

val runtime: Seq[ModuleID] = Seq(
janino,
logbackClassic,
logbackEncoder
)
val test: Seq[ModuleID] = Akka.test ++ Cats.test ++ Kafka.test ++ Refined.test ++ Seq(

val test: Seq[ModuleID] = Akka.test ++ Cats.test ++ Kafka.test ++ Refined.test ++ Seq(
embeddedKafka,
mockito,
randomDataGenerator,
Expand All @@ -92,5 +121,19 @@ object Dependencies {
scalaTest,
scalaTestPlusMockito
)
val all: Seq[sbt.ModuleID] = core ++ runtime ++ test
val scheduler: Seq[ModuleID] = core ++ runtime ++ test

val scheduler3: Seq[ModuleID] = Seq(
Cats.caseInsensitive,
Cats.caseInsensitiveTesting,
Cats.effect,
Cats.effectTestKit,
Cats.effectTesting,
Cats.effectTestkitScalatest,
Fs2.core,
Fs2.kafka,
Monocle.core,
Vulcan.core,
Vulcan.generic
)
}
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=1.10.1
sbt.version=1.10.7
2 changes: 1 addition & 1 deletion project/plugins.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ addSbtPlugin("com.tapad" % "sbt-docker-compose" % "1.0.35")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.2")
addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.12.1")
addSbtPlugin("io.kamon" % "sbt-kanela-runner" % "2.0.14")
addSbtPlugin("org.typelevel" % "sbt-tpolecat" % "0.5.1")
addSbtPlugin("org.typelevel" % "sbt-tpolecat" % "0.5.2")
10 changes: 10 additions & 0 deletions scheduler-3/src/main/scala/uk/sky/scheduler/EventSubscriber.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package uk.sky.scheduler

import fs2.Stream
import uk.sky.scheduler.domain.ScheduleEvent
import uk.sky.scheduler.error.ScheduleError
import uk.sky.scheduler.message.Message

trait EventSubscriber[F[_]] {
def messages: Stream[F, Message[Either[ScheduleError, Option[ScheduleEvent]]]]
}
16 changes: 16 additions & 0 deletions scheduler-3/src/main/scala/uk/sky/scheduler/Main.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package uk.sky.scheduler

import cats.effect.*
import fs2.Stream

object Main extends IOApp.Simple {

def stream[F[_] : Concurrent]: Stream[F, Unit] =
btanyab marked this conversation as resolved.
Show resolved Hide resolved
for {
scheduler <- Stream.resource(Scheduler.live[F])
message <- scheduler.stream
} yield message

override def run: IO[Unit] = stream[IO].compile.drain

}
7 changes: 7 additions & 0 deletions scheduler-3/src/main/scala/uk/sky/scheduler/Repository.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package uk.sky.scheduler

trait Repository[F[_], K, V] {
def set(key: K, value: V): F[Unit]
def get(key: K): F[Option[V]]
def delete(key: K): F[Unit]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package uk.sky.scheduler

import fs2.Pipe
import uk.sky.scheduler.domain.ScheduleEvent

trait SchedulePublisher[F[_], O] {
def publish: Pipe[F, ScheduleEvent, O]
}
10 changes: 10 additions & 0 deletions scheduler-3/src/main/scala/uk/sky/scheduler/ScheduleQueue.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package uk.sky.scheduler

import fs2.Stream
import uk.sky.scheduler.domain.ScheduleEvent

trait ScheduleQueue[F[_]] {
def schedule(key: String, scheduleEvent: ScheduleEvent): F[Unit]
def cancel(key: String): F[Unit]
def schedules: Stream[F, ScheduleEvent]
}
35 changes: 35 additions & 0 deletions scheduler-3/src/main/scala/uk/sky/scheduler/Scheduler.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package uk.sky.scheduler

import cats.effect.*
import fs2.Stream
import uk.sky.scheduler.domain.ScheduleEvent
import uk.sky.scheduler.message.Message
import uk.sky.scheduler.message.Metadata.*

class Scheduler[F[_] : Concurrent, O](
eventSubscriber: EventSubscriber[F],
scheduleQueue: ScheduleQueue[F],
schedulePublisher: SchedulePublisher[F, O]
) {
private val scheduleEvents = eventSubscriber.messages.evalTapChunk { case Message(key, source, value, metadata) =>
value match {
case Left(_) => scheduleQueue.cancel(key)
case Right(None) => Concurrent[F].unlessA(metadata.isExpired)(scheduleQueue.cancel(key))
case Right(Some(schedule)) => scheduleQueue.schedule(key, schedule)
}
}

def stream: Stream[F, O] =
scheduleEvents.drain
.merge(scheduleQueue.schedules.through(schedulePublisher.publish))
}

object Scheduler {
def live[F[_] : Concurrent]: Resource[F, Scheduler[F, Unit]] =
for {
eventSubscriber <- Resource.pure(??? : EventSubscriber[F])
scheduleQueue <- Resource.pure(??? : ScheduleQueue[F])
schedulePublisher <- Resource.pure(??? : SchedulePublisher[F, Unit])
} yield Scheduler(eventSubscriber, scheduleQueue, schedulePublisher)

}
Loading