Commit
improvement(test_user_batch_custom_time): refactor code to support 90% utilization scenario

The test_user_batch_custom_time test is fixed and improved so that it can run both the 5000-tables scenario
and a smaller scenario, such as 500 tables, for testing 90% utilization with many small tables.
yarongilor committed Jan 22, 2025
1 parent fff1bd5 commit 348ad37
Showing 3 changed files with 59 additions and 38 deletions.
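Note (not part of the commit): a minimal Python sketch of how the two scenarios from the description differ, assuming the batch_size of 125 and user_profile_table_count of 500 from the updated test config below; the >500 threshold mirrors the new create_extra_tables flag added in longevity_test.py.

# Minimal sketch, not part of the commit: how the two scenarios differ,
# assuming batch_size=125 as in the updated test config below.
import math

def describe_scenario(user_profile_table_count: int, batch_size: int = 125) -> str:
    # Mirrors the new flag: extra tables are only created during batches
    # when the table count is high (> 500).
    create_extra_tables = user_profile_table_count > 500
    batches = math.ceil(user_profile_table_count / batch_size)
    return (f"{user_profile_table_count} tables -> {batches} batches of up to "
            f"{batch_size}, extra tables per batch: {create_extra_tables}")

print(describe_scenario(5000))  # large-scale scenario -> 40 batches, with extra tables
print(describe_scenario(500))   # 90% utilization, many small tables -> 4 batches, no extra tables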
4 changes: 2 additions & 2 deletions data_dir/templated-elasticity-tables.yaml
@@ -59,5 +59,5 @@ queries:
fields: samerow

# Run stress
# cassandra-stress user profile={} 'ops(insert=1)' cl=QUORUM n=2572262 -rate threads=1
# cassandra-stress user profile={} 'ops(read1=1)' cl=QUORUM n=1286128 -rate threads=1
# cassandra-stress user profile={} 'ops(insert=1)' cl=QUORUM n=1495501 -rate threads=1
# cassandra-stress user profile={} 'ops(read1=1)' cl=QUORUM n=747748 -rate threads=1
91 changes: 56 additions & 35 deletions longevity_test.py
@@ -35,7 +35,7 @@
from sdcm.utils.common import skip_optional_stage
from sdcm.utils.decorators import optional_stage
from sdcm.utils.operations_thread import ThreadParams
from sdcm.sct_events.system import InfoEvent
from sdcm.sct_events.system import InfoEvent, TestFrameworkEvent
from sdcm.sct_events import Severity
from sdcm.cluster import MAX_TIME_WAIT_FOR_NEW_NODE_UP

@@ -269,10 +269,18 @@ def test_user_batch_custom_time(self):
:param batch_size: Number of stress commands to run together in a batch.
"""

customer_profiles = self.params.get('cs_user_profiles')
if not customer_profiles:
TestFrameworkEvent(source="test_user_batch_custom_time",
message="The Longevity test_user_batch_custom_time cannot run without 'cs_user_profiles' parameter. aborting.",
severity=Severity.ERROR).publish()
return
self.db_cluster.add_nemesis(nemesis=self.get_nemesis_class(), tester_obj=self)

batch_size = self.params.get('batch_size')

user_profile_table_count = self.params.get('user_profile_table_count') # pylint: disable=invalid-name
# Creating extra table during batch only for a high number of tables (>500).
create_extra_tables = True if int(user_profile_table_count) > 500 else False
if not self.params.get('reuse_cluster'):
self._pre_create_templated_user_schema()

@@ -290,59 +298,68 @@ def test_user_batch_custom_time(self):

stress_params_list = []

customer_profiles = self.params.get('cs_user_profiles')

templated_table_counter = itertools.count()

if customer_profiles:
cs_duration = self.params.get('cs_duration')
duration = int(cs_duration.translate(str.maketrans('', '', string.ascii_letters)))

for cs_profile in customer_profiles:
cs_profile = sct_abs_path(cs_profile) # noqa: PLW2901
assert os.path.exists(cs_profile), 'File not found: {}'.format(cs_profile)
self.log.debug('Run stress test with user profile {}, duration {}'.format(cs_profile, cs_duration))

user_profile_table_count = self.params.get('user_profile_table_count') # pylint: disable=invalid-name

for _ in range(user_profile_table_count):
stress_params_list += self.create_templated_user_stress_params(next(templated_table_counter),
cs_profile)
if not skip_optional_stage('main_load'):
self._run_user_stress_in_batches(batch_size=batch_size,
stress_params_list=stress_params_list, duration=duration)
cs_duration = self.params.get('cs_duration')
try:
duration = int(cs_duration.translate(str.maketrans('', '', string.ascii_letters))) if cs_duration else None
except ValueError:
raise ValueError(f"Invalid cs_duration format: {cs_duration}")

for cs_profile in customer_profiles:
cs_profile = sct_abs_path(cs_profile) # noqa: PLW2901
assert os.path.exists(cs_profile), 'File not found: {}'.format(cs_profile)
msg = f"Run stress test with user profile {cs_profile}"
msg += f", duration {cs_duration}" if cs_duration else ", no duration parameter"
self.log.debug(msg)

for _ in range(user_profile_table_count):
stress_params_list += self.create_templated_user_stress_params(next(templated_table_counter),
cs_profile)
if not skip_optional_stage('main_load'):
self.log.debug("Starting stress in batches of: %d with %d stress commands",
batch_size, len(stress_params_list))
self._run_user_stress_in_batches(batch_size=batch_size,
stress_params_list=stress_params_list, duration=duration,
create_extra_tables=create_extra_tables)

def _run_user_stress_in_batches(self, batch_size, stress_params_list, duration):
def _run_user_stress_in_batches(self, batch_size, stress_params_list, duration, create_extra_tables: bool = True):
"""
run user profile in batches, while adding 4 stress-commands which are not with precreated tables
and wait for them to finish
:param batch_size: size of the batch
:param stress_params_list: the list of all stress commands
:param create_extra_tables: Creating an extra table during batch.
:return:
"""
# pylint: disable=too-many-locals

def chunks(_list, chunk_size):
"""Yield successive n-sized chunks from _list."""
for i in range(0, len(_list), chunk_size):
yield _list[i:i + chunk_size], i, i+chunk_size, len(_list) + i * 2
yield _list[i:i + chunk_size], len(_list) + i * 2

all_batches = list(chunks(stress_params_list, batch_size))
total_batches = len(all_batches)

for batch, _, _, extra_tables_idx in list(chunks(stress_params_list, batch_size)):
for batch_number, (batch, extra_tables_idx) in enumerate(all_batches, start=1):
self.log.info(f"Starting batch {batch_number} out of {total_batches}")

stress_queue = []
batch_params = dict(duration=duration, round_robin=True, stress_cmd=[])

# add few stress threads with tables that weren't pre-created
customer_profiles = self.params.get('cs_user_profiles')
for cs_profile in customer_profiles:
cs_profile = sct_abs_path(cs_profile) # noqa: PLW2901
# for now we'll leave to just one fresh table, to kick schema update
num_of_newly_created_tables = 1
self._pre_create_templated_user_schema(batch_start=extra_tables_idx,
batch_end=extra_tables_idx+num_of_newly_created_tables)
for i in range(num_of_newly_created_tables):
batch += self.create_templated_user_stress_params(extra_tables_idx + i, cs_profile=cs_profile) # noqa: PLW2901
if create_extra_tables:
# add few stress threads with tables that weren't pre-created
customer_profiles = self.params.get('cs_user_profiles')
for cs_profile in customer_profiles:
cs_profile = sct_abs_path(cs_profile) # noqa: PLW2901
# for now we'll leave to just one fresh table, to kick schema update
num_of_newly_created_tables = 1
self._pre_create_templated_user_schema(batch_start=extra_tables_idx,
batch_end=extra_tables_idx+num_of_newly_created_tables)
for i in range(num_of_newly_created_tables):
batch += self.create_templated_user_stress_params(extra_tables_idx + i, cs_profile=cs_profile) # noqa: PLW2901

nodes_ips = self.all_node_ips_for_stress_command
for params in batch:
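A standalone sketch (not from the commit) of the refactored batching helpers above, run against a toy list of ten stress commands to show what each loop iteration receives after the chunks() change:

# Standalone sketch of the refactored batching helpers, using a toy list
# of stress parameters to show the values yielded per batch.
def chunks(_list, chunk_size):
    """Yield successive chunk_size-sized chunks from _list, plus an index
    reserved for the extra, non-pre-created tables of that batch."""
    for i in range(0, len(_list), chunk_size):
        yield _list[i:i + chunk_size], len(_list) + i * 2

stress_params_list = [f"stress-cmd-{n}" for n in range(10)]  # toy stand-in
all_batches = list(chunks(stress_params_list, chunk_size=4))
total_batches = len(all_batches)

for batch_number, (batch, extra_tables_idx) in enumerate(all_batches, start=1):
    print(f"batch {batch_number}/{total_batches}: {len(batch)} commands, "
          f"extra table index {extra_tables_idx}")
# batch 1/3: 4 commands, extra table index 10
# batch 2/3: 4 commands, extra table index 18
# batch 3/3: 2 commands, extra table index 26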
@@ -513,7 +530,11 @@ def create_templated_user_stress_params(self, idx, cs_profile) -> List[Dict]: #
# example:
# cassandra-stress user profile={} cl=QUORUM 'ops(insert=1)' duration={} -rate threads=100 -pop 'dist=gauss(0..1M)'
for cmd in [line.lstrip('#').strip() for line in cont if line.find('cassandra-stress') > 0]:
stress_cmd = cmd.format(profile_dst, cs_duration)
# Use a conditional tuple to handle optional formatting arguments for c-s duration parameter.
# For example, without a duration: cassandra-stress user profile={} 'ops(insert=1)' cl=QUORUM n=2572262 -rate threads=1
# Or with a duration: cassandra-stress user profile={} 'ops(insert=1)' cl=QUORUM duration={} -rate threads=1
args = (profile_dst, cs_duration) if cs_duration else (profile_dst,)
stress_cmd = cmd.format(*args)
params = {'stress_cmd': stress_cmd, 'profile': profile_dst}
self.log.debug('Stress cmd: {}'.format(stress_cmd))
params_list.append(params)
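The conditional-tuple formatting added above can be illustrated in isolation; a minimal sketch (not from the commit) using a hypothetical profile destination path, an illustrative '60m' duration, and the cassandra-stress templates quoted in the comments:

# Minimal sketch of the conditional-tuple formatting: the template has two
# placeholders when a duration is configured, and only one otherwise.
profile_dst = "/tmp/templated-elasticity-tables.yaml"  # hypothetical destination path

cmd_with_duration = ("cassandra-stress user profile={} cl=QUORUM 'ops(insert=1)' "
                     "duration={} -rate threads=100 -pop 'dist=gauss(0..1M)'")
cmd_without_duration = "cassandra-stress user profile={} 'ops(insert=1)' cl=QUORUM n=2572262 -rate threads=1"

# '60m' is illustrative only; an empty cs_duration selects the one-argument form.
for cmd, cs_duration in ((cmd_with_duration, '60m'), (cmd_without_duration, '')):
    args = (profile_dst, cs_duration) if cs_duration else (profile_dst,)
    print(cmd.format(*args))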
@@ -9,13 +9,13 @@

test_duration: 600

cs_duration: '0m'
cs_user_profiles:
- data_dir/templated-elasticity-tables.yaml

pre_create_schema: true
user_profile_table_count: 500
batch_size: 125
cs_duration: ''

n_loaders: 5
n_monitor_nodes: 1
