From b071fac858f34f2ab7959fcc8c4404f9e3a43d4a Mon Sep 17 00:00:00 2001
From: Dennis Felsing
Date: Tue, 10 Oct 2023 21:43:05 +0000
Subject: [PATCH] WIP

---
 .../materialize/parallel_workload/action.py   | 114 ++++++++++++++++--
 .../materialize/parallel_workload/database.py | 111 +++++++++++++++--
 2 files changed, 210 insertions(+), 15 deletions(-)

diff --git a/misc/python/materialize/parallel_workload/action.py b/misc/python/materialize/parallel_workload/action.py
index 5378a75bd350..673f13943fbf 100644
--- a/misc/python/materialize/parallel_workload/action.py
+++ b/misc/python/materialize/parallel_workload/action.py
@@ -21,7 +21,8 @@
     MAX_CLUSTERS,
     MAX_ROLES,
     MAX_ROWS,
-    MAX_SOURCES,
+    MAX_WEBHOOK_SOURCES,
+    MAX_KAFKA_SOURCES,
+    MAX_POSTGRES_SOURCES,
+    MAX_KAFKA_SINKS,
     MAX_TABLES,
     MAX_VIEWS,
     Cluster,
+    KafkaSink,
+    KafkaSource,
+    PostgresSource,
@@ -661,7 +662,7 @@ def run(self, exe: Executor) -> None:
 class CreateWebhookSourceAction(Action):
     def run(self, exe: Executor) -> None:
         with self.db.lock:
-            if len(self.db.sources) > MAX_SOURCES:
+            if len(self.db.webhook_sources) > MAX_WEBHOOK_SOURCES:
                 return
-            source_id = self.db.source_id
-            self.db.source_id += 1
+            source_id = self.db.webhook_source_id
+            self.db.webhook_source_id += 1
@@ -669,7 +670,7 @@ def run(self, exe: Executor) -> None:
             cluster = self.rng.choice(potential_clusters)
             source = WebhookSource(source_id, cluster, self.rng)
             source.create(exe)
-            self.db.sources.append(source)
+            self.db.webhook_sources.append(source)
 
 
 class DropWebhookSourceAction(Action):
@@ -680,13 +681,106 @@ def errors_to_ignore(self) -> list[str]:
     def run(self, exe: Executor) -> None:
         with self.db.lock:
-            if not self.db.sources:
+            if not self.db.webhook_sources:
                 return
-            source_id = self.rng.randrange(len(self.db.sources))
-            source = self.db.sources[source_id]
+            source_id = self.rng.randrange(len(self.db.webhook_sources))
+            source = self.db.webhook_sources[source_id]
             query = f"DROP SOURCE {source}"
             exe.execute(query)
-            del self.db.sources[source_id]
+            del self.db.webhook_sources[source_id]
+
+
+class CreateKafkaSourceAction(Action):
+    def run(self, exe: Executor) -> None:
+        with self.db.lock:
+            if len(self.db.kafka_sources) > MAX_KAFKA_SOURCES:
+                return
+            source_id = self.db.kafka_source_id
+            self.db.kafka_source_id += 1
+            potential_clusters = [c for c in self.db.clusters if len(c.replicas) == 1]
+            cluster = self.rng.choice(potential_clusters)
+            source = KafkaSource(
+                source_id, cluster, self.rng.choice(self.db.kafka_topics), self.rng
+            )
+            source.create(exe)
+            self.db.kafka_sources.append(source)
+
+
+class DropKafkaSourceAction(Action):
+    def errors_to_ignore(self) -> list[str]:
+        return [
+            "still depended upon by",
+        ] + super().errors_to_ignore()
+
+    def run(self, exe: Executor) -> None:
+        with self.db.lock:
+            if not self.db.kafka_sources:
+                return
+            source_id = self.rng.randrange(len(self.db.kafka_sources))
+            source = self.db.kafka_sources[source_id]
+            query = f"DROP SOURCE {source}"
+            exe.execute(query)
+            del self.db.kafka_sources[source_id]
+
+
+class CreatePostgresSourceAction(Action):
+    def run(self, exe: Executor) -> None:
+        with self.db.lock:
+            if len(self.db.postgres_sources) > MAX_POSTGRES_SOURCES:
+                return
+            source_id = self.db.postgres_source_id
+            self.db.postgres_source_id += 1
+            potential_clusters = [c for c in self.db.clusters if len(c.replicas) == 1]
+            cluster = self.rng.choice(potential_clusters)
+            source = PostgresSource(
+                source_id, cluster, self.rng.choice(self.db.postgres_tables), self.rng
+            )
+            source.create(exe)
+            self.db.postgres_sources.append(source)
+
+
+class DropPostgresSourceAction(Action):
+    def errors_to_ignore(self) -> list[str]:
+        return [
+            "still depended upon by",
+        ] + super().errors_to_ignore()
+
+    def run(self, exe: Executor) -> None:
+        with self.db.lock:
+            if not self.db.postgres_sources:
+                return
+            source_id = self.rng.randrange(len(self.db.postgres_sources))
+            source = self.db.postgres_sources[source_id]
+            query = f"DROP SOURCE {source}"
+            exe.execute(query)
+            del self.db.postgres_sources[source_id]
+
+
+class CreateKafkaSinkAction(Action):
+    def run(self, exe: Executor) -> None:
+        with self.db.lock:
+            if len(self.db.kafka_sinks) > MAX_KAFKA_SINKS:
+                return
+            sink_id = self.db.kafka_sink_id
+            self.db.kafka_sink_id += 1
+            potential_clusters = [c for c in self.db.clusters if len(c.replicas) == 1]
+            cluster = self.rng.choice(potential_clusters)
+            # A sink needs an object to read from: pick any existing table or view
+            base_object = self.rng.choice(self.db.tables + self.db.views)
+            sink = KafkaSink(sink_id, cluster, base_object, self.rng)
+            sink.create(exe)
+            self.db.kafka_sinks.append(sink)
+
+
+class DropKafkaSinkAction(Action):
+    def errors_to_ignore(self) -> list[str]:
+        return [
+            "still depended upon by",
+        ] + super().errors_to_ignore()
+
+    def run(self, exe: Executor) -> None:
+        with self.db.lock:
+            if not self.db.kafka_sinks:
+                return
+            sink_id = self.rng.randrange(len(self.db.kafka_sinks))
+            sink = self.db.kafka_sinks[sink_id]
+            query = f"DROP SINK {sink}"
+            exe.execute(query)
+            del self.db.kafka_sinks[sink_id]
 
 
 class HttpPostAction(Action):
@@ -790,6 +884,12 @@ def __init__(
             (SetClusterAction, 1),
             (CreateWebhookSourceAction, 2),
             (DropWebhookSourceAction, 1),
+            (CreateKafkaSinkAction, 1),
+            (DropKafkaSinkAction, 1),
+            (CreateKafkaSourceAction, 1),
+            (DropKafkaSourceAction, 1),
+            (CreatePostgresSourceAction, 1),
+            (DropPostgresSourceAction, 1),
             (GrantPrivilegesAction, 2),
             (RevokePrivilegesAction, 1),
             (ReconnectAction, 1),
diff --git a/misc/python/materialize/parallel_workload/database.py b/misc/python/materialize/parallel_workload/database.py
index 1fa285f1e9e3..d8b47ec7a025 100644
--- a/misc/python/materialize/parallel_workload/database.py
+++ b/misc/python/materialize/parallel_workload/database.py
@@ -33,14 +33,20 @@
 MAX_TABLES = 100
 MAX_VIEWS = 100
 MAX_ROLES = 100
-MAX_SOURCES = 20
+MAX_WEBHOOK_SOURCES = 20
+MAX_KAFKA_SOURCES = 20
+MAX_POSTGRES_SOURCES = 20
+MAX_KAFKA_SINKS = 20
 MAX_INCLUDE_HEADERS = 5
 
 MAX_INITIAL_CLUSTERS = 2
 MAX_INITIAL_TABLES = 10
 MAX_INITIAL_VIEWS = 10
 MAX_INITIAL_ROLES = 3
-MAX_INITIAL_SOURCES = 3
+MAX_INITIAL_WEBHOOK_SOURCES = 3
+MAX_INITIAL_KAFKA_SOURCES = 3
+MAX_INITIAL_POSTGRES_SOURCES = 3
+MAX_INITIAL_KAFKA_SINKS = 3
 
 
 class BodyFormat(Enum):
@@ -277,6 +283,72 @@ def create(self, exe: Executor) -> None:
         exe.execute(query)
+
+
+class KafkaSource(DBObject):
+    source_id: int
+    rename: int
+    cluster: "Cluster"
+    topic: str
+    format: str
+    envelope: str
+
+    def __init__(
+        self, source_id: int, cluster: "Cluster", topic: str, rng: random.Random
+    ):
+        self.source_id = source_id
+        self.cluster = cluster
+        self.topic = topic
+        self.format = rng.choice(
+            [
+                "AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn",
+                "PROTOBUF USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn",
+                "REGEX '.*'",
+                "TEXT",
+                "BYTES",
+            ]
+        )
+        self.envelope = rng.choice(["DEBEZIUM", "UPSERT", "NONE"])
+        self.rename = 0
+
+    def __str__(self) -> str:
+        if self.rename:
+            return f"ks{self.source_id}_{self.rename}"
+        return f"ks{self.source_id}"
+
+    def create(self, exe: Executor) -> None:
+        query = f"CREATE SOURCE {self} IN CLUSTER {self.cluster} FROM KAFKA CONNECTION kafka_conn (TOPIC '{self.topic}') FORMAT {self.format} ENVELOPE {self.envelope}"
+        exe.execute(query)
+
+
+class KafkaSink(DBObject):
+    sink_id: int
+    rename: int
+    cluster: "Cluster"
+    base_object: DBObject
+    format: str
+    envelope: str
+
+    def __init__(
+        self,
+        sink_id: int,
+        cluster: "Cluster",
+        base_object: DBObject,
+        rng: random.Random,
+    ):
+        self.sink_id = sink_id
+        self.cluster = cluster
+        # The relation the sink reads from (table, view or materialized view)
+        self.base_object = base_object
+        self.format = rng.choice(
+            ["AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn", "JSON"]
+        )
+        # ENVELOPE UPSERT would additionally need a key, so stick to DEBEZIUM for now
+        self.envelope = "DEBEZIUM"
+        self.rename = 0
+
+    def __str__(self) -> str:
+        if self.rename:
+            return f"sink{self.sink_id}_{self.rename}"
+        return f"sink{self.sink_id}"
+
+    def create(self, exe: Executor) -> None:
+        topic = f"sink-topic-{self.sink_id}"
+        query = f"CREATE SINK {self} IN CLUSTER {self.cluster} FROM {self.base_object} INTO KAFKA CONNECTION kafka_conn (TOPIC '{topic}') FORMAT {self.format} ENVELOPE {self.envelope}"
+        exe.execute(query)
+
+
+class PostgresSource(DBObject):
+    source_id: int
+    rename: int
+    cluster: "Cluster"
+    table: str
+
+    def __init__(
+        self, source_id: int, cluster: "Cluster", table: str, rng: random.Random
+    ):
+        self.source_id = source_id
+        self.cluster = cluster
+        self.table = table
+        self.rename = 0
+
+    def __str__(self) -> str:
+        if self.rename:
+            return f"pgs{self.source_id}_{self.rename}"
+        return f"pgs{self.source_id}"
+
+    def create(self, exe: Executor) -> None:
+        query = f"CREATE SOURCE {self} IN CLUSTER {self.cluster} FROM POSTGRES CONNECTION pg_conn (PUBLICATION 'publication') FOR TABLES ({self.table})"
+        exe.execute(query)
 
 
 class Role:
     role_id: int
@@ -368,8 +440,16 @@ class Database:
     clusters: list[Cluster]
     cluster_id: int
     indexes: set[str]
-    sources: list[WebhookSource]
-    source_id: int
+    webhook_sources: list[WebhookSource]
+    webhook_source_id: int
+    kafka_sources: list[KafkaSource]
+    kafka_source_id: int
+    postgres_sources: list[PostgresSource]
+    postgres_source_id: int
+    postgres_tables: list[str]
+    kafka_sinks: list[KafkaSink]
+    kafka_sink_id: int
+    kafka_topics: list[str]
     lock: threading.Lock
 
     def __init__(
@@ -419,11 +499,26 @@ def __init__(
         ]
         self.cluster_id = len(self.clusters)
         self.indexes = set()
-        self.sources = [
+        self.webhook_sources = [
             WebhookSource(i, rng.choice(self.clusters), rng)
-            for i in range(rng.randint(0, MAX_INITIAL_SOURCES))
+            for i in range(rng.randint(0, MAX_INITIAL_WEBHOOK_SOURCES))
+        ]
+        self.webhook_source_id = len(self.webhook_sources)
+        # Placeholder names: the surrounding test setup is assumed to provision
+        # these Kafka topics and Postgres tables before the workload starts.
+        self.kafka_topics = [f"kafka-topic-{i}" for i in range(MAX_INITIAL_KAFKA_SOURCES)]
+        self.postgres_tables = [
+            f"postgres_table_{i}" for i in range(MAX_INITIAL_POSTGRES_SOURCES)
+        ]
+        self.kafka_sources = [
+            KafkaSource(i, rng.choice(self.clusters), rng.choice(self.kafka_topics), rng)
+            for i in range(rng.randint(0, MAX_INITIAL_KAFKA_SOURCES))
+        ]
+        self.kafka_source_id = len(self.kafka_sources)
+        self.postgres_sources = [
+            PostgresSource(
+                i, rng.choice(self.clusters), rng.choice(self.postgres_tables), rng
+            )
+            for i in range(rng.randint(0, MAX_INITIAL_POSTGRES_SOURCES))
+        ]
+        self.postgres_source_id = len(self.postgres_sources)
+        self.kafka_sinks = [
+            KafkaSink(
+                i,
+                rng.choice(self.clusters),
+                rng.choice(self.tables + self.views),
+                rng,
+            )
+            for i in range(rng.randint(0, MAX_INITIAL_KAFKA_SINKS))
         ]
-        self.source_id = len(self.sources)
+        self.kafka_sink_id = len(self.kafka_sinks)
         self.lock = threading.Lock()
 
     def __str__(self) -> str:
@@ -432,7 +527,7 @@ def __str__(self) -> str:
     def __iter__(self):
         """Returns all relations"""
         return (
-            self.clusters + self.tables + self.views + self.roles + self.sources
+            self.clusters + self.tables + self.views + self.roles + self.webhook_sources + self.kafka_sources + self.postgres_sources
        ).__iter__()
 
     def drop(self, exe: Executor) -> None:
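
For review, roughly the DDL the new KafkaSource, PostgresSource and KafkaSink classes
emit. The identifiers below are illustrative only; the kafka_conn and pg_conn
connections and the referenced topics, publication and tables are assumed to be
provisioned by the surrounding test setup before the workload runs:

  CREATE SOURCE ks0 IN CLUSTER cluster1 FROM KAFKA CONNECTION kafka_conn (TOPIC 'kafka-topic-0') FORMAT TEXT ENVELOPE NONE;
  CREATE SOURCE pgs0 IN CLUSTER cluster1 FROM POSTGRES CONNECTION pg_conn (PUBLICATION 'publication') FOR TABLES (postgres_table_0);
  CREATE SINK sink0 IN CLUSTER cluster1 FROM t0 INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-topic-0') FORMAT JSON ENVELOPE DEBEZIUM;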