WIP
def- committed Oct 10, 2023
1 parent 83f3437 commit b071fac
Showing 2 changed files with 210 additions and 15 deletions.
114 changes: 107 additions & 7 deletions misc/python/materialize/parallel_workload/action.py
@@ -21,7 +21,13 @@
     MAX_CLUSTERS,
     MAX_ROLES,
     MAX_ROWS,
-    MAX_SOURCES,
+    MAX_WEBHOOK_SOURCES,
+    MAX_KAFKA_SOURCES,
+    MAX_POSTGRES_SOURCES,
+    MAX_KAFKA_SINKS,
     MAX_TABLES,
     MAX_VIEWS,
     Cluster,
+    KafkaSink,
+    KafkaSource,
+    PostgresSource,
@@ -661,15 +662,15 @@ def run(self, exe: Executor) -> None:
 class CreateWebhookSourceAction(Action):
     def run(self, exe: Executor) -> None:
         with self.db.lock:
-            if len(self.db.sources) > MAX_SOURCES:
+            if len(self.db.webhook_sources) > MAX_WEBHOOK_SOURCES:
                 return
-            source_id = self.db.source_id
-            self.db.source_id += 1
+            source_id = self.db.webhook_source_id
+            self.db.webhook_source_id += 1
             potential_clusters = [c for c in self.db.clusters if len(c.replicas) == 1]
             cluster = self.rng.choice(potential_clusters)
             source = WebhookSource(source_id, cluster, self.rng)
             source.create(exe)
-            self.db.sources.append(source)
+            self.db.webhook_sources.append(source)


 class DropWebhookSourceAction(Action):
@@ -680,13 +681,106 @@ def errors_to_ignore(self) -> list[str]:

     def run(self, exe: Executor) -> None:
         with self.db.lock:
-            if not self.db.sources:
+            if not self.db.webhook_sources:
                 return
-            source_id = self.rng.randrange(len(self.db.sources))
-            source = self.db.sources[source_id]
+            source_id = self.rng.randrange(len(self.db.webhook_sources))
+            source = self.db.webhook_sources[source_id]
             query = f"DROP SOURCE {source}"
             exe.execute(query)
-            del self.db.sources[source_id]
+            del self.db.webhook_sources[source_id]
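Note: the Drop*Action classes in this file all share one shape: take the database lock, pick a random tracked object, issue the DROP, and stop tracking it. A condensed generic form of that pattern (a sketch; drop_random is a hypothetical helper, not part of this change):

    import random

    def drop_random(exe, rng: random.Random, objects: list, kind: str) -> None:
        # Pick a random tracked object, drop it server-side, then untrack it.
        # The caller is expected to hold db.lock, as the actions here do.
        if not objects:
            return
        idx = rng.randrange(len(objects))
        exe.execute(f"DROP {kind} {objects[idx]}")
        del objects[idx]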


+class CreateKafkaSourceAction(Action):
+    def run(self, exe: Executor) -> None:
+        with self.db.lock:
+            if len(self.db.kafka_sources) > MAX_KAFKA_SOURCES:
+                return
+            source_id = self.db.kafka_source_id
+            self.db.kafka_source_id += 1
+            potential_clusters = [c for c in self.db.clusters if len(c.replicas) == 1]
+            cluster = self.rng.choice(potential_clusters)
+            source = KafkaSource(source_id, cluster, self.rng.choice(self.db.kafka_topics), self.rng)
+            source.create(exe)
+            self.db.kafka_sources.append(source)


+class DropKafkaSourceAction(Action):
+    def errors_to_ignore(self) -> list[str]:
+        return [
+            "still depended upon by",
+        ] + super().errors_to_ignore()
+
+    def run(self, exe: Executor) -> None:
+        with self.db.lock:
+            if not self.db.kafka_sources:
+                return
+            source_id = self.rng.randrange(len(self.db.kafka_sources))
+            source = self.db.kafka_sources[source_id]
+            query = f"DROP SOURCE {source}"
+            exe.execute(query)
+            del self.db.kafka_sources[source_id]


+class CreatePostgresSourceAction(Action):
+    def run(self, exe: Executor) -> None:
+        with self.db.lock:
+            if len(self.db.postgres_sources) > MAX_POSTGRES_SOURCES:
+                return
+            source_id = self.db.postgres_source_id
+            self.db.postgres_source_id += 1
+            potential_clusters = [c for c in self.db.clusters if len(c.replicas) == 1]
+            cluster = self.rng.choice(potential_clusters)
+            source = PostgresSource(source_id, cluster, self.rng.choice(self.db.postgres_tables), self.rng)
+            source.create(exe)
+            self.db.postgres_sources.append(source)


+class DropPostgresSourceAction(Action):
+    def errors_to_ignore(self) -> list[str]:
+        return [
+            "still depended upon by",
+        ] + super().errors_to_ignore()
+
+    def run(self, exe: Executor) -> None:
+        with self.db.lock:
+            if not self.db.postgres_sources:
+                return
+            source_id = self.rng.randrange(len(self.db.postgres_sources))
+            source = self.db.postgres_sources[source_id]
+            query = f"DROP SOURCE {source}"
+            exe.execute(query)
+            del self.db.postgres_sources[source_id]


+class CreateKafkaSinkAction(Action):
+    def run(self, exe: Executor) -> None:
+        with self.db.lock:
+            if len(self.db.kafka_sinks) > MAX_KAFKA_SINKS:
+                return
+            sink_id = self.db.kafka_sink_id
+            self.db.kafka_sink_id += 1
+            potential_clusters = [c for c in self.db.clusters if len(c.replicas) == 1]
+            cluster = self.rng.choice(potential_clusters)
+            sink = KafkaSink(sink_id, cluster, self.rng)
+            sink.create(exe)
+            self.db.kafka_sinks.append(sink)


+class DropKafkaSinkAction(Action):
+    def errors_to_ignore(self) -> list[str]:
+        return [
+            "still depended upon by",
+        ] + super().errors_to_ignore()
+
+    def run(self, exe: Executor) -> None:
+        with self.db.lock:
+            if not self.db.kafka_sinks:
+                return
+            sink_id = self.rng.randrange(len(self.db.kafka_sinks))
+            sink = self.db.kafka_sinks[sink_id]
+            query = f"DROP SINK {sink}"
+            exe.execute(query)
+            del self.db.kafka_sinks[sink_id]
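Note: the "still depended upon by" entries let these drops race against concurrent statements that consume the dropped objects. A sketch of how such an ignore list is typically applied around execution (assumed shape, not the harness's actual error handling):

    def execute_ignoring(exe, query: str, ignorable: list[str]) -> None:
        # Run the query; swallow only errors the action declared as expected.
        try:
            exe.execute(query)
        except Exception as e:  # the real harness likely uses a narrower type
            if not any(pattern in str(e) for pattern in ignorable):
                raise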


 class HttpPostAction(Action):
@@ -790,6 +884,12 @@ def __init__(
             (SetClusterAction, 1),
             (CreateWebhookSourceAction, 2),
             (DropWebhookSourceAction, 1),
+            (CreateKafkaSinkAction, 1),
+            (DropKafkaSinkAction, 1),
+            (CreateKafkaSourceAction, 1),
+            (DropKafkaSourceAction, 1),
+            (CreatePostgresSourceAction, 1),
+            (DropPostgresSourceAction, 1),
             (GrantPrivilegesAction, 2),
             (RevokePrivilegesAction, 1),
             (ReconnectAction, 1),
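Note: the weights above feed a weighted random scheduler; the selection loop itself is outside this diff. A minimal sketch of how such an (Action, weight) list can be consumed (the run_workload name and the Action(rng, db) constructor are assumptions, not the harness's actual API):

    import random

    def run_workload(action_weights, rng: random.Random, db, exe, steps: int) -> None:
        # Sample an Action class in proportion to its weight, then run it once.
        classes = [cls for cls, _weight in action_weights]
        weights = [weight for _cls, weight in action_weights]
        for _ in range(steps):
            action_class = rng.choices(classes, weights=weights, k=1)[0]
            action_class(rng, db).run(exe)  # assumed Action(rng, db) signature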
111 changes: 103 additions & 8 deletions misc/python/materialize/parallel_workload/database.py
@@ -33,14 +33,20 @@
 MAX_TABLES = 100
 MAX_VIEWS = 100
 MAX_ROLES = 100
-MAX_SOURCES = 20
+MAX_WEBHOOK_SOURCES = 20
+MAX_KAFKA_SOURCES = 20
+MAX_POSTGRES_SOURCES = 20
+MAX_KAFKA_SINKS = 20
 MAX_INCLUDE_HEADERS = 5

 MAX_INITIAL_CLUSTERS = 2
 MAX_INITIAL_TABLES = 10
 MAX_INITIAL_VIEWS = 10
 MAX_INITIAL_ROLES = 3
-MAX_INITIAL_SOURCES = 3
+MAX_INITIAL_WEBHOOK_SOURCES = 3
+MAX_INITIAL_KAFKA_SOURCES = 3
+MAX_INITIAL_POSTGRES_SOURCES = 3
+MAX_INITIAL_KAFKA_SINKS = 3


 class BodyFormat(Enum):
@@ -277,6 +283,72 @@ def create(self, exe: Executor) -> None:
         exe.execute(query)


+class KafkaSource(DBObject):
+    source_id: int
+    rename: int
+    cluster: "Cluster"
+    topic: str
+
+    def __init__(
+        self, source_id: int, cluster: "Cluster", topic: str, rng: random.Random
+    ):
+        self.source_id = source_id
+        self.cluster = cluster
+        self.topic = topic
+        self.format = rng.choice(
+            [
+                "AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn",
+                "PROTOBUF USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn",
+                "REGEX '.*'",
+                "TEXT",
+                "BYTES",
+            ]
+        )
+        self.envelope = rng.choice(["DEBEZIUM", "UPSERT", "NONE"])
+        self.rename = 0
+
+    def __str__(self) -> str:
+        if self.rename:
+            return f"ks{self.source_id}_{self.rename}"
+        return f"ks{self.source_id}"
+
+    def create(self, exe: Executor) -> None:
+        query = f"CREATE SOURCE {self} IN CLUSTER {self.cluster} FROM KAFKA CONNECTION kafka_conn (TOPIC '{self.topic}') FORMAT {self.format} ENVELOPE {self.envelope}"
+        exe.execute(query)
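Note: for reference, one statement this create() can render, with illustrative names (source id 0, a cluster rendered as cluster0, topic topic0; kafka_conn and csr_conn are assumed to be set up by the harness):

    # A possible rendering of KafkaSource.create() (illustrative names):
    expected = (
        "CREATE SOURCE ks0 IN CLUSTER cluster0 "
        "FROM KAFKA CONNECTION kafka_conn (TOPIC 'topic0') "
        "FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn "
        "ENVELOPE UPSERT"
    )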


+class KafkaSink(DBObject):
+    sink_id: int
+    rename: int
+    cluster: "Cluster"
+
+    def __init__(self, sink_id: int, cluster: "Cluster", rng: random.Random):
+        self.sink_id = sink_id
+        self.cluster = cluster
+        # WIP: ENVELOPE UPSERT also needs a KEY clause, so stay with DEBEZIUM for now
+        self.envelope = "DEBEZIUM"
+        self.rename = 0
+
+    def __str__(self) -> str:
+        if self.rename:
+            return f"sink{self.sink_id}_{self.rename}"
+        return f"sink{self.sink_id}"
+
+    def create(self, exe: Executor) -> None:
+        # WIP: the relation to sink from, the topic, and the format are not wired
+        # up yet; "t0", the topic, and FORMAT JSON below are placeholders
+        query = f"CREATE SINK {self} IN CLUSTER {self.cluster} FROM t0 INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink{self.sink_id}') FORMAT JSON ENVELOPE {self.envelope}"
+        exe.execute(query)
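Note: for comparison, a fully wired sink statement would take this shape (illustrative names; choosing the FROM relation and topic is the remaining WIP here):

    # A possible fully specified rendering of KafkaSink.create() (illustrative):
    expected = (
        "CREATE SINK sink0 IN CLUSTER cluster0 FROM t0 "
        "INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink0') "
        "FORMAT JSON ENVELOPE DEBEZIUM"
    )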


+class PostgresSource(DBObject):
+    source_id: int
+    rename: int
+    cluster: "Cluster"
+    table: str
+
+    def __init__(
+        self, source_id: int, cluster: "Cluster", table: str, rng: random.Random
+    ):
+        self.source_id = source_id
+        self.cluster = cluster
+        self.table = table
+        self.rename = 0
+
+    def __str__(self) -> str:
+        if self.rename:
+            return f"pgs{self.source_id}_{self.rename}"
+        return f"pgs{self.source_id}"
+
+    def create(self, exe: Executor) -> None:
+        query = f"CREATE SOURCE {self} IN CLUSTER {self.cluster} FROM POSTGRES CONNECTION pg_conn (PUBLICATION 'publication') FOR TABLES ({self.table})"
+        exe.execute(query)
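Note: likewise for Postgres sources, with illustrative names (pg_conn and the 'publication' publication are assumed to be pre-created by the harness):

    # A possible rendering of PostgresSource.create() (illustrative names):
    expected = (
        "CREATE SOURCE pgs0 IN CLUSTER cluster0 "
        "FROM POSTGRES CONNECTION pg_conn (PUBLICATION 'publication') "
        "FOR TABLES (postgres_table0)"
    )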


 class Role:
     role_id: int

@@ -368,8 +440,16 @@ class Database:
     clusters: list[Cluster]
     cluster_id: int
     indexes: set[str]
-    sources: list[WebhookSource]
-    source_id: int
+    webhook_sources: list[WebhookSource]
+    webhook_source_id: int
+    kafka_sources: list[KafkaSource]
+    kafka_source_id: int
+    postgres_sources: list[PostgresSource]
+    postgres_source_id: int
+    postgres_tables: list[str]
+    kafka_sinks: list[KafkaSink]
+    kafka_sink_id: int
+    kafka_topics: list[str]
     lock: threading.Lock

     def __init__(
@@ -419,11 +499,30 @@ def __init__(
         ]
         self.cluster_id = len(self.clusters)
         self.indexes = set()
-        self.sources = [
+        self.webhook_sources = [
             WebhookSource(i, rng.choice(self.clusters), rng)
-            for i in range(rng.randint(0, MAX_INITIAL_SOURCES))
+            for i in range(rng.randint(0, MAX_INITIAL_WEBHOOK_SOURCES))
         ]
-        self.source_id = len(self.sources)
+        self.webhook_source_id = len(self.webhook_sources)
+        # WIP: placeholder topic/table pools; the real ones have to come from the
+        # test setup before these sources can ingest anything
+        self.kafka_topics = [f"topic{i}" for i in range(10)]
+        self.postgres_tables = [f"postgres_table{i}" for i in range(10)]
+        self.kafka_sources = [
+            KafkaSource(i, rng.choice(self.clusters), rng.choice(self.kafka_topics), rng)
+            for i in range(rng.randint(0, MAX_INITIAL_KAFKA_SOURCES))
+        ]
+        self.kafka_source_id = len(self.kafka_sources)
+        self.postgres_sources = [
+            PostgresSource(i, rng.choice(self.clusters), rng.choice(self.postgres_tables), rng)
+            for i in range(rng.randint(0, MAX_INITIAL_POSTGRES_SOURCES))
+        ]
+        self.postgres_source_id = len(self.postgres_sources)
+        self.kafka_sinks = [
+            KafkaSink(i, rng.choice(self.clusters), rng)
+            for i in range(rng.randint(0, MAX_INITIAL_KAFKA_SINKS))
+        ]
+        self.kafka_sink_id = len(self.kafka_sinks)
         self.lock = threading.Lock()

def __str__(self) -> str:
@@ -432,7 +527,7 @@ def __str__(self) -> str:
     def __iter__(self):
         """Returns all relations"""
         return (
-            self.clusters + self.tables + self.views + self.roles + self.sources
+            self.clusters + self.tables + self.views + self.roles + self.webhook_sources + self.kafka_sources + self.postgres_sources
         ).__iter__()
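Note: a small illustration of what the extended __iter__ gives callers (hypothetical usage, not part of this change):

    # Every tracked relation, the new source types included, is now visible to
    # generic consumers such as cleanup loops:
    relation_names = [str(obj) for obj in db]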

     def drop(self, exe: Executor) -> None:
