67 changes: 35 additions & 32 deletions build.sbt
@@ -1,6 +1,6 @@
name := "stream-loader"

ThisBuild / scalaVersion := "2.13.15"
ThisBuild / scalaVersion := "2.13.17"
ThisBuild / scalacOptions := Seq(
"-unchecked",
"-deprecation",
@@ -25,12 +25,12 @@ ThisBuild / git.remoteRepo := {
}

val scalaTestVersion = "3.2.19"
val scalaCheckVersion = "1.18.1"
val scalaCheckVersion = "1.19.0"
val scalaCheckTestVersion = "3.2.19.0"

val hadoopVersion = "3.4.1"
val parquetVersion = "1.15.2"
val icebergVersion = "1.7.0"
val hadoopVersion = "3.4.2"
val parquetVersion = "1.16.0"
val icebergVersion = "1.10.0"

lazy val `stream-loader-core` = project
.in(file("stream-loader-core"))
@@ -41,19 +41,19 @@ lazy val `stream-loader-core` = project
buildInfoKeys := Seq[BuildInfoKey](name, version, scalaVersion, git.gitHeadCommit),
libraryDependencies ++= Seq(
"org.scala-lang" % "scala-reflect" % scalaVersion.value,
"org.apache.kafka" % "kafka-clients" % "3.9.0",
"org.apache.kafka" % "kafka-clients" % "4.1.0",
"org.log4s" %% "log4s" % "1.10.0",
"org.apache.commons" % "commons-compress" % "1.27.1",
"org.xerial.snappy" % "snappy-java" % "1.1.10.7",
"org.apache.commons" % "commons-compress" % "1.28.0",
"org.xerial.snappy" % "snappy-java" % "1.1.10.8",
"org.lz4" % "lz4-java" % "1.8.0",
"com.github.luben" % "zstd-jni" % "1.5.6-8",
"com.github.luben" % "zstd-jni" % "1.5.7-5",
"com.univocity" % "univocity-parsers" % "2.9.1",
"org.json4s" %% "json4s-native" % "4.0.7",
"io.micrometer" % "micrometer-core" % "1.14.1",
"io.micrometer" % "micrometer-core" % "1.15.4",
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"org.scalatestplus" %% "scalacheck-1-18" % scalaCheckTestVersion % "test",
"org.scalacheck" %% "scalacheck" % scalaCheckVersion % "test",
"ch.qos.logback" % "logback-classic" % "1.5.12" % "test"
"ch.qos.logback" % "logback-classic" % "1.5.19" % "test"
)
)

@@ -64,8 +64,8 @@ lazy val `stream-loader-clickhouse` = project
.settings(
resolvers += "jitpack" at "https://jitpack.io",
libraryDependencies ++= Seq(
"org.apache.httpcomponents.client5" % "httpclient5" % "5.4.1",
"com.clickhouse" % "clickhouse-jdbc" % "0.7.1",
"org.apache.httpcomponents.client5" % "httpclient5" % "5.5.1",
"com.clickhouse" % "client-v2" % "0.9.2",
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"org.scalatestplus" %% "scalacheck-1-18" % scalaCheckTestVersion % "test",
"org.scalacheck" %% "scalacheck" % scalaCheckVersion % "test"
@@ -106,14 +106,14 @@ lazy val `stream-loader-s3` = project
.settings(commonSettings)
.settings(
libraryDependencies ++= Seq(
"software.amazon.awssdk" % "s3" % "2.29.20",
"software.amazon.awssdk" % "s3" % "2.35.5",
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"com.amazonaws" % "aws-java-sdk-s3" % "1.12.778" % "test",
"org.gaul" % "s3proxy" % "2.4.1" % "test"
"com.amazonaws" % "aws-java-sdk-s3" % "1.12.792" % "test",
"org.gaul" % "s3proxy" % "2.8.0" % "test"
)
)

val verticaVersion = "24.4.0-0"
val verticaVersion = "25.3.0-0"

lazy val `stream-loader-vertica` = project
.in(file("stream-loader-vertica"))
@@ -128,7 +128,8 @@ lazy val `stream-loader-vertica` = project
)
)

val duckdbVersion = "1.1.3"
val duckdbVersion = "1.4.1.0"
val jerseyVersion = "3.1.11"

lazy val packAndSplitJars =
taskKey[(File, File)]("Runs pack and splits out the application jars from the external dependency jars")
@@ -150,19 +151,21 @@ lazy val `stream-loader-tests` = project
.settings(commonSettings)
.settings(
libraryDependencies ++= Seq(
"com.typesafe" % "config" % "1.4.3",
"ch.qos.logback" % "logback-classic" % "1.5.12",
"com.zaxxer" % "HikariCP" % "6.2.1",
"org.apache.iceberg" % "iceberg-parquet" % icebergVersion,
"com.vertica.jdbc" % "vertica-jdbc" % verticaVersion,
"org.scalacheck" %% "scalacheck" % scalaCheckVersion,
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"org.scalatestplus" %% "scalacheck-1-18" % scalaCheckTestVersion % "test",
"org.slf4j" % "log4j-over-slf4j" % "2.0.16" % "test",
"org.mandas" % "docker-client" % "8.0.3" % "test",
"org.jboss.resteasy" % "resteasy-client" % "6.2.11.Final" % "test",
"com.fasterxml.jackson.jakarta.rs" % "jackson-jakarta-rs-json-provider" % "2.18.1" % "test",
"org.duckdb" % "duckdb_jdbc" % duckdbVersion % "test"
"com.typesafe" % "config" % "1.4.5",
"ch.qos.logback" % "logback-classic" % "1.5.19",
"com.zaxxer" % "HikariCP" % "7.0.2",
"org.apache.iceberg" % "iceberg-parquet" % icebergVersion,
"com.vertica.jdbc" % "vertica-jdbc" % verticaVersion,
"org.scalacheck" %% "scalacheck" % scalaCheckVersion,
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"org.scalatestplus" %% "scalacheck-1-18" % scalaCheckTestVersion % "test",
"org.slf4j" % "log4j-over-slf4j" % "2.0.17" % "test",
"org.mandas" % "docker-client" % "9.0.4" % "test",
"org.glassfish.jersey.core" % "jersey-client" % jerseyVersion % "test",
"org.glassfish.jersey.inject" % "jersey-hk2" % jerseyVersion % "test",
"org.glassfish.jersey.connectors" % "jersey-apache-connector" % jerseyVersion % "test",
"org.glassfish.jersey.media" % "jersey-media-json-jackson" % jerseyVersion % "test",
"org.duckdb" % "duckdb_jdbc" % duckdbVersion % "test"
),
inConfig(IntegrationTest)(Defaults.testTasks),
publish := {},
@@ -195,7 +198,7 @@ lazy val `stream-loader-tests` = project
val bin = s"/opt/${name.value}/bin/"

new Dockerfile {
from("eclipse-temurin:21.0.2_13-jre")
from("eclipse-temurin:21.0.8_9-jre")

env("APP_CLASS_PATH" -> s"$lib/*")

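In the stream-loader-tests dependencies above, the RESTEasy client and the standalone Jackson JAX-RS provider are swapped for a Jersey stack pinned to the shared jerseyVersion. As a rough illustration of why all four Jersey artifacts are pulled in, a minimal client could be assembled like this (the Docker endpoint and wiring are illustrative assumptions, not code from this PR):

import jakarta.ws.rs.client.ClientBuilder
import org.glassfish.jersey.apache.connector.ApacheConnectorProvider
import org.glassfish.jersey.client.ClientConfig
import org.glassfish.jersey.jackson.JacksonFeature

// jersey-client provides the JAX-RS client, jersey-hk2 its injection backend,
// jersey-apache-connector swaps in an Apache HttpClient transport, and
// jersey-media-json-jackson registers Jackson for JSON (de)serialization.
val config = new ClientConfig()
  .connectorProvider(new ApacheConnectorProvider())
  .register(classOf[JacksonFeature])

val httpClient = ClientBuilder.newClient(config)
val ping = httpClient.target("http://localhost:2375").path("/_ping").request().get()

The mandas docker-client 9.x line builds on this kind of Jersey client stack, which is presumably what motivates the dependency switch in the test project.
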
2 changes: 1 addition & 1 deletion project/build.properties
@@ -1 +1 @@
sbt.version=1.11.3
sbt.version=1.11.7
12 changes: 6 additions & 6 deletions project/plugins.sbt
@@ -4,20 +4,20 @@ addSbtPlugin("se.marcuslonnberg" % "sbt-docker" % "1.11.0")

addSbtPlugin("com.github.sbt" % "sbt-git" % "2.1.0")

addSbtPlugin("org.xerial.sbt" % "sbt-pack" % "0.20")
addSbtPlugin("org.xerial.sbt" % "sbt-pack" % "0.22")

addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.13.1")

addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.2")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.5")

addSbtPlugin("com.thoughtworks.sbt-api-mappings" % "sbt-api-mappings" % "3.0.2")

addSbtPlugin("com.github.sbt" % "sbt-unidoc" % "0.5.0")
addSbtPlugin("com.github.sbt" % "sbt-unidoc" % "0.6.0")

addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.10.0")

libraryDependencies += "net.sourceforge.plantuml" % "plantuml" % "1.2024.8"
libraryDependencies += "net.sourceforge.plantuml" % "plantuml" % "1.2025.8"

addSbtPlugin("com.github.sbt" % "sbt-ghpages" % "0.8.0")
addSbtPlugin("com.github.sbt" % "sbt-ghpages" % "0.9.0")

addSbtPlugin("com.github.sbt" % "sbt-pgp" % "2.3.0")
addSbtPlugin("com.github.sbt" % "sbt-pgp" % "2.3.1")
ClickHouseFileBuilder.scala
@@ -8,8 +8,8 @@

package com.adform.streamloader.clickhouse

import com.adform.streamloader.sink.file.{FileBuilder, FileBuilderFactory}
import com.clickhouse.data.{ClickHouseCompression, ClickHouseFormat}
import com.adform.streamloader.sink.file.{Compression, FileBuilder, FileBuilderFactory}
import com.clickhouse.data.ClickHouseFormat

/**
* A FileBuilder able to build files that can be loaded to ClickHouse.
@@ -26,7 +26,7 @@ trait ClickHouseFileBuilder[-R] extends FileBuilder[R] {
/**
* Compression to use for the files being constructed.
*/
def compression: ClickHouseCompression
def fileCompression: Compression
}

trait ClickHouseFileBuilderFactory[R] extends FileBuilderFactory[R, ClickHouseFileBuilder[R]] {
ClickHouseFileRecordBatch.scala
@@ -8,18 +8,19 @@

package com.adform.streamloader.clickhouse

import java.io.File
import com.adform.streamloader.model.StreamRange
import com.adform.streamloader.sink.file.FileRecordBatch
import com.clickhouse.data.{ClickHouseCompression, ClickHouseFormat}
import com.adform.streamloader.sink.file.{Compression, FileRecordBatch}
import com.clickhouse.data.ClickHouseFormat

import java.io.File

/**
* A file containing a batch of records in some ClickHouse supported format that can be loaded to ClickHouse.
*/
case class ClickHouseFileRecordBatch(
file: File,
format: ClickHouseFormat,
compression: ClickHouseCompression,
fileCompression: Compression,
recordRanges: Seq[StreamRange],
rowCount: Long
) extends FileRecordBatch
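
For reference, a hypothetical batch value under the new field name (the path, topic, offsets and timestamps are made up, and the StreamRange/StreamPosition argument order is assumed from the accessors used elsewhere in this PR):

import com.adform.streamloader.clickhouse.ClickHouseFileRecordBatch
import com.adform.streamloader.model.{StreamPosition, StreamRange, Timestamp}
import com.adform.streamloader.sink.file.Compression
import com.clickhouse.data.ClickHouseFormat
import java.io.File

val batch = ClickHouseFileRecordBatch(
  file = new File("/tmp/batch-0001.csv.zst"), // illustrative local file
  format = ClickHouseFormat.CSV,
  fileCompression = Compression.ZSTD, // previously `compression: ClickHouseCompression`
  recordRanges = Seq(
    StreamRange(
      "events", // topic
      0, // partition
      StreamPosition(100L, Timestamp(1700000000000L)),
      StreamPosition(199L, Timestamp(1700000060000L))
    )
  ),
  rowCount = 100L
)
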
ClickHouseFileRecordBatcher.scala
@@ -32,7 +32,7 @@ class ClickHouseFileRecordBatcher[R](
ClickHouseFileRecordBatch(
f,
fileBuilder.format,
fileBuilder.compression,
fileBuilder.fileCompression,
recordRanges,
fileBuilder.getRecordCount
)
ClickHouseFileStorage.scala
@@ -10,23 +10,22 @@ package com.adform.streamloader.clickhouse

import com.adform.streamloader.model._
import com.adform.streamloader.sink.batch.storage.InDataOffsetBatchStorage
import com.adform.streamloader.sink.file.Compression
import com.adform.streamloader.util.Logging
import com.clickhouse.data.ClickHouseFile
import com.clickhouse.jdbc.ClickHouseConnection
import com.clickhouse.client.api.Client
import com.clickhouse.client.api.insert.InsertSettings
import org.apache.kafka.common.TopicPartition

import java.sql.Connection
import javax.sql.DataSource
import java.nio.file.Files
import java.time.ZoneOffset
import scala.collection.mutable
import scala.jdk.CollectionConverters._
import scala.util.Using

/**
* A ClickHouse storage implementation, stores offsets in rows of data.
* Queries ClickHouse upon initialization in order to retrieve committed stream positions.
*/
class ClickHouseFileStorage(
dbDataSource: DataSource,
client: Client,
table: String,
topicColumnName: String,
partitionColumnName: String,
@@ -35,7 +34,7 @@ class ClickHouseFileStorage(
) extends InDataOffsetBatchStorage[ClickHouseFileRecordBatch]
with Logging {

def committedPositions(connection: Connection): Map[TopicPartition, StreamPosition] = {
override def committedPositions(topicPartitions: Set[TopicPartition]): Map[TopicPartition, Option[StreamPosition]] = {
val positionQuery =
s"""SELECT
| $topicColumnName,
@@ -47,53 +46,52 @@
|GROUP BY $topicColumnName, $partitionColumnName
|""".stripMargin

Using.resource(connection.prepareStatement(positionQuery)) { statement =>
{
log.info(s"Running stream position query: $positionQuery")
Using.resource(statement.executeQuery()) { result =>
val positions: mutable.HashMap[TopicPartition, StreamPosition] = mutable.HashMap.empty
while (result.next()) {
val topic = result.getString(1)
val partition = result.getInt(2)
val offset = result.getLong(3)
val watermark = Timestamp(result.getTimestamp(4).getTime)
if (!result.wasNull()) {
val topicPartition = new TopicPartition(topic, partition)
val position = StreamPosition(offset, watermark)
positions.put(topicPartition, position)
}
}
positions.toMap
}
}
}
log.info(s"Running stream position query: $positionQuery")
val positions: mutable.HashMap[TopicPartition, StreamPosition] = mutable.HashMap.empty
client
.queryAll(positionQuery)
.forEach(row => {
val topic = row.getString(1)
val partition = row.getInteger(2)
val offset = row.getLong(3)
val watermark = Timestamp(row.getLocalDateTime(4).toInstant(ZoneOffset.UTC).toEpochMilli)

val topicPartition = new TopicPartition(topic, partition)
val position = StreamPosition(offset, watermark)
positions.put(topicPartition, position)
})

topicPartitions.map(tp => (tp, positions.get(tp))).toMap
}

override def committedPositions(topicPartitions: Set[TopicPartition]): Map[TopicPartition, Option[StreamPosition]] = {
Using.resource(dbDataSource.getConnection()) { connection =>
val positions = committedPositions(connection)
topicPartitions.map(tp => (tp, positions.get(tp))).toMap
}
override def commitBatchWithOffsets(batch: ClickHouseFileRecordBatch): Unit = {
val settings = new InsertSettings()
.setOption("max_insert_block_size", batch.rowCount) // ensure single block to prevent partial writes
.setDeduplicationToken(deduplicationToken(batch.recordRanges)) // deduplicate based on ranges

contentEncoding(batch.fileCompression).foreach(encoding => settings.appCompressedData(true, encoding))

client.insert(table, Files.newInputStream(batch.file.toPath), batch.format, settings).get()
}

override def commitBatchWithOffsets(batch: ClickHouseFileRecordBatch): Unit = {
Using.resource(dbDataSource.getConnection) { connection =>
Using.resource(connection.unwrap(classOf[ClickHouseConnection]).createStatement) { statement =>
statement
.write()
.data(ClickHouseFile.of(batch.file, batch.compression, 1, batch.format))
.table(table)
.params(Map("max_insert_block_size" -> batch.rowCount.toString).asJava) // atomic insert
.executeAndWait()
}
}
private def contentEncoding(fileCompression: Compression): Option[String] = fileCompression match {
case Compression.NONE => None
case Compression.ZSTD => Some("zstd")
case Compression.GZIP => Some("gzip")
case Compression.BZIP => Some("bz2")
case Compression.LZ4 => Some("lz4")
case _ => throw new UnsupportedOperationException(s"Compression $fileCompression is not supported by ClickHouse")
}

private def deduplicationToken(ranges: Seq[StreamRange]): String = {
ranges.map(range => s"${range.topic}:${range.partition}:${range.start.offset}:${range.end.offset}").mkString(";")
}
}

object ClickHouseFileStorage {

case class Builder(
private val _dbDataSource: DataSource,
private val _client: Client,
private val _table: String,
private val _topicColumnName: String,
private val _partitionColumnName: String,
@@ -102,9 +100,9 @@ object ClickHouseFileStorage {
) {

/**
* Sets a data source for ClickHouse JDBC connections.
* Sets the ClickHouse client.
*/
def dbDataSource(source: DataSource): Builder = copy(_dbDataSource = source)
def client(client: Client): Builder = copy(_client = client)

/**
* Sets the table to load data to.
@@ -129,11 +127,11 @@
)

def build(): ClickHouseFileStorage = {
if (_dbDataSource == null) throw new IllegalStateException("Must provide a ClickHouse data source")
if (_client == null) throw new IllegalStateException("Must provide a ClickHouse client")
if (_table == null) throw new IllegalStateException("Must provide a valid table name")

new ClickHouseFileStorage(
_dbDataSource,
_client,
_table,
_topicColumnName,
_partitionColumnName,
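
With the JDBC DataSource gone, wiring the storage now starts from a client-v2 Client. A minimal sketch, assuming the companion object exposes the usual stream-loader builder() factory and using a placeholder endpoint, credentials and table name (none of this is taken verbatim from the PR):

import com.adform.streamloader.clickhouse.ClickHouseFileStorage
import com.clickhouse.client.api.Client

// client-v2 talks to ClickHouse over its HTTP interface directly, replacing the JDBC connection pool.
val clickHouseClient = new Client.Builder()
  .addEndpoint("http://localhost:8123") // placeholder endpoint
  .setUsername("default") // placeholder credentials
  .setPassword("")
  .build()

val storage = ClickHouseFileStorage
  .builder()
  .client(clickHouseClient)
  .table("events") // placeholder table; stream offsets are stored in its columns
  .build()

On commit, the reworked commitBatchWithOffsets streams the batch file with max_insert_block_size set to the batch row count and a deduplication token derived from the record ranges, so a retried commit of the same ranges is not inserted twice.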