From daa709c86a616b3fa76ca7bf67dae2ee527355a4 Mon Sep 17 00:00:00 2001 From: Lee Danilek Date: Tue, 27 Aug 2024 17:33:20 -0400 Subject: [PATCH] snapshot export component subtree (#29149) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit enable snapshot exports of a subtee of the component tree. this is especially useful if a component has been unmounted (deleted from source code) and you want to recover its data. Screenshot 2024-08-23 at 12 41 26 PM GitOrigin-RevId: 0e9ebd6d483c17f9c80c63c862ce3a7d0ebc7d63 --- crates/application/src/export_worker.rs | 197 ++++++++++++++------ crates/application/src/lib.rs | 7 +- crates/local_backend/src/snapshot_export.rs | 22 ++- crates/model/src/exports/mod.rs | 12 +- crates/model/src/exports/types.rs | 80 ++++++-- 5 files changed, 244 insertions(+), 74 deletions(-) diff --git a/crates/application/src/export_worker.rs b/crates/application/src/export_worker.rs index 6490310b..061b81e8 100644 --- a/crates/application/src/export_worker.rs +++ b/crates/application/src/export_worker.rs @@ -333,13 +333,14 @@ impl ExportWorker { async fn export_inner( &mut self, format: ExportFormat, + component: ComponentId, ) -> anyhow::Result<(Timestamp, ExportObjectKeys, FunctionUsageTracker)> { tracing::info!("Beginning snapshot export..."); let storage = &self.storage; let (ts, tables, by_id_indexes, system_tables, component_tree) = { let mut tx = self.database.begin(Identity::system()).await?; let by_id_indexes = IndexModel::new(&mut tx).by_id_indexes().await?; - let component_tree = ComponentTree::new(&mut tx, ComponentId::Root).await?; + let component_tree = ComponentTree::new(&mut tx, component).await?; let snapshot = self.database.snapshot(tx.begin_timestamp())?; let tables: BTreeMap<_, _> = snapshot .table_registry @@ -619,7 +620,9 @@ impl ExportWorker { &mut self, export: ParsedDocument, ) -> anyhow::Result<()> { - let (ts, object_keys, usage) = self.export_inner(export.format()).await?; + let (ts, object_keys, usage) = self + .export_inner(export.format(), export.component()) + .await?; let mut tx = self.database.begin(Identity::system()).await?; let completed_export = @@ -816,6 +819,7 @@ mod tests { use database::{ test_helpers::DbFixtures, BootstrapComponentsModel, + Database, TableModel, UserFacingModel, }; @@ -949,9 +953,12 @@ mod tests { db.commit(tx).await?; } let (_, object_keys, usage) = export_worker - .export_inner(ExportFormat::Zip { - include_storage: true, - }) + .export_inner( + ExportFormat::Zip { + include_storage: true, + }, + ComponentId::Root, + ) .await?; must_let!(let ExportObjectKeys::Zip(object_key) = object_keys); @@ -983,6 +990,53 @@ mod tests { Ok(()) } + async fn write_test_data_in_component( + db: &Database, + component: ComponentId, + path_prefix: &str, + expected_export_entries: &mut BTreeMap, + ) -> anyhow::Result<()> { + expected_export_entries.insert( + format!("{path_prefix}_tables/documents.jsonl"), + format!("{}\n", json!({"name": "messages", "id": 10001}),), + ); + // Write to tables in each component + let table: TableName = str::parse("messages")?; + let mut tx = db.begin(Identity::system()).await?; + let id = UserFacingModel::new(&mut tx, component.into()) + .insert(table, assert_obj!("channel" => "c", "text" => path_prefix)) + .await?; + let doc = UserFacingModel::new(&mut tx, component.into()) + .get(id, None) + .await? + .unwrap(); + let tablet_id = tx + .table_mapping() + .namespace(component.into()) + .number_to_tablet()(doc.table())?; + let doc = doc.to_resolved(tablet_id); + let expected_documents = format!( + "{}\n", + serde_json::to_string(&doc.export(ValueFormat::ConvexCleanJSON))? + ); + let expected_generated_schema = format!( + "{}\n", + json!(format!( + r#"{{"_creationTime": normalfloat64, "_id": "{id}", "channel": "c", "text": field_name}}"#, + )) + ); + expected_export_entries.insert( + format!("{path_prefix}messages/documents.jsonl"), + expected_documents.clone(), + ); + expected_export_entries.insert( + format!("{path_prefix}messages/generated_schema.jsonl"), + expected_generated_schema.clone(), + ); + db.commit(tx).await?; + Ok(()) + } + #[convex_macro::test_runtime] async fn test_export_components(rt: TestRuntime) -> anyhow::Result<()> { let application = Application::new_for_tests(&rt).await?; @@ -1008,47 +1062,17 @@ mod tests { ("", ComponentId::Root), ("_components/component/", child_component), ] { - expected_export_entries.insert( - format!("{path_prefix}_tables/documents.jsonl"), - format!("{}\n", json!({"name": "messages", "id": 10001}),), - ); - // Write to tables in each component - let table: TableName = str::parse("messages")?; - let mut tx = db.begin(Identity::system()).await?; - let id = UserFacingModel::new(&mut tx, component.into()) - .insert(table, assert_obj!("channel" => "c", "text" => path_prefix)) + write_test_data_in_component(&db, component, path_prefix, &mut expected_export_entries) .await?; - let doc = UserFacingModel::new(&mut tx, component.into()) - .get(id, None) - .await? - .unwrap(); - let tablet_id = tx - .table_mapping() - .namespace(component.into()) - .number_to_tablet()(doc.table())?; - let doc = doc.to_resolved(tablet_id); - expected_export_entries.insert( - format!("{path_prefix}messages/documents.jsonl"), - format!( - "{}\n", - serde_json::to_string(&doc.export(ValueFormat::ConvexCleanJSON))? - ), - ); - expected_export_entries.insert( - format!("{path_prefix}messages/generated_schema.jsonl"), - format!( - "{}\n", - json!(format!( -r#"{{"_creationTime": normalfloat64, "_id": "{id}", "channel": "c", "text": field_name}}"#, - )) - ), - ); - db.commit(tx).await?; } + let (_, object_keys, usage) = export_worker - .export_inner(ExportFormat::Zip { - include_storage: false, - }) + .export_inner( + ExportFormat::Zip { + include_storage: false, + }, + ComponentId::Root, + ) .await?; must_let!(let ExportObjectKeys::Zip(object_key) = object_keys); @@ -1074,7 +1098,67 @@ r#"{{"_creationTime": normalfloat64, "_id": "{id}", "channel": "c", "text": fiel let usage = usage.gather_user_stats(); assert!(usage.database_egress_size["messages"] > 0); + Ok(()) + } + + #[convex_macro::test_runtime] + async fn test_export_child_component(rt: TestRuntime) -> anyhow::Result<()> { + let application = Application::new_for_tests(&rt).await?; + application + .load_component_tests_modules("with-schema") + .await?; + let db = application.database().clone(); + let storage: Arc = Arc::new(LocalDirStorage::new(rt.clone())?); + let file_storage: Arc = Arc::new(LocalDirStorage::new(rt.clone())?); + let mut export_worker = + ExportWorker::new_test(rt, db.clone(), storage.clone(), file_storage); + + let mut expected_export_entries = BTreeMap::new(); + + expected_export_entries.insert("README.md".to_string(), README_MD_CONTENTS.to_string()); + + let mut tx = db.begin(Identity::system()).await?; + let (_, child_component) = BootstrapComponentsModel::new(&mut tx) + .component_path_to_ids("component".parse()?) + .await?; + + // Data in root component doesn't matter. + write_test_data_in_component(&db, ComponentId::Root, "", &mut BTreeMap::new()).await?; + write_test_data_in_component(&db, child_component, "", &mut expected_export_entries) + .await?; + + let (_, object_keys, usage) = export_worker + .export_inner( + ExportFormat::Zip { + include_storage: false, + }, + child_component, + ) + .await?; + must_let!(let ExportObjectKeys::Zip(object_key) = object_keys); + + // Check we can get the stored zip. + let storage_stream = storage + .get(&object_key) + .await? + .context("object missing from storage")?; + let stored_bytes = storage_stream.collect_as_bytes().await?; + let mut zip_reader = async_zip::read::mem::ZipFileReader::new(&stored_bytes).await?; + let mut zip_entries = BTreeMap::new(); + let filenames: Vec<_> = zip_reader + .entries() + .into_iter() + .map(|entry| entry.filename().to_string()) + .collect(); + for (i, filename) in filenames.into_iter().enumerate() { + let entry_reader = zip_reader.entry_reader(i).await?; + let entry_contents = String::from_utf8(entry_reader.read_to_end_crc().await?)?; + zip_entries.insert(filename, entry_contents); + } + assert_eq!(zip_entries, expected_export_entries); + let usage = usage.gather_user_stats(); + assert!(usage.database_egress_size["messages"] > 0); Ok(()) } @@ -1138,9 +1222,12 @@ r#"{{"_creationTime": normalfloat64, "_id": "{id}", "channel": "c", "text": fiel ); let (_, object_keys, usage) = export_worker - .export_inner(ExportFormat::Zip { - include_storage: true, - }) + .export_inner( + ExportFormat::Zip { + include_storage: true, + }, + ComponentId::Root, + ) .await?; must_let!(let ExportObjectKeys::Zip(object_key) = object_keys); @@ -1199,9 +1286,12 @@ r#"{{"_creationTime": normalfloat64, "_id": "{id}", "channel": "c", "text": fiel db.commit(tx).await?; let (_, object_keys, _) = export_worker - .export_inner(ExportFormat::Zip { - include_storage: false, - }) + .export_inner( + ExportFormat::Zip { + include_storage: false, + }, + ComponentId::test_user(), + ) .await?; must_let!(let ExportObjectKeys::Zip(_ok) = object_keys); Ok(()) @@ -1212,9 +1302,12 @@ r#"{{"_creationTime": normalfloat64, "_id": "{id}", "channel": "c", "text": fiel let DbFixtures { db, .. } = DbFixtures::new(&rt).await?; // Requested - let requested_export = Export::requested(ExportFormat::Zip { - include_storage: false, - }); + let requested_export = Export::requested( + ExportFormat::Zip { + include_storage: false, + }, + ComponentId::test_user(), + ); let object: ConvexObject = requested_export.clone().try_into()?; let deserialized_export = object.try_into()?; assert_eq!(requested_export, deserialized_export); diff --git a/crates/application/src/lib.rs b/crates/application/src/lib.rs index 55ea1c7e..cec30b74 100644 --- a/crates/application/src/lib.rs +++ b/crates/application/src/lib.rs @@ -1286,6 +1286,7 @@ impl Application { &self, identity: Identity, format: ExportFormat, + component: ComponentId, ) -> anyhow::Result<()> { anyhow::ensure!(identity.is_admin(), unauthorized_error("request_export")); let snapshot = self.latest_snapshot()?; @@ -1301,7 +1302,11 @@ impl Application { let export_requested = ExportWorker::export_in_state(&mut tx, "requested").await?; let export_in_progress = ExportWorker::export_in_state(&mut tx, "in_progress").await?; match (export_requested, export_in_progress) { - (None, None) => ExportsModel::new(&mut tx).insert_requested(format).await, + (None, None) => { + ExportsModel::new(&mut tx) + .insert_requested(format, component) + .await + }, _ => Err( anyhow::anyhow!("Can only have one export requested or in progress at once") .context(ErrorMetadata::bad_request( diff --git a/crates/local_backend/src/snapshot_export.rs b/crates/local_backend/src/snapshot_export.rs index 56c477bf..e8be5ac2 100644 --- a/crates/local_backend/src/snapshot_export.rs +++ b/crates/local_backend/src/snapshot_export.rs @@ -14,12 +14,15 @@ use axum_extra::{ }, TypedHeader, }; -use common::http::{ - extract::{ - Path, - Query, +use common::{ + components::ComponentId, + http::{ + extract::{ + Path, + Query, + }, + HttpResponseError, }, - HttpResponseError, }; use errors::ErrorMetadata; use http::StatusCode; @@ -43,17 +46,22 @@ const MAX_CACHE_AGE: Duration = Duration::from_secs(60 * 60 * 24 * 30); pub struct RequestZipExport { #[serde(default)] include_storage: bool, + component: Option, } #[minitrace::trace] pub async fn request_zip_export( State(st): State, ExtractIdentity(identity): ExtractIdentity, - Query(RequestZipExport { include_storage }): Query, + Query(RequestZipExport { + include_storage, + component, + }): Query, ) -> Result { must_be_admin_with_write_access(&identity)?; + let component = ComponentId::deserialize_from_string(component.as_deref())?; st.application - .request_export(identity, ExportFormat::Zip { include_storage }) + .request_export(identity, ExportFormat::Zip { include_storage }, component) .await?; Ok(StatusCode::OK) } diff --git a/crates/model/src/exports/mod.rs b/crates/model/src/exports/mod.rs index 244ad225..58b3490e 100644 --- a/crates/model/src/exports/mod.rs +++ b/crates/model/src/exports/mod.rs @@ -1,6 +1,7 @@ use std::sync::LazyLock; use common::{ + components::ComponentId, document::{ ParsedDocument, ResolvedDocument, @@ -74,9 +75,16 @@ impl<'a, RT: Runtime> ExportsModel<'a, RT> { Self { tx } } - pub async fn insert_requested(&mut self, format: ExportFormat) -> anyhow::Result<()> { + pub async fn insert_requested( + &mut self, + format: ExportFormat, + component: ComponentId, + ) -> anyhow::Result<()> { SystemMetadataModel::new_global(self.tx) - .insert(&EXPORTS_TABLE, Export::requested(format).try_into()?) + .insert( + &EXPORTS_TABLE, + Export::requested(format, component).try_into()?, + ) .await?; Ok(()) } diff --git a/crates/model/src/exports/types.rs b/crates/model/src/exports/types.rs index 7116f259..c397bf8a 100644 --- a/crates/model/src/exports/types.rs +++ b/crates/model/src/exports/types.rs @@ -4,6 +4,7 @@ use std::{ }; use common::{ + components::ComponentId, obj, types::ObjectKey, }; @@ -22,12 +23,14 @@ const EXPORT_RETENTION: u64 = 14 * 24 * 60 * 60 * 1000000000; // 14 days pub enum Export { Requested { format: ExportFormat, + component: ComponentId, }, InProgress { /// Timestamp when the first attempt /// at the Export started. start_ts: Timestamp, format: ExportFormat, + component: ComponentId, }, Completed { /// Timestamp for the successful (final) attempt at Export. @@ -40,6 +43,7 @@ pub enum Export { object_keys: ExportObjectKeys, /// Format of the export format: ExportFormat, + component: ComponentId, }, Failed { /// Timestamp for the failed (final) attempt at Export. @@ -47,18 +51,28 @@ pub enum Export { /// Timestamp when the Export failed failed_ts: Timestamp, format: ExportFormat, + component: ComponentId, }, } impl Export { pub fn format(&self) -> ExportFormat { match self { - Export::Requested { format } + Export::Requested { format, .. } | Export::InProgress { format, .. } | Export::Completed { format, .. } | Export::Failed { format, .. } => *format, } } + + pub fn component(&self) -> ComponentId { + match self { + Export::Requested { component, .. } + | Export::InProgress { component, .. } + | Export::Completed { component, .. } + | Export::Failed { component, .. } => *component, + } + } } #[derive(Copy, Clone, Debug, PartialEq)] @@ -69,15 +83,16 @@ pub enum ExportFormat { } impl Export { - pub fn requested(format: ExportFormat) -> Self { - Self::Requested { format } + pub fn requested(format: ExportFormat, component: ComponentId) -> Self { + Self::Requested { format, component } } pub fn in_progress(self, ts: Timestamp) -> anyhow::Result { match self { - Self::Requested { format } => Ok(Self::InProgress { + Self::Requested { format, component } => Ok(Self::InProgress { start_ts: ts, format, + component, }), Self::Completed { .. } | Self::InProgress { .. } | Self::Failed { .. } => Err( anyhow::anyhow!("Can only begin an export that is requested"), @@ -93,7 +108,9 @@ impl Export { ) -> anyhow::Result { let expiration_ts = Into::::into(complete_ts) + EXPORT_RETENTION; match self { - Self::InProgress { format, .. } => { + Self::InProgress { + format, component, .. + } => { anyhow::ensure!(snapshot_ts <= complete_ts); Ok(Self::Completed { start_ts: snapshot_ts, @@ -101,20 +118,26 @@ impl Export { expiration_ts, object_keys, format, + component, }) }, - Self::Requested { format: _ } + Self::Requested { + format: _, + component: _, + } | Self::Completed { start_ts: _, complete_ts: _, expiration_ts: _, object_keys: _, format: _, + component: _, } | Self::Failed { start_ts: _, failed_ts: _, format: _, + component: _, } => Err(anyhow::anyhow!( "Can only complete an export that is in_progress" )), @@ -123,26 +146,34 @@ impl Export { pub fn failed(self, snapshot_ts: Timestamp, failed_ts: Timestamp) -> anyhow::Result { match self { - Self::InProgress { format, .. } => { + Self::InProgress { + format, component, .. + } => { anyhow::ensure!(snapshot_ts <= failed_ts); Ok(Self::Failed { start_ts: snapshot_ts, failed_ts, format, + component, }) }, - Self::Requested { format: _ } + Self::Requested { + format: _, + component: _, + } | Self::Completed { start_ts: _, complete_ts: _, expiration_ts: _, object_keys: _, format: _, + component: _, } | Self::Failed { start_ts: _, failed_ts: _, format: _, + component: _, } => Err(anyhow::anyhow!( "Can only fail an export that is in_progress" )), @@ -153,10 +184,14 @@ impl Export { impl Display for Export { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { - Self::Requested { format: _ } => write!(f, "requested"), + Self::Requested { + format: _, + component: _, + } => write!(f, "requested"), Self::InProgress { start_ts: _, format: _, + component: _, } => write!(f, "in_progress"), Self::Completed { start_ts: _, @@ -164,11 +199,13 @@ impl Display for Export { expiration_ts: _, object_keys: _, format: _, + component: _, } => write!(f, "completed"), Self::Failed { start_ts: _, failed_ts: _, format: _, + component: _, } => write!(f, "failed"), } } @@ -185,6 +222,7 @@ impl TryFrom for ConvexObject { expiration_ts, object_keys, format, + component, } => { let mut o = btreemap! { "start_ts".parse()? => val!(i64::from(start_ts)), @@ -192,6 +230,7 @@ impl TryFrom for ConvexObject { "expiration_ts".parse()? => val!(expiration_ts as i64), "state".parse()? => val!("completed"), "format".parse()? => val!(format), + "component".parse()? => val!(component.serialize_to_string()), }; match object_keys { ExportObjectKeys::Zip(object_key) => o.insert( @@ -201,27 +240,35 @@ impl TryFrom for ConvexObject { }; ConvexObject::try_from(o) }, - Export::Requested { format } => obj!( + Export::Requested { format, component } => obj!( "state" => "requested", "format" => format, + "component" => component.serialize_to_string(), ), - Export::InProgress { start_ts, format } => { + Export::InProgress { + start_ts, + format, + component, + } => { obj!( "state" => "in_progress", "start_ts" => i64::from(start_ts), "format" => format, + "component" => component.serialize_to_string(), ) }, Export::Failed { start_ts, failed_ts, format, + component, } => { obj!( "state" => "failed", "start_ts" => i64::from(start_ts), "failed_ts" => i64::from(failed_ts), "format" => format, + "component" => component.serialize_to_string(), ) }, } @@ -236,9 +283,15 @@ impl TryFrom for Export { Some(format) => ExportFormat::try_from(format.clone())?, _ => anyhow::bail!("invalid format: {:?}", o), }; + let component = match o.get("component") { + Some(ConvexValue::String(s)) => ComponentId::deserialize_from_string(Some(s))?, + Some(ConvexValue::Null) => ComponentId::Root, + None => ComponentId::Root, + _ => anyhow::bail!("invalid component: {:?}", o), + }; match o.get("state") { Some(ConvexValue::String(s)) => match &s[..] { - "requested" => Ok(Export::Requested { format }), + "requested" => Ok(Export::Requested { format, component }), "in_progress" => { if let Some(start_ts_value) = o.get("start_ts") && let ConvexValue::Int64(start_ts) = start_ts_value @@ -246,6 +299,7 @@ impl TryFrom for Export { Ok(Export::InProgress { start_ts: (*start_ts).try_into()?, format, + component, }) } else { Err(anyhow::anyhow!("No start_ts found for in_progress export.")) @@ -276,6 +330,7 @@ impl TryFrom for Export { complete_ts, object_keys, format, + component, }) }, "failed" => { @@ -291,6 +346,7 @@ impl TryFrom for Export { start_ts, failed_ts, format, + component, }) }, _ => Err(anyhow::anyhow!("Invalid export state {s}")),