feat(plugin-server): Support preserving distinct ID locality on overflow rerouting (#20945)

Turned off by default for now, to preserve backwards compatibility (a short enablement sketch follows the change summary below).
ted kaemming
2024-04-02 08:56:29 -07:00
committed by GitHub
parent 39050a9176
commit 85ef237459
6 changed files with 48 additions and 41 deletions
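
The gate is a pair of new config flags. A minimal sketch of enabling both, assuming (as the removed env-utils helper further down suggests) that these keys can be supplied through same-named environment variables:

    // Sketch only: enable overflow rerouting and keep Kafka message keys on reroute.
    // The two config keys are added in this commit; the env-var mapping is an assumption.
    process.env.INGESTION_OVERFLOW_ENABLED = 'true'
    process.env.INGESTION_OVERFLOW_PRESERVE_PARTITION_LOCALITY = 'true'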

View File

@@ -74,6 +74,8 @@ export function getDefaultConfig(): PluginsServerConfig {
     TASKS_PER_WORKER: 10,
     INGESTION_CONCURRENCY: 10,
     INGESTION_BATCH_SIZE: 500,
+    INGESTION_OVERFLOW_ENABLED: false,
+    INGESTION_OVERFLOW_PRESERVE_PARTITION_LOCALITY: false,
     PLUGINS_DEFAULT_LOG_LEVEL: isTestEnv() ? PluginLogLevel.Full : PluginLogLevel.Log,
     LOG_LEVEL: isTestEnv() ? LogLevel.Warn : LogLevel.Info,
     SENTRY_DSN: null,

View File

@@ -4,7 +4,6 @@ import { Counter } from 'prom-client'
 import { buildStringMatcher } from '../../config/config'
 import { KAFKA_EVENTS_PLUGIN_INGESTION, prefix as KAFKA_PREFIX } from '../../config/kafka-topics'
 import { Hub } from '../../types'
-import { isIngestionOverflowEnabled } from '../../utils/env-utils'
 import { status } from '../../utils/status'
 import { eachBatchParallelIngestion, IngestionOverflowMode } from './batch-processing/each-batch-ingestion'
 import { IngestionConsumer } from './kafka-queue'
@@ -24,7 +23,7 @@ export const startAnalyticsEventsIngestionConsumer = async ({
         Consumes analytics events from the Kafka topic `events_plugin_ingestion`
         and processes them for ingestion into ClickHouse.

-        Before processing, if isIngestionOverflowEnabled and an event has
+        Before processing, if overflow rerouting is enabled and an event has
         overflowed the capacity for its (team_id, distinct_id) pair, it will not
         be processed here but instead re-produced into the
         `events_plugin_ingestion_overflow` topic for later processing.
@@ -47,7 +46,11 @@ export const startAnalyticsEventsIngestionConsumer = async ({
     // deployment, we require an env variable to be set to confirm this before
     // enabling re-production of events to the OVERFLOW topic.
-    const overflowMode = isIngestionOverflowEnabled() ? IngestionOverflowMode.Reroute : IngestionOverflowMode.Disabled
+    const overflowMode = hub.INGESTION_OVERFLOW_ENABLED
+        ? hub.INGESTION_OVERFLOW_PRESERVE_PARTITION_LOCALITY
+            ? IngestionOverflowMode.Reroute
+            : IngestionOverflowMode.RerouteRandomly
+        : IngestionOverflowMode.Disabled
     const tokenBlockList = buildStringMatcher(hub.DROP_EVENTS_BY_TOKEN, false)
     const batchHandler = async (messages: Message[], queue: IngestionConsumer): Promise<void> => {
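
Unpacked, the nested ternary maps the two flags onto three modes. An equivalent, flattened sketch for readability (not the committed code):

    import { IngestionOverflowMode } from './batch-processing/each-batch-ingestion'

    // Equivalent form of the mode selection above (sketch only).
    function resolveOverflowMode(enabled: boolean, preserveLocality: boolean): IngestionOverflowMode {
        if (!enabled) {
            return IngestionOverflowMode.Disabled // no rerouting at all
        }
        return preserveLocality
            ? IngestionOverflowMode.Reroute // keep message keys: locality preserved
            : IngestionOverflowMode.RerouteRandomly // drop message keys: spread randomly
    }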

View File

@@ -28,7 +28,8 @@ require('@sentry/tracing')
 export enum IngestionOverflowMode {
     Disabled,
-    Reroute,
+    Reroute, // preserves partition locality
+    RerouteRandomly, // discards partition locality
     ConsumeSplitByDistinctId,
     ConsumeSplitEvenly,
 }
@@ -217,7 +218,9 @@ export async function eachBatchParallelIngestion(
                 op: 'emitToOverflow',
                 data: { eventCount: splitBatch.toOverflow.length },
             })
-            processingPromises.push(emitToOverflow(queue, splitBatch.toOverflow))
+            processingPromises.push(
+                emitToOverflow(queue, splitBatch.toOverflow, overflowMode === IngestionOverflowMode.RerouteRandomly)
+            )
             overflowSpan.finish()
         }
@@ -257,14 +260,14 @@ function computeKey(pluginEvent: PipelineEvent): string {
     return `${pluginEvent.team_id ?? pluginEvent.token}:${pluginEvent.distinct_id}`
 }

-async function emitToOverflow(queue: IngestionConsumer, kafkaMessages: Message[]) {
+async function emitToOverflow(queue: IngestionConsumer, kafkaMessages: Message[], useRandomPartitioner: boolean) {
     ingestionOverflowingMessagesTotal.inc(kafkaMessages.length)
     await Promise.all(
         kafkaMessages.map((message) =>
             queue.pluginsServer.kafkaProducer.produce({
                 topic: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW,
                 value: message.value,
-                key: null, // No locality guarantees in overflow
+                key: useRandomPartitioner ? undefined : message.key,
                 headers: message.headers,
                 waitForAck: true,
             })
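
The `key` field is where locality is decided: Kafka partitioners hash a non-empty message key to choose a partition, so passing `message.key` through keeps every event for a given (team_id, distinct_id) pair on one overflow partition, in order, while `undefined` lets the producer scatter messages across partitions. An illustrative sketch of that rule (stand-in hash; real clients use crc32 or murmur2):

    // Illustrative only, not plugin-server code: why a preserved key pins a
    // distinct_id to a single partition while a missing key spreads messages.
    function pickPartition(key: Buffer | null, partitionCount: number): number {
        if (key === null || key.length === 0) {
            return Math.floor(Math.random() * partitionCount) // keyless: any partition
        }
        let hash = 0
        for (const byte of key) {
            hash = (hash * 31 + byte) >>> 0 // stand-in for crc32/murmur2
        }
        return hash % partitionCount // same key always maps to the same partition
    }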
@@ -286,6 +289,9 @@ export function splitIngestionBatch(
         toProcess: [],
         toOverflow: [],
     }
+    const shouldRerouteToOverflow = [IngestionOverflowMode.Reroute, IngestionOverflowMode.RerouteRandomly].includes(
+        overflowMode
+    )

     if (overflowMode === IngestionOverflowMode.ConsumeSplitEvenly) {
         /**
@@ -314,7 +320,7 @@ export function splitIngestionBatch(
     const batches: Map<string, { message: Message; pluginEvent: PipelineEvent }[]> = new Map()
     for (const message of kafkaMessages) {
-        if (overflowMode === IngestionOverflowMode.Reroute && message.key == null) {
+        if (shouldRerouteToOverflow && message.key == null) {
             // Overflow detected by capture, reroute to overflow topic
             // Not applying tokenBlockList to save CPU. TODO: do so once token is in the message headers
             output.toOverflow.push(message)
@@ -334,12 +340,8 @@ export function splitIngestionBatch(
         }

         const eventKey = computeKey(pluginEvent)
-        if (
-            overflowMode === IngestionOverflowMode.Reroute &&
-            !ConfiguredLimiter.consume(eventKey, 1, message.timestamp)
-        ) {
+        if (shouldRerouteToOverflow && !ConfiguredLimiter.consume(eventKey, 1, message.timestamp)) {
             // Local overflow detection triggering, reroute to overflow topic too
-            message.key = null
             ingestionPartitionKeyOverflowed.labels(`${pluginEvent.team_id ?? pluginEvent.token}`).inc()
             if (LoggingLimiter.consume(eventKey, 1)) {
                 status.warn('🪣', `Local overflow detection triggered on key ${eventKey}`)
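
`ConfiguredLimiter.consume(eventKey, 1, message.timestamp)` is a per-key capacity check: a false return means the (team_id, distinct_id) key is over its ingestion budget and the message joins `toOverflow`. A minimal token-bucket sketch with the same consume shape (illustrative; the real limiter and its capacity configuration live elsewhere in the plugin server):

    // Hypothetical token bucket matching the consume(key, tokens, nowMs) call shape.
    class TokenBucketSketch {
        private buckets = new Map<string, { tokens: number; lastMs: number }>()

        constructor(private capacity: number, private refillPerSecond: number) {}

        consume(key: string, tokens: number, nowMs: number): boolean {
            const b = this.buckets.get(key) ?? { tokens: this.capacity, lastMs: nowMs }
            // Refill in proportion to elapsed time, capped at bucket capacity.
            b.tokens = Math.min(this.capacity, b.tokens + ((nowMs - b.lastMs) / 1000) * this.refillPerSecond)
            b.lastMs = nowMs
            this.buckets.set(key, b)
            if (b.tokens < tokens) {
                return false // over capacity: caller reroutes this event to overflow
            }
            b.tokens -= tokens
            return true
        }
    }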

View File

@@ -94,6 +94,8 @@ export interface PluginsServerConfig {
     TASKS_PER_WORKER: number // number of parallel tasks per worker thread
     INGESTION_CONCURRENCY: number // number of parallel event ingestion queues per batch
     INGESTION_BATCH_SIZE: number // kafka consumer batch size
+    INGESTION_OVERFLOW_ENABLED: boolean // whether overflow rerouting is enabled (only used by analytics-ingestion)
+    INGESTION_OVERFLOW_PRESERVE_PARTITION_LOCALITY: boolean // whether Kafka message keys should be preserved when messages are rerouted to overflow
     TASK_TIMEOUT: number // how many seconds until tasks are timed out
     DATABASE_URL: string // Postgres database URL
     DATABASE_READONLY_URL: string // Optional read-only replica to the main Postgres database

View File

@@ -42,11 +42,6 @@ export const isProdEnv = (): boolean => determineNodeEnv() === NodeEnv.Production
 export const isCloud = (): boolean => !!process.env.CLOUD_DEPLOYMENT

-export function isIngestionOverflowEnabled(): boolean {
-    const ingestionOverflowEnabled = process.env.INGESTION_OVERFLOW_ENABLED
-    return stringToBoolean(ingestionOverflowEnabled)
-}
-
 export function isOverflowBatchByDistinctId(): boolean {
     const overflowBatchByDistinctId = process.env.INGESTION_OVERFLOW_BATCH_BY_DISTINCT_ID
     return stringToBoolean(overflowBatchByDistinctId)

View File

@@ -107,32 +107,35 @@ describe('eachBatchParallelIngestion with overflow reroute', () => {
         expect(runEventPipeline).not.toHaveBeenCalled()
     })

-    it('reroutes excess events to OVERFLOW topic', async () => {
-        const now = Date.now()
-        const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent1], now)
-        const consume = jest.spyOn(ConfiguredLimiter, 'consume').mockImplementation(() => false)
+    it.each([IngestionOverflowMode.Reroute, IngestionOverflowMode.RerouteRandomly])(
+        'reroutes excess events to OVERFLOW topic (mode=%p)',
+        async (overflowMode) => {
+            const now = Date.now()
+            const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent1], now)
+            const consume = jest.spyOn(ConfiguredLimiter, 'consume').mockImplementation(() => false)

-        const tokenBlockList = buildStringMatcher('another_token,more_token', false)
-        await eachBatchParallelIngestion(tokenBlockList, batch, queue, IngestionOverflowMode.Reroute)
+            const tokenBlockList = buildStringMatcher('another_token,more_token', false)
+            await eachBatchParallelIngestion(tokenBlockList, batch, queue, overflowMode)

-        expect(consume).toHaveBeenCalledWith(
-            captureEndpointEvent1['token'] + ':' + captureEndpointEvent1['distinct_id'],
-            1,
-            now
-        )
-        expect(captureIngestionWarning).not.toHaveBeenCalled()
-        expect(queue.pluginsServer.kafkaProducer.produce).toHaveBeenCalledWith({
-            topic: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW,
-            value: JSON.stringify(captureEndpointEvent1),
-            timestamp: captureEndpointEvent1['timestamp'],
-            offset: captureEndpointEvent1['offset'],
-            key: null,
-            waitForAck: true,
-        })
+            expect(consume).toHaveBeenCalledWith(
+                captureEndpointEvent1['token'] + ':' + captureEndpointEvent1['distinct_id'],
+                1,
+                now
+            )
+            expect(captureIngestionWarning).not.toHaveBeenCalled()
+            expect(queue.pluginsServer.kafkaProducer.produce).toHaveBeenCalledWith({
+                topic: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW,
+                value: JSON.stringify(captureEndpointEvent1),
+                timestamp: captureEndpointEvent1['timestamp'],
+                offset: captureEndpointEvent1['offset'],
+                key: overflowMode === IngestionOverflowMode.Reroute ? batch[0].key : undefined,
+                waitForAck: true,
+            })

-        // Event is not processed here
-        expect(runEventPipeline).not.toHaveBeenCalled()
-    })
+            // Event is not processed here
+            expect(runEventPipeline).not.toHaveBeenCalled()
+        }
+    )

     it('does not reroute if not over capacity limit', async () => {
         const now = Date.now()