|
25 | 25 | from textwrap import dedent
|
26 | 26 | from datetime import datetime, timedelta
|
27 | 27 | from dataclasses import dataclass
|
| 28 | +from time import sleep |
28 | 29 |
|
29 | 30 | import boto3
|
30 | 31 | import yaml
|
@@ -1679,52 +1680,77 @@ def create_backup_and_report(self, mgr_cluster, label: str):
|
1679 | 1680 | return task
|
1680 | 1681 |
|
def run_read_stress_and_report(self, label):
    """Run the configured stress commands, aggregate their results and report to Argus.

    Commands come from the ``stress_read_cmd`` parameter and are classified as
    read or write by inspecting the command string. After all stress threads
    finish, per-kind averages (op rate, partition rate, row rate, p99 latency)
    are computed and two reports are sent to Argus, tagged with *label*.

    :param label: suffix appended to the Argus report names
                  (e.g. " w/o concurrent backup").
    :raises InvalidArgument: if a command contains neither " read " nor " write ".
    """
    stress_queue = {"read": [], "write": []}

    # assumes stress_read_cmd is a list of command strings — a bare string
    # would be iterated char-by-char; TODO confirm against the test config
    for command in self.params.get('stress_read_cmd'):
        # Checked with surrounding spaces so e.g. "write" inside another
        # token does not misclassify the command.
        if " write " in command:
            stress_queue["write"].append(self.run_stress_thread(command))
        elif " read " in command:
            stress_queue["read"].append(self.run_stress_thread(command))
        else:
            raise InvalidArgument("Unknown stress command")

    def get_stress_averages(queue):
        """Average the stress metrics over every result of every thread in *queue*."""
        averages = {'op rate': 0.0, 'partition rate': 0.0, 'row rate': 0.0, 'latency 99th percentile': 0.0}
        num_results = 0
        for stress in queue:
            results = self.get_stress_results(queue=stress)
            num_results += len(results)
            for result in results:
                for key in averages:
                    averages[key] += float(result[key])
        if not num_results:
            # No threads of this kind were started (or they produced no
            # results) — report zeros instead of raising ZeroDivisionError.
            return dict(averages)
        return {key: total / num_results for key, total in averages.items()}

    with ExecutionTimer() as stress_timer:
        # get_stress_results blocks until the stress threads finish, so the
        # timer covers the full duration of both read and write stress.
        read_stats = get_stress_averages(stress_queue["read"])
        write_stats = get_stress_averages(stress_queue["write"])
    InfoEvent(message=f'Read stress duration: {stress_timer.duration}s.').publish()

    read_stress_report = {
        "read time": int(stress_timer.duration.total_seconds()),
        "op rate": read_stats['op rate'],
        "partition rate": read_stats['partition rate'],
        "row rate": read_stats['row rate'],
        "latency 99th percentile": read_stats['latency 99th percentile'],
    }
    self.report_to_argus(ManagerReportType.READ, read_stress_report, "Read stress: " + label)

    write_stress_report = {
        # NOTE(review): key is literally "read time" even for the write
        # report — presumably required by the Argus schema; confirm.
        "read time": int(stress_timer.duration.total_seconds()),
        "op rate": write_stats['op rate'],
        "partition rate": write_stats['partition rate'],
        "row rate": write_stats['row rate'],
        "latency 99th percentile": write_stats['latency 99th percentile'],
    }
    # NOTE(review): write results are reported with ManagerReportType.READ —
    # confirm whether a dedicated WRITE report type exists and should be used.
    self.report_to_argus(ManagerReportType.READ, write_stress_report, "Write stress: " + label)
|
1697 | 1729 | def test_backup_benchmark(self):
|
1698 | 1730 | self.log.info("Executing test_backup_restore_benchmark...")
|
1699 | 1731 |
|
1700 | 1732 | self.log.info("Write data to table")
|
1701 | 1733 | self.run_prepare_write_cmd()
|
1702 | 1734 |
|
1703 |
| - self.log.info("Disable clusterwide compaction") |
1704 |
| - compaction_ops = CompactionOps(cluster=self.db_cluster) |
1705 |
| - # Disable keyspace autocompaction cluster-wide since we dont want it to interfere with our restore timing |
1706 |
| - for node in self.db_cluster.nodes: |
1707 |
| - compaction_ops.disable_autocompaction_on_ks_cf(node=node) |
1708 |
| - |
1709 | 1735 | manager_tool = mgmt.get_scylla_manager_tool(manager_node=self.monitors.nodes[0])
|
1710 | 1736 | mgr_cluster = self.ensure_and_get_cluster(manager_tool)
|
1711 | 1737 |
|
| 1738 | + self.log.info("Run read test") |
| 1739 | + self.run_read_stress_and_report(" w/o concurrent backup") |
| 1740 | + |
1712 | 1741 | self.log.info("Create and report backup time")
|
1713 | 1742 | backup_task = self.create_backup_and_report(mgr_cluster, "Backup")
|
1714 | 1743 |
|
1715 | 1744 | self.log.info("Remove backup")
|
1716 | 1745 | backup_task.delete_backup_snapshot()
|
1717 | 1746 |
|
1718 |
| - self.log.info("Run read test") |
1719 |
| - self.run_read_stress_and_report("Read stress") |
1720 |
| - |
1721 | 1747 | self.log.info("Create and report backup time during read stress")
|
1722 | 1748 |
|
1723 | 1749 | backup_thread = threading.Thread(target=self.create_backup_and_report,
|
1724 | 1750 | kwargs={"mgr_cluster": mgr_cluster, "label": "Backup during read stress"})
|
1725 | 1751 |
|
1726 | 1752 | read_stress_thread = threading.Thread(target=self.run_read_stress_and_report,
|
1727 |
| - kwargs={"label": "Read stress during backup"}) |
| 1753 | + kwargs={"label": " with concurrent backup"}) |
1728 | 1754 | backup_thread.start()
|
1729 | 1755 | read_stress_thread.start()
|
1730 | 1756 |
|
|
0 commit comments