From d6a0b38e37c8bdb2f396080bee9cd80c4ca49d77 Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Thu, 9 Jan 2025 19:13:32 -0500 Subject: [PATCH] feat: Update WAL plugin for new structure This ended up being a very large change set. In order to get around circular dependencies, the processing engine had to be moved into its own crate, which I think is ultimately much cleaner. Unfortunately, this required changing a ton of things. There's more testing and things to add on to this, but I think it's important to get this through and build on it. Importantly, the processing engine no longer resides inside the write buffer. Instead, it is attached to the HTTP server. It is now able to take a query executor, write buffer, and WAL so that the full range of functionality of the server can be exposed to the plugin API. There are a bunch of system-py feature flags littered everywhere, which I'm hoping we can remove soon. --- Cargo.lock | 30 +- Cargo.toml | 2 +- influxdb3/src/commands/create.rs | 7 +- influxdb3/src/commands/serve.rs | 2 +- influxdb3/tests/server/cli.rs | 12 - influxdb3_catalog/src/catalog.rs | 38 +- influxdb3_catalog/src/serialize.rs | 6 +- influxdb3_client/src/lib.rs | 3 - influxdb3_processing_engine/Cargo.toml | 42 + influxdb3_processing_engine/src/lib.rs | 866 ++++++++++++++++++ influxdb3_processing_engine/src/manager.rs | 131 +++ .../src}/plugins.rs | 253 +++-- influxdb3_py_api/Cargo.toml | 2 +- influxdb3_py_api/src/system_py.rs | 195 +--- influxdb3_server/Cargo.toml | 1 + influxdb3_server/src/builder.rs | 13 +- influxdb3_server/src/http.rs | 33 +- influxdb3_server/src/lib.rs | 6 +- influxdb3_server/src/query_executor/mod.rs | 1 - .../src/system_tables/python_call.rs | 7 - influxdb3_wal/src/lib.rs | 10 +- influxdb3_wal/src/object_store.rs | 39 +- influxdb3_write/src/lib.rs | 13 +- influxdb3_write/src/persister.rs | 2 +- influxdb3_write/src/write_buffer/mod.rs | 773 +--------------- .../src/write_buffer/queryable_buffer.rs | 87 +- .../src/write_buffer/table_buffer.rs | 34 +- influxdb3_write/src/write_buffer/validator.rs | 6 +- 28 files changed, 1358 insertions(+), 1256 deletions(-) create mode 100644 influxdb3_processing_engine/Cargo.toml create mode 100644 influxdb3_processing_engine/src/lib.rs create mode 100644 influxdb3_processing_engine/src/manager.rs rename {influxdb3_write/src/write_buffer => influxdb3_processing_engine/src}/plugins.rs (71%) diff --git a/Cargo.lock b/Cargo.lock index 96cc211e0ce..faeb7e308b2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2964,6 +2964,33 @@ dependencies = [ "uuid", ] +[[package]] +name = "influxdb3_processing_engine" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "data_types", + "datafusion_util", + "hashbrown 0.15.2", + "influxdb3_cache", + "influxdb3_catalog", + "influxdb3_client", + "influxdb3_internal_api", + "influxdb3_py_api", + "influxdb3_wal", + "influxdb3_write", + "iox_query", + "iox_time", + "metric", + "object_store", + "observability_deps", + "parquet_file", + "pyo3", + "thiserror 1.0.69", + "tokio", +] + [[package]] name = "influxdb3_py_api" version = "0.1.0" @@ -2972,12 +2999,12 @@ dependencies = [ "arrow-schema", "futures", "influxdb3_catalog", + "influxdb3_id", "influxdb3_internal_api", "influxdb3_wal", "iox_query_params", "parking_lot", "pyo3", - "schema", "tokio", ] @@ -3014,6 +3041,7 @@ dependencies = [ "influxdb3_id", "influxdb3_internal_api", "influxdb3_process", + "influxdb3_processing_engine", "influxdb3_sys_events", "influxdb3_telemetry", "influxdb3_wal", diff --git a/Cargo.toml b/Cargo.toml index 
0f6d697ff44..20b2d46d4c0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,7 @@ members = [ "influxdb3_id", "influxdb3_internal_api", "influxdb3_load_generator", - "influxdb3_process", + "influxdb3_process", "influxdb3_processing_engine", "influxdb3_py_api", "influxdb3_server", "influxdb3_telemetry", diff --git a/influxdb3/src/commands/create.rs b/influxdb3/src/commands/create.rs index 50970c8c7a6..ec71fa5be5e 100644 --- a/influxdb3/src/commands/create.rs +++ b/influxdb3/src/commands/create.rs @@ -206,10 +206,7 @@ pub struct PluginConfig { /// Python file containing the plugin code #[clap(long = "code-filename")] code_file: String, - /// Entry point function for the plugin - #[clap(long = "entry-point")] - function_name: String, - /// Type of trigger the plugin processes + /// Type of trigger the plugin processes. Options: wal_rows, scheduled #[clap(long = "plugin-type", default_value = "wal_rows")] plugin_type: String, /// Name of the plugin to create @@ -335,7 +332,6 @@ pub async fn command(config: Config) -> Result<(), Box> { influxdb3_config: InfluxDb3Config { database_name, .. }, plugin_name, code_file, - function_name, plugin_type, }) => { let code = fs::read_to_string(&code_file)?; @@ -344,7 +340,6 @@ pub async fn command(config: Config) -> Result<(), Box> { database_name, &plugin_name, code, - function_name, plugin_type, ) .await?; diff --git a/influxdb3/src/commands/serve.rs b/influxdb3/src/commands/serve.rs index 7da27da9e54..a2e6e4c838d 100644 --- a/influxdb3/src/commands/serve.rs +++ b/influxdb3/src/commands/serve.rs @@ -503,7 +503,6 @@ pub async fn command(config: Config) -> Result<()> { wal_config, parquet_cache, metric_registry: Arc::clone(&metrics), - plugin_dir: config.plugin_dir, }) .await .map_err(|e| Error::WriteBufferInit(e.into()))?; @@ -532,6 +531,7 @@ pub async fn command(config: Config) -> Result<()> { trace_exporter, trace_header_parser, Arc::clone(&telemetry_store), + config.plugin_dir, )?; let query_executor = Arc::new(QueryExecutorImpl::new(CreateQueryExecutorArgs { diff --git a/influxdb3/tests/server/cli.rs b/influxdb3/tests/server/cli.rs index 5baeec728cf..d4530f6744d 100644 --- a/influxdb3/tests/server/cli.rs +++ b/influxdb3/tests/server/cli.rs @@ -467,8 +467,6 @@ def process_rows(iterator, output): &server_addr, "--code-filename", plugin_file.path().to_str().unwrap(), - "--entry-point", - "process_rows", plugin_name, ]); debug!(result = ?result, "create plugin"); @@ -501,8 +499,6 @@ def process_rows(iterator, output): &server_addr, "--code-filename", plugin_file.path().to_str().unwrap(), - "--entry-point", - "process_rows", plugin_name, ]); @@ -547,8 +543,6 @@ def process_rows(iterator, output): &server_addr, "--code-filename", plugin_file.path().to_str().unwrap(), - "--entry-point", - "process_rows", plugin_name, ]); @@ -597,8 +591,6 @@ def process_rows(iterator, output): &server_addr, "--code-filename", plugin_file.path().to_str().unwrap(), - "--entry-point", - "process_rows", plugin_name, ]); @@ -670,8 +662,6 @@ def process_rows(iterator, output): &server_addr, "--code-filename", plugin_file.path().to_str().unwrap(), - "--entry-point", - "process_rows", plugin_name, ]); @@ -769,8 +759,6 @@ def process_rows(iterator, output): &server_addr, "--code-filename", plugin_file.path().to_str().unwrap(), - "--entry-point", - "process_rows", plugin_name, ]); diff --git a/influxdb3_catalog/src/catalog.rs b/influxdb3_catalog/src/catalog.rs index e93c57a50a4..8ea1df2a693 100644 --- a/influxdb3_catalog/src/catalog.rs +++ b/influxdb3_catalog/src/catalog.rs @@ -209,7 
+209,7 @@ impl Catalog { pub fn apply_catalog_batch( &self, - catalog_batch: CatalogBatch, + catalog_batch: &CatalogBatch, ) -> Result> { self.inner.write().apply_catalog_batch(catalog_batch) } @@ -217,11 +217,11 @@ impl Catalog { // Checks the sequence number to see if it needs to be applied. pub fn apply_ordered_catalog_batch( &self, - batch: OrderedCatalogBatch, + batch: &OrderedCatalogBatch, ) -> Result> { if batch.sequence_number() >= self.sequence_number().as_u32() { if let Some(catalog_batch) = self.apply_catalog_batch(batch.batch())? { - return Ok(Some(catalog_batch.batch())); + return Ok(Some(catalog_batch.batch().clone())); } } Ok(None) @@ -456,12 +456,12 @@ impl InnerCatalog { /// have already been applied, the sequence number and updated tracker are not updated. pub fn apply_catalog_batch( &mut self, - catalog_batch: CatalogBatch, + catalog_batch: &CatalogBatch, ) -> Result> { let table_count = self.table_count(); if let Some(db) = self.databases.get(&catalog_batch.database_id) { - if let Some(new_db) = DatabaseSchema::new_if_updated_from_batch(db, &catalog_batch)? { + if let Some(new_db) = DatabaseSchema::new_if_updated_from_batch(db, catalog_batch)? { check_overall_table_count(Some(db), &new_db, table_count)?; self.upsert_db(new_db); } else { @@ -471,12 +471,12 @@ impl InnerCatalog { if self.database_count() >= Catalog::NUM_DBS_LIMIT { return Err(Error::TooManyDbs); } - let new_db = DatabaseSchema::new_from_batch(&catalog_batch)?; + let new_db = DatabaseSchema::new_from_batch(catalog_batch)?; check_overall_table_count(None, &new_db, table_count)?; self.upsert_db(new_db); } Ok(Some(OrderedCatalogBatch::new( - catalog_batch, + catalog_batch.clone(), self.sequence.0, ))) } @@ -1847,7 +1847,7 @@ mod tests { )], ); let err = catalog - .apply_catalog_batch(catalog_batch) + .apply_catalog_batch(&catalog_batch) .expect_err("should fail to apply AddFields operation for non-existent table"); assert_contains!(err.to_string(), "Table banana not in DB schema for foo"); } @@ -1968,10 +1968,10 @@ mod tests { })], }; let create_ordered_op = catalog - .apply_catalog_batch(create_op)? + .apply_catalog_batch(&create_op)? .expect("should be able to create"); let add_column_op = catalog - .apply_catalog_batch(add_column_op)? + .apply_catalog_batch(&add_column_op)? 
.expect("should produce operation"); let mut ops = vec![ WalOp::Catalog(add_column_op), @@ -2010,7 +2010,7 @@ mod tests { let db_id = DbId::new(); let db_name = format!("test-db-{db_id}"); catalog - .apply_catalog_batch(influxdb3_wal::create::catalog_batch( + .apply_catalog_batch(&influxdb3_wal::create::catalog_batch( db_id, db_name.as_str(), 0, @@ -2043,7 +2043,7 @@ mod tests { let db_id = DbId::new(); let db_name = "a-database-too-far"; catalog - .apply_catalog_batch(influxdb3_wal::create::catalog_batch( + .apply_catalog_batch(&influxdb3_wal::create::catalog_batch( db_id, db_name, 0, @@ -2071,7 +2071,7 @@ mod tests { // now delete a database: let (db_id, db_name) = dbs.pop().unwrap(); catalog - .apply_catalog_batch(influxdb3_wal::create::catalog_batch( + .apply_catalog_batch(&influxdb3_wal::create::catalog_batch( db_id, db_name.as_str(), 1, @@ -2087,7 +2087,7 @@ mod tests { // now create another database (using same name as the deleted one), this should be allowed: let db_id = DbId::new(); catalog - .apply_catalog_batch(influxdb3_wal::create::catalog_batch( + .apply_catalog_batch(&influxdb3_wal::create::catalog_batch( db_id, db_name.as_str(), 0, @@ -2123,7 +2123,7 @@ mod tests { let db_id = DbId::new(); let db_name = "test-db"; catalog - .apply_catalog_batch(influxdb3_wal::create::catalog_batch( + .apply_catalog_batch(&influxdb3_wal::create::catalog_batch( db_id, db_name, 0, @@ -2138,7 +2138,7 @@ mod tests { let table_id = TableId::new(); let table_name = Arc::::from(format!("test-table-{i}").as_str()); catalog - .apply_catalog_batch(influxdb3_wal::create::catalog_batch( + .apply_catalog_batch(&influxdb3_wal::create::catalog_batch( db_id, db_name, 0, @@ -2170,7 +2170,7 @@ mod tests { let table_id = TableId::new(); let table_name = Arc::::from("a-table-too-far"); catalog - .apply_catalog_batch(influxdb3_wal::create::catalog_batch( + .apply_catalog_batch(&influxdb3_wal::create::catalog_batch( db_id, db_name, 0, @@ -2198,7 +2198,7 @@ mod tests { // delete a table let (table_id, table_name) = tables.pop().unwrap(); catalog - .apply_catalog_batch(influxdb3_wal::create::catalog_batch( + .apply_catalog_batch(&influxdb3_wal::create::catalog_batch( db_id, db_name, 0, @@ -2214,7 +2214,7 @@ mod tests { assert_eq!(1_999, catalog.inner.read().table_count()); // now create it again, this should be allowed: catalog - .apply_catalog_batch(influxdb3_wal::create::catalog_batch( + .apply_catalog_batch(&influxdb3_wal::create::catalog_batch( db_id, db_name, 0, diff --git a/influxdb3_catalog/src/serialize.rs b/influxdb3_catalog/src/serialize.rs index a6b6fbcfb3d..9ea18758df0 100644 --- a/influxdb3_catalog/src/serialize.rs +++ b/influxdb3_catalog/src/serialize.rs @@ -106,6 +106,7 @@ impl From for DatabaseSchema { plugin, trigger: serde_json::from_str(&trigger.trigger_specification).unwrap(), disabled: trigger.disabled, + database_name: trigger.database_name, }, ) }) @@ -163,7 +164,6 @@ struct TableSnapshot { struct ProcessingEnginePluginSnapshot { pub plugin_name: String, pub code: String, - pub function_name: String, pub plugin_type: PluginType, } @@ -171,6 +171,7 @@ struct ProcessingEnginePluginSnapshot { struct ProcessingEngineTriggerSnapshot { pub trigger_name: String, pub plugin_name: String, + pub database_name: String, pub trigger_specification: String, pub disabled: bool, } @@ -412,7 +413,6 @@ impl From<&PluginDefinition> for ProcessingEnginePluginSnapshot { Self { plugin_name: plugin.plugin_name.to_string(), code: plugin.code.to_string(), - function_name: plugin.function_name.to_string(), plugin_type: 
plugin.plugin_type, } } @@ -423,7 +423,6 @@ impl From for PluginDefinition { Self { plugin_name: plugin.plugin_name.to_string(), code: plugin.code.to_string(), - function_name: plugin.function_name.to_string(), plugin_type: plugin.plugin_type, } } @@ -434,6 +433,7 @@ impl From<&TriggerDefinition> for ProcessingEngineTriggerSnapshot { ProcessingEngineTriggerSnapshot { trigger_name: trigger.trigger_name.to_string(), plugin_name: trigger.plugin_name.to_string(), + database_name: trigger.database_name.to_string(), trigger_specification: serde_json::to_string(&trigger.trigger) .expect("should be able to serialize trigger specification"), disabled: trigger.disabled, diff --git a/influxdb3_client/src/lib.rs b/influxdb3_client/src/lib.rs index f65691ea59a..518cc25d5b5 100644 --- a/influxdb3_client/src/lib.rs +++ b/influxdb3_client/src/lib.rs @@ -484,7 +484,6 @@ impl Client { db: impl Into + Send, plugin_name: impl Into + Send, code: impl Into + Send, - function_name: impl Into + Send, plugin_type: impl Into + Send, ) -> Result<()> { let api_path = "/api/v3/configure/processing_engine_plugin"; @@ -496,7 +495,6 @@ impl Client { db: String, plugin_name: String, code: String, - function_name: String, plugin_type: String, } @@ -504,7 +502,6 @@ impl Client { db: db.into(), plugin_name: plugin_name.into(), code: code.into(), - function_name: function_name.into(), plugin_type: plugin_type.into(), }); diff --git a/influxdb3_processing_engine/Cargo.toml b/influxdb3_processing_engine/Cargo.toml new file mode 100644 index 00000000000..37eb8f26b9f --- /dev/null +++ b/influxdb3_processing_engine/Cargo.toml @@ -0,0 +1,42 @@ +[package] +name = "influxdb3_processing_engine" +version.workspace = true +authors.workspace = true +edition.workspace = true +license.workspace = true + +[features] +"system-py" = ["influxdb3_py_api/system-py", "pyo3"] + +[dependencies] +anyhow.workspace = true +async-trait.workspace = true +data_types.workspace = true +hashbrown.workspace = true +iox_time.workspace = true +influxdb3_catalog = { path = "../influxdb3_catalog" } +influxdb3_client = { path = "../influxdb3_client" } +influxdb3_internal_api = { path = "../influxdb3_internal_api" } +influxdb3_py_api = { path = "../influxdb3_py_api" } +influxdb3_wal = { path = "../influxdb3_wal" } +influxdb3_write = { path = "../influxdb3_write" } +observability_deps.workspace = true +thiserror.workspace = true +tokio.workspace = true + +[dependencies.pyo3] +version = "0.23.3" +# this is necessary to automatically initialize the Python interpreter +features = ["auto-initialize"] +optional = true + +[dev-dependencies] +datafusion_util.workspace = true +iox_query.workspace = true +influxdb3_cache = { path = "../influxdb3_cache" } +metric.workspace = true +object_store.workspace = true +parquet_file.workspace = true + +[lints] +workspace = true diff --git a/influxdb3_processing_engine/src/lib.rs b/influxdb3_processing_engine/src/lib.rs new file mode 100644 index 00000000000..bba25273a3a --- /dev/null +++ b/influxdb3_processing_engine/src/lib.rs @@ -0,0 +1,866 @@ +use crate::manager::{ProcessingEngineError, ProcessingEngineManager}; +#[cfg(feature = "system-py")] +use crate::plugins::PluginContext; +use anyhow::Context; +use hashbrown::HashMap; +use influxdb3_catalog::catalog; +use influxdb3_catalog::catalog::Catalog; +use influxdb3_catalog::catalog::Error::ProcessingEngineTriggerNotFound; +use influxdb3_client::plugin_development::{WalPluginTestRequest, WalPluginTestResponse}; +use influxdb3_internal_api::query_executor::QueryExecutor; +use 
influxdb3_wal::{ + CatalogBatch, CatalogOp, DeletePluginDefinition, DeleteTriggerDefinition, PluginDefinition, + PluginType, TriggerDefinition, TriggerIdentifier, TriggerSpecificationDefinition, Wal, + WalContents, WalOp, +}; +use influxdb3_write::WriteBuffer; +use iox_time::TimeProvider; +use std::sync::Arc; +use tokio::sync::{mpsc, oneshot, Mutex}; + +pub mod manager; +pub mod plugins; + +// startup the plugins on initialization +// let triggers = result.catalog().triggers(); +// for (db_name, trigger_name) in triggers { +// result +// .run_trigger(Arc::clone(&write_buffer), Arc::clone(&quer), &db_name, &trigger_name) +// .await?; +// } + +#[derive(Debug)] +pub struct ProcessingEngineManagerImpl { + plugin_dir: Option, + catalog: Arc, + _write_buffer: Arc, + _query_executor: Arc, + time_provider: Arc, + wal: Arc, + plugin_event_tx: Mutex, +} + +#[derive(Debug, Default)] +struct PluginChannels { + /// Map of database to trigger name to sender + active_triggers: HashMap>>, +} + +#[cfg(feature = "system-py")] +const PLUGIN_EVENT_BUFFER_SIZE: usize = 60; + +impl PluginChannels { + async fn shutdown(&mut self, db: String, trigger: String) { + // create a one shot to wait for the shutdown to complete + let (tx, rx) = oneshot::channel(); + if let Some(trigger_map) = self.active_triggers.get_mut(&db) { + if let Some(sender) = trigger_map.remove(&trigger) { + if sender.send(PluginEvent::Shutdown(tx)).await.is_ok() { + rx.await.ok(); + } + } + } + } + + #[cfg(feature = "system-py")] + fn add_trigger(&mut self, db: String, trigger: String) -> mpsc::Receiver { + let (tx, rx) = mpsc::channel(PLUGIN_EVENT_BUFFER_SIZE); + self.active_triggers + .entry(db) + .or_default() + .insert(trigger, tx); + rx + } +} + +impl ProcessingEngineManagerImpl { + pub fn new( + plugin_dir: Option, + catalog: Arc, + write_buffer: Arc, + query_executor: Arc, + time_provider: Arc, + wal: Arc, + ) -> Self { + Self { + plugin_dir, + catalog, + _write_buffer: write_buffer, + _query_executor: query_executor, + time_provider, + wal, + plugin_event_tx: Default::default(), + } + } + + pub fn read_plugin_code(&self, name: &str) -> Result { + let plugin_dir = self.plugin_dir.clone().context("plugin dir not set")?; + let path = plugin_dir.join(name); + Ok(std::fs::read_to_string(path)?) + } +} + +#[async_trait::async_trait] +impl ProcessingEngineManager for ProcessingEngineManagerImpl { + async fn insert_plugin( + &self, + db: &str, + plugin_name: String, + code: String, + plugin_type: PluginType, + ) -> Result<(), ProcessingEngineError> { + let (db_id, db_schema) = self + .catalog + .db_id_and_schema(db) + .ok_or_else(|| ProcessingEngineError::DatabaseNotFound(db.to_string()))?; + + let catalog_op = CatalogOp::CreatePlugin(PluginDefinition { + plugin_name, + code, + plugin_type, + }); + + let creation_time = self.time_provider.now(); + let catalog_batch = CatalogBatch { + time_ns: creation_time.timestamp_nanos(), + database_id: db_id, + database_name: Arc::clone(&db_schema.name), + ops: vec![catalog_op], + }; + if let Some(catalog_batch) = self.catalog.apply_catalog_batch(&catalog_batch)? 
{ + let wal_op = WalOp::Catalog(catalog_batch); + self.wal.write_ops(vec![wal_op]).await?; + } + Ok(()) + } + + async fn delete_plugin( + &self, + db: &str, + plugin_name: &str, + ) -> Result<(), ProcessingEngineError> { + let (db_id, db_schema) = self + .catalog + .db_id_and_schema(db) + .ok_or_else(|| ProcessingEngineError::DatabaseNotFound(db.to_string()))?; + let catalog_op = CatalogOp::DeletePlugin(DeletePluginDefinition { + plugin_name: plugin_name.to_string(), + }); + let catalog_batch = CatalogBatch { + time_ns: self.time_provider.now().timestamp_nanos(), + database_id: db_id, + database_name: Arc::clone(&db_schema.name), + ops: vec![catalog_op], + }; + + if let Some(catalog_batch) = self.catalog.apply_catalog_batch(&catalog_batch)? { + self.wal + .write_ops(vec![WalOp::Catalog(catalog_batch)]) + .await?; + } + Ok(()) + } + + async fn insert_trigger( + &self, + db_name: &str, + trigger_name: String, + plugin_name: String, + trigger_specification: TriggerSpecificationDefinition, + disabled: bool, + ) -> Result<(), ProcessingEngineError> { + let Some((db_id, db_schema)) = self.catalog.db_id_and_schema(db_name) else { + return Err(ProcessingEngineError::DatabaseNotFound(db_name.to_string())); + }; + let plugin = db_schema + .processing_engine_plugins + .get(&plugin_name) + .ok_or_else(|| catalog::Error::ProcessingEnginePluginNotFound { + plugin_name: plugin_name.to_string(), + database_name: db_schema.name.to_string(), + })?; + let catalog_op = CatalogOp::CreateTrigger(TriggerDefinition { + trigger_name, + plugin_name, + plugin: plugin.clone(), + trigger: trigger_specification, + disabled, + database_name: db_name.to_string(), + }); + let creation_time = self.time_provider.now(); + let catalog_batch = CatalogBatch { + time_ns: creation_time.timestamp_nanos(), + database_id: db_id, + database_name: Arc::clone(&db_schema.name), + ops: vec![catalog_op], + }; + if let Some(catalog_batch) = self.catalog.apply_catalog_batch(&catalog_batch)? { + let wal_op = WalOp::Catalog(catalog_batch); + self.wal.write_ops(vec![wal_op]).await?; + } + Ok(()) + } + + async fn delete_trigger( + &self, + db: &str, + trigger_name: &str, + force: bool, + ) -> Result<(), ProcessingEngineError> { + let (db_id, db_schema) = self + .catalog + .db_id_and_schema(db) + .ok_or_else(|| ProcessingEngineError::DatabaseNotFound(db.to_string()))?; + let catalog_op = CatalogOp::DeleteTrigger(DeleteTriggerDefinition { + trigger_name: trigger_name.to_string(), + force, + }); + let catalog_batch = CatalogBatch { + time_ns: self.time_provider.now().timestamp_nanos(), + database_id: db_id, + database_name: Arc::clone(&db_schema.name), + ops: vec![catalog_op], + }; + + // Do this first to avoid a dangling running plugin. + // Potential edge-case of a plugin being stopped but not deleted, + // but should be okay given desire to force delete. + let needs_deactivate = force + && db_schema + .processing_engine_triggers + .get(trigger_name) + .is_some_and(|trigger| !trigger.disabled); + + if needs_deactivate { + self.deactivate_trigger(db, trigger_name).await?; + } + + if let Some(catalog_batch) = self.catalog.apply_catalog_batch(&catalog_batch)? 
{ + self.wal + .write_ops(vec![WalOp::Catalog(catalog_batch)]) + .await?; + } + Ok(()) + } + + #[cfg_attr(not(feature = "system-py"), allow(unused))] + async fn run_trigger( + &self, + write_buffer: Arc, + query_executor: Arc, + db_name: &str, + trigger_name: &str, + ) -> Result<(), ProcessingEngineError> { + #[cfg(feature = "system-py")] + { + let (_db_id, db_schema) = self + .catalog + .db_id_and_schema(db_name) + .ok_or_else(|| ProcessingEngineError::DatabaseNotFound(db_name.to_string()))?; + let trigger = db_schema + .processing_engine_triggers + .get(trigger_name) + .ok_or_else(|| ProcessingEngineTriggerNotFound { + database_name: db_name.to_string(), + trigger_name: trigger_name.to_string(), + })? + .clone(); + + let trigger_rx = self + .plugin_event_tx + .lock() + .await + .add_trigger(db_name.to_string(), trigger_name.to_string()); + + let plugin_context = PluginContext { + trigger_rx, + write_buffer, + query_executor, + }; + plugins::run_plugin(db_name.to_string(), trigger, plugin_context); + } + + Ok(()) + } + + async fn deactivate_trigger( + &self, + db_name: &str, + trigger_name: &str, + ) -> Result<(), ProcessingEngineError> { + let (db_id, db_schema) = self + .catalog + .db_id_and_schema(db_name) + .ok_or_else(|| ProcessingEngineError::DatabaseNotFound(db_name.to_string()))?; + let trigger = db_schema + .processing_engine_triggers + .get(trigger_name) + .ok_or_else(|| ProcessingEngineTriggerNotFound { + database_name: db_name.to_string(), + trigger_name: trigger_name.to_string(), + })?; + // Already disabled, so this is a no-op + if trigger.disabled { + return Ok(()); + }; + + let mut deactivated = trigger.clone(); + deactivated.disabled = true; + let catalog_op = CatalogOp::DisableTrigger(TriggerIdentifier { + db_name: db_name.to_string(), + trigger_name: trigger_name.to_string(), + }); + if let Some(catalog_batch) = self.catalog.apply_catalog_batch(&CatalogBatch { + database_id: db_id, + database_name: Arc::clone(&db_schema.name), + time_ns: self.time_provider.now().timestamp_nanos(), + ops: vec![catalog_op], + })? { + let wal_op = WalOp::Catalog(catalog_batch); + self.wal.write_ops(vec![wal_op]).await?; + } + + let mut plugin_channels = self.plugin_event_tx.lock().await; + plugin_channels + .shutdown(db_name.to_string(), trigger_name.to_string()) + .await; + + Ok(()) + } + + async fn activate_trigger( + &self, + write_buffer: Arc, + query_executor: Arc, + db_name: &str, + trigger_name: &str, + ) -> Result<(), ProcessingEngineError> { + let (db_id, db_schema) = self + .catalog + .db_id_and_schema(db_name) + .ok_or_else(|| ProcessingEngineError::DatabaseNotFound(db_name.to_string()))?; + let trigger = db_schema + .processing_engine_triggers + .get(trigger_name) + .ok_or_else(|| ProcessingEngineTriggerNotFound { + database_name: db_name.to_string(), + trigger_name: trigger_name.to_string(), + })?; + // Already enabled, so this is a no-op + if !trigger.disabled { + return Ok(()); + }; + + let mut activated = trigger.clone(); + activated.disabled = false; + let catalog_op = CatalogOp::EnableTrigger(TriggerIdentifier { + db_name: db_name.to_string(), + trigger_name: trigger_name.to_string(), + }); + if let Some(catalog_batch) = self.catalog.apply_catalog_batch(&CatalogBatch { + database_id: db_id, + database_name: Arc::clone(&db_schema.name), + time_ns: self.time_provider.now().timestamp_nanos(), + ops: vec![catalog_op], + })? 
{ + let wal_op = WalOp::Catalog(catalog_batch); + self.wal.write_ops(vec![wal_op]).await?; + } + + self.run_trigger(write_buffer, query_executor, db_name, trigger_name) + .await?; + Ok(()) + } + + #[cfg_attr(not(feature = "system-py"), allow(unused))] + async fn test_wal_plugin( + &self, + request: WalPluginTestRequest, + query_executor: Arc, + ) -> Result { + #[cfg(feature = "system-py")] + { + // create a copy of the catalog so we don't modify the original + let catalog = Arc::new(Catalog::from_inner(self.catalog.clone_inner())); + let now = self.time_provider.now(); + + let code = self.read_plugin_code(&request.filename)?; + + let res = plugins::run_test_wal_plugin(now, catalog, query_executor, code, request) + .unwrap_or_else(|e| WalPluginTestResponse { + log_lines: vec![], + database_writes: Default::default(), + errors: vec![e.to_string()], + }); + + return Ok(res); + } + + #[cfg(not(feature = "system-py"))] + Err(plugins::Error::AnyhowError(anyhow::anyhow!( + "system-py feature not enabled" + ))) + } +} + +#[allow(unused)] +pub(crate) enum PluginEvent { + WriteWalContents(Arc), + Shutdown(oneshot::Sender<()>), +} + +#[cfg(test)] +mod tests { + use crate::manager::{ProcessingEngineError, ProcessingEngineManager}; + use crate::ProcessingEngineManagerImpl; + use data_types::NamespaceName; + use datafusion_util::config::register_iox_object_store; + use influxdb3_cache::last_cache::LastCacheProvider; + use influxdb3_cache::meta_cache::MetaCacheProvider; + use influxdb3_catalog::catalog; + use influxdb3_internal_api::query_executor::UnimplementedQueryExecutor; + use influxdb3_wal::{ + Gen1Duration, PluginDefinition, PluginType, TriggerSpecificationDefinition, WalConfig, + }; + use influxdb3_write::persister::Persister; + use influxdb3_write::write_buffer::{WriteBufferImpl, WriteBufferImplArgs}; + use influxdb3_write::{Precision, WriteBuffer}; + use iox_query::exec::{DedicatedExecutor, Executor, ExecutorConfig, IOxSessionContext}; + use iox_time::{MockProvider, Time, TimeProvider}; + use metric::Registry; + use object_store::memory::InMemory; + use object_store::ObjectStore; + use parquet_file::storage::{ParquetStorage, StorageId}; + use std::num::NonZeroUsize; + use std::sync::Arc; + use std::time::Duration; + + #[tokio::test] + async fn test_create_plugin() -> influxdb3_write::write_buffer::Result<()> { + let start_time = Time::from_rfc3339("2024-11-14T11:00:00+00:00").unwrap(); + let test_store = Arc::new(InMemory::new()); + let wal_config = WalConfig { + gen1_duration: Gen1Duration::new_1m(), + max_write_buffer_size: 100, + flush_interval: Duration::from_millis(10), + snapshot_size: 1, + }; + let pem = setup(start_time, test_store, wal_config).await; + + pem._write_buffer + .write_lp( + NamespaceName::new("foo").unwrap(), + "cpu,warehouse=us-east,room=01a,device=10001 reading=37\n", + start_time, + false, + Precision::Nanosecond, + ) + .await?; + + let empty_udf = r#"def example(iterator, output): + return"#; + + pem.insert_plugin( + "foo", + "my_plugin".to_string(), + empty_udf.to_string(), + PluginType::WalRows, + ) + .await + .unwrap(); + + let plugin = pem + .catalog + .db_schema("foo") + .expect("should have db named foo") + .processing_engine_plugins + .get("my_plugin") + .unwrap() + .clone(); + let expected = PluginDefinition { + plugin_name: "my_plugin".to_string(), + code: empty_udf.to_string(), + plugin_type: PluginType::WalRows, + }; + assert_eq!(expected, plugin); + + // confirm that creating it again is a no-op. 
+ pem.insert_plugin( + "foo", + "my_plugin".to_string(), + empty_udf.to_string(), + PluginType::WalRows, + ) + .await + .unwrap(); + + // Confirm the same contents can be added to a new name. + pem.insert_plugin( + "foo", + "my_second_plugin".to_string(), + empty_udf.to_string(), + PluginType::WalRows, + ) + .await + .unwrap(); + Ok(()) + } + #[tokio::test] + async fn test_delete_plugin() -> influxdb3_write::write_buffer::Result<()> { + let start_time = Time::from_rfc3339("2024-11-14T11:00:00+00:00").unwrap(); + let test_store = Arc::new(InMemory::new()); + let wal_config = WalConfig { + gen1_duration: Gen1Duration::new_1m(), + max_write_buffer_size: 100, + flush_interval: Duration::from_millis(10), + snapshot_size: 1, + }; + let pem = setup(start_time, test_store, wal_config).await; + + // Create the DB by inserting a line. + pem._write_buffer + .write_lp( + NamespaceName::new("foo").unwrap(), + "cpu,warehouse=us-east,room=01a,device=10001 reading=37\n", + start_time, + false, + Precision::Nanosecond, + ) + .await?; + + // First create a plugin + pem.insert_plugin( + "foo", + "test_plugin".to_string(), + "def process(iterator, output): pass".to_string(), + PluginType::WalRows, + ) + .await + .unwrap(); + + // Then delete it + pem.delete_plugin("foo", "test_plugin").await.unwrap(); + + // Verify plugin is gone from schema + let schema = pem.catalog.db_schema("foo").unwrap(); + assert!(!schema.processing_engine_plugins.contains_key("test_plugin")); + + // Verify we can add a newly named plugin + pem.insert_plugin( + "foo", + "test_plugin".to_string(), + "def new_process(iterator, output): pass".to_string(), + PluginType::WalRows, + ) + .await + .unwrap(); + + Ok(()) + } + + #[tokio::test] + async fn test_delete_plugin_with_active_trigger() -> influxdb3_write::write_buffer::Result<()> { + let start_time = Time::from_rfc3339("2024-11-14T11:00:00+00:00").unwrap(); + let test_store = Arc::new(InMemory::new()); + let wal_config = WalConfig { + gen1_duration: Gen1Duration::new_1m(), + max_write_buffer_size: 100, + flush_interval: Duration::from_millis(10), + snapshot_size: 1, + }; + let pem = setup(start_time, test_store, wal_config).await; + + // Create the DB by inserting a line. 
+ pem._write_buffer + .write_lp( + NamespaceName::new("foo").unwrap(), + "cpu,warehouse=us-east,room=01a,device=10001 reading=37\n", + start_time, + false, + Precision::Nanosecond, + ) + .await?; + + // Create a plugin + pem.insert_plugin( + "foo", + "test_plugin".to_string(), + "def process(iterator, output): pass".to_string(), + PluginType::WalRows, + ) + .await + .unwrap(); + + // Create a trigger using the plugin + pem.insert_trigger( + "foo", + "test_trigger".to_string(), + "test_plugin".to_string(), + TriggerSpecificationDefinition::AllTablesWalWrite, + false, + ) + .await + .unwrap(); + + // Try to delete the plugin - should fail because trigger exists + let result = pem.delete_plugin("foo", "test_plugin").await; + assert!(matches!( + result, + Err(ProcessingEngineError::CatalogUpdateError(catalog::Error::ProcessingEnginePluginInUse { + database_name, + plugin_name, + trigger_name, + })) if database_name == "foo" && plugin_name == "test_plugin" && trigger_name == "test_trigger" + )); + Ok(()) + } + + #[tokio::test] + async fn test_trigger_lifecycle() -> influxdb3_write::write_buffer::Result<()> { + let start_time = Time::from_rfc3339("2024-11-14T11:00:00+00:00").unwrap(); + let test_store = Arc::new(InMemory::new()); + let wal_config = WalConfig { + gen1_duration: Gen1Duration::new_1m(), + max_write_buffer_size: 100, + flush_interval: Duration::from_millis(10), + snapshot_size: 1, + }; + let pem = setup(start_time, test_store, wal_config).await; + + // convert to Arc + let write_buffer: Arc = Arc::clone(&pem._write_buffer); + + // Create the DB by inserting a line. + write_buffer + .write_lp( + NamespaceName::new("foo").unwrap(), + "cpu,warehouse=us-east,room=01a,device=10001 reading=37\n", + start_time, + false, + Precision::Nanosecond, + ) + .await?; + + // Create a plugin + pem.insert_plugin( + "foo", + "test_plugin".to_string(), + "def process(iterator, output): pass".to_string(), + PluginType::WalRows, + ) + .await + .unwrap(); + + // Create an enabled trigger + pem.insert_trigger( + "foo", + "test_trigger".to_string(), + "test_plugin".to_string(), + TriggerSpecificationDefinition::AllTablesWalWrite, + false, + ) + .await + .unwrap(); + // Run the trigger + pem.run_trigger( + Arc::clone(&write_buffer), + Arc::clone(&pem._query_executor), + "foo", + "test_trigger", + ) + .await + .unwrap(); + + // Deactivate the trigger + let result = pem.deactivate_trigger("foo", "test_trigger").await; + assert!(result.is_ok()); + + // Verify trigger is disabled in schema + let schema = write_buffer.catalog().db_schema("foo").unwrap(); + let trigger = schema + .processing_engine_triggers + .get("test_trigger") + .unwrap(); + assert!(trigger.disabled); + + // Activate the trigger + let result = pem + .activate_trigger( + Arc::clone(&write_buffer), + Arc::clone(&pem._query_executor), + "foo", + "test_trigger", + ) + .await; + assert!(result.is_ok()); + + // Verify trigger is enabled and running + let schema = write_buffer.catalog().db_schema("foo").unwrap(); + let trigger = schema + .processing_engine_triggers + .get("test_trigger") + .unwrap(); + assert!(!trigger.disabled); + Ok(()) + } + + #[tokio::test] + async fn test_create_disabled_trigger() -> influxdb3_write::write_buffer::Result<()> { + let start_time = Time::from_rfc3339("2024-11-14T11:00:00+00:00").unwrap(); + let test_store = Arc::new(InMemory::new()); + let wal_config = WalConfig { + gen1_duration: Gen1Duration::new_1m(), + max_write_buffer_size: 100, + flush_interval: Duration::from_millis(10), + snapshot_size: 1, + }; + let pem = 
setup(start_time, test_store, wal_config).await; + + // Create the DB by inserting a line. + pem._write_buffer + .write_lp( + NamespaceName::new("foo").unwrap(), + "cpu,warehouse=us-east,room=01a,device=10001 reading=37\n", + start_time, + false, + Precision::Nanosecond, + ) + .await?; + + // Create a plugin + pem.insert_plugin( + "foo", + "test_plugin".to_string(), + "def process(iterator, output): pass".to_string(), + PluginType::WalRows, + ) + .await + .unwrap(); + + // Create a disabled trigger + pem.insert_trigger( + "foo", + "test_trigger".to_string(), + "test_plugin".to_string(), + TriggerSpecificationDefinition::AllTablesWalWrite, + true, + ) + .await + .unwrap(); + + // Verify trigger is created but disabled + let schema = pem.catalog.db_schema("foo").unwrap(); + let trigger = schema + .processing_engine_triggers + .get("test_trigger") + .unwrap(); + assert!(trigger.disabled); + + // Verify trigger is not in active triggers list + assert!(pem.catalog.triggers().is_empty()); + Ok(()) + } + + #[tokio::test] + async fn test_activate_nonexistent_trigger() -> influxdb3_write::write_buffer::Result<()> { + let start_time = Time::from_rfc3339("2024-11-14T11:00:00+00:00").unwrap(); + let test_store = Arc::new(InMemory::new()); + let wal_config = WalConfig { + gen1_duration: Gen1Duration::new_1m(), + max_write_buffer_size: 100, + flush_interval: Duration::from_millis(10), + snapshot_size: 1, + }; + let pem = setup(start_time, test_store, wal_config).await; + + let write_buffer: Arc = Arc::clone(&pem._write_buffer); + + // Create the DB by inserting a line. + write_buffer + .write_lp( + NamespaceName::new("foo").unwrap(), + "cpu,warehouse=us-east,room=01a,device=10001 reading=37\n", + start_time, + false, + Precision::Nanosecond, + ) + .await?; + + let result = pem + .activate_trigger( + Arc::clone(&write_buffer), + Arc::clone(&pem._query_executor), + "foo", + "nonexistent_trigger", + ) + .await; + + assert!(matches!( + result, + Err(ProcessingEngineError::CatalogUpdateError(catalog::Error::ProcessingEngineTriggerNotFound { + database_name, + trigger_name, + })) if database_name == "foo" && trigger_name == "nonexistent_trigger" + )); + Ok(()) + } + + async fn setup( + start: Time, + object_store: Arc, + wal_config: WalConfig, + ) -> ProcessingEngineManagerImpl { + let time_provider: Arc = Arc::new(MockProvider::new(start)); + let metric_registry = Arc::new(Registry::new()); + let persister = Arc::new(Persister::new(Arc::clone(&object_store), "test_host")); + let catalog = Arc::new(persister.load_or_create_catalog().await.unwrap()); + let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _).unwrap(); + let meta_cache = + MetaCacheProvider::new_from_catalog(Arc::clone(&time_provider), Arc::clone(&catalog)) + .unwrap(); + let wbuf = WriteBufferImpl::new(WriteBufferImplArgs { + persister, + catalog: Arc::clone(&catalog), + last_cache, + meta_cache, + time_provider: Arc::clone(&time_provider), + executor: make_exec(), + wal_config, + parquet_cache: None, + metric_registry: Arc::clone(&metric_registry), + }) + .await + .unwrap(); + let ctx = IOxSessionContext::with_testing(); + let runtime_env = ctx.inner().runtime_env(); + register_iox_object_store(runtime_env, "influxdb3", Arc::clone(&object_store)); + + let qe = Arc::new(UnimplementedQueryExecutor); + let wal = wbuf.wal(); + + ProcessingEngineManagerImpl::new(None, catalog, wbuf, qe, time_provider, wal) + } + + pub(crate) fn make_exec() -> Arc { + let metrics = Arc::new(metric::Registry::default()); + let object_store: 
Arc = Arc::new(InMemory::new()); + + let parquet_store = ParquetStorage::new( + Arc::clone(&object_store), + StorageId::from("test_exec_storage"), + ); + Arc::new(Executor::new_with_config_and_executor( + ExecutorConfig { + target_query_partitions: NonZeroUsize::new(1).unwrap(), + object_stores: [&parquet_store] + .into_iter() + .map(|store| (store.id(), Arc::clone(store.object_store()))) + .collect(), + metric_registry: Arc::clone(&metrics), + // Default to 1gb + mem_pool_size: 1024 * 1024 * 1024, // 1024 (b/kb) * 1024 (kb/mb) * 1024 (mb/gb) + }, + DedicatedExecutor::new_testing(), + )) + } +} diff --git a/influxdb3_processing_engine/src/manager.rs b/influxdb3_processing_engine/src/manager.rs new file mode 100644 index 00000000000..c1843c8b95b --- /dev/null +++ b/influxdb3_processing_engine/src/manager.rs @@ -0,0 +1,131 @@ +use influxdb3_client::plugin_development::{WalPluginTestRequest, WalPluginTestResponse}; +use influxdb3_internal_api::query_executor::QueryExecutor; +use influxdb3_wal::{PluginType, TriggerSpecificationDefinition}; +use influxdb3_write::WriteBuffer; +use std::fmt::Debug; +use std::sync::Arc; +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum ProcessingEngineError { + #[error("database not found: {0}")] + DatabaseNotFound(String), + + #[error("catalog update error: {0}")] + CatalogUpdateError(#[from] influxdb3_catalog::catalog::Error), + + #[error("write buffer error: {0}")] + WriteBufferError(#[from] influxdb3_write::write_buffer::Error), + + #[error("wal error: {0}")] + WalError(#[from] influxdb3_wal::Error), +} + +/// `[ProcessingEngineManager]` is used to interact with the processing engine, +/// in particular plugins and triggers. +/// +#[async_trait::async_trait] +pub trait ProcessingEngineManager: Debug + Send + Sync + 'static { + /// Inserts a plugin + async fn insert_plugin( + &self, + db: &str, + plugin_name: String, + code: String, + plugin_type: PluginType, + ) -> Result<(), ProcessingEngineError>; + + async fn delete_plugin(&self, db: &str, plugin_name: &str) + -> Result<(), ProcessingEngineError>; + + async fn insert_trigger( + &self, + db_name: &str, + trigger_name: String, + plugin_name: String, + trigger_specification: TriggerSpecificationDefinition, + disabled: bool, + ) -> Result<(), ProcessingEngineError>; + + async fn delete_trigger( + &self, + db_name: &str, + trigger_name: &str, + force: bool, + ) -> Result<(), ProcessingEngineError>; + + /// Starts running the trigger, which will run in the background. + async fn run_trigger( + &self, + write_buffer: Arc, + query_executor: Arc, + db_name: &str, + trigger_name: &str, + ) -> Result<(), ProcessingEngineError>; + + async fn deactivate_trigger( + &self, + db_name: &str, + trigger_name: &str, + ) -> Result<(), ProcessingEngineError>; + + async fn activate_trigger( + &self, + write_buffer: Arc, + query_executor: Arc, + db_name: &str, + trigger_name: &str, + ) -> Result<(), ProcessingEngineError>; + + async fn test_wal_plugin( + &self, + request: WalPluginTestRequest, + query_executor: Arc, + ) -> Result; +} + +// from the queryable_bufer +// #[cfg(feature = "system-py")] +// pub(crate) async fn subscribe_to_plugin_events( +// &self, +// trigger_name: String, +// ) -> mpsc::Receiver { +// let mut senders = self.plugin_event_tx.lock().await; +// +// // TODO: should we be checking for replacements? +// let (plugin_tx, plugin_rx) = mpsc::channel(4); +// senders.insert(trigger_name, plugin_tx); +// plugin_rx +// } +// +// /// Deactivates a running trigger by sending it a oneshot sender. 
It should send back a message and then immediately shut down. +// pub(crate) async fn deactivate_trigger( +// &self, +// #[allow(unused)] trigger_name: String, +// ) -> Result<(), anyhow::Error> { +// #[cfg(feature = "system-py")] +// { +// let Some(sender) = self.plugin_event_tx.lock().await.remove(&trigger_name) else { +// anyhow::bail!("no trigger named '{}' found", trigger_name); +// }; +// let (oneshot_tx, oneshot_rx) = oneshot::channel(); +// sender.send(PluginEvent::Shutdown(oneshot_tx)).await?; +// oneshot_rx.await?; +// } +// Ok(()) +// } +// +// async fn send_to_plugins(&self, wal_contents: &WalContents) { +// let senders = self.plugin_event_tx.lock().await; +// if !senders.is_empty() { +// let wal_contents = Arc::new(wal_contents.clone()); +// for (plugin, sender) in senders.iter() { +// if let Err(err) = sender +// .send(PluginEvent::WriteWalContents(Arc::clone(&wal_contents))) +// .await +// { +// error!("failed to send plugin event to plugin {}: {}", plugin, err); +// } +// } +// } +// } diff --git a/influxdb3_write/src/write_buffer/plugins.rs b/influxdb3_processing_engine/src/plugins.rs similarity index 71% rename from influxdb3_write/src/write_buffer/plugins.rs rename to influxdb3_processing_engine/src/plugins.rs index ed9c571d75d..06f2818aeda 100644 --- a/influxdb3_write/src/write_buffer/plugins.rs +++ b/influxdb3_processing_engine/src/plugins.rs @@ -1,11 +1,22 @@ -use crate::write_buffer::{plugins, PluginEvent}; -use crate::{write_buffer, WriteBuffer}; +#[cfg(feature = "system-py")] +use crate::PluginEvent; +#[cfg(feature = "system-py")] use influxdb3_client::plugin_development::{WalPluginTestRequest, WalPluginTestResponse}; +#[cfg(feature = "system-py")] use influxdb3_internal_api::query_executor::QueryExecutor; -use influxdb3_wal::{PluginType, TriggerDefinition, TriggerSpecificationDefinition}; +#[cfg(feature = "system-py")] +use influxdb3_wal::TriggerDefinition; +#[cfg(feature = "system-py")] +use influxdb3_wal::TriggerSpecificationDefinition; +use influxdb3_write::write_buffer; +#[cfg(feature = "system-py")] +use influxdb3_write::WriteBuffer; +use observability_deps::tracing::error; use std::fmt::Debug; +#[cfg(feature = "system-py")] use std::sync::Arc; use thiserror::Error; +#[cfg(feature = "system-py")] use tokio::sync::mpsc; #[derive(Debug, Error)] @@ -33,176 +44,148 @@ pub enum Error { ReadPluginError(#[from] std::io::Error), } -/// `[ProcessingEngineManager]` is used to interact with the processing engine, -/// in particular plugins and triggers. -/// -#[async_trait::async_trait] -pub trait ProcessingEngineManager: Debug + Send + Sync + 'static { - /// Inserts a plugin - async fn insert_plugin( - &self, - db: &str, - plugin_name: String, - code: String, - function_name: String, - plugin_type: PluginType, - ) -> crate::Result<(), write_buffer::Error>; - - async fn delete_plugin( - &self, - db: &str, - plugin_name: &str, - ) -> crate::Result<(), write_buffer::Error>; - - async fn insert_trigger( - &self, - db_name: &str, - trigger_name: String, - plugin_name: String, - trigger_specification: TriggerSpecificationDefinition, - disabled: bool, - ) -> crate::Result<(), write_buffer::Error>; - - async fn delete_trigger( - &self, - db_name: &str, - trigger_name: &str, - force: bool, - ) -> crate::Result<(), write_buffer::Error>; - - /// Starts running the trigger, which will run in the background. 
- async fn run_trigger( - &self, - write_buffer: Arc, - db_name: &str, - trigger_name: &str, - ) -> crate::Result<(), write_buffer::Error>; - - async fn deactivate_trigger( - &self, - db_name: &str, - trigger_name: &str, - ) -> Result<(), write_buffer::Error>; - - async fn activate_trigger( - &self, - write_buffer: Arc, - db_name: &str, - trigger_name: &str, - ) -> Result<(), write_buffer::Error>; - - async fn test_wal_plugin( - &self, - request: WalPluginTestRequest, - query_executor: Arc, - ) -> crate::Result; -} - #[cfg(feature = "system-py")] pub(crate) fn run_plugin( db_name: String, trigger_definition: TriggerDefinition, - mut context: PluginContext, + context: PluginContext, ) { let trigger_plugin = TriggerPlugin { trigger_definition, db_name, + write_buffer: context.write_buffer, + query_executor: context.query_executor, }; tokio::task::spawn(async move { trigger_plugin - .run_plugin(&mut context) + .run_plugin(context.trigger_rx) .await .expect("trigger plugin failed"); }); } +#[cfg(feature = "system-py")] pub(crate) struct PluginContext { // tokio channel for inputs pub(crate) trigger_rx: mpsc::Receiver, // handler to write data back to the DB. pub(crate) write_buffer: Arc, + // query executor to hand off to the plugin + pub(crate) query_executor: Arc, } +#[cfg(feature = "system-py")] #[async_trait::async_trait] trait RunnablePlugin { // Returns true if it should exit - async fn process_event( + async fn process_event(&self, event: PluginEvent) -> Result; + async fn run_plugin( &self, - event: PluginEvent, - write_buffer: Arc, - ) -> Result; - async fn run_plugin(&self, context: &mut PluginContext) -> Result<(), Error> { - while let Some(event) = context.trigger_rx.recv().await { - if self - .process_event(event, context.write_buffer.clone()) - .await? 
- { - break; - } - } - Ok(()) - } + receiver: tokio::sync::mpsc::Receiver, + ) -> Result<(), Error>; } + +#[cfg(feature = "system-py")] #[derive(Debug)] struct TriggerPlugin { trigger_definition: TriggerDefinition, db_name: String, + write_buffer: Arc, + query_executor: Arc, } #[cfg(feature = "system-py")] mod python_plugin { use super::*; - use crate::Precision; + use anyhow::Context; use data_types::NamespaceName; - use influxdb3_py_api::system_py::PyWriteBatch; + use hashbrown::HashMap; + use influxdb3_py_api::system_py::execute_python_with_batch; use influxdb3_wal::WalOp; + use influxdb3_write::Precision; use iox_time::Time; + use observability_deps::tracing::warn; use std::time::SystemTime; + use tokio::sync::mpsc::Receiver; #[async_trait::async_trait] impl RunnablePlugin for TriggerPlugin { - async fn process_event( - &self, - event: PluginEvent, - write_buffer: Arc, - ) -> Result { - let Some(schema) = write_buffer.catalog().db_schema(self.db_name.as_str()) else { + async fn run_plugin(&self, mut receiver: Receiver) -> Result<(), Error> { + loop { + let event = match receiver.recv().await { + Some(event) => event, + None => { + warn!(?self.trigger_definition, "trigger plugin receiver closed"); + break; + } + }; + + match self.process_event(event).await { + Ok(stop) => { + if stop { + break; + } + } + Err(e) => { + error!(?self.trigger_definition, "error processing event: {}", e); + } + } + } + + Ok(()) + } + async fn process_event(&self, event: PluginEvent) -> Result { + let Some(schema) = self.write_buffer.catalog().db_schema(self.db_name.as_str()) else { return Err(Error::MissingDb); }; - let mut output_lines = Vec::new(); + let mut db_writes: HashMap> = HashMap::new(); match event { PluginEvent::WriteWalContents(wal_contents) => { for wal_op in &wal_contents.ops { match wal_op { WalOp::Write(write_batch) => { - let py_write_batch = PyWriteBatch { - // TODO: don't clone the write batch - write_batch: write_batch.clone(), - schema: Arc::clone(&schema), - }; - match &self.trigger_definition.trigger { + // determine if this write batch is for this database + if write_batch.database_name.as_ref() + != self.trigger_definition.database_name + { + continue; + } + let table_filter = match &self.trigger_definition.trigger { + TriggerSpecificationDefinition::AllTablesWalWrite => { + // no filter + None + } TriggerSpecificationDefinition::SingleTableWalWrite { table_name, } => { - output_lines.extend(py_write_batch.call_against_table( - table_name, - &self.trigger_definition.plugin.code, - &self.trigger_definition.plugin.function_name, - )?); - } - TriggerSpecificationDefinition::AllTablesWalWrite => { - for table in schema.table_map.right_values() { - output_lines.extend( - py_write_batch.call_against_table( - table.as_ref(), - &self.trigger_definition.plugin.code, - &self.trigger_definition.plugin.function_name, - )?, - ); - } + let table_id = schema + .table_name_to_id(table_name.as_ref()) + .context("table not found")?; + Some(table_id) } + }; + + let result = execute_python_with_batch( + &self.trigger_definition.plugin.code, + write_batch, + Arc::clone(&schema), + Arc::clone(&self.query_executor), + table_filter, + None, + )?; + + // write the output lines to the appropriate database + if !result.write_back_lines.is_empty() { + let lines = + db_writes.entry_ref(schema.name.as_ref()).or_default(); + lines.extend(result.write_back_lines); + } + + for (db_name, add_lines) in result.write_db_lines { + let lines = db_writes.entry(db_name).or_default(); + lines.extend(add_lines); } } 
WalOp::Catalog(_) => {} @@ -215,19 +198,22 @@ mod python_plugin { return Ok(true); } } - if !output_lines.is_empty() { - let ingest_time = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap(); - write_buffer - .write_lp( - NamespaceName::new(self.db_name.to_string()).unwrap(), - output_lines.join("\n").as_str(), - Time::from_timestamp_nanos(ingest_time.as_nanos() as i64), - false, - Precision::Nanosecond, - ) - .await?; + + if !db_writes.is_empty() { + for (db_name, output_lines) in db_writes { + let ingest_time = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap(); + self.write_buffer + .write_lp( + NamespaceName::new(db_name).unwrap(), + output_lines.join("\n").as_str(), + Time::from_timestamp_nanos(ingest_time.as_nanos() as i64), + false, + Precision::Nanosecond, + ) + .await?; + } } Ok(false) @@ -243,10 +229,10 @@ pub(crate) fn run_test_wal_plugin( code: String, request: WalPluginTestRequest, ) -> Result { - use crate::write_buffer::validator::WriteValidator; - use crate::Precision; use data_types::NamespaceName; use influxdb3_wal::Gen1Duration; + use influxdb3_write::write_buffer::validator::WriteValidator; + use influxdb3_write::Precision; let database = request.database; let namespace = NamespaceName::new(database.clone()) @@ -271,6 +257,7 @@ pub(crate) fn run_test_wal_plugin( &data.valid_data, db, query_executor, + None, request.input_arguments, )?; @@ -358,11 +345,11 @@ pub(crate) fn run_test_wal_plugin( #[cfg(test)] mod tests { use super::*; - use crate::write_buffer::validator::WriteValidator; - use crate::Precision; use data_types::NamespaceName; use influxdb3_catalog::catalog::Catalog; use influxdb3_internal_api::query_executor::UnimplementedQueryExecutor; + use influxdb3_write::write_buffer::validator::WriteValidator; + use influxdb3_write::Precision; use iox_time::Time; use std::collections::HashMap; diff --git a/influxdb3_py_api/Cargo.toml b/influxdb3_py_api/Cargo.toml index aab94b4c36c..72e2d12c83c 100644 --- a/influxdb3_py_api/Cargo.toml +++ b/influxdb3_py_api/Cargo.toml @@ -11,11 +11,11 @@ system-py = ["pyo3"] [dependencies] arrow-array.workspace = true arrow-schema.workspace = true +influxdb3_id = { path = "../influxdb3_id" } influxdb3_wal = { path = "../influxdb3_wal" } influxdb3_catalog = {path = "../influxdb3_catalog"} influxdb3_internal_api = { path = "../influxdb3_internal_api" } iox_query_params.workspace = true -schema.workspace = true parking_lot.workspace = true futures.workspace = true tokio.workspace = true diff --git a/influxdb3_py_api/src/system_py.rs b/influxdb3_py_api/src/system_py.rs index 1bc87c2e885..1267807a5f8 100644 --- a/influxdb3_py_api/src/system_py.rs +++ b/influxdb3_py_api/src/system_py.rs @@ -5,197 +5,22 @@ use arrow_array::{ }; use arrow_schema::DataType; use futures::TryStreamExt; -use influxdb3_catalog::catalog::{DatabaseSchema, TableDefinition}; +use influxdb3_catalog::catalog::DatabaseSchema; +use influxdb3_id::TableId; use influxdb3_internal_api::query_executor::{QueryExecutor, QueryKind}; -use influxdb3_wal::{FieldData, Row, WriteBatch}; +use influxdb3_wal::{FieldData, WriteBatch}; use iox_query_params::StatementParams; use parking_lot::Mutex; use pyo3::exceptions::PyValueError; -use pyo3::prelude::{PyAnyMethods, PyModule, PyModuleMethods}; +use pyo3::prelude::{PyAnyMethods, PyModule}; use pyo3::types::{PyDict, PyList}; use pyo3::{ - pyclass, pymethods, pymodule, Bound, IntoPyObject, Py, PyAny, PyErr, PyObject, PyResult, Python, + pyclass, pymethods, pymodule, Bound, IntoPyObject, Py, PyAny, 
PyObject, PyResult, Python, }; -use schema::InfluxColumnType; use std::collections::HashMap; use std::ffi::CString; use std::sync::Arc; -#[pyclass] -#[derive(Debug)] -pub struct PyWriteBatchIterator { - table_definition: Arc, - rows: Vec, - current_index: usize, -} - -#[pymethods] -impl PyWriteBatchIterator { - fn next_point(&mut self) -> PyResult> { - if self.current_index >= self.rows.len() { - return Ok(None); - } - - Python::with_gil(|py| { - let row = &self.rows[self.current_index]; - self.current_index += 1; - - // Import Point class - let point_class = py - .import("influxdb_client_3.write_client.client.write.point")? - .getattr("Point") - .unwrap(); - - // Create new Point instance with measurement name (table name) - let point = point_class.call1((self.table_definition.table_name.as_ref(),))?; - - // Set timestamp - point.call_method1("time", (row.time,))?; - - // Add fields based on column definitions and field data - for field in &row.fields { - if let Some(col_def) = self.table_definition.columns.get(&field.id) { - let field_name = col_def.name.as_ref(); - - match col_def.data_type { - InfluxColumnType::Tag => { - let FieldData::Tag(tag) = &field.value else { - // error out because we expect a tag - return Err(PyValueError::new_err(format!( - "expect FieldData:Tag for tagged columns, not ${:?}", - field - ))); - }; - point.call_method1("tag", (field_name, tag.as_str()))?; - } - InfluxColumnType::Timestamp => {} - InfluxColumnType::Field(_) => { - match &field.value { - FieldData::String(s) => { - point.call_method1("field", (field_name, s.as_str()))? - } - FieldData::Integer(i) => { - point.call_method1("field", (field_name, *i))? - } - FieldData::UInteger(u) => { - point.call_method1("field", (field_name, *u))? - } - FieldData::Float(f) => { - point.call_method1("field", (field_name, *f))? - } - FieldData::Boolean(b) => { - point.call_method1("field", (field_name, *b))? - } - FieldData::Tag(t) => { - point.call_method1("field", (field_name, t.as_str()))? - } - FieldData::Key(k) => { - point.call_method1("field", (field_name, k.as_str()))? - } - FieldData::Timestamp(ts) => { - point.call_method1("field", (field_name, *ts))? - } - }; - } - } - } - } - - Ok(Some(point.into_pyobject(py)?.unbind())) - }) - } -} - -#[pyclass] -#[derive(Debug)] -pub struct PyWriteBatch { - pub write_batch: WriteBatch, - pub schema: Arc, -} - -#[pymethods] -impl PyWriteBatch { - fn get_iterator_for_table(&self, table_name: &str) -> PyResult> { - // Find table ID from name - let table_id = self - .schema - .table_map - .get_by_right(&Arc::from(table_name)) - .ok_or_else(|| { - PyErr::new::(format!("Table '{}' not found", table_name)) - })?; - - // Get table chunks - let Some(chunks) = self.write_batch.table_chunks.get(table_id) else { - return Ok(None); - }; - - // Get table definition - let table_def = self.schema.tables.get(table_id).ok_or_else(|| { - PyErr::new::(format!( - "Table definition not found for '{}'", - table_name - )) - })?; - - Ok(Some(PyWriteBatchIterator { - table_definition: Arc::clone(table_def), - // TODO: avoid copying all the data at once. 
- rows: chunks - .chunk_time_to_chunk - .values() - .flat_map(|chunk| chunk.rows.clone()) - .collect(), - current_index: 0, - })) - } -} - -#[derive(Debug)] -#[pyclass] -pub struct PyLineProtocolOutput { - lines: Arc>>, -} - -#[pymethods] -impl PyLineProtocolOutput { - fn insert_line_protocol(&mut self, line: &str) -> PyResult<()> { - let mut lines = self.lines.lock(); - lines.push(line.to_string()); - Ok(()) - } -} - -impl PyWriteBatch { - pub fn call_against_table( - &self, - table_name: &str, - setup_code: &str, - call_site: &str, - ) -> PyResult> { - let Some(iterator) = self.get_iterator_for_table(table_name)? else { - return Ok(Vec::new()); - }; - - Python::with_gil(|py| { - py.run(&CString::new(setup_code)?, None, None)?; - let py_func = py.eval(&CString::new(call_site)?, None, None)?; - - // Create the output collector with shared state - let lines = Arc::new(Mutex::new(Vec::new())); - let output = PyLineProtocolOutput { - lines: Arc::clone(&lines), - }; - - // Pass both iterator and output collector to the Python function - py_func.call1((iterator, output.into_pyobject(py)?))?; - - let output_lines = lines.lock().clone(); - Ok(output_lines) - }) - } -} - #[pyclass] #[derive(Debug)] struct PyPluginCallApi { @@ -515,6 +340,7 @@ pub fn execute_python_with_batch( write_batch: &WriteBatch, schema: Arc, query_executor: Arc, + table_filter: Option, args: Option>, ) -> PyResult { Python::with_gil(|py| { @@ -531,6 +357,11 @@ pub fn execute_python_with_batch( let mut table_batches = Vec::with_capacity(write_batch.table_chunks.len()); for (table_id, table_chunks) in &write_batch.table_chunks { + if let Some(table_filter) = table_filter { + if table_id != &table_filter { + continue; + } + } let table_def = schema.tables.get(table_id).unwrap(); let dict = PyDict::new(py); @@ -621,8 +452,6 @@ pub fn execute_python_with_batch( // Module initialization #[pymodule] -fn influxdb3_py_api(m: &Bound<'_, PyModule>) -> PyResult<()> { - m.add_class::()?; - m.add_class::()?; +fn influxdb3_py_api(_m: &Bound<'_, PyModule>) -> PyResult<()> { Ok(()) } diff --git a/influxdb3_server/Cargo.toml b/influxdb3_server/Cargo.toml index c5aca260530..a0a26b37a8d 100644 --- a/influxdb3_server/Cargo.toml +++ b/influxdb3_server/Cargo.toml @@ -36,6 +36,7 @@ influxdb3_client = { path = "../influxdb3_client" } influxdb3_id = { path = "../influxdb3_id" } influxdb3_internal_api = { path = "../influxdb3_internal_api" } influxdb3_process = { path = "../influxdb3_process", default-features = false } +influxdb3_processing_engine = { path = "../influxdb3_processing_engine" } influxdb3_wal = { path = "../influxdb3_wal"} influxdb3_write = { path = "../influxdb3_write" } iox_query_influxql_rewrite = { path = "../iox_query_influxql_rewrite" } diff --git a/influxdb3_server/src/builder.rs b/influxdb3_server/src/builder.rs index 56edfa14e17..42804822e4a 100644 --- a/influxdb3_server/src/builder.rs +++ b/influxdb3_server/src/builder.rs @@ -3,7 +3,9 @@ use std::sync::Arc; use crate::{auth::DefaultAuthorizer, http::HttpApi, CommonServerState, Server}; use authz::Authorizer; use influxdb3_internal_api::query_executor::QueryExecutor; +use influxdb3_processing_engine::ProcessingEngineManagerImpl; use influxdb3_write::{persister::Persister, WriteBuffer}; +use iox_time::TimeProvider; use tokio::net::TcpListener; #[derive(Debug)] @@ -144,17 +146,26 @@ impl ServerBuilder { } } -impl +impl ServerBuilder, WithListener> { pub fn build(self) -> Server { let persister = Arc::clone(&self.persister.0); let authorizer = Arc::clone(&self.authorizer); + let 
processing_engine = Arc::new(ProcessingEngineManagerImpl::new( + self.common_state.plugin_dir.clone(), + self.write_buffer.0.catalog(), + Arc::clone(&self.write_buffer.0), + Arc::clone(&self.query_executor.0), + Arc::clone(&self.time_provider.0) as _, + self.write_buffer.0.wal(), + )); let http = Arc::new(HttpApi::new( self.common_state.clone(), Arc::clone(&self.time_provider.0), Arc::clone(&self.write_buffer.0), Arc::clone(&self.query_executor.0), + processing_engine, self.max_request_size, Arc::clone(&authorizer), )); diff --git a/influxdb3_server/src/http.rs b/influxdb3_server/src/http.rs index 175025992bf..cc3d22bffa7 100644 --- a/influxdb3_server/src/http.rs +++ b/influxdb3_server/src/http.rs @@ -24,6 +24,7 @@ use influxdb3_cache::meta_cache::{self, CreateMetaCacheArgs, MaxAge, MaxCardinal use influxdb3_catalog::catalog::Error as CatalogError; use influxdb3_internal_api::query_executor::{QueryExecutor, QueryExecutorError, QueryKind}; use influxdb3_process::{INFLUXDB3_GIT_HASH_SHORT, INFLUXDB3_VERSION}; +use influxdb3_processing_engine::manager::ProcessingEngineManager; use influxdb3_wal::{PluginType, TriggerSpecificationDefinition}; use influxdb3_write::persister::TrackedMemoryArrowWriter; use influxdb3_write::write_buffer::Error as WriteBufferError; @@ -216,7 +217,10 @@ pub enum Error { PythonPluginsNotEnabled, #[error("Plugin error")] - Plugin(#[from] influxdb3_write::write_buffer::plugins::Error), + Plugin(#[from] influxdb3_processing_engine::plugins::Error), + + #[error("Processing engine error: {0}")] + ProcessingEngine(#[from] influxdb3_processing_engine::manager::ProcessingEngineError), } #[derive(Debug, Error)] @@ -439,6 +443,7 @@ pub type Result = std::result::Result; pub(crate) struct HttpApi { common_state: CommonServerState, write_buffer: Arc, + processing_engine: Arc, time_provider: Arc, pub(crate) query_executor: Arc, max_request_bytes: usize, @@ -452,6 +457,7 @@ impl HttpApi { time_provider: Arc, write_buffer: Arc, query_executor: Arc, + processing_engine: Arc, max_request_bytes: usize, authorizer: Arc, ) -> Self { @@ -464,6 +470,7 @@ impl HttpApi { max_request_bytes, authorizer, legacy_write_param_unifier, + processing_engine, } } } @@ -981,15 +988,14 @@ where db, plugin_name, code, - function_name, plugin_type, } = if let Some(query) = req.uri().query() { serde_urlencoded::from_str(query)? } else { self.read_body_json(req).await? }; - self.write_buffer - .insert_plugin(&db, plugin_name, code, function_name, plugin_type) + self.processing_engine + .insert_plugin(&db, plugin_name, code, plugin_type) .await?; Ok(Response::builder() @@ -1004,7 +1010,9 @@ where } else { self.read_body_json(req).await? }; - self.write_buffer.delete_plugin(&db, &plugin_name).await?; + self.processing_engine + .delete_plugin(&db, &plugin_name) + .await?; Ok(Response::builder() .status(StatusCode::OK) .body(Body::empty())?) @@ -1034,7 +1042,7 @@ where }, )); }; - self.write_buffer + self.processing_engine .insert_trigger( db.as_str(), trigger_name.clone(), @@ -1044,9 +1052,10 @@ where ) .await?; if !disabled { - self.write_buffer + self.processing_engine .run_trigger( Arc::clone(&self.write_buffer), + Arc::clone(&self.query_executor), db.as_str(), trigger_name.as_str(), ) @@ -1067,7 +1076,7 @@ where } else { self.read_body_json(req).await? 
}; - self.write_buffer + self.processing_engine .delete_trigger(&db, &trigger_name, force) .await?; Ok(Response::builder() @@ -1081,7 +1090,7 @@ where ) -> Result> { let query = req.uri().query().unwrap_or(""); let delete_req = serde_urlencoded::from_str::(query)?; - self.write_buffer + self.processing_engine .deactivate_trigger(delete_req.db.as_str(), delete_req.trigger_name.as_str()) .await?; Ok(Response::builder() @@ -1094,9 +1103,10 @@ where ) -> Result> { let query = req.uri().query().unwrap_or(""); let delete_req = serde_urlencoded::from_str::(query)?; - self.write_buffer + self.processing_engine .activate_trigger( Arc::clone(&self.write_buffer), + Arc::clone(&self.query_executor), delete_req.db.as_str(), delete_req.trigger_name.as_str(), ) @@ -1139,7 +1149,7 @@ where self.read_body_json(req).await?; let output = self - .write_buffer + .processing_engine .test_wal_plugin(request, Arc::clone(&self.query_executor)) .await?; let body = serde_json::to_string(&output)?; @@ -1579,7 +1589,6 @@ struct ProcessingEnginePluginCreateRequest { db: String, plugin_name: String, code: String, - function_name: String, plugin_type: PluginType, } diff --git a/influxdb3_server/src/lib.rs b/influxdb3_server/src/lib.rs index 60b5879623f..736bbf7ac96 100644 --- a/influxdb3_server/src/lib.rs +++ b/influxdb3_server/src/lib.rs @@ -35,6 +35,7 @@ use observability_deps::tracing::info; use service::hybrid; use std::convert::Infallible; use std::fmt::Debug; +use std::path::PathBuf; use std::sync::Arc; use thiserror::Error; use tokio::net::TcpListener; @@ -78,6 +79,7 @@ pub struct CommonServerState { trace_exporter: Option>, trace_header_parser: TraceHeaderParser, telemetry_store: Arc, + plugin_dir: Option, } impl CommonServerState { @@ -86,12 +88,14 @@ impl CommonServerState { trace_exporter: Option>, trace_header_parser: TraceHeaderParser, telemetry_store: Arc, + plugin_dir: Option, ) -> Result { Ok(Self { metrics, trace_exporter, trace_header_parser, telemetry_store, + plugin_dir, }) } @@ -774,7 +778,6 @@ mod tests { wal_config: WalConfig::test_config(), parquet_cache: Some(parquet_cache), metric_registry: Arc::clone(&metrics), - plugin_dir: None, }, ) .await @@ -793,6 +796,7 @@ mod tests { None, trace_header_parser, Arc::clone(&sample_telem_store), + None, ) .unwrap(); let query_executor = QueryExecutorImpl::new(CreateQueryExecutorArgs { diff --git a/influxdb3_server/src/query_executor/mod.rs b/influxdb3_server/src/query_executor/mod.rs index 05e3f7fb5af..26032a8f981 100644 --- a/influxdb3_server/src/query_executor/mod.rs +++ b/influxdb3_server/src/query_executor/mod.rs @@ -713,7 +713,6 @@ mod tests { }, parquet_cache: Some(parquet_cache), metric_registry: Default::default(), - plugin_dir: None, }) .await .unwrap(); diff --git a/influxdb3_server/src/system_tables/python_call.rs b/influxdb3_server/src/system_tables/python_call.rs index 9ae58292044..fa1d6e43996 100644 --- a/influxdb3_server/src/system_tables/python_call.rs +++ b/influxdb3_server/src/system_tables/python_call.rs @@ -17,7 +17,6 @@ pub(super) struct ProcessingEnginePluginTable { fn plugin_schema() -> SchemaRef { let columns = vec![ Field::new("plugin_name", DataType::Utf8, false), - Field::new("function_name", DataType::Utf8, false), Field::new("code", DataType::Utf8, false), Field::new("plugin_type", DataType::Utf8, false), ]; @@ -51,12 +50,6 @@ impl IoxSystemTable for ProcessingEnginePluginTable { .map(|call| Some(call.plugin_name.clone())) .collect::(), ), - Arc::new( - self.plugins - .iter() - .map(|p| Some(p.function_name.clone())) - 
.collect::(), - ), Arc::new( self.plugins .iter() diff --git a/influxdb3_wal/src/lib.rs b/influxdb3_wal/src/lib.rs index 80073e615d8..a906ca4db08 100644 --- a/influxdb3_wal/src/lib.rs +++ b/influxdb3_wal/src/lib.rs @@ -120,13 +120,13 @@ pub trait Wal: Debug + Send + Sync + 'static { #[async_trait] pub trait WalFileNotifier: Debug + Send + Sync + 'static { /// Notify the handler that a new WAL file has been persisted with the given contents. - async fn notify(&self, write: WalContents); + async fn notify(&self, write: Arc); /// Notify the handler that a new WAL file has been persisted with the given contents and tell /// it to snapshot the data. The returned receiver will be signalled when the snapshot is complete. async fn notify_and_snapshot( &self, - write: WalContents, + write: Arc, snapshot_details: SnapshotDetails, ) -> oneshot::Receiver; @@ -311,8 +311,8 @@ impl OrderedCatalogBatch { self.database_sequence_number } - pub fn batch(self) -> CatalogBatch { - self.catalog + pub fn batch(&self) -> &CatalogBatch { + &self.catalog } } @@ -614,7 +614,6 @@ pub struct MetaCacheDelete { pub struct PluginDefinition { pub plugin_name: String, pub code: String, - pub function_name: String, pub plugin_type: PluginType, } @@ -633,6 +632,7 @@ pub enum PluginType { pub struct TriggerDefinition { pub trigger_name: String, pub plugin_name: String, + pub database_name: String, pub trigger: TriggerSpecificationDefinition, // TODO: decide whether this should be populated from a reference rather than stored on its own. pub plugin: PluginDefinition, diff --git a/influxdb3_wal/src/object_store.rs b/influxdb3_wal/src/object_store.rs index 6efd79e1bcc..0d80dca5362 100644 --- a/influxdb3_wal/src/object_store.rs +++ b/influxdb3_wal/src/object_store.rs @@ -145,7 +145,7 @@ impl WalObjectStore { match wal_contents.snapshot { // This branch uses so much time - None => self.file_notifier.notify(wal_contents).await, + None => self.file_notifier.notify(Arc::new(wal_contents)).await, Some(snapshot_details) => { let snapshot_info = { let mut buffer = self.flush_buffer.lock().await; @@ -162,11 +162,11 @@ impl WalObjectStore { if snapshot_details.snapshot_sequence_number <= last_snapshot_sequence_number { // Instead just notify about the WAL, as this snapshot has already been taken // and WAL files may have been cleared. 
- self.file_notifier.notify(wal_contents).await; + self.file_notifier.notify(Arc::new(wal_contents)).await; } else { let snapshot_done = self .file_notifier - .notify_and_snapshot(wal_contents, snapshot_details) + .notify_and_snapshot(Arc::new(wal_contents), snapshot_details) .await; let details = snapshot_done.await.unwrap(); assert_eq!(snapshot_details, details); @@ -299,7 +299,7 @@ impl WalObjectStore { info!(?snapshot_details, "snapshotting wal"); let snapshot_done = self .file_notifier - .notify_and_snapshot(wal_contents, snapshot_details) + .notify_and_snapshot(Arc::new(wal_contents), snapshot_details) .await; let (snapshot_info, snapshot_permit) = snapshot.expect("snapshot should be set when snapshot details are set"); @@ -310,7 +310,7 @@ impl WalObjectStore { "notify sent to buffer for wal file {}", wal_contents.wal_file_number.as_u64() ); - self.file_notifier.notify(wal_contents).await; + self.file_notifier.notify(Arc::new(wal_contents)).await; None } }; @@ -966,10 +966,11 @@ mod tests { { let notified_writes = replay_notifier.notified_writes.lock(); - assert_eq!( - *notified_writes, - vec![file_1_contents.clone(), file_2_contents.clone()] - ); + let notified_refs = notified_writes + .iter() + .map(|x| x.as_ref()) + .collect::>(); + assert_eq!(notified_refs, vec![&file_1_contents, &file_2_contents]); } assert!(replay_notifier.snapshot_details.lock().is_none()); @@ -1067,8 +1068,12 @@ mod tests { { let notified_writes = notifier.notified_writes.lock(); - let expected_writes = vec![file_1_contents, file_2_contents, file_3_contents.clone()]; - assert_eq!(*notified_writes, expected_writes); + let notified_refs = notified_writes + .iter() + .map(|x| x.as_ref()) + .collect::>(); + let expected_writes = vec![&file_1_contents, &file_2_contents, &file_3_contents]; + assert_eq!(notified_refs, expected_writes); let details = notifier.snapshot_details.lock(); assert_eq!(details.unwrap(), expected_info); } @@ -1097,7 +1102,11 @@ mod tests { .downcast_ref::() .unwrap(); let notified_writes = replay_notifier.notified_writes.lock(); - assert_eq!(*notified_writes, vec![file_3_contents.clone()]); + let notified_refs = notified_writes + .iter() + .map(|x| x.as_ref()) + .collect::>(); + assert_eq!(notified_refs, vec![&file_3_contents]); let snapshot_details = replay_notifier.snapshot_details.lock(); assert_eq!(*snapshot_details, file_3_contents.snapshot); } @@ -1303,19 +1312,19 @@ mod tests { #[derive(Debug, Default)] struct TestNotifier { - notified_writes: parking_lot::Mutex>, + notified_writes: parking_lot::Mutex>>, snapshot_details: parking_lot::Mutex>, } #[async_trait] impl WalFileNotifier for TestNotifier { - async fn notify(&self, write: WalContents) { + async fn notify(&self, write: Arc) { self.notified_writes.lock().push(write); } async fn notify_and_snapshot( &self, - write: WalContents, + write: Arc, snapshot_details: SnapshotDetails, ) -> Receiver { self.notified_writes.lock().push(write); diff --git a/influxdb3_write/src/lib.rs b/influxdb3_write/src/lib.rs index 823cdb3a6dd..82b93f64ec4 100644 --- a/influxdb3_write/src/lib.rs +++ b/influxdb3_write/src/lib.rs @@ -24,8 +24,8 @@ use influxdb3_id::ParquetFileId; use influxdb3_id::SerdeVecMap; use influxdb3_id::TableId; use influxdb3_id::{ColumnId, DbId}; -use influxdb3_wal::MetaCacheDefinition; use influxdb3_wal::{LastCacheDefinition, SnapshotSequenceNumber, WalFileSequenceNumber}; +use influxdb3_wal::{MetaCacheDefinition, Wal}; use iox_query::QueryChunk; use iox_time::Time; use serde::{Deserialize, Serialize}; @@ -33,7 +33,6 @@ use 
std::fmt::Debug; use std::sync::Arc; use std::time::Duration; use thiserror::Error; -use write_buffer::plugins::ProcessingEngineManager; #[derive(Debug, Error)] pub enum Error { @@ -50,12 +49,7 @@ pub enum Error { pub type Result = std::result::Result; pub trait WriteBuffer: - Bufferer - + ChunkContainer - + MetaCacheManager - + LastCacheManager - + DatabaseManager - + ProcessingEngineManager + Bufferer + ChunkContainer + MetaCacheManager + LastCacheManager + DatabaseManager { } @@ -95,6 +89,9 @@ pub trait Bufferer: Debug + Send + Sync + 'static { /// Returns the database schema provider fn catalog(&self) -> Arc; + /// Reutrns the WAL this bufferer is using + fn wal(&self) -> Arc; + /// Returns the parquet files for a given database and table fn parquet_files(&self, db_id: DbId, table_id: TableId) -> Vec; diff --git a/influxdb3_write/src/persister.rs b/influxdb3_write/src/persister.rs index e1a53c2e6d4..c0153f2d7b2 100644 --- a/influxdb3_write/src/persister.rs +++ b/influxdb3_write/src/persister.rs @@ -489,7 +489,7 @@ mod tests { persister.persist_catalog(&catalog).await.unwrap(); let batch = |name: &str, num: u32| { - let _ = catalog.apply_catalog_batch(CatalogBatch { + let _ = catalog.apply_catalog_batch(&CatalogBatch { database_id: db_schema.id, database_name: Arc::clone(&db_schema.name), time_ns: 5000, diff --git a/influxdb3_write/src/write_buffer/mod.rs b/influxdb3_write/src/write_buffer/mod.rs index 26928d15fd7..914d7a92cf2 100644 --- a/influxdb3_write/src/write_buffer/mod.rs +++ b/influxdb3_write/src/write_buffer/mod.rs @@ -2,8 +2,6 @@ mod metrics; pub mod persisted_files; -#[allow(dead_code)] -pub mod plugins; pub mod queryable_buffer; mod table_buffer; pub mod validator; @@ -12,13 +10,11 @@ use crate::persister::Persister; use crate::write_buffer::persisted_files::PersistedFiles; use crate::write_buffer::queryable_buffer::QueryableBuffer; use crate::write_buffer::validator::WriteValidator; -use crate::{chunk::ParquetChunk, write_buffer, DatabaseManager}; +use crate::{chunk::ParquetChunk, DatabaseManager}; use crate::{ BufferedWriteRequest, Bufferer, ChunkContainer, LastCacheManager, MetaCacheManager, ParquetFile, PersistedSnapshot, Precision, WriteBuffer, WriteLineError, }; -#[cfg(feature = "system-py")] -use anyhow::Context; use async_trait::async_trait; use data_types::{ ChunkId, ChunkOrder, ColumnType, NamespaceName, NamespaceNameError, PartitionHashId, @@ -31,22 +27,17 @@ use datafusion::logical_expr::Expr; use influxdb3_cache::last_cache::{self, LastCacheProvider}; use influxdb3_cache::meta_cache::{self, CreateMetaCacheArgs, MetaCacheProvider}; use influxdb3_cache::parquet_cache::ParquetCacheOracle; -use influxdb3_catalog::catalog; -use influxdb3_catalog::catalog::Error::ProcessingEngineTriggerNotFound; use influxdb3_catalog::catalog::{Catalog, DatabaseSchema}; use influxdb3_id::{ColumnId, DbId, TableId}; -use influxdb3_wal::{ - object_store::WalObjectStore, DeleteDatabaseDefinition, DeleteTriggerDefinition, - PluginDefinition, PluginType, TriggerDefinition, TriggerSpecificationDefinition, WalContents, -}; +use influxdb3_wal::FieldDataType; +use influxdb3_wal::TableDefinition; +use influxdb3_wal::{object_store::WalObjectStore, DeleteDatabaseDefinition}; use influxdb3_wal::{ CatalogBatch, CatalogOp, LastCacheDefinition, LastCacheDelete, LastCacheSize, MetaCacheDefinition, MetaCacheDelete, Wal, WalConfig, WalFileNotifier, WalOp, }; use influxdb3_wal::{CatalogOp::CreateLastCache, DeleteTableDefinition}; use influxdb3_wal::{DatabaseDefinition, FieldDefinition}; -use 
influxdb3_wal::{DeletePluginDefinition, TableDefinition}; -use influxdb3_wal::{FieldDataType, TriggerIdentifier}; use iox_query::chunk_statistics::{create_chunk_statistics, NoColumnRanges}; use iox_query::QueryChunk; use iox_time::{Time, TimeProvider}; @@ -56,21 +47,13 @@ use object_store::path::Path as ObjPath; use object_store::{ObjectMeta, ObjectStore}; use observability_deps::tracing::{debug, warn}; use parquet_file::storage::ParquetExecInput; -use plugins::ProcessingEngineManager; use queryable_buffer::QueryableBufferArgs; use schema::Schema; -use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; use thiserror::Error; -use tokio::sync::oneshot; use tokio::sync::watch::Receiver; -#[cfg(feature = "system-py")] -use crate::write_buffer::plugins::PluginContext; -use influxdb3_client::plugin_development::{WalPluginTestRequest, WalPluginTestResponse}; -use influxdb3_internal_api::query_executor::QueryExecutor; - #[derive(Debug, Error)] pub enum Error { #[error("parsing for line protocol failed")] @@ -163,8 +146,6 @@ pub struct WriteBufferImpl { metrics: WriteMetrics, meta_cache: Arc, last_cache: Arc, - #[allow(dead_code)] - plugin_dir: Option, } /// The maximum number of snapshots to load on start @@ -181,7 +162,6 @@ pub struct WriteBufferImplArgs { pub wal_config: WalConfig, pub parquet_cache: Option>, pub metric_registry: Arc, - pub plugin_dir: Option, } impl WriteBufferImpl { @@ -196,7 +176,6 @@ impl WriteBufferImpl { wal_config, parquet_cache, metric_registry, - plugin_dir, }: WriteBufferImplArgs, ) -> Result> { // load snapshots and replay the wal into the in memory buffer @@ -255,15 +234,7 @@ impl WriteBufferImpl { persisted_files, buffer: queryable_buffer, metrics: WriteMetrics::new(&metric_registry), - plugin_dir, }); - let write_buffer: Arc = result.clone(); - let triggers = result.catalog().triggers(); - for (db_name, trigger_name) in triggers { - result - .run_trigger(Arc::clone(&write_buffer), &db_name, &trigger_name) - .await?; - } Ok(result) } @@ -271,6 +242,10 @@ impl WriteBufferImpl { Arc::clone(&self.catalog) } + pub fn wal(&self) -> Arc { + Arc::clone(&self.wal) + } + pub fn persisted_files(&self) -> Arc { Arc::clone(&self.persisted_files) } @@ -375,13 +350,6 @@ impl WriteBufferImpl { Ok(chunks) } - - #[cfg(feature = "system-py")] - fn read_plugin_code(&self, name: &str) -> Result { - let plugin_dir = self.plugin_dir.clone().context("plugin dir not set")?; - let path = plugin_dir.join(name); - Ok(std::fs::read_to_string(path)?) - } } pub fn parquet_chunk_from_file( @@ -450,6 +418,10 @@ impl Bufferer for WriteBufferImpl { self.catalog() } + fn wal(&self) -> Arc { + Arc::clone(&self.wal) + } + fn parquet_files(&self, db_id: DbId, table_id: TableId) -> Vec { self.buffer.persisted_parquet_files(db_id, table_id) } @@ -496,7 +468,7 @@ impl MetaCacheManager for WriteBufferImpl { time_ns: self.time_provider.now().timestamp_nanos(), ops: vec![catalog_op], }; - if let Some(catalog_batch) = self.catalog.apply_catalog_batch(catalog_batch)? { + if let Some(catalog_batch) = self.catalog.apply_catalog_batch(&catalog_batch)? { self.wal .write_ops(vec![WalOp::Catalog(catalog_batch)]) .await?; @@ -526,7 +498,7 @@ impl MetaCacheManager for WriteBufferImpl { cache_name: cache_name.into(), })], }; - if let Some(catalog_batch) = catalog.apply_catalog_batch(catalog_batch)? { + if let Some(catalog_batch) = catalog.apply_catalog_batch(&catalog_batch)? 
{ self.wal .write_ops(vec![WalOp::Catalog(catalog_batch)]) .await?; @@ -592,7 +564,7 @@ impl LastCacheManager for WriteBufferImpl { database_name: Arc::clone(&db_schema.name), ops: vec![CreateLastCache(info.clone())], }; - if let Some(catalog_batch) = self.catalog.apply_catalog_batch(catalog_batch)? { + if let Some(catalog_batch) = self.catalog.apply_catalog_batch(&catalog_batch)? { self.wal .write_ops(vec![WalOp::Catalog(catalog_batch)]) .await?; @@ -627,7 +599,7 @@ impl LastCacheManager for WriteBufferImpl { // NOTE: if this fails then the cache will be gone from the running server, but will be // resurrected on server restart. - if let Some(catalog_batch) = catalog.apply_catalog_batch(catalog_batch)? { + if let Some(catalog_batch) = catalog.apply_catalog_batch(&catalog_batch)? { self.wal .write_ops(vec![WalOp::Catalog(catalog_batch)]) .await?; @@ -656,7 +628,7 @@ impl DatabaseManager for WriteBufferImpl { database_name: Arc::clone(&db_schema.name), })], }; - if let Some(catalog_batch) = self.catalog.apply_catalog_batch(catalog_batch)? { + if let Some(catalog_batch) = self.catalog.apply_catalog_batch(&catalog_batch)? { self.wal .write_ops(vec![WalOp::Catalog(catalog_batch)]) .await?; @@ -684,7 +656,7 @@ impl DatabaseManager for WriteBufferImpl { deletion_time: deletion_time.timestamp_nanos(), })], }; - if let Some(catalog_batch) = self.catalog.apply_catalog_batch(catalog_batch)? { + if let Some(catalog_batch) = self.catalog.apply_catalog_batch(&catalog_batch)? { let wal_op = WalOp::Catalog(catalog_batch); self.wal.write_ops(vec![wal_op]).await?; debug!(db_id = ?db_id, name = ?&db_schema.name, "successfully deleted database"); @@ -763,7 +735,7 @@ impl DatabaseManager for WriteBufferImpl { database_name: Arc::clone(&db_schema.name), ops: vec![CatalogOp::CreateTable(catalog_table_def)], }; - if let Some(catalog_batch) = self.catalog.apply_catalog_batch(catalog_batch)? { + if let Some(catalog_batch) = self.catalog.apply_catalog_batch(&catalog_batch)? { self.wal .write_ops(vec![WalOp::Catalog(catalog_batch)]) .await?; @@ -810,7 +782,7 @@ impl DatabaseManager for WriteBufferImpl { table_name: Arc::clone(&table_defn.table_name), })], }; - if let Some(catalog_batch) = self.catalog.apply_catalog_batch(catalog_batch)? { + if let Some(catalog_batch) = self.catalog.apply_catalog_batch(&catalog_batch)? { self.wal .write_ops(vec![WalOp::Catalog(catalog_batch)]) .await?; @@ -826,319 +798,6 @@ impl DatabaseManager for WriteBufferImpl { } } -#[async_trait::async_trait] -impl ProcessingEngineManager for WriteBufferImpl { - async fn insert_plugin( - &self, - db: &str, - plugin_name: String, - code: String, - function_name: String, - plugin_type: PluginType, - ) -> crate::Result<(), Error> { - let (db_id, db_schema) = - self.catalog - .db_id_and_schema(db) - .ok_or_else(|| self::Error::DatabaseNotFound { - db_name: db.to_owned(), - })?; - - let catalog_op = CatalogOp::CreatePlugin(PluginDefinition { - plugin_name, - code, - function_name, - plugin_type, - }); - - let creation_time = self.time_provider.now(); - let catalog_batch = CatalogBatch { - time_ns: creation_time.timestamp_nanos(), - database_id: db_id, - database_name: Arc::clone(&db_schema.name), - ops: vec![catalog_op], - }; - if let Some(catalog_batch) = self.catalog.apply_catalog_batch(catalog_batch)? 
{ - let wal_op = WalOp::Catalog(catalog_batch); - self.wal.write_ops(vec![wal_op]).await?; - } - Ok(()) - } - - async fn delete_plugin(&self, db: &str, plugin_name: &str) -> crate::Result<(), Error> { - let (db_id, db_schema) = - self.catalog - .db_id_and_schema(db) - .ok_or_else(|| Error::DatabaseNotFound { - db_name: db.to_string(), - })?; - let catalog_op = CatalogOp::DeletePlugin(DeletePluginDefinition { - plugin_name: plugin_name.to_string(), - }); - let catalog_batch = CatalogBatch { - time_ns: self.time_provider.now().timestamp_nanos(), - database_id: db_id, - database_name: Arc::clone(&db_schema.name), - ops: vec![catalog_op], - }; - - if let Some(catalog_batch) = self.catalog.apply_catalog_batch(catalog_batch)? { - self.wal - .write_ops(vec![WalOp::Catalog(catalog_batch)]) - .await?; - } - Ok(()) - } - - async fn insert_trigger( - &self, - db_name: &str, - trigger_name: String, - plugin_name: String, - trigger_specification: TriggerSpecificationDefinition, - disabled: bool, - ) -> crate::Result<(), Error> { - let Some((db_id, db_schema)) = self.catalog.db_id_and_schema(db_name) else { - return Err(Error::DatabaseNotFound { - db_name: db_name.to_owned(), - }); - }; - let plugin = db_schema - .processing_engine_plugins - .get(&plugin_name) - .ok_or_else(|| catalog::Error::ProcessingEnginePluginNotFound { - plugin_name: plugin_name.to_string(), - database_name: db_schema.name.to_string(), - })?; - let catalog_op = CatalogOp::CreateTrigger(TriggerDefinition { - trigger_name, - plugin_name, - plugin: plugin.clone(), - trigger: trigger_specification, - disabled, - }); - let creation_time = self.time_provider.now(); - let catalog_batch = CatalogBatch { - time_ns: creation_time.timestamp_nanos(), - database_id: db_id, - database_name: Arc::clone(&db_schema.name), - ops: vec![catalog_op], - }; - if let Some(catalog_batch) = self.catalog.apply_catalog_batch(catalog_batch)? { - let wal_op = WalOp::Catalog(catalog_batch); - self.wal.write_ops(vec![wal_op]).await?; - } - Ok(()) - } - - async fn delete_trigger( - &self, - db: &str, - trigger_name: &str, - force: bool, - ) -> crate::Result<(), Error> { - let (db_id, db_schema) = - self.catalog - .db_id_and_schema(db) - .ok_or_else(|| Error::DatabaseNotFound { - db_name: db.to_string(), - })?; - let catalog_op = CatalogOp::DeleteTrigger(DeleteTriggerDefinition { - trigger_name: trigger_name.to_string(), - force, - }); - let catalog_batch = CatalogBatch { - time_ns: self.time_provider.now().timestamp_nanos(), - database_id: db_id, - database_name: Arc::clone(&db_schema.name), - ops: vec![catalog_op], - }; - - // Do this first to avoid a dangling running plugin. - // Potential edge-case of a plugin being stopped but not deleted, - // but should be okay given desire to force delete. - let needs_deactivate = force - && db_schema - .processing_engine_triggers - .get(trigger_name) - .is_some_and(|trigger| !trigger.disabled); - - if needs_deactivate { - self.deactivate_trigger(db, trigger_name).await?; - } - - if let Some(catalog_batch) = self.catalog.apply_catalog_batch(catalog_batch)? 
{ - self.wal - .write_ops(vec![WalOp::Catalog(catalog_batch)]) - .await?; - } - Ok(()) - } - - #[cfg_attr(not(feature = "system-py"), allow(unused))] - async fn run_trigger( - &self, - write_buffer: Arc, - db_name: &str, - trigger_name: &str, - ) -> crate::Result<(), write_buffer::Error> { - #[cfg(feature = "system-py")] - { - let (_db_id, db_schema) = - self.catalog - .db_id_and_schema(db_name) - .ok_or_else(|| Error::DatabaseNotFound { - db_name: db_name.to_string(), - })?; - let trigger = db_schema - .processing_engine_triggers - .get(trigger_name) - .ok_or_else(|| ProcessingEngineTriggerNotFound { - database_name: db_name.to_string(), - trigger_name: trigger_name.to_string(), - })? - .clone(); - let trigger_rx = self - .buffer - .subscribe_to_plugin_events(trigger_name.to_string()) - .await; - let plugin_context = PluginContext { - trigger_rx, - write_buffer, - }; - plugins::run_plugin(db_name.to_string(), trigger, plugin_context); - } - - Ok(()) - } - - async fn deactivate_trigger( - &self, - db_name: &str, - trigger_name: &str, - ) -> std::result::Result<(), Error> { - let (db_id, db_schema) = - self.catalog - .db_id_and_schema(db_name) - .ok_or_else(|| Error::DatabaseNotFound { - db_name: db_name.to_string(), - })?; - let trigger = db_schema - .processing_engine_triggers - .get(trigger_name) - .ok_or_else(|| ProcessingEngineTriggerNotFound { - database_name: db_name.to_string(), - trigger_name: trigger_name.to_string(), - })?; - // Already disabled, so this is a no-op - if trigger.disabled { - return Ok(()); - }; - - let mut deactivated = trigger.clone(); - deactivated.disabled = true; - let catalog_op = CatalogOp::DisableTrigger(TriggerIdentifier { - db_name: db_name.to_string(), - trigger_name: trigger_name.to_string(), - }); - if let Some(catalog_batch) = self.catalog.apply_catalog_batch(CatalogBatch { - database_id: db_id, - database_name: Arc::clone(&db_schema.name), - time_ns: self.time_provider.now().timestamp_nanos(), - ops: vec![catalog_op], - })? { - let wal_op = WalOp::Catalog(catalog_batch); - self.wal.write_ops(vec![wal_op]).await?; - } - // TODO: handle processing engine errors - self.buffer - .deactivate_trigger(trigger_name.to_string()) - .await - .unwrap(); - Ok(()) - } - - async fn activate_trigger( - &self, - write_buffer: Arc, - db_name: &str, - trigger_name: &str, - ) -> std::result::Result<(), Error> { - let (db_id, db_schema) = - self.catalog - .db_id_and_schema(db_name) - .ok_or_else(|| Error::DatabaseNotFound { - db_name: db_name.to_string(), - })?; - let trigger = db_schema - .processing_engine_triggers - .get(trigger_name) - .ok_or_else(|| ProcessingEngineTriggerNotFound { - database_name: db_name.to_string(), - trigger_name: trigger_name.to_string(), - })?; - // Already disabled, so this is a no-op - if !trigger.disabled { - return Ok(()); - }; - - let mut activated = trigger.clone(); - activated.disabled = false; - let catalog_op = CatalogOp::EnableTrigger(TriggerIdentifier { - db_name: db_name.to_string(), - trigger_name: trigger_name.to_string(), - }); - if let Some(catalog_batch) = self.catalog.apply_catalog_batch(CatalogBatch { - database_id: db_id, - database_name: Arc::clone(&db_schema.name), - time_ns: self.time_provider.now().timestamp_nanos(), - ops: vec![catalog_op], - })? 
{ - let wal_op = WalOp::Catalog(catalog_batch); - self.wal.write_ops(vec![wal_op]).await?; - } - - self.run_trigger(write_buffer, db_name, trigger_name) - .await?; - Ok(()) - } - - #[cfg_attr(not(feature = "system-py"), allow(unused))] - async fn test_wal_plugin( - &self, - request: WalPluginTestRequest, - query_executor: Arc, - ) -> crate::Result { - #[cfg(feature = "system-py")] - { - // create a copy of the catalog so we don't modify the original - let catalog = Arc::new(Catalog::from_inner(self.catalog.clone_inner())); - let now = self.time_provider.now(); - - let code = self.read_plugin_code(&request.filename)?; - - let res = plugins::run_test_wal_plugin(now, catalog, query_executor, code, request) - .unwrap_or_else(|e| WalPluginTestResponse { - log_lines: vec![], - database_writes: Default::default(), - errors: vec![e.to_string()], - }); - - return Ok(res); - } - - #[cfg(not(feature = "system-py"))] - Err(plugins::Error::AnyhowError(anyhow::anyhow!( - "system-py feature not enabled" - ))) - } -} - -#[allow(unused)] -pub(crate) enum PluginEvent { - WriteWalContents(Arc), - Shutdown(oneshot::Sender<()>), -} - impl WriteBuffer for WriteBufferImpl {} pub async fn check_mem_and_force_snapshot_loop( @@ -1271,7 +930,6 @@ mod tests { wal_config: WalConfig::test_config(), parquet_cache: Some(Arc::clone(&parquet_cache)), metric_registry: Default::default(), - plugin_dir: None, }) .await .unwrap(); @@ -1356,7 +1014,6 @@ mod tests { }, parquet_cache: Some(Arc::clone(&parquet_cache)), metric_registry: Default::default(), - plugin_dir: None, }) .await .unwrap(); @@ -1425,7 +1082,6 @@ mod tests { }, parquet_cache: wbuf.parquet_cache.clone(), metric_registry: Default::default(), - plugin_dir: None, }) .await .unwrap() @@ -1653,7 +1309,6 @@ mod tests { }, parquet_cache: write_buffer.parquet_cache.clone(), metric_registry: Default::default(), - plugin_dir: None, }) .await .unwrap(); @@ -2939,7 +2594,6 @@ mod tests { wal_config, parquet_cache, metric_registry: Arc::clone(&metric_registry), - plugin_dir: None, }) .await .unwrap(); @@ -2969,393 +2623,6 @@ mod tests { batches } - #[tokio::test] - async fn test_create_plugin() -> Result<()> { - let start_time = Time::from_rfc3339("2024-11-14T11:00:00+00:00").unwrap(); - let test_store = Arc::new(InMemory::new()); - let wal_config = WalConfig { - gen1_duration: Gen1Duration::new_1m(), - max_write_buffer_size: 100, - flush_interval: Duration::from_millis(10), - snapshot_size: 1, - }; - let (write_buffer, _, _) = - setup_cache_optional(start_time, test_store, wal_config, false).await; - - write_buffer - .write_lp( - NamespaceName::new("foo").unwrap(), - "cpu,warehouse=us-east,room=01a,device=10001 reading=37\n", - start_time, - false, - Precision::Nanosecond, - ) - .await?; - - let empty_udf = r#"def example(iterator, output): - return"#; - - write_buffer - .insert_plugin( - "foo", - "my_plugin".to_string(), - empty_udf.to_string(), - "example".to_string(), - PluginType::WalRows, - ) - .await?; - - let plugin = write_buffer - .catalog - .db_schema("foo") - .expect("should have db named foo") - .processing_engine_plugins - .get("my_plugin") - .unwrap() - .clone(); - let expected = PluginDefinition { - plugin_name: "my_plugin".to_string(), - code: empty_udf.to_string(), - function_name: "example".to_string(), - plugin_type: PluginType::WalRows, - }; - assert_eq!(expected, plugin); - - // confirm that creating it again is a no-op. 
- write_buffer - .insert_plugin( - "foo", - "my_plugin".to_string(), - empty_udf.to_string(), - "example".to_string(), - PluginType::WalRows, - ) - .await?; - - // confirm that a different argument is an error - let Err(Error::CatalogUpdateError(catalog::Error::ProcessingEngineCallExists { .. })) = - write_buffer - .insert_plugin( - "foo", - "my_plugin".to_string(), - empty_udf.to_string(), - "bad_example".to_string(), - PluginType::WalRows, - ) - .await - else { - panic!("failed to insert plugin"); - }; - - // Confirm the same contents can be added to a new name. - write_buffer - .insert_plugin( - "foo", - "my_second_plugin".to_string(), - empty_udf.to_string(), - "example".to_string(), - PluginType::WalRows, - ) - .await?; - Ok(()) - } - #[tokio::test] - async fn test_delete_plugin() -> Result<()> { - let start_time = Time::from_rfc3339("2024-11-14T11:00:00+00:00").unwrap(); - let test_store = Arc::new(InMemory::new()); - let wal_config = WalConfig { - gen1_duration: Gen1Duration::new_1m(), - max_write_buffer_size: 100, - flush_interval: Duration::from_millis(10), - snapshot_size: 1, - }; - let (write_buffer, _, _) = - setup_cache_optional(start_time, test_store, wal_config, false).await; - - // Create the DB by inserting a line. - write_buffer - .write_lp( - NamespaceName::new("foo").unwrap(), - "cpu,warehouse=us-east,room=01a,device=10001 reading=37\n", - start_time, - false, - Precision::Nanosecond, - ) - .await?; - - // First create a plugin - write_buffer - .insert_plugin( - "foo", - "test_plugin".to_string(), - "def process(iterator, output): pass".to_string(), - "process".to_string(), - PluginType::WalRows, - ) - .await?; - - // Then delete it - write_buffer.delete_plugin("foo", "test_plugin").await?; - - // Verify plugin is gone from schema - let schema = write_buffer.catalog().db_schema("foo").unwrap(); - assert!(!schema.processing_engine_plugins.contains_key("test_plugin")); - - // Verify we can add a newly named plugin - write_buffer - .insert_plugin( - "foo", - "test_plugin".to_string(), - "def new_process(iterator, output): pass".to_string(), - "new_process".to_string(), - PluginType::WalRows, - ) - .await?; - - Ok(()) - } - - #[tokio::test] - async fn test_delete_plugin_with_active_trigger() -> Result<()> { - let start_time = Time::from_rfc3339("2024-11-14T11:00:00+00:00").unwrap(); - let test_store = Arc::new(InMemory::new()); - let wal_config = WalConfig { - gen1_duration: Gen1Duration::new_1m(), - max_write_buffer_size: 100, - flush_interval: Duration::from_millis(10), - snapshot_size: 1, - }; - let (write_buffer, _, _) = - setup_cache_optional(start_time, test_store, wal_config, false).await; - - // Create the DB by inserting a line. 
- write_buffer - .write_lp( - NamespaceName::new("foo").unwrap(), - "cpu,warehouse=us-east,room=01a,device=10001 reading=37\n", - start_time, - false, - Precision::Nanosecond, - ) - .await?; - - // Create a plugin - write_buffer - .insert_plugin( - "foo", - "test_plugin".to_string(), - "def process(iterator, output): pass".to_string(), - "process".to_string(), - PluginType::WalRows, - ) - .await - .unwrap(); - - // Create a trigger using the plugin - write_buffer - .insert_trigger( - "foo", - "test_trigger".to_string(), - "test_plugin".to_string(), - TriggerSpecificationDefinition::AllTablesWalWrite, - false, - ) - .await - .unwrap(); - - // Try to delete the plugin - should fail because trigger exists - let result = write_buffer.delete_plugin("foo", "test_plugin").await; - assert!(matches!( - result, - Err(Error::CatalogUpdateError(catalog::Error::ProcessingEnginePluginInUse { - database_name, - plugin_name, - trigger_name, - })) if database_name == "foo" && plugin_name == "test_plugin" && trigger_name == "test_trigger" - )); - Ok(()) - } - - #[tokio::test] - async fn test_trigger_lifecycle() -> Result<()> { - let start_time = Time::from_rfc3339("2024-11-14T11:00:00+00:00").unwrap(); - let test_store = Arc::new(InMemory::new()); - let wal_config = WalConfig { - gen1_duration: Gen1Duration::new_1m(), - max_write_buffer_size: 100, - flush_interval: Duration::from_millis(10), - snapshot_size: 1, - }; - let (write_buffer, _, _) = - setup_cache_optional(start_time, test_store, wal_config, false).await; - - // convert to Arc - let write_buffer: Arc = write_buffer.clone(); - - // Create the DB by inserting a line. - write_buffer - .write_lp( - NamespaceName::new("foo").unwrap(), - "cpu,warehouse=us-east,room=01a,device=10001 reading=37\n", - start_time, - false, - Precision::Nanosecond, - ) - .await?; - - // Create a plugin - write_buffer - .insert_plugin( - "foo", - "test_plugin".to_string(), - "def process(iterator, output): pass".to_string(), - "process".to_string(), - PluginType::WalRows, - ) - .await?; - - // Create an enabled trigger - write_buffer - .insert_trigger( - "foo", - "test_trigger".to_string(), - "test_plugin".to_string(), - TriggerSpecificationDefinition::AllTablesWalWrite, - false, - ) - .await?; - // Run the trigger - write_buffer - .run_trigger(Arc::clone(&write_buffer), "foo", "test_trigger") - .await?; - - // Deactivate the trigger - let result = write_buffer.deactivate_trigger("foo", "test_trigger").await; - assert!(result.is_ok()); - - // Verify trigger is disabled in schema - let schema = write_buffer.catalog().db_schema("foo").unwrap(); - let trigger = schema - .processing_engine_triggers - .get("test_trigger") - .unwrap(); - assert!(trigger.disabled); - - // Activate the trigger - let result = write_buffer - .activate_trigger(Arc::clone(&write_buffer), "foo", "test_trigger") - .await; - assert!(result.is_ok()); - - // Verify trigger is enabled and running - let schema = write_buffer.catalog().db_schema("foo").unwrap(); - let trigger = schema - .processing_engine_triggers - .get("test_trigger") - .unwrap(); - assert!(!trigger.disabled); - Ok(()) - } - - #[tokio::test] - async fn test_create_disabled_trigger() -> Result<()> { - let start_time = Time::from_rfc3339("2024-11-14T11:00:00+00:00").unwrap(); - let test_store = Arc::new(InMemory::new()); - let wal_config = WalConfig { - gen1_duration: Gen1Duration::new_1m(), - max_write_buffer_size: 100, - flush_interval: Duration::from_millis(10), - snapshot_size: 1, - }; - let (write_buffer, _, _) = - 
setup_cache_optional(start_time, test_store, wal_config, false).await; - - // Create the DB by inserting a line. - write_buffer - .write_lp( - NamespaceName::new("foo").unwrap(), - "cpu,warehouse=us-east,room=01a,device=10001 reading=37\n", - start_time, - false, - Precision::Nanosecond, - ) - .await?; - - // Create a plugin - write_buffer - .insert_plugin( - "foo", - "test_plugin".to_string(), - "def process(iterator, output): pass".to_string(), - "process".to_string(), - PluginType::WalRows, - ) - .await?; - - // Create a disabled trigger - write_buffer - .insert_trigger( - "foo", - "test_trigger".to_string(), - "test_plugin".to_string(), - TriggerSpecificationDefinition::AllTablesWalWrite, - true, - ) - .await?; - - // Verify trigger is created but disabled - let schema = write_buffer.catalog().db_schema("foo").unwrap(); - let trigger = schema - .processing_engine_triggers - .get("test_trigger") - .unwrap(); - assert!(trigger.disabled); - - // Verify trigger is not in active triggers list - assert!(write_buffer.catalog().triggers().is_empty()); - Ok(()) - } - - #[tokio::test] - async fn test_activate_nonexistent_trigger() -> Result<()> { - let start_time = Time::from_rfc3339("2024-11-14T11:00:00+00:00").unwrap(); - let test_store = Arc::new(InMemory::new()); - let wal_config = WalConfig { - gen1_duration: Gen1Duration::new_1m(), - max_write_buffer_size: 100, - flush_interval: Duration::from_millis(10), - snapshot_size: 1, - }; - let (write_buffer, _, _) = - setup_cache_optional(start_time, test_store, wal_config, false).await; - - let write_buffer: Arc = write_buffer.clone(); - - // Create the DB by inserting a line. - write_buffer - .write_lp( - NamespaceName::new("foo").unwrap(), - "cpu,warehouse=us-east,room=01a,device=10001 reading=37\n", - start_time, - false, - Precision::Nanosecond, - ) - .await?; - - let result = write_buffer - .activate_trigger(Arc::clone(&write_buffer), "foo", "nonexistent_trigger") - .await; - - assert!(matches!( - result, - Err(Error::CatalogUpdateError(catalog::Error::ProcessingEngineTriggerNotFound { - database_name, - trigger_name, - })) if database_name == "foo" && trigger_name == "nonexistent_trigger" - )); - Ok(()) - } - #[test_log::test(tokio::test)] async fn test_check_mem_and_force_snapshot() { let obj_store: Arc = Arc::new(InMemory::new()); diff --git a/influxdb3_write/src/write_buffer/queryable_buffer.rs b/influxdb3_write/src/write_buffer/queryable_buffer.rs index edf297e4007..1ab81067ec7 100644 --- a/influxdb3_write/src/write_buffer/queryable_buffer.rs +++ b/influxdb3_write/src/write_buffer/queryable_buffer.rs @@ -3,7 +3,6 @@ use crate::paths::ParquetFilePath; use crate::persister::Persister; use crate::write_buffer::persisted_files::PersistedFiles; use crate::write_buffer::table_buffer::TableBuffer; -use crate::write_buffer::PluginEvent; use crate::{ParquetFile, ParquetFileId, PersistedSnapshot}; use anyhow::Context; use arrow::record_batch::RecordBatch; @@ -36,8 +35,7 @@ use schema::Schema; use std::any::Any; use std::sync::Arc; use std::time::Duration; -use tokio::sync::oneshot::Receiver; -use tokio::sync::{mpsc, oneshot, Mutex}; +use tokio::sync::oneshot::{self, Receiver}; #[derive(Debug)] pub struct QueryableBuffer { @@ -52,7 +50,6 @@ pub struct QueryableBuffer { /// Sends a notification to this watch channel whenever a snapshot info is persisted persisted_snapshot_notify_rx: tokio::sync::watch::Receiver>, persisted_snapshot_notify_tx: tokio::sync::watch::Sender>, - plugin_event_tx: Mutex>>, } pub struct QueryableBufferArgs { @@ -91,7 +88,6 
@@ impl QueryableBuffer { parquet_cache, persisted_snapshot_notify_rx, persisted_snapshot_notify_tx, - plugin_event_tx: Mutex::new(HashMap::new()), } } @@ -157,11 +153,11 @@ impl QueryableBuffer { /// Called when the wal has persisted a new file. Buffer the contents in memory and update the /// last cache so the data is queryable. - fn buffer_contents(&self, write: WalContents) { + fn buffer_contents(&self, write: Arc) { self.write_wal_contents_to_caches(&write); let mut buffer = self.buffer.write(); buffer.buffer_ops( - write.ops, + &write.ops, &self.last_cache_provider, &self.meta_cache_provider, ); @@ -171,7 +167,7 @@ impl QueryableBuffer { /// data that can be snapshot in the background after putting the data in the buffer. async fn buffer_contents_and_persist_snapshotted_data( &self, - write: WalContents, + write: Arc, snapshot_details: SnapshotDetails, ) -> Receiver { info!( @@ -183,7 +179,7 @@ impl QueryableBuffer { let persist_jobs = { let mut buffer = self.buffer.write(); buffer.buffer_ops( - write.ops, + &write.ops, &self.last_cache_provider, &self.meta_cache_provider, ); @@ -388,51 +384,6 @@ impl QueryableBuffer { buffer.db_to_table.remove(db_id); } - #[cfg(feature = "system-py")] - pub(crate) async fn subscribe_to_plugin_events( - &self, - trigger_name: String, - ) -> mpsc::Receiver { - let mut senders = self.plugin_event_tx.lock().await; - - // TODO: should we be checking for replacements? - let (plugin_tx, plugin_rx) = mpsc::channel(4); - senders.insert(trigger_name, plugin_tx); - plugin_rx - } - - /// Deactivates a running trigger by sending it a oneshot sender. It should send back a message and then immediately shut down. - pub(crate) async fn deactivate_trigger( - &self, - #[allow(unused)] trigger_name: String, - ) -> Result<(), anyhow::Error> { - #[cfg(feature = "system-py")] - { - let Some(sender) = self.plugin_event_tx.lock().await.remove(&trigger_name) else { - anyhow::bail!("no trigger named '{}' found", trigger_name); - }; - let (oneshot_tx, oneshot_rx) = oneshot::channel(); - sender.send(PluginEvent::Shutdown(oneshot_tx)).await?; - oneshot_rx.await?; - } - Ok(()) - } - - async fn send_to_plugins(&self, wal_contents: &WalContents) { - let senders = self.plugin_event_tx.lock().await; - if !senders.is_empty() { - let wal_contents = Arc::new(wal_contents.clone()); - for (plugin, sender) in senders.iter() { - if let Err(err) = sender - .send(PluginEvent::WriteWalContents(Arc::clone(&wal_contents))) - .await - { - error!("failed to send plugin event to plugin {}: {}", plugin, err); - } - } - } - } - pub fn get_total_size_bytes(&self) -> usize { let buffer = self.buffer.read(); buffer.find_overall_buffer_size_bytes() @@ -441,17 +392,15 @@ impl QueryableBuffer { #[async_trait] impl WalFileNotifier for QueryableBuffer { - async fn notify(&self, write: WalContents) { - self.send_to_plugins(&write).await; + async fn notify(&self, write: Arc) { self.buffer_contents(write) } async fn notify_and_snapshot( &self, - write: WalContents, + write: Arc, snapshot_details: SnapshotDetails, ) -> Receiver { - self.send_to_plugins(&write).await; self.buffer_contents_and_persist_snapshotted_data(write, snapshot_details) .await } @@ -479,7 +428,7 @@ impl BufferState { pub fn buffer_ops( &mut self, - ops: Vec, + ops: &[WalOp], last_cache_provider: &LastCacheProvider, meta_cache_provider: &MetaCacheProvider, ) { @@ -580,7 +529,7 @@ impl BufferState { } } - fn add_write_batch(&mut self, write_batch: WriteBatch) { + fn add_write_batch(&mut self, write_batch: &WriteBatch) { let db_schema = self 
.catalog .db_schema_by_id(&write_batch.database_id) @@ -588,10 +537,10 @@ impl BufferState { let database_buffer = self.db_to_table.entry(write_batch.database_id).or_default(); - for (table_id, table_chunks) in write_batch.table_chunks { - let table_buffer = database_buffer.entry(table_id).or_insert_with(|| { + for (table_id, table_chunks) in &write_batch.table_chunks { + let table_buffer = database_buffer.entry(*table_id).or_insert_with(|| { let table_def = db_schema - .table_definition_by_id(&table_id) + .table_definition_by_id(table_id) .expect("table should exist"); let sort_key = table_def .series_key @@ -601,8 +550,8 @@ impl BufferState { TableBuffer::new(index_columns, SortKey::from_columns(sort_key)) }); - for (chunk_time, chunk) in table_chunks.chunk_time_to_chunk { - table_buffer.buffer_chunk(chunk_time, chunk.rows); + for (chunk_time, chunk) in &table_chunks.chunk_time_to_chunk { + table_buffer.buffer_chunk(*chunk_time, &chunk.rows); } } } @@ -832,7 +781,7 @@ mod tests { wal_contents.max_timestamp_ns + Gen1Duration::new_1m().as_duration().as_nanos() as i64; // write the lp into the buffer - queryable_buffer.notify(wal_contents).await; + queryable_buffer.notify(Arc::new(wal_contents)).await; // now force a snapshot, persisting the data to parquet file. Also, buffer up a new write let snapshot_sequence_number = SnapshotSequenceNumber::new(1); @@ -865,7 +814,7 @@ mod tests { wal_contents.max_timestamp_ns + Gen1Duration::new_1m().as_duration().as_nanos() as i64; let details = queryable_buffer - .notify_and_snapshot(wal_contents, snapshot_details) + .notify_and_snapshot(Arc::new(wal_contents), snapshot_details) .await; let _details = details.await.unwrap(); @@ -888,14 +837,14 @@ mod tests { }; queryable_buffer .notify_and_snapshot( - WalContents { + Arc::new(WalContents { persist_timestamp_ms: 0, min_timestamp_ns: 0, max_timestamp_ns: 0, wal_file_number: WalFileSequenceNumber::new(3), ops: vec![], snapshot: Some(snapshot_details), - }, + }), snapshot_details, ) .await diff --git a/influxdb3_write/src/write_buffer/table_buffer.rs b/influxdb3_write/src/write_buffer/table_buffer.rs index ff60f5a4846..d1ca35764c7 100644 --- a/influxdb3_write/src/write_buffer/table_buffer.rs +++ b/influxdb3_write/src/write_buffer/table_buffer.rs @@ -50,7 +50,7 @@ impl TableBuffer { } } - pub fn buffer_chunk(&mut self, chunk_time: i64, rows: Vec) { + pub fn buffer_chunk(&mut self, chunk_time: i64, rows: &[Row]) { let buffer_chunk = self .chunk_time_to_chunks .entry(chunk_time) @@ -250,19 +250,19 @@ struct MutableTableChunk { } impl MutableTableChunk { - fn add_rows(&mut self, rows: Vec) { + fn add_rows(&mut self, rows: &[Row]) { let new_row_count = rows.len(); - for (row_index, r) in rows.into_iter().enumerate() { + for (row_index, r) in rows.iter().enumerate() { let mut value_added = HashSet::with_capacity(r.fields.len()); - for f in r.fields { + for f in &r.fields { value_added.insert(f.id); - match f.value { + match &f.value { FieldData::Timestamp(v) => { - self.timestamp_min = self.timestamp_min.min(v); - self.timestamp_max = self.timestamp_max.max(v); + self.timestamp_min = self.timestamp_min.min(*v); + self.timestamp_max = self.timestamp_max.max(*v); let b = self.data.entry(f.id).or_insert_with(|| { debug!("Creating new timestamp builder"); @@ -272,7 +272,7 @@ impl MutableTableChunk { Builder::Time(time_builder) }); if let Builder::Time(b) = b { - b.append_value(v); + b.append_value(*v); } else { panic!("unexpected field type"); } @@ -288,7 +288,7 @@ impl MutableTableChunk { } let b = 
self.data.get_mut(&f.id).expect("tag builder should exist"); if let Builder::Tag(b) = b { - self.index.add_row_if_indexed_column(b.len(), f.id, &v); + self.index.add_row_if_indexed_column(b.len(), f.id, v); b.append(v) .expect("shouldn't be able to overflow 32 bit dictionary"); } else { @@ -307,7 +307,7 @@ impl MutableTableChunk { let Builder::Key(b) = b else { panic!("unexpected field type"); }; - self.index.add_row_if_indexed_column(b.len(), f.id, &v); + self.index.add_row_if_indexed_column(b.len(), f.id, v); b.append_value(v); } FieldData::String(v) => { @@ -333,7 +333,7 @@ impl MutableTableChunk { Builder::I64(int_builder) }); if let Builder::I64(b) = b { - b.append_value(v); + b.append_value(*v); } else { panic!("unexpected field type"); } @@ -346,7 +346,7 @@ impl MutableTableChunk { Builder::U64(uint_builder) }); if let Builder::U64(b) = b { - b.append_value(v); + b.append_value(*v); } else { panic!("unexpected field type"); } @@ -359,7 +359,7 @@ impl MutableTableChunk { Builder::F64(float_builder) }); if let Builder::F64(b) = b { - b.append_value(v); + b.append_value(*v); } else { panic!("unexpected field type"); } @@ -372,7 +372,7 @@ impl MutableTableChunk { Builder::Bool(bool_builder) }); if let Builder::Bool(b) = b { - b.append_value(v); + b.append_value(*v); } else { panic!("unexpected field type"); } @@ -866,7 +866,7 @@ mod tests { }, ]; - table_buffer.buffer_chunk(offset, rows); + table_buffer.buffer_chunk(offset, &rows); } let partitioned_batches = table_buffer @@ -980,7 +980,7 @@ mod tests { }, ]; - table_buffer.buffer_chunk(0, rows); + table_buffer.buffer_chunk(0, &rows); let filter = &[Expr::BinaryExpr(BinaryExpr { left: Box::new(Expr::Column(Column { @@ -1105,7 +1105,7 @@ mod tests { }, ]; - table_buffer.buffer_chunk(0, rows); + table_buffer.buffer_chunk(0, &rows); let size = table_buffer.computed_size(); assert_eq!(size, 18120); diff --git a/influxdb3_write/src/write_buffer/validator.rs b/influxdb3_write/src/write_buffer/validator.rs index afbfd860437..430478d2abc 100644 --- a/influxdb3_write/src/write_buffer/validator.rs +++ b/influxdb3_write/src/write_buffer/validator.rs @@ -138,7 +138,7 @@ impl WriteValidator { database_name: Arc::clone(&self.state.db_schema.name), ops: catalog_updates, }; - self.state.catalog.apply_catalog_batch(catalog_batch)? + self.state.catalog.apply_catalog_batch(&catalog_batch)? }; Ok(WriteValidator { @@ -398,9 +398,9 @@ pub struct ValidatedLines { /// Number of index columns passed in, whether tags (v1) or series keys (v3) pub(crate) index_count: usize, /// Any errors that occurred while parsing the lines - pub(crate) errors: Vec, + pub errors: Vec, /// Only valid lines will be converted into a WriteBatch - pub(crate) valid_data: WriteBatch, + pub valid_data: WriteBatch, /// If any catalog updates were made, they will be included here pub(crate) catalog_updates: Option, }
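
Reviewer note on the WAL notification change above: WalFileNotifier::notify and notify_and_snapshot now receive Arc<WalContents> instead of an owned WalContents, so a single flushed WAL file can be shared by the queryable buffer and the processing engine without cloning. The snippet below is a minimal, dependency-free sketch of that shape, not the real influxdb3_wal API: the WalContents and SnapshotDetails structs are hypothetical stand-ins, the trait here is synchronous (the real one is async), and the recording notifier only mirrors the spirit of the updated TestNotifier in object_store.rs.

    // Simplified stand-ins for the influxdb3_wal types; only the Arc plumbing is the point.
    use std::sync::{Arc, Mutex};

    #[derive(Debug, Clone, PartialEq)]
    struct WalContents {
        wal_file_number: u64,
        ops: Vec<String>, // stand-in for the real Vec<WalOp>
    }

    #[derive(Debug, Clone, Copy, PartialEq)]
    struct SnapshotDetails {
        snapshot_sequence_number: u64,
    }

    // Sketch of the notifier trait after this patch: both entry points take Arc<WalContents>.
    trait WalFileNotifier: Send + Sync {
        fn notify(&self, write: Arc<WalContents>);
        fn notify_and_snapshot(&self, write: Arc<WalContents>, details: SnapshotDetails);
    }

    // Mirrors the updated TestNotifier: it stores the shared Arc rather than an owned copy.
    #[derive(Default)]
    struct RecordingNotifier {
        notified_writes: Mutex<Vec<Arc<WalContents>>>,
        snapshots: Mutex<Vec<SnapshotDetails>>,
    }

    impl WalFileNotifier for RecordingNotifier {
        fn notify(&self, write: Arc<WalContents>) {
            self.notified_writes.lock().unwrap().push(write);
        }

        fn notify_and_snapshot(&self, write: Arc<WalContents>, details: SnapshotDetails) {
            self.notified_writes.lock().unwrap().push(write);
            self.snapshots.lock().unwrap().push(details);
        }
    }

    fn main() {
        let notifier = RecordingNotifier::default();
        let contents = Arc::new(WalContents {
            wal_file_number: 1,
            ops: vec!["write".to_string()],
        });

        // The WAL flush path and any plugin subscriber can hold clones of the same Arc.
        notifier.notify(Arc::clone(&contents));
        notifier.notify_and_snapshot(contents, SnapshotDetails { snapshot_sequence_number: 1 });

        assert_eq!(notifier.notified_writes.lock().unwrap().len(), 2);
    }

The effect of the change is visible in the object_store tests above: they now collect .as_ref() views of the stored Arcs before asserting equality, rather than comparing owned WalContents values.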