Skip to content

Commit

Permalink
Merge branch 'main' into hiltontj/distinct-v-cache
Browse files Browse the repository at this point in the history
  • Loading branch information
hiltontj committed Jan 10, 2025
2 parents 192a2f7 + 7230148 commit d329763
Show file tree
Hide file tree
Showing 38 changed files with 2,044 additions and 1,431 deletions.
57 changes: 53 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ members = [
"influxdb3_clap_blocks",
"influxdb3_client",
"influxdb3_id",
"influxdb3_internal_api",
"influxdb3_load_generator",
"influxdb3_process",
"influxdb3_processing_engine",
"influxdb3_py_api",
"influxdb3_server",
"influxdb3_telemetry",
Expand Down
7 changes: 1 addition & 6 deletions influxdb3/src/commands/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,7 @@ pub struct PluginConfig {
/// Python file containing the plugin code
#[clap(long = "code-filename")]
code_file: String,
/// Entry point function for the plugin
#[clap(long = "entry-point")]
function_name: String,
/// Type of trigger the plugin processes
/// Type of trigger the plugin processes. Options: wal_rows, scheduled
#[clap(long = "plugin-type", default_value = "wal_rows")]
plugin_type: String,
/// Name of the plugin to create
Expand Down Expand Up @@ -336,7 +333,6 @@ pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
influxdb3_config: InfluxDb3Config { database_name, .. },
plugin_name,
code_file,
function_name,
plugin_type,
}) => {
let code = fs::read_to_string(&code_file)?;
Expand All @@ -345,7 +341,6 @@ pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
database_name,
&plugin_name,
code,
function_name,
plugin_type,
)
.await?;
Expand Down
40 changes: 37 additions & 3 deletions influxdb3/src/commands/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ use influxdb3_telemetry::store::TelemetryStore;
use influxdb3_wal::{Gen1Duration, WalConfig};
use influxdb3_write::{
persister::Persister,
write_buffer::{persisted_files::PersistedFiles, WriteBufferImpl, WriteBufferImplArgs},
write_buffer::{
check_mem_and_force_snapshot_loop, persisted_files::PersistedFiles, WriteBufferImpl,
WriteBufferImplArgs,
},
WriteBuffer,
};
use iox_query::exec::{DedicatedExecutor, Executor, ExecutorConfig};
Expand All @@ -37,7 +40,7 @@ use object_store::ObjectStore;
use observability_deps::tracing::*;
use panic_logging::SendPanicsToTracing;
use parquet_file::storage::{ParquetStorage, StorageId};
use std::{num::NonZeroUsize, sync::Arc};
use std::{num::NonZeroUsize, sync::Arc, time::Duration};
use std::{
path::{Path, PathBuf},
str::FromStr,
Expand Down Expand Up @@ -295,6 +298,16 @@ pub struct Config {
/// The local directory that has python plugins and their test files.
#[clap(long = "plugin-dir", env = "INFLUXDB3_PLUGIN_DIR", action)]
pub plugin_dir: Option<PathBuf>,

/// Threshold for internal buffer, can be either percentage or absolute value.
/// eg: 70% or 100000
#[clap(
long = "force-snapshot-mem-threshold",
env = "INFLUXDB3_FORCE_SNAPSHOT_MEM_THRESHOLD",
default_value = "70%",
action
)]
pub force_snapshot_mem_threshold: MemorySize,
}

/// Specified size of the Parquet cache in megabytes (MB)
Expand Down Expand Up @@ -490,11 +503,18 @@ pub async fn command(config: Config) -> Result<()> {
wal_config,
parquet_cache,
metric_registry: Arc::clone(&metrics),
plugin_dir: config.plugin_dir,
})
.await
.map_err(|e| Error::WriteBufferInit(e.into()))?;

info!("setting up background mem check for query buffer");
background_buffer_checker(
config.force_snapshot_mem_threshold.bytes(),
&write_buffer_impl,
)
.await;

info!("setting up telemetry store");
let telemetry_store = setup_telemetry_store(
&config.object_store_config,
catalog.instance_id(),
Expand All @@ -511,6 +531,7 @@ pub async fn command(config: Config) -> Result<()> {
trace_exporter,
trace_header_parser,
Arc::clone(&telemetry_store),
config.plugin_dir,
)?;

let query_executor = Arc::new(QueryExecutorImpl::new(CreateQueryExecutorArgs {
Expand Down Expand Up @@ -576,3 +597,16 @@ async fn setup_telemetry_store(
)
.await
}

async fn background_buffer_checker(
mem_threshold_bytes: usize,
write_buffer_impl: &Arc<WriteBufferImpl>,
) {
debug!(mem_threshold_bytes, "setting up background buffer checker");
check_mem_and_force_snapshot_loop(
Arc::clone(write_buffer_impl),
mem_threshold_bytes,
Duration::from_secs(10),
)
.await;
}
46 changes: 26 additions & 20 deletions influxdb3/src/commands/test.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::commands::common::{InfluxDb3Config, SeparatedKeyValue, SeparatedList};
use anyhow::Context;
use influxdb3_client::plugin_development::WalPluginTestRequest;
use influxdb3_client::Client;
use secrecy::ExposeSecret;
Expand Down Expand Up @@ -53,34 +54,39 @@ pub struct WalPluginConfig {
/// If given pass this map of string key/value pairs as input arguments
#[clap(long = "input-arguments")]
pub input_arguments: Option<SeparatedList<SeparatedKeyValue<String, String>>>,
/// The name of the plugin, which should match its file name on the server `<plugin-dir>/<name>.py`
/// The file name of the plugin, which should exist on the server in `<plugin-dir>/<filename>`.
/// The plugin-dir is provided on server startup.
#[clap(required = true)]
pub name: String,
}

impl From<WalPluginConfig> for WalPluginTestRequest {
fn from(val: WalPluginConfig) -> Self {
let input_arguments = val.input_arguments.map(|a| {
a.into_iter()
.map(|SeparatedKeyValue((k, v))| (k, v))
.collect::<HashMap<String, String>>()
});

Self {
name: val.name,
input_lp: val.input_lp,
input_file: val.input_file,
input_arguments,
}
}
pub filename: String,
}

pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
let client = config.get_client()?;

match config.cmd {
SubCommand::WalPlugin(plugin_config) => {
let wal_plugin_test_request: WalPluginTestRequest = plugin_config.into();
let input_arguments = plugin_config.input_arguments.map(|a| {
a.into_iter()
.map(|SeparatedKeyValue((k, v))| (k, v))
.collect::<HashMap<String, String>>()
});

let input_lp = match plugin_config.input_lp {
Some(lp) => lp,
None => {
let file_path = plugin_config
.input_file
.context("either input_lp or input_file must be provided")?;
std::fs::read_to_string(file_path).context("unable to read input file")?
}
};

let wal_plugin_test_request = WalPluginTestRequest {
filename: plugin_config.filename,
database: plugin_config.influxdb3_config.database_name,
input_lp,
input_arguments,
};

let response = client.wal_plugin_test(wal_plugin_test_request).await?;

Expand Down
Loading

0 comments on commit d329763

Please sign in to comment.