feat(cdp): adjust it to use cohorts for behavioral consumer (#40555)

Co-authored-by: Ben White <ben@posthog.com>
Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
Author: Meikel Ratz
Date: 2025-11-05 12:01:10 +01:00
Committed by: GitHub
Parent: 03c703d7d9
Commit: 6f34e5b92d
5 changed files with 1418 additions and 37 deletions
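
In short: the behavioural consumer stops matching events against actions and instead matches against realtime-supported cohort filters, identified by their conditionHash. The payload shape asserted throughout the tests changes accordingly; a minimal sketch of the fields (this standalone interface is illustrative only — the real code uses the consumer's ProducedEvent type):

// Sketch of the payload fields asserted in the tests below.
interface PreCalculatedEventPayloadSketch {
    uuid: string
    team_id: number
    evaluation_timestamp: string // ClickHouse-style 'YYYY-MM-DD HH:mm:ss.SSS'
    person_id: string
    distinct_id: string
    condition: string // before: String(actionId); after: the filter's conditionHash
    source: string // before: `action_${actionId}`; after: `cohort_filter_${conditionHash}`
}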

View File

@@ -2,7 +2,7 @@ import { mockProducerObserver } from '~/tests/helpers/mocks/producer.mock'
import { resetKafka } from '~/tests/helpers/kafka'
import { createAction, getFirstTeam, resetTestDatabase } from '../../../tests/helpers/sql'
import { buildInlineFiltersForCohorts, createCohort, getFirstTeam, resetTestDatabase } from '../../../tests/helpers/sql'
import { KAFKA_CDP_CLICKHOUSE_PREFILTERED_EVENTS } from '../../config/kafka-topics'
import { Hub, RawClickHouseEvent, Team } from '../../types'
import { closeHub, createHub } from '../../utils/db/hub'
@@ -85,6 +85,68 @@ const TEST_FILTERS = {
4,
2,
],
// Billing product activated filter
billingProductActivated: [
'_H',
1,
32,
'billing product activated',
32,
'event',
1,
1,
11,
32,
'platform_and_support',
32,
'product_key',
32,
'properties',
1,
2,
11,
32,
'teams-20240208',
32,
'plans__platform_and_support',
32,
'properties',
1,
2,
11,
3,
2,
3,
2,
],
// Product unsubscribed filter
productUnsubscribed: [
'_H',
1,
32,
'product unsubscribed',
32,
'event',
1,
1,
11,
32,
'platform_and_support',
32,
'product',
32,
'properties',
1,
2,
11,
3,
2,
],
// Person property is_organization_first_user filter
isOrgFirstUser: ['_H', 1, 29, 32, 'is_organization_first_user', 32, 'properties', 32, 'person', 1, 3, 11],
}
describe('CdpBehaviouralEventsConsumer', () => {
@@ -110,10 +172,38 @@ describe('CdpBehaviouralEventsConsumer', () => {
jest.restoreAllMocks()
})
describe('action matching and Kafka publishing', () => {
it('should publish pre-calculated events to Kafka when action matches', async () => {
// Create an action with Chrome + pageview filter
const actionId = await createAction(hub.postgres, team.id, 'Test action', TEST_FILTERS.chromePageview)
describe('cohort filter matching and Kafka publishing', () => {
it('should publish pre-calculated events to Kafka when cohort filter matches', async () => {
// Create a cohort with complex behavioral filter: pageview with browser event filter
const conditionHash = 'test_hash_001'
const filters = JSON.stringify({
properties: {
type: 'OR',
values: [
{
type: 'AND',
values: [
{
key: '$pageview',
type: 'behavioral',
value: 'performed_event_multiple',
bytecode: TEST_FILTERS.chromePageview,
negation: false,
operator: 'gte',
event_type: 'events',
conditionHash: conditionHash,
event_filters: [
{ key: '$browser', type: 'event', value: 'Chrome', operator: 'exact' },
],
operator_value: 5,
explicit_datetime: '-30d',
},
],
},
],
},
})
await createCohort(hub.postgres, team.id, 'Test cohort', filters)
// Create a matching event
const personId = '550e8400-e29b-41d4-a716-446655440000'
@@ -140,7 +230,7 @@ describe('CdpBehaviouralEventsConsumer', () => {
// Parse messages which should create pre-calculated events
const events = await processor._parseKafkaBatch(messages)
// Should create one pre-calculated event for the matching action
// Should create one pre-calculated event for the matching cohort filter
expect(events).toHaveLength(1)
const preCalculatedEvent = events[0]
@@ -152,8 +242,8 @@ describe('CdpBehaviouralEventsConsumer', () => {
evaluation_timestamp: '2025-03-03 18:15:46.319',
person_id: personId,
distinct_id: distinctId,
condition: String(actionId),
source: `action_${actionId}`,
condition: conditionHash,
source: `cohort_filter_${conditionHash}`,
})
// Test publishing the events to Kafka
await processor['publishEvents'](events)
@@ -169,9 +259,16 @@ describe('CdpBehaviouralEventsConsumer', () => {
expect(publishedMessage.value).toEqual(preCalculatedEvent.payload)
})
it('should not publish to Kafka when action does not match', async () => {
// Create an action with Chrome + pageview filter
await createAction(hub.postgres, team.id, 'Test action', TEST_FILTERS.chromePageview)
it('should not publish to Kafka when cohort filter does not match', async () => {
// Create a cohort with Chrome + pageview filter
const conditionHash = 'test_hash_002'
const filters = buildInlineFiltersForCohorts({
bytecode: TEST_FILTERS.chromePageview,
conditionHash,
type: 'behavioral',
key: '$pageview',
})
await createCohort(hub.postgres, team.id, 'Test cohort', filters)
// Create a non-matching event (Firefox instead of Chrome)
const personId = '550e8400-e29b-41d4-a716-446655440000'
@@ -195,7 +292,7 @@ describe('CdpBehaviouralEventsConsumer', () => {
// Parse messages
const events = await processor._parseKafkaBatch(messages)
// Should not create any events since action doesn't match
// Should not create any events since cohort filter doesn't match
expect(events).toHaveLength(0)
// Verify nothing was published to Kafka
@@ -205,5 +302,394 @@ describe('CdpBehaviouralEventsConsumer', () => {
)
expect(kafkaMessages).toHaveLength(0)
})
it('should deduplicate filters with same conditionHash for a team', async () => {
// Create two cohorts with the same filter (same conditionHash)
const conditionHash = 'dedup_test_hash_001'
const filters = buildInlineFiltersForCohorts({ bytecode: TEST_FILTERS.pageview, conditionHash })
// Create first cohort
await createCohort(hub.postgres, team.id, 'First cohort', filters)
// Create second cohort with same filter
await createCohort(hub.postgres, team.id, 'Second cohort', filters)
// Create a matching event
const personId = '550e8400-e29b-41d4-a716-446655440000'
const distinctId = 'test-distinct-dedup'
const eventUuid = 'test-uuid-dedup'
const messages = [
{
value: Buffer.from(
JSON.stringify({
team_id: team.id,
event: '$pageview',
person_id: personId,
distinct_id: distinctId,
properties: JSON.stringify({}),
timestamp: '2025-03-03T10:15:46.319000-08:00',
uuid: eventUuid,
} as RawClickHouseEvent)
),
} as any,
]
// Parse messages
const events = await processor._parseKafkaBatch(messages)
// Should only create one event despite having two cohorts with same conditionHash
expect(events).toHaveLength(1)
const preCalculatedEvent = events[0]
expect(preCalculatedEvent.payload.condition).toBe(conditionHash)
expect(preCalculatedEvent.payload.source).toBe(`cohort_filter_${conditionHash}`)
})
it('should emit separate events for different cohorts with different conditionHashes', async () => {
// Create two cohorts with different complex filters
const conditionHash1 = 'multi_cohort_hash_001'
const conditionHash2 = 'multi_cohort_hash_002'
// First cohort: simple pageview behavioral filter
const filters1 = JSON.stringify({
properties: {
type: 'OR',
values: [
{
type: 'OR',
values: [
{
key: '$pageview',
type: 'behavioral',
value: 'performed_event',
bytecode: TEST_FILTERS.pageview,
negation: false,
event_type: 'events',
conditionHash: conditionHash1,
explicit_datetime: '-30d',
},
],
},
],
},
})
// Second cohort: complex behavioral filter with event_filters (AND structure)
const filters2 = JSON.stringify({
properties: {
type: 'OR',
values: [
{
type: 'AND',
values: [
{
key: '$pageview',
type: 'behavioral',
value: 'performed_event_multiple',
bytecode: TEST_FILTERS.chromePageview,
negation: false,
operator: 'gte',
event_type: 'events',
conditionHash: conditionHash2,
event_filters: [
{ key: '$browser', type: 'event', value: 'Chrome', operator: 'exact' },
],
operator_value: 5,
explicit_datetime: '-30d',
},
],
},
],
},
})
// Create first cohort (pageview only)
await createCohort(hub.postgres, team.id, 'Pageview cohort', filters1)
// Create second cohort (Chrome + pageview with event filters)
await createCohort(hub.postgres, team.id, 'Chrome pageview cohort', filters2)
// Create an event that matches both filters
const personId = '550e8400-e29b-41d4-a716-446655440000'
const distinctId = 'test-distinct-multi'
const eventUuid = 'test-uuid-multi'
const messages = [
{
value: Buffer.from(
JSON.stringify({
team_id: team.id,
event: '$pageview',
person_id: personId,
distinct_id: distinctId,
properties: JSON.stringify({ $browser: 'Chrome' }),
timestamp: '2025-03-03T10:15:46.319000-08:00',
uuid: eventUuid,
} as RawClickHouseEvent)
),
} as any,
]
// Parse messages
const events = await processor._parseKafkaBatch(messages)
// Should create two events - one for each matching cohort filter
expect(events).toHaveLength(2)
// Sort by condition hash for consistent testing
events.sort((a, b) => a.payload.condition.localeCompare(b.payload.condition))
const [event1, event2] = events
// First event should be for pageview filter
expect(event1.payload.condition).toBe(conditionHash1)
expect(event1.payload.source).toBe(`cohort_filter_${conditionHash1}`)
expect(event1.key).toBe(distinctId)
// Second event should be for Chrome + pageview filter
expect(event2.payload.condition).toBe(conditionHash2)
expect(event2.payload.source).toBe(`cohort_filter_${conditionHash2}`)
expect(event2.key).toBe(distinctId)
})
it('should handle complex billing cohort filter with OR/AND structure', async () => {
const filters = JSON.stringify({
properties: {
type: 'OR',
values: [
{
type: 'AND',
values: [
{
key: 'billing product activated',
type: 'behavioral',
value: 'performed_event',
bytecode: TEST_FILTERS.billingProductActivated,
negation: false,
event_type: 'events',
conditionHash: '2946b8444e88565c',
event_filters: [
{
key: 'product_key',
type: 'event',
value: ['platform_and_support'],
operator: 'exact',
},
{
key: 'plans__platform_and_support',
type: 'event',
value: ['teams-20240208'],
operator: 'exact',
},
],
explicit_datetime: '-30d',
},
{
key: 'product unsubscribed',
type: 'behavioral',
value: 'performed_event',
bytecode: TEST_FILTERS.productUnsubscribed,
negation: true,
event_type: 'events',
conditionHash: '4c6bb89ec315ba80',
event_filters: [
{
key: 'product',
type: 'event',
value: ['platform_and_support'],
operator: 'exact',
},
],
explicit_datetime: '-30d',
},
{
key: 'is_organization_first_user',
type: 'person',
value: ['true'],
bytecode: TEST_FILTERS.isOrgFirstUser,
negation: false,
operator: 'exact',
conditionHash: '7937ba56a3e6348a',
},
],
},
],
},
})
await createCohort(hub.postgres, team.id, 'Billing Product Cohort', filters)
// Test 1: Event that matches billing product activated filter
const personId1 = '950e8400-e29b-41d4-a716-446655440001'
const distinctId1 = 'billing-cohort-test-1'
const eventUuid1 = 'billing-cohort-uuid-1'
const timestamp1 = '2025-03-03T17:00:00.000000-08:00'
const messages1 = [
{
value: Buffer.from(
JSON.stringify({
team_id: team.id,
event: 'billing product activated',
person_id: personId1,
distinct_id: distinctId1,
properties: JSON.stringify({
product_key: 'platform_and_support',
plans__platform_and_support: 'teams-20240208',
}),
timestamp: timestamp1,
uuid: eventUuid1,
} as RawClickHouseEvent)
),
} as any,
]
const events1 = await processor._parseKafkaBatch(messages1)
expect(events1).toHaveLength(1)
const preCalculatedEvent1 = events1[0]
expect(preCalculatedEvent1.key).toBe(distinctId1)
expect(preCalculatedEvent1.payload).toMatchObject({
uuid: eventUuid1,
team_id: team.id,
person_id: personId1,
distinct_id: distinctId1,
condition: '2946b8444e88565c',
source: 'cohort_filter_2946b8444e88565c',
})
})
it('should not process person property filters as they are filtered out', async () => {
// Create a cohort with person property filter
const filters = JSON.stringify({
properties: {
type: 'OR',
values: [
{
type: 'AND',
values: [
{
key: 'is_organization_first_user',
type: 'person', // This type is filtered out
value: ['true'],
bytecode: TEST_FILTERS.isOrgFirstUser,
negation: false,
operator: 'exact',
conditionHash: 'person_prop_test_001',
},
],
},
],
},
})
await createCohort(hub.postgres, team.id, 'First Org User Cohort', filters)
const personId = '850e8400-e29b-41d4-a716-446655440002'
const distinctId = 'person-cohort-test-1'
const eventUuid = 'person-cohort-uuid-1'
const timestamp = '2025-03-03T18:00:00.000000-08:00'
const messages = [
{
value: Buffer.from(
JSON.stringify({
team_id: team.id,
event: 'any event',
person_id: personId,
distinct_id: distinctId,
properties: JSON.stringify({}),
person_properties: JSON.stringify({
is_organization_first_user: 'true',
}),
timestamp,
uuid: eventUuid,
} as RawClickHouseEvent)
),
} as any,
]
const events = await processor._parseKafkaBatch(messages)
// Should NOT create any events since person filters are filtered out
expect(events).toHaveLength(0)
})
it('should produce events for negated filters', async () => {
// Negated filters still produce matching events: the consumer only evaluates bytecode and ignores the negation flag
const filters = JSON.stringify({
properties: {
type: 'OR',
values: [
{
type: 'AND',
values: [
{
key: 'product unsubscribed',
type: 'behavioral',
value: 'performed_event',
bytecode: TEST_FILTERS.productUnsubscribed,
negation: true, // Negation flag is stored but not processed by consumer
event_type: 'events',
conditionHash: 'negated_unsub_test',
event_filters: [
{
key: 'product',
type: 'event',
value: ['platform_and_support'],
operator: 'exact',
},
],
explicit_datetime: '-30d',
},
],
},
],
},
})
await createCohort(hub.postgres, team.id, 'Not Unsubscribed Cohort', filters)
const personId = '750e8400-e29b-41d4-a716-446655440003'
const distinctId = 'negation-test-1'
const eventUuid = 'negation-uuid-1'
const timestamp = '2025-03-03T19:00:00.000000-08:00'
// Send a product unsubscribed event
const messages = [
{
value: Buffer.from(
JSON.stringify({
team_id: team.id,
event: 'product unsubscribed',
person_id: personId,
distinct_id: distinctId,
properties: JSON.stringify({
product: 'platform_and_support',
}),
timestamp,
uuid: eventUuid,
} as RawClickHouseEvent)
),
} as any,
]
const events = await processor._parseKafkaBatch(messages)
// Should create an event because consumer doesn't handle negation
// It just evaluates the bytecode which will return true for matching event
expect(events).toHaveLength(1)
const preCalculatedEvent = events[0]
expect(preCalculatedEvent.key).toBe(distinctId)
expect(preCalculatedEvent.payload).toMatchObject({
uuid: eventUuid,
team_id: team.id,
person_id: personId,
distinct_id: distinctId,
condition: 'negated_unsub_test',
source: 'cohort_filter_negated_unsub_test',
})
})
})
})
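
A note on the conditionHash values used above (e.g. '2946b8444e88565c'): the manager's type declaration further down describes them as 16-char SHA-256 hashes of the filter condition. How the hash input is canonicalised is not shown in this diff; the sketch below assumes a plain JSON encoding and only takes the documented 16-hex-char prefix:

import { createHash } from 'crypto'

// Assumption: the hash covers a JSON encoding of the condition node.
// Only "16-char SHA256 hash from the filter" is stated in the source.
function conditionHashSketch(condition: Record<string, unknown>): string {
    return createHash('sha256').update(JSON.stringify(condition)).digest('hex').slice(0, 16)
}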

View File

@@ -2,7 +2,10 @@ import { Message } from 'node-rdkafka'
import { Histogram } from 'prom-client'
import { instrumentFn, instrumented } from '~/common/tracing/tracing-utils'
import { Action, ActionManagerCDP } from '~/utils/action-manager-cdp'
import {
RealtimeSupportedFilter,
RealtimeSupportedFilterManagerCDP,
} from '~/utils/realtime-supported-filter-manager-cdp'
import { KAFKA_CDP_CLICKHOUSE_PREFILTERED_EVENTS, KAFKA_EVENTS_JSON } from '../../config/kafka-topics'
import { KafkaConsumer } from '../../kafka/consumer'
@@ -39,12 +42,12 @@ export const histogramBatchProcessingSteps = new Histogram({
export class CdpBehaviouralEventsConsumer extends CdpConsumerBase {
protected name = 'CdpBehaviouralEventsConsumer'
private kafkaConsumer: KafkaConsumer
private actionManager: ActionManagerCDP
private realtimeSupportedFilterManager: RealtimeSupportedFilterManagerCDP
constructor(hub: Hub, topic: string = KAFKA_EVENTS_JSON, groupId: string = 'cdp-behavioural-events-consumer') {
super(hub)
this.kafkaConsumer = new KafkaConsumer({ groupId, topic })
this.actionManager = new ActionManagerCDP(hub.db.postgres)
this.realtimeSupportedFilterManager = new RealtimeSupportedFilterManagerCDP(hub.db.postgres)
}
@instrumented('cdpBehaviouralEventsConsumer.publishEvents')
@@ -69,24 +72,25 @@ export class CdpBehaviouralEventsConsumer extends CdpConsumerBase {
}
}
// Evaluate if event matches action using bytecode execution
private async evaluateEventAgainstAction(
// Evaluate if event matches realtime supported filter using bytecode execution
private async evaluateEventAgainstRealtimeSupportedFilter(
filterGlobals: HogFunctionFilterGlobals,
action: Action
filter: RealtimeSupportedFilter
): Promise<boolean> {
if (!action.bytecode) {
if (!filter.bytecode) {
return false
}
try {
const { execResult } = await execHog(action.bytecode, {
const { execResult } = await execHog(filter.bytecode, {
globals: filterGlobals,
})
return execResult?.result ?? false
} catch (error) {
logger.error('Error executing action bytecode', {
actionId: action.id,
logger.error('Error executing realtime supported filter bytecode', {
conditionHash: filter.conditionHash,
cohortId: filter.cohort_id,
error,
})
return false
@@ -125,17 +129,17 @@ export class CdpBehaviouralEventsConsumer extends CdpConsumerBase {
}
}
// Step 2: Fetch all actions for all teams in one query
// Step 2: Fetch all realtime supported filters for all teams in one query
const teamIds = Array.from(eventsByTeam.keys())
const actionsByTeam = await this.actionManager.getActionsForTeams(teamIds)
const filtersByTeam = await this.realtimeSupportedFilterManager.getRealtimeSupportedFiltersForTeams(teamIds)
// Step 3: Process each team's events with their actions
// Step 3: Process each team's events with their realtime supported filters
for (const [teamId, teamEvents] of Array.from(eventsByTeam.entries())) {
try {
const actions = actionsByTeam[String(teamId)] || []
const filters = filtersByTeam[String(teamId)] || []
if (actions.length === 0) {
// Skip teams with no actions
if (filters.length === 0) {
// Skip teams with no realtime supported filters
continue
}
@@ -148,17 +152,20 @@ export class CdpBehaviouralEventsConsumer extends CdpConsumerBase {
.replace('T', ' ')
.replace('Z', '')
// Convert to filter globals for action evaluation
// Convert to filter globals for filter evaluation
const filterGlobals = convertClickhouseRawEventToFilterGlobals(clickHouseEvent)
// Evaluate event against each action for this team
for (const action of actions) {
const matches = await this.evaluateEventAgainstAction(filterGlobals, action)
// Evaluate event against each realtime supported filter for this team
for (const filter of filters) {
const matches = await this.evaluateEventAgainstRealtimeSupportedFilter(
filterGlobals,
filter
)
// Only publish if event matches the action (don't publish non-matches)
// Only publish if event matches the filter (don't publish non-matches)
if (matches) {
// Hash the action bytecode/id as the condition identifier
// This ensures consistent condition hashes for the same action
// Use the filter's conditionHash as the condition identifier
// This ensures consistent condition hashes across cohorts
const preCalculatedEvent: ProducedEvent = {
key: filterGlobals.distinct_id,
@@ -168,8 +175,8 @@ export class CdpBehaviouralEventsConsumer extends CdpConsumerBase {
evaluation_timestamp: evaluationTimestamp,
person_id: clickHouseEvent.person_id!,
distinct_id: filterGlobals.distinct_id,
condition: String(action.id),
source: `action_${action.id}`,
condition: filter.conditionHash,
source: `cohort_filter_${filter.conditionHash}`,
},
}
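
One detail worth calling out from the hunk above: evaluation_timestamp is produced by normalising the event timestamp with .replace('T', ' ').replace('Z', ''), and the tests expect '2025-03-03T10:15:46.319000-08:00' to come out as '2025-03-03 18:15:46.319' (UTC). A minimal sketch of that conversion, assuming plain Date parsing of the offset string (Node accepts the extra fractional digits here; the real code may go through a date helper):

// Sketch: normalise an offset timestamp into the ClickHouse-style UTC string
// asserted by the tests ('YYYY-MM-DD HH:mm:ss.SSS').
function toEvaluationTimestamp(eventTimestamp: string): string {
    return new Date(eventTimestamp)
        .toISOString() // e.g. '2025-03-03T18:15:46.319Z'
        .replace('T', ' ')
        .replace('Z', '')
}

// toEvaluationTimestamp('2025-03-03T10:15:46.319000-08:00') // '2025-03-03 18:15:46.319'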

View File

@@ -0,0 +1,651 @@
import {
buildInlineFiltersForCohorts,
createCohort,
createTeam,
getFirstTeam,
resetTestDatabase,
} from '../../tests/helpers/sql'
import { defaultConfig } from '../config/config'
import { Hub, Team } from '../types'
import { closeHub, createHub } from './db/hub'
import { PostgresRouter } from './db/postgres'
import { RealtimeSupportedFilterManagerCDP } from './realtime-supported-filter-manager-cdp'
describe('RealtimeSupportedFilterManagerCDP()', () => {
let hub: Hub
let realtimeSupportedFilterManager: RealtimeSupportedFilterManagerCDP
let postgres: PostgresRouter
let teamId: Team['id']
let fetchRealtimeSupportedFiltersSpy: jest.SpyInstance
beforeEach(async () => {
const now = Date.now()
jest.spyOn(Date, 'now').mockImplementation(() => now)
hub = await createHub()
await resetTestDatabase()
postgres = new PostgresRouter(defaultConfig)
realtimeSupportedFilterManager = new RealtimeSupportedFilterManagerCDP(postgres)
const team = await getFirstTeam(hub)
teamId = team.id
fetchRealtimeSupportedFiltersSpy = jest.spyOn(
realtimeSupportedFilterManager as any,
'fetchRealtimeSupportedFilters'
)
})
afterEach(async () => {
await closeHub(hub)
})
describe('getRealtimeSupportedFiltersForTeam()', () => {
it('returns empty array if no realtime cohorts exist', async () => {
const result = await realtimeSupportedFilterManager.getRealtimeSupportedFiltersForTeam(teamId)
expect(result).toEqual([])
})
it('returns realtime supported filters for a team', async () => {
const bytecode = ['_H', 1, 32, 'Chrome', 32, '$browser', 32, 'properties', 1, 2, 11]
const conditionHash = 'test_hash_001'
const filters = buildInlineFiltersForCohorts({ bytecode, conditionHash, type: 'event', key: '$browser' })
// Create a realtime cohort
const cohortId = await createCohort(postgres, teamId, 'Test Cohort', filters)
const result = await realtimeSupportedFilterManager.getRealtimeSupportedFiltersForTeam(teamId)
expect(result).toHaveLength(1)
expect(result[0]).toMatchObject({
conditionHash: conditionHash,
bytecode: bytecode,
team_id: teamId,
cohort_id: cohortId,
})
})
it('filters out deleted cohorts', async () => {
const bytecode = ['_H', 1, 32, 'Chrome', 32, '$browser', 32, 'properties', 1, 2, 11]
const filters = buildInlineFiltersForCohorts({
bytecode,
conditionHash: 'test_hash_001',
type: 'event',
key: '$browser',
})
// Create active cohort
const activeCohortId = await createCohort(postgres, teamId, 'Active Cohort', filters)
// Create deleted cohort
await createCohort(postgres, teamId, 'Deleted Cohort', filters, { deleted: true })
const result = await realtimeSupportedFilterManager.getRealtimeSupportedFiltersForTeam(teamId)
expect(result).toHaveLength(1)
expect(result[0].cohort_id).toBe(activeCohortId)
})
it('filters out cohorts without filters', async () => {
// Create cohort without filters (uses default empty filters)
await createCohort(postgres, teamId, 'No Filters Cohort', null)
const result = await realtimeSupportedFilterManager.getRealtimeSupportedFiltersForTeam(teamId)
expect(result).toEqual([])
})
it('filters out non-realtime cohorts', async () => {
const bytecode = ['_H', 1, 32, 'test']
const filters = buildInlineFiltersForCohorts({ bytecode, conditionHash: 'test_hash_001' })
// Create behavioral cohort (not realtime)
await createCohort(postgres, teamId, 'Behavioral Cohort', filters, { cohort_type: 'behavioral' })
const result = await realtimeSupportedFilterManager.getRealtimeSupportedFiltersForTeam(teamId)
expect(result).toEqual([])
})
it('deduplicates filters by conditionHash across complex nested structures', async () => {
const combinedBytecode = [
'_H',
1,
32,
'$pageview',
32,
'event',
1,
1,
11,
31,
32,
'$browser',
32,
'properties',
1,
2,
12,
31,
32,
'$browser_language',
32,
'properties',
1,
2,
12,
3,
2,
3,
2,
]
const conditionHash = 'bcdc95b22cf3e527'
// Complex filter structure matching the user's example
const filters = JSON.stringify({
properties: {
type: 'OR',
values: [
{
type: 'AND',
values: [
{
key: '$pageview',
type: 'behavioral',
value: 'performed_event_multiple',
bytecode: combinedBytecode,
negation: false,
operator: 'exact',
event_type: 'events',
conditionHash: conditionHash,
event_filters: [
{ key: '$browser', type: 'event', value: 'is_set', operator: 'is_set' },
{
key: '$browser_language',
type: 'event',
value: 'is_set',
operator: 'is_set',
},
],
operator_value: 5,
explicit_datetime: '-30d',
},
],
},
],
},
})
// Create two cohorts with the same conditionHash in complex structures
await createCohort(postgres, teamId, 'Cohort 1', filters)
await createCohort(postgres, teamId, 'Cohort 2', filters)
const result = await realtimeSupportedFilterManager.getRealtimeSupportedFiltersForTeam(teamId)
expect(result).toHaveLength(1) // Should be deduplicated
expect(result[0].conditionHash).toBe(conditionHash)
expect(result[0].bytecode).toEqual(combinedBytecode)
})
it('handles multiple filters in single cohort with complex nested structure', async () => {
// Complex filter: behavioral filter with event_filters (combined bytecode)
const behavioralWithEventFilterBytecode = [
'_H',
1,
32,
'$pageview',
32,
'event',
1,
1,
11,
31,
32,
'$browser',
32,
'properties',
1,
2,
12,
3,
2,
]
const pageleaveBytecode = ['_H', 1, 32, '$pageleave', 32, 'event', 1, 1, 11]
const filters = JSON.stringify({
properties: {
type: 'OR',
values: [
{
type: 'AND',
values: [
{
key: '$pageview',
type: 'behavioral',
value: 'performed_event_multiple',
bytecode: behavioralWithEventFilterBytecode,
negation: false,
operator: 'gte',
event_type: 'events',
conditionHash: '512ef57e6f504fc6',
event_filters: [
{ key: '$browser', type: 'event', value: 'is_set', operator: 'is_set' },
],
operator_value: 5,
explicit_datetime: '-30d',
},
{
key: '$pageleave',
type: 'behavioral',
value: 'performed_event',
bytecode: pageleaveBytecode,
negation: false,
event_type: 'events',
conditionHash: 'e0418e34fcd847e5',
explicit_datetime: '-30d',
},
],
},
],
},
})
await createCohort(postgres, teamId, 'Multi-Filter Cohort', filters)
const result = await realtimeSupportedFilterManager.getRealtimeSupportedFiltersForTeam(teamId)
expect(result).toHaveLength(2)
expect(result.map((f) => f.conditionHash).sort()).toEqual(['512ef57e6f504fc6', 'e0418e34fcd847e5'])
// Verify the combined bytecode for the behavioral filter with event_filters
const behavioralFilter = result.find((f) => f.conditionHash === '512ef57e6f504fc6')
expect(behavioralFilter?.bytecode).toEqual(behavioralWithEventFilterBytecode)
})
it('filters out person property bytecodes explicitly via type in complex structure', async () => {
const personPropBytecode = ['_H', 1, 31, 32, '$host', 32, 'properties', 32, 'person', 1, 3, 12]
const behavioralBytecode = ['_H', 1, 32, '$pageview', 32, 'event', 1, 1, 11]
// Complex structure with both person and behavioral filters - only behavioral should be extracted
const filters = JSON.stringify({
properties: {
type: 'OR',
values: [
{
type: 'AND',
values: [
{
key: '$pageview',
type: 'behavioral',
value: 'performed_event',
bytecode: behavioralBytecode,
negation: false,
event_type: 'events',
conditionHash: 'e0418e34fcd847e5',
explicit_datetime: '-30d',
},
{
key: '$host',
type: 'person',
bytecode: personPropBytecode,
negation: false,
operator: 'is_set',
conditionHash: '30b9607b69c556bf',
},
],
},
],
},
})
await createCohort(postgres, teamId, 'Mixed Filters Cohort', filters)
const result = await realtimeSupportedFilterManager.getRealtimeSupportedFiltersForTeam(teamId)
// Should only return the behavioral filter, not the person property filter
expect(result).toHaveLength(1)
expect(result[0].conditionHash).toBe('e0418e34fcd847e5')
expect(result[0].bytecode).toEqual(behavioralBytecode)
})
it('filters out cohort filters and only keeps event filters', async () => {
// This test uses the exact structure from the user's example
const filters = JSON.stringify({
properties: {
type: 'OR',
values: [
{
type: 'OR',
values: [
{
key: 'id',
type: 'cohort',
value: 41,
bytecode: ['_H', 1, 33, 41, 2, 'inCohort', 1],
negation: false,
conditionHash: '5e6d68bd7c7babae',
},
],
},
{
type: 'OR',
values: [
{
key: '$pageview',
type: 'behavioral',
value: 'performed_event',
bytecode: ['_H', 1, 32, '$pageview', 32, 'event', 1, 1, 11],
negation: false,
event_type: 'events',
conditionHash: 'f9c616030a87e68f',
explicit_datetime: '-30d',
},
],
},
],
},
})
await createCohort(postgres, teamId, 'Cohort with InCohort Filter', filters)
const result = await realtimeSupportedFilterManager.getRealtimeSupportedFiltersForTeam(teamId)
// Should filter out the cohort filter and only return the event filter
expect(result).toHaveLength(1)
expect(result[0].conditionHash).toBe('f9c616030a87e68f')
expect(result[0].bytecode).toEqual(['_H', 1, 32, '$pageview', 32, 'event', 1, 1, 11])
// Verify the cohort filter was filtered out
const cohortFilter = result.find((f) => f.conditionHash === '5e6d68bd7c7babae')
expect(cohortFilter).toBeUndefined()
})
it('handles complex OR structure with multiple filter groups', async () => {
// Test structure with OR at top level containing multiple groups (matching user's second example)
const pageviewWithBrowserBytecode = [
'_H',
1,
32,
'$pageview',
32,
'event',
1,
1,
11,
31,
32,
'$browser',
32,
'properties',
1,
2,
12,
3,
2,
]
const pageleaveBytecode = ['_H', 1, 32, '$pageleave', 32, 'event', 1, 1, 11]
const groupidentifyBytecode = [
'_H',
1,
32,
'$groupidentify',
32,
'event',
1,
1,
11,
31,
32,
'id',
32,
'properties',
1,
2,
12,
3,
2,
]
const filters = JSON.stringify({
properties: {
type: 'OR',
values: [
{
type: 'AND',
values: [
{
key: '$pageview',
type: 'behavioral',
value: 'performed_event_multiple',
bytecode: pageviewWithBrowserBytecode,
negation: false,
operator: 'gte',
event_type: 'events',
conditionHash: '512ef57e6f504fc6',
event_filters: [
{ key: '$browser', type: 'event', value: 'is_set', operator: 'is_set' },
],
operator_value: 5,
explicit_datetime: '-30d',
},
{
key: '$pageleave',
type: 'behavioral',
value: 'performed_event',
bytecode: pageleaveBytecode,
negation: false,
event_type: 'events',
conditionHash: 'e0418e34fcd847e5',
explicit_datetime: '-30d',
},
],
},
{
type: 'OR',
values: [
{
key: '$groupidentify',
type: 'behavioral',
value: 'performed_event',
bytecode: groupidentifyBytecode,
negation: false,
event_type: 'events',
conditionHash: 'f0bbe0140a9cfe05',
event_filters: [{ key: 'id', type: 'event', value: 'is_set', operator: 'is_set' }],
explicit_datetime: '-30d',
},
],
},
],
},
})
await createCohort(postgres, teamId, 'Complex OR Cohort', filters)
const result = await realtimeSupportedFilterManager.getRealtimeSupportedFiltersForTeam(teamId)
expect(result).toHaveLength(3)
const hashes = result.map((f) => f.conditionHash).sort()
expect(hashes).toEqual(['512ef57e6f504fc6', 'e0418e34fcd847e5', 'f0bbe0140a9cfe05'])
// Verify all bytecodes are correctly extracted
expect(result.find((f) => f.conditionHash === '512ef57e6f504fc6')?.bytecode).toEqual(
pageviewWithBrowserBytecode
)
expect(result.find((f) => f.conditionHash === 'e0418e34fcd847e5')?.bytecode).toEqual(pageleaveBytecode)
expect(result.find((f) => f.conditionHash === 'f0bbe0140a9cfe05')?.bytecode).toEqual(groupidentifyBytecode)
})
it('handles malformed filters gracefully', async () => {
const validBytecode = ['_H', 1, 32, 'test']
const validFilters = buildInlineFiltersForCohorts({ bytecode: validBytecode, conditionHash: 'valid_hash' })
// Create cohort with valid filters
await createCohort(postgres, teamId, 'Valid Cohort', validFilters)
// Create cohort with malformed filters (missing bytecode/conditionHash)
const malformedFilters = JSON.stringify({
properties: {
type: 'OR',
values: [
{
type: 'OR',
values: [
{
key: '$test',
type: 'behavioral',
// Missing bytecode and conditionHash
},
],
},
],
},
})
await createCohort(postgres, teamId, 'Malformed Cohort', malformedFilters)
const result = await realtimeSupportedFilterManager.getRealtimeSupportedFiltersForTeam(teamId)
expect(result).toHaveLength(1) // Only valid filter should be returned
expect(result[0].conditionHash).toBe('valid_hash')
})
it('caches filters for subsequent calls', async () => {
const bytecode = ['_H', 1, 32, 'Chrome', 32, '$browser', 32, 'properties', 1, 2, 11]
const filters = buildInlineFiltersForCohorts({
bytecode,
conditionHash: 'cached_hash',
type: 'event',
key: '$browser',
})
await createCohort(postgres, teamId, 'Cached Cohort', filters)
// First call
const result1 = await realtimeSupportedFilterManager.getRealtimeSupportedFiltersForTeam(teamId)
expect(result1).toHaveLength(1)
expect(fetchRealtimeSupportedFiltersSpy).toHaveBeenCalledTimes(1)
// Second call should use cache
const result2 = await realtimeSupportedFilterManager.getRealtimeSupportedFiltersForTeam(teamId)
expect(result2).toHaveLength(1)
expect(fetchRealtimeSupportedFiltersSpy).toHaveBeenCalledTimes(1)
})
it('returns empty array for non-existent team', async () => {
const result = await realtimeSupportedFilterManager.getRealtimeSupportedFiltersForTeam(99999)
expect(result).toEqual([])
})
})
describe('getRealtimeSupportedFiltersForTeams()', () => {
it('returns filters for multiple teams', async () => {
const bytecode = ['_H', 1, 32, 'Chrome', 32, '$browser', 32, 'properties', 1, 2, 11]
const filters1 = buildInlineFiltersForCohorts({
bytecode,
conditionHash: 'team1_hash',
type: 'event',
key: '$browser',
})
const filters2 = buildInlineFiltersForCohorts({
bytecode,
conditionHash: 'team2_hash',
type: 'event',
key: '$browser',
})
// Create another team
const team = await hub.teamManager.getTeam(teamId)
const team2Id = await createTeam(postgres, team!.organization_id)
// Create cohorts for both teams
await createCohort(postgres, teamId, 'Team 1 Cohort', filters1)
await createCohort(postgres, team2Id, 'Team 2 Cohort', filters2)
const result = await realtimeSupportedFilterManager.getRealtimeSupportedFiltersForTeams([teamId, team2Id])
expect(result[String(teamId)]).toHaveLength(1)
expect(result[String(teamId)]![0].conditionHash).toBe('team1_hash')
expect(result[String(team2Id)]).toHaveLength(1)
expect(result[String(team2Id)]![0].conditionHash).toBe('team2_hash')
})
it('returns empty arrays for teams with no realtime cohorts', async () => {
const result = await realtimeSupportedFilterManager.getRealtimeSupportedFiltersForTeams([teamId, 99999])
expect(result[String(teamId)]).toEqual([])
expect(result['99999']).toEqual([])
})
it('efficiently loads multiple teams with single database call', async () => {
const bytecode = ['_H', 1, 32, 'Chrome', 32, '$browser', 32, 'properties', 1, 2, 11]
const filters = buildInlineFiltersForCohorts({
bytecode,
conditionHash: 'test_hash',
type: 'event',
key: '$browser',
})
await createCohort(postgres, teamId, 'Test Cohort', filters)
const promises = [
realtimeSupportedFilterManager.getRealtimeSupportedFiltersForTeam(teamId),
realtimeSupportedFilterManager.getRealtimeSupportedFiltersForTeam(teamId),
realtimeSupportedFilterManager.getRealtimeSupportedFiltersForTeam(99999),
]
const results = await Promise.all(promises)
expect(fetchRealtimeSupportedFiltersSpy).toHaveBeenCalledTimes(1)
expect(results[0]).toHaveLength(1)
expect(results[1]).toHaveLength(1)
expect(results[2]).toHaveLength(0)
})
it('deduplicates filters across multiple teams', async () => {
const bytecode = ['_H', 1, 32, 'shared', 32, 'filter']
const sharedHash = 'shared_condition_hash'
const filters = buildInlineFiltersForCohorts({ bytecode, conditionHash: sharedHash })
// Create another team
const team = await hub.teamManager.getTeam(teamId)
const team2Id = await createTeam(postgres, team!.organization_id)
// Create cohorts with same conditionHash for both teams
await createCohort(postgres, teamId, 'Team 1 Cohort', filters)
await createCohort(postgres, team2Id, 'Team 2 Cohort', filters)
const result = await realtimeSupportedFilterManager.getRealtimeSupportedFiltersForTeams([teamId, team2Id])
// Should be deduplicated - only one filter per team even though they have same hash
expect(result[String(teamId)]).toHaveLength(1)
expect(result[String(team2Id)]).toHaveLength(1)
expect(result[String(teamId)]![0].conditionHash).toBe(sharedHash)
expect(result[String(team2Id)]![0].conditionHash).toBe(sharedHash)
})
})
describe('fetchRealtimeSupportedFilters()', () => {
it('handles empty team ID array', async () => {
const result = await (realtimeSupportedFilterManager as any).fetchRealtimeSupportedFilters([])
expect(result).toEqual({})
})
it('handles invalid team IDs', async () => {
const result = await (realtimeSupportedFilterManager as any).fetchRealtimeSupportedFilters([
'invalid',
'NaN',
])
expect(result).toEqual({})
})
it('orders cohorts by team_id and created_at DESC', async () => {
const bytecode = ['_H', 1, 32, 'test']
// Create cohorts with different created_at times
const olderTime = new Date(Date.now() - 3600000).toISOString() // 1 hour ago
const newerTime = new Date().toISOString()
const olderFilters = buildInlineFiltersForCohorts({ bytecode, conditionHash: 'older_hash' })
const newerFilters = buildInlineFiltersForCohorts({ bytecode, conditionHash: 'newer_hash' })
await createCohort(postgres, teamId, 'Older Cohort', olderFilters, {
created_at: olderTime,
})
await createCohort(postgres, teamId, 'Newer Cohort', newerFilters, {
created_at: newerTime,
})
const result = await realtimeSupportedFilterManager.getRealtimeSupportedFiltersForTeam(teamId)
expect(result).toHaveLength(2)
expect(result[0].conditionHash).toBe('newer_hash') // Should be first due to DESC order
expect(result[1].conditionHash).toBe('older_hash')
})
})
})
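
The caching tests above ('caches filters for subsequent calls', 'efficiently loads multiple teams with single database call') pin down the LazyLoader contract: concurrent lookups are coalesced into a single fetch and results are cached. A self-contained sketch of that coalescing idea — the real LazyLoader additionally handles refreshAgeMs/refreshJitterMs, which this deliberately omits:

// Illustrative only: shows why three concurrent get() calls trigger one loader call.
class TinyBatchingLoader<T> {
    private cache = new Map<string, Promise<T | undefined>>()
    private pendingKeys: string[] = []
    private flush: Promise<Record<string, T>> | null = null

    constructor(private loader: (keys: string[]) => Promise<Record<string, T>>) {}

    get(key: string): Promise<T | undefined> {
        if (!this.cache.has(key)) {
            this.pendingKeys.push(key)
            if (!this.flush) {
                // Collect every key requested in this tick, then load them in one call.
                this.flush = Promise.resolve().then(() => {
                    const keys = this.pendingKeys
                    this.pendingKeys = []
                    this.flush = null
                    return this.loader(keys)
                })
            }
            this.cache.set(key, this.flush.then((r) => r[key]))
        }
        return this.cache.get(key)!
    }
}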

View File

@@ -0,0 +1,154 @@
import { PostgresRouter, PostgresUse } from './db/postgres'
import { LazyLoader } from './lazy-loader'
export interface RealtimeSupportedFilter {
conditionHash: string // The 16-char SHA256 hash from the filter
bytecode: any // HogQL bytecode for execution
team_id: number
cohort_id: number // For tracking which cohort this filter belongs to
}
interface CohortRow {
cohort_id: number
team_id: number
filters: any | null // JSON object (PostgreSQL deserializes JSON/JSONB columns automatically)
}
export class RealtimeSupportedFilterManagerCDP {
private lazyLoader: LazyLoader<RealtimeSupportedFilter[]>
constructor(private postgres: PostgresRouter) {
this.lazyLoader = new LazyLoader({
name: 'RealtimeSupportedFilterManagerCDP',
refreshAgeMs: 5 * 60 * 1000, // 5 minutes
refreshJitterMs: 60 * 1000, // 1 minute
loader: async (teamIds: string[]) => {
return await this.fetchRealtimeSupportedFilters(teamIds)
},
})
}
public async getRealtimeSupportedFiltersForTeam(teamId: number): Promise<RealtimeSupportedFilter[]> {
const filters = await this.lazyLoader.get(String(teamId))
return filters || []
}
public async getRealtimeSupportedFiltersForTeams(
teamIds: number[]
): Promise<Record<string, RealtimeSupportedFilter[] | null>> {
return this.lazyLoader.getMany(teamIds.map(String))
}
private async fetchRealtimeSupportedFilters(teamIds: string[]): Promise<Record<string, RealtimeSupportedFilter[]>> {
const teamIdNumbers = teamIds.map(Number).filter((id) => !isNaN(id))
if (teamIdNumbers.length === 0) {
return {}
}
const result = await this.postgres.query<CohortRow>(
PostgresUse.COMMON_READ,
`SELECT
id as cohort_id,
team_id,
filters
FROM posthog_cohort
WHERE team_id = ANY($1)
AND deleted = FALSE
AND filters IS NOT NULL
AND cohort_type = 'realtime'
ORDER BY team_id, created_at DESC`,
[teamIdNumbers],
'fetch-realtime-supported-filters-by-team'
)
// Initialize result record with empty arrays for all requested team IDs
const resultRecord: Record<string, RealtimeSupportedFilter[]> = {}
for (const teamId of teamIds) {
resultRecord[teamId] = []
}
// Process filters from each cohort and deduplicate by conditionHash per team
const seenConditionHashesByTeam = new Map<string, Set<string>>()
result.rows.forEach((cohortRow) => {
const teamIdStr = String(cohortRow.team_id)
if (!resultRecord[teamIdStr]) {
resultRecord[teamIdStr] = []
}
if (!seenConditionHashesByTeam.has(teamIdStr)) {
seenConditionHashesByTeam.set(teamIdStr, new Set<string>())
}
const teamSeenHashes = seenConditionHashesByTeam.get(teamIdStr)!
const filtersJson = cohortRow.filters || {}
const extracted = this.extractRealtimeFiltersFromFiltersJson(filtersJson, cohortRow, teamSeenHashes)
if (extracted.length > 0) {
resultRecord[teamIdStr].push(...extracted)
}
})
return resultRecord
}
// Extracts realtime-executable filters from a cohort's filters JSON
private extractRealtimeFiltersFromFiltersJson(
filtersJson: any,
cohortRow: CohortRow,
teamSeenHashes: Set<string>
): RealtimeSupportedFilter[] {
const collected: RealtimeSupportedFilter[] = []
const visitLeaf = (node: any) => {
// Only accept leaf nodes that have inline bytecode and conditionHash
if (!node || !node.conditionHash || !node.bytecode) {
return
}
// Only accept event filters - skip person and cohort filters
// Note: 'behavioral' filters are event-related and should pass through
if (node.type === 'person' || node.type === 'cohort') {
return
}
const conditionHash = node.conditionHash as string
if (teamSeenHashes.has(conditionHash)) {
return
}
teamSeenHashes.add(conditionHash)
collected.push({
conditionHash,
bytecode: node.bytecode,
team_id: cohortRow.team_id,
cohort_id: cohortRow.cohort_id,
})
}
if (filtersJson && filtersJson.properties) {
this.traverseFilterTree(filtersJson.properties, visitLeaf)
}
return collected
}
// Generic DFS over the filter tree; calls visit on every leaf node
private traverseFilterTree(node: any, visit: (leaf: any) => void): void {
if (!node) {
return
}
const isGroup = node.type === 'OR' || node.type === 'AND'
if (isGroup) {
if (Array.isArray(node.values)) {
for (const child of node.values) {
this.traverseFilterTree(child, visit)
}
}
return
}
visit(node)
}
}
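
Putting fetch → extract → dedup together: group nodes (OR/AND) are recursed, leaves without inline bytecode and conditionHash are ignored, person and cohort leaves are skipped, and each hash is kept once per team. A standalone illustration of those rules, mirroring extractRealtimeFiltersFromFiltersJson without the Postgres row plumbing:

// Illustration of the extraction rules above, reduced to conditionHash collection.
type FilterNode = { type?: string; conditionHash?: string; bytecode?: any; values?: FilterNode[] }

function collectHashes(node: FilterNode, seen = new Set<string>(), out: string[] = []): string[] {
    if (node.type === 'OR' || node.type === 'AND') {
        for (const child of node.values ?? []) collectHashes(child, seen, out)
        return out
    }
    if (!node.conditionHash || !node.bytecode) return out // malformed leaf: skipped
    if (node.type === 'person' || node.type === 'cohort') return out // not realtime-executable
    if (!seen.has(node.conditionHash)) {
        seen.add(node.conditionHash)
        out.push(node.conditionHash)
    }
    return out
}

// collectHashes({ type: 'OR', values: [{ type: 'AND', values: [
//     { key: '$pageview', type: 'behavioral', conditionHash: 'f9c6...', bytecode: ['_H', 1] },
//     { key: '$host', type: 'person', conditionHash: '30b9...', bytecode: ['_H', 1] },
// ] }] }) // → ['f9c6...']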

View File

@@ -656,3 +656,86 @@ export async function resetBehavioralCohortsDatabase(db: PostgresRouter): Promis
'reset-behavioral-cohorts-db'
)
}
export const createCohort = async (
pg: PostgresRouter,
teamId: number,
name: string,
filters: string | null = null,
cohortSettings?: Record<string, any>
): Promise<number> => {
// KLUDGE: auto increment IDs can be racy in tests so we ensure IDs don't clash
const id = Math.round(Math.random() * 1000000000)
await insertRow(pg, 'posthog_cohort', {
id,
name,
description: `Test cohort: ${name}`,
team_id: teamId,
deleted: false,
filters:
filters ||
JSON.stringify({
properties: {
type: 'AND',
values: [],
},
}),
query: null,
version: null,
pending_version: null,
count: null,
is_calculating: false,
last_calculation: null,
errors_calculating: 0,
last_error_at: null,
is_static: false,
cohort_type: 'realtime',
created_at: new Date().toISOString(),
created_by_id: commonUserId,
groups: JSON.stringify([]),
...cohortSettings,
})
return id
}
// Build an inline filters JSON string for cohorts with embedded bytecode and conditionHash
export const buildInlineFiltersForCohorts = ({
bytecode,
conditionHash,
type = 'behavioral',
key = '$test_event',
extra,
}: {
bytecode: any[]
conditionHash: string
type?: string
key?: string
extra?: Record<string, any>
}): string => {
const filter: any = {
key,
type,
bytecode,
conditionHash,
...(extra || {}),
}
if (type === 'behavioral') {
filter.value = 'performed_event'
filter.event_type = 'events'
} else if (type === 'person') {
filter.operator = 'is_set'
}
return JSON.stringify({
properties: {
type: 'OR',
values: [
{
type: 'OR',
values: [filter],
},
],
},
})
}
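
Typical usage of these two helpers, as seen in the specs above (postgres and teamId come from the test harness; the bytecode is one of the TEST_FILTERS-style arrays):

// Inside an async test body:
const filters = buildInlineFiltersForCohorts({
    bytecode: ['_H', 1, 32, '$pageview', 32, 'event', 1, 1, 11],
    conditionHash: 'example_hash_001',
})
const cohortId = await createCohort(postgres, teamId, 'Example cohort', filters)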