chore: create migration for coordinator schema #28139
base: master
Conversation
HostInfo(ConnectionInfo(host_address, port), shard_num, replica_num, host_cluster_type, host_cluster_role)
HostInfo(
    ConnectionInfo(host_address, port),
    shard_num if host_cluster_role != "coordinator" else None,
I don't think we want to count the coordinators as separate shards, since they are merely "compute" nodes. This is mainly for the map_host_per_shard function, to avoid taking the coordinators into account.
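The intent of that `shard_num if host_cluster_role != "coordinator" else None` change can be sketched as follows. This is a hypothetical illustration, not PostHog's actual `HostInfo` type or `map_host_per_shard` implementation: plain dicts stand in for host records, and hosts whose role is `coordinator` are skipped when grouping by shard.

```python
from collections import defaultdict

# Hypothetical sketch: group hosts by shard number while skipping
# coordinator-role hosts, which are compute-only and not shard members.
def map_hosts_per_shard(host_infos):
    shards = defaultdict(list)
    for host in host_infos:
        if host["host_cluster_role"] == "coordinator":
            continue  # compute-only node: no shard to map
        shards[host["shard_num"]].append(host["host_address"])
    return dict(shards)

hosts = [
    {"host_address": "ch1", "shard_num": 1, "host_cluster_role": "data"},
    {"host_address": "ch2", "shard_num": 2, "host_cluster_role": "data"},
    {"host_address": "coord", "shard_num": None, "host_cluster_role": "coordinator"},
]
print(map_hosts_per_shard(hosts))  # {1: ['ch1'], 2: ['ch2']}
```

Setting `shard_num` to `None` for coordinators, as the diff does, makes this kind of grouping logic naturally exclude them.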
PR Summary
This PR adds a migration for the coordinator schema in ClickHouse, moving away from manual schema creation. Here are the key changes:
- Added new migration file 0096_coordinator_schemas.py to create distributed tables and dictionaries specifically for coordinator nodes
- Introduced an ON_CLUSTER_CLAUSE lambda function across SQL files to centralize cluster clause generation
- Modified table creation SQL functions to accept an optional on_cluster parameter to control whether cluster clauses are included
- Updated SQL generation functions to use NodeRole.COORDINATOR when creating coordinator-specific schemas
- Added documentation in README.md explaining when to run migrations on worker nodes vs. all nodes
The changes standardize schema management through code rather than manual intervention while maintaining backward compatibility. The PR follows best practices by avoiding ON CLUSTER clauses in migrations, since they will be run through run_sql_with_exceptions.
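The `on_cluster` parameter pattern the summary describes can be sketched like this. The cluster value and the example DDL are illustrative stand-ins, not PostHog's real settings or schema; only the overall shape (a clause helper plus an `on_cluster` flag on each table-SQL function) follows the diff.

```python
# Illustrative stand-in for settings.CLICKHOUSE_CLUSTER.
CLICKHOUSE_CLUSTER = "posthog"

def ON_CLUSTER_CLAUSE():
    return f"ON CLUSTER '{CLICKHOUSE_CLUSTER}'"

def EXAMPLE_TABLE_SQL(on_cluster=True):
    # Coordinator migrations pass on_cluster=False so the DDL runs only on
    # the node that run_sql_with_exceptions targets, not the worker cluster.
    clause = ON_CLUSTER_CLAUSE() if on_cluster else ""
    return f"CREATE TABLE IF NOT EXISTS example {clause} (id UInt64) ENGINE = MergeTree ORDER BY id"

print(EXAMPLE_TABLE_SQL(on_cluster=False))
```

Calling `EXAMPLE_TABLE_SQL(on_cluster=False)` yields DDL without any ON CLUSTER clause, which is what the coordinator migration relies on.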
23 file(s) reviewed, 7 comment(s)
run_sql_with_exceptions(CREATE_COHORTPEOPLE_TABLE_SQL(on_cluster=False), node_role=NodeRole.COORDINATOR),
run_sql_with_exceptions(DISTRIBUTED_EVENTS_RECENT_TABLE_SQL(on_cluster=False), node_role=NodeRole.COORDINATOR),
run_sql_with_exceptions(DISTRIBUTED_EVENTS_TABLE_SQL(on_cluster=False), node_role=NodeRole.COORDINATOR),
run_sql_with_exceptions(EVENTS_RECENT_TABLE_SQL(on_cluster=False), node_role=NodeRole.COORDINATOR),
logic: EVENTS_RECENT_TABLE_SQL is not a distributed table and should not be included in coordinator schema
### When to run a migration for all nodes

- Basically when the migration does not include any of the above listed in the previous section.
style: This line could be more explicit: say 'When the migration is not for any of the table types listed in the worker node section above' instead of the current, less precise wording.
The ON CLUSTER clause is used to specify the cluster to run the DDL statement on. By default, the `posthog` cluster is used. That cluster only includes the worker nodes.

Ideally, **do not use the ON CLUSTER clause**: the DDL statement will be run on all nodes anyway through the `run_sql_with_exceptions` function, and, by default, the ON CLUSTER clause makes the DDL statement run only on the nodes of the default cluster, which does not include the coordinator.
style: The explanation of why not to use ON CLUSTER could be clearer. The current wording implies two separate reasons, but they're actually connected: run_sql_with_exceptions handles cluster distribution AND includes the coordinator, while ON CLUSTER only targets worker nodes.
@@ -18,23 +18,23 @@
)
from posthog.kafka_client.topics import KAFKA_EVENTS_JSON

ON_CLUSTER_CLAUSE = lambda: f"ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}'"
style: Consider making this a regular function instead of a lambda since it's used in many places and doesn't need to be anonymous
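The reviewer's suggestion, sketched below: a named function in place of the lambda. The `_Settings` class is a stand-in for `django.conf.settings` so the snippet is self-contained; the clause it produces matches the lambda in the diff.

```python
# Stand-in for django.conf.settings, carrying only the attribute used here.
class _Settings:
    CLICKHOUSE_CLUSTER = "posthog"

settings = _Settings()

def ON_CLUSTER_CLAUSE() -> str:
    """Return the ON CLUSTER clause for the configured ClickHouse cluster."""
    return f"ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}'"

print(ON_CLUSTER_CLAUSE())  # ON CLUSTER 'posthog'
```

A named def also gives the helper a docstring and a place in tracebacks, which is the usual argument for preferring it over an assigned lambda.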
DROP_GROUPS_TABLE_SQL = f"DROP TABLE {GROUPS_TABLE} ON CLUSTER '{CLICKHOUSE_CLUSTER}'"
TRUNCATE_GROUPS_TABLE_SQL = f"TRUNCATE TABLE {GROUPS_TABLE} ON CLUSTER '{CLICKHOUSE_CLUSTER}'"
style: These SQL statements still use hardcoded ON CLUSTER clauses while the rest of the file uses the new ON_CLUSTER_CLAUSE function. Should update for consistency.
Suggested change:
DROP_GROUPS_TABLE_SQL = f"DROP TABLE {GROUPS_TABLE} {ON_CLUSTER_CLAUSE()}"
TRUNCATE_GROUPS_TABLE_SQL = f"TRUNCATE TABLE {GROUPS_TABLE} {ON_CLUSTER_CLAUSE()}"
table_name="ingestion_warnings",
cluster=settings.CLICKHOUSE_CLUSTER,
on_cluster_clause=ON_CLUSTER_CLAUSE() if on_cluster else "",
engine=Distributed(data_table="sharded_ingestion_warnings", sharding_key="rand()"),
logic: Using rand() as a sharding key can lead to uneven data distribution across shards. Consider using a more deterministic key based on team_id or another relevant field.
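The reviewer's point can be illustrated with the two sharding keys side by side. `sipHash64(team_id)` is a real ClickHouse hash function that routes every row for a given team to the same shard, whereas `rand()` scatters rows arbitrarily. The helper below is a hypothetical stand-in that just renders the Distributed engine clause as text; it is not PostHog's `Distributed` helper.

```python
# Hypothetical helper: render a ClickHouse Distributed engine clause as text.
def distributed_engine(data_table: str, sharding_key: str) -> str:
    return f"Distributed('posthog', currentDatabase(), '{data_table}', {sharding_key})"

random_spread = distributed_engine("sharded_ingestion_warnings", "rand()")
team_sticky = distributed_engine("sharded_ingestion_warnings", "sipHash64(team_id)")
print(team_sticky)
```

A team-based key keeps each team's rows co-located, which helps per-team queries; the trade-off is that very large teams can create hot shards, which may be why `rand()` was chosen here.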
@@ -85,7 +82,10 @@
-- the newest known mapping for it in the table. Query side we will need to
-- ensure that we are always querying the latest version of the mapping.
ORDER BY (team_id, old_person_id)
"""
""".format(
    engine=ReplacingMergeTree("person_overrides", replication_scheme=ReplicationScheme.REPLICATED, ver="version")
style: Consider adding a TTL clause to automatically clean up old version records after a certain period
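A sketch of what that TTL suggestion could look like. The table name `person_overrides` comes from the diff above, but the `created_at` column and the 30-day window are assumptions for illustration; the real retention column and period would need to match the actual schema.

```python
# Assumed retention window; tune to the actual cleanup policy.
TTL_DAYS = 30

# Hypothetical ALTER adding a TTL so superseded override rows expire.
# `created_at` is an assumed column name, not confirmed by the diff.
PERSON_OVERRIDES_TTL_SQL = f"""
ALTER TABLE person_overrides
    MODIFY TTL created_at + INTERVAL {TTL_DAYS} DAY
"""
print(PERSON_OVERRIDES_TTL_SQL.strip())
```

Note that a TTL on a ReplacingMergeTree expires rows by the TTL expression regardless of version, so the window should be long enough that the latest mapping is never dropped while still referenced.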
Problem
The coordinator schema is manually created.
Changes
This adds the migration to the repository so we can evolve the schema from that point onward through code rather than manually.
Also, I'll compare the schema created by the migration against the actual one and fix any differences where applicable.
Finally, I will add the entry to the migrations table manually, since this schema already exists and we don't want to recreate it.
Does this work well for both Cloud and self-hosted?
Yes
How did you test this code?
Running migrations locally.