| Project  | Current version | Scala version   |
|----------|-----------------|-----------------|
| Trembita | 0.8.5-SNAPSHOT  | 2.11.12, 2.12.8 |
Project Trembita - Functional Data Pipelining library. It lets you query and transform your data in a purely functional, typesafe and declarative way. Trembita allows you to build complicated transformation pipelines, parts of which are executed locally sequentially, locally in parallel, or in other environments (for instance on a Spark cluster, see below).
```scala
resolvers += "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots"

libraryDependencies ++= {
  val trembitaV = "0.8.5-SNAPSHOT"
  Seq(
    "ua.pp.itkpi" %% "trembita-kernel"    % trembitaV, // kernel
    "ua.pp.itkpi" %% "trembita-cassandra" % trembitaV, // cassandra
    "ua.pp.itkpi" %% "trembita-phantom"   % trembitaV, // phantom
    "ua.pp.itkpi" %% "trembita-slf4j"     % trembitaV  // slf4j, for logging
  )
}
```
- Typesafe and IDE-friendly querying DSL for data pipelines: provides a unified model and type checking for various data sources (including collections and Spark RDDs)
- Purely functional stateful transformations using Finite State Machines: provides a DSL for defining FSMs that can be run on various data sources (collections, Spark Datasets, Akka Streams, ...)
- One line caching
- Simple logging
- Apache Spark (core, SQL)
- Apache Spark Streaming
- Akka Streams
- Cassandra
- Cassandra using Phantom
- Infinispan
- Java8 streams
- kernel - lazy (parallel) data pipelines, QL for grouping/aggregations and stateful computations using Cats and Shapeless
- Any Iterable - just wrap your collection into a DataPipeline (see the sketch after this list)
- cassandra connector - fetch rows from your Cassandra database with CassandraSource
- cassandra phantom - provides Phantom library support
- akka stream - allows creating a pipeline from an Akka stream (e.g. from any data source compatible with Akka)
- spark RDD / DataSet - allows creating a pipeline from an RDD / DataSet (e.g. from any non-streaming data source compatible with Spark)
- spark DStreams - allows creating a pipeline from Discretized Streams (e.g. from any streaming data source compatible with Spark)
- trembita slf4j - provides slf4j logging support. Use it with any compatible logging backend (for instance logback)
- trembita log4j - provides log4j logging support.
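For instance, a minimal sketch of wrapping a plain collection with the kernel module (this reuses the Input.sequentialF constructor from the Spark example below and assumes it also accepts cats.effect.IO as the effect type; exact constructor names may differ between versions):

```scala
import trembita._
import cats.effect.IO

// Wrap a plain Scala collection into a lazy pipeline and transform it.
// Nothing is evaluated until the pipeline is actually run.
val pipeline =
  Input.sequentialF[IO, Seq].create(IO(Seq(1, 2, 3, 4, 5)))

val doubled = pipeline.map(_ * 2)
```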
You can run some of your transformations on a Spark cluster. To do that, add the following dependencies:
```scala
libraryDependencies ++= Seq(
  "ua.pp.itkpi" %% "trembita-spark" % trembitaV,
  "org.apache.spark" %% "spark-core" % "2.4.0" // first Spark version with Scala 2.12 support
)
```
Using the Spark integration you can even run asynchronous computations on Spark with Futures:
```scala
import trembita._
import trembita.spark._
import org.apache.spark._
import cats.effect.IO
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import java.util.concurrent.Executors

implicit val sc: SparkContext     = ??? // requires an implicit SparkContext in scope
implicit val timeout: Timeout     = Timeout(5.minutes) // requires an implicit timeout for async operations
implicit val ec: ExecutionContext = ???

val cachedThreadPool =
  ExecutionContext.fromExecutor(Executors.newCachedThreadPool())

Input
  .sequentialF[SerializableFuture, Seq]
  .create(SerializableFuture.pure(Seq(1, 2, 3, 20, 40, 60)))
  .to[Spark]
  // will be executed on spark
  .map(_ + 1)
  .mapM { i: Int =>
    val n = SerializableFuture.start { i + 1 }(cachedThreadPool)
    val b = SerializableFuture
      .start {
        val x = 1 + 2
        x * 3
      }
      .flatTap(
        xx =>
          SerializableFuture.start {
            println(s"spark debug: $xx") // you won't see this in submit logs
          }
      )
    val result: SerializableFuture[Int] =
      n.bind { nx =>
        b.where(nx > _).fmap(_ + nx)
      }
    result.attempt
  }
  .mapK(serializableFutureToIO)
  .map(_.getOrElse(-100500))
  .mapM { i =>
    IO { scala.util.Random.nextInt(10) + i }
  }
  // will be executed locally in parallel
  .to[Parallel]
  .info(i => s"parallel debug: $i") // you will see it in console
  .map(_ + 1)
```
Trembita will do its best to transform an async lambda into a serializable format.
By default a special macro detects all references to ExecutionContext within the lambda you pass to mapM.
All ExecutionContexts should be globally accessible (e.g. they need to be a def or a val in some object).
If they are not, your code won't compile, with an appropriate error.
If everything is ok, the macro creates a helper object with references to all found ExecutionContexts, making them @transient lazy val (a well-known technique), and rewrites your lambda so that all async transformations reference fields of that object.
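For instance, the cachedThreadPool from the example above satisfies this requirement when declared inside an object (a minimal sketch):

```scala
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

// Globally accessible, so the macro can reference it from the rewritten lambda
object Pools {
  val cachedThreadPool: ExecutionContext =
    ExecutionContext.fromExecutor(Executors.newCachedThreadPool())
}
```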
You can find the full example here.
Happy to say that using cats.effect.IO on Spark is also supported =)
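A minimal sketch of what that might look like (assuming IO can simply replace SerializableFuture as the pipeline's effect type; the exact requirements may differ):

```scala
import trembita._
import trembita.spark._
import org.apache.spark._
import cats.effect.IO
import scala.concurrent.duration._

implicit val sc: SparkContext = ???
implicit val timeout: Timeout = Timeout(5.minutes)

val ioOnSpark =
  Input
    .sequentialF[IO, Seq]
    .create(IO(Seq(1, 2, 3)))
    .to[Spark]
    .mapM(i => IO(i * 2)) // effectful step evaluated on the cluster
```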
You can now define stateful transformations on Spark Datasets using Finite State Machines.
This is implemented using Dataset.mapWithState.
Defining an FSM for Spark is as simple as defining an FSM for a regular pipeline, except that the state is preserved only at the level of a specific key (due to the mapWithState limitation).
To do so, use fsmByKey:
```scala
val pipeline: DataPipelineT[F, A, Spark] = ???
pipeline.fsmByKey(getKey = ???)(... /* your FSM definition here */)
```
Full example can be found here.
See the full example here
- Be careful not to create closures over the SparkContext or SparkSession, because such code will fail at runtime (see the sketch below)
- Other non-serializable resources will also fail at runtime. This will be addressed later
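For illustration, a hypothetical sketch of the difference:

```scala
import trembita._
import trembita.spark._
import org.apache.spark._
import cats.effect.IO

implicit val sc: SparkContext = ???
val pipeline: DataPipelineT[IO, Int, Spark] = ???

// Fails at runtime: the lambda closes over the non-serializable SparkContext
// pipeline.map(i => i + sc.defaultParallelism)

// Fine: only serializable values are captured by the closure
val offset = 10
val shifted = pipeline.map(_ + offset)
```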
You can find a script to run the example on a Spark cluster within docker:
```bash
# in project root
sbt trembita-examples/assembly # prepare fat jar for spark-submit
sh examples/src/main/resources/spark/cluster/run.sh
```
To run the Spark FSM example in docker, use the following script:
```bash
# in project root
sbt trembita-examples/assembly # prepare fat jar for spark-submit
sh examples/src/main/resources/spark/cluster/run_fsm.sh
```
To run the Spark QL example in docker, use the following script:
```bash
# in project root
sbt trembita-examples/assembly # prepare fat jar for spark-submit
sh examples/src/main/resources/spark/cluster/run_ql.sh
```
Before running the QL example, please remove the Spire jars from the Spark classpath to avoid dependency conflicts.
Trembita now supports running a part of your transformations on Akka Streams. To use it, add the following dependency:
libraryDependencies += "ua.pp.itkpi" %% "trembita-akka-streams" % trembitaV
You can run an existing pipeline through an Akka stream or create a pipeline from a source directly:
```scala
import akka.stream.scaladsl._
import akka.stream.IOResult
import akka.util.ByteString
import cats.effect.IO
import trembita._
import trembita.akka_streams._
import java.nio.file.Paths
import scala.concurrent.Future

val fileLines =
  Input.fromSourceF[IO, ByteString, Future[IOResult]](IO {
    FileIO
      .fromPath(Paths.get("examples/src/main/resources/words.txt"))
  })
```
Akka streaming pipelines also support FSM, implemented using a custom graph stage:
```scala
val pipeline: DataPipelineT[IO, Int, Akka] = ???
val stateful = pipeline.fsm(/* your FSM definition here */)
```
You can find full examples here.
Add the following dependency if you want to run your pipeline through both Akka Streams and Spark RDDs:
libraryDependencies += "ua.pp.itkpi" %% "trembita-seamless-akka-spark" % trembitaV
Its goal is to avoid additional overhead when switching between Akka and Spark, as sketched below.
Akka -> Spark is implemented using a custom Sink.
Spark -> Akka is implemented using toLocalIterator.
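A hedged sketch of switching between the two environments (`.to[Spark]` appears earlier in this README; `.to[Akka]` and the exact imports of the seamless module are assumptions):

```scala
import trembita._
import trembita.akka_streams._
import trembita.spark._
import cats.effect.IO

val akkaPipeline: DataPipelineT[IO, Int, Akka] = ???

val onSpark = akkaPipeline
  .to[Spark] // handed over to Spark through the custom Sink
  .map(_ * 2)

val backOnAkka = onSpark
  .to[Akka]  // pulled back via toLocalIterator
```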
Trembita now allows you to write QL and FSM upon Spark DStreams.
libraryDependencies += "ua.pp.itkpi" %% "trembita-spark-streaming" % trembitaV
For examples see here. Run scripts:
libraryDependencies += "ua.pp.itkpi" %% "trembita-java-streams" % trembitaV
See sources and tests for examples
libraryDependencies += "ua.pp.itkpi" %% "trembita-seamless-akka-infinispan" % trembitaV
Allows caching of an Akka stream. See the example.
- caching
- integration with distributed streaming frameworks
- tensorflow
- slick
My speech about Trembita at the ScalaUA conference: https://youtu.be/PDBVCVv4mVc
Trembita is an alpine horn made of wood. It is common among the Hutsuls, Ukrainian highlanders who live in western Ukraine, eastern Poland, Slovakia and northern Romania. In southern Poland it is called trombita, in the north bazuna, and in central Poland ligawka.
- Vitalii Honta
- You =)