diff --git a/defaults/manager_restore_benchmark_snapshots.yaml b/defaults/manager_restore_benchmark_snapshots.yaml
index 430ca04e01..1d61529829 100644
--- a/defaults/manager_restore_benchmark_snapshots.yaml
+++ b/defaults/manager_restore_benchmark_snapshots.yaml
@@ -1,110 +1,164 @@
 bucket: "manager-backup-tests-permanent-snapshots-us-east-1"
-cs_read_cmd_template: "cassandra-stress read cl=ONE n={num_of_rows} -schema 'keyspace={keyspace_name} replication(strategy=NetworkTopologyStrategy,replication_factor=3)' -mode cql3 native -rate threads=500 -col 'size=FIXED(1024) n=FIXED(1)' -pop seq={sequence_start}..{sequence_end}"
+cs_read_cmd_template: "cassandra-stress read cl={cl} n={num_of_rows} -schema 'keyspace={keyspace_name} replication(strategy={replication},replication_factor={rf}) compaction(strategy={compaction})' -mode cql3 native -rate threads=500 -col 'size=FIXED({col_size}) n=FIXED({col_n})' -pop seq={sequence_start}..{sequence_end}"
 sizes:  # size of backed up dataset in GB
   1gb_1t_ics:
     tag: "sm_20240812100424UTC"
-    schema:
-      keyspace1:
-        - standard1: 1
-    number_of_rows: 1073760
     exp_timeout: 1200  # 20 minutes (timeout for restore data operation)
     scylla_version: "2024.2.0-rc1"
     number_of_nodes: 3
-    compaction_strategy: "IncrementalCompactionStrategy"
     prohibit_verification_read: false
+    dataset:
+      schema:
+        keyspace1:
+          - standard1: 1
+      num_of_rows: 1073760
+      compaction: "IncrementalCompactionStrategy"
+      cl: "ONE"
+      col_size: 1024
+      col_n: 1
+      replication: "NetworkTopologyStrategy"
+      rf: 3
   500gb_1t_ics:
     tag: "sm_20240813112034UTC"
-    schema:
-      keyspace1:
-        - standard1: 500
-    number_of_rows: 524288000
     exp_timeout: 14400  # 4 hours
     scylla_version: "2024.2.0-rc1"
     number_of_nodes: 3
-    compaction_strategy: "IncrementalCompactionStrategy"
     prohibit_verification_read: false
+    dataset:
+      schema:
+        keyspace1:
+          - standard1: 500
+      num_of_rows: 524288000
+      compaction: "IncrementalCompactionStrategy"
+      cl: "ONE"
+      col_size: 1024
+      col_n: 1
+      replication: "NetworkTopologyStrategy"
+      rf: 3
   500gb_1t_ics_tablets:
     tag: "sm_20240813114617UTC"
-    schema:
-      keyspace1:
-        - standard1: 500
-    number_of_rows: 524288000
     exp_timeout: 14400  # 4 hours
     scylla_version: "2024.2.0-rc1"
     number_of_nodes: 3
-    compaction_strategy: "IncrementalCompactionStrategy"
     prohibit_verification_read: false
+    dataset:
+      schema:
+        keyspace1:
+          - standard1: 500
+      num_of_rows: 524288000
+      compaction: "IncrementalCompactionStrategy"
+      cl: "ONE"
+      col_size: 1024
+      col_n: 1
+      replication: "NetworkTopologyStrategy"
+      rf: 3
  500gb_2t_ics:
    tag: "sm_20240819203428UTC"
-    schema:
-      keyspace1:
-        - standard1: 250
-      keyspace2:
-        - standard1: 250
-    number_of_rows: 524288000
     exp_timeout: 14400  # 4 hours
     scylla_version: "2024.2.0-rc1"
     number_of_nodes: 3
-    compaction_strategy: "IncrementalCompactionStrategy"
     prohibit_verification_read: true
+    dataset:
+      schema:
+        keyspace1:
+          - standard1: 250
+        keyspace2:
+          - standard1: 250
+      num_of_rows: 524288000
+      compaction: "IncrementalCompactionStrategy"
+      cl: "ONE"
+      col_size: 1024
+      col_n: 1
+      replication: "NetworkTopologyStrategy"
+      rf: 3
   1tb_1t_ics:
     tag: "sm_20240814180009UTC"
-    schema:
-      keyspace1:
-        - standard1: 1024
-    number_of_rows: 1073741824
     exp_timeout: 28800  # 8 hours
     scylla_version: "2024.2.0-rc1"
     number_of_nodes: 3
-    compaction_strategy: "IncrementalCompactionStrategy"
     prohibit_verification_read: false
+    dataset:
+      schema:
+        keyspace1:
+          - standard1: 1024
+      num_of_rows: 1073741824
+      compaction: "IncrementalCompactionStrategy"
+      cl: "ONE"
+      col_size: 1024
+      col_n: 1
+      replication: "NetworkTopologyStrategy"
+      rf: 3
  1tb_4t_twcs:
    tag: "sm_20240821145503UTC"
-    schema:
-      keyspace1:
-        - t_10gb: 10
-        - t_90gb: 90
-        - t_300gb: 300
-        - t_600gb: 600
-    number_of_rows: 428571429
     exp_timeout: 28800  # 8 hours
     scylla_version: "2024.2.0-rc1"
     number_of_nodes: 3
-    compaction_strategy: "TimeWindowCompactionStrategy"
     prohibit_verification_read: true
+    dataset:
+      schema:
+        keyspace1:
+          - t_10gb: 10
+          - t_90gb: 90
+          - t_300gb: 300
+          - t_600gb: 600
+      num_of_rows: 428571429
+      compaction: "TimeWindowCompactionStrategy"
+      cl:
+      col_size:
+      col_n:
+      replication: "NetworkTopologyStrategy"
+      rf: 3
  1tb_2t_twcs:
    tag: "sm_20240827191125UTC"
-    schema:
-      keyspace1:
-        - t_300gb: 300
-        - t_700gb: 700
-    number_of_rows: 428571429
     exp_timeout: 28800  # 8 hours
     scylla_version: "2024.2.0-rc1"
     number_of_nodes: 9
-    compaction_strategy: "TimeWindowCompactionStrategy"
     prohibit_verification_read: true
+    dataset:
+      schema:
+        keyspace1:
+          - t_300gb: 300
+          - t_700gb: 700
+      num_of_rows: 428571429
+      compaction: "TimeWindowCompactionStrategy"
+      cl:
+      col_size:
+      col_n:
+      replication: "NetworkTopologyStrategy"
+      rf: 3
  1.5tb_2t_ics:
    tag: "sm_20240820180152UTC"
-    schema:
-      keyspace1:
-        - standard1: 500
-      keyspace2:
-        - standard1: 1024
-    number_of_rows: 1598029824
     exp_timeout: 43200  # 12 hours
     scylla_version: "2024.2.0-rc1"
     number_of_nodes: 3
-    compaction_strategy: "IncrementalCompactionStrategy"
     prohibit_verification_read: true
+    dataset:
+      schema:
+        keyspace1:
+          - standard1: 500
+        keyspace2:
+          - standard1: 1024
+      num_of_rows: 1598029824
+      compaction: "IncrementalCompactionStrategy"
+      cl: "ONE"
+      col_size: 1024
+      col_n: 1
+      replication: "NetworkTopologyStrategy"
+      rf: 3
  2tb_1t_ics:
    tag: "sm_20240816185129UTC"
-    schema:
-      keyspace1:
-        - standard1: 2048
-    number_of_rows: 2147483648
     exp_timeout: 57600  # 16 hours
     scylla_version: "2024.2.0-rc1"
     number_of_nodes: 3
-    compaction_strategy: "IncrementalCompactionStrategy"
     prohibit_verification_read: false
+    dataset:
+      schema:
+        keyspace1:
+          - standard1: 2048
+      num_of_rows: 2147483648
+      compaction: "IncrementalCompactionStrategy"
+      cl: "ONE"
+      col_size: 1024
+      col_n: 1
+      replication: "NetworkTopologyStrategy"
+      rf: 3
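# Illustration only, not part of the patch: a minimal sketch of how the reworked YAML above is
# consumed. The nested "dataset" block now carries both the schema and the c-s parameters needed
# to rebuild a verification read command; this mirrors get_snapshot_data() further down in the
# diff, but the standalone script itself is hypothetical.
import yaml  # PyYAML

with open("defaults/manager_restore_benchmark_snapshots.yaml") as f:
    config = yaml.safe_load(f)

snapshot = config["sizes"]["1gb_1t_ics"]
dataset = snapshot["dataset"]

# keyspace -> table names, e.g. {'keyspace1': ['standard1']}
ks_tables_map = {ks: [list(t.keys())[0] for t in tables] for ks, tables in dataset["schema"].items()}

print(ks_tables_map)
print(dataset["num_of_rows"], dataset["cl"], dataset["compaction"])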
diff --git a/defaults/test_default.yaml b/defaults/test_default.yaml
index 1b6a562907..b20692484c 100644
--- a/defaults/test_default.yaml
+++ b/defaults/test_default.yaml
@@ -290,3 +290,20 @@ latency_decorator_error_thresholds:
        fixed_limit: 10
      P99 read:
        fixed_limit: 10
+
+mgmt_snapshots_preparer_params:
+  # This configuration is used by the Manager backup snapshot preparer test
+  cs_cmd_template: "cassandra-stress {operation} cl={cl} n={num_of_rows} -schema 'keyspace={ks_name} replication(strategy={replication},replication_factor={rf}) compaction(strategy={compaction})' -mode cql3 native -rate threads={threads_num} -col 'size=FIXED({col_size}) n=FIXED({col_n})' -pop seq={sequence_start}..{sequence_end}"
+  operation: "write"
+  cl: "QUORUM"
+  replication: "NetworkTopologyStrategy"
+  rf: 3
+  compaction: "IncrementalCompactionStrategy"
+  threads_num: 500
+  col_size: 1024
+  col_n: 1
+  # Defined at runtime based on backup size, number of loaders, scylla version, etc.
+  ks_name: ''
+  num_of_rows: ''
+  sequence_start: ''
+  sequence_end: ''
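# Illustration only, not part of the patch: a rough sketch of how 'mgmt_snapshots_preparer_params'
# is meant to be used. The YAML defaults fill most of the cs_cmd_template placeholders, while
# ks_name, num_of_rows and the sequence bounds are computed at runtime (see
# build_snapshot_preparer_cs_write_cmd in mgmt_cli_test.py later in this diff). The literal dicts
# below just restate the defaults and use placeholder runtime values to keep the example runnable.
preparer_params = {
    "cs_cmd_template": ("cassandra-stress {operation} cl={cl} n={num_of_rows} -schema 'keyspace={ks_name} "
                        "replication(strategy={replication},replication_factor={rf}) "
                        "compaction(strategy={compaction})' -mode cql3 native -rate threads={threads_num} "
                        "-col 'size=FIXED({col_size}) n=FIXED({col_n})' -pop seq={sequence_start}..{sequence_end}"),
    "operation": "write", "cl": "QUORUM", "replication": "NetworkTopologyStrategy", "rf": 3,
    "compaction": "IncrementalCompactionStrategy", "threads_num": 500, "col_size": 1024, "col_n": 1,
}

# ks_name here is only a placeholder string; its real construction is shown later in _build_ks_name()
runtime_params = {"ks_name": "1gb_ics_quorum_1024_1_2024_2_0", "num_of_rows": 1048576,
                  "sequence_start": 1, "sequence_end": 1048576}

cs_write_cmd = preparer_params["cs_cmd_template"].format(
    **{k: v for k, v in preparer_params.items() if k != "cs_cmd_template"}, **runtime_params)
print(cs_write_cmd)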
diff --git a/mgmt_cli_test.py b/mgmt_cli_test.py
index 8b4f8964c1..91cbeffb38 100644
--- a/mgmt_cli_test.py
+++ b/mgmt_cli_test.py
@@ -34,13 +34,13 @@
 from argus.client.generic_result import Status
 from sdcm import mgmt
-from sdcm.argus_results import send_manager_benchmark_results_to_argus, submit_results_to_argus, \
-    ManagerBackupReadResult, ManagerBackupBenchmarkResult
+from sdcm.argus_results import (send_manager_benchmark_results_to_argus, send_manager_snapshot_details_to_argus,
+                                submit_results_to_argus, ManagerBackupReadResult, ManagerBackupBenchmarkResult)
 from sdcm.mgmt import ScyllaManagerError, TaskStatus, HostStatus, HostSsl, HostRestStatus
 from sdcm.mgmt.cli import ScyllaManagerTool, RestoreTask
 from sdcm.mgmt.common import reconfigure_scylla_manager, get_persistent_snapshots
 from sdcm.provision.helpers.certificate import TLSAssets
-from sdcm.remote import shell_script_cmd
+from sdcm.remote import shell_script_cmd, LOCALRUNNER
 from sdcm.tester import ClusterTester
 from sdcm.cluster import TestConfig
 from sdcm.nemesis import MgmtRepair
@@ -70,21 +70,21 @@ class SnapshotData:
     - bucket: S3 bucket name
     - tag: snapshot tag, for example 'sm_20240816185129UTC'
     - exp_timeout: expected timeout for the restore operation
+    - dataset: dict with snapshot dataset details such as cl, replication, schema, etc.
     - keyspaces: list of keyspaces presented in backup
     - cs_read_cmd_template: cassandra-stress read command template
     - prohibit_verification_read: if True, the verification read will be prohibited. Most likely, such a backup was
       created via c-s user profile.
-    - number_of_rows: number of data rows written in the DB
     - node_ids: list of node ids where backup was created
     """
     bucket: str
     tag: str
     exp_timeout: int
+    dataset: dict[str, str | int | dict]
     keyspaces: list[str]
     ks_tables_map: dict[str, list[str]]
     cs_read_cmd_template: str
     prohibit_verification_read: bool
-    number_of_rows: int
     node_ids: list[str]
 
@@ -257,19 +257,12 @@ def run_verification_read_stress(self, ks_names=None):
         stress_run_time = datetime.now() - stress_start_time
         InfoEvent(message=f'The read stress run was completed. Total run time: {stress_run_time}').publish()
 
-    def prepare_run_and_verify_stress_in_threads(self, cmd_template: str, keyspace_name: str, num_of_rows: int,
-                                                 stop_on_failure: bool = False) -> None:
-        """Prepares C-S commands, runs them in threads and verifies their execution results.
+    def run_and_verify_stress_in_threads(self, cs_cmds: list[str], stop_on_failure: bool = False) -> None:
+        """Runs C-S commands in threads and verifies their execution results.
 
         Stress operation can be either read or write, depending on the cmd_template.
         """
         stress_queue = []
-        num_of_loaders = self.params.get("n_loaders")
-        rows_per_loader = int(num_of_rows / num_of_loaders)
-        for loader_index in range(num_of_loaders):
-            stress_cmd = cmd_template.format(num_of_rows=rows_per_loader,
-                                             keyspace_name=keyspace_name,
-                                             sequence_start=rows_per_loader * loader_index + 1,
-                                             sequence_end=rows_per_loader * (loader_index + 1),)
+        for stress_cmd in cs_cmds:
             _thread = self.run_stress_thread(stress_cmd=stress_cmd, round_robin=True,
                                              stop_test_on_failure=stop_on_failure)
             stress_queue.append(_thread)
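# Illustration only, not part of the patch: run_and_verify_stress_in_threads() above delegates to
# SCT's run_stress_thread()/verify_stress_thread(). The snippet below is just a generic,
# self-contained sketch of the same "start all commands in parallel, then verify each result"
# pattern using the standard library; it is not SCT code and its error handling is simplified.
import subprocess
from concurrent.futures import ThreadPoolExecutor


def run_and_verify(cs_cmds: list[str], stop_on_failure: bool = False) -> None:
    with ThreadPoolExecutor(max_workers=len(cs_cmds) or 1) as executor:
        futures = [executor.submit(subprocess.run, cmd, shell=True, capture_output=True) for cmd in cs_cmds]
        for future in futures:
            result = future.result()
            if result.returncode != 0:
                if stop_on_failure:
                    raise RuntimeError(f"Stress command failed: {result.args}")
                print(f"Stress command failed: {result.args}")

# run_and_verify(["cassandra-stress write n=1000 ...", "cassandra-stress write n=1000 ..."])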
@@ -277,26 +270,6 @@ def prepare_run_and_verify_stress_in_threads(self, cmd_template: str, keyspace_n
         for _thread in stress_queue:
             assert self.verify_stress_thread(cs_thread_pool=_thread), "Stress thread verification failed"
 
-    @staticmethod
-    def extract_compaction_strategy_from_cs_cmd(cs_cmd: str, lower: bool = True, remove_postfix: bool = True) -> str:
-        """Extracts the compaction strategy from the cassandra-stress command.
-
-        :param cs_cmd: cassandra-stress command
-        :param lower: if True, the resulting string will be lowercased
-        :param remove_postfix: if True, the resulting string will have the "CompactionStrategy" postfix removed
-        """
-        match = re.search(r"compaction\(strategy=([^)]+)\)", cs_cmd)
-        if match:
-            strategy = match.group(1)
-            if remove_postfix:
-                strategy = re.sub(r"CompactionStrategy$", "", strategy)
-            if lower:
-                strategy = strategy.lower()
-        else:
-            raise ValueError("Compaction strategy not found in cs_cmd.")
-
-        return strategy
-
 
 class ClusterOperations(ClusterTester):
     CLUSTER_NAME = "mgr_cluster1"
@@ -391,6 +364,43 @@ def download_from_azure(self, node, source, destination):
         source = f"{source.replace('azure://', self.backup_azure_blob_service)}{self.backup_azure_blob_sas}"
         node.remoter.sudo(f"azcopy copy '{source}' '{destination}'")
 
+    @staticmethod
+    def create_s3_bucket(name: str, region: str) -> None:
+        LOCALRUNNER.run(f"aws s3 mb s3://{name} --region {region}")
+
+    @staticmethod
+    def create_gs_bucket(name: str, region: str) -> None:
+        LOCALRUNNER.run(f"gsutil mb -l {region} gs://{name}")
+
+    @staticmethod
+    def sync_s3_buckets(source: str, destination: str, acl: str = 'bucket-owner-full-control') -> None:
+        LOCALRUNNER.run(f"aws s3 sync s3://{source} s3://{destination} --acl {acl}")
+
+    @staticmethod
+    def sync_gs_buckets(source: str, destination: str) -> None:
+        LOCALRUNNER.run(f"gsutil -m rsync -r gs://{source} gs://{destination}")
+
+    def copy_backup_snapshot_bucket(self, source: str, destination: str) -> None:
+        """Copy a bucket with Manager backup snapshots.
+        The process consists of two stages - new bucket creation and data sync (original bucket -> newly created).
+        The main use case is to make a copy of a bucket created in a test with a Cloud (siren) cluster, since siren
+        deletes the bucket together with the cluster. Thus, to reuse the backup snapshot of such a cluster afterward,
+        it should be copied to a new bucket.
+
+        Only AWS and GCE backends are supported.
+        """
+        cluster_backend = self.params.get("cluster_backend")
+        region = next(iter(self.params.region_names), '')
+
+        if cluster_backend == "aws":
+            self.create_s3_bucket(name=destination, region=region)
+            self.sync_s3_buckets(source=source, destination=destination)
+        elif cluster_backend == "gce":
+            self.create_gs_bucket(name=destination, region=region)
+            self.sync_gs_buckets(source=source, destination=destination)
+        else:
+            raise ValueError(f"Unsupported cluster backend - {cluster_backend}, should be either aws or gce")
+
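# Illustration only, not part of the patch: the new bucket helpers above shell out through
# LOCALRUNNER. This standalone sketch shows the same AWS flow ("create new bucket, then sync")
# with plain subprocess, plus the location-string parsing used later in
# test_prepare_backup_snapshot(). Bucket and region values are examples only.
import subprocess


def copy_backup_snapshot_bucket_aws(source: str, destination: str, region: str) -> None:
    subprocess.run(["aws", "s3", "mb", f"s3://{destination}", "--region", region], check=True)
    subprocess.run(["aws", "s3", "sync", f"s3://{source}", f"s3://{destination}",
                    "--acl", "bucket-owner-full-control"], check=True)


# Manager reports locations like "'AWS_US_EAST_1:s3:scylla-cloud-backup-8072-7216-v5dn53'"
location = "'AWS_US_EAST_1:s3:scylla-cloud-backup-8072-7216-v5dn53'"
original_bucket = location.split(":")[-1].rstrip("'")  # -> scylla-cloud-backup-8072-7216-v5dn53
# copy_backup_snapshot_bucket_aws(original_bucket, original_bucket + "-manager-tests", "us-east-1")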
+ """ + cluster_backend = self.params.get("cluster_backend") + region = next(iter(self.params.region_names), '') + + if cluster_backend == "aws": + self.create_s3_bucket(name=destination, region=region) + self.sync_s3_buckets(source=source, destination=destination) + elif cluster_backend == "gce": + self.create_gs_bucket(name=destination, region=region) + self.sync_gs_buckets(source=source, destination=destination) + else: + raise ValueError(f"Unsupported cluster backend - {cluster_backend}, should be either aws or gce") + class SnapshotOperations(ClusterTester): @@ -406,7 +416,7 @@ def get_snapshot_data(snapshot_name: str) -> SnapshotData: raise ValueError(f"Snapshot data for size '{snapshot_name}'GB was not found in the {snapshots_config} file") ks_tables_map = {} - for ks, ts in snapshot_dict["schema"].items(): + for ks, ts in snapshot_dict["dataset"]["schema"].items(): t_names = [list(t.keys())[0] for t in ts] ks_tables_map[ks] = t_names @@ -414,11 +424,11 @@ def get_snapshot_data(snapshot_name: str) -> SnapshotData: bucket=all_snapshots_dict["bucket"], tag=snapshot_dict["tag"], exp_timeout=snapshot_dict["exp_timeout"], - keyspaces=list(snapshot_dict["schema"].keys()), + dataset=snapshot_dict["dataset"], + keyspaces=list(snapshot_dict["dataset"]["schema"].keys()), ks_tables_map=ks_tables_map, cs_read_cmd_template=all_snapshots_dict["cs_read_cmd_template"], prohibit_verification_read=snapshot_dict["prohibit_verification_read"], - number_of_rows=snapshot_dict["number_of_rows"], node_ids=snapshot_dict.get("node_ids"), ) return snapshot_data @@ -471,12 +481,118 @@ def get_all_snapshot_files(self, cluster_id): raise ValueError(f'"{self.params.get("backup_bucket_backend")}" not supported') +class SnapshotPreparerOperations(ClusterTester): + ks_name_template = "{size}gb_{compaction}_{cl}_{col_size}_{col_n}_{scylla_version}" + + @staticmethod + def _abbreviate_compaction_strategy_name(compaction_strategy: str) -> str: + """Abbreviate and lower compaction strategy name which comes from c-s cmd to make it more readable in ks name. + + For example, LeveledCompactionStrategy -> lcs or SizeTieredCompactionStrategy -> stcs. + """ + return ''.join(char for char in compaction_strategy if char.isupper()).lower() + + def _build_ks_name(self, backup_size: int, cs_cmd_params: dict) -> str: + """Build the keyspace name based on the backup size and the parameters used in the c-s command. + The name should include all the parameters important for c-s read verification and can be used to + recreate such a command based on ks_name. 
+ """ + _scylla_version = re.sub(r"[.]", "_", self.params.get_version_based_on_conf()[0].split("-")[0]) + ks_name = self.ks_name_template.format( + size=backup_size, + compaction=self._abbreviate_compaction_strategy_name(cs_cmd_params.get("compaction")), + cl=cs_cmd_params.get("cl").lower(), + col_size=cs_cmd_params.get("col_size"), + col_n=cs_cmd_params.get("col_n"), + scylla_version=_scylla_version, + ) + return ks_name + + def calculate_rows_per_loader(self, overall_rows_num: int) -> int: + """Calculate number of rows per loader thread based on the overall number of rows and the number of loaders.""" + num_of_loaders = int(self.params.get("n_loaders")) + if overall_rows_num % num_of_loaders: + raise ValueError(f"Overall rows number ({overall_rows_num}) should be divisible by the number of loaders") + return int(overall_rows_num / num_of_loaders) + + def build_snapshot_preparer_cs_write_cmd(self, backup_size: int) -> tuple[str, list[str]]: + """Build the c-s command from 'mgmt_snapshots_preparer_params' parameters based on backup size. + + Extra params complete the missing part of command template. + Among them are keyspace_name, num_of_rows, sequence_start and sequence_end. + + Returns: + - ks_name: keyspace name + - cs_cmds: list of c-s commands to be executed + """ + overall_num_of_rows = backup_size * 1024 * 1024 # Considering 1 row = 1Kb + rows_per_loader = self.calculate_rows_per_loader(overall_num_of_rows) + + preparer_params = self.params.get("mgmt_snapshots_preparer_params") + + cs_cmd_template = preparer_params.get("cs_cmd_template") + cs_cmd_params = {key: value for key, value in preparer_params.items() if key != "cs_cmd_template"} + extra_params = {"num_of_rows": rows_per_loader} + + params_to_use_in_cs_cmd = {**cs_cmd_params, **extra_params} + + ks_name = self._build_ks_name(backup_size, params_to_use_in_cs_cmd) + params_to_use_in_cs_cmd["ks_name"] = ks_name + + cs_cmds = [] + num_of_loaders = int(self.params.get("n_loaders")) + for loader_index in range(num_of_loaders): + # Sequence params should be defined for every loader thread separately since vary for each thread + params_to_use_in_cs_cmd["sequence_start"] = rows_per_loader * loader_index + 1 + params_to_use_in_cs_cmd["sequence_end"] = rows_per_loader * (loader_index + 1) + + cs_cmd = cs_cmd_template.format(**params_to_use_in_cs_cmd) + cs_cmds.append(cs_cmd) + + return ks_name, cs_cmds + + def build_cs_read_cmd_from_snapshot_details(self, snapshot: SnapshotData) -> list[str]: + """Define a list of cassandra-stress read commands from snapshot (dataset) details. + + C-S read command template and snapshot details are defined in defaults/manager_restore_benchmark_snapshots.yaml. + Number of commands is equal to the number of loaders defined in the test parameters. 
+ """ + dataset = snapshot.dataset + + rows_per_loader = self.calculate_rows_per_loader(overall_rows_num=dataset["num_of_rows"]) + num_of_loaders = int(self.params.get("n_loaders")) + + cs_cmds = [] + cs_cmd_template = snapshot.cs_read_cmd_template + + for loader_index in range(num_of_loaders): + sequence_start = rows_per_loader * loader_index + 1 + sequence_end = rows_per_loader * (loader_index + 1) + + cs_cmd = cs_cmd_template.format( + cl=dataset["cl"], + num_of_rows=rows_per_loader, + keyspace_name=snapshot.keyspaces[0], + replication=dataset["replication"], + rf=dataset["rf"], + compaction=dataset["compaction"], + col_size=dataset["col_size"], + col_n=dataset["col_n"], + sequence_start=sequence_start, + sequence_end=sequence_end, + ) + cs_cmds.append(cs_cmd) + + return cs_cmds + + class ManagerTestFunctionsMixIn( DatabaseOperations, StressLoadOperations, ClusterOperations, BucketOperations, SnapshotOperations, + SnapshotPreparerOperations, ): test_config = TestConfig() manager_test_metrics = ManagerTestMetrics() @@ -1282,41 +1398,53 @@ def test_prepare_backup_snapshot(self): 2. Run backup and wait for it to finish. 3. Log snapshot details into console. """ + is_cloud_manager = self.params.get("use_cloud_manager") + self.log.info("Populate the cluster with data") backup_size = self.params.get("mgmt_prepare_snapshot_size") # in Gb assert backup_size and backup_size >= 1, "Backup size must be at least 1Gb" - backend = self.params.get("cluster_backend") - cs_read_cmd_template = get_persistent_snapshots()[backend]["confirmation_stress_template"] - cs_write_cmd_template = cs_read_cmd_template.replace(" read ", " write ") - - compaction = self.extract_compaction_strategy_from_cs_cmd(cs_write_cmd_template) - scylla_version = re.sub(r"[-.]", "_", self.params.get("scylla_version")) - keyspace_name = f"{backup_size}gb_{compaction}_{scylla_version}" - - self.prepare_run_and_verify_stress_in_threads( - cmd_template=cs_write_cmd_template, - keyspace_name=keyspace_name, - num_of_rows=backup_size * 1024 * 1024, # Considering 1 row = 1Kb - stop_on_failure=True, - ) + ks_name, cs_write_cmds = self.build_snapshot_preparer_cs_write_cmd(backup_size) + self.run_and_verify_stress_in_threads(cs_cmds=cs_write_cmds, stop_on_failure=True) self.log.info("Initialize Scylla Manager") - manager_tool = mgmt.get_scylla_manager_tool(manager_node=self.monitors.nodes[0]) - mgr_cluster = self.ensure_and_get_cluster(manager_tool) + mgr_cluster = self.db_cluster.get_cluster_manager() + + self.log.info("Define backup location") + if is_cloud_manager: + # Extract location from automatically scheduled backup task + auto_backup_task = mgr_cluster.backup_task_list[0] + location_list = [auto_backup_task.get_task_info_dict()["location"]] + else: + location_list = self.locations self.log.info("Run backup and wait for it to finish") - backup_task = mgr_cluster.create_backup_task(location_list=self.locations, rate_limit_list=["0"]) + backup_task = mgr_cluster.create_backup_task(location_list=location_list, rate_limit_list=["0"]) backup_task_status = backup_task.wait_and_get_final_status(timeout=200000) assert backup_task_status == TaskStatus.DONE, \ f"Backup task ended in {backup_task_status} instead of {TaskStatus.DONE}" - self.log.info("Log snapshot details") - self.log.info( - f"Snapshot tag: {backup_task.get_snapshot_tag()}\n" - f"Keyspace name: {keyspace_name}\n" - f"Bucket: {self.locations}\n" - f"Cluster id: {mgr_cluster.id}\n" + if is_cloud_manager: + self.log.info("Copy bucket with snapshot since the original bucket is 
deleted together with cluster") + # from ["'AWS_US_EAST_1:s3:scylla-cloud-backup-8072-7216-v5dn53'"] to scylla-cloud-backup-8072-7216-v5dn53 + original_bucket_name = location_list[0].split(":")[-1].rstrip("'") + bucket_name = original_bucket_name + "-manager-tests" + self.copy_backup_snapshot_bucket(source=original_bucket_name, destination=bucket_name) + else: + bucket_name = location_list[0] + + self.log.info("Send snapshot details to Argus") + snapshot_details = { + "tag": backup_task.get_snapshot_tag(), + "size": backup_size, + "bucket": bucket_name, + "ks_name": ks_name, + "scylla_version": self.params.get_version_based_on_conf()[0], + "cluster_id": mgr_cluster.id, + } + send_manager_snapshot_details_to_argus( + argus_client=self.test_config.argus_client(), + snapshot_details=snapshot_details, ) @@ -1599,9 +1727,8 @@ def test_restore_from_precreated_backup(self, snapshot_name: str, restore_outsid if not (self.params.get('mgmt_skip_post_restore_stress_read') or snapshot_data.prohibit_verification_read): self.log.info("Running verification read stress") - self.prepare_run_and_verify_stress_in_threads(cmd_template=snapshot_data.cs_read_cmd_template, - keyspace_name=snapshot_data.keyspaces[0], - num_of_rows=snapshot_data.number_of_rows) + cs_verify_cmds = self.build_cs_read_cmd_from_snapshot_details(snapshot_data) + self.run_and_verify_stress_in_threads(cs_cmds=cs_verify_cmds) else: self.log.info("Skipping verification read stress because of the test or snapshot configuration") diff --git a/sdcm/argus_results.py b/sdcm/argus_results.py index d3e0ee3b9f..88bf3ceac7 100644 --- a/sdcm/argus_results.py +++ b/sdcm/argus_results.py @@ -135,6 +135,20 @@ class Meta: ] +class ManagerSnapshotDetails(GenericResultTable): + class Meta: + name = "Snapshot details" + description = "Manager snapshots (pre-created for next utilization in restore tests) details" + Columns = [ + ColumnMetadata(name="tag", unit="", type=ResultType.TEXT), + ColumnMetadata(name="size", unit="GB", type=ResultType.INTEGER), + ColumnMetadata(name="bucket", unit="", type=ResultType.TEXT), + ColumnMetadata(name="ks_name", unit="", type=ResultType.TEXT), + ColumnMetadata(name="cluster_id", unit="", type=ResultType.TEXT), + ColumnMetadata(name="scylla_version", unit="", type=ResultType.TEXT), + ] + + workload_to_table = { "mixed": LatencyCalculatorMixedResult, "write": LatencyCalculatorWriteResult, @@ -257,3 +271,10 @@ def send_manager_benchmark_results_to_argus(argus_client: ArgusClient, result: d for key, value in result.items(): result_table.add_result(column=key, row=row_name, value=value, status=Status.UNSET) submit_results_to_argus(argus_client, result_table) + + +def send_manager_snapshot_details_to_argus(argus_client: ArgusClient, snapshot_details: dict) -> None: + result_table = ManagerSnapshotDetails() + for key, value in snapshot_details.items(): + result_table.add_result(column=key, row="#1", value=value, status=Status.UNSET) + submit_results_to_argus(argus_client, result_table) diff --git a/sdcm/sct_config.py b/sdcm/sct_config.py index 02d0b60bef..a3eb95c19c 100644 --- a/sdcm/sct_config.py +++ b/sdcm/sct_config.py @@ -1202,7 +1202,11 @@ class SCTConfiguration(dict): dict(name="mgmt_prepare_snapshot_size", env="SCT_MGMT_PREPARE_SNAPSHOT_SIZE", type=int, - help="Size of backup snapshot in Gb to be prepared to be prepared for backup"), + help="Size of backup snapshot in Gb to be prepared for backup"), + + dict(name="mgmt_snapshots_preparer_params", + env="SCT_MGMT_SNAPSHOTS_PREPARER_PARAMS", type=dict_or_str, + help="Custom 
diff --git a/sdcm/sct_config.py b/sdcm/sct_config.py
index 02d0b60bef..a3eb95c19c 100644
--- a/sdcm/sct_config.py
+++ b/sdcm/sct_config.py
@@ -1202,7 +1202,11 @@ class SCTConfiguration(dict):
 
         dict(name="mgmt_prepare_snapshot_size", env="SCT_MGMT_PREPARE_SNAPSHOT_SIZE",
              type=int,
-             help="Size of backup snapshot in Gb to be prepared to be prepared for backup"),
+             help="Size of backup snapshot in Gb to be prepared for backup"),
+
+        dict(name="mgmt_snapshots_preparer_params",
+             env="SCT_MGMT_SNAPSHOTS_PREPARER_PARAMS", type=dict_or_str,
+             help="Custom parameters of the c-s write operation used in the snapshots preparer"),
 
         # PerformanceRegressionTest
 
diff --git a/vars/managerPipeline.groovy b/vars/managerPipeline.groovy
index 629b57653d..2677dabb41 100644
--- a/vars/managerPipeline.groovy
+++ b/vars/managerPipeline.groovy
@@ -170,11 +170,12 @@ def call(Map pipelineParams) {
                name: 'requested_by_user')
         text(defaultValue: "${pipelineParams.get('extra_environment_variables', '')}",
              description: (
-                'Extra environment variables to be set in the test environment, uses the java Properties File Format.\n' +
-                'Example:\n' +
-                '\tSCT_MGMT_RESTORE_EXTRA_PARAMS=--batch-size 50 --parallel 0\n' +
-                '\tSCT_N_DB_NODES=6' +
-                '\tSCT_MGMT_SKIP_POST_RESTORE_STRESS_READ=true\n'
+                """Extra environment variables to be set in the test environment, uses the java Properties File Format.
+                Example:
+                    SCT_MGMT_RESTORE_EXTRA_PARAMS=--batch-size 50 --parallel 0
+                    SCT_N_DB_NODES=6
+                    SCT_MGMT_SKIP_POST_RESTORE_STRESS_READ=true
+                    SCT_MGMT_SNAPSHOTS_PREPARER_PARAMS={'compaction': 'SizeTieredCompactionStrategy', 'rf': 3}"""
              ),
              name: 'extra_environment_variables')
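# Illustration only, not part of the patch: the end-to-end shape of the reworked
# test_prepare_backup_snapshot() flow, expressed as a sketch against the methods introduced in
# this diff. 'tester' stands for a ManagerTestFunctionsMixIn test instance; this is an outline of
# the call order, not runnable SCT code on its own.
def prepare_backup_snapshot_flow(tester, backup_size_gb: int, is_cloud_manager: bool) -> dict:
    # 1. Populate the cluster: build per-loader c-s write commands and run them in threads.
    ks_name, cs_write_cmds = tester.build_snapshot_preparer_cs_write_cmd(backup_size_gb)
    tester.run_and_verify_stress_in_threads(cs_cmds=cs_write_cmds, stop_on_failure=True)

    # 2. Run the backup with an unlimited rate and wait for completion.
    mgr_cluster = tester.db_cluster.get_cluster_manager()
    location_list = ([mgr_cluster.backup_task_list[0].get_task_info_dict()["location"]]
                     if is_cloud_manager else tester.locations)
    backup_task = mgr_cluster.create_backup_task(location_list=location_list, rate_limit_list=["0"])
    backup_task.wait_and_get_final_status(timeout=200000)

    # 3. For Cloud (siren) clusters, preserve the bucket before the cluster (and bucket) is deleted.
    bucket_name = location_list[0]
    if is_cloud_manager:
        original_bucket_name = location_list[0].split(":")[-1].rstrip("'")
        bucket_name = original_bucket_name + "-manager-tests"
        tester.copy_backup_snapshot_bucket(source=original_bucket_name, destination=bucket_name)

    # 4. Report tag, size, bucket, ks_name and cluster id to Argus via send_manager_snapshot_details_to_argus.
    return {"tag": backup_task.get_snapshot_tag(), "size": backup_size_gb, "bucket": bucket_name,
            "ks_name": ks_name, "cluster_id": mgr_cluster.id}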