feature-benchmark: Increase scale of fast scenarios, reduce retries
def- committed Nov 22, 2024
1 parent f900de4 commit f471151
Showing 2 changed files with 109 additions and 6 deletions.
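
For context on the pattern that repeats throughout this diff: every scenario whose SCALE grows also gains a version() override returning ScenarioVersion.create(1, 1, 0), presumably so that results recorded at the old scale are not compared against measurements at the new one. Assuming the framework derives a scenario's workload size as n() = 10**SCALE (the "# 32 clusters" / "# 100 clusters" comments in scale.py below and the 1M → 10M docstring change for PgCdcInitialLoad both point at that convention), the new scales imply the following sizes. This is an illustrative sketch, not code from the repository:

```python
# Workload sizes implied by the SCALE values in this commit,
# under the assumption that a scenario's n() is 10**SCALE.
new_scales = {
    "most scenarios in this commit (FastPath*, Dataflow*, Kafka*, ...)": 7,  # 10,000,000 rows ("10M existing records")
    "Update, PgCdcStreaming": 6,       # Update held back by database-issues#8766
    "InsertMultiRow": 5,               # bounded by the 2 MB request limit
    "QueryLatency": 4,                 # 10,000 SELECT 1 queries
    "SmallClusters": 2,                # 100 clusters
}
for scenarios, scale in new_scales.items():
    print(f"SCALE {scale}: {10**scale:>10,} -> {scenarios}")
```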
108 changes: 104 additions & 4 deletions misc/python/materialize/feature_benchmark/scenarios/benchmark_main.py
@@ -103,6 +103,11 @@ def benchmark(self) -> MeasurementSource:
class FastPathFilterIndex(FastPath):
"""Measure the time it takes for the fast path to filter our all rows from a materialized view using an index and return"""

SCALE = 7

def version(self) -> ScenarioVersion:
return ScenarioVersion.create(1, 1, 0)

def init(self) -> list[Action]:
return [
self.table_ten(),
@@ -149,6 +154,11 @@ def benchmark(self) -> MeasurementSource:
class FastPathOrderByLimit(FastPath):
"""Benchmark the case SELECT * FROM materialized_view ORDER BY <key> LIMIT <i>"""

SCALE = 7

def version(self) -> ScenarioVersion:
return ScenarioVersion.create(1, 1, 0)

def init(self) -> list[Action]:
return [
self.table_ten(),
@@ -180,6 +190,11 @@ def benchmark(self) -> MeasurementSource:
class FastPathLimit(FastPath):
"""Benchmark the case SELECT * FROM source LIMIT <i> , optimized by materialize#21615"""

SCALE = 7

def version(self) -> ScenarioVersion:
return ScenarioVersion.create(1, 1, 0)

def init(self) -> list[Action]:
return [
TdAction(
@@ -213,6 +228,11 @@ class DML(Scenario):
class Insert(DML):
"""Measure the time it takes for an INSERT statement to return."""

SCALE = 7

def version(self) -> ScenarioVersion:
return ScenarioVersion.create(1, 1, 0)

def init(self) -> Action:
return self.table_ten()

@@ -293,7 +313,10 @@ def benchmark(self) -> MeasurementSource:
class InsertMultiRow(DML):
"""Measure the time it takes for a single multi-row INSERT statement to return."""

- SCALE = 4 # FATAL: request larger than 2.0 MB
+ SCALE = 5 # FATAL: request larger than 2.0 MB

def version(self) -> ScenarioVersion:
return ScenarioVersion.create(1, 1, 0)

def benchmark(self) -> MeasurementSource:
values = ", ".join(f"({i})" for i in range(0, self.n()))
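
The comment on InsertMultiRow explains the ceiling: the FATAL error suggests the single multi-row INSERT is rejected once the request exceeds 2.0 MB. A rough size estimate (assuming n() = 10**SCALE, and measuring only the VALUES list built by the expression above; the fixed INSERT prefix adds just a handful of bytes) shows why the scale stops at 5:

```python
# Approximate size of the VALUES list that InsertMultiRow sends in one statement.
def values_bytes(scale: int) -> int:
    n = 10**scale  # assumption: n() = 10**SCALE
    values = ", ".join(f"({i})" for i in range(0, n))  # same expression as in the scenario
    return len(values)

for scale in (4, 5, 6):
    print(f"SCALE {scale}: {values_bytes(scale) / 1024 / 1024:.2f} MiB")
# ~0.08 MiB at SCALE 4, ~0.85 MiB at SCALE 5, ~9.43 MiB at SCALE 6,
# so 5 is the largest scale that stays under the 2 MB request limit.
```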
@@ -314,11 +337,17 @@ def benchmark(self) -> MeasurementSource:
class Update(DML):
"""Measure the time it takes for an UPDATE statement to return to client"""

SCALE = 6 # TODO: Increase scale when database-issues#8766 is fixed

def init(self) -> list[Action]:
return [
self.table_ten(),
TdAction(
f"""
$ postgres-connect name=mz_system url=postgres://mz_system:materialize@${{testdrive.materialize-internal-sql-addr}}
$ postgres-execute connection=mz_system
ALTER SYSTEM SET max_result_size = 17179869184;
> CREATE TABLE t1 (f1 BIGINT);
> CREATE DEFAULT INDEX ON t1;
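
Both this Update hunk and the CreateIndex hunk further down raise max_result_size before loading the table. The new value is exactly 16 GiB; a one-line check of the constant shown above:

```python
# The max_result_size value set above, expressed in GiB.
assert 17179869184 == 16 * 1024**3
print(17179869184 / 1024**3, "GiB")  # 16.0 GiB
```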
@@ -424,6 +453,11 @@ class InsertAndSelect(DML):
dataflow to be completely caught up.
"""

SCALE = 7

def version(self) -> ScenarioVersion:
return ScenarioVersion.create(1, 1, 0)

def init(self) -> Action:
return self.table_ten()

@@ -489,6 +523,11 @@ def benchmark(self) -> MeasurementSource:


class CountDistinct(Dataflow):
SCALE = 7

def version(self) -> ScenarioVersion:
return ScenarioVersion.create(1, 1, 0)

def init(self) -> list[Action]:
return [
self.view_ten(),
@@ -517,6 +556,11 @@ def benchmark(self) -> MeasurementSource:


class MinMax(Dataflow):
SCALE = 7

def version(self) -> ScenarioVersion:
return ScenarioVersion.create(1, 1, 0)

def init(self) -> list[Action]:
return [
self.view_ten(),
@@ -641,6 +685,11 @@ def benchmark(self) -> MeasurementSource:


class CrossJoin(Dataflow):
SCALE = 7

def version(self) -> ScenarioVersion:
return ScenarioVersion.create(1, 1, 0)

def init(self) -> Action:
return self.view_ten()

@@ -778,11 +827,20 @@ class CreateIndex(Dataflow):
it takes for a SELECT query that would use the index to return rows.
"""

SCALE = 7

def version(self) -> ScenarioVersion:
return ScenarioVersion.create(1, 1, 0)

def init(self) -> list[Action]:
return [
self.table_ten(),
TdAction(
f"""
$ postgres-connect name=mz_system url=postgres://mz_system:materialize@${{testdrive.materialize-internal-sql-addr}}
$ postgres-execute connection=mz_system
ALTER SYSTEM SET max_result_size = 17179869184;
> CREATE TABLE t1 (f1 INTEGER, f2 INTEGER);
> INSERT INTO t1 (f1) SELECT {self.unique_values()} FROM {self.join()}
@@ -813,6 +871,11 @@ def benchmark(self) -> MeasurementSource:


class DeltaJoin(Dataflow):
SCALE = 7

def version(self) -> ScenarioVersion:
return ScenarioVersion.create(1, 1, 0)

def init(self) -> list[Action]:
return [
self.view_ten(),
@@ -843,6 +906,11 @@ class DeltaJoinMaintained(Dataflow):
empty frontier is not emitted, in contrast with one-shot SELECT processing based on data
initialized as a constant view"""

SCALE = 7

def version(self) -> ScenarioVersion:
return ScenarioVersion.create(1, 1, 0)

def init(self) -> list[Action]:
return [
self.table_ten(),
@@ -872,6 +940,11 @@ def benchmark(self) -> MeasurementSource:


class DifferentialJoin(Dataflow):
SCALE = 7

def version(self) -> ScenarioVersion:
return ScenarioVersion.create(1, 1, 0)

def init(self) -> list[Action]:
return [
self.view_ten(),
@@ -943,6 +1016,11 @@ class Finish(Scenario):
class FinishOrderByLimit(Finish):
"""Benchmark ORDER BY + LIMIT without the benefit of an index"""

SCALE = 7

def version(self) -> ScenarioVersion:
return ScenarioVersion.create(1, 1, 0)

def init(self) -> list[Action]:
return [
self.view_ten(),
@@ -1014,6 +1092,11 @@ def benchmark(self) -> MeasurementSource:


class KafkaUpsert(Kafka):
SCALE = 7

def version(self) -> ScenarioVersion:
return ScenarioVersion.create(1, 1, 0)

def shared(self) -> Action:
return TdAction(
self.keyschema()
@@ -1062,6 +1145,11 @@ def benchmark(self) -> MeasurementSource:


class KafkaUpsertUnique(Kafka):
SCALE = 7

def version(self) -> ScenarioVersion:
return ScenarioVersion.create(1, 1, 0)

def shared(self) -> Action:
return TdAction(
self.keyschema()
@@ -1504,9 +1592,14 @@ class PgCdc(Scenario):


class PgCdcInitialLoad(PgCdc):
"""Measure the time it takes to read 1M existing records from Postgres
"""Measure the time it takes to read 10M existing records from Postgres
when creating a materialized source"""

SCALE = 7

def version(self) -> ScenarioVersion:
return ScenarioVersion.create(1, 1, 0)

def shared(self) -> Action:
return TdAction(
f"""
@@ -1563,7 +1656,10 @@ def benchmark(self) -> MeasurementSource:
class PgCdcStreaming(PgCdc):
"""Measure the time it takes to ingest records from Postgres post-snapshot"""

- SCALE = 5
+ SCALE = 6

def version(self) -> ScenarioVersion:
return ScenarioVersion.create(1, 1, 0)

def shared(self) -> Action:
return TdAction(
@@ -1782,9 +1878,13 @@ class Coordinator(Scenario):


class QueryLatency(Coordinator):
- SCALE = 3
"""Measure the time it takes to run SELECT 1 queries"""

+ SCALE = 4

def version(self) -> ScenarioVersion:
return ScenarioVersion.create(1, 1, 0)

def benchmark(self) -> MeasurementSource:
selects = "\n".join("> SELECT 1\n1\n" for i in range(0, self.n()))

7 changes: 5 additions & 2 deletions misc/python/materialize/feature_benchmark/scenarios/scale.py
@@ -10,15 +10,18 @@
from textwrap import dedent

from materialize.feature_benchmark.measurement_source import MeasurementSource, Td
- from materialize.feature_benchmark.scenario import Scenario
+ from materialize.feature_benchmark.scenario import Scenario, ScenarioVersion


class SmallClusters(Scenario):
"""Materialized views across many small clusters."""

- SCALE = 1.5 # 32 clusters
+ SCALE = 2 # 100 clusters
FIXED_SCALE = True

def version(self) -> ScenarioVersion:
return ScenarioVersion.create(1, 1, 0)

def benchmark(self) -> MeasurementSource:
create = "\n".join(
dedent(
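
The inline comments on SmallClusters above follow the same convention: assuming the cluster count is round(10**SCALE), the old scale of 1.5 gave roughly 32 clusters and the new scale of 2 gives exactly 100:

```python
# Cluster counts implied by the SCALE comments, assuming count = round(10**SCALE).
for scale in (1.5, 2):
    print(f"SCALE {scale}: {round(10**scale)} clusters")
# SCALE 1.5: 32 clusters, SCALE 2: 100 clusters
```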
