
Commit

WIP
def- committed Oct 12, 2023
1 parent 22a8740 commit 07d17e0
Showing 3 changed files with 280 additions and 16 deletions.
119 changes: 112 additions & 7 deletions misc/python/materialize/parallel_workload/action.py
@@ -19,15 +19,21 @@
from materialize.parallel_workload.database import (
MAX_CLUSTER_REPLICAS,
MAX_CLUSTERS,
MAX_KAFKA_SINKS,
MAX_KAFKA_SOURCES,
MAX_POSTGRES_SOURCES,
MAX_ROLES,
MAX_ROWS,
    MAX_TABLES,
MAX_VIEWS,
MAX_WEBHOOK_SOURCES,
Cluster,
ClusterReplica,
Database,
DBObject,
KafkaSink,
KafkaSource,
PostgresSource,
Role,
Table,
View,
@@ -661,15 +667,15 @@ def run(self, exe: Executor) -> None:
class CreateWebhookSourceAction(Action):
def run(self, exe: Executor) -> None:
with self.db.lock:
            if len(self.db.webhook_sources) > MAX_WEBHOOK_SOURCES:
return
            source_id = self.db.webhook_source_id
            self.db.webhook_source_id += 1
potential_clusters = [c for c in self.db.clusters if len(c.replicas) == 1]
cluster = self.rng.choice(potential_clusters)
source = WebhookSource(source_id, cluster, self.rng)
source.create(exe)
            self.db.webhook_sources.append(source)


class DropWebhookSourceAction(Action):
@@ -680,13 +686,106 @@ def errors_to_ignore(self) -> list[str]:

def run(self, exe: Executor) -> None:
with self.db.lock:
            if not self.db.webhook_sources:
return
source_id = self.rng.randrange(len(self.db.webhook_sources))
source = self.db.webhook_sources[source_id]
query = f"DROP SOURCE {source}"
exe.execute(query)
del self.db.webhook_sources[source_id]


class CreateKafkaSourceAction(Action):
def run(self, exe: Executor) -> None:
with self.db.lock:
if len(self.db.kafka_sources) > MAX_KAFKA_SOURCES:
return
source_id = self.db.kafka_source_id
self.db.kafka_source_id += 1
potential_clusters = [c for c in self.db.clusters if len(c.replicas) == 1]
cluster = self.rng.choice(potential_clusters)
            source = KafkaSource(
                source_id, cluster, self.rng.choice(self.db.kafka_topics), self.rng
            )
source.create(exe)
self.db.kafka_sources.append(source)


class DropKafkaSourceAction(Action):
def errors_to_ignore(self) -> list[str]:
return [
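                # A Kafka source can still be referenced by sinks or views;
                # let the drop fail harmlessly in that case.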
"still depended upon by",
] + super().errors_to_ignore()

def run(self, exe: Executor) -> None:
with self.db.lock:
if not self.db.kafka_sources:
return
            source_id = self.rng.randrange(len(self.db.kafka_sources))
            source = self.db.kafka_sources[source_id]
query = f"DROP SOURCE {source}"
exe.execute(query)
            del self.db.kafka_sources[source_id]


class CreatePostgresSourceAction(Action):
def run(self, exe: Executor) -> None:
with self.db.lock:
if len(self.db.postgres_sources) > MAX_POSTGRES_SOURCES:
return
source_id = self.db.postgres_source_id
self.db.postgres_source_id += 1
potential_clusters = [c for c in self.db.clusters if len(c.replicas) == 1]
cluster = self.rng.choice(potential_clusters)
            source = PostgresSource(
                source_id, cluster, self.rng.choice(self.db.postgres_tables), self.rng
            )
source.create(exe)
self.db.postgres_sources.append(source)


class DropPostgresSourceAction(Action):
def errors_to_ignore(self) -> list[str]:
return [
"still depended upon by",
] + super().errors_to_ignore()

def run(self, exe: Executor) -> None:
with self.db.lock:
if not self.db.postgres_sources:
return
source_id = self.rng.randrange(len(self.db.postgres_sources))
source = self.db.postgres_sources[source_id]
query = f"DROP SOURCE {source}"
exe.execute(query)
del self.db.postgres_sources[source_id]


class CreateKafkaSinkAction(Action):
def run(self, exe: Executor) -> None:
with self.db.lock:
if len(self.db.kafka_sinks) > MAX_KAFKA_SINKS:
return
sink_id = self.db.kafka_sink_id
self.db.kafka_sink_id += 1
potential_clusters = [c for c in self.db.clusters if len(c.replicas) == 1]
cluster = self.rng.choice(potential_clusters)
            sink = KafkaSink(
                sink_id, cluster, self.rng.choice(self.db.db_objects()), self.rng
            )
sink.create(exe)
self.db.kafka_sinks.append(sink)


class DropKafkaSinkAction(Action):
def errors_to_ignore(self) -> list[str]:
return [
"still depended upon by",
] + super().errors_to_ignore()

def run(self, exe: Executor) -> None:
with self.db.lock:
if not self.db.kafka_sinks:
return
sink_id = self.rng.randrange(len(self.db.kafka_sinks))
sink = self.db.kafka_sinks[sink_id]
query = f"DROP SINK {sink}"
exe.execute(query)
del self.db.kafka_sinks[sink_id]


class HttpPostAction(Action):
@@ -790,6 +889,12 @@ def __init__(
(SetClusterAction, 1),
(CreateWebhookSourceAction, 2),
(DropWebhookSourceAction, 1),
(CreateKafkaSinkAction, 1),
(DropKafkaSinkAction, 1),
(CreateKafkaSourceAction, 1),
(DropKafkaSourceAction, 1),
(CreatePostgresSourceAction, 1),
(DropPostgresSourceAction, 1),
(GrantPrivilegesAction, 2),
(RevokePrivilegesAction, 1),
(ReconnectAction, 1),
159 changes: 151 additions & 8 deletions misc/python/materialize/parallel_workload/database.py
@@ -33,14 +33,20 @@
MAX_TABLES = 100
MAX_VIEWS = 100
MAX_ROLES = 100
MAX_WEBHOOK_SOURCES = 20
MAX_KAFKA_SOURCES = 20
MAX_POSTGRES_SOURCES = 20
MAX_KAFKA_SINKS = 20
MAX_INCLUDE_HEADERS = 5

MAX_INITIAL_CLUSTERS = 2
MAX_INITIAL_TABLES = 10
MAX_INITIAL_VIEWS = 10
MAX_INITIAL_ROLES = 3
MAX_INITIAL_WEBHOOK_SOURCES = 3
MAX_INITIAL_KAFKA_SOURCES = 3
MAX_INITIAL_POSTGRES_SOURCES = 3
MAX_INITIAL_KAFKA_SINKS = 3


class BodyFormat(Enum):
@@ -277,6 +283,93 @@ def create(self, exe: Executor) -> None:
exe.execute(query)


class KafkaSource(DBObject):
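    """A source ingesting an existing Kafka topic through the shared kafka_conn connection."""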
source_id: int
rename: int
cluster: "Cluster"
topic: str

def __init__(
self, source_id: int, cluster: "Cluster", topic: str, rng: random.Random
):
self.source_id = source_id
self.cluster = cluster
self.topic = topic
self.format = rng.choice(
[
"AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn",
"PROTOBUF USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn",
"REGEX '.*'",
"TEXT",
"BYTES",
]
)
# TODO: UPSERT
self.envelope = rng.choice(["DEBEZIUM", "NONE"])
self.rename = 0

def __str__(self) -> str:
if self.rename:
return f"ks{self.source_id}_{self.rename}"
return f"ks{self.source_id}"

def create(self, exe: Executor) -> None:
query = f"CREATE SOURCE {self} IN CLUSTER {self.cluster} FROM KAFKA CONNECTION kafka_conn (TOPIC {self.topic}) FORMAT {self.format} ENVELOPE {self.envelope}"
exe.execute(query)


class KafkaSink(DBObject):
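    """A sink writing one existing object (table, view or source) out to its own Kafka topic."""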
sink_id: int
rename: int
cluster: "Cluster"

def __init__(
self,
sink_id: int,
cluster: "Cluster",
base_object: DBObject,
rng: random.Random,
):
        self.sink_id = sink_id
        self.cluster = cluster
        self.base_object = base_object
        # WIP: format and envelope were never set; assume Avro via the csr_conn
        # connection and a Debezium envelope so the generated SQL is well-formed.
        self.format = "AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn"
        self.envelope = "DEBEZIUM"
        self.rename = 0

def __str__(self) -> str:
if self.rename:
return f"sink{self.sink_id}_{self.rename}"
return f"sink{self.sink_id}"

def create(self, exe: Executor) -> None:
topic = f"sink_topic{self.sink_id}"
query = f"CREATE SINK {self} IN CLUSTER {self.cluster} FROM {self.base_object} INTO KAFKA CONNECTION kafka_conn TOPIC {topic} FORMAT {format} ENVELOPE {self.envelope}"
exe.execute(query)


class PostgresSource(DBObject):
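    """A source replicating a single table from the 'publication' publication over postgres_conn."""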
source_id: int
rename: int
cluster: "Cluster"
table: str

def __init__(
self, source_id: int, cluster: "Cluster", table: str, rng: random.Random
):
self.source_id = source_id
self.cluster = cluster
self.table = table
self.rename = 0

def __str__(self) -> str:
if self.rename:
return f"pgs{self.source_id}_{self.rename}"
return f"pgs{self.source_id}"

def create(self, exe: Executor) -> None:
query = f"CREATE SOURCE {self} IN CLUSTER {self.cluster} FROM POSTGRES CONNECTION postgres_conn ( PUBLICATION 'publication') FOR TABLES ({self.table})"
exe.execute(query)


class Role:
role_id: int

@@ -368,8 +461,16 @@ class Database:
clusters: list[Cluster]
cluster_id: int
indexes: set[str]
    webhook_sources: list[WebhookSource]
    webhook_source_id: int
    kafka_sources: list[KafkaSource]
    kafka_source_id: int
    postgres_sources: list[PostgresSource]
    postgres_source_id: int
    postgres_tables: list[str]
    kafka_sinks: list[KafkaSink]
    kafka_sink_id: int
    kafka_topics: list[str]
lock: threading.Lock

def __init__(
@@ -419,20 +520,50 @@ def __init__(
]
self.cluster_id = len(self.clusters)
self.indexes = set()
        self.webhook_sources = [
WebhookSource(i, rng.choice(self.clusters), rng)
            for i in range(rng.randint(0, MAX_INITIAL_WEBHOOK_SOURCES))
]
self.webhook_source_id = len(self.webhook_sources)
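        # Assumption: these topics and tables already exist in the external
        # Kafka and Postgres services the workload connects to.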
self.kafka_topics = ["topic1", "topic2"]
self.kafka_sources = [
KafkaSource(
i, rng.choice(self.clusters), rng.choice(self.kafka_topics), rng
)
for i in range(rng.randint(0, MAX_INITIAL_KAFKA_SOURCES))
]
self.kafka_source_id = len(self.kafka_sources)
self.postgres_tables = ["table1", "table2"]
self.postgres_sources = [
PostgresSource(
i, rng.choice(self.clusters), rng.choice(self.postgres_tables), rng
)
for i in range(rng.randint(0, MAX_INITIAL_POSTGRES_SOURCES))
]
self.postgres_source_id = len(self.postgres_sources)
self.kafka_sinks = [
KafkaSink(i, rng.choice(self.clusters), rng.choice(self.db_objects()), rng)
for i in range(rng.randint(0, MAX_INITIAL_KAFKA_SINKS))
]
        self.kafka_sink_id = len(self.kafka_sinks)
self.lock = threading.Lock()

def __str__(self) -> str:
return f"db_pw_{self.seed}"

def db_objects(self) -> list[DBObject]:
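        """Return all objects that can be queried or used as the base of a sink."""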
return (
self.tables
+ self.views
+ self.kafka_sources
+ self.postgres_sources
+ self.webhook_sources
)

def __iter__(self):
"""Returns all relations"""
return (
            self.clusters + self.roles + self.db_objects()
).__iter__()

def drop(self, exe: Executor) -> None:
Expand All @@ -451,5 +582,17 @@ def create_relations(self, exe: Executor) -> None:
for row in exe.cur.fetchall():
exe.execute(f"DROP ROLE {row[0]}")

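        # Connections shared by all generated sources and sinks.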
exe.execute("CREATE CONNECTION kafka_conn FOR KAFKA BROKER 'kafka:9092'")
exe.execute(
"CREATE CONNECTION csr_conn FOR CONFLUENT SCHEMA REGISTRY URL 'http://schema-registry:8081'"
)

        # Should we actually just hook up data-ingest? Make it reusable from parallel-workload
        # Yes, seems better, do that on Thursday

exe.execute("CREATE SECRET pgpass AS 'postgres'")
exe.execute("CREATE CONNECTION postgres_conn FOR POSTGRES HOST 'postgres', DATABASE postgres, USER postgres, PASSWORD SECRET pgpass")

for relation in self:
relation.create(exe)
