
What's new in the latest connector?

Hello!

For the past few months, we've been hard at work rewriting the EventHubs connector for Apache Spark. A lot has changed, so this page quickly summarizes what's different in the library. It's intended for users familiar with the old library; if you're new to the library, please see the "docs" directory within the repo itself :)

Major Changes

No more progress tracker

The progress tracker was used for two reasons: (1) checkpointing and (2) communicating the last byte offset read on the Spark Executors back to the Spark Driver. That communication forced the Driver to wait for every task in a batch to complete, and it also caused N reads and N writes to an HDFS-compatible file system every batch, where N is the number of partitions in your EventHubs instance. As you can imagine, the waiting and the frequent I/O caused huge performance issues in the previous library and limited the number of concurrent jobs to 1. These limitations no longer exist: there is no progress tracker, and there is no per-batch I/O against the file system.

Switched to sequence number filtering

In the new connector, we use sequence numbers to consume events from EventHubs. This lets us determine the start and end point of each batch deterministically: the end of a 1000-event batch starting at sequence number X is simply X + 1000, whereas the byte offset reached after reading 1000 events from offset Y cannot be known a priori. This (plus the removal of the progress tracker) is what allows us to run concurrent jobs.
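
As a minimal sketch (illustrative arithmetic only, not the connector's internal code):

// Sequence numbers make batch boundaries simple arithmetic.
val startSeqNo = 5000L            // where the previous batch ended
val maxEventsPerPartition = 1000L // per-partition limit for this batch
val endSeqNo = startSeqNo + maxEventsPerPartition // known before a single event is read
// With byte offsets, the end point depends on the size of every event in
// between, so it cannot be computed up front.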

Parallelization

All API calls (and anything else really) that can be done in parallel have been parallelized. If you see anything that isn't, please open an issue or make a PR.

Documentation rewrite

There is all-new documentation, and there are no undocumented options/parameters anymore :)

With these changes, the entire codebase has been rewritten. No major issues have been observed, and the code has been structured so that the path to a very mature, performant, and stable library is clear.

Major Additions

Revamped user config

All user configuration is done with the EventHubsConf class. Check out the integration guide in the "docs" directory for more info.
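
A typical configuration looks roughly like the sketch below; the setter names follow the integration guide, but treat the exact API as something to confirm in the docs:

import org.apache.spark.eventhubs._

val connectionString = ConnectionStringBuilder("{CONNECTION STRING FROM AZURE PORTAL}")
  .setEventHubName("{EVENT HUB NAME}")
  .build

// Every connector option hangs off EventHubsConf.
val ehConf = EventHubsConf(connectionString)
  .setConsumerGroup("$Default")
  .setStartingPosition(EventPosition.fromEndOfStream)
  .setMaxEventsPerTrigger(10000)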

Connection pooling

There is now a static connection pool per JVM so that EventHubClients are reused across batches. In the old connector, N TCP connections were created and N TCP connections were destroyed every batch. Now connections are created when none are available in the pool and they expire after 5 minutes of idle time.
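
Conceptually, the pool behaves like the sketch below (illustrative only, not the connector's actual implementation; AnyRef stands in for EventHubClient):

import java.util.concurrent.ConcurrentLinkedQueue

// One pool per JVM: clients are reused when available, created when not,
// and dropped after roughly five minutes of idle time.
object ClientPool {
  private final case class Idle(client: AnyRef, since: Long)
  private val idleTimeoutMs = 5 * 60 * 1000L
  private val idle = new ConcurrentLinkedQueue[Idle]()

  def borrow(create: () => AnyRef): AnyRef = {
    var next = idle.poll()
    // Skip over clients that have sat idle past the timeout.
    while (next != null && System.currentTimeMillis() - next.since > idleTimeoutMs) {
      next = idle.poll()
    }
    if (next != null) next.client else create()
  }

  def release(client: AnyRef): Unit =
    idle.offer(Idle(client, System.currentTimeMillis()))
}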

Thread pooling

In the old Java client, each EventHubClient had one thread dedicated to it. With the 1.0.0 release, we can provide our own thread pool. In the case of Spark, threads are created on demand (the pool starts empty) and are destroyed once the idle timeout is reached. We use Java's cached thread pool, so check the Java docs for specifics!
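
For reference, this is what a cached thread pool looks like from Scala: it starts empty, creates threads on demand, and the JDK reclaims threads that have been idle for 60 seconds. (How the connector hands its pool to the Java client is internal to the library.)

import java.util.concurrent.Executors

// Grows on demand; idle threads are terminated after 60 seconds.
val threadPool = Executors.newCachedThreadPool()
threadPool.submit(new Runnable {
  override def run(): Unit = println("work item")
})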

EventHubsSink

We now have a sink! You can write your query output to EventHubs using the EventHubsSink.
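
A minimal write looks like the snippet below. Here, ehConf is an EventHubsConf for the destination hub and streamingDf is assumed to be a streaming DataFrame with a string or binary column named body; see the structured streaming guide for the exact sink schema:

// Stream the "body" column of streamingDf out to EventHubs.
val sink = streamingDf
  .select("body")
  .writeStream
  .format("eventhubs")
  .options(ehConf.toMap)
  .option("checkpointLocation", "sink_checkpoint_dir")
  .start()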

Preferred location

Spark now consistently schedules partitions on the same executors across batches. This will allow us to use a prefetch queue across batches in future releases.

Additional Improvements/Features

In addition, we have many new improvements:

  • Spark 2.3 and Spark 2.2 Support
  • Databricks (and Azure Databricks) support
  • Spark core support
  • Moved to EventHubs Java Client 1.0.0
  • Java support in Spark Streaming
  • Batch-styled reads and writes are supported in Structured Streaming
  • Allow users to manage their own offsets with the HasOffsetRanges trait. See the integration guide for details!
  • Per-partition configuration for starting positions, ending positions, and max rates (see the sketch after this list).
  • Users can start their jobs from START_OF_STREAM and END_OF_STREAM
  • EventHubs receiver timeout and operation timeout are now configurable
  • README is revamped
  • Non-public and international clouds are properly supported
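
Here's the per-partition sketch referenced in the list above. NameAndPartition, EventPosition, and the setters are as documented in the integration guide, and connectionString is a placeholder, so double-check the exact signatures against the docs:

import org.apache.spark.eventhubs._

// Start partition 0 from the beginning of the stream and partition 1 from the
// end; all other partitions fall back to the default starting position.
val ehName = "{EVENT HUB NAME}"
val perPartitionStarts = Map(
  NameAndPartition(ehName, 0) -> EventPosition.fromStartOfStream,
  NameAndPartition(ehName, 1) -> EventPosition.fromEndOfStream
)

val ehConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromStartOfStream) // default for the rest
  .setStartingPositions(perPartitionStarts)
  .setMaxRatePerPartition(1000)                         // cap applied to every partition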

Migration Assistance: Side-by-Side Examples

Example 1: Structured Streaming

Old library

val progressDir = "progress_dir"
val policyName = "policy_name"
val policyKey = "policy_key"
val eventHubNamespace = "namespace"
val eventHubName = "name"
val maxRate = 500
val checkpointLocation = "checkpoint_location"

val eventhubParameters = Map[String, String] (
  "eventhubs.policyname" -> policyName,
  "eventhubs.policykey" -> policyKey,
  "eventhubs.namespace" -> eventHubNamespace,
  "eventhubs.name" -> eventHubName,
  "eventhubs.partition.count" -> "32",
  "eventhubs.consumergroup" -> "$Default",
  "eventhubs.progressTrackingDir" -> progressDir,
  "eventhubs.maxRate" -> s"$maxRate"
)

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val reader = spark.readStream
  .format("eventhubs")
  .options(eventhubParameters)

val eventhubs = reader.load()
  .selectExpr("CAST (body as STRING)")
  .as[String]

val query = eventhubs.writeStream
  .outputMode("append")
  .format("console")
  .option("checkpointLocation", checkpointLocation)

query.start().awaitTermination()

New library

import org.apache.spark.eventhubs._
import org.apache.spark.sql.SparkSession

val checkpointLocation = "checkpoint_location"

// To connect to an Event Hub, EntityPath is required as part
// of the connection string. Here, we assume that the connection
// string from the Azure portal does not have the EntityPath part.
val connectionString = ConnectionStringBuilder("{CONNECTION STRING FROM AZURE PORTAL}")
  .setEventHubName("{EVENT HUB NAME}")
  .build
val ehConf = EventHubsConf(connectionString)

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val reader = spark.readStream
  .format("eventhubs")
  .options(ehConf.toMap)

val eventhubs = reader.load()
  .selectExpr("CAST (body AS STRING)")
  .as[String]

val query = eventhubs.writeStream
  .outputMode("append")
  .format("console")
  .option("checkpointLocation", checkpointLocation)

query.start().awaitTermination()

Example 2: Spark Streaming

Old library

val progressDir = "some_path"
val policyName = "policy_name"
val policykey = "policy_key"
val eventHubNamespace = "namespace"
val eventHubName = "name"
val batchDuration = Seconds(500)

val eventhubParameters = Map[String, String] (
  "eventhubs.policyname" -> policyName,
  "eventhubs.policykey" -> policykey,
  "eventhubs.namespace" -> eventHubNamespace,
  "eventhubs.name" -> eventHubName,
  "eventhubs.partition.count" -> "32",
  "eventhubs.consumergroup" -> "$Default"
)

val ssc = new StreamingContext(new SparkContext(), batchDuration)

val directStream = EventHubsUtils.createDirectStreams(
  ssc,
  progressDir,
  Map(eventHubName -> eventhubParameters))

directStream.foreachRDD { rdd =>
  rdd.take(10).foreach(println)
}

ssc.start()
ssc.awaitTermination()

New library

// To connect to an Event Hub, EntityPath is required as part
// of the connection string. Here, we assume that the connection
// string from the Azure portal does not have the EntityPath part.
val connectionString = ConnectionStringBuilder("{CONNECTION STRING FROM AZURE PORTAL}")
  .setEventHubName("{EVENT HUB NAME}")
  .build
val ehConf = EventHubsConf(connectionString)

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sample")
val ssc = new StreamingContext(sparkConf, Seconds(5))

val directStream = EventHubsUtils.createDirectStream(ssc, ehConf)

directStream.foreachRDD { rdd => rdd.take(10).foreach(println) }

ssc.start()
ssc.awaitTermination()