From 25598bb167726b878636c6b54db29d73903839ee Mon Sep 17 00:00:00 2001
From: Lee Danilek
Date: Fri, 16 Aug 2024 19:22:27 -0400
Subject: [PATCH] [components] snapshot import (#28951)

Parse the component path out of the filenames in ZIP files. Use this
component path to write documents to tables in different components.

There are a few missing features that I'll get to later:

1. importing a single-table CSV, JSON, or JSONL file has no way to specify a
   component path, so it always imports into the root component
2. minor race conditions that should be fixed with ComponentRegistry
3. nice error messages if the component path doesn't exist

GitOrigin-RevId: 8efcc413b194eeccea43c56c67afc051dbd9271b
---
 crates/application/src/lib.rs              |   3 +-
 crates/application/src/snapshot_import.rs  | 588 +++++++++++++--------
 crates/model/src/snapshot_imports/mod.rs   |  30 +-
 crates/model/src/snapshot_imports/types.rs |  15 +-
 4 files changed, 397 insertions(+), 239 deletions(-)

diff --git a/crates/application/src/lib.rs b/crates/application/src/lib.rs
index 61a90be7..22c3985c 100644
--- a/crates/application/src/lib.rs
+++ b/crates/application/src/lib.rs
@@ -47,6 +47,7 @@ use common::{
         CanonicalizedComponentModulePath,
         ComponentDefinitionPath,
         ComponentId,
+        ComponentPath,
         PublicFunctionPath,
     },
     document::{
@@ -2225,7 +2226,7 @@ impl Application {
     pub async fn clear_tables(
         &self,
         identity: &Identity,
-        table_names: Vec<TableName>,
+        table_names: Vec<(ComponentPath, TableName)>,
     ) -> anyhow::Result {
         clear_tables(self, identity, table_names).await
     }
diff --git a/crates/application/src/snapshot_import.rs b/crates/application/src/snapshot_import.rs
index 2be984a0..16d85530 100644
--- a/crates/application/src/snapshot_import.rs
+++ b/crates/application/src/snapshot_import.rs
@@ -33,6 +33,11 @@ use common::{
         schema::SchemaState,
         tables::TABLES_TABLE,
     },
+    components::{
+        ComponentId,
+        ComponentName,
+        ComponentPath,
+    },
     document::{
         CreationTime,
         ParsedDocument,
@@ -57,12 +62,14 @@ use common::{
     },
 };
 use database::{
+    BootstrapComponentsModel,
     Database,
     ImportFacingModel,
     IndexModel,
     SchemaModel,
     TableModel,
     Transaction,
+    SCHEMAS_TABLE,
 };
 use errors::{
     ErrorMetadata,
@@ -364,22 +371,25 @@ impl SnapshotImportWorker {
         let mode = snapshot_import.mode;
         let (_, mut objects) = self.parse_import(snapshot_import.id()).await?;
         // Find all tables being written to.
-        let mut count_by_table: BTreeMap<TableName, u64> = BTreeMap::new();
-        let mut tables_missing_id_field: BTreeSet<TableName> = BTreeSet::new();
+        let mut count_by_table: BTreeMap<(ComponentPath, TableName), u64> = BTreeMap::new();
+        let mut tables_missing_id_field: BTreeSet<(ComponentPath, TableName)> = BTreeSet::new();
         let mut current_table = None;
         let mut lineno = 0;
         while let Some(object) = objects.try_next().await?
 {
             match object {
-                ImportUnit::NewTable(table_name) => {
+                ImportUnit::NewTable(component_path, table_name) => {
                     lineno = 0;
-                    count_by_table.entry(table_name.clone()).or_default();
-                    current_table = Some(table_name);
+                    count_by_table
+                        .entry((component_path.clone(), table_name.clone()))
+                        .or_default();
+                    current_table = Some((component_path, table_name));
                 },
                 ImportUnit::Object(exported_value) => {
                     lineno += 1;
-                    let Some(current_table) = &current_table else {
+                    let Some(current_component_table) = &current_table else {
                         continue;
                     };
+                    let (current_component, current_table) = current_component_table;
                     if current_table == &*TABLES_TABLE {
                         let exported_object = exported_value
                             .as_object()
@@ -396,15 +406,17 @@ impl SnapshotImportWorker {
                         let table_name = table_name
                             .parse()
                             .map_err(|e| ImportError::InvalidName(table_name.to_string(), e))?;
-                        count_by_table.entry(table_name).or_default();
+                        count_by_table
+                            .entry((current_component.clone(), table_name))
+                            .or_default();
                     }
-                    if let Some(count) = count_by_table.get_mut(current_table) {
+                    if let Some(count) = count_by_table.get_mut(current_component_table) {
                         *count += 1;
                     }
-                    if !tables_missing_id_field.contains(current_table)
+                    if !tables_missing_id_field.contains(current_component_table)
                         && exported_value.get(&**ID_FIELD).is_none()
                     {
-                        tables_missing_id_field.insert(current_table.clone());
+                        tables_missing_id_field.insert(current_component_table.clone());
                     }
                 },
                 // Ignore storage file chunks and generated schemas.
@@ -421,10 +433,18 @@
         }
         let mut table_changes = BTreeMap::new();
         let db_snapshot = self.database.latest_snapshot()?;
-        for (table_name, count_importing) in count_by_table.iter() {
+        for (component_and_table, count_importing) in count_by_table.iter() {
+            let (component_path, table_name) = component_and_table;
+            let component_id = {
+                // TODO(lee) read this from db_snapshot once we have ComponentRegistry.
+                let mut tx = self.database.begin(Identity::system()).await?;
+                let (_, component_id) = BootstrapComponentsModel::new(&mut tx)
+                    .component_path_to_ids(component_path.clone())
+                    .await?;
+                component_id
+            };
             if !table_name.is_system() {
-                let table_summary =
-                    db_snapshot.table_summary(TableNamespace::by_component_TODO(), table_name);
+                let table_summary = db_snapshot.table_summary(component_id.into(), table_name);
                 let to_delete = match mode {
                     ImportMode::Replace => {
                         // Overwriting nonempty user table.
@@ -437,19 +457,19 @@ impl SnapshotImportWorker {
                     ImportMode::RequireEmpty => 0,
                 };
                 table_changes.insert(
-                    table_name.clone(),
+                    component_and_table.clone(),
                     TableChange {
                         added: *count_importing,
                         deleted: to_delete,
                         existing: table_summary.num_values(),
                         unit: "",
-                        is_missing_id_field: tables_missing_id_field.contains(table_name),
+                        is_missing_id_field: tables_missing_id_field.contains(component_and_table),
                     },
                 );
             }
             if table_name == &*FILE_STORAGE_VIRTUAL_TABLE {
-                let table_summary = db_snapshot
-                    .table_summary(TableNamespace::by_component_TODO(), &FILE_STORAGE_TABLE);
+                let table_summary =
+                    db_snapshot.table_summary(component_id.into(), &FILE_STORAGE_TABLE);
                 let to_delete = match mode {
                     ImportMode::Replace => {
                         // Overwriting nonempty file storage.
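The hunks above re-key the import bookkeeping from bare `TableName`s to `(ComponentPath, TableName)` tuples, so two components can each contain a table with the same name without their counts or table changes colliding. A minimal, standalone sketch of that keying scheme, with `String` standing in for the crate's `ComponentPath` and `TableName` types:

    use std::collections::BTreeMap;

    // Stand-ins for the real ComponentPath and TableName types (illustrative only).
    type ComponentPath = String;
    type TableName = String;

    fn main() {
        // Rows encountered while scanning an import, tagged with their component.
        let rows = vec![
            ("".to_string(), "users".to_string()),         // root component
            ("waitlist".to_string(), "users".to_string()), // child component
            ("waitlist".to_string(), "users".to_string()),
        ];

        // Tuple keys keep same-named tables in different components distinct.
        let mut count_by_table: BTreeMap<(ComponentPath, TableName), u64> = BTreeMap::new();
        for key in rows {
            *count_by_table.entry(key).or_default() += 1;
        }

        assert_eq!(count_by_table[&("".to_string(), "users".to_string())], 1);
        assert_eq!(count_by_table[&("waitlist".to_string(), "users".to_string())], 2);
    }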
@@ -462,13 +482,13 @@ impl SnapshotImportWorker {
                     ImportMode::RequireEmpty => 0,
                 };
                 table_changes.insert(
-                    table_name.clone(),
+                    component_and_table.clone(),
                     TableChange {
                         added: *count_importing,
                         deleted: to_delete,
                         existing: table_summary.num_values(),
                         unit: " files",
-                        is_missing_id_field: tables_missing_id_field.contains(table_name),
+                        is_missing_id_field: tables_missing_id_field.contains(component_and_table),
                     },
                 );
             }
@@ -490,7 +510,7 @@ impl SnapshotImportWorker {
             "delete".to_string(),
         )];
         for (
-            table_name,
+            (component_path, table_name),
             TableChange {
                 added,
                 deleted,
@@ -515,6 +535,7 @@ impl SnapshotImportWorker {
                 ),
             ));
             new_checkpoints.push(ImportTableCheckpoint {
+                component_path,
                 display_table_name: table_name.clone(),
                 tablet_id: None,
                 num_rows_written: 0,
@@ -703,7 +724,7 @@ impl SnapshotImportWorker {
         &self,
         import_id: ResolvedDocumentId,
     ) -> anyhow::Result<(
-        Vec>,
+        SchemasForImport,
         Peekable>>,
     )> {
         let (object_key, format) = {
@@ -807,19 +828,25 @@ impl ImportError {
 #[derive(Debug)]
 enum ImportUnit {
     Object(JsonValue),
-    NewTable(TableName),
-    GeneratedSchema(TableName, GeneratedSchema),
+    NewTable(ComponentPath, TableName),
+    GeneratedSchema(
+        ComponentPath,
+        TableName,
+        GeneratedSchema,
+    ),
     StorageFileChunk(DeveloperDocumentId, Bytes),
 }
 
+static COMPONENT_NAME_PATTERN: LazyLock<Regex> =
+    LazyLock::new(|| Regex::new(r"^(.*/)?_components/([^/]+)/$").unwrap());
 static GENERATED_SCHEMA_PATTERN: LazyLock<Regex> =
-    LazyLock::new(|| Regex::new(r"(?:.*/)?([^/]+)/generated_schema\.jsonl$").unwrap());
+    LazyLock::new(|| Regex::new(r"^(.*/)?([^/]+)/generated_schema\.jsonl$").unwrap());
 static DOCUMENTS_PATTERN: LazyLock<Regex> =
-    LazyLock::new(|| Regex::new(r"(?:.*/)?([^/]+)/documents\.jsonl$").unwrap());
+    LazyLock::new(|| Regex::new(r"^(.*/)?([^/]+)/documents\.jsonl$").unwrap());
 // _storage/(ID) with optional ignored prefix and extension like
 // snapshot/_storage/(ID).png
 static STORAGE_FILE_PATTERN: LazyLock<Regex> =
-    LazyLock::new(|| Regex::new(r"(?:.*/)?_storage/([^/.]+)(?:\.[^/]+)?$").unwrap());
+    LazyLock::new(|| Regex::new(r"(.*/)?_storage/([^/.]+)(?:\.[^/]+)?$").unwrap());
 
 fn map_zip_error(e: ZipError) -> anyhow::Error {
     match e {
@@ -836,6 +863,15 @@ fn map_zip_error(e: ZipError) -> anyhow::Error {
 /// multiple times to read the file multiple times, for cases where the file
 /// must be read out of order, e.g. because the _tables table must be imported
 /// first.
+/// Objects are yielded with the following guarantees:
+/// 1. When an Object is yielded, it is in the table corresponding to the most
+///    recently yielded NewTable.
+/// 2. When a StorageFileChunk is yielded, it is in the _storage table
+///    corresponding to the most recently yielded NewTable.
+/// 3. All StorageFileChunks for a single file are yielded contiguously, in
+///    order.
+/// 4. If a table has a GeneratedSchema, the GeneratedSchema will be yielded
+///    before any Objects in that table.
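These yield-order guarantees are what let the import run as a single pass: each Object or StorageFileChunk is attributed to the most recently announced table, with no lookahead or buffering. A sketch of a consumer relying on guarantee 1, using a pared-down stand-in for `ImportUnit`:

    // Pared-down stand-in for ImportUnit; the real enum also carries a
    // ComponentPath, generated schemas, and storage file chunks.
    enum Unit {
        NewTable(String), // announces the table for everything that follows
        Object(String),   // belongs to the most recently announced table
    }

    fn route(units: Vec<Unit>) {
        let mut current_table: Option<String> = None;
        for unit in units {
            match unit {
                Unit::NewTable(name) => current_table = Some(name),
                Unit::Object(payload) => {
                    // Guarantee 1: attribution needs only the last NewTable seen.
                    let table = current_table.as_ref().expect("stream starts with NewTable");
                    println!("insert into {table}: {payload}");
                },
            }
        }
    }

    fn main() {
        route(vec![
            Unit::NewTable("users".to_string()),
            Unit::Object(r#"{"name": "ada"}"#.to_string()),
        ]);
    }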
#[try_stream(ok = ImportUnit, error = anyhow::Error)] async fn parse_objects<'a, Fut>(format: ImportFormat, stream_body: impl Fn() -> Fut + 'a) where @@ -844,7 +880,7 @@ where match format { ImportFormat::Csv(table_name) => { let reader = stream_body().await?; - yield ImportUnit::NewTable(table_name); + yield ImportUnit::NewTable(ComponentPath::TODO(), table_name); let mut reader = csv_async::AsyncReader::from_reader(reader); if !reader.has_headers() { anyhow::bail!(ImportError::CsvMissingHeaders); @@ -881,7 +917,7 @@ where }, ImportFormat::JsonLines(table_name) => { let reader = stream_body().await?; - yield ImportUnit::NewTable(table_name); + yield ImportUnit::NewTable(ComponentPath::TODO(), table_name); let mut reader = BufReader::new(reader); let mut line = String::new(); let mut lineno = 1; @@ -900,7 +936,7 @@ where }, ImportFormat::JsonArray(table_name) => { let reader = stream_body().await?; - yield ImportUnit::NewTable(table_name); + yield ImportUnit::NewTable(ComponentPath::TODO(), table_name); let mut buf = Vec::new(); let mut truncated_reader = reader.take((*TRANSACTION_MAX_USER_WRITE_SIZE_BYTES as u64) + 1); @@ -935,39 +971,34 @@ where // Each generated schema must be parsed before the corresponding // table/documents.jsonl file, so we correctly infer types from // export-formatted JsonValues. - let mut table_metadata = vec![]; - let mut storage_metadata = vec![]; - let mut generated_schemas = vec![]; + let mut table_metadata: BTreeMap<_, Vec<_>> = BTreeMap::new(); + let mut storage_metadata: BTreeMap<_, Vec<_>> = BTreeMap::new(); + let mut generated_schemas: BTreeMap<_, Vec<_>> = BTreeMap::new(); for (i, filename) in filenames.iter().enumerate() { let documents_table_name = parse_documents_jsonl_table_name(filename)?; - if let Some(table_name) = &documents_table_name - && table_name == &*TABLES_TABLE + if let Some((component_path, table_name)) = documents_table_name.clone() + && table_name == *TABLES_TABLE { let entry_reader = zip_reader.entry_reader(i).await.map_err(map_zip_error)?; - table_metadata = parse_documents_jsonl(entry_reader).try_collect().await?; - } else if let Some(table_name) = &documents_table_name - && table_name == &*FILE_STORAGE_VIRTUAL_TABLE + table_metadata.insert( + component_path, + parse_documents_jsonl(entry_reader).try_collect().await?, + ); + } else if let Some((component_path, table_name)) = documents_table_name + && table_name == *FILE_STORAGE_VIRTUAL_TABLE { let entry_reader = zip_reader.entry_reader(i).await.map_err(map_zip_error)?; - storage_metadata = - parse_documents_jsonl(entry_reader).try_collect().await?; - } else if let Some(generated_schema_captures) = - GENERATED_SCHEMA_PATTERN.captures(filename) + storage_metadata.insert( + component_path, + parse_documents_jsonl(entry_reader).try_collect().await?, + ); + } else if let Some((component_path, table_name)) = + parse_table_filename(filename, &GENERATED_SCHEMA_PATTERN)? 
{ let entry_reader = zip_reader.entry_reader(i).await.map_err(map_zip_error)?; - let table_name_str = generated_schema_captures - .get(1) - .expect("regex has one capture group") - .as_str(); - let table_name: TableName = table_name_str.parse().map_err(|e| { - ErrorMetadata::bad_request( - "InvalidTableName", - format!("table name '{table_name_str}' invalid: {e}"), - ) - })?; tracing::info!( "importing zip file containing generated_schema {table_name}" ); @@ -975,63 +1006,61 @@ where let generated_schema = parse_generated_schema(filename, entry_reader).await?; generated_schemas - .push(ImportUnit::GeneratedSchema(table_name, generated_schema)); + .entry(component_path.clone()) + .or_default() + .push(ImportUnit::GeneratedSchema( + component_path, + table_name, + generated_schema, + )); } } - for table_unit in table_metadata { + for table_unit in table_metadata.into_values().flatten() { yield table_unit; } - if !storage_metadata.is_empty() { - for storage_unit in storage_metadata { - yield storage_unit; - } - for (i, filename) in filenames.iter().enumerate() { - if let Some(storage_file_captures) = STORAGE_FILE_PATTERN.captures(filename) - { - let storage_id_str = storage_file_captures - .get(1) - .expect("regex has one capture group") - .as_str(); - if storage_id_str == "documents" { - continue; - } - let entry_reader = - zip_reader.entry_reader(i).await.map_err(map_zip_error)?; - let storage_id = - DeveloperDocumentId::decode(storage_id_str).map_err(|e| { - ErrorMetadata::bad_request( - "InvalidStorageId", - format!("_storage id '{storage_id_str}' invalid: {e}"), - ) - })?; - tracing::info!( - "importing zip file containing storage file {}", - storage_id.encode() - ); - let mut entry_reader = entry_reader.compat(); - let mut buf = [0u8; 1024]; - while let bytes_read = entry_reader.read(&mut buf).await? - && bytes_read > 0 + for generated_schema_unit in generated_schemas.into_values().flatten() { + yield generated_schema_unit; + } + for (component_path, storage_metadata) in storage_metadata { + if !storage_metadata.is_empty() { + // Yield NewTable for _storage and Object for each storage file's metadata. + for storage_unit in storage_metadata { + yield storage_unit; + } + // Yield StorageFileChunk for each file in this component. + for (i, filename) in filenames.iter().enumerate() { + if let Some((file_component_path, storage_id)) = + parse_storage_filename(filename)? + && file_component_path == component_path { - yield ImportUnit::StorageFileChunk( - storage_id, - Bytes::copy_from_slice(&buf[..bytes_read]), + let entry_reader = + zip_reader.entry_reader(i).await.map_err(map_zip_error)?; + tracing::info!( + "importing zip file containing storage file {}", + storage_id.encode() ); + let mut entry_reader = entry_reader.compat(); + let mut buf = [0u8; 1024]; + while let bytes_read = entry_reader.read(&mut buf).await? + && bytes_read > 0 + { + yield ImportUnit::StorageFileChunk( + storage_id, + Bytes::copy_from_slice(&buf[..bytes_read]), + ); + } + // In case it's an empty file, make sure we send at + // least one chunk. + yield ImportUnit::StorageFileChunk(storage_id, Bytes::new()); } - // In case it's an empty file, make sure we send at - // least one chunk. - yield ImportUnit::StorageFileChunk(storage_id, Bytes::new()); } } } - for generated_schema_unit in generated_schemas { - yield generated_schema_unit; - } } // Second pass: user tables. for (i, filename) in filenames.iter().enumerate() { - if let Some(table_name) = parse_documents_jsonl_table_name(filename)? 
+            if let Some((_, table_name)) = parse_documents_jsonl_table_name(filename)?
                 && !table_name.is_system()
             {
                 let entry_reader = zip_reader.entry_reader(i).await.map_err(map_zip_error)?;
@@ -1046,13 +1075,36 @@ where
     }
 }
 
-fn parse_documents_jsonl_table_name(filename: &str) -> anyhow::Result<Option<TableName>> {
-    DOCUMENTS_PATTERN
-        .captures(filename)
-        .map(|captures| {
+fn parse_component_path(mut filename: &str) -> anyhow::Result<ComponentPath> {
+    let mut component_names = Vec::new();
+    while let Some(captures) = COMPONENT_NAME_PATTERN.captures(filename) {
+        filename = captures.get(1).map_or("", |c| c.as_str());
+        let component_name_str = captures
+            .get(2)
+            .expect("regex has two capture groups")
+            .as_str();
+        let component_name: ComponentName = component_name_str.parse().map_err(|e| {
+            ErrorMetadata::bad_request(
+                "InvalidComponentName",
+                format!("component name '{component_name_str}' invalid: {e}"),
+            )
+        })?;
+        component_names.push(component_name);
+    }
+    component_names.reverse();
+    Ok(ComponentPath::from(component_names))
+}
+
+fn parse_table_filename(
+    filename: &str,
+    regex: &Regex,
+) -> anyhow::Result<Option<(ComponentPath, TableName)>> {
+    match regex.captures(filename) {
+        None => Ok(None),
+        Some(captures) => {
             let table_name_str = captures
-                .get(1)
-                .expect("regex has one capture group")
+                .get(2)
+                .expect("regex has two capture groups")
                 .as_str();
             let table_name = table_name_str.parse().map_err(|e| {
                 ErrorMetadata::bad_request(
@@ -1060,17 +1112,52 @@ fn parse_documents_jsonl_table_name(filename: &str) -> anyhow::Result<Option<TableName>> {
                     "InvalidTableName",
                     format!("table name '{table_name_str}' invalid: {e}"),
                 )
             })?;
+            let prefix = captures.get(1).map_or("", |c| c.as_str());
+            let component_path = parse_component_path(prefix)?;
+            Ok(Some((component_path, table_name)))
+        },
+    }
+}
+
+fn parse_storage_filename(
+    filename: &str,
+) -> anyhow::Result<Option<(ComponentPath, DeveloperDocumentId)>> {
+    match STORAGE_FILE_PATTERN.captures(filename) {
+        None => Ok(None),
+        Some(captures) => {
+            let storage_id_str = captures
+                .get(2)
+                .expect("regex has two capture groups")
+                .as_str();
+            if storage_id_str == "documents" {
+                return Ok(None);
+            }
+            let storage_id = DeveloperDocumentId::decode(storage_id_str).map_err(|e| {
+                ErrorMetadata::bad_request(
+                    "InvalidStorageId",
+                    format!("_storage id '{storage_id_str}' invalid: {e}"),
+                )
+            })?;
+            let prefix = captures.get(1).map_or("", |c| c.as_str());
+            let component_path = parse_component_path(prefix)?;
+            Ok(Some((component_path, storage_id)))
+        },
+    }
+}
+
+fn parse_documents_jsonl_table_name(
+    filename: &str,
+) -> anyhow::Result<Option<(ComponentPath, TableName)>> {
+    parse_table_filename(filename, &DOCUMENTS_PATTERN)
+}
 
 #[try_stream(ok = ImportUnit, error = anyhow::Error)]
 async fn parse_documents_jsonl(entry_reader: ZipEntryReader<'_, R>) {
-    let table_name = parse_documents_jsonl_table_name(entry_reader.entry().filename())?
-        .context("expected documents.jsonl file")?;
+    let (component_path, table_name) =
+        parse_documents_jsonl_table_name(entry_reader.entry().filename())?
+            .context("expected documents.jsonl file")?;
     tracing::info!("importing zip file containing table {table_name}");
-    yield ImportUnit::NewTable(table_name);
+    yield ImportUnit::NewTable(component_path, table_name);
     let mut reader = BufReader::new(entry_reader.compat());
     let mut line = String::new();
     let mut lineno = 1;
@@ -1351,7 +1438,7 @@ pub async fn do_import(
 pub async fn clear_tables(
     application: &Application,
     identity: &Identity,
-    table_names: Vec<TableName>,
+    table_names: Vec<(ComponentPath, TableName)>,
 ) -> anyhow::Result {
     let usage = FunctionUsageTracker::new();
 
         schemas_for_import(&mut tx).await?
}; - let objects = stream::iter( - table_names - .into_iter() - .map(ImportUnit::NewTable) - .map(anyhow::Ok), - ) + let objects = stream::iter(table_names.into_iter().map(|(component_path, table_name)| { + anyhow::Ok(ImportUnit::NewTable(component_path, table_name)) + })) .boxed() .peekable(); @@ -1399,6 +1483,7 @@ async fn best_effort_update_progress_message( identity: &Identity, import_id: ResolvedDocumentId, progress_message: String, + component_path: &ComponentPath, display_table_name: &TableName, num_rows_written: i64, ) { @@ -1411,6 +1496,7 @@ async fn best_effort_update_progress_message( .update_progress_message( import_id, progress_message, + component_path, display_table_name, num_rows_written, ) @@ -1426,6 +1512,7 @@ async fn add_checkpoint_message( identity: &Identity, import_id: ResolvedDocumentId, checkpoint_message: String, + component_path: &ComponentPath, display_table_name: &TableName, num_rows_written: i64, ) -> anyhow::Result<()> { @@ -1441,6 +1528,7 @@ async fn add_checkpoint_message( .add_checkpoint_message( import_id, checkpoint_message.clone(), + component_path, display_table_name, num_rows_written, ) @@ -1496,6 +1584,7 @@ async fn import_objects( /// table number. #[derive(Clone, Debug, Ord, PartialOrd, Eq, PartialEq)] struct ImportSchemaTableConstraint { + namespace: TableNamespace, // "foo" in example above. foreign_ref_table_in_import: (TableName, TableNumber), // "bar" in example above. @@ -1506,7 +1595,7 @@ impl ImportSchemaTableConstraint { async fn validate(&self, tx: &mut Transaction) -> anyhow::Result<()> { let existing_table_mapping = tx.table_mapping(); let Some(existing_table) = existing_table_mapping - .namespace(TableNamespace::by_component_TODO()) + .namespace(self.namespace) .id_and_number_if_exists(&self.foreign_ref_table_in_import.0) else { // If a table doesn't have a table number, @@ -1520,10 +1609,7 @@ impl ImportSchemaTableConstraint { return Ok(()); } if TableModel::new(tx) - .count( - TableNamespace::by_component_TODO(), - &self.table_in_schema_not_in_import, - ) + .count(self.namespace, &self.table_in_schema_not_in_import) .await? == 0 { @@ -1543,23 +1629,20 @@ impl ImportSchemaTableConstraint { #[derive(Clone, Debug)] struct ImportSchemaConstraints { - initial_schemas: Vec>, + initial_schemas: SchemasForImport, table_constraints: BTreeSet, } impl ImportSchemaConstraints { - fn new( - table_mapping_for_import: &TableMapping, - initial_schemas: Vec>, - ) -> Self { + fn new(table_mapping_for_import: &TableMapping, initial_schemas: SchemasForImport) -> Self { let mut table_constraints = BTreeSet::new(); - for schema in initial_schemas.iter() { + for (namespace, _, schema) in initial_schemas.iter() { let Some((_, schema)) = schema else { continue; }; for (table, table_schema) in &schema.tables { if table_mapping_for_import - .namespace(TableNamespace::by_component_TODO()) + .namespace(*namespace) .name_exists(table) { // Schema's table is in the import => it's valid. 
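Threading `namespace` through `ImportSchemaTableConstraint` scopes these checks to the component being imported into, but the rule itself is unchanged: the import may only end up changing a referenced table's number if no table outside the import still references the old number. A simplified sketch of that rule, with primitive stand-in types and illustrative names:

    // Illustrative reduction of ImportSchemaTableConstraint::validate.
    struct Constraint {
        namespace: &'static str,     // component's table namespace
        foreign_table: &'static str, // "foo": referenced table, present in the import
        imported_number: u32,        // table number "foo" will have after the import
    }

    fn validate(
        c: &Constraint,
        existing_number: Option<u32>, // current number of foreign_table in c.namespace
        referencing_rows: u64,        // rows in the schema table that still point at it
    ) -> Result<(), String> {
        match existing_number {
            None => Ok(()),                              // nothing to conflict with
            Some(n) if n == c.imported_number => Ok(()), // number unchanged
            Some(_) if referencing_rows == 0 => Ok(()),  // no stale references remain
            Some(_) => Err(format!(
                "import would change the table number of '{}' in namespace '{}' while \
                 other documents still reference it",
                c.foreign_table, c.namespace
            )),
        }
    }

    fn main() {
        let c = Constraint {
            namespace: "waitlist",
            foreign_table: "users",
            imported_number: 10021,
        };
        assert!(validate(&c, Some(10021), 5).is_ok()); // number unchanged
        assert!(validate(&c, Some(10007), 0).is_ok()); // changed, but unreferenced
        assert!(validate(&c, Some(10007), 5).is_err()); // would break references
    }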
@@ -1570,10 +1653,11 @@ impl ImportSchemaConstraints { }; for foreign_key_table in document_schema.foreign_keys() { if let Some(foreign_key_table_number) = table_mapping_for_import - .namespace(TableNamespace::by_component_TODO()) + .namespace(*namespace) .id_and_number_if_exists(foreign_key_table) { table_constraints.insert(ImportSchemaTableConstraint { + namespace: *namespace, table_in_schema_not_in_import: table.clone(), foreign_ref_table_in_import: ( foreign_key_table.clone(), @@ -1612,7 +1696,7 @@ async fn finalize_import( usage_tracking: &UsageCounter, identity: Identity, member_id_override: Option, - initial_schemas: Vec>, + initial_schemas: SchemasForImport, table_mapping_for_import: TableMapping, usage: FunctionUsageTracker, audit_log_event: DeploymentAuditLogEvent, @@ -1669,6 +1753,12 @@ async fn finalize_import( Ok((ts, documents_deleted)) } +type SchemasForImport = Vec<( + TableNamespace, + SchemaState, + Option<(ResolvedDocumentId, DatabaseSchema)>, +)>; + /// Documents in an imported table should match the schema. /// ImportFacingModel::insert checks that new documents match the schema, /// but SchemaWorker does not check new schemas against existing documents in @@ -1679,15 +1769,23 @@ async fn finalize_import( /// imported documents, we make sure all relevant schemas stay the same. async fn schemas_for_import( tx: &mut Transaction, -) -> anyhow::Result>> { - let mut schema_model = SchemaModel::new(tx, TableNamespace::by_component_TODO()); +) -> anyhow::Result { + let mut namespaces = tx.table_mapping().namespaces_for_name(&SCHEMAS_TABLE); + namespaces.sort(); let mut schemas = vec![]; - for schema_state in [ - SchemaState::Active, - SchemaState::Validated, - SchemaState::Pending, - ] { - schemas.push(schema_model.get_by_state(schema_state).await?); + for namespace in namespaces { + let mut schema_model = SchemaModel::new(tx, namespace); + for schema_state in [ + SchemaState::Active, + SchemaState::Validated, + SchemaState::Pending, + ] { + schemas.push(( + namespace, + schema_state.clone(), + schema_model.get_by_state(schema_state).await?, + )); + } } Ok(schemas) } @@ -1697,6 +1795,7 @@ async fn import_tables_table( identity: &Identity, mode: ImportMode, mut objects: Pin<&mut Peekable>>>, + component_path: &ComponentPath, import_id: Option, ) -> anyhow::Result { let mut table_mapping_for_import = TableMapping::new(); @@ -1740,10 +1839,11 @@ async fn import_tables_table( .map(|(table_name, _)| table_name.clone()) .collect(); for (table_name, table_number) in import_tables.iter() { - let (table_id, _) = prepare_table_for_import( + let (table_id, component_id, _) = prepare_table_for_import( database, identity, mode, + component_path, table_name, Some(*table_number), &tables_in_import, @@ -1752,7 +1852,7 @@ async fn import_tables_table( .await?; table_mapping_for_import.insert( table_id.tablet_id, - TableNamespace::by_component_TODO(), + component_id.into(), table_id.table_number, table_name.clone(), ); @@ -1765,6 +1865,7 @@ async fn import_storage_table( file_storage: &FileStorage, identity: &Identity, table_id: TabletIdAndTableNumber, + component_path: &ComponentPath, mut objects: Pin<&mut Peekable>>>, usage: &dyn StorageUsageTracker, import_id: Option, @@ -1916,6 +2017,7 @@ async fn import_storage_table( num_files.separate_with_commas(), total_num_files.separate_with_commas() ), + component_path, &FILE_STORAGE_VIRTUAL_TABLE, num_files as i64, ) @@ -1931,6 +2033,7 @@ async fn import_storage_table( "Imported \"_storage\" ({} files)", num_files.separate_with_commas() ), + 
component_path, &FILE_STORAGE_VIRTUAL_TABLE, num_files as i64, ) @@ -1998,20 +2101,24 @@ async fn import_single_table( identity: &Identity, mode: ImportMode, mut objects: Pin<&mut Peekable>>>, - generated_schemas: &mut BTreeMap>, + generated_schemas: &mut BTreeMap< + (ComponentPath, TableName), + GeneratedSchema, + >, table_mapping_for_import: &mut TableMapping, usage: FunctionUsageTracker, import_id: Option, ) -> anyhow::Result> { - while let Some(ImportUnit::GeneratedSchema(table_name, generated_schema)) = objects - .as_mut() - .try_next_if(|line| matches!(line, ImportUnit::GeneratedSchema(_, _))) - .await? + while let Some(ImportUnit::GeneratedSchema(component_path, table_name, generated_schema)) = + objects + .as_mut() + .try_next_if(|line| matches!(line, ImportUnit::GeneratedSchema(_, _, _))) + .await? { - generated_schemas.insert(table_name, generated_schema); + generated_schemas.insert((component_path, table_name), generated_schema); } - let mut table_name = match objects.try_next().await? { - Some(ImportUnit::NewTable(table_name)) => table_name, + let mut component_and_table = match objects.try_next().await? { + Some(ImportUnit::NewTable(component_path, table_name)) => (component_path, table_name), Some(_) => anyhow::bail!("parse_objects should start with NewTable"), // No more tables to import. None => return Ok(None), @@ -2022,34 +2129,56 @@ async fn import_single_table( database, identity, import_id, - format!("Importing \"{table_name}\""), - &table_name, + format!( + "Importing \"{}\"{}", + component_and_table.1, + component_and_table.0.in_component_str() + ), + &component_and_table.0, + &component_and_table.1, 0, ) .await; } - if table_name == *FILE_STORAGE_VIRTUAL_TABLE { - table_name = FILE_STORAGE_TABLE.clone(); + let table_name = &mut component_and_table.1; + if *table_name == *FILE_STORAGE_VIRTUAL_TABLE { + *table_name = FILE_STORAGE_TABLE.clone(); // Infer table number from existing table. 
table_number_from_docs = None; } + let (component_path, table_name) = &component_and_table; - if table_name == *TABLES_TABLE { + if *table_name == *TABLES_TABLE { table_mapping_for_import.update( - import_tables_table(database, identity, mode, objects.as_mut(), import_id).await?, + import_tables_table( + database, + identity, + mode, + objects.as_mut(), + component_path, + import_id, + ) + .await?, ); return Ok(Some(0)); } - let mut generated_schema = generated_schemas.get_mut(&table_name); + let mut generated_schema = generated_schemas.get_mut(&component_and_table); let tables_in_import = table_mapping_for_import .iter() .map(|(_, _, _, table_name)| table_name.clone()) .collect(); + let component_id = { + let mut tx = database.begin(Identity::system()).await?; + let (_, component_id) = BootstrapComponentsModel::new(&mut tx) + .component_path_to_ids(component_path.clone()) + .await?; + component_id + }; let (table_id, num_to_skip) = match table_mapping_for_import - .namespace(TableNamespace::by_component_TODO()) - .id_and_number_if_exists(&table_name) + .namespace(component_id.into()) + .id_and_number_if_exists(table_name) { Some(table_id) => { let mut tx = database.begin(identity.clone()).await?; @@ -2063,11 +2192,12 @@ async fn import_single_table( (table_id, num_to_skip) }, None => { - let (table_id, num_to_skip) = prepare_table_for_import( + let (table_id, component_id, num_to_skip) = prepare_table_for_import( database, identity, mode, - &table_name, + component_path, + table_name, table_number_from_docs, &tables_in_import, import_id, @@ -2075,7 +2205,7 @@ async fn import_single_table( .await?; table_mapping_for_import.insert( table_id.tablet_id, - TableNamespace::by_component_TODO(), + component_id.into(), table_id.table_number, table_name.clone(), ); @@ -2083,12 +2213,13 @@ async fn import_single_table( }, }; - if table_name == *FILE_STORAGE_TABLE { + if *table_name == *FILE_STORAGE_TABLE { import_storage_table( database, file_storage, identity, table_id, + component_path, objects.as_mut(), &usage, import_id, @@ -2134,7 +2265,7 @@ async fn import_single_table( database, identity, objects_to_insert, - &table_name, + table_name, table_id, &table_mapping_for_schema, usage.clone(), @@ -2151,7 +2282,8 @@ async fn import_single_table( "Importing \"{table_name}\" ({} documents)", num_objects.separate_with_commas() ), - &table_name, + component_path, + table_name, num_objects as i64, ) .await; @@ -2164,7 +2296,7 @@ async fn import_single_table( database, identity, objects_to_insert, - &table_name, + table_name, table_id, &table_mapping_for_schema, usage, @@ -2180,7 +2312,8 @@ async fn import_single_table( "Imported \"{table_name}\" ({} documents)", num_objects.separate_with_commas() ), - &table_name, + component_path, + table_name, num_objects as i64, ) .await?; @@ -2243,11 +2376,12 @@ async fn prepare_table_for_import( database: &Database, identity: &Identity, mode: ImportMode, + component_path: &ComponentPath, table_name: &TableName, table_number: Option, tables_in_import: &BTreeSet, import_id: Option, -) -> anyhow::Result<(TabletIdAndTableNumber, u64)> { +) -> anyhow::Result<(TabletIdAndTableNumber, ComponentId, u64)> { anyhow::ensure!( table_name == &*FILE_STORAGE_TABLE || !table_name.is_system(), ErrorMetadata::bad_request( @@ -2261,14 +2395,17 @@ async fn prepare_table_for_import( table_name }; let mut tx = database.begin(identity.clone()).await?; + let (_, component_id) = BootstrapComponentsModel::new(&mut tx) + .component_path_to_ids(component_path.clone()) + .await?; let 
existing_active_table_id = tx .table_mapping() - .namespace(TableNamespace::by_component_TODO()) + .namespace(component_id.into()) .id_and_number_if_exists(table_name); let existing_checkpoint = match import_id { Some(import_id) => { SnapshotImportModel::new(&mut tx) - .get_table_checkpoint(import_id, display_table_name) + .get_table_checkpoint(import_id, component_path, display_table_name) .await? }, None => None, @@ -2293,7 +2430,7 @@ async fn prepare_table_for_import( ImportMode::Append => existing_active_table_id, ImportMode::RequireEmpty => { if !TableModel::new(&mut tx) - .table_is_empty(TableNamespace::by_component_TODO(), table_name) + .table_is_empty(component_id.into(), table_name) .await? { anyhow::bail!(ImportError::TableExists(table_name.clone())); @@ -2321,7 +2458,7 @@ async fn prepare_table_for_import( // Create a new table in state Hidden, that will later be changed to Active. let table_id = TableModel::new(tx) .insert_table_for_import( - TableNamespace::by_component_TODO(), + component_id.into(), table_name, table_number, tables_in_import, @@ -2329,7 +2466,7 @@ async fn prepare_table_for_import( .await?; IndexModel::new(tx) .copy_indexes_to_table( - TableNamespace::by_component_TODO(), + component_id.into(), table_name, table_id.tablet_id, ) @@ -2338,6 +2475,7 @@ async fn prepare_table_for_import( SnapshotImportModel::new(tx) .checkpoint_tablet_created( import_id, + component_path, display_table_name, table_id.tablet_id, ) @@ -2354,7 +2492,7 @@ async fn prepare_table_for_import( table_id }; - Ok((table_id, num_to_skip)) + Ok((table_id, component_id, num_to_skip)) } /// Waits for all indexes on a table to be backfilled, which may take a while @@ -2420,7 +2558,7 @@ async fn table_number_for_import( let id_v6 = DeveloperDocumentId::decode(id).ok()?; Some(id_v6.table()) }, - ImportUnit::NewTable(_) => None, + ImportUnit::NewTable(..) => None, ImportUnit::GeneratedSchema(..) => None, ImportUnit::StorageFileChunk(..) => None, } @@ -2450,7 +2588,7 @@ async fn remap_empty_string_by_schema<'a, RT: Runtime>( Ok(objects .map_ok(move |object| match object { - unit @ ImportUnit::NewTable(_) + unit @ ImportUnit::NewTable(..) | unit @ ImportUnit::GeneratedSchema(..) | unit @ ImportUnit::StorageFileChunk(..) => unit, ImportUnit::Object(mut object) => ImportUnit::Object({ @@ -2501,6 +2639,7 @@ mod tests { IndexConfig, IndexMetadata, }, + components::ComponentPath, db_schema, document::ResolvedDocument, object_validator, @@ -2572,17 +2711,18 @@ mod tests { use super::{ do_import, import_objects, + parse_documents_jsonl_table_name, parse_objects, ImportFormat, ImportMode, ImportUnit, PeekableExt, - DOCUMENTS_PATTERN, GENERATED_SCHEMA_PATTERN, - STORAGE_FILE_PATTERN, }; use crate::{ snapshot_import::{ + parse_storage_filename, + parse_table_filename, upload_import_file, wait_for_import_worker, }, @@ -2595,54 +2735,53 @@ mod tests { #[test] fn test_filename_regex() -> anyhow::Result<()> { - let table_name = DOCUMENTS_PATTERN - .captures("users/documents.jsonl") - .context("no match")? - .get(1) - .context("no capture group")? - .as_str(); - assert_eq!(table_name, "users"); + let (_, table_name) = parse_documents_jsonl_table_name("users/documents.jsonl")?.unwrap(); + assert_eq!(table_name, "users".parse()?); // Regression test, checking that the '.' is escaped. - assert!(DOCUMENTS_PATTERN - .captures("users/documentsxjsonl") - .is_none()); + assert!(parse_documents_jsonl_table_name("users/documentsxjsonl")?.is_none()); // When an export is unzipped and re-zipped, sometimes there's a prefix. 
- let table_name = DOCUMENTS_PATTERN - .captures("snapshot/users/documents.jsonl") - .context("no match")? - .get(1) - .context("no capture group")? - .as_str(); - assert_eq!(table_name, "users"); - let table_name = GENERATED_SCHEMA_PATTERN - .captures("users/generated_schema.jsonl") - .context("no match")? - .get(1) - .context("no capture group")? - .as_str(); - assert_eq!(table_name, "users"); - let storage_id = STORAGE_FILE_PATTERN - .captures("_storage/abc123.gif") - .context("no match")? - .get(1) - .context("no capture group")? - .as_str(); - assert_eq!(storage_id, "abc123"); - let storage_id = STORAGE_FILE_PATTERN - .captures("snapshot/_storage/abc123.gif") - .context("no match")? - .get(1) - .context("no capture group")? - .as_str(); - assert_eq!(storage_id, "abc123"); + let (_, table_name) = + parse_documents_jsonl_table_name("snapshot/users/documents.jsonl")?.unwrap(); + assert_eq!(table_name, "users".parse()?); + let (_, table_name) = + parse_table_filename("users/generated_schema.jsonl", &GENERATED_SCHEMA_PATTERN)? + .unwrap(); + assert_eq!(table_name, "users".parse()?); + let (_, storage_id) = + parse_storage_filename("_storage/kg2ah8mk1xtg35g7zyexyc96e96yr74f.gif")?.unwrap(); + assert_eq!(&storage_id.to_string(), "kg2ah8mk1xtg35g7zyexyc96e96yr74f"); + let (_, storage_id) = + parse_storage_filename("snapshot/_storage/kg2ah8mk1xtg35g7zyexyc96e96yr74f.gif")? + .unwrap(); + assert_eq!(&storage_id.to_string(), "kg2ah8mk1xtg35g7zyexyc96e96yr74f"); // No file extension. - let storage_id = STORAGE_FILE_PATTERN - .captures("_storage/abc123") - .context("no match")? - .get(1) - .context("no capture group")? - .as_str(); - assert_eq!(storage_id, "abc123"); + let (_, storage_id) = + parse_storage_filename("_storage/kg2ah8mk1xtg35g7zyexyc96e96yr74f")?.unwrap(); + assert_eq!(&storage_id.to_string(), "kg2ah8mk1xtg35g7zyexyc96e96yr74f"); + Ok(()) + } + + #[test] + fn test_component_path_regex() -> anyhow::Result<()> { + let (component_path, table_name) = + parse_documents_jsonl_table_name("_components/waitlist/tbl/documents.jsonl")?.unwrap(); + assert_eq!(&String::from(component_path), "waitlist"); + assert_eq!(&table_name.to_string(), "tbl"); + + let (component_path, table_name) = parse_documents_jsonl_table_name( + "some/parentdir/_components/waitlist/tbl/documents.jsonl", + )? + .unwrap(); + assert_eq!(&String::from(component_path), "waitlist"); + assert_eq!(&table_name.to_string(), "tbl"); + + let (component_path, table_name) = parse_documents_jsonl_table_name( + "_components/waitlist/_components/ratelimit/tbl/documents.jsonl", + )? 
+ .unwrap(); + assert_eq!(&String::from(component_path), "waitlist/ratelimit"); + assert_eq!(&table_name.to_string(), "tbl"); + Ok(()) } @@ -2686,7 +2825,7 @@ mod tests { .filter_map(|line| async move { match line { Ok(super::ImportUnit::Object(object)) => Some(Ok(object)), - Ok(super::ImportUnit::NewTable(_)) => None, + Ok(super::ImportUnit::NewTable(..)) => None, Ok(super::ImportUnit::GeneratedSchema(..)) => None, Ok(super::ImportUnit::StorageFileChunk(..)) => None, Err(e) => Some(Err(e)), @@ -3195,6 +3334,7 @@ a #[convex_macro::test_runtime] async fn test_import_counts_bandwidth(rt: TestRuntime) -> anyhow::Result<()> { let app = Application::new_for_tests(&rt).await?; + let component_path = ComponentPath::root(); let table_name: TableName = "table1".parse()?; let identity = new_admin_id(); @@ -3202,13 +3342,19 @@ a let storage_idv6 = DeveloperDocumentId::decode(storage_id)?; let objects = stream::iter(vec![ - Ok(ImportUnit::NewTable("_storage".parse()?)), + Ok(ImportUnit::NewTable( + component_path.clone(), + "_storage".parse()?, + )), Ok(ImportUnit::Object(json!({"_id": storage_id}))), Ok(ImportUnit::StorageFileChunk( storage_idv6, Bytes::from_static(b"foobarbaz"), )), - Ok(ImportUnit::NewTable(table_name.clone())), + Ok(ImportUnit::NewTable( + component_path.clone(), + table_name.clone(), + )), Ok(ImportUnit::Object(json!({"foo": "bar"}))), Ok(ImportUnit::Object(json!({"foo": "baz"}))), ]) diff --git a/crates/model/src/snapshot_imports/mod.rs b/crates/model/src/snapshot_imports/mod.rs index 092550b8..ca4b7f3c 100644 --- a/crates/model/src/snapshot_imports/mod.rs +++ b/crates/model/src/snapshot_imports/mod.rs @@ -2,6 +2,7 @@ use std::sync::LazyLock; use anyhow::Context; use common::{ + components::ComponentPath, document::{ ParsedDocument, ResolvedDocument, @@ -252,14 +253,14 @@ impl<'a, RT: Runtime> SnapshotImportModel<'a, RT> { pub async fn checkpoint_tablet_created( &mut self, id: ResolvedDocumentId, + component_path: &ComponentPath, table_name: &TableName, tablet_id: TabletId, ) -> anyhow::Result<()> { self.update_checkpoints(id, move |checkpoints| { - if let Some(checkpoint) = checkpoints - .iter_mut() - .find(|c| c.display_table_name == *table_name) - { + if let Some(checkpoint) = checkpoints.iter_mut().find(|c| { + c.component_path == *component_path && c.display_table_name == *table_name + }) { checkpoint.tablet_id = Some(tablet_id); } }) @@ -269,6 +270,7 @@ impl<'a, RT: Runtime> SnapshotImportModel<'a, RT> { pub async fn get_table_checkpoint( &mut self, id: ResolvedDocumentId, + component_path: &ComponentPath, display_table_name: &TableName, ) -> anyhow::Result> { let Some(import) = self.get(id).await? 
 else {
             return Ok(None);
         };
@@ -279,7 +281,9 @@ impl<'a, RT: Runtime> SnapshotImportModel<'a, RT> {
         };
         Ok(checkpoints
             .iter()
-            .find(|c| c.display_table_name == *display_table_name)
+            .find(|c| {
+                c.component_path == *component_path && c.display_table_name == *display_table_name
+            })
             .cloned())
     }
 
@@ -287,16 +291,16 @@ impl<'a, RT: Runtime> SnapshotImportModel<'a, RT> {
         &mut self,
         id: ResolvedDocumentId,
         checkpoint_message: String,
+        component_path: &ComponentPath,
         display_table_name: &TableName,
         num_rows_written: i64,
     ) -> anyhow::Result<()> {
         let mut noop = false;
         let noop_ = &mut noop;
         self.update_checkpoints(id, move |checkpoints| {
-            if let Some(checkpoint) = checkpoints
-                .iter_mut()
-                .find(|c| c.display_table_name == *display_table_name)
-            {
+            if let Some(checkpoint) = checkpoints.iter_mut().find(|c| {
+                c.component_path == *component_path && c.display_table_name == *display_table_name
+            }) {
                 if num_rows_written <= checkpoint.num_rows_written {
                     *noop_ = true;
                     return;
@@ -333,16 +337,16 @@ impl<'a, RT: Runtime> SnapshotImportModel<'a, RT> {
         &mut self,
         id: ResolvedDocumentId,
         progress_message: String,
+        component_path: &ComponentPath,
         display_table_name: &TableName,
         num_rows_written: i64,
     ) -> anyhow::Result<()> {
         let mut noop = false;
         let noop_ = &mut noop;
         self.update_checkpoints(id, move |checkpoints| {
-            if let Some(checkpoint) = checkpoints
-                .iter_mut()
-                .find(|c| c.display_table_name == *display_table_name)
-            {
+            if let Some(checkpoint) = checkpoints.iter_mut().find(|c| {
+                c.component_path == *component_path && c.display_table_name == *display_table_name
+            }) {
                 if checkpoint.num_rows_written > 0
                     && num_rows_written <= checkpoint.num_rows_written
                 {
diff --git a/crates/model/src/snapshot_imports/types.rs b/crates/model/src/snapshot_imports/types.rs
index d78fdc0e..d09f7ba3 100644
--- a/crates/model/src/snapshot_imports/types.rs
+++ b/crates/model/src/snapshot_imports/types.rs
@@ -1,7 +1,10 @@
-use common::types::{
-    MemberId,
-    ObjectKey,
-    TableName,
+use common::{
+    components::ComponentPath,
+    types::{
+        MemberId,
+        ObjectKey,
+        TableName,
+    },
 };
 use serde::{
     Deserialize,
@@ -291,6 +294,7 @@ mod import_state_serde {
 #[derive(Debug, Clone, Eq, PartialEq)]
 #[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))]
 pub struct ImportTableCheckpoint {
+    pub component_path: ComponentPath,
     pub display_table_name: TableName,
     pub tablet_id: Option<TabletId>,
     pub total_num_rows_to_write: i64,
@@ -312,6 +316,7 @@ pub struct ImportTableCheckpoint {
 #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
 pub struct SerializedImportTableCheckpoint {
+    pub component_path: Option<String>,
     pub display_table_name: String,
     pub tablet_id: Option<String>,
     pub total_num_rows_to_write: i64,
@@ -326,6 +331,7 @@ impl TryFrom<ImportTableCheckpoint> for SerializedImportTableCheckpoint {
     fn try_from(checkpoint: ImportTableCheckpoint) -> anyhow::Result {
         Ok(SerializedImportTableCheckpoint {
+            component_path: checkpoint.component_path.serialize(),
             display_table_name: checkpoint.display_table_name.to_string(),
             tablet_id: checkpoint.tablet_id.map(|table| table.to_string()),
             total_num_rows_to_write: checkpoint.total_num_rows_to_write,
@@ -342,6 +348,7 @@ impl TryFrom<SerializedImportTableCheckpoint> for ImportTableCheckpoint {
     fn try_from(checkpoint: SerializedImportTableCheckpoint) -> anyhow::Result {
         Ok(ImportTableCheckpoint {
+            component_path: ComponentPath::deserialize(checkpoint.component_path.as_deref())?,
             display_table_name: checkpoint.display_table_name.parse()?,
             tablet_id: checkpoint
                 .tablet_id
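One detail worth noting in the types.rs changes: on the wire, `component_path` is an `Option<String>`, so checkpoint documents written before this patch, which lack the field entirely, still deserialize, and the `None` case presumably maps back to the root component via `ComponentPath::deserialize`. A sketch of that back-compat pattern with a trimmed-down struct (assumes `serde` and `serde_json` as dependencies):

    use serde::Deserialize;
    use serde_json::json;

    // Trimmed-down stand-in for SerializedImportTableCheckpoint.
    #[derive(Deserialize)]
    struct Checkpoint {
        component_path: Option<String>, // serde defaults a missing Option field to None
        display_table_name: String,
    }

    fn main() -> Result<(), serde_json::Error> {
        // A checkpoint written before component_path existed still parses;
        // None is taken to mean the root component (assumed semantics).
        let old: Checkpoint = serde_json::from_value(json!({
            "display_table_name": "users",
        }))?;
        assert_eq!(old.component_path, None);

        // A checkpoint recorded inside a child component.
        let new: Checkpoint = serde_json::from_value(json!({
            "component_path": "waitlist",
            "display_table_name": "users",
        }))?;
        assert_eq!(new.component_path.as_deref(), Some("waitlist"));
        println!("{} / {}", old.display_table_name, new.display_table_name);
        Ok(())
    }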