From a6ecb953aab77d7cbd6f004a890f2329eb7906ab Mon Sep 17 00:00:00 2001 From: discord9 Date: Mon, 11 Nov 2024 11:13:56 +0800 Subject: [PATCH 1/9] feat: add http conn for fuzz --- tests-fuzz/Cargo.toml | 18 +++++++++++++++ tests-fuzz/src/error.rs | 23 +++++++++++++++++++ tests-fuzz/src/ir.rs | 2 +- tests-fuzz/src/utils.rs | 15 +++++++++--- .../targets/ddl/fuzz_alter_logical_table.rs | 2 +- tests-fuzz/targets/ddl/fuzz_alter_table.rs | 2 +- .../targets/ddl/fuzz_create_database.rs | 2 +- .../targets/ddl/fuzz_create_logical_table.rs | 2 +- tests-fuzz/targets/ddl/fuzz_create_table.rs | 2 +- .../failover/fuzz_failover_metric_regions.rs | 2 +- .../failover/fuzz_failover_mito_regions.rs | 2 +- tests-fuzz/targets/fuzz_insert.rs | 2 +- .../targets/fuzz_insert_logical_table.rs | 2 +- .../migration/fuzz_migrate_metric_regions.rs | 2 +- .../migration/fuzz_migrate_mito_regions.rs | 2 +- 15 files changed, 65 insertions(+), 15 deletions(-) diff --git a/tests-fuzz/Cargo.toml b/tests-fuzz/Cargo.toml index c408992bd508..5b856f0705d7 100644 --- a/tests-fuzz/Cargo.toml +++ b/tests-fuzz/Cargo.toml @@ -29,6 +29,7 @@ datatypes = { workspace = true } derive_builder = { workspace = true } dotenv = "0.15" futures = { workspace = true } +greptime-proto.workspace = true humantime = { workspace = true } k8s-openapi = { version = "0.22", features = ["v1_30"] } kube = { version = "0.92", features = [ @@ -42,6 +43,8 @@ libfuzzer-sys = "0.4" nix = { version = "0.28", features = ["process", "signal"], optional = true } partition = { workspace = true } paste.workspace = true +prometheus.workspace = true +prost.workspace = true rand = { workspace = true } rand_chacha = "0.3.1" reqwest = { workspace = true } @@ -49,6 +52,7 @@ schemars = "0.8" serde = { workspace = true } serde_json = { workspace = true } serde_yaml = "0.9" +servers.workspace = true snafu = { workspace = true } sql = { workspace = true } sqlparser.workspace = true @@ -87,6 +91,13 @@ test = false bench = false doc = false +[[bin]] +name = "fuzz_concurr_prom_remote_write" +path = "targets/fuzz_concurr_prom_remote_write.rs" +test = false +bench = false +doc = false + [[bin]] name = "fuzz_insert_logical_table" path = "targets/fuzz_insert_logical_table.rs" @@ -101,6 +112,13 @@ test = false bench = false doc = false +[[bin]] +name = "fuzz_concurr_alter_table" +path = "targets/fuzz_concurr_alter_table.rs" +test = false +bench = false +doc = false + [[bin]] name = "fuzz_alter_logical_table" path = "targets/ddl/fuzz_alter_logical_table.rs" diff --git a/tests-fuzz/src/error.rs b/tests-fuzz/src/error.rs index 10259099a30f..06829e227948 100644 --- a/tests-fuzz/src/error.rs +++ b/tests-fuzz/src/error.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use common_error::ext::BoxedError; use common_macro::stack_trace_debug; use snafu::{Location, Snafu}; @@ -111,4 +112,26 @@ pub enum Error { error: nix::Error, pid: Pid, }, + + #[snafu(display("External Error"))] + External { + #[snafu(implicit)] + location: Location, + #[snafu(source)] + source: BoxedError, + }, + + #[snafu(display("Failed to send prometheus remote request"))] + SendPromRemoteRequest { + #[snafu(implicit)] + location: Location, + #[snafu(source)] + error: reqwest::Error, + }, + + #[snafu(display("Connection with database aborted"))] + ConnAborted { + #[snafu(implicit)] + location: Location, + }, } diff --git a/tests-fuzz/src/ir.rs b/tests-fuzz/src/ir.rs index ae6edd595c85..3e6af87002f0 100644 --- a/tests-fuzz/src/ir.rs +++ b/tests-fuzz/src/ir.rs @@ -24,7 +24,7 @@ use std::collections::HashMap; use std::sync::{Arc, Mutex}; use std::time::Duration; -pub use alter_expr::{AlterTableExpr, AlterTableOption}; +pub use alter_expr::{AlterTableExpr, AlterTableOption, AlterTableOperation}; use common_time::timestamp::TimeUnit; use common_time::{Date, DateTime, Timestamp}; pub use create_expr::{CreateDatabaseExpr, CreateTableExpr}; diff --git a/tests-fuzz/src/utils.rs b/tests-fuzz/src/utils.rs index 8e5daef4eb1d..df4ab140ba5d 100644 --- a/tests-fuzz/src/utils.rs +++ b/tests-fuzz/src/utils.rs @@ -39,9 +39,11 @@ use crate::ir::Ident; /// Database connections pub struct Connections { pub mysql: Option>, + pub http: Option, } const GT_MYSQL_ADDR: &str = "GT_MYSQL_ADDR"; +const GT_HTTP_ADDR: &str = "GT_HTTP_ADDR"; /// Connects to GreptimeDB via env variables. pub async fn init_greptime_connections_via_env() -> Connections { @@ -53,11 +55,18 @@ pub async fn init_greptime_connections_via_env() -> Connections { None }; - init_greptime_connections(mysql).await + let http = if let Ok(addr) = env::var(GT_HTTP_ADDR) { + Some(addr) + } else { + info!("GT_HTTP_ADDR is empty, ignores test"); + None + }; + + init_greptime_connections(mysql, http).await } /// Connects to GreptimeDB. -pub async fn init_greptime_connections(mysql: Option) -> Connections { +pub async fn init_greptime_connections(mysql: Option, http: Option) -> Connections { let mysql = if let Some(addr) = mysql { let mut opts: MySqlConnectOptions = format!("mysql://{addr}/public").parse().unwrap(); opts.log_statements(LevelFilter::Off); @@ -66,7 +75,7 @@ pub async fn init_greptime_connections(mysql: Option) -> Connections { None }; - Connections { mysql } + Connections { mysql, http } } const GT_FUZZ_BINARY_PATH: &str = "GT_FUZZ_BINARY_PATH"; diff --git a/tests-fuzz/targets/ddl/fuzz_alter_logical_table.rs b/tests-fuzz/targets/ddl/fuzz_alter_logical_table.rs index 80f017a35397..1a0ec9dab399 100644 --- a/tests-fuzz/targets/ddl/fuzz_alter_logical_table.rs +++ b/tests-fuzz/targets/ddl/fuzz_alter_logical_table.rs @@ -238,7 +238,7 @@ async fn execute_alter_table(ctx: FuzzContext, input: FuzzInput) -> Result<()> { fuzz_target!(|input: FuzzInput| { common_telemetry::init_default_ut_logging(); common_runtime::block_on_global(async { - let Connections { mysql } = init_greptime_connections_via_env().await; + let Connections { mysql, .. } = init_greptime_connections_via_env().await; let ctx = FuzzContext { greptime: mysql.expect("mysql connection init must be succeed"), }; diff --git a/tests-fuzz/targets/ddl/fuzz_alter_table.rs b/tests-fuzz/targets/ddl/fuzz_alter_table.rs index 247d7632eeb5..c86c5b0e5cdc 100644 --- a/tests-fuzz/targets/ddl/fuzz_alter_table.rs +++ b/tests-fuzz/targets/ddl/fuzz_alter_table.rs @@ -241,7 +241,7 @@ async fn execute_alter_table(ctx: FuzzContext, input: FuzzInput) -> Result<()> { fuzz_target!(|input: FuzzInput| { common_telemetry::init_default_ut_logging(); common_runtime::block_on_global(async { - let Connections { mysql } = init_greptime_connections_via_env().await; + let Connections { mysql, .. } = init_greptime_connections_via_env().await; let ctx = FuzzContext { greptime: mysql.expect("mysql connection init must be succeed"), }; diff --git a/tests-fuzz/targets/ddl/fuzz_create_database.rs b/tests-fuzz/targets/ddl/fuzz_create_database.rs index 3d052bb3d9d3..00b239ce5114 100644 --- a/tests-fuzz/targets/ddl/fuzz_create_database.rs +++ b/tests-fuzz/targets/ddl/fuzz_create_database.rs @@ -95,7 +95,7 @@ async fn execute_create_database(ctx: FuzzContext, input: FuzzInput) -> Result<( fuzz_target!(|input: FuzzInput| { common_telemetry::init_default_ut_logging(); common_runtime::block_on_global(async { - let Connections { mysql } = init_greptime_connections_via_env().await; + let Connections { mysql, .. } = init_greptime_connections_via_env().await; let ctx = FuzzContext { greptime: mysql.expect("mysql connection init must be succeed"), }; diff --git a/tests-fuzz/targets/ddl/fuzz_create_logical_table.rs b/tests-fuzz/targets/ddl/fuzz_create_logical_table.rs index 64ed26ba7e9a..d806767f9263 100644 --- a/tests-fuzz/targets/ddl/fuzz_create_logical_table.rs +++ b/tests-fuzz/targets/ddl/fuzz_create_logical_table.rs @@ -190,7 +190,7 @@ async fn execute_create_logic_table(ctx: FuzzContext, input: FuzzInput) -> Resul fuzz_target!(|input: FuzzInput| { common_telemetry::init_default_ut_logging(); common_runtime::block_on_global(async { - let Connections { mysql } = init_greptime_connections_via_env().await; + let Connections { mysql, .. } = init_greptime_connections_via_env().await; let ctx = FuzzContext { greptime: mysql.expect("mysql connection init must be succeed"), }; diff --git a/tests-fuzz/targets/ddl/fuzz_create_table.rs b/tests-fuzz/targets/ddl/fuzz_create_table.rs index 6d03b0dffab9..d6725f0cf1d3 100644 --- a/tests-fuzz/targets/ddl/fuzz_create_table.rs +++ b/tests-fuzz/targets/ddl/fuzz_create_table.rs @@ -121,7 +121,7 @@ async fn execute_create_table(ctx: FuzzContext, input: FuzzInput) -> Result<()> fuzz_target!(|input: FuzzInput| { common_telemetry::init_default_ut_logging(); common_runtime::block_on_global(async { - let Connections { mysql } = init_greptime_connections_via_env().await; + let Connections { mysql, .. } = init_greptime_connections_via_env().await; let ctx = FuzzContext { greptime: mysql.expect("mysql connection init must be succeed"), }; diff --git a/tests-fuzz/targets/failover/fuzz_failover_metric_regions.rs b/tests-fuzz/targets/failover/fuzz_failover_metric_regions.rs index 147c3ead1e41..83531a5b8e6c 100644 --- a/tests-fuzz/targets/failover/fuzz_failover_metric_regions.rs +++ b/tests-fuzz/targets/failover/fuzz_failover_metric_regions.rs @@ -292,7 +292,7 @@ async fn execute_failover(ctx: FuzzContext, input: FuzzInput) -> Result<()> { fuzz_target!(|input: FuzzInput| { common_telemetry::init_default_ut_logging(); common_runtime::block_on_global(async { - let Connections { mysql } = init_greptime_connections_via_env().await; + let Connections { mysql, .. } = init_greptime_connections_via_env().await; let ctx = FuzzContext { greptime: mysql.expect("mysql connection init must be succeed"), kube: kube::client::Client::try_default() diff --git a/tests-fuzz/targets/failover/fuzz_failover_mito_regions.rs b/tests-fuzz/targets/failover/fuzz_failover_mito_regions.rs index f456550b3fb9..308e3bd42183 100644 --- a/tests-fuzz/targets/failover/fuzz_failover_mito_regions.rs +++ b/tests-fuzz/targets/failover/fuzz_failover_mito_regions.rs @@ -346,7 +346,7 @@ async fn execute_failover(ctx: FuzzContext, input: FuzzInput) -> Result<()> { fuzz_target!(|input: FuzzInput| { common_telemetry::init_default_ut_logging(); common_runtime::block_on_global(async { - let Connections { mysql } = init_greptime_connections_via_env().await; + let Connections { mysql, .. } = init_greptime_connections_via_env().await; let ctx = FuzzContext { greptime: mysql.expect("mysql connection init must be succeed"), kube: kube::client::Client::try_default() diff --git a/tests-fuzz/targets/fuzz_insert.rs b/tests-fuzz/targets/fuzz_insert.rs index 739d6af7a386..bfef7629cadb 100644 --- a/tests-fuzz/targets/fuzz_insert.rs +++ b/tests-fuzz/targets/fuzz_insert.rs @@ -218,7 +218,7 @@ async fn execute_insert(ctx: FuzzContext, input: FuzzInput) -> Result<()> { fuzz_target!(|input: FuzzInput| { common_telemetry::init_default_ut_logging(); common_runtime::block_on_global(async { - let Connections { mysql } = init_greptime_connections_via_env().await; + let Connections { mysql, .. } = init_greptime_connections_via_env().await; let ctx = FuzzContext { greptime: mysql.expect("mysql connection init must be succeed"), }; diff --git a/tests-fuzz/targets/fuzz_insert_logical_table.rs b/tests-fuzz/targets/fuzz_insert_logical_table.rs index ee2035acd65a..ca4688cc56bf 100644 --- a/tests-fuzz/targets/fuzz_insert_logical_table.rs +++ b/tests-fuzz/targets/fuzz_insert_logical_table.rs @@ -297,7 +297,7 @@ async fn execute_insert(ctx: FuzzContext, input: FuzzInput) -> Result<()> { fuzz_target!(|input: FuzzInput| { common_telemetry::init_default_ut_logging(); common_runtime::block_on_global(async { - let Connections { mysql } = init_greptime_connections_via_env().await; + let Connections { mysql, .. } = init_greptime_connections_via_env().await; let ctx = FuzzContext { greptime: mysql.expect("mysql connection init must be succeed"), }; diff --git a/tests-fuzz/targets/migration/fuzz_migrate_metric_regions.rs b/tests-fuzz/targets/migration/fuzz_migrate_metric_regions.rs index 5bcddea53abf..4a923f5621b2 100644 --- a/tests-fuzz/targets/migration/fuzz_migrate_metric_regions.rs +++ b/tests-fuzz/targets/migration/fuzz_migrate_metric_regions.rs @@ -445,7 +445,7 @@ async fn execute_migration(ctx: FuzzContext, input: FuzzInput) -> Result<()> { fuzz_target!(|input: FuzzInput| { common_telemetry::init_default_ut_logging(); common_runtime::block_on_global(async { - let Connections { mysql } = init_greptime_connections_via_env().await; + let Connections { mysql, .. } = init_greptime_connections_via_env().await; let ctx = FuzzContext { greptime: mysql.expect("mysql connection init must be succeed"), }; diff --git a/tests-fuzz/targets/migration/fuzz_migrate_mito_regions.rs b/tests-fuzz/targets/migration/fuzz_migrate_mito_regions.rs index 12c4cdae49e1..c434f03cd9d8 100644 --- a/tests-fuzz/targets/migration/fuzz_migrate_mito_regions.rs +++ b/tests-fuzz/targets/migration/fuzz_migrate_mito_regions.rs @@ -356,7 +356,7 @@ async fn execute_region_migration(ctx: FuzzContext, input: FuzzInput) -> Result< fuzz_target!(|input: FuzzInput| { common_telemetry::init_default_ut_logging(); common_runtime::block_on_global(async { - let Connections { mysql } = init_greptime_connections_via_env().await; + let Connections { mysql, .. } = init_greptime_connections_via_env().await; let ctx = FuzzContext { greptime: mysql.expect("mysql connection init must be succeed"), }; From ff1b45776d91d116d2f0cef2121e679e36006c20 Mon Sep 17 00:00:00 2001 From: discord9 Date: Mon, 11 Nov 2024 11:14:21 +0800 Subject: [PATCH 2/9] tests: fuzz alter&prom write --- .../targets/fuzz_concurr_alter_table.rs | 222 +++++++++++ .../targets/fuzz_concurr_prom_remote_write.rs | 360 ++++++++++++++++++ 2 files changed, 582 insertions(+) create mode 100644 tests-fuzz/targets/fuzz_concurr_alter_table.rs create mode 100644 tests-fuzz/targets/fuzz_concurr_prom_remote_write.rs diff --git a/tests-fuzz/targets/fuzz_concurr_alter_table.rs b/tests-fuzz/targets/fuzz_concurr_alter_table.rs new file mode 100644 index 000000000000..626a9afe573f --- /dev/null +++ b/tests-fuzz/targets/fuzz_concurr_alter_table.rs @@ -0,0 +1,222 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#![no_main] + +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; + +use arbitrary::{Arbitrary, Unstructured}; +use common_telemetry::info; +use libfuzzer_sys::fuzz_target; +use rand::{Rng, SeedableRng}; +use rand_chacha::ChaChaRng; +use snafu::ResultExt; +use sqlx::{MySql, Pool}; +use strum::EnumIter; +use tests_fuzz::context::{TableContext, TableContextRef}; +use tests_fuzz::error::{self, Result}; +use tests_fuzz::fake::{ + merge_two_word_map_fn, random_capitalize_map, uppercase_and_keyword_backtick_map, + MappedGenerator, WordGenerator, +}; +use tests_fuzz::generator::alter_expr::AlterExprAddColumnGeneratorBuilder; +use tests_fuzz::generator::create_expr::CreateTableExprGeneratorBuilder; +use tests_fuzz::generator::Generator; +use tests_fuzz::ir::{AlterTableExpr, AlterTableOperation, CreateTableExpr}; +use tests_fuzz::translator::mysql::alter_expr::AlterTableExprTranslator; +use tests_fuzz::translator::mysql::create_expr::CreateTableExprTranslator; +use tests_fuzz::translator::DslTranslator; +use tests_fuzz::utils::{ + get_gt_fuzz_input_max_columns, init_greptime_connections_via_env, Connections, +}; +use tests_fuzz::validator; +use tokio::task::JoinSet; +struct FuzzContext { + greptime: Pool, +} + +impl FuzzContext { + async fn close(self) { + self.greptime.close().await; + } +} + +#[derive(Clone, Debug)] +struct FuzzInput { + seed: u64, + actions: usize, +} + +#[derive(Debug, EnumIter)] +enum AlterTableOption { + AddColumn, + DropColumn, + RenameTable, + ModifyDataType, +} + +fn generate_create_table_expr(rng: &mut R) -> Result { + let max_columns = get_gt_fuzz_input_max_columns(); + let columns = rng.gen_range(2..max_columns); + let mut with_clause = HashMap::new(); + if rng.gen_bool(0.5) { + with_clause.insert("append_mode".to_string(), "true".to_string()); + } + let create_table_generator = CreateTableExprGeneratorBuilder::default() + .name_generator(Box::new(MappedGenerator::new( + WordGenerator, + merge_two_word_map_fn(random_capitalize_map, uppercase_and_keyword_backtick_map), + ))) + .columns(columns) + .engine("mito") + .with_clause(with_clause) + .build() + .unwrap(); + create_table_generator.generate(rng) +} + +fn generate_alter_table_add_column_expr( + table_ctx: TableContextRef, + rng: &mut R, +) -> Result { + let location = rng.gen_bool(0.5); + let expr_generator = AlterExprAddColumnGeneratorBuilder::default() + // random capitalize column name + .name_generator(Box::new(MappedGenerator::new( + WordGenerator, + merge_two_word_map_fn(random_capitalize_map, uppercase_and_keyword_backtick_map), + ))) + .table_ctx(table_ctx) + .location(location) + .build() + .unwrap(); + expr_generator.generate(rng) +} + +impl Arbitrary<'_> for FuzzInput { + fn arbitrary(u: &mut Unstructured<'_>) -> arbitrary::Result { + let seed = u.int_in_range(u64::MIN..=u64::MAX)?; + let mut rng = ChaChaRng::seed_from_u64(seed); + let actions = rng.gen_range(1..256); + + Ok(FuzzInput { seed, actions }) + } +} + +/// Parallel alter table fuzz target +async fn unstable_execute_parallel_alter_table(ctx: FuzzContext, input: FuzzInput) -> Result<()> { + info!("input: {input:?}"); + let mut rng = ChaChaRng::seed_from_u64(input.seed); + + // Create table + let expr = generate_create_table_expr(&mut rng).unwrap(); + let translator = CreateTableExprTranslator; + let sql = translator.translate(&expr)?; + let result = sqlx::query(&sql) + .execute(&ctx.greptime) + .await + .context(error::ExecuteQuerySnafu { sql: &sql })?; + info!("Create table: {sql}, result: {result:?}"); + + // Parallel add columns at once, and validate columns only after all columns are added + let table_ctx = Arc::new(TableContext::from(&expr)); + let mut after_all_alter_table_ctx = table_ctx.clone(); + let mut pool: JoinSet> = JoinSet::new(); + + let mut used_column_names = HashSet::new(); + let mut action = 0; + while action < input.actions { + let expr = generate_alter_table_add_column_expr(table_ctx.clone(), &mut rng).unwrap(); + if let AlterTableOperation::AddColumn { column, .. } = &expr.alter_options { + if used_column_names.contains(&column.name) { + info!("Column name already used: {}, retrying", column.name.value); + continue; + } + used_column_names.insert(column.name.clone()); + action += 1; + } + let translator = AlterTableExprTranslator; + let sql = translator.translate(&expr)?; + if let AlterTableOperation::AddColumn { .. } = expr.alter_options { + // Applies changes + after_all_alter_table_ctx = Arc::new( + Arc::unwrap_or_clone(after_all_alter_table_ctx) + .alter(expr.clone()) + .unwrap(), + ); + let conn = ctx.greptime.clone(); + pool.spawn(async move { + let result = sqlx::query(&sql) + .execute(&conn) + .await + .context(error::ExecuteQuerySnafu { sql: &sql })?; + info!("Alter table: {sql}, result: {result:?}"); + Ok(()) + }); + continue; + } + } + + // run concurrently + let output = pool.join_all().await; + for (idx, err) in output.iter().enumerate().filter_map(|(i, u)| match u { + Ok(_) => None, + Err(e) => Some((i, e)), + }) { + // found first error and return + // but also print all errors for debugging + common_telemetry::error!("Error at parallel task {}: {err:?}", idx); + } + + // return first error if exist + if let Some(err) = output.into_iter().filter_map(|u| u.err()).next() { + return Err(err); + } + + let table_ctx = after_all_alter_table_ctx; + // Validates columns + let mut column_entries = + validator::column::fetch_columns(&ctx.greptime, "public".into(), table_ctx.name.clone()) + .await?; + column_entries.sort_by(|a, b| a.column_name.cmp(&b.column_name)); + let mut columns = table_ctx.columns.clone(); + columns.sort_by(|a, b| a.name.value.cmp(&b.name.value)); + validator::column::assert_eq(&column_entries, &columns)?; + + // Cleans up + let table_name = table_ctx.name.clone(); + let sql = format!("DROP TABLE {}", table_name); + let result = sqlx::query(&sql) + .execute(&ctx.greptime) + .await + .context(error::ExecuteQuerySnafu { sql })?; + info!("Drop table: {}, result: {result:?}", table_name); + ctx.close().await; + + Ok(()) +} + +fuzz_target!(|input: FuzzInput| { + common_telemetry::init_default_ut_logging(); + common_runtime::block_on_global(async { + let Connections { mysql, .. } = init_greptime_connections_via_env().await; + let ctx = FuzzContext { + greptime: mysql.expect("mysql connection init must be succeed"), + }; + unstable_execute_parallel_alter_table(ctx, input) + .await + .unwrap_or_else(|err| panic!("fuzz test must be succeed: {err:?}")); + }) +}); diff --git a/tests-fuzz/targets/fuzz_concurr_prom_remote_write.rs b/tests-fuzz/targets/fuzz_concurr_prom_remote_write.rs new file mode 100644 index 000000000000..5bef7bdc1eed --- /dev/null +++ b/tests-fuzz/targets/fuzz_concurr_prom_remote_write.rs @@ -0,0 +1,360 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Fuzz testing for auto create table and potentially concurrent alter table caused by auto create table. +//! +//! Example Command to run this fuzz test: +//! ```bash +//! GT_MYSQL_ADDR=localhost:4002 GT_HTTP_ADDR=localhost:4000 cargo fuzz run fuzz_concurr_prom_remote_write --fuzz-dir tests-fuzz -D -s none +//! ``` +#![no_main] + +use std::collections::{BTreeMap, BTreeSet, VecDeque}; +use std::hash::Hasher; + +use common_error::ext::BoxedError; +use common_telemetry::info; +use greptime_proto::prometheus::remote::{self, Label, Sample, TimeSeries}; +use libfuzzer_sys::arbitrary::{Arbitrary, Unstructured}; +use libfuzzer_sys::fuzz_target; +use prost::Message; +use rand::{Rng, SeedableRng}; +use rand_chacha::ChaChaRng; +use servers::prom_store::snappy_compress; +use snafu::ResultExt; +use sqlx::{MySql, Pool}; +use tests_fuzz::error::{self, ExternalSnafu, Result, SendPromRemoteRequestSnafu}; +use tests_fuzz::fake::{ + random_capitalize_map, uppercase_and_keyword_backtick_map, MappedGenerator, WordGenerator, +}; +use tests_fuzz::generator::Random; +use tests_fuzz::ir::Ident; +use tests_fuzz::utils::{init_greptime_connections_via_env, Connections}; +use tokio::task::JoinSet; +use tokio::time::Instant; + +struct FuzzContext { + greptime: Pool, +} + +impl FuzzContext { + async fn close(self) { + self.greptime.close().await; + } +} + +#[derive(Copy, Clone, Debug)] +struct FuzzInput { + seed: u64, + concurr_writes: usize, + tables: usize, + labels: usize, + always_choose_old_table: bool, +} + +impl Arbitrary<'_> for FuzzInput { + fn arbitrary(u: &mut Unstructured<'_>) -> arbitrary::Result { + let seed = u.int_in_range(u64::MIN..=u64::MAX)?; + let mut rng = ChaChaRng::seed_from_u64(seed); + Ok(FuzzInput { + concurr_writes: rng.gen_range(2..256), + // one table is not enough for this concurrent test + tables: 2, + labels: rng.gen_range(0..4), + seed, + always_choose_old_table: true, + }) + } +} + +/// Generate prometheus remote write requests +fn generate_prom_metrics_write_reqs( + input: FuzzInput, + rng: &mut R, + timestamp: &mut i64, + table_used_col_names: &mut BTreeMap>, +) -> Result { + let name_generator = Box::new(MappedGenerator::new( + WordGenerator, + random_capitalize_map::, + )); + + // advance timestamp by a random number + // *timestamp += rng.gen_range(0..256); + + let mut timeseries = Vec::new(); + + for _ in 0..input.tables { + let len = table_used_col_names.len(); + // more tables means more chance to select an existing table + let table_name = { + let prob = 1.0 - (-(len as f64)).exp(); + let choose_old = rng.gen_bool(prob) || input.always_choose_old_table; + let use_old = choose_old && table_used_col_names.len() >= input.tables; + + if use_old { + // randomly select a table name from the table_used_col_names + let idx = rng.gen_range(0..table_used_col_names.len()); + let table_name = table_used_col_names.keys().nth(idx).unwrap().clone(); + + uppercase_and_keyword_backtick_map(rng, Ident::new(table_name)) + } else { + let name = name_generator.gen(rng); + uppercase_and_keyword_backtick_map(rng, name) + } + }; + // TODO: support more metric types&labels + /*let mf_type = match rng.gen_range(0..3) { + 0 => MetricType::COUNTER, + 1 => MetricType::GAUGE, + 2 => MetricType::HISTOGRAM, + _ => MetricType::SUMMARY, + };*/ + + let mut random_labels: Vec