Skip to content

Commit f81f90b

Browse files
committed
#537 Update 'kafka-clients' version and move it to 'extras', including HyperdriveNotificationTarget.
1 parent d4a2e3d commit f81f90b

File tree

10 files changed

+77
-17
lines changed

10 files changed

+77
-17
lines changed

pramen/build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,7 @@ lazy val assemblySettingsExtras = assemblySettingsCommon ++ Seq(assembly / assem
243243
ShadeRule.zap("org.apache.avro.**").inAll,
244244
ShadeRule.zap("org.apache.commons.**").inAll,
245245
ShadeRule.zap("org.apache.jute.**").inAll,
246+
ShadeRule.zap("org.apache.kafka.**").inAll,
246247
ShadeRule.zap("org.apache.spark.annotation.**").inAll,
247248
ShadeRule.zap("org.apache.yetus.**").inAll,
248249
ShadeRule.zap("org.apache.zookeeper.**").inAll,
@@ -282,7 +283,6 @@ lazy val assemblySettingsRunner = assemblySettingsCommon ++ Seq(assembly / assem
282283
ShadeRule.zap("com.ibm.icu.**").inAll,
283284
ShadeRule.zap("net.jpountz.**").inAll,
284285
ShadeRule.zap("org.abego.**").inAll,
285-
ShadeRule.zap("org.apache.kafka.**").inAll,
286286
ShadeRule.zap("org.glassfish.**").inAll,
287287
ShadeRule.zap("org.lz4.**").inAll,
288288
ShadeRule.zap("org.slf4j.**").inAll,

pramen/core/src/test/scala/za/co/absa/pramen/core/tests/notify/NotificationTargetManagerSuite.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,16 @@ package za.co.absa.pramen.core.tests.notify
1919
import com.typesafe.config.{Config, ConfigFactory}
2020
import org.scalatest.wordspec.AnyWordSpec
2121
import za.co.absa.pramen.core.base.SparkTestBase
22-
import za.co.absa.pramen.core.notify.{HyperdriveNotificationTarget, NotificationTargetManager}
22+
import za.co.absa.pramen.core.mocks.notify.NotificationTargetMock
23+
import za.co.absa.pramen.core.notify.NotificationTargetManager
2324

2425
class NotificationTargetManagerSuite extends AnyWordSpec with SparkTestBase {
2526
private val conf: Config = ConfigFactory.parseString(
2627
s"""
2728
| pramen.notification.targets = [
2829
| {
2930
| name = "hyperdrive1"
30-
| factory.class = "za.co.absa.pramen.core.notify.HyperdriveNotificationTarget"
31+
| factory.class = "za.co.absa.pramen.core.mocks.notify.NotificationTargetMock"
3132
|
3233
| kafka.topic = "mytopic"
3334
|
@@ -48,7 +49,7 @@ class NotificationTargetManagerSuite extends AnyWordSpec with SparkTestBase {
4849
"return a notification target" in {
4950
val nt = NotificationTargetManager.getByName("hyperdrive1", conf, None)
5051

51-
assert(nt.isInstanceOf[HyperdriveNotificationTarget])
52+
assert(nt.isInstanceOf[NotificationTargetMock])
5253
}
5354

5455
"throw an exception if the notification target does not exist" in {

pramen/core/src/main/scala/za/co/absa/pramen/core/notify/HyperdriveNotificationTarget.scala renamed to pramen/extras/src/main/scala/za/co/absa/pramen/extras/notification/HyperdriveNotificationTarget.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,16 @@
1414
* limitations under the License.
1515
*/
1616

17-
package za.co.absa.pramen.core.notify
17+
package za.co.absa.pramen.extras.notification
1818

1919
import com.typesafe.config.Config
2020
import org.apache.spark.sql.SparkSession
2121
import org.slf4j.LoggerFactory
2222
import za.co.absa.pramen.api.status.{RunStatus, TaskResult}
2323
import za.co.absa.pramen.api.{ExternalChannelFactory, NotificationTarget, PipelineInfo}
24-
import za.co.absa.pramen.core.notify.mq.{SingleMessageProducer, SingleMessageProducerKafka}
2524
import za.co.absa.pramen.core.utils.ConfigUtils
2625
import za.co.absa.pramen.core.utils.Emoji._
26+
import za.co.absa.pramen.extras.notification.mq.{SingleMessageProducer, SingleMessageProducerKafka}
2727

2828
class HyperdriveNotificationTarget(conf: Config,
2929
producer: SingleMessageProducer,

pramen/core/src/main/scala/za/co/absa/pramen/core/notify/mq/SingleMessageProducer.scala renamed to pramen/extras/src/main/scala/za/co/absa/pramen/extras/notification/mq/SingleMessageProducer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package za.co.absa.pramen.core.notify.mq
17+
package za.co.absa.pramen.extras.notification.mq
1818

1919
trait SingleMessageProducer {
2020
def send(topic: String, message: String, numberOrRetries: Int = 3): Unit

pramen/core/src/main/scala/za/co/absa/pramen/core/notify/mq/SingleMessageProducerKafka.scala renamed to pramen/extras/src/main/scala/za/co/absa/pramen/extras/notification/mq/SingleMessageProducerKafka.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package za.co.absa.pramen.core.notify.mq
17+
package za.co.absa.pramen.extras.notification.mq
1818

1919
import com.typesafe.config.Config
2020
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright 2022 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package za.co.absa.pramen.extras
18+
19+
import za.co.absa.pramen.api.SchemaDifference
20+
import za.co.absa.pramen.api.status._
21+
22+
import java.time.{Instant, LocalDate}
23+
24+
object TaskNotificationFactory {
25+
def getDummyTaskNotification(taskDef: TaskDef = TaskDefFactory.getDummyTaskNotification(),
26+
runInfo: Option[RunInfo] = Some(RunInfo(
27+
LocalDate.of(2022, 2, 18),
28+
Instant.ofEpochMilli(1613600000000L),
29+
Instant.ofEpochMilli(1672759508000L)
30+
)),
31+
status: RunStatus = RunStatus.Succeeded(None, Some(100), None, None, TaskRunReason.New, Seq.empty, Seq.empty, Seq.empty, Seq.empty),
32+
applicationId: String = "app_12345",
33+
isTransient: Boolean = false,
34+
isRawFilesJob: Boolean = false,
35+
schemaChanges: Seq[SchemaDifference] = Seq.empty,
36+
dependencyWarnings: Seq[DependencyWarning] = Seq.empty,
37+
notificationTargetErrors: Seq[NotificationFailure] = Seq.empty,
38+
options: Map[String, String] = Map.empty[String, String]): TaskResult = {
39+
TaskResult(taskDef,
40+
status,
41+
runInfo,
42+
applicationId,
43+
isTransient,
44+
isRawFilesJob,
45+
schemaChanges,
46+
dependencyWarnings,
47+
notificationTargetErrors,
48+
options)
49+
}
50+
}

pramen/core/src/test/scala/za/co/absa/pramen/core/tests/notify/HyperdriveNotificationTargetSuite.scala renamed to pramen/extras/src/test/scala/za/co/absa/pramen/extras/notification/HyperdriveNotificationTargetSuite.scala

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,13 @@
1414
* limitations under the License.
1515
*/
1616

17-
package za.co.absa.pramen.core.tests.notify
17+
package za.co.absa.pramen.extras.notification
1818

1919
import com.typesafe.config.ConfigFactory
2020
import org.scalatest.wordspec.AnyWordSpec
21-
import za.co.absa.pramen.api.status.{RunStatus, TaskStatus}
22-
import za.co.absa.pramen.core.TaskNotificationFactory
23-
import za.co.absa.pramen.core.base.SparkTestBase
24-
import za.co.absa.pramen.core.mocks.notify.SingleMessageProducerSpy
25-
import za.co.absa.pramen.core.notify.HyperdriveNotificationTarget
21+
import za.co.absa.pramen.api.status.RunStatus
22+
import za.co.absa.pramen.extras.TaskNotificationFactory
23+
import za.co.absa.pramen.extras.base.SparkTestBase
2624

2725
class HyperdriveNotificationTargetSuite extends AnyWordSpec with SparkTestBase {
2826
"apply()" should {

pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/notify/SingleMessageProducerSpy.scala renamed to pramen/extras/src/test/scala/za/co/absa/pramen/extras/notification/SingleMessageProducerSpy.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@
1414
* limitations under the License.
1515
*/
1616

17-
package za.co.absa.pramen.core.mocks.notify
17+
package za.co.absa.pramen.extras.notification
1818

19-
import za.co.absa.pramen.core.notify.mq.SingleMessageProducer
19+
import za.co.absa.pramen.extras.notification.mq.SingleMessageProducer
2020

2121
class SingleMessageProducerSpy extends SingleMessageProducer {
2222
var connectInvoked = 0

pramen/project/Dependencies.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ object Dependencies {
3535
"org.postgresql" % "postgresql" % postgreSqlDriverVersion,
3636
"com.github.scopt" %% "scopt" % scoptVersion,
3737
"com.github.yruslan" %% "channel_scala" % channelVersion,
38-
"org.apache.kafka" % "kafka-clients" % kafkaClientVersion,
3938
"com.sun.mail" % "javax.mail" % javaXMailVersion,
4039
"com.lihaoyi" %% "requests" % requestsVersion,
4140
"org.scalatest" %% "scalatest" % scalatestVersion % Test,
@@ -51,6 +50,7 @@ object Dependencies {
5150
"org.scalatest" %% "scalatest" % scalatestVersion % Test,
5251
"org.mockito" % "mockito-core" % mockitoVersion % Test
5352
) ++ Seq(
53+
getKafkaClientsDependency(sparkVersion(scalaVersion)),
5454
getAbrisDependency(sparkVersion(scalaVersion)),
5555
getDeltaDependency(sparkVersion(scalaVersion), isCompile = false, isTest = true)
5656
)

pramen/project/Versions.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,17 @@ object Versions {
9393
}
9494
}
9595

96+
def getKafkaClientsDependency(sparkVersion: String): ModuleID = {
97+
val kafkaClientsVersion = sparkVersion match {
98+
case version if version.startsWith("2.4.") => "2.5.1"
99+
case _ => "3.9.0"
100+
}
101+
102+
println(s"Using 'kafla-clients' version $kafkaClientsVersion")
103+
104+
"org.apache.kafka" % "kafka-clients" % kafkaClientsVersion
105+
}
106+
96107
def getAbrisDependency(sparkVersion: String): ModuleID = {
97108
// According to this: https://github.com/AbsaOSS/ABRiS?tab=readme-ov-file#supported-versions
98109
val abrisVersion = sparkVersion match {

0 commit comments

Comments
 (0)