Skip to content

Commit

Permalink
snapshot export component subtree (#29149)
Browse files Browse the repository at this point in the history
enable snapshot exports of a subtee of the component tree.
this is especially useful if a component has been unmounted (deleted from source code) and you want to recover its data.

<img width="1534" alt="Screenshot 2024-08-23 at 12 41 26 PM" src="https://github.com/user-attachments/assets/e8a4287d-36ba-48a7-9a92-1dc3ff42ebcb">

GitOrigin-RevId: 0e9ebd6d483c17f9c80c63c862ce3a7d0ebc7d63
  • Loading branch information
ldanilek authored and Convex, Inc. committed Aug 27, 2024
1 parent 4d15f7f commit daa709c
Show file tree
Hide file tree
Showing 5 changed files with 244 additions and 74 deletions.
197 changes: 145 additions & 52 deletions crates/application/src/export_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,13 +333,14 @@ impl<RT: Runtime> ExportWorker<RT> {
async fn export_inner(
&mut self,
format: ExportFormat,
component: ComponentId,
) -> anyhow::Result<(Timestamp, ExportObjectKeys, FunctionUsageTracker)> {
tracing::info!("Beginning snapshot export...");
let storage = &self.storage;
let (ts, tables, by_id_indexes, system_tables, component_tree) = {
let mut tx = self.database.begin(Identity::system()).await?;
let by_id_indexes = IndexModel::new(&mut tx).by_id_indexes().await?;
let component_tree = ComponentTree::new(&mut tx, ComponentId::Root).await?;
let component_tree = ComponentTree::new(&mut tx, component).await?;
let snapshot = self.database.snapshot(tx.begin_timestamp())?;
let tables: BTreeMap<_, _> = snapshot
.table_registry
Expand Down Expand Up @@ -619,7 +620,9 @@ impl<RT: Runtime> ExportWorker<RT> {
&mut self,
export: ParsedDocument<Export>,
) -> anyhow::Result<()> {
let (ts, object_keys, usage) = self.export_inner(export.format()).await?;
let (ts, object_keys, usage) = self
.export_inner(export.format(), export.component())
.await?;

let mut tx = self.database.begin(Identity::system()).await?;
let completed_export =
Expand Down Expand Up @@ -816,6 +819,7 @@ mod tests {
use database::{
test_helpers::DbFixtures,
BootstrapComponentsModel,
Database,
TableModel,
UserFacingModel,
};
Expand Down Expand Up @@ -949,9 +953,12 @@ mod tests {
db.commit(tx).await?;
}
let (_, object_keys, usage) = export_worker
.export_inner(ExportFormat::Zip {
include_storage: true,
})
.export_inner(
ExportFormat::Zip {
include_storage: true,
},
ComponentId::Root,
)
.await?;
must_let!(let ExportObjectKeys::Zip(object_key) = object_keys);

Expand Down Expand Up @@ -983,6 +990,53 @@ mod tests {
Ok(())
}

async fn write_test_data_in_component(
db: &Database<TestRuntime>,
component: ComponentId,
path_prefix: &str,
expected_export_entries: &mut BTreeMap<String, String>,
) -> anyhow::Result<()> {
expected_export_entries.insert(
format!("{path_prefix}_tables/documents.jsonl"),
format!("{}\n", json!({"name": "messages", "id": 10001}),),
);
// Write to tables in each component
let table: TableName = str::parse("messages")?;
let mut tx = db.begin(Identity::system()).await?;
let id = UserFacingModel::new(&mut tx, component.into())
.insert(table, assert_obj!("channel" => "c", "text" => path_prefix))
.await?;
let doc = UserFacingModel::new(&mut tx, component.into())
.get(id, None)
.await?
.unwrap();
let tablet_id = tx
.table_mapping()
.namespace(component.into())
.number_to_tablet()(doc.table())?;
let doc = doc.to_resolved(tablet_id);
let expected_documents = format!(
"{}\n",
serde_json::to_string(&doc.export(ValueFormat::ConvexCleanJSON))?
);
let expected_generated_schema = format!(
"{}\n",
json!(format!(
r#"{{"_creationTime": normalfloat64, "_id": "{id}", "channel": "c", "text": field_name}}"#,
))
);
expected_export_entries.insert(
format!("{path_prefix}messages/documents.jsonl"),
expected_documents.clone(),
);
expected_export_entries.insert(
format!("{path_prefix}messages/generated_schema.jsonl"),
expected_generated_schema.clone(),
);
db.commit(tx).await?;
Ok(())
}

#[convex_macro::test_runtime]
async fn test_export_components(rt: TestRuntime) -> anyhow::Result<()> {
let application = Application::new_for_tests(&rt).await?;
Expand All @@ -1008,47 +1062,17 @@ mod tests {
("", ComponentId::Root),
("_components/component/", child_component),
] {
expected_export_entries.insert(
format!("{path_prefix}_tables/documents.jsonl"),
format!("{}\n", json!({"name": "messages", "id": 10001}),),
);
// Write to tables in each component
let table: TableName = str::parse("messages")?;
let mut tx = db.begin(Identity::system()).await?;
let id = UserFacingModel::new(&mut tx, component.into())
.insert(table, assert_obj!("channel" => "c", "text" => path_prefix))
write_test_data_in_component(&db, component, path_prefix, &mut expected_export_entries)
.await?;
let doc = UserFacingModel::new(&mut tx, component.into())
.get(id, None)
.await?
.unwrap();
let tablet_id = tx
.table_mapping()
.namespace(component.into())
.number_to_tablet()(doc.table())?;
let doc = doc.to_resolved(tablet_id);
expected_export_entries.insert(
format!("{path_prefix}messages/documents.jsonl"),
format!(
"{}\n",
serde_json::to_string(&doc.export(ValueFormat::ConvexCleanJSON))?
),
);
expected_export_entries.insert(
format!("{path_prefix}messages/generated_schema.jsonl"),
format!(
"{}\n",
json!(format!(
r#"{{"_creationTime": normalfloat64, "_id": "{id}", "channel": "c", "text": field_name}}"#,
))
),
);
db.commit(tx).await?;
}

let (_, object_keys, usage) = export_worker
.export_inner(ExportFormat::Zip {
include_storage: false,
})
.export_inner(
ExportFormat::Zip {
include_storage: false,
},
ComponentId::Root,
)
.await?;
must_let!(let ExportObjectKeys::Zip(object_key) = object_keys);

Expand All @@ -1074,7 +1098,67 @@ r#"{{"_creationTime": normalfloat64, "_id": "{id}", "channel": "c", "text": fiel

let usage = usage.gather_user_stats();
assert!(usage.database_egress_size["messages"] > 0);
Ok(())
}

#[convex_macro::test_runtime]
async fn test_export_child_component(rt: TestRuntime) -> anyhow::Result<()> {
let application = Application::new_for_tests(&rt).await?;
application
.load_component_tests_modules("with-schema")
.await?;
let db = application.database().clone();
let storage: Arc<dyn Storage> = Arc::new(LocalDirStorage::new(rt.clone())?);
let file_storage: Arc<dyn Storage> = Arc::new(LocalDirStorage::new(rt.clone())?);
let mut export_worker =
ExportWorker::new_test(rt, db.clone(), storage.clone(), file_storage);

let mut expected_export_entries = BTreeMap::new();

expected_export_entries.insert("README.md".to_string(), README_MD_CONTENTS.to_string());

let mut tx = db.begin(Identity::system()).await?;
let (_, child_component) = BootstrapComponentsModel::new(&mut tx)
.component_path_to_ids("component".parse()?)
.await?;

// Data in root component doesn't matter.
write_test_data_in_component(&db, ComponentId::Root, "", &mut BTreeMap::new()).await?;
write_test_data_in_component(&db, child_component, "", &mut expected_export_entries)
.await?;

let (_, object_keys, usage) = export_worker
.export_inner(
ExportFormat::Zip {
include_storage: false,
},
child_component,
)
.await?;
must_let!(let ExportObjectKeys::Zip(object_key) = object_keys);

// Check we can get the stored zip.
let storage_stream = storage
.get(&object_key)
.await?
.context("object missing from storage")?;
let stored_bytes = storage_stream.collect_as_bytes().await?;
let mut zip_reader = async_zip::read::mem::ZipFileReader::new(&stored_bytes).await?;
let mut zip_entries = BTreeMap::new();
let filenames: Vec<_> = zip_reader
.entries()
.into_iter()
.map(|entry| entry.filename().to_string())
.collect();
for (i, filename) in filenames.into_iter().enumerate() {
let entry_reader = zip_reader.entry_reader(i).await?;
let entry_contents = String::from_utf8(entry_reader.read_to_end_crc().await?)?;
zip_entries.insert(filename, entry_contents);
}
assert_eq!(zip_entries, expected_export_entries);

let usage = usage.gather_user_stats();
assert!(usage.database_egress_size["messages"] > 0);
Ok(())
}

Expand Down Expand Up @@ -1138,9 +1222,12 @@ r#"{{"_creationTime": normalfloat64, "_id": "{id}", "channel": "c", "text": fiel
);

let (_, object_keys, usage) = export_worker
.export_inner(ExportFormat::Zip {
include_storage: true,
})
.export_inner(
ExportFormat::Zip {
include_storage: true,
},
ComponentId::Root,
)
.await?;
must_let!(let ExportObjectKeys::Zip(object_key) = object_keys);

Expand Down Expand Up @@ -1199,9 +1286,12 @@ r#"{{"_creationTime": normalfloat64, "_id": "{id}", "channel": "c", "text": fiel
db.commit(tx).await?;

let (_, object_keys, _) = export_worker
.export_inner(ExportFormat::Zip {
include_storage: false,
})
.export_inner(
ExportFormat::Zip {
include_storage: false,
},
ComponentId::test_user(),
)
.await?;
must_let!(let ExportObjectKeys::Zip(_ok) = object_keys);
Ok(())
Expand All @@ -1212,9 +1302,12 @@ r#"{{"_creationTime": normalfloat64, "_id": "{id}", "channel": "c", "text": fiel
let DbFixtures { db, .. } = DbFixtures::new(&rt).await?;

// Requested
let requested_export = Export::requested(ExportFormat::Zip {
include_storage: false,
});
let requested_export = Export::requested(
ExportFormat::Zip {
include_storage: false,
},
ComponentId::test_user(),
);
let object: ConvexObject = requested_export.clone().try_into()?;
let deserialized_export = object.try_into()?;
assert_eq!(requested_export, deserialized_export);
Expand Down
7 changes: 6 additions & 1 deletion crates/application/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1286,6 +1286,7 @@ impl<RT: Runtime> Application<RT> {
&self,
identity: Identity,
format: ExportFormat,
component: ComponentId,
) -> anyhow::Result<()> {
anyhow::ensure!(identity.is_admin(), unauthorized_error("request_export"));
let snapshot = self.latest_snapshot()?;
Expand All @@ -1301,7 +1302,11 @@ impl<RT: Runtime> Application<RT> {
let export_requested = ExportWorker::export_in_state(&mut tx, "requested").await?;
let export_in_progress = ExportWorker::export_in_state(&mut tx, "in_progress").await?;
match (export_requested, export_in_progress) {
(None, None) => ExportsModel::new(&mut tx).insert_requested(format).await,
(None, None) => {
ExportsModel::new(&mut tx)
.insert_requested(format, component)
.await
},
_ => Err(
anyhow::anyhow!("Can only have one export requested or in progress at once")
.context(ErrorMetadata::bad_request(
Expand Down
22 changes: 15 additions & 7 deletions crates/local_backend/src/snapshot_export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@ use axum_extra::{
},
TypedHeader,
};
use common::http::{
extract::{
Path,
Query,
use common::{
components::ComponentId,
http::{
extract::{
Path,
Query,
},
HttpResponseError,
},
HttpResponseError,
};
use errors::ErrorMetadata;
use http::StatusCode;
Expand All @@ -43,17 +46,22 @@ const MAX_CACHE_AGE: Duration = Duration::from_secs(60 * 60 * 24 * 30);
pub struct RequestZipExport {
#[serde(default)]
include_storage: bool,
component: Option<String>,
}

#[minitrace::trace]
pub async fn request_zip_export(
State(st): State<LocalAppState>,
ExtractIdentity(identity): ExtractIdentity,
Query(RequestZipExport { include_storage }): Query<RequestZipExport>,
Query(RequestZipExport {
include_storage,
component,
}): Query<RequestZipExport>,
) -> Result<impl IntoResponse, HttpResponseError> {
must_be_admin_with_write_access(&identity)?;
let component = ComponentId::deserialize_from_string(component.as_deref())?;
st.application
.request_export(identity, ExportFormat::Zip { include_storage })
.request_export(identity, ExportFormat::Zip { include_storage }, component)
.await?;
Ok(StatusCode::OK)
}
Expand Down
12 changes: 10 additions & 2 deletions crates/model/src/exports/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::LazyLock;

use common::{
components::ComponentId,
document::{
ParsedDocument,
ResolvedDocument,
Expand Down Expand Up @@ -74,9 +75,16 @@ impl<'a, RT: Runtime> ExportsModel<'a, RT> {
Self { tx }
}

pub async fn insert_requested(&mut self, format: ExportFormat) -> anyhow::Result<()> {
pub async fn insert_requested(
&mut self,
format: ExportFormat,
component: ComponentId,
) -> anyhow::Result<()> {
SystemMetadataModel::new_global(self.tx)
.insert(&EXPORTS_TABLE, Export::requested(format).try_into()?)
.insert(
&EXPORTS_TABLE,
Export::requested(format, component).try_into()?,
)
.await?;
Ok(())
}
Expand Down
Loading

0 comments on commit daa709c

Please sign in to comment.