Skip to content

Commit

Permalink
Expose Delta Uniform write commit size in logs (delta-io#3898)
Browse files Browse the repository at this point in the history
<!--
Thanks for sending a pull request!  Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md
2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP]
Your PR title ...'.
  3. Be sure to keep the PR description updated to reflect all changes.
  4. Please write your PR title to summarize what this PR proposes.
5. If possible, provide a concise example to reproduce the issue for a
faster review.
6. If applicable, include the corresponding issue number in the PR title
and link it in the body.
-->

#### Which Delta project/connector is this regarding?
<!--
Please add the component selected below to the beginning of the pull
request title
For example: [Spark] Title of my pull request
-->

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description
Expose Delta Uniform write commit size in logs
  • Loading branch information
lzlfred authored and huan233usc committed Jan 17, 2025
1 parent d6fc5be commit 61ae838
Showing 1 changed file with 7 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class IcebergConversionTransaction(

protected abstract class TransactionHelper(impl: PendingUpdate[_]) {
private var committed = false
var writeSize = 0L

def opType: String

Expand All @@ -94,6 +95,7 @@ class IcebergConversionTransaction(
override def opType: String = "append"

def add(add: AddFile): Unit = {
writeSize += add.size
appender.appendFile(
convertDeltaAddFileToIcebergDataFile(
add,
Expand Down Expand Up @@ -134,6 +136,7 @@ class IcebergConversionTransaction(
override def opType: String = "overwrite"

def add(add: AddFile): Unit = {
writeSize += add.size
overwriter.addFile(
convertDeltaAddFileToIcebergDataFile(
add,
Expand Down Expand Up @@ -169,6 +172,7 @@ class IcebergConversionTransaction(
override def opType: String = "rewrite"

def rewrite(removes: Seq[RemoveFile], adds: Seq[AddFile]): Unit = {
writeSize += adds.map(_.size).sum
val dataFilesToDelete = removes.map { f =>
assert(!f.dataChange, "Rewrite operation should not add data")
convertDeltaRemoveFileToIcebergDataFile(
Expand Down Expand Up @@ -487,7 +491,9 @@ class IcebergConversionTransaction(
"version" -> postCommitSnapshot.version,
"timestamp" -> postCommitSnapshot.timestamp,
"tableOp" -> tableOp.getClass.getSimpleName.stripSuffix("$"),
"prevConvertedDeltaVersion" -> lastConvertedDeltaVersion
"prevConvertedDeltaVersion" -> lastConvertedDeltaVersion,
"tableSize" -> postCommitSnapshot.sizeInBytes,
"commitWriteSize" -> fileUpdates.map(_.writeSize).sum
) ++ icebergTxnTypes ++ errorData
)
}
Expand Down

0 comments on commit 61ae838

Please sign in to comment.