From 80db25365059d4e8c5d30c7ffacf7d1b9f84598e Mon Sep 17 00:00:00 2001
From: Sreeram Garlapati
Date: Mon, 25 May 2026 18:34:55 -0700
Subject: [PATCH] fix(transaction): detect duplicate file paths within a single FastAppend batch

`SnapshotProducer::validate_duplicate_files` collected
`added_data_files` straight into a `HashSet<&str>` before checking
against existing manifests. That collect step silently dedupes the
batch, so two `DataFile` entries sharing the same `file_path` in one
`add_data_files(...)` call were written into the manifest unchecked and
committed without error - producing a snapshot whose
`added_files_count` and read-side row count both double-count the
offending file.

Walk the added files explicitly: insert each path into the seen set and
track every distinct path that collides. If any collisions are observed,
return `ErrorKind::DataInvalid` with the sorted, deduped list of
duplicated paths. The existing cross-snapshot check continues to operate
on the same `new_files` set, so its behaviour is unchanged.

Adds two unit tests:
- rejection path, including the dedup-in-message guarantee when a path
  appears three or more times;
- `with_check_duplicate(false)` opt-out still accepts batch duplicates,
  matching the opt-out semantics already documented for the
  cross-snapshot check.

Closes #2507.
--- crates/iceberg/src/transaction/append.rs | 68 ++++++++++++++++++++++ crates/iceberg/src/transaction/snapshot.rs | 27 +++++++-- 2 files changed, 90 insertions(+), 5 deletions(-) diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index 08d4032409..2b1e6703ff 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -259,6 +259,74 @@ mod tests { assert!(Arc::new(action).commit(&table).await.is_err()); } + #[tokio::test] + async fn test_fast_append_rejects_intra_batch_duplicate_paths() { + let table = make_v2_minimal_table(); + let tx = Transaction::new(&table); + + let make_file = |size: u64, records: u64| { + DataFileBuilder::default() + .content(DataContentType::Data) + .file_path("test/dup.parquet".to_string()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(size) + .record_count(records) + .partition_spec_id(table.metadata().default_partition_spec_id()) + .partition(Struct::from_iter([Some(Literal::long(1))])) + .build() + .unwrap() + }; + + // Two files share the same path; the third repeats it to exercise the + // dedup of duplicates in the error message. + let action = tx.fast_append().add_data_files(vec![ + make_file(100, 10), + make_file(200, 20), + make_file(300, 30), + ]); + let err = match Arc::new(action).commit(&table).await { + Ok(_) => panic!("expected duplicate paths to be rejected"), + Err(e) => e, + }; + assert_eq!(err.kind(), crate::ErrorKind::DataInvalid); + let msg = err.message(); + assert!( + msg.contains("duplicate file paths") && msg.contains("test/dup.parquet"), + "unexpected error message: {msg}" + ); + // The same path should appear exactly once in the message, even though + // it was added three times. 
+ assert_eq!(msg.matches("test/dup.parquet").count(), 1); + } + + #[tokio::test] + async fn test_fast_append_intra_batch_duplicate_check_can_be_disabled() { + let table = make_v2_minimal_table(); + let tx = Transaction::new(&table); + + let make_file = || { + DataFileBuilder::default() + .content(DataContentType::Data) + .file_path("test/dup.parquet".to_string()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(10) + .partition_spec_id(table.metadata().default_partition_spec_id()) + .partition(Struct::from_iter([Some(Literal::long(1))])) + .build() + .unwrap() + }; + + // When the user opts out of duplicate validation, batch duplicates are + // accepted unchanged - mirrors the existing behaviour for cross- + // snapshot duplicates and keeps the opt-out semantics consistent. + let action = tx + .fast_append() + .with_check_duplicate(false) + .add_data_files(vec![make_file(), make_file()]); + assert!(Arc::new(action).commit(&table).await.is_ok()); + } + #[tokio::test] async fn test_fast_append() { let table = make_v2_minimal_table(); diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 8f643a7d1e..239f912c85 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -164,11 +164,28 @@ impl<'a> SnapshotProducer<'a> { } pub(crate) async fn validate_duplicate_files(&self) -> Result<()> { - let new_files: HashSet<&str> = self - .added_data_files - .iter() - .map(|df| df.file_path.as_str()) - .collect(); + // First, detect duplicate file paths within the batch itself. + // Collecting straight into a `HashSet` would silently deduplicate + // and pass corruption through to the manifest, so we walk the list + // explicitly and surface every distinct duplicated path. 
+ let mut new_files: HashSet<&str> = HashSet::with_capacity(self.added_data_files.len()); + let mut duplicates_in_batch: HashSet<&str> = HashSet::new(); + for data_file in &self.added_data_files { + if !new_files.insert(data_file.file_path.as_str()) { + duplicates_in_batch.insert(data_file.file_path.as_str()); + } + } + if !duplicates_in_batch.is_empty() { + let mut paths: Vec<&str> = duplicates_in_batch.into_iter().collect(); + paths.sort_unstable(); + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot add duplicate file paths within the same commit: {}", + paths.join(", ") + ), + )); + } let mut referenced_files = Vec::new(); if let Some(current_snapshot) = self.table.metadata().current_snapshot() {