From a298fae5df33cd748998e274def971745a7cc6a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Sequeira?= Date: Mon, 10 Nov 2025 14:46:39 +0100 Subject: [PATCH] feat(plugin-server): Add WARPSTREAM_PRODUCER kafka config target (#41194) --- plugin-server/src/kafka/config.ts | 2 +- .../src/main/ingestion-queues/session-recording-v2/consumer.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/plugin-server/src/kafka/config.ts b/plugin-server/src/kafka/config.ts index e16f673f9a..ff407936ab 100644 --- a/plugin-server/src/kafka/config.ts +++ b/plugin-server/src/kafka/config.ts @@ -10,7 +10,7 @@ export const RDKAFKA_LOG_LEVEL_MAPPING = { ERROR: 3, } -export type KafkaConfigTarget = 'PRODUCER' | 'CONSUMER' | 'CDP_PRODUCER' +export type KafkaConfigTarget = 'PRODUCER' | 'CONSUMER' | 'CDP_PRODUCER' | 'WARPSTREAM_PRODUCER' export const getKafkaConfigFromEnv = (prefix: KafkaConfigTarget): GlobalConfig => { // NOTE: We have learnt that having as much exposed config to the env as possible is really useful diff --git a/plugin-server/src/main/ingestion-queues/session-recording-v2/consumer.ts b/plugin-server/src/main/ingestion-queues/session-recording-v2/consumer.ts index c0b67f7a24..d41e9a7682 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording-v2/consumer.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording-v2/consumer.ts @@ -299,7 +299,7 @@ export class SessionRecordingIngester { // Initialize overflow producer if not consuming from overflow if (!this.consumeOverflow) { - this.kafkaOverflowProducer = await KafkaProducerWrapper.create(this.hub, 'CONSUMER') + this.kafkaOverflowProducer = await KafkaProducerWrapper.create(this.hub, 'WARPSTREAM_PRODUCER') } // Initialize restriction handler with the overflow producer