fix(node): Don't keep the messages array in the closure (#39775)

commit b20fd9371f
parent 47677b07ed
Author: Ben White
Date: 2025-10-16 15:47:51 +02:00 (committed by GitHub)

12 changed files with 41 additions and 55 deletions

View File

@@ -1,4 +1,4 @@
-import { LogLevel, PluginLogLevel, PluginsServerConfig, ValueMatcher, stringToPluginServerMode } from '../types'
+import { PluginLogLevel, PluginsServerConfig, ValueMatcher, stringToPluginServerMode } from '../types'
 import { isDevEnv, isProdEnv, isTestEnv, stringToBoolean } from '../utils/env-utils'
 import { KAFKAJS_LOG_LEVEL_MAPPING } from './constants'
 import {
@@ -75,6 +75,7 @@ export function getDefaultConfig(): PluginsServerConfig {
         CONSUMER_MAX_BACKGROUND_TASKS: 1,
         CONSUMER_WAIT_FOR_BACKGROUND_TASKS_ON_REBALANCE: false,
         CONSUMER_AUTO_CREATE_TOPICS: true,
+        CONSUMER_LOG_STATS_LEVEL: 'debug',
         KAFKA_HOSTS: 'kafka:9092', // KEEP IN SYNC WITH posthog/settings/data_stores.py
         KAFKA_CLIENT_CERT_B64: undefined,
         KAFKA_CLIENT_CERT_KEY_B64: undefined,
@@ -103,7 +104,7 @@ export function getDefaultConfig(): PluginsServerConfig {
         INGESTION_FORCE_OVERFLOW_BY_TOKEN_DISTINCT_ID: '',
         INGESTION_OVERFLOW_PRESERVE_PARTITION_LOCALITY: false,
         PLUGINS_DEFAULT_LOG_LEVEL: isTestEnv() ? PluginLogLevel.Full : PluginLogLevel.Log,
-        LOG_LEVEL: isTestEnv() ? LogLevel.Warn : LogLevel.Info,
+        LOG_LEVEL: isTestEnv() ? 'warn' : 'info',
         HTTP_SERVER_PORT: DEFAULT_HTTP_SERVER_PORT,
         SCHEDULE_LOCK_TTL: 60,
         REDIS_POOL_MIN_SIZE: 1,

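Two defaults change above: a new CONSUMER_LOG_STATS_LEVEL option is introduced (defaulting to 'debug', preserving the previous behaviour of the stats log line), and LOG_LEVEL is now written as a plain string because LogLevel becomes a string union later in this diff. A minimal sketch of how such a string-typed level could be validated if it arrived via an environment variable (parseLogLevel and LOG_LEVELS are illustrative helpers, not part of this commit):

type LogLevel = 'debug' | 'info' | 'warn' | 'error'

const LOG_LEVELS: readonly LogLevel[] = ['debug', 'info', 'warn', 'error']

// Narrow an arbitrary string (e.g. from process.env) to the union,
// falling back to a known-good default when missing or invalid.
function parseLogLevel(value: string | undefined, fallback: LogLevel): LogLevel {
    return (LOG_LEVELS as readonly string[]).includes(value ?? '') ? (value as LogLevel) : fallback
}

const statsLevel = parseLogLevel(process.env.CONSUMER_LOG_STATS_LEVEL, 'debug')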
View File

@@ -21,6 +21,7 @@ import {
     HealthCheckResultDegraded,
     HealthCheckResultError,
     HealthCheckResultOk,
+    LogLevel,
 } from '~/types'
 import { isTestEnv } from '~/utils/env-utils'
 import { parseJSON } from '~/utils/json-parse'
@@ -110,7 +111,8 @@ export const findOffsetsToCommit = (messages: TopicPartitionOffset[]): TopicPart
         return {
             topic,
             partition: parseInt(partition),
-            offset: highestOffset,
+            // When committing to Kafka you commit the offset of the next message you want to consume
+            offset: highestOffset + 1,
         }
     })
 })
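The + 1 that moves into findOffsetsToCommit encodes Kafka's commit convention: a committed offset names the next message you want to consume, so committing the highest processed offset unchanged would redeliver that message after a restart or rebalance. A condensed, self-contained sketch of the function's logic (offsetsToCommit is an illustrative stand-in, not the real implementation):

interface TopicPartitionOffset {
    topic: string
    partition: number
    offset: number
}

// Keep only the highest offset seen per topic-partition, then add 1 so the
// commit points at the next message to consume.
function offsetsToCommit(messages: TopicPartitionOffset[]): TopicPartitionOffset[] {
    const highest = new Map<string, TopicPartitionOffset>()
    for (const { topic, partition, offset } of messages) {
        const key = `${topic}:${partition}`
        const current = highest.get(key)
        if (!current || offset > current.offset) {
            highest.set(key, { topic, partition, offset })
        }
    }
    return Array.from(highest.values()).map((tpo) => ({ ...tpo, offset: tpo.offset + 1 }))
}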
@@ -163,6 +165,7 @@ export class KafkaConsumer {
         rebalanceTimeoutMs: 20000,
         rebalanceStartTime: 0,
     }
+    private consumerLogStatsLevel: LogLevel

     constructor(
         private config: KafkaConsumerConfig,
@@ -182,6 +185,7 @@ export class KafkaConsumer {
         this.maxHealthHeartbeatIntervalMs =
             defaultConfig.CONSUMER_MAX_HEARTBEAT_INTERVAL_MS || MAX_HEALTH_HEARTBEAT_INTERVAL_MS
         this.consumerLoopStallThresholdMs = defaultConfig.CONSUMER_LOOP_STALL_THRESHOLD_MS
+        this.consumerLogStatsLevel = defaultConfig.CONSUMER_LOG_STATS_LEVEL

         const rebalancecb: RebalanceCallback = this.config.waitForBackgroundTasksOnRebalance
             ? this.rebalanceCallback.bind(this)
@@ -523,11 +527,8 @@ export class KafkaConsumer {
                 topics: Object.keys(parsedStats.topics || {}),
                 broker_count: brokerStats.size,
                 brokers: Array.from(brokerStats.entries()).map(([name, stats]) => ({
+                    ...stats,
                     name,
-                    state: stats.state,
-                    rtt_avg: stats.rtt?.avg,
-                    connects: stats.connects,
-                    disconnects: stats.disconnects,
                 })),
             }
@@ -540,7 +541,7 @@ export class KafkaConsumer {
                 logData.assignment_size = parsedStats.cgrp.assignment_size
             }

-            logger.debug('📊', 'Kafka consumer statistics', logData)
+            logger[this.consumerLogStatsLevel]('📊', 'Kafka consumer statistics', logData)
         } catch (error) {
             logger.error('📊', 'Failed to parse consumer statistics', {
                 error: error instanceof Error ? error.message : String(error),
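Because the LogLevel union's members double as Logger method names, the configured level can index the logger directly: logger[this.consumerLogStatsLevel](...) routes the stats line to debug/info/warn/error without a switch statement, and the broker entries now spread the raw librdkafka stats instead of hand-picking fields. A self-contained sketch of the dynamic-dispatch pattern (this logger object is a stand-in for the real class):

type LogLevel = 'debug' | 'info' | 'warn' | 'error'

// The method names deliberately mirror the union, so any LogLevel
// value is a valid key into the logger.
const logger: Record<LogLevel, (...args: unknown[]) => void> = {
    debug: (...args) => console.debug(...args),
    info: (...args) => console.info(...args),
    warn: (...args) => console.warn(...args),
    error: (...args) => console.error(...args),
}

const statsLevel: LogLevel = 'debug'
logger[statsLevel]('📊', 'Kafka consumer statistics', { broker_count: 3 })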
@@ -568,26 +569,18 @@ export class KafkaConsumer {
         return consumer
     }

-    private storeOffsetsForMessages = (messages: Message[]): void => {
-        const topicPartitionOffsets = findOffsetsToCommit(messages).map((message) => {
-            return {
-                ...message,
-                // When committing to Kafka you commit the offset of the next message you want to consume
-                offset: message.offset + 1,
-            }
-        })
-
-        if (topicPartitionOffsets.length > 0) {
-            logger.debug('📝', 'Storing offsets', { topicPartitionOffsets })
+    private storeOffsetsForMessages = (topicPartitionOffsetsToCommit: TopicPartitionOffset[]): void => {
+        if (topicPartitionOffsetsToCommit.length > 0) {
+            logger.debug('📝', 'Storing offsets', { topicPartitionOffsetsToCommit })
             try {
-                this.rdKafkaConsumer.offsetsStore(topicPartitionOffsets)
+                this.rdKafkaConsumer.offsetsStore(topicPartitionOffsetsToCommit)
             } catch (e) {
                 // NOTE: We don't throw here - this can happen if we were re-assigned partitions
                 // and the offsets are no longer valid whilst processing a batch
                 logger.error('📝', 'Failed to store offsets', {
                     error: String(e),
                     assignedPartitions: this.assignments(),
-                    topicPartitionOffsets,
+                    topicPartitionOffsetsToCommit,
                 })
                 captureException(e)
             }
@@ -696,14 +689,11 @@ export class KafkaConsumer {
             // it would be hard to mix background work with non-background work.
             // So we just create pretend work to simplify the rest of the logic
             const backgroundTask = result?.backgroundTask ?? Promise.resolve()
             const backgroundTaskStart = performance.now()

+            // Pull out the offsets to commit from the messages so we can release the messages reference
+            const topicPartitionOffsetsToCommit = findOffsetsToCommit(messages)
             void backgroundTask.finally(async () => {
-                // Only when we are fully done with the background work we store the offsets
-                // TODO: Test if this fully works as expected - like what if backgroundBatches[1] finishes after backgroundBatches[0]
-
-                // Remove the background work from the queue when it is finished
                 // First of all clear ourselves from the queue
                 const index = this.backgroundTask.indexOf(backgroundTask)
                 void this.backgroundTask.splice(index, 1)
@@ -712,7 +702,7 @@ export class KafkaConsumer {
                 await Promise.all(this.backgroundTask.slice(0, index))

                 if (this.config.autoCommit && this.config.autoOffsetStore) {
-                    this.storeOffsetsForMessages(messages)
+                    this.storeOffsetsForMessages(topicPartitionOffsetsToCommit)
                 }

                 if (result?.backgroundTask) {

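The last three hunks are the heart of the fix. Previously the .finally callback closed over the full messages array, so an entire Kafka batch, payloads included, stayed reachable until the background task settled. Computing topicPartitionOffsetsToCommit eagerly means the closure retains only a handful of small offset records, the batch becomes garbage-collectable as soon as synchronous processing ends, and storeOffsetsForMessages changes its signature to match. A stripped-down sketch of the capture behaviour (toOffsets and scheduleOffsetStore are illustrative stand-ins for findOffsetsToCommit and the consume loop):

interface Message {
    topic: string
    partition: number
    offset: number
    value: Buffer | null // the potentially large payload
}

interface TopicPartitionOffset {
    topic: string
    partition: number
    offset: number
}

// Stand-in for findOffsetsToCommit: drop the payloads, keep the coordinates.
function toOffsets(messages: Message[]): TopicPartitionOffset[] {
    return messages.map(({ topic, partition, offset }) => ({ topic, partition, offset: offset + 1 }))
}

function scheduleOffsetStore(messages: Message[], backgroundTask: Promise<void>): void {
    // Before: the callback referenced `messages`, keeping the whole batch alive
    // until the background task settled:
    //     void backgroundTask.finally(() => store(toOffsets(messages)))
    //
    // After: extract the tiny offset records up front; the closure captures only
    // `offsetsToCommit`, so the batch can be collected immediately.
    const offsetsToCommit = toOffsets(messages)
    void backgroundTask.finally(() => {
        console.log('storing offsets', offsetsToCommit)
    })
}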
View File

@@ -50,12 +50,7 @@ export { Element } from '@posthog/plugin-scaffold' // Re-export Element from sca
 type Brand<K, T> = K & { __brand: T }

-export enum LogLevel {
-    Debug = 'debug',
-    Info = 'info',
-    Warn = 'warn',
-    Error = 'error',
-}
+export type LogLevel = 'debug' | 'info' | 'warn' | 'error'

 export enum KafkaSecurityProtocol {
     Plaintext = 'PLAINTEXT',
@@ -312,6 +307,7 @@ export interface PluginsServerConfig extends CdpConfig, IngestionConsumerConfig
     CONSUMER_BATCH_SIZE: number // Primarily for kafka consumers the batch size to use
     CONSUMER_MAX_HEARTBEAT_INTERVAL_MS: number // Primarily for kafka consumers the max heartbeat interval to use after which it will be considered unhealthy
     CONSUMER_LOOP_STALL_THRESHOLD_MS: number // Threshold in ms after which the consumer loop is considered stalled
+    CONSUMER_LOG_STATS_LEVEL: LogLevel // Log level for consumer statistics
     CONSUMER_LOOP_BASED_HEALTH_CHECK: boolean // Use consumer loop monitoring for health checks instead of heartbeats
     CONSUMER_MAX_BACKGROUND_TASKS: number
     CONSUMER_WAIT_FOR_BACKGROUND_TASKS_ON_REBALANCE: boolean

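Swapping the enum for a string-literal union keeps the same four runtime values but removes the import every call site needed just to spell a level, which is exactly what the long tail of test-file changes below consists of. A small sketch of the ergonomic difference (ExampleConfig is illustrative):

// Before, callers imported the enum:
//     import { LogLevel } from '../types'
//     createHub({ LOG_LEVEL: LogLevel.Info })
type LogLevel = 'debug' | 'info' | 'warn' | 'error'

interface ExampleConfig {
    LOG_LEVEL: LogLevel
}

// After, a bare string literal type-checks against the union, no import needed:
const config: ExampleConfig = { LOG_LEVEL: 'info' }
// const bad: ExampleConfig = { LOG_LEVEL: 'trace' } // compile error: not a LogLevel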
View File

@@ -79,19 +79,19 @@ export class Logger {
     }

     debug(...args: any[]) {
-        this._log(LogLevel.Debug, ...args)
+        this._log('debug', ...args)
     }

     info(...args: any[]) {
-        this._log(LogLevel.Info, ...args)
+        this._log('info', ...args)
     }

     warn(...args: any[]) {
-        this._log(LogLevel.Warn, ...args)
+        this._log('warn', ...args)
     }

     error(...args: any[]) {
-        this._log(LogLevel.Error, ...args)
+        this._log('error', ...args)
     }

     async shutdown(): Promise<void> {

View File

@@ -21,7 +21,7 @@ import { PersonsStoreForBatch } from '~/worker/ingestion/persons/persons-store-f
 import { createEmitEventStep } from '../../src/ingestion/event-processing/emit-event-step'
 import { isOkResult } from '../../src/ingestion/pipelines/results'
-import { ClickHouseEvent, Hub, LogLevel, Person, PluginsServerConfig, Team } from '../../src/types'
+import { ClickHouseEvent, Hub, Person, PluginsServerConfig, Team } from '../../src/types'
 import { closeHub, createHub } from '../../src/utils/db/hub'
 import { PostgresUse } from '../../src/utils/db/postgres'
 import { UUIDT } from '../../src/utils/utils'
@@ -73,7 +73,7 @@ async function flushPersonStoreToKafka(hub: Hub, personStore: PersonsStoreForBat
 }

 const TEST_CONFIG: Partial<PluginsServerConfig> = {
-    LOG_LEVEL: LogLevel.Info,
+    LOG_LEVEL: 'info',
 }

 describe('processEvent', () => {

View File

@@ -1,5 +1,5 @@
 import { PluginServer } from '../src/server'
-import { LogLevel, PluginServerMode } from '../src/types'
+import { PluginServerMode } from '../src/types'
 import { resetTestDatabase } from './helpers/sql'

 jest.setTimeout(20000) // 20 sec timeout - longer indicates an issue
@@ -29,7 +29,7 @@ describe('server', () => {
     // Running all capabilities together takes too long in tests, so they are split up
     it('should not error on startup - ingestion', async () => {
         pluginsServer = new PluginServer({
-            LOG_LEVEL: LogLevel.Debug,
+            LOG_LEVEL: 'debug',
             PLUGIN_SERVER_MODE: PluginServerMode.ingestion_v2,
         })
         await pluginsServer.start()
@@ -37,7 +37,7 @@ describe('server', () => {
     it('should not error on startup - cdp', async () => {
         pluginsServer = new PluginServer({
-            LOG_LEVEL: LogLevel.Debug,
+            LOG_LEVEL: 'debug',
             PLUGIN_SERVER_MODE: PluginServerMode.cdp_processed_events,
         })
         await pluginsServer.start()
@@ -45,7 +45,7 @@ describe('server', () => {
     it('should not error on startup - replay', async () => {
         pluginsServer = new PluginServer({
-            LOG_LEVEL: LogLevel.Debug,
+            LOG_LEVEL: 'debug',
             PLUGIN_SERVER_MODE: PluginServerMode.recordings_blob_ingestion_v2,
         })
         await pluginsServer.start()

View File

@@ -1,4 +1,4 @@
-import { Hub, LogLevel, PluginCapabilities } from '../../src/types'
+import { Hub, PluginCapabilities } from '../../src/types'
 import { closeHub, createHub } from '../../src/utils/db/hub'
 import { getVMPluginCapabilities, shouldSetupPluginInServer } from '../../src/worker/vm/capabilities'
 import { createPluginConfigVM } from '../../src/worker/vm/vm'
@@ -14,7 +14,7 @@ describe('capabilities', () => {
     beforeAll(async () => {
         console.info = jest.fn() as any
         console.warn = jest.fn() as any
-        hub = await createHub({ LOG_LEVEL: LogLevel.Warn })
+        hub = await createHub({ LOG_LEVEL: 'warn' })
     })

     afterAll(async () => {

View File

@@ -3,7 +3,6 @@ import { DateTime } from 'luxon'
 import { PluginServer } from '../../../src/server'
 import {
     Hub,
-    LogLevel,
     PluginServerMode,
     PluginsServerConfig,
     PropertyUpdateOperation,
@@ -26,7 +25,7 @@ jest.mock('../../../src/utils/logger')
 jest.setTimeout(30000)

 const extraServerConfig: Partial<PluginsServerConfig> = {
-    LOG_LEVEL: LogLevel.Info,
+    LOG_LEVEL: 'info',
 }

 describe('postgres parity', () => {

View File

@@ -1,4 +1,4 @@
-import { Hub, LogLevel } from '../../../src/types'
+import { Hub } from '../../../src/types'
 import { closeHub, createHub } from '../../../src/utils/db/hub'
 import { captureIngestionWarning } from '../../../src/worker/ingestion/utils'
 import { Clickhouse } from '../../helpers/clickhouse'
@@ -10,7 +10,7 @@ describe('captureIngestionWarning()', () => {
     let clickhouse: Clickhouse

     beforeEach(async () => {
-        hub = await createHub({ LOG_LEVEL: LogLevel.Info })
+        hub = await createHub({ LOG_LEVEL: 'info' })
         clickhouse = Clickhouse.create()
         await clickhouse.resetTestDatabase()
     })

View File

@@ -1,4 +1,4 @@
-import { Hub, LogLevel } from '../../src/types'
+import { Hub } from '../../src/types'
 import { processError } from '../../src/utils/db/error'
 import { closeHub, createHub } from '../../src/utils/db/hub'
 import { loadPlugin } from '../../src/worker/plugins/loadPlugin'
@@ -29,7 +29,7 @@ describe('plugins', () => {
     let hub: Hub

     beforeEach(async () => {
-        hub = await createHub({ LOG_LEVEL: LogLevel.Info })
+        hub = await createHub({ LOG_LEVEL: 'info' })
         console.warn = jest.fn() as any
         await resetTestDatabase()
     })

View File

@@ -1,6 +1,6 @@
 import { PluginEvent } from '@posthog/plugin-scaffold'

-import { Hub, LogLevel, Plugin, PluginConfig } from '../../../src/types'
+import { Hub, Plugin, PluginConfig } from '../../../src/types'
 import { closeHub, createHub } from '../../../src/utils/db/hub'
 import { PostgresUse } from '../../../src/utils/db/postgres'
 import {
@@ -19,7 +19,7 @@ describe('Inline plugin', () => {
     beforeAll(async () => {
         console.info = jest.fn() as any
         console.warn = jest.fn() as any
-        hub = await createHub({ LOG_LEVEL: LogLevel.Info })
+        hub = await createHub({ LOG_LEVEL: 'info' })
         await resetTestDatabase()
     })

View File

@@ -1,6 +1,6 @@
 import { PluginEvent } from '@posthog/plugin-scaffold'

-import { LogLevel, PluginConfig } from '../../../../src/types'
+import { PluginConfig } from '../../../../src/types'
 import { closeHub, createHub } from '../../../../src/utils/db/hub'
 import { constructInlinePluginInstance } from '../../../../src/worker/vm/inline/inline'
 import { resetTestDatabase } from '../../../helpers/sql'
@@ -11,7 +11,7 @@ describe('user-agent tests', () => {
     beforeAll(async () => {
         console.info = jest.fn() as any
         console.warn = jest.fn() as any
-        hub = await createHub({ LOG_LEVEL: LogLevel.Info })
+        hub = await createHub({ LOG_LEVEL: 'info' })
         await resetTestDatabase()
     })