diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index 08d4032409..2b1e6703ff 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -259,6 +259,74 @@ mod tests { assert!(Arc::new(action).commit(&table).await.is_err()); } + #[tokio::test] + async fn test_fast_append_rejects_intra_batch_duplicate_paths() { + let table = make_v2_minimal_table(); + let tx = Transaction::new(&table); + + let make_file = |size: u64, records: u64| { + DataFileBuilder::default() + .content(DataContentType::Data) + .file_path("test/dup.parquet".to_string()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(size) + .record_count(records) + .partition_spec_id(table.metadata().default_partition_spec_id()) + .partition(Struct::from_iter([Some(Literal::long(1))])) + .build() + .unwrap() + }; + + // Two files share the same path; the third repeats it to exercise the + // dedup of duplicates in the error message. + let action = tx.fast_append().add_data_files(vec![ + make_file(100, 10), + make_file(200, 20), + make_file(300, 30), + ]); + let err = match Arc::new(action).commit(&table).await { + Ok(_) => panic!("expected duplicate paths to be rejected"), + Err(e) => e, + }; + assert_eq!(err.kind(), crate::ErrorKind::DataInvalid); + let msg = err.message(); + assert!( + msg.contains("duplicate file paths") && msg.contains("test/dup.parquet"), + "unexpected error message: {msg}" + ); + // The same path should appear exactly once in the message, even though + // it was added three times. + assert_eq!(msg.matches("test/dup.parquet").count(), 1); + } + + #[tokio::test] + async fn test_fast_append_intra_batch_duplicate_check_can_be_disabled() { + let table = make_v2_minimal_table(); + let tx = Transaction::new(&table); + + let make_file = || { + DataFileBuilder::default() + .content(DataContentType::Data) + .file_path("test/dup.parquet".to_string()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(10) + .partition_spec_id(table.metadata().default_partition_spec_id()) + .partition(Struct::from_iter([Some(Literal::long(1))])) + .build() + .unwrap() + }; + + // When the user opts out of duplicate validation, batch duplicates are + // accepted unchanged - mirrors the existing behaviour for cross- + // snapshot duplicates and keeps the opt-out semantics consistent. + let action = tx + .fast_append() + .with_check_duplicate(false) + .add_data_files(vec![make_file(), make_file()]); + assert!(Arc::new(action).commit(&table).await.is_ok()); + } + #[tokio::test] async fn test_fast_append() { let table = make_v2_minimal_table(); diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 8f643a7d1e..239f912c85 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -164,11 +164,28 @@ impl<'a> SnapshotProducer<'a> { } pub(crate) async fn validate_duplicate_files(&self) -> Result<()> { - let new_files: HashSet<&str> = self - .added_data_files - .iter() - .map(|df| df.file_path.as_str()) - .collect(); + // First, detect duplicate file paths within the batch itself. + // Collecting straight into a `HashSet` would silently deduplicate + // and pass corruption through to the manifest, so we walk the list + // explicitly and surface every distinct duplicated path. + let mut new_files: HashSet<&str> = HashSet::with_capacity(self.added_data_files.len()); + let mut duplicates_in_batch: HashSet<&str> = HashSet::new(); + for data_file in &self.added_data_files { + if !new_files.insert(data_file.file_path.as_str()) { + duplicates_in_batch.insert(data_file.file_path.as_str()); + } + } + if !duplicates_in_batch.is_empty() { + let mut paths: Vec<&str> = duplicates_in_batch.into_iter().collect(); + paths.sort_unstable(); + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot add duplicate file paths within the same commit: {}", + paths.join(", ") + ), + )); + } let mut referenced_files = Vec::new(); if let Some(current_snapshot) = self.table.metadata().current_snapshot() {