Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions crates/iceberg/src/transaction/append.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,74 @@ mod tests {
assert!(Arc::new(action).commit(&table).await.is_err());
}

#[tokio::test]
async fn test_fast_append_rejects_intra_batch_duplicate_paths() {
let table = make_v2_minimal_table();
let tx = Transaction::new(&table);

let make_file = |size: u64, records: u64| {
DataFileBuilder::default()
.content(DataContentType::Data)
.file_path("test/dup.parquet".to_string())
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(size)
.record_count(records)
.partition_spec_id(table.metadata().default_partition_spec_id())
.partition(Struct::from_iter([Some(Literal::long(1))]))
.build()
.unwrap()
};

// Two files share the same path; the third repeats it to exercise the
// dedup of duplicates in the error message.
let action = tx.fast_append().add_data_files(vec![
make_file(100, 10),
make_file(200, 20),
make_file(300, 30),
]);
let err = match Arc::new(action).commit(&table).await {
Ok(_) => panic!("expected duplicate paths to be rejected"),
Err(e) => e,
};
assert_eq!(err.kind(), crate::ErrorKind::DataInvalid);
let msg = err.message();
assert!(
msg.contains("duplicate file paths") && msg.contains("test/dup.parquet"),
"unexpected error message: {msg}"
);
// The same path should appear exactly once in the message, even though
// it was added three times.
assert_eq!(msg.matches("test/dup.parquet").count(), 1);
}

#[tokio::test]
async fn test_fast_append_intra_batch_duplicate_check_can_be_disabled() {
let table = make_v2_minimal_table();
let tx = Transaction::new(&table);

let make_file = || {
DataFileBuilder::default()
.content(DataContentType::Data)
.file_path("test/dup.parquet".to_string())
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(10)
.partition_spec_id(table.metadata().default_partition_spec_id())
.partition(Struct::from_iter([Some(Literal::long(1))]))
.build()
.unwrap()
};

// When the user opts out of duplicate validation, batch duplicates are
// accepted unchanged - mirrors the existing behaviour for cross-
// snapshot duplicates and keeps the opt-out semantics consistent.
let action = tx
.fast_append()
.with_check_duplicate(false)
.add_data_files(vec![make_file(), make_file()]);
assert!(Arc::new(action).commit(&table).await.is_ok());
}

#[tokio::test]
async fn test_fast_append() {
let table = make_v2_minimal_table();
Expand Down
27 changes: 22 additions & 5 deletions crates/iceberg/src/transaction/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,28 @@ impl<'a> SnapshotProducer<'a> {
}

pub(crate) async fn validate_duplicate_files(&self) -> Result<()> {
let new_files: HashSet<&str> = self
.added_data_files
.iter()
.map(|df| df.file_path.as_str())
.collect();
// First, detect duplicate file paths within the batch itself.
// Collecting straight into a `HashSet` would silently deduplicate
// and pass corruption through to the manifest, so we walk the list
// explicitly and surface every distinct duplicated path.
Comment on lines +167 to +170
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is maybe a little verbose?

let mut new_files: HashSet<&str> = HashSet::with_capacity(self.added_data_files.len());
let mut duplicates_in_batch: HashSet<&str> = HashSet::new();
Comment on lines +171 to +172
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is over complicated, we should throw error immediately when new_files.insert return false.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also I prefer to move this check in the begigning of FastAppendAction's commit method, rather than putting it here.

for data_file in &self.added_data_files {
if !new_files.insert(data_file.file_path.as_str()) {
duplicates_in_batch.insert(data_file.file_path.as_str());
}
}
if !duplicates_in_batch.is_empty() {
let mut paths: Vec<&str> = duplicates_in_batch.into_iter().collect();
paths.sort_unstable();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious to know if we need the sort?

return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Cannot add duplicate file paths within the same commit: {}",
paths.join(", ")
),
));
}

let mut referenced_files = Vec::new();
if let Some(current_snapshot) = self.table.metadata().current_snapshot() {
Expand Down
Loading