From 288103743c44b24961ec50345aac0de6e72edb73 Mon Sep 17 00:00:00 2001
From: Liapkovich
Date: Fri, 10 Jan 2025 11:52:52 +0100
Subject: [PATCH 1/4] feature(manager): enhance backup snapshots preparation
 procedure

The new approach makes the snapshot preparation process more flexible,
allowing the whole scope of c-s write cmd parameters to be varied - the
full list of them is provided in manager_snapshots_preparer_config.yaml.

The default values of the config yaml parameters can be overridden by
passing the SCT_MGMT_SNAPSHOTS_PREPARER_PARAMS parameter in the Jenkins
job.
---
 .../manager_snapshots_preparer_config.yaml | 23 ++++++
 mgmt_cli_test.py                           | 80 +++++++++++++++++++
 sdcm/sct_config.py                         |  7 +-
 vars/managerPipeline.groovy                | 11 +--
 4 files changed, 115 insertions(+), 6 deletions(-)
 create mode 100644 defaults/manager_snapshots_preparer_config.yaml

diff --git a/defaults/manager_snapshots_preparer_config.yaml b/defaults/manager_snapshots_preparer_config.yaml
new file mode 100644
index 0000000000..6fcf7727f7
--- /dev/null
+++ b/defaults/manager_snapshots_preparer_config.yaml
@@ -0,0 +1,23 @@
+# Description: This file contains the C-S command template and the default values of the parameters used in this command.
+# Parameter values (all except cs_cmd_template) can be overwritten; otherwise, the default values defined here are used.
+# Such a configuration is used by the Manager backup snapshots prepare test.
+
+cs_cmd_template: "cassandra-stress {operation} cl={cl} n={num_of_rows} -schema 'keyspace={ks_name} replication(strategy={replication},replication_factor={rf}) compaction(strategy={compaction})' -mode cql3 native -rate threads={threads_num} -col 'size=FIXED({col_size}) n=FIXED({col_n})' -pop seq={sequence_start}..{sequence_end}"
+
+operation: "write"
+cl: "QUORUM"
+
+replication: "NetworkTopologyStrategy"
+rf: 3
+compaction: "IncrementalCompactionStrategy"
+
+threads_num: 500
+
+col_size: 1024
+col_n: 1
+
+# Defined at runtime based on backup size, number of loaders, scylla version, etc.
+ks_name: ''
+num_of_rows: ''
+sequence_start: ''
+sequence_end: ''
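For illustration, filling the template above with the defaults and with hypothetical
runtime values (a 1 GB backup written by a single loader: num_of_rows = 1 * 1024 * 1024
= 1048576, sequence 1..1048576, and a keyspace name following the ks_name_template
introduced in mgmt_cli_test.py below, assuming scylla_version 2024.2.0) would render
a command like:

    cassandra-stress write cl=QUORUM n=1048576 -schema 'keyspace=1gb_ics_quorum_1024_1_2024_2_0 replication(strategy=NetworkTopologyStrategy,replication_factor=3) compaction(strategy=IncrementalCompactionStrategy)' -mode cql3 native -rate threads=500 -col 'size=FIXED(1024) n=FIXED(1)' -pop seq=1..1048576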
+ """ + ks_name = self.ks_name_template.format( + size=backup_size, + compaction=self._abbreviate_compaction_strategy_name(cs_cmd_params.get("compaction")), + cl=cs_cmd_params.get("cl").lower(), + col_size=cs_cmd_params.get("col_size"), + col_n=cs_cmd_params.get("col_n"), + scylla_version=re.sub(r"[-.]", "_", self.params.get("scylla_version")), + ) + return ks_name + + def calculate_rows_per_loader(self, overall_rows_num: int) -> int: + """Calculate number of rows per loader thread based on the overall number of rows and the number of loaders.""" + num_of_loaders = int(self.params.get("n_loaders")) + return int(overall_rows_num / num_of_loaders) + + def build_snapshot_preparer_cs_write_cmd(self, backup_size: int) -> tuple[str, list[str]]: + """Build the c-s command from a template, default and overwrite parameters based on backup size. + Overwrite parameters are read from prepare_backup_params which should be set in Jenkins pipeline. + + If overwrite params are missing, default values from defaults/manager_snapshots_preparer_config.yaml are used. + + Extra params complete the missing part of command template. + Among them are keyspace_name, num_of_rows, sequence_start and sequence_end. + + Returns: + - ks_name: keyspace name + - cs_cmds: list of c-s commands to be executed + """ + overall_num_of_rows = backup_size * 1024 * 1024 # Considering 1 row = 1Kb + rows_per_loader = self.calculate_rows_per_loader(overall_num_of_rows) + + preparer_config = self.get_snapshot_preparer_config() + + default_params = {key: value for key, value in preparer_config.items() if key != "cs_cmd_template"} + overwrite_params = self.params.get("mgmt_snapshots_preparer_params") or {} + extra_params = {"num_of_rows": rows_per_loader} + # Defaults are overwritten by overwrite params if the latter are provided + params_to_use_in_cs_cmd = {**default_params, **overwrite_params, **extra_params} + + ks_name = self._build_ks_name(backup_size, params_to_use_in_cs_cmd) + params_to_use_in_cs_cmd["ks_name"] = ks_name + + cs_cmds = [] + num_of_loaders = int(self.params.get("n_loaders")) + for loader_index in range(num_of_loaders): + # Sequence params should be defined for every loader thread separately since vary for each thread + params_to_use_in_cs_cmd["sequence_start"] = rows_per_loader * loader_index + 1 + params_to_use_in_cs_cmd["sequence_end"] = rows_per_loader * (loader_index + 1) + + cs_cmd_template = preparer_config.get("cs_cmd_template") + cs_cmd = cs_cmd_template.format(**params_to_use_in_cs_cmd) + cs_cmds.append(cs_cmd) + + return ks_name, cs_cmds + + class ManagerTestFunctionsMixIn( DatabaseOperations, StressLoadOperations, ClusterOperations, BucketOperations, SnapshotOperations, + SnapshotPreparerOperations, ): test_config = TestConfig() manager_test_metrics = ManagerTestMetrics() diff --git a/sdcm/sct_config.py b/sdcm/sct_config.py index 02d0b60bef..11f3b272cf 100644 --- a/sdcm/sct_config.py +++ b/sdcm/sct_config.py @@ -1202,7 +1202,12 @@ class SCTConfiguration(dict): dict(name="mgmt_prepare_snapshot_size", env="SCT_MGMT_PREPARE_SNAPSHOT_SIZE", type=int, - help="Size of backup snapshot in Gb to be prepared to be prepared for backup"), + help="Size of backup snapshot in Gb to be prepared for backup"), + + dict(name="mgmt_snapshots_preparer_params", + env="SCT_MGMT_SNAPSHOTS_PREPARER_PARAMS", type=dict_or_str_or_pydantic, + help="Custom parameter of c-s write operation used in snapshots preparer." 
+ "The whole list of parameters can be found in defaults/manager_snapshots_preparer_config.yaml"), # PerformanceRegressionTest diff --git a/vars/managerPipeline.groovy b/vars/managerPipeline.groovy index 629b57653d..2677dabb41 100644 --- a/vars/managerPipeline.groovy +++ b/vars/managerPipeline.groovy @@ -170,11 +170,12 @@ def call(Map pipelineParams) { name: 'requested_by_user') text(defaultValue: "${pipelineParams.get('extra_environment_variables', '')}", description: ( - 'Extra environment variables to be set in the test environment, uses the java Properties File Format.\n' + - 'Example:\n' + - '\tSCT_MGMT_RESTORE_EXTRA_PARAMS=--batch-size 50 --parallel 0\n' + - '\tSCT_N_DB_NODES=6' + - '\tSCT_MGMT_SKIP_POST_RESTORE_STRESS_READ=true\n' + """Extra environment variables to be set in the test environment, uses the java Properties File Format. + Example: + SCT_MGMT_RESTORE_EXTRA_PARAMS=--batch-size 50 --parallel 0 + SCT_N_DB_NODES=6 + SCT_MGMT_SKIP_POST_RESTORE_STRESS_READ=true + SCT_MGMT_SNAPSHOTS_PREPARER_PARAMS={'compaction': 'SizeTieredCompactionStrategy', 'rf': 3}""" ), name: 'extra_environment_variables') } From b377c75c4f86a805656dbed1db1f7720dcfd9645 Mon Sep 17 00:00:00 2001 From: Liapkovich Date: Fri, 10 Jan 2025 14:28:36 +0100 Subject: [PATCH 2/4] fix(manager): rework snapshot preparation helper test The process of snapshot generation is improved in a way that dataset params (which defined in c-s cmd) aren't hardcoded anymore what gives an opportunity to widely vary its properties (replication, compaction, cl, rf, etc). manager_restore_benchmark_snapshots.yaml file with snapshots config for current benchmark tests is extended to include such a parameters. These params are later used on C-S read command generation stage. Thus, snapshot preparation test becomes widely customizable with more flexibility to adjust dataset. 
The snapshot generation process is improved so that the dataset params
(which are defined in the c-s cmd) are no longer hardcoded, which makes
it possible to widely vary the dataset properties (replication,
compaction, cl, rf, etc).

The manager_restore_benchmark_snapshots.yaml file with the snapshots
config for the current benchmark tests is extended to include such
parameters. These params are later used at the C-S read command
generation stage.

Thus, the snapshot preparation test becomes widely customizable, with
more flexibility to adjust the dataset.
---
 .../manager_restore_benchmark_snapshots.yaml | 162 ++++++++++++------
 mgmt_cli_test.py                             | 100 +++++------
 2 files changed, 155 insertions(+), 107 deletions(-)

diff --git a/defaults/manager_restore_benchmark_snapshots.yaml b/defaults/manager_restore_benchmark_snapshots.yaml
index 430ca04e01..1d61529829 100644
--- a/defaults/manager_restore_benchmark_snapshots.yaml
+++ b/defaults/manager_restore_benchmark_snapshots.yaml
@@ -1,110 +1,164 @@
 bucket: "manager-backup-tests-permanent-snapshots-us-east-1"
-cs_read_cmd_template: "cassandra-stress read cl=ONE n={num_of_rows} -schema 'keyspace={keyspace_name} replication(strategy=NetworkTopologyStrategy,replication_factor=3)' -mode cql3 native -rate threads=500 -col 'size=FIXED(1024) n=FIXED(1)' -pop seq={sequence_start}..{sequence_end}"
+cs_read_cmd_template: "cassandra-stress read cl={cl} n={num_of_rows} -schema 'keyspace={keyspace_name} replication(strategy={replication},replication_factor={rf}) compaction(strategy={compaction})' -mode cql3 native -rate threads=500 -col 'size=FIXED({col_size}) n=FIXED({col_n})' -pop seq={sequence_start}..{sequence_end}"
 
 sizes: # size of backed up dataset in GB
   1gb_1t_ics:
     tag: "sm_20240812100424UTC"
-    schema:
-      keyspace1:
-        - standard1: 1
-    number_of_rows: 1073760
     exp_timeout: 1200 # 20 minutes (timeout for restore data operation)
     scylla_version: "2024.2.0-rc1"
     number_of_nodes: 3
-    compaction_strategy: "IncrementalCompactionStrategy"
     prohibit_verification_read: false
+    dataset:
+      schema:
+        keyspace1:
+          - standard1: 1
+      num_of_rows: 1073760
+      compaction: "IncrementalCompactionStrategy"
+      cl: "ONE"
+      col_size: 1024
+      col_n: 1
+      replication: "NetworkTopologyStrategy"
+      rf: 3
   500gb_1t_ics:
     tag: "sm_20240813112034UTC"
-    schema:
-      keyspace1:
-        - standard1: 500
-    number_of_rows: 524288000
     exp_timeout: 14400 # 4 hours
     scylla_version: "2024.2.0-rc1"
     number_of_nodes: 3
-    compaction_strategy: "IncrementalCompactionStrategy"
     prohibit_verification_read: false
+    dataset:
+      schema:
+        keyspace1:
+          - standard1: 500
+      num_of_rows: 524288000
+      compaction: "IncrementalCompactionStrategy"
+      cl: "ONE"
+      col_size: 1024
+      col_n: 1
+      replication: "NetworkTopologyStrategy"
+      rf: 3
   500gb_1t_ics_tablets:
    tag: "sm_20240813114617UTC"
-    schema:
-      keyspace1:
-        - standard1: 500
-    number_of_rows: 524288000
    exp_timeout: 14400 # 4 hours
    scylla_version: "2024.2.0-rc1"
    number_of_nodes: 3
-    compaction_strategy: "IncrementalCompactionStrategy"
    prohibit_verification_read: false
+    dataset:
+      schema:
+        keyspace1:
+          - standard1: 500
+      num_of_rows: 524288000
+      compaction: "IncrementalCompactionStrategy"
+      cl: "ONE"
+      col_size: 1024
+      col_n: 1
+      replication: "NetworkTopologyStrategy"
+      rf: 3
   500gb_2t_ics:
     tag: "sm_20240819203428UTC"
-    schema:
-      keyspace1:
-        - standard1: 250
-      keyspace2:
-        - standard1: 250
-    number_of_rows: 524288000
     exp_timeout: 14400 # 4 hours
     scylla_version: "2024.2.0-rc1"
     number_of_nodes: 3
-    compaction_strategy: "IncrementalCompactionStrategy"
     prohibit_verification_read: true
+    dataset:
+      schema:
+        keyspace1:
+          - standard1: 250
+        keyspace2:
+          - standard1: 250
+      num_of_rows: 524288000
+      compaction: "IncrementalCompactionStrategy"
+      cl: "ONE"
+      col_size: 1024
+      col_n: 1
+      replication: "NetworkTopologyStrategy"
+      rf: 3
   1tb_1t_ics:
     tag: "sm_20240814180009UTC"
-    schema:
-      keyspace1:
-        - standard1: 1024
-    number_of_rows: 1073741824
     exp_timeout: 28800 # 8 hours
     scylla_version: "2024.2.0-rc1"
     number_of_nodes: 3
-    compaction_strategy: "IncrementalCompactionStrategy"
     prohibit_verification_read: false
+    dataset:
+      schema:
+        keyspace1:
+          - standard1: 1024
+      num_of_rows: 1073741824
+      compaction: "IncrementalCompactionStrategy"
+      cl: "ONE"
+      col_size: 1024
+      col_n: 1
+      replication: "NetworkTopologyStrategy"
+      rf: 3
  1tb_4t_twcs:
     tag: "sm_20240821145503UTC"
-    schema:
-      keyspace1:
-        - t_10gb: 10
-        - t_90gb: 90
-        - t_300gb: 300
-        - t_600gb: 600
-    number_of_rows: 428571429
     exp_timeout: 28800 # 8 hours
     scylla_version: "2024.2.0-rc1"
     number_of_nodes: 3
-    compaction_strategy: "TimeWindowCompactionStrategy"
     prohibit_verification_read: true
+    dataset:
+      schema:
+        keyspace1:
+          - t_10gb: 10
+          - t_90gb: 90
+          - t_300gb: 300
+          - t_600gb: 600
+      num_of_rows: 428571429
+      compaction: "TimeWindowCompactionStrategy"
+      cl:
+      col_size:
+      col_n:
+      replication: "NetworkTopologyStrategy"
+      rf: 3
  1tb_2t_twcs:
     tag: "sm_20240827191125UTC"
-    schema:
-      keyspace1:
-        - t_300gb: 300
-        - t_700gb: 700
-    number_of_rows: 428571429
     exp_timeout: 28800 # 8 hours
     scylla_version: "2024.2.0-rc1"
     number_of_nodes: 9
-    compaction_strategy: "TimeWindowCompactionStrategy"
     prohibit_verification_read: true
+    dataset:
+      schema:
+        keyspace1:
+          - t_300gb: 300
+          - t_700gb: 700
+      num_of_rows: 428571429
+      compaction: "TimeWindowCompactionStrategy"
+      cl:
+      col_size:
+      col_n:
+      replication: "NetworkTopologyStrategy"
+      rf: 3
  1.5tb_2t_ics:
     tag: "sm_20240820180152UTC"
-    schema:
-      keyspace1:
-        - standard1: 500
-      keyspace2:
-        - standard1: 1024
-    number_of_rows: 1598029824
     exp_timeout: 43200 # 12 hours
     scylla_version: "2024.2.0-rc1"
     number_of_nodes: 3
-    compaction_strategy: "IncrementalCompactionStrategy"
     prohibit_verification_read: true
+    dataset:
+      schema:
+        keyspace1:
+          - standard1: 500
+        keyspace2:
+          - standard1: 1024
+      num_of_rows: 1598029824
+      compaction: "IncrementalCompactionStrategy"
+      cl: "ONE"
+      col_size: 1024
+      col_n: 1
+      replication: "NetworkTopologyStrategy"
+      rf: 3
   2tb_1t_ics:
     tag: "sm_20240816185129UTC"
-    schema:
-      keyspace1:
-        - standard1: 2048
-    number_of_rows: 2147483648
     exp_timeout: 57600 # 16 hours
     scylla_version: "2024.2.0-rc1"
     number_of_nodes: 3
-    compaction_strategy: "IncrementalCompactionStrategy"
     prohibit_verification_read: false
+    dataset:
+      schema:
+        keyspace1:
+          - standard1: 2048
+      num_of_rows: 2147483648
+      compaction: "IncrementalCompactionStrategy"
+      cl: "ONE"
+      col_size: 1024
+      col_n: 1
+      replication: "NetworkTopologyStrategy"
+      rf: 3
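For illustration, rendering the cs_read_cmd_template above with the 1gb_1t_ics
dataset (and hypothetical runtime values for a single loader, i.e. num_of_rows
equal to the whole dataset and the sequence 1..1073760) would produce:

    cassandra-stress read cl=ONE n=1073760 -schema 'keyspace=keyspace1 replication(strategy=NetworkTopologyStrategy,replication_factor=3) compaction(strategy=IncrementalCompactionStrategy)' -mode cql3 native -rate threads=500 -col 'size=FIXED(1024) n=FIXED(1)' -pop seq=1..1073760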
diff --git a/mgmt_cli_test.py b/mgmt_cli_test.py
index 96da3d28be..0d7c2ab3ca 100644
--- a/mgmt_cli_test.py
+++ b/mgmt_cli_test.py
@@ -70,21 +70,21 @@ class SnapshotData:
     - bucket: S3 bucket name
     - tag: snapshot tag, for example 'sm_20240816185129UTC'
     - exp_timeout: expected timeout for the restore operation
+    - dataset: dict with snapshot dataset details such as cl, replication, schema, etc.
     - keyspaces: list of keyspaces presented in backup
     - cs_read_cmd_template: cassandra-stress read command template
     - prohibit_verification_read: if True, the verification read will be prohibited. Most likely, such a backup
       was created via c-s user profile.
-    - number_of_rows: number of data rows written in the DB
     - node_ids: list of node ids where backup was created
     """
     bucket: str
     tag: str
     exp_timeout: int
+    dataset: dict[str, str | int | dict]
     keyspaces: list[str]
     ks_tables_map: dict[str, list[str]]
     cs_read_cmd_template: str
     prohibit_verification_read: bool
-    number_of_rows: int
    node_ids: list[str]
 
 
@@ -257,19 +257,12 @@ def run_verification_read_stress(self, ks_names=None):
         stress_run_time = datetime.now() - stress_start_time
         InfoEvent(message=f'The read stress run was completed. Total run time: {stress_run_time}').publish()
 
-    def prepare_run_and_verify_stress_in_threads(self, cmd_template: str, keyspace_name: str, num_of_rows: int,
-                                                 stop_on_failure: bool = False) -> None:
-        """Prepares C-S commands, runs them in threads and verifies their execution results.
+    def run_and_verify_stress_in_threads(self, cs_cmds: list[str], stop_on_failure: bool = False) -> None:
+        """Runs C-S commands in threads and verifies their execution results.
 
         Stress operation can be either read or write, depending on the cmd_template.
         """
         stress_queue = []
-        num_of_loaders = self.params.get("n_loaders")
-        rows_per_loader = int(num_of_rows / num_of_loaders)
-        for loader_index in range(num_of_loaders):
-            stress_cmd = cmd_template.format(num_of_rows=rows_per_loader,
-                                             keyspace_name=keyspace_name,
-                                             sequence_start=rows_per_loader * loader_index + 1,
-                                             sequence_end=rows_per_loader * (loader_index + 1),)
+        for stress_cmd in cs_cmds:
             _thread = self.run_stress_thread(stress_cmd=stress_cmd, round_robin=True,
                                              stop_test_on_failure=stop_on_failure)
             stress_queue.append(_thread)
@@ -277,26 +270,6 @@ def prepare_run_and_verify_stress_in_threads(self, cmd_template: str, keyspace_n
         for _thread in stress_queue:
             assert self.verify_stress_thread(cs_thread_pool=_thread), "Stress thread verification failed"
 
-    @staticmethod
-    def extract_compaction_strategy_from_cs_cmd(cs_cmd: str, lower: bool = True, remove_postfix: bool = True) -> str:
-        """Extracts the compaction strategy from the cassandra-stress command.
-
-        :param cs_cmd: cassandra-stress command
-        :param lower: if True, the resulting string will be lowercased
-        :param remove_postfix: if True, the resulting string will have the "CompactionStrategy" postfix removed
-        """
-        match = re.search(r"compaction\(strategy=([^)]+)\)", cs_cmd)
-        if match:
-            strategy = match.group(1)
-            if remove_postfix:
-                strategy = re.sub(r"CompactionStrategy$", "", strategy)
-            if lower:
-                strategy = strategy.lower()
-        else:
-            raise ValueError("Compaction strategy not found in cs_cmd.")
-
-        return strategy
-
 
 class ClusterOperations(ClusterTester):
     CLUSTER_NAME = "mgr_cluster1"
@@ -406,7 +379,7 @@ def get_snapshot_data(snapshot_name: str) -> SnapshotData:
         raise ValueError(f"Snapshot data for size '{snapshot_name}'GB was not found in the {snapshots_config} file")
 
     ks_tables_map = {}
-    for ks, ts in snapshot_dict["schema"].items():
+    for ks, ts in snapshot_dict["dataset"]["schema"].items():
         t_names = [list(t.keys())[0] for t in ts]
         ks_tables_map[ks] = t_names
 
@@ -414,11 +387,11 @@
         bucket=all_snapshots_dict["bucket"],
         tag=snapshot_dict["tag"],
         exp_timeout=snapshot_dict["exp_timeout"],
-        keyspaces=list(snapshot_dict["schema"].keys()),
+        dataset=snapshot_dict["dataset"],
+        keyspaces=list(snapshot_dict["dataset"]["schema"].keys()),
         ks_tables_map=ks_tables_map,
         cs_read_cmd_template=all_snapshots_dict["cs_read_cmd_template"],
         prohibit_verification_read=snapshot_dict["prohibit_verification_read"],
-        number_of_rows=snapshot_dict["number_of_rows"],
         node_ids=snapshot_dict.get("node_ids"),
     )
     return snapshot_data
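A quick self-contained illustration (values borrowed from the 1gb_1t_ics entry
above) of how the nested dataset schema is flattened into ks_tables_map:

    schema = {"keyspace1": [{"standard1": 1}]}  # each entry maps a table name to its size in GB
    ks_tables_map = {}
    for ks, ts in schema.items():
        ks_tables_map[ks] = [list(t.keys())[0] for t in ts]  # keep only the table names
    assert ks_tables_map == {"keyspace1": ["standard1"]}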
@@ -549,6 +522,40 @@ def build_snapshot_preparer_cs_write_cmd(self, backup_size: int) -> tuple[str, l
 
         return ks_name, cs_cmds
 
+    def build_cs_read_cmd_from_snapshot_details(self, snapshot: SnapshotData) -> list[str]:
+        """Define a list of cassandra-stress read commands from the snapshot (dataset) details.
+
+        The C-S read command template and snapshot details are defined in defaults/manager_restore_benchmark_snapshots.yaml.
+        The number of commands is equal to the number of loaders defined in the test parameters.
+        """
+        dataset = snapshot.dataset
+
+        rows_per_loader = self.calculate_rows_per_loader(overall_rows_num=dataset["num_of_rows"])
+        num_of_loaders = int(self.params.get("n_loaders"))
+
+        cs_cmds = []
+        cs_cmd_template = snapshot.cs_read_cmd_template
+
+        for loader_index in range(num_of_loaders):
+            sequence_start = rows_per_loader * loader_index + 1
+            sequence_end = rows_per_loader * (loader_index + 1)
+
+            cs_cmd = cs_cmd_template.format(
+                cl=dataset["cl"],
+                num_of_rows=rows_per_loader,
+                keyspace_name=snapshot.keyspaces[0],
+                replication=dataset["replication"],
+                rf=dataset["rf"],
+                compaction=dataset["compaction"],
+                col_size=dataset["col_size"],
+                col_n=dataset["col_n"],
+                sequence_start=sequence_start,
+                sequence_end=sequence_end,
+            )
+            cs_cmds.append(cs_cmd)
+
+        return cs_cmds
+
 
 class ManagerTestFunctionsMixIn(
     DatabaseOperations,
@@ -1366,20 +1373,8 @@ def test_prepare_backup_snapshot(self):
         backup_size = self.params.get("mgmt_prepare_snapshot_size")  # in Gb
         assert backup_size and backup_size >= 1, "Backup size must be at least 1Gb"
 
-        backend = self.params.get("cluster_backend")
-        cs_read_cmd_template = get_persistent_snapshots()[backend]["confirmation_stress_template"]
-        cs_write_cmd_template = cs_read_cmd_template.replace(" read ", " write ")
-
-        compaction = self.extract_compaction_strategy_from_cs_cmd(cs_write_cmd_template)
-        scylla_version = re.sub(r"[-.]", "_", self.params.get("scylla_version"))
-        keyspace_name = f"{backup_size}gb_{compaction}_{scylla_version}"
-
-        self.prepare_run_and_verify_stress_in_threads(
-            cmd_template=cs_write_cmd_template,
-            keyspace_name=keyspace_name,
-            num_of_rows=backup_size * 1024 * 1024,  # Considering 1 row = 1Kb
-            stop_on_failure=True,
-        )
+        ks_name, cs_write_cmds = self.build_snapshot_preparer_cs_write_cmd(backup_size)
+        self.run_and_verify_stress_in_threads(cs_cmds=cs_write_cmds, stop_on_failure=True)
 
         self.log.info("Initialize Scylla Manager")
         manager_tool = mgmt.get_scylla_manager_tool(manager_node=self.monitors.nodes[0])
@@ -1394,7 +1389,7 @@
         self.log.info("Log snapshot details")
         self.log.info(
             f"Snapshot tag: {backup_task.get_snapshot_tag()}\n"
-            f"Keyspace name: {keyspace_name}\n"
+            f"Keyspace name: {ks_name}\n"
             f"Bucket: {self.locations}\n"
             f"Cluster id: {mgr_cluster.id}\n"
         )
@@ -1679,9 +1674,8 @@ def test_restore_from_precreated_backup(self, snapshot_name: str, restore_outsid
 
         if not (self.params.get('mgmt_skip_post_restore_stress_read') or snapshot_data.prohibit_verification_read):
             self.log.info("Running verification read stress")
-            self.prepare_run_and_verify_stress_in_threads(cmd_template=snapshot_data.cs_read_cmd_template,
-                                                          keyspace_name=snapshot_data.keyspaces[0],
-                                                          num_of_rows=snapshot_data.number_of_rows)
+            cs_verify_cmds = self.build_cs_read_cmd_from_snapshot_details(snapshot_data)
+            self.run_and_verify_stress_in_threads(cs_cmds=cs_verify_cmds)
         else:
             self.log.info("Skipping verification read stress because of the test or snapshot configuration")

From 1550cd2d939b0f34b50e73b4e37ae4cd106f4772 Mon Sep 17 00:00:00 2001
From: Liapkovich
Date: Fri, 10 Jan 2025 16:21:25 +0100
Subject: [PATCH 3/4] feature(manager): support cloud clusters in prepare
 snapshot helper

An extension for test_prepare_backup_snapshot that allows the test to run
either against a standard cluster created via SCT or against clusters
created via siren-tests (Cloud cluster).
Changes:
- backup location: for a cloud cluster, it should be taken from the
  default backup task automatically scheduled by Manager.
- copy the s3 bucket for a cloud cluster, since the original bucket is
  deleted together with the cluster during cluster removal.
---
 mgmt_cli_test.py | 64 ++++++++++++++++++++++++++++++++++++++++++++----
 1 file changed, 59 insertions(+), 5 deletions(-)

diff --git a/mgmt_cli_test.py b/mgmt_cli_test.py
index 0d7c2ab3ca..7eea78ad88 100644
--- a/mgmt_cli_test.py
+++ b/mgmt_cli_test.py
@@ -40,7 +40,7 @@
 from sdcm.mgmt.cli import ScyllaManagerTool, RestoreTask
 from sdcm.mgmt.common import reconfigure_scylla_manager, get_persistent_snapshots
 from sdcm.provision.helpers.certificate import TLSAssets
-from sdcm.remote import shell_script_cmd
+from sdcm.remote import shell_script_cmd, LOCALRUNNER
 from sdcm.tester import ClusterTester
 from sdcm.cluster import TestConfig
 from sdcm.nemesis import MgmtRepair
@@ -364,6 +364,43 @@ def download_from_azure(self, node, source, destination):
         source = f"{source.replace('azure://', self.backup_azure_blob_service)}{self.backup_azure_blob_sas}"
         node.remoter.sudo(f"azcopy copy '{source}' '{destination}'")
 
+    @staticmethod
+    def create_s3_bucket_aws(name: str, region: str) -> None:
+        LOCALRUNNER.run(f"aws s3 mb s3://{name} --region {region}")
+
+    @staticmethod
+    def create_s3_bucket_gce(name: str, region: str) -> None:
+        LOCALRUNNER.run(f"gsutil mb -l {region} gs://{name}")
+
+    @staticmethod
+    def sync_s3_buckets_aws(source: str, destination: str, acl: str = 'bucket-owner-full-control') -> None:
+        LOCALRUNNER.run(f"aws s3 sync s3://{source} s3://{destination} --acl {acl}")
+
+    @staticmethod
+    def sync_s3_buckets_gce(source: str, destination: str) -> None:
+        LOCALRUNNER.run(f"gsutil -m rsync -r gs://{source} gs://{destination}")
+
+    def copy_backup_snapshot_bucket(self, source: str, destination: str) -> None:
+        """Copy a bucket with Manager backup snapshots.
+
+        The process consists of two stages - new bucket creation and data sync (original bucket -> newly created).
+        The main use case is to make a copy of a bucket created in a test with a Cloud (siren) cluster, since siren
+        deletes the bucket together with the cluster. Thus, if there is a goal to reuse the backup snapshot of such
+        a cluster afterward, it should be copied to a new bucket.
+
+        Only AWS and GCE backends are supported.
+        """
+        cluster_backend = self.params.get("cluster_backend")
+        region = next(iter(self.params.region_names), '')
+
+        if cluster_backend == "aws":
+            self.create_s3_bucket_aws(name=destination, region=region)
+            self.sync_s3_buckets_aws(source=source, destination=destination)
+        elif cluster_backend == "gce":
+            self.create_s3_bucket_gce(name=destination, region=region)
+            self.sync_s3_buckets_gce(source=source, destination=destination)
+        else:
+            raise ValueError(f"Unsupported cluster backend - {cluster_backend}, should be either aws or gce")
+
 
 class SnapshotOperations(ClusterTester):
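As a usage sketch of the helper above (the bucket names are the examples from the
code comments further down in this patch, not real resources):

    # Preserve a siren-created backup bucket before the cloud cluster (and its bucket) is deleted
    self.copy_backup_snapshot_bucket(
        source="scylla-cloud-backup-8072-7216-v5dn53",
        destination="scylla-cloud-backup-8072-7216-v5dn53-manager-tests",
    )

On AWS this expands to "aws s3 mb" followed by "aws s3 sync"; on GCE to
"gsutil mb" followed by "gsutil -m rsync -r".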
""" + is_cloud_manager = self.params.get("use_cloud_manager") + self.log.info("Populate the cluster with data") backup_size = self.params.get("mgmt_prepare_snapshot_size") # in Gb assert backup_size and backup_size >= 1, "Backup size must be at least 1Gb" @@ -1377,20 +1416,35 @@ def test_prepare_backup_snapshot(self): self.run_and_verify_stress_in_threads(cs_cmds=cs_write_cmds, stop_on_failure=True) self.log.info("Initialize Scylla Manager") - manager_tool = mgmt.get_scylla_manager_tool(manager_node=self.monitors.nodes[0]) - mgr_cluster = self.ensure_and_get_cluster(manager_tool) + mgr_cluster = self.db_cluster.get_cluster_manager() + + self.log.info("Define backup location") + if is_cloud_manager: + # Extract location from automatically scheduled backup task + auto_backup_task = mgr_cluster.backup_task_list[0] + location_list = [auto_backup_task.get_task_info_dict()["location"]] + else: + location_list = self.locations self.log.info("Run backup and wait for it to finish") - backup_task = mgr_cluster.create_backup_task(location_list=self.locations, rate_limit_list=["0"]) + backup_task = mgr_cluster.create_backup_task(location_list=location_list, rate_limit_list=["0"]) backup_task_status = backup_task.wait_and_get_final_status(timeout=200000) assert backup_task_status == TaskStatus.DONE, \ f"Backup task ended in {backup_task_status} instead of {TaskStatus.DONE}" + if is_cloud_manager: + self.log.info("Copy bucket with snapshot since the original bucket is deleted together with cluster") + # from ["'AWS_US_EAST_1:s3:scylla-cloud-backup-8072-7216-v5dn53'"] to scylla-cloud-backup-8072-7216-v5dn53 + original_bucket_name = location_list[0].split(":")[-1].rstrip("'") + bucket_name = original_bucket_name + "-manager-tests" + + self.copy_backup_snapshot_bucket(source=original_bucket_name, destination=bucket_name) + self.log.info("Log snapshot details") self.log.info( f"Snapshot tag: {backup_task.get_snapshot_tag()}\n" f"Keyspace name: {ks_name}\n" - f"Bucket: {self.locations}\n" + f"Bucket: {location_list}\n" f"Cluster id: {mgr_cluster.id}\n" ) From 58b8047cc2ace91926d7705051b92e708fbdbe67 Mon Sep 17 00:00:00 2001 From: Liapkovich Date: Tue, 14 Jan 2025 19:53:22 +0100 Subject: [PATCH 4/4] fix(manager): send snapshot details to Argus Results instead of logging To improve usability of the snapshots preparer tool, generated backup snapshot details will be sent to Argus Results, where they can be taken. It eliminates the need to browse the logfile looking for details. 
---
 mgmt_cli_test.py      | 27 +++++++++++++++++----------
 sdcm/argus_results.py | 21 +++++++++++++++++++++
 2 files changed, 38 insertions(+), 10 deletions(-)

diff --git a/mgmt_cli_test.py b/mgmt_cli_test.py
index 7eea78ad88..ce0a9a559f 100644
--- a/mgmt_cli_test.py
+++ b/mgmt_cli_test.py
@@ -34,8 +34,8 @@
 from argus.client.generic_result import Status
 
 from sdcm import mgmt
-from sdcm.argus_results import send_manager_benchmark_results_to_argus, submit_results_to_argus, \
-    ManagerBackupReadResult, ManagerBackupBenchmarkResult
+from sdcm.argus_results import (send_manager_benchmark_results_to_argus, send_manager_snapshot_details_to_argus,
+                                submit_results_to_argus, ManagerBackupReadResult, ManagerBackupBenchmarkResult)
 from sdcm.mgmt import ScyllaManagerError, TaskStatus, HostStatus, HostSsl, HostRestStatus
 from sdcm.mgmt.cli import ScyllaManagerTool, RestoreTask
 from sdcm.mgmt.common import reconfigure_scylla_manager, get_persistent_snapshots
@@ -1437,15 +1437,22 @@ def test_prepare_backup_snapshot(self):
             # from ["'AWS_US_EAST_1:s3:scylla-cloud-backup-8072-7216-v5dn53'"] to scylla-cloud-backup-8072-7216-v5dn53
             original_bucket_name = location_list[0].split(":")[-1].rstrip("'")
             bucket_name = original_bucket_name + "-manager-tests"
-            self.copy_backup_snapshot_bucket(source=original_bucket_name, destination=bucket_name)
-
-        self.log.info("Log snapshot details")
-        self.log.info(
-            f"Snapshot tag: {backup_task.get_snapshot_tag()}\n"
-            f"Keyspace name: {ks_name}\n"
-            f"Bucket: {location_list}\n"
-            f"Cluster id: {mgr_cluster.id}\n"
+            self.copy_backup_snapshot_bucket(source=original_bucket_name, destination=bucket_name)
+        else:
+            bucket_name = location_list[0]
+
+        self.log.info("Send snapshot details to Argus")
+        snapshot_details = {
+            "tag": backup_task.get_snapshot_tag(),
+            "size": backup_size,
+            "bucket": bucket_name,
+            "ks_name": ks_name,
+            "scylla_version": self.params.get("scylla_version"),
+            "cluster_id": mgr_cluster.id,
+        }
+        send_manager_snapshot_details_to_argus(
+            argus_client=self.test_config.argus_client(),
+            snapshot_details=snapshot_details,
         )

diff --git a/sdcm/argus_results.py b/sdcm/argus_results.py
index d3e0ee3b9f..88bf3ceac7 100644
--- a/sdcm/argus_results.py
+++ b/sdcm/argus_results.py
@@ -135,6 +135,20 @@ class Meta:
     ]
 
 
+class ManagerSnapshotDetails(GenericResultTable):
+    class Meta:
+        name = "Snapshot details"
+        description = "Details of Manager snapshots pre-created for later use in restore tests"
+        Columns = [
+            ColumnMetadata(name="tag", unit="", type=ResultType.TEXT),
+            ColumnMetadata(name="size", unit="GB", type=ResultType.INTEGER),
+            ColumnMetadata(name="bucket", unit="", type=ResultType.TEXT),
+            ColumnMetadata(name="ks_name", unit="", type=ResultType.TEXT),
+            ColumnMetadata(name="cluster_id", unit="", type=ResultType.TEXT),
+            ColumnMetadata(name="scylla_version", unit="", type=ResultType.TEXT),
+        ]
+
+
 workload_to_table = {
     "mixed": LatencyCalculatorMixedResult,
     "write": LatencyCalculatorWriteResult,
@@ -257,3 +271,10 @@ def send_manager_benchmark_results_to_argus(argus_client: ArgusClient, result: d
         for key, value in result.items():
             result_table.add_result(column=key, row=row_name, value=value, status=Status.UNSET)
     submit_results_to_argus(argus_client, result_table)
+
+
+def send_manager_snapshot_details_to_argus(argus_client: ArgusClient, snapshot_details: dict) -> None:
+    result_table = ManagerSnapshotDetails()
+    for key, value in snapshot_details.items():
+        result_table.add_result(column=key, row="#1", value=value, status=Status.UNSET)
+    submit_results_to_argus(argus_client, result_table)
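For reference, the submitted details surface in Argus as a one-row "Snapshot details"
table roughly like this (hypothetical values; the tag format matches the snapshots
yaml above, and the keyspace name follows ks_name_template from patch 1):

    row | tag                  | size (GB) | bucket                                              | ks_name                        | cluster_id | scylla_version
    #1  | sm_20250110115252UTC | 1         | manager-backup-tests-permanent-snapshots-us-east-1  | 1gb_ics_quorum_1024_1_2024_2_0 | <uuid>     | 2024.2.0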