Skip to content

Commit 4458e2e

Browse files
committed
alter stress: automatically recover from bad state
1 parent f4516ff commit 4458e2e

File tree

3 files changed

+185
-82
lines changed

3 files changed

+185
-82
lines changed

alter/stress/tests/actions.py

Lines changed: 126 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -529,7 +529,7 @@ def delete_random_rows(self):
529529
def delete_random_rows_lightweight(self):
530530
"""
531531
Lightweight delete a few rows at random.
532-
Not supported with projections!
532+
Not supported with projections.
533533
"""
534534
table_name = get_random_table_name()
535535
node = get_random_node_for_table(table_name=table_name)
@@ -744,7 +744,9 @@ def drop_random_index(self):
744744
node = get_random_node_for_table(table_name=table_name)
745745

746746
with And("waiting for any other mutations on that index to finish"):
747-
wait_for_mutations_to_finish(node=node, command_like=index_name, timeout=300)
747+
wait_for_mutations_to_finish(
748+
node=node, command_like=index_name, timeout=300
749+
)
748750

749751
with And("dropping the index"):
750752
r = node.query(
@@ -793,84 +795,127 @@ def remove_random_ttl(self):
793795

794796

795797
@TestStep(Then)
796-
def check_tables_have_same_columns(self, tables):
798+
def check_tables_have_same_columns(self, tables, return_outliers=False):
797799
"""
798800
Asserts that the given tables have the same columns.
799801
Smartly selects a node for each given replicated table.
800802
Does not check that all replicas of a table agree.
803+
804+
Args:
805+
tables: List of table names to check.
806+
return_outliers: If True, return the set of columns that appear in at least one table but not in every table; otherwise an empty set is returned.
801807
"""
802-
with When("I get the columns for each table"):
803-
table_columns = {}
804-
for table_name in tables:
805-
node = get_random_node_for_table(table_name=table_name)
806-
table_columns[table_name] = set(
807-
get_column_names(node=node, table_name=table_name)
808-
)
808+
try:
809+
with When("I get the columns for each table"):
810+
table_columns = {}
811+
for table_name in tables:
812+
node = get_random_node_for_table(table_name=table_name)
813+
table_columns[table_name] = set(
814+
get_column_names(node=node, table_name=table_name)
815+
)
809816

810-
with Then("all tables should have the same columns"):
811-
for table1, table2 in combinations(tables, 2):
812-
with By(f"checking {table1} and {table2}", flags=TE):
813-
assert table_columns[table1] == table_columns[table2], error()
817+
with Then("all tables should have the same columns"):
818+
for table1, table2 in combinations(tables, 2):
819+
with By(f"checking {table1} and {table2}", flags=TE):
820+
assert table_columns[table1] == table_columns[table2], error()
821+
finally:
822+
if return_outliers:
823+
return set.union(*table_columns.values()) - set.intersection(
824+
*table_columns.values()
825+
)
826+
return set()
814827

815828

816829
@TestStep(Then)
817830
def check_tables_have_same_projections(
818-
self, tables, check_present: list = None, check_absent: list = None
831+
self,
832+
tables,
833+
check_present: list = None,
834+
check_absent: list = None,
835+
return_outliers=False,
819836
):
820837
"""
821838
Asserts that the given tables have the same projections.
822839
Smartly selects a node for each given replicated table.
823840
Does not check that all replicas of a table agree.
824-
"""
825-
with When("I get the projections for each table"):
826-
table_projections = {}
827-
for table_name in tables:
828-
node = get_random_node_for_table(table_name=table_name)
829-
wait_for_mutations_to_finish(node=node, command_like="PROJECTION")
830-
table_projections[table_name] = set(
831-
get_projections(node=node, table_name=table_name)
832-
)
833841
834-
with Then("all tables should have the same projections"):
835-
for table1, table2 in combinations(tables, 2):
836-
with By(f"checking {table1} and {table2}", flags=TE):
837-
assert table_projections[table1] == table_projections[table2], error()
838-
839-
if check_present is not None:
840-
with And(f"I check that {check_present} exist"):
841-
for projection_name in check_present:
842-
assert projection_name in table_projections[tables[0]], error()
842+
Args:
843+
tables: List of table names to check.
844+
check_present: List of projection names that should be present in all tables.
845+
check_absent: List of projection names that should not be present in any table.
846+
return_outliers: If True, return the set of projections that appear in at least one table but not in every table; otherwise an empty set is returned.
847+
"""
848+
try:
849+
with When("I get the projections for each table"):
850+
table_projections = {}
851+
for table_name in tables:
852+
node = get_random_node_for_table(table_name=table_name)
853+
wait_for_mutations_to_finish(node=node, command_like="PROJECTION")
854+
table_projections[table_name] = set(
855+
get_projections(node=node, table_name=table_name)
856+
)
843857

844-
if check_absent is not None:
845-
with And(f"I check that {check_absent} do not exist"):
846-
for projection_name in check_absent:
847-
assert projection_name not in table_projections[tables[0]], error()
858+
with Then("all tables should have the same projections"):
859+
for table1, table2 in combinations(tables, 2):
860+
with By(f"checking {table1} and {table2}", flags=TE):
861+
assert (
862+
table_projections[table1] == table_projections[table2]
863+
), error()
864+
865+
if check_present is not None:
866+
with And(f"I check that {check_present} exist"):
867+
for projection_name in check_present:
868+
assert projection_name in table_projections[tables[0]], error()
869+
870+
if check_absent is not None:
871+
with And(f"I check that {check_absent} do not exist"):
872+
for projection_name in check_absent:
873+
assert projection_name not in table_projections[tables[0]], error()
874+
finally:
875+
if return_outliers:
876+
return set.union(*table_projections.values()) - set.intersection(
877+
*table_projections.values()
878+
)
879+
return set()
848880

849881

850882
@TestStep(Then)
851-
def check_tables_have_same_indexes(self, tables):
883+
def check_tables_have_same_indexes(self, tables, return_outliers=False):
852884
"""
853885
Asserts that the given tables have the same indexes.
854886
Smartly selects a node for each given replicated table.
855887
Does not check that all replicas of a table agree.
888+
889+
Args:
890+
tables: List of table names to check.
891+
return_outliers: If True, return the set of indexes that appear in at least one table but not in every table; otherwise an empty set is returned.
856892
"""
857-
with When("I get the indexes for each table"):
858-
table_indexes = {}
859-
for table_name in tables:
860-
node = get_random_node_for_table(table_name=table_name)
861-
table_indexes[table_name] = set(
862-
get_indexes(node=node, table_name=table_name)
863-
)
893+
try:
894+
with When("I get the indexes for each table"):
895+
table_indexes = {}
896+
for table_name in tables:
897+
node = get_random_node_for_table(table_name=table_name)
898+
table_indexes[table_name] = set(
899+
get_indexes(node=node, table_name=table_name)
900+
)
864901

865-
with Then("all tables should have the same indexes"):
866-
for table1, table2 in combinations(tables, 2):
867-
with By(f"checking {table1} and {table2}", flags=TE):
868-
assert table_indexes[table1] == table_indexes[table2], error()
902+
with Then("all tables should have the same indexes"):
903+
for table1, table2 in combinations(tables, 2):
904+
with By(f"checking {table1} and {table2}", flags=TE):
905+
assert table_indexes[table1] == table_indexes[table2], error()
906+
finally:
907+
if return_outliers:
908+
return set.union(*table_indexes.values()) - set.intersection(
909+
*table_indexes.values()
910+
)
911+
return set()
869912

870913

871914
@TestStep(Then)
872915
@Retry(timeout=step_retry_timeout, delay=step_retry_delay)
873-
def check_consistency(self, tables=None, sync_timeout=None):
916+
def check_consistency(
917+
self, tables=None, sync_timeout=None, restore_consistent_structure=False
918+
):
874919
"""
875920
Check that the given tables hold the same amount of data on all nodes where they exist.
876921
Also check that column names match, because subsequent part move tests require matching columns.
@@ -933,10 +978,37 @@ def check_consistency(self, tables=None, sync_timeout=None):
933978
# The below check also asserts that all tables have the same structure
934979
# Part move actions require matching structure
935980

936-
with Then("check that table structures are in sync"):
937-
check_tables_have_same_columns(tables=tables)
938-
check_tables_have_same_projections(tables=tables)
939-
check_tables_have_same_indexes(tables=tables)
981+
try:
982+
with Then("check that table structures are in sync", flags=TE):
983+
outlier_columns = check_tables_have_same_columns(
984+
tables=tables, return_outliers=restore_consistent_structure
985+
)
986+
outlier_projections = check_tables_have_same_projections(
987+
tables=tables, return_outliers=restore_consistent_structure
988+
)
989+
outlier_indexes = check_tables_have_same_indexes(
990+
tables=tables, return_outliers=restore_consistent_structure
991+
)
992+
finally:
993+
if restore_consistent_structure and any(
994+
[outlier_columns, outlier_projections, outlier_indexes]
995+
):
996+
with Finally("I clean up any inconsistencies between tables"):
997+
for table_name in tables:
998+
node = get_random_node_for_table(table_name=table_name)
999+
1000+
for projection in outlier_projections:
1001+
node.query(
1002+
f"ALTER TABLE {table_name} DROP PROJECTION IF EXISTS {projection}"
1003+
)
1004+
for index in outlier_indexes:
1005+
node.query(
1006+
f"ALTER TABLE {table_name} DROP INDEX IF EXISTS {index}"
1007+
)
1008+
for column in outlier_columns:
1009+
node.query(
1010+
f"ALTER TABLE {table_name} DROP COLUMN IF EXISTS {column}"
1011+
)
9401012

9411013

9421014
@TestStep

alter/stress/tests/steps.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,8 @@ def log_failing_mutations(self, nodes=None):
221221

222222
with And("double checking the failed mutations"):
223223
r = node.query(
224-
"SELECT latest_failed_part, table, latest_fail_reason FROM system.mutations WHERE is_done=0 FORMAT JSONCompactColumns"
224+
"SELECT latest_failed_part, table, latest_fail_reason FROM system.mutations WHERE is_done=0 FORMAT JSONCompactColumns",
225+
no_checks=True,
225226
)
226227
for part, table, fail_reason in json.loads(r.output):
227228
if fail_reason == "":
@@ -232,6 +233,7 @@ def log_failing_mutations(self, nodes=None):
232233
)
233234
r = node.query(
234235
f"SELECT * FROM system.parts WHERE name='{part}' and table='{table}' FORMAT Vertical",
236+
no_checks=True,
235237
)
236238
if r.output.strip():
237239
note(f"State of {part}:\n{r.output.strip()}")
@@ -240,6 +242,7 @@ def log_failing_mutations(self, nodes=None):
240242
column = re.search(r"column (.+):", fail_reason).group(1)
241243
r = node.query(
242244
f"SELECT * FROM system.parts_columns WHERE name='{part}' and table='{table}' and column='{column}' FORMAT Vertical",
245+
no_checks=True,
243246
)
244247
if r.output.strip():
245248
note(f"State of {column}:\n{r.output.strip()}")

alter/stress/tests/stress_alter.py

Lines changed: 55 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,8 @@ def alter_combinations(
129129
insert_keeper_fault_injection_probability=0,
130130
network_impairment=False,
131131
limit_disk_space=False,
132+
enforce_table_structure=None,
133+
kill_stuck_mutations=None,
132134
):
133135
"""
134136
Perform combinations of alter actions, checking that all replicas agree.
@@ -150,6 +152,11 @@ def alter_combinations(
150152
limit_disk_space
151153
), "enable limit_disk_space when using fill_disks to avoid unexpected behavior"
152154

155+
if enforce_table_structure is None:
156+
enforce_table_structure = self.flags & TE
157+
if kill_stuck_mutations is None:
158+
kill_stuck_mutations = self.flags & TE
159+
153160
action_groups = build_action_groups(
154161
actions=actions,
155162
combination_size=combination_size,
@@ -192,7 +199,9 @@ def alter_combinations(
192199
if modify_random_ttl in actions
193200
else None
194201
)
195-
table_settings = "min_bytes_for_wide_part=0" if self.context.wide_parts_only else None
202+
table_settings = (
203+
"min_bytes_for_wide_part=0" if self.context.wide_parts_only else None
204+
)
196205

197206
for i in range(n_tables):
198207
table_name = f"table{i}_{self.context.storage_policy}"
@@ -231,31 +240,50 @@ def alter_combinations(
231240
title += "," + net_mode.name
232241

233242
with Check(title):
234-
if network_impairment:
235-
with Given("a network impairment"):
236-
impaired_network(network_mode=net_mode)
237-
238-
with When("I perform a group of actions"):
239-
for action in chain(background_actions, chosen_actions):
240-
By(
241-
f"I {action.name}",
242-
run=action,
243-
parallel=run_groups_in_parallel,
244-
flags=TE | ERROR_NOT_COUNTED,
245-
)
246-
247-
for table in self.context.table_names:
248-
By(
249-
f"I OPTIMIZE {table}",
250-
test=optimize_random,
251-
parallel=run_optimize_in_parallel,
252-
flags=TE,
253-
)(table_name=table_name)
254-
255-
join()
256-
257-
with Then("I check that the replicas are consistent", flags=TE):
258-
check_consistency()
243+
try:
244+
if network_impairment:
245+
with Given("a network impairment"):
246+
impaired_network(network_mode=net_mode)
247+
248+
with When("I perform a group of actions"):
249+
for action in chain(background_actions, chosen_actions):
250+
By(
251+
f"I {action.name}",
252+
run=action,
253+
parallel=run_groups_in_parallel,
254+
flags=TE | ERROR_NOT_COUNTED,
255+
)
256+
257+
for table in self.context.table_names:
258+
By(
259+
f"I OPTIMIZE {table}",
260+
test=optimize_random,
261+
parallel=run_optimize_in_parallel,
262+
flags=TE,
263+
)(table_name=table_name)
264+
265+
join()
266+
267+
finally:
268+
with Then("I make sure that the replicas are consistent", flags=TE):
269+
if kill_stuck_mutations:
270+
with By("killing any failing mutations"):
271+
for node in self.context.ch_nodes:
272+
node.query(
273+
"SELECT * FROM system.mutations WHERE is_done=0 AND latest_fail_reason != '' FORMAT Vertical",
274+
no_checks=True,
275+
)
276+
r = node.query(
277+
"KILL MUTATION WHERE latest_fail_reason != ''"
278+
)
279+
assert r.output == "", error(
280+
"An erroring mutation was killed"
281+
)
282+
283+
with By("making sure that replicas agree"):
284+
check_consistency(
285+
restore_consistent_structure=enforce_table_structure
286+
)
259287

260288
note(f"Average time per test combination {(time.time()-t)/(i+1):.1f}s")
261289

@@ -429,7 +457,7 @@ def feature(self):
429457
# https://github.com/ClickHouse/ClickHouse/issues/62459
430458
self.context.disallow_move_partition_to_self = True
431459

432-
#https://github.com/ClickHouse/ClickHouse/issues/63545#issuecomment-2105013462
460+
# https://github.com/ClickHouse/ClickHouse/issues/63545#issuecomment-2105013462
433461
self.context.wide_parts_only = True
434462

435463
with Given("I have S3 disks configured"):

0 commit comments

Comments
 (0)