Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transactional writer support #6

Merged
merged 11 commits into from
Mar 21, 2019
Merged

Conversation

anicolaspp
Copy link
Owner

No description provided.

@anicolaspp
Copy link
Owner Author

@iulianov check all this when you get some free time.

- no synchronization was actually happening. We need a common context for sync to happen.
@anicolaspp
Copy link
Owner Author

@iulianov we don't have to worry any longer about doc to row and row to doc conversions. It is also done for us.


if (withTransaction) {
df.write
.format("com.github.anicolaspp.spark.sql.writing.Writer")
.save(path)

} else {
MapRSpark.save(df, path, "_id", false, false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want the connector to replace the official connector at some point then the createTable and bulkInsert options should be exposed to the caller. Either through default parameters or through the .option method. Similar to how the current connector allows df.write.option("Operation","Insert").saveToMapRDB("path")

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is fine. I will create an issue to track that.
GH-9

@iulianov
Copy link
Contributor

Overall the logic looks good but there are still edge cases that need to be thought about and either addressed through code or documented. You already have one case in the comments but there are others.
One I can think of is if there needs to be a rollback but an error occurs, on one of the executors that is has not commited yet, while running the MapRDBCleaner code. Depending on how the spark job was started it might not return an error to the user but it also will not have removed all the added records.

@anicolaspp anicolaspp closed this Mar 21, 2019
@anicolaspp anicolaspp reopened this Mar 21, 2019

val store: DocumentStore = connection.getStore(table)

ids.foreach(store.delete)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In a normal rollback this should be fine since the only the confirmed written ids are in the ids value. But if one of the records to be deleted has been removed by an external application then the store.delete should throw an exception. It might need to be caught and ignored here.
One way to prevent a try catch here is to use the checkAndDelete method although that might actually create more overhead.

@anicolaspp anicolaspp merged commit 46aa470 into master Mar 21, 2019
@anicolaspp anicolaspp deleted the transactional-writer-support branch March 21, 2019 03:07
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants