feat(cdp): Make IngestionConsumer parallel process (#27828)

This commit is contained in:
Ben White
2025-01-31 16:29:00 +01:00
committed by GitHub
parent 48f5a62032
commit fa90a50dfb

View File

@@ -38,8 +38,10 @@ const histogramKafkaBatchSizeKb = new Histogram({
buckets: [0, 128, 512, 1024, 5120, 10240, 20480, 51200, 102400, 204800, Infinity],
})
type GroupedIncomingEvents = {
[key: string]: { message: Message; event: PipelineEvent }[]
type IncomingEvent = { message: Message; event: PipelineEvent }
type IncomingEventsByDistinctId = {
[key: string]: IncomingEvent[]
}
const PERSON_EVENTS = new Set(['$set', '$identify', '$create_alias', '$merge_dangerously', '$groupidentify'])
@@ -136,9 +138,27 @@ export class IngestionConsumer {
return promise
}
private runInstrumented<T>(name: string, func: () => Promise<T>): Promise<T> {
return runInstrumentedFunction<T>({ statsKey: `ingestionConsumer.${name}`, func })
}
public async handleKafkaBatch(messages: Message[]) {
const parsedMessages = await this.parseKafkaBatch(messages)
await this.processBatch(parsedMessages)
const parsedMessages = await this.runInstrumented('parseKafkaMessages', () => this.parseKafkaBatch(messages))
await this.runInstrumented('processBatch', async () => {
await Promise.all(
Object.values(parsedMessages).map(async (x) => {
return await this.runInstrumented('processEventsForDistinctId', () =>
this.processEventsForDistinctId(x)
)
})
)
})
status.debug('🔁', `Waiting for promises`, { promises: this.promises.size })
await this.runInstrumented('awaitScheduledWork', () => Promise.all(this.promises))
status.debug('🔁', `Processed batch`)
for (const message of messages) {
if (message.timestamp) {
latestOffsetTimestampGauge
@@ -148,65 +168,57 @@ export class IngestionConsumer {
}
}
public async processBatch(groupedIncomingEvents: GroupedIncomingEvents): Promise<void> {
await this.runManyWithHeartbeat(Object.values(groupedIncomingEvents), async (eventsForDistinctId) => {
// Process every message sequentially, stash promises to await on later
for (const { message, event } of eventsForDistinctId) {
// Track $set usage in events that aren't known to use it, before ingestion adds anything there
if (
event.properties &&
!PERSON_EVENTS.has(event.event) &&
!KNOWN_SET_EVENTS.has(event.event) &&
('$set' in event.properties || '$set_once' in event.properties || '$unset' in event.properties)
) {
setUsageInNonPersonEventsCounter.inc()
}
private async processEventsForDistinctId(incomingEvents: IncomingEvent[]): Promise<void> {
// Process every message sequentially, stash promises to await on later
for (const { message, event } of incomingEvents) {
// Track $set usage in events that aren't known to use it, before ingestion adds anything there
if (
event.properties &&
!PERSON_EVENTS.has(event.event) &&
!KNOWN_SET_EVENTS.has(event.event) &&
('$set' in event.properties || '$set_once' in event.properties || '$unset' in event.properties)
) {
setUsageInNonPersonEventsCounter.inc()
}
try {
status.debug('🔁', `Processing event`, {
try {
status.debug('🔁', `Processing event`, {
event,
})
const eventKey = `${event.token}:${event.distinct_id}`
// Check the rate limiter and emit to overflow if necessary
const isBelowRateLimit = this.overflowRateLimiter.consume(eventKey, 1, message.timestamp)
if (this.overflowEnabled() && !isBelowRateLimit) {
status.debug('🔁', `Sending to overflow`, {
event,
})
const eventKey = `${event.token}:${event.distinct_id}`
// Check the rate limiter and emit to overflow if necessary
const isBelowRateLimit = this.overflowRateLimiter.consume(eventKey, 1, message.timestamp)
if (this.overflowEnabled() && !isBelowRateLimit) {
status.debug('🔁', `Sending to overflow`, {
event,
})
ingestionPartitionKeyOverflowed.labels(`${event.team_id ?? event.token}`).inc()
if (this.ingestionWarningLimiter.consume(eventKey, 1)) {
status.warn('🪣', `Local overflow detection triggered on key ${eventKey}`)
}
void this.scheduleWork(this.emitToOverflow([message]))
continue
ingestionPartitionKeyOverflowed.labels(`${event.team_id ?? event.token}`).inc()
if (this.ingestionWarningLimiter.consume(eventKey, 1)) {
status.warn('🪣', `Local overflow detection triggered on key ${eventKey}`)
}
const result = await this.runEventPipeline(event)
status.debug('🔁', `Processed event`, {
event,
})
// This contains the Kafka producer ACKs & message promises, to avoid blocking after every message.
result.ackPromises?.forEach((promise) => {
void this.scheduleWork(
promise.catch(async (error) => {
await this.handleProcessingError(error, message, event)
})
)
})
} catch (error) {
await this.handleProcessingError(error, message, event)
void this.scheduleWork(this.emitToOverflow([message]))
continue
}
}
})
status.debug('🔁', `Waiting for promises`, {
promises: this.promises.size,
})
await Promise.all(this.promises)
status.debug('🔁', `Processed batch`)
const result = await this.runInstrumented('runEventPipeline', () => this.runEventPipeline(event))
status.debug('🔁', `Processed event`, {
event,
})
// This contains the Kafka producer ACKs & message promises, to avoid blocking after every message.
result.ackPromises?.forEach((promise) => {
void this.scheduleWork(
promise.catch(async (error) => {
await this.handleProcessingError(error, message, event)
})
)
})
} catch (error) {
await this.handleProcessingError(error, message, event)
}
}
}
private async runEventPipeline(event: PipelineEvent): Promise<EventPipelineResult> {
@@ -216,77 +228,53 @@ export class IngestionConsumer {
})
}
private parseKafkaBatch(messages: Message[]): Promise<GroupedIncomingEvents> {
return runInstrumentedFunction({
statsKey: `ingestionConsumer.handleEachBatch.parseKafkaMessages`,
func: () => {
const batches: GroupedIncomingEvents = {}
private parseKafkaBatch(messages: Message[]): Promise<IncomingEventsByDistinctId> {
const batches: IncomingEventsByDistinctId = {}
for (const message of messages) {
let distinctId: string | undefined
let token: string | undefined
for (const message of messages) {
let distinctId: string | undefined
let token: string | undefined
// Parse the headers so we can early exit if found and should be dropped
message.headers?.forEach((header) => {
if (header.key === 'distinct_id') {
distinctId = header.value.toString()
}
if (header.key === 'token') {
token = header.value.toString()
}
})
if (this.shouldDropEvent(token, distinctId)) {
this.logDroppedEvent(token, distinctId)
continue
}
// Parse the message payload into the event object
const { data: dataStr, ...rawEvent } = JSON.parse(message.value!.toString())
const combinedEvent: PipelineEvent = { ...JSON.parse(dataStr), ...rawEvent }
const event: PipelineEvent = normalizeEvent({
...combinedEvent,
})
// In case the headers were not set we check the parsed message now
if (this.shouldDropEvent(combinedEvent.token, combinedEvent.distinct_id)) {
this.logDroppedEvent(combinedEvent.token, combinedEvent.distinct_id)
continue
}
const eventKey = `${event.token}:${event.distinct_id}`
// We collect the events grouped by token and distinct_id so that we can process batches in parallel whilst keeping the order of events
// for a given distinct_id
if (!batches[eventKey]) {
batches[eventKey] = []
}
batches[eventKey].push({ message, event })
// Parse the headers so we can early exit if found and should be dropped
message.headers?.forEach((header) => {
if (header.key === 'distinct_id') {
distinctId = header.value.toString()
}
if (header.key === 'token') {
token = header.value.toString()
}
})
return Promise.resolve(batches)
},
})
}
if (this.shouldDropEvent(token, distinctId)) {
this.logDroppedEvent(token, distinctId)
continue
}
private async runWithHeartbeat<T>(func: () => Promise<T> | T): Promise<T> {
// Helper function to ensure that looping over lots of hog functions doesn't block up the thread, killing the consumer
const res = await func()
this.heartbeat()
await new Promise((resolve) => process.nextTick(resolve))
// Parse the message payload into the event object
const { data: dataStr, ...rawEvent } = JSON.parse(message.value!.toString())
const combinedEvent: PipelineEvent = { ...JSON.parse(dataStr), ...rawEvent }
const event: PipelineEvent = normalizeEvent({
...combinedEvent,
})
return res
}
// In case the headers were not set we check the parsed message now
if (this.shouldDropEvent(combinedEvent.token, combinedEvent.distinct_id)) {
this.logDroppedEvent(combinedEvent.token, combinedEvent.distinct_id)
continue
}
private async runManyWithHeartbeat<T, R>(items: T[], func: (item: T) => Promise<R> | R): Promise<R[]> {
// Helper function to ensure that looping over lots of hog functions doesn't block up the event loop, leading to healthcheck failures
const results = []
const eventKey = `${event.token}:${event.distinct_id}`
for (const item of items) {
results.push(await this.runWithHeartbeat(() => func(item)))
// We collect the events grouped by token and distinct_id so that we can process batches in parallel whilst keeping the order of events
// for a given distinct_id
if (!batches[eventKey]) {
batches[eventKey] = []
}
batches[eventKey].push({ message, event })
}
return results
return Promise.resolve(batches)
}
private async startKafkaConsumer(options: {