Skip to content

Commit

Permalink
feat: Update WAL plugin for new structure (#25777)
Browse files Browse the repository at this point in the history
* feat: Update WAL plugin for new structure

This ended up being a very large change set. In order to get around circular dependencies, the processing engine had to be moved into its own crate, which I think is ultimately much cleaner.

Unfortunately, this required changing a ton of things. There's more testing and things to add on to this, but I think it's important to get this through and build on it.

Importantly, the processing engine no longer resides inside the write buffer. Instead, it is attached to the HTTP server. It is now able to take a query executor, write buffer, and WAL so that the full range of functionality of the server can be exposed to the plugin API.

There are a bunch of system-py feature flags littered everywhere, which I'm hoping we can remove soon.

* refactor: PR feedback
  • Loading branch information
pauldix authored Jan 10, 2025
1 parent 2d18a61 commit 7230148
Show file tree
Hide file tree
Showing 28 changed files with 1,300 additions and 1,255 deletions.
30 changes: 29 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ members = [
"influxdb3_internal_api",
"influxdb3_load_generator",
"influxdb3_process",
"influxdb3_processing_engine",
"influxdb3_py_api",
"influxdb3_server",
"influxdb3_telemetry",
Expand Down
7 changes: 1 addition & 6 deletions influxdb3/src/commands/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,7 @@ pub struct PluginConfig {
/// Python file containing the plugin code
#[clap(long = "code-filename")]
code_file: String,
/// Entry point function for the plugin
#[clap(long = "entry-point")]
function_name: String,
/// Type of trigger the plugin processes
/// Type of trigger the plugin processes. Options: wal_rows, scheduled
#[clap(long = "plugin-type", default_value = "wal_rows")]
plugin_type: String,
/// Name of the plugin to create
Expand Down Expand Up @@ -335,7 +332,6 @@ pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
influxdb3_config: InfluxDb3Config { database_name, .. },
plugin_name,
code_file,
function_name,
plugin_type,
}) => {
let code = fs::read_to_string(&code_file)?;
Expand All @@ -344,7 +340,6 @@ pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
database_name,
&plugin_name,
code,
function_name,
plugin_type,
)
.await?;
Expand Down
2 changes: 1 addition & 1 deletion influxdb3/src/commands/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,6 @@ pub async fn command(config: Config) -> Result<()> {
wal_config,
parquet_cache,
metric_registry: Arc::clone(&metrics),
plugin_dir: config.plugin_dir,
})
.await
.map_err(|e| Error::WriteBufferInit(e.into()))?;
Expand Down Expand Up @@ -532,6 +531,7 @@ pub async fn command(config: Config) -> Result<()> {
trace_exporter,
trace_header_parser,
Arc::clone(&telemetry_store),
config.plugin_dir,
)?;

let query_executor = Arc::new(QueryExecutorImpl::new(CreateQueryExecutorArgs {
Expand Down
12 changes: 0 additions & 12 deletions influxdb3/tests/server/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,8 +467,6 @@ def process_rows(iterator, output):
&server_addr,
"--code-filename",
plugin_file.path().to_str().unwrap(),
"--entry-point",
"process_rows",
plugin_name,
]);
debug!(result = ?result, "create plugin");
Expand Down Expand Up @@ -501,8 +499,6 @@ def process_rows(iterator, output):
&server_addr,
"--code-filename",
plugin_file.path().to_str().unwrap(),
"--entry-point",
"process_rows",
plugin_name,
]);

Expand Down Expand Up @@ -547,8 +543,6 @@ def process_rows(iterator, output):
&server_addr,
"--code-filename",
plugin_file.path().to_str().unwrap(),
"--entry-point",
"process_rows",
plugin_name,
]);

Expand Down Expand Up @@ -597,8 +591,6 @@ def process_rows(iterator, output):
&server_addr,
"--code-filename",
plugin_file.path().to_str().unwrap(),
"--entry-point",
"process_rows",
plugin_name,
]);

Expand Down Expand Up @@ -670,8 +662,6 @@ def process_rows(iterator, output):
&server_addr,
"--code-filename",
plugin_file.path().to_str().unwrap(),
"--entry-point",
"process_rows",
plugin_name,
]);

Expand Down Expand Up @@ -769,8 +759,6 @@ def process_rows(iterator, output):
&server_addr,
"--code-filename",
plugin_file.path().to_str().unwrap(),
"--entry-point",
"process_rows",
plugin_name,
]);

Expand Down
38 changes: 19 additions & 19 deletions influxdb3_catalog/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,19 +209,19 @@ impl Catalog {

pub fn apply_catalog_batch(
&self,
catalog_batch: CatalogBatch,
catalog_batch: &CatalogBatch,
) -> Result<Option<OrderedCatalogBatch>> {
self.inner.write().apply_catalog_batch(catalog_batch)
}

// Checks the sequence number to see if it needs to be applied.
pub fn apply_ordered_catalog_batch(
&self,
batch: OrderedCatalogBatch,
batch: &OrderedCatalogBatch,
) -> Result<Option<CatalogBatch>> {
if batch.sequence_number() >= self.sequence_number().as_u32() {
if let Some(catalog_batch) = self.apply_catalog_batch(batch.batch())? {
return Ok(Some(catalog_batch.batch()));
return Ok(Some(catalog_batch.batch().clone()));
}
}
Ok(None)
Expand Down Expand Up @@ -456,12 +456,12 @@ impl InnerCatalog {
/// have already been applied, the sequence number and updated tracker are not updated.
pub fn apply_catalog_batch(
&mut self,
catalog_batch: CatalogBatch,
catalog_batch: &CatalogBatch,
) -> Result<Option<OrderedCatalogBatch>> {
let table_count = self.table_count();

if let Some(db) = self.databases.get(&catalog_batch.database_id) {
if let Some(new_db) = DatabaseSchema::new_if_updated_from_batch(db, &catalog_batch)? {
if let Some(new_db) = DatabaseSchema::new_if_updated_from_batch(db, catalog_batch)? {
check_overall_table_count(Some(db), &new_db, table_count)?;
self.upsert_db(new_db);
} else {
Expand All @@ -471,12 +471,12 @@ impl InnerCatalog {
if self.database_count() >= Catalog::NUM_DBS_LIMIT {
return Err(Error::TooManyDbs);
}
let new_db = DatabaseSchema::new_from_batch(&catalog_batch)?;
let new_db = DatabaseSchema::new_from_batch(catalog_batch)?;
check_overall_table_count(None, &new_db, table_count)?;
self.upsert_db(new_db);
}
Ok(Some(OrderedCatalogBatch::new(
catalog_batch,
catalog_batch.clone(),
self.sequence.0,
)))
}
Expand Down Expand Up @@ -1847,7 +1847,7 @@ mod tests {
)],
);
let err = catalog
.apply_catalog_batch(catalog_batch)
.apply_catalog_batch(&catalog_batch)
.expect_err("should fail to apply AddFields operation for non-existent table");
assert_contains!(err.to_string(), "Table banana not in DB schema for foo");
}
Expand Down Expand Up @@ -1968,10 +1968,10 @@ mod tests {
})],
};
let create_ordered_op = catalog
.apply_catalog_batch(create_op)?
.apply_catalog_batch(&create_op)?
.expect("should be able to create");
let add_column_op = catalog
.apply_catalog_batch(add_column_op)?
.apply_catalog_batch(&add_column_op)?
.expect("should produce operation");
let mut ops = vec![
WalOp::Catalog(add_column_op),
Expand Down Expand Up @@ -2010,7 +2010,7 @@ mod tests {
let db_id = DbId::new();
let db_name = format!("test-db-{db_id}");
catalog
.apply_catalog_batch(influxdb3_wal::create::catalog_batch(
.apply_catalog_batch(&influxdb3_wal::create::catalog_batch(
db_id,
db_name.as_str(),
0,
Expand Down Expand Up @@ -2043,7 +2043,7 @@ mod tests {
let db_id = DbId::new();
let db_name = "a-database-too-far";
catalog
.apply_catalog_batch(influxdb3_wal::create::catalog_batch(
.apply_catalog_batch(&influxdb3_wal::create::catalog_batch(
db_id,
db_name,
0,
Expand Down Expand Up @@ -2071,7 +2071,7 @@ mod tests {
// now delete a database:
let (db_id, db_name) = dbs.pop().unwrap();
catalog
.apply_catalog_batch(influxdb3_wal::create::catalog_batch(
.apply_catalog_batch(&influxdb3_wal::create::catalog_batch(
db_id,
db_name.as_str(),
1,
Expand All @@ -2087,7 +2087,7 @@ mod tests {
// now create another database (using same name as the deleted one), this should be allowed:
let db_id = DbId::new();
catalog
.apply_catalog_batch(influxdb3_wal::create::catalog_batch(
.apply_catalog_batch(&influxdb3_wal::create::catalog_batch(
db_id,
db_name.as_str(),
0,
Expand Down Expand Up @@ -2123,7 +2123,7 @@ mod tests {
let db_id = DbId::new();
let db_name = "test-db";
catalog
.apply_catalog_batch(influxdb3_wal::create::catalog_batch(
.apply_catalog_batch(&influxdb3_wal::create::catalog_batch(
db_id,
db_name,
0,
Expand All @@ -2138,7 +2138,7 @@ mod tests {
let table_id = TableId::new();
let table_name = Arc::<str>::from(format!("test-table-{i}").as_str());
catalog
.apply_catalog_batch(influxdb3_wal::create::catalog_batch(
.apply_catalog_batch(&influxdb3_wal::create::catalog_batch(
db_id,
db_name,
0,
Expand Down Expand Up @@ -2170,7 +2170,7 @@ mod tests {
let table_id = TableId::new();
let table_name = Arc::<str>::from("a-table-too-far");
catalog
.apply_catalog_batch(influxdb3_wal::create::catalog_batch(
.apply_catalog_batch(&influxdb3_wal::create::catalog_batch(
db_id,
db_name,
0,
Expand Down Expand Up @@ -2198,7 +2198,7 @@ mod tests {
// delete a table
let (table_id, table_name) = tables.pop().unwrap();
catalog
.apply_catalog_batch(influxdb3_wal::create::catalog_batch(
.apply_catalog_batch(&influxdb3_wal::create::catalog_batch(
db_id,
db_name,
0,
Expand All @@ -2214,7 +2214,7 @@ mod tests {
assert_eq!(1_999, catalog.inner.read().table_count());
// now create it again, this should be allowed:
catalog
.apply_catalog_batch(influxdb3_wal::create::catalog_batch(
.apply_catalog_batch(&influxdb3_wal::create::catalog_batch(
db_id,
db_name,
0,
Expand Down
6 changes: 3 additions & 3 deletions influxdb3_catalog/src/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ impl From<DatabaseSnapshot> for DatabaseSchema {
plugin,
trigger: serde_json::from_str(&trigger.trigger_specification).unwrap(),
disabled: trigger.disabled,
database_name: trigger.database_name,
},
)
})
Expand Down Expand Up @@ -163,14 +164,14 @@ struct TableSnapshot {
struct ProcessingEnginePluginSnapshot {
pub plugin_name: String,
pub code: String,
pub function_name: String,
pub plugin_type: PluginType,
}

#[derive(Debug, Serialize, Deserialize)]
struct ProcessingEngineTriggerSnapshot {
pub trigger_name: String,
pub plugin_name: String,
pub database_name: String,
pub trigger_specification: String,
pub disabled: bool,
}
Expand Down Expand Up @@ -412,7 +413,6 @@ impl From<&PluginDefinition> for ProcessingEnginePluginSnapshot {
Self {
plugin_name: plugin.plugin_name.to_string(),
code: plugin.code.to_string(),
function_name: plugin.function_name.to_string(),
plugin_type: plugin.plugin_type,
}
}
Expand All @@ -423,7 +423,6 @@ impl From<ProcessingEnginePluginSnapshot> for PluginDefinition {
Self {
plugin_name: plugin.plugin_name.to_string(),
code: plugin.code.to_string(),
function_name: plugin.function_name.to_string(),
plugin_type: plugin.plugin_type,
}
}
Expand All @@ -434,6 +433,7 @@ impl From<&TriggerDefinition> for ProcessingEngineTriggerSnapshot {
ProcessingEngineTriggerSnapshot {
trigger_name: trigger.trigger_name.to_string(),
plugin_name: trigger.plugin_name.to_string(),
database_name: trigger.database_name.to_string(),
trigger_specification: serde_json::to_string(&trigger.trigger)
.expect("should be able to serialize trigger specification"),
disabled: trigger.disabled,
Expand Down
3 changes: 0 additions & 3 deletions influxdb3_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,6 @@ impl Client {
db: impl Into<String> + Send,
plugin_name: impl Into<String> + Send,
code: impl Into<String> + Send,
function_name: impl Into<String> + Send,
plugin_type: impl Into<String> + Send,
) -> Result<()> {
let api_path = "/api/v3/configure/processing_engine_plugin";
Expand All @@ -496,15 +495,13 @@ impl Client {
db: String,
plugin_name: String,
code: String,
function_name: String,
plugin_type: String,
}

let mut req = self.http_client.post(url).json(&Req {
db: db.into(),
plugin_name: plugin_name.into(),
code: code.into(),
function_name: function_name.into(),
plugin_type: plugin_type.into(),
});

Expand Down
Loading

0 comments on commit 7230148

Please sign in to comment.