diff --git a/data_dir/templated-elasticity-tables.yaml b/data_dir/templated-elasticity-tables.yaml index 0894b82792..e35c076444 100644 --- a/data_dir/templated-elasticity-tables.yaml +++ b/data_dir/templated-elasticity-tables.yaml @@ -59,5 +59,5 @@ queries: fields: samerow # Run stress -# cassandra-stress user profile={} 'ops(insert=1)' cl=QUORUM n=2572262 -rate threads=1 -# cassandra-stress user profile={} 'ops(read1=1)' cl=QUORUM n=1286128 -rate threads=1 +# cassandra-stress user profile={} 'ops(insert=1)' cl=QUORUM n=1495501 -rate threads=1 +# cassandra-stress user profile={} 'ops(read1=1)' cl=QUORUM n=747748 -rate threads=1 diff --git a/longevity_test.py b/longevity_test.py index 87ccfdef7f..b17c8af562 100644 --- a/longevity_test.py +++ b/longevity_test.py @@ -35,7 +35,7 @@ from sdcm.utils.common import skip_optional_stage from sdcm.utils.decorators import optional_stage from sdcm.utils.operations_thread import ThreadParams -from sdcm.sct_events.system import InfoEvent +from sdcm.sct_events.system import InfoEvent, TestFrameworkEvent from sdcm.sct_events import Severity from sdcm.cluster import MAX_TIME_WAIT_FOR_NEW_NODE_UP @@ -269,10 +269,18 @@ def test_user_batch_custom_time(self): :param batch_size: Number of stress commands to run together in a batch. """ + customer_profiles = self.params.get('cs_user_profiles') + if not customer_profiles: + TestFrameworkEvent(source="test_user_batch_custom_time", + message="The Longevity test_user_batch_custom_time cannot run without 'cs_user_profiles' parameter. aborting.", + severity=Severity.ERROR).publish() + return self.db_cluster.add_nemesis(nemesis=self.get_nemesis_class(), tester_obj=self) batch_size = self.params.get('batch_size') - + user_profile_table_count = self.params.get('user_profile_table_count') # pylint: disable=invalid-name + # Creating extra table during batch only for a high number of tables (>500). + create_extra_tables = True if int(user_profile_table_count) > 500 else False if not self.params.get('reuse_cluster'): self._pre_create_templated_user_schema() @@ -290,35 +298,39 @@ def test_user_batch_custom_time(self): stress_params_list = [] - customer_profiles = self.params.get('cs_user_profiles') - templated_table_counter = itertools.count() - if customer_profiles: - cs_duration = self.params.get('cs_duration') - duration = int(cs_duration.translate(str.maketrans('', '', string.ascii_letters))) - - for cs_profile in customer_profiles: - cs_profile = sct_abs_path(cs_profile) # noqa: PLW2901 - assert os.path.exists(cs_profile), 'File not found: {}'.format(cs_profile) - self.log.debug('Run stress test with user profile {}, duration {}'.format(cs_profile, cs_duration)) - - user_profile_table_count = self.params.get('user_profile_table_count') # pylint: disable=invalid-name - - for _ in range(user_profile_table_count): - stress_params_list += self.create_templated_user_stress_params(next(templated_table_counter), - cs_profile) - if not skip_optional_stage('main_load'): - self._run_user_stress_in_batches(batch_size=batch_size, - stress_params_list=stress_params_list, duration=duration) + cs_duration = self.params.get('cs_duration') + try: + duration = int(cs_duration.translate(str.maketrans('', '', string.ascii_letters))) if cs_duration else None + except ValueError: + raise ValueError(f"Invalid cs_duration format: {cs_duration}") + + for cs_profile in customer_profiles: + cs_profile = sct_abs_path(cs_profile) # noqa: PLW2901 + assert os.path.exists(cs_profile), 'File not found: {}'.format(cs_profile) + msg = f"Run stress test with user profile {cs_profile}" + msg += f", duration {cs_duration}" if cs_duration else ", no duration parameter" + self.log.debug(msg) + + for _ in range(user_profile_table_count): + stress_params_list += self.create_templated_user_stress_params(next(templated_table_counter), + cs_profile) + if not skip_optional_stage('main_load'): + self.log.debug("Starting stress in batches of: %d with %d stress commands", + batch_size, len(stress_params_list)) + self._run_user_stress_in_batches(batch_size=batch_size, + stress_params_list=stress_params_list, duration=duration, + create_extra_tables=create_extra_tables) - def _run_user_stress_in_batches(self, batch_size, stress_params_list, duration): + def _run_user_stress_in_batches(self, batch_size, stress_params_list, duration, create_extra_tables: bool = True): """ run user profile in batches, while adding 4 stress-commands which are not with precreated tables and wait for them to finish :param batch_size: size of the batch :param stress_params_list: the list of all stress commands + :param create_extra_tables: Creating an extra table during batch. :return: """ # pylint: disable=too-many-locals @@ -326,23 +338,28 @@ def _run_user_stress_in_batches(self, batch_size, stress_params_list, duration): def chunks(_list, chunk_size): """Yield successive n-sized chunks from _list.""" for i in range(0, len(_list), chunk_size): - yield _list[i:i + chunk_size], i, i+chunk_size, len(_list) + i * 2 + yield _list[i:i + chunk_size], len(_list) + i * 2 + + all_batches = list(chunks(stress_params_list, batch_size)) + total_batches = len(all_batches) - for batch, _, _, extra_tables_idx in list(chunks(stress_params_list, batch_size)): + for batch_number, (batch, extra_tables_idx) in enumerate(all_batches, start=1): + self.log.info(f"Starting batch {batch_number} out of {total_batches}") stress_queue = [] batch_params = dict(duration=duration, round_robin=True, stress_cmd=[]) - # add few stress threads with tables that weren't pre-created - customer_profiles = self.params.get('cs_user_profiles') - for cs_profile in customer_profiles: - cs_profile = sct_abs_path(cs_profile) # noqa: PLW2901 - # for now we'll leave to just one fresh table, to kick schema update - num_of_newly_created_tables = 1 - self._pre_create_templated_user_schema(batch_start=extra_tables_idx, - batch_end=extra_tables_idx+num_of_newly_created_tables) - for i in range(num_of_newly_created_tables): - batch += self.create_templated_user_stress_params(extra_tables_idx + i, cs_profile=cs_profile) # noqa: PLW2901 + if create_extra_tables: + # add few stress threads with tables that weren't pre-created + customer_profiles = self.params.get('cs_user_profiles') + for cs_profile in customer_profiles: + cs_profile = sct_abs_path(cs_profile) # noqa: PLW2901 + # for now we'll leave to just one fresh table, to kick schema update + num_of_newly_created_tables = 1 + self._pre_create_templated_user_schema(batch_start=extra_tables_idx, + batch_end=extra_tables_idx+num_of_newly_created_tables) + for i in range(num_of_newly_created_tables): + batch += self.create_templated_user_stress_params(extra_tables_idx + i, cs_profile=cs_profile) # noqa: PLW2901 nodes_ips = self.all_node_ips_for_stress_command for params in batch: @@ -513,7 +530,11 @@ def create_templated_user_stress_params(self, idx, cs_profile) -> List[Dict]: # # example: # cassandra-stress user profile={} cl=QUORUM 'ops(insert=1)' duration={} -rate threads=100 -pop 'dist=gauss(0..1M)' for cmd in [line.lstrip('#').strip() for line in cont if line.find('cassandra-stress') > 0]: - stress_cmd = cmd.format(profile_dst, cs_duration) + # Use a conditional tuple to handle optional formatting arguments for c-s duration parameter. + # For example, without a duration: cassandra-stress user profile={} 'ops(insert=1)' cl=QUORUM n=2572262 -rate threads=1 + # Or with a duration: cassandra-stress user profile={} 'ops(insert=1)' cl=QUORUM duration={} -rate threads=1 + args = (profile_dst, cs_duration) if cs_duration else (profile_dst,) + stress_cmd = cmd.format(*args) params = {'stress_cmd': stress_cmd, 'profile': profile_dst} self.log.debug('Stress cmd: {}'.format(stress_cmd)) params_list.append(params) diff --git a/test-cases/scale/longevity-elasticity-many-small-tables.yaml b/test-cases/scale/longevity-elasticity-many-small-tables.yaml index ff165af36f..f2d3d220c2 100644 --- a/test-cases/scale/longevity-elasticity-many-small-tables.yaml +++ b/test-cases/scale/longevity-elasticity-many-small-tables.yaml @@ -9,13 +9,13 @@ test_duration: 600 -cs_duration: '0m' cs_user_profiles: - data_dir/templated-elasticity-tables.yaml pre_create_schema: true user_profile_table_count: 500 batch_size: 125 +cs_duration: '' n_loaders: 5 n_monitor_nodes: 1