From fa90a50dfb48221f66dc396a2790665c3a62bb95 Mon Sep 17 00:00:00 2001 From: Ben White Date: Fri, 31 Jan 2025 16:29:00 +0100 Subject: [PATCH] feat(cdp): Make IngestionConsumer parallel process (#27828) --- .../src/ingestion/ingestion-consumer.ts | 224 +++++++++--------- 1 file changed, 106 insertions(+), 118 deletions(-) diff --git a/plugin-server/src/ingestion/ingestion-consumer.ts b/plugin-server/src/ingestion/ingestion-consumer.ts index 0fbf01db1d..fdf9e52751 100644 --- a/plugin-server/src/ingestion/ingestion-consumer.ts +++ b/plugin-server/src/ingestion/ingestion-consumer.ts @@ -38,8 +38,10 @@ const histogramKafkaBatchSizeKb = new Histogram({ buckets: [0, 128, 512, 1024, 5120, 10240, 20480, 51200, 102400, 204800, Infinity], }) -type GroupedIncomingEvents = { - [key: string]: { message: Message; event: PipelineEvent }[] +type IncomingEvent = { message: Message; event: PipelineEvent } + +type IncomingEventsByDistinctId = { + [key: string]: IncomingEvent[] } const PERSON_EVENTS = new Set(['$set', '$identify', '$create_alias', '$merge_dangerously', '$groupidentify']) @@ -136,9 +138,27 @@ export class IngestionConsumer { return promise } + private runInstrumented(name: string, func: () => Promise): Promise { + return runInstrumentedFunction({ statsKey: `ingestionConsumer.${name}`, func }) + } + public async handleKafkaBatch(messages: Message[]) { - const parsedMessages = await this.parseKafkaBatch(messages) - await this.processBatch(parsedMessages) + const parsedMessages = await this.runInstrumented('parseKafkaMessages', () => this.parseKafkaBatch(messages)) + + await this.runInstrumented('processBatch', async () => { + await Promise.all( + Object.values(parsedMessages).map(async (x) => { + return await this.runInstrumented('processEventsForDistinctId', () => + this.processEventsForDistinctId(x) + ) + }) + ) + }) + + status.debug('🔁', `Waiting for promises`, { promises: this.promises.size }) + await this.runInstrumented('awaitScheduledWork', () => Promise.all(this.promises)) + status.debug('🔁', `Processed batch`) + for (const message of messages) { if (message.timestamp) { latestOffsetTimestampGauge @@ -148,65 +168,57 @@ export class IngestionConsumer { } } - public async processBatch(groupedIncomingEvents: GroupedIncomingEvents): Promise { - await this.runManyWithHeartbeat(Object.values(groupedIncomingEvents), async (eventsForDistinctId) => { - // Process every message sequentially, stash promises to await on later - for (const { message, event } of eventsForDistinctId) { - // Track $set usage in events that aren't known to use it, before ingestion adds anything there - if ( - event.properties && - !PERSON_EVENTS.has(event.event) && - !KNOWN_SET_EVENTS.has(event.event) && - ('$set' in event.properties || '$set_once' in event.properties || '$unset' in event.properties) - ) { - setUsageInNonPersonEventsCounter.inc() - } + private async processEventsForDistinctId(incomingEvents: IncomingEvent[]): Promise { + // Process every message sequentially, stash promises to await on later + for (const { message, event } of incomingEvents) { + // Track $set usage in events that aren't known to use it, before ingestion adds anything there + if ( + event.properties && + !PERSON_EVENTS.has(event.event) && + !KNOWN_SET_EVENTS.has(event.event) && + ('$set' in event.properties || '$set_once' in event.properties || '$unset' in event.properties) + ) { + setUsageInNonPersonEventsCounter.inc() + } - try { - status.debug('🔁', `Processing event`, { + try { + status.debug('🔁', `Processing event`, { + event, + }) + const eventKey = `${event.token}:${event.distinct_id}` + // Check the rate limiter and emit to overflow if necessary + const isBelowRateLimit = this.overflowRateLimiter.consume(eventKey, 1, message.timestamp) + if (this.overflowEnabled() && !isBelowRateLimit) { + status.debug('🔁', `Sending to overflow`, { event, }) - const eventKey = `${event.token}:${event.distinct_id}` - // Check the rate limiter and emit to overflow if necessary - const isBelowRateLimit = this.overflowRateLimiter.consume(eventKey, 1, message.timestamp) - if (this.overflowEnabled() && !isBelowRateLimit) { - status.debug('🔁', `Sending to overflow`, { - event, - }) - ingestionPartitionKeyOverflowed.labels(`${event.team_id ?? event.token}`).inc() - if (this.ingestionWarningLimiter.consume(eventKey, 1)) { - status.warn('🪣', `Local overflow detection triggered on key ${eventKey}`) - } - - void this.scheduleWork(this.emitToOverflow([message])) - continue + ingestionPartitionKeyOverflowed.labels(`${event.team_id ?? event.token}`).inc() + if (this.ingestionWarningLimiter.consume(eventKey, 1)) { + status.warn('🪣', `Local overflow detection triggered on key ${eventKey}`) } - const result = await this.runEventPipeline(event) - - status.debug('🔁', `Processed event`, { - event, - }) - - // This contains the Kafka producer ACKs & message promises, to avoid blocking after every message. - result.ackPromises?.forEach((promise) => { - void this.scheduleWork( - promise.catch(async (error) => { - await this.handleProcessingError(error, message, event) - }) - ) - }) - } catch (error) { - await this.handleProcessingError(error, message, event) + void this.scheduleWork(this.emitToOverflow([message])) + continue } - } - }) - status.debug('🔁', `Waiting for promises`, { - promises: this.promises.size, - }) - await Promise.all(this.promises) - status.debug('🔁', `Processed batch`) + const result = await this.runInstrumented('runEventPipeline', () => this.runEventPipeline(event)) + + status.debug('🔁', `Processed event`, { + event, + }) + + // This contains the Kafka producer ACKs & message promises, to avoid blocking after every message. + result.ackPromises?.forEach((promise) => { + void this.scheduleWork( + promise.catch(async (error) => { + await this.handleProcessingError(error, message, event) + }) + ) + }) + } catch (error) { + await this.handleProcessingError(error, message, event) + } + } } private async runEventPipeline(event: PipelineEvent): Promise { @@ -216,77 +228,53 @@ export class IngestionConsumer { }) } - private parseKafkaBatch(messages: Message[]): Promise { - return runInstrumentedFunction({ - statsKey: `ingestionConsumer.handleEachBatch.parseKafkaMessages`, - func: () => { - const batches: GroupedIncomingEvents = {} + private parseKafkaBatch(messages: Message[]): Promise { + const batches: IncomingEventsByDistinctId = {} - for (const message of messages) { - let distinctId: string | undefined - let token: string | undefined + for (const message of messages) { + let distinctId: string | undefined + let token: string | undefined - // Parse the headers so we can early exit if found and should be dropped - message.headers?.forEach((header) => { - if (header.key === 'distinct_id') { - distinctId = header.value.toString() - } - if (header.key === 'token') { - token = header.value.toString() - } - }) - - if (this.shouldDropEvent(token, distinctId)) { - this.logDroppedEvent(token, distinctId) - continue - } - - // Parse the message payload into the event object - const { data: dataStr, ...rawEvent } = JSON.parse(message.value!.toString()) - const combinedEvent: PipelineEvent = { ...JSON.parse(dataStr), ...rawEvent } - const event: PipelineEvent = normalizeEvent({ - ...combinedEvent, - }) - - // In case the headers were not set we check the parsed message now - if (this.shouldDropEvent(combinedEvent.token, combinedEvent.distinct_id)) { - this.logDroppedEvent(combinedEvent.token, combinedEvent.distinct_id) - continue - } - - const eventKey = `${event.token}:${event.distinct_id}` - - // We collect the events grouped by token and distinct_id so that we can process batches in parallel whilst keeping the order of events - // for a given distinct_id - if (!batches[eventKey]) { - batches[eventKey] = [] - } - - batches[eventKey].push({ message, event }) + // Parse the headers so we can early exit if found and should be dropped + message.headers?.forEach((header) => { + if (header.key === 'distinct_id') { + distinctId = header.value.toString() } + if (header.key === 'token') { + token = header.value.toString() + } + }) - return Promise.resolve(batches) - }, - }) - } + if (this.shouldDropEvent(token, distinctId)) { + this.logDroppedEvent(token, distinctId) + continue + } - private async runWithHeartbeat(func: () => Promise | T): Promise { - // Helper function to ensure that looping over lots of hog functions doesn't block up the thread, killing the consumer - const res = await func() - this.heartbeat() - await new Promise((resolve) => process.nextTick(resolve)) + // Parse the message payload into the event object + const { data: dataStr, ...rawEvent } = JSON.parse(message.value!.toString()) + const combinedEvent: PipelineEvent = { ...JSON.parse(dataStr), ...rawEvent } + const event: PipelineEvent = normalizeEvent({ + ...combinedEvent, + }) - return res - } + // In case the headers were not set we check the parsed message now + if (this.shouldDropEvent(combinedEvent.token, combinedEvent.distinct_id)) { + this.logDroppedEvent(combinedEvent.token, combinedEvent.distinct_id) + continue + } - private async runManyWithHeartbeat(items: T[], func: (item: T) => Promise | R): Promise { - // Helper function to ensure that looping over lots of hog functions doesn't block up the event loop, leading to healthcheck failures - const results = [] + const eventKey = `${event.token}:${event.distinct_id}` - for (const item of items) { - results.push(await this.runWithHeartbeat(() => func(item))) + // We collect the events grouped by token and distinct_id so that we can process batches in parallel whilst keeping the order of events + // for a given distinct_id + if (!batches[eventKey]) { + batches[eventKey] = [] + } + + batches[eventKey].push({ message, event }) } - return results + + return Promise.resolve(batches) } private async startKafkaConsumer(options: {