diff --git a/Cargo.lock b/Cargo.lock index 96cc211e0ce..faeb7e308b2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2964,6 +2964,33 @@ dependencies = [ "uuid", ] +[[package]] +name = "influxdb3_processing_engine" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "data_types", + "datafusion_util", + "hashbrown 0.15.2", + "influxdb3_cache", + "influxdb3_catalog", + "influxdb3_client", + "influxdb3_internal_api", + "influxdb3_py_api", + "influxdb3_wal", + "influxdb3_write", + "iox_query", + "iox_time", + "metric", + "object_store", + "observability_deps", + "parquet_file", + "pyo3", + "thiserror 1.0.69", + "tokio", +] + [[package]] name = "influxdb3_py_api" version = "0.1.0" @@ -2972,12 +2999,12 @@ dependencies = [ "arrow-schema", "futures", "influxdb3_catalog", + "influxdb3_id", "influxdb3_internal_api", "influxdb3_wal", "iox_query_params", "parking_lot", "pyo3", - "schema", "tokio", ] @@ -3014,6 +3041,7 @@ dependencies = [ "influxdb3_id", "influxdb3_internal_api", "influxdb3_process", + "influxdb3_processing_engine", "influxdb3_sys_events", "influxdb3_telemetry", "influxdb3_wal", diff --git a/Cargo.toml b/Cargo.toml index 0f6d697ff44..2b97362fa86 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ members = [ "influxdb3_internal_api", "influxdb3_load_generator", "influxdb3_process", + "influxdb3_processing_engine", "influxdb3_py_api", "influxdb3_server", "influxdb3_telemetry", diff --git a/influxdb3/src/commands/create.rs b/influxdb3/src/commands/create.rs index 50970c8c7a6..ec71fa5be5e 100644 --- a/influxdb3/src/commands/create.rs +++ b/influxdb3/src/commands/create.rs @@ -206,10 +206,7 @@ pub struct PluginConfig { /// Python file containing the plugin code #[clap(long = "code-filename")] code_file: String, - /// Entry point function for the plugin - #[clap(long = "entry-point")] - function_name: String, - /// Type of trigger the plugin processes + /// Type of trigger the plugin processes. Options: wal_rows, scheduled #[clap(long = "plugin-type", default_value = "wal_rows")] plugin_type: String, /// Name of the plugin to create @@ -335,7 +332,6 @@ pub async fn command(config: Config) -> Result<(), Box> { influxdb3_config: InfluxDb3Config { database_name, .. 
}, plugin_name, code_file, - function_name, plugin_type, }) => { let code = fs::read_to_string(&code_file)?; @@ -344,7 +340,6 @@ pub async fn command(config: Config) -> Result<(), Box> { database_name, &plugin_name, code, - function_name, plugin_type, ) .await?; diff --git a/influxdb3/src/commands/serve.rs b/influxdb3/src/commands/serve.rs index 7da27da9e54..a2e6e4c838d 100644 --- a/influxdb3/src/commands/serve.rs +++ b/influxdb3/src/commands/serve.rs @@ -503,7 +503,6 @@ pub async fn command(config: Config) -> Result<()> { wal_config, parquet_cache, metric_registry: Arc::clone(&metrics), - plugin_dir: config.plugin_dir, }) .await .map_err(|e| Error::WriteBufferInit(e.into()))?; @@ -532,6 +531,7 @@ pub async fn command(config: Config) -> Result<()> { trace_exporter, trace_header_parser, Arc::clone(&telemetry_store), + config.plugin_dir, )?; let query_executor = Arc::new(QueryExecutorImpl::new(CreateQueryExecutorArgs { diff --git a/influxdb3/tests/server/cli.rs b/influxdb3/tests/server/cli.rs index 5baeec728cf..d4530f6744d 100644 --- a/influxdb3/tests/server/cli.rs +++ b/influxdb3/tests/server/cli.rs @@ -467,8 +467,6 @@ def process_rows(iterator, output): &server_addr, "--code-filename", plugin_file.path().to_str().unwrap(), - "--entry-point", - "process_rows", plugin_name, ]); debug!(result = ?result, "create plugin"); @@ -501,8 +499,6 @@ def process_rows(iterator, output): &server_addr, "--code-filename", plugin_file.path().to_str().unwrap(), - "--entry-point", - "process_rows", plugin_name, ]); @@ -547,8 +543,6 @@ def process_rows(iterator, output): &server_addr, "--code-filename", plugin_file.path().to_str().unwrap(), - "--entry-point", - "process_rows", plugin_name, ]); @@ -597,8 +591,6 @@ def process_rows(iterator, output): &server_addr, "--code-filename", plugin_file.path().to_str().unwrap(), - "--entry-point", - "process_rows", plugin_name, ]); @@ -670,8 +662,6 @@ def process_rows(iterator, output): &server_addr, "--code-filename", plugin_file.path().to_str().unwrap(), - "--entry-point", - "process_rows", plugin_name, ]); @@ -769,8 +759,6 @@ def process_rows(iterator, output): &server_addr, "--code-filename", plugin_file.path().to_str().unwrap(), - "--entry-point", - "process_rows", plugin_name, ]); diff --git a/influxdb3_catalog/src/catalog.rs b/influxdb3_catalog/src/catalog.rs index e93c57a50a4..8ea1df2a693 100644 --- a/influxdb3_catalog/src/catalog.rs +++ b/influxdb3_catalog/src/catalog.rs @@ -209,7 +209,7 @@ impl Catalog { pub fn apply_catalog_batch( &self, - catalog_batch: CatalogBatch, + catalog_batch: &CatalogBatch, ) -> Result> { self.inner.write().apply_catalog_batch(catalog_batch) } @@ -217,11 +217,11 @@ impl Catalog { // Checks the sequence number to see if it needs to be applied. pub fn apply_ordered_catalog_batch( &self, - batch: OrderedCatalogBatch, + batch: &OrderedCatalogBatch, ) -> Result> { if batch.sequence_number() >= self.sequence_number().as_u32() { if let Some(catalog_batch) = self.apply_catalog_batch(batch.batch())? { - return Ok(Some(catalog_batch.batch())); + return Ok(Some(catalog_batch.batch().clone())); } } Ok(None) @@ -456,12 +456,12 @@ impl InnerCatalog { /// have already been applied, the sequence number and updated tracker are not updated. pub fn apply_catalog_batch( &mut self, - catalog_batch: CatalogBatch, + catalog_batch: &CatalogBatch, ) -> Result> { let table_count = self.table_count(); if let Some(db) = self.databases.get(&catalog_batch.database_id) { - if let Some(new_db) = DatabaseSchema::new_if_updated_from_batch(db, &catalog_batch)? 
{ + if let Some(new_db) = DatabaseSchema::new_if_updated_from_batch(db, catalog_batch)? { check_overall_table_count(Some(db), &new_db, table_count)?; self.upsert_db(new_db); } else { @@ -471,12 +471,12 @@ impl InnerCatalog { if self.database_count() >= Catalog::NUM_DBS_LIMIT { return Err(Error::TooManyDbs); } - let new_db = DatabaseSchema::new_from_batch(&catalog_batch)?; + let new_db = DatabaseSchema::new_from_batch(catalog_batch)?; check_overall_table_count(None, &new_db, table_count)?; self.upsert_db(new_db); } Ok(Some(OrderedCatalogBatch::new( - catalog_batch, + catalog_batch.clone(), self.sequence.0, ))) } @@ -1847,7 +1847,7 @@ mod tests { )], ); let err = catalog - .apply_catalog_batch(catalog_batch) + .apply_catalog_batch(&catalog_batch) .expect_err("should fail to apply AddFields operation for non-existent table"); assert_contains!(err.to_string(), "Table banana not in DB schema for foo"); } @@ -1968,10 +1968,10 @@ mod tests { })], }; let create_ordered_op = catalog - .apply_catalog_batch(create_op)? + .apply_catalog_batch(&create_op)? .expect("should be able to create"); let add_column_op = catalog - .apply_catalog_batch(add_column_op)? + .apply_catalog_batch(&add_column_op)? .expect("should produce operation"); let mut ops = vec![ WalOp::Catalog(add_column_op), @@ -2010,7 +2010,7 @@ mod tests { let db_id = DbId::new(); let db_name = format!("test-db-{db_id}"); catalog - .apply_catalog_batch(influxdb3_wal::create::catalog_batch( + .apply_catalog_batch(&influxdb3_wal::create::catalog_batch( db_id, db_name.as_str(), 0, @@ -2043,7 +2043,7 @@ mod tests { let db_id = DbId::new(); let db_name = "a-database-too-far"; catalog - .apply_catalog_batch(influxdb3_wal::create::catalog_batch( + .apply_catalog_batch(&influxdb3_wal::create::catalog_batch( db_id, db_name, 0, @@ -2071,7 +2071,7 @@ mod tests { // now delete a database: let (db_id, db_name) = dbs.pop().unwrap(); catalog - .apply_catalog_batch(influxdb3_wal::create::catalog_batch( + .apply_catalog_batch(&influxdb3_wal::create::catalog_batch( db_id, db_name.as_str(), 1, @@ -2087,7 +2087,7 @@ mod tests { // now create another database (using same name as the deleted one), this should be allowed: let db_id = DbId::new(); catalog - .apply_catalog_batch(influxdb3_wal::create::catalog_batch( + .apply_catalog_batch(&influxdb3_wal::create::catalog_batch( db_id, db_name.as_str(), 0, @@ -2123,7 +2123,7 @@ mod tests { let db_id = DbId::new(); let db_name = "test-db"; catalog - .apply_catalog_batch(influxdb3_wal::create::catalog_batch( + .apply_catalog_batch(&influxdb3_wal::create::catalog_batch( db_id, db_name, 0, @@ -2138,7 +2138,7 @@ mod tests { let table_id = TableId::new(); let table_name = Arc::::from(format!("test-table-{i}").as_str()); catalog - .apply_catalog_batch(influxdb3_wal::create::catalog_batch( + .apply_catalog_batch(&influxdb3_wal::create::catalog_batch( db_id, db_name, 0, @@ -2170,7 +2170,7 @@ mod tests { let table_id = TableId::new(); let table_name = Arc::::from("a-table-too-far"); catalog - .apply_catalog_batch(influxdb3_wal::create::catalog_batch( + .apply_catalog_batch(&influxdb3_wal::create::catalog_batch( db_id, db_name, 0, @@ -2198,7 +2198,7 @@ mod tests { // delete a table let (table_id, table_name) = tables.pop().unwrap(); catalog - .apply_catalog_batch(influxdb3_wal::create::catalog_batch( + .apply_catalog_batch(&influxdb3_wal::create::catalog_batch( db_id, db_name, 0, @@ -2214,7 +2214,7 @@ mod tests { assert_eq!(1_999, catalog.inner.read().table_count()); // now create it again, this should be allowed: catalog - 
.apply_catalog_batch(influxdb3_wal::create::catalog_batch( + .apply_catalog_batch(&influxdb3_wal::create::catalog_batch( db_id, db_name, 0, diff --git a/influxdb3_catalog/src/serialize.rs b/influxdb3_catalog/src/serialize.rs index a6b6fbcfb3d..9ea18758df0 100644 --- a/influxdb3_catalog/src/serialize.rs +++ b/influxdb3_catalog/src/serialize.rs @@ -106,6 +106,7 @@ impl From for DatabaseSchema { plugin, trigger: serde_json::from_str(&trigger.trigger_specification).unwrap(), disabled: trigger.disabled, + database_name: trigger.database_name, }, ) }) @@ -163,7 +164,6 @@ struct TableSnapshot { struct ProcessingEnginePluginSnapshot { pub plugin_name: String, pub code: String, - pub function_name: String, pub plugin_type: PluginType, } @@ -171,6 +171,7 @@ struct ProcessingEnginePluginSnapshot { struct ProcessingEngineTriggerSnapshot { pub trigger_name: String, pub plugin_name: String, + pub database_name: String, pub trigger_specification: String, pub disabled: bool, } @@ -412,7 +413,6 @@ impl From<&PluginDefinition> for ProcessingEnginePluginSnapshot { Self { plugin_name: plugin.plugin_name.to_string(), code: plugin.code.to_string(), - function_name: plugin.function_name.to_string(), plugin_type: plugin.plugin_type, } } @@ -423,7 +423,6 @@ impl From for PluginDefinition { Self { plugin_name: plugin.plugin_name.to_string(), code: plugin.code.to_string(), - function_name: plugin.function_name.to_string(), plugin_type: plugin.plugin_type, } } @@ -434,6 +433,7 @@ impl From<&TriggerDefinition> for ProcessingEngineTriggerSnapshot { ProcessingEngineTriggerSnapshot { trigger_name: trigger.trigger_name.to_string(), plugin_name: trigger.plugin_name.to_string(), + database_name: trigger.database_name.to_string(), trigger_specification: serde_json::to_string(&trigger.trigger) .expect("should be able to serialize trigger specification"), disabled: trigger.disabled, diff --git a/influxdb3_client/src/lib.rs b/influxdb3_client/src/lib.rs index f65691ea59a..518cc25d5b5 100644 --- a/influxdb3_client/src/lib.rs +++ b/influxdb3_client/src/lib.rs @@ -484,7 +484,6 @@ impl Client { db: impl Into + Send, plugin_name: impl Into + Send, code: impl Into + Send, - function_name: impl Into + Send, plugin_type: impl Into + Send, ) -> Result<()> { let api_path = "/api/v3/configure/processing_engine_plugin"; @@ -496,7 +495,6 @@ impl Client { db: String, plugin_name: String, code: String, - function_name: String, plugin_type: String, } @@ -504,7 +502,6 @@ impl Client { db: db.into(), plugin_name: plugin_name.into(), code: code.into(), - function_name: function_name.into(), plugin_type: plugin_type.into(), }); diff --git a/influxdb3_processing_engine/Cargo.toml b/influxdb3_processing_engine/Cargo.toml new file mode 100644 index 00000000000..37eb8f26b9f --- /dev/null +++ b/influxdb3_processing_engine/Cargo.toml @@ -0,0 +1,42 @@ +[package] +name = "influxdb3_processing_engine" +version.workspace = true +authors.workspace = true +edition.workspace = true +license.workspace = true + +[features] +"system-py" = ["influxdb3_py_api/system-py", "pyo3"] + +[dependencies] +anyhow.workspace = true +async-trait.workspace = true +data_types.workspace = true +hashbrown.workspace = true +iox_time.workspace = true +influxdb3_catalog = { path = "../influxdb3_catalog" } +influxdb3_client = { path = "../influxdb3_client" } +influxdb3_internal_api = { path = "../influxdb3_internal_api" } +influxdb3_py_api = { path = "../influxdb3_py_api" } +influxdb3_wal = { path = "../influxdb3_wal" } +influxdb3_write = { path = "../influxdb3_write" } 
+observability_deps.workspace = true +thiserror.workspace = true +tokio.workspace = true + +[dependencies.pyo3] +version = "0.23.3" +# this is necessary to automatically initialize the Python interpreter +features = ["auto-initialize"] +optional = true + +[dev-dependencies] +datafusion_util.workspace = true +iox_query.workspace = true +influxdb3_cache = { path = "../influxdb3_cache" } +metric.workspace = true +object_store.workspace = true +parquet_file.workspace = true + +[lints] +workspace = true diff --git a/influxdb3_processing_engine/src/lib.rs b/influxdb3_processing_engine/src/lib.rs new file mode 100644 index 00000000000..50f069a04ec --- /dev/null +++ b/influxdb3_processing_engine/src/lib.rs @@ -0,0 +1,854 @@ +use crate::manager::{ProcessingEngineError, ProcessingEngineManager}; +#[cfg(feature = "system-py")] +use crate::plugins::PluginContext; +use anyhow::Context; +use hashbrown::HashMap; +use influxdb3_catalog::catalog; +use influxdb3_catalog::catalog::Catalog; +use influxdb3_catalog::catalog::Error::ProcessingEngineTriggerNotFound; +use influxdb3_client::plugin_development::{WalPluginTestRequest, WalPluginTestResponse}; +use influxdb3_internal_api::query_executor::QueryExecutor; +use influxdb3_wal::{ + CatalogBatch, CatalogOp, DeletePluginDefinition, DeleteTriggerDefinition, PluginDefinition, + PluginType, TriggerDefinition, TriggerIdentifier, TriggerSpecificationDefinition, Wal, + WalContents, WalOp, +}; +use influxdb3_write::WriteBuffer; +use iox_time::TimeProvider; +use std::sync::Arc; +use tokio::sync::{mpsc, oneshot, Mutex}; + +pub mod manager; +pub mod plugins; + +#[derive(Debug)] +pub struct ProcessingEngineManagerImpl { + plugin_dir: Option, + catalog: Arc, + _write_buffer: Arc, + _query_executor: Arc, + time_provider: Arc, + wal: Arc, + plugin_event_tx: Mutex, +} + +#[derive(Debug, Default)] +struct PluginChannels { + /// Map of database to trigger name to sender + active_triggers: HashMap>>, +} + +#[cfg(feature = "system-py")] +const PLUGIN_EVENT_BUFFER_SIZE: usize = 60; + +impl PluginChannels { + async fn shutdown(&mut self, db: String, trigger: String) { + // create a one shot to wait for the shutdown to complete + let (tx, rx) = oneshot::channel(); + if let Some(trigger_map) = self.active_triggers.get_mut(&db) { + if let Some(sender) = trigger_map.remove(&trigger) { + if sender.send(PluginEvent::Shutdown(tx)).await.is_ok() { + rx.await.ok(); + } + } + } + } + + #[cfg(feature = "system-py")] + fn add_trigger(&mut self, db: String, trigger: String) -> mpsc::Receiver { + let (tx, rx) = mpsc::channel(PLUGIN_EVENT_BUFFER_SIZE); + self.active_triggers + .entry(db) + .or_default() + .insert(trigger, tx); + rx + } +} + +impl ProcessingEngineManagerImpl { + pub fn new( + plugin_dir: Option, + catalog: Arc, + write_buffer: Arc, + query_executor: Arc, + time_provider: Arc, + wal: Arc, + ) -> Self { + Self { + plugin_dir, + catalog, + _write_buffer: write_buffer, + _query_executor: query_executor, + time_provider, + wal, + plugin_event_tx: Default::default(), + } + } + + pub fn read_plugin_code(&self, name: &str) -> Result { + let plugin_dir = self.plugin_dir.clone().context("plugin dir not set")?; + let path = plugin_dir.join(name); + Ok(std::fs::read_to_string(path)?) 
+ } +} + +#[async_trait::async_trait] +impl ProcessingEngineManager for ProcessingEngineManagerImpl { + async fn insert_plugin( + &self, + db: &str, + plugin_name: String, + code: String, + plugin_type: PluginType, + ) -> Result<(), ProcessingEngineError> { + let (db_id, db_schema) = self + .catalog + .db_id_and_schema(db) + .ok_or_else(|| ProcessingEngineError::DatabaseNotFound(db.to_string()))?; + + let catalog_op = CatalogOp::CreatePlugin(PluginDefinition { + plugin_name, + code, + plugin_type, + }); + + let creation_time = self.time_provider.now(); + let catalog_batch = CatalogBatch { + time_ns: creation_time.timestamp_nanos(), + database_id: db_id, + database_name: Arc::clone(&db_schema.name), + ops: vec![catalog_op], + }; + if let Some(catalog_batch) = self.catalog.apply_catalog_batch(&catalog_batch)? { + let wal_op = WalOp::Catalog(catalog_batch); + self.wal.write_ops(vec![wal_op]).await?; + } + Ok(()) + } + + async fn delete_plugin( + &self, + db: &str, + plugin_name: &str, + ) -> Result<(), ProcessingEngineError> { + let (db_id, db_schema) = self + .catalog + .db_id_and_schema(db) + .ok_or_else(|| ProcessingEngineError::DatabaseNotFound(db.to_string()))?; + let catalog_op = CatalogOp::DeletePlugin(DeletePluginDefinition { + plugin_name: plugin_name.to_string(), + }); + let catalog_batch = CatalogBatch { + time_ns: self.time_provider.now().timestamp_nanos(), + database_id: db_id, + database_name: Arc::clone(&db_schema.name), + ops: vec![catalog_op], + }; + + if let Some(catalog_batch) = self.catalog.apply_catalog_batch(&catalog_batch)? { + self.wal + .write_ops(vec![WalOp::Catalog(catalog_batch)]) + .await?; + } + Ok(()) + } + + async fn insert_trigger( + &self, + db_name: &str, + trigger_name: String, + plugin_name: String, + trigger_specification: TriggerSpecificationDefinition, + disabled: bool, + ) -> Result<(), ProcessingEngineError> { + let Some((db_id, db_schema)) = self.catalog.db_id_and_schema(db_name) else { + return Err(ProcessingEngineError::DatabaseNotFound(db_name.to_string())); + }; + let plugin = db_schema + .processing_engine_plugins + .get(&plugin_name) + .ok_or_else(|| catalog::Error::ProcessingEnginePluginNotFound { + plugin_name: plugin_name.to_string(), + database_name: db_schema.name.to_string(), + })?; + let catalog_op = CatalogOp::CreateTrigger(TriggerDefinition { + trigger_name, + plugin_name, + plugin: plugin.clone(), + trigger: trigger_specification, + disabled, + database_name: db_name.to_string(), + }); + let creation_time = self.time_provider.now(); + let catalog_batch = CatalogBatch { + time_ns: creation_time.timestamp_nanos(), + database_id: db_id, + database_name: Arc::clone(&db_schema.name), + ops: vec![catalog_op], + }; + if let Some(catalog_batch) = self.catalog.apply_catalog_batch(&catalog_batch)? { + let wal_op = WalOp::Catalog(catalog_batch); + self.wal.write_ops(vec![wal_op]).await?; + } + Ok(()) + } + + async fn delete_trigger( + &self, + db: &str, + trigger_name: &str, + force: bool, + ) -> Result<(), ProcessingEngineError> { + let (db_id, db_schema) = self + .catalog + .db_id_and_schema(db) + .ok_or_else(|| ProcessingEngineError::DatabaseNotFound(db.to_string()))?; + let catalog_op = CatalogOp::DeleteTrigger(DeleteTriggerDefinition { + trigger_name: trigger_name.to_string(), + force, + }); + let catalog_batch = CatalogBatch { + time_ns: self.time_provider.now().timestamp_nanos(), + database_id: db_id, + database_name: Arc::clone(&db_schema.name), + ops: vec![catalog_op], + }; + + // Do this first to avoid a dangling running plugin. 
+ // Potential edge-case of a plugin being stopped but not deleted, + // but should be okay given desire to force delete. + let needs_deactivate = force + && db_schema + .processing_engine_triggers + .get(trigger_name) + .is_some_and(|trigger| !trigger.disabled); + + if needs_deactivate { + self.deactivate_trigger(db, trigger_name).await?; + } + + if let Some(catalog_batch) = self.catalog.apply_catalog_batch(&catalog_batch)? { + self.wal + .write_ops(vec![WalOp::Catalog(catalog_batch)]) + .await?; + } + Ok(()) + } + + #[cfg_attr(not(feature = "system-py"), allow(unused))] + async fn run_trigger( + &self, + write_buffer: Arc, + query_executor: Arc, + db_name: &str, + trigger_name: &str, + ) -> Result<(), ProcessingEngineError> { + #[cfg(feature = "system-py")] + { + let db_schema = self + .catalog + .db_schema(db_name) + .ok_or_else(|| ProcessingEngineError::DatabaseNotFound(db_name.to_string()))?; + let trigger = db_schema + .processing_engine_triggers + .get(trigger_name) + .ok_or_else(|| ProcessingEngineTriggerNotFound { + database_name: db_name.to_string(), + trigger_name: trigger_name.to_string(), + })? + .clone(); + + let trigger_rx = self + .plugin_event_tx + .lock() + .await + .add_trigger(db_name.to_string(), trigger_name.to_string()); + + let plugin_context = PluginContext { + trigger_rx, + write_buffer, + query_executor, + }; + plugins::run_plugin(db_name.to_string(), trigger, plugin_context); + } + + Ok(()) + } + + async fn deactivate_trigger( + &self, + db_name: &str, + trigger_name: &str, + ) -> Result<(), ProcessingEngineError> { + let (db_id, db_schema) = self + .catalog + .db_id_and_schema(db_name) + .ok_or_else(|| ProcessingEngineError::DatabaseNotFound(db_name.to_string()))?; + let trigger = db_schema + .processing_engine_triggers + .get(trigger_name) + .ok_or_else(|| ProcessingEngineTriggerNotFound { + database_name: db_name.to_string(), + trigger_name: trigger_name.to_string(), + })?; + // Already disabled, so this is a no-op + if trigger.disabled { + return Ok(()); + }; + + let catalog_op = CatalogOp::DisableTrigger(TriggerIdentifier { + db_name: db_name.to_string(), + trigger_name: trigger_name.to_string(), + }); + if let Some(catalog_batch) = self.catalog.apply_catalog_batch(&CatalogBatch { + database_id: db_id, + database_name: Arc::clone(&db_schema.name), + time_ns: self.time_provider.now().timestamp_nanos(), + ops: vec![catalog_op], + })? 
{ + let wal_op = WalOp::Catalog(catalog_batch); + self.wal.write_ops(vec![wal_op]).await?; + } + + let mut plugin_channels = self.plugin_event_tx.lock().await; + plugin_channels + .shutdown(db_name.to_string(), trigger_name.to_string()) + .await; + + Ok(()) + } + + async fn activate_trigger( + &self, + write_buffer: Arc, + query_executor: Arc, + db_name: &str, + trigger_name: &str, + ) -> Result<(), ProcessingEngineError> { + let (db_id, db_schema) = self + .catalog + .db_id_and_schema(db_name) + .ok_or_else(|| ProcessingEngineError::DatabaseNotFound(db_name.to_string()))?; + let trigger = db_schema + .processing_engine_triggers + .get(trigger_name) + .ok_or_else(|| ProcessingEngineTriggerNotFound { + database_name: db_name.to_string(), + trigger_name: trigger_name.to_string(), + })?; + // Already enabled, so this is a no-op + if !trigger.disabled { + return Ok(()); + }; + + let catalog_op = CatalogOp::EnableTrigger(TriggerIdentifier { + db_name: db_name.to_string(), + trigger_name: trigger_name.to_string(), + }); + if let Some(catalog_batch) = self.catalog.apply_catalog_batch(&CatalogBatch { + database_id: db_id, + database_name: Arc::clone(&db_schema.name), + time_ns: self.time_provider.now().timestamp_nanos(), + ops: vec![catalog_op], + })? { + let wal_op = WalOp::Catalog(catalog_batch); + self.wal.write_ops(vec![wal_op]).await?; + } + + self.run_trigger(write_buffer, query_executor, db_name, trigger_name) + .await?; + Ok(()) + } + + #[cfg_attr(not(feature = "system-py"), allow(unused))] + async fn test_wal_plugin( + &self, + request: WalPluginTestRequest, + query_executor: Arc, + ) -> Result { + #[cfg(feature = "system-py")] + { + // create a copy of the catalog so we don't modify the original + let catalog = Arc::new(Catalog::from_inner(self.catalog.clone_inner())); + let now = self.time_provider.now(); + + let code = self.read_plugin_code(&request.filename)?; + + let res = plugins::run_test_wal_plugin(now, catalog, query_executor, code, request) + .unwrap_or_else(|e| WalPluginTestResponse { + log_lines: vec![], + database_writes: Default::default(), + errors: vec![e.to_string()], + }); + + return Ok(res); + } + + #[cfg(not(feature = "system-py"))] + Err(plugins::Error::AnyhowError(anyhow::anyhow!( + "system-py feature not enabled" + ))) + } +} + +#[allow(unused)] +pub(crate) enum PluginEvent { + WriteWalContents(Arc), + Shutdown(oneshot::Sender<()>), +} + +#[cfg(test)] +mod tests { + use crate::manager::{ProcessingEngineError, ProcessingEngineManager}; + use crate::ProcessingEngineManagerImpl; + use data_types::NamespaceName; + use datafusion_util::config::register_iox_object_store; + use influxdb3_cache::last_cache::LastCacheProvider; + use influxdb3_cache::meta_cache::MetaCacheProvider; + use influxdb3_catalog::catalog; + use influxdb3_internal_api::query_executor::UnimplementedQueryExecutor; + use influxdb3_wal::{ + Gen1Duration, PluginDefinition, PluginType, TriggerSpecificationDefinition, WalConfig, + }; + use influxdb3_write::persister::Persister; + use influxdb3_write::write_buffer::{WriteBufferImpl, WriteBufferImplArgs}; + use influxdb3_write::{Precision, WriteBuffer}; + use iox_query::exec::{DedicatedExecutor, Executor, ExecutorConfig, IOxSessionContext}; + use iox_time::{MockProvider, Time, TimeProvider}; + use metric::Registry; + use object_store::memory::InMemory; + use object_store::ObjectStore; + use parquet_file::storage::{ParquetStorage, StorageId}; + use std::num::NonZeroUsize; + use std::sync::Arc; + use std::time::Duration; + + #[tokio::test] + async fn 
test_create_plugin() -> influxdb3_write::write_buffer::Result<()> { + let start_time = Time::from_rfc3339("2024-11-14T11:00:00+00:00").unwrap(); + let test_store = Arc::new(InMemory::new()); + let wal_config = WalConfig { + gen1_duration: Gen1Duration::new_1m(), + max_write_buffer_size: 100, + flush_interval: Duration::from_millis(10), + snapshot_size: 1, + }; + let pem = setup(start_time, test_store, wal_config).await; + + pem._write_buffer + .write_lp( + NamespaceName::new("foo").unwrap(), + "cpu,warehouse=us-east,room=01a,device=10001 reading=37\n", + start_time, + false, + Precision::Nanosecond, + ) + .await?; + + let empty_udf = r#"def example(iterator, output): + return"#; + + pem.insert_plugin( + "foo", + "my_plugin".to_string(), + empty_udf.to_string(), + PluginType::WalRows, + ) + .await + .unwrap(); + + let plugin = pem + .catalog + .db_schema("foo") + .expect("should have db named foo") + .processing_engine_plugins + .get("my_plugin") + .unwrap() + .clone(); + let expected = PluginDefinition { + plugin_name: "my_plugin".to_string(), + code: empty_udf.to_string(), + plugin_type: PluginType::WalRows, + }; + assert_eq!(expected, plugin); + + // confirm that creating it again is a no-op. + pem.insert_plugin( + "foo", + "my_plugin".to_string(), + empty_udf.to_string(), + PluginType::WalRows, + ) + .await + .unwrap(); + + // Confirm the same contents can be added to a new name. + pem.insert_plugin( + "foo", + "my_second_plugin".to_string(), + empty_udf.to_string(), + PluginType::WalRows, + ) + .await + .unwrap(); + Ok(()) + } + #[tokio::test] + async fn test_delete_plugin() -> influxdb3_write::write_buffer::Result<()> { + let start_time = Time::from_rfc3339("2024-11-14T11:00:00+00:00").unwrap(); + let test_store = Arc::new(InMemory::new()); + let wal_config = WalConfig { + gen1_duration: Gen1Duration::new_1m(), + max_write_buffer_size: 100, + flush_interval: Duration::from_millis(10), + snapshot_size: 1, + }; + let pem = setup(start_time, test_store, wal_config).await; + + // Create the DB by inserting a line. + pem._write_buffer + .write_lp( + NamespaceName::new("foo").unwrap(), + "cpu,warehouse=us-east,room=01a,device=10001 reading=37\n", + start_time, + false, + Precision::Nanosecond, + ) + .await?; + + // First create a plugin + pem.insert_plugin( + "foo", + "test_plugin".to_string(), + "def process(iterator, output): pass".to_string(), + PluginType::WalRows, + ) + .await + .unwrap(); + + // Then delete it + pem.delete_plugin("foo", "test_plugin").await.unwrap(); + + // Verify plugin is gone from schema + let schema = pem.catalog.db_schema("foo").unwrap(); + assert!(!schema.processing_engine_plugins.contains_key("test_plugin")); + + // Verify we can add a newly named plugin + pem.insert_plugin( + "foo", + "test_plugin".to_string(), + "def new_process(iterator, output): pass".to_string(), + PluginType::WalRows, + ) + .await + .unwrap(); + + Ok(()) + } + + #[tokio::test] + async fn test_delete_plugin_with_active_trigger() -> influxdb3_write::write_buffer::Result<()> { + let start_time = Time::from_rfc3339("2024-11-14T11:00:00+00:00").unwrap(); + let test_store = Arc::new(InMemory::new()); + let wal_config = WalConfig { + gen1_duration: Gen1Duration::new_1m(), + max_write_buffer_size: 100, + flush_interval: Duration::from_millis(10), + snapshot_size: 1, + }; + let pem = setup(start_time, test_store, wal_config).await; + + // Create the DB by inserting a line. 
+ pem._write_buffer + .write_lp( + NamespaceName::new("foo").unwrap(), + "cpu,warehouse=us-east,room=01a,device=10001 reading=37\n", + start_time, + false, + Precision::Nanosecond, + ) + .await?; + + // Create a plugin + pem.insert_plugin( + "foo", + "test_plugin".to_string(), + "def process(iterator, output): pass".to_string(), + PluginType::WalRows, + ) + .await + .unwrap(); + + // Create a trigger using the plugin + pem.insert_trigger( + "foo", + "test_trigger".to_string(), + "test_plugin".to_string(), + TriggerSpecificationDefinition::AllTablesWalWrite, + false, + ) + .await + .unwrap(); + + // Try to delete the plugin - should fail because trigger exists + let result = pem.delete_plugin("foo", "test_plugin").await; + assert!(matches!( + result, + Err(ProcessingEngineError::CatalogUpdateError(catalog::Error::ProcessingEnginePluginInUse { + database_name, + plugin_name, + trigger_name, + })) if database_name == "foo" && plugin_name == "test_plugin" && trigger_name == "test_trigger" + )); + Ok(()) + } + + #[tokio::test] + async fn test_trigger_lifecycle() -> influxdb3_write::write_buffer::Result<()> { + let start_time = Time::from_rfc3339("2024-11-14T11:00:00+00:00").unwrap(); + let test_store = Arc::new(InMemory::new()); + let wal_config = WalConfig { + gen1_duration: Gen1Duration::new_1m(), + max_write_buffer_size: 100, + flush_interval: Duration::from_millis(10), + snapshot_size: 1, + }; + let pem = setup(start_time, test_store, wal_config).await; + + // convert to Arc + let write_buffer: Arc = Arc::clone(&pem._write_buffer); + + // Create the DB by inserting a line. + write_buffer + .write_lp( + NamespaceName::new("foo").unwrap(), + "cpu,warehouse=us-east,room=01a,device=10001 reading=37\n", + start_time, + false, + Precision::Nanosecond, + ) + .await?; + + // Create a plugin + pem.insert_plugin( + "foo", + "test_plugin".to_string(), + "def process(iterator, output): pass".to_string(), + PluginType::WalRows, + ) + .await + .unwrap(); + + // Create an enabled trigger + pem.insert_trigger( + "foo", + "test_trigger".to_string(), + "test_plugin".to_string(), + TriggerSpecificationDefinition::AllTablesWalWrite, + false, + ) + .await + .unwrap(); + // Run the trigger + pem.run_trigger( + Arc::clone(&write_buffer), + Arc::clone(&pem._query_executor), + "foo", + "test_trigger", + ) + .await + .unwrap(); + + // Deactivate the trigger + let result = pem.deactivate_trigger("foo", "test_trigger").await; + assert!(result.is_ok()); + + // Verify trigger is disabled in schema + let schema = write_buffer.catalog().db_schema("foo").unwrap(); + let trigger = schema + .processing_engine_triggers + .get("test_trigger") + .unwrap(); + assert!(trigger.disabled); + + // Activate the trigger + let result = pem + .activate_trigger( + Arc::clone(&write_buffer), + Arc::clone(&pem._query_executor), + "foo", + "test_trigger", + ) + .await; + assert!(result.is_ok()); + + // Verify trigger is enabled and running + let schema = write_buffer.catalog().db_schema("foo").unwrap(); + let trigger = schema + .processing_engine_triggers + .get("test_trigger") + .unwrap(); + assert!(!trigger.disabled); + Ok(()) + } + + #[tokio::test] + async fn test_create_disabled_trigger() -> influxdb3_write::write_buffer::Result<()> { + let start_time = Time::from_rfc3339("2024-11-14T11:00:00+00:00").unwrap(); + let test_store = Arc::new(InMemory::new()); + let wal_config = WalConfig { + gen1_duration: Gen1Duration::new_1m(), + max_write_buffer_size: 100, + flush_interval: Duration::from_millis(10), + snapshot_size: 1, + }; + let pem = 
setup(start_time, test_store, wal_config).await; + + // Create the DB by inserting a line. + pem._write_buffer + .write_lp( + NamespaceName::new("foo").unwrap(), + "cpu,warehouse=us-east,room=01a,device=10001 reading=37\n", + start_time, + false, + Precision::Nanosecond, + ) + .await?; + + // Create a plugin + pem.insert_plugin( + "foo", + "test_plugin".to_string(), + "def process(iterator, output): pass".to_string(), + PluginType::WalRows, + ) + .await + .unwrap(); + + // Create a disabled trigger + pem.insert_trigger( + "foo", + "test_trigger".to_string(), + "test_plugin".to_string(), + TriggerSpecificationDefinition::AllTablesWalWrite, + true, + ) + .await + .unwrap(); + + // Verify trigger is created but disabled + let schema = pem.catalog.db_schema("foo").unwrap(); + let trigger = schema + .processing_engine_triggers + .get("test_trigger") + .unwrap(); + assert!(trigger.disabled); + + // Verify trigger is not in active triggers list + assert!(pem.catalog.triggers().is_empty()); + Ok(()) + } + + #[tokio::test] + async fn test_activate_nonexistent_trigger() -> influxdb3_write::write_buffer::Result<()> { + let start_time = Time::from_rfc3339("2024-11-14T11:00:00+00:00").unwrap(); + let test_store = Arc::new(InMemory::new()); + let wal_config = WalConfig { + gen1_duration: Gen1Duration::new_1m(), + max_write_buffer_size: 100, + flush_interval: Duration::from_millis(10), + snapshot_size: 1, + }; + let pem = setup(start_time, test_store, wal_config).await; + + let write_buffer: Arc = Arc::clone(&pem._write_buffer); + + // Create the DB by inserting a line. + write_buffer + .write_lp( + NamespaceName::new("foo").unwrap(), + "cpu,warehouse=us-east,room=01a,device=10001 reading=37\n", + start_time, + false, + Precision::Nanosecond, + ) + .await?; + + let result = pem + .activate_trigger( + Arc::clone(&write_buffer), + Arc::clone(&pem._query_executor), + "foo", + "nonexistent_trigger", + ) + .await; + + assert!(matches!( + result, + Err(ProcessingEngineError::CatalogUpdateError(catalog::Error::ProcessingEngineTriggerNotFound { + database_name, + trigger_name, + })) if database_name == "foo" && trigger_name == "nonexistent_trigger" + )); + Ok(()) + } + + async fn setup( + start: Time, + object_store: Arc, + wal_config: WalConfig, + ) -> ProcessingEngineManagerImpl { + let time_provider: Arc = Arc::new(MockProvider::new(start)); + let metric_registry = Arc::new(Registry::new()); + let persister = Arc::new(Persister::new(Arc::clone(&object_store), "test_host")); + let catalog = Arc::new(persister.load_or_create_catalog().await.unwrap()); + let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _).unwrap(); + let meta_cache = + MetaCacheProvider::new_from_catalog(Arc::clone(&time_provider), Arc::clone(&catalog)) + .unwrap(); + let wbuf = WriteBufferImpl::new(WriteBufferImplArgs { + persister, + catalog: Arc::clone(&catalog), + last_cache, + meta_cache, + time_provider: Arc::clone(&time_provider), + executor: make_exec(), + wal_config, + parquet_cache: None, + metric_registry: Arc::clone(&metric_registry), + }) + .await + .unwrap(); + let ctx = IOxSessionContext::with_testing(); + let runtime_env = ctx.inner().runtime_env(); + register_iox_object_store(runtime_env, "influxdb3", Arc::clone(&object_store)); + + let qe = Arc::new(UnimplementedQueryExecutor); + let wal = wbuf.wal(); + + ProcessingEngineManagerImpl::new(None, catalog, wbuf, qe, time_provider, wal) + } + + pub(crate) fn make_exec() -> Arc { + let metrics = Arc::new(metric::Registry::default()); + let object_store: 
Arc = Arc::new(InMemory::new()); + + let parquet_store = ParquetStorage::new( + Arc::clone(&object_store), + StorageId::from("test_exec_storage"), + ); + Arc::new(Executor::new_with_config_and_executor( + ExecutorConfig { + target_query_partitions: NonZeroUsize::new(1).unwrap(), + object_stores: [&parquet_store] + .into_iter() + .map(|store| (store.id(), Arc::clone(store.object_store()))) + .collect(), + metric_registry: Arc::clone(&metrics), + // Default to 1gb + mem_pool_size: 1024 * 1024 * 1024, // 1024 (b/kb) * 1024 (kb/mb) * 1024 (mb/gb) + }, + DedicatedExecutor::new_testing(), + )) + } +} diff --git a/influxdb3_processing_engine/src/manager.rs b/influxdb3_processing_engine/src/manager.rs new file mode 100644 index 00000000000..ee3087a891b --- /dev/null +++ b/influxdb3_processing_engine/src/manager.rs @@ -0,0 +1,85 @@ +use influxdb3_client::plugin_development::{WalPluginTestRequest, WalPluginTestResponse}; +use influxdb3_internal_api::query_executor::QueryExecutor; +use influxdb3_wal::{PluginType, TriggerSpecificationDefinition}; +use influxdb3_write::WriteBuffer; +use std::fmt::Debug; +use std::sync::Arc; +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum ProcessingEngineError { + #[error("database not found: {0}")] + DatabaseNotFound(String), + + #[error("catalog update error: {0}")] + CatalogUpdateError(#[from] influxdb3_catalog::catalog::Error), + + #[error("write buffer error: {0}")] + WriteBufferError(#[from] influxdb3_write::write_buffer::Error), + + #[error("wal error: {0}")] + WalError(#[from] influxdb3_wal::Error), +} + +/// `[ProcessingEngineManager]` is used to interact with the processing engine, +/// in particular plugins and triggers. +/// +#[async_trait::async_trait] +pub trait ProcessingEngineManager: Debug + Send + Sync + 'static { + /// Inserts a plugin + async fn insert_plugin( + &self, + db: &str, + plugin_name: String, + code: String, + plugin_type: PluginType, + ) -> Result<(), ProcessingEngineError>; + + async fn delete_plugin(&self, db: &str, plugin_name: &str) + -> Result<(), ProcessingEngineError>; + + async fn insert_trigger( + &self, + db_name: &str, + trigger_name: String, + plugin_name: String, + trigger_specification: TriggerSpecificationDefinition, + disabled: bool, + ) -> Result<(), ProcessingEngineError>; + + async fn delete_trigger( + &self, + db_name: &str, + trigger_name: &str, + force: bool, + ) -> Result<(), ProcessingEngineError>; + + /// Starts running the trigger, which will run in the background. 
+ async fn run_trigger( + &self, + write_buffer: Arc, + query_executor: Arc, + db_name: &str, + trigger_name: &str, + ) -> Result<(), ProcessingEngineError>; + + async fn deactivate_trigger( + &self, + db_name: &str, + trigger_name: &str, + ) -> Result<(), ProcessingEngineError>; + + async fn activate_trigger( + &self, + write_buffer: Arc, + query_executor: Arc, + db_name: &str, + trigger_name: &str, + ) -> Result<(), ProcessingEngineError>; + + async fn test_wal_plugin( + &self, + request: WalPluginTestRequest, + query_executor: Arc, + ) -> Result; +} diff --git a/influxdb3_write/src/write_buffer/plugins.rs b/influxdb3_processing_engine/src/plugins.rs similarity index 71% rename from influxdb3_write/src/write_buffer/plugins.rs rename to influxdb3_processing_engine/src/plugins.rs index ed9c571d75d..06f2818aeda 100644 --- a/influxdb3_write/src/write_buffer/plugins.rs +++ b/influxdb3_processing_engine/src/plugins.rs @@ -1,11 +1,22 @@ -use crate::write_buffer::{plugins, PluginEvent}; -use crate::{write_buffer, WriteBuffer}; +#[cfg(feature = "system-py")] +use crate::PluginEvent; +#[cfg(feature = "system-py")] use influxdb3_client::plugin_development::{WalPluginTestRequest, WalPluginTestResponse}; +#[cfg(feature = "system-py")] use influxdb3_internal_api::query_executor::QueryExecutor; -use influxdb3_wal::{PluginType, TriggerDefinition, TriggerSpecificationDefinition}; +#[cfg(feature = "system-py")] +use influxdb3_wal::TriggerDefinition; +#[cfg(feature = "system-py")] +use influxdb3_wal::TriggerSpecificationDefinition; +use influxdb3_write::write_buffer; +#[cfg(feature = "system-py")] +use influxdb3_write::WriteBuffer; +use observability_deps::tracing::error; use std::fmt::Debug; +#[cfg(feature = "system-py")] use std::sync::Arc; use thiserror::Error; +#[cfg(feature = "system-py")] use tokio::sync::mpsc; #[derive(Debug, Error)] @@ -33,176 +44,148 @@ pub enum Error { ReadPluginError(#[from] std::io::Error), } -/// `[ProcessingEngineManager]` is used to interact with the processing engine, -/// in particular plugins and triggers. -/// -#[async_trait::async_trait] -pub trait ProcessingEngineManager: Debug + Send + Sync + 'static { - /// Inserts a plugin - async fn insert_plugin( - &self, - db: &str, - plugin_name: String, - code: String, - function_name: String, - plugin_type: PluginType, - ) -> crate::Result<(), write_buffer::Error>; - - async fn delete_plugin( - &self, - db: &str, - plugin_name: &str, - ) -> crate::Result<(), write_buffer::Error>; - - async fn insert_trigger( - &self, - db_name: &str, - trigger_name: String, - plugin_name: String, - trigger_specification: TriggerSpecificationDefinition, - disabled: bool, - ) -> crate::Result<(), write_buffer::Error>; - - async fn delete_trigger( - &self, - db_name: &str, - trigger_name: &str, - force: bool, - ) -> crate::Result<(), write_buffer::Error>; - - /// Starts running the trigger, which will run in the background. 
- async fn run_trigger( - &self, - write_buffer: Arc, - db_name: &str, - trigger_name: &str, - ) -> crate::Result<(), write_buffer::Error>; - - async fn deactivate_trigger( - &self, - db_name: &str, - trigger_name: &str, - ) -> Result<(), write_buffer::Error>; - - async fn activate_trigger( - &self, - write_buffer: Arc, - db_name: &str, - trigger_name: &str, - ) -> Result<(), write_buffer::Error>; - - async fn test_wal_plugin( - &self, - request: WalPluginTestRequest, - query_executor: Arc, - ) -> crate::Result; -} - #[cfg(feature = "system-py")] pub(crate) fn run_plugin( db_name: String, trigger_definition: TriggerDefinition, - mut context: PluginContext, + context: PluginContext, ) { let trigger_plugin = TriggerPlugin { trigger_definition, db_name, + write_buffer: context.write_buffer, + query_executor: context.query_executor, }; tokio::task::spawn(async move { trigger_plugin - .run_plugin(&mut context) + .run_plugin(context.trigger_rx) .await .expect("trigger plugin failed"); }); } +#[cfg(feature = "system-py")] pub(crate) struct PluginContext { // tokio channel for inputs pub(crate) trigger_rx: mpsc::Receiver, // handler to write data back to the DB. pub(crate) write_buffer: Arc, + // query executor to hand off to the plugin + pub(crate) query_executor: Arc, } +#[cfg(feature = "system-py")] #[async_trait::async_trait] trait RunnablePlugin { // Returns true if it should exit - async fn process_event( + async fn process_event(&self, event: PluginEvent) -> Result; + async fn run_plugin( &self, - event: PluginEvent, - write_buffer: Arc, - ) -> Result; - async fn run_plugin(&self, context: &mut PluginContext) -> Result<(), Error> { - while let Some(event) = context.trigger_rx.recv().await { - if self - .process_event(event, context.write_buffer.clone()) - .await? 
- { - break; - } - } - Ok(()) - } + receiver: tokio::sync::mpsc::Receiver, + ) -> Result<(), Error>; } + +#[cfg(feature = "system-py")] #[derive(Debug)] struct TriggerPlugin { trigger_definition: TriggerDefinition, db_name: String, + write_buffer: Arc, + query_executor: Arc, } #[cfg(feature = "system-py")] mod python_plugin { use super::*; - use crate::Precision; + use anyhow::Context; use data_types::NamespaceName; - use influxdb3_py_api::system_py::PyWriteBatch; + use hashbrown::HashMap; + use influxdb3_py_api::system_py::execute_python_with_batch; use influxdb3_wal::WalOp; + use influxdb3_write::Precision; use iox_time::Time; + use observability_deps::tracing::warn; use std::time::SystemTime; + use tokio::sync::mpsc::Receiver; #[async_trait::async_trait] impl RunnablePlugin for TriggerPlugin { - async fn process_event( - &self, - event: PluginEvent, - write_buffer: Arc, - ) -> Result { - let Some(schema) = write_buffer.catalog().db_schema(self.db_name.as_str()) else { + async fn run_plugin(&self, mut receiver: Receiver) -> Result<(), Error> { + loop { + let event = match receiver.recv().await { + Some(event) => event, + None => { + warn!(?self.trigger_definition, "trigger plugin receiver closed"); + break; + } + }; + + match self.process_event(event).await { + Ok(stop) => { + if stop { + break; + } + } + Err(e) => { + error!(?self.trigger_definition, "error processing event: {}", e); + } + } + } + + Ok(()) + } + async fn process_event(&self, event: PluginEvent) -> Result { + let Some(schema) = self.write_buffer.catalog().db_schema(self.db_name.as_str()) else { return Err(Error::MissingDb); }; - let mut output_lines = Vec::new(); + let mut db_writes: HashMap> = HashMap::new(); match event { PluginEvent::WriteWalContents(wal_contents) => { for wal_op in &wal_contents.ops { match wal_op { WalOp::Write(write_batch) => { - let py_write_batch = PyWriteBatch { - // TODO: don't clone the write batch - write_batch: write_batch.clone(), - schema: Arc::clone(&schema), - }; - match &self.trigger_definition.trigger { + // determine if this write batch is for this database + if write_batch.database_name.as_ref() + != self.trigger_definition.database_name + { + continue; + } + let table_filter = match &self.trigger_definition.trigger { + TriggerSpecificationDefinition::AllTablesWalWrite => { + // no filter + None + } TriggerSpecificationDefinition::SingleTableWalWrite { table_name, } => { - output_lines.extend(py_write_batch.call_against_table( - table_name, - &self.trigger_definition.plugin.code, - &self.trigger_definition.plugin.function_name, - )?); - } - TriggerSpecificationDefinition::AllTablesWalWrite => { - for table in schema.table_map.right_values() { - output_lines.extend( - py_write_batch.call_against_table( - table.as_ref(), - &self.trigger_definition.plugin.code, - &self.trigger_definition.plugin.function_name, - )?, - ); - } + let table_id = schema + .table_name_to_id(table_name.as_ref()) + .context("table not found")?; + Some(table_id) } + }; + + let result = execute_python_with_batch( + &self.trigger_definition.plugin.code, + write_batch, + Arc::clone(&schema), + Arc::clone(&self.query_executor), + table_filter, + None, + )?; + + // write the output lines to the appropriate database + if !result.write_back_lines.is_empty() { + let lines = + db_writes.entry_ref(schema.name.as_ref()).or_default(); + lines.extend(result.write_back_lines); + } + + for (db_name, add_lines) in result.write_db_lines { + let lines = db_writes.entry(db_name).or_default(); + lines.extend(add_lines); } } 
WalOp::Catalog(_) => {} @@ -215,19 +198,22 @@ mod python_plugin { return Ok(true); } } - if !output_lines.is_empty() { - let ingest_time = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap(); - write_buffer - .write_lp( - NamespaceName::new(self.db_name.to_string()).unwrap(), - output_lines.join("\n").as_str(), - Time::from_timestamp_nanos(ingest_time.as_nanos() as i64), - false, - Precision::Nanosecond, - ) - .await?; + + if !db_writes.is_empty() { + for (db_name, output_lines) in db_writes { + let ingest_time = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap(); + self.write_buffer + .write_lp( + NamespaceName::new(db_name).unwrap(), + output_lines.join("\n").as_str(), + Time::from_timestamp_nanos(ingest_time.as_nanos() as i64), + false, + Precision::Nanosecond, + ) + .await?; + } } Ok(false) @@ -243,10 +229,10 @@ pub(crate) fn run_test_wal_plugin( code: String, request: WalPluginTestRequest, ) -> Result { - use crate::write_buffer::validator::WriteValidator; - use crate::Precision; use data_types::NamespaceName; use influxdb3_wal::Gen1Duration; + use influxdb3_write::write_buffer::validator::WriteValidator; + use influxdb3_write::Precision; let database = request.database; let namespace = NamespaceName::new(database.clone()) @@ -271,6 +257,7 @@ pub(crate) fn run_test_wal_plugin( &data.valid_data, db, query_executor, + None, request.input_arguments, )?; @@ -358,11 +345,11 @@ pub(crate) fn run_test_wal_plugin( #[cfg(test)] mod tests { use super::*; - use crate::write_buffer::validator::WriteValidator; - use crate::Precision; use data_types::NamespaceName; use influxdb3_catalog::catalog::Catalog; use influxdb3_internal_api::query_executor::UnimplementedQueryExecutor; + use influxdb3_write::write_buffer::validator::WriteValidator; + use influxdb3_write::Precision; use iox_time::Time; use std::collections::HashMap; diff --git a/influxdb3_py_api/Cargo.toml b/influxdb3_py_api/Cargo.toml index aab94b4c36c..72e2d12c83c 100644 --- a/influxdb3_py_api/Cargo.toml +++ b/influxdb3_py_api/Cargo.toml @@ -11,11 +11,11 @@ system-py = ["pyo3"] [dependencies] arrow-array.workspace = true arrow-schema.workspace = true +influxdb3_id = { path = "../influxdb3_id" } influxdb3_wal = { path = "../influxdb3_wal" } influxdb3_catalog = {path = "../influxdb3_catalog"} influxdb3_internal_api = { path = "../influxdb3_internal_api" } iox_query_params.workspace = true -schema.workspace = true parking_lot.workspace = true futures.workspace = true tokio.workspace = true diff --git a/influxdb3_py_api/src/system_py.rs b/influxdb3_py_api/src/system_py.rs index 1bc87c2e885..1267807a5f8 100644 --- a/influxdb3_py_api/src/system_py.rs +++ b/influxdb3_py_api/src/system_py.rs @@ -5,197 +5,22 @@ use arrow_array::{ }; use arrow_schema::DataType; use futures::TryStreamExt; -use influxdb3_catalog::catalog::{DatabaseSchema, TableDefinition}; +use influxdb3_catalog::catalog::DatabaseSchema; +use influxdb3_id::TableId; use influxdb3_internal_api::query_executor::{QueryExecutor, QueryKind}; -use influxdb3_wal::{FieldData, Row, WriteBatch}; +use influxdb3_wal::{FieldData, WriteBatch}; use iox_query_params::StatementParams; use parking_lot::Mutex; use pyo3::exceptions::PyValueError; -use pyo3::prelude::{PyAnyMethods, PyModule, PyModuleMethods}; +use pyo3::prelude::{PyAnyMethods, PyModule}; use pyo3::types::{PyDict, PyList}; use pyo3::{ - pyclass, pymethods, pymodule, Bound, IntoPyObject, Py, PyAny, PyErr, PyObject, PyResult, Python, + pyclass, pymethods, pymodule, Bound, IntoPyObject, Py, PyAny, 
PyObject, PyResult, Python, }; -use schema::InfluxColumnType; use std::collections::HashMap; use std::ffi::CString; use std::sync::Arc; -#[pyclass] -#[derive(Debug)] -pub struct PyWriteBatchIterator { - table_definition: Arc, - rows: Vec, - current_index: usize, -} - -#[pymethods] -impl PyWriteBatchIterator { - fn next_point(&mut self) -> PyResult> { - if self.current_index >= self.rows.len() { - return Ok(None); - } - - Python::with_gil(|py| { - let row = &self.rows[self.current_index]; - self.current_index += 1; - - // Import Point class - let point_class = py - .import("influxdb_client_3.write_client.client.write.point")? - .getattr("Point") - .unwrap(); - - // Create new Point instance with measurement name (table name) - let point = point_class.call1((self.table_definition.table_name.as_ref(),))?; - - // Set timestamp - point.call_method1("time", (row.time,))?; - - // Add fields based on column definitions and field data - for field in &row.fields { - if let Some(col_def) = self.table_definition.columns.get(&field.id) { - let field_name = col_def.name.as_ref(); - - match col_def.data_type { - InfluxColumnType::Tag => { - let FieldData::Tag(tag) = &field.value else { - // error out because we expect a tag - return Err(PyValueError::new_err(format!( - "expect FieldData:Tag for tagged columns, not ${:?}", - field - ))); - }; - point.call_method1("tag", (field_name, tag.as_str()))?; - } - InfluxColumnType::Timestamp => {} - InfluxColumnType::Field(_) => { - match &field.value { - FieldData::String(s) => { - point.call_method1("field", (field_name, s.as_str()))? - } - FieldData::Integer(i) => { - point.call_method1("field", (field_name, *i))? - } - FieldData::UInteger(u) => { - point.call_method1("field", (field_name, *u))? - } - FieldData::Float(f) => { - point.call_method1("field", (field_name, *f))? - } - FieldData::Boolean(b) => { - point.call_method1("field", (field_name, *b))? - } - FieldData::Tag(t) => { - point.call_method1("field", (field_name, t.as_str()))? - } - FieldData::Key(k) => { - point.call_method1("field", (field_name, k.as_str()))? - } - FieldData::Timestamp(ts) => { - point.call_method1("field", (field_name, *ts))? - } - }; - } - } - } - } - - Ok(Some(point.into_pyobject(py)?.unbind())) - }) - } -} - -#[pyclass] -#[derive(Debug)] -pub struct PyWriteBatch { - pub write_batch: WriteBatch, - pub schema: Arc, -} - -#[pymethods] -impl PyWriteBatch { - fn get_iterator_for_table(&self, table_name: &str) -> PyResult> { - // Find table ID from name - let table_id = self - .schema - .table_map - .get_by_right(&Arc::from(table_name)) - .ok_or_else(|| { - PyErr::new::(format!("Table '{}' not found", table_name)) - })?; - - // Get table chunks - let Some(chunks) = self.write_batch.table_chunks.get(table_id) else { - return Ok(None); - }; - - // Get table definition - let table_def = self.schema.tables.get(table_id).ok_or_else(|| { - PyErr::new::(format!( - "Table definition not found for '{}'", - table_name - )) - })?; - - Ok(Some(PyWriteBatchIterator { - table_definition: Arc::clone(table_def), - // TODO: avoid copying all the data at once. 
- rows: chunks - .chunk_time_to_chunk - .values() - .flat_map(|chunk| chunk.rows.clone()) - .collect(), - current_index: 0, - })) - } -} - -#[derive(Debug)] -#[pyclass] -pub struct PyLineProtocolOutput { - lines: Arc>>, -} - -#[pymethods] -impl PyLineProtocolOutput { - fn insert_line_protocol(&mut self, line: &str) -> PyResult<()> { - let mut lines = self.lines.lock(); - lines.push(line.to_string()); - Ok(()) - } -} - -impl PyWriteBatch { - pub fn call_against_table( - &self, - table_name: &str, - setup_code: &str, - call_site: &str, - ) -> PyResult> { - let Some(iterator) = self.get_iterator_for_table(table_name)? else { - return Ok(Vec::new()); - }; - - Python::with_gil(|py| { - py.run(&CString::new(setup_code)?, None, None)?; - let py_func = py.eval(&CString::new(call_site)?, None, None)?; - - // Create the output collector with shared state - let lines = Arc::new(Mutex::new(Vec::new())); - let output = PyLineProtocolOutput { - lines: Arc::clone(&lines), - }; - - // Pass both iterator and output collector to the Python function - py_func.call1((iterator, output.into_pyobject(py)?))?; - - let output_lines = lines.lock().clone(); - Ok(output_lines) - }) - } -} - #[pyclass] #[derive(Debug)] struct PyPluginCallApi { @@ -515,6 +340,7 @@ pub fn execute_python_with_batch( write_batch: &WriteBatch, schema: Arc, query_executor: Arc, + table_filter: Option, args: Option>, ) -> PyResult { Python::with_gil(|py| { @@ -531,6 +357,11 @@ pub fn execute_python_with_batch( let mut table_batches = Vec::with_capacity(write_batch.table_chunks.len()); for (table_id, table_chunks) in &write_batch.table_chunks { + if let Some(table_filter) = table_filter { + if table_id != &table_filter { + continue; + } + } let table_def = schema.tables.get(table_id).unwrap(); let dict = PyDict::new(py); @@ -621,8 +452,6 @@ pub fn execute_python_with_batch( // Module initialization #[pymodule] -fn influxdb3_py_api(m: &Bound<'_, PyModule>) -> PyResult<()> { - m.add_class::()?; - m.add_class::()?; +fn influxdb3_py_api(_m: &Bound<'_, PyModule>) -> PyResult<()> { Ok(()) } diff --git a/influxdb3_server/Cargo.toml b/influxdb3_server/Cargo.toml index c5aca260530..a0a26b37a8d 100644 --- a/influxdb3_server/Cargo.toml +++ b/influxdb3_server/Cargo.toml @@ -36,6 +36,7 @@ influxdb3_client = { path = "../influxdb3_client" } influxdb3_id = { path = "../influxdb3_id" } influxdb3_internal_api = { path = "../influxdb3_internal_api" } influxdb3_process = { path = "../influxdb3_process", default-features = false } +influxdb3_processing_engine = { path = "../influxdb3_processing_engine" } influxdb3_wal = { path = "../influxdb3_wal"} influxdb3_write = { path = "../influxdb3_write" } iox_query_influxql_rewrite = { path = "../iox_query_influxql_rewrite" } diff --git a/influxdb3_server/src/builder.rs b/influxdb3_server/src/builder.rs index 56edfa14e17..42804822e4a 100644 --- a/influxdb3_server/src/builder.rs +++ b/influxdb3_server/src/builder.rs @@ -3,7 +3,9 @@ use std::sync::Arc; use crate::{auth::DefaultAuthorizer, http::HttpApi, CommonServerState, Server}; use authz::Authorizer; use influxdb3_internal_api::query_executor::QueryExecutor; +use influxdb3_processing_engine::ProcessingEngineManagerImpl; use influxdb3_write::{persister::Persister, WriteBuffer}; +use iox_time::TimeProvider; use tokio::net::TcpListener; #[derive(Debug)] @@ -144,17 +146,26 @@ impl ServerBuilder { } } -impl +impl ServerBuilder, WithListener> { pub fn build(self) -> Server { let persister = Arc::clone(&self.persister.0); let authorizer = Arc::clone(&self.authorizer); + let 
processing_engine = Arc::new(ProcessingEngineManagerImpl::new( + self.common_state.plugin_dir.clone(), + self.write_buffer.0.catalog(), + Arc::clone(&self.write_buffer.0), + Arc::clone(&self.query_executor.0), + Arc::clone(&self.time_provider.0) as _, + self.write_buffer.0.wal(), + )); let http = Arc::new(HttpApi::new( self.common_state.clone(), Arc::clone(&self.time_provider.0), Arc::clone(&self.write_buffer.0), Arc::clone(&self.query_executor.0), + processing_engine, self.max_request_size, Arc::clone(&authorizer), )); diff --git a/influxdb3_server/src/http.rs b/influxdb3_server/src/http.rs index 175025992bf..cc3d22bffa7 100644 --- a/influxdb3_server/src/http.rs +++ b/influxdb3_server/src/http.rs @@ -24,6 +24,7 @@ use influxdb3_cache::meta_cache::{self, CreateMetaCacheArgs, MaxAge, MaxCardinal use influxdb3_catalog::catalog::Error as CatalogError; use influxdb3_internal_api::query_executor::{QueryExecutor, QueryExecutorError, QueryKind}; use influxdb3_process::{INFLUXDB3_GIT_HASH_SHORT, INFLUXDB3_VERSION}; +use influxdb3_processing_engine::manager::ProcessingEngineManager; use influxdb3_wal::{PluginType, TriggerSpecificationDefinition}; use influxdb3_write::persister::TrackedMemoryArrowWriter; use influxdb3_write::write_buffer::Error as WriteBufferError; @@ -216,7 +217,10 @@ pub enum Error { PythonPluginsNotEnabled, #[error("Plugin error")] - Plugin(#[from] influxdb3_write::write_buffer::plugins::Error), + Plugin(#[from] influxdb3_processing_engine::plugins::Error), + + #[error("Processing engine error: {0}")] + ProcessingEngine(#[from] influxdb3_processing_engine::manager::ProcessingEngineError), } #[derive(Debug, Error)] @@ -439,6 +443,7 @@ pub type Result = std::result::Result; pub(crate) struct HttpApi { common_state: CommonServerState, write_buffer: Arc, + processing_engine: Arc, time_provider: Arc, pub(crate) query_executor: Arc, max_request_bytes: usize, @@ -452,6 +457,7 @@ impl HttpApi { time_provider: Arc, write_buffer: Arc, query_executor: Arc, + processing_engine: Arc, max_request_bytes: usize, authorizer: Arc, ) -> Self { @@ -464,6 +470,7 @@ impl HttpApi { max_request_bytes, authorizer, legacy_write_param_unifier, + processing_engine, } } } @@ -981,15 +988,14 @@ where db, plugin_name, code, - function_name, plugin_type, } = if let Some(query) = req.uri().query() { serde_urlencoded::from_str(query)? } else { self.read_body_json(req).await? }; - self.write_buffer - .insert_plugin(&db, plugin_name, code, function_name, plugin_type) + self.processing_engine + .insert_plugin(&db, plugin_name, code, plugin_type) .await?; Ok(Response::builder() @@ -1004,7 +1010,9 @@ where } else { self.read_body_json(req).await? }; - self.write_buffer.delete_plugin(&db, &plugin_name).await?; + self.processing_engine + .delete_plugin(&db, &plugin_name) + .await?; Ok(Response::builder() .status(StatusCode::OK) .body(Body::empty())?) @@ -1034,7 +1042,7 @@ where }, )); }; - self.write_buffer + self.processing_engine .insert_trigger( db.as_str(), trigger_name.clone(), @@ -1044,9 +1052,10 @@ where ) .await?; if !disabled { - self.write_buffer + self.processing_engine .run_trigger( Arc::clone(&self.write_buffer), + Arc::clone(&self.query_executor), db.as_str(), trigger_name.as_str(), ) @@ -1067,7 +1076,7 @@ where } else { self.read_body_json(req).await? 
}; - self.write_buffer + self.processing_engine .delete_trigger(&db, &trigger_name, force) .await?; Ok(Response::builder() @@ -1081,7 +1090,7 @@ where ) -> Result> { let query = req.uri().query().unwrap_or(""); let delete_req = serde_urlencoded::from_str::(query)?; - self.write_buffer + self.processing_engine .deactivate_trigger(delete_req.db.as_str(), delete_req.trigger_name.as_str()) .await?; Ok(Response::builder() @@ -1094,9 +1103,10 @@ where ) -> Result> { let query = req.uri().query().unwrap_or(""); let delete_req = serde_urlencoded::from_str::(query)?; - self.write_buffer + self.processing_engine .activate_trigger( Arc::clone(&self.write_buffer), + Arc::clone(&self.query_executor), delete_req.db.as_str(), delete_req.trigger_name.as_str(), ) @@ -1139,7 +1149,7 @@ where self.read_body_json(req).await?; let output = self - .write_buffer + .processing_engine .test_wal_plugin(request, Arc::clone(&self.query_executor)) .await?; let body = serde_json::to_string(&output)?; @@ -1579,7 +1589,6 @@ struct ProcessingEnginePluginCreateRequest { db: String, plugin_name: String, code: String, - function_name: String, plugin_type: PluginType, } diff --git a/influxdb3_server/src/lib.rs b/influxdb3_server/src/lib.rs index 60b5879623f..736bbf7ac96 100644 --- a/influxdb3_server/src/lib.rs +++ b/influxdb3_server/src/lib.rs @@ -35,6 +35,7 @@ use observability_deps::tracing::info; use service::hybrid; use std::convert::Infallible; use std::fmt::Debug; +use std::path::PathBuf; use std::sync::Arc; use thiserror::Error; use tokio::net::TcpListener; @@ -78,6 +79,7 @@ pub struct CommonServerState { trace_exporter: Option>, trace_header_parser: TraceHeaderParser, telemetry_store: Arc, + plugin_dir: Option, } impl CommonServerState { @@ -86,12 +88,14 @@ impl CommonServerState { trace_exporter: Option>, trace_header_parser: TraceHeaderParser, telemetry_store: Arc, + plugin_dir: Option, ) -> Result { Ok(Self { metrics, trace_exporter, trace_header_parser, telemetry_store, + plugin_dir, }) } @@ -774,7 +778,6 @@ mod tests { wal_config: WalConfig::test_config(), parquet_cache: Some(parquet_cache), metric_registry: Arc::clone(&metrics), - plugin_dir: None, }, ) .await @@ -793,6 +796,7 @@ mod tests { None, trace_header_parser, Arc::clone(&sample_telem_store), + None, ) .unwrap(); let query_executor = QueryExecutorImpl::new(CreateQueryExecutorArgs { diff --git a/influxdb3_server/src/query_executor/mod.rs b/influxdb3_server/src/query_executor/mod.rs index 05e3f7fb5af..26032a8f981 100644 --- a/influxdb3_server/src/query_executor/mod.rs +++ b/influxdb3_server/src/query_executor/mod.rs @@ -713,7 +713,6 @@ mod tests { }, parquet_cache: Some(parquet_cache), metric_registry: Default::default(), - plugin_dir: None, }) .await .unwrap(); diff --git a/influxdb3_server/src/system_tables/python_call.rs b/influxdb3_server/src/system_tables/python_call.rs index 9ae58292044..fa1d6e43996 100644 --- a/influxdb3_server/src/system_tables/python_call.rs +++ b/influxdb3_server/src/system_tables/python_call.rs @@ -17,7 +17,6 @@ pub(super) struct ProcessingEnginePluginTable { fn plugin_schema() -> SchemaRef { let columns = vec![ Field::new("plugin_name", DataType::Utf8, false), - Field::new("function_name", DataType::Utf8, false), Field::new("code", DataType::Utf8, false), Field::new("plugin_type", DataType::Utf8, false), ]; @@ -51,12 +50,6 @@ impl IoxSystemTable for ProcessingEnginePluginTable { .map(|call| Some(call.plugin_name.clone())) .collect::(), ), - Arc::new( - self.plugins - .iter() - .map(|p| Some(p.function_name.clone())) - 
.collect::(), - ), Arc::new( self.plugins .iter() diff --git a/influxdb3_wal/src/lib.rs b/influxdb3_wal/src/lib.rs index 80073e615d8..a906ca4db08 100644 --- a/influxdb3_wal/src/lib.rs +++ b/influxdb3_wal/src/lib.rs @@ -120,13 +120,13 @@ pub trait Wal: Debug + Send + Sync + 'static { #[async_trait] pub trait WalFileNotifier: Debug + Send + Sync + 'static { /// Notify the handler that a new WAL file has been persisted with the given contents. - async fn notify(&self, write: WalContents); + async fn notify(&self, write: Arc); /// Notify the handler that a new WAL file has been persisted with the given contents and tell /// it to snapshot the data. The returned receiver will be signalled when the snapshot is complete. async fn notify_and_snapshot( &self, - write: WalContents, + write: Arc, snapshot_details: SnapshotDetails, ) -> oneshot::Receiver; @@ -311,8 +311,8 @@ impl OrderedCatalogBatch { self.database_sequence_number } - pub fn batch(self) -> CatalogBatch { - self.catalog + pub fn batch(&self) -> &CatalogBatch { + &self.catalog } } @@ -614,7 +614,6 @@ pub struct MetaCacheDelete { pub struct PluginDefinition { pub plugin_name: String, pub code: String, - pub function_name: String, pub plugin_type: PluginType, } @@ -633,6 +632,7 @@ pub enum PluginType { pub struct TriggerDefinition { pub trigger_name: String, pub plugin_name: String, + pub database_name: String, pub trigger: TriggerSpecificationDefinition, // TODO: decide whether this should be populated from a reference rather than stored on its own. pub plugin: PluginDefinition, diff --git a/influxdb3_wal/src/object_store.rs b/influxdb3_wal/src/object_store.rs index 6efd79e1bcc..0d80dca5362 100644 --- a/influxdb3_wal/src/object_store.rs +++ b/influxdb3_wal/src/object_store.rs @@ -145,7 +145,7 @@ impl WalObjectStore { match wal_contents.snapshot { // This branch uses so much time - None => self.file_notifier.notify(wal_contents).await, + None => self.file_notifier.notify(Arc::new(wal_contents)).await, Some(snapshot_details) => { let snapshot_info = { let mut buffer = self.flush_buffer.lock().await; @@ -162,11 +162,11 @@ impl WalObjectStore { if snapshot_details.snapshot_sequence_number <= last_snapshot_sequence_number { // Instead just notify about the WAL, as this snapshot has already been taken // and WAL files may have been cleared. 
- self.file_notifier.notify(wal_contents).await; + self.file_notifier.notify(Arc::new(wal_contents)).await; } else { let snapshot_done = self .file_notifier - .notify_and_snapshot(wal_contents, snapshot_details) + .notify_and_snapshot(Arc::new(wal_contents), snapshot_details) .await; let details = snapshot_done.await.unwrap(); assert_eq!(snapshot_details, details); @@ -299,7 +299,7 @@ impl WalObjectStore { info!(?snapshot_details, "snapshotting wal"); let snapshot_done = self .file_notifier - .notify_and_snapshot(wal_contents, snapshot_details) + .notify_and_snapshot(Arc::new(wal_contents), snapshot_details) .await; let (snapshot_info, snapshot_permit) = snapshot.expect("snapshot should be set when snapshot details are set"); @@ -310,7 +310,7 @@ impl WalObjectStore { "notify sent to buffer for wal file {}", wal_contents.wal_file_number.as_u64() ); - self.file_notifier.notify(wal_contents).await; + self.file_notifier.notify(Arc::new(wal_contents)).await; None } }; @@ -966,10 +966,11 @@ mod tests { { let notified_writes = replay_notifier.notified_writes.lock(); - assert_eq!( - *notified_writes, - vec![file_1_contents.clone(), file_2_contents.clone()] - ); + let notified_refs = notified_writes + .iter() + .map(|x| x.as_ref()) + .collect::>(); + assert_eq!(notified_refs, vec![&file_1_contents, &file_2_contents]); } assert!(replay_notifier.snapshot_details.lock().is_none()); @@ -1067,8 +1068,12 @@ mod tests { { let notified_writes = notifier.notified_writes.lock(); - let expected_writes = vec![file_1_contents, file_2_contents, file_3_contents.clone()]; - assert_eq!(*notified_writes, expected_writes); + let notified_refs = notified_writes + .iter() + .map(|x| x.as_ref()) + .collect::>(); + let expected_writes = vec![&file_1_contents, &file_2_contents, &file_3_contents]; + assert_eq!(notified_refs, expected_writes); let details = notifier.snapshot_details.lock(); assert_eq!(details.unwrap(), expected_info); } @@ -1097,7 +1102,11 @@ mod tests { .downcast_ref::() .unwrap(); let notified_writes = replay_notifier.notified_writes.lock(); - assert_eq!(*notified_writes, vec![file_3_contents.clone()]); + let notified_refs = notified_writes + .iter() + .map(|x| x.as_ref()) + .collect::>(); + assert_eq!(notified_refs, vec![&file_3_contents]); let snapshot_details = replay_notifier.snapshot_details.lock(); assert_eq!(*snapshot_details, file_3_contents.snapshot); } @@ -1303,19 +1312,19 @@ mod tests { #[derive(Debug, Default)] struct TestNotifier { - notified_writes: parking_lot::Mutex>, + notified_writes: parking_lot::Mutex>>, snapshot_details: parking_lot::Mutex>, } #[async_trait] impl WalFileNotifier for TestNotifier { - async fn notify(&self, write: WalContents) { + async fn notify(&self, write: Arc) { self.notified_writes.lock().push(write); } async fn notify_and_snapshot( &self, - write: WalContents, + write: Arc, snapshot_details: SnapshotDetails, ) -> Receiver { self.notified_writes.lock().push(write); diff --git a/influxdb3_write/src/lib.rs b/influxdb3_write/src/lib.rs index 823cdb3a6dd..82b93f64ec4 100644 --- a/influxdb3_write/src/lib.rs +++ b/influxdb3_write/src/lib.rs @@ -24,8 +24,8 @@ use influxdb3_id::ParquetFileId; use influxdb3_id::SerdeVecMap; use influxdb3_id::TableId; use influxdb3_id::{ColumnId, DbId}; -use influxdb3_wal::MetaCacheDefinition; use influxdb3_wal::{LastCacheDefinition, SnapshotSequenceNumber, WalFileSequenceNumber}; +use influxdb3_wal::{MetaCacheDefinition, Wal}; use iox_query::QueryChunk; use iox_time::Time; use serde::{Deserialize, Serialize}; @@ -33,7 +33,6 @@ use 
std::fmt::Debug; use std::sync::Arc; use std::time::Duration; use thiserror::Error; -use write_buffer::plugins::ProcessingEngineManager; #[derive(Debug, Error)] pub enum Error { @@ -50,12 +49,7 @@ pub enum Error { pub type Result = std::result::Result; pub trait WriteBuffer: - Bufferer - + ChunkContainer - + MetaCacheManager - + LastCacheManager - + DatabaseManager - + ProcessingEngineManager + Bufferer + ChunkContainer + MetaCacheManager + LastCacheManager + DatabaseManager { } @@ -95,6 +89,9 @@ pub trait Bufferer: Debug + Send + Sync + 'static { /// Returns the database schema provider fn catalog(&self) -> Arc; + /// Returns the WAL this bufferer is using + fn wal(&self) -> Arc; + /// Returns the parquet files for a given database and table fn parquet_files(&self, db_id: DbId, table_id: TableId) -> Vec; diff --git a/influxdb3_write/src/persister.rs b/influxdb3_write/src/persister.rs index e1a53c2e6d4..c0153f2d7b2 100644 --- a/influxdb3_write/src/persister.rs +++ b/influxdb3_write/src/persister.rs @@ -489,7 +489,7 @@ mod tests { persister.persist_catalog(&catalog).await.unwrap(); let batch = |name: &str, num: u32| { - let _ = catalog.apply_catalog_batch(CatalogBatch { + let _ = catalog.apply_catalog_batch(&CatalogBatch { database_id: db_schema.id, database_name: Arc::clone(&db_schema.name), time_ns: 5000, diff --git a/influxdb3_write/src/write_buffer/mod.rs b/influxdb3_write/src/write_buffer/mod.rs index 26928d15fd7..914d7a92cf2 100644 --- a/influxdb3_write/src/write_buffer/mod.rs +++ b/influxdb3_write/src/write_buffer/mod.rs @@ -2,8 +2,6 @@ mod metrics; pub mod persisted_files; -#[allow(dead_code)] -pub mod plugins; pub mod queryable_buffer; mod table_buffer; pub mod validator; @@ -12,13 +10,11 @@ use crate::persister::Persister; use crate::write_buffer::persisted_files::PersistedFiles; use crate::write_buffer::queryable_buffer::QueryableBuffer; use crate::write_buffer::validator::WriteValidator; -use crate::{chunk::ParquetChunk, write_buffer, DatabaseManager}; +use crate::{chunk::ParquetChunk, DatabaseManager}; use crate::{ BufferedWriteRequest, Bufferer, ChunkContainer, LastCacheManager, MetaCacheManager, ParquetFile, PersistedSnapshot, Precision, WriteBuffer, WriteLineError, }; -#[cfg(feature = "system-py")] -use anyhow::Context; use async_trait::async_trait; use data_types::{ ChunkId, ChunkOrder, ColumnType, NamespaceName, NamespaceNameError, PartitionHashId, @@ -31,22 +27,17 @@ use datafusion::logical_expr::Expr; use influxdb3_cache::last_cache::{self, LastCacheProvider}; use influxdb3_cache::meta_cache::{self, CreateMetaCacheArgs, MetaCacheProvider}; use influxdb3_cache::parquet_cache::ParquetCacheOracle; -use influxdb3_catalog::catalog; -use influxdb3_catalog::catalog::Error::ProcessingEngineTriggerNotFound; use influxdb3_catalog::catalog::{Catalog, DatabaseSchema}; use influxdb3_id::{ColumnId, DbId, TableId}; -use influxdb3_wal::{ - object_store::WalObjectStore, DeleteDatabaseDefinition, DeleteTriggerDefinition, - PluginDefinition, PluginType, TriggerDefinition, TriggerSpecificationDefinition, WalContents, -}; +use influxdb3_wal::FieldDataType; +use influxdb3_wal::TableDefinition; +use influxdb3_wal::{object_store::WalObjectStore, DeleteDatabaseDefinition}; use influxdb3_wal::{ CatalogBatch, CatalogOp, LastCacheDefinition, LastCacheDelete, LastCacheSize, MetaCacheDefinition, MetaCacheDelete, Wal, WalConfig, WalFileNotifier, WalOp, }; use influxdb3_wal::{CatalogOp::CreateLastCache, DeleteTableDefinition}; use influxdb3_wal::{DatabaseDefinition, FieldDefinition}; -use
influxdb3_wal::{DeletePluginDefinition, TableDefinition}; -use influxdb3_wal::{FieldDataType, TriggerIdentifier}; use iox_query::chunk_statistics::{create_chunk_statistics, NoColumnRanges}; use iox_query::QueryChunk; use iox_time::{Time, TimeProvider}; @@ -56,21 +47,13 @@ use object_store::path::Path as ObjPath; use object_store::{ObjectMeta, ObjectStore}; use observability_deps::tracing::{debug, warn}; use parquet_file::storage::ParquetExecInput; -use plugins::ProcessingEngineManager; use queryable_buffer::QueryableBufferArgs; use schema::Schema; -use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; use thiserror::Error; -use tokio::sync::oneshot; use tokio::sync::watch::Receiver; -#[cfg(feature = "system-py")] -use crate::write_buffer::plugins::PluginContext; -use influxdb3_client::plugin_development::{WalPluginTestRequest, WalPluginTestResponse}; -use influxdb3_internal_api::query_executor::QueryExecutor; - #[derive(Debug, Error)] pub enum Error { #[error("parsing for line protocol failed")] @@ -163,8 +146,6 @@ pub struct WriteBufferImpl { metrics: WriteMetrics, meta_cache: Arc, last_cache: Arc, - #[allow(dead_code)] - plugin_dir: Option, } /// The maximum number of snapshots to load on start @@ -181,7 +162,6 @@ pub struct WriteBufferImplArgs { pub wal_config: WalConfig, pub parquet_cache: Option>, pub metric_registry: Arc, - pub plugin_dir: Option, } impl WriteBufferImpl { @@ -196,7 +176,6 @@ impl WriteBufferImpl { wal_config, parquet_cache, metric_registry, - plugin_dir, }: WriteBufferImplArgs, ) -> Result> { // load snapshots and replay the wal into the in memory buffer @@ -255,15 +234,7 @@ impl WriteBufferImpl { persisted_files, buffer: queryable_buffer, metrics: WriteMetrics::new(&metric_registry), - plugin_dir, }); - let write_buffer: Arc = result.clone(); - let triggers = result.catalog().triggers(); - for (db_name, trigger_name) in triggers { - result - .run_trigger(Arc::clone(&write_buffer), &db_name, &trigger_name) - .await?; - } Ok(result) } @@ -271,6 +242,10 @@ impl WriteBufferImpl { Arc::clone(&self.catalog) } + pub fn wal(&self) -> Arc { + Arc::clone(&self.wal) + } + pub fn persisted_files(&self) -> Arc { Arc::clone(&self.persisted_files) } @@ -375,13 +350,6 @@ impl WriteBufferImpl { Ok(chunks) } - - #[cfg(feature = "system-py")] - fn read_plugin_code(&self, name: &str) -> Result { - let plugin_dir = self.plugin_dir.clone().context("plugin dir not set")?; - let path = plugin_dir.join(name); - Ok(std::fs::read_to_string(path)?) - } } pub fn parquet_chunk_from_file( @@ -450,6 +418,10 @@ impl Bufferer for WriteBufferImpl { self.catalog() } + fn wal(&self) -> Arc { + Arc::clone(&self.wal) + } + fn parquet_files(&self, db_id: DbId, table_id: TableId) -> Vec { self.buffer.persisted_parquet_files(db_id, table_id) } @@ -496,7 +468,7 @@ impl MetaCacheManager for WriteBufferImpl { time_ns: self.time_provider.now().timestamp_nanos(), ops: vec![catalog_op], }; - if let Some(catalog_batch) = self.catalog.apply_catalog_batch(catalog_batch)? { + if let Some(catalog_batch) = self.catalog.apply_catalog_batch(&catalog_batch)? { self.wal .write_ops(vec![WalOp::Catalog(catalog_batch)]) .await?; @@ -526,7 +498,7 @@ impl MetaCacheManager for WriteBufferImpl { cache_name: cache_name.into(), })], }; - if let Some(catalog_batch) = catalog.apply_catalog_batch(catalog_batch)? { + if let Some(catalog_batch) = catalog.apply_catalog_batch(&catalog_batch)? 
{ self.wal .write_ops(vec![WalOp::Catalog(catalog_batch)]) .await?; @@ -592,7 +564,7 @@ impl LastCacheManager for WriteBufferImpl { database_name: Arc::clone(&db_schema.name), ops: vec![CreateLastCache(info.clone())], }; - if let Some(catalog_batch) = self.catalog.apply_catalog_batch(catalog_batch)? { + if let Some(catalog_batch) = self.catalog.apply_catalog_batch(&catalog_batch)? { self.wal .write_ops(vec![WalOp::Catalog(catalog_batch)]) .await?; @@ -627,7 +599,7 @@ impl LastCacheManager for WriteBufferImpl { // NOTE: if this fails then the cache will be gone from the running server, but will be // resurrected on server restart. - if let Some(catalog_batch) = catalog.apply_catalog_batch(catalog_batch)? { + if let Some(catalog_batch) = catalog.apply_catalog_batch(&catalog_batch)? { self.wal .write_ops(vec![WalOp::Catalog(catalog_batch)]) .await?; @@ -656,7 +628,7 @@ impl DatabaseManager for WriteBufferImpl { database_name: Arc::clone(&db_schema.name), })], }; - if let Some(catalog_batch) = self.catalog.apply_catalog_batch(catalog_batch)? { + if let Some(catalog_batch) = self.catalog.apply_catalog_batch(&catalog_batch)? { self.wal .write_ops(vec![WalOp::Catalog(catalog_batch)]) .await?; @@ -684,7 +656,7 @@ impl DatabaseManager for WriteBufferImpl { deletion_time: deletion_time.timestamp_nanos(), })], }; - if let Some(catalog_batch) = self.catalog.apply_catalog_batch(catalog_batch)? { + if let Some(catalog_batch) = self.catalog.apply_catalog_batch(&catalog_batch)? { let wal_op = WalOp::Catalog(catalog_batch); self.wal.write_ops(vec![wal_op]).await?; debug!(db_id = ?db_id, name = ?&db_schema.name, "successfully deleted database"); @@ -763,7 +735,7 @@ impl DatabaseManager for WriteBufferImpl { database_name: Arc::clone(&db_schema.name), ops: vec![CatalogOp::CreateTable(catalog_table_def)], }; - if let Some(catalog_batch) = self.catalog.apply_catalog_batch(catalog_batch)? { + if let Some(catalog_batch) = self.catalog.apply_catalog_batch(&catalog_batch)? { self.wal .write_ops(vec![WalOp::Catalog(catalog_batch)]) .await?; @@ -810,7 +782,7 @@ impl DatabaseManager for WriteBufferImpl { table_name: Arc::clone(&table_defn.table_name), })], }; - if let Some(catalog_batch) = self.catalog.apply_catalog_batch(catalog_batch)? { + if let Some(catalog_batch) = self.catalog.apply_catalog_batch(&catalog_batch)? { self.wal .write_ops(vec![WalOp::Catalog(catalog_batch)]) .await?; @@ -826,319 +798,6 @@ impl DatabaseManager for WriteBufferImpl { } } -#[async_trait::async_trait] -impl ProcessingEngineManager for WriteBufferImpl { - async fn insert_plugin( - &self, - db: &str, - plugin_name: String, - code: String, - function_name: String, - plugin_type: PluginType, - ) -> crate::Result<(), Error> { - let (db_id, db_schema) = - self.catalog - .db_id_and_schema(db) - .ok_or_else(|| self::Error::DatabaseNotFound { - db_name: db.to_owned(), - })?; - - let catalog_op = CatalogOp::CreatePlugin(PluginDefinition { - plugin_name, - code, - function_name, - plugin_type, - }); - - let creation_time = self.time_provider.now(); - let catalog_batch = CatalogBatch { - time_ns: creation_time.timestamp_nanos(), - database_id: db_id, - database_name: Arc::clone(&db_schema.name), - ops: vec![catalog_op], - }; - if let Some(catalog_batch) = self.catalog.apply_catalog_batch(catalog_batch)? 
{ - let wal_op = WalOp::Catalog(catalog_batch); - self.wal.write_ops(vec![wal_op]).await?; - } - Ok(()) - } - - async fn delete_plugin(&self, db: &str, plugin_name: &str) -> crate::Result<(), Error> { - let (db_id, db_schema) = - self.catalog - .db_id_and_schema(db) - .ok_or_else(|| Error::DatabaseNotFound { - db_name: db.to_string(), - })?; - let catalog_op = CatalogOp::DeletePlugin(DeletePluginDefinition { - plugin_name: plugin_name.to_string(), - }); - let catalog_batch = CatalogBatch { - time_ns: self.time_provider.now().timestamp_nanos(), - database_id: db_id, - database_name: Arc::clone(&db_schema.name), - ops: vec![catalog_op], - }; - - if let Some(catalog_batch) = self.catalog.apply_catalog_batch(catalog_batch)? { - self.wal - .write_ops(vec![WalOp::Catalog(catalog_batch)]) - .await?; - } - Ok(()) - } - - async fn insert_trigger( - &self, - db_name: &str, - trigger_name: String, - plugin_name: String, - trigger_specification: TriggerSpecificationDefinition, - disabled: bool, - ) -> crate::Result<(), Error> { - let Some((db_id, db_schema)) = self.catalog.db_id_and_schema(db_name) else { - return Err(Error::DatabaseNotFound { - db_name: db_name.to_owned(), - }); - }; - let plugin = db_schema - .processing_engine_plugins - .get(&plugin_name) - .ok_or_else(|| catalog::Error::ProcessingEnginePluginNotFound { - plugin_name: plugin_name.to_string(), - database_name: db_schema.name.to_string(), - })?; - let catalog_op = CatalogOp::CreateTrigger(TriggerDefinition { - trigger_name, - plugin_name, - plugin: plugin.clone(), - trigger: trigger_specification, - disabled, - }); - let creation_time = self.time_provider.now(); - let catalog_batch = CatalogBatch { - time_ns: creation_time.timestamp_nanos(), - database_id: db_id, - database_name: Arc::clone(&db_schema.name), - ops: vec![catalog_op], - }; - if let Some(catalog_batch) = self.catalog.apply_catalog_batch(catalog_batch)? { - let wal_op = WalOp::Catalog(catalog_batch); - self.wal.write_ops(vec![wal_op]).await?; - } - Ok(()) - } - - async fn delete_trigger( - &self, - db: &str, - trigger_name: &str, - force: bool, - ) -> crate::Result<(), Error> { - let (db_id, db_schema) = - self.catalog - .db_id_and_schema(db) - .ok_or_else(|| Error::DatabaseNotFound { - db_name: db.to_string(), - })?; - let catalog_op = CatalogOp::DeleteTrigger(DeleteTriggerDefinition { - trigger_name: trigger_name.to_string(), - force, - }); - let catalog_batch = CatalogBatch { - time_ns: self.time_provider.now().timestamp_nanos(), - database_id: db_id, - database_name: Arc::clone(&db_schema.name), - ops: vec![catalog_op], - }; - - // Do this first to avoid a dangling running plugin. - // Potential edge-case of a plugin being stopped but not deleted, - // but should be okay given desire to force delete. - let needs_deactivate = force - && db_schema - .processing_engine_triggers - .get(trigger_name) - .is_some_and(|trigger| !trigger.disabled); - - if needs_deactivate { - self.deactivate_trigger(db, trigger_name).await?; - } - - if let Some(catalog_batch) = self.catalog.apply_catalog_batch(catalog_batch)? 
{ - self.wal - .write_ops(vec![WalOp::Catalog(catalog_batch)]) - .await?; - } - Ok(()) - } - - #[cfg_attr(not(feature = "system-py"), allow(unused))] - async fn run_trigger( - &self, - write_buffer: Arc, - db_name: &str, - trigger_name: &str, - ) -> crate::Result<(), write_buffer::Error> { - #[cfg(feature = "system-py")] - { - let (_db_id, db_schema) = - self.catalog - .db_id_and_schema(db_name) - .ok_or_else(|| Error::DatabaseNotFound { - db_name: db_name.to_string(), - })?; - let trigger = db_schema - .processing_engine_triggers - .get(trigger_name) - .ok_or_else(|| ProcessingEngineTriggerNotFound { - database_name: db_name.to_string(), - trigger_name: trigger_name.to_string(), - })? - .clone(); - let trigger_rx = self - .buffer - .subscribe_to_plugin_events(trigger_name.to_string()) - .await; - let plugin_context = PluginContext { - trigger_rx, - write_buffer, - }; - plugins::run_plugin(db_name.to_string(), trigger, plugin_context); - } - - Ok(()) - } - - async fn deactivate_trigger( - &self, - db_name: &str, - trigger_name: &str, - ) -> std::result::Result<(), Error> { - let (db_id, db_schema) = - self.catalog - .db_id_and_schema(db_name) - .ok_or_else(|| Error::DatabaseNotFound { - db_name: db_name.to_string(), - })?; - let trigger = db_schema - .processing_engine_triggers - .get(trigger_name) - .ok_or_else(|| ProcessingEngineTriggerNotFound { - database_name: db_name.to_string(), - trigger_name: trigger_name.to_string(), - })?; - // Already disabled, so this is a no-op - if trigger.disabled { - return Ok(()); - }; - - let mut deactivated = trigger.clone(); - deactivated.disabled = true; - let catalog_op = CatalogOp::DisableTrigger(TriggerIdentifier { - db_name: db_name.to_string(), - trigger_name: trigger_name.to_string(), - }); - if let Some(catalog_batch) = self.catalog.apply_catalog_batch(CatalogBatch { - database_id: db_id, - database_name: Arc::clone(&db_schema.name), - time_ns: self.time_provider.now().timestamp_nanos(), - ops: vec![catalog_op], - })? { - let wal_op = WalOp::Catalog(catalog_batch); - self.wal.write_ops(vec![wal_op]).await?; - } - // TODO: handle processing engine errors - self.buffer - .deactivate_trigger(trigger_name.to_string()) - .await - .unwrap(); - Ok(()) - } - - async fn activate_trigger( - &self, - write_buffer: Arc, - db_name: &str, - trigger_name: &str, - ) -> std::result::Result<(), Error> { - let (db_id, db_schema) = - self.catalog - .db_id_and_schema(db_name) - .ok_or_else(|| Error::DatabaseNotFound { - db_name: db_name.to_string(), - })?; - let trigger = db_schema - .processing_engine_triggers - .get(trigger_name) - .ok_or_else(|| ProcessingEngineTriggerNotFound { - database_name: db_name.to_string(), - trigger_name: trigger_name.to_string(), - })?; - // Already disabled, so this is a no-op - if !trigger.disabled { - return Ok(()); - }; - - let mut activated = trigger.clone(); - activated.disabled = false; - let catalog_op = CatalogOp::EnableTrigger(TriggerIdentifier { - db_name: db_name.to_string(), - trigger_name: trigger_name.to_string(), - }); - if let Some(catalog_batch) = self.catalog.apply_catalog_batch(CatalogBatch { - database_id: db_id, - database_name: Arc::clone(&db_schema.name), - time_ns: self.time_provider.now().timestamp_nanos(), - ops: vec![catalog_op], - })? 
{ - let wal_op = WalOp::Catalog(catalog_batch); - self.wal.write_ops(vec![wal_op]).await?; - } - - self.run_trigger(write_buffer, db_name, trigger_name) - .await?; - Ok(()) - } - - #[cfg_attr(not(feature = "system-py"), allow(unused))] - async fn test_wal_plugin( - &self, - request: WalPluginTestRequest, - query_executor: Arc, - ) -> crate::Result { - #[cfg(feature = "system-py")] - { - // create a copy of the catalog so we don't modify the original - let catalog = Arc::new(Catalog::from_inner(self.catalog.clone_inner())); - let now = self.time_provider.now(); - - let code = self.read_plugin_code(&request.filename)?; - - let res = plugins::run_test_wal_plugin(now, catalog, query_executor, code, request) - .unwrap_or_else(|e| WalPluginTestResponse { - log_lines: vec![], - database_writes: Default::default(), - errors: vec![e.to_string()], - }); - - return Ok(res); - } - - #[cfg(not(feature = "system-py"))] - Err(plugins::Error::AnyhowError(anyhow::anyhow!( - "system-py feature not enabled" - ))) - } -} - -#[allow(unused)] -pub(crate) enum PluginEvent { - WriteWalContents(Arc), - Shutdown(oneshot::Sender<()>), -} - impl WriteBuffer for WriteBufferImpl {} pub async fn check_mem_and_force_snapshot_loop( @@ -1271,7 +930,6 @@ mod tests { wal_config: WalConfig::test_config(), parquet_cache: Some(Arc::clone(&parquet_cache)), metric_registry: Default::default(), - plugin_dir: None, }) .await .unwrap(); @@ -1356,7 +1014,6 @@ mod tests { }, parquet_cache: Some(Arc::clone(&parquet_cache)), metric_registry: Default::default(), - plugin_dir: None, }) .await .unwrap(); @@ -1425,7 +1082,6 @@ mod tests { }, parquet_cache: wbuf.parquet_cache.clone(), metric_registry: Default::default(), - plugin_dir: None, }) .await .unwrap() @@ -1653,7 +1309,6 @@ mod tests { }, parquet_cache: write_buffer.parquet_cache.clone(), metric_registry: Default::default(), - plugin_dir: None, }) .await .unwrap(); @@ -2939,7 +2594,6 @@ mod tests { wal_config, parquet_cache, metric_registry: Arc::clone(&metric_registry), - plugin_dir: None, }) .await .unwrap(); @@ -2969,393 +2623,6 @@ mod tests { batches } - #[tokio::test] - async fn test_create_plugin() -> Result<()> { - let start_time = Time::from_rfc3339("2024-11-14T11:00:00+00:00").unwrap(); - let test_store = Arc::new(InMemory::new()); - let wal_config = WalConfig { - gen1_duration: Gen1Duration::new_1m(), - max_write_buffer_size: 100, - flush_interval: Duration::from_millis(10), - snapshot_size: 1, - }; - let (write_buffer, _, _) = - setup_cache_optional(start_time, test_store, wal_config, false).await; - - write_buffer - .write_lp( - NamespaceName::new("foo").unwrap(), - "cpu,warehouse=us-east,room=01a,device=10001 reading=37\n", - start_time, - false, - Precision::Nanosecond, - ) - .await?; - - let empty_udf = r#"def example(iterator, output): - return"#; - - write_buffer - .insert_plugin( - "foo", - "my_plugin".to_string(), - empty_udf.to_string(), - "example".to_string(), - PluginType::WalRows, - ) - .await?; - - let plugin = write_buffer - .catalog - .db_schema("foo") - .expect("should have db named foo") - .processing_engine_plugins - .get("my_plugin") - .unwrap() - .clone(); - let expected = PluginDefinition { - plugin_name: "my_plugin".to_string(), - code: empty_udf.to_string(), - function_name: "example".to_string(), - plugin_type: PluginType::WalRows, - }; - assert_eq!(expected, plugin); - - // confirm that creating it again is a no-op. 
- write_buffer - .insert_plugin( - "foo", - "my_plugin".to_string(), - empty_udf.to_string(), - "example".to_string(), - PluginType::WalRows, - ) - .await?; - - // confirm that a different argument is an error - let Err(Error::CatalogUpdateError(catalog::Error::ProcessingEngineCallExists { .. })) = - write_buffer - .insert_plugin( - "foo", - "my_plugin".to_string(), - empty_udf.to_string(), - "bad_example".to_string(), - PluginType::WalRows, - ) - .await - else { - panic!("failed to insert plugin"); - }; - - // Confirm the same contents can be added to a new name. - write_buffer - .insert_plugin( - "foo", - "my_second_plugin".to_string(), - empty_udf.to_string(), - "example".to_string(), - PluginType::WalRows, - ) - .await?; - Ok(()) - } - #[tokio::test] - async fn test_delete_plugin() -> Result<()> { - let start_time = Time::from_rfc3339("2024-11-14T11:00:00+00:00").unwrap(); - let test_store = Arc::new(InMemory::new()); - let wal_config = WalConfig { - gen1_duration: Gen1Duration::new_1m(), - max_write_buffer_size: 100, - flush_interval: Duration::from_millis(10), - snapshot_size: 1, - }; - let (write_buffer, _, _) = - setup_cache_optional(start_time, test_store, wal_config, false).await; - - // Create the DB by inserting a line. - write_buffer - .write_lp( - NamespaceName::new("foo").unwrap(), - "cpu,warehouse=us-east,room=01a,device=10001 reading=37\n", - start_time, - false, - Precision::Nanosecond, - ) - .await?; - - // First create a plugin - write_buffer - .insert_plugin( - "foo", - "test_plugin".to_string(), - "def process(iterator, output): pass".to_string(), - "process".to_string(), - PluginType::WalRows, - ) - .await?; - - // Then delete it - write_buffer.delete_plugin("foo", "test_plugin").await?; - - // Verify plugin is gone from schema - let schema = write_buffer.catalog().db_schema("foo").unwrap(); - assert!(!schema.processing_engine_plugins.contains_key("test_plugin")); - - // Verify we can add a newly named plugin - write_buffer - .insert_plugin( - "foo", - "test_plugin".to_string(), - "def new_process(iterator, output): pass".to_string(), - "new_process".to_string(), - PluginType::WalRows, - ) - .await?; - - Ok(()) - } - - #[tokio::test] - async fn test_delete_plugin_with_active_trigger() -> Result<()> { - let start_time = Time::from_rfc3339("2024-11-14T11:00:00+00:00").unwrap(); - let test_store = Arc::new(InMemory::new()); - let wal_config = WalConfig { - gen1_duration: Gen1Duration::new_1m(), - max_write_buffer_size: 100, - flush_interval: Duration::from_millis(10), - snapshot_size: 1, - }; - let (write_buffer, _, _) = - setup_cache_optional(start_time, test_store, wal_config, false).await; - - // Create the DB by inserting a line. 
- write_buffer - .write_lp( - NamespaceName::new("foo").unwrap(), - "cpu,warehouse=us-east,room=01a,device=10001 reading=37\n", - start_time, - false, - Precision::Nanosecond, - ) - .await?; - - // Create a plugin - write_buffer - .insert_plugin( - "foo", - "test_plugin".to_string(), - "def process(iterator, output): pass".to_string(), - "process".to_string(), - PluginType::WalRows, - ) - .await - .unwrap(); - - // Create a trigger using the plugin - write_buffer - .insert_trigger( - "foo", - "test_trigger".to_string(), - "test_plugin".to_string(), - TriggerSpecificationDefinition::AllTablesWalWrite, - false, - ) - .await - .unwrap(); - - // Try to delete the plugin - should fail because trigger exists - let result = write_buffer.delete_plugin("foo", "test_plugin").await; - assert!(matches!( - result, - Err(Error::CatalogUpdateError(catalog::Error::ProcessingEnginePluginInUse { - database_name, - plugin_name, - trigger_name, - })) if database_name == "foo" && plugin_name == "test_plugin" && trigger_name == "test_trigger" - )); - Ok(()) - } - - #[tokio::test] - async fn test_trigger_lifecycle() -> Result<()> { - let start_time = Time::from_rfc3339("2024-11-14T11:00:00+00:00").unwrap(); - let test_store = Arc::new(InMemory::new()); - let wal_config = WalConfig { - gen1_duration: Gen1Duration::new_1m(), - max_write_buffer_size: 100, - flush_interval: Duration::from_millis(10), - snapshot_size: 1, - }; - let (write_buffer, _, _) = - setup_cache_optional(start_time, test_store, wal_config, false).await; - - // convert to Arc - let write_buffer: Arc = write_buffer.clone(); - - // Create the DB by inserting a line. - write_buffer - .write_lp( - NamespaceName::new("foo").unwrap(), - "cpu,warehouse=us-east,room=01a,device=10001 reading=37\n", - start_time, - false, - Precision::Nanosecond, - ) - .await?; - - // Create a plugin - write_buffer - .insert_plugin( - "foo", - "test_plugin".to_string(), - "def process(iterator, output): pass".to_string(), - "process".to_string(), - PluginType::WalRows, - ) - .await?; - - // Create an enabled trigger - write_buffer - .insert_trigger( - "foo", - "test_trigger".to_string(), - "test_plugin".to_string(), - TriggerSpecificationDefinition::AllTablesWalWrite, - false, - ) - .await?; - // Run the trigger - write_buffer - .run_trigger(Arc::clone(&write_buffer), "foo", "test_trigger") - .await?; - - // Deactivate the trigger - let result = write_buffer.deactivate_trigger("foo", "test_trigger").await; - assert!(result.is_ok()); - - // Verify trigger is disabled in schema - let schema = write_buffer.catalog().db_schema("foo").unwrap(); - let trigger = schema - .processing_engine_triggers - .get("test_trigger") - .unwrap(); - assert!(trigger.disabled); - - // Activate the trigger - let result = write_buffer - .activate_trigger(Arc::clone(&write_buffer), "foo", "test_trigger") - .await; - assert!(result.is_ok()); - - // Verify trigger is enabled and running - let schema = write_buffer.catalog().db_schema("foo").unwrap(); - let trigger = schema - .processing_engine_triggers - .get("test_trigger") - .unwrap(); - assert!(!trigger.disabled); - Ok(()) - } - - #[tokio::test] - async fn test_create_disabled_trigger() -> Result<()> { - let start_time = Time::from_rfc3339("2024-11-14T11:00:00+00:00").unwrap(); - let test_store = Arc::new(InMemory::new()); - let wal_config = WalConfig { - gen1_duration: Gen1Duration::new_1m(), - max_write_buffer_size: 100, - flush_interval: Duration::from_millis(10), - snapshot_size: 1, - }; - let (write_buffer, _, _) = - 
setup_cache_optional(start_time, test_store, wal_config, false).await; - - // Create the DB by inserting a line. - write_buffer - .write_lp( - NamespaceName::new("foo").unwrap(), - "cpu,warehouse=us-east,room=01a,device=10001 reading=37\n", - start_time, - false, - Precision::Nanosecond, - ) - .await?; - - // Create a plugin - write_buffer - .insert_plugin( - "foo", - "test_plugin".to_string(), - "def process(iterator, output): pass".to_string(), - "process".to_string(), - PluginType::WalRows, - ) - .await?; - - // Create a disabled trigger - write_buffer - .insert_trigger( - "foo", - "test_trigger".to_string(), - "test_plugin".to_string(), - TriggerSpecificationDefinition::AllTablesWalWrite, - true, - ) - .await?; - - // Verify trigger is created but disabled - let schema = write_buffer.catalog().db_schema("foo").unwrap(); - let trigger = schema - .processing_engine_triggers - .get("test_trigger") - .unwrap(); - assert!(trigger.disabled); - - // Verify trigger is not in active triggers list - assert!(write_buffer.catalog().triggers().is_empty()); - Ok(()) - } - - #[tokio::test] - async fn test_activate_nonexistent_trigger() -> Result<()> { - let start_time = Time::from_rfc3339("2024-11-14T11:00:00+00:00").unwrap(); - let test_store = Arc::new(InMemory::new()); - let wal_config = WalConfig { - gen1_duration: Gen1Duration::new_1m(), - max_write_buffer_size: 100, - flush_interval: Duration::from_millis(10), - snapshot_size: 1, - }; - let (write_buffer, _, _) = - setup_cache_optional(start_time, test_store, wal_config, false).await; - - let write_buffer: Arc = write_buffer.clone(); - - // Create the DB by inserting a line. - write_buffer - .write_lp( - NamespaceName::new("foo").unwrap(), - "cpu,warehouse=us-east,room=01a,device=10001 reading=37\n", - start_time, - false, - Precision::Nanosecond, - ) - .await?; - - let result = write_buffer - .activate_trigger(Arc::clone(&write_buffer), "foo", "nonexistent_trigger") - .await; - - assert!(matches!( - result, - Err(Error::CatalogUpdateError(catalog::Error::ProcessingEngineTriggerNotFound { - database_name, - trigger_name, - })) if database_name == "foo" && trigger_name == "nonexistent_trigger" - )); - Ok(()) - } - #[test_log::test(tokio::test)] async fn test_check_mem_and_force_snapshot() { let obj_store: Arc = Arc::new(InMemory::new()); diff --git a/influxdb3_write/src/write_buffer/queryable_buffer.rs b/influxdb3_write/src/write_buffer/queryable_buffer.rs index edf297e4007..1ab81067ec7 100644 --- a/influxdb3_write/src/write_buffer/queryable_buffer.rs +++ b/influxdb3_write/src/write_buffer/queryable_buffer.rs @@ -3,7 +3,6 @@ use crate::paths::ParquetFilePath; use crate::persister::Persister; use crate::write_buffer::persisted_files::PersistedFiles; use crate::write_buffer::table_buffer::TableBuffer; -use crate::write_buffer::PluginEvent; use crate::{ParquetFile, ParquetFileId, PersistedSnapshot}; use anyhow::Context; use arrow::record_batch::RecordBatch; @@ -36,8 +35,7 @@ use schema::Schema; use std::any::Any; use std::sync::Arc; use std::time::Duration; -use tokio::sync::oneshot::Receiver; -use tokio::sync::{mpsc, oneshot, Mutex}; +use tokio::sync::oneshot::{self, Receiver}; #[derive(Debug)] pub struct QueryableBuffer { @@ -52,7 +50,6 @@ pub struct QueryableBuffer { /// Sends a notification to this watch channel whenever a snapshot info is persisted persisted_snapshot_notify_rx: tokio::sync::watch::Receiver>, persisted_snapshot_notify_tx: tokio::sync::watch::Sender>, - plugin_event_tx: Mutex>>, } pub struct QueryableBufferArgs { @@ -91,7 +88,6 
@@ impl QueryableBuffer { parquet_cache, persisted_snapshot_notify_rx, persisted_snapshot_notify_tx, - plugin_event_tx: Mutex::new(HashMap::new()), } } @@ -157,11 +153,11 @@ impl QueryableBuffer { /// Called when the wal has persisted a new file. Buffer the contents in memory and update the /// last cache so the data is queryable. - fn buffer_contents(&self, write: WalContents) { + fn buffer_contents(&self, write: Arc) { self.write_wal_contents_to_caches(&write); let mut buffer = self.buffer.write(); buffer.buffer_ops( - write.ops, + &write.ops, &self.last_cache_provider, &self.meta_cache_provider, ); @@ -171,7 +167,7 @@ impl QueryableBuffer { /// data that can be snapshot in the background after putting the data in the buffer. async fn buffer_contents_and_persist_snapshotted_data( &self, - write: WalContents, + write: Arc, snapshot_details: SnapshotDetails, ) -> Receiver { info!( @@ -183,7 +179,7 @@ impl QueryableBuffer { let persist_jobs = { let mut buffer = self.buffer.write(); buffer.buffer_ops( - write.ops, + &write.ops, &self.last_cache_provider, &self.meta_cache_provider, ); @@ -388,51 +384,6 @@ impl QueryableBuffer { buffer.db_to_table.remove(db_id); } - #[cfg(feature = "system-py")] - pub(crate) async fn subscribe_to_plugin_events( - &self, - trigger_name: String, - ) -> mpsc::Receiver { - let mut senders = self.plugin_event_tx.lock().await; - - // TODO: should we be checking for replacements? - let (plugin_tx, plugin_rx) = mpsc::channel(4); - senders.insert(trigger_name, plugin_tx); - plugin_rx - } - - /// Deactivates a running trigger by sending it a oneshot sender. It should send back a message and then immediately shut down. - pub(crate) async fn deactivate_trigger( - &self, - #[allow(unused)] trigger_name: String, - ) -> Result<(), anyhow::Error> { - #[cfg(feature = "system-py")] - { - let Some(sender) = self.plugin_event_tx.lock().await.remove(&trigger_name) else { - anyhow::bail!("no trigger named '{}' found", trigger_name); - }; - let (oneshot_tx, oneshot_rx) = oneshot::channel(); - sender.send(PluginEvent::Shutdown(oneshot_tx)).await?; - oneshot_rx.await?; - } - Ok(()) - } - - async fn send_to_plugins(&self, wal_contents: &WalContents) { - let senders = self.plugin_event_tx.lock().await; - if !senders.is_empty() { - let wal_contents = Arc::new(wal_contents.clone()); - for (plugin, sender) in senders.iter() { - if let Err(err) = sender - .send(PluginEvent::WriteWalContents(Arc::clone(&wal_contents))) - .await - { - error!("failed to send plugin event to plugin {}: {}", plugin, err); - } - } - } - } - pub fn get_total_size_bytes(&self) -> usize { let buffer = self.buffer.read(); buffer.find_overall_buffer_size_bytes() @@ -441,17 +392,15 @@ impl QueryableBuffer { #[async_trait] impl WalFileNotifier for QueryableBuffer { - async fn notify(&self, write: WalContents) { - self.send_to_plugins(&write).await; + async fn notify(&self, write: Arc) { self.buffer_contents(write) } async fn notify_and_snapshot( &self, - write: WalContents, + write: Arc, snapshot_details: SnapshotDetails, ) -> Receiver { - self.send_to_plugins(&write).await; self.buffer_contents_and_persist_snapshotted_data(write, snapshot_details) .await } @@ -479,7 +428,7 @@ impl BufferState { pub fn buffer_ops( &mut self, - ops: Vec, + ops: &[WalOp], last_cache_provider: &LastCacheProvider, meta_cache_provider: &MetaCacheProvider, ) { @@ -580,7 +529,7 @@ impl BufferState { } } - fn add_write_batch(&mut self, write_batch: WriteBatch) { + fn add_write_batch(&mut self, write_batch: &WriteBatch) { let db_schema = self 
.catalog .db_schema_by_id(&write_batch.database_id) @@ -588,10 +537,10 @@ impl BufferState { let database_buffer = self.db_to_table.entry(write_batch.database_id).or_default(); - for (table_id, table_chunks) in write_batch.table_chunks { - let table_buffer = database_buffer.entry(table_id).or_insert_with(|| { + for (table_id, table_chunks) in &write_batch.table_chunks { + let table_buffer = database_buffer.entry(*table_id).or_insert_with(|| { let table_def = db_schema - .table_definition_by_id(&table_id) + .table_definition_by_id(table_id) .expect("table should exist"); let sort_key = table_def .series_key @@ -601,8 +550,8 @@ impl BufferState { TableBuffer::new(index_columns, SortKey::from_columns(sort_key)) }); - for (chunk_time, chunk) in table_chunks.chunk_time_to_chunk { - table_buffer.buffer_chunk(chunk_time, chunk.rows); + for (chunk_time, chunk) in &table_chunks.chunk_time_to_chunk { + table_buffer.buffer_chunk(*chunk_time, &chunk.rows); } } } @@ -832,7 +781,7 @@ mod tests { wal_contents.max_timestamp_ns + Gen1Duration::new_1m().as_duration().as_nanos() as i64; // write the lp into the buffer - queryable_buffer.notify(wal_contents).await; + queryable_buffer.notify(Arc::new(wal_contents)).await; // now force a snapshot, persisting the data to parquet file. Also, buffer up a new write let snapshot_sequence_number = SnapshotSequenceNumber::new(1); @@ -865,7 +814,7 @@ mod tests { wal_contents.max_timestamp_ns + Gen1Duration::new_1m().as_duration().as_nanos() as i64; let details = queryable_buffer - .notify_and_snapshot(wal_contents, snapshot_details) + .notify_and_snapshot(Arc::new(wal_contents), snapshot_details) .await; let _details = details.await.unwrap(); @@ -888,14 +837,14 @@ mod tests { }; queryable_buffer .notify_and_snapshot( - WalContents { + Arc::new(WalContents { persist_timestamp_ms: 0, min_timestamp_ns: 0, max_timestamp_ns: 0, wal_file_number: WalFileSequenceNumber::new(3), ops: vec![], snapshot: Some(snapshot_details), - }, + }), snapshot_details, ) .await diff --git a/influxdb3_write/src/write_buffer/table_buffer.rs b/influxdb3_write/src/write_buffer/table_buffer.rs index ff60f5a4846..d1ca35764c7 100644 --- a/influxdb3_write/src/write_buffer/table_buffer.rs +++ b/influxdb3_write/src/write_buffer/table_buffer.rs @@ -50,7 +50,7 @@ impl TableBuffer { } } - pub fn buffer_chunk(&mut self, chunk_time: i64, rows: Vec) { + pub fn buffer_chunk(&mut self, chunk_time: i64, rows: &[Row]) { let buffer_chunk = self .chunk_time_to_chunks .entry(chunk_time) @@ -250,19 +250,19 @@ struct MutableTableChunk { } impl MutableTableChunk { - fn add_rows(&mut self, rows: Vec) { + fn add_rows(&mut self, rows: &[Row]) { let new_row_count = rows.len(); - for (row_index, r) in rows.into_iter().enumerate() { + for (row_index, r) in rows.iter().enumerate() { let mut value_added = HashSet::with_capacity(r.fields.len()); - for f in r.fields { + for f in &r.fields { value_added.insert(f.id); - match f.value { + match &f.value { FieldData::Timestamp(v) => { - self.timestamp_min = self.timestamp_min.min(v); - self.timestamp_max = self.timestamp_max.max(v); + self.timestamp_min = self.timestamp_min.min(*v); + self.timestamp_max = self.timestamp_max.max(*v); let b = self.data.entry(f.id).or_insert_with(|| { debug!("Creating new timestamp builder"); @@ -272,7 +272,7 @@ impl MutableTableChunk { Builder::Time(time_builder) }); if let Builder::Time(b) = b { - b.append_value(v); + b.append_value(*v); } else { panic!("unexpected field type"); } @@ -288,7 +288,7 @@ impl MutableTableChunk { } let b = 
self.data.get_mut(&f.id).expect("tag builder should exist"); if let Builder::Tag(b) = b { - self.index.add_row_if_indexed_column(b.len(), f.id, &v); + self.index.add_row_if_indexed_column(b.len(), f.id, v); b.append(v) .expect("shouldn't be able to overflow 32 bit dictionary"); } else { @@ -307,7 +307,7 @@ impl MutableTableChunk { let Builder::Key(b) = b else { panic!("unexpected field type"); }; - self.index.add_row_if_indexed_column(b.len(), f.id, &v); + self.index.add_row_if_indexed_column(b.len(), f.id, v); b.append_value(v); } FieldData::String(v) => { @@ -333,7 +333,7 @@ impl MutableTableChunk { Builder::I64(int_builder) }); if let Builder::I64(b) = b { - b.append_value(v); + b.append_value(*v); } else { panic!("unexpected field type"); } @@ -346,7 +346,7 @@ impl MutableTableChunk { Builder::U64(uint_builder) }); if let Builder::U64(b) = b { - b.append_value(v); + b.append_value(*v); } else { panic!("unexpected field type"); } @@ -359,7 +359,7 @@ impl MutableTableChunk { Builder::F64(float_builder) }); if let Builder::F64(b) = b { - b.append_value(v); + b.append_value(*v); } else { panic!("unexpected field type"); } @@ -372,7 +372,7 @@ impl MutableTableChunk { Builder::Bool(bool_builder) }); if let Builder::Bool(b) = b { - b.append_value(v); + b.append_value(*v); } else { panic!("unexpected field type"); } @@ -866,7 +866,7 @@ mod tests { }, ]; - table_buffer.buffer_chunk(offset, rows); + table_buffer.buffer_chunk(offset, &rows); } let partitioned_batches = table_buffer @@ -980,7 +980,7 @@ mod tests { }, ]; - table_buffer.buffer_chunk(0, rows); + table_buffer.buffer_chunk(0, &rows); let filter = &[Expr::BinaryExpr(BinaryExpr { left: Box::new(Expr::Column(Column { @@ -1105,7 +1105,7 @@ mod tests { }, ]; - table_buffer.buffer_chunk(0, rows); + table_buffer.buffer_chunk(0, &rows); let size = table_buffer.computed_size(); assert_eq!(size, 18120); diff --git a/influxdb3_write/src/write_buffer/validator.rs b/influxdb3_write/src/write_buffer/validator.rs index afbfd860437..430478d2abc 100644 --- a/influxdb3_write/src/write_buffer/validator.rs +++ b/influxdb3_write/src/write_buffer/validator.rs @@ -138,7 +138,7 @@ impl WriteValidator { database_name: Arc::clone(&self.state.db_schema.name), ops: catalog_updates, }; - self.state.catalog.apply_catalog_batch(catalog_batch)? + self.state.catalog.apply_catalog_batch(&catalog_batch)? }; Ok(WriteValidator { @@ -398,9 +398,9 @@ pub struct ValidatedLines { /// Number of index columns passed in, whether tags (v1) or series keys (v3) pub(crate) index_count: usize, /// Any errors that occurred while parsing the lines - pub(crate) errors: Vec, + pub errors: Vec, /// Only valid lines will be converted into a WriteBatch - pub(crate) valid_data: WriteBatch, + pub valid_data: WriteBatch, /// If any catalog updates were made, they will be included here pub(crate) catalog_updates: Option, }
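Note on the WalFileNotifier change in this patch: notify and notify_and_snapshot now take Arc-wrapped WalContents rather than an owned value, so a single persisted WAL file can be fanned out to several consumers (the queryable buffer and, separately, the new processing engine) without deep-cloning every write. The sketch below is only a self-contained illustration of that Arc fan-out pattern under stated assumptions: WalContents here is a stand-in struct, not the real type from influxdb3_wal, and the consumers are plain threads rather than the actual notifier implementations.

use std::sync::Arc;
use std::thread;

// Stand-in for influxdb3_wal::WalContents, reduced to two fields for the example.
#[derive(Debug)]
struct WalContents {
    wal_file_number: u64,
    ops: Vec<String>,
}

fn main() {
    // The WAL contents are allocated once...
    let contents = Arc::new(WalContents {
        wal_file_number: 42,
        ops: vec!["catalog op".to_string(), "write batch".to_string()],
    });

    // ...and each consumer gets a cheap pointer clone instead of a copy of the data,
    // which is the point of passing Arc<WalContents> through the notifier API.
    let handles: Vec<_> = ["queryable_buffer", "processing_engine"]
        .into_iter()
        .map(|consumer| {
            let shared = Arc::clone(&contents);
            thread::spawn(move || {
                println!(
                    "{consumer}: handling WAL file {} with {} ops",
                    shared.wal_file_number,
                    shared.ops.len()
                );
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
}

In the real crates the receivers are async tasks behind WalFileNotifier implementations rather than threads, but the ownership story is the same: what gets cloned is the Arc, not the WAL contents.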