feat(node): Added opentelemetry to node services (#37154)

Author: Ben White
Date: 2025-08-26 14:30:03 +01:00
Committed by: GitHub
Parent: 120d0d6710
Commit: 396e3631d8
18 changed files with 1360 additions and 253 deletions

View File

@@ -134,6 +134,7 @@ services:
            CDP_REDIS_PORT: 6379
            ENCRYPTION_SALT_KEYS: $ENCRYPTION_SALT_KEYS
            CYCLOTRON_DATABASE_URL: 'postgres://posthog:posthog@db:5432/posthog'
            OTEL_EXPORTER_OTLP_ENDPOINT: ''
        depends_on:
            - db
            - redis

View File

@@ -62,6 +62,15 @@
"@google-cloud/pubsub": "4.11.0",
"@google-cloud/storage": "^5.8.5",
"@maxmind/geoip2-node": "^3.4.0",
"@opentelemetry/api": "^1.9.0",
"@opentelemetry/auto-instrumentations-node": "^0.62.1",
"@opentelemetry/exporter-trace-otlp-grpc": "^0.203.0",
"@opentelemetry/exporter-trace-otlp-http": "^0.203.0",
"@opentelemetry/instrumentation-pg": "^0.56.0",
"@opentelemetry/resources": "^2.0.1",
"@opentelemetry/sdk-node": "^0.203.0",
"@opentelemetry/sdk-trace-node": "^2.0.1",
"@opentelemetry/semantic-conventions": "^1.36.0",
"@posthog/cyclotron": "workspace:*",
"@posthog/hogvm": "workspace:*",
"@posthog/plugin-scaffold": "1.4.4",

View File

@@ -0,0 +1,36 @@
import { getNodeAutoInstrumentations } from '@opentelemetry/auto-instrumentations-node'
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-grpc'
import { resourceFromAttributes } from '@opentelemetry/resources'
import { NodeSDK } from '@opentelemetry/sdk-node'
import { ParentBasedSampler, TraceIdRatioBasedSampler } from '@opentelemetry/sdk-trace-node'
import { ATTR_SERVICE_NAME, ATTR_SERVICE_VERSION } from '@opentelemetry/semantic-conventions'

import { defaultConfig } from '~/config/config'
import { logger } from '~/utils/logger'

// W3C is default; keep it for header interop
const sdk = new NodeSDK({
    resource: resourceFromAttributes({
        [ATTR_SERVICE_NAME]: `node-${defaultConfig.PLUGIN_SERVER_MODE ?? 'plugin-server'}`,
        [ATTR_SERVICE_VERSION]: process.env.COMMIT_SHA ?? 'dev',
    }),
    traceExporter: new OTLPTraceExporter({
        url: defaultConfig.OTEL_EXPORTER_OTLP_ENDPOINT,
    }),
    instrumentations: [getNodeAutoInstrumentations()],
    sampler: new ParentBasedSampler({
        root: new TraceIdRatioBasedSampler(defaultConfig.OTEL_TRACES_SAMPLER_ARG),
    }),
    // add your DB/HTTP instrumentations as needed
})

export const initTracing = (): void => {
    if (!defaultConfig.OTEL_SDK_DISABLED && defaultConfig.OTEL_EXPORTER_OTLP_ENDPOINT) {
        logger.info('Starting tracing with endpoint', {
            endpoint: defaultConfig.OTEL_EXPORTER_OTLP_ENDPOINT,
            samplerArg: defaultConfig.OTEL_TRACES_SAMPLER_ARG,
        })
        sdk.start()
    }
}
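
Once sdk.start() has run, any module in the process can join the active trace through the global @opentelemetry/api entry point, with no reference to the NodeSDK instance above. A minimal sketch — the tracer name, function, and attributes here are illustrative, not part of this commit:

import { trace } from '@opentelemetry/api'

// The tracer name is only used for attribution; 'sketch' is a placeholder.
const tracer = trace.getTracer('sketch')

async function handleBatch(events: unknown[]): Promise<void> {
    // startActiveSpan makes this span the parent of anything created inside
    // the callback, including spans from getNodeAutoInstrumentations() (pg, http, ...).
    await tracer.startActiveSpan('handleBatch', async (span) => {
        span.setAttribute('batch.size', events.length)
        try {
            // ... process events ...
        } finally {
            span.end() // spans must be ended explicitly or they are never exported
        }
    })
}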

View File

@@ -0,0 +1,40 @@
import { Attributes, SpanKind, SpanStatusCode, Tracer, trace } from '@opentelemetry/api'
import { Summary } from 'prom-client'

const instrumentedFnSummary = new Summary({
    name: 'instrumented_fn_duration_ms',
    help: 'Duration of instrumented functions',
    labelNames: ['metricName', 'tag'],
    percentiles: [0.5, 0.9, 0.95, 0.99],
})

// Helper to instrument a function: primarily wraps it in an OpenTelemetry span, and records a Prometheus duration metric as well
export function withSpan<T>(
    tracer: Tracer | string,
    name: string,
    attrs: Attributes,
    fn: () => Promise<T>
): Promise<T> {
    const stopTimer = instrumentedFnSummary
        .labels({
            metricName: name,
            tag: attrs.tag ? String(attrs.tag) : undefined,
        })
        .startTimer()
    const _tracer = typeof tracer === 'string' ? trace.getTracer(tracer) : tracer
    return _tracer.startActiveSpan(name, { kind: SpanKind.CLIENT, attributes: attrs }, async (span) => {
        try {
            const out = await fn()
            span.setStatus({ code: SpanStatusCode.OK })
            return out
        } catch (e: any) {
            span.recordException(e)
            span.setStatus({ code: SpanStatusCode.ERROR, message: e?.message })
            throw e
        } finally {
            span.end()
            stopTimer()
        }
    })
}
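
For reference, a hypothetical call site — loadTeam and fetchTeamFromDb are invented for illustration; the real call sites are the instrumentQuery replacements in the files below:

import { withSpan } from '~/common/tracing/tracing-utils'

type Team = { id: number; name: string } // stand-in type for the sketch
declare function fetchTeamFromDb(teamId: number): Promise<Team | null> // hypothetical

function loadTeam(teamId: number): Promise<Team | null> {
    // First argument is a tracer name (or a Tracer instance); the span is named
    // 'query.loadTeam' and the Prometheus summary is labelled with tag='team'.
    return withSpan('postgres', 'query.loadTeam', { tag: 'team' }, () => fetchTeamFromDb(teamId))
}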

View File

@@ -18,6 +18,9 @@ export const defaultConfig = overrideWithEnv(getDefaultConfig())
export function getDefaultConfig(): PluginsServerConfig {
    return {
        INSTRUMENT_THREAD_PERFORMANCE: false,
        OTEL_EXPORTER_OTLP_ENDPOINT: isDevEnv() ? 'http://localhost:4317' : '',
        OTEL_SDK_DISABLED: isDevEnv(),
        OTEL_TRACES_SAMPLER_ARG: 1,
        DATABASE_URL: isTestEnv()
            ? 'postgres://posthog:posthog@localhost:5432/test_posthog'
            : isDevEnv()
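
With these defaults, tracing is off everywhere until explicitly enabled: in production the endpoint is empty, and in dev the SDK is disabled even though a local endpoint is preconfigured. A sketch of the opt-in, assuming overrideWithEnv (used above to build defaultConfig) maps same-named environment variables onto these keys — the collector address and sample rate are illustrative:

// Set before the process loads ~/config/config:
process.env.OTEL_EXPORTER_OTLP_ENDPOINT = 'http://otel-collector:4317' // hypothetical collector
process.env.OTEL_SDK_DISABLED = 'false'
process.env.OTEL_TRACES_SAMPLER_ARG = '0.25' // root spans sampled at 25%; children follow their parent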

View File

@@ -1,7 +1,9 @@
// NOTE: Keep these as ~ imports so we can validate the build output this way
import { initTracing } from '~/common/tracing/otel'
import { PluginServer } from '~/server'
import { initSuperProperties } from '~/utils/posthog'
initSuperProperties()
initTracing()
const server = new PluginServer()
void server.start()

View File

@@ -3,10 +3,11 @@ import Redis from 'ioredis'
import { CacheOptions } from '@posthog/plugin-scaffold'
import { withSpan } from '~/common/tracing/tracing-utils'
import { RedisOperationError } from '../../utils/db/error'
import { timeoutGuard } from '../../utils/db/utils'
import { parseJSON } from '../../utils/json-parse'
import { instrumentQuery } from '../../utils/metrics'
import { tryTwice } from '../../utils/utils'
/** The recommended way of accessing the database. */
@@ -24,7 +25,7 @@ export class RedisHelpers {
        logContext: Record<string, string | string[] | number>,
        runQuery: (client: Redis.Redis) => Promise<T>
    ): Promise<T> {
        return instrumentQuery(operationName, tag, async () => {
        return withSpan('redis', operationName, { tag: tag ?? 'unknown' }, async () => {
            let client: Redis.Redis
            const timeout = timeoutGuard(`${operationName} delayed. Waiting over 30 sec.`, logContext)
            try {

View File

@@ -179,6 +179,9 @@ export type PersonBatchWritingMode = 'BATCH' | 'SHADOW' | 'NONE'
export interface PluginsServerConfig extends CdpConfig, IngestionConsumerConfig {
    INSTRUMENT_THREAD_PERFORMANCE: boolean
    OTEL_EXPORTER_OTLP_ENDPOINT: string
    OTEL_SDK_DISABLED: boolean
    OTEL_TRACES_SAMPLER_ARG: number
    TASKS_PER_WORKER: number // number of parallel tasks per worker thread
    INGESTION_CONCURRENCY: number // number of parallel event ingestion queues per batch
    INGESTION_BATCH_SIZE: number // kafka consumer batch size

View File

@@ -1,7 +1,8 @@
import { CacheOptions } from '@posthog/plugin-scaffold'
import { withSpan } from '~/common/tracing/tracing-utils'
import { PluginsServerConfig, RedisPool } from '../../types'
import { instrumentQuery } from '../metrics'
import { UUIDT } from '../utils'
import { createRedisPool } from './redis'
import { timeoutGuard } from './utils'
@@ -23,7 +24,7 @@ export class Celery {
    private redisLPush(key: string, value: unknown, options: CacheOptions = {}): Promise<number> {
        const { jsonSerialize = true } = options
        return instrumentQuery('query.redisLPush', undefined, async () => {
        return withSpan('redis', 'LPush', { tag: 'celery' }, async () => {
            const client = await this.redisPool.acquire()
            const timeout = timeoutGuard('LPushing redis key delayed. Waiting over 30 sec to lpush key', { key })
            try {

View File

@@ -5,6 +5,8 @@ import { QueryResult } from 'pg'
import { CacheOptions } from '@posthog/plugin-scaffold'
import { withSpan } from '~/common/tracing/tracing-utils'
import { KAFKA_PLUGIN_LOG_ENTRIES } from '../../config/kafka-topics'
import { KafkaProducerWrapper, TopicMessage } from '../../kafka/producer'
import {
@@ -29,7 +31,6 @@ import {
import { fetchAction, fetchAllActionsGroupedByTeam } from '../../worker/ingestion/action-manager'
import { parseJSON } from '../json-parse'
import { logger } from '../logger'
import { instrumentQuery } from '../metrics'
import { captureException } from '../posthog'
import { UUID, UUIDT, tryTwice } from '../utils'
import { OrganizationPluginsAccessLevel } from './../../types'
@@ -168,7 +169,7 @@ export class DB {
        logContext: Record<string, string | string[] | number>,
        runQuery: (client: Redis.Redis) => Promise<T>
    ): Promise<T> {
        return instrumentQuery(operationName, tag, async () => {
        return withSpan('redis', operationName, { tag: tag ?? 'unknown' }, async () => {
            let client: Redis.Redis
            const timeout = timeoutGuard(`${operationName} delayed. Waiting over 30 sec.`, logContext)
            try {

View File

@@ -1,8 +1,9 @@
// Postgres
import { Client, Pool, PoolClient, QueryConfig, QueryResult, QueryResultRow } from 'pg'
import { withSpan } from '~/common/tracing/tracing-utils'
import { PluginsServerConfig } from '../../types'
import { instrumentQuery } from '../../utils/metrics'
import { logger } from '../logger'
import { createPostgresPool } from '../utils'
import { POSTGRES_UNAVAILABLE_ERROR_MESSAGES } from './db'
@@ -138,7 +139,7 @@ export class PostgresRouter {
    ): Promise<ReturnType> {
        const wrappedTag = `${PostgresUse[usage]}:Tx<${tag}>`
        return instrumentQuery('query.postgres_transaction', wrappedTag, async () => {
        return withSpan('postgres', 'query.postgres_transaction', { tag: wrappedTag }, async () => {
            const timeout = timeoutGuard(`Postgres slow transaction warning after 30 sec!`)
            const client = await this.pools.get(usage)!.connect()
            try {
@@ -183,7 +184,7 @@ function postgresQuery<R extends QueryResultRow = any, I extends any[] = any[]>(
    tag: string,
    queryFailureLogLevel: 'error' | 'warn' = 'error'
): Promise<QueryResult<R>> {
    return instrumentQuery('query.postgres', tag, async () => {
    return withSpan('postgres', 'query.postgres', { tag: tag ?? 'unknown' }, async () => {
        const queryConfig =
            typeof queryString === 'string'
                ? {

View File

@@ -1,9 +1,9 @@
import { PoolClient } from 'pg'
import { withSpan } from '~/common/tracing/tracing-utils'
import { twoPhaseCommitFailuresCounter } from '~/worker/ingestion/persons/metrics'
import { logger } from '../logger'
import { instrumentQuery } from '../metrics'
import { PostgresRouter, PostgresUse, TransactionClient } from './postgres'
export type TwoPhaseSides = {
@@ -35,7 +35,7 @@ export class TwoPhaseCommitCoordinator {
        const gidRightLiteral = `'${gidRight.replace(/'/g, "''")}'`
        const { left, right } = this.sides
        return await instrumentQuery('query.dualwrite_spc', tag, async () => {
        return await withSpan('postgres', 'query.dualwrite_spc', { tag: tag ?? 'unknown' }, async () => {
            let lClient: PoolClient | undefined
            let rClient: PoolClient | undefined
            let preparedLeft = false

View File

@@ -1,42 +0,0 @@
import { Summary } from 'prom-client'

export async function instrumentQuery<T>(
    metricName: string,
    tag: string | undefined,
    runQuery: () => Promise<T>
): Promise<T> {
    return instrument(
        {
            metricName,
            key: 'queryTag',
            tag,
        },
        runQuery
    )
}

export async function instrument<T>(
    options: {
        metricName: string
        key?: string
        tag?: string
        data?: any
    },
    runQuery: () => Promise<T>
): Promise<T> {
    const timer = new Date()
    try {
        return await runQuery()
    } finally {
        instrumentedFnSummary
            .labels(options.metricName, String(options.key ?? 'null'), String(options.tag ?? 'null'))
            .observe(Date.now() - timer.getTime())
    }
}

const instrumentedFnSummary = new Summary({
    name: 'instrumented_fn_duration_ms',
    help: 'Duration of instrumented functions',
    labelNames: ['metricName', 'key', 'tag'],
    percentiles: [0.5, 0.9, 0.95, 0.99],
})
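
The removal above pairs with the withSpan call sites added earlier; note that the replacement Summary drops the old 'key' label and keeps only 'metricName' and 'tag'. The migration itself is mechanical. In schematic form, with both signatures taken from this diff:

import { withSpan } from '~/common/tracing/tracing-utils'

// Schematic before/after; runQuery is the same () => Promise<T> thunk the old helper took.
function migratedExample<T>(tag: string | undefined, runQuery: () => Promise<T>): Promise<T> {
    // Before: return instrumentQuery('query.postgres', tag, runQuery)
    // After: the first argument names the tracer, and the tag moves into span attributes.
    return withSpan('postgres', 'query.postgres', { tag: tag ?? 'unknown' }, runQuery)
}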

View File

@@ -1,62 +0,0 @@
import { logger } from './logger'
import { instrument } from './metrics'
import { sleep } from './utils'

export class PeriodicTask {
    public readonly promise: Promise<void>
    private running = true
    private abortController: AbortController

    constructor(
        public name: string,
        task: () => Promise<void>,
        intervalMs: number,
        minimumWaitMs = 0
    ) {
        this.abortController = new AbortController()

        const abortRequested = new Promise((resolve) => {
            this.abortController.signal.addEventListener('abort', resolve, { once: true })
        })

        this.promise = new Promise(async (resolve, reject) => {
            try {
                logger.debug('🔄', `${this}: Starting...`)
                while (!this.abortController.signal.aborted) {
                    const startTimeMs = Date.now()
                    await instrument({ metricName: this.name }, task)
                    const durationMs = Date.now() - startTimeMs
                    const waitTimeMs = Math.max(intervalMs - durationMs, minimumWaitMs)
                    logger.debug(
                        '🔄',
                        `${this}: Task completed in ${durationMs / 1000}s, next evaluation in ${waitTimeMs / 1000}s`
                    )
                    await Promise.race([sleep(waitTimeMs), abortRequested])
                }
                logger.info('🔴', `${this}: Stopped by request.`)
                resolve()
            } catch (error) {
                logger.warn('⚠️', `${this}: Unexpected error!`, { error })
                reject(error)
            } finally {
                this.running = false
            }
        })
    }

    public toString(): string {
        return `Periodic Task (${this.name})`
    }

    public isRunning(): boolean {
        return this.running
    }

    public async stop(): Promise<void> {
        logger.info(``, `${this}: Stop requested...`)
        this.abortController.abort()
        try {
            await this.promise
        } catch {}
    }
}

View File

@@ -15,7 +15,6 @@ import {
import { processError } from '../../utils/db/error'
import { getPlugin, setPluginCapabilities } from '../../utils/db/sql'
import { logger } from '../../utils/logger'
import { instrument } from '../../utils/metrics'
import { getNextRetryMs } from '../../utils/retries'
import { pluginDigest } from '../../utils/utils'
import { getVMPluginCapabilities, shouldSetupPluginInServer } from '../vm/capabilities'
@@ -169,14 +168,7 @@ export class LazyPluginVM implements PluginInstance {
        if (!this.ready) {
            const vm = (await this.resolveInternalVm)?.vm
            try {
                await instrument(
                    {
                        metricName: 'vm.setup',
                        key: 'plugin',
                        tag: this.pluginConfig.plugin?.name || '?',
                    },
                    () => this._setupPlugin(vm)
                )
                await this._setupPlugin(vm)
            } catch (error) {
                logger.warn('⚠️', error.message)
                return false

View File

@@ -2,6 +2,7 @@ import { ClickHouseClient, ExecResult, createClient as createClickhouseClient }
import { performance } from 'perf_hooks'
import { Readable } from 'stream'
import { withSpan } from '~/common/tracing/tracing-utils'
import {
    ClickHouseEvent,
    ClickHousePerson,
@@ -16,7 +17,6 @@ import { timeoutGuard } from '~/utils/db/utils'
import { isTestEnv } from '~/utils/env-utils'
import { parseRawClickHouseEvent } from '~/utils/event'
import { parseJSON } from '~/utils/json-parse'
import { instrumentQuery } from '~/utils/metrics'
import { fetch } from '~/utils/request'
import { logger } from '../../src/utils/logger'
@@ -160,7 +160,7 @@ export class Clickhouse {
    }

    query<T>(query: string): Promise<T[]> {
        return instrumentQuery('query.clickhouse', undefined, async () => {
        return withSpan('clickhouse', 'query.clickhouse', { tag: 'unknown' }, async () => {
            const timeout = timeoutGuard('ClickHouse slow query warning after 30 sec', { query })
            try {
                const queryResult = await this.client.query({

View File

@@ -1,24 +0,0 @@
import { PeriodicTask } from '../../src/utils/periodic-task'

describe('PeriodicTask', () => {
    describe('updates completion status', () => {
        it('on success', async () => {
            const fn = jest.fn()
            const task = new PeriodicTask('test', fn, 1000)
            expect(fn).toHaveBeenCalled()
            expect(task.isRunning()).toEqual(true)
            await task.stop()
            expect(task.isRunning()).toEqual(false)
        })

        it('on failure', async () => {
            const fn = jest.fn(() => {
                throw new Error()
            })
            const task = new PeriodicTask('test', fn, 1000)
            expect(fn).toHaveBeenCalled()
            await task.stop()
            expect(task.isRunning()).toEqual(false)
        })
    })
})

pnpm-lock.yaml (generated): 1351 changed lines

File diff suppressed because it is too large.