From 192a2f70b583f5a540c26a38c756d44e2867f656 Mon Sep 17 00:00:00 2001 From: Trevor Hilton Date: Thu, 9 Jan 2025 14:24:21 -0500 Subject: [PATCH] refactor: rename metadata cache to distinct value cache --- influxdb3/src/commands/create.rs | 17 ++- influxdb3/src/commands/delete.rs | 16 +- influxdb3/src/commands/serve.rs | 22 +-- influxdb3/tests/server/cli.rs | 19 +-- influxdb3/tests/server/configure.rs | 12 +- influxdb3/tests/server/flight.rs | 2 +- influxdb3/tests/server/main.rs | 12 +- influxdb3/tests/server/query.rs | 8 +- ...i__distinct_cache_create_and_delete-2.snap | 5 + ...cli__distinct_cache_create_and_delete.snap | 5 + influxdb3/tests/server/system_tables.rs | 62 ++++---- .../{meta_cache => distinct_cache}/cache.rs | 42 ++--- .../src/{meta_cache => distinct_cache}/mod.rs | 143 +++++++++--------- .../provider.rs | 62 ++++---- .../table_function.rs | 66 ++++---- influxdb3_cache/src/lib.rs | 2 +- influxdb3_catalog/src/catalog.rs | 48 +++--- influxdb3_client/src/lib.rs | 41 ++--- influxdb3_server/src/http.rs | 63 ++++---- influxdb3_server/src/lib.rs | 4 +- influxdb3_server/src/query_executor/mod.rs | 12 +- .../{meta_caches.rs => distinct_caches.rs} | 29 ++-- influxdb3_server/src/system_tables/mod.rs | 15 +- influxdb3_wal/src/lib.rs | 12 +- influxdb3_write/src/lib.rs | 26 ++-- influxdb3_write/src/write_buffer/mod.rs | 91 ++++++----- .../src/write_buffer/queryable_buffer.rs | 33 ++-- 27 files changed, 451 insertions(+), 418 deletions(-) create mode 100644 influxdb3/tests/server/snapshots/server__cli__distinct_cache_create_and_delete-2.snap create mode 100644 influxdb3/tests/server/snapshots/server__cli__distinct_cache_create_and_delete.snap rename influxdb3_cache/src/{meta_cache => distinct_cache}/cache.rs (95%) rename influxdb3_cache/src/{meta_cache => distinct_cache}/mod.rs (82%) rename influxdb3_cache/src/{meta_cache => distinct_cache}/provider.rs (87%) rename influxdb3_cache/src/{meta_cache => distinct_cache}/table_function.rs (88%) rename influxdb3_server/src/system_tables/{meta_caches.rs => distinct_caches.rs} (83%) diff --git a/influxdb3/src/commands/create.rs b/influxdb3/src/commands/create.rs index 50970c8c7a6..fd458718f36 100644 --- a/influxdb3/src/commands/create.rs +++ b/influxdb3/src/commands/create.rs @@ -39,7 +39,7 @@ impl Config { }, .. }) - | SubCommand::MetaCache(MetaCacheConfig { + | SubCommand::DistinctCache(DistinctCacheConfig { influxdb3_config: InfluxDb3Config { host_url, @@ -95,9 +95,9 @@ pub enum SubCommand { /// Create a new last value cache #[clap(name = "last_cache")] LastCache(LastCacheConfig), - /// Create a new metadata cache - #[clap(name = "meta_cache")] - MetaCache(MetaCacheConfig), + /// Create a new distinct value cache + #[clap(name = "distinct_cache")] + DistinctCache(DistinctCacheConfig), /// Create a new processing engine plugin Plugin(PluginConfig), /// Create a new table in a database @@ -167,7 +167,7 @@ pub struct LastCacheConfig { } #[derive(Debug, clap::Args)] -pub struct MetaCacheConfig { +pub struct DistinctCacheConfig { #[clap(flatten)] influxdb3_config: InfluxDb3Config, @@ -301,7 +301,7 @@ pub async fn command(config: Config) -> Result<(), Box> { None => println!("a cache already exists for the provided parameters"), } } - SubCommand::MetaCache(MetaCacheConfig { + SubCommand::DistinctCache(DistinctCacheConfig { influxdb3_config: InfluxDb3Config { database_name, .. 
}, table, cache_name, @@ -309,7 +309,8 @@ pub async fn command(config: Config) -> Result<(), Box> { max_cardinality, max_age, }) => { - let mut b = client.api_v3_configure_meta_cache_create(database_name, table, columns); + let mut b = + client.api_v3_configure_distinct_cache_create(database_name, table, columns); // Add the optional stuff: if let Some(name) = cache_name { @@ -326,7 +327,7 @@ pub async fn command(config: Config) -> Result<(), Box> { Some(def) => println!( "new cache created: {}", serde_json::to_string_pretty(&def) - .expect("serialize meta cache definition as JSON") + .expect("serialize distinct cache definition as JSON") ), None => println!("a cache already exists for the provided parameters"), } diff --git a/influxdb3/src/commands/delete.rs b/influxdb3/src/commands/delete.rs index 39f10563344..9a7ba1e8508 100644 --- a/influxdb3/src/commands/delete.rs +++ b/influxdb3/src/commands/delete.rs @@ -29,7 +29,7 @@ impl Config { }, .. }) - | SubCommand::MetaCache(MetaCacheConfig { + | SubCommand::DistinctCache(DistinctCacheConfig { influxdb3_config: InfluxDb3Config { host_url, @@ -82,9 +82,9 @@ pub enum SubCommand { /// Delete a last value cache #[clap(name = "last_cache")] LastCache(LastCacheConfig), - /// Delete a meta value cache - #[clap(name = "meta_cache")] - MetaCache(MetaCacheConfig), + /// Delete a distinct value cache + #[clap(name = "distinct_cache")] + DistinctCache(DistinctCacheConfig), /// Delete an existing processing engine plugin Plugin(PluginConfig), /// Delete a table in a database @@ -128,7 +128,7 @@ pub struct LastCacheConfig { } #[derive(Debug, clap::Args)] -pub struct MetaCacheConfig { +pub struct DistinctCacheConfig { #[clap(flatten)] influxdb3_config: InfluxDb3Config, @@ -203,16 +203,16 @@ pub async fn command(config: Config) -> Result<(), Box> { println!("last cache deleted successfully"); } - SubCommand::MetaCache(MetaCacheConfig { + SubCommand::DistinctCache(DistinctCacheConfig { influxdb3_config: InfluxDb3Config { database_name, .. }, table, cache_name, }) => { client - .api_v3_configure_meta_cache_delete(database_name, table, cache_name) + .api_v3_configure_distinct_cache_delete(database_name, table, cache_name) .await?; - println!("meta cache deleted successfully"); + println!("distinct cache deleted successfully"); } SubCommand::Plugin(PluginConfig { influxdb3_config: InfluxDb3Config { database_name, .. 
}, diff --git a/influxdb3/src/commands/serve.rs b/influxdb3/src/commands/serve.rs index 9262ffe0edb..9b3381fbb75 100644 --- a/influxdb3/src/commands/serve.rs +++ b/influxdb3/src/commands/serve.rs @@ -3,8 +3,8 @@ use anyhow::{bail, Context}; use datafusion_util::config::register_iox_object_store; use influxdb3_cache::{ + distinct_cache::DistinctCacheProvider, last_cache::{self, LastCacheProvider}, - meta_cache::MetaCacheProvider, parquet_cache::create_cached_obj_store_and_oracle, }; use influxdb3_clap_blocks::{ @@ -92,8 +92,8 @@ pub enum Error { #[error("failed to initialize last cache: {0}")] InitializeLastCache(#[source] last_cache::Error), - #[error("failed to initialize meta cache: {0:#}")] - InitializeMetaCache(#[source] influxdb3_cache::meta_cache::ProviderError), + #[error("failed to initialize distinct cache: {0:#}")] + InitializeDistinctCache(#[source] influxdb3_cache::distinct_cache::ProviderError), } pub type Result = std::result::Result; @@ -282,15 +282,15 @@ pub struct Config { )] pub last_cache_eviction_interval: humantime::Duration, - /// The interval on which to evict expired entries from the Last-N-Value cache, expressed as a + /// The interval on which to evict expired entries from the Distinct Value cache, expressed as a /// human-readable time, e.g., "20s", "1m", "1h". #[clap( - long = "meta-cache-eviction-interval", - env = "INFLUXDB3_META_CACHE_EVICTION_INTERVAL", + long = "distinct-cache-eviction-interval", + env = "INFLUXDB3_DISTINCT_CACHE_EVICTION_INTERVAL", default_value = "10s", action )] - pub meta_cache_eviction_interval: humantime::Duration, + pub distinct_cache_eviction_interval: humantime::Duration, /// The local directory that has python plugins and their test files. #[clap(long = "plugin-dir", env = "INFLUXDB3_PLUGIN_DIR", action)] @@ -473,18 +473,18 @@ pub async fn command(config: Config) -> Result<()> { ) .map_err(Error::InitializeLastCache)?; - let meta_cache = MetaCacheProvider::new_from_catalog_with_background_eviction( + let distinct_cache = DistinctCacheProvider::new_from_catalog_with_background_eviction( Arc::clone(&time_provider) as _, Arc::clone(&catalog), - config.meta_cache_eviction_interval.into(), + config.distinct_cache_eviction_interval.into(), ) - .map_err(Error::InitializeMetaCache)?; + .map_err(Error::InitializeDistinctCache)?; let write_buffer_impl = WriteBufferImpl::new(WriteBufferImplArgs { persister: Arc::clone(&persister), catalog: Arc::clone(&catalog), last_cache, - meta_cache, + distinct_cache, time_provider: Arc::::clone(&time_provider), executor: Arc::clone(&exec), wal_config, diff --git a/influxdb3/tests/server/cli.rs b/influxdb3/tests/server/cli.rs index fa771392896..01611647df3 100644 --- a/influxdb3/tests/server/cli.rs +++ b/influxdb3/tests/server/cli.rs @@ -364,7 +364,7 @@ async fn test_delete_missing_table() { } #[tokio::test] -async fn test_create_delete_meta_cache() { +async fn test_create_delete_distinct_cache() { let server = TestServer::spawn().await; let server_addr = server.client_addr(); let db_name = "foo"; @@ -381,7 +381,7 @@ async fn test_create_delete_meta_cache() { // first create the cache: let result = run(&[ "create", - "meta_cache", + "distinct_cache", "--host", &server_addr, "--database", @@ -396,7 +396,7 @@ async fn test_create_delete_meta_cache() { // doing the same thing over again will be a no-op let result = run(&[ "create", - "meta_cache", + "distinct_cache", "--host", &server_addr, "--database", @@ -414,7 +414,7 @@ async fn test_create_delete_meta_cache() { // now delete it: let result = run(&[ 
"delete", - "meta_cache", + "distinct_cache", "--host", &server_addr, "--database", @@ -423,11 +423,11 @@ async fn test_create_delete_meta_cache() { table_name, cache_name, ]); - assert_contains!(&result, "meta cache deleted successfully"); + assert_contains!(&result, "distinct cache deleted successfully"); // trying to delete again should result in an error as the cache no longer exists: let result = run_and_err(&[ "delete", - "meta_cache", + "distinct_cache", "--host", &server_addr, "--database", @@ -438,6 +438,7 @@ async fn test_create_delete_meta_cache() { ]); assert_contains!(&result, "[404 Not Found]: cache not found"); } + #[test_log::test(tokio::test)] async fn test_create_plugin() { let server = TestServer::spawn().await; @@ -802,7 +803,7 @@ fn test_create_token() { } #[tokio::test] -async fn meta_cache_create_and_delete() { +async fn distinct_cache_create_and_delete() { let server = TestServer::spawn().await; let db_name = "foo"; let server_addr = server.client_addr(); @@ -817,7 +818,7 @@ async fn meta_cache_create_and_delete() { let result = run_with_confirmation(&[ "create", - "meta_cache", + "distinct_cache", "-H", &server_addr, "-d", @@ -837,7 +838,7 @@ async fn meta_cache_create_and_delete() { let result = run_with_confirmation(&[ "delete", - "meta_cache", + "distinct_cache", "-H", &server_addr, "-d", diff --git a/influxdb3/tests/server/configure.rs b/influxdb3/tests/server/configure.rs index 5b5c6c0777b..52f5d002a69 100644 --- a/influxdb3/tests/server/configure.rs +++ b/influxdb3/tests/server/configure.rs @@ -7,11 +7,11 @@ use test_helpers::assert_contains; use crate::TestServer; #[tokio::test] -async fn api_v3_configure_meta_cache_create() { +async fn api_v3_configure_distinct_cache_create() { let server = TestServer::spawn().await; let client = reqwest::Client::new(); let url = format!( - "{base}/api/v3/configure/meta_cache", + "{base}/api/v3/configure/distinct_cache", base = server.client_addr() ); @@ -157,7 +157,7 @@ async fn api_v3_configure_meta_cache_create() { .json(&body) .send() .await - .expect("send request to create meta cache"); + .expect("send request to create distinct cache"); let status = resp.status(); assert_eq!( tc.expected, @@ -169,11 +169,11 @@ async fn api_v3_configure_meta_cache_create() { } #[tokio::test] -async fn api_v3_configure_meta_cache_delete() { +async fn api_v3_configure_distinct_cache_delete() { let server = TestServer::spawn().await; let client = reqwest::Client::new(); let url = format!( - "{base}/api/v3/configure/meta_cache", + "{base}/api/v3/configure/distinct_cache", base = server.client_addr() ); @@ -210,7 +210,7 @@ async fn api_v3_configure_meta_cache_delete() { use Request::*; let mut test_cases = [ TestCase { - description: "create a metadata cache", + description: "create a distinct cache", request: Create(serde_json::json!({ "db": db_name, "table": tbl_name, diff --git a/influxdb3/tests/server/flight.rs b/influxdb3/tests/server/flight.rs index 6d9a8e45d11..4e508b37407 100644 --- a/influxdb3/tests/server/flight.rs +++ b/influxdb3/tests/server/flight.rs @@ -117,8 +117,8 @@ async fn flight() -> Result<(), influxdb3_client::Error> { "| public | information_schema | tables | VIEW |", "| public | information_schema | views | VIEW |", "| public | iox | cpu | BASE TABLE |", + "| public | system | distinct_caches | BASE TABLE |", "| public | system | last_caches | BASE TABLE |", - "| public | system | meta_caches | BASE TABLE |", "| public | system | parquet_files | BASE TABLE |", "| public | system | processing_engine_plugins | BASE 
TABLE |", "| public | system | processing_engine_triggers | BASE TABLE |", diff --git a/influxdb3/tests/server/main.rs b/influxdb3/tests/server/main.rs index f2aac07bcf5..09f3152a583 100644 --- a/influxdb3/tests/server/main.rs +++ b/influxdb3/tests/server/main.rs @@ -341,34 +341,34 @@ impl TestServer { .expect("failed to send request to delete last cache") } - pub async fn api_v3_configure_meta_cache_create( + pub async fn api_v3_configure_distinct_cache_create( &self, request: &serde_json::Value, ) -> Response { self.http_client .post(format!( - "{base}/api/v3/configure/meta_cache", + "{base}/api/v3/configure/distinct_cache", base = self.client_addr() )) .json(request) .send() .await - .expect("failed to send request to create metadata cache") + .expect("failed to send request to create distinct cache") } - pub async fn api_v3_configure_meta_cache_delete( + pub async fn api_v3_configure_distinct_cache_delete( &self, request: &serde_json::Value, ) -> Response { self.http_client .delete(format!( - "{base}/api/v3/configure/meta_cache", + "{base}/api/v3/configure/distinct_cache", base = self.client_addr() )) .json(request) .send() .await - .expect("failed to send request to delete metadata cache") + .expect("failed to send request to delete distinct cache") } } diff --git a/influxdb3/tests/server/query.rs b/influxdb3/tests/server/query.rs index 1293eaa3a27..56cc34b90c9 100644 --- a/influxdb3/tests/server/query.rs +++ b/influxdb3/tests/server/query.rs @@ -1584,7 +1584,7 @@ async fn api_v1_query_uri_and_body() { } #[tokio::test] -async fn api_v3_query_sql_meta_cache() { +async fn api_v3_query_sql_distinct_cache() { let server = TestServer::spawn().await; server .write_lp_to_db("foo", "cpu,region=us,host=a usage=99", Precision::Second) @@ -1594,7 +1594,7 @@ async fn api_v3_query_sql_meta_cache() { server .http_client .post(format!( - "{base}/api/v3/configure/meta_cache", + "{base}/api/v3/configure/distinct_cache", base = server.client_addr() )) .json(&serde_json::json!({ @@ -1623,7 +1623,7 @@ async fn api_v3_query_sql_meta_cache() { .api_v3_query_sql(&[ ("db", "foo"), ("format", "pretty"), - ("q", "SELECT * FROM meta_cache('cpu')"), + ("q", "SELECT * FROM distinct_cache('cpu')"), ]) .await .text() @@ -1647,7 +1647,7 @@ async fn api_v3_query_sql_meta_cache() { .api_v3_query_sql(&[ ("db", "foo"), ("format", "json"), - ("q", "SELECT * FROM meta_cache('cpu')"), + ("q", "SELECT * FROM distinct_cache('cpu')"), ]) .await .json::() diff --git a/influxdb3/tests/server/snapshots/server__cli__distinct_cache_create_and_delete-2.snap b/influxdb3/tests/server/snapshots/server__cli__distinct_cache_create_and_delete-2.snap new file mode 100644 index 00000000000..041e2a5c2f7 --- /dev/null +++ b/influxdb3/tests/server/snapshots/server__cli__distinct_cache_create_and_delete-2.snap @@ -0,0 +1,5 @@ +--- +source: influxdb3/tests/server/cli.rs +expression: result +--- +distinct cache deleted successfully diff --git a/influxdb3/tests/server/snapshots/server__cli__distinct_cache_create_and_delete.snap b/influxdb3/tests/server/snapshots/server__cli__distinct_cache_create_and_delete.snap new file mode 100644 index 00000000000..709c0348e0f --- /dev/null +++ b/influxdb3/tests/server/snapshots/server__cli__distinct_cache_create_and_delete.snap @@ -0,0 +1,5 @@ +--- +source: influxdb3/tests/server/cli.rs +expression: result +--- +"new cache created: {\n \"table_id\": 0,\n \"table_name\": \"cpu\",\n \"cache_name\": \"cache_money\",\n \"column_ids\": [\n 0,\n 1\n ],\n \"max_cardinality\": 20000,\n \"max_age_seconds\": 200\n}" 
diff --git a/influxdb3/tests/server/system_tables.rs b/influxdb3/tests/server/system_tables.rs index f4b6e4d8068..1f7bf0313c1 100644 --- a/influxdb3/tests/server/system_tables.rs +++ b/influxdb3/tests/server/system_tables.rs @@ -280,7 +280,7 @@ async fn last_caches_table() { } #[tokio::test] -async fn meta_caches_table() { +async fn distinct_caches_table() { let server = TestServer::spawn().await; let db_1_name = "foo"; let db_2_name = "bar"; @@ -308,21 +308,21 @@ async fn meta_caches_table() { .await .unwrap(); - // check that there are no meta caches: + // check that there are no distinct caches: for db_name in [db_1_name, db_2_name] { let response_stream = server .flight_sql_client(db_name) .await - .query("SELECT * FROM system.meta_caches") + .query("SELECT * FROM system.distinct_caches") .await .unwrap(); let batches = collect_stream(response_stream).await; assert_batches_sorted_eq!(["++", "++",], &batches); } - // create some metadata caches on the two databases: + // create some distinct caches on the two databases: assert!(server - .api_v3_configure_meta_cache_create(&json!({ + .api_v3_configure_distinct_cache_create(&json!({ "db": db_1_name, "table": "cpu", "columns": ["region", "host"], @@ -331,7 +331,7 @@ async fn meta_caches_table() { .status() .is_success()); assert!(server - .api_v3_configure_meta_cache_create(&json!({ + .api_v3_configure_distinct_cache_create(&json!({ "db": db_1_name, "table": "mem", "columns": ["region", "host"], @@ -341,7 +341,7 @@ async fn meta_caches_table() { .status() .is_success()); assert!(server - .api_v3_configure_meta_cache_create(&json!({ + .api_v3_configure_distinct_cache_create(&json!({ "db": db_2_name, "table": "cpu", "columns": ["host"], @@ -356,51 +356,51 @@ async fn meta_caches_table() { let response_stream = server .flight_sql_client(db_1_name) .await - .query("SELECT * FROM system.meta_caches") + .query("SELECT * FROM system.distinct_caches") .await .unwrap(); let batches = collect_stream(response_stream).await; assert_batches_sorted_eq!([ - "+-------+----------------------------+------------+----------------+-----------------+-----------------+", - "| table | name | column_ids | column_names | max_cardinality | max_age_seconds |", - "+-------+----------------------------+------------+----------------+-----------------+-----------------+", - "| cpu | cpu_region_host_meta_cache | [0, 1] | [region, host] | 100000 | 86400 |", - "| mem | mem_region_host_meta_cache | [4, 5] | [region, host] | 1000 | 86400 |", - "+-------+----------------------------+------------+----------------+-----------------+-----------------+", + "+-------+--------------------------------+------------+----------------+-----------------+-----------------+", + "| table | name | column_ids | column_names | max_cardinality | max_age_seconds |", + "+-------+--------------------------------+------------+----------------+-----------------+-----------------+", + "| cpu | cpu_region_host_distinct_cache | [0, 1] | [region, host] | 100000 | 86400 |", + "| mem | mem_region_host_distinct_cache | [4, 5] | [region, host] | 1000 | 86400 |", + "+-------+--------------------------------+------------+----------------+-----------------+-----------------+", ], &batches); } { let response_stream = server .flight_sql_client(db_2_name) .await - .query("SELECT * FROM system.meta_caches") + .query("SELECT * FROM system.distinct_caches") .await .unwrap(); let batches = collect_stream(response_stream).await; assert_batches_sorted_eq!([ - 
"+-------+---------------------+------------+--------------+-----------------+-----------------+", - "| table | name | column_ids | column_names | max_cardinality | max_age_seconds |", - "+-------+---------------------+------------+--------------+-----------------+-----------------+", - "| cpu | cpu_host_meta_cache | [9] | [host] | 100000 | 1000 |", - "+-------+---------------------+------------+--------------+-----------------+-----------------+", + "+-------+-------------------------+------------+--------------+-----------------+-----------------+", + "| table | name | column_ids | column_names | max_cardinality | max_age_seconds |", + "+-------+-------------------------+------------+--------------+-----------------+-----------------+", + "| cpu | cpu_host_distinct_cache | [9] | [host] | 100000 | 1000 |", + "+-------+-------------------------+------------+--------------+-----------------+-----------------+", ], &batches); } // delete caches and check that the system tables reflect those changes: assert!(server - .api_v3_configure_meta_cache_delete(&json!({ + .api_v3_configure_distinct_cache_delete(&json!({ "db": db_1_name, "table": "cpu", - "name": "cpu_region_host_meta_cache", + "name": "cpu_region_host_distinct_cache", })) .await .status() .is_success()); assert!(server - .api_v3_configure_meta_cache_delete(&json!({ + .api_v3_configure_distinct_cache_delete(&json!({ "db": db_2_name, "table": "cpu", - "name": "cpu_host_meta_cache", + "name": "cpu_host_distinct_cache", })) .await .status() @@ -411,23 +411,23 @@ async fn meta_caches_table() { let response_stream = server .flight_sql_client(db_1_name) .await - .query("SELECT * FROM system.meta_caches") + .query("SELECT * FROM system.distinct_caches") .await .unwrap(); let batches = collect_stream(response_stream).await; assert_batches_sorted_eq!([ - "+-------+----------------------------+------------+----------------+-----------------+-----------------+", - "| table | name | column_ids | column_names | max_cardinality | max_age_seconds |", - "+-------+----------------------------+------------+----------------+-----------------+-----------------+", - "| mem | mem_region_host_meta_cache | [4, 5] | [region, host] | 1000 | 86400 |", - "+-------+----------------------------+------------+----------------+-----------------+-----------------+", + "+-------+--------------------------------+------------+----------------+-----------------+-----------------+", + "| table | name | column_ids | column_names | max_cardinality | max_age_seconds |", + "+-------+--------------------------------+------------+----------------+-----------------+-----------------+", + "| mem | mem_region_host_distinct_cache | [4, 5] | [region, host] | 1000 | 86400 |", + "+-------+--------------------------------+------------+----------------+-----------------+-----------------+", ], &batches); } { let response_stream = server .flight_sql_client(db_2_name) .await - .query("SELECT * FROM system.meta_caches") + .query("SELECT * FROM system.distinct_caches") .await .unwrap(); let batches = collect_stream(response_stream).await; diff --git a/influxdb3_cache/src/meta_cache/cache.rs b/influxdb3_cache/src/distinct_cache/cache.rs similarity index 95% rename from influxdb3_cache/src/meta_cache/cache.rs rename to influxdb3_cache/src/distinct_cache/cache.rs index 21780d4636d..bb1f24079d4 100644 --- a/influxdb3_cache/src/meta_cache/cache.rs +++ b/influxdb3_cache/src/distinct_cache/cache.rs @@ -14,7 +14,7 @@ use arrow::{ use indexmap::IndexMap; use 
influxdb3_catalog::catalog::TableDefinition; use influxdb3_id::{ColumnId, TableId}; -use influxdb3_wal::{FieldData, MetaCacheDefinition, Row}; +use influxdb3_wal::{DistinctCacheDefinition, FieldData, Row}; use iox_time::TimeProvider; use schema::{InfluxColumnType, InfluxFieldType}; use serde::Deserialize; @@ -24,7 +24,7 @@ pub enum CacheError { #[error("must pass a non-empty set of column ids")] EmptyColumnSet, #[error( - "cannot use a column of type {attempted} in a metadata cache, only \ + "cannot use a column of type {attempted} in a distinct value cache, only \ tags and string fields can be used" )] NonTagOrStringColumn { attempted: InfluxColumnType }, @@ -34,9 +34,9 @@ pub enum CacheError { Unexpected(#[from] anyhow::Error), } -/// A metadata cache for storing distinct values for a set of columns in a table +/// A cache for storing distinct values for a set of columns in a table #[derive(Debug)] -pub(crate) struct MetaCache { +pub(crate) struct DistinctCache { time_provider: Arc, /// The maximum number of unique value combinations in the cache max_cardinality: usize, @@ -45,23 +45,23 @@ pub(crate) struct MetaCache { /// The fixed Arrow schema used to produce record batches from the cache schema: SchemaRef, /// Holds current state of the cache - state: MetaCacheState, + state: DistinctCacheState, /// The identifiers of the columns used in the cache column_ids: Vec, /// The cache data, stored in a tree data: Node, } -/// Type for tracking the current state of a [`MetaCache`] +/// Type for tracking the current state of a [`DistinctCache`] #[derive(Debug, Default)] -struct MetaCacheState { +struct DistinctCacheState { /// The current number of unique value combinations in the cache cardinality: usize, } -/// Arguments to create a new [`MetaCache`] +/// Arguments to create a new [`DistinctCache`] #[derive(Debug)] -pub struct CreateMetaCacheArgs { +pub struct CreateDistinctCacheArgs { pub table_def: Arc, pub max_cardinality: MaxCardinality, pub max_age: MaxAge, @@ -130,19 +130,19 @@ impl MaxAge { } } -impl MetaCache { - /// Create a new [`MetaCache`] +impl DistinctCache { + /// Create a new [`DistinctCache`] /// /// Must pass a non-empty set of [`ColumnId`]s which correspond to valid columns in the provided /// [`TableDefinition`]. 
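The `data: Node` field above is where the distinct values actually live; `Node` itself is defined later in this file. As a rough mental model only (simplified, with owned `String` values instead of the cache's real column value type), the tree behaves like a nested ordered map with one level per cached column, where each value remembers when it was last seen so the `max_age` and `max_cardinality` limits have something to measure against:

```rust
use std::collections::BTreeMap;

// Simplified sketch of the nested-value idea behind the cache's `data` tree.
// Each nesting level corresponds to one cached column, in the order the columns
// were given; timestamps record when a value was last seen, which is what
// max-age eviction works from.
#[derive(Debug, Default)]
struct SketchNode {
    values: BTreeMap<String, (i64, Option<SketchNode>)>,
}

impl SketchNode {
    // Insert one row's worth of values, e.g. ["us-east", "a"], seen at `time_ns`.
    fn push(&mut self, row: &[&str], time_ns: i64) {
        if let Some((first, rest)) = row.split_first() {
            let entry = self
                .values
                .entry(first.to_string())
                .or_insert_with(|| (time_ns, (!rest.is_empty()).then(SketchNode::default)));
            entry.0 = entry.0.max(time_ns);
            if let Some(child) = entry.1.as_mut() {
                child.push(rest, time_ns);
            }
        }
    }

    // Cardinality is the number of unique leaf combinations, which is what the
    // `max_cardinality` limit above is compared against.
    fn cardinality(&self) -> usize {
        self.values
            .values()
            .map(|(_, child)| child.as_ref().map_or(1, SketchNode::cardinality))
            .sum()
    }
}

fn main() {
    let mut root = SketchNode::default();
    root.push(&["us-east", "a"], 100);
    root.push(&["us-east", "b"], 120);
    root.push(&["us-west", "d"], 130);
    assert_eq!(root.cardinality(), 3);
    println!("cardinality = {}", root.cardinality());
}
```

The real cache walks this kind of structure when producing record batches and when pruning, but it operates on column IDs, `FieldData` values, and the catalog's `TableDefinition` rather than bare strings.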
pub(crate) fn new( time_provider: Arc, - CreateMetaCacheArgs { + CreateDistinctCacheArgs { table_def, max_cardinality, max_age, column_ids, - }: CreateMetaCacheArgs, + }: CreateDistinctCacheArgs, ) -> Result { if column_ids.is_empty() { return Err(CacheError::EmptyColumnSet); @@ -151,7 +151,7 @@ impl MetaCache { let mut builder = SchemaBuilder::new(); for id in &column_ids { let col = table_def.columns.get(id).with_context(|| { - format!("invalid column id ({id}) encountered while creating metadata cache") + format!("invalid column id ({id}) encountered while creating distinct value cache") })?; let data_type = match col.data_type { InfluxColumnType::Tag | InfluxColumnType::Field(InfluxFieldType::String) => { @@ -166,7 +166,7 @@ impl MetaCache { time_provider, max_cardinality: max_cardinality.into(), max_age: max_age.into(), - state: MetaCacheState::default(), + state: DistinctCacheState::default(), schema: Arc::new(builder.finish()), column_ids, data: Node::default(), @@ -341,14 +341,14 @@ impl MetaCache { Ok(()) } - /// Create a [`MetaCacheDefinition`] from this cache along with the given args + /// Create a [`DistinctCacheDefinition`] from this cache along with the given args pub(super) fn to_definition( &self, table_id: TableId, table_name: Arc, cache_name: Arc, - ) -> MetaCacheDefinition { - MetaCacheDefinition { + ) -> DistinctCacheDefinition { + DistinctCacheDefinition { table_id, table_name, cache_name, @@ -359,7 +359,7 @@ impl MetaCache { } } -/// A node in the `data` tree of a [`MetaCache`] +/// A node in the `data` tree of a [`DistinctCache`] /// /// Recursive struct holding a [`BTreeMap`] whose keys are the values nested under this node, and /// whose values hold the last seen time as an [`i64`] of each value, and an optional reference to @@ -541,7 +541,7 @@ impl From<&FieldData> for Value { | FieldData::Integer(_) | FieldData::UInteger(_) | FieldData::Float(_) - | FieldData::Boolean(_) => panic!("unexpected field type for metadata cache"), + | FieldData::Boolean(_) => panic!("unexpected field type for distinct value cache"), } } } @@ -552,7 +552,7 @@ impl From for Value { } } -/// A predicate that can be applied when gathering [`RecordBatch`]es from a [`MetaCache`] +/// A predicate that can be applied when gathering [`RecordBatch`]es from a [`DistinctCache`] /// /// This is intended to be derived from a set of filter expressions in Datafusion by analyzing /// them with a `LiteralGuarantee`. diff --git a/influxdb3_cache/src/meta_cache/mod.rs b/influxdb3_cache/src/distinct_cache/mod.rs similarity index 82% rename from influxdb3_cache/src/meta_cache/mod.rs rename to influxdb3_cache/src/distinct_cache/mod.rs index 1f3ecb7e609..8dad4ea46fa 100644 --- a/influxdb3_cache/src/meta_cache/mod.rs +++ b/influxdb3_cache/src/distinct_cache/mod.rs @@ -1,12 +1,12 @@ -//! The Metadata Cache holds the distinct values for a column or set of columns on a table +//! 
The Distinct Value Cache holds the distinct values for a column or set of columns on a table mod cache; -pub use cache::{CacheError, CreateMetaCacheArgs, MaxAge, MaxCardinality}; +pub use cache::{CacheError, CreateDistinctCacheArgs, MaxAge, MaxCardinality}; mod provider; -pub use provider::{MetaCacheProvider, ProviderError}; +pub use provider::{DistinctCacheProvider, ProviderError}; mod table_function; -pub use table_function::MetaCacheFunction; -pub use table_function::META_CACHE_UDTF_NAME; +pub use table_function::DistinctCacheFunction; +pub use table_function::DISTINCT_CACHE_UDTF_NAME; #[cfg(test)] mod tests { @@ -18,9 +18,9 @@ mod tests { use std::{sync::Arc, time::Duration}; use crate::{ - meta_cache::{ - cache::{CreateMetaCacheArgs, MaxAge, MaxCardinality, MetaCache, Predicate}, - MetaCacheFunction, MetaCacheProvider, META_CACHE_UDTF_NAME, + distinct_cache::{ + cache::{CreateDistinctCacheArgs, DistinctCache, MaxAge, MaxCardinality, Predicate}, + DistinctCacheFunction, DistinctCacheProvider, DISTINCT_CACHE_UDTF_NAME, }, test_helpers::TestWriter, }; @@ -57,9 +57,9 @@ mod tests { let region_col_id = column_ids[0]; let host_col_id = column_ids[1]; // create the cache: - let mut cache = MetaCache::new( + let mut cache = DistinctCache::new( time_provider, - CreateMetaCacheArgs { + CreateDistinctCacheArgs { table_def, max_cardinality: MaxCardinality::default(), max_age: MaxAge::default(), @@ -210,9 +210,9 @@ mod tests { .map(|name| table_def.column_name_to_id_unchecked(name)) .collect(); // create a cache with some cardinality and age limits: - let mut cache = MetaCache::new( + let mut cache = DistinctCache::new( Arc::clone(&time_provider) as _, - CreateMetaCacheArgs { + CreateDistinctCacheArgs { table_def, max_cardinality: MaxCardinality::try_from(10).unwrap(), max_age: MaxAge::from(Duration::from_nanos(100)), @@ -293,7 +293,7 @@ mod tests { } #[test] - fn meta_cache_limit() { + fn distinct_cache_limit() { let writer = TestWriter::new(); let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0))); let rows = writer.write_lp_to_rows( @@ -318,9 +318,9 @@ mod tests { .into_iter() .map(|name| table_def.column_name_to_id_unchecked(name)) .collect(); - let mut cache = MetaCache::new( + let mut cache = DistinctCache::new( time_provider, - CreateMetaCacheArgs { + CreateDistinctCacheArgs { table_def, max_cardinality: MaxCardinality::default(), max_age: MaxAge::default(), @@ -379,21 +379,21 @@ mod tests { ); } - /// This test sets up a [`MetaCacheProvider`], creates a [`MetaCache`] using the `region` and + /// This test sets up a [`DistinctCacheProvider`], creates a [`DistinctCache`] using the `region` and /// `host` column, and then writes several different unique combinations of values into it. - /// It then sets up a DataFusion [`SessionContext`], registers our [`MetaCacheFunction`] as a + /// It then sets up a DataFusion [`SessionContext`], registers our [`DistinctCacheFunction`] as a /// UDTF, and then runs a series of test cases to verify queries against the function. /// /// The purpose of this is to see that the cache works as intended, and importantly, that the - /// predicate pushdown is happening and being leveraged by the underlying [`MetaCache`], vs. + /// predicate pushdown is happening and being leveraged by the underlying [`DistinctCache`], vs. /// DataFusion doing it for us with a higher level FilterExec. /// /// Each test case verifies the `RecordBatch` output of the query as well as the output for - /// EXPLAIN on the same query. 
The EXPLAIN output contains a line for the MetaCacheExec, which - /// is the custom execution plan impl for the metadata cache that captures the predicates that - /// are pushed down to the underlying [`MetaCacahe::to_record_batch`] method, if any. + /// EXPLAIN on the same query. The EXPLAIN output contains a line for the DistinctCacheExec, which + /// is the custom execution plan impl for the distinct value cache that captures the predicates that + /// are pushed down to the underlying [`DistinctCacahe::to_record_batch`] method, if any. #[tokio::test] - async fn test_datafusion_meta_cache_udtf() { + async fn test_datafusion_distinct_cache_udtf() { // create a test writer and do a write in to populate the catalog with a db/table: let writer = TestWriter::new(); let _ = writer.write_lp_to_write_batch( @@ -403,7 +403,7 @@ mod tests { 0, ); - // create a meta provider and a cache on tag columns 'region' and 'host': + // create a distinct provider and a cache on tag columns 'region' and 'host': let db_schema = writer.db_schema(); let table_def = db_schema.table_definition("cpu").unwrap(); let column_ids: Vec = ["region", "host"] @@ -411,13 +411,13 @@ mod tests { .map(|name| table_def.column_name_to_id_unchecked(name)) .collect(); let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0))); - let meta_provider = - MetaCacheProvider::new_from_catalog(time_provider, writer.catalog()).unwrap(); - meta_provider + let distinct_provider = + DistinctCacheProvider::new_from_catalog(time_provider, writer.catalog()).unwrap(); + distinct_provider .create_cache( db_schema.id, None, - CreateMetaCacheArgs { + CreateDistinctCacheArgs { table_def, max_cardinality: MaxCardinality::default(), max_age: MaxAge::default(), @@ -448,13 +448,14 @@ mod tests { (0, 1, 0), [influxdb3_wal::create::write_batch_op(write_batch)], ); - meta_provider.write_wal_contents_to_cache(&wal_contents); + distinct_provider.write_wal_contents_to_cache(&wal_contents); - // Spin up a DataFusion session context and add the meta_cache function to it so we + // Spin up a DataFusion session context and add the distinct_cache function to it so we // can query for data in the cache we created and populated above: let ctx = SessionContext::new(); - let meta_func = MetaCacheFunction::new(db_schema.id, Arc::clone(&meta_provider)); - ctx.register_udtf(META_CACHE_UDTF_NAME, Arc::new(meta_func)); + let distinct_func = + DistinctCacheFunction::new(db_schema.id, Arc::clone(&distinct_provider)); + ctx.register_udtf(DISTINCT_CACHE_UDTF_NAME, Arc::new(distinct_func)); struct TestCase<'a> { /// A short description of the test @@ -462,14 +463,14 @@ mod tests { /// A SQL expression to evaluate using the datafusion session context, should be of /// the form: /// ```sql - /// SELECT * FROM meta_cache('cpu') ... + /// SELECT * FROM distinct_cache('cpu') ... /// ``` sql: &'a str, /// Expected record batch output expected: &'a [&'a str], /// Expected EXPLAIN output contains this. /// - /// For checking the `MetaCacheExec` portion of the EXPLAIN output for the given `sql` + /// For checking the `DistinctCacheExec` portion of the EXPLAIN output for the given `sql` /// query. A "contains" is used instead of matching the whole EXPLAIN output to prevent /// flakyness from upstream changes to other parts of the query plan. 
explain_contains: &'a str, @@ -480,7 +481,7 @@ mod tests { /// /// The cache should produce results in a sorted order as-is, however, some queries /// that process the results after they are emitted from the cache may have their order - /// altered by DataFusion, e.g., `SELECT DISTINCT(column_name) FROM meta_cache('table')` + /// altered by DataFusion, e.g., `SELECT DISTINCT(column_name) FROM distinct_cache('table')` /// or queries that project columns that are not at the top level of the cache. use_sorted_assert: bool, } @@ -488,7 +489,7 @@ mod tests { let test_cases = [ TestCase { _desc: "no predicates", - sql: "SELECT * FROM meta_cache('cpu')", + sql: "SELECT * FROM distinct_cache('cpu')", expected: &[ "+---------+------+", "| region | host |", @@ -507,12 +508,12 @@ mod tests { "| us-west | d |", "+---------+------+", ], - explain_contains: "MetaCacheExec: projection=[region@0, host@1] inner=MemoryExec: partitions=1, partition_sizes=[1]", + explain_contains: "DistinctCacheExec: projection=[region@0, host@1] inner=MemoryExec: partitions=1, partition_sizes=[1]", use_sorted_assert: false, }, TestCase { _desc: "eq predicate on region", - sql: "SELECT * FROM meta_cache('cpu') WHERE region = 'us-east'", + sql: "SELECT * FROM distinct_cache('cpu') WHERE region = 'us-east'", expected: &[ "+---------+------+", "| region | host |", @@ -521,12 +522,12 @@ mod tests { "| us-east | b |", "+---------+------+", ], - explain_contains: "MetaCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (us-east)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", + explain_contains: "DistinctCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (us-east)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", use_sorted_assert: false, }, TestCase { _desc: "eq predicate on region and host", - sql: "SELECT * FROM meta_cache('cpu') \ + sql: "SELECT * FROM distinct_cache('cpu') \ WHERE region = 'us-east' AND host = 'a'", expected: &[ "+---------+------+", @@ -535,12 +536,12 @@ mod tests { "| us-east | a |", "+---------+------+", ], - explain_contains: "MetaCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (us-east)], [host@1 IN (a)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", + explain_contains: "DistinctCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (us-east)], [host@1 IN (a)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", use_sorted_assert: false, }, TestCase { _desc: "eq predicate on region; in predicate on host", - sql: "SELECT * FROM meta_cache('cpu') \ + sql: "SELECT * FROM distinct_cache('cpu') \ WHERE region = 'us-east' AND host IN ('a', 'b')", expected: &[ "+---------+------+", @@ -550,12 +551,12 @@ mod tests { "| us-east | b |", "+---------+------+", ], - explain_contains: "MetaCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (us-east)], [host@1 IN (a,b)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", + explain_contains: "DistinctCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (us-east)], [host@1 IN (a,b)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", use_sorted_assert: false, }, TestCase { _desc: "eq predicate on region; not in predicate on host", - sql: "SELECT * FROM meta_cache('cpu') \ + sql: "SELECT * FROM distinct_cache('cpu') \ WHERE region = 'us-east' AND host != 'a'", expected: &[ "+---------+------+", @@ -564,12 +565,12 @@ mod tests { "| us-east | b |", "+---------+------+", ], - explain_contains: "MetaCacheExec: projection=[region@0, host@1] predicates=[[region@0 
IN (us-east)], [host@1 NOT IN (a)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", + explain_contains: "DistinctCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (us-east)], [host@1 NOT IN (a)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", use_sorted_assert: false, }, TestCase { _desc: "in predicate on region", - sql: "SELECT * FROM meta_cache('cpu') \ + sql: "SELECT * FROM distinct_cache('cpu') \ WHERE region IN ('ca-cent', 'ca-east', 'us-east', 'us-west')", expected: &[ "+---------+------+", @@ -583,12 +584,12 @@ mod tests { "| us-west | d |", "+---------+------+", ], - explain_contains: "MetaCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (ca-cent,ca-east,us-east,us-west)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", + explain_contains: "DistinctCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (ca-cent,ca-east,us-east,us-west)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", use_sorted_assert: false, }, TestCase { _desc: "not in predicate on region", - sql: "SELECT * FROM meta_cache('cpu') \ + sql: "SELECT * FROM distinct_cache('cpu') \ WHERE region NOT IN ('ca-cent', 'ca-east', 'us-east', 'us-west')", expected: &[ "+---------+------+", @@ -602,12 +603,12 @@ mod tests { "| eu-west | l |", "+---------+------+", ], - explain_contains: "MetaCacheExec: projection=[region@0, host@1] predicates=[[region@0 NOT IN (ca-cent,ca-east,us-east,us-west)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", + explain_contains: "DistinctCacheExec: projection=[region@0, host@1] predicates=[[region@0 NOT IN (ca-cent,ca-east,us-east,us-west)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", use_sorted_assert: false, }, TestCase { _desc: "or eq predicates on region", - sql: "SELECT * FROM meta_cache('cpu') \ + sql: "SELECT * FROM distinct_cache('cpu') \ WHERE region = 'us-east' OR region = 'ca-east'", expected: &[ "+---------+------+", @@ -618,12 +619,12 @@ mod tests { "| us-east | b |", "+---------+------+", ], - explain_contains: "MetaCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (ca-east,us-east)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", + explain_contains: "DistinctCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (ca-east,us-east)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", use_sorted_assert: false, }, TestCase { _desc: "or eq predicate on host", - sql: "SELECT * FROM meta_cache('cpu') \ + sql: "SELECT * FROM distinct_cache('cpu') \ WHERE host = 'd' OR host = 'e'", expected: &[ "+---------+------+", @@ -633,12 +634,12 @@ mod tests { "| us-west | d |", "+---------+------+", ], - explain_contains: "MetaCacheExec: projection=[region@0, host@1] predicates=[[host@1 IN (d,e)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", + explain_contains: "DistinctCacheExec: projection=[region@0, host@1] predicates=[[host@1 IN (d,e)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", use_sorted_assert: false, }, TestCase { _desc: "un-grouped host conditions are not handled in predicate pushdown", - sql: "SELECT * FROM meta_cache('cpu') \ + sql: "SELECT * FROM distinct_cache('cpu') \ WHERE region = 'us-east' AND host = 'a' OR host = 'b'", expected: &[ "+---------+------+", @@ -648,12 +649,12 @@ mod tests { "| us-east | b |", "+---------+------+", ], - explain_contains: "MetaCacheExec: projection=[region@0, host@1] inner=MemoryExec: partitions=1, partition_sizes=[1]", + explain_contains: "DistinctCacheExec: projection=[region@0, host@1] inner=MemoryExec: 
partitions=1, partition_sizes=[1]", use_sorted_assert: false, }, TestCase { _desc: "grouped host conditions are handled in predicate pushdown", - sql: "SELECT * FROM meta_cache('cpu') \ + sql: "SELECT * FROM distinct_cache('cpu') \ WHERE region = 'us-east' AND (host = 'a' OR host = 'b')", expected: &[ "+---------+------+", @@ -663,12 +664,12 @@ mod tests { "| us-east | b |", "+---------+------+", ], - explain_contains: "MetaCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (us-east)], [host@1 IN (a,b)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", + explain_contains: "DistinctCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (us-east)], [host@1 IN (a,b)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", use_sorted_assert: false, }, TestCase { _desc: "project region column", - sql: "SELECT region FROM meta_cache('cpu')", + sql: "SELECT region FROM distinct_cache('cpu')", expected: &[ "+---------+", "| region |", @@ -682,12 +683,12 @@ mod tests { "| us-west |", "+---------+", ], - explain_contains: "MetaCacheExec: projection=[region@0] inner=MemoryExec: partitions=1, partition_sizes=[1]", + explain_contains: "DistinctCacheExec: projection=[region@0] inner=MemoryExec: partitions=1, partition_sizes=[1]", use_sorted_assert: false, }, TestCase { _desc: "project region column taking distinct", - sql: "SELECT DISTINCT(region) FROM meta_cache('cpu')", + sql: "SELECT DISTINCT(region) FROM distinct_cache('cpu')", expected: &[ "+---------+", "| region |", @@ -701,13 +702,13 @@ mod tests { "| us-west |", "+---------+", ], - explain_contains: "MetaCacheExec: projection=[region@0] inner=MemoryExec: partitions=1, partition_sizes=[1", + explain_contains: "DistinctCacheExec: projection=[region@0] inner=MemoryExec: partitions=1, partition_sizes=[1", // it seems that DISTINCT changes around the order of results use_sorted_assert: true, }, TestCase { _desc: "project host column", - sql: "SELECT host FROM meta_cache('cpu')", + sql: "SELECT host FROM distinct_cache('cpu')", expected: &[ "+------+", // commenting for no new line "| host |", // commenting for no new line @@ -726,7 +727,7 @@ mod tests { "| l |", // commenting for no new line "+------+", // commenting for no new line ], - explain_contains: "MetaCacheExec: projection=[host@1] inner=MemoryExec: partitions=1, partition_sizes=[1]", + explain_contains: "DistinctCacheExec: projection=[host@1] inner=MemoryExec: partitions=1, partition_sizes=[1]", // this column will not be sorted since the order of elements depends on the next level // up in the cache, so the `region` column is iterated over in order, but the nested // `host` values, although sorted within `region`s, will not be globally sorted. 
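The loop that drives these test cases sits in unchanged context, so it does not appear in the hunk. Its pattern is roughly the standalone sketch below, here run against an ordinary in-memory table since the cache UDTF only exists inside the server; the `plan`-column extraction at the end is the same mechanism the test uses to find the `DistinctCacheExec:` line for its `explain_contains` checks:

```rust
use datafusion::arrow::array::StringArray;
use datafusion::arrow::util::pretty::pretty_format_batches;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    // Stand-in data; in the real test the source is `distinct_cache('cpu')`.
    ctx.sql("CREATE TABLE t AS VALUES ('us-east', 'a'), ('us-east', 'b'), ('us-west', 'd')")
        .await?
        .collect()
        .await?;

    // 1) Run the query and render the record batches, as the `expected` arrays do.
    let batches = ctx
        .sql("SELECT column1 AS region, column2 AS host FROM t WHERE column1 = 'us-east'")
        .await?
        .collect()
        .await?;
    println!("{}", pretty_format_batches(&batches)?);

    // 2) Run EXPLAIN on the same query and pull out the `plan` column; the test
    //    then asserts that this text contains the `explain_contains` substring.
    let explain = ctx
        .sql("EXPLAIN SELECT column1 AS region, column2 AS host FROM t WHERE column1 = 'us-east'")
        .await?
        .collect()
        .await?;
    let plans = explain[0]
        .column_by_name("plan")
        .expect("EXPLAIN output has a `plan` column")
        .as_any()
        .downcast_ref::<StringArray>()
        .expect("plan column is a string column");
    for plan in plans.iter().flatten() {
        println!("{plan}");
    }
    Ok(())
}
```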
@@ -734,7 +735,7 @@ mod tests { }, TestCase { _desc: "project host column", - sql: "SELECT host FROM meta_cache('cpu') WHERE region = 'ca-cent'", + sql: "SELECT host FROM distinct_cache('cpu') WHERE region = 'ca-cent'", expected: &[ "+------+", // commenting for no new line "| host |", // commenting for no new line @@ -742,12 +743,12 @@ mod tests { "| f |", // commenting for no new line "+------+", // commenting for no new line ], - explain_contains: "MetaCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (ca-cent)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", + explain_contains: "DistinctCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (ca-cent)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", use_sorted_assert: false, }, TestCase { _desc: "limit clause", - sql: "SELECT * FROM meta_cache('cpu') LIMIT 8", + sql: "SELECT * FROM distinct_cache('cpu') LIMIT 8", expected: &[ "+---------+------+", "| region | host |", @@ -762,12 +763,12 @@ mod tests { "| eu-west | l |", "+---------+------+", ], - explain_contains: "MetaCacheExec: projection=[region@0, host@1] limit=8 inner=MemoryExec: partitions=1, partition_sizes=[1]", + explain_contains: "DistinctCacheExec: projection=[region@0, host@1] limit=8 inner=MemoryExec: partitions=1, partition_sizes=[1]", use_sorted_assert: false, }, TestCase { _desc: "limit and offset", - sql: "SELECT * FROM meta_cache('cpu') LIMIT 8 OFFSET 8", + sql: "SELECT * FROM distinct_cache('cpu') LIMIT 8 OFFSET 8", expected: &[ "+---------+------+", "| region | host |", @@ -778,12 +779,12 @@ mod tests { "| us-west | d |", "+---------+------+", ], - explain_contains: "MetaCacheExec: projection=[region@0, host@1] limit=16 inner=MemoryExec: partitions=1, partition_sizes=[1]", + explain_contains: "DistinctCacheExec: projection=[region@0, host@1] limit=16 inner=MemoryExec: partitions=1, partition_sizes=[1]", use_sorted_assert: false, }, TestCase { _desc: "like clause", - sql: "SELECT * FROM meta_cache('cpu') \ + sql: "SELECT * FROM distinct_cache('cpu') \ WHERE region LIKE 'u%'", expected: &[ "+---------+------+", @@ -795,7 +796,7 @@ mod tests { "| us-west | d |", "+---------+------+", ], - explain_contains: "MetaCacheExec: projection=[region@0, host@1] inner=MemoryExec: partitions=1, partition_sizes=[1]", + explain_contains: "DistinctCacheExec: projection=[region@0, host@1] inner=MemoryExec: partitions=1, partition_sizes=[1]", use_sorted_assert: false, }, ]; @@ -820,7 +821,7 @@ mod tests { // NOTE(hiltontj): this probably can be done a better way? // The EXPLAIN output will have two columns, the one we are interested in that contains - // the details of the MetaCacheExec is called `plan`... + // the details of the DistinctCacheExec is called `plan`... 
assert!( explain .column_by_name("plan") diff --git a/influxdb3_cache/src/meta_cache/provider.rs b/influxdb3_cache/src/distinct_cache/provider.rs similarity index 87% rename from influxdb3_cache/src/meta_cache/provider.rs rename to influxdb3_cache/src/distinct_cache/provider.rs index 7231773055c..46dd8fc6d5d 100644 --- a/influxdb3_cache/src/meta_cache/provider.rs +++ b/influxdb3_cache/src/distinct_cache/provider.rs @@ -4,14 +4,14 @@ use anyhow::Context; use arrow::datatypes::SchemaRef; use influxdb3_catalog::catalog::{Catalog, TableDefinition}; use influxdb3_id::{DbId, TableId}; -use influxdb3_wal::{MetaCacheDefinition, WalContents, WalOp}; +use influxdb3_wal::{DistinctCacheDefinition, WalContents, WalOp}; use iox_time::TimeProvider; use parking_lot::RwLock; -use crate::meta_cache::cache::{MaxAge, MaxCardinality}; +use crate::distinct_cache::cache::{MaxAge, MaxCardinality}; use super::{ - cache::{CreateMetaCacheArgs, MetaCache}, + cache::{CreateDistinctCacheArgs, DistinctCache}, CacheError, }; @@ -25,40 +25,40 @@ pub enum ProviderError { Unexpected(#[from] anyhow::Error), } -/// Triple nested map for storing a multiple metadata caches per table. +/// Triple nested map for storing a multiple distinct value caches per table. /// /// That is, the map nesting is `database -> table -> cache name` -type CacheMap = RwLock, MetaCache>>>>; +type CacheMap = RwLock, DistinctCache>>>>; -/// Provides the metadata caches for the running instance of InfluxDB +/// Provides the distinct value caches for the running instance of InfluxDB #[derive(Debug)] -pub struct MetaCacheProvider { +pub struct DistinctCacheProvider { pub(crate) time_provider: Arc, pub(crate) catalog: Arc, pub(crate) cache_map: CacheMap, } -impl MetaCacheProvider { - /// Initialize a [`MetaCacheProvider`] from a [`Catalog`], populating the provider's +impl DistinctCacheProvider { + /// Initialize a [`DistinctCacheProvider`] from a [`Catalog`], populating the provider's /// `cache_map` from the definitions in the catalog. pub fn new_from_catalog( time_provider: Arc, catalog: Arc, ) -> Result, ProviderError> { - let provider = Arc::new(MetaCacheProvider { + let provider = Arc::new(DistinctCacheProvider { time_provider, catalog: Arc::clone(&catalog), cache_map: Default::default(), }); for db_schema in catalog.list_db_schema() { for table_def in db_schema.tables() { - for (cache_name, cache_def) in table_def.meta_caches() { + for (cache_name, cache_def) in table_def.distinct_caches() { assert!( provider .create_cache( db_schema.id, Some(cache_name), - CreateMetaCacheArgs { + CreateDistinctCacheArgs { table_def: Arc::clone(&table_def), max_cardinality: MaxCardinality::try_from( cache_def.max_cardinality @@ -78,10 +78,10 @@ impl MetaCacheProvider { Ok(provider) } - /// Initialize a [`MetaCacheProvider`] from a [`Catalog`], populating the provider's + /// Initialize a [`DistinctCacheProvider`] from a [`Catalog`], populating the provider's /// `cache_map` from the definitions in the catalog. This starts a background process that /// runs on the provided `eviction_interval` to perform eviction on all of the caches - /// in the created [`MetaCacheProvider`]'s `cache_map`. + /// in the created [`DistinctCacheProvider`]'s `cache_map`. 
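`new_from_catalog_with_background_eviction` pairs the catalog-driven construction above with a detached task that ticks on the configured interval (the renamed `--distinct-cache-eviction-interval` flag, default 10s) and prunes expired entries, as `background_eviction_process` further down shows. A simplified, self-contained sketch of that shape, using a stand-in provider instead of the real nested cache map:

```rust
use std::{sync::Arc, time::Duration};

use parking_lot::RwLock;

// Stand-in for the provider; the real cache map nests
// database -> table -> cache name, this sketch just tracks last-seen times.
#[derive(Default)]
struct SketchProvider {
    last_seen: RwLock<Vec<(String, std::time::Instant)>>,
}

impl SketchProvider {
    fn evict(&self, max_age: Duration) {
        self.last_seen
            .write()
            .retain(|(_, seen)| seen.elapsed() <= max_age);
    }
}

fn spawn_background_eviction(
    provider: Arc<SketchProvider>,
    eviction_interval: Duration,
    max_age: Duration,
) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(eviction_interval);
        loop {
            interval.tick().await;
            provider.evict(max_age);
        }
    })
}

#[tokio::main]
async fn main() {
    let provider = Arc::new(SketchProvider::default());
    provider
        .last_seen
        .write()
        .push(("us-east".to_string(), std::time::Instant::now()));
    // The real interval defaults to 10s; shortened here so the example finishes quickly.
    let handle = spawn_background_eviction(
        Arc::clone(&provider),
        Duration::from_millis(50),
        Duration::from_millis(10),
    );
    tokio::time::sleep(Duration::from_millis(200)).await;
    assert!(provider.last_seen.read().is_empty());
    handle.abort();
}
```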
pub fn new_from_catalog_with_background_eviction( time_provider: Arc, catalog: Arc, @@ -126,8 +126,8 @@ impl MetaCacheProvider { }) } - /// Get a list of [`MetaCacheDefinition`]s for the given database - pub fn get_cache_definitions_for_db(&self, db_id: &DbId) -> Vec { + /// Get a list of [`DistinctCacheDefinition`]s for the given database + pub fn get_cache_definitions_for_db(&self, db_id: &DbId) -> Vec { let db_schema = self .catalog .db_schema_by_id(db_id) @@ -154,34 +154,34 @@ impl MetaCacheProvider { .unwrap_or_default() } - /// Create a new entry in the metadata cache for a given database and parameters. + /// Create a new entry in the distinct cache for a given database and parameters. /// - /// If a new cache is created, this will return the [`MetaCacheDefinition`] for the created + /// If a new cache is created, this will return the [`DistinctCacheDefinition`] for the created /// cache; otherwise, if the provided arguments are identical to an existing cache, along with /// any defaults, then `None` will be returned. It is an error to attempt to create a cache that /// overwite an existing one with different parameters. /// /// The cache name is optional; if not provided, it will be of the form: /// ```text - /// __meta_cache + /// __distinct_cache /// ``` /// Where `` is an `_`-separated list of the column names used in the cache. pub fn create_cache( &self, db_id: DbId, cache_name: Option>, - CreateMetaCacheArgs { + CreateDistinctCacheArgs { table_def, max_cardinality, max_age, column_ids, - }: CreateMetaCacheArgs, - ) -> Result, ProviderError> { + }: CreateDistinctCacheArgs, + ) -> Result, ProviderError> { let cache_name = if let Some(cache_name) = cache_name { cache_name } else { format!( - "{table_name}_{cols}_meta_cache", + "{table_name}_{cols}_distinct_cache", table_name = table_def.table_name, cols = column_ids .iter() @@ -196,9 +196,9 @@ impl MetaCacheProvider { .into() }; - let new_cache = MetaCache::new( + let new_cache = DistinctCache::new( Arc::clone(&self.time_provider), - CreateMetaCacheArgs { + CreateDistinctCacheArgs { table_def: Arc::clone(&table_def), max_cardinality, max_age, @@ -224,7 +224,7 @@ impl MetaCacheProvider { .or_default() .insert(Arc::clone(&cache_name), new_cache); - Ok(Some(MetaCacheDefinition { + Ok(Some(DistinctCacheDefinition { table_id: table_def.table_id, table_name: Arc::clone(&table_def.table_name), cache_name, @@ -240,11 +240,11 @@ impl MetaCacheProvider { &self, db_id: DbId, table_def: Arc, - definition: &MetaCacheDefinition, + definition: &DistinctCacheDefinition, ) { - let meta_cache = MetaCache::new( + let distinct_cache = DistinctCache::new( Arc::clone(&self.time_provider), - CreateMetaCacheArgs { + CreateDistinctCacheArgs { table_def, max_cardinality: definition .max_cardinality @@ -261,7 +261,7 @@ impl MetaCacheProvider { .or_default() .entry(definition.table_id) .or_default() - .insert(Arc::clone(&definition.cache_name), meta_cache); + .insert(Arc::clone(&definition.cache_name), distinct_cache); } /// Delete a cache from the provider @@ -350,7 +350,7 @@ impl MetaCacheProvider { } fn background_eviction_process( - provider: Arc, + provider: Arc, eviction_interval: Duration, ) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { diff --git a/influxdb3_cache/src/meta_cache/table_function.rs b/influxdb3_cache/src/distinct_cache/table_function.rs similarity index 88% rename from influxdb3_cache/src/meta_cache/table_function.rs rename to influxdb3_cache/src/distinct_cache/table_function.rs index 888a7b23581..dd934be4b8c 100644 --- 
a/influxdb3_cache/src/meta_cache/table_function.rs +++ b/influxdb3_cache/src/distinct_cache/table_function.rs @@ -20,31 +20,31 @@ use indexmap::IndexMap; use influxdb3_catalog::catalog::TableDefinition; use influxdb3_id::{ColumnId, DbId}; -use super::{cache::Predicate, MetaCacheProvider}; +use super::{cache::Predicate, DistinctCacheProvider}; -/// The name used to call the metadata cache in SQL queries -pub const META_CACHE_UDTF_NAME: &str = "meta_cache"; +/// The name used to call the distinct value cache in SQL queries +pub const DISTINCT_CACHE_UDTF_NAME: &str = "distinct_cache"; -/// Implementor of the [`TableProvider`] trait that is produced a call to the [`MetaCacheFunction`] +/// Implementor of the [`TableProvider`] trait that is produced a call to the [`DistinctCacheFunction`] #[derive(Debug)] -struct MetaCacheFunctionProvider { - /// Reference to the [`MetaCache`][super::cache::MetaCache] being queried's schema +struct DistinctCacheFunctionProvider { + /// Reference to the [`DistinctCache`][super::cache::DistinctCache] being queried's schema schema: SchemaRef, - /// Forwarded ref to the [`MetaCacheProvider`] which is used to get the - /// [`MetaCache`][super::cache::MetaCache] for the query, along with the `db_id` and - /// `table_def`. This is done instead of passing forward a reference to the `MetaCache` + /// Forwarded ref to the [`DistinctCacheProvider`] which is used to get the + /// [`DistinctCache`][super::cache::DistinctCache] for the query, along with the `db_id` and + /// `table_def`. This is done instead of passing forward a reference to the `DistinctCache` /// directly because doing so is not easy or possible with the Rust borrow checker. - provider: Arc, + provider: Arc, /// The database ID that the called cache is related to db_id: DbId, /// The table definition that the called cache is related to table_def: Arc, - /// The name of the cache, which is determined when calling the `meta_cache` function + /// The name of the cache, which is determined when calling the `distinct_cache` function cache_name: Arc, } #[async_trait] -impl TableProvider for MetaCacheFunctionProvider { +impl TableProvider for DistinctCacheFunctionProvider { fn as_any(&self) -> &dyn Any { self as &dyn Any } @@ -98,7 +98,7 @@ impl TableProvider for MetaCacheFunctionProvider { (vec![], None) }; - let mut meta_exec = MetaCacheExec::try_new( + let mut distinct_exec = DistinctCacheExec::try_new( predicates, Arc::clone(&self.table_def), &[batches], @@ -108,9 +108,9 @@ impl TableProvider for MetaCacheFunctionProvider { )?; let show_sizes = ctx.config_options().explain.show_sizes; - meta_exec = meta_exec.with_show_sizes(show_sizes); + distinct_exec = distinct_exec.with_show_sizes(show_sizes); - Ok(Arc::new(meta_exec)) + Ok(Arc::new(distinct_exec)) } } @@ -161,7 +161,7 @@ fn convert_filter_exprs( // and analyze it using DataFusion's `LiteralGuarantee`. // // This will distill the provided set of `Expr`s down to either an IN list, or a NOT IN list - // which we can convert to the `Predicate` type for the metadata cache. + // which we can convert to the `Predicate` type for the distinct value cache. // // The main caveat is that if for some reason there are multiple `Expr`s that apply predicates // on a given column, i.e., leading to multiple `LiteralGuarantee`s on a specific column, we @@ -217,18 +217,18 @@ fn convert_filter_exprs( /// Implementor of the [`TableFunctionImpl`] trait, to be registered as a user-defined table function /// in the Datafusion `SessionContext`. 
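Only the renamed pieces of the table function appear in this hunk. For orientation, a minimal self-contained UDTF is sketched below: it ignores its arguments and serves a fixed in-memory batch, whereas the real `DistinctCacheFunction` resolves the database, table, and cache through its `DistinctCacheProvider` and ultimately returns the custom `DistinctCacheExec` described later in the file. The `demo_cache` name and its data are invented for the example, and the `TableFunctionImpl` import path varies between DataFusion versions:

```rust
use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::catalog::TableFunctionImpl; // older releases export this from datafusion::datasource::function
use datafusion::datasource::{MemTable, TableProvider};
use datafusion::error::Result;
use datafusion::prelude::*;

/// A toy table function: `demo_cache()` always returns the same two-column batch.
#[derive(Debug)]
struct DemoCacheFunction;

impl TableFunctionImpl for DemoCacheFunction {
    fn call(&self, _args: &[Expr]) -> Result<Arc<dyn TableProvider>> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("region", DataType::Utf8, false),
            Field::new("host", DataType::Utf8, false),
        ]));
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(StringArray::from(vec!["us-east", "us-west"])) as ArrayRef,
                Arc::new(StringArray::from(vec!["a", "d"])) as ArrayRef,
            ],
        )?;
        // The real implementation returns a provider that wraps the cache and later
        // produces a DistinctCacheExec; a MemTable is enough for this sketch.
        Ok(Arc::new(MemTable::try_new(schema, vec![vec![batch]])?))
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Same registration call the patch uses with DISTINCT_CACHE_UDTF_NAME.
    ctx.register_udtf("demo_cache", Arc::new(DemoCacheFunction));
    ctx.sql("SELECT * FROM demo_cache()").await?.show().await?;
    Ok(())
}
```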
#[derive(Debug)] -pub struct MetaCacheFunction { +pub struct DistinctCacheFunction { db_id: DbId, - provider: Arc, + provider: Arc, } -impl MetaCacheFunction { - pub fn new(db_id: DbId, provider: Arc) -> Self { +impl DistinctCacheFunction { + pub fn new(db_id: DbId, provider: Arc) -> Self { Self { db_id, provider } } } -impl TableFunctionImpl for MetaCacheFunction { +impl TableFunctionImpl for DistinctCacheFunction { fn call(&self, args: &[Expr]) -> Result> { let Some(Expr::Literal(ScalarValue::Utf8(Some(table_name)))) = args.first() else { return plan_err!("first argument must be the table name as a string"); @@ -254,9 +254,9 @@ impl TableFunctionImpl for MetaCacheFunction { table_def.table_id, cache_name.map(|n| n.as_str()), ) else { - return plan_err!("could not find meta cache for the given arguments"); + return plan_err!("could not find distinct value cache for the given arguments"); }; - Ok(Arc::new(MetaCacheFunctionProvider { + Ok(Arc::new(DistinctCacheFunctionProvider { schema, provider: Arc::clone(&self.provider), db_id: self.db_id, @@ -266,7 +266,7 @@ impl TableFunctionImpl for MetaCacheFunction { } } -/// Custom implementor of the [`ExecutionPlan`] trait for use by the metadata cache +/// Custom implementor of the [`ExecutionPlan`] trait for use by the distinct value cache /// /// Wraps a [`MemoryExec`] from DataFusion, and mostly re-uses that. The special functionality /// provided by this type is to track the predicates that are pushed down to the underlying cache @@ -275,20 +275,20 @@ impl TableFunctionImpl for MetaCacheFunction { /// # Example /// /// For a query that does not provide any predicates, or one that does provide predicates, but they -/// do no get pushed down, the `EXPLAIN` for said query will contain a line for the `MetaCacheExec` +/// do no get pushed down, the `EXPLAIN` for said query will contain a line for the `DistinctCacheExec` /// with no predicates, including what is emitted by the inner `MemoryExec`: /// /// ```text -/// MetaCacheExec: inner=MemoryExec: partitions=1, partition_sizes=[1] +/// DistinctCacheExec: inner=MemoryExec: partitions=1, partition_sizes=[1] /// ``` /// /// For queries that do have predicates that get pushed down, the output will include them, e.g.: /// /// ```text -/// MetaCacheExec: predicates=[[0 IN (us-east)], [1 IN (a,b)]] inner=MemoryExec: partitions=1, partition_sizes=[1] +/// DistinctCacheExec: predicates=[[0 IN (us-east)], [1 IN (a,b)]] inner=MemoryExec: partitions=1, partition_sizes=[1] /// ``` #[derive(Debug)] -struct MetaCacheExec { +struct DistinctCacheExec { inner: MemoryExec, table_def: Arc, predicates: Option>, @@ -296,7 +296,7 @@ struct MetaCacheExec { limit: Option, } -impl MetaCacheExec { +impl DistinctCacheExec { fn try_new( predicates: Option>, table_def: Arc, @@ -323,11 +323,11 @@ impl MetaCacheExec { } } -impl DisplayAs for MetaCacheExec { +impl DisplayAs for DistinctCacheExec { fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match t { DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!(f, "MetaCacheExec:")?; + write!(f, "DistinctCacheExec:")?; if let Some(projection) = &self.projection { write!(f, " projection=[")?; let schema = self.schema(); @@ -363,9 +363,9 @@ impl DisplayAs for MetaCacheExec { } } -impl ExecutionPlan for MetaCacheExec { +impl ExecutionPlan for DistinctCacheExec { fn name(&self) -> &str { - "MetaCacheExec" + "DistinctCacheExec" } fn as_any(&self) -> &dyn Any { diff --git a/influxdb3_cache/src/lib.rs 
b/influxdb3_cache/src/lib.rs index 41550938492..5c9cd55cb3d 100644 --- a/influxdb3_cache/src/lib.rs +++ b/influxdb3_cache/src/lib.rs @@ -1,7 +1,7 @@ //! Crate holding the various cache implementations used by InfluxDB 3 +pub mod distinct_cache; pub mod last_cache; -pub mod meta_cache; pub mod parquet_cache; #[cfg(test)] diff --git a/influxdb3_catalog/src/catalog.rs b/influxdb3_catalog/src/catalog.rs index e93c57a50a4..1547f80d9a8 100644 --- a/influxdb3_catalog/src/catalog.rs +++ b/influxdb3_catalog/src/catalog.rs @@ -10,9 +10,9 @@ use indexmap::IndexMap; use influxdb3_id::{ColumnId, DbId, SerdeVecMap, TableId}; use influxdb3_wal::{ CatalogBatch, CatalogOp, DeleteDatabaseDefinition, DeletePluginDefinition, - DeleteTableDefinition, DeleteTriggerDefinition, FieldAdditions, FieldDefinition, - LastCacheDefinition, LastCacheDelete, MetaCacheDefinition, MetaCacheDelete, - OrderedCatalogBatch, PluginDefinition, TriggerDefinition, TriggerIdentifier, + DeleteTableDefinition, DeleteTriggerDefinition, DistinctCacheDefinition, DistinctCacheDelete, + FieldAdditions, FieldDefinition, LastCacheDefinition, LastCacheDelete, OrderedCatalogBatch, + PluginDefinition, TriggerDefinition, TriggerIdentifier, }; use influxdb_line_protocol::FieldValue; use iox_time::Time; @@ -707,11 +707,11 @@ impl UpdateDatabaseSchema for CatalogOp { CatalogOp::CreateDatabase(_) => Ok(schema), CatalogOp::CreateTable(create_table) => create_table.update_schema(schema), CatalogOp::AddFields(field_additions) => field_additions.update_schema(schema), - CatalogOp::CreateMetaCache(meta_cache_definition) => { - meta_cache_definition.update_schema(schema) + CatalogOp::CreateDistinctCache(distinct_cache_definition) => { + distinct_cache_definition.update_schema(schema) } - CatalogOp::DeleteMetaCache(delete_meta_cache) => { - delete_meta_cache.update_schema(schema) + CatalogOp::DeleteDistinctCache(delete_distinct_cache) => { + delete_distinct_cache.update_schema(schema) } CatalogOp::CreateLastCache(create_last_cache) => { create_last_cache.update_schema(schema) @@ -963,7 +963,7 @@ pub struct TableDefinition { pub series_key: Vec, pub series_key_names: Vec>, pub last_caches: HashMap, LastCacheDefinition>, - pub meta_caches: HashMap, MetaCacheDefinition>, + pub distinct_caches: HashMap, DistinctCacheDefinition>, pub deleted: bool, } @@ -1024,7 +1024,7 @@ impl TableDefinition { series_key, series_key_names, last_caches: HashMap::new(), - meta_caches: HashMap::new(), + distinct_caches: HashMap::new(), deleted: false, }) } @@ -1173,15 +1173,15 @@ impl TableDefinition { .map(|def| def.data_type) } - /// Add the given [`MetaCacheDefinition`] to this table - pub fn add_meta_cache(&mut self, meta_cache: MetaCacheDefinition) { - self.meta_caches - .insert(Arc::clone(&meta_cache.cache_name), meta_cache); + /// Add the given [`DistinctCacheDefinition`] to this table + pub fn add_distinct_cache(&mut self, distinct_cache: DistinctCacheDefinition) { + self.distinct_caches + .insert(Arc::clone(&distinct_cache.cache_name), distinct_cache); } - /// Remove the meta cache with the given name - pub fn remove_meta_cache(&mut self, cache_name: &str) { - self.meta_caches.remove(cache_name); + /// Remove the distinct cache with the given name + pub fn remove_distinct_cache(&mut self, cache_name: &str) { + self.distinct_caches.remove(cache_name); } /// Add a new last cache to this table definition @@ -1201,8 +1201,8 @@ impl TableDefinition { .map(|(name, def)| (Arc::clone(name), def)) } - pub fn meta_caches(&self) -> impl Iterator, &MetaCacheDefinition)> { - 
self.meta_caches + pub fn distinct_caches(&self) -> impl Iterator, &DistinctCacheDefinition)> { + self.distinct_caches .iter() .map(|(name, def)| (Arc::clone(name), def)) } @@ -1300,7 +1300,7 @@ impl TableUpdate for FieldAdditions { } } -impl TableUpdate for MetaCacheDefinition { +impl TableUpdate for DistinctCacheDefinition { fn table_id(&self) -> TableId { self.table_id } @@ -1311,14 +1311,14 @@ impl TableUpdate for MetaCacheDefinition { &self, mut table: Cow<'a, TableDefinition>, ) -> Result> { - if !table.meta_caches.contains_key(&self.cache_name) { - table.to_mut().add_meta_cache(self.clone()); + if !table.distinct_caches.contains_key(&self.cache_name) { + table.to_mut().add_distinct_cache(self.clone()); } Ok(table) } } -impl TableUpdate for MetaCacheDelete { +impl TableUpdate for DistinctCacheDelete { fn table_id(&self) -> TableId { self.table_id } @@ -1329,8 +1329,8 @@ impl TableUpdate for MetaCacheDelete { &self, mut table: Cow<'a, TableDefinition>, ) -> Result> { - if table.meta_caches.contains_key(&self.cache_name) { - table.to_mut().meta_caches.remove(&self.cache_name); + if table.distinct_caches.contains_key(&self.cache_name) { + table.to_mut().distinct_caches.remove(&self.cache_name); } Ok(table) } diff --git a/influxdb3_client/src/lib.rs b/influxdb3_client/src/lib.rs index f65691ea59a..288d2e1ca30 100644 --- a/influxdb3_client/src/lib.rs +++ b/influxdb3_client/src/lib.rs @@ -264,7 +264,7 @@ impl Client { } } - /// Compose a request to the `POST /api/v3/configure/meta_cache` API + /// Compose a request to the `POST /api/v3/configure/distinct_cache` API /// /// # Example /// ```no_run @@ -275,33 +275,33 @@ impl Client { /// # async fn main() -> Result<(), Box> { /// let client = Client::new("http://localhost:8181")?; /// let resp = client - /// .api_v3_configure_meta_cache_create("db_name", "table_name", ["col1", "col2"]) + /// .api_v3_configure_distinct_cache_create("db_name", "table_name", ["col1", "col2"]) /// .name("cache_name") /// .max_cardinality(NonZeroUsize::new(1_000).unwrap()) /// .max_age(Duration::from_secs(3_600)) /// .send() /// .await - /// .expect("send create meta cache request"); + /// .expect("send create distinct cache request"); /// # Ok(()) /// # } /// ``` - pub fn api_v3_configure_meta_cache_create( + pub fn api_v3_configure_distinct_cache_create( &self, db: impl Into, table: impl Into, columns: impl IntoIterator>, - ) -> CreateMetaCacheRequestBuilder<'_> { - CreateMetaCacheRequestBuilder::new(self, db, table, columns) + ) -> CreateDistinctCacheRequestBuilder<'_> { + CreateDistinctCacheRequestBuilder::new(self, db, table, columns) } - /// Make a request to the `DELETE /api/v3/configure/meta_cache` API - pub async fn api_v3_configure_meta_cache_delete( + /// Make a request to the `DELETE /api/v3/configure/distinct_cache` API + pub async fn api_v3_configure_distinct_cache_delete( &self, db: impl Into + Send, table: impl Into + Send, name: impl Into + Send, ) -> Result<()> { - let url = self.base_url.join("/api/v3/configure/meta_cache")?; + let url = self.base_url.join("/api/v3/configure/distinct_cache")?; #[derive(Serialize)] struct Req { db: String, @@ -317,7 +317,7 @@ impl Client { req = req.bearer_auth(token.expose_secret()); } let resp = req.send().await.map_err(|src| { - Error::request_send(Method::DELETE, "/api/v3/configure/meta_cache", src) + Error::request_send(Method::DELETE, "/api/v3/configure/distinct_cache", src) })?; let status = resp.status(); match status { @@ -1258,10 +1258,10 @@ pub enum LastCacheValueColumnsDef { AllNonKeyColumns, } -/// 
Type for composing requests to the `POST /api/v3/configure/meta_cache` API created by the -/// [`Client::api_v3_configure_meta_cache_create`] method +/// Type for composing requests to the `POST /api/v3/configure/distinct_cache` API created by the +/// [`Client::api_v3_configure_distinct_cache_create`] method #[derive(Debug, Serialize)] -pub struct CreateMetaCacheRequestBuilder<'c> { +pub struct CreateDistinctCacheRequestBuilder<'c> { #[serde(skip_serializing)] client: &'c Client, db: String, @@ -1275,7 +1275,7 @@ pub struct CreateMetaCacheRequestBuilder<'c> { max_age: Option, } -impl<'c> CreateMetaCacheRequestBuilder<'c> { +impl<'c> CreateDistinctCacheRequestBuilder<'c> { fn new( client: &'c Client, db: impl Into, @@ -1312,20 +1312,23 @@ impl<'c> CreateMetaCacheRequestBuilder<'c> { } /// Send the create cache request - pub async fn send(self) -> Result> { - let url = self.client.base_url.join("/api/v3/configure/meta_cache")?; + pub async fn send(self) -> Result> { + let url = self + .client + .base_url + .join("/api/v3/configure/distinct_cache")?; let mut req = self.client.http_client.post(url).json(&self); if let Some(token) = &self.client.auth_token { req = req.bearer_auth(token.expose_secret()); } let resp = req.send().await.map_err(|src| { - Error::request_send(Method::POST, "/api/v3/configure/meta_cache", src) + Error::request_send(Method::POST, "/api/v3/configure/distinct_cache", src) })?; let status = resp.status(); match status { StatusCode::CREATED => { let content = resp - .json::() + .json::() .await .map_err(Error::Json)?; Ok(Some(content)) @@ -1340,7 +1343,7 @@ impl<'c> CreateMetaCacheRequestBuilder<'c> { } #[derive(Debug, Serialize, Deserialize)] -pub struct MetaCacheCreatedResponse { +pub struct DistinctCacheCreatedResponse { /// The id of the table the cache was created on pub table_id: u32, /// The name of the table the cache was created on diff --git a/influxdb3_server/src/http.rs b/influxdb3_server/src/http.rs index aabb510e75a..4c4c4ca7e5d 100644 --- a/influxdb3_server/src/http.rs +++ b/influxdb3_server/src/http.rs @@ -20,8 +20,8 @@ use hyper::header::CONTENT_TYPE; use hyper::http::HeaderValue; use hyper::HeaderMap; use hyper::{Body, Method, Request, Response, StatusCode}; +use influxdb3_cache::distinct_cache::{self, CreateDistinctCacheArgs, MaxAge, MaxCardinality}; use influxdb3_cache::last_cache; -use influxdb3_cache::meta_cache::{self, CreateMetaCacheArgs, MaxAge, MaxCardinality}; use influxdb3_catalog::catalog::Error as CatalogError; use influxdb3_process::{INFLUXDB3_GIT_HASH_SHORT, INFLUXDB3_VERSION}; use influxdb3_wal::{PluginType, TriggerSpecificationDefinition}; @@ -315,24 +315,26 @@ impl Error { .body(Body::from(self.to_string())) .unwrap(), }, - Self::WriteBuffer(WriteBufferError::MetaCacheError(ref mc_err)) => match mc_err { - meta_cache::ProviderError::Cache(ref cache_err) => match cache_err { - meta_cache::CacheError::EmptyColumnSet - | meta_cache::CacheError::NonTagOrStringColumn { .. } - | meta_cache::CacheError::ConfigurationMismatch { .. } => Response::builder() - .status(StatusCode::BAD_REQUEST) - .body(Body::from(mc_err.to_string())) - .unwrap(), - meta_cache::CacheError::Unexpected(_) => Response::builder() + Self::WriteBuffer(WriteBufferError::DistinctCacheError(ref mc_err)) => match mc_err { + distinct_cache::ProviderError::Cache(ref cache_err) => match cache_err { + distinct_cache::CacheError::EmptyColumnSet + | distinct_cache::CacheError::NonTagOrStringColumn { .. } + | distinct_cache::CacheError::ConfigurationMismatch { .. 
} => { + Response::builder() + .status(StatusCode::BAD_REQUEST) + .body(Body::from(mc_err.to_string())) + .unwrap() + } + distinct_cache::CacheError::Unexpected(_) => Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .body(Body::from(mc_err.to_string())) .unwrap(), }, - meta_cache::ProviderError::CacheNotFound { .. } => Response::builder() + distinct_cache::ProviderError::CacheNotFound { .. } => Response::builder() .status(StatusCode::NOT_FOUND) .body(Body::from(mc_err.to_string())) .unwrap(), - meta_cache::ProviderError::Unexpected(_) => Response::builder() + distinct_cache::ProviderError::Unexpected(_) => Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .body(Body::from(mc_err.to_string())) .unwrap(), @@ -766,16 +768,16 @@ where .map_err(Into::into) } - /// Create a new metadata cache given the [`MetaCacheCreateRequest`] arguments in the request + /// Create a new distinct value cache given the [`DistinctCacheCreateRequest`] arguments in the request /// body. /// /// If the result is to create a cache that already exists, with the same configuration, this /// will respond with a 204 NOT CREATED. If an existing cache would be overwritten with a /// different configuration, that is a 400 BAD REQUEST - async fn configure_meta_cache_create(&self, req: Request) -> Result> { + async fn configure_distinct_cache_create(&self, req: Request) -> Result> { let args = self.read_body_json(req).await?; - info!(?args, "create metadata cache request"); - let MetaCacheCreateRequest { + info!(?args, "create distinct value cache request"); + let DistinctCacheCreateRequest { db, table, name, @@ -807,10 +809,10 @@ where let max_cardinality = max_cardinality.unwrap_or_default(); match self .write_buffer - .create_meta_cache( + .create_distinct_cache( db_schema, name, - CreateMetaCacheArgs { + CreateDistinctCacheArgs { table_def, max_cardinality, max_age, @@ -831,11 +833,12 @@ where } } - /// Delete a metadata cache entry with the given [`MetaCacheDeleteRequest`] parameters + /// Delete a distinct value cache entry with the given [`DistinctCacheDeleteRequest`] parameters /// /// The parameters must be passed in either the query string or the body of the request as JSON. - async fn configure_meta_cache_delete(&self, req: Request) -> Result> { - let MetaCacheDeleteRequest { db, table, name } = if let Some(query) = req.uri().query() { + async fn configure_distinct_cache_delete(&self, req: Request) -> Result> { + let DistinctCacheDeleteRequest { db, table, name } = if let Some(query) = req.uri().query() + { serde_urlencoded::from_str(query)? } else { self.read_body_json(req).await? 
@@ -855,7 +858,7 @@ where } })?; self.write_buffer - .delete_meta_cache(&db_id, &table_id, &name) + .delete_distinct_cache(&db_id, &table_id, &name) .await?; Ok(Response::builder() .status(StatusCode::OK) @@ -1518,9 +1521,9 @@ impl From for WriteParams { } } -/// Request definition for the `POST /api/v3/configure/meta_cache` API +/// Request definition for the `POST /api/v3/configure/distinct_cache` API #[derive(Debug, Deserialize)] -struct MetaCacheCreateRequest { +struct DistinctCacheCreateRequest { /// The name of the database associated with the cache db: String, /// The name of the table associated with the cache @@ -1539,9 +1542,9 @@ struct MetaCacheCreateRequest { max_age: Option, } -/// Request definition for the `DELETE /api/v3/configure/meta_cache` API +/// Request definition for the `DELETE /api/v3/configure/distinct_cache` API #[derive(Debug, Deserialize)] -struct MetaCacheDeleteRequest { +struct DistinctCacheDeleteRequest { db: String, table: String, name: String, @@ -1712,11 +1715,11 @@ pub(crate) async fn route_request( (Method::GET, "/health" | "/api/v1/health") => http_server.health(), (Method::GET | Method::POST, "/ping") => http_server.ping(), (Method::GET, "/metrics") => http_server.handle_metrics(), - (Method::POST, "/api/v3/configure/meta_cache") => { - http_server.configure_meta_cache_create(req).await + (Method::POST, "/api/v3/configure/distinct_cache") => { + http_server.configure_distinct_cache_create(req).await } - (Method::DELETE, "/api/v3/configure/meta_cache") => { - http_server.configure_meta_cache_delete(req).await + (Method::DELETE, "/api/v3/configure/distinct_cache") => { + http_server.configure_distinct_cache_delete(req).await } (Method::POST, "/api/v3/configure/last_cache") => { http_server.configure_last_cache_create(req).await diff --git a/influxdb3_server/src/lib.rs b/influxdb3_server/src/lib.rs index 44915e562a8..50874fb73b1 100644 --- a/influxdb3_server/src/lib.rs +++ b/influxdb3_server/src/lib.rs @@ -261,8 +261,8 @@ mod tests { use crate::serve; use datafusion::parquet::data_type::AsBytes; use hyper::{body, Body, Client, Request, Response, StatusCode}; + use influxdb3_cache::distinct_cache::DistinctCacheProvider; use influxdb3_cache::last_cache::LastCacheProvider; - use influxdb3_cache::meta_cache::MetaCacheProvider; use influxdb3_cache::parquet_cache::test_cached_obj_store_and_oracle; use influxdb3_catalog::catalog::Catalog; use influxdb3_id::{DbId, TableId}; @@ -813,7 +813,7 @@ mod tests { persister: Arc::clone(&persister), catalog: Arc::clone(&catalog), last_cache: LastCacheProvider::new_from_catalog(Arc::clone(&catalog)).unwrap(), - meta_cache: MetaCacheProvider::new_from_catalog( + distinct_cache: DistinctCacheProvider::new_from_catalog( Arc::clone(&time_provider) as _, Arc::clone(&catalog), ) diff --git a/influxdb3_server/src/query_executor/mod.rs b/influxdb3_server/src/query_executor/mod.rs index 0a39a08e33e..ef89bfdc23f 100644 --- a/influxdb3_server/src/query_executor/mod.rs +++ b/influxdb3_server/src/query_executor/mod.rs @@ -20,8 +20,8 @@ use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::Expr; use datafusion_util::config::DEFAULT_SCHEMA; use datafusion_util::MemoryStream; +use influxdb3_cache::distinct_cache::{DistinctCacheFunction, DISTINCT_CACHE_UDTF_NAME}; use influxdb3_cache::last_cache::{LastCacheFunction, LAST_CACHE_UDTF_NAME}; -use influxdb3_cache::meta_cache::{MetaCacheFunction, META_CACHE_UDTF_NAME}; use influxdb3_catalog::catalog::{Catalog, DatabaseSchema}; use influxdb3_sys_events::SysEventStore; use 
influxdb3_telemetry::store::TelemetryStore; @@ -502,10 +502,10 @@ impl QueryNamespace for Database { )), ); ctx.inner().register_udtf( - META_CACHE_UDTF_NAME, - Arc::new(MetaCacheFunction::new( + DISTINCT_CACHE_UDTF_NAME, + Arc::new(DistinctCacheFunction::new( self.db_schema.id, - self.write_buffer.meta_cache_provider(), + self.write_buffer.distinct_cache_provider(), )), ); ctx @@ -652,7 +652,7 @@ mod tests { use datafusion::assert_batches_sorted_eq; use futures::TryStreamExt; use influxdb3_cache::{ - last_cache::LastCacheProvider, meta_cache::MetaCacheProvider, + distinct_cache::DistinctCacheProvider, last_cache::LastCacheProvider, parquet_cache::test_cached_obj_store_and_oracle, }; use influxdb3_catalog::catalog::Catalog; @@ -715,7 +715,7 @@ mod tests { persister, catalog: Arc::clone(&catalog), last_cache: LastCacheProvider::new_from_catalog(Arc::clone(&catalog)).unwrap(), - meta_cache: MetaCacheProvider::new_from_catalog( + distinct_cache: DistinctCacheProvider::new_from_catalog( Arc::::clone(&time_provider), Arc::clone(&catalog), ) diff --git a/influxdb3_server/src/system_tables/meta_caches.rs b/influxdb3_server/src/system_tables/distinct_caches.rs similarity index 83% rename from influxdb3_server/src/system_tables/meta_caches.rs rename to influxdb3_server/src/system_tables/distinct_caches.rs index a8918eae01d..888d85747e1 100644 --- a/influxdb3_server/src/system_tables/meta_caches.rs +++ b/influxdb3_server/src/system_tables/distinct_caches.rs @@ -4,29 +4,32 @@ use arrow::array::{GenericListBuilder, StringViewBuilder, UInt32Builder, UInt64B use arrow_array::{ArrayRef, RecordBatch}; use arrow_schema::{DataType, Field, Schema, SchemaRef}; use datafusion::{error::DataFusionError, prelude::Expr}; -use influxdb3_cache::meta_cache::MetaCacheProvider; +use influxdb3_cache::distinct_cache::DistinctCacheProvider; use influxdb3_catalog::catalog::DatabaseSchema; -use influxdb3_wal::MetaCacheDefinition; +use influxdb3_wal::DistinctCacheDefinition; use iox_system_tables::IoxSystemTable; #[derive(Debug)] -pub(super) struct MetaCachesTable { +pub(super) struct DistinctCachesTable { db_schema: Arc, schema: SchemaRef, - provider: Arc, + provider: Arc, } -impl MetaCachesTable { - pub(super) fn new(db_schema: Arc, provider: Arc) -> Self { +impl DistinctCachesTable { + pub(super) fn new( + db_schema: Arc, + provider: Arc, + ) -> Self { Self { db_schema, - schema: meta_caches_schema(), + schema: distinct_caches_schema(), provider, } } } -fn meta_caches_schema() -> SchemaRef { +fn distinct_caches_schema() -> SchemaRef { let columns = vec![ Field::new("table", DataType::Utf8View, false), Field::new("name", DataType::Utf8View, false), @@ -47,7 +50,7 @@ fn meta_caches_schema() -> SchemaRef { } #[async_trait::async_trait] -impl IoxSystemTable for MetaCachesTable { +impl IoxSystemTable for DistinctCachesTable { fn schema(&self) -> SchemaRef { Arc::clone(&self.schema) } @@ -60,14 +63,14 @@ impl IoxSystemTable for MetaCachesTable { let caches = self .provider .get_cache_definitions_for_db(&self.db_schema.id); - from_meta_cache_definitions(&self.db_schema, self.schema(), &caches) + from_distinct_cache_definitions(&self.db_schema, self.schema(), &caches) } } -fn from_meta_cache_definitions( +fn from_distinct_cache_definitions( db_schema: &DatabaseSchema, sys_table_schema: SchemaRef, - cache_definitions: &[MetaCacheDefinition], + cache_definitions: &[DistinctCacheDefinition], ) -> Result { let mut table_name_arr = StringViewBuilder::with_capacity(cache_definitions.len()); let mut cache_name_arr = 
StringViewBuilder::with_capacity(cache_definitions.len()); @@ -90,7 +93,7 @@ fn from_meta_cache_definitions( for cache in cache_definitions { let table_def = db_schema .table_definition_by_id(&cache.table_id) - .expect("table should exist for metadata cache"); + .expect("table should exist for distinct value cache"); table_name_arr.append_value(&cache.table_name); cache_name_arr.append_value(&cache.cache_name); diff --git a/influxdb3_server/src/system_tables/mod.rs b/influxdb3_server/src/system_tables/mod.rs index 3b59bf6d29d..0471cbbb3e8 100644 --- a/influxdb3_server/src/system_tables/mod.rs +++ b/influxdb3_server/src/system_tables/mod.rs @@ -7,19 +7,19 @@ use datafusion::{ logical_expr::{col, BinaryExpr, Expr, Operator}, scalar::ScalarValue, }; +use distinct_caches::DistinctCachesTable; use influxdb3_catalog::catalog::DatabaseSchema; use influxdb3_sys_events::SysEventStore; use influxdb3_write::WriteBuffer; use iox_query::query_log::QueryLog; use iox_system_tables::SystemTableProvider; -use meta_caches::MetaCachesTable; use parquet_files::ParquetFilesTable; use tonic::async_trait; use self::{last_caches::LastCachesTable, queries::QueriesTable}; +mod distinct_caches; mod last_caches; -mod meta_caches; mod parquet_files; use crate::system_tables::python_call::{ ProcessingEnginePluginTable, ProcessingEngineTriggerTable, @@ -33,7 +33,7 @@ pub const TABLE_NAME_PREDICATE: &str = "table_name"; pub(crate) const QUERIES_TABLE_NAME: &str = "queries"; pub(crate) const LAST_CACHES_TABLE_NAME: &str = "last_caches"; -pub(crate) const META_CACHES_TABLE_NAME: &str = "meta_caches"; +pub(crate) const DISTINCT_CACHES_TABLE_NAME: &str = "distinct_caches"; pub(crate) const PARQUET_FILES_TABLE_NAME: &str = "parquet_files"; const PROCESSING_ENGINE_PLUGINS_TABLE_NAME: &str = "processing_engine_plugins"; @@ -102,11 +102,10 @@ impl AllSystemSchemaTablesProvider { buffer.last_cache_provider(), )))); tables.insert(LAST_CACHES_TABLE_NAME, last_caches); - let meta_caches = Arc::new(SystemTableProvider::new(Arc::new(MetaCachesTable::new( - Arc::clone(&db_schema), - buffer.meta_cache_provider(), - )))); - tables.insert(META_CACHES_TABLE_NAME, meta_caches); + let distinct_caches = Arc::new(SystemTableProvider::new(Arc::new( + DistinctCachesTable::new(Arc::clone(&db_schema), buffer.distinct_cache_provider()), + ))); + tables.insert(DISTINCT_CACHES_TABLE_NAME, distinct_caches); let parquet_files = Arc::new(SystemTableProvider::new(Arc::new(ParquetFilesTable::new( db_schema.id, buffer, diff --git a/influxdb3_wal/src/lib.rs b/influxdb3_wal/src/lib.rs index 15947ec27a5..a07f2d9e3c6 100644 --- a/influxdb3_wal/src/lib.rs +++ b/influxdb3_wal/src/lib.rs @@ -321,8 +321,8 @@ pub enum CatalogOp { CreateDatabase(DatabaseDefinition), CreateTable(TableDefinition), AddFields(FieldAdditions), - CreateMetaCache(MetaCacheDefinition), - DeleteMetaCache(MetaCacheDelete), + CreateDistinctCache(DistinctCacheDefinition), + DeleteDistinctCache(DistinctCacheDelete), CreateLastCache(LastCacheDefinition), DeleteLastCache(LastCacheDelete), DeleteDatabase(DeleteDatabaseDefinition), @@ -586,16 +586,16 @@ pub struct LastCacheDelete { pub name: Arc, } -/// Defines a metadata cache in a given table and database +/// Defines a distinct value cache in a given table and database #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)] -pub struct MetaCacheDefinition { +pub struct DistinctCacheDefinition { /// The id of the associated table pub table_id: TableId, /// The name of the associated table pub table_name: Arc, /// The name of the cache, is 
unique within the associated table pub cache_name: Arc, - /// The ids of columns tracked by this metadata cache, in the defined order + /// The ids of columns tracked by this distinct value cache, in the defined order pub column_ids: Vec, /// The maximum number of distinct value combintions the cache will hold pub max_cardinality: usize, @@ -604,7 +604,7 @@ pub struct MetaCacheDefinition { } #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] -pub struct MetaCacheDelete { +pub struct DistinctCacheDelete { pub table_name: Arc, pub table_id: TableId, pub cache_name: Arc, diff --git a/influxdb3_write/src/lib.rs b/influxdb3_write/src/lib.rs index 823cdb3a6dd..67f912d5be3 100644 --- a/influxdb3_write/src/lib.rs +++ b/influxdb3_write/src/lib.rs @@ -14,9 +14,9 @@ use data_types::{NamespaceName, TimestampMinMax}; use datafusion::catalog::Session; use datafusion::error::DataFusionError; use datafusion::prelude::Expr; +use influxdb3_cache::distinct_cache::CreateDistinctCacheArgs; +use influxdb3_cache::distinct_cache::DistinctCacheProvider; use influxdb3_cache::last_cache::LastCacheProvider; -use influxdb3_cache::meta_cache::CreateMetaCacheArgs; -use influxdb3_cache::meta_cache::MetaCacheProvider; use influxdb3_catalog::catalog::Catalog; use influxdb3_catalog::catalog::CatalogSequenceNumber; use influxdb3_catalog::catalog::DatabaseSchema; @@ -24,7 +24,7 @@ use influxdb3_id::ParquetFileId; use influxdb3_id::SerdeVecMap; use influxdb3_id::TableId; use influxdb3_id::{ColumnId, DbId}; -use influxdb3_wal::MetaCacheDefinition; +use influxdb3_wal::DistinctCacheDefinition; use influxdb3_wal::{LastCacheDefinition, SnapshotSequenceNumber, WalFileSequenceNumber}; use iox_query::QueryChunk; use iox_time::Time; @@ -52,7 +52,7 @@ pub type Result = std::result::Result; pub trait WriteBuffer: Bufferer + ChunkContainer - + MetaCacheManager + + DistinctCacheManager + LastCacheManager + DatabaseManager + ProcessingEngineManager @@ -115,21 +115,23 @@ pub trait ChunkContainer: Debug + Send + Sync + 'static { ) -> Result>, DataFusionError>; } -/// [`MetaCacheManager`] is used to manage interaction with a [`MetaCacheProvider`]. This enables +/// [`DistinctCacheManager`] is used to manage interaction with a [`DistinctCacheProvider`]. 
This enables /// cache creation, deletion, and getting access to existing #[async_trait::async_trait] -pub trait MetaCacheManager: Debug + Send + Sync + 'static { - /// Get a reference to the metadata cache provider - fn meta_cache_provider(&self) -> Arc; +pub trait DistinctCacheManager: Debug + Send + Sync + 'static { + /// Get a reference to the distinct value cache provider + fn distinct_cache_provider(&self) -> Arc; - async fn create_meta_cache( + /// Create a new distinct value cache + async fn create_distinct_cache( &self, db_schema: Arc, cache_name: Option, - args: CreateMetaCacheArgs, - ) -> Result, write_buffer::Error>; + args: CreateDistinctCacheArgs, + ) -> Result, write_buffer::Error>; - async fn delete_meta_cache( + /// Delete a distinct value cache + async fn delete_distinct_cache( &self, db_id: &DbId, tbl_id: &TableId, diff --git a/influxdb3_write/src/write_buffer/mod.rs b/influxdb3_write/src/write_buffer/mod.rs index e1299f3b3e3..52cc5df83f4 100644 --- a/influxdb3_write/src/write_buffer/mod.rs +++ b/influxdb3_write/src/write_buffer/mod.rs @@ -14,7 +14,7 @@ use crate::write_buffer::queryable_buffer::QueryableBuffer; use crate::write_buffer::validator::WriteValidator; use crate::{chunk::ParquetChunk, write_buffer, DatabaseManager}; use crate::{ - BufferedWriteRequest, Bufferer, ChunkContainer, LastCacheManager, MetaCacheManager, + BufferedWriteRequest, Bufferer, ChunkContainer, DistinctCacheManager, LastCacheManager, ParquetFile, PersistedSnapshot, Precision, WriteBuffer, WriteLineError, }; #[cfg(feature = "system-py")] @@ -28,8 +28,8 @@ use datafusion::catalog::Session; use datafusion::common::DataFusionError; use datafusion::datasource::object_store::ObjectStoreUrl; use datafusion::logical_expr::Expr; +use influxdb3_cache::distinct_cache::{self, CreateDistinctCacheArgs, DistinctCacheProvider}; use influxdb3_cache::last_cache::{self, LastCacheProvider}; -use influxdb3_cache::meta_cache::{self, CreateMetaCacheArgs, MetaCacheProvider}; use influxdb3_cache::parquet_cache::ParquetCacheOracle; use influxdb3_catalog::catalog; use influxdb3_catalog::catalog::Error::ProcessingEngineTriggerNotFound; @@ -40,8 +40,8 @@ use influxdb3_wal::{ PluginDefinition, PluginType, TriggerDefinition, TriggerSpecificationDefinition, WalContents, }; use influxdb3_wal::{ - CatalogBatch, CatalogOp, LastCacheDefinition, LastCacheDelete, LastCacheSize, - MetaCacheDefinition, MetaCacheDelete, Wal, WalConfig, WalFileNotifier, WalOp, + CatalogBatch, CatalogOp, DistinctCacheDefinition, DistinctCacheDelete, LastCacheDefinition, + LastCacheDelete, LastCacheSize, Wal, WalConfig, WalFileNotifier, WalOp, }; use influxdb3_wal::{CatalogOp::CreateLastCache, DeleteTableDefinition}; use influxdb3_wal::{DatabaseDefinition, FieldDefinition}; @@ -130,8 +130,8 @@ pub enum Error { #[error("cannot write to a read-only server")] NoWriteInReadOnly, - #[error("error in metadata cache: {0}")] - MetaCacheError(#[from] meta_cache::ProviderError), + #[error("error in distinct value cache: {0}")] + DistinctCacheError(#[from] distinct_cache::ProviderError), #[error("error: {0}")] AnyhowError(#[from] anyhow::Error), @@ -163,7 +163,7 @@ pub struct WriteBufferImpl { wal: Arc, time_provider: Arc, metrics: WriteMetrics, - meta_cache: Arc, + distinct_cache: Arc, last_cache: Arc, #[allow(dead_code)] plugin_dir: Option, @@ -177,7 +177,7 @@ pub struct WriteBufferImplArgs { pub persister: Arc, pub catalog: Arc, pub last_cache: Arc, - pub meta_cache: Arc, + pub distinct_cache: Arc, pub time_provider: Arc, pub executor: Arc, pub wal_config: 
WalConfig, @@ -192,7 +192,7 @@ impl WriteBufferImpl { persister, catalog, last_cache, - meta_cache, + distinct_cache, time_provider, executor, wal_config, @@ -227,7 +227,7 @@ impl WriteBufferImpl { catalog: Arc::clone(&catalog), persister: Arc::clone(&persister), last_cache_provider: Arc::clone(&last_cache), - meta_cache_provider: Arc::clone(&meta_cache), + distinct_cache_provider: Arc::clone(&distinct_cache), persisted_files: Arc::clone(&persisted_files), parquet_cache: parquet_cache.clone(), })); @@ -252,7 +252,7 @@ impl WriteBufferImpl { wal_config, wal, time_provider, - meta_cache, + distinct_cache, last_cache, persisted_files, buffer: queryable_buffer, @@ -475,23 +475,23 @@ impl ChunkContainer for WriteBufferImpl { } #[async_trait::async_trait] -impl MetaCacheManager for WriteBufferImpl { - fn meta_cache_provider(&self) -> Arc { - Arc::clone(&self.meta_cache) +impl DistinctCacheManager for WriteBufferImpl { + fn distinct_cache_provider(&self) -> Arc { + Arc::clone(&self.distinct_cache) } - async fn create_meta_cache( + async fn create_distinct_cache( &self, db_schema: Arc, cache_name: Option, - args: CreateMetaCacheArgs, - ) -> Result, Error> { + args: CreateDistinctCacheArgs, + ) -> Result, Error> { if let Some(new_cache_definition) = self - .meta_cache + .distinct_cache .create_cache(db_schema.id, cache_name.map(Into::into), args) - .map_err(Error::MetaCacheError)? + .map_err(Error::DistinctCacheError)? { - let catalog_op = CatalogOp::CreateMetaCache(new_cache_definition.clone()); + let catalog_op = CatalogOp::CreateDistinctCache(new_cache_definition.clone()); let catalog_batch = CatalogBatch { database_id: db_schema.id, database_name: db_schema.name.clone(), @@ -509,7 +509,7 @@ impl MetaCacheManager for WriteBufferImpl { } } - async fn delete_meta_cache( + async fn delete_distinct_cache( &self, db_id: &DbId, tbl_id: &TableId, @@ -517,12 +517,13 @@ impl MetaCacheManager for WriteBufferImpl { ) -> Result<(), Error> { let catalog = self.catalog(); let db_schema = catalog.db_schema_by_id(db_id).expect("db should exist"); - self.meta_cache.delete_cache(db_id, tbl_id, cache_name)?; + self.distinct_cache + .delete_cache(db_id, tbl_id, cache_name)?; let catalog_batch = CatalogBatch { database_id: *db_id, database_name: Arc::clone(&db_schema.name), time_ns: self.time_provider.now().timestamp_nanos(), - ops: vec![CatalogOp::DeleteMetaCache(MetaCacheDelete { + ops: vec![CatalogOp::DeleteDistinctCache(DistinctCacheDelete { table_name: db_schema.table_id_to_name(tbl_id).expect("table exists"), table_id: *tbl_id, cache_name: cache_name.into(), @@ -1202,14 +1203,16 @@ mod tests { let persister = Arc::new(Persister::new(Arc::clone(&object_store), "test_host")); let catalog = Arc::new(persister.load_or_create_catalog().await.unwrap()); let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _).unwrap(); - let meta_cache = - MetaCacheProvider::new_from_catalog(Arc::clone(&time_provider), Arc::clone(&catalog)) - .unwrap(); + let distinct_cache = DistinctCacheProvider::new_from_catalog( + Arc::clone(&time_provider), + Arc::clone(&catalog), + ) + .unwrap(); let write_buffer = WriteBufferImpl::new(WriteBufferImplArgs { persister: Arc::clone(&persister), catalog, last_cache, - meta_cache, + distinct_cache, time_provider: Arc::clone(&time_provider), executor: crate::test_help::make_exec(), wal_config: WalConfig::test_config(), @@ -1282,14 +1285,16 @@ mod tests { // now load a new buffer from object storage let catalog = Arc::new(persister.load_or_create_catalog().await.unwrap()); let 
last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _).unwrap(); - let meta_cache = - MetaCacheProvider::new_from_catalog(Arc::clone(&time_provider), Arc::clone(&catalog)) - .unwrap(); + let distinct_cache = DistinctCacheProvider::new_from_catalog( + Arc::clone(&time_provider), + Arc::clone(&catalog), + ) + .unwrap(); let write_buffer = WriteBufferImpl::new(WriteBufferImplArgs { persister, catalog, last_cache, - meta_cache, + distinct_cache, time_provider, executor: crate::test_help::make_exec(), wal_config: WalConfig { @@ -1349,7 +1354,7 @@ mod tests { let catalog = Arc::new(persister.load_or_create_catalog().await.unwrap()); let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _).unwrap(); - let meta_cache = MetaCacheProvider::new_from_catalog( + let distinct_cache = DistinctCacheProvider::new_from_catalog( Arc::clone(&time_provider), Arc::clone(&catalog), ) @@ -1358,7 +1363,7 @@ mod tests { persister: Arc::clone(&wbuf.persister), catalog, last_cache, - meta_cache, + distinct_cache, time_provider, executor: Arc::clone(&wbuf.buffer.executor), wal_config: WalConfig { @@ -1579,14 +1584,16 @@ mod tests { .unwrap(), ); let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _).unwrap(); - let meta_cache = - MetaCacheProvider::new_from_catalog(Arc::clone(&time_provider), Arc::clone(&catalog)) - .unwrap(); + let distinct_cache = DistinctCacheProvider::new_from_catalog( + Arc::clone(&time_provider), + Arc::clone(&catalog), + ) + .unwrap(); let write_buffer = WriteBufferImpl::new(WriteBufferImplArgs { persister: Arc::clone(&write_buffer.persister), catalog, last_cache, - meta_cache, + distinct_cache, time_provider: Arc::clone(&write_buffer.time_provider), executor: Arc::clone(&write_buffer.buffer.executor), wal_config: WalConfig { @@ -2870,14 +2877,16 @@ mod tests { let persister = Arc::new(Persister::new(Arc::clone(&object_store), "test_host")); let catalog = Arc::new(persister.load_or_create_catalog().await.unwrap()); let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _).unwrap(); - let meta_cache = - MetaCacheProvider::new_from_catalog(Arc::clone(&time_provider), Arc::clone(&catalog)) - .unwrap(); + let distinct_cache = DistinctCacheProvider::new_from_catalog( + Arc::clone(&time_provider), + Arc::clone(&catalog), + ) + .unwrap(); let wbuf = WriteBufferImpl::new(WriteBufferImplArgs { persister, catalog, last_cache, - meta_cache, + distinct_cache, time_provider: Arc::clone(&time_provider), executor: crate::test_help::make_exec(), wal_config, diff --git a/influxdb3_write/src/write_buffer/queryable_buffer.rs b/influxdb3_write/src/write_buffer/queryable_buffer.rs index 02d55765d02..a7308c18144 100644 --- a/influxdb3_write/src/write_buffer/queryable_buffer.rs +++ b/influxdb3_write/src/write_buffer/queryable_buffer.rs @@ -17,8 +17,8 @@ use datafusion::common::DataFusionError; use datafusion::logical_expr::Expr; use datafusion_util::stream_from_batches; use hashbrown::HashMap; +use influxdb3_cache::distinct_cache::DistinctCacheProvider; use influxdb3_cache::last_cache::LastCacheProvider; -use influxdb3_cache::meta_cache::MetaCacheProvider; use influxdb3_cache::parquet_cache::{CacheRequest, ParquetCacheOracle}; use influxdb3_catalog::catalog::{Catalog, DatabaseSchema}; use influxdb3_id::{DbId, TableId}; @@ -43,7 +43,7 @@ use tokio::sync::{mpsc, oneshot, Mutex}; pub struct QueryableBuffer { pub(crate) executor: Arc, catalog: Arc, - meta_cache_provider: Arc, + distinct_cache_provider: Arc, last_cache_provider: 
Arc, persister: Arc, persisted_files: Arc, @@ -60,7 +60,7 @@ pub struct QueryableBufferArgs { pub catalog: Arc, pub persister: Arc, pub last_cache_provider: Arc, - pub meta_cache_provider: Arc, + pub distinct_cache_provider: Arc, pub persisted_files: Arc, pub parquet_cache: Option>, } @@ -72,7 +72,7 @@ impl QueryableBuffer { catalog, persister, last_cache_provider, - meta_cache_provider, + distinct_cache_provider, persisted_files, parquet_cache, }: QueryableBufferArgs, @@ -84,7 +84,7 @@ impl QueryableBuffer { executor, catalog, last_cache_provider, - meta_cache_provider, + distinct_cache_provider, persister, persisted_files, buffer, @@ -152,7 +152,8 @@ impl QueryableBuffer { /// Update the caches managed by the database fn write_wal_contents_to_caches(&self, write: &WalContents) { self.last_cache_provider.write_wal_contents_to_cache(write); - self.meta_cache_provider.write_wal_contents_to_cache(write); + self.distinct_cache_provider + .write_wal_contents_to_cache(write); } /// Called when the wal has persisted a new file. Buffer the contents in memory and update the @@ -163,7 +164,7 @@ impl QueryableBuffer { buffer.buffer_ops( write.ops, &self.last_cache_provider, - &self.meta_cache_provider, + &self.distinct_cache_provider, ); } @@ -185,7 +186,7 @@ impl QueryableBuffer { buffer.buffer_ops( write.ops, &self.last_cache_provider, - &self.meta_cache_provider, + &self.distinct_cache_provider, ); let mut persisting_chunks = vec![]; @@ -476,7 +477,7 @@ impl BufferState { &mut self, ops: Vec, last_cache_provider: &LastCacheProvider, - meta_cache_provider: &MetaCacheProvider, + distinct_cache_provider: &DistinctCacheProvider, ) { for op in ops { match op { @@ -498,20 +499,20 @@ impl BufferState { // eg. creating or deleting last cache itself for op in catalog_batch.ops { match op { - CatalogOp::CreateMetaCache(definition) => { + CatalogOp::CreateDistinctCache(definition) => { let table_def = db_schema .table_definition_by_id(&definition.table_id) .expect("table should exist"); - meta_cache_provider.create_from_definition( + distinct_cache_provider.create_from_definition( db_schema.id, table_def, &definition, ); } - CatalogOp::DeleteMetaCache(cache) => { + CatalogOp::DeleteDistinctCache(cache) => { // this only fails if the db/table/cache do not exist, so we ignore // the error if it happens. - let _ = meta_cache_provider.delete_cache( + let _ = distinct_cache_provider.delete_cache( &db_schema.id, &cache.table_id, &cache.cache_name, @@ -543,7 +544,7 @@ impl BufferState { self.db_to_table.remove(&db_definition.database_id); last_cache_provider .delete_caches_for_db(&db_definition.database_id); - meta_cache_provider + distinct_cache_provider .delete_caches_for_db(&db_definition.database_id); } CatalogOp::DeleteTable(table_definition) => { @@ -551,7 +552,7 @@ impl BufferState { &table_definition.database_id, &table_definition.table_id, ); - meta_cache_provider.delete_caches_for_db_and_table( + distinct_cache_provider.delete_caches_for_db_and_table( &table_definition.database_id, &table_definition.table_id, ); @@ -784,7 +785,7 @@ mod tests { catalog: Arc::clone(&catalog), persister: Arc::clone(&persister), last_cache_provider: LastCacheProvider::new_from_catalog(Arc::clone(&catalog)).unwrap(), - meta_cache_provider: MetaCacheProvider::new_from_catalog( + distinct_cache_provider: DistinctCacheProvider::new_from_catalog( Arc::clone(&time_provider), Arc::clone(&catalog), )
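(Editorial sketch, not part of the patch.) For completeness, the renamed client method for removing a cache, `api_v3_configure_distinct_cache_delete`, can be exercised as shown below. This mirrors the signature in the `influxdb3_client` hunk earlier in the patch; the server URL, database, table, and cache names are placeholders, and a Tokio runtime is assumed:

```rust
use influxdb3_client::Client;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder URL; point this at a running InfluxDB 3 server.
    let client = Client::new("http://localhost:8181")?;
    // Placeholder database, table, and cache names; the cache name follows the
    // default `<table>_<cols>_distinct_cache` pattern described in the provider.
    client
        .api_v3_configure_distinct_cache_delete(
            "db_name",
            "table_name",
            "table_name_col1_distinct_cache",
        )
        .await?;
    println!("distinct cache deleted");
    Ok(())
}
```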