Mirror of https://github.com/BillyOutlast/posthog.git, synced 2026-02-04 03:01:23 +01:00
feat: Swapped to use KAFKA_HOSTS everywhere (#16109)
* Swapped to use KAFKA_HOSTS everywhere
* Fixed up the type of the Kafka config options and set up separate Kafka hosts for the blob consumer
* Allow session recordings to have their own Kafka security protocol
* Remove slash commands from this PR
* Syntax must be obeyed
* Update UI snapshots for `chromium` (2)
* Update UI snapshots for `chromium` (2)
* Fix
* Update query snapshots
* No empty strings in Kafka hosts
* Fix snapshot
* Fix test

---------

Co-authored-by: Paul D'Ambra <paul@posthog.com>
Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
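In configuration terms, the change makes KAFKA_HOSTS the primary setting, keeps KAFKA_URL only as a legacy fallback, and lets session recordings point at a dedicated Kafka cluster. A minimal Python sketch of that resolution order (variable names follow the posthog/settings/data_stores.py hunk further down; the comma-splitting and scheme-stripping live in the _parse_kafka_hosts helper shown there):

import os

# KAFKA_HOSTS wins; the legacy KAFKA_URL is only a fallback, then a hard-coded default.
kafka_hosts = os.getenv("KAFKA_HOSTS", "") or os.getenv("KAFKA_URL", "") or "kafka:9092"

# Session recordings may use a dedicated cluster; otherwise they reuse the main hosts.
session_recording_kafka_hosts = os.getenv("SESSION_RECORDING_KAFKA_HOSTS", "") or kafka_hosts

print(kafka_hosts, session_recording_kafka_hosts)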
@@ -14,8 +14,6 @@
 "DATABASE_URL": "postgres://posthog:posthog@localhost:5432/posthog",
 "KAFKA_ENABLED": "true",
 "KAFKA_HOSTS": "kafka:9092",
-"KAFKA_URL": "kafka://kafka:9092",
-"SESSION_RECORDING_KAFKA_URL": "kafka://kafka:9092",
 "CLICKHOUST_HOST": "localhost",
 "CLICKHOUSE_DATABASE": "posthog_test",
 "CLICKHOUSE_VERIFY": "False",
.github/workflows/ci-e2e.yml
@@ -152,8 +152,7 @@ jobs:
 SECRET_KEY=6b01eee4f945ca25045b5aab440b953461faf08693a9abbf1166dc7c6b9772da
 REDIS_URL=redis://localhost
 DATABASE_URL=postgres://posthog:posthog@localhost:5432/posthog
-KAFKA_URL=kafka://kafka:9092
-SESSION_RECORDING_KAFKA_URL=kafka://kafka:9092
+KAFKA_HOSTS=kafka:9092
 DISABLE_SECURE_SSL_REDIRECT=1
 SECURE_COOKIES=0
 OPT_OUT_CAPTURE=1
@@ -7,7 +7,7 @@
 <env name="PYTHONUNBUFFERED" value="1" />
 <env name="DEBUG" value="1" />
 <env name="CLICKHOUSE_SECURE" value="False" />
-<env name="KAFKA_URL" value="kafka://localhost" />
+<env name="KAFKA_HOSTS" value="localhost" />
 <env name="DATABASE_URL" value="postgres://posthog:posthog@localhost:5432/posthog" />
 <env name="SKIP_SERVICE_VERSION_REQUIREMENTS" value="1" />
 </envs>
@@ -10,12 +10,11 @@
 <env name="DATABASE_URL" value="postgres://posthog:posthog@localhost:5432/posthog" />
 <env name="DEBUG" value="1" />
 <env name="DJANGO_SETTINGS_MODULE" value="posthog.settings" />
-<env name="KAFKA_URL" value="kafka://localhost" />
+<env name="KAFKA_HOSTS" value="localhost" />
 <env name="KEA_VERBOSE_LOGGING" value="false" />
 <env name="PRINT_SQL" value="1" />
 <env name="PYTHONUNBUFFERED" value="1" />
-<env name="REPLAY_ALTERNATIVE_COMPRESSION_TRAFFIC_RATIO" value="1" />
-<env name="SESSION_RECORDING_KAFKA_URL" value="kafka://localhost" />
+<env name="REPLAY_BLOB_INGESTION_TRAFFIC_RATIO" value="1" />
 <env name="SKIP_SERVICE_VERSION_REQUIREMENTS" value="1" />
 </envs>
 <option name="SDK_HOME" value="$PROJECT_DIR$/env/bin/python" />
.vscode/launch.json
@@ -27,7 +27,7 @@
 "PYTHONUNBUFFERED": "1",
 "DEBUG": "1",
 "CLICKHOUSE_SECURE": "False",
-"KAFKA_URL": "kafka://localhost",
+"KAFKA_HOSTS": "localhost:9092",
 "DATABASE_URL": "postgres://posthog:posthog@localhost:5432/posthog",
 "WORKER_CONCURRENCY": "2",
 "SKIP_SERVICE_VERSION_REQUIREMENTS": "1"
@@ -67,12 +67,11 @@
 "DJANGO_SETTINGS_MODULE": "posthog.settings",
 "DEBUG": "1",
 "CLICKHOUSE_SECURE": "False",
-"KAFKA_URL": "kafka://localhost",
-"SESSION_RECORDING_KAFKA_URL": "kafka://localhost",
+"KAFKA_HOSTS": "localhost",
 "DATABASE_URL": "postgres://posthog:posthog@localhost:5432/posthog",
 "SKIP_SERVICE_VERSION_REQUIREMENTS": "1",
 "PRINT_SQL": "1",
-"REPLAY_ALTERNATIVE_COMPRESSION_TRAFFIC_RATIO": "1.0"
+"REPLAY_BLOB_INGESTION_TRAFFIC_RATIO": "1.0"
 },
 "console": "integratedTerminal",
 "python": "${workspaceFolder}/env/bin/python",
@@ -52,7 +52,7 @@ export CYPRESS_BASE_URL=http://localhost:8080
 export OPT_OUT_CAPTURE=1
 export SECURE_COOKIES=0
 export SKIP_SERVICE_VERSION_REQUIREMENTS=1
-export KAFKA_URL=kafka://kafka:9092
+export KAFKA_HOSTS=kafka:9092
 export CLICKHOUSE_DATABASE=posthog_test
 export TEST=1 # Plugin server and kafka revert to 'default' Clickhouse database if TEST is not set
 export CLICKHOUSE_SECURE=0
@@ -2,29 +2,28 @@
 while test $# -gt 0; do
 case "$1" in
--h|--help)
-echo "USAGE:"
-echo " bin/plugin-server [FLAGS]"
-echo " "
-echo "FLAGS:"
-echo " -h, --help Print this help information."
-echo " --no-restart-loop Run without restart loop. Recommended when deferring resiliency to e.g. docker-compose."
-exit 0
-;;
---no-restart-loop)
-NO_RESTART_LOOP='true'
-shift
-;;
-*)
-break
-;;
+-h | --help)
+echo "USAGE:"
+echo " bin/plugin-server [FLAGS]"
+echo " "
+echo "FLAGS:"
+echo " -h, --help Print this help information."
+echo " --no-restart-loop Run without restart loop. Recommended when deferring resiliency to e.g. docker-compose."
+exit 0
+;;
+--no-restart-loop)
+NO_RESTART_LOOP='true'
+shift
+;;
+*)
+break
+;;
 esac
 done

 export BASE_DIR=$(dirname $(dirname "$PWD/${0#./}"))

-export KAFKA_URL=${KAFKA_URL:-'kafka://kafka:9092'}
-export KAFKA_HOSTS
+export KAFKA_HOSTS=${KAFKA_HOSTS:-'kafka:9092'}

 if [[ -n $INJECT_EC2_CLIENT_RACK ]]; then
 # To avoid cross-AZ Kafka traffic, set KAFKA_CLIENT_RACK from the EC2 metadata endpoint.
@@ -74,8 +74,7 @@ services:
 CLICKHOUSE_DATABASE: 'posthog'
 CLICKHOUSE_SECURE: 'false'
 CLICKHOUSE_VERIFY: 'false'
-KAFKA_URL: 'kafka://kafka'
-SESSION_RECORDING_KAFKA_URL: 'kafka://kafka'
+KAFKA_HOSTS: 'kafka'
 REDIS_URL: 'redis://redis:6379/'
 PGHOST: db
 PGUSER: posthog
@@ -32,14 +32,14 @@ export function getDefaultConfig(): PluginsServerConfig {
 EVENT_OVERFLOW_BUCKET_CAPACITY: 1000,
 EVENT_OVERFLOW_BUCKET_REPLENISH_RATE: 1.0,
 KAFKA_HOSTS: 'kafka:9092', // KEEP IN SYNC WITH posthog/settings/data_stores.py
-KAFKA_CLIENT_CERT_B64: null,
-KAFKA_CLIENT_CERT_KEY_B64: null,
-KAFKA_TRUSTED_CERT_B64: null,
-KAFKA_SECURITY_PROTOCOL: null,
-KAFKA_SASL_MECHANISM: null,
-KAFKA_SASL_USER: null,
-KAFKA_SASL_PASSWORD: null,
-KAFKA_CLIENT_RACK: null,
+KAFKA_CLIENT_CERT_B64: undefined,
+KAFKA_CLIENT_CERT_KEY_B64: undefined,
+KAFKA_TRUSTED_CERT_B64: undefined,
+KAFKA_SECURITY_PROTOCOL: undefined,
+KAFKA_SASL_MECHANISM: undefined,
+KAFKA_SASL_USER: undefined,
+KAFKA_SASL_PASSWORD: undefined,
+KAFKA_CLIENT_RACK: undefined,
 KAFKA_CONSUMPTION_MAX_BYTES: 10_485_760, // Default value for kafkajs
 KAFKA_CONSUMPTION_MAX_BYTES_PER_PARTITION: 1_048_576, // Default value for kafkajs, must be bigger than message size
 KAFKA_CONSUMPTION_MAX_WAIT_MS: 1_000, // Down from the 5s default for kafkajs
@@ -120,6 +120,8 @@ export function getDefaultConfig(): PluginsServerConfig {
 USE_KAFKA_FOR_SCHEDULED_TASKS: true,
 CLOUD_DEPLOYMENT: 'default', // Used as a Sentry tag

+SESSION_RECORDING_KAFKA_HOSTS: 'kafka:9092',
+SESSION_RECORDING_KAFKA_SECURITY_PROTOCOL: undefined,
 SESSION_RECORDING_BLOB_PROCESSING_TEAMS: '', // TODO: Change this to 'all' when we release it fully
 SESSION_RECORDING_LOCAL_DIRECTORY: '.tmp/sessions',
 // NOTE: 10 minutes
@@ -11,7 +11,6 @@ import { BatchConsumer, startBatchConsumer } from '../../../kafka/batch-consumer
 import { createRdConnectionConfigFromEnvVars } from '../../../kafka/config'
 import { createKafkaProducer, disconnectProducer } from '../../../kafka/producer'
 import { PipelineEvent, PluginsServerConfig, RawEventMessage, Team } from '../../../types'
-import { KafkaConfig } from '../../../utils/db/hub'
 import { status } from '../../../utils/status'
 import { TeamManager } from '../../../worker/ingestion/team-manager'
 import { ObjectStorage } from '../../services/object_storage'
@@ -235,7 +234,12 @@ export class SessionRecordingBlobIngester {
     throw e
 }

-const connectionConfig = createRdConnectionConfigFromEnvVars(this.serverConfig as KafkaConfig)
+const connectionConfig = createRdConnectionConfigFromEnvVars({
+    ...this.serverConfig,
+    // We use the same kafka config overall but different hosts for the session recordings
+    KAFKA_HOSTS: this.serverConfig.SESSION_RECORDING_KAFKA_HOSTS,
+    KAFKA_SECURITY_PROTOCOL: this.serverConfig.SESSION_RECORDING_KAFKA_SECURITY_PROTOCOL,
+})
 this.producer = await createKafkaProducer(connectionConfig)

 // Create a node-rdkafka consumer that fetches batches of messages, runs
@@ -10,7 +10,7 @@ import { Counter } from 'prom-client'
 import { getPluginServerCapabilities } from '../capabilities'
 import { defaultConfig } from '../config/config'
 import { Hub, PluginServerCapabilities, PluginsServerConfig } from '../types'
-import { createHub, KafkaConfig } from '../utils/db/hub'
+import { createHub } from '../utils/db/hub'
 import { captureEventLoopMetrics } from '../utils/metrics'
 import { cancelAllScheduledJobs } from '../utils/node-schedule'
 import { PubSub } from '../utils/pubsub'
@@ -363,7 +363,7 @@ export async function startPluginsServer(
     join,
 } = await startSessionRecordingEventsConsumer({
     teamManager: teamManager,
-    kafkaConfig: serverConfig as KafkaConfig,
+    kafkaConfig: serverConfig,
     consumerMaxBytes: serverConfig.KAFKA_CONSUMPTION_MAX_BYTES,
     consumerMaxBytesPerPartition: serverConfig.KAFKA_CONSUMPTION_MAX_BYTES_PER_PARTITION,
     consumerMaxWaitMs: serverConfig.KAFKA_CONSUMPTION_MAX_WAIT_MS,
@@ -96,14 +96,14 @@ export interface PluginsServerConfig {
 CLICKHOUSE_DISABLE_EXTERNAL_SCHEMAS_TEAMS: string // (advanced) a comma separated list of teams to disable clickhouse external schemas for
 CLICKHOUSE_JSON_EVENTS_KAFKA_TOPIC: string // (advanced) topic to send events to for clickhouse ingestion
 KAFKA_HOSTS: string // comma-delimited Kafka hosts
-KAFKA_CLIENT_CERT_B64: string | null
-KAFKA_CLIENT_CERT_KEY_B64: string | null
-KAFKA_TRUSTED_CERT_B64: string | null
-KAFKA_SECURITY_PROTOCOL: KafkaSecurityProtocol | null
-KAFKA_SASL_MECHANISM: KafkaSaslMechanism | null
-KAFKA_SASL_USER: string | null
-KAFKA_SASL_PASSWORD: string | null
-KAFKA_CLIENT_RACK: string | null
+KAFKA_CLIENT_CERT_B64: string | undefined
+KAFKA_CLIENT_CERT_KEY_B64: string | undefined
+KAFKA_TRUSTED_CERT_B64: string | undefined
+KAFKA_SECURITY_PROTOCOL: KafkaSecurityProtocol | undefined
+KAFKA_SASL_MECHANISM: KafkaSaslMechanism | undefined
+KAFKA_SASL_USER: string | undefined
+KAFKA_SASL_PASSWORD: string | undefined
+KAFKA_CLIENT_RACK: string | undefined
 KAFKA_CONSUMPTION_MAX_BYTES: number
 KAFKA_CONSUMPTION_MAX_BYTES_PER_PARTITION: number
 KAFKA_CONSUMPTION_MAX_WAIT_MS: number // fetch.wait.max.ms rdkafka parameter
@@ -189,6 +189,8 @@ export interface PluginsServerConfig {
 EVENT_OVERFLOW_BUCKET_REPLENISH_RATE: number
 CLOUD_DEPLOYMENT: string

+SESSION_RECORDING_KAFKA_HOSTS: string
+SESSION_RECORDING_KAFKA_SECURITY_PROTOCOL: KafkaSecurityProtocol | undefined
 SESSION_RECORDING_BLOB_PROCESSING_TEAMS: string
 // local directory might be a volume mount or a directory on disk (e.g. in local dev)
 SESSION_RECORDING_LOCAL_DIRECTORY: string
@@ -128,9 +128,8 @@ export async function createHub(
 status.info('🤔', `Connecting to Kafka...`)

-const kafka = createKafkaClient(serverConfig as KafkaConfig)
-
-const kafkaConnectionConfig = createRdConnectionConfigFromEnvVars(serverConfig as KafkaConfig)
+const kafka = createKafkaClient(serverConfig)
+const kafkaConnectionConfig = createRdConnectionConfigFromEnvVars(serverConfig)
 const producer = await createKafkaProducer({ ...kafkaConnectionConfig, 'linger.ms': 0 })

 const kafkaProducer = new KafkaProducerWrapper(producer, serverConfig.KAFKA_PRODUCER_WAIT_FOR_ACK)
@@ -379,10 +379,7 @@ def get_event(request):
 # Legacy solution stays in place
 processed_replay_events = legacy_preprocess_session_recording_events_for_clickhouse(replay_events)

-if (
-    random() <= settings.REPLAY_ALTERNATIVE_COMPRESSION_TRAFFIC_RATIO
-    and settings.SESSION_RECORDING_KAFKA_HOSTS
-):
+if random() <= settings.REPLAY_BLOB_INGESTION_TRAFFIC_RATIO:
 # The new flow we only enable if the dedicated kafka is enabled
 processed_replay_events += preprocess_replay_events_for_blob_ingestion(replay_events)
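The gate on blob ingestion is now just the traffic ratio; because SESSION_RECORDING_KAFKA_HOSTS always falls back to the main hosts, the extra availability check is no longer needed. A tiny illustrative sketch of what the ratio means in practice (the function name here is made up for the example):

from random import random

def should_use_blob_ingestion(ratio: float) -> bool:
    # 0.0 routes (effectively) no requests to blob ingestion, 1.0 routes all of them,
    # and values in between sample that fraction of capture requests.
    return random() <= ratio

assert should_use_blob_ingestion(1.0)  # random() is always <= 1.0
print(should_use_blob_ingestion(0.5))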
@@ -1195,7 +1195,7 @@ class TestCapture(BaseTest):
 snapshot_source = 8
 snapshot_type = 8
 event_data = {"foo": "bar"}
-with self.settings(REPLAY_ALTERNATIVE_COMPRESSION_TRAFFIC_RATIO=1):
+with self.settings(REPLAY_BLOB_INGESTION_TRAFFIC_RATIO=0):
     self._send_session_recording_event(
         timestamp=timestamp,
         snapshot_source=snapshot_source,
@@ -1260,7 +1260,7 @@ class TestCapture(BaseTest):

 with self.settings(
     SESSION_RECORDING_KAFKA_HOSTS=["kafka://another-server:9092"],
-    REPLAY_ALTERNATIVE_COMPRESSION_TRAFFIC_RATIO=1,
+    REPLAY_BLOB_INGESTION_TRAFFIC_RATIO=1,
 ):
     default_kafka_producer_mock.return_value = KafkaProducer()
     session_recording_producer_mock.return_value = sessionRecordingKafkaProducer()
@@ -1281,18 +1281,15 @@ class TestCapture(BaseTest):
 @patch("posthog.api.capture.sessionRecordingKafkaProducer")
 @patch("posthog.api.capture.KafkaProducer")
 @patch("posthog.kafka_client.client._KafkaProducer.produce")
-def test_uses_does_not_produce_if_session_recording_kafka_unavailable(
+def test_uses_does_not_produce_if_blob_ingestion_disabled(
     self,
     kafka_produce: MagicMock,
     default_kafka_producer_mock: MagicMock,
     session_recording_producer_mock: MagicMock,
 ) -> None:
-with self.settings(
-    SESSION_RECORDING_KAFKA_HOSTS=None,
-    REPLAY_ALTERNATIVE_COMPRESSION_TRAFFIC_RATIO=1,
-):
+with self.settings(REPLAY_BLOB_INGESTION_TRAFFIC_RATIO=0):
     default_kafka_producer_mock.return_value = KafkaProducer()
-    session_recording_producer_mock.side_effect = Exception("Kafka not available")
+    session_recording_producer_mock.side_effect = sessionRecordingKafkaProducer()

     data = "example"
     self._send_session_recording_event(event_data=data)
@@ -35,9 +35,9 @@ KAFKA_COLUMNS_WITH_PARTITION = """
 """


-def kafka_engine(topic: str, kafka_host=None, group="group1"):
+def kafka_engine(topic: str, kafka_host: str | None = None, group="group1") -> str:
     if kafka_host is None:
-        kafka_host = settings.KAFKA_HOSTS_FOR_CLICKHOUSE
+        kafka_host = ",".join(settings.KAFKA_HOSTS_FOR_CLICKHOUSE)
     return KAFKA_ENGINE.format(topic=topic, kafka_host=kafka_host, group=group, serialization="JSONEachRow")
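Because KAFKA_HOSTS_FOR_CLICKHOUSE is now a list, kafka_engine joins it back into the single comma-separated broker string that ClickHouse's Kafka engine expects. A rough, self-contained sketch of that behaviour; the KAFKA_ENGINE template and host values below are illustrative, not the repo's exact constants:

from typing import Optional

# Illustrative stand-ins; the real constants live in PostHog's ClickHouse SQL modules.
KAFKA_ENGINE = "Kafka('{kafka_host}', '{topic}', '{group}', '{serialization}')"
KAFKA_HOSTS_FOR_CLICKHOUSE = ["kafka-1:9092", "kafka-2:9092"]


def kafka_engine(topic: str, kafka_host: Optional[str] = None, group: str = "group1") -> str:
    if kafka_host is None:
        # ClickHouse takes one comma-separated broker list, so join the parsed host list here.
        kafka_host = ",".join(KAFKA_HOSTS_FOR_CLICKHOUSE)
    return KAFKA_ENGINE.format(topic=topic, kafka_host=kafka_host, group=group, serialization="JSONEachRow")


print(kafka_engine("clickhouse_events_json"))
# -> Kafka('kafka-1:9092,kafka-2:9092', 'clickhouse_events_json', 'group1', 'JSONEachRow')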
@@ -26,7 +26,7 @@ def test_create_table_query_replicated_and_storage(query, snapshot, settings):

 @pytest.mark.parametrize("query", CREATE_KAFKA_TABLE_QUERIES, ids=get_table_name)
 def test_create_kafka_table_with_different_kafka_host(query, snapshot, settings):
-    settings.KAFKA_HOSTS_FOR_CLICKHOUSE = "test.kafka.broker:9092"
+    settings.KAFKA_HOSTS_FOR_CLICKHOUSE = ["test.kafka.broker:9092"]

     assert build_query(query) == snapshot
@@ -184,13 +184,11 @@ SessionRecordingKafkaProducer = SingletonDecorator(_KafkaProducer)


 def sessionRecordingKafkaProducer() -> _KafkaProducer:
-    if not settings.SESSION_RECORDING_KAFKA_HOSTS:
-        raise Exception("Session recording kafka producer not available")
-
     return SessionRecordingKafkaProducer(
         kafka_hosts=settings.SESSION_RECORDING_KAFKA_HOSTS,
+        kafka_security_protocol=settings.SESSION_RECORDING_KAFKA_SECURITY_PROTOCOL,
         max_message_bytes=settings.SESSION_RECORDING_KAFKA_MAX_MESSAGE_BYTES,
         compression_type=settings.SESSION_RECORDING_KAFKA_COMPRESSION,
     )
@@ -9,10 +9,7 @@ import os
 import ssl
 from tempfile import NamedTemporaryFile

-try:
 from urllib.parse import urlparse
-except ImportError:
-from urlparse import urlparse  # type: ignore
 from django.conf import settings

 from base64 import standard_b64encode
@@ -82,29 +79,13 @@ def get_kafka_ssl_context():
 return ssl_context


-def get_kafka_brokers():
-    """
-    Parses the KAKFA_URL and returns a list of hostname:port pairs in the format
-    that kafka-python expects.
-    """
-    # NOTE: The Kafka environment variables need to be present. If using
-    # Apache Kafka on Heroku, they will be available in your app configuration.
-    if not os.environ.get("KAFKA_URL"):
-        raise RuntimeError("The KAFKA_URL config variable is not set.")
-
-    return [
-        "{}:{}".format(parsedUrl.hostname, parsedUrl.port)
-        for parsedUrl in [urlparse(url) for url in os.environ.get("KAFKA_URL", "").split(",")]
-    ]
-
-
 def get_kafka_producer(acks="all", value_serializer=lambda v: json.dumps(v).encode("utf-8"), **kwargs):
     """
     Return a KafkaProducer that uses the SSLContext created with create_ssl_context.
     """

     producer = KafkaProducer(
-        bootstrap_servers=get_kafka_brokers(),
+        bootstrap_servers=settings.KAFKA_HOSTS,
         security_protocol="SSL",
         ssl_context=get_kafka_ssl_context(),
         value_serializer=value_serializer,
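With get_kafka_brokers() removed, the Heroku-style helpers read the broker list straight from settings.KAFKA_HOSTS. kafka-python accepts either a single 'host:port' string or a list of them for bootstrap_servers, so the parsed list can be passed through unchanged. A hedged sketch, which assumes the kafka-python package is installed and omits the SSL options used in the real helper:

import json

from kafka import KafkaProducer

# settings.KAFKA_HOSTS now holds an already-parsed list like this one.
KAFKA_HOSTS = ["kafka-1:9092", "kafka-2:9092"]

producer = KafkaProducer(
    bootstrap_servers=KAFKA_HOSTS,  # a list of "host:port" strings is accepted as-is
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)
producer.send("example_topic", {"hello": "kafka"})
producer.flush()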
@@ -123,7 +104,7 @@ def get_kafka_consumer(topic=None, value_deserializer=lambda v: json.loads(v.dec
 # Create the KafkaConsumer connected to the specified brokers. Use the
 # SSLContext that is created with create_ssl_context.
 consumer = KafkaConsumer(
-    bootstrap_servers=get_kafka_brokers(),
+    bootstrap_servers=settings.KAFKA_HOSTS,
     security_protocol="SSL",
     ssl_context=get_kafka_ssl_context(),
     value_deserializer=value_deserializer,
@@ -91,7 +91,7 @@ KAFKA_PERSON_OVERRIDES_TABLE_SQL = f"""
 ON CLUSTER '{CLICKHOUSE_CLUSTER}'

 ENGINE = Kafka(
-    '{KAFKA_HOSTS}', -- Kafka hosts
+    '{",".join(KAFKA_HOSTS)}', -- Kafka hosts
     '{KAFKA_PERSON_OVERRIDE}', -- Kafka topic
     'clickhouse-person-overrides', -- Kafka consumer group id
     'JSONEachRow' -- Specify that we should pass Kafka messages as JSON
@@ -1,5 +1,6 @@
 import os
 import json
+from typing import List
 from urllib.parse import urlparse

 import dj_database_url
@@ -157,16 +158,27 @@ CLICKHOUSE_HTTP_URL = f"{_clickhouse_http_protocol}{CLICKHOUSE_HOST}:{_clickhous
 READONLY_CLICKHOUSE_USER = os.getenv("READONLY_CLICKHOUSE_USER", None)
 READONLY_CLICKHOUSE_PASSWORD = os.getenv("READONLY_CLICKHOUSE_PASSWORD", None)

 # Kafka configs

-_parse_kafka_hosts = lambda kafka_url: ",".join(urlparse(host).netloc for host in kafka_url.split(","))
+def _parse_kafka_hosts(hosts_string: str) -> List[str]:
+    hosts = []
+    for host in hosts_string.split(","):
+        if "://" in host:
+            hosts.append(urlparse(host).netloc)
+        else:
+            hosts.append(host)
+
+    # We don't want empty strings
+    return [host for host in hosts if host]


-# URL(s) used by Kafka clients/producers - KEEP IN SYNC WITH plugin-server/src/config/config.ts
-KAFKA_URL = os.getenv("KAFKA_URL", "kafka://kafka:9092")
-KAFKA_HOSTS = _parse_kafka_hosts(KAFKA_URL)
-
-SESSION_RECORDING_KAFKA_URL = os.getenv("SESSION_RECORDING_KAFKA_URL", "")
-SESSION_RECORDING_KAFKA_HOSTS = _parse_kafka_hosts(SESSION_RECORDING_KAFKA_URL)
+# We prefer KAFKA_HOSTS over KAFKA_URL (which used to be used)
+KAFKA_HOSTS = _parse_kafka_hosts(os.getenv("KAFKA_HOSTS", "") or os.getenv("KAFKA_URL", "") or "kafka:9092")
+# Dedicated kafka hosts for session recordings
+SESSION_RECORDING_KAFKA_HOSTS = _parse_kafka_hosts(os.getenv("SESSION_RECORDING_KAFKA_HOSTS", "")) or KAFKA_HOSTS
+# Kafka broker host(s) that is used by clickhouse for ingesting messages.
+# Useful if clickhouse is hosted outside the cluster.
+KAFKA_HOSTS_FOR_CLICKHOUSE = _parse_kafka_hosts(os.getenv("KAFKA_URL_FOR_CLICKHOUSE", "")) or KAFKA_HOSTS

 # can set ('gzip', 'snappy', 'lz4', None)
 SESSION_RECORDING_KAFKA_COMPRESSION = os.getenv("SESSION_RECORDING_KAFKA_COMPRESSION", None)
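A few concrete resolutions of the new helper, as a quick sanity check. This is a standalone copy of _parse_kafka_hosts from the hunk above so the examples run outside Django settings:

from typing import List
from urllib.parse import urlparse


def _parse_kafka_hosts(hosts_string: str) -> List[str]:
    # Copy of the settings helper: accept bare "host:port" entries or URL-style
    # "kafka://host:port" entries, and drop empty strings.
    hosts = []
    for host in hosts_string.split(","):
        hosts.append(urlparse(host).netloc if "://" in host else host)
    return [host for host in hosts if host]


assert _parse_kafka_hosts("kafka:9092") == ["kafka:9092"]
assert _parse_kafka_hosts("kafka://kafka:9092") == ["kafka:9092"]
assert _parse_kafka_hosts("kafka://a:9092,b:9092") == ["a:9092", "b:9092"]
assert _parse_kafka_hosts("") == []  # "no empty strings in kafka hosts"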
@@ -177,9 +189,6 @@ SESSION_RECORDING_KAFKA_COMPRESSION = os.getenv("SESSION_RECORDING_KAFKA_COMPRES
 # for details.
 KAFKA_PREFIX = os.getenv("KAFKA_PREFIX", "")

-# Kafka broker host(s) that is used by clickhouse for ingesting messages. Useful if clickhouse is hosted outside the cluster.
-KAFKA_HOSTS_FOR_CLICKHOUSE = _parse_kafka_hosts(os.getenv("KAFKA_URL_FOR_CLICKHOUSE", KAFKA_URL))
-
 KAFKA_BASE64_KEYS = get_from_env("KAFKA_BASE64_KEYS", False, type_cast=str_to_bool)

 SESSION_RECORDING_KAFKA_MAX_MESSAGE_BYTES = get_from_env(
@@ -31,12 +31,10 @@ PARTITION_KEY_BUCKET_REPLENTISH_RATE = get_from_env(
 )

 REPLAY_EVENT_MAX_SIZE = get_from_env("REPLAY_EVENT_MAX_SIZE", type_cast=int, default=1024 * 512) # 512kb
-REPLAY_ALTERNATIVE_COMPRESSION_TRAFFIC_RATIO = get_from_env(
-    "REPLAY_ALTERNATIVE_COMPRESSION_TRAFFIC_RATIO", type_cast=float, default=0.0
-)
+REPLAY_BLOB_INGESTION_TRAFFIC_RATIO = get_from_env("REPLAY_BLOB_INGESTION_TRAFFIC_RATIO", type_cast=float, default=0.0)

-if REPLAY_ALTERNATIVE_COMPRESSION_TRAFFIC_RATIO > 1 or REPLAY_ALTERNATIVE_COMPRESSION_TRAFFIC_RATIO < 0:
+if REPLAY_BLOB_INGESTION_TRAFFIC_RATIO > 1 or REPLAY_BLOB_INGESTION_TRAFFIC_RATIO < 0:
     logger.critical(
-        "Environment variable REPLAY_ALTERNATIVE_COMPRESSION_TRAFFIC_RATIO is not between 0 and 1. Setting to 0 to be safe."
+        "Environment variable REPLAY_BLOB_INGESTION_TRAFFIC_RATIO is not between 0 and 1. Setting to 0 to be safe."
     )
-    REPLAY_ALTERNATIVE_COMPRESSION_TRAFFIC_RATIO = 0
+    REPLAY_BLOB_INGESTION_TRAFFIC_RATIO = 0