feat: Move session replay to new consumers (#31647)

Ben White
2025-04-28 15:57:59 +02:00
committed by GitHub
parent b21888f7c9
commit d750f4b8c1
18 changed files with 479 additions and 1562 deletions

View File

@@ -11,10 +11,6 @@
<env name="DATABASE_URL" value="postgres://posthog:posthog@localhost:5432/posthog" />
<env name="KAFKA_HOSTS" value="localhost:9092" />
<env name="OBJECT_STORAGE_ENABLED" value="True" />
<env name="SESSION_RECORDING_KAFKA_BATCH_SIZE" value="200" />
<env name="SESSION_RECORDING_MAX_BUFFER_AGE_SECONDS" value="180" />
<env name="SESSION_RECORDING_SUMMARY_INGESTION_ENABLED_TEAMS" value="all" />
<env name="SESSION_RECORDING_KAFKA_QUEUE_SIZE" value="600" />
</envs>
<method v="2" />
</configuration>

View File

@@ -25,12 +25,13 @@ export BASE_DIR=$(dirname $(dirname "$PWD/${0#./}"))
export KAFKA_HOSTS=${KAFKA_HOSTS:-'kafka:9092'}
# NOTE: This is no longer used and will be removed in the future. Startup is now managed directly in the helm chart repo
if [[ -n $INJECT_EC2_CLIENT_RACK ]]; then
# To avoid cross-AZ Kafka traffic, set KAFKA_CLIENT_RACK from the EC2 metadata endpoint.
# TODO: switch to the downwards API when https://github.com/kubernetes/kubernetes/issues/40610 is released
TOKEN=$(curl --max-time 0.1 -X PUT "http://169.254.169.254/latest/api/token" -H "X-aws-ec2-metadata-token-ttl-seconds: 21600")
export KAFKA_CLIENT_RACK=$(curl --max-time 0.1 -H "X-aws-ec2-metadata-token: $TOKEN" -v http://169.254.169.254/latest/meta-data/placement/availability-zone-id)
# interpolate the KAFKA_CLIENT_RACK value in KAFKA_CLIENT_ID env vars
# Allows the above exported KAFKA_CLIENT_RACK to be used like foo-$KAFKA_CLIENT_RACK in the following vars
export KAFKA_CLIENT_ID=$(echo $KAFKA_CLIENT_ID | envsubst)
export KAFKA_PRODUCER_CLIENT_ID=$(echo $KAFKA_PRODUCER_CLIENT_ID | envsubst)
fi

View File

@@ -9,7 +9,7 @@ import {
serializeHogFunctionInvocation,
} from '../../cdp/utils'
import { KAFKA_EVENTS_JSON } from '../../config/kafka-topics'
import { KafkaConsumer } from '../../kafka/batch-consumer-v2'
import { KafkaConsumer } from '../../kafka/consumer'
import { runInstrumentedFunction } from '../../main/utils'
import { Hub, RawClickHouseEvent } from '../../types'
import { parseJSON } from '../../utils/json-parse'

View File

@@ -147,13 +147,6 @@ export function getDefaultConfig(): PluginsServerConfig {
STARTUP_PROFILE_HEAP_INTERVAL: 512 * 1024, // default v8 value
STARTUP_PROFILE_HEAP_DEPTH: 16, // default v8 value
SESSION_RECORDING_KAFKA_HOSTS: undefined,
SESSION_RECORDING_KAFKA_SECURITY_PROTOCOL: undefined,
SESSION_RECORDING_KAFKA_BATCH_SIZE: 500,
SESSION_RECORDING_KAFKA_QUEUE_SIZE: 1500,
// if not set we'll use the plugin server default value
SESSION_RECORDING_KAFKA_QUEUE_SIZE_KB: undefined,
SESSION_RECORDING_LOCAL_DIRECTORY: '.tmp/sessions',
// NOTE: 10 minutes
SESSION_RECORDING_MAX_BUFFER_AGE_SECONDS: 60 * 10,
@@ -169,14 +162,11 @@ export function getDefaultConfig(): PluginsServerConfig {
SESSION_RECORDING_CONSOLE_LOGS_INGESTION_ENABLED: true,
SESSION_RECORDING_REPLAY_EVENTS_INGESTION_ENABLED: true,
SESSION_RECORDING_DEBUG_PARTITION: '',
SESSION_RECORDING_KAFKA_DEBUG: undefined,
SESSION_RECORDING_MAX_PARALLEL_FLUSHES: 10,
SESSION_RECORDING_OVERFLOW_ENABLED: false,
SESSION_RECORDING_OVERFLOW_BUCKET_REPLENISH_RATE: 5_000_000, // 5MB/second uncompressed, sustained
SESSION_RECORDING_OVERFLOW_BUCKET_CAPACITY: 200_000_000, // 200MB burst
SESSION_RECORDING_OVERFLOW_MIN_PER_BATCH: 1_000_000, // All sessions consume at least 1MB/batch, to penalise poor batching
SESSION_RECORDING_KAFKA_CONSUMPTION_STATISTICS_EVENT_INTERVAL_MS: 0, // 0 disables stats collection
SESSION_RECORDING_KAFKA_FETCH_MIN_BYTES: 1_048_576, // 1MB
ENCRYPTION_SALT_KEYS: isDevEnv() || isTestEnv() ? '00beef0000beef0000beef0000beef00' : '',

View File

@@ -3,7 +3,7 @@ import { Counter } from 'prom-client'
import { z } from 'zod'
import { HogTransformerService } from '../cdp/hog-transformations/hog-transformer.service'
import { KafkaConsumer } from '../kafka/batch-consumer-v2'
import { KafkaConsumer } from '../kafka/consumer'
import { KafkaProducerWrapper } from '../kafka/producer'
import { ingestionOverflowingMessagesTotal } from '../main/ingestion-queues/batch-processing/metrics'
import {

View File

@@ -1,403 +0,0 @@
import {
ClientMetrics,
ConsumerGlobalConfig,
KafkaConsumer as RdKafkaConsumer,
LibrdKafkaError,
Message,
Metadata,
PartitionMetadata,
TopicPartitionOffset,
WatermarkOffsets,
} from 'node-rdkafka'
import { hostname } from 'os'
import { Histogram } from 'prom-client'
import { defaultConfig } from '../config/config'
import { logger } from '../utils/logger'
import { captureException } from '../utils/posthog'
import { retryIfRetriable } from '../utils/retries'
import { promisifyCallback } from '../utils/utils'
import { ensureTopicExists } from './admin'
import { getConsumerConfigFromEnv } from './config'
import { consumedBatchDuration, consumerBatchSize, gaugeBatchUtilization } from './consumer'
const DEFAULT_BATCH_TIMEOUT_MS = 500
const DEFAULT_FETCH_BATCH_SIZE = 1000
const SLOW_BATCH_PROCESSING_LOG_THRESHOLD_MS = 10000
const MAX_HEALTH_HEARTBEAT_INTERVAL_MS = 60_000
// export const consumedBatchDuration = new Histogram({
// name: 'consumed_batch_duration_ms',
// help: 'Main loop consumer batch processing duration in ms',
// labelNames: ['topic', 'groupId'],
// })
// export const consumerBatchSize = new Histogram({
// name: 'consumed_batch_size',
// help: 'Size of the batch fetched by the consumer',
// labelNames: ['topic', 'groupId'],
// buckets: exponentialBuckets(1, 3, 5),
// })
// export const consumedMessageSizeBytes = new Histogram({
// name: 'consumed_message_size_bytes',
// help: 'Size of consumed message value in bytes',
// labelNames: ['topic', 'groupId', 'messageType'],
// buckets: exponentialBuckets(1, 8, 4).map((bucket) => bucket * 1024),
// })
// export const kafkaAbsolutePartitionCount = new Gauge({
// name: 'kafka_absolute_partition_count',
// help: 'Number of partitions assigned to this consumer. (Absolute value from the consumer state.)',
// labelNames: ['topic'],
// })
// export const gaugeBatchUtilization = new Gauge({
// name: 'consumer_batch_utilization',
// help: 'Indicates how large the batches we are processing are compared to the max batch size. Useful as a scaling metric',
// labelNames: ['groupId'],
// })
export const histogramKafkaBatchSize = new Histogram({
name: 'consumer_batch_size',
help: 'The size of the batches we are receiving from Kafka',
buckets: [0, 50, 100, 250, 500, 750, 1000, 1500, 2000, 3000, Infinity],
})
export const histogramKafkaBatchSizeKb = new Histogram({
name: 'consumer_batch_size_kb',
help: 'The size in kb of the batches we are receiving from Kafka',
buckets: [0, 128, 512, 1024, 5120, 10240, 20480, 51200, 102400, 204800, Infinity],
})
const findOffsetsToCommit = (messages: TopicPartitionOffset[]): TopicPartitionOffset[] => {
// We only need to commit the highest offset for a batch of messages
const messagesByTopicPartition = messages.reduce((acc, message) => {
if (!acc[message.topic]) {
acc[message.topic] = {}
}
if (!acc[message.topic][message.partition]) {
acc[message.topic][message.partition] = []
}
acc[message.topic][message.partition].push(message)
return acc
}, {} as { [topic: string]: { [partition: number]: TopicPartitionOffset[] } })
// Then we find the highest offset for each topic partition
const highestOffsets = Object.entries(messagesByTopicPartition).flatMap(([topic, partitions]) => {
return Object.entries(partitions).map(([partition, messages]) => {
const highestOffset = Math.max(...messages.map((message) => message.offset))
return {
topic,
partition: parseInt(partition),
offset: highestOffset,
}
})
})
return highestOffsets
}
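As a quick illustration of the offset bookkeeping above (a sketch added for this write-up, not part of the diff; the message objects are trimmed to the fields findOffsetsToCommit actually reads, and the topic name is made up):

    const batch = [
        { topic: 'events', partition: 0, offset: 41 },
        { topic: 'events', partition: 0, offset: 43 },
        { topic: 'events', partition: 1, offset: 7 },
    ]

    // Highest offset per topic+partition:
    // [{ topic: 'events', partition: 0, offset: 43 }, { topic: 'events', partition: 1, offset: 7 }]
    const toCommit = findOffsetsToCommit(batch)

storeOffsetsForMessages further down then adds 1 to each of those offsets before calling offsetsStore, since Kafka commits the offset of the next message you want to consume (44 and 8 in this sketch).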
export type KafkaConsumerConfig = {
groupId: string
topic: string
batchTimeoutMs?: number
callEachBatchWhenEmpty?: boolean
autoOffsetStore?: boolean
autoCommit?: boolean
}
export type RdKafkaConsumerConfig = Omit<
ConsumerGlobalConfig,
'group.id' | 'enable.auto.offset.store' | 'enable.auto.commit'
>
export class KafkaConsumer {
private isStopping = false
private lastHeartbeatTime = 0
private rdKafkaConsumer: RdKafkaConsumer
private consumerConfig: ConsumerGlobalConfig
private fetchBatchSize: number
private maxHealthHeartbeatIntervalMs: number
private consumerLoop: Promise<void> | undefined
constructor(private config: KafkaConsumerConfig, rdKafkaConfig: RdKafkaConsumerConfig = {}) {
this.config.autoCommit ??= true
this.config.autoOffsetStore ??= true
this.config.callEachBatchWhenEmpty ??= false
this.fetchBatchSize = defaultConfig.CONSUMER_BATCH_SIZE || DEFAULT_FETCH_BATCH_SIZE
this.maxHealthHeartbeatIntervalMs =
defaultConfig.CONSUMER_MAX_HEARTBEAT_INTERVAL_MS || MAX_HEALTH_HEARTBEAT_INTERVAL_MS
this.consumerConfig = {
'client.id': hostname(),
'security.protocol': 'plaintext',
'metadata.broker.list': 'kafka:9092', // Overridden with KAFKA_CONSUMER_METADATA_BROKER_LIST
log_level: 4, // WARN as the default
'group.id': this.config.groupId,
'session.timeout.ms': 30_000,
'max.poll.interval.ms': 300_000,
'max.partition.fetch.bytes': 1_048_576,
'fetch.error.backoff.ms': 100,
'fetch.message.max.bytes': 10_485_760,
'fetch.wait.max.ms': 50,
'queued.min.messages': 100000,
'queued.max.messages.kbytes': 102400, // 1048576 is the default, we go smaller to reduce mem usage.
// Custom settings and overrides - this is where most configuration overrides should be done
...getConsumerConfigFromEnv(),
// Finally any specifically given consumer config overrides
...rdKafkaConfig,
// Below is config that we explicitly DO NOT want to be overrideable by env vars - i.e. things that would require code changes to change
'enable.auto.offset.store': false, // NOTE: This is always false - we handle it using a custom function
'enable.auto.commit': this.config.autoCommit,
'partition.assignment.strategy': 'cooperative-sticky',
'enable.partition.eof': true,
rebalance_cb: true,
offset_commit_cb: true,
}
this.rdKafkaConsumer = this.createConsumer()
}
public getConfig() {
return {
...this.consumerConfig,
}
}
public heartbeat() {
// Can be called externally to update the heartbeat time and keep the consumer alive
this.lastHeartbeatTime = Date.now()
}
public isHealthy() {
// this is called as a readiness and a liveness probe
const isWithinInterval = Date.now() - this.lastHeartbeatTime < this.maxHealthHeartbeatIntervalMs
const isConnected = this.rdKafkaConsumer.isConnected()
return isConnected && isWithinInterval
}
public assignments() {
return this.rdKafkaConsumer.isConnected() ? this.rdKafkaConsumer.assignments() : []
}
public offsetsStore(topicPartitionOffsets: TopicPartitionOffset[]) {
return this.rdKafkaConsumer.offsetsStore(topicPartitionOffsets)
}
public on: RdKafkaConsumer['on'] = (...args) => {
// Delegate to the internal consumer
return this.rdKafkaConsumer.on(...args)
}
public async queryWatermarkOffsets(topic: string, partition: number, timeout = 10000): Promise<[number, number]> {
if (!this.rdKafkaConsumer.isConnected()) {
throw new Error('Not connected')
}
const offsets = await promisifyCallback<WatermarkOffsets>((cb) =>
this.rdKafkaConsumer.queryWatermarkOffsets(topic, partition, timeout, cb)
).catch((err) => {
captureException(err)
logger.error('🔥', 'Failed to query kafka watermark offsets', err)
throw err
})
return [offsets.lowOffset, offsets.highOffset]
}
public async getPartitionsForTopic(topic: string): Promise<PartitionMetadata[]> {
if (!this.rdKafkaConsumer.isConnected()) {
throw new Error('Not connected')
}
const meta = await promisifyCallback<Metadata>((cb) => this.rdKafkaConsumer.getMetadata({ topic }, cb)).catch(
(err) => {
captureException(err)
logger.error('🔥', 'Failed to get partition metadata', err)
throw err
}
)
return meta.topics.find((x) => x.name === topic)?.partitions ?? []
}
private createConsumer() {
const consumer = new RdKafkaConsumer(this.consumerConfig, {
// Default settings
'auto.offset.reset': 'earliest',
})
consumer.on('event.log', (log) => {
logger.info('📝', 'librdkafka log', { log: log })
})
consumer.on('event.error', (error: LibrdKafkaError) => {
logger.error('📝', 'librdkafka error', { log: error })
})
consumer.on('subscribed', (topics) => {
logger.info('📝', 'librdkafka consumer subscribed', { topics, config: this.consumerConfig })
})
consumer.on('connection.failure', (error: LibrdKafkaError, metrics: ClientMetrics) => {
logger.error('📝', 'librdkafka connection failure', { error, metrics, config: this.consumerConfig })
})
consumer.on('offset.commit', (error: LibrdKafkaError, topicPartitionOffsets: TopicPartitionOffset[]) => {
if (error) {
logger.warn('📝', 'librdkafka_offset_commit_error', { error, topicPartitionOffsets })
} else {
logger.debug('📝', 'librdkafka_offset_commit', { topicPartitionOffsets })
}
})
return consumer
}
private storeOffsetsForMessages = (messages: Message[]) => {
const topicPartitionOffsets = findOffsetsToCommit(messages).map((message) => {
return {
...message,
// When committing to Kafka you commit the offset of the next message you want to consume
offset: message.offset + 1,
}
})
if (topicPartitionOffsets.length > 0) {
logger.debug('📝', 'Storing offsets', { topicPartitionOffsets })
this.rdKafkaConsumer.offsetsStore(topicPartitionOffsets)
}
}
public async connect(eachBatch: (messages: Message[]) => Promise<void>) {
const { topic, groupId, callEachBatchWhenEmpty = false } = this.config
try {
await promisifyCallback<Metadata>((cb) => this.rdKafkaConsumer.connect({}, cb))
logger.info('📝', 'librdkafka consumer connected')
} catch (error) {
logger.error('⚠️', 'connect_error', { error: error })
throw error
}
this.heartbeat() // Setup the heartbeat so we are healthy since connection is established
await ensureTopicExists(this.consumerConfig, this.config.topic)
// The consumer has an internal pre-fetching queue that sequentially polls
// each partition, with the consumerMaxWaitMs timeout. We want to read big
// batches from this queue, but guarantee we are still running (with smaller
// batches) if the queue is not full enough. batchingTimeoutMs is that
// timeout, to return messages even if fetchBatchSize is not reached.
this.rdKafkaConsumer.setDefaultConsumeTimeout(this.config.batchTimeoutMs || DEFAULT_BATCH_TIMEOUT_MS)
this.rdKafkaConsumer.subscribe([this.config.topic])
const startConsuming = async () => {
try {
while (!this.isStopping) {
logger.debug('🔁', 'main_loop_consuming')
// TRICKY: We wrap this in a retry check. It seems that despite being connected and ready, the client can still have a nondeterministic
// error when consuming, hence the retryIfRetriable.
const messages = await retryIfRetriable(() =>
promisifyCallback<Message[]>((cb) => this.rdKafkaConsumer.consume(this.fetchBatchSize, cb))
)
// After successfully pulling a batch, we can update our heartbeat time
this.heartbeat()
if (!messages) {
logger.debug('🔁', 'main_loop_empty_batch', { cause: 'undefined' })
consumerBatchSize.labels({ topic, groupId }).observe(0)
continue
}
gaugeBatchUtilization.labels({ groupId }).set(messages.length / this.fetchBatchSize)
logger.debug('🔁', 'main_loop_consumed', { messagesLength: messages.length })
if (!messages.length && !callEachBatchWhenEmpty) {
logger.debug('🔁', 'main_loop_empty_batch', { cause: 'empty' })
consumerBatchSize.labels({ topic, groupId }).observe(0)
continue
}
histogramKafkaBatchSize.observe(messages.length)
histogramKafkaBatchSizeKb.observe(
messages.reduce((acc, m) => (m.value?.length ?? 0) + acc, 0) / 1024
)
const startProcessingTimeMs = new Date().valueOf()
await eachBatch(messages)
const processingTimeMs = new Date().valueOf() - startProcessingTimeMs
consumedBatchDuration.labels({ topic, groupId }).observe(processingTimeMs)
const logSummary = `Processed ${messages.length} events in ${
Math.round(processingTimeMs / 10) / 100
}s`
if (processingTimeMs > SLOW_BATCH_PROCESSING_LOG_THRESHOLD_MS) {
logger.warn('🕒', `Slow batch: ${logSummary}`)
}
if (this.config.autoCommit && this.config.autoOffsetStore) {
this.storeOffsetsForMessages(messages)
}
}
} catch (error) {
throw error
} finally {
logger.info('🔁', 'main_loop_stopping')
// Finally, disconnect from the broker. If stored offsets have changed via
// `storeOffsetsForMessages` above, they will be committed before shutdown (so long
// as this consumer is still part of the group).
await this.disconnectConsumer()
logger.info('🔁', 'Disconnected node-rdkafka consumer')
}
}
this.consumerLoop = startConsuming().catch((error) => {
logger.error('🔁', 'consumer_loop_error', {
error: String(error),
config: this.config,
consumerConfig: this.consumerConfig,
})
// We re-throw the error so that it is caught in server.ts and triggers a full shutdown
throw error
})
}
public async disconnect() {
if (this.isStopping) {
return
}
// Mark as stopping - this will also essentially stop the consumer loop
this.isStopping = true
// Allow the in progress consumer loop to finish if possible
if (this.consumerLoop) {
await this.consumerLoop.catch((error) => {
logger.error('🔁', 'failed to stop consumer loop safely. Continuing shutdown', {
error: String(error),
config: this.config,
consumerConfig: this.consumerConfig,
})
})
}
await this.disconnectConsumer()
}
private async disconnectConsumer() {
if (this.rdKafkaConsumer.isConnected()) {
logger.info('📝', 'Disconnecting consumer...')
await new Promise<void>((res, rej) => this.rdKafkaConsumer.disconnect((e) => (e ? rej(e) : res())))
logger.info('📝', 'Disconnected consumer!')
}
}
}

View File

@@ -1,392 +0,0 @@
import { ConsumerGlobalConfig, GlobalConfig, KafkaConsumer, Message } from 'node-rdkafka'
import { logger } from '../utils/logger'
import { retryIfRetriable } from '../utils/retries'
import { ensureTopicExists } from './admin'
import {
consumedBatchDuration,
consumedMessageSizeBytes,
consumeMessages,
consumerBatchSize,
countPartitionsPerTopic,
createKafkaConsumer,
disconnectConsumer,
gaugeBatchUtilization,
instrumentConsumerMetrics,
kafkaAbsolutePartitionCount,
storeOffsetsForMessages,
} from './consumer'
export interface BatchConsumer {
consumer: KafkaConsumer
join: () => Promise<void>
stop: () => Promise<void>
isHealthy: () => boolean
}
const STATUS_LOG_INTERVAL_MS = 10000
const SLOW_BATCH_PROCESSING_LOG_THRESHOLD_MS = 10000
type PartitionSummary = {
// number of messages received (often this can be derived from the
// difference between the minimum and maximum offset values + 1, but not
// always in case of messages deleted on the broker, or offset resets)
count: number
// minimum and maximum offsets observed
offsets: [number, number]
}
class BatchSummary {
// NOTE: ``Map`` would probably be more appropriate here, but ``Record`` is
// easier to JSON serialize.
private partitions: Record<number, PartitionSummary> = {}
public record(message: Message) {
let summary = this.partitions[message.partition]
if (summary === undefined) {
summary = {
count: 1,
offsets: [message.offset, message.offset],
}
this.partitions[message.partition] = summary
} else {
summary.count += 1
summary.offsets[0] = Math.min(summary.offsets[0], message.offset)
summary.offsets[1] = Math.max(summary.offsets[1], message.offset)
}
}
}
export const startBatchConsumer = async ({
connectionConfig,
groupId,
topic,
autoCommit,
sessionTimeout,
maxPollIntervalMs,
consumerMaxBytesPerPartition,
consumerMaxBytes,
consumerMaxWaitMs,
consumerErrorBackoffMs,
fetchBatchSize,
batchingTimeoutMs,
eachBatch,
queuedMinMessages = 100000,
callEachBatchWhenEmpty = false,
debug,
queuedMaxMessagesKBytes = 102400,
kafkaStatisticIntervalMs = 0,
fetchMinBytes,
maxHealthHeartbeatIntervalMs = 60_000,
autoOffsetStore = true,
topicMetadataRefreshInterval,
}: {
connectionConfig: GlobalConfig
groupId: string
topic: string
autoCommit: boolean
autoOffsetStore?: boolean
sessionTimeout: number
maxPollIntervalMs: number
consumerMaxBytesPerPartition: number
consumerMaxBytes: number
consumerMaxWaitMs: number
consumerErrorBackoffMs: number
fetchBatchSize: number
batchingTimeoutMs: number
eachBatch: (messages: Message[], context: { heartbeat: () => void }) => Promise<void>
queuedMinMessages?: number
callEachBatchWhenEmpty?: boolean
debug?: string
queuedMaxMessagesKBytes?: number
fetchMinBytes?: number
topicMetadataRefreshInterval?: number
/**
* default to 0, which disables statistics collection
* granularity of 1000ms
* configures Kafka to emit a statistics event on this interval;
* the consumer has to register a callback to listen to the event
* see https://github.com/confluentinc/librdkafka/blob/master/STATISTICS.md
*/
kafkaStatisticIntervalMs?: number
maxHealthHeartbeatIntervalMs?: number
}): Promise<BatchConsumer> => {
// Starts consuming from `topic` in batches of `fetchBatchSize` messages,
// with consumer group id `groupId`. We use `connectionConfig` to connect
// to Kafka. We commit offsets after each batch has been processed,
// disabling the default auto commit behaviour.
//
// The general purpose of processing in batches is that it allows e.g. some
// optimisations to be made to database queries, or batching production to
// Kafka.
//
// Note that we do not handle any pre-fetching explicitly, rather
// node-rdkafka will fill its own internal queue of messages as fast as it
// can, and we will consume from that queue periodically. Prefetching will
// stop if the internal queue is full, and will resume once we have
// `consume`d some messages.
//
// Aside from configuring the consumer, we also ensure that the topic
// exists explicitly.
//
// We also instrument the consumer with Prometheus metrics, which are
// exposed on the /_metrics endpoint by the global prom-client registry.
const consumerConfig: ConsumerGlobalConfig = {
...connectionConfig,
'group.id': groupId,
'session.timeout.ms': sessionTimeout,
'max.poll.interval.ms': maxPollIntervalMs,
'enable.auto.commit': autoCommit,
'enable.auto.offset.store': false,
/**
* max.partition.fetch.bytes
* The maximum amount of data per-partition the server will return.
* Records are fetched in batches by the consumer.
* If the first record batch in the first non-empty partition of the fetch is larger than this limit,
* the batch will still be returned to ensure that the consumer can make progress.
* The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config)
* or max.message.bytes (topic config).
* https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#:~:text=max.partition.fetch.bytes,the%20consumer%20can%20make%20progress.
*/
'max.partition.fetch.bytes': consumerMaxBytesPerPartition,
// https://github.com/confluentinc/librdkafka/blob/e75de5be191b6b8e9602efc969f4af64071550de/CONFIGURATION.md?plain=1#L122
// Initial maximum number of bytes per topic+partition to request when fetching messages from the broker. If the client encounters a message larger than this value it will gradually try to increase it until the entire message can be fetched.
'fetch.message.max.bytes': consumerMaxBytes,
'fetch.wait.max.ms': consumerMaxWaitMs,
'fetch.error.backoff.ms': consumerErrorBackoffMs,
'enable.partition.eof': true,
// https://github.com/confluentinc/librdkafka/blob/e75de5be191b6b8e9602efc969f4af64071550de/CONFIGURATION.md?plain=1#L118
// Minimum number of messages per topic+partition librdkafka tries to maintain in the local consumer queue
'queued.min.messages': queuedMinMessages, // 100000 is the default
'queued.max.messages.kbytes': queuedMaxMessagesKBytes, // 1048576 is the default, we go smaller to reduce mem usage.
// Use cooperative-sticky rebalancing strategy, which is the
// [default](https://kafka.apache.org/documentation/#consumerconfigs_partition.assignment.strategy)
// in the Java Kafka Client. There it's actually
// RangeAssignor,CooperativeStickyAssignor i.e. it mixes eager and
// cooperative strategies. This is however explicitly mentioned to not
// be supported in the [librdkafka library config
// docs](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md#partitionassignmentstrategy)
// so we just use cooperative-sticky. If there are other consumer
// members with other strategies already running, you'll need to delete
// e.g. the replicaset for them if on k8s.
//
// See
// https://www.confluent.io/en-gb/blog/incremental-cooperative-rebalancing-in-kafka/
// for details on the advantages of this rebalancing strategy as well as
// how it works.
'partition.assignment.strategy': 'cooperative-sticky',
rebalance_cb: true,
offset_commit_cb: true,
}
// undefined is valid but things get unhappy if you provide that explicitly
if (fetchMinBytes) {
consumerConfig['fetch.min.bytes'] = fetchMinBytes
}
if (kafkaStatisticIntervalMs) {
consumerConfig['statistics.interval.ms'] = kafkaStatisticIntervalMs
}
if (topicMetadataRefreshInterval) {
consumerConfig['topic.metadata.refresh.interval.ms'] = topicMetadataRefreshInterval
}
if (debug) {
// NOTE: If the key exists with value undefined the consumer will throw which is annoying, so we define it here instead
consumerConfig.debug = debug
}
const consumer = await createKafkaConsumer(consumerConfig, {
// It is typically safest to roll back to the earliest offset if we
// find ourselves in a situation where there is no stored offset or
// the stored offset is invalid, compared to the default behavior of
// potentially jumping ahead to the latest offset.
'auto.offset.reset': 'earliest',
})
instrumentConsumerMetrics(consumer, groupId)
let isShuttingDown = false
let lastHeartbeatTime = 0
// Before subscribing, we need to ensure that the topic exists. We don't
// currently have a way to manage topic creation elsewhere (we handle this
// via terraform in production but this isn't applicable e.g. to hobby
// deployments) so we use the Kafka admin client to do so. We don't use the
// Kafka `enable.auto.create.topics` option as the behaviour of this doesn't
// seem to be well documented and it seems to not function as expected in
// our testing of it, we end up getting "Unknown topic or partition" errors
// on consuming, possibly similar to
// https://github.com/confluentinc/confluent-kafka-dotnet/issues/1366.
await ensureTopicExists(connectionConfig, topic)
// The consumer has an internal pre-fetching queue that sequentially polls
// each partition, with the consumerMaxWaitMs timeout. We want to read big
// batches from this queue, but guarantee we are still running (with smaller
// batches) if the queue is not full enough. batchingTimeoutMs is that
// timeout, to return messages even if fetchBatchSize is not reached.
consumer.setDefaultConsumeTimeout(batchingTimeoutMs)
consumer.subscribe([topic])
const startConsuming = async () => {
// Start consuming in a loop, fetching a batch of at most `fetchBatchSize` messages, then
// processing it with `eachBatch`, and finally calling consumer.offsetsStore. This will
// not actually commit offsets on the brokers, but rather just store the offsets locally
// such that when commit is called, either manually or via auto-commit, these are the values
// that will be used.
//
// Note that we rely on librdkafka handling retries for any Kafka related operations, e.g.
// it will handle in the background rebalances, during which time consumeMessages will
// simply return an empty array.
//
// We log the number of messages that have been processed every 10 seconds, which should
// give some feedback to the user that things are functioning as expected. If a single batch
// takes more than SLOW_BATCH_PROCESSING_LOG_THRESHOLD_MS we log it individually.
let messagesProcessed = 0
let batchesProcessed = 0
const statusLogInterval = setInterval(() => {
logger.info('🔁', 'main_loop', {
groupId,
topic,
messagesPerSecond: messagesProcessed / (STATUS_LOG_INTERVAL_MS / 1000),
batchesProcessed: batchesProcessed,
lastHeartbeatTime: new Date(lastHeartbeatTime).toISOString(),
})
messagesProcessed = 0
batchesProcessed = 0
}, STATUS_LOG_INTERVAL_MS)
try {
while (!isShuttingDown) {
logger.debug('🔁', 'main_loop_consuming')
const messages = await retryIfRetriable(async () => {
return await consumeMessages(consumer, fetchBatchSize)
})
// It's important that we only set the `lastHeartbeatTime` after a successful consume
// call. Even if we received 0 messages, a successful call means we are actually
// subscribed and didn't receive, for example, an error about an inconsistent group
// protocol. If we never manage to consume, we don't want our health checks to pass.
lastHeartbeatTime = Date.now()
for (const [topic, count] of countPartitionsPerTopic(consumer.assignments())) {
kafkaAbsolutePartitionCount.labels({ topic }).set(count)
}
if (!messages) {
logger.debug('🔁', 'main_loop_empty_batch', { cause: 'undefined' })
consumerBatchSize.labels({ topic, groupId }).observe(0)
continue
}
gaugeBatchUtilization.labels({ groupId }).set(messages.length / fetchBatchSize)
logger.debug('🔁', 'main_loop_consumed', { messagesLength: messages.length })
if (!messages.length && !callEachBatchWhenEmpty) {
logger.debug('🔁', 'main_loop_empty_batch', { cause: 'empty' })
consumerBatchSize.labels({ topic, groupId }).observe(0)
continue
}
const startProcessingTimeMs = new Date().valueOf()
const batchSummary = new BatchSummary()
consumerBatchSize.labels({ topic, groupId }).observe(messages.length)
for (const message of messages) {
consumedMessageSizeBytes.labels({ topic, groupId }).observe(message.size)
batchSummary.record(message)
}
// NOTE: we do not handle any retries. This should be handled by
// the implementation of `eachBatch`.
logger.debug('⏳', `Starting to process batch of ${messages.length} events...`, batchSummary)
await eachBatch(messages, {
heartbeat: () => {
lastHeartbeatTime = Date.now()
},
})
messagesProcessed += messages.length
batchesProcessed += 1
const processingTimeMs = new Date().valueOf() - startProcessingTimeMs
consumedBatchDuration.labels({ topic, groupId }).observe(processingTimeMs)
const logSummary = `Processed ${messages.length} events in ${Math.round(processingTimeMs / 10) / 100}s`
if (processingTimeMs > SLOW_BATCH_PROCESSING_LOG_THRESHOLD_MS) {
logger.warn('🕒', `Slow batch: ${logSummary}`, batchSummary)
} else {
logger.debug('⌛️', logSummary, batchSummary)
}
if (autoCommit && autoOffsetStore) {
storeOffsetsForMessages(messages, consumer)
}
}
} catch (error) {
logger.error('🔁', 'main_loop_error', { error })
throw error
} finally {
logger.info('🔁', 'main_loop_stopping')
clearInterval(statusLogInterval)
// Finally, disconnect from the broker. If stored offsets have changed via
// `storeOffsetsForMessages` above, they will be committed before shutdown (so long
// as this consumer is still part of the group).
await disconnectConsumer(consumer)
}
}
const mainLoop = startConsuming()
const isHealthy = () => {
// this is called as a readiness and a liveness probe
const hasRun = lastHeartbeatTime > 0
const isWithinInterval = Date.now() - lastHeartbeatTime < maxHealthHeartbeatIntervalMs
const isConnected = consumer.isConnected()
return hasRun ? isConnected && isWithinInterval : isConnected
}
const stop = async () => {
logger.info('🔁', 'Stopping kafka batch consumer')
// First we signal to the mainLoop that we should be stopping. The main
// loop should complete one loop, flush the producer, and store its offsets.
isShuttingDown = true
// Wait for the main loop to finish, but only give it 30 seconds
await join(30000)
}
const join = async (timeout?: number) => {
if (timeout) {
// If we have a timeout set we want to wait for the main loop to finish
// but also want to ensure that we don't wait forever. We do this by
// creating a promise that will resolve after the timeout, and then
// waiting for either the main loop to finish or the timeout to occur.
// We need to make sure that if the main loop finishes before the
// timeout, we don't leave the timeout around to resolve later thus
// keeping file descriptors open, so make sure to call clearTimeout
// on the timer handle.
await new Promise((resolve) => {
const timerHandle = setTimeout(() => {
resolve(null)
}, timeout)
// eslint-disable-next-line @typescript-eslint/no-floating-promises
mainLoop.finally(() => {
resolve(null)
clearTimeout(timerHandle)
})
})
} else {
await mainLoop
}
}
return { isHealthy, stop, join, consumer }
}

View File

@@ -1,166 +1,53 @@
import {
Assignment,
ClientMetrics,
CODES,
ConsumerGlobalConfig,
ConsumerTopicConfig,
KafkaConsumer as RdKafkaConsumer,
LibrdKafkaError,
Message,
TopicPartition,
Metadata,
PartitionMetadata,
TopicPartitionOffset,
WatermarkOffsets,
} from 'node-rdkafka'
import { Gauge } from 'prom-client'
import { exponentialBuckets } from 'prom-client'
import { Histogram } from 'prom-client'
import { hostname } from 'os'
import { Gauge, Histogram } from 'prom-client'
import { kafkaRebalancePartitionCount, latestOffsetTimestampGauge } from '../main/ingestion-queues/metrics'
import { defaultConfig } from '../config/config'
import { logger } from '../utils/logger'
import { captureException } from '../utils/posthog'
import { retryIfRetriable } from '../utils/retries'
import { promisifyCallback } from '../utils/utils'
import { ensureTopicExists } from './admin'
import { getConsumerConfigFromEnv } from './config'
export const createKafkaConsumer = async (config: ConsumerGlobalConfig, topicConfig: ConsumerTopicConfig = {}) => {
// Creates a node-rdkafka consumer and connects it to the brokers, resolving
// only when the connection is established.
const DEFAULT_BATCH_TIMEOUT_MS = 500
const DEFAULT_FETCH_BATCH_SIZE = 1000
const SLOW_BATCH_PROCESSING_LOG_THRESHOLD_MS = 10000
const MAX_HEALTH_HEARTBEAT_INTERVAL_MS = 60_000
return await new Promise<RdKafkaConsumer>((resolve, reject) => {
const consumer = new RdKafkaConsumer(config, topicConfig)
const consumedBatchDuration = new Histogram({
name: 'consumed_batch_duration_ms',
help: 'Main loop consumer batch processing duration in ms',
labelNames: ['topic', 'groupId'],
})
consumer.on('event.log', (log) => {
logger.info('📝', 'librdkafka log', { log: log })
})
const gaugeBatchUtilization = new Gauge({
name: 'consumer_batch_utilization',
help: 'Indicates how large the batches we are processing are compared to the max batch size. Useful as a scaling metric',
labelNames: ['groupId'],
})
consumer.on('event.error', (error: LibrdKafkaError) => {
logger.error('📝', 'librdkafka error', { log: error })
})
const histogramKafkaBatchSize = new Histogram({
name: 'consumer_batch_size',
help: 'The size of the batches we are receiving from Kafka',
buckets: [0, 50, 100, 250, 500, 750, 1000, 1500, 2000, 3000, Infinity],
})
consumer.on('subscribed', (topics) => {
logger.info('📝', 'librdkafka consumer subscribed', { topics })
})
consumer.on('connection.failure', (error: LibrdKafkaError, metrics: ClientMetrics) => {
logger.error('📝', 'librdkafka connection failure', { error, metrics })
})
consumer.on('offset.commit', (error: LibrdKafkaError, topicPartitionOffsets: TopicPartitionOffset[]) => {
if (error) {
logger.warn('📝', 'librdkafka_offset_commit_error', { error, topicPartitionOffsets })
} else {
logger.debug('📝', 'librdkafka_offset_commit', { topicPartitionOffsets })
}
})
consumer.connect({}, (error, data) => {
if (error) {
logger.error('⚠️', 'connect_error', { error: error })
reject(error)
} else {
logger.info('📝', 'librdkafka consumer connected', { brokers: data?.brokers })
resolve(consumer)
}
})
})
}
export function countPartitionsPerTopic(assignments: Assignment[]): Map<string, number> {
const partitionsPerTopic = new Map()
for (const assignment of assignments) {
if (partitionsPerTopic.has(assignment.topic)) {
partitionsPerTopic.set(assignment.topic, partitionsPerTopic.get(assignment.topic) + 1)
} else {
partitionsPerTopic.set(assignment.topic, 1)
}
}
return partitionsPerTopic
}
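A small usage sketch of countPartitionsPerTopic (added for illustration, not part of the diff; the topic names are hypothetical and the assignment objects carry only the fields the function reads):

    // Two partitions of one topic and one of another assigned to this member.
    const assignments = [
        { topic: 'session_replay_events', partition: 0 },
        { topic: 'session_replay_events', partition: 1 },
        { topic: 'session_replay_events_overflow', partition: 3 },
    ]

    // Map { 'session_replay_events' => 2, 'session_replay_events_overflow' => 1 }
    const counts = countPartitionsPerTopic(assignments)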
export const instrumentConsumerMetrics = (consumer: RdKafkaConsumer, groupId: string) => {
// For each message consumed, we record the latest timestamp processed for
// each partition assigned to this consumer group member. This consumer
// should only provide metrics for the partitions that are assigned to it,
// so we need to make sure we don't publish any metrics for other
// partitions, otherwise we can end up with ghost readings.
//
// We also need to consider the case where we have a partition that
// has reached EOF, in which case we want to record the current time
// as opposed to the timestamp of the current message (as in this
// case, no such message exists).
//
// Further, we are not guaranteed to have messages from all of the
// partitions assigned to this consumer group member, even if there
// are partitions with messages to be consumed. This is because
// librdkafka will only fetch messages from a partition if there is
// space in the internal partition queue. If the queue is full, it
// will not fetch any more messages from the given partition.
//
// Note that we don't try to align the timestamps with the actual broker
// committed offsets. The discrepancy is hopefully in most cases quite
// small.
//
// TODO: add other relevant metrics here
// TODO: expose the internal librdkafka metrics as well.
consumer.on('rebalance', (error: LibrdKafkaError, assignments: TopicPartition[]) => {
/**
* see https://github.com/Blizzard/node-rdkafka#rebalancing: errors are used to signal
* both errors and non-errors
*
* When rebalancing starts the consumer receives ERR__REVOKE_PARTITIONS,
* and when rebalancing is completed the new assignments are received with ERR__ASSIGN_PARTITIONS
*/
if (error.code === CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
logger.info('📝️', `librdkafka cooperative rebalance, partitions assigned`, { assignments })
for (const [topic, count] of countPartitionsPerTopic(assignments)) {
kafkaRebalancePartitionCount.labels({ topic: topic }).inc(count)
}
} else if (error.code === CODES.ERRORS.ERR__REVOKE_PARTITIONS) {
logger.info('📝️', `librdkafka cooperative rebalance started, partitions revoked`, {
revocations: assignments,
})
for (const [topic, count] of countPartitionsPerTopic(assignments)) {
kafkaRebalancePartitionCount.labels({ topic: topic }).dec(count)
}
} else {
// We had a "real" error
logger.error('⚠️', 'rebalance_error', { error })
}
latestOffsetTimestampGauge.reset()
})
consumer.on('partition.eof', (topicPartitionOffset: TopicPartitionOffset) => {
latestOffsetTimestampGauge
.labels({
topic: topicPartitionOffset.topic,
partition: topicPartitionOffset.partition.toString(),
groupId,
})
.set(Date.now())
})
consumer.on('data', (message) => {
if (message.timestamp) {
latestOffsetTimestampGauge
.labels({ topic: message.topic, partition: message.partition, groupId })
.set(message.timestamp)
}
})
}
export const consumeMessages = async (consumer: RdKafkaConsumer, fetchBatchSize: number) => {
// Rather than using the pure streaming method of consuming, we
// instead fetch in batches. This is to make the logic a little
// simpler to start with, although we may want to move to a
// streaming implementation if needed. That said, given we might want
// to switch to a language with better support for Kafka stream
// processing, perhaps this will be enough for us.
// TODO: handle retriable `LibrdKafkaError`s.
return await new Promise<Message[]>((resolve, reject) => {
consumer.consume(fetchBatchSize, (error: LibrdKafkaError, messages: Message[]) => {
if (error) {
reject(error)
} else {
resolve(messages)
}
})
})
}
const histogramKafkaBatchSizeKb = new Histogram({
name: 'consumer_batch_size_kb',
help: 'The size in kb of the batches we are receiving from Kafka',
buckets: [0, 128, 512, 1024, 5120, 10240, 20480, 51200, 102400, 204800, Infinity],
})
export const findOffsetsToCommit = (messages: TopicPartitionOffset[]): TopicPartitionOffset[] => {
// We only need to commit the highest offset for a batch of messages
@@ -194,69 +81,297 @@ export const findOffsetsToCommit = (messages: TopicPartitionOffset[]): TopicPart
return highestOffsets
}
/**
* Updates the offsets that will be committed on the next call to commit() (without offsets
* specified) or the next auto commit.
*
* This is a local (in-memory) operation and does not talk to the Kafka broker.
*/
export const storeOffsetsForMessages = (messages: Message[], consumer: RdKafkaConsumer) => {
const topicPartitionOffsets = findOffsetsToCommit(messages).map((message) => {
return {
...message,
// When committing to Kafka you commit the offset of the next message you want to consume
offset: message.offset + 1,
}
})
if (topicPartitionOffsets.length > 0) {
logger.debug('📝', 'Storing offsets', { topicPartitionOffsets })
consumer.offsetsStore(topicPartitionOffsets)
}
export type KafkaConsumerConfig = {
groupId: string
topic: string
batchTimeoutMs?: number
callEachBatchWhenEmpty?: boolean
autoOffsetStore?: boolean
autoCommit?: boolean
}
export const disconnectConsumer = async (consumer: RdKafkaConsumer) => {
await new Promise((resolve, reject) => {
consumer.disconnect((error, data) => {
export type RdKafkaConsumerConfig = Omit<
ConsumerGlobalConfig,
'group.id' | 'enable.auto.offset.store' | 'enable.auto.commit'
>
export class KafkaConsumer {
private isStopping = false
private lastHeartbeatTime = 0
private rdKafkaConsumer: RdKafkaConsumer
private consumerConfig: ConsumerGlobalConfig
private fetchBatchSize: number
private maxHealthHeartbeatIntervalMs: number
private consumerLoop: Promise<void> | undefined
constructor(private config: KafkaConsumerConfig, rdKafkaConfig: RdKafkaConsumerConfig = {}) {
this.config.autoCommit ??= true
this.config.autoOffsetStore ??= true
this.config.callEachBatchWhenEmpty ??= false
this.fetchBatchSize = defaultConfig.CONSUMER_BATCH_SIZE || DEFAULT_FETCH_BATCH_SIZE
this.maxHealthHeartbeatIntervalMs =
defaultConfig.CONSUMER_MAX_HEARTBEAT_INTERVAL_MS || MAX_HEALTH_HEARTBEAT_INTERVAL_MS
this.consumerConfig = {
'client.id': hostname(),
'security.protocol': 'plaintext',
'metadata.broker.list': 'kafka:9092', // Overridden with KAFKA_CONSUMER_METADATA_BROKER_LIST
log_level: 4, // WARN as the default
'group.id': this.config.groupId,
'session.timeout.ms': 30_000,
'max.poll.interval.ms': 300_000,
'max.partition.fetch.bytes': 1_048_576,
'fetch.error.backoff.ms': 100,
'fetch.message.max.bytes': 10_485_760,
'fetch.wait.max.ms': 50,
'queued.min.messages': 100000,
'queued.max.messages.kbytes': 102400, // 1048576 is the default, we go smaller to reduce mem usage.
'client.rack': defaultConfig.KAFKA_CLIENT_RACK, // Helps with cross-AZ traffic awareness and is not unique to the consumer
// Custom settings and overrides - this is where most configuration overrides should be done
...getConsumerConfigFromEnv(),
// Finally any specifically given consumer config overrides
...rdKafkaConfig,
// Below is config that we explicitly DO NOT want to be overrideable by env vars - i.e. things that would require code changes to change
'enable.auto.offset.store': false, // NOTE: This is always false - we handle it using a custom function
'enable.auto.commit': this.config.autoCommit,
'partition.assignment.strategy': 'cooperative-sticky',
'enable.partition.eof': true,
rebalance_cb: true,
offset_commit_cb: true,
}
this.rdKafkaConsumer = this.createConsumer()
}
public getConfig() {
return {
...this.consumerConfig,
}
}
public heartbeat() {
// Can be called externally to update the heartbeat time and keep the consumer alive
this.lastHeartbeatTime = Date.now()
}
public isHealthy() {
// this is called as a readiness and a liveness probe
const isWithinInterval = Date.now() - this.lastHeartbeatTime < this.maxHealthHeartbeatIntervalMs
const isConnected = this.rdKafkaConsumer.isConnected()
return isConnected && isWithinInterval
}
public assignments() {
return this.rdKafkaConsumer.isConnected() ? this.rdKafkaConsumer.assignments() : []
}
public offsetsStore(topicPartitionOffsets: TopicPartitionOffset[]) {
return this.rdKafkaConsumer.offsetsStore(topicPartitionOffsets)
}
public on: RdKafkaConsumer['on'] = (...args) => {
// Delegate to the internal consumer
return this.rdKafkaConsumer.on(...args)
}
public async queryWatermarkOffsets(topic: string, partition: number, timeout = 10000): Promise<[number, number]> {
if (!this.rdKafkaConsumer.isConnected()) {
throw new Error('Not connected')
}
const offsets = await promisifyCallback<WatermarkOffsets>((cb) =>
this.rdKafkaConsumer.queryWatermarkOffsets(topic, partition, timeout, cb)
).catch((err) => {
captureException(err)
logger.error('🔥', 'Failed to query kafka watermark offsets', err)
throw err
})
return [offsets.lowOffset, offsets.highOffset]
}
public async getPartitionsForTopic(topic: string): Promise<PartitionMetadata[]> {
if (!this.rdKafkaConsumer.isConnected()) {
throw new Error('Not connected')
}
const meta = await promisifyCallback<Metadata>((cb) => this.rdKafkaConsumer.getMetadata({ topic }, cb)).catch(
(err) => {
captureException(err)
logger.error('🔥', 'Failed to get partition metadata', err)
throw err
}
)
return meta.topics.find((x) => x.name === topic)?.partitions ?? []
}
private createConsumer() {
const consumer = new RdKafkaConsumer(this.consumerConfig, {
// Default settings
'auto.offset.reset': 'earliest',
})
consumer.on('event.log', (log) => {
logger.info('📝', 'librdkafka log', { log: log })
})
consumer.on('event.error', (error: LibrdKafkaError) => {
logger.error('📝', 'librdkafka error', { log: error })
})
consumer.on('subscribed', (topics) => {
logger.info('📝', 'librdkafka consumer subscribed', { topics, config: this.consumerConfig })
})
consumer.on('connection.failure', (error: LibrdKafkaError, metrics: ClientMetrics) => {
logger.error('📝', 'librdkafka connection failure', { error, metrics, config: this.consumerConfig })
})
consumer.on('offset.commit', (error: LibrdKafkaError, topicPartitionOffsets: TopicPartitionOffset[]) => {
if (error) {
logger.error('🔥', 'Failed to disconnect node-rdkafka consumer', { error })
reject(error)
logger.warn('📝', 'librdkafka_offset_commit_error', { error, topicPartitionOffsets })
} else {
logger.info('🔁', 'Disconnected node-rdkafka consumer')
resolve(data)
logger.debug('📝', 'librdkafka_offset_commit', { topicPartitionOffsets })
}
})
})
return consumer
}
private storeOffsetsForMessages = (messages: Message[]) => {
const topicPartitionOffsets = findOffsetsToCommit(messages).map((message) => {
return {
...message,
// When committing to Kafka you commit the offset of the next message you want to consume
offset: message.offset + 1,
}
})
if (topicPartitionOffsets.length > 0) {
logger.debug('📝', 'Storing offsets', { topicPartitionOffsets })
this.rdKafkaConsumer.offsetsStore(topicPartitionOffsets)
}
}
public async connect(eachBatch: (messages: Message[]) => Promise<void>) {
const { topic, groupId, callEachBatchWhenEmpty = false } = this.config
try {
await promisifyCallback<Metadata>((cb) => this.rdKafkaConsumer.connect({}, cb))
logger.info('📝', 'librdkafka consumer connected')
} catch (error) {
logger.error('⚠️', 'connect_error', { error: error })
throw error
}
this.heartbeat() // Setup the heartbeat so we are healthy since connection is established
await ensureTopicExists(this.consumerConfig, this.config.topic)
// The consumer has an internal pre-fetching queue that sequentially polls
// each partition, with the consumerMaxWaitMs timeout. We want to read big
// batches from this queue, but guarantee we are still running (with smaller
// batches) if the queue is not full enough. batchingTimeoutMs is that
// timeout, to return messages even if fetchBatchSize is not reached.
this.rdKafkaConsumer.setDefaultConsumeTimeout(this.config.batchTimeoutMs || DEFAULT_BATCH_TIMEOUT_MS)
this.rdKafkaConsumer.subscribe([this.config.topic])
const startConsuming = async () => {
try {
while (!this.isStopping) {
logger.debug('🔁', 'main_loop_consuming')
// TRICKY: We wrap this in a retry check. It seems that despite being connected and ready, the client can still have a nondeterministic
// error when consuming, hence the retryIfRetriable.
const messages = await retryIfRetriable(() =>
promisifyCallback<Message[]>((cb) => this.rdKafkaConsumer.consume(this.fetchBatchSize, cb))
)
// After successfully pulling a batch, we can update our heartbeat time
this.heartbeat()
gaugeBatchUtilization.labels({ groupId }).set(messages.length / this.fetchBatchSize)
logger.debug('🔁', 'main_loop_consumed', { messagesLength: messages.length })
histogramKafkaBatchSize.observe(messages.length)
histogramKafkaBatchSizeKb.observe(
messages.reduce((acc, m) => (m.value?.length ?? 0) + acc, 0) / 1024
)
if (!messages.length && !callEachBatchWhenEmpty) {
logger.debug('🔁', 'main_loop_empty_batch', { cause: 'empty' })
continue
}
const startProcessingTimeMs = new Date().valueOf()
await eachBatch(messages)
const processingTimeMs = new Date().valueOf() - startProcessingTimeMs
consumedBatchDuration.labels({ topic, groupId }).observe(processingTimeMs)
const logSummary = `Processed ${messages.length} events in ${
Math.round(processingTimeMs / 10) / 100
}s`
if (processingTimeMs > SLOW_BATCH_PROCESSING_LOG_THRESHOLD_MS) {
logger.warn('🕒', `Slow batch: ${logSummary}`)
}
if (this.config.autoCommit && this.config.autoOffsetStore) {
this.storeOffsetsForMessages(messages)
}
}
} catch (error) {
throw error
} finally {
logger.info('🔁', 'main_loop_stopping')
// Finally, disconnect from the broker. If stored offsets have changed via
// `storeOffsetsForMessages` above, they will be committed before shutdown (so long
// as this consumer is still part of the group).
await this.disconnectConsumer()
logger.info('🔁', 'Disconnected node-rdkafka consumer')
}
}
this.consumerLoop = startConsuming().catch((error) => {
logger.error('🔁', 'consumer_loop_error', {
error: String(error),
config: this.config,
consumerConfig: this.consumerConfig,
})
// We re-throw the error so that it is caught in server.ts and triggers a full shutdown
throw error
})
}
public async disconnect() {
if (this.isStopping) {
return
}
// Mark as stopping - this will also essentially stop the consumer loop
this.isStopping = true
// Allow the in progress consumer loop to finish if possible
if (this.consumerLoop) {
await this.consumerLoop.catch((error) => {
logger.error('🔁', 'failed to stop consumer loop safely. Continuing shutdown', {
error: String(error),
config: this.config,
consumerConfig: this.consumerConfig,
})
})
}
await this.disconnectConsumer()
}
private async disconnectConsumer() {
if (this.rdKafkaConsumer.isConnected()) {
logger.info('📝', 'Disconnecting consumer...')
await new Promise<void>((res, rej) => this.rdKafkaConsumer.disconnect((e) => (e ? rej(e) : res())))
logger.info('📝', 'Disconnected consumer!')
}
}
}
export const consumedBatchDuration = new Histogram({
name: 'consumed_batch_duration_ms',
help: 'Main loop consumer batch processing duration in ms',
labelNames: ['topic', 'groupId'],
})
export const consumerBatchSize = new Histogram({
name: 'consumed_batch_size',
help: 'Size of the batch fetched by the consumer',
labelNames: ['topic', 'groupId'],
buckets: exponentialBuckets(1, 3, 5),
})
export const consumedMessageSizeBytes = new Histogram({
name: 'consumed_message_size_bytes',
help: 'Size of consumed message value in bytes',
labelNames: ['topic', 'groupId', 'messageType'],
buckets: exponentialBuckets(1, 8, 4).map((bucket) => bucket * 1024),
})
export const kafkaAbsolutePartitionCount = new Gauge({
name: 'kafka_absolute_partition_count',
help: 'Number of partitions assigned to this consumer. (Absolute value from the consumer state.)',
labelNames: ['topic'],
})
export const gaugeBatchUtilization = new Gauge({
name: 'consumer_batch_utilization',
help: 'Indicates how large the batches we are processing are compared to the max batch size. Useful as a scaling metric',
labelNames: ['groupId'],
})
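Taken together, the KafkaConsumer class above replaces startBatchConsumer with a much smaller surface. A minimal usage sketch (not part of the diff; the topic and group id are made up, and anything beyond the typed KafkaConsumerConfig would come from env vars via getConsumerConfigFromEnv() or the optional rdKafkaConfig argument):

    const consumer = new KafkaConsumer({
        topic: 'example_topic', // hypothetical
        groupId: 'example-consumer-group', // hypothetical
        callEachBatchWhenEmpty: false,
        autoCommit: true,
        autoOffsetStore: true, // offsets are stored for you after each successful batch
    })

    // connect() subscribes and starts the internal consume loop, calling the
    // callback once per fetched batch.
    await consumer.connect(async (messages) => {
        for (const message of messages) {
            // ... process message.value ...
        }
    })

    // isHealthy() combines broker connectivity with heartbeat freshness, so it can
    // back a readiness/liveness probe, e.g. healthcheck: () => consumer.isHealthy()

    // On shutdown, stop the loop and disconnect from the broker.
    await consumer.disconnect()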

View File

@@ -1,6 +1,4 @@
import * as Sentry from '@sentry/node'
import { Consumer } from 'kafkajs'
import { KafkaConsumer } from 'node-rdkafka'
import {
kafkaConsumerEventCounter,
@@ -30,69 +28,3 @@ export function addMetricsEventListeners(consumer: Consumer): void {
kafkaConsumerEventRequestPendingMsSummary.observe(payload.pendingDuration)
})
}
export function addSentryBreadcrumbsEventListeners(consumer: KafkaConsumer): void {
/** these events are a string literal union and they're not exported, so we can't enumerate them
* type KafkaClientEvents = 'disconnected' | 'ready' | 'connection.failure' | 'event.error' | 'event.stats' | 'event.log' | 'event.event' | 'event.throttle';
* type KafkaConsumerEvents = 'data' | 'partition.eof' | 'rebalance' | 'rebalance.error' | 'subscribed' | 'unsubscribed' | 'unsubscribe' | 'offset.commit' | KafkaClientEvents;
*
* some of them happen very frequently, so we don't want to capture them as breadcrumbs,
* and the way the library is written, if we listen to individual events we get typed args we can capture
* with the breadcrumb
*/
consumer.on('disconnected', (metrics) => {
Sentry.addBreadcrumb({
category: 'kafka_lifecycle',
message: 'disconnected',
level: 'info',
data: {
metrics,
},
})
})
consumer.on('connection.failure', (error) => {
Sentry.addBreadcrumb({
category: 'kafka_lifecycle',
message: 'connection.failure',
level: 'info',
data: {
error,
},
})
})
consumer.on('event.throttle', (eventData) => {
Sentry.addBreadcrumb({
category: 'kafka_lifecycle',
message: 'event.throttle',
level: 'info',
data: {
eventData,
},
})
})
consumer.on('rebalance', (error) => {
Sentry.addBreadcrumb({
category: 'kafka_lifecycle',
message: 'rebalance',
level: 'info',
data: {
error,
},
})
})
consumer.on('rebalance.error', (error) => {
Sentry.addBreadcrumb({
category: 'kafka_lifecycle',
message: 'rebalance.error',
level: 'info',
data: {
error,
},
})
})
}

View File

@@ -1,71 +0,0 @@
import { BatchConsumer, startBatchConsumer } from '../../../kafka/batch-consumer'
import { createRdConnectionConfigFromEnvVars } from '../../../kafka/config'
import { PluginsServerConfig } from '../../../types'
import { addSentryBreadcrumbsEventListeners } from '../kafka-metrics'
import { KAFKA_CONSUMER_SESSION_TIMEOUT_MS } from './constants'
import { EachBatchHandler } from './types'
export interface BatchConsumerFactory {
createBatchConsumer(groupId: string, topic: string, eachBatch: EachBatchHandler): Promise<BatchConsumer>
}
export class DefaultBatchConsumerFactory implements BatchConsumerFactory {
private readonly kafkaConfig: PluginsServerConfig
constructor(private readonly serverConfig: PluginsServerConfig) {
// TRICKY: We re-use the kafka helpers which assume KAFKA_HOSTS hence we overwrite it if set
this.kafkaConfig = {
...serverConfig,
KAFKA_HOSTS: serverConfig.SESSION_RECORDING_KAFKA_HOSTS || serverConfig.KAFKA_HOSTS,
KAFKA_SECURITY_PROTOCOL:
serverConfig.SESSION_RECORDING_KAFKA_SECURITY_PROTOCOL || serverConfig.KAFKA_SECURITY_PROTOCOL,
}
}
public async createBatchConsumer(
groupId: string,
topic: string,
eachBatch: EachBatchHandler
): Promise<BatchConsumer> {
const connectionConfig = createRdConnectionConfigFromEnvVars(this.kafkaConfig, 'consumer')
// Create a node-rdkafka consumer that fetches batches of messages, runs
// eachBatch with context, then commits offsets for the batch.
// the batch consumer reads from the session replay kafka cluster
const consumer = await startBatchConsumer({
connectionConfig,
groupId,
topic,
eachBatch,
callEachBatchWhenEmpty: true, // Required, as we want to flush session batches periodically
autoCommit: true,
autoOffsetStore: false,
sessionTimeout: KAFKA_CONSUMER_SESSION_TIMEOUT_MS,
maxPollIntervalMs: this.serverConfig.KAFKA_CONSUMPTION_MAX_POLL_INTERVAL_MS,
// the largest size of a message that can be fetched by the consumer.
// the largest size our MSK cluster allows is 20MB
// we only use 9 or 10MB but there's no reason to limit this 🤷️
consumerMaxBytes: this.serverConfig.KAFKA_CONSUMPTION_MAX_BYTES,
consumerMaxBytesPerPartition: this.serverConfig.KAFKA_CONSUMPTION_MAX_BYTES_PER_PARTITION,
fetchMinBytes: this.serverConfig.SESSION_RECORDING_KAFKA_FETCH_MIN_BYTES,
// our messages are very big, so we don't want to queue too many
queuedMinMessages: this.serverConfig.SESSION_RECORDING_KAFKA_QUEUE_SIZE,
// we'll never queue more than the value set here anyway
// since we have large messages we'll need this to be a reasonable multiple
// of the likely message size times the fetchBatchSize
// or we'll always hit the batch timeout
queuedMaxMessagesKBytes: this.serverConfig.SESSION_RECORDING_KAFKA_QUEUE_SIZE_KB,
fetchBatchSize: this.serverConfig.SESSION_RECORDING_KAFKA_BATCH_SIZE,
consumerMaxWaitMs: this.serverConfig.KAFKA_CONSUMPTION_MAX_WAIT_MS,
consumerErrorBackoffMs: this.serverConfig.KAFKA_CONSUMPTION_ERROR_BACKOFF_MS,
batchingTimeoutMs: this.serverConfig.KAFKA_CONSUMPTION_BATCHING_TIMEOUT_MS,
topicMetadataRefreshInterval: this.serverConfig.KAFKA_TOPIC_METADATA_REFRESH_INTERVAL_MS,
debug: this.serverConfig.SESSION_RECORDING_KAFKA_DEBUG,
kafkaStatisticIntervalMs:
this.serverConfig.SESSION_RECORDING_KAFKA_CONSUMPTION_STATISTICS_EVENT_INTERVAL_MS,
maxHealthHeartbeatIntervalMs: KAFKA_CONSUMER_SESSION_TIMEOUT_MS * 2,
})
addSentryBreadcrumbsEventListeners(consumer.consumer)
return consumer
}
}
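With this factory gone, the session replay consumer no longer passes these knobs explicitly: the new KafkaConsumer ships defaults, reads overrides from the environment via getConsumerConfigFromEnv(), and accepts an optional rdKafkaConfig argument for anything else. A sketch of what an explicit override could look like (the choice of keys and values here is an assumption that mirrors a couple of the settings tuned above, not what the new code actually does; the new ingester passes no second argument):

    const consumer = new KafkaConsumer(
        {
            topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
            groupId: KAFKA_CONSUMER_GROUP_ID,
            callEachBatchWhenEmpty: true, // still needed to flush session batches periodically
            autoCommit: true,
            autoOffsetStore: false, // offsets are stored manually, as before
        },
        {
            // librdkafka keys matching what the old factory tuned explicitly
            'queued.min.messages': 100_000,
            'queued.max.messages.kbytes': 102_400,
        }
    )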

View File

@@ -1,26 +1,16 @@
import { S3Client, S3ClientConfig } from '@aws-sdk/client-s3'
import {
CODES,
features,
KafkaConsumer,
librdkafkaVersion,
Message,
TopicPartition,
TopicPartitionOffset,
} from 'node-rdkafka'
import { CODES, features, librdkafkaVersion, Message, TopicPartition, TopicPartitionOffset } from 'node-rdkafka'
import { KafkaProducerWrapper } from '~/src/kafka/producer'
import { PostgresRouter } from '~/src/utils/db/postgres'
import { buildIntegerMatcher } from '../../../config/config'
import { BatchConsumer } from '../../../kafka/batch-consumer'
import { KafkaConsumer } from '../../../kafka/consumer'
import { PluginServerService, PluginsServerConfig, ValueMatcher } from '../../../types'
import { logger as logger } from '../../../utils/logger'
import { captureException } from '../../../utils/posthog'
import { captureIngestionWarning } from '../../../worker/ingestion/utils'
import { runInstrumentedFunction } from '../../utils'
import { addSentryBreadcrumbsEventListeners } from '../kafka-metrics'
import { BatchConsumerFactory } from './batch-consumer-factory'
import {
KAFKA_CONSUMER_GROUP_ID,
KAFKA_CONSUMER_GROUP_ID_OVERFLOW,
@@ -42,14 +32,13 @@ import { TeamFilter } from './teams/team-filter'
import { TeamService } from './teams/team-service'
import { MessageWithTeam } from './teams/types'
import { CaptureIngestionWarningFn } from './types'
import { getPartitionsForTopic } from './utils'
import { LibVersionMonitor } from './versions/lib-version-monitor'
// Must require as `tsc` strips unused `import` statements and just requiring this seems to init some globals
require('@sentry/tracing')
export class SessionRecordingIngester {
batchConsumer?: BatchConsumer
kafkaConsumer: KafkaConsumer
topic: string
consumerGroupId: string
totalNumPartitions = 0
@@ -57,7 +46,6 @@ export class SessionRecordingIngester {
private isDebugLoggingEnabled: ValueMatcher<number>
private readonly promiseScheduler: PromiseScheduler
private readonly batchConsumerFactory: BatchConsumerFactory
private readonly sessionBatchManager: SessionBatchManager
private readonly kafkaParser: KafkaMessageParser
private readonly teamFilter: TeamFilter
@@ -68,19 +56,25 @@ export class SessionRecordingIngester {
private config: PluginsServerConfig,
private consumeOverflow: boolean,
postgres: PostgresRouter,
batchConsumerFactory: BatchConsumerFactory,
producer: KafkaProducerWrapper,
ingestionWarningProducer?: KafkaProducerWrapper
) {
this.topic = consumeOverflow
? KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW
: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS
this.batchConsumerFactory = batchConsumerFactory
this.consumerGroupId = this.consumeOverflow ? KAFKA_CONSUMER_GROUP_ID_OVERFLOW : KAFKA_CONSUMER_GROUP_ID
this.isDebugLoggingEnabled = buildIntegerMatcher(config.SESSION_RECORDING_DEBUG_PARTITION, true)
this.promiseScheduler = new PromiseScheduler()
this.kafkaConsumer = new KafkaConsumer({
topic: this.topic,
groupId: this.consumerGroupId,
callEachBatchWhenEmpty: true,
autoCommit: true,
autoOffsetStore: false,
})
let s3Client: S3Client | null = null
if (
config.SESSION_RECORDING_V2_S3_ENDPOINT &&
@@ -137,8 +131,6 @@ export class SessionRecordingIngester {
metadataStore,
consoleLogStore,
})
this.consumerGroupId = this.consumeOverflow ? KAFKA_CONSUMER_GROUP_ID_OVERFLOW : KAFKA_CONSUMER_GROUP_ID
}
public get service(): PluginServerService {
@@ -146,12 +138,11 @@ export class SessionRecordingIngester {
id: 'session-recordings-blob-v2-overflow',
onShutdown: async () => await this.stop(),
healthcheck: () => this.isHealthy() ?? false,
batchConsumer: this.batchConsumer,
}
}
public async handleEachBatch(messages: Message[], context: { heartbeat: () => void }): Promise<void> {
context.heartbeat()
public async handleEachBatch(messages: Message[]): Promise<void> {
this.kafkaConsumer.heartbeat()
if (messages.length > 0) {
logger.info('🔁', `blob_ingester_consumer_v2 - handling batch`, {
@@ -164,11 +155,11 @@ export class SessionRecordingIngester {
await runInstrumentedFunction({
statsKey: `recordingingesterv2.handleEachBatch`,
sendTimeoutGuardToSentry: false,
func: async () => this.processBatchMessages(messages, context),
func: async () => this.processBatchMessages(messages),
})
}
private async processBatchMessages(messages: Message[], context: { heartbeat: () => void }): Promise<void> {
private async processBatchMessages(messages: Message[]): Promise<void> {
messages.forEach((message) => {
SessionRecordingIngesterMetrics.incrementMessageReceived(message.partition)
})
@@ -190,14 +181,14 @@ export class SessionRecordingIngester {
},
})
context.heartbeat()
this.kafkaConsumer.heartbeat()
await runInstrumentedFunction({
statsKey: `recordingingesterv2.handleEachBatch.processMessages`,
func: async () => this.processMessages(processedMessages),
})
context.heartbeat()
this.kafkaConsumer.heartbeat()
if (this.sessionBatchManager.shouldFlush()) {
await runInstrumentedFunction({
@@ -254,18 +245,11 @@ export class SessionRecordingIngester {
// Check that the storage backend is healthy before starting the consumer
// This is especially important in local dev with minio
await this.fileStorage.checkHealth()
await this.kafkaConsumer.connect((messages) => this.handleEachBatch(messages))
this.batchConsumer = await this.batchConsumerFactory.createBatchConsumer(
this.consumerGroupId,
this.topic,
this.handleEachBatch.bind(this)
)
this.totalNumPartitions = (await this.kafkaConsumer.getPartitionsForTopic(this.topic)).length
this.totalNumPartitions = (await getPartitionsForTopic(this.connectedBatchConsumer, this.topic)).length
addSentryBreadcrumbsEventListeners(this.batchConsumer.consumer)
this.batchConsumer.consumer.on('rebalance', async (err, topicPartitions) => {
this.kafkaConsumer.on('rebalance', async (err, topicPartitions) => {
logger.info('🔁', 'blob_ingester_consumer_v2 - rebalancing', { err, topicPartitions })
/**
* see https://github.com/Blizzard/node-rdkafka#rebalancing
@@ -290,15 +274,8 @@ export class SessionRecordingIngester {
// TODO: immediately die? or just keep going?
})
this.batchConsumer.consumer.on('disconnected', async (err) => {
// since we can't be guaranteed that the consumer will be stopped before some other code calls disconnect
// we need to listen to disconnect and make sure we're stopped
logger.info('🔁', 'blob_ingester_consumer_v2 batch consumer disconnected, cleaning up', { err })
await this.stop()
})
// nothing happens here unless we configure SESSION_RECORDING_KAFKA_CONSUMPTION_STATISTICS_EVENT_INTERVAL_MS
this.batchConsumer.consumer.on('event.stats', (stats) => {
this.kafkaConsumer.on('event.stats', (stats) => {
logger.info('🪵', 'blob_ingester_consumer_v2 - kafka stats', { stats })
})
}
@@ -308,7 +285,7 @@ export class SessionRecordingIngester {
this.isStopping = true
const assignedPartitions = this.assignedTopicPartitions
await this.batchConsumer?.stop()
await this.kafkaConsumer.disconnect()
void this.promiseScheduler.schedule(this.onRevokePartitions(assignedPartitions))
@@ -321,17 +298,11 @@ export class SessionRecordingIngester {
public isHealthy(): boolean {
// TODO: Maybe extend this to check if we are shutting down so we don't get killed early.
return this.batchConsumer?.isHealthy() ?? false
}
private get connectedBatchConsumer(): KafkaConsumer | undefined {
// Helper to only use the batch consumer if we are actually connected to it - otherwise it will throw errors
const consumer = this.batchConsumer?.consumer
return consumer && consumer.isConnected() ? consumer : undefined
return this.kafkaConsumer.isHealthy()
}
private get assignedTopicPartitions(): TopicPartition[] {
return this.connectedBatchConsumer?.assignments() ?? []
return this.kafkaConsumer.assignments() ?? []
}
private get assignedPartitions(): TopicPartition['partition'][] {
@@ -358,7 +329,7 @@ export class SessionRecordingIngester {
await runInstrumentedFunction({
statsKey: `recordingingesterv2.handleEachBatch.flush.commitOffsets`,
func: async () => {
this.batchConsumer!.consumer.offsetsStore(offsets)
this.kafkaConsumer.offsetsStore(offsets)
return Promise.resolve()
},
})
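
A note on the offset handling above: both the v1 and v2 ingesters run with autoCommit enabled but autoOffsetStore disabled, so librdkafka's background commit only ever commits offsets the ingester has explicitly stored after flushing. A minimal sketch of that store step, with illustrative names (only the +1 convention and the offsetsStore call are taken from the diff):

import { TopicPartitionOffset } from 'node-rdkafka'

// Minimal sketch: store the offsets we are happy to have committed.
// With autoCommit: true and autoOffsetStore: false, nothing is committed
// until offsetsStore() has been called for a partition.
function storeProcessedOffset(
    consumer: { offsetsStore(offsets: TopicPartitionOffset[]): void },
    topic: string,
    partition: number,
    highestProcessedOffset: number
): void {
    consumer.offsetsStore([
        {
            topic,
            partition,
            // Kafka commits name the next offset to read, hence the +1
            offset: highestProcessedOffset + 1,
        },
    ])
}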

View File

@@ -1,24 +0,0 @@
import { KafkaConsumer, PartitionMetadata } from 'node-rdkafka'
import { logger } from '../../../utils/logger'
import { captureException } from '../../../utils/posthog'
export const getPartitionsForTopic = (
kafkaConsumer: KafkaConsumer | undefined,
topic: string
): Promise<PartitionMetadata[]> => {
return new Promise<PartitionMetadata[]>((resolve, reject) => {
if (!kafkaConsumer) {
return reject('Not connected')
}
kafkaConsumer.getMetadata({ topic }, (err, meta) => {
if (err) {
captureException(err)
logger.error('🔥', 'Failed to get partition metadata', err)
return reject(err)
}
return resolve(meta.topics.find((x) => x.name === topic)?.partitions ?? [])
})
})
}
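
The callback-based helper above goes away because the new wrapper exposes `getPartitionsForTopic` itself (see the call sites earlier in this diff). Its real implementation is not shown in this commit; the following is a hedged sketch of an equivalent promise wrapper over node-rdkafka's `getMetadata`, mirroring the deleted code:

import { KafkaConsumer as RdKafkaConsumer, PartitionMetadata } from 'node-rdkafka'

// Hedged sketch: promisified metadata lookup equivalent to the helper deleted above.
function getPartitionsForTopic(consumer: RdKafkaConsumer, topic: string): Promise<PartitionMetadata[]> {
    return new Promise((resolve, reject) => {
        consumer.getMetadata({ topic }, (err, meta) => {
            if (err) {
                return reject(err)
            }
            // Keep only the partition metadata for the requested topic
            resolve(meta.topics.find((x) => x.name === topic)?.partitions ?? [])
        })
    })
}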

View File

@@ -1,7 +1,7 @@
import crypto from 'crypto'
import { Redis } from 'ioredis'
import { mkdirSync, rmSync } from 'node:fs'
import { CODES, features, KafkaConsumer, librdkafkaVersion, Message, TopicPartition } from 'node-rdkafka'
import { CODES, features, librdkafkaVersion, Message, TopicPartition } from 'node-rdkafka'
import { Counter, Gauge, Histogram, Summary } from 'prom-client'
import { buildIntegerMatcher } from '../../../config/config'
@@ -9,8 +9,7 @@ import {
KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW,
} from '../../../config/kafka-topics'
import { BatchConsumer, startBatchConsumer } from '../../../kafka/batch-consumer'
import { createRdConnectionConfigFromEnvVars } from '../../../kafka/config'
import { KafkaConsumer } from '../../../kafka/consumer'
import { KafkaProducerWrapper } from '../../../kafka/producer'
import { PluginServerService, PluginsServerConfig, RedisPool, TeamId, ValueMatcher } from '../../../types'
import { BackgroundRefresher } from '../../../utils/background-refresher'
@@ -20,7 +19,6 @@ import { logger } from '../../../utils/logger'
import { ObjectStorage } from '../../../utils/object_storage'
import { captureException } from '../../../utils/posthog'
import { runInstrumentedFunction } from '../../utils'
import { addSentryBreadcrumbsEventListeners } from '../kafka-metrics'
import { eventDroppedCounter } from '../metrics'
import { fetchTeamTokensWithRecordings } from '../session-recording-v2/teams/team-service'
import { ConsoleLogsIngester } from './services/console-logs-ingester'
@@ -30,14 +28,7 @@ import { RealtimeManager } from './services/realtime-manager'
import { ReplayEventsIngester } from './services/replay-events-ingester'
import { BUCKETS_KB_WRITTEN, SessionManager } from './services/session-manager'
import { IncomingRecordingMessage } from './types'
import {
allSettledWithConcurrency,
bufferFileDir,
getPartitionsForTopic,
now,
parseKafkaBatch,
queryWatermarkOffsets,
} from './utils'
import { allSettledWithConcurrency, bufferFileDir, now, parseKafkaBatch } from './utils'
// Must require as `tsc` strips unused `import` statements and just requiring this seems to init some globals
require('@sentry/tracing')
@@ -45,7 +36,6 @@ require('@sentry/tracing')
// WARNING: Do not change this - it will essentially reset the consumer
const KAFKA_CONSUMER_GROUP_ID = 'session-recordings-blob'
const KAFKA_CONSUMER_GROUP_ID_OVERFLOW = 'session-recordings-blob-overflow'
const KAFKA_CONSUMER_SESSION_TIMEOUT_MS = 90_000
const SHUTDOWN_FLUSH_TIMEOUT_MS = 30000
const CAPTURE_OVERFLOW_REDIS_KEY = '@posthog/capture-overflow/replay'
@@ -138,7 +128,7 @@ export class SessionRecordingIngester {
overflowDetection?: OverflowManager
replayEventsIngester?: ReplayEventsIngester
consoleLogsIngester?: ConsoleLogsIngester
batchConsumer?: BatchConsumer
kafkaConsumer: KafkaConsumer
partitionMetrics: Record<number, PartitionMetrics> = {}
teamsRefresher: BackgroundRefresher<Record<string, TeamIDWithConfig>>
latestOffsetsRefresher: BackgroundRefresher<Record<number, number | undefined>>
@@ -148,23 +138,9 @@ export class SessionRecordingIngester {
isStopping = false
private promises: Set<Promise<any>> = new Set()
// if ingestion is lagging on a single partition it is often hard to identify _why_,
// this allows us to output more information for that partition
private debugPartition: number | undefined = undefined
private sharedClusterProducerWrapper: KafkaProducerWrapper | undefined = undefined
private isDebugLoggingEnabled: ValueMatcher<number>
private sessionRecordingKafkaConfig = (): PluginsServerConfig => {
// TRICKY: We re-use the kafka helpers which assume KAFKA_HOSTS hence we overwrite it if set
return {
...this.config,
KAFKA_HOSTS: this.config.SESSION_RECORDING_KAFKA_HOSTS || this.config.KAFKA_HOSTS,
KAFKA_SECURITY_PROTOCOL:
this.config.SESSION_RECORDING_KAFKA_SECURITY_PROTOCOL || this.config.KAFKA_SECURITY_PROTOCOL,
}
}
constructor(
private config: PluginsServerConfig,
private postgres: PostgresRouter,
@@ -178,6 +154,12 @@ export class SessionRecordingIngester {
? KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW
: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS
this.consumerGroupId = this.consumeOverflow ? KAFKA_CONSUMER_GROUP_ID_OVERFLOW : KAFKA_CONSUMER_GROUP_ID
this.kafkaConsumer = new KafkaConsumer({
groupId: this.consumerGroupId,
topic: this.topic,
autoOffsetStore: false,
callEachBatchWhenEmpty: true,
})
this.redisPool = createRedisPool(this.config, 'session-recording')
this.realtimeManager = new RealtimeManager(this.redisPool, this.config)
@@ -193,12 +175,15 @@ export class SessionRecordingIngester {
)
}
const brokers = this.kafkaConsumer.getConfig()['metadata.broker.list']
if (!brokers) {
throw new Error('No brokers found')
}
// We create a hash of the cluster to use as a unique identifier for the high-water marks
// This enables us to swap clusters without having to worry about resetting the high-water marks
const kafkaClusterIdentifier = crypto
.createHash('md5')
.update(this.sessionRecordingKafkaConfig().KAFKA_HOSTS)
.digest('hex')
const kafkaClusterIdentifier = crypto.createHash('md5').update(brokers).digest('hex')
this.sessionHighWaterMarker = new OffsetHighWaterMarker(
this.redisPool,
@@ -224,12 +209,16 @@ export class SessionRecordingIngester {
this.latestOffsetsRefresher = new BackgroundRefresher(async () => {
const results = await Promise.all(
this.assignedTopicPartitions.map(({ partition }) =>
queryWatermarkOffsets(this.connectedBatchConsumer, this.topic, partition).catch((err) => {
// NOTE: This can error due to a timeout or the consumer being disconnected, not stop the process
// as it is currently only used for reporting lag.
captureException(err)
return [undefined, undefined]
})
this.kafkaConsumer
.queryWatermarkOffsets(this.topic, partition)
.catch(() => {
// NOTE: This can error due to a timeout or the consumer being disconnected, not stop the process
// as it is currently only used for reporting lag.
return [undefined, undefined]
})
.then(([_, highOffset]) => {
return [partition, highOffset]
})
)
)
@@ -247,18 +236,11 @@ export class SessionRecordingIngester {
id: 'session-recordings-blob-overflow',
onShutdown: async () => await this.stop(),
healthcheck: () => this.isHealthy() ?? false,
batchConsumer: this.batchConsumer,
}
}
private get connectedBatchConsumer(): KafkaConsumer | undefined {
// Helper to only use the batch consumer if we are actually connected to it - otherwise it will throw errors
const consumer = this.batchConsumer?.consumer
return consumer && consumer.isConnected() ? consumer : undefined
}
private get assignedTopicPartitions(): TopicPartition[] {
return this.connectedBatchConsumer?.assignments() ?? []
return this.kafkaConsumer.assignments()
}
private get assignedPartitions(): TopicPartition['partition'][] {
@@ -349,8 +331,8 @@ export class SessionRecordingIngester {
])
}
public async handleEachBatch(messages: Message[], heartbeat: () => void): Promise<void> {
heartbeat()
public async handleEachBatch(messages: Message[]): Promise<void> {
this.kafkaConsumer.heartbeat()
if (messages.length !== 0) {
logger.info('🔁', `blob_ingester_consumer - handling batch`, {
@@ -393,7 +375,7 @@ export class SessionRecordingIngester {
}
},
})
heartbeat()
this.kafkaConsumer.heartbeat()
await this.reportPartitionMetrics()
@@ -413,7 +395,7 @@ export class SessionRecordingIngester {
await runInstrumentedFunction({
statsKey: `recordingingester.handleEachBatch.flushAllReadySessions`,
func: async () => {
await this.flushAllReadySessions(heartbeat)
await this.flushAllReadySessions()
},
})
@@ -431,7 +413,7 @@ export class SessionRecordingIngester {
await this.replayEventsIngester!.consumeBatch(recordingMessages)
},
})
heartbeat()
this.kafkaConsumer.heartbeat()
}
if (this.consoleLogsIngester) {
@@ -441,7 +423,7 @@ export class SessionRecordingIngester {
await this.consoleLogsIngester!.consumeBatch(recordingMessages)
},
})
heartbeat()
this.kafkaConsumer.heartbeat()
}
},
})
@@ -489,56 +471,24 @@ export class SessionRecordingIngester {
)
}
// Create a node-rdkafka consumer that fetches batches of messages, runs
// eachBatchWithContext, then commits offsets for the batch.
// the batch consumer reads from the session replay kafka cluster
const replayClusterConnectionConfig = createRdConnectionConfigFromEnvVars(
// TODO: Replace this with the new ENV vars for producer specific config when ready.
this.sessionRecordingKafkaConfig(),
'consumer'
)
this.batchConsumer = await startBatchConsumer({
connectionConfig: replayClusterConnectionConfig,
groupId: this.consumerGroupId,
topic: this.topic,
autoCommit: true,
autoOffsetStore: false, // We will use our own offset store logic
sessionTimeout: KAFKA_CONSUMER_SESSION_TIMEOUT_MS,
maxPollIntervalMs: this.config.KAFKA_CONSUMPTION_MAX_POLL_INTERVAL_MS,
// the largest size of a message that can be fetched by the consumer.
// the largest size our MSK cluster allows is 20MB
// we only use 9 or 10MB but there's no reason to limit this 🤷️
consumerMaxBytes: this.config.KAFKA_CONSUMPTION_MAX_BYTES,
consumerMaxBytesPerPartition: this.config.KAFKA_CONSUMPTION_MAX_BYTES_PER_PARTITION,
fetchMinBytes: this.config.SESSION_RECORDING_KAFKA_FETCH_MIN_BYTES,
// our messages are very big, so we don't want to queue too many
queuedMinMessages: this.config.SESSION_RECORDING_KAFKA_QUEUE_SIZE,
// we'll anyway never queue more than the value set here
// since we have large messages we'll need this to be a reasonable multiple
// of the likely message size times the fetchBatchSize
// or we'll always hit the batch timeout
queuedMaxMessagesKBytes: this.config.SESSION_RECORDING_KAFKA_QUEUE_SIZE_KB,
fetchBatchSize: this.config.SESSION_RECORDING_KAFKA_BATCH_SIZE,
consumerMaxWaitMs: this.config.KAFKA_CONSUMPTION_MAX_WAIT_MS,
consumerErrorBackoffMs: this.config.KAFKA_CONSUMPTION_ERROR_BACKOFF_MS,
batchingTimeoutMs: this.config.KAFKA_CONSUMPTION_BATCHING_TIMEOUT_MS,
topicMetadataRefreshInterval: this.config.KAFKA_TOPIC_METADATA_REFRESH_INTERVAL_MS,
eachBatch: async (messages, { heartbeat }) => {
return await this.scheduleWork(this.handleEachBatch(messages, heartbeat))
},
callEachBatchWhenEmpty: true, // Useful as we will still want to account for flushing sessions
debug: this.config.SESSION_RECORDING_KAFKA_DEBUG,
kafkaStatisticIntervalMs: this.config.SESSION_RECORDING_KAFKA_CONSUMPTION_STATISTICS_EVENT_INTERVAL_MS,
maxHealthHeartbeatIntervalMs: KAFKA_CONSUMER_SESSION_TIMEOUT_MS * 2, // we don't want to proactively declare healthy - we'll let the broker do it
await this.kafkaConsumer.connect(async (messages) => {
return await runInstrumentedFunction({
statsKey: `recordingingester.handleEachBatch`,
sendTimeoutGuardToSentry: false,
func: async () => {
return await this.scheduleWork(this.handleEachBatch(messages))
},
})
})
this.totalNumPartitions = (await getPartitionsForTopic(this.connectedBatchConsumer, this.topic)).length
this.totalNumPartitions = (await this.kafkaConsumer.getPartitionsForTopic(this.topic)).length
addSentryBreadcrumbsEventListeners(this.batchConsumer.consumer)
this.batchConsumer.consumer.on('rebalance', async (err, topicPartitions) => {
logger.info('🔁', 'blob_ingester_consumer - rebalancing', { err, topicPartitions })
this.kafkaConsumer.on('rebalance', async (err, topicPartitions) => {
logger.info('🔁', 'blob_ingester_consumer - rebalancing', {
err,
topicPartitions,
connected: this.kafkaConsumer.isHealthy(),
})
/**
* see https://github.com/Blizzard/node-rdkafka#rebalancing
*
@@ -562,15 +512,8 @@ export class SessionRecordingIngester {
// TODO: immediately die? or just keep going?
})
this.batchConsumer.consumer.on('disconnected', async (err) => {
// since we can't be guaranteed that the consumer will be stopped before some other code calls disconnect
// we need to listen to disconnect and make sure we're stopped
logger.info('🔁', 'blob_ingester_consumer batch consumer disconnected, cleaning up', { err })
await this.stop()
})
// nothing happens here unless we configure SESSION_RECORDING_KAFKA_CONSUMPTION_STATISTICS_EVENT_INTERVAL_MS
this.batchConsumer.consumer.on('event.stats', (stats) => {
this.kafkaConsumer.on('event.stats', (stats) => {
logger.info('🪵', 'blob_ingester_consumer - kafka stats', { stats })
})
}
@@ -582,7 +525,9 @@ export class SessionRecordingIngester {
// NOTE: We have to get the partitions before we stop the consumer as it throws if disconnected
const assignedPartitions = this.assignedTopicPartitions
// Mark as stopping so that we don't actually process any more incoming messages, but still keep the process alive
await this.batchConsumer?.stop()
logger.info('🔁', 'kafka consumer disconnecting')
await this.kafkaConsumer.disconnect()
logger.info('🔁', 'kafka consumer disconnected')
// Simulate a revoke command to try and flush all sessions
// There is a race between the revoke callback and this function - Either way one of them gets there and covers the revocations
@@ -606,7 +551,7 @@ export class SessionRecordingIngester {
public isHealthy() {
// TODO: Maybe extend this to check if we are shutting down so we don't get killed early.
return this.batchConsumer?.isHealthy()
return this.kafkaConsumer.isHealthy()
}
private async reportPartitionMetrics() {
@@ -719,7 +664,7 @@ export class SessionRecordingIngester {
})
}
async flushAllReadySessions(heartbeat: () => void): Promise<void> {
async flushAllReadySessions(): Promise<void> {
const sessions = Object.entries(this.sessions)
// NOTE: We want to avoid flushing too many sessions at once as it can cause a lot of disk backpressure stalling the consumer
@@ -727,7 +672,7 @@ export class SessionRecordingIngester {
this.config.SESSION_RECORDING_MAX_PARALLEL_FLUSHES,
sessions,
async ([key, sessionManager], ctx) => {
heartbeat()
this.kafkaConsumer.heartbeat()
if (this.isStopping) {
// We can end up with a large number of flushes. We want to stop early if we hit shutdown
@@ -850,7 +795,7 @@ export class SessionRecordingIngester {
return
}
const result = this.connectedBatchConsumer?.offsetsStore([
const result = this.kafkaConsumer.offsetsStore([
{
...tp,
offset: highestOffsetToCommit + 1,
@@ -861,7 +806,6 @@ export class SessionRecordingIngester {
...tp,
highestOffsetToCommit,
result,
consumerExists: !!this.connectedBatchConsumer,
})
// Store the committed offset to the persistent store to avoid rebalance issues
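
The latestOffsetsRefresher change above swaps the callback helper for the wrapper's promise-based queryWatermarkOffsets, and deliberately swallows errors because the result only feeds lag reporting. A condensed sketch of that refresh step with the class plumbing stripped away (function and parameter names here are illustrative):

// Condensed sketch of the per-partition high-watermark refresh shown above.
async function refreshLatestOffsets(
    consumer: {
        assignments(): { partition: number }[]
        queryWatermarkOffsets(topic: string, partition: number): Promise<[number, number]>
    },
    topic: string
): Promise<Record<number, number | undefined>> {
    const latest: Record<number, number | undefined> = {}
    await Promise.all(
        consumer.assignments().map(async ({ partition }) => {
            try {
                const [, highOffset] = await consumer.queryWatermarkOffsets(topic, partition)
                latest[partition] = highOffset
            } catch {
                // Only used for lag reporting, so a failed query must not kill the process
                latest[partition] = undefined
            }
        })
    )
    return latest
}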

View File

@@ -1,5 +1,5 @@
import { DateTime } from 'luxon'
import { KafkaConsumer, Message, MessageHeader, PartitionMetadata } from 'node-rdkafka'
import { Message, MessageHeader } from 'node-rdkafka'
import path from 'path'
import { Counter } from 'prom-client'
@@ -7,7 +7,6 @@ import { KafkaProducerWrapper } from '../../../kafka/producer'
import { PipelineEvent, RawEventMessage, RRWebEvent } from '../../../types'
import { parseJSON } from '../../../utils/json-parse'
import { logger } from '../../../utils/logger'
import { captureException } from '../../../utils/posthog'
import { captureIngestionWarning } from '../../../worker/ingestion/utils'
import { eventDroppedCounter } from '../metrics'
import { TeamIDWithConfig } from './session-recordings-consumer'
@@ -60,49 +59,6 @@ export const maxDefined = (...args: (number | undefined)[]): number | undefined
export const bufferFileDir = (root: string) => path.join(root, 'session-buffer-files')
export const queryWatermarkOffsets = (
kafkaConsumer: KafkaConsumer | undefined,
topic: string,
partition: number,
timeout = 10000
): Promise<[number, number]> => {
return new Promise<[number, number]>((resolve, reject) => {
if (!kafkaConsumer) {
return reject('Not connected')
}
kafkaConsumer.queryWatermarkOffsets(topic, partition, timeout, (err, offsets) => {
if (err) {
captureException(err)
logger.error('🔥', 'Failed to query kafka watermark offsets', err)
return reject(err)
}
resolve([partition, offsets.highOffset])
})
})
}
export const getPartitionsForTopic = (
kafkaConsumer: KafkaConsumer | undefined,
topic: string
): Promise<PartitionMetadata[]> => {
return new Promise<PartitionMetadata[]>((resolve, reject) => {
if (!kafkaConsumer) {
return reject('Not connected')
}
kafkaConsumer.getMetadata({ topic }, (err, meta) => {
if (err) {
captureException(err)
logger.error('🔥', 'Failed to get partition metadata', err)
return reject(err)
}
return resolve(meta.topics.find((x) => x.name === topic)?.partitions ?? [])
})
})
}
export const getLagMultiplier = (lag: number, threshold = 1000000) => {
if (lag < threshold) {
return 1

View File

@@ -26,7 +26,6 @@ import {
startAsyncWebhooksHandlerConsumer,
} from './main/ingestion-queues/on-event-handler-consumer'
import { SessionRecordingIngester } from './main/ingestion-queues/session-recording/session-recordings-consumer'
import { DefaultBatchConsumerFactory } from './main/ingestion-queues/session-recording-v2/batch-consumer-factory'
import { SessionRecordingIngester as SessionRecordingIngesterV2 } from './main/ingestion-queues/session-recording-v2/consumer'
import { setupCommonRoutes } from './router'
import { Hub, PluginServerService, PluginsServerConfig } from './types'
@@ -168,7 +167,6 @@ export class PluginServer {
id: 'session-recordings-blob',
onShutdown: async () => await ingester.stop(),
healthcheck: () => ingester.isHealthy() ?? false,
batchConsumer: ingester.batchConsumer,
}
})
}
@@ -192,16 +190,9 @@ export class PluginServer {
if (capabilities.sessionRecordingBlobIngestionV2) {
serviceLoaders.push(async () => {
const postgres = hub?.postgres ?? new PostgresRouter(this.config)
const batchConsumerFactory = new DefaultBatchConsumerFactory(this.config)
const producer = hub?.kafkaProducer ?? (await KafkaProducerWrapper.create(this.config))
const ingester = new SessionRecordingIngesterV2(
this.config,
false,
postgres,
batchConsumerFactory,
producer
)
const ingester = new SessionRecordingIngesterV2(this.config, false, postgres, producer)
await ingester.start()
return ingester.service
})
@@ -210,16 +201,9 @@ export class PluginServer {
if (capabilities.sessionRecordingBlobIngestionV2Overflow) {
serviceLoaders.push(async () => {
const postgres = hub?.postgres ?? new PostgresRouter(this.config)
const batchConsumerFactory = new DefaultBatchConsumerFactory(this.config)
const producer = hub?.kafkaProducer ?? (await KafkaProducerWrapper.create(this.config))
const ingester = new SessionRecordingIngesterV2(
this.config,
true,
postgres,
batchConsumerFactory,
producer
)
const ingester = new SessionRecordingIngesterV2(this.config, true, postgres, producer)
await ingester.start()
return ingester.service
})
@@ -295,18 +279,6 @@ export class PluginServer {
})
}
// If join rejects or throws, then the consumer is unhealthy and we should shut down the process.
// Ideally we would also join all the other background tasks as well to ensure we stop the
// server if we hit any errors and don't end up with zombie instances, but I'll leave that
// refactoring for another time. Note that we have the liveness health checks already, so in K8s
// cases zombies should be reaped anyway, albeit not in the most efficient way.
this.services.forEach((service) => {
service.batchConsumer?.join().catch(async (error) => {
logger.error('💥', 'Unexpected task joined!', { error: error.stack ?? error })
await this.stop(error)
})
})
pluginServerStartupTimeMs.inc(Date.now() - startupTimer.valueOf())
logger.info('🚀', `All systems go in ${Date.now() - startupTimer.valueOf()}ms`)
} catch (error) {
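
The block removed above was the only place that watched the batch consumer's join() promise for unexpected termination. With the wrapper owning its own lifecycle, failures are expected to surface through each service's healthcheck and the existing liveness probes instead; a minimal sketch of that shape, assuming only the slimmed-down service fields (this helper is not part of the commit):

// Hypothetical helper, not in this commit: aggregate service healthchecks for a liveness probe.
type SlimPluginServerService = {
    id: string
    onShutdown: () => Promise<any>
    healthcheck: () => boolean | Promise<boolean>
}

async function allServicesHealthy(services: SlimPluginServerService[]): Promise<boolean> {
    const results = await Promise.all(services.map((service) => service.healthcheck()))
    // A failing liveness probe lets the orchestrator restart the pod, replacing the old join() safety net
    return results.every(Boolean)
}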

View File

@@ -21,7 +21,6 @@ import { z } from 'zod'
import { EncryptedFields } from './cdp/encryption-utils'
import { LegacyOneventCompareService } from './cdp/services/legacy-onevent-compare.service'
import type { CookielessManager } from './ingestion/cookieless/cookieless-manager'
import { BatchConsumer } from './kafka/batch-consumer'
import { KafkaProducerWrapper } from './kafka/producer'
import { Celery } from './utils/db/celery'
import { DB } from './utils/db/db'
@@ -91,7 +90,6 @@ export type PluginServerService = {
id: string
onShutdown: () => Promise<any>
healthcheck: () => boolean | Promise<boolean>
batchConsumer?: BatchConsumer
}
export type CdpConfig = {
@@ -292,23 +290,11 @@ export interface PluginsServerConfig extends CdpConfig, IngestionConsumerConfig
SESSION_RECORDING_OVERFLOW_BUCKET_CAPACITY: number
SESSION_RECORDING_OVERFLOW_BUCKET_REPLENISH_RATE: number
SESSION_RECORDING_OVERFLOW_MIN_PER_BATCH: number
// Dedicated infra values
SESSION_RECORDING_KAFKA_HOSTS: string | undefined
SESSION_RECORDING_KAFKA_SECURITY_PROTOCOL: KafkaSecurityProtocol | undefined
SESSION_RECORDING_KAFKA_BATCH_SIZE: number
SESSION_RECORDING_KAFKA_QUEUE_SIZE: number
SESSION_RECORDING_KAFKA_QUEUE_SIZE_KB: number | undefined
SESSION_RECORDING_KAFKA_DEBUG: string | undefined
SESSION_RECORDING_MAX_PARALLEL_FLUSHES: number
SESSION_RECORDING_KAFKA_FETCH_MIN_BYTES: number
POSTHOG_SESSION_RECORDING_REDIS_HOST: string | undefined
POSTHOG_SESSION_RECORDING_REDIS_PORT: number | undefined
// kafka debug stats interval
SESSION_RECORDING_KAFKA_CONSUMPTION_STATISTICS_EVENT_INTERVAL_MS: number
ENCRYPTION_SALT_KEYS: string
CYCLOTRON_DATABASE_URL: string

View File

@@ -1,25 +0,0 @@
import { Assignment } from 'node-rdkafka'
import { countPartitionsPerTopic } from '../../../src/kafka/consumer'
jest.mock('../../../src/utils/logger')
jest.setTimeout(70000) // 70 sec timeout
describe('countPartitionsPerTopic', () => {
it('should correctly count the number of partitions per topic', () => {
const assignments: Assignment[] = [
{ topic: 'topic1', partition: 0 },
{ topic: 'topic1', partition: 1 },
{ topic: 'topic2', partition: 0 },
{ topic: 'topic2', partition: 1 },
{ topic: 'topic2', partition: 2 },
{ topic: 'topic3', partition: 0 },
]
const result = countPartitionsPerTopic(assignments)
expect(result.get('topic1')).toBe(2)
expect(result.get('topic2')).toBe(3)
expect(result.get('topic3')).toBe(1)
expect(result.size).toBe(3)
})
})

View File

@@ -4,6 +4,8 @@ import { mkdirSync, readdirSync, rmSync } from 'node:fs'
import { Message, TopicPartitionOffset } from 'node-rdkafka'
import path from 'path'
import { KafkaConsumer } from '~/src/kafka/consumer'
import { defaultConfig } from '../../../../src/config/config'
import { SessionRecordingIngester } from '../../../../src/main/ingestion-queues/session-recording/session-recordings-consumer'
import { Hub, PluginsServerConfig, Team } from '../../../../src/types'
@@ -25,8 +27,6 @@ const config: PluginsServerConfig = {
SESSION_RECORDING_REDIS_PREFIX,
}
const noop = () => undefined
async function deleteKeys(hub: Hub) {
await deleteKeysWithPrefix(hub.redisPool, SESSION_RECORDING_REDIS_PREFIX)
}
@@ -49,15 +49,6 @@ const waitForExpect = async <T>(fn: () => T | Promise<T>, timeout = 10_000, inte
}
}
const mockConsumer = {
on: jest.fn(),
offsetsStore: jest.fn(),
queryWatermarkOffsets: jest.fn(),
assignments: jest.fn(),
isConnected: jest.fn(() => true),
getMetadata: jest.fn(),
}
// Mock the Upload class
jest.mock('@aws-sdk/lib-storage', () => {
return {
@@ -74,24 +65,11 @@ jest.mock('@aws-sdk/lib-storage', () => {
}
})
jest.mock('../../../../src/kafka/batch-consumer', () => {
return {
startBatchConsumer: jest.fn(() =>
Promise.resolve({
join: () => ({
finally: jest.fn(),
}),
stop: jest.fn(),
consumer: mockConsumer,
})
),
}
})
jest.setTimeout(1000)
describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOverflow) => {
let ingester: SessionRecordingIngester
let mockConsumer: jest.Mocked<KafkaConsumer>
let hub: Hub
let team: Team
@@ -109,21 +87,32 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve
})
beforeEach(async () => {
mockConsumer = {
on: jest.fn(),
offsetsStore: jest.fn(),
queryWatermarkOffsets: jest.fn(),
assignments: jest.fn(),
isHealthy: jest.fn(() => true),
connect: jest.fn(),
disconnect: jest.fn(),
getPartitionsForTopic: jest.fn(() => Promise.resolve([])),
heartbeat: jest.fn(),
} as unknown as jest.Mocked<KafkaConsumer>
// The below mocks simulate committing to kafka and querying the offsets
mockCommittedOffsets = {}
mockOffsets = {}
mockConsumer.offsetsStore.mockImplementation(
(tpo: TopicPartitionOffset) => (mockCommittedOffsets[tpo.partition] = tpo.offset)
)
mockConsumer.queryWatermarkOffsets.mockImplementation((_topic, partition, _timeout, cb) => {
cb(null, { highOffset: mockOffsets[partition] ?? 1, lowOffset: 0 })
mockConsumer.offsetsStore.mockImplementation((tpo: TopicPartitionOffset[]) => {
tpo.forEach((tpo) => (mockCommittedOffsets[tpo.partition] = tpo.offset))
})
mockConsumer.queryWatermarkOffsets.mockImplementation((_topic, partition, _timeout) => {
return Promise.resolve([mockOffsets[partition] ?? 1, 0])
})
mockConsumer.getMetadata.mockImplementation((options, cb) => {
cb(null, {
topics: [{ name: options.topic, partitions: [{ id: 0 }, { id: 1 }, { id: 2 }] }],
})
mockConsumer.getPartitionsForTopic.mockImplementation(() => {
return Promise.resolve([{ id: 0 } as any, { id: 1 } as any, { id: 2 } as any])
})
hub = await createHub()
team = await getFirstTeam(hub)
teamToken = team.api_token
@@ -132,6 +121,7 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve
await deleteKeys(hub)
ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage!, consumeOverflow, redisConn)
ingester['kafkaConsumer'] = mockConsumer as any
await ingester.start()
mockConsumer.assignments.mockImplementation(() => [createTP(0, consumedTopic), createTP(1, consumedTopic)])
@@ -181,7 +171,7 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve
expect(ingester.sessions['2-sid1-throw']).toBeTruthy()
expect(ingester.sessions['2-sid2']).toBeTruthy()
await expect(() => ingester.flushAllReadySessions(noop)).rejects.toThrow(
await expect(() => ingester.flushAllReadySessions()).rejects.toThrow(
'Failed to flush sessions. With 1 errors out of 2 sessions.'
)
})
@@ -214,24 +204,10 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve
consumeOverflow,
undefined
)
ingester['kafkaConsumer'] = mockConsumer as any
expect(ingester['isDebugLoggingEnabled'](partition)).toEqual(expected)
})
it('can parse absence of debug partition config', () => {
const config = {
KAFKA_HOSTS: 'localhost:9092',
} satisfies Partial<PluginsServerConfig> as PluginsServerConfig
const ingester = new SessionRecordingIngester(
config,
hub.postgres,
hub.objectStorage!,
consumeOverflow,
undefined
)
expect(ingester['debugPartition']).toBeUndefined()
})
it('creates a new session manager if needed', async () => {
const event = createIncomingRecordingMessage()
await ingester.consume(event)
@@ -279,7 +255,7 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve
lastMessageTimestamp: Date.now() + defaultConfig.SESSION_RECORDING_MAX_BUFFER_AGE_SECONDS,
}
await ingester.flushAllReadySessions(noop)
await ingester.flushAllReadySessions()
await waitForExpect(() => {
expect(ingester.sessions[`1-${sessionId}`]).not.toBeTruthy()
@@ -288,7 +264,7 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve
describe('offset committing', () => {
it('should commit offsets in simple cases', async () => {
await ingester.handleEachBatch([createMessage('sid1'), createMessage('sid1')], noop)
await ingester.handleEachBatch([createMessage('sid1'), createMessage('sid1')])
expect(ingester.partitionMetrics[1]).toMatchObject({
lastMessageOffset: 2,
})
@@ -310,7 +286,7 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve
})
it.skip('should commit higher values but not lower', async () => {
await ingester.handleEachBatch([createMessage('sid1')], noop)
await ingester.handleEachBatch([createMessage('sid1')])
await ingester.sessions[`${team.id}-sid1`].flush('buffer_age')
expect(ingester.partitionMetrics[1].lastMessageOffset).toBe(1)
await commitAllOffsets()
@@ -328,7 +304,7 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve
await commitAllOffsets()
expect(mockConsumer.offsetsStore).toHaveBeenCalledTimes(1)
await ingester.handleEachBatch([createMessage('sid1')], noop)
await ingester.handleEachBatch([createMessage('sid1')])
await ingester.sessions[`${team.id}-sid1`].flush('buffer_age')
await commitAllOffsets()
@@ -343,10 +319,12 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve
})
it('should commit the lowest known offset if there is a blocking session', async () => {
await ingester.handleEachBatch(
[createMessage('sid1'), createMessage('sid2'), createMessage('sid2'), createMessage('sid2')],
noop
)
await ingester.handleEachBatch([
createMessage('sid1'),
createMessage('sid2'),
createMessage('sid2'),
createMessage('sid2'),
])
await ingester.sessions[`${team.id}-sid2`].flush('buffer_age')
await commitAllOffsets()
@@ -370,10 +348,12 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve
})
it('should commit one lower than the blocking session if that is the highest', async () => {
await ingester.handleEachBatch(
[createMessage('sid1'), createMessage('sid2'), createMessage('sid2'), createMessage('sid2')],
noop
)
await ingester.handleEachBatch([
createMessage('sid1'),
createMessage('sid2'),
createMessage('sid2'),
createMessage('sid2'),
])
// Flush the second session so the first one is still blocking
await ingester.sessions[`${team.id}-sid2`].flush('buffer_age')
await commitAllOffsets()
@@ -382,7 +362,7 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve
expect(mockConsumer.offsetsStore).not.toHaveBeenCalled()
// Add a new message and session and flush the old one
await ingester.handleEachBatch([createMessage('sid2')], noop)
await ingester.handleEachBatch([createMessage('sid2')])
await ingester.sessions[`${team.id}-sid1`].flush('buffer_age')
await commitAllOffsets()
@@ -397,13 +377,14 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve
})
it.skip('should not be affected by other partitions ', async () => {
await ingester.handleEachBatch(
[createMessage('sid1', 1), createMessage('sid2', 2), createMessage('sid2', 2)],
noop
)
await ingester.handleEachBatch([
createMessage('sid1', 1),
createMessage('sid2', 2),
createMessage('sid2', 2),
])
await ingester.sessions[`${team.id}-sid1`].flush('buffer_age')
await ingester.handleEachBatch([createMessage('sid1', 1)], noop)
await ingester.handleEachBatch([createMessage('sid1', 1)])
// We should now have a blocking session on partition 1 and 2 with partition 1 being committable
await commitAllOffsets()
@@ -444,10 +425,7 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve
ingester.persistentHighWaterMarker.getWaterMarks(createTP(partition, consumedTopic))
it('should update session watermarkers with flushing', async () => {
await ingester.handleEachBatch(
[createMessage('sid1'), createMessage('sid2'), createMessage('sid3')],
noop
)
await ingester.handleEachBatch([createMessage('sid1'), createMessage('sid2'), createMessage('sid3')])
await expect(getSessionWaterMarks()).resolves.toEqual({})
await ingester.sessions[`${team.id}-sid1`].flush('buffer_age')
@@ -458,10 +436,7 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve
})
it('should update partition watermarkers when committing', async () => {
await ingester.handleEachBatch(
[createMessage('sid1'), createMessage('sid2'), createMessage('sid1')],
noop
)
await ingester.handleEachBatch([createMessage('sid1'), createMessage('sid2'), createMessage('sid1')])
await ingester.sessions[`${team.id}-sid1`].flush('buffer_age')
await commitAllOffsets()
expect(mockConsumer.offsetsStore).toHaveBeenCalledTimes(1)
@@ -491,7 +466,7 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve
const events = [createMessage('sid1'), createMessage('sid2'), createMessage('sid2')]
await expect(getPersistentWaterMarks()).resolves.toEqual({})
await ingester.handleEachBatch([events[0], events[1]], noop)
await ingester.handleEachBatch([events[0], events[1]])
await ingester.sessions[`${team.id}-sid2`].flush('buffer_age')
await commitAllOffsets()
expect(mockConsumer.offsetsStore).not.toHaveBeenCalled()
@@ -505,7 +480,7 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve
// Simulate a re-processing
await ingester.destroySessions(Object.entries(ingester.sessions))
await ingester.handleEachBatch(events, noop)
await ingester.handleEachBatch(events)
expect(ingester.sessions[`${team.id}-sid2`].buffer.count).toBe(1)
expect(ingester.sessions[`${team.id}-sid1`].buffer.count).toBe(1)
})
@@ -523,6 +498,7 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve
consumeOverflow,
undefined
)
otherIngester['kafkaConsumer'] = mockConsumer as any
await otherIngester.start()
})
@@ -542,7 +518,7 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve
createTP(2, consumedTopic),
createTP(3, consumedTopic),
])
await ingester.handleEachBatch([...partitionMsgs1, ...partitionMsgs2], noop)
await ingester.handleEachBatch([...partitionMsgs1, ...partitionMsgs2])
expect(
Object.values(ingester.sessions).map((x) => `${x.partition}:${x.sessionId}:${x.buffer.count}`)
@@ -563,7 +539,7 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve
createTP(2, consumedTopic),
createTP(3, consumedTopic),
])
await otherIngester.handleEachBatch([...partitionMsgs2, createMessage('session_id_4', 2)], noop)
await otherIngester.handleEachBatch([...partitionMsgs2, createMessage('session_id_4', 2)])
await Promise.all(rebalancePromises)
// Should still have the partition 1 sessions that didn't move
@@ -578,10 +554,7 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve
})
it("flushes and commits as it's revoked", async () => {
await ingester.handleEachBatch(
[createMessage('sid1'), createMessage('sid2'), createMessage('sid3', 2)],
noop
)
await ingester.handleEachBatch([createMessage('sid1'), createMessage('sid2'), createMessage('sid3', 2)])
expect(readdirSync(config.SESSION_RECORDING_LOCAL_DIRECTORY + '/session-buffer-files')).toEqual([
expect.stringContaining(`${team.id}.sid1.`), // gz
@@ -618,13 +591,10 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve
describe('when a team is disabled', () => {
it('can commit even if an entire batch is disabled', async () => {
// non-zero offset because the code can't commit offset 0
await ingester.handleEachBatch(
[
createKafkaMessage(consumedTopic, 'invalid_token', { offset: 12 }),
createKafkaMessage(consumedTopic, 'invalid_token', { offset: 13 }),
],
noop
)
await ingester.handleEachBatch([
createKafkaMessage(consumedTopic, 'invalid_token', { offset: 12 }),
createKafkaMessage(consumedTopic, 'invalid_token', { offset: 13 }),
])
expect(mockConsumer.offsetsStore).toHaveBeenCalledTimes(1)
expect(mockConsumer.offsetsStore).toHaveBeenCalledWith([
{
@@ -653,7 +623,7 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve
size: size_bytes,
timestamp: first_timestamp + n * timestamp_delta,
})
await ingester.handleEachBatch([message], noop)
await ingester.handleEachBatch([message])
}
}
@@ -711,8 +681,8 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve
describe('lag reporting', () => {
it('should return the latest offsets', async () => {
mockConsumer.queryWatermarkOffsets.mockImplementation((_topic, partition, _timeout, cb) => {
cb(null, { highOffset: 1000 + partition, lowOffset: 0 })
mockConsumer.queryWatermarkOffsets.mockImplementation((_topic, partition, _timeout) => {
return Promise.resolve([0, 1000 + partition])
})
const results = await ingester.latestOffsetsRefresher.get()
@@ -726,13 +696,12 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve
describe('heartbeats', () => {
it('should send them whilst processing', async () => {
const heartbeat = jest.fn()
// non-zero offset because the code can't commit offset 0
const partitionMsgs1 = [createMessage('session_id_1', 1), createMessage('session_id_2', 1)]
await ingester.handleEachBatch(partitionMsgs1, heartbeat)
await ingester.handleEachBatch(partitionMsgs1)
// NOTE: the number here can change as we change the code. What matters is that it is called a number of times
expect(heartbeat).toBeCalledTimes(6)
expect(mockConsumer.heartbeat).toBeCalledTimes(6)
})
})
})
@@ -741,7 +710,7 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve
describe('stop()', () => {
const setup = async (): Promise<void> => {
const partitionMsgs1 = [createMessage('session_id_1', 1), createMessage('session_id_2', 1)]
await ingester.handleEachBatch(partitionMsgs1, noop)
await ingester.handleEachBatch(partitionMsgs1)
}
// NOTE: This test is a sanity check for the follow up test. It demonstrates what happens if we shutdown in the wrong order