feat: add top tracker to session recordings (#41061)

This commit is contained in:
Paweł Ledwoń
2025-11-06 20:22:05 +01:00
committed by GitHub
parent 691335bfad
commit 22ce7dcdda
4 changed files with 345 additions and 2 deletions

View File

@@ -43,6 +43,7 @@ import { SessionMetadataStore } from './sessions/session-metadata-store'
import { TeamFilter } from './teams/team-filter'
import { TeamService } from './teams/team-service'
import { MessageWithTeam } from './teams/types'
import { TopTracker } from './top-tracker'
import { CaptureIngestionWarningFn } from './types'
import { LibVersionMonitor } from './versions/lib-version-monitor'
@@ -65,6 +66,8 @@ export class SessionRecordingIngester {
private restrictionHandler?: SessionRecordingRestrictionHandler
private kafkaOverflowProducer?: KafkaProducerWrapper
private readonly overflowTopic: string
private readonly topTracker: TopTracker
private topTrackerLogInterval?: NodeJS.Timeout
constructor(
private hub: Hub,
@@ -116,7 +119,8 @@ export class SessionRecordingIngester {
s3Client = new S3Client(s3Config)
}
this.kafkaParser = new KafkaMessageParser()
this.topTracker = new TopTracker()
this.kafkaParser = new KafkaMessageParser(this.topTracker)
this.redisPool = createRedisPool(this.hub, 'session-recording')
@@ -245,6 +249,8 @@ export class SessionRecordingIngester {
}
private async consume(message: MessageWithTeam, batch: SessionBatchRecorder) {
const consumeStartTime = performance.now()
// we have to reset this counter once we're consuming messages since then we know we're not re-balancing
// otherwise the consumer continues to report however many sessions were revoked at the last re-balance forever
SessionRecordingIngesterMetrics.resetSessionsRevoked()
@@ -272,7 +278,17 @@ export class SessionRecordingIngester {
}
SessionRecordingIngesterMetrics.observeSessionInfo(parsedMessage.metadata.rawSize)
// Track message size per session_id
const trackingKey = `session_id:${parsedMessage.session_id}`
this.topTracker.increment('message_size_by_session_id', trackingKey, parsedMessage.metadata.rawSize)
await batch.record(message)
// Track consume time per session_id
const consumeEndTime = performance.now()
const consumeDurationMs = consumeEndTime - consumeStartTime
this.topTracker.increment('consume_time_ms_by_session_id', trackingKey, consumeDurationMs)
}
public async start(): Promise<void> {
@@ -331,12 +347,23 @@ export class SessionRecordingIngester {
this.kafkaConsumer.on('event.stats', (stats) => {
logger.info('🪵', 'blob_ingester_consumer_v2 - kafka stats', { stats })
})
// Start periodic logging of top tracked metrics (every 60 seconds)
this.topTrackerLogInterval = setInterval(() => {
this.topTracker.logAndReset(10)
}, 60000)
}
public async stop(): Promise<PromiseSettledResult<any>[]> {
logger.info('🔁', 'blob_ingester_consumer_v2 - stopping')
this.isStopping = true
// Stop the top tracker interval and log final results
if (this.topTrackerLogInterval) {
clearInterval(this.topTrackerLogInterval)
this.topTracker.logAndReset(10)
}
const assignedPartitions = this.assignedTopicPartitions
await this.kafkaConsumer.disconnect()

View File

@@ -5,6 +5,7 @@ import { gunzip } from 'zlib'
import { parseJSON } from '../../../../utils/json-parse'
import { logger } from '../../../../utils/logger'
import { TopTracker } from '../top-tracker'
import { KafkaMetrics } from './metrics'
import { EventSchema, ParsedMessageData, RawEventMessageSchema, SnapshotEvent, SnapshotEventSchema } from './types'
@@ -54,12 +55,15 @@ function getValidEvents(events: unknown[]): {
}
export class KafkaMessageParser {
constructor(private topTracker?: TopTracker) {}
public async parseBatch(messages: Message[]): Promise<ParsedMessageData[]> {
const parsedMessages = await Promise.all(messages.map((message) => this.parseMessage(message)))
return parsedMessages.filter((msg): msg is ParsedMessageData => msg !== null)
}
private async parseMessage(message: Message): Promise<ParsedMessageData | null> {
const parseStartTime = performance.now()
const dropMessage = (reason: string, extra?: Record<string, any>) => {
KafkaMetrics.incrementMessageDropped('session_recordings_blob_ingestion_v2', reason)
@@ -128,7 +132,7 @@ export class KafkaMessageParser {
return dropMessage('message_timestamp_diff_too_large')
}
return {
const parsedMessage = {
metadata: {
partition: message.partition,
topic: message.topic,
@@ -149,6 +153,16 @@ export class KafkaMessageParser {
snapshot_source: $snapshot_source ?? null,
snapshot_library: $lib ?? null,
}
// Track parsing time per session_id
if (this.topTracker) {
const parseEndTime = performance.now()
const parseDurationMs = parseEndTime - parseStartTime
const trackingKey = `session_id:${$session_id}`
this.topTracker.increment('parse_time_ms_by_session_id', trackingKey, parseDurationMs)
}
return parsedMessage
}
private isGzipped(buffer: Buffer): boolean {

View File

@@ -0,0 +1,236 @@
import { logger } from '../../../utils/logger'
import { TopTracker } from './top-tracker'
jest.mock('../../../utils/logger')
describe('TopTracker', () => {
let tracker: TopTracker
let mockLoggerInfo: jest.SpyInstance
beforeEach(() => {
tracker = new TopTracker()
mockLoggerInfo = jest.spyOn(logger, 'info').mockImplementation()
})
afterEach(() => {
jest.clearAllMocks()
})
describe('increment', () => {
it('should increment counter for a metric and key', () => {
tracker.increment('session_size', 'session-123')
expect(tracker.getCount('session_size', 'session-123')).toBe(1)
})
it('should increment by custom count', () => {
tracker.increment('session_size', 'session-123', 500)
expect(tracker.getCount('session_size', 'session-123')).toBe(500)
})
it('should accumulate counts for same metric and key', () => {
tracker.increment('session_size', 'session-123', 100)
tracker.increment('session_size', 'session-123', 200)
tracker.increment('session_size', 'session-123', 300)
expect(tracker.getCount('session_size', 'session-123')).toBe(600)
})
it('should track different keys for same metric separately', () => {
tracker.increment('session_size', 'session-123', 100)
tracker.increment('session_size', 'session-456', 200)
expect(tracker.getCount('session_size', 'session-123')).toBe(100)
expect(tracker.getCount('session_size', 'session-456')).toBe(200)
})
it('should track different metrics separately', () => {
tracker.increment('session_size', 'session-123', 100)
tracker.increment('message_count', 'session-123', 50)
expect(tracker.getCount('session_size', 'session-123')).toBe(100)
expect(tracker.getCount('message_count', 'session-123')).toBe(50)
})
})
describe('getCount', () => {
it('should return 0 for non-existent metric', () => {
expect(tracker.getCount('non_existent', 'key-123')).toBe(0)
})
it('should return 0 for non-existent key', () => {
tracker.increment('session_size', 'session-123', 100)
expect(tracker.getCount('session_size', 'non-existent')).toBe(0)
})
})
describe('getMetrics', () => {
it('should return empty array when no metrics tracked', () => {
expect(tracker.getMetrics()).toEqual([])
})
it('should return all tracked metrics', () => {
tracker.increment('session_size', 'session-123')
tracker.increment('message_count', 'session-456')
tracker.increment('event_count', 'session-789')
const metrics = tracker.getMetrics()
expect(metrics).toHaveLength(3)
expect(metrics).toContain('session_size')
expect(metrics).toContain('message_count')
expect(metrics).toContain('event_count')
})
})
describe('logAndReset', () => {
it('should log top N entries sorted by count descending', () => {
tracker.increment('session_size', 'session-1', 100)
tracker.increment('session_size', 'session-2', 500)
tracker.increment('session_size', 'session-3', 300)
tracker.increment('session_size', 'session-4', 200)
tracker.logAndReset(3)
expect(mockLoggerInfo).toHaveBeenCalledWith('📊 Top entries for metric', {
metric: 'session_size',
topN: 3,
entries: [
{ key: 'session-2', count: 500 },
{ key: 'session-3', count: 300 },
{ key: 'session-4', count: 200 },
],
totalKeys: 4,
})
})
it('should log all entries when topN is greater than total entries', () => {
tracker.increment('session_size', 'session-1', 100)
tracker.increment('session_size', 'session-2', 200)
tracker.logAndReset(10)
expect(mockLoggerInfo).toHaveBeenCalledWith('📊 Top entries for metric', {
metric: 'session_size',
topN: 10,
entries: [
{ key: 'session-2', count: 200 },
{ key: 'session-1', count: 100 },
],
totalKeys: 2,
})
})
it('should log multiple metrics separately', () => {
tracker.increment('session_size', 'session-1', 1000)
tracker.increment('session_size', 'session-2', 2000)
tracker.increment('message_count', 'session-1', 50)
tracker.increment('message_count', 'session-2', 100)
tracker.logAndReset(2)
expect(mockLoggerInfo).toHaveBeenCalledTimes(2)
expect(mockLoggerInfo).toHaveBeenCalledWith('📊 Top entries for metric', {
metric: 'session_size',
topN: 2,
entries: [
{ key: 'session-2', count: 2000 },
{ key: 'session-1', count: 1000 },
],
totalKeys: 2,
})
expect(mockLoggerInfo).toHaveBeenCalledWith('📊 Top entries for metric', {
metric: 'message_count',
topN: 2,
entries: [
{ key: 'session-2', count: 100 },
{ key: 'session-1', count: 50 },
],
totalKeys: 2,
})
})
it('should reset all counters after logging', () => {
tracker.increment('session_size', 'session-1', 100)
tracker.increment('message_count', 'session-2', 50)
tracker.logAndReset(5)
expect(tracker.getCount('session_size', 'session-1')).toBe(0)
expect(tracker.getCount('message_count', 'session-2')).toBe(0)
expect(tracker.getMetrics()).toEqual([])
})
it('should not log metrics with no entries', () => {
tracker.logAndReset(5)
expect(mockLoggerInfo).not.toHaveBeenCalled()
})
it('should handle single entry', () => {
tracker.increment('session_size', 'session-1', 100)
tracker.logAndReset(1)
expect(mockLoggerInfo).toHaveBeenCalledWith('📊 Top entries for metric', {
metric: 'session_size',
topN: 1,
entries: [{ key: 'session-1', count: 100 }],
totalKeys: 1,
})
})
it('should handle topN of 0', () => {
tracker.increment('session_size', 'session-1', 100)
tracker.increment('session_size', 'session-2', 200)
tracker.logAndReset(0)
expect(mockLoggerInfo).toHaveBeenCalledWith('📊 Top entries for metric', {
metric: 'session_size',
topN: 0,
entries: [],
totalKeys: 2,
})
})
})
describe('integration scenarios', () => {
it('should track session sizes and log top sessions', () => {
// Simulate tracking session sizes
tracker.increment('session_size', 'session-abc', 1024)
tracker.increment('session_size', 'session-def', 2048)
tracker.increment('session_size', 'session-abc', 512) // Same session, more data
tracker.increment('session_size', 'session-ghi', 4096)
tracker.logAndReset(2)
expect(mockLoggerInfo).toHaveBeenCalledWith('📊 Top entries for metric', {
metric: 'session_size',
topN: 2,
entries: [
{ key: 'session-ghi', count: 4096 },
{ key: 'session-def', count: 2048 },
],
totalKeys: 3,
})
})
it('should allow reuse after reset', () => {
tracker.increment('session_size', 'session-1', 100)
tracker.logAndReset(5)
tracker.increment('session_size', 'session-2', 200)
tracker.logAndReset(5)
expect(mockLoggerInfo).toHaveBeenCalledTimes(2)
expect(mockLoggerInfo).toHaveBeenLastCalledWith('📊 Top entries for metric', {
metric: 'session_size',
topN: 5,
entries: [{ key: 'session-2', count: 200 }],
totalKeys: 1,
})
})
})
})

View File

@@ -0,0 +1,66 @@
import { logger } from '../../../utils/logger'
export class TopTracker {
private counters: Map<string, Map<string, number>> = new Map()
/**
* Increment the counter for a specific metric and key
* @param metric - The metric name (e.g., 'session_size', 'message_count')
* @param key - The key to track (e.g., session_id, team_id)
* @param count - The amount to increment by (defaults to 1)
*/
public increment(metric: string, key: string, count: number = 1): void {
let metricCounters = this.counters.get(metric)
if (!metricCounters) {
metricCounters = new Map()
this.counters.set(metric, metricCounters)
}
const currentCount = metricCounters.get(key) ?? 0
metricCounters.set(key, currentCount + count)
}
/**
* Log the top N entries for each metric and reset all counters
* @param topN - Number of top entries to log for each metric
*/
public logAndReset(topN: number): void {
for (const [metric, metricCounters] of this.counters.entries()) {
if (metricCounters.size === 0) {
continue
}
// Sort entries by count descending and take top N
const sortedEntries = Array.from(metricCounters.entries())
.sort(([, a], [, b]) => b - a)
.slice(0, topN)
// Format for logging
const topEntries = sortedEntries.map(([key, count]) => ({ key, count }))
logger.info('📊 Top entries for metric', {
metric,
topN,
entries: topEntries,
totalKeys: metricCounters.size,
})
}
// Reset all counters
this.counters.clear()
}
/**
* Get the current count for a specific metric and key (useful for testing)
*/
public getCount(metric: string, key: string): number {
return this.counters.get(metric)?.get(key) ?? 0
}
/**
* Get all metrics being tracked
*/
public getMetrics(): string[] {
return Array.from(this.counters.keys())
}
}