Skip to content

feat(transaction): Add TransactionAction POC #1400

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 27 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
8ed0ddc
add tx action-ish for set_location (does not work well :(
CTTY May 30, 2025
d631b3c
Add logic to commit metadata changes (no retry(yet))
CTTY May 31, 2025
7a6f3d1
refresh-ish action
CTTY Jun 1, 2025
d481d69
minor
CTTY Jun 1, 2025
434cdb6
box things up
CTTY Jun 2, 2025
11260a0
clean up tx.commit
CTTY Jun 2, 2025
a64cc21
more cleanup
CTTY Jun 2, 2025
f2c3399
clean up, need to fix the referenced logic.. or that could be fixed b…
CTTY Jun 3, 2025
f7cb068
More clean up.. retry is not working yet
CTTY Jun 3, 2025
2bfc500
Update crates/iceberg/src/transaction/action/mod.rs
CTTY Jun 3, 2025
e881811
Update crates/iceberg/src/transaction/action/mod.rs
CTTY Jun 3, 2025
c2baf4e
Have tx own base table, mark tx_commit_res as not used, need to fix r…
CTTY Jun 3, 2025
1c40b86
use backon, code cleanup
CTTY Jun 4, 2025
014726e
minor
CTTY Jun 4, 2025
d8a2394
fix test build
CTTY Jun 4, 2025
a6ce0b0
fmt
CTTY Jun 4, 2025
88aecf1
Merge branch 'main' into ctty/tx-commit
CTTY Jun 4, 2025
626715c
clean up, add apply()
CTTY Jun 5, 2025
14aea32
Merge branch 'ctty/tx-commit' of github.com:CTTY/iceberg-rust into ct…
CTTY Jun 5, 2025
11bb9d3
remove apply
CTTY Jun 5, 2025
8c2192b
remove tx from FastAppendAction, impl tx action for fast append
CTTY Jun 6, 2025
3ea76bb
fix build and fmt
CTTY Jun 6, 2025
4d1fff8
impl tx_action for update props, replace sort order, and update forma…
CTTY Jun 7, 2025
e565f76
Add ApplyTransactionAction, remove mutex and state, stop adding actio…
CTTY Jun 7, 2025
3c0bdd7
removed current_table from tx
CTTY Jun 7, 2025
c2dde23
Fix error-handling in retry
CTTY Jun 9, 2025
3a8e8f7
make tx.apply immutable
CTTY Jun 12, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

38 changes: 38 additions & 0 deletions crates/catalog/memory/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,44 @@ impl Catalog for MemoryCatalog {
"MemoryCatalog does not currently support updating tables.",
))
}

// async fn commit_table(&self, base: &Table, current: Table) -> Result<Table> {
// if base.metadata() == current.metadata() {
// // no change
// return Ok(current);
// }
//
// let mut root_namespace_state = self.root_namespace_state.lock().await;
// // TODO: caller needs to retry on the error below
// let _ = root_namespace_state
// .check_metadata_location(base.identifier(), base.metadata_location())?;
//
// let next_metadata_version = if let Some(base_metadata_location) = base.metadata_location() {
// self.parse_metadata_version(base_metadata_location) + 1
// } else {
// 0
// };
//
// // write metadata
// let metadata_location = format!(
// "{}/metadata/{}-{}.metadata.json",
// current.metadata().location(),
// next_metadata_version,
// Uuid::new_v4()
// );
//
// // TODO instead of using current.metadata(), build a new metadata with some properties like last_updated_ms updated
// self.file_io
// .new_output(&metadata_location)?
// .write(serde_json::to_vec(current.metadata())?.into())
// .await?;
//
// root_namespace_state
// .update_existing_table_location(current.identifier(), current.metadata_location())?;
//
// // TODO same here, need to update the metadata location
// Ok(current)
// }
}

#[cfg(test)]
Expand Down
39 changes: 39 additions & 0 deletions crates/catalog/memory/src/namespace_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,45 @@ impl NamespaceState {
}
}

pub(crate) fn check_metadata_location(
&self,
table_ident: &TableIdent,
metadata_location: Option<&str>,
) -> Result<()> {
let namespace = self.get_namespace(table_ident.namespace())?;

if namespace
.table_metadata_locations
.get(table_ident.name())
.map(|s| s.as_str())
!= metadata_location
{
return Err(Error::new(
ErrorKind::DataInvalid,
format!("Metadata location does not match for table: {table_ident}!"),
));
}

Ok(())
}

pub(crate) fn update_existing_table_location(
&mut self,
table_ident: &TableIdent,
new_metadata_location: Option<&str>,
) -> Result<()> {
if new_metadata_location.is_none() {
return Ok(());
}

let namespace = self.get_mut_namespace(table_ident.namespace())?;
namespace
.table_metadata_locations
.entry(table_ident.name().to_string())
.insert_entry(new_metadata_location.unwrap().to_string());
Ok(())
}

// Inserts the given table or returns an error if it already exists
pub(crate) fn insert_new_table(
&mut self,
Expand Down
8 changes: 4 additions & 4 deletions crates/catalog/rest/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2125,10 +2125,10 @@ mod tests {
.unwrap()
};

let table = Transaction::new(&table1)
let table = Transaction::new(table1)
.upgrade_table_version(FormatVersion::V2)
.unwrap()
.commit(&catalog)
.commit(Arc::new(&catalog))
.await
.unwrap();

Expand Down Expand Up @@ -2250,10 +2250,10 @@ mod tests {
.unwrap()
};

let table_result = Transaction::new(&table1)
let table_result = Transaction::new(table1)
.upgrade_table_version(FormatVersion::V2)
.unwrap()
.commit(&catalog)
.commit(Arc::new(&catalog))
.await;

assert!(table_result.is_err());
Expand Down
6 changes: 3 additions & 3 deletions crates/catalog/rest/tests/rest_catalog_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::RwLock;
use std::sync::{Arc, RwLock};

use ctor::{ctor, dtor};
use iceberg::spec::{FormatVersion, NestedField, PrimitiveType, Schema, Type};
Expand Down Expand Up @@ -347,10 +347,10 @@ async fn test_update_table() {
);

// Update table by committing transaction
let table2 = Transaction::new(&table)
let table2 = Transaction::new(table)
.set_properties(HashMap::from([("prop1".to_string(), "v1".to_string())]))
.unwrap()
.commit(&catalog)
.commit(Arc::new(&catalog))
.await
.unwrap();

Expand Down
2 changes: 2 additions & 0 deletions crates/iceberg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ arrow-select = { workspace = true }
arrow-string = { workspace = true }
async-std = { workspace = true, optional = true, features = ["attributes"] }
async-trait = { workspace = true }
backon = { version = "1.5.1"}
base64 = { workspace = true }
bimap = { workspace = true }
bytes = { workspace = true }
Expand Down Expand Up @@ -86,6 +87,7 @@ typed-builder = { workspace = true }
url = { workspace = true }
uuid = { workspace = true }
zstd = { workspace = true }
regex = "1.11.1"

[dev-dependencies]
ctor = { workspace = true }
Expand Down
22 changes: 22 additions & 0 deletions crates/iceberg/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::collections::HashMap;
use std::fmt::{Debug, Display};
use std::mem::take;
use std::ops::Deref;
use std::sync::Arc;

use _serde::deserialize_snapshot;
use async_trait::async_trait;
Expand Down Expand Up @@ -300,6 +301,27 @@ impl TableCommit {
pub fn take_updates(&mut self) -> Vec<TableUpdate> {
take(&mut self.updates)
}

/// Apply updates to a table
pub fn apply(&mut self, mut table: Table) -> Result<Table> {
// 1. check requirements
let requirements = self.take_requirements();
for requirement in requirements {
requirement.check(Some(table.metadata()))?;
}

// 2. Apply updates to metadata builder
let mut metadata_builder = table.metadata().clone().into_builder(None);

let updates = self.take_updates();
for update in updates {
metadata_builder = update.apply(metadata_builder)?;
}

table.with_metadata(Arc::new(metadata_builder.build()?.metadata));

Ok(table)
}
}

/// TableRequirement represents a requirement for a table in the catalog.
Expand Down
2 changes: 1 addition & 1 deletion crates/iceberg/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub enum ErrorKind {
/// Iceberg data is invalid.
///
/// This error is returned when we try to read a table from iceberg but
/// failed to parse it's metadata or data file correctly.
/// failed to parse its metadata or data file correctly.
///
/// The table could be invalid or corrupted.
DataInvalid,
Expand Down
26 changes: 26 additions & 0 deletions crates/iceberg/src/spec/table_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use std::sync::Arc;

use _serde::TableMetadataEnum;
use chrono::{DateTime, Utc};
use regex::Regex;
use serde::{Deserialize, Serialize};
use serde_repr::{Deserialize_repr, Serialize_repr};
use uuid::Uuid;
Expand All @@ -40,6 +41,7 @@ use crate::error::{Result, timestamp_ms_to_utc};
use crate::{Error, ErrorKind};

static MAIN_BRANCH: &str = "main";
const TABLE_METADATA_FILE_NAME_REGEX: &str = r"(\d+)-([\w-]{36})(?:\.\w+)?\.metadata\.json";
pub(crate) static ONE_MINUTE_MS: i64 = 60_000;

pub(crate) static EMPTY_SNAPSHOT_ID: i64 = -1;
Expand Down Expand Up @@ -432,6 +434,30 @@ impl TableMetadata {
self.encryption_keys.get(key_id)
}

/// Parse metadata version and uuid from metadata filename
fn parse_metadata_filename(metadata_location: &str) -> Result<(i32, Uuid)> {
if let Some(metadata_file_name) = metadata_location.split('/').last() {
let re = Regex::new(TABLE_METADATA_FILE_NAME_REGEX)
.expect("Failed to parse regex for metadata file!");
if let Some(caps) = re.captures(metadata_file_name) {
let metadata_version_str = &caps[1];
let uuid_str = &caps[2];

let metadata_version = metadata_version_str
.parse()
.expect(format!("Invalid metadata version: {metadata_version_str}").as_str());
let uuid = Uuid::parse_str(uuid_str)?;

return Ok((metadata_version, uuid));
}
}

Err(Error::new(
ErrorKind::Unexpected,
format!("Unrecognizable metadata location: {metadata_location}"),
))
}

/// Normalize this partition spec.
///
/// This is an internal method
Expand Down
Loading