Skip to content

Commit b10d48e

Browse files
CTTYliurenjie1024
andauthored
feat(transaction): Add TransactionAction and related classes (#1420)
## Which issue does this PR close? This is a part of the effort to refactor transaction commit path and enable retry for write operations. Please find the POC here: #1400 Related Issues: - #1382 [EPIC] - #1386 - #1387 - #1388 - #1389 ## What changes are included in this PR? - Add `TransactionAction`, `ActionCommit`, and `ApplyTransactionAction` - Add `actions` field in `Transaction` ## Are these changes tested? Added unit tests --------- Co-authored-by: Renjie Liu <[email protected]>
1 parent df07537 commit b10d48e

File tree

2 files changed

+190
-0
lines changed

2 files changed

+190
-0
lines changed
Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#![allow(dead_code)]
19+
use std::mem::take;
20+
use std::sync::Arc;
21+
22+
use async_trait::async_trait;
23+
24+
use crate::table::Table;
25+
use crate::transaction::Transaction;
26+
use crate::{Result, TableRequirement, TableUpdate};
27+
28+
/// A boxed, thread-safe reference to a `TransactionAction`.
29+
pub type BoxedTransactionAction = Arc<dyn TransactionAction>;
30+
31+
/// A trait representing an atomic action that can be part of a transaction.
32+
///
33+
/// Implementors of this trait define how a specific action is committed to a table.
34+
/// Each action is responsible for generating the updates and requirements needed
35+
/// to modify the table metadata.
36+
#[async_trait]
37+
pub(crate) trait TransactionAction: Sync + Send {
38+
/// Commits this action against the provided table and returns the resulting updates.
39+
/// NOTE: This function is intended for internal use only and should not be called directly by users.
40+
///
41+
/// # Arguments
42+
///
43+
/// * `table` - The current state of the table this action should apply to.
44+
///
45+
/// # Returns
46+
///
47+
/// An `ActionCommit` containing table updates and table requirements,
48+
/// or an error if the commit fails.
49+
async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit>;
50+
}
51+
52+
/// A helper trait for applying a `TransactionAction` to a `Transaction`.
53+
///
54+
/// This is implemented for all `TransactionAction` types
55+
/// to allow easy chaining of actions into a transaction context.
56+
pub trait ApplyTransactionAction {
57+
/// Adds this action to the given transaction.
58+
///
59+
/// # Arguments
60+
///
61+
/// * `tx` - The transaction to apply the action to.
62+
///
63+
/// # Returns
64+
///
65+
/// The modified transaction containing this action, or an error if the operation fails.
66+
fn apply(self, tx: Transaction) -> Result<Transaction>;
67+
}
68+
69+
impl<T: TransactionAction + 'static> ApplyTransactionAction for T {
70+
fn apply(self, mut tx: Transaction) -> Result<Transaction>
71+
where Self: Sized {
72+
tx.actions.push(Arc::new(self));
73+
Ok(tx)
74+
}
75+
}
76+
77+
/// The result of committing a `TransactionAction`.
78+
///
79+
/// This struct contains the updates to apply to the table's metadata
80+
/// and any preconditions that must be satisfied before the update can be committed.
81+
pub struct ActionCommit {
82+
updates: Vec<TableUpdate>,
83+
requirements: Vec<TableRequirement>,
84+
}
85+
86+
impl ActionCommit {
87+
/// Creates a new `ActionCommit` from the given updates and requirements.
88+
pub fn new(updates: Vec<TableUpdate>, requirements: Vec<TableRequirement>) -> Self {
89+
Self {
90+
updates,
91+
requirements,
92+
}
93+
}
94+
95+
/// Consumes and returns the list of table updates.
96+
pub fn take_updates(&mut self) -> Vec<TableUpdate> {
97+
take(&mut self.updates)
98+
}
99+
100+
/// Consumes and returns the list of table requirements.
101+
pub fn take_requirements(&mut self) -> Vec<TableRequirement> {
102+
take(&mut self.requirements)
103+
}
104+
}
105+
106+
#[cfg(test)]
107+
mod tests {
108+
use std::str::FromStr;
109+
use std::sync::Arc;
110+
111+
use async_trait::async_trait;
112+
use uuid::Uuid;
113+
114+
use crate::table::Table;
115+
use crate::transaction::Transaction;
116+
use crate::transaction::action::{ActionCommit, ApplyTransactionAction, TransactionAction};
117+
use crate::transaction::tests::make_v2_table;
118+
use crate::{Result, TableRequirement, TableUpdate};
119+
120+
struct TestAction;
121+
122+
#[async_trait]
123+
impl TransactionAction for TestAction {
124+
async fn commit(self: Arc<Self>, _table: &Table) -> Result<ActionCommit> {
125+
Ok(ActionCommit::new(
126+
vec![TableUpdate::SetLocation {
127+
location: String::from("s3://bucket/prefix/table/"),
128+
}],
129+
vec![TableRequirement::UuidMatch {
130+
uuid: Uuid::from_str("9c12d441-03fe-4693-9a96-a0705ddf69c1")?,
131+
}],
132+
))
133+
}
134+
}
135+
136+
#[tokio::test]
137+
async fn test_commit_transaction_action() {
138+
let table = make_v2_table();
139+
let action = TestAction;
140+
141+
let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
142+
143+
let updates = action_commit.take_updates();
144+
let requirements = action_commit.take_requirements();
145+
146+
assert_eq!(updates[0], TableUpdate::SetLocation {
147+
location: String::from("s3://bucket/prefix/table/")
148+
});
149+
assert_eq!(requirements[0], TableRequirement::UuidMatch {
150+
uuid: Uuid::from_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap()
151+
});
152+
}
153+
154+
#[test]
155+
fn test_apply_transaction_action() {
156+
let table = make_v2_table();
157+
let action = TestAction;
158+
let tx = Transaction::new(&table);
159+
160+
let updated_tx = action.apply(tx).unwrap();
161+
162+
// There should be one action in the transaction now
163+
assert_eq!(updated_tx.actions.len(), 1);
164+
}
165+
166+
#[test]
167+
fn test_action_commit() {
168+
// Create dummy updates and requirements
169+
let location = String::from("s3://bucket/prefix/table/");
170+
let uuid = Uuid::new_v4();
171+
let updates = vec![TableUpdate::SetLocation { location }];
172+
let requirements = vec![TableRequirement::UuidMatch { uuid }];
173+
174+
let mut action_commit = ActionCommit::new(updates.clone(), requirements.clone());
175+
176+
let taken_updates = action_commit.take_updates();
177+
let taken_requirements = action_commit.take_requirements();
178+
179+
// Check values are returned correctly
180+
assert_eq!(taken_updates, updates);
181+
assert_eq!(taken_requirements, requirements);
182+
183+
assert!(action_commit.take_updates().is_empty());
184+
assert!(action_commit.take_requirements().is_empty());
185+
}
186+
}

crates/iceberg/src/transaction/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
//! This module contains transaction api.
1919
20+
mod action;
2021
mod append;
2122
mod snapshot;
2223
mod sort_order;
@@ -32,6 +33,7 @@ use crate::TableUpdate::UpgradeFormatVersion;
3233
use crate::error::Result;
3334
use crate::spec::FormatVersion;
3435
use crate::table::Table;
36+
use crate::transaction::action::BoxedTransactionAction;
3537
use crate::transaction::append::FastAppendAction;
3638
use crate::transaction::sort_order::ReplaceSortOrderAction;
3739
use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate};
@@ -40,6 +42,7 @@ use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdat
4042
pub struct Transaction<'a> {
4143
base_table: &'a Table,
4244
current_table: Table,
45+
actions: Vec<BoxedTransactionAction>,
4346
updates: Vec<TableUpdate>,
4447
requirements: Vec<TableRequirement>,
4548
}
@@ -50,6 +53,7 @@ impl<'a> Transaction<'a> {
5053
Self {
5154
base_table: table,
5255
current_table: table.clone(),
56+
actions: vec![],
5357
updates: vec![],
5458
requirements: vec![],
5559
}

0 commit comments

Comments
 (0)