Transactional writer support #6

Merged: 11 commits, Mar 21, 2019
src/main/scala/App.scala (8 changes: 7 additions, 1 deletion)
@@ -7,7 +7,7 @@ import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
// ONLY for testing
object App {

import com.github.anicolaspp.spark.sql.MapRDB._
import com.github.anicolaspp.spark.MapRDB._

def main(args: Array[String]): Unit = {

@@ -37,6 +37,12 @@ object App {
println(s"MY SCHEMA: ${data.schema}")

data.show()



data.writeToMapRDB("/user/mapr/tables/my_table", withTransaction = true)



// sparkSession
// .loadFromMapRDB("/user/mapr/tables/data", schema)
@@ -1,9 +1,11 @@
package com.github.anicolaspp.spark.sql
package com.github.anicolaspp.spark


import com.mapr.db.spark.MapRDBSpark
import com.mapr.db.spark.utils.MapRSpark
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, SparkSession}


object MapRDB {

implicit class ExtendedSession(sparkSession: SparkSession) {
@@ -19,22 +21,22 @@ object MapRDB {
}

implicit class ExtendedDataFrame(df: DataFrame) {
def saveToMapRDB(path: String, withTransaction: Boolean = false): Unit = {
def writeToMapRDB(path: String, withTransaction: Boolean = false): Unit = {

if (withTransaction) {
df.write
.format("com.github.anicolaspp.spark.sql.writing.Writer")
.save(path)

} else {
MapRSpark.save(df, path, "_id", false, false)
Contributor:
If we want the connector to replace the official connector at some point, then the createTable and bulkInsert options should be exposed to the caller, either through default parameters or through the .option method, similar to how the current connector allows df.write.option("Operation","Insert").saveToMapRDB("path").

Owner Author:
That is fine. I will create an issue to track that.
GH-9


com.mapr.db.spark.sql
.MapRDBDataFrameFunctions(df)
.saveToMapRDB(path)
// com.mapr.db.spark.sql.MapRDBDataFrameFunctions(df)
// .saveToMapRDB(path)
}


}
}

}
}
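
Following up on the review comment above about exposing createTable and bulkInsert to the caller, here is a minimal sketch of the default-parameter approach. It is illustrative only and not part of this PR; it assumes that the two trailing boolean arguments of MapRSpark.save (hard-coded to false, false in the diff) correspond to createTable and bulkInsert.

import com.mapr.db.spark.utils.MapRSpark
import org.apache.spark.sql.DataFrame

object MapRDB {

  implicit class ExtendedDataFrame(df: DataFrame) {

    // Sketch: createTable and bulkInsert surfaced as default parameters (parameter names hypothetical).
    def writeToMapRDB(path: String,
                      withTransaction: Boolean = false,
                      createTable: Boolean = false,
                      bulkInsert: Boolean = false): Unit =
      if (withTransaction) {
        df.write
          .format("com.github.anicolaspp.spark.sql.writing.Writer")
          .save(path)
      } else {
        // Assumes the last two boolean arguments of MapRSpark.save are
        // createTable and bulkInsert, replacing the hard-coded false, false.
        MapRSpark.save(df, path, "_id", createTable, bulkInsert)
      }
  }
}

The .option-based route the reviewer mentions would instead read these flags inside the data source itself, presumably what GH-9 is meant to track.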
@@ -0,0 +1,5 @@
package com.github.anicolaspp.spark.sql.writing

import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage

case class CommittedIds(partitionId: Int, ids: Set[String]) extends WriterCommitMessage
@@ -0,0 +1,15 @@
package com.github.anicolaspp.spark.sql.writing

import org.ojai.store.{DocumentStore, DriverManager}

object MapRDBCleaner {

def clean(ids: Set[String], table: String): Unit = {

val connection = DriverManager.getConnection("ojai:mapr:")

val store: DocumentStore = connection.getStore(table)

ids.foreach(store.delete)
Contributor:
In a normal rollback this should be fine, since only the confirmed written ids are in the ids value. But if one of the records to be deleted has already been removed by an external application, then store.delete should throw an exception, which might need to be caught and ignored here.
One way to avoid a try/catch here is to use the checkAndDelete method, although that might actually create more overhead.

}
}
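
Picking up the reviewer's note above, a minimal sketch of a cleaner that tolerates documents already removed by an external application (illustrative only, not part of this PR):

import org.ojai.store.{DocumentStore, DriverManager}

import scala.util.Try

object MapRDBCleaner {

  def clean(ids: Set[String], table: String): Unit = {

    val connection = DriverManager.getConnection("ojai:mapr:")

    val store: DocumentStore = connection.getStore(table)

    // Ignore failures for ids that are already gone, e.g. documents
    // removed concurrently by an external application.
    ids.foreach(id => Try(store.delete(id)))
  }
}

Wrapping the delete in Try avoids an explicit try/catch; the checkAndDelete alternative the reviewer mentions would push the existence check to the store, at the cost of extra overhead.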
@@ -0,0 +1,35 @@
package com.github.anicolaspp.spark.sql.writing

import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.writer.{DataSourceWriter, DataWriterFactory, WriterCommitMessage}
import org.apache.spark.sql.types.StructType

class MapRDBDataSourceWriter(table: String, schema: StructType) extends DataSourceWriter with Logging {

private var globallyCommittedIds = List.empty[String]

override def createWriterFactory(): DataWriterFactory[Row] = new MapRDBDataWriterFactory(table, schema)

override def commit(messages: Array[WriterCommitMessage]): Unit = {

val ids = messages.foldLeft(Set.empty[String]) { case (acc, CommittedIds(partitionId, partitionIds)) =>
log.info(s"PARTITION $partitionId HAS BEEN CONFIRMED BY DRIVER")

acc ++ partitionIds
}

// Let's make sure this is thread-safe
globallyCommittedIds = this.synchronized {
globallyCommittedIds ++ ids
}

}

override def abort(messages: Array[WriterCommitMessage]): Unit = {
log.info("JOB BEING ABORTED")
log.info("JOB CLEANING UP")

MapRDBCleaner.clean(globallyCommittedIds.toSet, table)
}
}
@@ -0,0 +1,13 @@
package com.github.anicolaspp.spark.sql.writing

import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriterCommitMessage}

class MapRDBDataWriter extends DataWriter[Row] with Logging {
override def write(record: Row): Unit = ???

override def commit(): WriterCommitMessage = ???

override def abort(): Unit = ???
}
@@ -0,0 +1,51 @@
package com.github.anicolaspp.spark.sql.writing

import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory, WriterCommitMessage}
import org.apache.spark.sql.types.StructType
import org.ojai.store.{DocumentStore, DriverManager}

class MapRDBDataWriterFactory(table: String, schema: StructType) extends DataWriterFactory[Row] {

@transient private lazy val connection = DriverManager.getConnection("ojai:mapr:")

@transient private lazy val store: DocumentStore = connection.getStore(table)

private val writtenIds = scala.collection.mutable.ListBuffer.empty[String]

override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[Row] = new DataWriter[Row] with Logging {

log.info(s"PROCESSING PARTITION ID: $partitionId ; ATTEMPT: $attemptNumber")

override def write(record: Row): Unit = {

val doc = schema
.fields
.map(field => (field.name, schema.fieldIndex(field.name)))
.foldLeft(connection.newDocumentBuilder()) { case (acc, (name, idx)) => acc.put(name, record.getString(idx)) }
.getDocument

this.synchronized {
if (!writtenIds.contains(doc.getIdString)) {
store.insert(doc)
writtenIds.append(doc.getIdString)
}
}
}

override def commit(): WriterCommitMessage = {
log.info(s"PARTITION $partitionId COMMITTED AFTER ATTEMPT $attemptNumber")

CommittedIds(partitionId, writtenIds.toSet)
}

override def abort(): Unit = {
log.info(s"PARTITION $partitionId ABORTED AFTER ATTEMPT $attemptNumber")

MapRDBCleaner.clean(writtenIds.toSet, table)

log.info(s"PARTITION $partitionId CLEANED UP")
}
}
}
@@ -22,96 +22,14 @@ class Writer extends WriteSupport with Logging {
}
}

class MapRDBDataSourceWriter(table: String, schema: StructType) extends DataSourceWriter with Logging {

private var globallyCommittedIds = List.empty[String]

override def createWriterFactory(): DataWriterFactory[Row] = new MapRDBDataWriterFactory(table, schema)

override def commit(messages: Array[WriterCommitMessage]): Unit = {

val ids = messages.foldLeft(Set.empty[String]) { case (acc, CommittedIds(partitionId, ids)) =>
log.info(s"PARTITION $partitionId HAS BEEN CONFIRMED BY DRIVER")

acc ++ ids
}

// Let's make sure this is thread-safe
globallyCommittedIds = this.synchronized {
globallyCommittedIds ++ ids
}

}

override def abort(messages: Array[WriterCommitMessage]): Unit = {
log.info("JOB BEING ABORTED")
log.info("JOB CLEANING UP")

MapRDBCleaner.clean(globallyCommittedIds.toSet, table)
}
}

class MapRDBDataWriterFactory(table: String, schema: StructType) extends DataWriterFactory[Row] {

override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[Row] = new DataWriter[Row] with Logging {

@transient private lazy val connection = DriverManager.getConnection("ojai:mapr:")

@transient private lazy val store: DocumentStore = connection.getStore(table)

log.info(s"PROCESSING PARTITION ID: $partitionId ; ATTEMPT: $attemptNumber")

private val writtenIds = scala.collection.mutable.ListBuffer.empty[String]

override def write(record: Row): Unit = {

val doc = schema
.fields
.map(field => (field.name, schema.fieldIndex(field.name)))
.foldLeft(connection.newDocumentBuilder()) { case (acc, (name, idx)) => acc.put(name, record.getString(idx)) }
.getDocument

store.insert(doc)

writtenIds.append(doc.getIdString)
}

override def commit(): WriterCommitMessage = {
log.info(s"PARTITION $partitionId COMMITTED AFTER ATTEMPT $attemptNumber")

CommittedIds(partitionId, writtenIds.toSet)
}

override def abort(): Unit = {
log.info(s"PARTITION $partitionId ABORTED AFTER ATTEMPT $attemptNumber")

MapRDBCleaner.clean(writtenIds.toSet, table)

log.info(s"PARTITION $partitionId CLEANED UP")
}
}
}

class MapRDBDataWriter extends DataWriter[Row] with Logging {
override def write(record: Row): Unit = ???

override def commit(): WriterCommitMessage = ???

override def abort(): Unit = ???
}


object MapRDBCleaner {

def clean(ids: Set[String], table: String): Unit = {

val connection = DriverManager.getConnection("ojai:mapr:")

val store: DocumentStore = connection.getStore(table)

ids.foreach(store.delete)
}
}


case class CommittedIds(partitionId: Int, ids: Set[String]) extends WriterCommitMessage