feat: add support for coordinator schemas (#28031)

Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
Author: Daesgar
Date: 2025-01-31 10:00:40 +01:00
Committed by: GitHub
Parent: f7563e35d0
Commit: 9b30f8dac2
20 changed files with 292 additions and 73 deletions

View File

@@ -91,10 +91,10 @@ fi
if [[ -t 0 ]]; then # The block below only runs when in an interactive shell
# Add required entries to /etc/hosts if not present
if ! grep -q "127.0.0.1 kafka clickhouse" /etc/hosts; then
if ! grep -q "127.0.0.1 kafka clickhouse clickhouse-coordinator" /etc/hosts; then
echo
echo "🚨 Amending /etc/hosts to map hostnames 'kafka' and 'clickhouse' to 127.0.0.1..."
echo "127.0.0.1 kafka clickhouse" | sudo tee -a /etc/hosts 1> /dev/null
echo "🚨 Amending /etc/hosts to map hostnames 'kafka', 'clickhouse' and 'clickhouse-coordinator' to 127.0.0.1..."
echo "127.0.0.1 kafka clickhouse clickhouse-coordinator" | sudo tee -a /etc/hosts 1> /dev/null
echo "✅ /etc/hosts amended"
fi
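
Once /etc/hosts has been amended, a quick sanity check that all three hostnames resolve locally can be run from Python. A minimal sketch using only the standard library; the hostnames are the ones added in the line above:

```python
# Minimal sketch: verify the /etc/hosts entries added above resolve to 127.0.0.1.
import socket

for hostname in ("kafka", "clickhouse", "clickhouse-coordinator"):
    print(hostname, socket.gethostbyname(hostname))  # expected: 127.0.0.1 for each
```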

View File

@@ -47,6 +47,7 @@ services:
- ./posthog/idl:/idl
- ./docker/clickhouse/docker-entrypoint-initdb.d:/docker-entrypoint-initdb.d
- ./docker/clickhouse/config.xml:/etc/clickhouse-server/config.xml
- ./docker/clickhouse/config.d/default.xml:/etc/clickhouse-server/config.d/default.xml
- ./docker/clickhouse/users-dev.xml:/etc/clickhouse-server/users.xml
- ./docker/clickhouse/user_defined_function.xml:/etc/clickhouse-server/user_defined_function.xml
- ./posthog/user_scripts:/var/lib/clickhouse/user_scripts

View File

@@ -51,7 +51,7 @@ services:
ports:
- '5555:5555'
clickhouse:
clickhouse: &clickhouse
extends:
file: docker-compose.base.yml
service: clickhouse
@@ -69,6 +69,7 @@ services:
- ./posthog/idl:/idl
- ./docker/clickhouse/docker-entrypoint-initdb.d:/docker-entrypoint-initdb.d
- ./docker/clickhouse/config.xml:/etc/clickhouse-server/config.xml
- ./docker/clickhouse/config.d/worker.xml:/etc/clickhouse-server/config.d/worker.xml
- ./docker/clickhouse/users-dev.xml:/etc/clickhouse-server/users.xml
- ./docker/clickhouse/user_defined_function.xml:/etc/clickhouse-server/user_defined_function.xml
- ./posthog/user_scripts:/var/lib/clickhouse/user_scripts
@@ -78,6 +79,23 @@ services:
- kafka
- zookeeper
clickhouse-coordinator:
hostname: clickhouse-coordinator
<<: *clickhouse
volumes:
# this new entrypoint file is to fix a bug detailed here https://github.com/ClickHouse/ClickHouse/pull/59991
# revert this when we upgrade clickhouse
- ./docker/clickhouse/entrypoint.sh:/entrypoint.sh
- ./posthog/idl:/idl
- ./docker/clickhouse/docker-entrypoint-initdb.d:/docker-entrypoint-initdb.d
- ./docker/clickhouse/config.xml:/etc/clickhouse-server/config.xml
- ./docker/clickhouse/config.d/coordinator.xml:/etc/clickhouse-server/config.d/coordinator.xml
- ./docker/clickhouse/users-dev.xml:/etc/clickhouse-server/users.xml
- ./docker/clickhouse/user_defined_function.xml:/etc/clickhouse-server/user_defined_function.xml
- ./posthog/user_scripts:/var/lib/clickhouse/user_scripts
ports:
- '9001:9001'
zookeeper:
extends:
file: docker-compose.base.yml
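
The coordinator service above reuses the existing `clickhouse` definition via the `&clickhouse` / `<<: *clickhouse` YAML anchor, overriding only its hostname, volumes and published port (9001). A quick connectivity check against both nodes might look like the sketch below; it assumes `clickhouse_driver` is installed and the dev compose stack is running:

```python
# Minimal sketch: confirm both ClickHouse services from the compose file accept connections.
from clickhouse_driver import Client

worker = Client(host="clickhouse", port=9000)                    # existing service
coordinator = Client(host="clickhouse-coordinator", port=9001)   # new service added above

print(worker.execute("SELECT hostName()"))
print(coordinator.execute("SELECT hostName()"))
```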

View File

@@ -48,6 +48,7 @@ services:
- ./posthog/posthog/idl:/idl
- ./posthog/docker/clickhouse/docker-entrypoint-initdb.d:/docker-entrypoint-initdb.d
- ./posthog/docker/clickhouse/config.xml:/etc/clickhouse-server/config.xml
- ./posthog/docker/clickhouse/config.d/default.xml:/etc/clickhouse-server/config.d/default.xml
- ./posthog/docker/clickhouse/users.xml:/etc/clickhouse-server/users.xml
- clickhouse-data:/var/lib/clickhouse
zookeeper:

View File

@@ -0,0 +1,42 @@
<clickhouse>
<tcp_port>9001</tcp_port>
<remote_servers>
<posthog>
<shard>
<replica>
<host>clickhouse</host>
<port>9000</port>
</replica>
</shard>
</posthog>
<posthog_single_shard>
<shard>
<replica>
<host>clickhouse</host>
<port>9000</port>
</replica>
</shard>
</posthog_single_shard>
<posthog_migrations>
<shard>
<replica>
<host>clickhouse</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>clickhouse-coordinator</host>
<port>9001</port>
</replica>
</shard>
</posthog_migrations>
</remote_servers>
<macros>
<shard>02</shard>
<replica>coord</replica>
<hostClusterType>online</hostClusterType>
<hostClusterRole>coordinator</hostClusterRole>
</macros>
</clickhouse>
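
This coordinator config is what the updated cluster bootstrap (later in this commit) relies on: the node advertises its identity through macros, and the `posthog_migrations` cluster spans both the worker and the coordinator. A sketch of how to inspect that from the coordinator, assuming `clickhouse_driver` and a running dev stack:

```python
# Minimal sketch: read the coordinator's identity macros and the posthog_migrations
# topology declared in the XML above.
from clickhouse_driver import Client

coordinator = Client(host="clickhouse-coordinator", port=9001)

# Should return ('02', 'coord', 'online', 'coordinator') per the <macros> block above.
print(coordinator.execute(
    "SELECT getMacro('shard'), getMacro('replica'), getMacro('hostClusterType'), getMacro('hostClusterRole')"
))

# posthog_migrations should list two shards: clickhouse:9000 and clickhouse-coordinator:9001.
print(coordinator.execute(
    "SELECT shard_num, host_name, port FROM system.clusters WHERE cluster = %(name)s",
    {"name": "posthog_migrations"},
))
```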

View File

@@ -0,0 +1,37 @@
<clickhouse>
<tcp_port>9000</tcp_port>
<remote_servers>
<posthog>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
</shard>
</posthog>
<posthog_single_shard>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
</shard>
</posthog_single_shard>
<posthog_migrations>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
</shard>
</posthog_migrations>
</remote_servers>
<macros>
<shard>01</shard>
<replica>ch1</replica>
<hostClusterType>online</hostClusterType>
<hostClusterRole>worker</hostClusterRole>
</macros>
</clickhouse>

View File

@@ -0,0 +1,42 @@
<clickhouse>
<tcp_port>9000</tcp_port>
<remote_servers>
<posthog>
<shard>
<replica>
<host>clickhouse</host>
<port>9000</port>
</replica>
</shard>
</posthog>
<posthog_single_shard>
<shard>
<replica>
<host>clickhouse</host>
<port>9000</port>
</replica>
</shard>
</posthog_single_shard>
<posthog_migrations>
<shard>
<replica>
<host>clickhouse</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>clickhouse-coordinator</host>
<port>9001</port>
</replica>
</shard>
</posthog_migrations>
</remote_servers>
<macros>
<shard>01</shard>
<replica>ch1</replica>
<hostClusterType>online</hostClusterType>
<hostClusterRole>worker</hostClusterRole>
</macros>
</clickhouse>

View File

@@ -12,10 +12,10 @@
<errorlog>/var/log/clickhouse-server/clickhouse-server.err.log</errorlog>
<size>1000M</size>
<count>10</count>
<console>1</console>
</logger>
<http_port>8123</http_port>
<tcp_port>9000</tcp_port>
<mysql_port>9004</mysql_port>
<postgresql_port>9005</postgresql_port>
<https_port>8443</https_port>
@@ -162,25 +162,6 @@
<!-- Reallocate memory for machine code ("text") using huge pages. Highly experimental. -->
<remap_executable>false</remap_executable>
<remote_servers>
<posthog>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
</shard>
</posthog>
<posthog_single_shard>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
</shard>
</posthog_single_shard>
</remote_servers>
<remote_url_allow_hosts>
<host_regexp>.*</host_regexp>
</remote_url_allow_hosts>
@@ -192,12 +173,6 @@
</node>
</zookeeper>
<macros>
<shard>01</shard>
<replica>ch1</replica>
</macros>
<!-- Reloading interval for embedded dictionaries, in seconds. Default: 3600. -->
<builtin_dictionaries_reload_interval>3600</builtin_dictionaries_reload_interval>

View File

@@ -19,6 +19,12 @@ class Workload(Enum):
OFFLINE = "OFFLINE"
class NodeRole(Enum):
ALL = "ALL"
COORDINATOR = "COORDINATOR"
WORKER = "WORKER"
_default_workload = Workload.ONLINE
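
These role names line up with the `hostClusterRole` macros declared in the new config.d files; host filtering elsewhere in this commit compares the lowercased enum value against that macro. A small sketch of the relationship, assuming the import path used by the other hunks:

```python
# Minimal sketch: NodeRole values correspond to the hostClusterRole macros
# ("worker" / "coordinator") set in docker/clickhouse/config.d/*.xml.
from posthog.clickhouse.client.connection import NodeRole

assert NodeRole.WORKER.value.lower() == "worker"
assert NodeRole.COORDINATOR.value.lower() == "coordinator"
# NodeRole.ALL acts as "no role restriction" when filtering hosts, not as a macro value.
```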

View File

@@ -1,23 +1,28 @@
from typing import Union
from collections.abc import Callable
import logging
from infi.clickhouse_orm import migrations
from posthog.clickhouse.client.execute import sync_execute
from posthog.clickhouse.client.connection import NodeRole
from posthog.clickhouse.cluster import get_cluster
from posthog.settings.data_stores import CLICKHOUSE_MIGRATIONS_CLUSTER
logger = logging.getLogger("migrations")
def run_sql_with_exceptions(sql: Union[str, Callable[[], str]], settings=None):
def run_sql_with_exceptions(sql: str, settings=None, node_role: NodeRole = NodeRole.WORKER):
"""
migrations.RunSQL does not raise exceptions, so we need to wrap it in a function that does.
node_role is set to WORKER by default to keep compatibility with the old migrations.
"""
if settings is None:
settings = {}
cluster = get_cluster(client_settings=settings, cluster=CLICKHOUSE_MIGRATIONS_CLUSTER)
def run_sql(database):
nonlocal sql
if callable(sql):
sql = sql()
sync_execute(sql, settings=settings)
def run_migration():
if node_role == NodeRole.ALL:
logger.info(" Running migration on coordinators and workers")
return cluster.map_all_hosts(lambda client: client.execute(sql)).result()
else:
logger.info(f" Running migration on {node_role.value.lower()}s")
return cluster.map_hosts_by_role(lambda client: client.execute(sql), node_role=node_role).result()
return migrations.RunPython(run_sql)
return migrations.RunPython(lambda _: run_migration())
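
A migration that should run on every node (coordinator included) can now pass `node_role` explicitly. A hypothetical example, with `TABLE_SQL` standing in for one of the real SQL factory functions:

```python
# Hypothetical migration sketch using the new node_role parameter.
from posthog.clickhouse.client.connection import NodeRole
from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions

def TABLE_SQL() -> str:  # placeholder for a real factory such as SESSIONS_TABLE_SQL
    return "CREATE TABLE IF NOT EXISTS example (id UInt64) ENGINE = MergeTree ORDER BY id"

operations = [
    run_sql_with_exceptions(TABLE_SQL()),                          # workers only (default)
    run_sql_with_exceptions(TABLE_SQL(), node_role=NodeRole.ALL),  # coordinators and workers
]
```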

View File

@@ -19,7 +19,7 @@ from clickhouse_driver import Client
from clickhouse_pool import ChPool
from posthog import settings
from posthog.clickhouse.client.connection import _make_ch_pool, default_client
from posthog.clickhouse.client.connection import NodeRole, _make_ch_pool, default_client
from posthog.settings import CLICKHOUSE_PER_TEAM_SETTINGS
K = TypeVar("K")
@@ -62,15 +62,18 @@ class FuturesMap(dict[K, Future[V]]):
class ConnectionInfo(NamedTuple):
address: str
port: int | None
def make_pool(self, client_settings: Mapping[str, str] | None = None) -> ChPool:
return _make_ch_pool(host=self.address, settings=client_settings)
return _make_ch_pool(host=self.address, port=self.port, settings=client_settings)
class HostInfo(NamedTuple):
connection_info: ConnectionInfo
shard_num: int | None
replica_num: int | None
host_cluster_type: str | None
host_cluster_role: str | None
T = TypeVar("T")
@@ -83,25 +86,42 @@ class ClickhouseCluster:
extra_hosts: Sequence[ConnectionInfo] | None = None,
logger: logging.Logger | None = None,
client_settings: Mapping[str, str] | None = None,
cluster: str | None = None,
) -> None:
if logger is None:
logger = logging.getLogger(__name__)
self.__hosts = [
HostInfo(ConnectionInfo(host_address), shard_num, replica_num)
for (host_address, shard_num, replica_num) in bootstrap_client.execute(
HostInfo(ConnectionInfo(host_address, port), shard_num, replica_num, host_cluster_type, host_cluster_role)
for (
host_address,
port,
shard_num,
replica_num,
host_cluster_type,
host_cluster_role,
) in bootstrap_client.execute(
"""
SELECT host_address, shard_num, replica_num
FROM system.clusters
WHERE name = %(name)s
SELECT host_address, port, shard_num, replica_num, getMacro('hostClusterType') as host_cluster_type, getMacro('hostClusterRole') as host_cluster_role
FROM clusterAllReplicas(%(name)s, system.clusters)
WHERE name = %(name)s and is_local
ORDER BY shard_num, replica_num
""",
{"name": settings.CLICKHOUSE_CLUSTER},
{"name": cluster or settings.CLICKHOUSE_CLUSTER},
)
]
if extra_hosts is not None:
self.__hosts.extend(
[HostInfo(connection_info, shard_num=None, replica_num=None) for connection_info in extra_hosts]
[
HostInfo(
connection_info,
shard_num=None,
replica_num=None,
host_cluster_type=None,
host_cluster_role=None,
)
for connection_info in extra_hosts
]
)
self.__pools: dict[HostInfo, ChPool] = {}
self.__logger = logger
@@ -139,11 +159,31 @@ class ClickhouseCluster:
"""
Execute the callable once for each host in the cluster.
The number of concurrent queries can be limited with the ``concurrency`` parameter, or set to ``None`` to use the
default limit of the executor.
"""
return self.map_hosts_by_role(fn, NodeRole.ALL, concurrency)
def map_hosts_by_role(
self,
fn: Callable[[Client], T],
node_role: NodeRole,
concurrency: int | None = None,
) -> FuturesMap[HostInfo, T]:
"""
Execute the callable once for each host in the cluster with the given node role.
The number of concurrent queries can be limited with the ``concurrency`` parameter, or set to ``None`` to use the
default limit of the executor.
"""
with ThreadPoolExecutor(max_workers=concurrency) as executor:
return FuturesMap({host: executor.submit(self.__get_task_function(host, fn)) for host in self.__hosts})
return FuturesMap(
{
host: executor.submit(self.__get_task_function(host, fn))
for host in self.__hosts
if host.host_cluster_role == node_role.value.lower() or node_role == NodeRole.ALL
}
)
def map_all_hosts_in_shard(
self, shard_num: int, fn: Callable[[Client], T], concurrency: int | None = None
@@ -209,13 +249,15 @@ class ClickhouseCluster:
def get_cluster(
logger: logging.Logger | None = None, client_settings: Mapping[str, str] | None = None
logger: logging.Logger | None = None, client_settings: Mapping[str, str] | None = None, cluster: str | None = None
) -> ClickhouseCluster:
extra_hosts = []
for host_config in map(copy, CLICKHOUSE_PER_TEAM_SETTINGS.values()):
extra_hosts.append(ConnectionInfo(host_config.pop("host")))
extra_hosts.append(ConnectionInfo(host_config.pop("host"), None))
assert len(host_config) == 0, f"unexpected values: {host_config!r}"
return ClickhouseCluster(default_client(), extra_hosts=extra_hosts, logger=logger, client_settings=client_settings)
return ClickhouseCluster(
default_client(), extra_hosts=extra_hosts, logger=logger, client_settings=client_settings, cluster=cluster
)
@dataclass
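
Put together, the new constructor argument and the role filter let callers target a subset of the cluster. A sketch of the intended usage, assuming a running cluster and the settings added later in this commit:

```python
# Minimal sketch: run a statement on workers only, then on every host of the
# migrations cluster, using the APIs changed in this hunk.
from posthog.clickhouse.client.connection import NodeRole
from posthog.clickhouse.cluster import get_cluster
from posthog.settings.data_stores import CLICKHOUSE_MIGRATIONS_CLUSTER

cluster = get_cluster(cluster=CLICKHOUSE_MIGRATIONS_CLUSTER)

workers_only = cluster.map_hosts_by_role(
    lambda client: client.execute("SELECT hostName()"), node_role=NodeRole.WORKER
).result()

everyone = cluster.map_all_hosts(  # equivalent to node_role=NodeRole.ALL
    lambda client: client.execute("SELECT hostName()")
).result()
```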

View File

@@ -8,9 +8,9 @@ from posthog.models.sessions.sql import (
)
operations = [
run_sql_with_exceptions(WRITABLE_SESSIONS_TABLE_SQL),
run_sql_with_exceptions(DISTRIBUTED_SESSIONS_TABLE_SQL),
run_sql_with_exceptions(SESSIONS_TABLE_SQL),
run_sql_with_exceptions(SESSIONS_TABLE_MV_SQL),
run_sql_with_exceptions(SESSIONS_VIEW_SQL),
run_sql_with_exceptions(WRITABLE_SESSIONS_TABLE_SQL()),
run_sql_with_exceptions(DISTRIBUTED_SESSIONS_TABLE_SQL()),
run_sql_with_exceptions(SESSIONS_TABLE_SQL()),
run_sql_with_exceptions(SESSIONS_TABLE_MV_SQL()),
run_sql_with_exceptions(SESSIONS_VIEW_SQL()),
]
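
These call sites change because `run_sql_with_exceptions` no longer accepts a callable (its `sql` parameter is now a plain `str`), so the SQL factory functions are invoked eagerly in the migration module itself. A hypothetical before/after illustration, with `EXAMPLE_SQL` as a stand-in factory:

```python
# Hypothetical illustration of the call-site change in these migration files.
from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions

def EXAMPLE_SQL() -> str:  # stands in for a real factory such as SESSIONS_TABLE_SQL
    return "SELECT 1"

# Old style: the callable was passed through and invoked inside run_sql_with_exceptions.
# operations = [run_sql_with_exceptions(EXAMPLE_SQL)]

# New style: the factory is called here; run_sql_with_exceptions receives a plain string.
operations = [run_sql_with_exceptions(EXAMPLE_SQL())]
```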

View File

@@ -4,5 +4,5 @@ from posthog.models.sessions.sql import (
)
operations = [
run_sql_with_exceptions(SESSION_TABLE_UPDATE_SQL),
run_sql_with_exceptions(SESSION_TABLE_UPDATE_SQL()),
]

View File

@@ -8,9 +8,9 @@ from posthog.models.raw_sessions.sql import (
)
operations = [
run_sql_with_exceptions(WRITABLE_RAW_SESSIONS_TABLE_SQL),
run_sql_with_exceptions(DISTRIBUTED_RAW_SESSIONS_TABLE_SQL),
run_sql_with_exceptions(RAW_SESSIONS_TABLE_SQL),
run_sql_with_exceptions(RAW_SESSIONS_TABLE_MV_SQL),
run_sql_with_exceptions(RAW_SESSIONS_VIEW_SQL),
run_sql_with_exceptions(WRITABLE_RAW_SESSIONS_TABLE_SQL()),
run_sql_with_exceptions(DISTRIBUTED_RAW_SESSIONS_TABLE_SQL()),
run_sql_with_exceptions(RAW_SESSIONS_TABLE_SQL()),
run_sql_with_exceptions(RAW_SESSIONS_TABLE_MV_SQL()),
run_sql_with_exceptions(RAW_SESSIONS_VIEW_SQL()),
]

View File

@@ -2,5 +2,5 @@ from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions
from posthog.models.raw_sessions.sql import RAW_SESSION_TABLE_UPDATE_SQL
operations = [
run_sql_with_exceptions(RAW_SESSION_TABLE_UPDATE_SQL),
run_sql_with_exceptions(RAW_SESSION_TABLE_UPDATE_SQL()),
]

View File

@@ -2,5 +2,5 @@ from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions
from posthog.models.raw_sessions.sql import RAW_SESSION_TABLE_UPDATE_SQL
operations = [
run_sql_with_exceptions(RAW_SESSION_TABLE_UPDATE_SQL),
run_sql_with_exceptions(RAW_SESSION_TABLE_UPDATE_SQL()),
]

View File

@@ -2,5 +2,5 @@ from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions
from posthog.models.raw_sessions.sql import RAW_SESSION_TABLE_UPDATE_SQL
operations = [
run_sql_with_exceptions(RAW_SESSION_TABLE_UPDATE_SQL),
run_sql_with_exceptions(RAW_SESSION_TABLE_UPDATE_SQL()),
]

View File

@@ -1,10 +1,13 @@
from collections import defaultdict
from unittest.mock import Mock, patch
import uuid
from collections.abc import Iterator
from collections.abc import Callable, Iterator
import pytest
from clickhouse_driver import Client
from posthog.clickhouse.cluster import ClickhouseCluster, MutationRunner, get_cluster
from posthog.clickhouse.client.connection import NodeRole
from posthog.clickhouse.cluster import T, ClickhouseCluster, HostInfo, MutationRunner, get_cluster
from posthog.models.event.sql import EVENTS_DATA_TABLE
@@ -73,6 +76,43 @@ def test_mutations(cluster: ClickhouseCluster) -> None:
assert cluster.map_all_hosts(get_mutations_count).result() == mutations_count_before
def test_map_hosts_by_role() -> None:
bootstrap_client_mock = Mock()
bootstrap_client_mock.execute = Mock()
bootstrap_client_mock.execute.return_value = [
("host1", "9000", "1", "1", "online", "worker"),
("host2", "9000", "1", "2", "online", "worker"),
("host3", "9000", "1", "3", "online", "worker"),
("host4", "9000", "1", "4", "online", "coordinator"),
]
cluster = ClickhouseCluster(bootstrap_client_mock)
times_called: defaultdict[NodeRole, int] = defaultdict(int)
def mock_get_task_function(_, host: HostInfo, fn: Callable[[Client], T]) -> Callable[[], T]:
if host.host_cluster_role == NodeRole.WORKER.value.lower():
times_called[NodeRole.WORKER] += 1
elif host.host_cluster_role == NodeRole.COORDINATOR.value.lower():
times_called[NodeRole.COORDINATOR] += 1
return lambda: fn(Mock())
with patch.object(ClickhouseCluster, "_ClickhouseCluster__get_task_function", mock_get_task_function):
cluster.map_hosts_by_role(lambda _: (), node_role=NodeRole.WORKER).result()
assert times_called[NodeRole.WORKER] == 3
assert times_called[NodeRole.COORDINATOR] == 0
times_called.clear()
cluster.map_hosts_by_role(lambda _: (), node_role=NodeRole.COORDINATOR).result()
assert times_called[NodeRole.WORKER] == 0
assert times_called[NodeRole.COORDINATOR] == 1
times_called.clear()
cluster.map_hosts_by_role(lambda _: (), node_role=NodeRole.ALL).result()
assert times_called[NodeRole.WORKER] == 3
assert times_called[NodeRole.COORDINATOR] == 1
def test_lightweight_delete(cluster: ClickhouseCluster) -> None:
table = EVENTS_DATA_TABLE()
count = 100

View File

@@ -7,13 +7,14 @@ from infi.clickhouse_orm import Database
from infi.clickhouse_orm.migrations import MigrationHistory
from infi.clickhouse_orm.utils import import_submodules
from posthog.clickhouse.client.connection import default_client
from posthog.settings import (
CLICKHOUSE_DATABASE,
CLICKHOUSE_HTTP_URL,
CLICKHOUSE_PASSWORD,
CLICKHOUSE_USER,
)
from posthog.settings.data_stores import CLICKHOUSE_CLUSTER
from posthog.settings.data_stores import CLICKHOUSE_MIGRATIONS_CLUSTER
MIGRATIONS_PACKAGE_NAME = "posthog.clickhouse.migrations"
@@ -53,12 +54,14 @@ class Command(BaseCommand):
self.migrate(CLICKHOUSE_HTTP_URL, options)
def migrate(self, host, options):
# Infi only creates the DB on one node, not on the rest. Create it before running migrations.
self._create_database_if_not_exists(CLICKHOUSE_DATABASE, CLICKHOUSE_MIGRATIONS_CLUSTER)
database = Database(
CLICKHOUSE_DATABASE,
db_url=host,
username=CLICKHOUSE_USER,
password=CLICKHOUSE_PASSWORD,
cluster=CLICKHOUSE_CLUSTER,
cluster=CLICKHOUSE_MIGRATIONS_CLUSTER,
verify_ssl_cert=False,
randomize_replica_paths=settings.TEST or settings.E2E_TESTING,
)
@@ -105,3 +108,9 @@ class Command(BaseCommand):
def get_applied_migrations(self, database):
return database._get_applied_migrations(MIGRATIONS_PACKAGE_NAME, replicated=True)
def _create_database_if_not_exists(self, database: str, cluster: str):
with default_client() as client:
client.execute(
f"CREATE DATABASE IF NOT EXISTS {database} ON CLUSTER {cluster}",
)
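
Since the database is now created `ON CLUSTER` before migrations run, its presence on both the worker and the coordinator can be confirmed by fanning a query out over the migrations cluster. A sketch, assuming a running dev stack:

```python
# Minimal sketch: confirm the database exists on every node of the migrations cluster.
from posthog.clickhouse.client.connection import default_client
from posthog.settings import CLICKHOUSE_DATABASE
from posthog.settings.data_stores import CLICKHOUSE_MIGRATIONS_CLUSTER

with default_client() as client:
    rows = client.execute(
        """
        SELECT hostName(), name
        FROM clusterAllReplicas(%(cluster)s, system.databases)
        WHERE name = %(db)s
        """,
        {"cluster": CLICKHOUSE_MIGRATIONS_CLUSTER, "db": CLICKHOUSE_DATABASE},
    )
    print(rows)  # one row per node that has the database
```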

View File

@@ -150,6 +150,7 @@ CLICKHOUSE_USER: str = os.getenv("CLICKHOUSE_USER", "default")
CLICKHOUSE_PASSWORD: str = os.getenv("CLICKHOUSE_PASSWORD", "")
CLICKHOUSE_DATABASE: str = CLICKHOUSE_TEST_DB if TEST else os.getenv("CLICKHOUSE_DATABASE", "default")
CLICKHOUSE_CLUSTER: str = os.getenv("CLICKHOUSE_CLUSTER", "posthog")
CLICKHOUSE_MIGRATIONS_CLUSTER: str = os.getenv("CLICKHOUSE_MIGRATIONS_CLUSTER", "posthog_migrations")
CLICKHOUSE_CA: str | None = os.getenv("CLICKHOUSE_CA", None)
CLICKHOUSE_SECURE: bool = get_from_env("CLICKHOUSE_SECURE", not TEST and not DEBUG, type_cast=str_to_bool)
CLICKHOUSE_VERIFY: bool = get_from_env("CLICKHOUSE_VERIFY", True, type_cast=str_to_bool)
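
The new setting defaults to `posthog_migrations`, matching the `remote_servers` entry added in the config.d files earlier in this commit, and can be overridden via the environment variable of the same name. A small sketch of the resolved defaults, assuming no overriding env vars:

```python
# Minimal sketch: the resolved defaults, mirroring the os.getenv() calls above.
from posthog.settings.data_stores import CLICKHOUSE_CLUSTER, CLICKHOUSE_MIGRATIONS_CLUSTER

assert CLICKHOUSE_CLUSTER == "posthog"                        # existing default
assert CLICKHOUSE_MIGRATIONS_CLUSTER == "posthog_migrations"  # new default added above
```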