diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json
index a4a1be4021..6d11a0970a 100644
--- a/.devcontainer/devcontainer.json
+++ b/.devcontainer/devcontainer.json
@@ -14,8 +14,6 @@
"DATABASE_URL": "postgres://posthog:posthog@localhost:5432/posthog",
"KAFKA_ENABLED": "true",
"KAFKA_HOSTS": "kafka:9092",
- "KAFKA_URL": "kafka://kafka:9092",
- "SESSION_RECORDING_KAFKA_URL": "kafka://kafka:9092",
"CLICKHOUST_HOST": "localhost",
"CLICKHOUSE_DATABASE": "posthog_test",
"CLICKHOUSE_VERIFY": "False",
diff --git a/.github/workflows/ci-e2e.yml b/.github/workflows/ci-e2e.yml
index 47553b0728..4f0dd181e1 100644
--- a/.github/workflows/ci-e2e.yml
+++ b/.github/workflows/ci-e2e.yml
@@ -152,8 +152,7 @@ jobs:
SECRET_KEY=6b01eee4f945ca25045b5aab440b953461faf08693a9abbf1166dc7c6b9772da
REDIS_URL=redis://localhost
DATABASE_URL=postgres://posthog:posthog@localhost:5432/posthog
- KAFKA_URL=kafka://kafka:9092
- SESSION_RECORDING_KAFKA_URL=kafka://kafka:9092
+ KAFKA_HOSTS=kafka:9092
DISABLE_SECURE_SSL_REDIRECT=1
SECURE_COOKIES=0
OPT_OUT_CAPTURE=1
diff --git a/.run/Celery.run.xml b/.run/Celery.run.xml
index 91b0cf2c2e..82f170a5fa 100644
--- a/.run/Celery.run.xml
+++ b/.run/Celery.run.xml
@@ -7,7 +7,7 @@
-
+
diff --git a/.run/PostHog.run.xml b/.run/PostHog.run.xml
index 89139b4232..febd0fb568 100644
--- a/.run/PostHog.run.xml
+++ b/.run/PostHog.run.xml
@@ -10,12 +10,11 @@
-
+
-
-
+
diff --git a/.vscode/launch.json b/.vscode/launch.json
index 130f39fb8e..6b3cfca654 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -27,7 +27,7 @@
"PYTHONUNBUFFERED": "1",
"DEBUG": "1",
"CLICKHOUSE_SECURE": "False",
- "KAFKA_URL": "kafka://localhost",
+ "KAFKA_HOSTS": "localhost:9092",
"DATABASE_URL": "postgres://posthog:posthog@localhost:5432/posthog",
"WORKER_CONCURRENCY": "2",
"SKIP_SERVICE_VERSION_REQUIREMENTS": "1"
@@ -67,12 +67,11 @@
"DJANGO_SETTINGS_MODULE": "posthog.settings",
"DEBUG": "1",
"CLICKHOUSE_SECURE": "False",
- "KAFKA_URL": "kafka://localhost",
- "SESSION_RECORDING_KAFKA_URL": "kafka://localhost",
+ "KAFKA_HOSTS": "localhost",
"DATABASE_URL": "postgres://posthog:posthog@localhost:5432/posthog",
"SKIP_SERVICE_VERSION_REQUIREMENTS": "1",
"PRINT_SQL": "1",
- "REPLAY_ALTERNATIVE_COMPRESSION_TRAFFIC_RATIO": "1.0"
+ "REPLAY_BLOB_INGESTION_TRAFFIC_RATIO": "1.0"
},
"console": "integratedTerminal",
"python": "${workspaceFolder}/env/bin/python",
diff --git a/bin/e2e-test-runner b/bin/e2e-test-runner
index fb93a90540..def1d16734 100755
--- a/bin/e2e-test-runner
+++ b/bin/e2e-test-runner
@@ -52,7 +52,7 @@ export CYPRESS_BASE_URL=http://localhost:8080
export OPT_OUT_CAPTURE=1
export SECURE_COOKIES=0
export SKIP_SERVICE_VERSION_REQUIREMENTS=1
-export KAFKA_URL=kafka://kafka:9092
+export KAFKA_HOSTS=kafka:9092
export CLICKHOUSE_DATABASE=posthog_test
export TEST=1 # Plugin server and kafka revert to 'default' Clickhouse database if TEST is not set
export CLICKHOUSE_SECURE=0
diff --git a/bin/plugin-server b/bin/plugin-server
index f82ca7731d..75000245ac 100755
--- a/bin/plugin-server
+++ b/bin/plugin-server
@@ -2,29 +2,28 @@
while test $# -gt 0; do
case "$1" in
- -h|--help)
- echo "USAGE:"
- echo " bin/plugin-server [FLAGS]"
- echo " "
- echo "FLAGS:"
- echo " -h, --help Print this help information."
- echo " --no-restart-loop Run without restart loop. Recommended when deferring resiliency to e.g. docker-compose."
- exit 0
- ;;
- --no-restart-loop)
- NO_RESTART_LOOP='true'
- shift
- ;;
- *)
- break
- ;;
+ -h | --help)
+ echo "USAGE:"
+ echo " bin/plugin-server [FLAGS]"
+ echo " "
+ echo "FLAGS:"
+ echo " -h, --help Print this help information."
+ echo " --no-restart-loop Run without restart loop. Recommended when deferring resiliency to e.g. docker-compose."
+ exit 0
+ ;;
+ --no-restart-loop)
+ NO_RESTART_LOOP='true'
+ shift
+ ;;
+ *)
+ break
+ ;;
esac
done
export BASE_DIR=$(dirname $(dirname "$PWD/${0#./}"))
-export KAFKA_URL=${KAFKA_URL:-'kafka://kafka:9092'}
-export KAFKA_HOSTS
+export KAFKA_HOSTS=${KAFKA_HOSTS:-'kafka:9092'}
if [[ -n $INJECT_EC2_CLIENT_RACK ]]; then
# To avoid cross-AZ Kafka traffic, set KAFKA_CLIENT_RACK from the EC2 metadata endpoint.
diff --git a/docker-compose.base.yml b/docker-compose.base.yml
index 77b4bc711a..3def6ce5cc 100644
--- a/docker-compose.base.yml
+++ b/docker-compose.base.yml
@@ -74,8 +74,7 @@ services:
CLICKHOUSE_DATABASE: 'posthog'
CLICKHOUSE_SECURE: 'false'
CLICKHOUSE_VERIFY: 'false'
- KAFKA_URL: 'kafka://kafka'
- SESSION_RECORDING_KAFKA_URL: 'kafka://kafka'
+ KAFKA_HOSTS: 'kafka'
REDIS_URL: 'redis://redis:6379/'
PGHOST: db
PGUSER: posthog
diff --git a/plugin-server/src/config/config.ts b/plugin-server/src/config/config.ts
index c751fc9974..ea59c564e4 100644
--- a/plugin-server/src/config/config.ts
+++ b/plugin-server/src/config/config.ts
@@ -32,14 +32,14 @@ export function getDefaultConfig(): PluginsServerConfig {
EVENT_OVERFLOW_BUCKET_CAPACITY: 1000,
EVENT_OVERFLOW_BUCKET_REPLENISH_RATE: 1.0,
KAFKA_HOSTS: 'kafka:9092', // KEEP IN SYNC WITH posthog/settings/data_stores.py
- KAFKA_CLIENT_CERT_B64: null,
- KAFKA_CLIENT_CERT_KEY_B64: null,
- KAFKA_TRUSTED_CERT_B64: null,
- KAFKA_SECURITY_PROTOCOL: null,
- KAFKA_SASL_MECHANISM: null,
- KAFKA_SASL_USER: null,
- KAFKA_SASL_PASSWORD: null,
- KAFKA_CLIENT_RACK: null,
+ KAFKA_CLIENT_CERT_B64: undefined,
+ KAFKA_CLIENT_CERT_KEY_B64: undefined,
+ KAFKA_TRUSTED_CERT_B64: undefined,
+ KAFKA_SECURITY_PROTOCOL: undefined,
+ KAFKA_SASL_MECHANISM: undefined,
+ KAFKA_SASL_USER: undefined,
+ KAFKA_SASL_PASSWORD: undefined,
+ KAFKA_CLIENT_RACK: undefined,
KAFKA_CONSUMPTION_MAX_BYTES: 10_485_760, // Default value for kafkajs
KAFKA_CONSUMPTION_MAX_BYTES_PER_PARTITION: 1_048_576, // Default value for kafkajs, must be bigger than message size
KAFKA_CONSUMPTION_MAX_WAIT_MS: 1_000, // Down from the 5s default for kafkajs
@@ -120,6 +120,8 @@ export function getDefaultConfig(): PluginsServerConfig {
USE_KAFKA_FOR_SCHEDULED_TASKS: true,
CLOUD_DEPLOYMENT: 'default', // Used as a Sentry tag
+ SESSION_RECORDING_KAFKA_HOSTS: 'kafka:9092',
+ SESSION_RECORDING_KAFKA_SECURITY_PROTOCOL: undefined,
SESSION_RECORDING_BLOB_PROCESSING_TEAMS: '', // TODO: Change this to 'all' when we release it fully
SESSION_RECORDING_LOCAL_DIRECTORY: '.tmp/sessions',
// NOTE: 10 minutes
diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-blob-consumer.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-blob-consumer.ts
index 5515a3e959..321f9b57d2 100644
--- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-blob-consumer.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-blob-consumer.ts
@@ -11,7 +11,6 @@ import { BatchConsumer, startBatchConsumer } from '../../../kafka/batch-consumer
import { createRdConnectionConfigFromEnvVars } from '../../../kafka/config'
import { createKafkaProducer, disconnectProducer } from '../../../kafka/producer'
import { PipelineEvent, PluginsServerConfig, RawEventMessage, Team } from '../../../types'
-import { KafkaConfig } from '../../../utils/db/hub'
import { status } from '../../../utils/status'
import { TeamManager } from '../../../worker/ingestion/team-manager'
import { ObjectStorage } from '../../services/object_storage'
@@ -235,7 +234,12 @@ export class SessionRecordingBlobIngester {
throw e
}
- const connectionConfig = createRdConnectionConfigFromEnvVars(this.serverConfig as KafkaConfig)
+ const connectionConfig = createRdConnectionConfigFromEnvVars({
+ ...this.serverConfig,
+ // We use the same kafka config overall but different hosts for the session recordings
+ KAFKA_HOSTS: this.serverConfig.SESSION_RECORDING_KAFKA_HOSTS,
+ KAFKA_SECURITY_PROTOCOL: this.serverConfig.SESSION_RECORDING_KAFKA_SECURITY_PROTOCOL,
+ })
this.producer = await createKafkaProducer(connectionConfig)
// Create a node-rdkafka consumer that fetches batches of messages, runs
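The consumer above keeps the shared Kafka connection settings but swaps in the dedicated session-recording brokers. A rough Python analog of that override, purely illustrative (the dict shape and host values are assumptions, not part of this diff):

    base_config = {"bootstrap.servers": "kafka:9092", "security.protocol": "plaintext"}
    session_recording_config = {
        **base_config,
        # Same client settings, different brokers (SESSION_RECORDING_KAFKA_HOSTS)
        "bootstrap.servers": "recordings-kafka:9092",
    }
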
diff --git a/plugin-server/src/main/pluginsServer.ts b/plugin-server/src/main/pluginsServer.ts
index d7f32da1f5..197f72df32 100644
--- a/plugin-server/src/main/pluginsServer.ts
+++ b/plugin-server/src/main/pluginsServer.ts
@@ -10,7 +10,7 @@ import { Counter } from 'prom-client'
import { getPluginServerCapabilities } from '../capabilities'
import { defaultConfig } from '../config/config'
import { Hub, PluginServerCapabilities, PluginsServerConfig } from '../types'
-import { createHub, KafkaConfig } from '../utils/db/hub'
+import { createHub } from '../utils/db/hub'
import { captureEventLoopMetrics } from '../utils/metrics'
import { cancelAllScheduledJobs } from '../utils/node-schedule'
import { PubSub } from '../utils/pubsub'
@@ -363,7 +363,7 @@ export async function startPluginsServer(
join,
} = await startSessionRecordingEventsConsumer({
teamManager: teamManager,
- kafkaConfig: serverConfig as KafkaConfig,
+ kafkaConfig: serverConfig,
consumerMaxBytes: serverConfig.KAFKA_CONSUMPTION_MAX_BYTES,
consumerMaxBytesPerPartition: serverConfig.KAFKA_CONSUMPTION_MAX_BYTES_PER_PARTITION,
consumerMaxWaitMs: serverConfig.KAFKA_CONSUMPTION_MAX_WAIT_MS,
diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts
index c2c597c511..bb7992a97b 100644
--- a/plugin-server/src/types.ts
+++ b/plugin-server/src/types.ts
@@ -96,14 +96,14 @@ export interface PluginsServerConfig {
CLICKHOUSE_DISABLE_EXTERNAL_SCHEMAS_TEAMS: string // (advanced) a comma separated list of teams to disable clickhouse external schemas for
CLICKHOUSE_JSON_EVENTS_KAFKA_TOPIC: string // (advanced) topic to send events to for clickhouse ingestion
KAFKA_HOSTS: string // comma-delimited Kafka hosts
- KAFKA_CLIENT_CERT_B64: string | null
- KAFKA_CLIENT_CERT_KEY_B64: string | null
- KAFKA_TRUSTED_CERT_B64: string | null
- KAFKA_SECURITY_PROTOCOL: KafkaSecurityProtocol | null
- KAFKA_SASL_MECHANISM: KafkaSaslMechanism | null
- KAFKA_SASL_USER: string | null
- KAFKA_SASL_PASSWORD: string | null
- KAFKA_CLIENT_RACK: string | null
+ KAFKA_CLIENT_CERT_B64: string | undefined
+ KAFKA_CLIENT_CERT_KEY_B64: string | undefined
+ KAFKA_TRUSTED_CERT_B64: string | undefined
+ KAFKA_SECURITY_PROTOCOL: KafkaSecurityProtocol | undefined
+ KAFKA_SASL_MECHANISM: KafkaSaslMechanism | undefined
+ KAFKA_SASL_USER: string | undefined
+ KAFKA_SASL_PASSWORD: string | undefined
+ KAFKA_CLIENT_RACK: string | undefined
KAFKA_CONSUMPTION_MAX_BYTES: number
KAFKA_CONSUMPTION_MAX_BYTES_PER_PARTITION: number
KAFKA_CONSUMPTION_MAX_WAIT_MS: number // fetch.wait.max.ms rdkafka parameter
@@ -189,6 +189,8 @@ export interface PluginsServerConfig {
EVENT_OVERFLOW_BUCKET_REPLENISH_RATE: number
CLOUD_DEPLOYMENT: string
+ SESSION_RECORDING_KAFKA_HOSTS: string
+ SESSION_RECORDING_KAFKA_SECURITY_PROTOCOL: KafkaSecurityProtocol | undefined
SESSION_RECORDING_BLOB_PROCESSING_TEAMS: string
// local directory might be a volume mount or a directory on disk (e.g. in local dev)
SESSION_RECORDING_LOCAL_DIRECTORY: string
diff --git a/plugin-server/src/utils/db/hub.ts b/plugin-server/src/utils/db/hub.ts
index 1310ab828a..d31a82645c 100644
--- a/plugin-server/src/utils/db/hub.ts
+++ b/plugin-server/src/utils/db/hub.ts
@@ -128,9 +128,8 @@ export async function createHub(
status.info('🤔', `Connecting to Kafka...`)
- const kafka = createKafkaClient(serverConfig as KafkaConfig)
-
- const kafkaConnectionConfig = createRdConnectionConfigFromEnvVars(serverConfig as KafkaConfig)
+ const kafka = createKafkaClient(serverConfig)
+ const kafkaConnectionConfig = createRdConnectionConfigFromEnvVars(serverConfig)
const producer = await createKafkaProducer({ ...kafkaConnectionConfig, 'linger.ms': 0 })
const kafkaProducer = new KafkaProducerWrapper(producer, serverConfig.KAFKA_PRODUCER_WAIT_FOR_ACK)
diff --git a/posthog/api/capture.py b/posthog/api/capture.py
index b3b376d2e4..b2a814463f 100644
--- a/posthog/api/capture.py
+++ b/posthog/api/capture.py
@@ -379,10 +379,7 @@ def get_event(request):
# Legacy solution stays in place
processed_replay_events = legacy_preprocess_session_recording_events_for_clickhouse(replay_events)
- if (
- random() <= settings.REPLAY_ALTERNATIVE_COMPRESSION_TRAFFIC_RATIO
- and settings.SESSION_RECORDING_KAFKA_HOSTS
- ):
+ if random() <= settings.REPLAY_BLOB_INGESTION_TRAFFIC_RATIO:
# The new flow we only enable if the dedicated kafka is enabled
processed_replay_events += preprocess_replay_events_for_blob_ingestion(replay_events)
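The gate above is now a pure traffic-ratio check; the SESSION_RECORDING_KAFKA_HOSTS guard is gone because that setting always resolves to a value. A minimal standalone sketch of the sampling behaviour:

    from random import random

    def route_to_blob_ingestion(ratio: float) -> bool:
        # ratio=0.0 never routes to blob ingestion, ratio=1.0 always does,
        # ratio=0.25 routes roughly a quarter of requests.
        return random() <= ratio
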
diff --git a/posthog/api/test/test_capture.py b/posthog/api/test/test_capture.py
index f90b6bddd9..47c8723da0 100644
--- a/posthog/api/test/test_capture.py
+++ b/posthog/api/test/test_capture.py
@@ -1195,7 +1195,7 @@ class TestCapture(BaseTest):
snapshot_source = 8
snapshot_type = 8
event_data = {"foo": "bar"}
- with self.settings(REPLAY_ALTERNATIVE_COMPRESSION_TRAFFIC_RATIO=1):
+ with self.settings(REPLAY_BLOB_INGESTION_TRAFFIC_RATIO=0):
self._send_session_recording_event(
timestamp=timestamp,
snapshot_source=snapshot_source,
@@ -1260,7 +1260,7 @@ class TestCapture(BaseTest):
with self.settings(
SESSION_RECORDING_KAFKA_HOSTS=["kafka://another-server:9092"],
- REPLAY_ALTERNATIVE_COMPRESSION_TRAFFIC_RATIO=1,
+ REPLAY_BLOB_INGESTION_TRAFFIC_RATIO=1,
):
default_kafka_producer_mock.return_value = KafkaProducer()
session_recording_producer_mock.return_value = sessionRecordingKafkaProducer()
@@ -1281,18 +1281,15 @@ class TestCapture(BaseTest):
@patch("posthog.api.capture.sessionRecordingKafkaProducer")
@patch("posthog.api.capture.KafkaProducer")
@patch("posthog.kafka_client.client._KafkaProducer.produce")
- def test_uses_does_not_produce_if_session_recording_kafka_unavailable(
+ def test_uses_does_not_produce_if_blob_ingestion_disabled(
self,
kafka_produce: MagicMock,
default_kafka_producer_mock: MagicMock,
session_recording_producer_mock: MagicMock,
) -> None:
- with self.settings(
- SESSION_RECORDING_KAFKA_HOSTS=None,
- REPLAY_ALTERNATIVE_COMPRESSION_TRAFFIC_RATIO=1,
- ):
+ with self.settings(REPLAY_BLOB_INGESTION_TRAFFIC_RATIO=0):
default_kafka_producer_mock.return_value = KafkaProducer()
- session_recording_producer_mock.side_effect = Exception("Kafka not available")
+ session_recording_producer_mock.side_effect = sessionRecordingKafkaProducer()
data = "example"
self._send_session_recording_event(event_data=data)
diff --git a/posthog/clickhouse/kafka_engine.py b/posthog/clickhouse/kafka_engine.py
index 253d6e3af7..3e7b562a7f 100644
--- a/posthog/clickhouse/kafka_engine.py
+++ b/posthog/clickhouse/kafka_engine.py
@@ -35,9 +35,9 @@ KAFKA_COLUMNS_WITH_PARTITION = """
"""
-def kafka_engine(topic: str, kafka_host=None, group="group1"):
+def kafka_engine(topic: str, kafka_host: str | None = None, group="group1") -> str:
if kafka_host is None:
- kafka_host = settings.KAFKA_HOSTS_FOR_CLICKHOUSE
+ kafka_host = ",".join(settings.KAFKA_HOSTS_FOR_CLICKHOUSE)
return KAFKA_ENGINE.format(topic=topic, kafka_host=kafka_host, group=group, serialization="JSONEachRow")
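With KAFKA_HOSTS_FOR_CLICKHOUSE now a list, the engine clause receives a single comma-joined broker string. A tiny worked example (broker names invented):

    hosts = ["kafka-1:9092", "kafka-2:9092"]
    assert ",".join(hosts) == "kafka-1:9092,kafka-2:9092"
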
diff --git a/posthog/clickhouse/test/test_schema.py b/posthog/clickhouse/test/test_schema.py
index 190fa702b6..d6bc70435f 100644
--- a/posthog/clickhouse/test/test_schema.py
+++ b/posthog/clickhouse/test/test_schema.py
@@ -26,7 +26,7 @@ def test_create_table_query_replicated_and_storage(query, snapshot, settings):
@pytest.mark.parametrize("query", CREATE_KAFKA_TABLE_QUERIES, ids=get_table_name)
def test_create_kafka_table_with_different_kafka_host(query, snapshot, settings):
- settings.KAFKA_HOSTS_FOR_CLICKHOUSE = "test.kafka.broker:9092"
+ settings.KAFKA_HOSTS_FOR_CLICKHOUSE = ["test.kafka.broker:9092"]
assert build_query(query) == snapshot
diff --git a/posthog/kafka_client/client.py b/posthog/kafka_client/client.py
index 5e405f1536..c566058a6f 100644
--- a/posthog/kafka_client/client.py
+++ b/posthog/kafka_client/client.py
@@ -184,13 +184,11 @@ SessionRecordingKafkaProducer = SingletonDecorator(_KafkaProducer)
def sessionRecordingKafkaProducer() -> _KafkaProducer:
- if not settings.SESSION_RECORDING_KAFKA_HOSTS:
- raise Exception("Session recording kafka producer not available")
-
return SessionRecordingKafkaProducer(
kafka_hosts=settings.SESSION_RECORDING_KAFKA_HOSTS,
kafka_security_protocol=settings.SESSION_RECORDING_KAFKA_SECURITY_PROTOCOL,
max_message_bytes=settings.SESSION_RECORDING_KAFKA_MAX_MESSAGE_BYTES,
+ compression_type=settings.SESSION_RECORDING_KAFKA_COMPRESSION,
)
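A hedged usage sketch of the producer above: it now always builds against settings.SESSION_RECORDING_KAFKA_HOSTS (which falls back to KAFKA_HOSTS) and passes the configured compression through, so the availability guard is no longer needed:

    from posthog.kafka_client.client import sessionRecordingKafkaProducer

    # Singleton-style producer bound to the dedicated session recording brokers.
    producer = sessionRecordingKafkaProducer()
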
diff --git a/posthog/kafka_client/helper.py b/posthog/kafka_client/helper.py
index 4b520aa031..6084e991a1 100644
--- a/posthog/kafka_client/helper.py
+++ b/posthog/kafka_client/helper.py
@@ -9,10 +9,7 @@ import os
import ssl
from tempfile import NamedTemporaryFile
-try:
- from urllib.parse import urlparse
-except ImportError:
- from urlparse import urlparse # type: ignore
+from django.conf import settings
from base64 import standard_b64encode
@@ -82,29 +79,13 @@ def get_kafka_ssl_context():
return ssl_context
-def get_kafka_brokers():
- """
- Parses the KAKFA_URL and returns a list of hostname:port pairs in the format
- that kafka-python expects.
- """
- # NOTE: The Kafka environment variables need to be present. If using
- # Apache Kafka on Heroku, they will be available in your app configuration.
- if not os.environ.get("KAFKA_URL"):
- raise RuntimeError("The KAFKA_URL config variable is not set.")
-
- return [
- "{}:{}".format(parsedUrl.hostname, parsedUrl.port)
- for parsedUrl in [urlparse(url) for url in os.environ.get("KAFKA_URL", "").split(",")]
- ]
-
-
def get_kafka_producer(acks="all", value_serializer=lambda v: json.dumps(v).encode("utf-8"), **kwargs):
"""
Return a KafkaProducer that uses the SSLContext created with create_ssl_context.
"""
producer = KafkaProducer(
- bootstrap_servers=get_kafka_brokers(),
+ bootstrap_servers=settings.KAFKA_HOSTS,
security_protocol="SSL",
ssl_context=get_kafka_ssl_context(),
value_serializer=value_serializer,
@@ -123,7 +104,7 @@ def get_kafka_consumer(topic=None, value_deserializer=lambda v: json.loads(v.dec
# Create the KafkaConsumer connected to the specified brokers. Use the
# SSLContext that is created with create_ssl_context.
consumer = KafkaConsumer(
- bootstrap_servers=get_kafka_brokers(),
+ bootstrap_servers=settings.KAFKA_HOSTS,
security_protocol="SSL",
ssl_context=get_kafka_ssl_context(),
value_deserializer=value_deserializer,
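With get_kafka_brokers removed, both helpers read brokers straight from Django settings while keeping the SSL context handling above. A minimal usage sketch (topic name is illustrative):

    from posthog.kafka_client.helper import get_kafka_consumer, get_kafka_producer

    producer = get_kafka_producer(acks="all")          # bootstrap_servers=settings.KAFKA_HOSTS
    consumer = get_kafka_consumer(topic="some_topic")  # bootstrap_servers=settings.KAFKA_HOSTS
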
diff --git a/posthog/models/person_overrides/sql.py b/posthog/models/person_overrides/sql.py
index 798b76e55e..853988495f 100644
--- a/posthog/models/person_overrides/sql.py
+++ b/posthog/models/person_overrides/sql.py
@@ -91,7 +91,7 @@ KAFKA_PERSON_OVERRIDES_TABLE_SQL = f"""
ON CLUSTER '{CLICKHOUSE_CLUSTER}'
ENGINE = Kafka(
- '{KAFKA_HOSTS}', -- Kafka hosts
+ '{",".join(KAFKA_HOSTS)}', -- Kafka hosts
'{KAFKA_PERSON_OVERRIDE}', -- Kafka topic
'clickhouse-person-overrides', -- Kafka consumer group id
'JSONEachRow' -- Specify that we should pass Kafka messages as JSON
diff --git a/posthog/settings/data_stores.py b/posthog/settings/data_stores.py
index 86f86e030e..b1e357cc03 100644
--- a/posthog/settings/data_stores.py
+++ b/posthog/settings/data_stores.py
@@ -1,5 +1,6 @@
import os
import json
+from typing import List
from urllib.parse import urlparse
import dj_database_url
@@ -157,16 +158,27 @@ CLICKHOUSE_HTTP_URL = f"{_clickhouse_http_protocol}{CLICKHOUSE_HOST}:{_clickhous
READONLY_CLICKHOUSE_USER = os.getenv("READONLY_CLICKHOUSE_USER", None)
READONLY_CLICKHOUSE_PASSWORD = os.getenv("READONLY_CLICKHOUSE_PASSWORD", None)
-# Kafka configs
-_parse_kafka_hosts = lambda kafka_url: ",".join(urlparse(host).netloc for host in kafka_url.split(","))
+def _parse_kafka_hosts(hosts_string: str) -> List[str]:
+ hosts = []
+ for host in hosts_string.split(","):
+ if "://" in host:
+ hosts.append(urlparse(host).netloc)
+ else:
+ hosts.append(host)
+
+ # We don't want empty strings
+ return [host for host in hosts if host]
+
# URL(s) used by Kafka clients/producers - KEEP IN SYNC WITH plugin-server/src/config/config.ts
-KAFKA_URL = os.getenv("KAFKA_URL", "kafka://kafka:9092")
-KAFKA_HOSTS = _parse_kafka_hosts(KAFKA_URL)
-
-SESSION_RECORDING_KAFKA_URL = os.getenv("SESSION_RECORDING_KAFKA_URL", "")
-SESSION_RECORDING_KAFKA_HOSTS = _parse_kafka_hosts(SESSION_RECORDING_KAFKA_URL)
+# We prefer KAFKA_HOSTS over KAFKA_URL (which used to be used)
+KAFKA_HOSTS = _parse_kafka_hosts(os.getenv("KAFKA_HOSTS", "") or os.getenv("KAFKA_URL", "") or "kafka:9092")
+# Dedicated kafka hosts for session recordings
+SESSION_RECORDING_KAFKA_HOSTS = _parse_kafka_hosts(os.getenv("SESSION_RECORDING_KAFKA_HOSTS", "")) or KAFKA_HOSTS
+# Kafka broker host(s) that is used by clickhouse for ingesting messages.
+# Useful if clickhouse is hosted outside the cluster.
+KAFKA_HOSTS_FOR_CLICKHOUSE = _parse_kafka_hosts(os.getenv("KAFKA_URL_FOR_CLICKHOUSE", "")) or KAFKA_HOSTS
# can set ('gzip', 'snappy', 'lz4', None)
SESSION_RECORDING_KAFKA_COMPRESSION = os.getenv("SESSION_RECORDING_KAFKA_COMPRESSION", None)
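Worked examples of _parse_kafka_hosts as defined above: it accepts both plain host:port entries and legacy kafka:// URLs, and drops empty entries so the `or KAFKA_HOSTS` fallbacks apply (broker names invented):

    _parse_kafka_hosts("kafka://broker-1:9092,kafka://broker-2:9092")  # -> ["broker-1:9092", "broker-2:9092"]
    _parse_kafka_hosts("broker-1:9092,broker-2:9092")                  # -> ["broker-1:9092", "broker-2:9092"]
    _parse_kafka_hosts("")  # -> [], so SESSION_RECORDING_KAFKA_HOSTS and KAFKA_HOSTS_FOR_CLICKHOUSE fall back to KAFKA_HOSTS
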
@@ -177,9 +189,6 @@ SESSION_RECORDING_KAFKA_COMPRESSION = os.getenv("SESSION_RECORDING_KAFKA_COMPRES
# for details.
KAFKA_PREFIX = os.getenv("KAFKA_PREFIX", "")
-# Kafka broker host(s) that is used by clickhouse for ingesting messages. Useful if clickhouse is hosted outside the cluster.
-KAFKA_HOSTS_FOR_CLICKHOUSE = _parse_kafka_hosts(os.getenv("KAFKA_URL_FOR_CLICKHOUSE", KAFKA_URL))
-
KAFKA_BASE64_KEYS = get_from_env("KAFKA_BASE64_KEYS", False, type_cast=str_to_bool)
SESSION_RECORDING_KAFKA_MAX_MESSAGE_BYTES = get_from_env(
diff --git a/posthog/settings/ingestion.py b/posthog/settings/ingestion.py
index e373984fba..3b554435d0 100644
--- a/posthog/settings/ingestion.py
+++ b/posthog/settings/ingestion.py
@@ -31,12 +31,10 @@ PARTITION_KEY_BUCKET_REPLENTISH_RATE = get_from_env(
)
REPLAY_EVENT_MAX_SIZE = get_from_env("REPLAY_EVENT_MAX_SIZE", type_cast=int, default=1024 * 512) # 512kb
-REPLAY_ALTERNATIVE_COMPRESSION_TRAFFIC_RATIO = get_from_env(
- "REPLAY_ALTERNATIVE_COMPRESSION_TRAFFIC_RATIO", type_cast=float, default=0.0
-)
+REPLAY_BLOB_INGESTION_TRAFFIC_RATIO = get_from_env("REPLAY_BLOB_INGESTION_TRAFFIC_RATIO", type_cast=float, default=0.0)
-if REPLAY_ALTERNATIVE_COMPRESSION_TRAFFIC_RATIO > 1 or REPLAY_ALTERNATIVE_COMPRESSION_TRAFFIC_RATIO < 0:
+if REPLAY_BLOB_INGESTION_TRAFFIC_RATIO > 1 or REPLAY_BLOB_INGESTION_TRAFFIC_RATIO < 0:
logger.critical(
- "Environment variable REPLAY_ALTERNATIVE_COMPRESSION_TRAFFIC_RATIO is not between 0 and 1. Setting to 0 to be safe."
+ "Environment variable REPLAY_BLOB_INGESTION_TRAFFIC_RATIO is not between 0 and 1. Setting to 0 to be safe."
)
- REPLAY_ALTERNATIVE_COMPRESSION_TRAFFIC_RATIO = 0
+ REPLAY_BLOB_INGESTION_TRAFFIC_RATIO = 0
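A standalone restatement of the guard above (values invented): anything outside [0, 1] is logged and reset to 0, so blob ingestion is disabled rather than over-sampled:

    def _safe_ratio(value: float) -> float:
        return value if 0 <= value <= 1 else 0

    assert _safe_ratio(0.3) == 0.3
    assert _safe_ratio(1.5) == 0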