feat: decommission the v2_test tables/topics (#37305)

Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: Daniel Escribano <daniel.e@posthog.com>
Authored by Paul D'Ambra on 2025-09-18 15:36:38 +01:00, committed by GitHub
parent 367435b2ad
commit d3803ff707
24 changed files with 30 additions and 3179 deletions

View File

@@ -57,7 +57,6 @@ SHARDED_TABLES = [
"sharded_raw_sessions",
"sharded_session_replay_embeddings",
"sharded_session_replay_events",
"sharded_session_replay_events_v2_test",
"sharded_sessions",
"sharded_events",
]

View File

@@ -33,7 +33,6 @@ export const KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW = `${prefix}session_
// write session recording and replay events to ClickHouse
export const KAFKA_CLICKHOUSE_SESSION_RECORDING_EVENTS = `${prefix}clickhouse_session_recording_events${suffix}`
export const KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS = `${prefix}clickhouse_session_replay_events${suffix}`
export const KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS_V2_TEST = `${prefix}clickhouse_session_replay_events_v2_test${suffix}`
// write performance events to ClickHouse
export const KAFKA_PERFORMANCE_EVENTS = `${prefix}clickhouse_performance_events${suffix}`

View File

@@ -4,7 +4,6 @@ import { CODES, Message, TopicPartition, TopicPartitionOffset, features, librdka
import { instrumentFn } from '~/common/tracing/tracing-utils'
import { buildIntegerMatcher } from '../../../config/config'
import { KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS_V2_TEST } from '../../../config/kafka-topics'
import { KafkaConsumer } from '../../../kafka/consumer'
import { KafkaProducerWrapper } from '../../../kafka/producer'
import {
@@ -129,7 +128,7 @@ export class SessionRecordingIngester {
const offsetManager = new KafkaOffsetManager(this.commitOffsets.bind(this), this.topic)
const metadataStore = new SessionMetadataStore(
producer,
this.config.SESSION_RECORDING_V2_REPLAY_EVENTS_KAFKA_TOPIC || KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS_V2_TEST
this.config.SESSION_RECORDING_V2_REPLAY_EVENTS_KAFKA_TOPIC
)
const consoleLogStore = new SessionConsoleLogStore(
producer,

View File

@@ -14,7 +14,7 @@ describe('SessionMetadataStore', () => {
flush: jest.fn().mockResolvedValue(undefined),
} as unknown as jest.Mocked<KafkaProducerWrapper>
store = new SessionMetadataStore(mockProducer, 'clickhouse_session_replay_events_v2_test_test')
store = new SessionMetadataStore(mockProducer, 'clickhouse_session_replay_events')
})
it('should queue events to kafka with correct data', async () => {
@@ -100,7 +100,7 @@ describe('SessionMetadataStore', () => {
expect(mockProducer.queueMessages).toHaveBeenCalledTimes(1)
const queuedMessage = mockProducer.queueMessages.mock.calls[0][0] as TopicMessage
expect(queuedMessage.topic).toBe('clickhouse_session_replay_events_v2_test_test')
expect(queuedMessage.topic).toBe('clickhouse_session_replay_events')
const queuedMessages = queuedMessage.messages
const parsedEvents = queuedMessages.map((msg) => parseJSON(msg.value as string))
@@ -197,7 +197,7 @@ describe('SessionMetadataStore', () => {
it('should handle empty blocks array', async () => {
await store.storeSessionBlocks([])
expect(mockProducer.queueMessages).toHaveBeenCalledWith({
topic: 'clickhouse_session_replay_events_v2_test_test',
topic: 'clickhouse_session_replay_events',
messages: [],
})
expect(mockProducer.flush).toHaveBeenCalledTimes(1)

View File

@@ -1,20 +1,6 @@
from posthog.clickhouse.client.migration_tools import NodeRole, run_sql_with_exceptions
from posthog.session_recordings.sql.session_replay_event_v2_test_sql import (
SESSION_REPLAY_EVENTS_V2_TEST_DATA_TABLE_SQL,
SESSION_REPLAY_EVENTS_V2_TEST_DISTRIBUTED_TABLE_SQL,
SESSION_REPLAY_EVENTS_V2_TEST_KAFKA_TABLE_SQL,
SESSION_REPLAY_EVENTS_V2_TEST_MV_SQL,
SESSION_REPLAY_EVENTS_V2_TEST_WRITABLE_TABLE_SQL,
)
from infi.clickhouse_orm import Operation
operations = [
run_sql_with_exceptions(SESSION_REPLAY_EVENTS_V2_TEST_KAFKA_TABLE_SQL(on_cluster=False)),
run_sql_with_exceptions(SESSION_REPLAY_EVENTS_V2_TEST_DATA_TABLE_SQL(on_cluster=False)),
run_sql_with_exceptions(SESSION_REPLAY_EVENTS_V2_TEST_WRITABLE_TABLE_SQL(on_cluster=False)),
run_sql_with_exceptions(SESSION_REPLAY_EVENTS_V2_TEST_MV_SQL(on_cluster=False)),
run_sql_with_exceptions(SESSION_REPLAY_EVENTS_V2_TEST_DISTRIBUTED_TABLE_SQL(on_cluster=False)),
run_sql_with_exceptions(
SESSION_REPLAY_EVENTS_V2_TEST_DISTRIBUTED_TABLE_SQL(on_cluster=False),
node_roles=[NodeRole.COORDINATOR],
),
operations: list[Operation] = [
# this was used for testing the migration to session replay v2 ingestion
# the migration is kept only for history / numbering
]

View File

@@ -1,34 +1,6 @@
from posthog.clickhouse.client.migration_tools import NodeRole, run_sql_with_exceptions
from posthog.session_recordings.sql.session_replay_event_v2_test_migrations_sql import (
ADD_MISSING_COLUMNS_DISTRIBUTED_SESSION_REPLAY_EVENTS_V2_TEST_TABLE_SQL,
ADD_MISSING_COLUMNS_SHARDED_SESSION_REPLAY_EVENTS_V2_TEST_TABLE_SQL,
ADD_MISSING_COLUMNS_WRITABLE_SESSION_REPLAY_EVENTS_V2_TEST_TABLE_SQL,
DROP_KAFKA_SESSION_REPLAY_EVENTS_V2_TEST_TABLE_SQL,
DROP_SESSION_REPLAY_EVENTS_V2_TEST_MV_TABLE_SQL,
)
from posthog.session_recordings.sql.session_replay_event_v2_test_sql import (
SESSION_REPLAY_EVENTS_V2_TEST_KAFKA_TABLE_SQL,
SESSION_REPLAY_EVENTS_V2_TEST_MV_SQL,
)
from infi.clickhouse_orm import Operation
operations = [
# First, drop the materialized view so it's no longer pulling from Kafka
run_sql_with_exceptions(DROP_SESSION_REPLAY_EVENTS_V2_TEST_MV_TABLE_SQL(on_cluster=False)),
# Then drop the Kafka table
run_sql_with_exceptions(DROP_KAFKA_SESSION_REPLAY_EVENTS_V2_TEST_TABLE_SQL(on_cluster=False)),
# Now we can alter the target tables in the correct order:
# 1. Sharded table (physical storage)
run_sql_with_exceptions(ADD_MISSING_COLUMNS_SHARDED_SESSION_REPLAY_EVENTS_V2_TEST_TABLE_SQL(on_cluster=False)),
# 2. Writable table (for writing to sharded table)
run_sql_with_exceptions(ADD_MISSING_COLUMNS_WRITABLE_SESSION_REPLAY_EVENTS_V2_TEST_TABLE_SQL(on_cluster=False)),
# 3. Distributed table (for reading)
run_sql_with_exceptions(ADD_MISSING_COLUMNS_DISTRIBUTED_SESSION_REPLAY_EVENTS_V2_TEST_TABLE_SQL(on_cluster=False)),
# Also run on coordinator node without the cluster clause
run_sql_with_exceptions(
ADD_MISSING_COLUMNS_DISTRIBUTED_SESSION_REPLAY_EVENTS_V2_TEST_TABLE_SQL(on_cluster=False),
node_roles=[NodeRole.COORDINATOR],
),
# Finally, recreate the Kafka table and materialized view with the updated schema
run_sql_with_exceptions(SESSION_REPLAY_EVENTS_V2_TEST_KAFKA_TABLE_SQL(on_cluster=False)),
run_sql_with_exceptions(SESSION_REPLAY_EVENTS_V2_TEST_MV_SQL(on_cluster=False)),
operations: list[Operation] = [
# this was used for testing the migration to session replay v2 ingestion
# the migration is kept only for history / numbering
]

View File

@@ -1,8 +1,6 @@
from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions
from posthog.session_recordings.sql.session_replay_event_v2_test_migrations_sql import (
REMOVE_SNAPSHOT_SOURCE_LOW_CARDINALITY_SQL,
)
from infi.clickhouse_orm import Operation
operations = [
run_sql_with_exceptions(REMOVE_SNAPSHOT_SOURCE_LOW_CARDINALITY_SQL(on_cluster=True)),
operations: list[Operation] = [
# this was used for testing the migration to session replay v2 ingestion
# the migration is kept only for history / numbering
]

View File

@@ -0,0 +1,13 @@
from posthog.clickhouse.client.connection import NodeRole
from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions
# NB the order of operations are important here
operations = [
run_sql_with_exceptions("DROP TABLE IF EXISTS session_replay_events_v2_test_mv"),
run_sql_with_exceptions("DROP TABLE IF EXISTS kafka_session_replay_events_v2_test"),
run_sql_with_exceptions("DROP TABLE IF EXISTS sharded_session_replay_events_v2_test"),
run_sql_with_exceptions("DROP TABLE IF EXISTS writable_session_replay_events_v2_test"),
run_sql_with_exceptions(
"DROP TABLE IF EXISTS session_replay_events_v2_test", node_roles=[NodeRole.DATA, NodeRole.COORDINATOR]
),
]
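
The replacement migration above drops the v2_test objects in dependency order: the materialized view first (so it stops pulling from Kafka, as the earlier v2_test migration's comments spell out), then the Kafka engine table, then the sharded and writable tables, and finally the distributed read table on both data and coordinator nodes. As a hypothetical post-migration check (not part of this commit), one could confirm nothing was left behind:

# Hypothetical check, not part of the commit: list any v2_test tables that survived the drops
# on the node this runs against; an empty result means the drops took effect.
from posthog.clickhouse.client import sync_execute

leftover = sync_execute(
    "SELECT name FROM system.tables WHERE name IN ("
    "'session_replay_events_v2_test_mv', 'kafka_session_replay_events_v2_test', "
    "'sharded_session_replay_events_v2_test', 'writable_session_replay_events_v2_test', "
    "'session_replay_events_v2_test')"
)
assert leftover == [], f"v2_test tables still present: {leftover}"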

View File

@@ -139,13 +139,6 @@ from posthog.session_recordings.sql.session_replay_event_sql import (
SESSION_REPLAY_EVENTS_TABLE_SQL,
WRITABLE_SESSION_REPLAY_EVENTS_TABLE_SQL,
)
from posthog.session_recordings.sql.session_replay_event_v2_test_sql import (
SESSION_REPLAY_EVENTS_V2_TEST_DATA_TABLE_SQL,
SESSION_REPLAY_EVENTS_V2_TEST_DISTRIBUTED_TABLE_SQL,
SESSION_REPLAY_EVENTS_V2_TEST_KAFKA_TABLE_SQL,
SESSION_REPLAY_EVENTS_V2_TEST_MV_SQL,
SESSION_REPLAY_EVENTS_V2_TEST_WRITABLE_TABLE_SQL,
)
# Queries to create tables, you must pass function, otherwise the table is created before
# objects are mocked and the ambr will go into infinite loop update.
@@ -171,7 +164,6 @@ CREATE_MERGETREE_TABLE_QUERIES = (
PERFORMANCE_EVENTS_TABLE_SQL,
SESSION_REPLAY_EVENTS_TABLE_SQL,
WRITABLE_SESSION_REPLAY_EVENTS_TABLE_SQL,
SESSION_REPLAY_EVENTS_V2_TEST_DATA_TABLE_SQL,
CHANNEL_DEFINITION_TABLE_SQL,
EXCHANGE_RATE_TABLE_SQL,
SESSIONS_TABLE_SQL,
@@ -198,7 +190,6 @@ CREATE_DISTRIBUTED_TABLE_QUERIES = (
WRITABLE_PERFORMANCE_EVENTS_TABLE_SQL,
DISTRIBUTED_PERFORMANCE_EVENTS_TABLE_SQL,
DISTRIBUTED_SESSION_REPLAY_EVENTS_TABLE_SQL,
SESSION_REPLAY_EVENTS_V2_TEST_DISTRIBUTED_TABLE_SQL,
WRITABLE_SESSIONS_TABLE_SQL,
WRITABLE_RAW_SESSIONS_TABLE_SQL,
DISTRIBUTED_SESSIONS_TABLE_SQL,
@@ -206,7 +197,6 @@ CREATE_DISTRIBUTED_TABLE_QUERIES = (
WRITABLE_HEATMAPS_TABLE_SQL,
DISTRIBUTED_HEATMAPS_TABLE_SQL,
DISTRIBUTED_SYSTEM_PROCESSES_TABLE_SQL,
SESSION_REPLAY_EVENTS_V2_TEST_WRITABLE_TABLE_SQL,
)
CREATE_KAFKA_TABLE_QUERIES = (
KAFKA_LOG_ENTRIES_TABLE_SQL,
@@ -227,7 +217,6 @@ CREATE_KAFKA_TABLE_QUERIES = (
KAFKA_APP_METRICS2_TABLE_SQL,
KAFKA_PERFORMANCE_EVENTS_TABLE_SQL,
KAFKA_SESSION_REPLAY_EVENTS_TABLE_SQL,
SESSION_REPLAY_EVENTS_V2_TEST_KAFKA_TABLE_SQL,
KAFKA_HEATMAPS_TABLE_SQL,
)
CREATE_MV_TABLE_QUERIES = (
@@ -249,7 +238,6 @@ CREATE_MV_TABLE_QUERIES = (
APP_METRICS2_MV_TABLE_SQL,
PERFORMANCE_EVENTS_TABLE_MV_SQL,
SESSION_REPLAY_EVENTS_TABLE_MV_SQL,
SESSION_REPLAY_EVENTS_V2_TEST_MV_SQL,
SESSIONS_TABLE_MV_SQL,
RAW_SESSIONS_TABLE_MV_SQL,
HEATMAPS_TABLE_MV_SQL,

View File

@@ -40,7 +40,6 @@ KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW = f"{KAFKA_PREFIX}session_recordi
# from recordings consumer to clickhouse
KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS = f"{KAFKA_PREFIX}clickhouse_session_replay_events{SUFFIX}"
KAFKA_CLICKHOUSE_SESSION_RECORDING_EVENTS = f"{KAFKA_PREFIX}clickhouse_session_recording_events{SUFFIX}"
KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS_V2_TEST = f"{KAFKA_PREFIX}clickhouse_session_replay_events_v2_test{SUFFIX}"
KAFKA_ERROR_TRACKING_ISSUE_FINGERPRINT = f"{KAFKA_PREFIX}clickhouse_error_tracking_issue_fingerprint{SUFFIX}"

View File

@@ -69,10 +69,6 @@ from posthog.temporal.salesforce_enrichment import (
ACTIVITIES as SALESFORCE_ENRICHMENT_ACTIVITIES,
WORKFLOWS as SALESFORCE_ENRICHMENT_WORKFLOWS,
)
from posthog.temporal.session_recordings import (
ACTIVITIES as SESSION_RECORDINGS_ACTIVITIES,
WORKFLOWS as SESSION_RECORDINGS_WORKFLOWS,
)
from posthog.temporal.subscriptions import (
ACTIVITIES as SUBSCRIPTION_ACTIVITIES,
WORKFLOWS as SUBSCRIPTION_WORKFLOWS,
@@ -105,7 +101,6 @@ WORKFLOWS_DICT = {
GENERAL_PURPOSE_TASK_QUEUE: PROXY_SERVICE_WORKFLOWS
+ DELETE_PERSONS_WORKFLOWS
+ USAGE_REPORTS_WORKFLOWS
+ SESSION_RECORDINGS_WORKFLOWS
+ QUOTA_LIMITING_WORKFLOWS
+ SALESFORCE_ENRICHMENT_WORKFLOWS
+ PRODUCT_ANALYTICS_WORKFLOWS
@@ -126,7 +121,6 @@ ACTIVITIES_DICT = {
GENERAL_PURPOSE_TASK_QUEUE: PROXY_SERVICE_ACTIVITIES
+ DELETE_PERSONS_ACTIVITIES
+ USAGE_REPORTS_ACTIVITIES
+ SESSION_RECORDINGS_ACTIVITIES
+ QUOTA_LIMITING_ACTIVITIES
+ SALESFORCE_ENRICHMENT_ACTIVITIES
+ PRODUCT_ANALYTICS_ACTIVITIES

View File

@@ -1,80 +0,0 @@
from posthog.clickhouse.cluster import ON_CLUSTER_CLAUSE
from posthog.session_recordings.sql.session_replay_event_v2_test_sql import SESSION_REPLAY_EVENTS_V2_TEST_DATA_TABLE
# This migration adds columns that exist in the main session_replay_events table
# but are missing from the v2 test table. All columns are added with default values
# instead of being nullable, to maintain data consistency.
ALTER_SESSION_REPLAY_V2_TEST_ADD_MISSING_COLUMNS = """
ALTER TABLE {table_name} {on_cluster_clause}
ADD COLUMN IF NOT EXISTS first_url AggregateFunction(argMin, Nullable(VARCHAR), DateTime64(6, 'UTC')),
ADD COLUMN IF NOT EXISTS all_urls SimpleAggregateFunction(groupUniqArrayArray, Array(String)) DEFAULT [],
ADD COLUMN IF NOT EXISTS click_count SimpleAggregateFunction(sum, Int64) DEFAULT 0,
ADD COLUMN IF NOT EXISTS keypress_count SimpleAggregateFunction(sum, Int64) DEFAULT 0,
ADD COLUMN IF NOT EXISTS mouse_activity_count SimpleAggregateFunction(sum, Int64) DEFAULT 0,
ADD COLUMN IF NOT EXISTS active_milliseconds SimpleAggregateFunction(sum, Int64) DEFAULT 0,
ADD COLUMN IF NOT EXISTS console_log_count SimpleAggregateFunction(sum, Int64) DEFAULT 0,
ADD COLUMN IF NOT EXISTS console_warn_count SimpleAggregateFunction(sum, Int64) DEFAULT 0,
ADD COLUMN IF NOT EXISTS console_error_count SimpleAggregateFunction(sum, Int64) DEFAULT 0,
ADD COLUMN IF NOT EXISTS size SimpleAggregateFunction(sum, Int64) DEFAULT 0,
ADD COLUMN IF NOT EXISTS message_count SimpleAggregateFunction(sum, Int64) DEFAULT 0,
ADD COLUMN IF NOT EXISTS event_count SimpleAggregateFunction(sum, Int64) DEFAULT 0,
ADD COLUMN IF NOT EXISTS snapshot_source AggregateFunction(argMin, LowCardinality(Nullable(String)), DateTime64(6, 'UTC')),
ADD COLUMN IF NOT EXISTS snapshot_library AggregateFunction(argMin, Nullable(String), DateTime64(6, 'UTC')),
ADD COLUMN IF NOT EXISTS _timestamp SimpleAggregateFunction(max, DateTime) DEFAULT toDateTime(0)
"""
# Drop the materialized view
DROP_SESSION_REPLAY_EVENTS_V2_TEST_MV_SQL_TEMPLATE = """
DROP TABLE IF EXISTS session_replay_events_v2_test_mv {on_cluster_clause}
"""
# Drop the Kafka table
DROP_KAFKA_SESSION_REPLAY_EVENTS_V2_TEST_SQL_TEMPLATE = """
DROP TABLE IF EXISTS kafka_session_replay_events_v2_test {on_cluster_clause}
"""
# Remove the low cardinality constraint from the snapshot_source column
REMOVE_SNAPSHOT_SOURCE_LOW_CARDINALITY_SQL_TEMPLATE = """
ALTER TABLE {table_name} {on_cluster_clause}
MODIFY COLUMN `snapshot_source` AggregateFunction(argMin, Nullable(String), DateTime64(6, 'UTC'))
"""
def ADD_MISSING_COLUMNS_DISTRIBUTED_SESSION_REPLAY_EVENTS_V2_TEST_TABLE_SQL(on_cluster=True):
return ALTER_SESSION_REPLAY_V2_TEST_ADD_MISSING_COLUMNS.format(
table_name="session_replay_events_v2_test",
on_cluster_clause=ON_CLUSTER_CLAUSE(on_cluster),
)
def ADD_MISSING_COLUMNS_WRITABLE_SESSION_REPLAY_EVENTS_V2_TEST_TABLE_SQL(on_cluster=True):
return ALTER_SESSION_REPLAY_V2_TEST_ADD_MISSING_COLUMNS.format(
table_name="writable_session_replay_events_v2_test",
on_cluster_clause=ON_CLUSTER_CLAUSE(on_cluster),
)
def ADD_MISSING_COLUMNS_SHARDED_SESSION_REPLAY_EVENTS_V2_TEST_TABLE_SQL(on_cluster=True):
return ALTER_SESSION_REPLAY_V2_TEST_ADD_MISSING_COLUMNS.format(
table_name=SESSION_REPLAY_EVENTS_V2_TEST_DATA_TABLE,
on_cluster_clause=ON_CLUSTER_CLAUSE(on_cluster),
)
def DROP_SESSION_REPLAY_EVENTS_V2_TEST_MV_TABLE_SQL(on_cluster=True):
return DROP_SESSION_REPLAY_EVENTS_V2_TEST_MV_SQL_TEMPLATE.format(
on_cluster_clause=ON_CLUSTER_CLAUSE(on_cluster),
)
def DROP_KAFKA_SESSION_REPLAY_EVENTS_V2_TEST_TABLE_SQL(on_cluster=True):
return DROP_KAFKA_SESSION_REPLAY_EVENTS_V2_TEST_SQL_TEMPLATE.format(
on_cluster_clause=ON_CLUSTER_CLAUSE(on_cluster),
)
def REMOVE_SNAPSHOT_SOURCE_LOW_CARDINALITY_SQL(on_cluster=True):
return REMOVE_SNAPSHOT_SOURCE_LOW_CARDINALITY_SQL_TEMPLATE.format(
table_name=SESSION_REPLAY_EVENTS_V2_TEST_DATA_TABLE,
on_cluster_clause=ON_CLUSTER_CLAUSE(on_cluster),
)

View File

@@ -1,255 +0,0 @@
"""
Session Replay Events Data Flow:
┌─────────────────────────────────────────────────┐ Raw events from plugin-server
│ Kafka Topic │ Contains individual events with:
│ session_replay_events_v2_test │ - session_id, team_id, distinct_id
└──────────────────────────┬──────────────────────┘ - first_timestamp, last_timestamp
│ - block_url
┌─────────────────────────────────────────────────┐ Kafka Engine Table
│ kafka_session_replay_events_v2_test │ Direct mirror of Kafka topic
│ │ Same schema as Kafka messages
└──────────────────────────┬──────────────────────┘
┌─────────────────────────────────────────────────┐ Materialized View aggregates events:
│ session_replay_events_v2_test_mv │ - Groups by session_id, team_id
└──────────────────────────┬──────────────────────┘ - min(first_timestamp)
│ - max(last_timestamp)
│ - groupUniqArrayArray(block_url)
┌─────────────────────────────────────────────────┐ Distributed Table
│ writable_session_replay_events_v2_test │ Handles writing to sharded table
│ │ Sharded by sipHash64(distinct_id)
└──────────────────────────┬──────────────────────┘
┌─────────────────────────────────────────────────┐ Physical Table (Replicated)
│ sharded_session_replay_events_v2_test │ Stores the actual data
│ │ AggregatingMergeTree engine
└──────────────────────────┬──────────────────────┘
┌─────────────────────────────────────────────────┐ Distributed Table
│ session_replay_events_v2_test │ Used for reading/querying data
└─────────────────────────────────────────────────┘
"""
from django.conf import settings
from posthog.clickhouse.cluster import ON_CLUSTER_CLAUSE
from posthog.clickhouse.kafka_engine import kafka_engine
from posthog.clickhouse.table_engines import AggregatingMergeTree, Distributed, ReplicationScheme
from posthog.kafka_client.topics import KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS_V2_TEST
SESSION_REPLAY_EVENTS_V2_TEST_DATA_TABLE = "sharded_session_replay_events_v2_test"
def SESSION_REPLAY_EVENTS_V2_TEST_DATA_TABLE_ENGINE():
return AggregatingMergeTree("session_replay_events_v2_test", replication_scheme=ReplicationScheme.SHARDED)
"""
This table is a ClickHouse copy of the events from the Kafka topic.
We first ingest unaggregated events from Kafka, which are then processed by the materialized view
into aggregated session data. For this reason, this table needs a different column setup than
the other tables - it stores individual events with first_timestamp and last_timestamp, which
are later aggregated into min_first_timestamp and max_last_timestamp in the main table.
"""
SESSION_REPLAY_EVENTS_V2_TEST_KAFKA_TABLE_BASE_SQL = """
CREATE TABLE IF NOT EXISTS {table_name} {on_cluster_clause}
(
session_id VARCHAR,
team_id Int64,
distinct_id VARCHAR,
first_timestamp DateTime64(6, 'UTC'),
last_timestamp DateTime64(6, 'UTC'),
block_url String,
first_url Nullable(VARCHAR),
urls Array(String),
click_count Int64,
keypress_count Int64,
mouse_activity_count Int64,
active_milliseconds Int64,
console_log_count Int64,
console_warn_count Int64,
console_error_count Int64,
size Int64,
event_count Int64,
message_count Int64,
snapshot_source LowCardinality(Nullable(String)),
snapshot_library Nullable(String),
) ENGINE = {engine}
"""
"""
Base schema for tables storing aggregated session replay data. Used by:
- writable_session_replay_events_v2_test: Distributed table for writing
- sharded_session_replay_events_v2_test: Physical storage with AggregatingMergeTree engine
- session_replay_events_v2_test: Distributed table for reading
The materialized view (session_replay_events_v2_test_mv) aggregates raw events into this schema,
so any column changes here must be reflected in the materialized view's SELECT statement below.
"""
SESSION_REPLAY_EVENTS_V2_TEST_TABLE_BASE_SQL = """
CREATE TABLE IF NOT EXISTS {table_name} {on_cluster_clause}
(
session_id VARCHAR,
team_id Int64,
distinct_id VARCHAR,
min_first_timestamp SimpleAggregateFunction(min, DateTime64(6, 'UTC')),
max_last_timestamp SimpleAggregateFunction(max, DateTime64(6, 'UTC')),
block_first_timestamps SimpleAggregateFunction(groupArrayArray, Array(DateTime64(6, 'UTC'))),
block_last_timestamps SimpleAggregateFunction(groupArrayArray, Array(DateTime64(6, 'UTC'))),
block_urls SimpleAggregateFunction(groupArrayArray, Array(String)),
first_url AggregateFunction(argMin, Nullable(VARCHAR), DateTime64(6, 'UTC')),
all_urls SimpleAggregateFunction(groupUniqArrayArray, Array(String)),
click_count SimpleAggregateFunction(sum, Int64),
keypress_count SimpleAggregateFunction(sum, Int64),
mouse_activity_count SimpleAggregateFunction(sum, Int64),
active_milliseconds SimpleAggregateFunction(sum, Int64),
console_log_count SimpleAggregateFunction(sum, Int64),
console_warn_count SimpleAggregateFunction(sum, Int64),
console_error_count SimpleAggregateFunction(sum, Int64),
size SimpleAggregateFunction(sum, Int64),
message_count SimpleAggregateFunction(sum, Int64),
event_count SimpleAggregateFunction(sum, Int64),
snapshot_source AggregateFunction(argMin, LowCardinality(Nullable(String)), DateTime64(6, 'UTC')),
snapshot_library AggregateFunction(argMin, Nullable(String), DateTime64(6, 'UTC')),
_timestamp SimpleAggregateFunction(max, DateTime)
) ENGINE = {engine}
"""
"""
Base SQL for the materialized view that transforms raw events into aggregated data.
Note: Column types must be explicitly specified in the TO clause because ClickHouse
incorrectly expands some column types during materialized view creation (specifically
LowCardinality(Nullable(String)) gets expanded to just Nullable(String)).
"""
SESSION_REPLAY_EVENTS_V2_TEST_MV_BASE_SQL = """
CREATE MATERIALIZED VIEW IF NOT EXISTS session_replay_events_v2_test_mv {on_cluster_clause}
TO {database}.writable_session_replay_events_v2_test (
`session_id` String,
`team_id` Int64,
`distinct_id` String,
`min_first_timestamp` DateTime64(6, 'UTC'),
`max_last_timestamp` DateTime64(6, 'UTC'),
`block_first_timestamps` SimpleAggregateFunction(groupArrayArray, Array(DateTime64(6, 'UTC'))),
`block_last_timestamps` SimpleAggregateFunction(groupArrayArray, Array(DateTime64(6, 'UTC'))),
`block_urls` SimpleAggregateFunction(groupArrayArray, Array(String)),
`first_url` AggregateFunction(argMin, Nullable(String), DateTime64(6, 'UTC')),
`all_urls` SimpleAggregateFunction(groupUniqArrayArray, Array(String)),
`click_count` SimpleAggregateFunction(sum, Int64),
`keypress_count` SimpleAggregateFunction(sum, Int64),
`mouse_activity_count` SimpleAggregateFunction(sum, Int64),
`active_milliseconds` SimpleAggregateFunction(sum, Int64),
`console_log_count` SimpleAggregateFunction(sum, Int64),
`console_warn_count` SimpleAggregateFunction(sum, Int64),
`console_error_count` SimpleAggregateFunction(sum, Int64),
`size` SimpleAggregateFunction(sum, Int64),
`message_count` SimpleAggregateFunction(sum, Int64),
`event_count` SimpleAggregateFunction(sum, Int64),
`snapshot_source` AggregateFunction(argMin, LowCardinality(Nullable(String)), DateTime64(6, 'UTC')),
`snapshot_library` AggregateFunction(argMin, Nullable(String), DateTime64(6, 'UTC')),
`_timestamp` SimpleAggregateFunction(max, DateTime)
)
AS SELECT
session_id,
team_id,
distinct_id,
min(first_timestamp) AS min_first_timestamp,
max(last_timestamp) AS max_last_timestamp,
groupArray(first_timestamp) AS block_first_timestamps,
groupArray(last_timestamp) AS block_last_timestamps,
groupArray(block_url) AS block_urls,
argMinState(first_url, first_timestamp) as first_url,
groupUniqArrayArray(urls) AS all_urls,
sum(click_count) AS click_count,
sum(keypress_count) AS keypress_count,
sum(mouse_activity_count) AS mouse_activity_count,
sum(active_milliseconds) AS active_milliseconds,
sum(console_log_count) AS console_log_count,
sum(console_warn_count) AS console_warn_count,
sum(console_error_count) AS console_error_count,
sum(size) AS size,
sum(message_count) AS message_count,
sum(event_count) AS event_count,
argMinState(snapshot_source, first_timestamp) as snapshot_source,
argMinState(snapshot_library, first_timestamp) as snapshot_library,
max(_timestamp) as _timestamp
FROM {database}.kafka_session_replay_events_v2_test
GROUP BY session_id, team_id, distinct_id
"""
def SESSION_REPLAY_EVENTS_V2_TEST_KAFKA_TABLE_SQL(on_cluster=True):
return SESSION_REPLAY_EVENTS_V2_TEST_KAFKA_TABLE_BASE_SQL.format(
table_name="kafka_session_replay_events_v2_test",
on_cluster_clause=ON_CLUSTER_CLAUSE(on_cluster),
engine=kafka_engine(topic=KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS_V2_TEST),
)
def SESSION_REPLAY_EVENTS_V2_TEST_MV_SQL(on_cluster=True):
return SESSION_REPLAY_EVENTS_V2_TEST_MV_BASE_SQL.format(
on_cluster_clause=ON_CLUSTER_CLAUSE(on_cluster),
database=settings.CLICKHOUSE_DATABASE,
)
def SESSION_REPLAY_EVENTS_V2_TEST_DATA_TABLE_SQL(on_cluster=True):
# Order by is used by the aggregating merge tree engine to identify candidates to merge
# e.g. toDate(min_first_timestamp) would mean we would have one row per day per session_id
# if CH could completely merge to match the order by.
# It is also used to organise data to make queries faster.
# We want the fewest rows possible but also the fastest queries.
# Since we query by date and not by time, and order by must be in order of increasing cardinality,
# we order by date first, then team_id, then session_id.
# Hopefully, this is a good balance between the two.
return (
SESSION_REPLAY_EVENTS_V2_TEST_TABLE_BASE_SQL
+ """
PARTITION BY toYYYYMM(min_first_timestamp)
ORDER BY (toDate(min_first_timestamp), team_id, session_id)
SETTINGS index_granularity=512
"""
).format(
table_name=SESSION_REPLAY_EVENTS_V2_TEST_DATA_TABLE,
on_cluster_clause=ON_CLUSTER_CLAUSE(on_cluster),
engine=SESSION_REPLAY_EVENTS_V2_TEST_DATA_TABLE_ENGINE(),
)
def SESSION_REPLAY_EVENTS_V2_TEST_WRITABLE_TABLE_SQL(on_cluster=False):
return SESSION_REPLAY_EVENTS_V2_TEST_TABLE_BASE_SQL.format(
table_name="writable_session_replay_events_v2_test",
on_cluster_clause=ON_CLUSTER_CLAUSE(on_cluster),
engine=Distributed(
data_table=SESSION_REPLAY_EVENTS_V2_TEST_DATA_TABLE,
sharding_key="sipHash64(distinct_id)",
),
)
def SESSION_REPLAY_EVENTS_V2_TEST_DISTRIBUTED_TABLE_SQL(on_cluster=False):
return SESSION_REPLAY_EVENTS_V2_TEST_TABLE_BASE_SQL.format(
table_name="session_replay_events_v2_test",
on_cluster_clause=ON_CLUSTER_CLAUSE(on_cluster),
engine=Distributed(
data_table=SESSION_REPLAY_EVENTS_V2_TEST_DATA_TABLE,
sharding_key="sipHash64(distinct_id)",
),
)
def DROP_SESSION_REPLAY_EVENTS_V2_TEST_TABLE_SQL():
return f"DROP TABLE IF EXISTS {SESSION_REPLAY_EVENTS_V2_TEST_DATA_TABLE} {ON_CLUSTER_CLAUSE(False)}"
def TRUNCATE_SESSION_REPLAY_EVENTS_V2_TEST_TABLE_SQL():
return f"TRUNCATE TABLE IF EXISTS {SESSION_REPLAY_EVENTS_V2_TEST_DATA_TABLE} {ON_CLUSTER_CLAUSE(False)}"

View File

@@ -1,23 +0,0 @@
from .compare_recording_console_logs_workflow import (
CompareRecordingConsoleLogsWorkflow,
compare_recording_console_logs_activity,
)
from .compare_recording_events_workflow import CompareRecordingSnapshotsWorkflow, compare_recording_snapshots_activity
from .compare_recording_metadata_workflow import CompareRecordingMetadataWorkflow, compare_recording_metadata_activity
from .compare_sampled_recording_events_workflow import (
CompareSampledRecordingEventsWorkflow,
compare_sampled_recording_events_activity,
)
WORKFLOWS = [
CompareRecordingMetadataWorkflow,
CompareRecordingSnapshotsWorkflow,
CompareRecordingConsoleLogsWorkflow,
CompareSampledRecordingEventsWorkflow,
]
ACTIVITIES = [
compare_recording_metadata_activity,
compare_recording_snapshots_activity,
compare_recording_console_logs_activity,
compare_sampled_recording_events_activity,
]

View File

@@ -1,393 +0,0 @@
import json
import typing
import asyncio
import datetime as dt
import dataclasses
import temporalio.common
import temporalio.activity
import temporalio.workflow
from structlog import get_logger
from posthog.clickhouse.client import sync_execute
from posthog.clickhouse.query_tagging import Product, tag_queries
from posthog.temporal.common.base import PostHogWorkflow
from posthog.temporal.common.heartbeat import Heartbeater
from posthog.temporal.session_recordings.queries import get_sampled_session_ids
LOGGER = get_logger(__name__)
@dataclasses.dataclass(frozen=True)
class CompareRecordingConsoleLogsActivityInputs:
"""Inputs for the console logs comparison activity."""
started_after: str = dataclasses.field()
started_before: str = dataclasses.field()
sample_size: int = dataclasses.field(default=100)
@property
def properties_to_log(self) -> dict[str, typing.Any]:
return {
"started_after": self.started_after,
"started_before": self.started_before,
"sample_size": self.sample_size,
}
def get_console_logs(
table_name: str,
session_ids: list[tuple[str, int]], # [(session_id, team_id), ...]
) -> dict[tuple[str, int], list[dict]]:
"""Get console logs from the specified table for given session IDs."""
if not session_ids:
return {}
# Format session IDs and team IDs for the IN clause
session_id_tuples = ", ".join([f"('{sid}', {tid})" for sid, tid in session_ids])
query = """
SELECT
log_source_id,
team_id,
timestamp,
level,
message,
instance_id
FROM {table}
WHERE (log_source_id, team_id) IN ({session_id_tuples})
ORDER BY timestamp
"""
results = sync_execute(query.format(table=table_name, session_id_tuples=session_id_tuples))
# Group results by session_id and team_id
logs_by_session: dict[tuple[str, int], list[dict]] = {}
for row in results:
session_key = (str(row[0]), int(row[1])) # (log_source_id, team_id)
log_entry = {"timestamp": row[2], "level": row[3], "message": row[4], "instance_id": row[5]}
if session_key not in logs_by_session:
logs_by_session[session_key] = []
logs_by_session[session_key].append(log_entry)
return logs_by_session
def deduplicate_logs(entries: list[dict]) -> list[dict]:
"""Deduplicate logs using the same logic as session-console-log-recorder.ts."""
seen = set()
deduped = []
for entry in entries:
fingerprint = f"{entry['level']}-{entry['message']}"
if fingerprint not in seen:
deduped.append(entry)
seen.add(fingerprint)
return deduped
def get_console_logs_v1(session_ids: list[tuple[str, int]]) -> dict[tuple[str, int], list[dict]]:
"""Get console logs from v1 storage for given session IDs."""
return get_console_logs("log_entries", session_ids)
def get_console_logs_v2(session_ids: list[tuple[str, int]]) -> dict[tuple[str, int], list[dict]]:
"""Get console logs from v2 storage for given session IDs."""
return get_console_logs("log_entries_v2_test", session_ids)
@temporalio.activity.defn
async def compare_recording_console_logs_activity(inputs: CompareRecordingConsoleLogsActivityInputs) -> None:
"""Compare console logs between v1 and v2 storage for a sample of sessions."""
logger = LOGGER.bind()
start_time = dt.datetime.now()
tag_queries(product=Product.REPLAY)
await logger.ainfo(
"Starting console logs comparison activity",
started_after=inputs.started_after,
started_before=inputs.started_before,
sample_size=inputs.sample_size,
)
async with Heartbeater():
started_after = dt.datetime.fromisoformat(inputs.started_after)
started_before = dt.datetime.fromisoformat(inputs.started_before)
# Get sample of session IDs
session_ids = await asyncio.to_thread(
get_sampled_session_ids,
started_after,
started_before,
inputs.sample_size,
)
# Fetch raw logs from both systems
v1_logs_raw, v2_logs_raw = await asyncio.gather(
asyncio.to_thread(get_console_logs_v1, session_ids),
asyncio.to_thread(get_console_logs_v2, session_ids),
)
# Track duplication stats
v1_total_raw = 0
v1_total_deduped = 0
v2_total_raw = 0
v2_total_deduped = 0
# Deduplicate logs and track stats
v1_logs = {}
v2_logs = {}
for session_key, raw_entries in v1_logs_raw.items():
v1_total_raw += len(raw_entries)
deduped = deduplicate_logs(raw_entries)
v1_total_deduped += len(deduped)
v1_logs[session_key] = deduped
for session_key, raw_entries in v2_logs_raw.items():
v2_total_raw += len(raw_entries)
deduped = deduplicate_logs(raw_entries)
v2_total_deduped += len(deduped)
v2_logs[session_key] = deduped
# Compare results
differing_sessions = []
missing_in_v2 = []
missing_in_v1 = []
differences_by_type = {
"count_mismatch": 0,
"content_mismatch": 0,
}
for session_key in set(v1_logs.keys()) | set(v2_logs.keys()):
session_id, team_id = session_key
if session_key not in v2_logs:
missing_in_v2.append(session_key)
continue
if session_key not in v1_logs:
missing_in_v1.append(session_key)
continue
v1_entries = v1_logs[session_key]
v2_entries = v2_logs[session_key]
if len(v1_entries) != len(v2_entries):
differences_by_type["count_mismatch"] += 1
differing_sessions.append(session_key)
# Create sets of entries for comparison
v1_set = {(e["level"], e["message"]) for e in v1_entries}
v2_set = {(e["level"], e["message"]) for e in v2_entries}
common_entries = v1_set & v2_set
only_in_v1 = v1_set - v2_set
only_in_v2 = v2_set - v1_set
await logger.ainfo(
"Log entry count mismatch",
session_id=session_id,
team_id=team_id,
v1_total=len(v1_entries),
v2_total=len(v2_entries),
common_count=len(common_entries),
only_in_v1_count=len(only_in_v1),
only_in_v2_count=len(only_in_v2),
# Include example entries that differ
example_only_in_v1=list(only_in_v1)[:3] if only_in_v1 else None,
example_only_in_v2=list(only_in_v2)[:3] if only_in_v2 else None,
)
continue
# Compare entries in order for content mismatches
for i, (v1_entry, v2_entry) in enumerate(zip(v1_entries, v2_entries)):
if (
v1_entry["level"] != v2_entry["level"]
or v1_entry["message"] != v2_entry["message"]
or v1_entry["timestamp"] != v2_entry["timestamp"]
):
differences_by_type["content_mismatch"] += 1
differing_sessions.append(session_key)
await logger.ainfo(
"Log entry content mismatch",
session_id=session_id,
team_id=team_id,
entry_index=i,
v1_entry=v1_entry,
v2_entry=v2_entry,
differences={
"level": v1_entry["level"] != v2_entry["level"],
"message": v1_entry["message"] != v2_entry["message"],
"timestamp": v1_entry["timestamp"] != v2_entry["timestamp"],
},
)
break
# Calculate duplication percentages
v1_duplication_rate = ((v1_total_raw - v1_total_deduped) / v1_total_raw * 100) if v1_total_raw > 0 else 0
v2_duplication_rate = ((v2_total_raw - v2_total_deduped) / v2_total_raw * 100) if v2_total_raw > 0 else 0
# Log summary with more detailed statistics
await logger.ainfo(
"Completed console logs comparison activity",
duration_seconds=(dt.datetime.now() - start_time).total_seconds(),
sampled_sessions=len(session_ids),
differing_sessions=len(differing_sessions),
missing_in_v1=len(missing_in_v1),
missing_in_v2=len(missing_in_v2),
differences_by_type=differences_by_type,
duplication_stats={
"v1": {
"raw_events": v1_total_raw,
"deduped_events": v1_total_deduped,
"duplication_rate": round(v1_duplication_rate, 2),
},
"v2": {
"raw_events": v2_total_raw,
"deduped_events": v2_total_deduped,
"duplication_rate": round(v2_duplication_rate, 2),
},
},
time_range={
"started_after": started_after.isoformat(),
"started_before": started_before.isoformat(),
},
)
@dataclasses.dataclass(frozen=True)
class CompareRecordingConsoleLogsWorkflowInputs:
"""Inputs for the console logs comparison workflow."""
started_after: str = dataclasses.field()
started_before: str = dataclasses.field()
window_seconds: int = dataclasses.field(default=300) # 5 minutes default
sample_size: int = dataclasses.field(default=100)
@property
def properties_to_log(self) -> dict[str, typing.Any]:
return {
"started_after": self.started_after,
"started_before": self.started_before,
"window_seconds": self.window_seconds,
"sample_size": self.sample_size,
}
@temporalio.workflow.defn(name="compare-recording-console-logs")
class CompareRecordingConsoleLogsWorkflow(PostHogWorkflow):
"""Workflow to compare session recording console logs between storage backends."""
def __init__(self) -> None:
self.lock = asyncio.Lock()
self.paused = False
@staticmethod
def parse_inputs(inputs: list[str]) -> CompareRecordingConsoleLogsWorkflowInputs:
"""Parse inputs from the management command CLI."""
loaded = json.loads(inputs[0])
for field in ["started_after", "started_before"]:
if field not in loaded:
raise ValueError(f"Required field {field} not provided")
loaded[field] = dt.datetime.fromisoformat(loaded[field])
window_seconds = loaded.get("window_seconds", 300)
if not isinstance(window_seconds, int) or window_seconds <= 0:
raise ValueError("window_seconds must be a positive integer")
sample_size = loaded.get("sample_size", 100)
if not isinstance(sample_size, int) or sample_size <= 0:
raise ValueError("sample_size must be a positive integer")
return CompareRecordingConsoleLogsWorkflowInputs(
started_after=loaded["started_after"],
started_before=loaded["started_before"],
window_seconds=window_seconds,
sample_size=sample_size,
)
@staticmethod
def generate_time_windows(
start_time: dt.datetime, end_time: dt.datetime, window_seconds: int
) -> list[tuple[dt.datetime, dt.datetime]]:
"""Generate time windows between start and end time."""
windows = []
current = start_time
while current < end_time:
window_end = min(current + dt.timedelta(seconds=window_seconds), end_time)
windows.append((current, window_end))
current = window_end
return windows
@temporalio.workflow.run
async def run(self, inputs: CompareRecordingConsoleLogsWorkflowInputs):
"""Run the comparison of session recording console logs."""
await temporalio.workflow.wait_condition(lambda: not self.paused)
started_after = dt.datetime.fromisoformat(inputs.started_after)
started_before = dt.datetime.fromisoformat(inputs.started_before)
logger = LOGGER.bind()
workflow_start = dt.datetime.now()
logger.info(
"Starting console logs comparison workflow",
started_after=started_after,
started_before=started_before,
window_seconds=inputs.window_seconds,
sample_size=inputs.sample_size,
)
# Generate time windows
windows = self.generate_time_windows(started_after, started_before, inputs.window_seconds)
logger.info("Generated %d time windows to process", len(windows))
# Process each window
for i, (window_start, window_end) in enumerate(windows, 1):
logger.info(
"Processing window %d/%d: %s to %s",
i,
len(windows),
window_start,
window_end,
)
activity_inputs = CompareRecordingConsoleLogsActivityInputs(
started_after=window_start.isoformat(),
started_before=window_end.isoformat(),
sample_size=inputs.sample_size,
)
await temporalio.workflow.execute_activity(
compare_recording_console_logs_activity,
activity_inputs,
start_to_close_timeout=dt.timedelta(minutes=5),
retry_policy=temporalio.common.RetryPolicy(
initial_interval=dt.timedelta(seconds=10),
maximum_interval=dt.timedelta(seconds=60),
maximum_attempts=1,
non_retryable_error_types=[],
),
)
workflow_end = dt.datetime.now()
duration = (workflow_end - workflow_start).total_seconds()
logger.info(
"Completed console logs comparison workflow",
duration_seconds=duration,
windows_processed=len(windows),
)
@temporalio.workflow.update
async def pause(self) -> None:
"""Signal handler for workflow to pause or unpause."""
async with self.lock:
if self.paused is True:
self.paused = False
else:
self.paused = True
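
For reference, deduplicate_logs above drops repeated console entries using a level-message fingerprint, mirroring the logic in session-console-log-recorder.ts. A small self-contained illustration on made-up entries:

# Made-up console entries; only the first occurrence of each (level, message) pair survives.
entries = [
    {"level": "log", "message": "loaded", "timestamp": 1},
    {"level": "log", "message": "loaded", "timestamp": 2},   # same fingerprint, dropped
    {"level": "warn", "message": "loaded", "timestamp": 3},  # different level, kept
]

seen: set[str] = set()
deduped = []
for entry in entries:
    fingerprint = f"{entry['level']}-{entry['message']}"
    if fingerprint not in seen:
        deduped.append(entry)
        seen.add(fingerprint)

assert [e["timestamp"] for e in deduped] == [1, 3]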

View File

@@ -1,631 +0,0 @@
import json
import asyncio
import datetime as dt
import dataclasses
from typing import Any
import temporalio.common
import temporalio.activity
import temporalio.workflow
from asgiref.sync import sync_to_async
from structlog import get_logger
from posthog.clickhouse.query_tagging import Product, tag_queries
from posthog.models import Team
from posthog.session_recordings.models.session_recording import SessionRecording
from posthog.temporal.common.base import PostHogWorkflow
from posthog.temporal.common.heartbeat import Heartbeater
from posthog.temporal.session_recordings.queries import get_session_metadata
from posthog.temporal.session_recordings.segmentation import compute_active_milliseconds
from posthog.temporal.session_recordings.session_comparer import count_events_per_window
from posthog.temporal.session_recordings.snapshot_utils import fetch_v1_snapshots, fetch_v2_snapshots
LOGGER = get_logger(__name__)
@dataclasses.dataclass(frozen=True)
class CompareRecordingSnapshotsActivityInputs:
"""Inputs for the `compare_recording_snapshots_activity`."""
session_id: str = dataclasses.field()
team_id: int = dataclasses.field()
sample_size: int = dataclasses.field(default=5)
@property
def properties_to_log(self) -> dict[str, Any]:
return {
"session_id": self.session_id,
"team_id": self.team_id,
"sample_size": self.sample_size,
}
@temporalio.activity.defn
async def compare_recording_snapshots_activity(inputs: CompareRecordingSnapshotsActivityInputs) -> None:
"""Compare session recording snapshots between v1 and v2 for a specific session."""
logger = LOGGER.bind()
start_time = dt.datetime.now()
await logger.ainfo(
"Starting snapshot comparison activity for session %s",
inputs.session_id,
)
tag_queries(team_id=inputs.team_id, product=Product.REPLAY)
async with Heartbeater():
team = await sync_to_async(Team.objects.get)(id=inputs.team_id)
recording = await sync_to_async(SessionRecording.get_or_build)(session_id=inputs.session_id, team=team)
# Get v1 and v2 snapshots using the shared utility functions
v1_snapshots = await asyncio.to_thread(fetch_v1_snapshots, recording)
v2_snapshots = await asyncio.to_thread(fetch_v2_snapshots, recording)
# Compare snapshots
snapshot_differences = []
v1_click_count = 0
v2_click_count = 0
def is_click(event: dict) -> bool:
CLICK_TYPES = [2, 4, 9, 3] # Click, DblClick, TouchEnd, ContextMenu
return (
event.get("type") == 3 # RRWebEventType.IncrementalSnapshot
and event.get("data", {}).get("source") == 2 # RRWebEventSource.MouseInteraction
and event.get("data", {}).get("type") in CLICK_TYPES
)
def is_mouse_activity(event: dict) -> bool:
MOUSE_ACTIVITY_SOURCES = [2, 1, 6] # MouseInteraction, MouseMove, TouchMove
return (
event.get("type") == 3 # RRWebEventType.IncrementalSnapshot
and event.get("data", {}).get("source") in MOUSE_ACTIVITY_SOURCES
)
def is_keypress(event: dict) -> bool:
return (
event.get("type") == 3 # RRWebEventType.IncrementalSnapshot
and event.get("data", {}).get("source") == 5 # RRWebEventSource.Input
)
def is_console_log(event: dict) -> bool:
return (
event.get("type") == 6 # RRWebEventType.Plugin
and event.get("data", {}).get("plugin") == "rrweb/console@1"
)
def get_console_level(event: dict) -> str | None:
if not is_console_log(event):
return None
return event.get("data", {}).get("payload", {}).get("level")
def get_url_from_event(event: dict) -> str | None:
"""Extract URL from event using same logic as hrefFrom in rrweb-types.ts."""
data = event.get("data", {})
if not isinstance(data, dict):
return None
meta_href = data.get("href", "")
meta_href = meta_href.strip() if isinstance(meta_href, str) else ""
payload = data.get("payload", {})
payload_href = payload.get("href", "") if isinstance(payload, dict) else ""
payload_href = payload_href.strip() if isinstance(payload_href, str) else ""
return meta_href or payload_href or None
# Track URLs for both versions
v1_urls: set[str] = set()
v1_first_url: str | None = None
v2_urls: set[str] = set()
v2_first_url: str | None = None
# Constants from snappy-session-recorder.ts
MAX_URL_LENGTH = 4 * 1024 # 4KB
MAX_URLS_COUNT = 25
def add_url(url_set: set[str], url: str) -> None:
"""Add URL to set with same constraints as snappy-session-recorder.ts."""
if not url:
return
if len(url) > MAX_URL_LENGTH:
url = url[:MAX_URL_LENGTH]
if len(url_set) < MAX_URLS_COUNT:
url_set.add(url)
# Count clicks, mouse activity, keypresses, and console logs in v1
v1_click_count = 0
v1_mouse_activity_count = 0
v1_keypress_count = 0
v1_console_log_count = 0
v1_console_warn_count = 0
v1_console_error_count = 0
for snapshot in v1_snapshots:
data = snapshot["data"]
if is_click(data):
v1_click_count += 1
if is_mouse_activity(data):
v1_mouse_activity_count += 1
if is_keypress(data):
v1_keypress_count += 1
if is_console_log(data):
level = get_console_level(data)
if level in [
"log",
"info",
"count",
"timeEnd",
"trace",
"dir",
"dirxml",
"group",
"groupCollapsed",
"debug",
"timeLog",
]:
v1_console_log_count += 1
elif level in ["warn", "countReset"]:
v1_console_warn_count += 1
elif level in ["error", "assert"]:
v1_console_error_count += 1
else: # default to log level for unknown levels
v1_console_log_count += 1
# Extract URL
url = get_url_from_event(data)
if url:
if v1_first_url is None:
v1_first_url = url[:MAX_URL_LENGTH] if len(url) > MAX_URL_LENGTH else url
add_url(v1_urls, url)
# Count clicks, mouse activity, keypresses, and console logs in v2
v2_click_count = 0
v2_mouse_activity_count = 0
v2_keypress_count = 0
v2_console_log_count = 0
v2_console_warn_count = 0
v2_console_error_count = 0
for snapshot in v2_snapshots:
data = snapshot["data"]
if is_click(data):
v2_click_count += 1
if is_mouse_activity(data):
v2_mouse_activity_count += 1
if is_keypress(data):
v2_keypress_count += 1
if is_console_log(data):
level = get_console_level(data)
if level in [
"log",
"info",
"count",
"timeEnd",
"trace",
"dir",
"dirxml",
"group",
"groupCollapsed",
"debug",
"timeLog",
]:
v2_console_log_count += 1
elif level in ["warn", "countReset"]:
v2_console_warn_count += 1
elif level in ["error", "assert"]:
v2_console_error_count += 1
else: # default to log level for unknown levels
v2_console_log_count += 1
# Extract URL
url = get_url_from_event(data)
if url:
if v2_first_url is None:
v2_first_url = url[:MAX_URL_LENGTH] if len(url) > MAX_URL_LENGTH else url
add_url(v2_urls, url)
# Get metadata counts
v1_metadata = get_session_metadata(team.pk, recording.session_id, "session_replay_events")
v2_metadata = get_session_metadata(team.pk, recording.session_id, "session_replay_events_v2_test")
# Compare URLs
await logger.ainfo(
"URL comparison",
v1_first_url=v1_first_url,
v2_first_url=v2_first_url,
first_url_matches=v1_first_url == v2_first_url,
v1_url_count=len(v1_urls),
v2_url_count=len(v2_urls),
urls_in_both=len(v1_urls & v2_urls),
only_in_v1=sorted(v1_urls - v2_urls)[:5], # Show up to 5 examples
only_in_v2=sorted(v2_urls - v1_urls)[:5], # Show up to 5 examples
metadata_comparison={
"v1": {
"first_url": v1_metadata["first_url"],
"all_urls": v1_metadata["all_urls"],
"first_url_matches_snapshot": v1_metadata["first_url"] == v1_first_url,
"all_urls_match_snapshot": set(v1_metadata["all_urls"]) == v1_urls,
},
"v2": {
"first_url": v2_metadata["first_url"],
"all_urls": v2_metadata["all_urls"],
"first_url_matches_snapshot": v2_metadata["first_url"] == v2_first_url,
"all_urls_match_snapshot": set(v2_metadata["all_urls"]) == v2_urls,
},
},
)
await logger.ainfo(
"Total event count comparison",
v1_snapshot_count=len(v1_snapshots),
v2_snapshot_count=len(v2_snapshots),
v1_metadata_count=v1_metadata["event_count"],
v2_metadata_count=v2_metadata["event_count"],
snapshot_difference=len(v2_snapshots) - len(v1_snapshots),
metadata_difference=v2_metadata["event_count"] - v1_metadata["event_count"],
snapshot_vs_metadata_v1_difference=len(v1_snapshots) - v1_metadata["event_count"],
snapshot_vs_metadata_v2_difference=len(v2_snapshots) - v2_metadata["event_count"],
)
await logger.ainfo(
"Click count comparison",
v1_snapshot_count=v1_click_count,
v2_snapshot_count=v2_click_count,
v1_metadata_count=v1_metadata["click_count"],
v2_metadata_count=v2_metadata["click_count"],
snapshot_difference=v2_click_count - v1_click_count,
metadata_difference=v2_metadata["click_count"] - v1_metadata["click_count"],
snapshot_vs_metadata_v1_difference=v1_click_count - v1_metadata["click_count"],
snapshot_vs_metadata_v2_difference=v2_click_count - v2_metadata["click_count"],
)
await logger.ainfo(
"Mouse activity count comparison",
v1_snapshot_count=v1_mouse_activity_count,
v2_snapshot_count=v2_mouse_activity_count,
v1_metadata_count=v1_metadata["mouse_activity_count"],
v2_metadata_count=v2_metadata["mouse_activity_count"],
snapshot_difference=v2_mouse_activity_count - v1_mouse_activity_count,
metadata_difference=v2_metadata["mouse_activity_count"] - v1_metadata["mouse_activity_count"],
snapshot_vs_metadata_v1_difference=v1_mouse_activity_count - v1_metadata["mouse_activity_count"],
snapshot_vs_metadata_v2_difference=v2_mouse_activity_count - v2_metadata["mouse_activity_count"],
)
await logger.ainfo(
"Keypress count comparison",
v1_snapshot_count=v1_keypress_count,
v2_snapshot_count=v2_keypress_count,
v1_metadata_count=v1_metadata["keypress_count"],
v2_metadata_count=v2_metadata["keypress_count"],
snapshot_difference=v2_keypress_count - v1_keypress_count,
metadata_difference=v2_metadata["keypress_count"] - v1_metadata["keypress_count"],
snapshot_vs_metadata_v1_difference=v1_keypress_count - v1_metadata["keypress_count"],
snapshot_vs_metadata_v2_difference=v2_keypress_count - v2_metadata["keypress_count"],
)
await logger.ainfo(
"Console log count comparison",
v1_snapshot_count=v1_console_log_count,
v2_snapshot_count=v2_console_log_count,
v1_metadata_count=v1_metadata["console_log_count"],
v2_metadata_count=v2_metadata["console_log_count"],
snapshot_difference=v2_console_log_count - v1_console_log_count,
metadata_difference=v2_metadata["console_log_count"] - v1_metadata["console_log_count"],
snapshot_vs_metadata_v1_difference=v1_console_log_count - v1_metadata["console_log_count"],
snapshot_vs_metadata_v2_difference=v2_console_log_count - v2_metadata["console_log_count"],
)
await logger.ainfo(
"Console warn count comparison",
v1_snapshot_count=v1_console_warn_count,
v2_snapshot_count=v2_console_warn_count,
v1_metadata_count=v1_metadata["console_warn_count"],
v2_metadata_count=v2_metadata["console_warn_count"],
snapshot_difference=v2_console_warn_count - v1_console_warn_count,
metadata_difference=v2_metadata["console_warn_count"] - v1_metadata["console_warn_count"],
snapshot_vs_metadata_v1_difference=v1_console_warn_count - v1_metadata["console_warn_count"],
snapshot_vs_metadata_v2_difference=v2_console_warn_count - v2_metadata["console_warn_count"],
)
await logger.ainfo(
"Console error count comparison",
v1_snapshot_count=v1_console_error_count,
v2_snapshot_count=v2_console_error_count,
v1_metadata_count=v1_metadata["console_error_count"],
v2_metadata_count=v2_metadata["console_error_count"],
snapshot_difference=v2_console_error_count - v1_console_error_count,
metadata_difference=v2_metadata["console_error_count"] - v1_metadata["console_error_count"],
snapshot_vs_metadata_v1_difference=v1_console_error_count - v1_metadata["console_error_count"],
snapshot_vs_metadata_v2_difference=v2_console_error_count - v2_metadata["console_error_count"],
)
# Compare total count
if len(v1_snapshots) != len(v2_snapshots):
snapshot_differences.append(
{
"type": "count_mismatch",
"v1_count": len(v1_snapshots),
"v2_count": len(v2_snapshots),
}
)
# Convert snapshots to dictionaries for counting duplicates
v1_events: dict[str, int] = {}
v2_events: dict[str, int] = {}
for s in v1_snapshots:
event_key = json.dumps((s["window_id"], s["data"]), sort_keys=True)
v1_events[event_key] = v1_events.get(event_key, 0) + 1
for s in v2_snapshots:
event_key = json.dumps((s["window_id"], s["data"]), sort_keys=True)
v2_events[event_key] = v2_events.get(event_key, 0) + 1
# Find events in both versions with their counts
all_keys = set(v1_events.keys()) | set(v2_events.keys())
common_events = {
k: (v1_events.get(k, 0), v2_events.get(k, 0)) for k in all_keys if k in v1_events and k in v2_events
}
only_in_v1 = {k: v1_events[k] for k in v1_events.keys() - v2_events.keys()}
only_in_v2 = {k: v2_events[k] for k in v2_events.keys() - v1_events.keys()}
# Group events by type
def get_event_type(event_json: str) -> str:
_, data = json.loads(event_json)
return data.get("type", "unknown")
def group_events_by_type(events: dict[str, int]) -> dict[str, int]:
type_counts: dict[str, int] = {}
for event, count in events.items():
event_type = get_event_type(event)
type_counts[event_type] = type_counts.get(event_type, 0) + count
return type_counts
v1_exclusive_by_type = group_events_by_type(only_in_v1)
v2_exclusive_by_type = group_events_by_type(only_in_v2)
# For common events, sum up the minimum count between v1 and v2 for each event
common_by_type = group_events_by_type({k: min(v1, v2) for k, (v1, v2) in common_events.items()})
# Analyze events per window
v1_window_counts = count_events_per_window(v1_events)
v2_window_counts = count_events_per_window(v2_events)
# Find all window IDs
all_window_ids = set(v1_window_counts.keys()) | set(v2_window_counts.keys())
window_comparison = []
# Handle None first, then sort the rest
sorted_window_ids = ([None] if None in all_window_ids else []) + sorted(
id for id in all_window_ids if id is not None
)
for window_id in sorted_window_ids:
window_comparison.append(
{
"window_id": window_id,
"v1_events": v1_window_counts.get(window_id, 0),
"v2_events": v2_window_counts.get(window_id, 0),
}
)
await logger.ainfo(
"Events per window comparison",
window_counts=window_comparison,
total_windows=len(all_window_ids),
windows_in_v1=len(v1_window_counts),
windows_in_v2=len(v2_window_counts),
windows_in_both=len(set(v1_window_counts.keys()) & set(v2_window_counts.keys())),
)
await logger.ainfo(
"Event type comparison",
common_events_count=sum(min(v1, v2) for v1, v2 in common_events.values()),
common_events_by_type=common_by_type,
only_in_v1_count=sum(only_in_v1.values()),
only_in_v1_by_type=v1_exclusive_by_type,
only_in_v2_count=sum(only_in_v2.values()),
only_in_v2_by_type=v2_exclusive_by_type,
duplicate_stats={
"v1_total_duplicates": sum(count - 1 for count in v1_events.values() if count > 1),
"v2_total_duplicates": sum(count - 1 for count in v2_events.values() if count > 1),
"events_with_different_counts": {k: (v1, v2) for k, (v1, v2) in common_events.items() if v1 != v2},
},
)
# Compare active milliseconds
v1_computed_active_ms, v2_computed_active_ms = compute_active_milliseconds(v1_snapshots)
v1_computed_active_ms_v2, v2_computed_active_ms_v2 = compute_active_milliseconds(v2_snapshots)
# Calculate percentage differences
def safe_percentage_diff(a: int, b: int) -> float:
if a == 0 and b == 0:
return 0.0
if a == 0:
return 100.0
return ((b - a) / a) * 100
await logger.ainfo(
"Active milliseconds comparison",
v1_snapshot_computed_v1_alg=v1_computed_active_ms,
v2_snapshot_computed_v1_alg=v2_computed_active_ms,
v1_snapshot_computed_v2_alg=v1_computed_active_ms_v2,
v2_snapshot_computed_v2_alg=v2_computed_active_ms_v2,
# Compare v1 vs v2 algorithms on v1 snapshots
v1_snapshots_alg_difference=v1_computed_active_ms_v2 - v1_computed_active_ms,
v1_snapshots_alg_difference_percentage=safe_percentage_diff(
v1_computed_active_ms, v1_computed_active_ms_v2
),
# Compare v1 vs v2 algorithms on v2 snapshots
v2_snapshots_alg_difference=v2_computed_active_ms_v2 - v2_computed_active_ms,
v2_snapshots_alg_difference_percentage=safe_percentage_diff(
v2_computed_active_ms, v2_computed_active_ms_v2
),
v1_metadata_value=v1_metadata["active_milliseconds"],
v2_metadata_value=v2_metadata["active_milliseconds"],
snapshot_difference_v1_alg=v2_computed_active_ms - v1_computed_active_ms,
snapshot_difference_percentage_v1_alg=safe_percentage_diff(v1_computed_active_ms, v2_computed_active_ms),
snapshot_difference_v2_alg=v2_computed_active_ms_v2 - v1_computed_active_ms_v2,
snapshot_difference_percentage_v2_alg=safe_percentage_diff(
v1_computed_active_ms_v2, v2_computed_active_ms_v2
),
metadata_difference=v2_metadata["active_milliseconds"] - v1_metadata["active_milliseconds"],
metadata_difference_percentage=safe_percentage_diff(
v1_metadata["active_milliseconds"], v2_metadata["active_milliseconds"]
),
v1_computed_vs_metadata_difference_v1_alg=v1_computed_active_ms - v1_metadata["active_milliseconds"],
v1_computed_vs_metadata_percentage_v1_alg=safe_percentage_diff(
v1_metadata["active_milliseconds"], v1_computed_active_ms
),
v2_computed_vs_metadata_difference_v1_alg=v2_computed_active_ms - v2_metadata["active_milliseconds"],
v2_computed_vs_metadata_percentage_v1_alg=safe_percentage_diff(
v2_metadata["active_milliseconds"], v2_computed_active_ms
),
v1_computed_vs_metadata_difference_v2_alg=v1_computed_active_ms_v2 - v1_metadata["active_milliseconds"],
v1_computed_vs_metadata_percentage_v2_alg=safe_percentage_diff(
v1_metadata["active_milliseconds"], v1_computed_active_ms_v2
),
v2_computed_vs_metadata_difference_v2_alg=v2_computed_active_ms_v2 - v2_metadata["active_milliseconds"],
v2_computed_vs_metadata_percentage_v2_alg=safe_percentage_diff(
v2_metadata["active_milliseconds"], v2_computed_active_ms_v2
),
)
# Compare snapshot metadata
await logger.ainfo(
"Snapshot metadata comparison",
v1_snapshot_source=v1_metadata["snapshot_source"],
v2_snapshot_source=v2_metadata["snapshot_source"],
v1_snapshot_library=v1_metadata["snapshot_library"],
v2_snapshot_library=v2_metadata["snapshot_library"],
snapshot_source_matches=v1_metadata["snapshot_source"] == v2_metadata["snapshot_source"],
snapshot_library_matches=v1_metadata["snapshot_library"] == v2_metadata["snapshot_library"],
)
end_time = dt.datetime.now()
duration = (end_time - start_time).total_seconds()
# Check for differences in metadata vs snapshots
metadata_differences = any(
[
v1_metadata["click_count"] != v1_click_count,
v1_metadata["mouse_activity_count"] != v1_mouse_activity_count,
v1_metadata["keypress_count"] != v1_keypress_count,
v1_metadata["console_log_count"] != v1_console_log_count,
v1_metadata["console_warn_count"] != v1_console_warn_count,
v1_metadata["console_error_count"] != v1_console_error_count,
v2_metadata["click_count"] != v2_click_count,
v2_metadata["mouse_activity_count"] != v2_mouse_activity_count,
v2_metadata["keypress_count"] != v2_keypress_count,
v2_metadata["console_log_count"] != v2_console_log_count,
v2_metadata["console_warn_count"] != v2_console_warn_count,
v2_metadata["console_error_count"] != v2_console_error_count,
]
)
# Check if sessions differ in any way
sessions_differ = any(
[
len(v1_snapshots) != len(v2_snapshots),
v1_click_count != v2_click_count,
v1_mouse_activity_count != v2_mouse_activity_count,
v1_keypress_count != v2_keypress_count,
v1_console_log_count != v2_console_log_count,
v1_console_warn_count != v2_console_warn_count,
v1_console_error_count != v2_console_error_count,
v1_urls != v2_urls,
v1_first_url != v2_first_url,
bool(only_in_v1),
bool(only_in_v2),
]
)
# Log summary
await logger.ainfo(
"Completed snapshot comparison activity",
duration_seconds=duration,
session_id=inputs.session_id,
v1_snapshot_count=len(v1_snapshots),
v2_snapshot_count=len(v2_snapshots),
sessions_differ=sessions_differ,
metadata_snapshot_differences=metadata_differences,
)
@dataclasses.dataclass(frozen=True)
class CompareRecordingSnapshotsWorkflowInputs:
"""Inputs for the `CompareRecordingSnapshotsWorkflow`."""
session_id: str = dataclasses.field()
team_id: int = dataclasses.field()
@property
def properties_to_log(self) -> dict[str, Any]:
return {
"session_id": self.session_id,
"team_id": self.team_id,
}
@temporalio.workflow.defn(name="compare-recording-snapshots")
class CompareRecordingSnapshotsWorkflow(PostHogWorkflow):
"""Workflow to compare session recording snapshots between v1 and v2."""
def __init__(self) -> None:
self.lock = asyncio.Lock()
self.paused = False
@staticmethod
def parse_inputs(inputs: list[str]) -> CompareRecordingSnapshotsWorkflowInputs:
"""Parse inputs from the management command CLI."""
loaded = json.loads(inputs[0])
if "session_id" not in loaded:
raise ValueError("Required field session_id not provided")
if "team_id" not in loaded:
raise ValueError("Required field team_id not provided")
return CompareRecordingSnapshotsWorkflowInputs(
session_id=loaded["session_id"],
team_id=loaded["team_id"],
)
@temporalio.workflow.run
async def run(self, inputs: CompareRecordingSnapshotsWorkflowInputs):
"""Run the comparison of session recording snapshots."""
await temporalio.workflow.wait_condition(lambda: not self.paused)
logger = LOGGER.bind()
workflow_start = dt.datetime.now()
logger.info(
"Starting snapshot comparison workflow for session %s",
inputs.session_id,
)
activity_inputs = CompareRecordingSnapshotsActivityInputs(
session_id=inputs.session_id,
team_id=inputs.team_id,
)
await temporalio.workflow.execute_activity(
compare_recording_snapshots_activity,
activity_inputs,
start_to_close_timeout=dt.timedelta(minutes=5),
retry_policy=temporalio.common.RetryPolicy(
initial_interval=dt.timedelta(seconds=10),
maximum_interval=dt.timedelta(seconds=60),
maximum_attempts=1,
non_retryable_error_types=[],
),
)
workflow_end = dt.datetime.now()
duration = (workflow_end - workflow_start).total_seconds()
logger.info(
"Completed snapshot comparison workflow in %.2f seconds",
duration,
)
@temporalio.workflow.update
async def pause(self) -> None:
"""Signal handler for workflow to pause or unpause."""
async with self.lock:
if self.paused is True:
self.paused = False
else:
self.paused = True
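A minimal sketch, assuming a locally reachable Temporal server and a known workflow id (neither comes from this repo), of how the `pause` update above could be sent from a client; `Client.connect`, `get_workflow_handle` and `execute_update` are standard temporalio client calls.

from temporalio.client import Client

async def toggle_pause(workflow_id: str) -> None:
    # Connect to a Temporal server (address assumed) and send the update;
    # the handler above flips `self.paused` on the running workflow.
    client = await Client.connect("localhost:7233")
    handle = client.get_workflow_handle(workflow_id)
    await handle.execute_update(CompareRecordingSnapshotsWorkflow.pause)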

View File

@@ -1,436 +0,0 @@
import json
import typing
import asyncio
import datetime as dt
import statistics
import dataclasses
import temporalio.common
import temporalio.activity
import temporalio.workflow
from structlog import get_logger
from posthog.clickhouse.client import sync_execute
from posthog.clickhouse.query_tagging import Product, tag_queries
from posthog.temporal.common.base import PostHogWorkflow
from posthog.temporal.common.heartbeat import Heartbeater
LOGGER = get_logger(__name__)
def get_session_replay_events(
table_name: str,
started_after: dt.datetime,
started_before: dt.datetime,
session_length_limit_seconds: int = 172800,
timestamp_leeway_seconds: int = 0,
) -> list[tuple]:
"""Get session replay events from the specified table within the time range."""
query = """
SELECT
session_id,
team_id,
any(distinct_id) as distinct_id,
min(min_first_timestamp) as min_first_timestamp_agg,
max(max_last_timestamp) as max_last_timestamp_agg,
argMinMerge(first_url) as first_url,
groupUniqArrayArray(all_urls) as all_urls,
sum(click_count) as click_count,
sum(keypress_count) as keypress_count,
sum(mouse_activity_count) as mouse_activity_count,
sum(active_milliseconds) as active_milliseconds,
sum(console_log_count) as console_log_count,
sum(console_warn_count) as console_warn_count,
sum(console_error_count) as console_error_count,
sum(event_count) as event_count,
argMinMerge(snapshot_source) as snapshot_source,
argMinMerge(snapshot_library) as snapshot_library
{block_fields}
FROM (
SELECT *
FROM {table}
WHERE min_first_timestamp >= toDateTime(%(started_after)s) - INTERVAL %(timestamp_leeway)s SECOND
AND max_last_timestamp <= toDateTime(%(started_before)s) + INTERVAL {session_length_limit_seconds} SECOND + INTERVAL %(timestamp_leeway)s SECOND
ORDER BY min_first_timestamp ASC
)
GROUP BY
session_id,
team_id
HAVING
min_first_timestamp_agg >= toDateTime(%(started_after)s)
AND min_first_timestamp_agg <= toDateTime(%(started_before)s)
AND max_last_timestamp_agg <= min_first_timestamp_agg + INTERVAL {session_length_limit_seconds} SECOND
"""
# Add block-related fields only for v2 table
block_fields = (
"""
,groupArrayArray(block_first_timestamps) as block_first_timestamps,
groupArrayArray(block_last_timestamps) as block_last_timestamps,
groupArrayArray(block_urls) as block_urls
"""
if "_v2_" in table_name
else ""
)
return sync_execute(
query.format(
table=table_name, block_fields=block_fields, session_length_limit_seconds=session_length_limit_seconds
),
{
"started_after": started_after.strftime("%Y-%m-%d %H:%M:%S"),
"started_before": started_before.strftime("%Y-%m-%d %H:%M:%S"),
"timestamp_leeway": timestamp_leeway_seconds,
},
)
FIELD_NAMES = [
"distinct_id",
"min_first_timestamp_agg",
"max_last_timestamp_agg",
"first_url",
"all_urls",
"click_count",
"keypress_count",
"mouse_activity_count",
"active_milliseconds",
"console_log_count",
"console_warn_count",
"console_error_count",
"event_count",
"snapshot_source",
"snapshot_library",
]
@dataclasses.dataclass(frozen=True)
class CompareRecordingMetadataActivityInputs:
"""Inputs for the `compare_recording_metadata_activity`."""
started_after: str = dataclasses.field()
started_before: str = dataclasses.field()
window_result_limit: int | None = dataclasses.field(default=None)
session_length_limit_seconds: int = dataclasses.field(default=172800) # 48h default
timestamp_leeway_seconds: int = dataclasses.field(default=0) # No leeway by default
@property
def properties_to_log(self) -> dict[str, typing.Any]:
return {
"started_after": self.started_after,
"started_before": self.started_before,
"window_result_limit": self.window_result_limit,
"session_length_limit_seconds": self.session_length_limit_seconds,
"timestamp_leeway_seconds": self.timestamp_leeway_seconds,
}
@temporalio.activity.defn
async def compare_recording_metadata_activity(inputs: CompareRecordingMetadataActivityInputs) -> None:
"""Compare session recording metadata between storage backends."""
logger = LOGGER.bind()
start_time = dt.datetime.now()
tag_queries(product=Product.REPLAY)
await logger.ainfo(
"Starting comparison activity for time range %s to %s%s%s",
inputs.started_after,
inputs.started_before,
f" (limiting to {inputs.window_result_limit} sessions)" if inputs.window_result_limit else "",
f" (session length limit: {inputs.session_length_limit_seconds}s)"
if inputs.session_length_limit_seconds
else "",
)
async with Heartbeater():
started_after = dt.datetime.fromisoformat(inputs.started_after)
started_before = dt.datetime.fromisoformat(inputs.started_before)
results_v1, results_v2 = await asyncio.gather(
asyncio.to_thread(
get_session_replay_events,
"session_replay_events",
started_after,
started_before,
inputs.session_length_limit_seconds,
inputs.timestamp_leeway_seconds,
),
asyncio.to_thread(
get_session_replay_events,
"session_replay_events_v2_test",
started_after,
started_before,
inputs.session_length_limit_seconds,
inputs.timestamp_leeway_seconds,
),
)
# Create lookup tables for easier comparison
v1_sessions = {(r[0], r[1]): r for r in results_v1} # (session_id, team_id) -> full record
v2_sessions = {(r[0], r[1]): r for r in results_v2}
# Find sessions in v1 but not in v2
only_in_v1 = list(set(v1_sessions.keys()) - set(v2_sessions.keys()))
# Find sessions in v2 but not in v1
only_in_v2 = list(set(v2_sessions.keys()) - set(v1_sessions.keys()))
# Compare data for sessions in both
all_differing_sessions: list[tuple[str, int]] = [] # (session_id, team_id)
all_differing_sessions_excluding_active_ms: list[tuple[str, int]] = [] # (session_id, team_id)
differing_sessions_count = 0
active_ms_diffs_percentage: list[float] = []
field_differences: dict[str, int] = {field: 0 for field in FIELD_NAMES} # Track per-field differences
field_example_sessions: dict[str, list[tuple[str, int, typing.Any, typing.Any]]] = {
field: [] for field in FIELD_NAMES
} # Track example sessions per field
for session_key in set(v1_sessions.keys()) & set(v2_sessions.keys()):
session_id, team_id = session_key
v1_data = v1_sessions[session_key]
v2_data = v2_sessions[session_key]
# Calculate active_ms percentage difference
v1_active_ms = v1_data[
FIELD_NAMES.index("active_milliseconds") + 2
] # +2 because session_id and team_id are at index 0,1
v2_active_ms = v2_data[FIELD_NAMES.index("active_milliseconds") + 2]
if v1_active_ms > 0: # Avoid division by zero
diff_percentage = ((v2_active_ms - v1_active_ms) / v1_active_ms) * 100
active_ms_diffs_percentage.append(diff_percentage)
# Compare each field and collect differences
differences = []
differences_excluding_active_ms = []
for i, field_name in enumerate(
FIELD_NAMES, start=2
): # start=2 because session_id and team_id are at index 0,1
if v1_data[i] != v2_data[i]:
diff = {"field": field_name, "v1_value": v1_data[i], "v2_value": v2_data[i]}
differences.append(diff)
field_differences[field_name] += 1
# Store example session if we haven't stored 3 examples for this field yet
if len(field_example_sessions[field_name]) < 3:
field_example_sessions[field_name].append((session_id, team_id, v1_data[i], v2_data[i]))
if field_name != "active_milliseconds":
differences_excluding_active_ms.append(diff)
if differences:
all_differing_sessions.append(session_key)
differing_sessions_count += 1
# Only log detailed differences if within limit and there are differences beyond active_milliseconds
if (
not inputs.window_result_limit or differing_sessions_count <= inputs.window_result_limit
) and differences_excluding_active_ms:
await logger.ainfo(
"Found differences in session", session_id=session_id, team_id=team_id, differences=differences
)
if differences_excluding_active_ms:
all_differing_sessions_excluding_active_ms.append(session_key)
end_time = dt.datetime.now()
duration = (end_time - start_time).total_seconds()
# Calculate active_ms statistics
active_ms_stats = {}
if active_ms_diffs_percentage:
active_ms_stats = {
"avg_percentage_diff": round(statistics.mean(active_ms_diffs_percentage), 2),
"std_dev_percentage_diff": round(
statistics.stdev(active_ms_diffs_percentage) if len(active_ms_diffs_percentage) > 1 else 0, 2
),
"samples": len(active_ms_diffs_percentage),
}
# Log summary
await logger.ainfo(
"Completed comparison activity",
duration_seconds=duration,
v1_count=len(results_v1),
v2_count=len(results_v2),
only_in_v1_count=len(only_in_v1),
only_in_v2_count=len(only_in_v2),
total_differing_sessions=len(all_differing_sessions),
total_differing_sessions_excluding_active_ms=len(all_differing_sessions_excluding_active_ms),
active_ms_stats=active_ms_stats,
field_differences=field_differences,
time_range={
"started_after": started_after.isoformat(),
"started_before": started_before.isoformat(),
},
)
# Log example differences for each field separately
for field_name, examples in field_example_sessions.items():
if examples: # Only log fields that have differences
await logger.ainfo(
f"Example differences for field: {field_name}",
field=field_name,
examples=[
{"session_id": session_id, "team_id": team_id, "v1_value": v1_value, "v2_value": v2_value}
for session_id, team_id, v1_value, v2_value in examples
],
)
# Log sessions only in v1/v2 if any exist
if only_in_v1:
await logger.ainfo(
"Sessions only in v1",
session_ids=only_in_v1, # Already (session_id, team_id) tuples
)
if only_in_v2:
await logger.ainfo(
"Sessions only in v2",
session_ids=only_in_v2, # Already (session_id, team_id) tuples
)
@dataclasses.dataclass(frozen=True)
class CompareRecordingMetadataWorkflowInputs:
"""Inputs for the `CompareRecordingMetadataWorkflow`."""
started_after: str = dataclasses.field()
started_before: str = dataclasses.field()
window_seconds: int = dataclasses.field(default=300) # 5 minutes default
window_result_limit: int | None = dataclasses.field(default=None) # No limit by default
session_length_limit_seconds: int = dataclasses.field(default=172800) # 48h default
timestamp_leeway_seconds: int = dataclasses.field(default=0) # No leeway by default
@property
def properties_to_log(self) -> dict[str, typing.Any]:
return {
"started_after": self.started_after,
"started_before": self.started_before,
"window_seconds": self.window_seconds,
"window_result_limit": self.window_result_limit,
"session_length_limit_seconds": self.session_length_limit_seconds,
"timestamp_leeway_seconds": self.timestamp_leeway_seconds,
}
@temporalio.workflow.defn(name="compare-recording-metadata")
class CompareRecordingMetadataWorkflow(PostHogWorkflow):
"""Workflow to compare session recording metadata between storage backends."""
def __init__(self) -> None:
self.lock = asyncio.Lock()
self.paused = False
@staticmethod
def parse_inputs(inputs: list[str]) -> CompareRecordingMetadataWorkflowInputs:
"""Parse inputs from the management command CLI."""
loaded = json.loads(inputs[0])
# Convert ISO format strings to datetime objects
for field in ["started_after", "started_before"]:
if field not in loaded:
raise ValueError(f"Required field {field} not provided")
loaded[field] = dt.datetime.fromisoformat(loaded[field])
window_seconds = loaded.get("window_seconds", 300)
if not isinstance(window_seconds, int) or window_seconds <= 0:
raise ValueError("window_seconds must be a positive integer")
window_result_limit = loaded.get("window_result_limit")
if window_result_limit is not None and not isinstance(window_result_limit, int | None):
raise ValueError("window_result_limit must be an integer or None")
session_length_limit_seconds = loaded.get("session_length_limit_seconds", 172800)
if not isinstance(session_length_limit_seconds, int) or session_length_limit_seconds <= 0:
raise ValueError("session_length_limit_seconds must be a positive integer")
timestamp_leeway_seconds = loaded.get("timestamp_leeway_seconds", 0)
if not isinstance(timestamp_leeway_seconds, int) or timestamp_leeway_seconds < 0:
raise ValueError("timestamp_leeway_seconds must be a non-negative integer")
return CompareRecordingMetadataWorkflowInputs(
started_after=loaded["started_after"],
started_before=loaded["started_before"],
window_seconds=window_seconds,
window_result_limit=window_result_limit,
session_length_limit_seconds=session_length_limit_seconds,
timestamp_leeway_seconds=timestamp_leeway_seconds,
)
@staticmethod
def generate_time_windows(
start_time: dt.datetime, end_time: dt.datetime, window_seconds: int
) -> list[tuple[dt.datetime, dt.datetime]]:
"""Generate time windows between start and end time."""
windows = []
current = start_time
while current < end_time:
window_end = min(current + dt.timedelta(seconds=window_seconds), end_time)
windows.append((current, window_end))
current = window_end
return windows
@temporalio.workflow.run
async def run(self, inputs: CompareRecordingMetadataWorkflowInputs):
"""Run the comparison of session recording metadata."""
await temporalio.workflow.wait_condition(lambda: not self.paused)
started_after = dt.datetime.fromisoformat(inputs.started_after)
started_before = dt.datetime.fromisoformat(inputs.started_before)
logger = LOGGER.bind()
workflow_start = dt.datetime.now()
logger.info(
"Starting comparison workflow for sessions between %s and %s using %d second windows%s%s",
started_after,
started_before,
inputs.window_seconds,
f" (limiting to {inputs.window_result_limit} sessions per window)" if inputs.window_result_limit else "",
f" (with {inputs.timestamp_leeway_seconds}s timestamp leeway)" if inputs.timestamp_leeway_seconds else "",
)
# Generate time windows
windows = self.generate_time_windows(started_after, started_before, inputs.window_seconds)
logger.info("Generated %d time windows to process", len(windows))
# Process each window
for i, (window_start, window_end) in enumerate(windows, 1):
logger.info(
"Processing window %d/%d: %s to %s",
i,
len(windows),
window_start,
window_end,
)
activity_inputs = CompareRecordingMetadataActivityInputs(
started_after=window_start.isoformat(),
started_before=window_end.isoformat(),
window_result_limit=inputs.window_result_limit,
session_length_limit_seconds=inputs.session_length_limit_seconds,
timestamp_leeway_seconds=inputs.timestamp_leeway_seconds,
)
await temporalio.workflow.execute_activity(
compare_recording_metadata_activity,
activity_inputs,
start_to_close_timeout=dt.timedelta(minutes=5),
retry_policy=temporalio.common.RetryPolicy(
initial_interval=dt.timedelta(seconds=10),
maximum_interval=dt.timedelta(seconds=60),
maximum_attempts=1,
non_retryable_error_types=[],
),
)
workflow_end = dt.datetime.now()
duration = (workflow_end - workflow_start).total_seconds()
logger.info(
"Completed comparison workflow in %.2f seconds. Processed %d time windows",
duration,
len(windows),
)
@temporalio.workflow.update
async def pause(self) -> None:
"""Signal handler for workflow to pause or unpause."""
async with self.lock:
if self.paused is True:
self.paused = False
else:
self.paused = True
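Purely illustrative values (the timestamps are made up) showing how `parse_inputs` and `generate_time_windows` above behave:

parsed = CompareRecordingMetadataWorkflow.parse_inputs(
    ['{"started_after": "2025-01-01T00:00:00", "started_before": "2025-01-01T06:00:00", "window_seconds": 600}']
)
windows = CompareRecordingMetadataWorkflow.generate_time_windows(
    dt.datetime(2025, 1, 1, 0, 0), dt.datetime(2025, 1, 1, 6, 0), 600
)
# Six hours split into 10-minute windows -> 36 (window_start, window_end) pairs,
# with the final window truncated to end_time when the range does not divide evenly.
assert len(windows) == 36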

View File

@@ -1,680 +0,0 @@
import json
import typing
import asyncio
import datetime as dt
import dataclasses
import temporalio.common
import temporalio.activity
import temporalio.workflow
from asgiref.sync import sync_to_async
from structlog.contextvars import bind_contextvars
from posthog.clickhouse.query_tagging import Product, tag_queries
from posthog.models import Team
from posthog.session_recordings.models.session_recording import SessionRecording
from posthog.temporal.common.base import PostHogWorkflow
from posthog.temporal.common.heartbeat import Heartbeater
from posthog.temporal.common.logger import get_logger
from posthog.temporal.session_recordings.queries import get_sampled_session_ids, get_session_metadata
from posthog.temporal.session_recordings.segmentation import compute_active_milliseconds
from posthog.temporal.session_recordings.session_comparer import (
add_url,
count_events_per_window,
get_console_level,
get_url_from_event,
group_events_by_type,
is_click,
is_console_log,
is_keypress,
is_mouse_activity,
)
from posthog.temporal.session_recordings.snapshot_utils import fetch_v1_snapshots, fetch_v2_snapshots
LOGGER = get_logger(__name__)
@dataclasses.dataclass(frozen=True)
class CompareSampledRecordingEventsActivityInputs:
"""Inputs for the recording events comparison activity."""
started_after: str = dataclasses.field()
started_before: str = dataclasses.field()
sample_size: int = dataclasses.field(default=100)
@property
def properties_to_log(self) -> dict[str, typing.Any]:
return {
"started_after": self.started_after,
"started_before": self.started_before,
"sample_size": self.sample_size,
}
@temporalio.activity.defn
async def compare_sampled_recording_events_activity(inputs: CompareSampledRecordingEventsActivityInputs) -> None:
"""Compare recording events between v1 and v2 storage for a sample of sessions."""
bind_contextvars()
logger = LOGGER.bind()
start_time = dt.datetime.now()
tag_queries(product=Product.REPLAY)
await logger.ainfo(
"Starting sampled events comparison activity",
started_after=inputs.started_after,
started_before=inputs.started_before,
sample_size=inputs.sample_size,
)
async with Heartbeater():
started_after = dt.datetime.fromisoformat(inputs.started_after)
started_before = dt.datetime.fromisoformat(inputs.started_before)
# Get sample of session IDs
session_ids = await asyncio.to_thread(
get_sampled_session_ids,
started_after,
started_before,
inputs.sample_size,
)
for session_id, team_id in session_ids:
await logger.ainfo(
"Processing session",
session_id=session_id,
team_id=team_id,
)
team = await sync_to_async(Team.objects.get)(id=team_id)
recording = await sync_to_async(SessionRecording.get_or_build)(session_id=session_id, team=team)
# Get v1 and v2 snapshots using the shared utility functions
try:
v1_snapshots = await asyncio.to_thread(fetch_v1_snapshots, recording)
except Exception as e:
await logger.awarn(
"Skipping session due to error when fetching v1 snapshots",
session_id=session_id,
team_id=team_id,
error=str(e),
error_type=type(e).__name__,
)
continue
try:
v2_snapshots = await asyncio.to_thread(fetch_v2_snapshots, recording)
except Exception as e:
await logger.awarn(
"Skipping session due to error when fetching v2 snapshots",
session_id=session_id,
team_id=team_id,
error=str(e),
error_type=type(e).__name__,
)
continue
# Convert snapshots to dictionaries for counting duplicates
v1_events: dict[str, int] = {}
v2_events: dict[str, int] = {}
for s in v1_snapshots:
event_key = json.dumps((s["window_id"], s["data"]), sort_keys=True)
v1_events[event_key] = v1_events.get(event_key, 0) + 1
for s in v2_snapshots:
event_key = json.dumps((s["window_id"], s["data"]), sort_keys=True)
v2_events[event_key] = v2_events.get(event_key, 0) + 1
# Find events in both versions with their counts
all_keys = set(v1_events.keys()) | set(v2_events.keys())
common_events = {
k: (v1_events.get(k, 0), v2_events.get(k, 0)) for k in all_keys if k in v1_events and k in v2_events
}
only_in_v1 = {k: v1_events[k] for k in v1_events.keys() - v2_events.keys()}
only_in_v2 = {k: v2_events[k] for k in v2_events.keys() - v1_events.keys()}
# Get metadata counts
v1_metadata = get_session_metadata(team.pk, recording.session_id, "session_replay_events")
v2_metadata = get_session_metadata(team.pk, recording.session_id, "session_replay_events_v2_test")
# Track URLs for both versions
v1_urls: set[str] = set()
v1_first_url: str | None = None
v2_urls: set[str] = set()
v2_first_url: str | None = None
# Count events by type in v1
v1_click_count = 0
v1_mouse_activity_count = 0
v1_keypress_count = 0
v1_console_log_count = 0
v1_console_warn_count = 0
v1_console_error_count = 0
for snapshot in v1_snapshots:
data = snapshot["data"]
if is_click(data):
v1_click_count += 1
if is_mouse_activity(data):
v1_mouse_activity_count += 1
if is_keypress(data):
v1_keypress_count += 1
if is_console_log(data):
level = get_console_level(data)
if level in [
"log",
"info",
"count",
"timeEnd",
"trace",
"dir",
"dirxml",
"group",
"groupCollapsed",
"debug",
"timeLog",
]:
v1_console_log_count += 1
elif level in ["warn", "countReset"]:
v1_console_warn_count += 1
elif level in ["error", "assert"]:
v1_console_error_count += 1
else: # default to log level for unknown levels
v1_console_log_count += 1
# Extract URL
url = get_url_from_event(data)
if url:
if v1_first_url is None:
v1_first_url = url[:4096] if len(url) > 4096 else url
add_url(v1_urls, url)
# Count events by type in v2
v2_click_count = 0
v2_mouse_activity_count = 0
v2_keypress_count = 0
v2_console_log_count = 0
v2_console_warn_count = 0
v2_console_error_count = 0
for snapshot in v2_snapshots:
data = snapshot["data"]
if is_click(data):
v2_click_count += 1
if is_mouse_activity(data):
v2_mouse_activity_count += 1
if is_keypress(data):
v2_keypress_count += 1
if is_console_log(data):
level = get_console_level(data)
if level in [
"log",
"info",
"count",
"timeEnd",
"trace",
"dir",
"dirxml",
"group",
"groupCollapsed",
"debug",
"timeLog",
]:
v2_console_log_count += 1
elif level in ["warn", "countReset"]:
v2_console_warn_count += 1
elif level in ["error", "assert"]:
v2_console_error_count += 1
else: # default to log level for unknown levels
v2_console_log_count += 1
# Extract URL
url = get_url_from_event(data)
if url:
if v2_first_url is None:
v2_first_url = url[:4096] if len(url) > 4096 else url
add_url(v2_urls, url)
# Compare URLs
await logger.ainfo(
"URL comparison",
session_id=session_id,
team_id=team_id,
v1_first_url=v1_first_url,
v2_first_url=v2_first_url,
first_url_matches=v1_first_url == v2_first_url,
v1_url_count=len(v1_urls),
v2_url_count=len(v2_urls),
urls_in_both=len(v1_urls & v2_urls),
only_in_v1=sorted(v1_urls - v2_urls)[:5], # Show up to 5 examples
only_in_v2=sorted(v2_urls - v1_urls)[:5], # Show up to 5 examples
metadata_comparison={
"v1": {
"first_url": v1_metadata["first_url"],
"all_urls": v1_metadata["all_urls"],
"first_url_matches_snapshot": v1_metadata["first_url"] == v1_first_url,
"all_urls_match_snapshot": set(v1_metadata["all_urls"]) == v1_urls,
},
"v2": {
"first_url": v2_metadata["first_url"],
"all_urls": v2_metadata["all_urls"],
"first_url_matches_snapshot": v2_metadata["first_url"] == v2_first_url,
"all_urls_match_snapshot": set(v2_metadata["all_urls"]) == v2_urls,
},
},
)
# Log event counts and differences
await logger.ainfo(
"Total event count comparison",
session_id=session_id,
team_id=team_id,
v1_snapshot_count=len(v1_snapshots),
v2_snapshot_count=len(v2_snapshots),
v1_metadata_count=v1_metadata["event_count"],
v2_metadata_count=v2_metadata["event_count"],
snapshot_difference=len(v2_snapshots) - len(v1_snapshots),
metadata_difference=v2_metadata["event_count"] - v1_metadata["event_count"],
snapshot_vs_metadata_v1_difference=len(v1_snapshots) - v1_metadata["event_count"],
snapshot_vs_metadata_v2_difference=len(v2_snapshots) - v2_metadata["event_count"],
)
await logger.ainfo(
"Click count comparison",
session_id=session_id,
team_id=team_id,
v1_snapshot_count=v1_click_count,
v2_snapshot_count=v2_click_count,
v1_metadata_count=v1_metadata["click_count"],
v2_metadata_count=v2_metadata["click_count"],
snapshot_difference=v2_click_count - v1_click_count,
metadata_difference=v2_metadata["click_count"] - v1_metadata["click_count"],
snapshot_vs_metadata_v1_difference=v1_click_count - v1_metadata["click_count"],
snapshot_vs_metadata_v2_difference=v2_click_count - v2_metadata["click_count"],
)
await logger.ainfo(
"Mouse activity count comparison",
session_id=session_id,
team_id=team_id,
v1_snapshot_count=v1_mouse_activity_count,
v2_snapshot_count=v2_mouse_activity_count,
v1_metadata_count=v1_metadata["mouse_activity_count"],
v2_metadata_count=v2_metadata["mouse_activity_count"],
snapshot_difference=v2_mouse_activity_count - v1_mouse_activity_count,
metadata_difference=v2_metadata["mouse_activity_count"] - v1_metadata["mouse_activity_count"],
snapshot_vs_metadata_v1_difference=v1_mouse_activity_count - v1_metadata["mouse_activity_count"],
snapshot_vs_metadata_v2_difference=v2_mouse_activity_count - v2_metadata["mouse_activity_count"],
)
await logger.ainfo(
"Keypress count comparison",
session_id=session_id,
team_id=team_id,
v1_snapshot_count=v1_keypress_count,
v2_snapshot_count=v2_keypress_count,
v1_metadata_count=v1_metadata["keypress_count"],
v2_metadata_count=v2_metadata["keypress_count"],
snapshot_difference=v2_keypress_count - v1_keypress_count,
metadata_difference=v2_metadata["keypress_count"] - v1_metadata["keypress_count"],
snapshot_vs_metadata_v1_difference=v1_keypress_count - v1_metadata["keypress_count"],
snapshot_vs_metadata_v2_difference=v2_keypress_count - v2_metadata["keypress_count"],
)
await logger.ainfo(
"Console log count comparison",
session_id=session_id,
team_id=team_id,
v1_snapshot_count=v1_console_log_count,
v2_snapshot_count=v2_console_log_count,
v1_metadata_count=v1_metadata["console_log_count"],
v2_metadata_count=v2_metadata["console_log_count"],
snapshot_difference=v2_console_log_count - v1_console_log_count,
metadata_difference=v2_metadata["console_log_count"] - v1_metadata["console_log_count"],
snapshot_vs_metadata_v1_difference=v1_console_log_count - v1_metadata["console_log_count"],
snapshot_vs_metadata_v2_difference=v2_console_log_count - v2_metadata["console_log_count"],
)
await logger.ainfo(
"Console warn count comparison",
session_id=session_id,
team_id=team_id,
v1_snapshot_count=v1_console_warn_count,
v2_snapshot_count=v2_console_warn_count,
v1_metadata_count=v1_metadata["console_warn_count"],
v2_metadata_count=v2_metadata["console_warn_count"],
snapshot_difference=v2_console_warn_count - v1_console_warn_count,
metadata_difference=v2_metadata["console_warn_count"] - v1_metadata["console_warn_count"],
snapshot_vs_metadata_v1_difference=v1_console_warn_count - v1_metadata["console_warn_count"],
snapshot_vs_metadata_v2_difference=v2_console_warn_count - v2_metadata["console_warn_count"],
)
await logger.ainfo(
"Console error count comparison",
session_id=session_id,
team_id=team_id,
v1_snapshot_count=v1_console_error_count,
v2_snapshot_count=v2_console_error_count,
v1_metadata_count=v1_metadata["console_error_count"],
v2_metadata_count=v2_metadata["console_error_count"],
snapshot_difference=v2_console_error_count - v1_console_error_count,
metadata_difference=v2_metadata["console_error_count"] - v1_metadata["console_error_count"],
snapshot_vs_metadata_v1_difference=v1_console_error_count - v1_metadata["console_error_count"],
snapshot_vs_metadata_v2_difference=v2_console_error_count - v2_metadata["console_error_count"],
)
# Log event type comparison
await logger.ainfo(
"Event type comparison",
session_id=session_id,
team_id=team_id,
common_events_count=sum(min(v1, v2) for v1, v2 in common_events.values()),
common_events_by_type=group_events_by_type({k: min(v1, v2) for k, (v1, v2) in common_events.items()}),
only_in_v1_count=sum(only_in_v1.values()),
only_in_v1_by_type=group_events_by_type(only_in_v1),
only_in_v2_count=sum(only_in_v2.values()),
only_in_v2_by_type=group_events_by_type(only_in_v2),
duplicate_stats={
"v1_total_duplicates": sum(count - 1 for count in v1_events.values() if count > 1),
"v2_total_duplicates": sum(count - 1 for count in v2_events.values() if count > 1),
"events_with_different_counts": {k: (v1, v2) for k, (v1, v2) in common_events.items() if v1 != v2},
},
)
# Compare snapshot metadata
await logger.ainfo(
"Snapshot metadata comparison",
session_id=session_id,
team_id=team_id,
v1_snapshot_source=v1_metadata["snapshot_source"],
v2_snapshot_source=v2_metadata["snapshot_source"],
v1_snapshot_library=v1_metadata["snapshot_library"],
v2_snapshot_library=v2_metadata["snapshot_library"],
snapshot_source_matches=v1_metadata["snapshot_source"] == v2_metadata["snapshot_source"],
snapshot_library_matches=v1_metadata["snapshot_library"] == v2_metadata["snapshot_library"],
)
# Compare active milliseconds
v1_computed_active_ms, v2_computed_active_ms = compute_active_milliseconds(v1_snapshots)
v1_computed_active_ms_v2, v2_computed_active_ms_v2 = compute_active_milliseconds(v2_snapshots)
# Calculate percentage differences
def safe_percentage_diff(a: int, b: int) -> float:
if a == 0 and b == 0:
return 0.0
if a == 0:
return 100.0
return ((b - a) / a) * 100
await logger.ainfo(
"Active milliseconds comparison",
session_id=session_id,
team_id=team_id,
v1_snapshot_computed_v1_alg=v1_computed_active_ms,
v2_snapshot_computed_v1_alg=v2_computed_active_ms,
v1_snapshot_computed_v2_alg=v1_computed_active_ms_v2,
v2_snapshot_computed_v2_alg=v2_computed_active_ms_v2,
# Compare v1 vs v2 algorithms on v1 snapshots
v1_snapshots_alg_difference=v1_computed_active_ms_v2 - v1_computed_active_ms,
v1_snapshots_alg_difference_percentage=safe_percentage_diff(
v1_computed_active_ms, v1_computed_active_ms_v2
),
# Compare v1 vs v2 algorithms on v2 snapshots
v2_snapshots_alg_difference=v2_computed_active_ms_v2 - v2_computed_active_ms,
v2_snapshots_alg_difference_percentage=safe_percentage_diff(
v2_computed_active_ms, v2_computed_active_ms_v2
),
v1_metadata_value=v1_metadata["active_milliseconds"],
v2_metadata_value=v2_metadata["active_milliseconds"],
snapshot_difference_v1_alg=v2_computed_active_ms - v1_computed_active_ms,
snapshot_difference_percentage_v1_alg=safe_percentage_diff(
v1_computed_active_ms, v2_computed_active_ms
),
snapshot_difference_v2_alg=v2_computed_active_ms_v2 - v1_computed_active_ms_v2,
snapshot_difference_percentage_v2_alg=safe_percentage_diff(
v1_computed_active_ms_v2, v2_computed_active_ms_v2
),
metadata_difference=v2_metadata["active_milliseconds"] - v1_metadata["active_milliseconds"],
metadata_difference_percentage=safe_percentage_diff(
v1_metadata["active_milliseconds"], v2_metadata["active_milliseconds"]
),
v1_computed_vs_metadata_difference_v1_alg=v1_computed_active_ms - v1_metadata["active_milliseconds"],
v1_computed_vs_metadata_percentage_v1_alg=safe_percentage_diff(
v1_metadata["active_milliseconds"], v1_computed_active_ms
),
v2_computed_vs_metadata_difference_v1_alg=v2_computed_active_ms - v2_metadata["active_milliseconds"],
v2_computed_vs_metadata_percentage_v1_alg=safe_percentage_diff(
v2_metadata["active_milliseconds"], v2_computed_active_ms
),
v1_computed_vs_metadata_difference_v2_alg=v1_computed_active_ms_v2 - v1_metadata["active_milliseconds"],
v1_computed_vs_metadata_percentage_v2_alg=safe_percentage_diff(
v1_metadata["active_milliseconds"], v1_computed_active_ms_v2
),
v2_computed_vs_metadata_difference_v2_alg=v2_computed_active_ms_v2 - v2_metadata["active_milliseconds"],
v2_computed_vs_metadata_percentage_v2_alg=safe_percentage_diff(
v2_metadata["active_milliseconds"], v2_computed_active_ms_v2
),
)
# Analyze events per window
v1_window_counts = count_events_per_window(v1_events)
v2_window_counts = count_events_per_window(v2_events)
# Find all window IDs
all_window_ids = set(v1_window_counts.keys()) | set(v2_window_counts.keys())
window_comparison = []
# Handle None first, then sort the rest
sorted_window_ids = ([None] if None in all_window_ids else []) + sorted(
id for id in all_window_ids if id is not None
)
for window_id in sorted_window_ids:
window_comparison.append(
{
"window_id": window_id,
"v1_events": v1_window_counts.get(window_id, 0),
"v2_events": v2_window_counts.get(window_id, 0),
}
)
await logger.ainfo(
"Events per window comparison",
session_id=session_id,
team_id=team_id,
window_counts=window_comparison,
total_windows=len(all_window_ids),
windows_in_v1=len(v1_window_counts),
windows_in_v2=len(v2_window_counts),
windows_in_both=len(set(v1_window_counts.keys()) & set(v2_window_counts.keys())),
)
# Check for differences in metadata vs snapshots
metadata_differences = any(
[
v1_metadata["click_count"] != v1_click_count,
v1_metadata["mouse_activity_count"] != v1_mouse_activity_count,
v1_metadata["keypress_count"] != v1_keypress_count,
v1_metadata["console_log_count"] != v1_console_log_count,
v1_metadata["console_warn_count"] != v1_console_warn_count,
v1_metadata["console_error_count"] != v1_console_error_count,
v2_metadata["click_count"] != v2_click_count,
v2_metadata["mouse_activity_count"] != v2_mouse_activity_count,
v2_metadata["keypress_count"] != v2_keypress_count,
v2_metadata["console_log_count"] != v2_console_log_count,
v2_metadata["console_warn_count"] != v2_console_warn_count,
v2_metadata["console_error_count"] != v2_console_error_count,
]
)
# Check if sessions differ in any way
sessions_differ = any(
[
len(v1_snapshots) != len(v2_snapshots),
v1_click_count != v2_click_count,
v1_mouse_activity_count != v2_mouse_activity_count,
v1_keypress_count != v2_keypress_count,
v1_console_log_count != v2_console_log_count,
v1_console_warn_count != v2_console_warn_count,
v1_console_error_count != v2_console_error_count,
v1_urls != v2_urls,
v1_first_url != v2_first_url,
bool(only_in_v1),
bool(only_in_v2),
]
)
# Log session summary
await logger.ainfo(
"Session comparison summary",
session_id=session_id,
team_id=team_id,
sessions_differ=sessions_differ,
metadata_snapshot_differences=metadata_differences,
v1_snapshot_count=len(v1_snapshots),
v2_snapshot_count=len(v2_snapshots),
)
end_time = dt.datetime.now()
duration = (end_time - start_time).total_seconds()
# Log activity summary
await logger.ainfo(
"Completed sampled events comparison activity",
duration_seconds=duration,
sessions_processed=len(session_ids),
)
@dataclasses.dataclass(frozen=True)
class CompareSampledRecordingEventsWorkflowInputs:
"""Inputs for the recording events comparison workflow."""
started_after: str = dataclasses.field()
started_before: str = dataclasses.field()
window_seconds: int = dataclasses.field(default=300) # 5 minutes default
sample_size: int = dataclasses.field(default=100)
@property
def properties_to_log(self) -> dict[str, typing.Any]:
return {
"started_after": self.started_after,
"started_before": self.started_before,
"window_seconds": self.window_seconds,
"sample_size": self.sample_size,
}
@temporalio.workflow.defn(name="compare-sampled-recording-events")
class CompareSampledRecordingEventsWorkflow(PostHogWorkflow):
"""Workflow to compare recording events between v1 and v2 for sampled sessions."""
def __init__(self) -> None:
self.lock = asyncio.Lock()
self.paused = False
@staticmethod
def parse_inputs(inputs: list[str]) -> CompareSampledRecordingEventsWorkflowInputs:
"""Parse inputs from the management command CLI."""
loaded = json.loads(inputs[0])
for field in ["started_after", "started_before"]:
if field not in loaded:
raise ValueError(f"Required field {field} not provided")
loaded[field] = dt.datetime.fromisoformat(loaded[field])
window_seconds = loaded.get("window_seconds", 300)
if not isinstance(window_seconds, int) or window_seconds <= 0:
raise ValueError("window_seconds must be a positive integer")
sample_size = loaded.get("sample_size", 100)
if not isinstance(sample_size, int) or sample_size <= 0:
raise ValueError("sample_size must be a positive integer")
return CompareSampledRecordingEventsWorkflowInputs(
started_after=loaded["started_after"],
started_before=loaded["started_before"],
window_seconds=window_seconds,
sample_size=sample_size,
)
@staticmethod
def generate_time_windows(
start_time: dt.datetime, end_time: dt.datetime, window_seconds: int
) -> list[tuple[dt.datetime, dt.datetime]]:
"""Generate time windows between start and end time."""
windows = []
current = start_time
while current < end_time:
window_end = min(current + dt.timedelta(seconds=window_seconds), end_time)
windows.append((current, window_end))
current = window_end
return windows
@temporalio.workflow.run
async def run(self, inputs: CompareSampledRecordingEventsWorkflowInputs):
"""Run the comparison of recording events."""
await temporalio.workflow.wait_condition(lambda: not self.paused)
started_after = dt.datetime.fromisoformat(inputs.started_after)
started_before = dt.datetime.fromisoformat(inputs.started_before)
logger = LOGGER.bind()
workflow_start = dt.datetime.now()
logger.info(
"Starting sampled events comparison workflow",
started_after=started_after,
started_before=started_before,
window_seconds=inputs.window_seconds,
sample_size=inputs.sample_size,
)
# Generate time windows
windows = self.generate_time_windows(started_after, started_before, inputs.window_seconds)
logger.info("Generated %d time windows to process", len(windows))
# Process each window
for i, (window_start, window_end) in enumerate(windows, 1):
logger.info(
"Processing window %d/%d: %s to %s",
i,
len(windows),
window_start,
window_end,
)
activity_inputs = CompareSampledRecordingEventsActivityInputs(
started_after=window_start.isoformat(),
started_before=window_end.isoformat(),
sample_size=inputs.sample_size,
)
await temporalio.workflow.execute_activity(
compare_sampled_recording_events_activity,
activity_inputs,
start_to_close_timeout=dt.timedelta(minutes=5),
retry_policy=temporalio.common.RetryPolicy(
initial_interval=dt.timedelta(seconds=10),
maximum_interval=dt.timedelta(seconds=60),
maximum_attempts=1,
non_retryable_error_types=[],
),
)
workflow_end = dt.datetime.now()
duration = (workflow_end - workflow_start).total_seconds()
logger.info(
"Completed sampled events comparison workflow",
duration_seconds=duration,
windows_processed=len(windows),
)
@temporalio.workflow.update
async def pause(self) -> None:
"""Signal handler for workflow to pause or unpause."""
async with self.lock:
if self.paused is True:
self.paused = False
else:
self.paused = True

View File

@@ -1,97 +0,0 @@
import datetime as dt
from typing import Any
from posthog.clickhouse.client import sync_execute
def get_sampled_session_ids(
started_after: dt.datetime,
started_before: dt.datetime,
sample_size: int,
) -> list[tuple[str, int]]: # [(session_id, team_id), ...]
"""Get a random sample of session IDs from the specified time range."""
query = """
SELECT DISTINCT session_id, team_id
FROM session_replay_events
WHERE min_first_timestamp >= %(started_after)s
AND max_last_timestamp <= %(started_before)s
ORDER BY rand() -- Random sampling
LIMIT %(sample_size)s
"""
results = sync_execute(
query,
{
"started_after": started_after.strftime("%Y-%m-%d %H:%M:%S"),
"started_before": started_before.strftime("%Y-%m-%d %H:%M:%S"),
"sample_size": sample_size,
},
)
return [(str(row[0]), int(row[1])) for row in results]
def get_session_metadata(team_id: int, session_id: str, table_name: str) -> dict[str, Any]:
"""Get metadata counts for a specific session from the specified table."""
query = """
SELECT
session_id,
team_id,
any(distinct_id) as distinct_id,
min(min_first_timestamp) as min_first_timestamp_agg,
max(max_last_timestamp) as max_last_timestamp_agg,
argMinMerge(first_url) as first_url,
groupUniqArrayArray(all_urls) as all_urls,
sum(click_count) as click_count,
sum(keypress_count) as keypress_count,
sum(mouse_activity_count) as mouse_activity_count,
sum(active_milliseconds) as active_milliseconds,
sum(console_log_count) as console_log_count,
sum(console_warn_count) as console_warn_count,
sum(console_error_count) as console_error_count,
sum(event_count) as event_count,
argMinMerge(snapshot_source) as snapshot_source,
argMinMerge(snapshot_library) as snapshot_library
FROM {table}
WHERE team_id = %(team_id)s
AND session_id = %(session_id)s
GROUP BY session_id, team_id
LIMIT 1
"""
result = sync_execute(
query.format(table=table_name),
{
"team_id": team_id,
"session_id": session_id,
},
)
if not result:
return {
"click_count": 0,
"mouse_activity_count": 0,
"keypress_count": 0,
"event_count": 0,
"console_log_count": 0,
"console_warn_count": 0,
"console_error_count": 0,
"first_url": None,
"all_urls": [],
"snapshot_source": None,
"snapshot_library": None,
"active_milliseconds": 0,
}
row = result[0]
return {
"click_count": row[7], # click_count index
"keypress_count": row[8], # keypress_count index
"mouse_activity_count": row[9], # mouse_activity_count index
"console_log_count": row[11], # console_log_count index
"console_warn_count": row[12], # console_warn_count index
"console_error_count": row[13], # console_error_count index
"event_count": row[14], # event_count index
"first_url": row[5], # first_url index
"all_urls": row[6], # all_urls index
"snapshot_source": row[15], # snapshot_source index
"snapshot_library": row[16], # snapshot_library index
"active_milliseconds": row[10], # active_milliseconds index
}
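A usage sketch of the two helpers above; it assumes a configured ClickHouse connection and the listed tables, and the sample values are arbitrary:

for session_id, team_id in get_sampled_session_ids(
    dt.datetime(2025, 1, 1), dt.datetime(2025, 1, 2), sample_size=10
):
    # Fetch aggregated counts for each sampled session from the v1 table
    v1 = get_session_metadata(team_id, session_id, "session_replay_events")
    print(session_id, v1["event_count"], v1["active_milliseconds"])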

View File

@@ -1,235 +0,0 @@
from dataclasses import dataclass
from enum import IntEnum
from typing import Any, Optional
class RRWebEventType(IntEnum):
"""RRWeb event types ported from rrweb-types.ts"""
DomContentLoaded = 0
Load = 1
FullSnapshot = 2
IncrementalSnapshot = 3
Meta = 4
Custom = 5
Plugin = 6
class RRWebEventSource(IntEnum):
"""RRWeb event sources ported from rrweb-types.ts"""
Mutation = 0
MouseMove = 1
MouseInteraction = 2
Scroll = 3
ViewportResize = 4
Input = 5
TouchMove = 6
MediaInteraction = 7
StyleSheetRule = 8
CanvasMutation = 9
Font = 10
Log = 11
Drag = 12
StyleDeclaration = 13
# V1 implementation uses raw numbers
V1_ACTIVE_SOURCES = [1, 2, 3, 4, 5, 6, 7, 12]
# V2 implementation uses enum
V2_ACTIVE_SOURCES = [
RRWebEventSource.MouseMove,
RRWebEventSource.MouseInteraction,
RRWebEventSource.Scroll,
RRWebEventSource.ViewportResize,
RRWebEventSource.Input,
RRWebEventSource.TouchMove,
RRWebEventSource.MediaInteraction,
RRWebEventSource.Drag,
]
ACTIVITY_THRESHOLD_MS = 5000
@dataclass
class V2SegmentationEvent:
"""Simplified event with just the essential information for activity tracking"""
timestamp: int
is_active: bool
@dataclass
class V2RecordingSegment:
"""Represents a segment of recording with activity information"""
kind: str # 'window' | 'buffer' | 'gap'
start_timestamp: int # Epoch time that the segment starts
end_timestamp: int # Epoch time that the segment ends
duration_ms: int
is_active: bool
@dataclass
class V1RecordingSegment:
"""V1 implementation of recording segment"""
kind: str # 'window' | 'buffer' | 'gap'
start_timestamp: int
end_timestamp: int
duration_ms: int
is_active: bool
def is_v1_active_event(event: dict[str, Any]) -> bool:
"""V1 implementation of active event check"""
return event.get("type") == 3 and (event.get("data", {}).get("source", -1) in V1_ACTIVE_SOURCES)
def is_v2_active_event(event: dict[str, Any]) -> bool:
"""V2 implementation of active event check"""
event_type = event.get("type")
data = event.get("data")
source = data.get("source") if isinstance(data, dict) else None
return event_type == RRWebEventType.IncrementalSnapshot and source in V2_ACTIVE_SOURCES
def to_v2_segmentation_event(event: dict[str, Any]) -> V2SegmentationEvent:
"""Converts an RRWeb event to a simplified V2SegmentationEvent"""
data = event.get("data", {})
if not isinstance(data, dict) or "timestamp" not in data:
raise ValueError("Invalid event data - missing timestamp")
return V2SegmentationEvent(timestamp=data["timestamp"], is_active=is_v2_active_event(event["data"]))
def create_v1_segments(snapshots: list[dict[str, Any]]) -> list[V1RecordingSegment]:
"""V1 implementation of segment creation"""
segments: list[V1RecordingSegment] = []
active_segment: Optional[V1RecordingSegment] = None
last_active_event_timestamp = 0
for snapshot in snapshots:
        # Guard before inspecting the event so malformed snapshots are skipped safely
        if not isinstance(snapshot.get("data"), dict) or "timestamp" not in snapshot["data"]:
            continue
        event_is_active = is_v1_active_event(snapshot["data"])
        timestamp = snapshot["data"]["timestamp"]
# When do we create a new segment?
# 1. If we don't have one yet
is_new_segment = active_segment is None
# 2. If it is currently inactive but a new "active" event comes in
if event_is_active and not (active_segment and active_segment.is_active):
is_new_segment = True
# 3. If it is currently active but no new active event has been seen for the activity threshold
if (
active_segment
and active_segment.is_active
and last_active_event_timestamp + ACTIVITY_THRESHOLD_MS < timestamp
):
is_new_segment = True
# NOTE: We have to make sure that we set this _after_ we use it
last_active_event_timestamp = timestamp if event_is_active else last_active_event_timestamp
if is_new_segment:
if active_segment:
segments.append(active_segment)
active_segment = V1RecordingSegment(
kind="window",
start_timestamp=timestamp,
end_timestamp=timestamp,
duration_ms=0,
is_active=event_is_active,
)
elif active_segment:
active_segment.end_timestamp = timestamp
active_segment.duration_ms = active_segment.end_timestamp - active_segment.start_timestamp
if active_segment:
segments.append(active_segment)
return segments
def create_v2_segments_from_events(segmentation_events: list[V2SegmentationEvent]) -> list[V2RecordingSegment]:
"""V2 implementation of segment creation"""
sorted_events = sorted(segmentation_events, key=lambda x: x.timestamp)
segments: list[V2RecordingSegment] = []
active_segment: Optional[V2RecordingSegment] = None
last_active_event_timestamp = 0
for event in sorted_events:
# When do we create a new segment?
# 1. If we don't have one yet
is_new_segment = active_segment is None
# 2. If it is currently inactive but a new "active" event comes in
if event.is_active and not (active_segment and active_segment.is_active):
is_new_segment = True
# 3. If it is currently active but no new active event has been seen for the activity threshold
if (
active_segment
and active_segment.is_active
and last_active_event_timestamp + ACTIVITY_THRESHOLD_MS < event.timestamp
):
is_new_segment = True
# NOTE: We have to make sure that we set this _after_ we use it
if event.is_active:
last_active_event_timestamp = event.timestamp
if is_new_segment:
if active_segment:
segments.append(active_segment)
active_segment = V2RecordingSegment(
kind="window",
start_timestamp=event.timestamp,
end_timestamp=event.timestamp,
duration_ms=0,
is_active=event.is_active,
)
elif active_segment:
active_segment.end_timestamp = event.timestamp
active_segment.duration_ms = active_segment.end_timestamp - active_segment.start_timestamp
if active_segment:
segments.append(active_segment)
return segments
def v2_active_milliseconds_from_events(segmentation_events: list[V2SegmentationEvent]) -> int:
"""V2 implementation: Calculates the total active time in milliseconds from a list of segmentation events"""
segments = create_v2_segments_from_events(segmentation_events)
return v2_active_milliseconds_from_segments(segments)
def v2_active_milliseconds_from_segments(segments: list[V2RecordingSegment]) -> int:
"""V2 implementation: Calculates total active milliseconds from segments"""
return sum(max(1, segment.duration_ms) if segment.is_active else 0 for segment in segments)
def v1_active_milliseconds(snapshots: list[dict[str, Any]]) -> int:
"""V1 implementation: Compute active milliseconds from a list of snapshots"""
segments = create_v1_segments(snapshots)
return sum(max(1, segment.duration_ms) if segment.is_active else 0 for segment in segments)
def v2_active_milliseconds(snapshots: list[dict[str, Any]]) -> int:
"""V2 implementation: Compute active milliseconds from a list of snapshots"""
segmentation_events = [to_v2_segmentation_event(event) for event in snapshots]
return v2_active_milliseconds_from_events(segmentation_events)
def compute_active_milliseconds(snapshots: list[dict[str, Any]]) -> tuple[int, int]:
"""Compute active milliseconds using both v1 and v2 implementations"""
v1_ms = v1_active_milliseconds(snapshots)
v2_ms = v2_active_milliseconds(snapshots)
return v1_ms, v2_ms
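For illustration only (synthetic snapshots, not real recording data): both implementations above should agree on a simple trace where a burst of clicks is followed by a long gap and an inactive mutation.

snapshots = [
    # click (IncrementalSnapshot / MouseInteraction) -> starts an active segment
    {"window_id": "w1", "data": {"type": 3, "data": {"source": 2}, "timestamp": 1_000}},
    {"window_id": "w1", "data": {"type": 3, "data": {"source": 2}, "timestamp": 3_000}},
    # mutation 17s later -> inactive, and past the 5s ACTIVITY_THRESHOLD_MS
    {"window_id": "w1", "data": {"type": 3, "data": {"source": 0}, "timestamp": 20_000}},
]
v1_ms, v2_ms = compute_active_milliseconds(snapshots)
assert v1_ms == v2_ms == 2_000  # one active segment from 1s to 3s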

View File

@@ -1,134 +0,0 @@
import json
from typing import Any
def is_click(event: dict) -> bool:
"""Check if event is a click event."""
CLICK_TYPES = [2, 4, 9, 3] # Click, DblClick, TouchEnd, ContextMenu
return (
event.get("type") == 3 # RRWebEventType.IncrementalSnapshot
and event.get("data", {}).get("source") == 2 # RRWebEventSource.MouseInteraction
and event.get("data", {}).get("type") in CLICK_TYPES
)
def is_mouse_activity(event: dict) -> bool:
"""Check if event is a mouse activity event."""
MOUSE_ACTIVITY_SOURCES = [2, 1, 6] # MouseInteraction, MouseMove, TouchMove
return (
event.get("type") == 3 # RRWebEventType.IncrementalSnapshot
and event.get("data", {}).get("source") in MOUSE_ACTIVITY_SOURCES
)
def is_keypress(event: dict) -> bool:
"""Check if event is a keypress event."""
return (
event.get("type") == 3 # RRWebEventType.IncrementalSnapshot
and event.get("data", {}).get("source") == 5 # RRWebEventSource.Input
)
def is_console_log(event: dict) -> bool:
"""Check if event is a console log event."""
return (
event.get("type") == 6 # RRWebEventType.Plugin
and event.get("data", {}).get("plugin") == "rrweb/console@1"
)
def get_console_level(event: dict) -> str | None:
"""Get console log level from event."""
if not is_console_log(event):
return None
return event.get("data", {}).get("payload", {}).get("level")
def get_url_from_event(event: dict) -> str | None:
"""Extract URL from event using same logic as hrefFrom in rrweb-types.ts."""
data = event.get("data", {})
if not isinstance(data, dict):
return None
meta_href = data.get("href", "")
meta_href = meta_href.strip() if isinstance(meta_href, str) else ""
payload = data.get("payload", {})
payload_href = payload.get("href", "") if isinstance(payload, dict) else ""
payload_href = payload_href.strip() if isinstance(payload_href, str) else ""
return meta_href or payload_href or None
def add_url(url_set: set[str], url: str, max_url_length: int = 4 * 1024, max_urls: int = 25) -> None:
"""Add URL to set with same constraints as snappy-session-recorder.ts."""
if not url:
return
if len(url) > max_url_length:
url = url[:max_url_length]
if len(url_set) < max_urls:
url_set.add(url)
def count_events_per_window(events: dict[str, int]) -> dict[str | None, int]:
"""Count events per window ID."""
window_counts: dict[str | None, int] = {}
for event_json, count in events.items():
window_id, _ = json.loads(event_json)
# Convert "null" string to None if needed (in case of JSON serialization)
if isinstance(window_id, str) and window_id.lower() == "null":
window_id = None
window_counts[window_id] = window_counts.get(window_id, 0) + count
return window_counts
def group_events_by_type(events: dict[str, int]) -> dict[str, int]:
"""Group events by their type."""
type_counts: dict[str, int] = {}
for event, count in events.items():
_, data = json.loads(event)
event_type = str(data.get("type", "unknown"))
type_counts[event_type] = type_counts.get(event_type, 0) + count
return type_counts
def get_structure(obj: Any, max_depth: int = 10) -> Any:
"""Get the structure of an object without its values."""
if max_depth <= 0:
return "..."
if isinstance(obj, dict):
return {k: get_structure(v, max_depth - 1) for k, v in obj.items()}
elif isinstance(obj, list):
if not obj:
return []
# Just show structure of first item for arrays
return [get_structure(obj[0], max_depth - 1)]
elif isinstance(obj, str | int | float | bool):
return type(obj).__name__
elif obj is None:
return None
return type(obj).__name__
def transform_v2_snapshot(raw_snapshot: list) -> dict:
"""Transform v2 snapshot format [windowId, serializedEvent] into {window_id, data} format."""
if not isinstance(raw_snapshot, list) or len(raw_snapshot) != 2:
raise ValueError("Invalid v2 snapshot format")
window_id, event = raw_snapshot
return {"window_id": window_id, "data": event}
def transform_v1_snapshots(snapshots: list[dict]) -> list[dict]:
"""Transform v1 snapshots from [{windowId, data: [event]}] to [{windowId, data: event}]."""
flattened = []
for snapshot in snapshots:
window_id = snapshot.get("window_id")
data_array = snapshot.get("data", [])
if not isinstance(data_array, list):
continue
for event in data_array:
flattened.append({"window_id": window_id, "data": event})
return flattened
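For illustration, the two raw shapes handled above converge on the same normalised form (the URL is a placeholder):

v1_raw = [{"window_id": "w1", "data": [{"type": 4, "data": {"href": "https://example.com"}}]}]
v2_raw = ["w1", {"type": 4, "data": {"href": "https://example.com"}}]
assert transform_v1_snapshots(v1_raw)[0] == transform_v2_snapshot(v2_raw)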

View File

@@ -1,124 +0,0 @@
import gzip
import json
from typing import Any, cast
from posthog.session_recordings.models.session_recording import SessionRecording
from posthog.session_recordings.session_recording_v2_service import list_blocks
from posthog.storage import object_storage
from posthog.storage.session_recording_v2_object_storage import client as v2_client
from posthog.temporal.session_recordings.session_comparer import transform_v1_snapshots, transform_v2_snapshot
def decompress_and_parse_gzipped_json(data: bytes) -> list[Any]:
try:
decompressed = gzip.decompress(data).decode("utf-8")
return parse_jsonl_with_broken_newlines(decompressed)
except Exception as e:
# If decompression fails, try parsing as plain JSON
# as some older recordings might not be compressed
try:
text = data.decode("utf-8")
return parse_jsonl_with_broken_newlines(text)
except Exception:
raise e
def find_line_break(text: str, pos: int) -> str:
"""Find the line break sequence at the given position."""
if text[pos : pos + 2] == "\r\n":
return "\r\n"
return "\n"
def parse_jsonl_with_broken_newlines(text: str) -> list[Any]:
"""Parse JSONL that might have broken newlines within JSON objects."""
results = []
buffer = ""
pos = 0
while pos < len(text):
# Find next line break
next_pos = text.find("\n", pos)
if next_pos == -1:
# No more line breaks, process remaining text
line = text[pos:]
if line.strip():
buffer = f"{buffer}{line}" if buffer else line
break
        # Get the line break sequence for this line
        line_break = find_line_break(text, next_pos - 1)
        # `next_pos` is the index of the "\n"; strip a preceding "\r" from the line
        # and always advance one past the "\n" so the first character of the next line is kept
        line = text[pos : next_pos - (1 if line_break == "\r\n" else 0)]
        if not line.strip():
            pos = next_pos + 1
            continue
        buffer = f"{buffer}{line_break}{line}" if buffer else line
        try:
            parsed = json.loads(buffer)
            results.append(parsed)
            buffer = ""  # Reset buffer after successful parse
        except json.JSONDecodeError:
            # If we can't parse, keep accumulating in buffer
            pass
        pos = next_pos + 1
# Try to parse any remaining buffer
if buffer:
try:
results.append(json.loads(buffer))
except json.JSONDecodeError:
pass # Discard unparseable final buffer
return results
def fetch_v1_snapshots(recording: SessionRecording) -> list[dict[str, Any]]:
"""Fetch and transform v1 snapshots for a recording."""
v1_snapshots = []
if recording.object_storage_path:
blob_prefix = recording.object_storage_path
blob_keys = object_storage.list_objects(cast(str, blob_prefix))
if blob_keys:
for full_key in blob_keys:
blob_key = full_key.replace(blob_prefix.rstrip("/") + "/", "")
file_key = f"{recording.object_storage_path}/{blob_key}"
snapshots_data = object_storage.read_bytes(file_key)
if snapshots_data:
raw_snapshots = decompress_and_parse_gzipped_json(snapshots_data)
v1_snapshots.extend(transform_v1_snapshots(raw_snapshots))
else:
# Try ingestion storage path
blob_prefix = recording.build_blob_ingestion_storage_path()
blob_keys = object_storage.list_objects(blob_prefix)
if blob_keys:
for full_key in blob_keys:
blob_key = full_key.replace(blob_prefix.rstrip("/") + "/", "")
file_key = f"{blob_prefix}/{blob_key}"
snapshots_data = object_storage.read_bytes(file_key)
if snapshots_data:
raw_snapshots = decompress_and_parse_gzipped_json(snapshots_data)
v1_snapshots.extend(transform_v1_snapshots(raw_snapshots))
return v1_snapshots
def fetch_v2_snapshots(recording: SessionRecording) -> list[dict[str, Any]]:
"""Fetch and transform v2 snapshots for a recording."""
v2_snapshots: list[dict[str, Any]] = []
blocks = list_blocks(recording)
if blocks:
for block in blocks:
try:
decompressed_block = v2_client().fetch_block(block.url)
if decompressed_block:
# Parse the block using the same line parsing logic as v1
raw_snapshots = parse_jsonl_with_broken_newlines(decompressed_block)
# Transform each snapshot to match v1 format
v2_snapshots.extend(transform_v2_snapshot(snapshot) for snapshot in raw_snapshots)
except Exception:
# Exception handling should be done by the caller
raise
return v2_snapshots
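A small illustration of the `parse_jsonl_with_broken_newlines` helper above, using a made-up payload whose second object is split across lines:

text = '{"a": 1}\n{"b":\n2}\n{"c": 3}'
assert parse_jsonl_with_broken_newlines(text) == [{"a": 1}, {"b": 2}, {"c": 3}]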

View File

@@ -147,11 +147,6 @@ from posthog.session_recordings.sql.session_replay_event_sql import (
DROP_SESSION_REPLAY_EVENTS_TABLE_SQL,
SESSION_REPLAY_EVENTS_TABLE_SQL,
)
from posthog.session_recordings.sql.session_replay_event_v2_test_sql import (
DROP_SESSION_REPLAY_EVENTS_V2_TEST_TABLE_SQL,
SESSION_REPLAY_EVENTS_V2_TEST_DATA_TABLE_SQL,
SESSION_REPLAY_EVENTS_V2_TEST_DISTRIBUTED_TABLE_SQL,
)
from posthog.test.assert_faster_than import assert_faster_than
# Make sure freezegun ignores our utils class that times functions
@@ -1174,7 +1169,6 @@ def reset_clickhouse_database() -> None:
DROP_RAW_SESSION_WRITABLE_TABLE_SQL(),
DROP_SESSION_RECORDING_EVENTS_TABLE_SQL(),
DROP_SESSION_REPLAY_EVENTS_TABLE_SQL(),
DROP_SESSION_REPLAY_EVENTS_V2_TEST_TABLE_SQL(),
DROP_SESSION_TABLE_SQL(),
DROP_WEB_STATS_SQL(),
DROP_WEB_BOUNCES_SQL(),
@@ -1207,7 +1201,6 @@ def reset_clickhouse_database() -> None:
SESSIONS_TABLE_SQL(),
SESSION_RECORDING_EVENTS_TABLE_SQL(),
SESSION_REPLAY_EVENTS_TABLE_SQL(),
SESSION_REPLAY_EVENTS_V2_TEST_DATA_TABLE_SQL(),
CREATE_CUSTOM_METRICS_COUNTER_EVENTS_TABLE,
WEB_BOUNCES_DAILY_SQL(),
WEB_BOUNCES_HOURLY_SQL(),
@@ -1230,7 +1223,6 @@ def reset_clickhouse_database() -> None:
DISTRIBUTED_SESSIONS_TABLE_SQL(),
DISTRIBUTED_SESSION_RECORDING_EVENTS_TABLE_SQL(),
DISTRIBUTED_SESSION_REPLAY_EVENTS_TABLE_SQL(),
SESSION_REPLAY_EVENTS_V2_TEST_DISTRIBUTED_TABLE_SQL(),
CREATE_CUSTOM_METRICS_COUNTERS_VIEW,
CUSTOM_METRICS_EVENTS_RECENT_LAG_VIEW(),
CUSTOM_METRICS_TEST_VIEW(),

View File

@@ -3,7 +3,7 @@ env =
DEBUG=1
TEST=1
DJANGO_SETTINGS_MODULE = posthog.settings
addopts = -p no:warnings --reuse-db --ignore=posthog/user_scripts --ignore=posthog/clickhouse/migrations/0097_session_replay_events_v2_test.py
addopts = -p no:warnings --reuse-db --ignore=posthog/user_scripts
markers =
ee