diff --git a/plugin-server/src/kafka/config.ts b/plugin-server/src/kafka/config.ts index e16f673f9a..ff407936ab 100644 --- a/plugin-server/src/kafka/config.ts +++ b/plugin-server/src/kafka/config.ts @@ -10,7 +10,7 @@ export const RDKAFKA_LOG_LEVEL_MAPPING = { ERROR: 3, } -export type KafkaConfigTarget = 'PRODUCER' | 'CONSUMER' | 'CDP_PRODUCER' +export type KafkaConfigTarget = 'PRODUCER' | 'CONSUMER' | 'CDP_PRODUCER' | 'WARPSTREAM_PRODUCER' export const getKafkaConfigFromEnv = (prefix: KafkaConfigTarget): GlobalConfig => { // NOTE: We have learnt that having as much exposed config to the env as possible is really useful diff --git a/plugin-server/src/main/ingestion-queues/session-recording-v2/consumer.ts b/plugin-server/src/main/ingestion-queues/session-recording-v2/consumer.ts index c0b67f7a24..d41e9a7682 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording-v2/consumer.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording-v2/consumer.ts @@ -299,7 +299,7 @@ export class SessionRecordingIngester { // Initialize overflow producer if not consuming from overflow if (!this.consumeOverflow) { - this.kafkaOverflowProducer = await KafkaProducerWrapper.create(this.hub, 'CONSUMER') + this.kafkaOverflowProducer = await KafkaProducerWrapper.create(this.hub, 'WARPSTREAM_PRODUCER') } // Initialize restriction handler with the overflow producer