diff --git a/build.sbt b/build.sbt
index d3ab02a1..8252ba7a 100644
--- a/build.sbt
+++ b/build.sbt
@@ -1,6 +1,6 @@
 name := "stream-loader"
 
-ThisBuild / scalaVersion := "2.13.15"
+ThisBuild / scalaVersion := "2.13.17"
 ThisBuild / scalacOptions := Seq(
   "-unchecked",
   "-deprecation",
@@ -25,12 +25,12 @@ ThisBuild / git.remoteRepo := {
 }
 
 val scalaTestVersion = "3.2.19"
-val scalaCheckVersion = "1.18.1"
+val scalaCheckVersion = "1.19.0"
 val scalaCheckTestVersion = "3.2.19.0"
 
-val hadoopVersion = "3.4.1"
-val parquetVersion = "1.15.2"
-val icebergVersion = "1.7.0"
+val hadoopVersion = "3.4.2"
+val parquetVersion = "1.16.0"
+val icebergVersion = "1.10.0"
 
 lazy val `stream-loader-core` = project
   .in(file("stream-loader-core"))
@@ -41,19 +41,19 @@ lazy val `stream-loader-core` = project
     buildInfoKeys := Seq[BuildInfoKey](name, version, scalaVersion, git.gitHeadCommit),
     libraryDependencies ++= Seq(
       "org.scala-lang" % "scala-reflect" % scalaVersion.value,
-      "org.apache.kafka" % "kafka-clients" % "3.9.0",
+      "org.apache.kafka" % "kafka-clients" % "4.1.0",
       "org.log4s" %% "log4s" % "1.10.0",
-      "org.apache.commons" % "commons-compress" % "1.27.1",
-      "org.xerial.snappy" % "snappy-java" % "1.1.10.7",
+      "org.apache.commons" % "commons-compress" % "1.28.0",
+      "org.xerial.snappy" % "snappy-java" % "1.1.10.8",
      "org.lz4" % "lz4-java" % "1.8.0",
-      "com.github.luben" % "zstd-jni" % "1.5.6-8",
+      "com.github.luben" % "zstd-jni" % "1.5.7-5",
       "com.univocity" % "univocity-parsers" % "2.9.1",
       "org.json4s" %% "json4s-native" % "4.0.7",
-      "io.micrometer" % "micrometer-core" % "1.14.1",
+      "io.micrometer" % "micrometer-core" % "1.15.4",
       "org.scalatest" %% "scalatest" % scalaTestVersion % "test",
       "org.scalatestplus" %% "scalacheck-1-18" % scalaCheckTestVersion % "test",
       "org.scalacheck" %% "scalacheck" % scalaCheckVersion % "test",
-      "ch.qos.logback" % "logback-classic" % "1.5.12" % "test"
+      "ch.qos.logback" % "logback-classic" % "1.5.19" % "test"
     )
   )
 
@@ -64,8 +64,8 @@ lazy val `stream-loader-clickhouse` = project
   .settings(
     resolvers += "jitpack" at "https://jitpack.io",
     libraryDependencies ++= Seq(
-      "org.apache.httpcomponents.client5" % "httpclient5" % "5.4.1",
-      "com.clickhouse" % "clickhouse-jdbc" % "0.7.1",
+      "org.apache.httpcomponents.client5" % "httpclient5" % "5.5.1",
+      "com.clickhouse" % "client-v2" % "0.9.2",
       "org.scalatest" %% "scalatest" % scalaTestVersion % "test",
       "org.scalatestplus" %% "scalacheck-1-18" % scalaCheckTestVersion % "test",
       "org.scalacheck" %% "scalacheck" % scalaCheckVersion % "test"
@@ -106,14 +106,14 @@ lazy val `stream-loader-s3` = project
   .settings(commonSettings)
   .settings(
     libraryDependencies ++= Seq(
-      "software.amazon.awssdk" % "s3" % "2.29.20",
+      "software.amazon.awssdk" % "s3" % "2.35.5",
       "org.scalatest" %% "scalatest" % scalaTestVersion % "test",
-      "com.amazonaws" % "aws-java-sdk-s3" % "1.12.778" % "test",
-      "org.gaul" % "s3proxy" % "2.4.1" % "test"
+      "com.amazonaws" % "aws-java-sdk-s3" % "1.12.792" % "test",
+      "org.gaul" % "s3proxy" % "2.8.0" % "test"
     )
   )
 
-val verticaVersion = "24.4.0-0"
+val verticaVersion = "25.3.0-0"
 
 lazy val `stream-loader-vertica` = project
   .in(file("stream-loader-vertica"))
@@ -128,7 +128,8 @@ lazy val `stream-loader-vertica` = project
     )
   )
 
-val duckdbVersion = "1.1.3"
+val duckdbVersion = "1.4.1.0"
+val jerseyVersion = "3.1.11"
 
 lazy val packAndSplitJars =
   taskKey[(File, File)]("Runs pack and splits out the application jars from the external dependency jars")
@@ -150,19 +151,21 @@ lazy val `stream-loader-tests` = project
   .settings(commonSettings)
   .settings(
     libraryDependencies ++= Seq(
-      "com.typesafe" % "config" % "1.4.3",
-      "ch.qos.logback" % "logback-classic" % "1.5.12",
-      "com.zaxxer" % "HikariCP" % "6.2.1",
-      "org.apache.iceberg" % "iceberg-parquet" % icebergVersion,
-      "com.vertica.jdbc" % "vertica-jdbc" % verticaVersion,
-      "org.scalacheck" %% "scalacheck" % scalaCheckVersion,
-      "org.scalatest" %% "scalatest" % scalaTestVersion % "test",
-      "org.scalatestplus" %% "scalacheck-1-18" % scalaCheckTestVersion % "test",
-      "org.slf4j" % "log4j-over-slf4j" % "2.0.16" % "test",
-      "org.mandas" % "docker-client" % "8.0.3" % "test",
-      "org.jboss.resteasy" % "resteasy-client" % "6.2.11.Final" % "test",
-      "com.fasterxml.jackson.jakarta.rs" % "jackson-jakarta-rs-json-provider" % "2.18.1" % "test",
-      "org.duckdb" % "duckdb_jdbc" % duckdbVersion % "test"
+      "com.typesafe" % "config" % "1.4.5",
+      "ch.qos.logback" % "logback-classic" % "1.5.19",
+      "com.zaxxer" % "HikariCP" % "7.0.2",
+      "org.apache.iceberg" % "iceberg-parquet" % icebergVersion,
+      "com.vertica.jdbc" % "vertica-jdbc" % verticaVersion,
+      "org.scalacheck" %% "scalacheck" % scalaCheckVersion,
+      "org.scalatest" %% "scalatest" % scalaTestVersion % "test",
+      "org.scalatestplus" %% "scalacheck-1-18" % scalaCheckTestVersion % "test",
+      "org.slf4j" % "log4j-over-slf4j" % "2.0.17" % "test",
+      "org.mandas" % "docker-client" % "9.0.4" % "test",
+      "org.glassfish.jersey.core" % "jersey-client" % jerseyVersion % "test",
+      "org.glassfish.jersey.inject" % "jersey-hk2" % jerseyVersion % "test",
+      "org.glassfish.jersey.connectors" % "jersey-apache-connector" % jerseyVersion % "test",
+      "org.glassfish.jersey.media" % "jersey-media-json-jackson" % jerseyVersion % "test",
+      "org.duckdb" % "duckdb_jdbc" % duckdbVersion % "test"
     ),
     inConfig(IntegrationTest)(Defaults.testTasks),
     publish := {},
@@ -195,7 +198,7 @@ lazy val `stream-loader-tests` = project
       val bin = s"/opt/${name.value}/bin/"
 
       new Dockerfile {
-        from("eclipse-temurin:21.0.2_13-jre")
+        from("eclipse-temurin:21.0.8_9-jre")
 
         env("APP_CLASS_PATH" -> s"$lib/*")
diff --git a/project/build.properties b/project/build.properties
index c02c575f..01a16ed1 100644
--- a/project/build.properties
+++ b/project/build.properties
@@ -1 +1 @@
-sbt.version=1.11.3
+sbt.version=1.11.7
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 8e9c2f6e..1d0c4deb 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -4,20 +4,20 @@
 addSbtPlugin("se.marcuslonnberg" % "sbt-docker" % "1.11.0")
 
 addSbtPlugin("com.github.sbt" % "sbt-git" % "2.1.0")
 
-addSbtPlugin("org.xerial.sbt" % "sbt-pack" % "0.20")
+addSbtPlugin("org.xerial.sbt" % "sbt-pack" % "0.22")
 
 addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.13.1")
 
-addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.2")
+addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.5")
 
 addSbtPlugin("com.thoughtworks.sbt-api-mappings" % "sbt-api-mappings" % "3.0.2")
 
-addSbtPlugin("com.github.sbt" % "sbt-unidoc" % "0.5.0")
+addSbtPlugin("com.github.sbt" % "sbt-unidoc" % "0.6.0")
 
 addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.10.0")
 
-libraryDependencies += "net.sourceforge.plantuml" % "plantuml" % "1.2024.8"
+libraryDependencies += "net.sourceforge.plantuml" % "plantuml" % "1.2025.8"
 
-addSbtPlugin("com.github.sbt" % "sbt-ghpages" % "0.8.0")
+addSbtPlugin("com.github.sbt" % "sbt-ghpages" % "0.9.0")
 
-addSbtPlugin("com.github.sbt" % "sbt-pgp" % "2.3.0")
+addSbtPlugin("com.github.sbt" % "sbt-pgp" % "2.3.1")
diff --git a/stream-loader-clickhouse/src/main/scala/com/adform/streamloader/clickhouse/ClickHouseFileBuilder.scala b/stream-loader-clickhouse/src/main/scala/com/adform/streamloader/clickhouse/ClickHouseFileBuilder.scala
index c5b3a23c..f5cb848e 100644
--- a/stream-loader-clickhouse/src/main/scala/com/adform/streamloader/clickhouse/ClickHouseFileBuilder.scala
+++ b/stream-loader-clickhouse/src/main/scala/com/adform/streamloader/clickhouse/ClickHouseFileBuilder.scala
@@ -8,8 +8,8 @@
 package com.adform.streamloader.clickhouse
 
-import com.adform.streamloader.sink.file.{FileBuilder, FileBuilderFactory}
-import com.clickhouse.data.{ClickHouseCompression, ClickHouseFormat}
+import com.adform.streamloader.sink.file.{Compression, FileBuilder, FileBuilderFactory}
+import com.clickhouse.data.ClickHouseFormat
 
 /**
  * A FileBuilder able to build files that can be loaded to ClickHouse.
@@ -26,7 +26,7 @@ trait ClickHouseFileBuilder[-R] extends FileBuilder[R] {
   /**
    * Compression to use for the files being constructed.
    */
-  def compression: ClickHouseCompression
+  def fileCompression: Compression
 }
 
 trait ClickHouseFileBuilderFactory[R] extends FileBuilderFactory[R, ClickHouseFileBuilder[R]] {
diff --git a/stream-loader-clickhouse/src/main/scala/com/adform/streamloader/clickhouse/ClickHouseFileRecordBatch.scala b/stream-loader-clickhouse/src/main/scala/com/adform/streamloader/clickhouse/ClickHouseFileRecordBatch.scala
index 45654607..7d164f13 100644
--- a/stream-loader-clickhouse/src/main/scala/com/adform/streamloader/clickhouse/ClickHouseFileRecordBatch.scala
+++ b/stream-loader-clickhouse/src/main/scala/com/adform/streamloader/clickhouse/ClickHouseFileRecordBatch.scala
@@ -8,10 +8,11 @@
 package com.adform.streamloader.clickhouse
 
-import java.io.File
 import com.adform.streamloader.model.StreamRange
-import com.adform.streamloader.sink.file.FileRecordBatch
-import com.clickhouse.data.{ClickHouseCompression, ClickHouseFormat}
+import com.adform.streamloader.sink.file.{Compression, FileRecordBatch}
+import com.clickhouse.data.ClickHouseFormat
+
+import java.io.File
 
 /**
  * A file containing a batch of records in some ClickHouse supported format that can be loaded to ClickHouse.
@@ -19,7 +20,7 @@ import com.clickhouse.data.{ClickHouseCompression, ClickHouseFormat}
 case class ClickHouseFileRecordBatch(
     file: File,
     format: ClickHouseFormat,
-    compression: ClickHouseCompression,
+    fileCompression: Compression,
     recordRanges: Seq[StreamRange],
     rowCount: Long
 ) extends FileRecordBatch
diff --git a/stream-loader-clickhouse/src/main/scala/com/adform/streamloader/clickhouse/ClickHouseFileRecordBatcher.scala b/stream-loader-clickhouse/src/main/scala/com/adform/streamloader/clickhouse/ClickHouseFileRecordBatcher.scala
index c1920ed9..6ef6dd00 100644
--- a/stream-loader-clickhouse/src/main/scala/com/adform/streamloader/clickhouse/ClickHouseFileRecordBatcher.scala
+++ b/stream-loader-clickhouse/src/main/scala/com/adform/streamloader/clickhouse/ClickHouseFileRecordBatcher.scala
@@ -32,7 +32,7 @@ class ClickHouseFileRecordBatcher[R](
       ClickHouseFileRecordBatch(
         f,
         fileBuilder.format,
-        fileBuilder.compression,
+        fileBuilder.fileCompression,
         recordRanges,
         fileBuilder.getRecordCount
       )
diff --git a/stream-loader-clickhouse/src/main/scala/com/adform/streamloader/clickhouse/ClickHouseFileStorage.scala b/stream-loader-clickhouse/src/main/scala/com/adform/streamloader/clickhouse/ClickHouseFileStorage.scala
index be7a73e3..d80c1d23 100644
--- a/stream-loader-clickhouse/src/main/scala/com/adform/streamloader/clickhouse/ClickHouseFileStorage.scala
+++ b/stream-loader-clickhouse/src/main/scala/com/adform/streamloader/clickhouse/ClickHouseFileStorage.scala
@@ -10,23 +10,22 @@
 package com.adform.streamloader.clickhouse
 
 import com.adform.streamloader.model._
 import com.adform.streamloader.sink.batch.storage.InDataOffsetBatchStorage
+import com.adform.streamloader.sink.file.Compression
 import com.adform.streamloader.util.Logging
-import com.clickhouse.data.ClickHouseFile
-import com.clickhouse.jdbc.ClickHouseConnection
+import com.clickhouse.client.api.Client
+import com.clickhouse.client.api.insert.InsertSettings
 import org.apache.kafka.common.TopicPartition
 
-import java.sql.Connection
-import javax.sql.DataSource
+import java.nio.file.Files
+import java.time.ZoneOffset
 import scala.collection.mutable
-import scala.jdk.CollectionConverters._
-import scala.util.Using
 
 /**
  * A ClickHouse storage implementation, stores offsets in rows of data.
 * Queries ClickHouse upon initialization in order to retrieve committed stream positions.
 */
 class ClickHouseFileStorage(
-    dbDataSource: DataSource,
+    client: Client,
     table: String,
     topicColumnName: String,
     partitionColumnName: String,
@@ -35,7 +34,7 @@
 ) extends InDataOffsetBatchStorage[ClickHouseFileRecordBatch]
     with Logging {
 
-  def committedPositions(connection: Connection): Map[TopicPartition, StreamPosition] = {
+  override def committedPositions(topicPartitions: Set[TopicPartition]): Map[TopicPartition, Option[StreamPosition]] = {
 
     val positionQuery = s"""SELECT
                            | $topicColumnName,
@@ -47,53 +46,52 @@ class ClickHouseFileStorage(
                            |GROUP BY $topicColumnName, $partitionColumnName
                            |""".stripMargin
 
-    Using.resource(connection.prepareStatement(positionQuery)) { statement =>
-      {
-        log.info(s"Running stream position query: $positionQuery")
-        Using.resource(statement.executeQuery()) { result =>
-          val positions: mutable.HashMap[TopicPartition, StreamPosition] = mutable.HashMap.empty
-          while (result.next()) {
-            val topic = result.getString(1)
-            val partition = result.getInt(2)
-            val offset = result.getLong(3)
-            val watermark = Timestamp(result.getTimestamp(4).getTime)
-            if (!result.wasNull()) {
-              val topicPartition = new TopicPartition(topic, partition)
-              val position = StreamPosition(offset, watermark)
-              positions.put(topicPartition, position)
-            }
-          }
-          positions.toMap
-        }
-      }
-    }
+    log.info(s"Running stream position query: $positionQuery")
+    val positions: mutable.HashMap[TopicPartition, StreamPosition] = mutable.HashMap.empty
+    client
+      .queryAll(positionQuery)
+      .forEach(row => {
+        val topic = row.getString(1)
+        val partition = row.getInteger(2)
+        val offset = row.getLong(3)
+        val watermark = Timestamp(row.getLocalDateTime(4).toInstant(ZoneOffset.UTC).toEpochMilli)
+
+        val topicPartition = new TopicPartition(topic, partition)
+        val position = StreamPosition(offset, watermark)
+        positions.put(topicPartition, position)
+      })
+
+    topicPartitions.map(tp => (tp, positions.get(tp))).toMap
   }
 
-  override def committedPositions(topicPartitions: Set[TopicPartition]): Map[TopicPartition, Option[StreamPosition]] = {
-    Using.resource(dbDataSource.getConnection()) { connection =>
-      val positions = committedPositions(connection)
-      topicPartitions.map(tp => (tp, positions.get(tp))).toMap
-    }
+  override def commitBatchWithOffsets(batch: ClickHouseFileRecordBatch): Unit = {
+    val settings = new InsertSettings()
+      .setOption("max_insert_block_size", batch.rowCount) // ensure single block to prevent partial writes
+      .setDeduplicationToken(deduplicationToken(batch.recordRanges)) // deduplicate based on ranges
+
+    contentEncoding(batch.fileCompression).foreach(encoding => settings.appCompressedData(true, encoding))
+
+    client.insert(table, Files.newInputStream(batch.file.toPath), batch.format, settings).get()
   }
 
-  override def commitBatchWithOffsets(batch: ClickHouseFileRecordBatch): Unit = {
-    Using.resource(dbDataSource.getConnection) { connection =>
-      Using.resource(connection.unwrap(classOf[ClickHouseConnection]).createStatement) { statement =>
-        statement
-          .write()
-          .data(ClickHouseFile.of(batch.file, batch.compression, 1, batch.format))
-          .table(table)
-          .params(Map("max_insert_block_size" -> batch.rowCount.toString).asJava) // atomic insert
-          .executeAndWait()
-      }
-    }
+  private def contentEncoding(fileCompression: Compression): Option[String] = fileCompression match {
+    case Compression.NONE => None
+    case Compression.ZSTD => Some("zstd")
+    case Compression.GZIP => Some("gzip")
+    case Compression.BZIP => Some("bz2")
+    case Compression.LZ4 => Some("lz4")
+    case _ => throw new UnsupportedOperationException(s"Compression $fileCompression is not supported by ClickHouse")
+  }
+
+  private def deduplicationToken(ranges: Seq[StreamRange]): String = {
+    ranges.map(range => s"${range.topic}:${range.partition}:${range.start.offset}:${range.end.offset}").mkString(";")
+  }
 }
 
 object ClickHouseFileStorage {
 
   case class Builder(
-      private val _dbDataSource: DataSource,
+      private val _client: Client,
       private val _table: String,
       private val _topicColumnName: String,
      private val _partitionColumnName: String,
@@ -102,9 +100,9 @@ object ClickHouseFileStorage {
   ) {
 
     /**
-      * Sets a data source for ClickHouse JDBC connections.
+      * Sets the ClickHouse client.
       */
-    def dbDataSource(source: DataSource): Builder = copy(_dbDataSource = source)
+    def client(client: Client): Builder = copy(_client = client)
 
     /**
      * Sets the table to load data to.
@@ -129,11 +127,11 @@ object ClickHouseFileStorage {
     )
 
     def build(): ClickHouseFileStorage = {
-      if (_dbDataSource == null) throw new IllegalStateException("Must provide a ClickHouse data source")
+      if (_client == null) throw new IllegalStateException("Must provide a ClickHouse client")
       if (_table == null) throw new IllegalStateException("Must provide a valid table name")
 
       new ClickHouseFileStorage(
-        _dbDataSource,
+        _client,
         _table,
         _topicColumnName,
         _partitionColumnName,
diff --git a/stream-loader-clickhouse/src/main/scala/com/adform/streamloader/clickhouse/rowbinary/RowBinaryClickHouseFileBuilder.scala b/stream-loader-clickhouse/src/main/scala/com/adform/streamloader/clickhouse/rowbinary/RowBinaryClickHouseFileBuilder.scala
index b115da9f..72d32f89 100644
--- a/stream-loader-clickhouse/src/main/scala/com/adform/streamloader/clickhouse/rowbinary/RowBinaryClickHouseFileBuilder.scala
+++ b/stream-loader-clickhouse/src/main/scala/com/adform/streamloader/clickhouse/rowbinary/RowBinaryClickHouseFileBuilder.scala
@@ -10,7 +10,7 @@
 package com.adform.streamloader.clickhouse.rowbinary
 
 import com.adform.streamloader.clickhouse.ClickHouseFileBuilder
 import com.adform.streamloader.sink.file.{Compression, StreamFileBuilder}
-import com.clickhouse.data.{ClickHouseCompression, ClickHouseFormat}
+import com.clickhouse.data.ClickHouseFormat
 
 /**
  * File builder for the ClickHouse native RowBinary file format, requires
@@ -21,7 +21,7 @@ import com.clickhouse.data.{ClickHouseCompression, ClickHouseFormat}
 * @tparam R type of the records written to files being built.
 */
 class RowBinaryClickHouseFileBuilder[-R: RowBinaryClickHouseRecordEncoder](
-    fileCompression: Compression = Compression.NONE,
+    val fileCompression: Compression = Compression.NONE,
     bufferSizeBytes: Int = 8192
 ) extends StreamFileBuilder[R](
       os => new RowBinaryClickHouseRecordStreamWriter[R](os),
@@ -31,13 +31,4 @@ class RowBinaryClickHouseFileBuilder[-R: RowBinaryClickHouseRecordEncoder](
     with ClickHouseFileBuilder[R] {
 
   override val format: ClickHouseFormat = ClickHouseFormat.RowBinary
-
-  override def compression: ClickHouseCompression = fileCompression match {
-    case Compression.NONE => ClickHouseCompression.NONE
-    case Compression.ZSTD => ClickHouseCompression.ZSTD
-    case Compression.GZIP => ClickHouseCompression.GZIP
-    case Compression.BZIP => ClickHouseCompression.BZ2
-    case Compression.LZ4 => ClickHouseCompression.LZ4
-    case _ => throw new UnsupportedOperationException(s"Compression $fileCompression is not supported by ClickHouse")
-  }
 }
diff --git a/stream-loader-s3/src/test/scala/com/adform/streamloader/s3/MockS3.scala b/stream-loader-s3/src/test/scala/com/adform/streamloader/s3/MockS3.scala
index 5972d4fb..0b16e6c4 100644
--- a/stream-loader-s3/src/test/scala/com/adform/streamloader/s3/MockS3.scala
+++ b/stream-loader-s3/src/test/scala/com/adform/streamloader/s3/MockS3.scala
@@ -15,6 +15,7 @@ import org.jclouds.blobstore.BlobStoreContext
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.funspec.AnyFunSpec
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
+import software.amazon.awssdk.core.checksums.{RequestChecksumCalculation, ResponseChecksumValidation}
 import software.amazon.awssdk.regions.Region
 import software.amazon.awssdk.services.s3.S3Client
 import software.amazon.awssdk.services.s3.endpoints.{S3EndpointParams, S3EndpointProvider}
@@ -65,6 +66,8 @@ trait MockS3 extends BeforeAndAfterAll { this: AnyFunSpec =>
     .credentialsProvider(() => AwsBasicCredentials.create(accessKey, secretKey))
     .forcePathStyle(true)
     .endpointOverride(endpoint.url())
+    .requestChecksumCalculation(RequestChecksumCalculation.WHEN_REQUIRED)
+    .responseChecksumValidation(ResponseChecksumValidation.WHEN_REQUIRED)
     .build()
 
   override def beforeAll(): Unit = {
diff --git a/stream-loader-tests/src/main/scala/com/adform/streamloader/loaders/ClickHouse.scala b/stream-loader-tests/src/main/scala/com/adform/streamloader/loaders/ClickHouse.scala
index e9382441..ba9ff3a9 100644
--- a/stream-loader-tests/src/main/scala/com/adform/streamloader/loaders/ClickHouse.scala
+++ b/stream-loader-tests/src/main/scala/com/adform/streamloader/loaders/ClickHouse.scala
@@ -8,20 +8,21 @@
 package com.adform.streamloader.loaders
 
-import java.time.LocalDateTime
-import java.util.UUID
 import com.adform.streamloader.clickhouse._
 import com.adform.streamloader.clickhouse.rowbinary.RowBinaryClickHouseFileBuilder
-import com.adform.streamloader.sink.encoding.macros.DataTypeEncodingAnnotation.DecimalEncoding
-import com.adform.streamloader.sink.file.FileCommitStrategy.ReachedAnyOf
 import com.adform.streamloader.model.{ExampleMessage, Timestamp}
 import com.adform.streamloader.sink.batch.{RecordBatchingSink, RecordFormatter}
+import com.adform.streamloader.sink.encoding.macros.DataTypeEncodingAnnotation.DecimalEncoding
 import com.adform.streamloader.sink.file.Compression
+import com.adform.streamloader.sink.file.FileCommitStrategy.ReachedAnyOf
 import com.adform.streamloader.source.KafkaSource
 import com.adform.streamloader.util.ConfigExtensions._
 import com.adform.streamloader.{Loader, StreamLoader}
+import com.clickhouse.client.api.Client
 import com.typesafe.config.ConfigFactory
-import com.zaxxer.hikari.{HikariConfig, HikariDataSource}
+
+import java.time.LocalDateTime
+import java.util.UUID
 
 /*
   CREATE TABLE IF NOT EXISTS test_table (
@@ -66,22 +67,12 @@ object TestClickHouseLoader extends Loader {
   def main(args: Array[String]): Unit = {
     val cfg = ConfigFactory.load().getConfig("stream-loader")
 
-    val hikariConf = new HikariConfig()
-
-    val host = cfg.getString("clickhouse.host")
-    val port = cfg.getInt("clickhouse.port")
-    val db = cfg.getString("clickhouse.db")
-
-    hikariConf.setDriverClassName(classOf[com.clickhouse.jdbc.ClickHouseDriver].getName)
-    hikariConf.setJdbcUrl(s"jdbc:clickhouse://$host:$port/$db")
-
-    hikariConf.addDataSourceProperty("host", host)
-    hikariConf.addDataSourceProperty("port", port)
-    hikariConf.addDataSourceProperty("database", db)
-    hikariConf.addDataSourceProperty("userID", cfg.getString("clickhouse.user"))
-    hikariConf.addDataSourceProperty("password", cfg.getString("clickhouse.password"))
-
-    val clickHouseDataSource = new HikariDataSource(hikariConf)
+    val client = new Client.Builder()
+      .addEndpoint(s"http://${cfg.getString("clickhouse.host")}:${cfg.getInt("clickhouse.port")}")
+      .setDefaultDatabase(cfg.getString("clickhouse.db"))
+      .setUsername(cfg.getString("clickhouse.user"))
+      .setPassword(cfg.getString("clickhouse.password"))
+      .build()
 
     val recordFormatter: RecordFormatter[TestClickHouseRecord] = record => {
       val msg = ExampleMessage.parseFrom(record.consumerRecord.value())
@@ -125,7 +116,7 @@ object TestClickHouseLoader extends Loader {
       .batchStorage(
         ClickHouseFileStorage
          .builder()
-          .dbDataSource(clickHouseDataSource)
+          .client(client)
          .table(cfg.getString("clickhouse.table"))
          .build()
       )
@@ -135,7 +126,7 @@ object TestClickHouseLoader extends Loader {
 
     sys.addShutdownHook {
       loader.stop()
-      clickHouseDataSource.close()
+      client.close()
     }
 
     loader.start()
diff --git a/stream-loader-tests/src/main/scala/com/adform/streamloader/loaders/Iceberg.scala b/stream-loader-tests/src/main/scala/com/adform/streamloader/loaders/Iceberg.scala
index 8e5f976a..e71cbab8 100644
--- a/stream-loader-tests/src/main/scala/com/adform/streamloader/loaders/Iceberg.scala
+++ b/stream-loader-tests/src/main/scala/com/adform/streamloader/loaders/Iceberg.scala
@@ -15,15 +15,14 @@ import com.adform.streamloader.sink.file.FileCommitStrategy._
 import com.adform.streamloader.sink.file.MultiFileCommitStrategy
 import com.adform.streamloader.source.KafkaSource
 import com.adform.streamloader.util.ConfigExtensions._
-import com.adform.streamloader.util.UuidExtensions._
 import com.adform.streamloader.{Loader, StreamLoader}
 import com.typesafe.config.ConfigFactory
 import org.apache.hadoop.conf.Configuration
-import org.apache.iceberg.{FileFormat, TableMetadata, TableOperations}
 import org.apache.iceberg.catalog.TableIdentifier
 import org.apache.iceberg.data.{GenericRecord, Record => IcebergRecord}
 import org.apache.iceberg.hadoop.HadoopCatalog
 import org.apache.iceberg.io.{FileIO, LocationProvider}
+import org.apache.iceberg.{FileFormat, TableMetadata, TableOperations}
 
 import java.time.{Duration, ZoneOffset}
 import java.util
@@ -54,7 +53,7 @@ object TestIcebergLoader extends Loader {
       icebergRecord.setField("isEnabled", avroMessage.isEnabled)
       icebergRecord.setField("childIds", util.Arrays.asList(avroMessage.childIds: _*))
       icebergRecord.setField("parentId", avroMessage.parentId.orNull)
-      icebergRecord.setField("transactionId", avroMessage.transactionId.toBytes)
+      icebergRecord.setField("transactionId", avroMessage.transactionId)
       icebergRecord.setField("moneySpent", avroMessage.moneySpent.bigDecimal)
 
       Seq(icebergRecord)
diff --git a/stream-loader-tests/src/test/scala/com/adform/streamloader/ClickHouseIntegrationTests.scala b/stream-loader-tests/src/test/scala/com/adform/streamloader/ClickHouseIntegrationTests.scala
index 6e2c929f..50a9cb24 100644
--- a/stream-loader-tests/src/test/scala/com/adform/streamloader/ClickHouseIntegrationTests.scala
+++ b/stream-loader-tests/src/test/scala/com/adform/streamloader/ClickHouseIntegrationTests.scala
@@ -12,11 +12,11 @@ import com.adform.streamloader.behaviors.{BasicLoaderBehaviors, RebalanceBehavio
 import com.adform.streamloader.fixtures._
 import com.adform.streamloader.loaders.TestClickHouseLoader
 import com.adform.streamloader.storage.ClickHouseStorageBackend
-import com.zaxxer.hikari.{HikariConfig, HikariDataSource}
+import com.clickhouse.client.api.Client
 import org.scalatest.concurrent.Eventually
 import org.scalatest.funspec.AnyFunSpec
-import org.scalatest.tags.Slow
 import org.scalatest.matchers.should.Matchers
+import org.scalatest.tags.Slow
 import org.scalatestplus.scalacheck.Checkers
 
 import scala.concurrent.ExecutionContext
@@ -38,33 +38,21 @@ class ClickHouseIntegrationTests
 
   val kafkaConfig: KafkaConfig = KafkaConfig()
   val clickHouseConfig: ClickHouseConfig = ClickHouseConfig()
-
-  var hikariConf: HikariConfig = _
-  var dataSource: HikariDataSource = _
+  var clickHouseClient: Client = _
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-    hikariConf = new HikariConfig()
-    hikariConf.setDriverClassName(classOf[com.clickhouse.jdbc.ClickHouseDriver].getName)
-    hikariConf.setJdbcUrl(s"jdbc:clickhouse://${clickHouseContainer.endpoint}/${clickHouseConfig.dbName}")
-    hikariConf.addDataSourceProperty("host", clickHouseContainer.ip)
-    hikariConf.addDataSourceProperty("port", jdbcPort)
-    hikariConf.addDataSourceProperty("database", clickHouseConfig.dbName)
-    hikariConf.addDataSourceProperty("userID", "")
-    hikariConf.addDataSourceProperty("password", "")
-    hikariConf.setConnectionTimeout(1000)
-    hikariConf.setValidationTimeout(1000)
-    hikariConf.setConnectionTestQuery("SELECT 1")
-
-    hikariConf.setMinimumIdle(4)
-    hikariConf.setMaximumPoolSize(8)
-
-    dataSource = new HikariDataSource(hikariConf)
+    clickHouseClient = new Client.Builder()
+      .addEndpoint(s"http://${clickHouseContainer.ip}:${clickHouseContainer.port}")
+      .setUsername(clickHouseConfig.userName)
+      .setPassword(clickHouseConfig.password)
+      .setDefaultDatabase(clickHouseConfig.dbName)
+      .build()
   }
 
   override def afterAll(): Unit = {
     super.afterAll()
-    dataSource.close()
+    clickHouseClient.close()
   }
 
   def clickHouseStorageBackend(testId: String): ClickHouseStorageBackend = {
@@ -75,8 +63,8 @@ class ClickHouseIntegrationTests
       dockerNetwork,
       kafkaContainer,
       clickHouseContainer,
-      hikariConf,
-      dataSource,
+      clickHouseConfig,
+      clickHouseClient,
       table,
       TestClickHouseLoader
     )
diff --git a/stream-loader-tests/src/test/scala/com/adform/streamloader/VerticaIntegrationTests.scala b/stream-loader-tests/src/test/scala/com/adform/streamloader/VerticaIntegrationTests.scala
index c21be981..fc830c7b 100644
--- a/stream-loader-tests/src/test/scala/com/adform/streamloader/VerticaIntegrationTests.scala
+++ b/stream-loader-tests/src/test/scala/com/adform/streamloader/VerticaIntegrationTests.scala
@@ -12,7 +12,6 @@ import com.adform.streamloader.behaviors.BasicLoaderBehaviors
 import com.adform.streamloader.fixtures._
 import com.adform.streamloader.storage._
 import com.zaxxer.hikari.{HikariConfig, HikariDataSource}
-import org.scalatest.Ignore
 import org.scalatest.concurrent.Eventually
 import org.scalatest.funspec.AnyFunSpec
 import org.scalatest.matchers.should.Matchers
@@ -22,9 +21,6 @@ import org.scalatestplus.scalacheck.Checkers
 import scala.concurrent.ExecutionContext
 
 @Slow
-// Temporarily ignore Vertica tests, vertica-ce image is not available on DockerHub
-// https://github.com/vertica/vertica-containers/issues/64
-@Ignore
 class VerticaIntegrationTests
     extends AnyFunSpec
     with Matchers
diff --git a/stream-loader-tests/src/test/scala/com/adform/streamloader/fixtures/ClickHouse.scala b/stream-loader-tests/src/test/scala/com/adform/streamloader/fixtures/ClickHouse.scala
index 155d3165..6961513b 100644
--- a/stream-loader-tests/src/test/scala/com/adform/streamloader/fixtures/ClickHouse.scala
+++ b/stream-loader-tests/src/test/scala/com/adform/streamloader/fixtures/ClickHouse.scala
@@ -15,7 +15,12 @@ import org.scalatest.{BeforeAndAfterAll, Suite}
 import java.time.Duration
 import scala.jdk.CollectionConverters._
 
-case class ClickHouseConfig(dbName: String = "default", image: String = "clickhouse/clickhouse-server:24.10.2.80")
+case class ClickHouseConfig(
+    image: String = "clickhouse/clickhouse-server:25.9.3.48",
+    dbName: String = "default",
+    userName: String = "default",
+    password: String = "password"
+)
 
 trait ClickHouseTestFixture extends ClickHouse with BeforeAndAfterAll { this: Suite with DockerTestFixture =>
   override def beforeAll(): Unit = {
@@ -64,6 +69,12 @@ trait ClickHouse { this: Docker =>
           .portBindings(makePortBindings(jdbcPort, nativeClientPort))
          .build()
       )
+      .env(
+        s"CLICKHOUSE_DB=${clickHouseConfig.dbName}",
+        s"CLICKHOUSE_USER=${clickHouseConfig.userName}",
+        s"CLICKHOUSE_PASSWORD=${clickHouseConfig.password}",
+        "CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT=1"
+      )
       .healthcheck(
         Healthcheck
           .builder()
diff --git a/stream-loader-tests/src/test/scala/com/adform/streamloader/fixtures/Docker.scala b/stream-loader-tests/src/test/scala/com/adform/streamloader/fixtures/Docker.scala
index 486af638..8c1d19a9 100644
--- a/stream-loader-tests/src/test/scala/com/adform/streamloader/fixtures/Docker.scala
+++ b/stream-loader-tests/src/test/scala/com/adform/streamloader/fixtures/Docker.scala
@@ -10,12 +10,11 @@ package com.adform.streamloader.fixtures
 
 import java.time.Duration
 import java.util.UUID
-
 import org.mandas.docker.client.DefaultDockerClient
 import org.mandas.docker.client.DockerClient.RemoveContainerParam
-import org.mandas.docker.client.builder.resteasy.ResteasyDockerClientBuilder
 import org.mandas.docker.client.messages.{ContainerConfig, NetworkConfig, PortBinding}
 import org.log4s.getLogger
+import org.mandas.docker.client.builder.DockerClientBuilder
 import org.scalatest.{BeforeAndAfterAll, Suite}
 
 import scala.jdk.CollectionConverters._
@@ -53,7 +52,7 @@ trait Docker {
 
   private var network: DockerNetwork = _
 
-  val docker: DefaultDockerClient = new ResteasyDockerClientBuilder().fromEnv().build()
+  val docker: DefaultDockerClient = DockerClientBuilder.fromEnv().build()
 
   val dockerSandboxId: String = UUID.randomUUID().toString
   val healthCheckTimeout: Duration = Duration.ofSeconds(60)
diff --git a/stream-loader-tests/src/test/scala/com/adform/streamloader/fixtures/Hdfs.scala b/stream-loader-tests/src/test/scala/com/adform/streamloader/fixtures/Hdfs.scala
index 68adca52..725ebed6 100644
--- a/stream-loader-tests/src/test/scala/com/adform/streamloader/fixtures/Hdfs.scala
+++ b/stream-loader-tests/src/test/scala/com/adform/streamloader/fixtures/Hdfs.scala
@@ -8,18 +8,16 @@
 package com.adform.streamloader.fixtures
 
-import java.net.URI
-
-import org.mandas.docker.client.messages.{ContainerConfig, HostConfig}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.FileSystem
 import org.log4s._
+import org.mandas.docker.client.messages.{ContainerConfig, Healthcheck, HostConfig}
 import org.scalatest.{BeforeAndAfterAll, Suite}
 
-case class HdfsConfig(
-  namenodeImage: String = "bde2020/hadoop-namenode:2.0.0-hadoop3.2.1-java8",
-  datanodeImage: String = "bde2020/hadoop-datanode:2.0.0-hadoop3.2.1-java8"
-)
+import java.net.URI
+import scala.jdk.CollectionConverters._
+
+case class HdfsConfig(image: String = "apache/hadoop:3.4.1")
 
 trait HdfsTestFixture extends Hdfs with BeforeAndAfterAll { this: Suite with DockerTestFixture =>
 
@@ -54,16 +52,18 @@ trait Hdfs { this: Docker =>
       namenode = startNameNode(dockerNetwork)(
         nameNodeName,
         List(
-          s"CLUSTER_NAME=$dockerSandboxId-integration-test",
-          s"CORE_CONF_fs_defaultFS=hdfs://$nameNodeName:$hdfsPort"
+          "HADOOP_USER_NAME=root",
+          "ENSURE_NAMENODE_DIR=/tmp/hadoop-root/dfs/name",
+          s"CORE-SITE.XML_fs.defaultFS=hdfs://$nameNodeName:$hdfsPort",
+          "HDFS-SITE.XML_dfs.namenode.rpc-bind-host=0.0.0.0"
         )
       )
 
       datanode = startDataNode(dockerNetwork)(
         s"$dockerSandboxId-hadoop-datanode",
         List(
-          s"CORE_CONF_fs_defaultFS=hdfs://${namenode.name}:$hdfsPort",
-          s"CORE_CONF_dfs_datanode_address=${dockerNetwork.ip}:$datanodePort"
+          s"CORE-SITE.XML_fs.defaultFS=hdfs://${namenode.name}:$hdfsPort",
+          s"HDFS-SITE.XML_dfs.datanode.address=0.0.0.0:$datanodePort"
         ),
         List(namenode.name)
       )
@@ -98,7 +98,7 @@ trait Hdfs { this: Docker =>
   )(containerName: String, envVars: List[String] = List(), links: List[String] = List()): Container = {
     val config = ContainerConfig
       .builder()
-      .image(hdfsConfig.datanodeImage)
+      .image(hdfsConfig.image)
       .hostConfig(
         HostConfig
           .builder()
@@ -106,8 +106,18 @@ trait Hdfs { this: Docker =>
           .portBindings(makePortBindings(datanodePort))
           .build()
       )
+      .healthcheck(
+        Healthcheck
+          .builder()
+          .test(List("CMD-SHELL", s"nc -z localhost $datanodePort").asJava)
+          .retries(6)
+          .interval(1_000_000_000L)
+          .timeout(5_000_000_000L)
+          .build()
+      )
       .exposedPorts(datanodePort.toString)
       .env(envVars: _*)
+      .cmd(Seq("hdfs", "datanode").asJava)
       .build()
 
     val containerId = startContainer(config, containerName)
@@ -124,7 +134,7 @@ trait Hdfs { this: Docker =>
   )(containerName: String, envVars: List[String] = List()): ContainerWithEndpoint = {
     val config = ContainerConfig
       .builder()
-      .image(hdfsConfig.namenodeImage)
+      .image(hdfsConfig.image)
       .hostConfig(
         HostConfig
           .builder()
@@ -132,8 +142,18 @@ trait Hdfs { this: Docker =>
          .portBindings(makePortBindings(hdfsPort))
          .build()
       )
+      .healthcheck(
+        Healthcheck
+          .builder()
+          .test(List("CMD-SHELL", s"nc -z localhost $hdfsPort").asJava)
+          .retries(6)
+          .interval(1_000_000_000L)
+          .timeout(5_000_000_000L)
+          .build()
+      )
       .exposedPorts(hdfsPort.toString)
       .env(envVars: _*)
+      .cmd(Seq("hdfs", "namenode").asJava)
       .build()
 
     val containerId = startContainer(config, containerName)
diff --git a/stream-loader-tests/src/test/scala/com/adform/streamloader/fixtures/Kafka.scala b/stream-loader-tests/src/test/scala/com/adform/streamloader/fixtures/Kafka.scala
index 32e81005..8cb01821 100644
--- a/stream-loader-tests/src/test/scala/com/adform/streamloader/fixtures/Kafka.scala
+++ b/stream-loader-tests/src/test/scala/com/adform/streamloader/fixtures/Kafka.scala
@@ -24,7 +24,7 @@
 import scala.collection.mutable.ArrayBuffer
 import scala.jdk.CollectionConverters._
 import scala.util.Using
 
-case class KafkaConfig(image: String = "bitnami/kafka:3.9.0-debian-12-r3")
+case class KafkaConfig(image: String = "apache/kafka-native:4.1.0")
 
 trait KafkaTestFixture extends Kafka with BeforeAndAfterAll with BeforeAndAfterEach { this: Suite with DockerTestFixture =>
 
@@ -139,12 +139,7 @@ trait Kafka { this: Docker =>
       .healthcheck(
         Healthcheck
           .builder()
-          .test(
-            List(
-              "CMD-SHELL",
-              s"/opt/bitnami/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:$kafkaPort"
-            ).asJava
-          )
+          .test(List("CMD-SHELL", s"nc -z localhost $kafkaPort").asJava)
          .retries(6)
          .interval(1_000_000_000L)
          .timeout(5_000_000_000L)
@@ -152,15 +147,15 @@ trait Kafka { this: Docker =>
       )
       .exposedPorts(kafkaPort.toString)
       .env(
-        "KAFKA_CFG_NODE_ID=1",
-        "KAFKA_CFG_PROCESS_ROLES=broker,controller",
-        "KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER",
-        s"KAFKA_CFG_LISTENERS=PLAINTEXT://:$kafkaPort,CONTROLLER://:$kafkaControllerPort",
-        "KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT",
-        s"KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://${dockerNetwork.ip}:$kafkaPort",
-        s"KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@127.0.0.1:$kafkaControllerPort",
-        "ALLOW_PLAINTEXT_LISTENER=yes",
-        s"KAFKA_CFG_LOG_RETENTION_HOURS=${Int.MaxValue}"
+        "KAFKA_NODE_ID=1",
+        "KAFKA_PROCESS_ROLES=broker,controller",
+        "KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER",
+        s"KAFKA_LISTENERS=PLAINTEXT://:$kafkaPort,CONTROLLER://:$kafkaControllerPort",
+        "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT",
+        s"KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://${dockerNetwork.ip}:$kafkaPort",
+        s"KAFKA_CONTROLLER_QUORUM_VOTERS=1@127.0.0.1:$kafkaControllerPort",
+        s"KAFKA_LOG_RETENTION_HOURS=${Int.MaxValue}",
+        "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1"
       )
       .build()
diff --git a/stream-loader-tests/src/test/scala/com/adform/streamloader/fixtures/S3.scala b/stream-loader-tests/src/test/scala/com/adform/streamloader/fixtures/S3.scala
index 36646358..53384c20 100644
--- a/stream-loader-tests/src/test/scala/com/adform/streamloader/fixtures/S3.scala
+++ b/stream-loader-tests/src/test/scala/com/adform/streamloader/fixtures/S3.scala
@@ -18,7 +18,7 @@ import software.amazon.awssdk.services.s3.S3Client
 import java.net.URI
 import scala.jdk.CollectionConverters._
 
-case class S3Config(image: String = "minio/minio:RELEASE.2024-11-07T00-52-20Z")
+case class S3Config(image: String = "minio/minio:RELEASE.2025-09-07T16-13-09Z")
 
 trait S3TestFixture extends S3 with BeforeAndAfterAll { this: Suite with DockerTestFixture =>
   override def beforeAll(): Unit = {
diff --git a/stream-loader-tests/src/test/scala/com/adform/streamloader/fixtures/Vertica.scala b/stream-loader-tests/src/test/scala/com/adform/streamloader/fixtures/Vertica.scala
index 377ae9e6..319165ad 100644
--- a/stream-loader-tests/src/test/scala/com/adform/streamloader/fixtures/Vertica.scala
+++ b/stream-loader-tests/src/test/scala/com/adform/streamloader/fixtures/Vertica.scala
@@ -18,10 +18,10 @@ import scala.jdk.CollectionConverters._
 import scala.util.Using
 
 case class VerticaConfig(
-    dbName: String = "",
+    dbName: String = "docker",
     user: String = "dbadmin",
     password: String = "",
-    image: String = "opentext/vertica-ce:24.4.0-1"
+    image: String = "jbfavre/vertica:9.2.0-7_centos-7"
 )
 
 trait VerticaTestFixture extends Vertica with BeforeAndAfterAll { this: Suite with DockerTestFixture =>
diff --git a/stream-loader-tests/src/test/scala/com/adform/streamloader/storage/ClickHouseStorageBackend.scala b/stream-loader-tests/src/test/scala/com/adform/streamloader/storage/ClickHouseStorageBackend.scala
index 27997c3d..725a28e7 100644
--- a/stream-loader-tests/src/test/scala/com/adform/streamloader/storage/ClickHouseStorageBackend.scala
+++ b/stream-loader-tests/src/test/scala/com/adform/streamloader/storage/ClickHouseStorageBackend.scala
@@ -9,37 +9,35 @@
 package com.adform.streamloader.storage
 
 import com.adform.streamloader.clickhouse.ClickHouseFileStorage
-import com.adform.streamloader.fixtures.{Container, ContainerWithEndpoint, DockerNetwork, SimpleContainer}
+import com.adform.streamloader.fixtures._
 import com.adform.streamloader.model.{ExampleMessage, StreamPosition, Timestamp}
 import com.adform.streamloader.source.KafkaContext
 import com.adform.streamloader.util.Retry
 import com.adform.streamloader.{BuildInfo, Loader}
-import com.clickhouse.jdbc.ClickHouseArray
+import com.clickhouse.client.api.Client
+import com.clickhouse.client.api.query.QuerySettings
+import org.apache.kafka.common.TopicPartition
 import org.mandas.docker.client.DockerClient
 import org.mandas.docker.client.messages.{ContainerConfig, HostConfig}
-import com.zaxxer.hikari.HikariConfig
-import org.apache.kafka.common.TopicPartition
 import org.scalacheck.Arbitrary
 
+import java.time.ZoneOffset
 import java.time.temporal.ChronoUnit
 import java.util.UUID
-import javax.sql.DataSource
 import scala.collection.mutable
 import scala.collection.mutable.ListBuffer
 import scala.concurrent.duration._
-import scala.util.Using
 
 case class ClickHouseStorageBackend(
     docker: DockerClient,
     dockerNetwork: DockerNetwork,
     kafkaContainer: ContainerWithEndpoint,
     clickHouseContainer: ContainerWithEndpoint,
-    clickHouseConf: HikariConfig,
-    dataSource: DataSource,
+    clickHouseConfig: ClickHouseConfig,
+    clickHouseClient: Client,
     table: String,
     loader: Loader
-) extends StorageBackend[ExampleMessage]
-    with JdbcStorageBackend {
+) extends StorageBackend[ExampleMessage] {
 
   override def arbMessage: Arbitrary[ExampleMessage] = ExampleMessage.arbMessage
 
@@ -58,32 +56,34 @@
   val batchStorage: ClickHouseFileStorage = ClickHouseFileStorage
     .builder()
-    .dbDataSource(dataSource)
+    .client(clickHouseClient)
     .table(table)
     .rowOffsetColumnNames(TOPIC_COLUMN, PARTITION_COLUMN, OFFSET_COLUMN, WATERMARK_COLUMN)
     .build()
 
   override def initialize(): Unit = {
     batchStorage.initialize(kafkaContext)
-    executeStatement(
-      s"""CREATE TABLE IF NOT EXISTS $table (
-         | $TOPIC_COLUMN String,
-         | $PARTITION_COLUMN UInt16,
-         | $OFFSET_COLUMN UInt64,
-         | $WATERMARK_COLUMN Timestamp,
-         | id Int32,
-         | name String,
-         | timestamp Timestamp,
-         | height Float64,
-         | width Float32,
-         | is_enabled UInt8,
-         | child_ids Array(Int32),
-         | parent_id Nullable(Int64),
-         | transaction_id UUID,
-         | money_spent Decimal(${ExampleMessage.SCALE_PRECISION.precision}, ${ExampleMessage.SCALE_PRECISION.scale})
-         |) ENGINE = MergeTree()
-         |ORDER BY $OFFSET_COLUMN;""".stripMargin
-    )
+    clickHouseClient
+      .execute(
+        s"""CREATE TABLE IF NOT EXISTS $table (
+           | $TOPIC_COLUMN String,
+           | $PARTITION_COLUMN UInt16,
+           | $OFFSET_COLUMN UInt64,
+           | $WATERMARK_COLUMN Timestamp,
+           | id Int32,
+           | name String,
+           | timestamp Timestamp,
+           | height Float64,
+           | width Float32,
+           | is_enabled UInt8,
+           | child_ids Array(Int32),
+           | parent_id Nullable(Int64),
+           | transaction_id UUID,
+           | money_spent Decimal(${ExampleMessage.SCALE_PRECISION.precision}, ${ExampleMessage.SCALE_PRECISION.scale})
+           |) ENGINE = MergeTree()
+           |ORDER BY $OFFSET_COLUMN;""".stripMargin
+      )
+      .get()
   }
 
   def createLoaderContainer(loaderKafkaConfig: LoaderKafkaConfig, batchSize: Long): Container = {
@@ -106,11 +106,11 @@
         s"KAFKA_BROKERS=${kafkaContainer.endpoint}",
         s"KAFKA_TOPIC=$topic",
         s"KAFKA_CONSUMER_GROUP=$consumerGroup",
-        s"CLICKHOUSE_PASSWORD=${clickHouseConf.getDataSourceProperties.getProperty("password")}",
-        s"CLICKHOUSE_PORT=${clickHouseConf.getDataSourceProperties.get("port")}",
-        s"CLICKHOUSE_USER=${clickHouseConf.getDataSourceProperties.getProperty("userID")}",
-        s"CLICKHOUSE_HOST=${clickHouseConf.getDataSourceProperties.getProperty("host")}",
-        s"CLICKHOUSE_DB=${clickHouseConf.getDataSourceProperties.getProperty("database")}",
+        s"CLICKHOUSE_HOST=${clickHouseContainer.ip}",
+        s"CLICKHOUSE_PORT=${clickHouseContainer.port}",
+        s"CLICKHOUSE_DB=${clickHouseConfig.dbName}",
+        s"CLICKHOUSE_USER=${clickHouseConfig.userName}",
+        s"CLICKHOUSE_PASSWORD=${clickHouseConfig.password}",
         s"CLICKHOUSE_TABLE=$table",
         s"BATCH_SIZE=$batchSize"
       )
@@ -122,41 +122,40 @@
   override def getContent: StorageContent[ExampleMessage] =
     Retry.retryOnFailure(Retry.Policy(retriesLeft = 3, initialDelay = 1.seconds, backoffFactor = 1)) {
-      Using.resource(dataSource.getConnection()) { connection =>
-        val content = Using.resource(connection.prepareStatement(s"SELECT * FROM $table")) { ps =>
-          ps.setQueryTimeout(5)
-          Using.resource(ps.executeQuery()) { rs =>
-            val content: ListBuffer[ExampleMessage] = collection.mutable.ListBuffer[ExampleMessage]()
-            val positions: mutable.HashMap[TopicPartition, ListBuffer[StreamPosition]] = mutable.HashMap.empty
-            while (rs.next()) {
-
-              val topicPartition = new TopicPartition(rs.getString(TOPIC_COLUMN), rs.getInt(PARTITION_COLUMN))
-              val position =
-                StreamPosition(rs.getLong(OFFSET_COLUMN), Timestamp(rs.getTimestamp(WATERMARK_COLUMN).getTime))
-              positions
-                .getOrElseUpdate(topicPartition, collection.mutable.ListBuffer[StreamPosition]())
-                .append(position)
-
-              content.addOne(
-                ExampleMessage(
-                  rs.getInt("id"),
-                  rs.getString("name"),
-                  rs.getTimestamp("timestamp").toLocalDateTime,
-                  rs.getDouble("height"),
-                  rs.getFloat("width"),
-                  rs.getBoolean("is_enabled"),
-                  rs.getArray("child_ids").asInstanceOf[ClickHouseArray].getArray().asInstanceOf[Array[Int]],
-                  Option(rs.getObject("parent_id").asInstanceOf[java.lang.Long]).map(_.toLong),
-                  rs.getObject("transaction_id", classOf[UUID]),
-                  rs.getBigDecimal("money_spent")
-                )
-              )
-            }
-            StorageContent(content.toList, positions.view.mapValues(sps => sps.maxBy(_.offset)).toMap)
-          }
-        }
-        content
-      }
+      val content: ListBuffer[ExampleMessage] = collection.mutable.ListBuffer[ExampleMessage]()
+      val positions: mutable.HashMap[TopicPartition, ListBuffer[StreamPosition]] = mutable.HashMap.empty
+
+      clickHouseClient
+        .queryAll(s"SELECT * FROM $table", new QuerySettings().setMaxExecutionTime(5))
+        .forEach(row => {
+          val topicPartition = new TopicPartition(row.getString(TOPIC_COLUMN), row.getInteger(PARTITION_COLUMN))
+          val position =
+            StreamPosition(
+              row.getLong(OFFSET_COLUMN),
+              Timestamp(row.getLocalDateTime(WATERMARK_COLUMN).toInstant(ZoneOffset.UTC).toEpochMilli)
+            )
+
+          positions
+            .getOrElseUpdate(topicPartition, collection.mutable.ListBuffer[StreamPosition]())
+            .append(position)
+
+          content.addOne(
+            ExampleMessage(
+              row.getInteger("id"),
+              row.getString("name"),
+              row.getLocalDateTime("timestamp"),
+              row.getDouble("height"),
+              row.getFloat("width"),
+              row.getBoolean("is_enabled"),
+              row.getIntArray("child_ids"),
+              Option(row.getObject("parent_id").asInstanceOf[java.lang.Long]).map(_.toLong),
+              row.getObject("transaction_id").asInstanceOf[UUID],
+              row.getBigDecimal("money_spent")
+            )
+          )
+        })
+
+      StorageContent(content.toList, positions.view.mapValues(sps => sps.maxBy(_.offset)).toMap)
     }
 
   override def committedPositions(
diff --git a/stream-loader-tests/src/test/scala/com/adform/streamloader/storage/IcebergStorageBackend.scala b/stream-loader-tests/src/test/scala/com/adform/streamloader/storage/IcebergStorageBackend.scala
index 50bcbd5d..c93aa503 100644
--- a/stream-loader-tests/src/test/scala/com/adform/streamloader/storage/IcebergStorageBackend.scala
+++ b/stream-loader-tests/src/test/scala/com/adform/streamloader/storage/IcebergStorageBackend.scala
@@ -48,7 +48,7 @@ case class IcebergStorageBackend(
   private val duckdbExtension = new File("/tmp/iceberg.duckdb_extension")
   private val duckdbExtensionUrl = new URI(
-    s"http://extensions.duckdb.org/v${BuildInfo.duckdbVersion}/linux_amd64_gcc4/iceberg.duckdb_extension.gz"
+    s"http://extensions.duckdb.org/v${BuildInfo.duckdbVersion.split('.').take(3).mkString(".")}/linux_amd64/iceberg.duckdb_extension.gz"
   ).toURL
 
   private val warehouseDir = "/tmp/stream-loader-tests"
@@ -123,17 +123,12 @@ case class IcebergStorageBackend(
   override def getContent: StorageContent[ExampleMessage] = Using.Manager { use =>
     val conn = use(DriverManager.getConnection("jdbc:duckdb:").asInstanceOf[DuckDBConnection])
 
-    // Querying complex types from Iceberg tables is semi-broken,
-    // see: https://github.com/duckdb/duckdb_iceberg/issues/47
     val stmt = use(conn.createStatement())
     val rs = use(
       stmt.executeQuery(
         s"""INSTALL '${duckdbExtension.getPath}';
           |LOAD iceberg;
-          |SELECT * FROM iceberg_scan('$warehouseDir/${table.replace(
-            '.',
-            '/'
-          )}', skip_schema_inference=True);""".stripMargin
+          |SELECT * FROM iceberg_scan('$warehouseDir/${table.replace('.', '/')}');""".stripMargin
       )
     )