Merge pull request #675 from openedx/bmtcril/load_test
feat: Add support for data pipeline load testing
Cristhian Garcia authored Apr 2, 2024
2 parents e1a5d59 + f6f5235 commit 9d8f75a
Showing 6 changed files with 107 additions and 41 deletions.
@@ -1,4 +1,4 @@
RUN --mount=type=cache,target=/openedx/.cache/pip,sharing=shared \
pip install "platform-plugin-aspects==0.4.0"
pip install "platform-plugin-aspects==v0.5.0"
RUN --mount=type=cache,target=/openedx/.cache/pip,sharing=shared \
pip install "edx-event-routing-backends==v8.1.1"
pip install "edx-event-routing-backends==v8.3.1"
7 changes: 5 additions & 2 deletions tutoraspects/plugin.py
@@ -28,8 +28,9 @@
# Each new setting is a pair: (setting_name, default_value).
# Prefix your setting names with 'ASPECTS_'.
("ASPECTS_VERSION", __version__),
# General tutor specific settings
("RUN_VECTOR", True),
# For our default deployment we currently use Celery -> Ralph for transport,
# so Vector is off by default.
("RUN_VECTOR", False),
("RUN_CLICKHOUSE", True),
("RUN_RALPH", True),
("RUN_SUPERSET", True),
@@ -136,6 +137,8 @@
("ASPECTS_EVENT_SINK_RECENT_BLOCKS_MV", "most_recent_course_blocks_mv"),
# Vector settings
("ASPECTS_DOCKER_HOST_SOCK_PATH", "/var/run/docker.sock"),
("ASPECTS_VECTOR_STORE_TRACKING_LOGS", False),
("ASPECTS_VECTOR_STORE_XAPI", True),
("ASPECTS_VECTOR_DATABASE", "openedx"),
("ASPECTS_VECTOR_RAW_TRACKING_LOGS_TABLE", "_tracking"),
("ASPECTS_VECTOR_RAW_XAPI_TABLE", "xapi_events_all"),
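With RUN_VECTOR now defaulting to False, operators who want Vector transport, or raw tracking-log storage, have to opt in explicitly. A sketch of how that might look, assuming Tutor prefixes these defaults with ASPECTS_ as the comment at the top of the settings list indicates:

    # Hypothetical opt-in: enable Vector and store raw tracking logs as well as xAPI.
    tutor config save \
      --set ASPECTS_RUN_VECTOR=true \
      --set ASPECTS_VECTOR_STORE_TRACKING_LOGS=true \
      --set ASPECTS_VECTOR_STORE_XAPI=true
    tutor local launch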
@@ -1,9 +1,5 @@
"""
Partition the event_sink.user_profile table
.. pii: Stores Open edX user profile data.
.. pii_types: user_id, name, username, location, phone_number, email_address, birth_date, biography, gender
.. pii_retirement: local_api, consumer_api
Partition the xapi table by year and month
"""
from alembic import op

@@ -15,15 +11,15 @@
on_cluster = " ON CLUSTER '{{CLICKHOUSE_CLUSTER_NAME}}' " if "{{CLICKHOUSE_CLUSTER_NAME}}" else ""
engine = "ReplicatedReplacingMergeTree" if "{{CLICKHOUSE_CLUSTER_NAME}}" else "ReplacingMergeTree"

old_user_profile_table = "{{ASPECTS_XAPI_DATABASE}}.old_{{ASPECTS_RAW_XAPI_TABLE}}"
old_xapi_table = "{{ASPECTS_XAPI_DATABASE}}.old_{{ASPECTS_RAW_XAPI_TABLE}}"

def upgrade():
# Partition event_sink.user_profile table
# 1. Rename old table
op.execute(
f"""
RENAME TABLE {{ ASPECTS_XAPI_DATABASE }}.{{ ASPECTS_RAW_XAPI_TABLE }}
TO {old_user_profile_table}
TO {old_xapi_table}
{on_cluster}
"""
)
@@ -46,13 +42,13 @@ def upgrade():
op.execute(
f"""
INSERT INTO {{ ASPECTS_XAPI_DATABASE }}.{{ ASPECTS_RAW_XAPI_TABLE }}
SELECT * FROM {old_user_profile_table}
SELECT event_id, emission_time, event FROM {old_xapi_table}
"""
)
# 4. Drop the old table
op.execute(
f"""
DROP TABLE {old_user_profile_table}
DROP TABLE {old_xapi_table}
{on_cluster}
"""
)
@@ -64,7 +60,7 @@ def downgrade():
op.execute(
f"""
RENAME TABLE {{ ASPECTS_XAPI_DATABASE }}.{{ ASPECTS_RAW_XAPI_TABLE }}
TO {old_user_profile_table}
TO {old_xapi_table}
{on_cluster}
"""
)
@@ -87,14 +83,14 @@ def downgrade():
op.execute(
f"""
INSERT INTO {{ ASPECTS_XAPI_DATABASE }}.{{ ASPECTS_RAW_XAPI_TABLE }}
SELECT * FROM {old_user_profile_table}
SELECT * FROM {old_xapi_table}
"""

)
# 4. Drop the old table
op.execute(
f"""
DROP TABLE {old_user_profile_table}
DROP TABLE {old_xapi_table}
{on_cluster}
"""
)
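The rewritten migration partitions the raw xAPI table by year and month and copies only the three columns the new schema keeps (event_id, emission_time, event) instead of SELECT *. One way to confirm the partitioning after the migration runs, assuming the default xapi database and xapi_events_all table names and a ClickHouse service named clickhouse (add --user/--password if your deployment requires them):

    # One row per monthly partition with its size.
    tutor local exec clickhouse clickhouse-client --query "
      SELECT partition, sum(rows) AS rows, count() AS parts
      FROM system.parts
      WHERE database = 'xapi' AND table = 'xapi_events_all' AND active
      GROUP BY partition
      ORDER BY partition"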
@@ -0,0 +1,60 @@
"""
Create the load_test_stats table
This table is always created, but it will only be populated if the load test
management commands are run from the platform_plugin_aspects app.
"""
from alembic import op


revision = "0034"
down_revision = "0033"
branch_labels = None
depends_on = None
on_cluster = " ON CLUSTER '{{CLICKHOUSE_CLUSTER_NAME}}' " if "{{CLICKHOUSE_CLUSTER_NAME}}" else ""
engine = "ReplicatedMergeTree" if "{{CLICKHOUSE_CLUSTER_NAME}}" else "MergeTree"


def upgrade():
op.execute(
f"""
CREATE TABLE IF NOT EXISTS {{ ASPECTS_EVENT_SINK_DATABASE }}.load_test_runs
{on_cluster}
(
run_id String,
timestamp DateTime default now(),
event_type String,
extra String
)
engine = {engine} PRIMARY KEY (run_id, timestamp)
ORDER BY (run_id, timestamp)
SETTINGS index_granularity = 8192;
"""
)

op.execute(
f"""
CREATE TABLE IF NOT EXISTS {{ ASPECTS_EVENT_SINK_DATABASE }}.load_test_stats
{on_cluster}
(
run_id String,
timestamp DateTime default now(),
stats String
)
engine = {engine} PRIMARY KEY (run_id, timestamp)
ORDER BY (run_id, timestamp)
SETTINGS index_granularity = 8192;
"""
)


def downgrade():
op.execute(
"DROP TABLE IF EXISTS {{ ASPECTS_EVENT_SINK_DATABASE }}.load_test_stats"
f"{on_cluster}"
)

op.execute(
"DROP TABLE IF EXISTS {{ASPECTS_EVENT_SINK_DATABASE}}.load_test_runs"
f"{on_cluster}"
)
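The migration only guarantees these tables exist; they are populated by the load-test management commands in platform-plugin-aspects. A sketch of how a finished run might be summarized, assuming the default event_sink database and the same clickhouse service name as above:

    # When each load test run started and finished, and how many events it recorded.
    tutor local exec clickhouse clickhouse-client --query "
      SELECT run_id, min(timestamp) AS started, max(timestamp) AS finished, count() AS events
      FROM event_sink.load_test_runs
      GROUP BY run_id
      ORDER BY started DESC"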
@@ -1,5 +1,6 @@
### Transforms
### Tracking logs

{% if ASPECTS_VECTOR_STORE_TRACKING_LOGS %}
# Parse tracking logs: extract time
[transforms.tracking]
type = "remap"
@@ -30,6 +31,7 @@ if err_timestamp != null {
drop_on_error = true
drop_on_abort = true


[transforms.tracking_debug]
type = "remap"
inputs = ["tracking"]
@@ -38,6 +40,32 @@ source = '''
.message = parse_json!(.message)
'''

# Log all events to stdout, for debugging
[sinks.out]
type = "console"
inputs = ["tracking_debug"]
encoding.codec = "json"
encoding.only_fields = ["time", "message.context.course_id", "message.context.user_id", "message.name"]

# # Send logs to clickhouse
[sinks.clickhouse]
type = "clickhouse"
auth.strategy = "basic"
auth.user = "{{ ASPECTS_CLICKHOUSE_VECTOR_USER }}"
auth.password = "{{ ASPECTS_CLICKHOUSE_VECTOR_PASSWORD }}"
# Required: https://github.com/timberio/vector/issues/5797
encoding.timestamp_format = "unix"
inputs = ["tracking"]
endpoint = "{% if CLICKHOUSE_SECURE_CONNECTION %}https{% else %}http{% endif %}://{{ CLICKHOUSE_HOST }}:{{ CLICKHOUSE_INTERNAL_HTTP_PORT }}"
database = "{{ ASPECTS_VECTOR_DATABASE }}"
table = "{{ ASPECTS_VECTOR_RAW_TRACKING_LOGS_TABLE }}"
healthcheck = true

{% endif %}

### xAPI
{% if ASPECTS_VECTOR_STORE_XAPI %}

[transforms.xapi]
type = "remap"
inputs = ["openedx_containers"]
@@ -80,35 +108,12 @@ source = '''
.message = parse_json!(.event)
'''

### Sinks

# Log all events to stdout, for debugging
[sinks.out]
type = "console"
inputs = ["tracking_debug"]
encoding.codec = "json"
encoding.only_fields = ["time", "message.context.course_id", "message.context.user_id", "message.name"]

[sinks.out_xapi]
type = "console"
inputs = ["xapi_debug"]
encoding.codec = "json"
encoding.only_fields = ["event_id", "emission_time", "event"]

# # Send logs to clickhouse
[sinks.clickhouse]
type = "clickhouse"
auth.strategy = "basic"
auth.user = "{{ ASPECTS_CLICKHOUSE_VECTOR_USER }}"
auth.password = "{{ ASPECTS_CLICKHOUSE_VECTOR_PASSWORD }}"
# Required: https://github.com/timberio/vector/issues/5797
encoding.timestamp_format = "unix"
inputs = ["tracking"]
endpoint = "{% if CLICKHOUSE_SECURE_CONNECTION %}https{% else %}http{% endif %}://{{ CLICKHOUSE_HOST }}:{{ CLICKHOUSE_INTERNAL_HTTP_PORT }}"
database = "{{ ASPECTS_VECTOR_DATABASE }}"
table = "{{ ASPECTS_VECTOR_RAW_TRACKING_LOGS_TABLE }}"
healthcheck = true

[sinks.clickhouse_xapi]
type = "clickhouse"
auth.strategy = "basic"
@@ -124,4 +129,6 @@ database = "{{ ASPECTS_VECTOR_DATABASE }}"
table = "{{ ASPECTS_VECTOR_RAW_XAPI_TABLE }}"
healthcheck = true

{% endif %}

{{ patch("vector-common-toml") }}
@@ -3,4 +3,4 @@ data_dir = "/vector-data-dir"
# Vector's API for introspection
[api]
enabled = true
address = "127.0.0.1:8686"
address = "0.0.0.0:8686"
