From 1e8232707f265b4b692e89dccdbaf199b089dbd6 Mon Sep 17 00:00:00 2001
From: Shawn Chang <yxchang@amazon.com>
Date: Mon, 19 May 2025 15:23:54 -0700
Subject: [PATCH 1/6] add snapshot validation logic

---
 crates/iceberg/src/lib.rs                  |   2 +
 crates/iceberg/src/spec/snapshot.rs        |   1 +
 crates/iceberg/src/transaction/append.rs   |   7 +-
 crates/iceberg/src/transaction/mod.rs      |   1 +
 crates/iceberg/src/transaction/snapshot.rs |   8 +-
 crates/iceberg/src/transaction/validate.rs | 225 +++++++++++++++++++++
 6 files changed, 242 insertions(+), 2 deletions(-)
 create mode 100644 crates/iceberg/src/transaction/validate.rs

diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs
index 556ff3e02f..8e72ed07d6 100644
--- a/crates/iceberg/src/lib.rs
+++ b/crates/iceberg/src/lib.rs
@@ -1,4 +1,6 @@
+#![feature(let_chains)]
 // Licensed to the Apache Software Foundation (ASF) under one
+
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
 // regarding copyright ownership.  The ASF licenses this file
diff --git a/crates/iceberg/src/spec/snapshot.rs b/crates/iceberg/src/spec/snapshot.rs
index a2716ad97e..d6105273a6 100644
--- a/crates/iceberg/src/spec/snapshot.rs
+++ b/crates/iceberg/src/spec/snapshot.rs
@@ -42,6 +42,7 @@ pub type SnapshotRef = Arc<Snapshot>;
 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
 #[serde(rename_all = "lowercase")]
 /// The operation field is used by some operations, like snapshot expiration, to skip processing certain snapshots.
+#[derive(Hash)]
 pub enum Operation {
     /// Only data files were added and no files were removed.
     Append,
diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs
index 574904b283..504bcb464b 100644
--- a/crates/iceberg/src/transaction/append.rs
+++ b/crates/iceberg/src/transaction/append.rs
@@ -16,17 +16,20 @@
 // under the License.
 
 use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
 
 use arrow_array::StringArray;
 use futures::TryStreamExt;
 use uuid::Uuid;
 
 use crate::error::Result;
-use crate::spec::{DataFile, ManifestEntry, ManifestFile, Operation};
+use crate::spec::{DataFile, ManifestEntry, ManifestFile, Operation, Snapshot, SnapshotRef};
+use crate::table::Table;
 use crate::transaction::Transaction;
 use crate::transaction::snapshot::{
     DefaultManifestProcess, SnapshotProduceAction, SnapshotProduceOperation,
 };
+use crate::transaction::validate::SnapshotValidator;
 use crate::writer::file_writer::ParquetWriter;
 use crate::{Error, ErrorKind};
 
@@ -209,6 +212,8 @@ impl SnapshotProduceOperation for FastAppendOperation {
     }
 }
 
+impl SnapshotValidator for FastAppendOperation {}
+
 #[cfg(test)]
 mod tests {
     use crate::scan::tests::TableTestFixture;
diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs
index ba79d60bbd..43460bdc8a 100644
--- a/crates/iceberg/src/transaction/mod.rs
+++ b/crates/iceberg/src/transaction/mod.rs
@@ -20,6 +20,7 @@
 mod append;
 mod snapshot;
 mod sort_order;
+mod validate;
 
 use std::cmp::Ordering;
 use std::collections::HashMap;
diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs
index a15e17f1d0..96b0ffa759 100644
--- a/crates/iceberg/src/transaction/snapshot.rs
+++ b/crates/iceberg/src/transaction/snapshot.rs
@@ -29,12 +29,13 @@ use crate::spec::{
     PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT, Snapshot, SnapshotReference, SnapshotRetention,
     SnapshotSummaryCollector, Struct, StructType, Summary, update_snapshot_summaries,
 };
+use crate::transaction::validate::SnapshotValidator;
 use crate::transaction::Transaction;
 use crate::{Error, ErrorKind, TableRequirement, TableUpdate};
 
 const META_ROOT_PATH: &str = "metadata";
 
-pub(crate) trait SnapshotProduceOperation: Send + Sync {
+pub(crate) trait SnapshotProduceOperation: Send + SnapshotValidator + Sync {
     fn operation(&self) -> Operation;
     #[allow(unused)]
     fn delete_entries(
@@ -307,6 +308,11 @@ impl<'a> SnapshotProduceAction<'a> {
             .await?;
         let next_seq_num = self.tx.current_table.metadata().next_sequence_number();
 
+        snapshot_produce_operation.validate(
+            &self.tx.current_table,
+            self.tx.current_table.metadata().current_snapshot(),
+        );
+
         let summary = self
             .summary(&snapshot_produce_operation)
             .map_err(|err| {
diff --git a/crates/iceberg/src/transaction/validate.rs b/crates/iceberg/src/transaction/validate.rs
new file mode 100644
index 0000000000..7416e41831
--- /dev/null
+++ b/crates/iceberg/src/transaction/validate.rs
@@ -0,0 +1,225 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashSet;
+use std::sync::Arc;
+
+use crate::spec::{ManifestContentType, ManifestFile, Operation, SnapshotRef, TableMetadata};
+use crate::table::Table;
+
+pub(crate) trait SnapshotValidator {
+    fn validate(&self, table: &Table, snapshot: Option<&SnapshotRef>) -> () {}
+
+    #[allow(dead_code)]
+    async fn validation_history(
+        &self,
+        base: &Table,
+        to_snapshot: Option<&SnapshotRef>,
+        from_snapshot: Option<&SnapshotRef>,
+        matching_operations: HashSet<Operation>,
+        manifest_content_type: ManifestContentType,
+    ) -> (Vec<ManifestFile>, HashSet<i64>) {
+        let mut manifests = vec![];
+        let mut new_snapshots = HashSet::new();
+        let mut last_snapshot: Option<&SnapshotRef> = None;
+
+        let snapshots = Self::ancestors_between(to_snapshot, from_snapshot, base.metadata());
+        for current_snapshot in &snapshots {
+            last_snapshot = Some(current_snapshot);
+
+            if matching_operations.contains(&current_snapshot.summary().operation) {
+                new_snapshots.insert(current_snapshot.snapshot_id().clone());
+                current_snapshot
+                    .load_manifest_list(base.file_io(), base.metadata())
+                    .await
+                    .expect("Failed to load manifest list!")
+                    .entries()
+                    .into_iter()
+                    .for_each(|manifest| {
+                        if manifest.content == manifest_content_type
+                            && manifest.added_snapshot_id == current_snapshot.snapshot_id()
+                        {
+                            manifests.push(manifest.clone());
+                        }
+                    });
+            }
+        }
+
+        if last_snapshot.is_some()
+            && last_snapshot.unwrap().parent_snapshot_id()
+                != from_snapshot.map(|snapshot| snapshot.snapshot_id())
+        {
+            panic!("Cannot determine history between starting snapshot {} and the last known ancestor {}",
+                   from_snapshot.map_or_else(
+                       || "None".to_string(),
+                       |snapshot| snapshot.snapshot_id().to_string()),
+                   last_snapshot.map_or_else(
+                       || "None".to_string(),
+                       |snapshot| snapshot.parent_snapshot_id().unwrap().to_string()));
+        }
+
+        (manifests, new_snapshots)
+    }
+
+    fn ancestors_between(
+        to_snapshot: Option<&SnapshotRef>,
+        from_snapshot: Option<&SnapshotRef>,
+        table_metadata: &TableMetadata,
+    ) -> Vec<SnapshotRef> {
+        let mut snapshots = Vec::new();
+        let mut current_snapshot = to_snapshot;
+        while let Some(snapshot) = current_snapshot {
+            snapshots.push(Arc::clone(&snapshot));
+            match snapshot.parent_snapshot_id() {
+                Some(parent_snapshot_id)
+                    if from_snapshot.is_some()
+                        && parent_snapshot_id == from_snapshot.unwrap().snapshot_id() =>
+                {
+                    break
+                }
+                Some(parent_snapshot_id) => {
+                    current_snapshot = table_metadata.snapshot_by_id(parent_snapshot_id)
+                }
+                None => break,
+            }
+        }
+
+        snapshots
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::collections::HashSet;
+
+    use crate::spec::{
+        DataContentType, DataFileBuilder, DataFileFormat, Literal, ManifestContentType, Operation,
+        SnapshotRef, Struct,
+    };
+    use crate::transaction::tests::{make_v2_minimal_table, make_v2_table};
+    use crate::transaction::validate::SnapshotValidator;
+    use crate::transaction::{Table, Transaction};
+    use crate::TableUpdate;
+
+    struct TestValidator {}
+
+    impl SnapshotValidator for TestValidator {}
+
+    async fn make_v2_table_with_updates() -> (Table, Vec<TableUpdate>) {
+        let table = make_v2_minimal_table();
+        let tx = Transaction::new(&table);
+        let mut action = tx.fast_append(None, vec![]).unwrap();
+
+        let data_file_1 = DataFileBuilder::default()
+            .content(DataContentType::Data)
+            .file_path("test/1.parquet".to_string())
+            .file_format(DataFileFormat::Parquet)
+            .file_size_in_bytes(100)
+            .record_count(1)
+            .partition_spec_id(table.metadata().default_partition_spec_id())
+            .partition(Struct::from_iter([Some(Literal::long(300))]))
+            .build()
+            .unwrap();
+
+        let data_file_2 = DataFileBuilder::default()
+            .content(DataContentType::Data)
+            .file_path("test/2.parquet".to_string())
+            .file_format(DataFileFormat::Parquet)
+            .file_size_in_bytes(100)
+            .record_count(1)
+            .partition_spec_id(table.metadata().default_partition_spec_id())
+            .partition(Struct::from_iter([Some(Literal::long(300))]))
+            .build()
+            .unwrap();
+
+        action.add_data_files(vec![data_file_1.clone()]).unwrap();
+        let tx = action.apply().await.unwrap();
+        let mut action = tx.fast_append(None, vec![]).unwrap();
+        action.add_data_files(vec![data_file_2.clone()]).unwrap();
+        let tx = action.apply().await.unwrap();
+
+        (table.clone(), tx.updates)
+    }
+
+    #[tokio::test]
+    async fn test_validation_history() {
+        let (table, updates) = make_v2_table_with_updates().await;
+        let parent_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &updates[0] {
+            SnapshotRef::new(snapshot.clone())
+        } else {
+            unreachable!()
+        };
+        let current_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &updates[2] {
+            SnapshotRef::new(snapshot.clone())
+        } else {
+            unreachable!()
+        };
+
+        let test_validator = TestValidator {};
+
+        // specifying from_snapshot, validating up to the from_snapshot
+        let (manifests, snapshots) = test_validator
+            .validation_history(
+                &table,
+                Some(&current_snapshot),
+                Some(&parent_snapshot),
+                HashSet::from([Operation::Append]),
+                ManifestContentType::Data,
+            )
+            .await;
+
+        manifests
+            .iter()
+            .for_each(|manifest| assert_eq!(manifest.content, ManifestContentType::Data));
+        assert_eq!(snapshots.into_iter().collect::<Vec<_>>(), vec![
+            current_snapshot.snapshot_id()
+        ]);
+    }
+
+    #[test]
+    fn test_ancestor_between() {
+        let table = make_v2_table();
+        let current_snapshot = table.metadata().current_snapshot();
+        let parent_snapshot_id = current_snapshot.unwrap().parent_snapshot_id().unwrap();
+        let parent_snapshot = table.metadata().snapshot_by_id(parent_snapshot_id);
+
+        // not specifying from_snapshot, listing all ancestors
+        let all_ancestors =
+            TestValidator::ancestors_between(current_snapshot, None, table.metadata());
+        assert_eq!(
+            vec![
+                current_snapshot.unwrap().snapshot_id(),
+                current_snapshot.unwrap().parent_snapshot_id().unwrap()
+            ],
+            all_ancestors
+                .iter()
+                .map(|snapshot| snapshot.snapshot_id())
+                .collect::<Vec<_>>()
+        );
+
+        // specifying from_snapshot, listing only 1 snapshot
+        let ancestors =
+            TestValidator::ancestors_between(current_snapshot, parent_snapshot, table.metadata());
+        assert_eq!(
+            vec![current_snapshot.unwrap().snapshot_id()],
+            ancestors
+                .iter()
+                .map(|snapshot| snapshot.snapshot_id())
+                .collect::<Vec<_>>()
+        );
+    }
+}

From 26b434a50a18e29970aaa5bebf5975f7bd4598f5 Mon Sep 17 00:00:00 2001
From: Shawn Chang <yxchang@amazon.com>
Date: Mon, 19 May 2025 15:42:58 -0700
Subject: [PATCH 2/6] fmt; drop append.rs imports, underscore unused validate params

---
 crates/iceberg/src/transaction/append.rs   |  4 +---
 crates/iceberg/src/transaction/snapshot.rs |  2 +-
 crates/iceberg/src/transaction/validate.rs | 24 +++++++++++++---------
 3 files changed, 16 insertions(+), 14 deletions(-)

diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs
index 504bcb464b..fdfc154176 100644
--- a/crates/iceberg/src/transaction/append.rs
+++ b/crates/iceberg/src/transaction/append.rs
@@ -16,15 +16,13 @@
 // under the License.
 
 use std::collections::{HashMap, HashSet};
-use std::sync::Arc;
 
 use arrow_array::StringArray;
 use futures::TryStreamExt;
 use uuid::Uuid;
 
 use crate::error::Result;
-use crate::spec::{DataFile, ManifestEntry, ManifestFile, Operation, Snapshot, SnapshotRef};
-use crate::table::Table;
+use crate::spec::{DataFile, ManifestEntry, ManifestFile, Operation};
 use crate::transaction::Transaction;
 use crate::transaction::snapshot::{
     DefaultManifestProcess, SnapshotProduceAction, SnapshotProduceOperation,
diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs
index 96b0ffa759..d10f432932 100644
--- a/crates/iceberg/src/transaction/snapshot.rs
+++ b/crates/iceberg/src/transaction/snapshot.rs
@@ -29,8 +29,8 @@ use crate::spec::{
     PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT, Snapshot, SnapshotReference, SnapshotRetention,
     SnapshotSummaryCollector, Struct, StructType, Summary, update_snapshot_summaries,
 };
-use crate::transaction::validate::SnapshotValidator;
 use crate::transaction::Transaction;
+use crate::transaction::validate::SnapshotValidator;
 use crate::{Error, ErrorKind, TableRequirement, TableUpdate};
 
 const META_ROOT_PATH: &str = "metadata";
diff --git a/crates/iceberg/src/transaction/validate.rs b/crates/iceberg/src/transaction/validate.rs
index 7416e41831..7db8ba562d 100644
--- a/crates/iceberg/src/transaction/validate.rs
+++ b/crates/iceberg/src/transaction/validate.rs
@@ -22,7 +22,7 @@ use crate::spec::{ManifestContentType, ManifestFile, Operation, SnapshotRef, Tab
 use crate::table::Table;
 
 pub(crate) trait SnapshotValidator {
-    fn validate(&self, table: &Table, snapshot: Option<&SnapshotRef>) -> () {}
+    fn validate(&self, _table: &Table, _snapshot: Option<&SnapshotRef>) -> () {}
 
     #[allow(dead_code)]
     async fn validation_history(
@@ -63,13 +63,17 @@ pub(crate) trait SnapshotValidator {
             && last_snapshot.unwrap().parent_snapshot_id()
                 != from_snapshot.map(|snapshot| snapshot.snapshot_id())
         {
-            panic!("Cannot determine history between starting snapshot {} and the last known ancestor {}",
-                   from_snapshot.map_or_else(
-                       || "None".to_string(),
-                       |snapshot| snapshot.snapshot_id().to_string()),
-                   last_snapshot.map_or_else(
-                       || "None".to_string(),
-                       |snapshot| snapshot.parent_snapshot_id().unwrap().to_string()));
+            panic!(
+                "Cannot determine history between starting snapshot {} and the last known ancestor {}",
+                from_snapshot.map_or_else(
+                    || "None".to_string(),
+                    |snapshot| snapshot.snapshot_id().to_string()
+                ),
+                last_snapshot.map_or_else(
+                    || "None".to_string(),
+                    |snapshot| snapshot.parent_snapshot_id().unwrap().to_string()
+                )
+            );
         }
 
         (manifests, new_snapshots)
@@ -89,7 +93,7 @@ pub(crate) trait SnapshotValidator {
                     if from_snapshot.is_some()
                         && parent_snapshot_id == from_snapshot.unwrap().snapshot_id() =>
                 {
-                    break
+                    break;
                 }
                 Some(parent_snapshot_id) => {
                     current_snapshot = table_metadata.snapshot_by_id(parent_snapshot_id)
@@ -106,6 +110,7 @@ pub(crate) trait SnapshotValidator {
 mod tests {
     use std::collections::HashSet;
 
+    use crate::TableUpdate;
     use crate::spec::{
         DataContentType, DataFileBuilder, DataFileFormat, Literal, ManifestContentType, Operation,
         SnapshotRef, Struct,
@@ -113,7 +118,6 @@ mod tests {
     use crate::transaction::tests::{make_v2_minimal_table, make_v2_table};
     use crate::transaction::validate::SnapshotValidator;
     use crate::transaction::{Table, Transaction};
-    use crate::TableUpdate;
 
     struct TestValidator {}
 

From 6a08af8bd0af7523146bd542c057057ef870d545 Mon Sep 17 00:00:00 2001
From: Shawn Chang <yxchang@amazon.com>
Date: Mon, 19 May 2025 15:45:25 -0700
Subject: [PATCH 3/6] remove let_chains feature attribute from lib.rs

---
 crates/iceberg/src/lib.rs | 2 --
 1 file changed, 2 deletions(-)

diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs
index 8e72ed07d6..556ff3e02f 100644
--- a/crates/iceberg/src/lib.rs
+++ b/crates/iceberg/src/lib.rs
@@ -1,6 +1,4 @@
-#![feature(let_chains)]
 // Licensed to the Apache Software Foundation (ASF) under one
-
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
 // regarding copyright ownership.  The ASF licenses this file

From fa109bc0f633ea0ea2da31aeeb53b2347293a34a Mon Sep 17 00:00:00 2001
From: Shawn Chang <yxchang@amazon.com>
Date: Mon, 19 May 2025 15:53:11 -0700
Subject: [PATCH 4/6] drop redundant unit return, clone, into_iter and borrow in validate.rs

---
 crates/iceberg/src/transaction/validate.rs | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/crates/iceberg/src/transaction/validate.rs b/crates/iceberg/src/transaction/validate.rs
index 7db8ba562d..51afee5d73 100644
--- a/crates/iceberg/src/transaction/validate.rs
+++ b/crates/iceberg/src/transaction/validate.rs
@@ -22,7 +22,7 @@ use crate::spec::{ManifestContentType, ManifestFile, Operation, SnapshotRef, Tab
 use crate::table::Table;
 
 pub(crate) trait SnapshotValidator {
-    fn validate(&self, _table: &Table, _snapshot: Option<&SnapshotRef>) -> () {}
+    fn validate(&self, _table: &Table, _snapshot: Option<&SnapshotRef>) {}
 
     #[allow(dead_code)]
     async fn validation_history(
@@ -42,13 +42,13 @@ pub(crate) trait SnapshotValidator {
             last_snapshot = Some(current_snapshot);
 
             if matching_operations.contains(&current_snapshot.summary().operation) {
-                new_snapshots.insert(current_snapshot.snapshot_id().clone());
+                new_snapshots.insert(current_snapshot.snapshot_id());
                 current_snapshot
                     .load_manifest_list(base.file_io(), base.metadata())
                     .await
                     .expect("Failed to load manifest list!")
                     .entries()
-                    .into_iter()
+                    .iter()
                     .for_each(|manifest| {
                         if manifest.content == manifest_content_type
                             && manifest.added_snapshot_id == current_snapshot.snapshot_id()
@@ -87,7 +87,7 @@ pub(crate) trait SnapshotValidator {
         let mut snapshots = Vec::new();
         let mut current_snapshot = to_snapshot;
         while let Some(snapshot) = current_snapshot {
-            snapshots.push(Arc::clone(&snapshot));
+            snapshots.push(Arc::clone(snapshot));
             match snapshot.parent_snapshot_id() {
                 Some(parent_snapshot_id)
                     if from_snapshot.is_some()

From 29a50fc17281b9450fa5c56203e5d13a1a3cea09 Mon Sep 17 00:00:00 2001
From: Shawn Chang <yxchang@amazon.com>
Date: Wed, 21 May 2025 16:48:39 -0700
Subject: [PATCH 5/6] remove validate usage

---
 crates/iceberg/src/transaction/snapshot.rs | 5 -----
 crates/iceberg/src/transaction/validate.rs | 1 +
 2 files changed, 1 insertion(+), 5 deletions(-)

diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs
index d10f432932..b9dbaf9f84 100644
--- a/crates/iceberg/src/transaction/snapshot.rs
+++ b/crates/iceberg/src/transaction/snapshot.rs
@@ -308,11 +308,6 @@ impl<'a> SnapshotProduceAction<'a> {
             .await?;
         let next_seq_num = self.tx.current_table.metadata().next_sequence_number();
 
-        snapshot_produce_operation.validate(
-            &self.tx.current_table,
-            self.tx.current_table.metadata().current_snapshot(),
-        );
-
         let summary = self
             .summary(&snapshot_produce_operation)
             .map_err(|err| {
diff --git a/crates/iceberg/src/transaction/validate.rs b/crates/iceberg/src/transaction/validate.rs
index 51afee5d73..f36db3c8f5 100644
--- a/crates/iceberg/src/transaction/validate.rs
+++ b/crates/iceberg/src/transaction/validate.rs
@@ -22,6 +22,7 @@ use crate::spec::{ManifestContentType, ManifestFile, Operation, SnapshotRef, Tab
 use crate::table::Table;
 
 pub(crate) trait SnapshotValidator {
+    #[allow(dead_code)]
     fn validate(&self, _table: &Table, _snapshot: Option<&SnapshotRef>) {}
 
     #[allow(dead_code)]

From e329b10370325522e5124b5d6182dac426e2e02f Mon Sep 17 00:00:00 2001
From: Shawn Chang <yxchang@amazon.com>
Date: Fri, 23 May 2025 14:39:12 -0700
Subject: [PATCH 6/6] remove Option for to_snapshot

---
 crates/iceberg/src/transaction/validate.rs | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git a/crates/iceberg/src/transaction/validate.rs b/crates/iceberg/src/transaction/validate.rs
index f36db3c8f5..935f93af6b 100644
--- a/crates/iceberg/src/transaction/validate.rs
+++ b/crates/iceberg/src/transaction/validate.rs
@@ -29,7 +29,7 @@ pub(crate) trait SnapshotValidator {
     async fn validation_history(
         &self,
         base: &Table,
-        to_snapshot: Option<&SnapshotRef>,
+        to_snapshot: &SnapshotRef,
         from_snapshot: Option<&SnapshotRef>,
         matching_operations: HashSet<Operation>,
         manifest_content_type: ManifestContentType,
@@ -80,13 +80,15 @@ pub(crate) trait SnapshotValidator {
         (manifests, new_snapshots)
     }
 
+    /// Lists the ancestors in `(from_snapshot, to_snapshot]`, starting at `to_snapshot`.
+    /// TODO: Return an iterator instead of a vector.
     fn ancestors_between(
-        to_snapshot: Option<&SnapshotRef>,
+        to_snapshot: &SnapshotRef,
         from_snapshot: Option<&SnapshotRef>,
         table_metadata: &TableMetadata,
     ) -> Vec<SnapshotRef> {
         let mut snapshots = Vec::new();
-        let mut current_snapshot = to_snapshot;
+        let mut current_snapshot = Some(to_snapshot);
         while let Some(snapshot) = current_snapshot {
             snapshots.push(Arc::clone(snapshot));
             match snapshot.parent_snapshot_id() {
@@ -180,7 +182,7 @@ mod tests {
         let (manifests, snapshots) = test_validator
             .validation_history(
                 &table,
-                Some(&current_snapshot),
+                &current_snapshot,
                 Some(&parent_snapshot),
                 HashSet::from([Operation::Append]),
                 ManifestContentType::Data,
@@ -204,7 +206,7 @@ mod tests {
 
         // not specifying from_snapshot, listing all ancestors
         let all_ancestors =
-            TestValidator::ancestors_between(current_snapshot, None, table.metadata());
+            TestValidator::ancestors_between(current_snapshot.unwrap(), None, table.metadata());
         assert_eq!(
             vec![
                 current_snapshot.unwrap().snapshot_id(),
@@ -218,7 +220,7 @@ mod tests {
 
         // specifying from_snapshot, listing only 1 snapshot
         let ancestors =
-            TestValidator::ancestors_between(current_snapshot, parent_snapshot, table.metadata());
+            TestValidator::ancestors_between(current_snapshot.unwrap(), parent_snapshot, table.metadata());
         assert_eq!(
             vec![current_snapshot.unwrap().snapshot_id()],
             ancestors