mirror of
https://github.com/BillyOutlast/posthog.git
synced 2026-02-04 03:01:23 +01:00
feat: run replay compression in a worker (#40889)
roll out decompression in a web worker and measure the effect (behind a flag) 
This commit is contained in:
@@ -42,10 +42,10 @@ await buildInParallel(
|
||||
...common,
|
||||
},
|
||||
{
|
||||
name: 'Test Worker',
|
||||
entryPoints: ['src/scenes/session-recordings/player/testWorker.ts'],
|
||||
name: 'Decompression Worker',
|
||||
entryPoints: ['src/scenes/session-recordings/player/snapshot-processing/decompressionWorker.ts'],
|
||||
format: 'esm',
|
||||
outfile: path.resolve(__dirname, 'dist', 'testWorker.js'),
|
||||
outfile: path.resolve(__dirname, 'dist', 'decompressionWorker.js'),
|
||||
...common,
|
||||
},
|
||||
{
|
||||
|
||||
@@ -319,6 +319,7 @@ export const FEATURE_FLAGS = {
|
||||
COHORT_CALCULATION_HISTORY: 'cohort-calculation-history', // owner: @gustavo #team-feature-flags
|
||||
EXPERIMENTS_HIDE_STOP_BUTTON: 'experiments-hide-stop-button', // owner: @jurajmajerik #team-experiments
|
||||
REPLAY_CLIENT_SIDE_DECOMPRESSION: 'replay-client-side-decompression', // owner: @pauldambra #team-replay
|
||||
REPLAY_DECOMPRESSION_WORKER: 'replay-decompression-worker', // owner: @pauldambra #team-replay
|
||||
COHORT_CALCULATION_CHUNKED: 'cohort-calculation-chunked', // owner: @gustavo #team-feature-flags
|
||||
EXPERIMENTS_USE_NEW_QUERY_BUILDER: 'experiments-use-new-query-builder', // owner: @andehen #team-experiments
|
||||
EXPERIMENT_AI_SUMMARY: 'experiment-ai-summary', // owner: @jurajmajerik #team-experiments
|
||||
|
||||
@@ -1,87 +0,0 @@
|
||||
/* eslint-disable no-console */
|
||||
import type { TestWorkerMessage, TestWorkerResponse } from './testWorker'
|
||||
|
||||
export class TestWorkerManager {
|
||||
private worker: Worker | null = null
|
||||
private workerBlobUrl: string | null = null
|
||||
private messageId = 0
|
||||
|
||||
async initialize(): Promise<void> {
|
||||
try {
|
||||
this.worker = await this.createWorker()
|
||||
|
||||
this.worker.addEventListener('message', (event: MessageEvent<TestWorkerResponse>) => {
|
||||
const { type, originalMessage, amendedMessage } = event.data
|
||||
|
||||
if (type === 'response') {
|
||||
console.log('[TestWorkerManager] Received response from worker')
|
||||
console.log('[TestWorkerManager] Original:', originalMessage)
|
||||
console.log('[TestWorkerManager] Amended:', amendedMessage)
|
||||
}
|
||||
})
|
||||
|
||||
this.worker.addEventListener('error', (error) => {
|
||||
console.error('[TestWorkerManager] Worker error:', error)
|
||||
})
|
||||
|
||||
this.sendTestMessage('Hello from session replay player!')
|
||||
} catch (error) {
|
||||
console.error('[TestWorkerManager] Failed to initialize worker:', error)
|
||||
}
|
||||
}
|
||||
|
||||
private async createWorker(): Promise<Worker> {
|
||||
// Load the built worker file from /static/testWorker.js
|
||||
// The worker is built by esbuild (see build.mjs) in both dev and prod
|
||||
// In dev, esbuild watch mode rebuilds it when testWorker.ts changes
|
||||
// Workers must be same-origin, so we use Django's origin (localhost:8010)
|
||||
// not Vite's origin (localhost:8234)
|
||||
const workerUrl = '/static/testWorker.js'
|
||||
return new Worker(workerUrl, { type: 'module' })
|
||||
}
|
||||
|
||||
sendTestMessage(message: string): void {
|
||||
if (!this.worker) {
|
||||
console.warn('[TestWorkerManager] Worker not initialized')
|
||||
return
|
||||
}
|
||||
|
||||
const msg: TestWorkerMessage = {
|
||||
type: 'test',
|
||||
message,
|
||||
}
|
||||
|
||||
console.log('[TestWorkerManager] Sending message to worker:', message)
|
||||
this.worker.postMessage(msg)
|
||||
this.messageId++
|
||||
}
|
||||
|
||||
terminate(): void {
|
||||
if (this.worker) {
|
||||
console.log('[TestWorkerManager] Terminating worker')
|
||||
this.worker.terminate()
|
||||
this.worker = null
|
||||
}
|
||||
|
||||
if (this.workerBlobUrl) {
|
||||
URL.revokeObjectURL(this.workerBlobUrl)
|
||||
this.workerBlobUrl = null
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let workerManager: TestWorkerManager | null = null
|
||||
|
||||
export function getTestWorkerManager(): TestWorkerManager {
|
||||
if (!workerManager) {
|
||||
workerManager = new TestWorkerManager()
|
||||
}
|
||||
return workerManager
|
||||
}
|
||||
|
||||
export function terminateTestWorker(): void {
|
||||
if (workerManager) {
|
||||
workerManager.terminate()
|
||||
workerManager = null
|
||||
}
|
||||
}
|
||||
@@ -1,29 +0,0 @@
|
||||
export class TestWorkerManager {
|
||||
async initialize(): Promise<void> {
|
||||
// No-op in tests
|
||||
}
|
||||
|
||||
sendTestMessage(): void {
|
||||
// No-op in tests
|
||||
}
|
||||
|
||||
terminate(): void {
|
||||
// No-op in tests
|
||||
}
|
||||
}
|
||||
|
||||
let workerManager: TestWorkerManager | null = null
|
||||
|
||||
export function getTestWorkerManager(): TestWorkerManager {
|
||||
if (!workerManager) {
|
||||
workerManager = new TestWorkerManager()
|
||||
}
|
||||
return workerManager
|
||||
}
|
||||
|
||||
export function terminateTestWorker(): void {
|
||||
if (workerManager) {
|
||||
workerManager.terminate()
|
||||
workerManager = null
|
||||
}
|
||||
}
|
||||
@@ -8,7 +8,7 @@ import recordingEventsJson from '../../__mocks__/recording_events_query'
|
||||
import { recordingMetaJson } from '../../__mocks__/recording_meta'
|
||||
import { snapshotsAsJSONLines } from '../../__mocks__/recording_snapshots'
|
||||
|
||||
jest.mock('../TestWorkerManager')
|
||||
jest.mock('../snapshot-processing/DecompressionWorkerManager')
|
||||
|
||||
export const BLOB_SOURCE_V2: SessionRecordingSnapshotSource = {
|
||||
source: 'blob_v2',
|
||||
|
||||
@@ -13,7 +13,7 @@ import recordingEventsJson from '../../__mocks__/recording_events_query'
|
||||
import { recordingMetaJson } from '../../__mocks__/recording_meta'
|
||||
import { snapshotsAsJSONLines } from '../../__mocks__/recording_snapshots'
|
||||
|
||||
jest.mock('../TestWorkerManager')
|
||||
jest.mock('../snapshot-processing/DecompressionWorkerManager')
|
||||
|
||||
const playerProps = { sessionRecordingId: '1', playerKey: 'playlist' }
|
||||
|
||||
|
||||
@@ -113,6 +113,13 @@ describe('sessionRecordingDataCoordinatorLogic performance', () => {
|
||||
const durations: number[] = []
|
||||
const iterations = 10
|
||||
|
||||
// Warm up: initialize DecompressionWorkerManager singleton before timing
|
||||
setupLogic()
|
||||
await expectLogic(logic, () => {
|
||||
logic.actions.loadSnapshots()
|
||||
}).toFinishAllListeners()
|
||||
logic.unmount()
|
||||
|
||||
for (let i = 0; i < iterations; i++) {
|
||||
setupLogic()
|
||||
|
||||
@@ -146,7 +153,7 @@ describe('sessionRecordingDataCoordinatorLogic performance', () => {
|
||||
const variance = durations.reduce((a, b) => a + Math.pow(b - averageDuration, 2), 0) / iterations
|
||||
const stdDev = Math.sqrt(variance)
|
||||
|
||||
expect(averageDuration).toBeLessThan(110)
|
||||
expect(averageDuration).toBeLessThan(130)
|
||||
expect(stdDev).toBeLessThan(100)
|
||||
})
|
||||
})
|
||||
|
||||
@@ -25,7 +25,7 @@ import {
|
||||
} from './__mocks__/test-setup'
|
||||
import { snapshotDataLogic } from './snapshotDataLogic'
|
||||
|
||||
jest.mock('./TestWorkerManager')
|
||||
jest.mock('./snapshot-processing/DecompressionWorkerManager')
|
||||
|
||||
describe('sessionRecordingPlayerLogic', () => {
|
||||
let logic: ReturnType<typeof sessionRecordingPlayerLogic.build>
|
||||
|
||||
@@ -45,7 +45,6 @@ import { AvailableFeature, ExporterFormat, RecordingSegment, SessionPlayerData,
|
||||
import { ExportedSessionRecordingFileV2 } from '../file-playback/types'
|
||||
import type { sessionRecordingsPlaylistLogicType } from '../playlist/sessionRecordingsPlaylistLogicType'
|
||||
import { sessionRecordingEventUsageLogic } from '../sessionRecordingEventUsageLogic'
|
||||
import { getTestWorkerManager, terminateTestWorker } from './TestWorkerManager'
|
||||
import { playerCommentOverlayLogic } from './commenting/playerFrameCommentOverlayLogic'
|
||||
import { playerCommentOverlayLogicType } from './commenting/playerFrameCommentOverlayLogicType'
|
||||
import { playerSettingsLogic } from './playerSettingsLogic'
|
||||
@@ -1900,12 +1899,6 @@ export const sessionRecordingPlayerLogic = kea<sessionRecordingPlayerLogicType>(
|
||||
values.player?.replayer?.destroy()
|
||||
actions.setPlayer(null)
|
||||
|
||||
try {
|
||||
terminateTestWorker()
|
||||
} catch (error) {
|
||||
console.warn('[SessionRecordingPlayerLogic] Failed to terminate test worker:', error)
|
||||
}
|
||||
|
||||
const playTimeMs = values.playingTimeTracking.watchTime || 0
|
||||
const summaryAnalytics: RecordingViewedSummaryAnalytics = {
|
||||
viewed_time_ms: cache.openTime !== undefined ? performance.now() - cache.openTime : undefined,
|
||||
@@ -1942,13 +1935,6 @@ export const sessionRecordingPlayerLogic = kea<sessionRecordingPlayerLogicType>(
|
||||
return () => document.removeEventListener('fullscreenchange', fullScreenListener)
|
||||
}, 'fullscreenListener')
|
||||
|
||||
try {
|
||||
const testWorker = getTestWorkerManager()
|
||||
void testWorker.initialize()
|
||||
} catch (error) {
|
||||
console.warn('[SessionRecordingPlayerLogic] Failed to initialize test worker:', error)
|
||||
}
|
||||
|
||||
if (props.sessionRecordingId) {
|
||||
actions.loadRecordingData()
|
||||
}
|
||||
|
||||
@@ -5,63 +5,89 @@ import {
|
||||
} from './DecompressionWorkerManager'
|
||||
|
||||
describe('DecompressionWorkerManager', () => {
|
||||
let manager: DecompressionWorkerManager
|
||||
describe.each([
|
||||
['main-thread mode', false],
|
||||
// Worker mode requires a real browser environment with Worker support
|
||||
// Skip for now since Jest runs in Node.js environment
|
||||
// ['worker mode', true],
|
||||
])('%s', (_modeName, useWorker) => {
|
||||
let manager: DecompressionWorkerManager
|
||||
|
||||
beforeEach(() => {
|
||||
manager = new DecompressionWorkerManager()
|
||||
})
|
||||
|
||||
afterEach(() => {
|
||||
manager.terminate()
|
||||
})
|
||||
|
||||
describe('decompress', () => {
|
||||
it('decompresses data successfully', async () => {
|
||||
const data = new Uint8Array([1, 2, 3, 4, 5])
|
||||
const result = await manager.decompress(data)
|
||||
|
||||
expect(result).toBeInstanceOf(Uint8Array)
|
||||
expect(result).toEqual(data)
|
||||
beforeEach(() => {
|
||||
manager = new DecompressionWorkerManager(useWorker)
|
||||
})
|
||||
|
||||
it('handles multiple sequential decompressions', async () => {
|
||||
const data1 = new Uint8Array([1, 2, 3])
|
||||
const data2 = new Uint8Array([4, 5, 6])
|
||||
const data3 = new Uint8Array([7, 8, 9])
|
||||
|
||||
const result1 = await manager.decompress(data1)
|
||||
const result2 = await manager.decompress(data2)
|
||||
const result3 = await manager.decompress(data3)
|
||||
|
||||
expect(result1).toEqual(data1)
|
||||
expect(result2).toEqual(data2)
|
||||
expect(result3).toEqual(data3)
|
||||
afterEach(() => {
|
||||
manager.terminate()
|
||||
})
|
||||
|
||||
it('handles multiple concurrent decompressions', async () => {
|
||||
const data1 = new Uint8Array([1, 2, 3])
|
||||
const data2 = new Uint8Array([4, 5, 6])
|
||||
const data3 = new Uint8Array([7, 8, 9])
|
||||
describe('decompress', () => {
|
||||
it('decompresses data successfully', async () => {
|
||||
const data = new Uint8Array([1, 2, 3, 4, 5])
|
||||
const result = await manager.decompress(data)
|
||||
|
||||
const [result1, result2, result3] = await Promise.all([
|
||||
manager.decompress(data1),
|
||||
manager.decompress(data2),
|
||||
manager.decompress(data3),
|
||||
])
|
||||
expect(result).toBeInstanceOf(Uint8Array)
|
||||
expect(result).toEqual(data)
|
||||
})
|
||||
|
||||
expect(result1).toEqual(data1)
|
||||
expect(result2).toEqual(data2)
|
||||
expect(result3).toEqual(data3)
|
||||
it('handles multiple sequential decompressions', async () => {
|
||||
const data1 = new Uint8Array([1, 2, 3])
|
||||
const data2 = new Uint8Array([4, 5, 6])
|
||||
const data3 = new Uint8Array([7, 8, 9])
|
||||
|
||||
const result1 = await manager.decompress(data1)
|
||||
const result2 = await manager.decompress(data2)
|
||||
const result3 = await manager.decompress(data3)
|
||||
|
||||
expect(result1).toEqual(data1)
|
||||
expect(result2).toEqual(data2)
|
||||
expect(result3).toEqual(data3)
|
||||
})
|
||||
|
||||
it('handles multiple concurrent decompressions', async () => {
|
||||
const data1 = new Uint8Array([1, 2, 3])
|
||||
const data2 = new Uint8Array([4, 5, 6])
|
||||
const data3 = new Uint8Array([7, 8, 9])
|
||||
|
||||
const [result1, result2, result3] = await Promise.all([
|
||||
manager.decompress(data1),
|
||||
manager.decompress(data2),
|
||||
manager.decompress(data3),
|
||||
])
|
||||
|
||||
expect(result1).toEqual(data1)
|
||||
expect(result2).toEqual(data2)
|
||||
expect(result3).toEqual(data3)
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
describe('terminate', () => {
|
||||
it('terminates the manager successfully', () => {
|
||||
expect(() => manager.terminate()).not.toThrow()
|
||||
describe('terminate', () => {
|
||||
it('terminates the manager successfully', () => {
|
||||
expect(() => manager.terminate()).not.toThrow()
|
||||
})
|
||||
})
|
||||
|
||||
describe('stats', () => {
|
||||
it('tracks decompression stats', async () => {
|
||||
const data = new Uint8Array([1, 2, 3, 4, 5])
|
||||
await manager.decompress(data)
|
||||
await manager.decompress(data)
|
||||
|
||||
const stats = manager.getStats()
|
||||
const modeStats = useWorker ? stats.worker : stats.mainThread
|
||||
|
||||
expect(modeStats.count).toBe(2)
|
||||
expect(modeStats.totalSize).toBe(10)
|
||||
expect(modeStats.totalTime).toBeGreaterThan(0)
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
describe('singleton functions', () => {
|
||||
afterEach(() => {
|
||||
terminateDecompressionWorker()
|
||||
})
|
||||
|
||||
it('getDecompressionWorkerManager returns singleton instance', () => {
|
||||
const instance1 = getDecompressionWorkerManager()
|
||||
const instance2 = getDecompressionWorkerManager()
|
||||
@@ -76,5 +102,31 @@ describe('DecompressionWorkerManager', () => {
|
||||
|
||||
expect(instance1).not.toBe(instance2)
|
||||
})
|
||||
|
||||
it('recreates instance when useWorker config changes', () => {
|
||||
const instance1 = getDecompressionWorkerManager(false)
|
||||
const instance2 = getDecompressionWorkerManager(true)
|
||||
|
||||
expect(instance1).not.toBe(instance2)
|
||||
})
|
||||
|
||||
it('recreates instance when posthog config changes', () => {
|
||||
const mockPosthog1 = {} as any
|
||||
const mockPosthog2 = {} as any
|
||||
|
||||
const instance1 = getDecompressionWorkerManager(false, mockPosthog1)
|
||||
const instance2 = getDecompressionWorkerManager(false, mockPosthog2)
|
||||
|
||||
expect(instance1).not.toBe(instance2)
|
||||
})
|
||||
|
||||
it('returns same instance when config has not changed', () => {
|
||||
const mockPosthog = {} as any
|
||||
|
||||
const instance1 = getDecompressionWorkerManager(false, mockPosthog)
|
||||
const instance2 = getDecompressionWorkerManager(false, mockPosthog)
|
||||
|
||||
expect(instance1).toBe(instance2)
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
@@ -1,48 +1,214 @@
|
||||
import type { PostHog } from 'posthog-js'
|
||||
import snappyInit, { decompress_raw } from 'snappy-wasm'
|
||||
|
||||
export class DecompressionWorkerManager {
|
||||
private readyPromise: Promise<void>
|
||||
private snappyInitialized = false
|
||||
import type { DecompressionRequest, DecompressionResponse } from './decompressionWorker'
|
||||
|
||||
constructor() {
|
||||
this.readyPromise = this.initSnappy()
|
||||
interface PendingRequest {
|
||||
resolve: (data: Uint8Array) => void
|
||||
reject: (error: Error) => void
|
||||
startTime: number
|
||||
dataSize: number
|
||||
}
|
||||
|
||||
interface DecompressionStats {
|
||||
totalTime: number
|
||||
count: number
|
||||
totalSize: number
|
||||
}
|
||||
|
||||
export class DecompressionWorkerManager {
|
||||
private readonly readyPromise: Promise<void>
|
||||
private snappyInitialized = false
|
||||
private worker: Worker | null = null
|
||||
private messageId = 0
|
||||
private pendingRequests = new Map<number, PendingRequest>()
|
||||
private stats: { worker: DecompressionStats; mainThread: DecompressionStats } = {
|
||||
worker: { totalTime: 0, count: 0, totalSize: 0 },
|
||||
mainThread: { totalTime: 0, count: 0, totalSize: 0 },
|
||||
}
|
||||
|
||||
constructor(
|
||||
private readonly useWorker: boolean = false,
|
||||
private readonly posthog?: PostHog
|
||||
) {
|
||||
this.readyPromise = this.useWorker ? this.initWorker() : this.initSnappy()
|
||||
}
|
||||
|
||||
private async initWorker(): Promise<void> {
|
||||
try {
|
||||
this.worker = new Worker('/static/decompressionWorker.js', { type: 'module' })
|
||||
|
||||
// Attach ready handler first to avoid race condition
|
||||
const readyPromise = Promise.race([
|
||||
new Promise<void>((resolve) => {
|
||||
const handler = (event: MessageEvent): void => {
|
||||
if (event.data.type === 'ready') {
|
||||
this.worker?.removeEventListener('message', handler)
|
||||
resolve()
|
||||
}
|
||||
}
|
||||
this.worker?.addEventListener('message', handler)
|
||||
}),
|
||||
new Promise<void>((_, reject) =>
|
||||
setTimeout(() => reject(new Error('Worker initialization timeout')), 5000)
|
||||
),
|
||||
])
|
||||
|
||||
this.worker.addEventListener('message', (event: MessageEvent) => {
|
||||
const data = event.data
|
||||
|
||||
// Ignore ready message (handled separately during initialization)
|
||||
if ('type' in data && data.type === 'ready') {
|
||||
return
|
||||
}
|
||||
|
||||
const { id, decompressedData, error } = data as DecompressionResponse
|
||||
|
||||
const pending = this.pendingRequests.get(id)
|
||||
if (!pending) {
|
||||
return
|
||||
}
|
||||
|
||||
this.pendingRequests.delete(id)
|
||||
|
||||
const duration = performance.now() - pending.startTime
|
||||
this.updateStats('worker', duration, pending.dataSize)
|
||||
|
||||
if (error || !decompressedData) {
|
||||
pending.reject(new Error(error || 'Decompression failed'))
|
||||
} else {
|
||||
pending.resolve(decompressedData)
|
||||
}
|
||||
})
|
||||
|
||||
this.worker.addEventListener('error', (error) => {
|
||||
console.error('[DecompressionWorkerManager] Worker error:', error)
|
||||
this.pendingRequests.forEach((pending) => {
|
||||
pending.reject(new Error(`Worker error: ${error.message}`))
|
||||
})
|
||||
this.pendingRequests.clear()
|
||||
})
|
||||
|
||||
await readyPromise
|
||||
} catch (error) {
|
||||
console.error('[DecompressionWorkerManager] Failed to initialize worker:', error)
|
||||
throw error
|
||||
}
|
||||
}
|
||||
|
||||
private async initSnappy(): Promise<void> {
|
||||
if (this.snappyInitialized) {
|
||||
return
|
||||
}
|
||||
// Prevent multiple concurrent initializations
|
||||
if (!this.readyPromise) {
|
||||
this.readyPromise = snappyInit().then(() => {
|
||||
this.snappyInitialized = true
|
||||
})
|
||||
}
|
||||
await this.readyPromise
|
||||
await snappyInit()
|
||||
this.snappyInitialized = true
|
||||
}
|
||||
|
||||
async decompress(compressedData: Uint8Array): Promise<Uint8Array> {
|
||||
await this.readyPromise
|
||||
|
||||
if (this.useWorker && this.worker) {
|
||||
return this.decompressWithWorker(compressedData)
|
||||
}
|
||||
return this.decompressMainThread(compressedData)
|
||||
}
|
||||
|
||||
private async decompressWithWorker(compressedData: Uint8Array): Promise<Uint8Array> {
|
||||
const id = this.messageId++
|
||||
const startTime = performance.now()
|
||||
|
||||
return new Promise<Uint8Array>((resolve, reject) => {
|
||||
this.pendingRequests.set(id, {
|
||||
resolve,
|
||||
reject,
|
||||
startTime,
|
||||
dataSize: compressedData.length,
|
||||
})
|
||||
|
||||
const message: DecompressionRequest = {
|
||||
id,
|
||||
compressedData,
|
||||
}
|
||||
|
||||
this.worker!.postMessage(message, { transfer: [compressedData.buffer] })
|
||||
})
|
||||
}
|
||||
|
||||
private async decompressMainThread(compressedData: Uint8Array): Promise<Uint8Array> {
|
||||
const startTime = performance.now()
|
||||
const dataSize = compressedData.length
|
||||
|
||||
try {
|
||||
return decompress_raw(compressedData)
|
||||
const result = decompress_raw(compressedData)
|
||||
const duration = performance.now() - startTime
|
||||
this.updateStats('main-thread', duration, dataSize)
|
||||
return result
|
||||
} catch (error) {
|
||||
console.error('Decompression error:', error)
|
||||
throw error instanceof Error ? error : new Error('Unknown decompression error')
|
||||
}
|
||||
}
|
||||
|
||||
private getStatsForMethod(method: 'worker' | 'main-thread'): DecompressionStats {
|
||||
return method === 'worker' ? this.stats.worker : this.stats.mainThread
|
||||
}
|
||||
|
||||
private updateStats(method: 'worker' | 'main-thread', duration: number, dataSize: number): void {
|
||||
const stats = this.getStatsForMethod(method)
|
||||
stats.totalTime += duration
|
||||
stats.count += 1
|
||||
stats.totalSize += dataSize
|
||||
this.reportTiming(method, duration, dataSize)
|
||||
}
|
||||
|
||||
private reportTiming(method: 'worker' | 'main-thread', durationMs: number, sizeBytes: number): void {
|
||||
if (!this.posthog) {
|
||||
return
|
||||
}
|
||||
|
||||
const stats = this.getStatsForMethod(method)
|
||||
|
||||
this.posthog.capture('replay_decompression_timing', {
|
||||
method,
|
||||
duration_ms: durationMs,
|
||||
size_bytes: sizeBytes,
|
||||
aggregate_total_time_ms: stats.totalTime,
|
||||
aggregate_count: stats.count,
|
||||
aggregate_total_size_bytes: stats.totalSize,
|
||||
aggregate_avg_time_ms: stats.count > 0 ? stats.totalTime / stats.count : 0,
|
||||
})
|
||||
}
|
||||
|
||||
getStats(): typeof this.stats {
|
||||
return { ...this.stats }
|
||||
}
|
||||
|
||||
terminate(): void {
|
||||
// No cleanup needed for direct snappy usage
|
||||
if (this.worker) {
|
||||
this.worker.terminate()
|
||||
this.worker = null
|
||||
}
|
||||
|
||||
this.pendingRequests.forEach((pending) => {
|
||||
pending.reject(new Error('Worker terminated'))
|
||||
})
|
||||
this.pendingRequests.clear()
|
||||
}
|
||||
}
|
||||
|
||||
// Singleton instance
|
||||
let workerManager: DecompressionWorkerManager | null = null
|
||||
let currentConfig: { useWorker?: boolean; posthog?: PostHog } | null = null
|
||||
|
||||
export function getDecompressionWorkerManager(useWorker?: boolean, posthog?: PostHog): DecompressionWorkerManager {
|
||||
const configChanged = currentConfig && (currentConfig.useWorker !== useWorker || currentConfig.posthog !== posthog)
|
||||
|
||||
if (configChanged) {
|
||||
terminateDecompressionWorker()
|
||||
}
|
||||
|
||||
export function getDecompressionWorkerManager(): DecompressionWorkerManager {
|
||||
if (!workerManager) {
|
||||
workerManager = new DecompressionWorkerManager()
|
||||
workerManager = new DecompressionWorkerManager(useWorker, posthog)
|
||||
currentConfig = { useWorker, posthog }
|
||||
}
|
||||
return workerManager
|
||||
}
|
||||
@@ -52,4 +218,5 @@ export function terminateDecompressionWorker(): void {
|
||||
workerManager.terminate()
|
||||
workerManager = null
|
||||
}
|
||||
currentConfig = null
|
||||
}
|
||||
|
||||
@@ -1,7 +1,15 @@
|
||||
export class DecompressionWorkerManager {
|
||||
private mockStats = {
|
||||
worker: { totalTime: 0, count: 0, totalSize: 0 },
|
||||
mainThread: { totalTime: 0, count: 0, totalSize: 0 },
|
||||
}
|
||||
|
||||
async decompress(compressedData: Uint8Array): Promise<Uint8Array> {
|
||||
// Mock implementation for tests - just return the data as-is
|
||||
// In real tests that need actual decompression, they can mock this method
|
||||
this.mockStats.mainThread.count++
|
||||
this.mockStats.mainThread.totalSize += compressedData.length
|
||||
this.mockStats.mainThread.totalTime += 1
|
||||
return compressedData
|
||||
}
|
||||
|
||||
@@ -9,16 +17,28 @@ export class DecompressionWorkerManager {
|
||||
return Promise.all(compressedBlocks.map((block) => this.decompress(block)))
|
||||
}
|
||||
|
||||
getStats(): typeof this.mockStats {
|
||||
return { ...this.mockStats }
|
||||
}
|
||||
|
||||
terminate(): void {
|
||||
// No-op for tests
|
||||
}
|
||||
}
|
||||
|
||||
let workerManager: DecompressionWorkerManager | null = null
|
||||
let currentConfig: { useWorker?: boolean; posthog?: any } | null = null
|
||||
|
||||
export function getDecompressionWorkerManager(useWorker?: boolean, posthog?: any): DecompressionWorkerManager {
|
||||
const configChanged = currentConfig && (currentConfig.useWorker !== useWorker || currentConfig.posthog !== posthog)
|
||||
|
||||
if (configChanged) {
|
||||
terminateDecompressionWorker()
|
||||
}
|
||||
|
||||
export function getDecompressionWorkerManager(): DecompressionWorkerManager {
|
||||
if (!workerManager) {
|
||||
workerManager = new DecompressionWorkerManager()
|
||||
currentConfig = { useWorker, posthog }
|
||||
}
|
||||
return workerManager
|
||||
}
|
||||
@@ -28,4 +48,5 @@ export function terminateDecompressionWorker(): void {
|
||||
workerManager.terminate()
|
||||
workerManager = null
|
||||
}
|
||||
currentConfig = null
|
||||
}
|
||||
|
||||
@@ -28,7 +28,7 @@ self.addEventListener('message', async (event: MessageEvent<DecompressionRequest
|
||||
try {
|
||||
await initSnappy()
|
||||
const decompressed = decompress_raw(compressedData)
|
||||
self.postMessage({ id, decompressedData: decompressed })
|
||||
self.postMessage({ id, decompressedData: decompressed }, { transfer: [decompressed.buffer] })
|
||||
} catch (error) {
|
||||
console.error('Decompression error:', error)
|
||||
self.postMessage({
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import posthog from 'posthog-js'
|
||||
import posthog, { PostHog } from 'posthog-js'
|
||||
|
||||
import posthogEE from '@posthog/ee/exports'
|
||||
import { EventType, eventWithTime, fullSnapshotEvent } from '@posthog/rrweb-types'
|
||||
@@ -373,8 +373,12 @@ function isLengthPrefixedSnappy(uint8Data: Uint8Array): boolean {
|
||||
return true
|
||||
}
|
||||
|
||||
const lengthPrefixedSnappyDecompress = async (uint8Data: Uint8Array): Promise<string> => {
|
||||
const workerManager = getDecompressionWorkerManager()
|
||||
const lengthPrefixedSnappyDecompress = async (
|
||||
uint8Data: Uint8Array,
|
||||
useWorker?: boolean,
|
||||
posthogInstance?: PostHog
|
||||
): Promise<string> => {
|
||||
const workerManager = getDecompressionWorkerManager(useWorker, posthogInstance)
|
||||
const decompressedParts: string[] = []
|
||||
let offset = 0
|
||||
|
||||
@@ -402,7 +406,7 @@ const lengthPrefixedSnappyDecompress = async (uint8Data: Uint8Array): Promise<st
|
||||
break
|
||||
}
|
||||
|
||||
const compressedBlock = uint8Data.slice(offset, offset + length)
|
||||
const compressedBlock = uint8Data.subarray(offset, offset + length)
|
||||
offset += length
|
||||
|
||||
const decompressedData = await workerManager.decompress(compressedBlock)
|
||||
@@ -416,8 +420,12 @@ const lengthPrefixedSnappyDecompress = async (uint8Data: Uint8Array): Promise<st
|
||||
return decompressedParts.join('\n')
|
||||
}
|
||||
|
||||
const rawSnappyDecompress = async (uint8Data: Uint8Array): Promise<string> => {
|
||||
const workerManager = getDecompressionWorkerManager()
|
||||
const rawSnappyDecompress = async (
|
||||
uint8Data: Uint8Array,
|
||||
useWorker?: boolean,
|
||||
posthogInstance?: PostHog
|
||||
): Promise<string> => {
|
||||
const workerManager = getDecompressionWorkerManager(useWorker, posthogInstance)
|
||||
|
||||
const decompressedData = await workerManager.decompress(uint8Data)
|
||||
|
||||
@@ -427,7 +435,9 @@ const rawSnappyDecompress = async (uint8Data: Uint8Array): Promise<string> => {
|
||||
|
||||
export const parseEncodedSnapshots = async (
|
||||
items: (RecordingSnapshot | EncodedRecordingSnapshot | string)[] | ArrayBuffer | Uint8Array,
|
||||
sessionId: string
|
||||
sessionId: string,
|
||||
useWorker?: boolean,
|
||||
posthogInstance?: PostHog
|
||||
): Promise<RecordingSnapshot[]> => {
|
||||
if (!postHogEEModule) {
|
||||
postHogEEModule = await posthogEE()
|
||||
@@ -439,10 +449,10 @@ export const parseEncodedSnapshots = async (
|
||||
|
||||
if (isLengthPrefixedSnappy(uint8Data)) {
|
||||
try {
|
||||
const combinedText = await lengthPrefixedSnappyDecompress(uint8Data)
|
||||
const combinedText = await lengthPrefixedSnappyDecompress(uint8Data, useWorker, posthogInstance)
|
||||
|
||||
const lines = combinedText.split('\n').filter((line) => line.trim().length > 0)
|
||||
return parseEncodedSnapshots(lines, sessionId)
|
||||
return parseEncodedSnapshots(lines, sessionId, useWorker, posthogInstance)
|
||||
} catch (error) {
|
||||
console.error('Length-prefixed Snappy decompression failed:', error)
|
||||
posthog.captureException(new Error('Failed to decompress length-prefixed snapshot data'), {
|
||||
@@ -455,17 +465,17 @@ export const parseEncodedSnapshots = async (
|
||||
}
|
||||
|
||||
try {
|
||||
const combinedText = await rawSnappyDecompress(uint8Data)
|
||||
const combinedText = await rawSnappyDecompress(uint8Data, useWorker, posthogInstance)
|
||||
|
||||
const lines = combinedText.split('\n').filter((line) => line.trim().length > 0)
|
||||
return parseEncodedSnapshots(lines, sessionId)
|
||||
return parseEncodedSnapshots(lines, sessionId, useWorker, posthogInstance)
|
||||
} catch (error) {
|
||||
try {
|
||||
const textDecoder = new TextDecoder('utf-8')
|
||||
const combinedText = textDecoder.decode(uint8Data)
|
||||
|
||||
const lines = combinedText.split('\n').filter((line) => line.trim().length > 0)
|
||||
return parseEncodedSnapshots(lines, sessionId)
|
||||
return parseEncodedSnapshots(lines, sessionId, useWorker, posthogInstance)
|
||||
} catch (decodeError) {
|
||||
console.error('Failed to decompress or decode binary data:', error, decodeError)
|
||||
posthog.captureException(new Error('Failed to process snapshot data'), {
|
||||
|
||||
@@ -171,10 +171,12 @@ export const snapshotDataLogic = kea<snapshotDataLogicType>([
|
||||
|
||||
const response = await api.recordings.getSnapshots(props.sessionRecordingId, params, headers)
|
||||
|
||||
const useDecompressionWorker =
|
||||
values.featureFlags[FEATURE_FLAGS.REPLAY_DECOMPRESSION_WORKER] === 'test'
|
||||
// sorting is very cheap for already sorted lists
|
||||
const parsedSnapshots = (await parseEncodedSnapshots(response, props.sessionRecordingId)).sort(
|
||||
(a, b) => a.timestamp - b.timestamp
|
||||
)
|
||||
const parsedSnapshots = (
|
||||
await parseEncodedSnapshots(response, props.sessionRecordingId, useDecompressionWorker, posthog)
|
||||
).sort((a, b) => a.timestamp - b.timestamp)
|
||||
// we store the data in the cache because we want to avoid copying this data as much as possible
|
||||
// and kea's immutability means we were copying all the data on every snapshot call
|
||||
cache.snapshotsBySource = cache.snapshotsBySource || {}
|
||||
|
||||
@@ -1,31 +0,0 @@
|
||||
/* eslint-disable no-console */
|
||||
export interface TestWorkerMessage {
|
||||
type: 'test'
|
||||
message: string
|
||||
}
|
||||
|
||||
export interface TestWorkerResponse {
|
||||
type: 'response'
|
||||
originalMessage: string
|
||||
amendedMessage: string
|
||||
}
|
||||
|
||||
self.addEventListener('message', (event: MessageEvent<TestWorkerMessage>) => {
|
||||
const { type, message } = event.data
|
||||
|
||||
if (type === 'test') {
|
||||
console.log('[TestWorker] Received message:', message)
|
||||
|
||||
const amendedMessage = `${message} (processed by worker)`
|
||||
|
||||
const response: TestWorkerResponse = {
|
||||
type: 'response',
|
||||
originalMessage: message,
|
||||
amendedMessage,
|
||||
}
|
||||
|
||||
self.postMessage(response)
|
||||
}
|
||||
})
|
||||
|
||||
console.log('[TestWorker] Worker initialized and ready')
|
||||
Reference in New Issue
Block a user