From 9b30f8dac254a28053ff54bebd1ed1ce2a0fc2f3 Mon Sep 17 00:00:00 2001 From: Daesgar Date: Fri, 31 Jan 2025 10:00:40 +0100 Subject: [PATCH] feat: add support for coordinator schemas (#28031) Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com> --- .flox/env/manifest.toml | 6 +- docker-compose.dev-full.yml | 1 + docker-compose.dev.yml | 20 +++++- docker-compose.hobby.yml | 1 + docker/clickhouse/config.d/coordinator.xml | 42 ++++++++++++ docker/clickhouse/config.d/default.xml | 37 ++++++++++ docker/clickhouse/config.d/worker.xml | 42 ++++++++++++ docker/clickhouse/config.xml | 27 +------- posthog/clickhouse/client/connection.py | 6 ++ posthog/clickhouse/client/migration_tools.py | 29 ++++---- posthog/clickhouse/cluster.py | 68 +++++++++++++++---- .../clickhouse/migrations/0054_sessions.py | 10 +-- .../0058_use_json_properties_in_sessions.py | 2 +- .../migrations/0064_sessions_with_uuidv7.py | 10 +-- .../migrations/0066_sessions_group_by.py | 2 +- .../0089_nullable_uuid_type_sessions.py | 2 +- ...0_nullable_uuid_type_session_on_cluster.py | 2 +- posthog/clickhouse/test/test_cluster.py | 44 +++++++++++- .../management/commands/migrate_clickhouse.py | 13 +++- posthog/settings/data_stores.py | 1 + 20 files changed, 292 insertions(+), 73 deletions(-) create mode 100644 docker/clickhouse/config.d/coordinator.xml create mode 100644 docker/clickhouse/config.d/default.xml create mode 100644 docker/clickhouse/config.d/worker.xml diff --git a/.flox/env/manifest.toml b/.flox/env/manifest.toml index b5264c595d..683371f45b 100644 --- a/.flox/env/manifest.toml +++ b/.flox/env/manifest.toml @@ -91,10 +91,10 @@ fi if [[ -t 0 ]]; then # The block below only runs when in an interactive shell # Add required entries to /etc/hosts if not present - if ! grep -q "127.0.0.1 kafka clickhouse" /etc/hosts; then + if ! grep -q "127.0.0.1 kafka clickhouse clickhouse-coordinator" /etc/hosts; then echo - echo "🚨 Amending /etc/hosts to map hostnames 'kafka' and 'clickhouse' to 127.0.0.1..." - echo "127.0.0.1 kafka clickhouse" | sudo tee -a /etc/hosts 1> /dev/null + echo "🚨 Amending /etc/hosts to map hostnames 'kafka', 'clickhouse' and 'clickhouse-coordinator' to 127.0.0.1..." + echo "127.0.0.1 kafka clickhouse clickhouse-coordinator" | sudo tee -a /etc/hosts 1> /dev/null echo "✅ /etc/hosts amended" fi diff --git a/docker-compose.dev-full.yml b/docker-compose.dev-full.yml index 35c21f326b..26ee33a9d3 100644 --- a/docker-compose.dev-full.yml +++ b/docker-compose.dev-full.yml @@ -47,6 +47,7 @@ services: - ./posthog/idl:/idl - ./docker/clickhouse/docker-entrypoint-initdb.d:/docker-entrypoint-initdb.d - ./docker/clickhouse/config.xml:/etc/clickhouse-server/config.xml + - ./docker/clickhouse/config.d/default.xml:/etc/clickhouse-server/config.d/default.xml - ./docker/clickhouse/users-dev.xml:/etc/clickhouse-server/users.xml - ./docker/clickhouse/user_defined_function.xml:/etc/clickhouse-server/user_defined_function.xml - ./posthog/user_scripts:/var/lib/clickhouse/user_scripts diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index 0010b8486c..ee7e928472 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -51,7 +51,7 @@ services: ports: - '5555:5555' - clickhouse: + clickhouse: &clickhouse extends: file: docker-compose.base.yml service: clickhouse @@ -69,6 +69,7 @@ services: - ./posthog/idl:/idl - ./docker/clickhouse/docker-entrypoint-initdb.d:/docker-entrypoint-initdb.d - ./docker/clickhouse/config.xml:/etc/clickhouse-server/config.xml + - ./docker/clickhouse/config.d/worker.xml:/etc/clickhouse-server/config.d/worker.xml - ./docker/clickhouse/users-dev.xml:/etc/clickhouse-server/users.xml - ./docker/clickhouse/user_defined_function.xml:/etc/clickhouse-server/user_defined_function.xml - ./posthog/user_scripts:/var/lib/clickhouse/user_scripts @@ -78,6 +79,23 @@ services: - kafka - zookeeper + clickhouse-coordinator: + hostname: clickhouse-coordinator + <<: *clickhouse + volumes: + # this new entrypoint file is to fix a bug detailed here https://github.com/ClickHouse/ClickHouse/pull/59991 + # revert this when we upgrade clickhouse + - ./docker/clickhouse/entrypoint.sh:/entrypoint.sh + - ./posthog/idl:/idl + - ./docker/clickhouse/docker-entrypoint-initdb.d:/docker-entrypoint-initdb.d + - ./docker/clickhouse/config.xml:/etc/clickhouse-server/config.xml + - ./docker/clickhouse/config.d/coordinator.xml:/etc/clickhouse-server/config.d/coordinator.xml + - ./docker/clickhouse/users-dev.xml:/etc/clickhouse-server/users.xml + - ./docker/clickhouse/user_defined_function.xml:/etc/clickhouse-server/user_defined_function.xml + - ./posthog/user_scripts:/var/lib/clickhouse/user_scripts + ports: + - '9001:9001' + zookeeper: extends: file: docker-compose.base.yml diff --git a/docker-compose.hobby.yml b/docker-compose.hobby.yml index 072a9d7985..bdb2e61eb2 100644 --- a/docker-compose.hobby.yml +++ b/docker-compose.hobby.yml @@ -48,6 +48,7 @@ services: - ./posthog/posthog/idl:/idl - ./posthog/docker/clickhouse/docker-entrypoint-initdb.d:/docker-entrypoint-initdb.d - ./posthog/docker/clickhouse/config.xml:/etc/clickhouse-server/config.xml + - ./posthog/docker/clickhouse/config.d/default.xml:/etc/clickhouse-server/config.d/default.xml - ./posthog/docker/clickhouse/users.xml:/etc/clickhouse-server/users.xml - clickhouse-data:/var/lib/clickhouse zookeeper: diff --git a/docker/clickhouse/config.d/coordinator.xml b/docker/clickhouse/config.d/coordinator.xml new file mode 100644 index 0000000000..4e73b9c08e --- /dev/null +++ b/docker/clickhouse/config.d/coordinator.xml @@ -0,0 +1,42 @@ + + 9001 + + + + + clickhouse + 9000 + + + + + + + clickhouse + 9000 + + + + + + + clickhouse + 9000 + + + + + clickhouse-coordinator + 9001 + + + + + + + 02 + coord + online + coordinator + + \ No newline at end of file diff --git a/docker/clickhouse/config.d/default.xml b/docker/clickhouse/config.d/default.xml new file mode 100644 index 0000000000..d1f1ac6ee3 --- /dev/null +++ b/docker/clickhouse/config.d/default.xml @@ -0,0 +1,37 @@ + + 9000 + + + + + + localhost + 9000 + + + + + + + localhost + 9000 + + + + + + + localhost + 9000 + + + + + + + 01 + ch1 + online + worker + + \ No newline at end of file diff --git a/docker/clickhouse/config.d/worker.xml b/docker/clickhouse/config.d/worker.xml new file mode 100644 index 0000000000..8c138ff49c --- /dev/null +++ b/docker/clickhouse/config.d/worker.xml @@ -0,0 +1,42 @@ + + 9000 + + + + + clickhouse + 9000 + + + + + + + clickhouse + 9000 + + + + + + + clickhouse + 9000 + + + + + clickhouse-coordinator + 9001 + + + + + + + 01 + ch1 + online + worker + + \ No newline at end of file diff --git a/docker/clickhouse/config.xml b/docker/clickhouse/config.xml index 8777a41c6d..575207e88e 100644 --- a/docker/clickhouse/config.xml +++ b/docker/clickhouse/config.xml @@ -12,10 +12,10 @@ /var/log/clickhouse-server/clickhouse-server.err.log 1000M 10 + 1 8123 - 9000 9004 9005 8443 @@ -162,25 +162,6 @@ false - - - - - localhost - 9000 - - - - - - - localhost - 9000 - - - - - .* @@ -192,12 +173,6 @@ - - 01 - ch1 - - - 3600 diff --git a/posthog/clickhouse/client/connection.py b/posthog/clickhouse/client/connection.py index ede6aba666..89505266d5 100644 --- a/posthog/clickhouse/client/connection.py +++ b/posthog/clickhouse/client/connection.py @@ -19,6 +19,12 @@ class Workload(Enum): OFFLINE = "OFFLINE" +class NodeRole(Enum): + ALL = "ALL" + COORDINATOR = "COORDINATOR" + WORKER = "WORKER" + + _default_workload = Workload.ONLINE diff --git a/posthog/clickhouse/client/migration_tools.py b/posthog/clickhouse/client/migration_tools.py index aa3100b548..ac7de06e56 100644 --- a/posthog/clickhouse/client/migration_tools.py +++ b/posthog/clickhouse/client/migration_tools.py @@ -1,23 +1,28 @@ -from typing import Union -from collections.abc import Callable +import logging from infi.clickhouse_orm import migrations -from posthog.clickhouse.client.execute import sync_execute +from posthog.clickhouse.client.connection import NodeRole +from posthog.clickhouse.cluster import get_cluster +from posthog.settings.data_stores import CLICKHOUSE_MIGRATIONS_CLUSTER + +logger = logging.getLogger("migrations") -def run_sql_with_exceptions(sql: Union[str, Callable[[], str]], settings=None): +def run_sql_with_exceptions(sql: str, settings=None, node_role: NodeRole = NodeRole.WORKER): """ migrations.RunSQL does not raise exceptions, so we need to wrap it in a function that does. + node_role is set to WORKER by default to keep compatibility with the old migrations. """ - if settings is None: - settings = {} + cluster = get_cluster(client_settings=settings, cluster=CLICKHOUSE_MIGRATIONS_CLUSTER) - def run_sql(database): - nonlocal sql - if callable(sql): - sql = sql() - sync_execute(sql, settings=settings) + def run_migration(): + if node_role == NodeRole.ALL: + logger.info(" Running migration on coordinators and workers") + return cluster.map_all_hosts(lambda client: client.execute(sql)).result() + else: + logger.info(f" Running migration on {node_role.value.lower()}s") + return cluster.map_hosts_by_role(lambda client: client.execute(sql), node_role=node_role).result() - return migrations.RunPython(run_sql) + return migrations.RunPython(lambda _: run_migration()) diff --git a/posthog/clickhouse/cluster.py b/posthog/clickhouse/cluster.py index f597295538..95ad0eb974 100644 --- a/posthog/clickhouse/cluster.py +++ b/posthog/clickhouse/cluster.py @@ -19,7 +19,7 @@ from clickhouse_driver import Client from clickhouse_pool import ChPool from posthog import settings -from posthog.clickhouse.client.connection import _make_ch_pool, default_client +from posthog.clickhouse.client.connection import NodeRole, _make_ch_pool, default_client from posthog.settings import CLICKHOUSE_PER_TEAM_SETTINGS K = TypeVar("K") @@ -62,15 +62,18 @@ class FuturesMap(dict[K, Future[V]]): class ConnectionInfo(NamedTuple): address: str + port: int | None def make_pool(self, client_settings: Mapping[str, str] | None = None) -> ChPool: - return _make_ch_pool(host=self.address, settings=client_settings) + return _make_ch_pool(host=self.address, port=self.port, settings=client_settings) class HostInfo(NamedTuple): connection_info: ConnectionInfo shard_num: int | None replica_num: int | None + host_cluster_type: str | None + host_cluster_role: str | None T = TypeVar("T") @@ -83,25 +86,42 @@ class ClickhouseCluster: extra_hosts: Sequence[ConnectionInfo] | None = None, logger: logging.Logger | None = None, client_settings: Mapping[str, str] | None = None, + cluster: str | None = None, ) -> None: if logger is None: logger = logging.getLogger(__name__) self.__hosts = [ - HostInfo(ConnectionInfo(host_address), shard_num, replica_num) - for (host_address, shard_num, replica_num) in bootstrap_client.execute( + HostInfo(ConnectionInfo(host_address, port), shard_num, replica_num, host_cluster_type, host_cluster_role) + for ( + host_address, + port, + shard_num, + replica_num, + host_cluster_type, + host_cluster_role, + ) in bootstrap_client.execute( """ - SELECT host_address, shard_num, replica_num - FROM system.clusters - WHERE name = %(name)s + SELECT host_address, port, shard_num, replica_num, getMacro('hostClusterType') as host_cluster_type, getMacro('hostClusterRole') as host_cluster_role + FROM clusterAllReplicas(%(name)s, system.clusters) + WHERE name = %(name)s and is_local ORDER BY shard_num, replica_num """, - {"name": settings.CLICKHOUSE_CLUSTER}, + {"name": cluster or settings.CLICKHOUSE_CLUSTER}, ) ] if extra_hosts is not None: self.__hosts.extend( - [HostInfo(connection_info, shard_num=None, replica_num=None) for connection_info in extra_hosts] + [ + HostInfo( + connection_info, + shard_num=None, + replica_num=None, + host_cluster_type=None, + host_cluster_role=None, + ) + for connection_info in extra_hosts + ] ) self.__pools: dict[HostInfo, ChPool] = {} self.__logger = logger @@ -139,11 +159,31 @@ class ClickhouseCluster: """ Execute the callable once for each host in the cluster. + The number of concurrent queries can limited with the ``concurrency`` parameter, or set to ``None`` to use the + default limit of the executor. + """ + return self.map_hosts_by_role(fn, NodeRole.ALL, concurrency) + + def map_hosts_by_role( + self, + fn: Callable[[Client], T], + node_role: NodeRole, + concurrency: int | None = None, + ) -> FuturesMap[HostInfo, T]: + """ + Execute the callable once for each host in the cluster with the given node role. + The number of concurrent queries can limited with the ``concurrency`` parameter, or set to ``None`` to use the default limit of the executor. """ with ThreadPoolExecutor(max_workers=concurrency) as executor: - return FuturesMap({host: executor.submit(self.__get_task_function(host, fn)) for host in self.__hosts}) + return FuturesMap( + { + host: executor.submit(self.__get_task_function(host, fn)) + for host in self.__hosts + if host.host_cluster_role == node_role.value.lower() or node_role == NodeRole.ALL + } + ) def map_all_hosts_in_shard( self, shard_num: int, fn: Callable[[Client], T], concurrency: int | None = None @@ -209,13 +249,15 @@ class ClickhouseCluster: def get_cluster( - logger: logging.Logger | None = None, client_settings: Mapping[str, str] | None = None + logger: logging.Logger | None = None, client_settings: Mapping[str, str] | None = None, cluster: str | None = None ) -> ClickhouseCluster: extra_hosts = [] for host_config in map(copy, CLICKHOUSE_PER_TEAM_SETTINGS.values()): - extra_hosts.append(ConnectionInfo(host_config.pop("host"))) + extra_hosts.append(ConnectionInfo(host_config.pop("host"), None)) assert len(host_config) == 0, f"unexpected values: {host_config!r}" - return ClickhouseCluster(default_client(), extra_hosts=extra_hosts, logger=logger, client_settings=client_settings) + return ClickhouseCluster( + default_client(), extra_hosts=extra_hosts, logger=logger, client_settings=client_settings, cluster=cluster + ) @dataclass diff --git a/posthog/clickhouse/migrations/0054_sessions.py b/posthog/clickhouse/migrations/0054_sessions.py index 377bb0f1d3..12139bba24 100644 --- a/posthog/clickhouse/migrations/0054_sessions.py +++ b/posthog/clickhouse/migrations/0054_sessions.py @@ -8,9 +8,9 @@ from posthog.models.sessions.sql import ( ) operations = [ - run_sql_with_exceptions(WRITABLE_SESSIONS_TABLE_SQL), - run_sql_with_exceptions(DISTRIBUTED_SESSIONS_TABLE_SQL), - run_sql_with_exceptions(SESSIONS_TABLE_SQL), - run_sql_with_exceptions(SESSIONS_TABLE_MV_SQL), - run_sql_with_exceptions(SESSIONS_VIEW_SQL), + run_sql_with_exceptions(WRITABLE_SESSIONS_TABLE_SQL()), + run_sql_with_exceptions(DISTRIBUTED_SESSIONS_TABLE_SQL()), + run_sql_with_exceptions(SESSIONS_TABLE_SQL()), + run_sql_with_exceptions(SESSIONS_TABLE_MV_SQL()), + run_sql_with_exceptions(SESSIONS_VIEW_SQL()), ] diff --git a/posthog/clickhouse/migrations/0058_use_json_properties_in_sessions.py b/posthog/clickhouse/migrations/0058_use_json_properties_in_sessions.py index 82cdcb3e5a..af2adc2684 100644 --- a/posthog/clickhouse/migrations/0058_use_json_properties_in_sessions.py +++ b/posthog/clickhouse/migrations/0058_use_json_properties_in_sessions.py @@ -4,5 +4,5 @@ from posthog.models.sessions.sql import ( ) operations = [ - run_sql_with_exceptions(SESSION_TABLE_UPDATE_SQL), + run_sql_with_exceptions(SESSION_TABLE_UPDATE_SQL()), ] diff --git a/posthog/clickhouse/migrations/0064_sessions_with_uuidv7.py b/posthog/clickhouse/migrations/0064_sessions_with_uuidv7.py index ce3e87811d..e6f40b2e19 100644 --- a/posthog/clickhouse/migrations/0064_sessions_with_uuidv7.py +++ b/posthog/clickhouse/migrations/0064_sessions_with_uuidv7.py @@ -8,9 +8,9 @@ from posthog.models.raw_sessions.sql import ( ) operations = [ - run_sql_with_exceptions(WRITABLE_RAW_SESSIONS_TABLE_SQL), - run_sql_with_exceptions(DISTRIBUTED_RAW_SESSIONS_TABLE_SQL), - run_sql_with_exceptions(RAW_SESSIONS_TABLE_SQL), - run_sql_with_exceptions(RAW_SESSIONS_TABLE_MV_SQL), - run_sql_with_exceptions(RAW_SESSIONS_VIEW_SQL), + run_sql_with_exceptions(WRITABLE_RAW_SESSIONS_TABLE_SQL()), + run_sql_with_exceptions(DISTRIBUTED_RAW_SESSIONS_TABLE_SQL()), + run_sql_with_exceptions(RAW_SESSIONS_TABLE_SQL()), + run_sql_with_exceptions(RAW_SESSIONS_TABLE_MV_SQL()), + run_sql_with_exceptions(RAW_SESSIONS_VIEW_SQL()), ] diff --git a/posthog/clickhouse/migrations/0066_sessions_group_by.py b/posthog/clickhouse/migrations/0066_sessions_group_by.py index 628803f4be..5a11c68836 100644 --- a/posthog/clickhouse/migrations/0066_sessions_group_by.py +++ b/posthog/clickhouse/migrations/0066_sessions_group_by.py @@ -2,5 +2,5 @@ from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions from posthog.models.raw_sessions.sql import RAW_SESSION_TABLE_UPDATE_SQL operations = [ - run_sql_with_exceptions(RAW_SESSION_TABLE_UPDATE_SQL), + run_sql_with_exceptions(RAW_SESSION_TABLE_UPDATE_SQL()), ] diff --git a/posthog/clickhouse/migrations/0089_nullable_uuid_type_sessions.py b/posthog/clickhouse/migrations/0089_nullable_uuid_type_sessions.py index 628803f4be..5a11c68836 100644 --- a/posthog/clickhouse/migrations/0089_nullable_uuid_type_sessions.py +++ b/posthog/clickhouse/migrations/0089_nullable_uuid_type_sessions.py @@ -2,5 +2,5 @@ from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions from posthog.models.raw_sessions.sql import RAW_SESSION_TABLE_UPDATE_SQL operations = [ - run_sql_with_exceptions(RAW_SESSION_TABLE_UPDATE_SQL), + run_sql_with_exceptions(RAW_SESSION_TABLE_UPDATE_SQL()), ] diff --git a/posthog/clickhouse/migrations/0090_nullable_uuid_type_session_on_cluster.py b/posthog/clickhouse/migrations/0090_nullable_uuid_type_session_on_cluster.py index 628803f4be..5a11c68836 100644 --- a/posthog/clickhouse/migrations/0090_nullable_uuid_type_session_on_cluster.py +++ b/posthog/clickhouse/migrations/0090_nullable_uuid_type_session_on_cluster.py @@ -2,5 +2,5 @@ from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions from posthog.models.raw_sessions.sql import RAW_SESSION_TABLE_UPDATE_SQL operations = [ - run_sql_with_exceptions(RAW_SESSION_TABLE_UPDATE_SQL), + run_sql_with_exceptions(RAW_SESSION_TABLE_UPDATE_SQL()), ] diff --git a/posthog/clickhouse/test/test_cluster.py b/posthog/clickhouse/test/test_cluster.py index d46dfd0dab..1599c81f97 100644 --- a/posthog/clickhouse/test/test_cluster.py +++ b/posthog/clickhouse/test/test_cluster.py @@ -1,10 +1,13 @@ +from collections import defaultdict +from unittest.mock import Mock, patch import uuid -from collections.abc import Iterator +from collections.abc import Callable, Iterator import pytest from clickhouse_driver import Client -from posthog.clickhouse.cluster import ClickhouseCluster, MutationRunner, get_cluster +from posthog.clickhouse.client.connection import NodeRole +from posthog.clickhouse.cluster import T, ClickhouseCluster, HostInfo, MutationRunner, get_cluster from posthog.models.event.sql import EVENTS_DATA_TABLE @@ -73,6 +76,43 @@ def test_mutations(cluster: ClickhouseCluster) -> None: assert cluster.map_all_hosts(get_mutations_count).result() == mutations_count_before +def test_map_hosts_by_role() -> None: + bootstrap_client_mock = Mock() + bootstrap_client_mock.execute = Mock() + bootstrap_client_mock.execute.return_value = [ + ("host1", "9000", "1", "1", "online", "worker"), + ("host2", "9000", "1", "2", "online", "worker"), + ("host3", "9000", "1", "3", "online", "worker"), + ("host4", "9000", "1", "4", "online", "coordinator"), + ] + + cluster = ClickhouseCluster(bootstrap_client_mock) + + times_called: defaultdict[NodeRole, int] = defaultdict(int) + + def mock_get_task_function(_, host: HostInfo, fn: Callable[[Client], T]) -> Callable[[], T]: + if host.host_cluster_role == NodeRole.WORKER.value.lower(): + times_called[NodeRole.WORKER] += 1 + elif host.host_cluster_role == NodeRole.COORDINATOR.value.lower(): + times_called[NodeRole.COORDINATOR] += 1 + return lambda: fn(Mock()) + + with patch.object(ClickhouseCluster, "_ClickhouseCluster__get_task_function", mock_get_task_function): + cluster.map_hosts_by_role(lambda _: (), node_role=NodeRole.WORKER).result() + assert times_called[NodeRole.WORKER] == 3 + assert times_called[NodeRole.COORDINATOR] == 0 + times_called.clear() + + cluster.map_hosts_by_role(lambda _: (), node_role=NodeRole.COORDINATOR).result() + assert times_called[NodeRole.WORKER] == 0 + assert times_called[NodeRole.COORDINATOR] == 1 + times_called.clear() + + cluster.map_hosts_by_role(lambda _: (), node_role=NodeRole.ALL).result() + assert times_called[NodeRole.WORKER] == 3 + assert times_called[NodeRole.COORDINATOR] == 1 + + def test_lightweight_delete(cluster: ClickhouseCluster) -> None: table = EVENTS_DATA_TABLE() count = 100 diff --git a/posthog/management/commands/migrate_clickhouse.py b/posthog/management/commands/migrate_clickhouse.py index a946920209..8498745284 100644 --- a/posthog/management/commands/migrate_clickhouse.py +++ b/posthog/management/commands/migrate_clickhouse.py @@ -7,13 +7,14 @@ from infi.clickhouse_orm import Database from infi.clickhouse_orm.migrations import MigrationHistory from infi.clickhouse_orm.utils import import_submodules +from posthog.clickhouse.client.connection import default_client from posthog.settings import ( CLICKHOUSE_DATABASE, CLICKHOUSE_HTTP_URL, CLICKHOUSE_PASSWORD, CLICKHOUSE_USER, ) -from posthog.settings.data_stores import CLICKHOUSE_CLUSTER +from posthog.settings.data_stores import CLICKHOUSE_MIGRATIONS_CLUSTER MIGRATIONS_PACKAGE_NAME = "posthog.clickhouse.migrations" @@ -53,12 +54,14 @@ class Command(BaseCommand): self.migrate(CLICKHOUSE_HTTP_URL, options) def migrate(self, host, options): + # Infi only creates the DB in one node, but not the rest. Create it before running migrations. + self._create_database_if_not_exists(CLICKHOUSE_DATABASE, CLICKHOUSE_MIGRATIONS_CLUSTER) database = Database( CLICKHOUSE_DATABASE, db_url=host, username=CLICKHOUSE_USER, password=CLICKHOUSE_PASSWORD, - cluster=CLICKHOUSE_CLUSTER, + cluster=CLICKHOUSE_MIGRATIONS_CLUSTER, verify_ssl_cert=False, randomize_replica_paths=settings.TEST or settings.E2E_TESTING, ) @@ -105,3 +108,9 @@ class Command(BaseCommand): def get_applied_migrations(self, database): return database._get_applied_migrations(MIGRATIONS_PACKAGE_NAME, replicated=True) + + def _create_database_if_not_exists(self, database: str, cluster: str): + with default_client() as client: + client.execute( + f"CREATE DATABASE IF NOT EXISTS {database} ON CLUSTER {cluster}", + ) diff --git a/posthog/settings/data_stores.py b/posthog/settings/data_stores.py index 2b4f19d32f..7216dd4098 100644 --- a/posthog/settings/data_stores.py +++ b/posthog/settings/data_stores.py @@ -150,6 +150,7 @@ CLICKHOUSE_USER: str = os.getenv("CLICKHOUSE_USER", "default") CLICKHOUSE_PASSWORD: str = os.getenv("CLICKHOUSE_PASSWORD", "") CLICKHOUSE_DATABASE: str = CLICKHOUSE_TEST_DB if TEST else os.getenv("CLICKHOUSE_DATABASE", "default") CLICKHOUSE_CLUSTER: str = os.getenv("CLICKHOUSE_CLUSTER", "posthog") +CLICKHOUSE_MIGRATIONS_CLUSTER: str = os.getenv("CLICKHOUSE_MIGRATIONS_CLUSTER", "posthog_migrations") CLICKHOUSE_CA: str | None = os.getenv("CLICKHOUSE_CA", None) CLICKHOUSE_SECURE: bool = get_from_env("CLICKHOUSE_SECURE", not TEST and not DEBUG, type_cast=str_to_bool) CLICKHOUSE_VERIFY: bool = get_from_env("CLICKHOUSE_VERIFY", True, type_cast=str_to_bool)