
chore: create migration for coordinator schema #28139

Open
wants to merge 18 commits into base: master

Conversation

@Daesgar Daesgar (Contributor) commented on Jan 31, 2025:

Problem

The coordinator schema is manually created.

Changes

This adds the migration to the repository so that, from this point onward, we can evolve the schema through code rather than manually.

I'll also compare the schema created by this migration with the actual, existing schemas and fix any differences where applicable.

Finally, I will add the entry to the migrations table manually, since this schema already exists and we don't want to recreate it.
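
For reference, a hedged sketch of what recording an already-applied ClickHouse migration by hand might look like. It assumes the default bookkeeping table used by infi.clickhouse_orm (infi_clickhouse_orm_migrations) and an illustrative package name; the real table, columns, and names should be verified in the target environment first.

from posthog.clickhouse.client import sync_execute  # assumed import path

# Hypothetical sketch: mark migration 0096 as applied so it is not re-run
# against the existing coordinator schema. Table and package names are
# assumptions, not taken from this PR.
sync_execute(
    """
    INSERT INTO infi_clickhouse_orm_migrations (package_name, module_name, applied)
    VALUES ('posthog.clickhouse.migrations', '0096_coordinator_schemas', today())
    """
)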

👉 Stay up-to-date with PostHog coding conventions for a smoother review.

Does this work well for both Cloud and self-hosted?

Yes

How did you test this code?

Running migrations locally.

HostInfo(ConnectionInfo(host_address, port), shard_num, replica_num, host_cluster_type, host_cluster_role)

HostInfo(
    ConnectionInfo(host_address, port),
    shard_num if host_cluster_role != "coordinator" else None,

@Daesgar (author) commented:

I don't think we want to treat the coordinators as separate shards, since they are merely "compute" nodes. This mainly matters for the map_host_per_shard function, so that it doesn't take the coordinators into account.
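
A minimal sketch of the behaviour this enables, assuming map_host_per_shard groups hosts by shard number (the actual PostHog implementation may differ):

from collections import defaultdict

def map_host_per_shard(host_infos):
    """Group hosts by shard_num, skipping coordinators (whose shard_num is None)."""
    hosts_per_shard = defaultdict(list)
    for host_info in host_infos:
        if host_info.shard_num is None:
            # Coordinators are "compute"-only nodes and hold no shard data.
            continue
        hosts_per_shard[host_info.shard_num].append(host_info)
    return hosts_per_shard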

@Daesgar Daesgar marked this pull request as ready for review January 31, 2025 16:56
@Daesgar Daesgar requested a review from a team as a code owner January 31, 2025 16:56

@greptile-apps greptile-apps bot left a comment

PR Summary

This PR adds a migration for the coordinator schema in ClickHouse, moving away from manual schema creation. Here are the key changes:

  • Added new migration file 0096_coordinator_schemas.py to create distributed tables and dictionaries specifically for coordinator nodes
  • Introduced ON_CLUSTER_CLAUSE lambda function across SQL files to centralize cluster clause generation
  • Modified table creation SQL functions to accept optional on_cluster parameter to control whether cluster clauses are included
  • Updated SQL generation functions to use NodeRole.COORDINATOR when creating coordinator-specific schemas
  • Added documentation in README.md explaining when to run migrations on worker nodes vs all nodes

The changes standardize schema management through code rather than manual intervention while maintaining backward compatibility. The PR follows best practices by avoiding ON CLUSTER clauses in migrations since they will be run through run_sql_with_exceptions.
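
As a rough, hypothetical illustration of the on_cluster pattern described above (EXAMPLE_TABLE_SQL and the table itself are made up; only the ON_CLUSTER_CLAUSE shape is taken from this PR):

from django.conf import settings

ON_CLUSTER_CLAUSE = lambda: f"ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}'"

def EXAMPLE_TABLE_SQL(on_cluster=True):
    # Worker-node migrations keep the cluster clause; coordinator migrations
    # pass on_cluster=False because run_sql_with_exceptions already targets
    # the right nodes.
    return f"""
    CREATE TABLE IF NOT EXISTS example_table {ON_CLUSTER_CLAUSE() if on_cluster else ''}
    (
        team_id Int64,
        created_at DateTime
    )
    ENGINE = MergeTree()
    ORDER BY (team_id, created_at)
    """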

23 file(s) reviewed, 7 comment(s)

run_sql_with_exceptions(CREATE_COHORTPEOPLE_TABLE_SQL(on_cluster=False), node_role=NodeRole.COORDINATOR),
run_sql_with_exceptions(DISTRIBUTED_EVENTS_RECENT_TABLE_SQL(on_cluster=False), node_role=NodeRole.COORDINATOR),
run_sql_with_exceptions(DISTRIBUTED_EVENTS_TABLE_SQL(on_cluster=False), node_role=NodeRole.COORDINATOR),
run_sql_with_exceptions(EVENTS_RECENT_TABLE_SQL(on_cluster=False), node_role=NodeRole.COORDINATOR),

logic: EVENTS_RECENT_TABLE_SQL is not a distributed table and should not be included in the coordinator schema


### When to run a migration for all nodes

- Basically when the migration does not include any of the above listed in the previous section.

style: This line could be more explicit, e.g. 'When the migration is not for any of the table types listed in the worker node section above', rather than the current, less precise wording

Comment on lines +51 to +53
The ON CLUSTER clause is used to specify the cluster to run the DDL statement on. By default, the `posthog` cluster is used. That cluster only includes the worker nodes.

Ideally, **do not use the ON CLUSTER clause**, since the DDL statement will be run on all nodes anyway through the `run_sql_with_exceptions` function, and, by default, the ON CLUSTER clause makes the DDL statement run on nodes specified for the default cluster, and that does not include the coordinator.

style: The explanation of why not to use ON CLUSTER could be clearer. The current wording implies two separate reasons, but they are actually connected: run_sql_with_exceptions handles cluster distribution and includes the coordinator, while ON CLUSTER only targets the worker nodes
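
A small, hedged illustration of that guidance, reusing names from the migration snippet earlier in this PR (the import paths are assumptions):

from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions  # assumed path
from posthog.clickhouse.cluster import NodeRole  # assumed path
from posthog.models.event.sql import DISTRIBUTED_EVENTS_TABLE_SQL  # assumed path

# No ON CLUSTER in the generated DDL: run_sql_with_exceptions decides which
# nodes run the statement via node_role, and that can include the coordinator,
# whereas ON CLUSTER would only reach the default (worker-only) cluster.
operations = [
    run_sql_with_exceptions(DISTRIBUTED_EVENTS_TABLE_SQL(on_cluster=False), node_role=NodeRole.COORDINATOR),
]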

@@ -18,23 +18,23 @@
)
from posthog.kafka_client.topics import KAFKA_EVENTS_JSON

ON_CLUSTER_CLAUSE = lambda: f"ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}'"

style: Consider making this a regular function instead of a lambda since it's used in many places and doesn't need to be anonymous
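
For illustration, the suggested refactor could look roughly like this (a hypothetical sketch, not code from the PR); folding the on_cluster flag into the helper would also remove the repeated conditional expressions at the call sites:

from django.conf import settings

def ON_CLUSTER_CLAUSE(on_cluster: bool = True) -> str:
    """Return the ON CLUSTER clause for the default cluster, or an empty string."""
    return f"ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}'" if on_cluster else ""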

Comment on lines 11 to 12
DROP_GROUPS_TABLE_SQL = f"DROP TABLE {GROUPS_TABLE} ON CLUSTER '{CLICKHOUSE_CLUSTER}'"
TRUNCATE_GROUPS_TABLE_SQL = f"TRUNCATE TABLE {GROUPS_TABLE} ON CLUSTER '{CLICKHOUSE_CLUSTER}'"

style: These SQL statements still use hardcoded ON CLUSTER clauses while the rest of the file uses the new ON_CLUSTER_CLAUSE function. Should update for consistency.

Suggested change
- DROP_GROUPS_TABLE_SQL = f"DROP TABLE {GROUPS_TABLE} ON CLUSTER '{CLICKHOUSE_CLUSTER}'"
- TRUNCATE_GROUPS_TABLE_SQL = f"TRUNCATE TABLE {GROUPS_TABLE} ON CLUSTER '{CLICKHOUSE_CLUSTER}'"
+ DROP_GROUPS_TABLE_SQL = f"DROP TABLE {GROUPS_TABLE} {ON_CLUSTER_CLAUSE()}"
+ TRUNCATE_GROUPS_TABLE_SQL = f"TRUNCATE TABLE {GROUPS_TABLE} {ON_CLUSTER_CLAUSE()}"

table_name="ingestion_warnings",
cluster=settings.CLICKHOUSE_CLUSTER,
on_cluster_clause=ON_CLUSTER_CLAUSE() if on_cluster else "",
engine=Distributed(data_table="sharded_ingestion_warnings", sharding_key="rand()"),

logic: Using rand() as a sharding key can lead to uneven data distribution across shards. Consider using a more deterministic key based on team_id or another relevant field.
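
A hedged sketch of the kind of alternative the comment suggests; the Distributed engine call is copied from the snippet above, the import path and the choice of key are assumptions, and whether team-based sharding is actually desirable for ingestion_warnings is left to the reviewers:

from posthog.clickhouse.table_engines import Distributed  # assumed import path

# Deterministic sharding: all ingestion warnings for a team land on the same
# shard. Distribution evenness then depends on how event volume is spread
# across teams, rather than on rand().
engine = Distributed(
    data_table="sharded_ingestion_warnings",
    sharding_key="sipHash64(team_id)",
)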

@@ -85,7 +82,10 @@
-- the newest known mapping for it in the table. Query side we will need to
-- ensure that we are always querying the latest version of the mapping.
ORDER BY (team_id, old_person_id)
"""
""".format(
engine=ReplacingMergeTree("person_overrides", replication_scheme=ReplicationScheme.REPLICATED, ver="version")

style: Consider adding a TTL clause to automatically clean up old version records after a certain period
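
For illustration only, a TTL could be added roughly as below. This sketch assumes a DateTime column named created_at and a six-month window, neither of which is established by this PR; note also that a row-level TTL expires every row older than the window, not only superseded versions, so whether it fits person_overrides is a judgment call.

# Hypothetical DDL, not the actual person_overrides schema change.
PERSON_OVERRIDES_TTL_SQL = """
    ALTER TABLE person_overrides
    MODIFY TTL created_at + INTERVAL 6 MONTH
"""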
