diff --git a/plugin-server/src/cdp/consumers/cdp-behavioural-events.consumer.test.ts b/plugin-server/src/cdp/consumers/cdp-behavioural-events.consumer.test.ts index 8f69a3338a..e0f652a78a 100644 --- a/plugin-server/src/cdp/consumers/cdp-behavioural-events.consumer.test.ts +++ b/plugin-server/src/cdp/consumers/cdp-behavioural-events.consumer.test.ts @@ -2,7 +2,7 @@ import { mockProducerObserver } from '~/tests/helpers/mocks/producer.mock' import { resetKafka } from '~/tests/helpers/kafka' -import { createAction, getFirstTeam, resetTestDatabase } from '../../../tests/helpers/sql' +import { buildInlineFiltersForCohorts, createCohort, getFirstTeam, resetTestDatabase } from '../../../tests/helpers/sql' import { KAFKA_CDP_CLICKHOUSE_PREFILTERED_EVENTS } from '../../config/kafka-topics' import { Hub, RawClickHouseEvent, Team } from '../../types' import { closeHub, createHub } from '../../utils/db/hub' @@ -85,6 +85,68 @@ const TEST_FILTERS = { 4, 2, ], + + // Billing product activated filter + billingProductActivated: [ + '_H', + 1, + 32, + 'billing product activated', + 32, + 'event', + 1, + 1, + 11, + 32, + 'platform_and_support', + 32, + 'product_key', + 32, + 'properties', + 1, + 2, + 11, + 32, + 'teams-20240208', + 32, + 'plans__platform_and_support', + 32, + 'properties', + 1, + 2, + 11, + 3, + 2, + 3, + 2, + ], + + // Product unsubscribed filter + productUnsubscribed: [ + '_H', + 1, + 32, + 'product unsubscribed', + 32, + 'event', + 1, + 1, + 11, + 32, + 'platform_and_support', + 32, + 'product', + 32, + 'properties', + 1, + 2, + 11, + 3, + 2, + ], + + // Person property is_organization_first_user filter + isOrgFirstUser: ['_H', 1, 29, 32, 'is_organization_first_user', 32, 'properties', 32, 'person', 1, 3, 11], } describe('CdpBehaviouralEventsConsumer', () => { @@ -110,10 +172,38 @@ describe('CdpBehaviouralEventsConsumer', () => { jest.restoreAllMocks() }) - describe('action matching and Kafka publishing', () => { - it('should publish pre-calculated events to Kafka when action matches', async () => { - // Create an action with Chrome + pageview filter - const actionId = await createAction(hub.postgres, team.id, 'Test action', TEST_FILTERS.chromePageview) + describe('cohort filter matching and Kafka publishing', () => { + it('should publish pre-calculated events to Kafka when cohort filter matches', async () => { + // Create a cohort with complex behavioral filter: pageview with browser event filter + const conditionHash = 'test_hash_001' + const filters = JSON.stringify({ + properties: { + type: 'OR', + values: [ + { + type: 'AND', + values: [ + { + key: '$pageview', + type: 'behavioral', + value: 'performed_event_multiple', + bytecode: TEST_FILTERS.chromePageview, + negation: false, + operator: 'gte', + event_type: 'events', + conditionHash: conditionHash, + event_filters: [ + { key: '$browser', type: 'event', value: 'Chrome', operator: 'exact' }, + ], + operator_value: 5, + explicit_datetime: '-30d', + }, + ], + }, + ], + }, + }) + await createCohort(hub.postgres, team.id, 'Test cohort', filters) // Create a matching event const personId = '550e8400-e29b-41d4-a716-446655440000' @@ -140,7 +230,7 @@ describe('CdpBehaviouralEventsConsumer', () => { // Parse messages which should create pre-calculated events const events = await processor._parseKafkaBatch(messages) - // Should create one pre-calculated event for the matching action + // Should create one pre-calculated event for the matching cohort filter expect(events).toHaveLength(1) const preCalculatedEvent = events[0] @@ -152,8 +242,8 @@ 
describe('CdpBehaviouralEventsConsumer', () => { evaluation_timestamp: '2025-03-03 18:15:46.319', person_id: personId, distinct_id: distinctId, - condition: String(actionId), - source: `action_${actionId}`, + condition: conditionHash, + source: `cohort_filter_${conditionHash}`, }) // Test publishing the events to Kafka await processor['publishEvents'](events) @@ -169,9 +259,16 @@ describe('CdpBehaviouralEventsConsumer', () => { expect(publishedMessage.value).toEqual(preCalculatedEvent.payload) }) - it('should not publish to Kafka when action does not match', async () => { - // Create an action with Chrome + pageview filter - await createAction(hub.postgres, team.id, 'Test action', TEST_FILTERS.chromePageview) + it('should not publish to Kafka when cohort filter does not match', async () => { + // Create a cohort with Chrome + pageview filter + const conditionHash = 'test_hash_002' + const filters = buildInlineFiltersForCohorts({ + bytecode: TEST_FILTERS.chromePageview, + conditionHash, + type: 'behavioral', + key: '$pageview', + }) + await createCohort(hub.postgres, team.id, 'Test cohort', filters) // Create a non-matching event (Firefox instead of Chrome) const personId = '550e8400-e29b-41d4-a716-446655440000' @@ -195,7 +292,7 @@ describe('CdpBehaviouralEventsConsumer', () => { // Parse messages const events = await processor._parseKafkaBatch(messages) - // Should not create any events since action doesn't match + // Should not create any events since cohort filter doesn't match expect(events).toHaveLength(0) // Verify nothing was published to Kafka @@ -205,5 +302,394 @@ describe('CdpBehaviouralEventsConsumer', () => { ) expect(kafkaMessages).toHaveLength(0) }) + + it('should deduplicate filters with same conditionHash for a team', async () => { + // Create two cohorts with the same filter (same conditionHash) + const conditionHash = 'dedup_test_hash_001' + const filters = buildInlineFiltersForCohorts({ bytecode: TEST_FILTERS.pageview, conditionHash }) + + // Create first cohort + await createCohort(hub.postgres, team.id, 'First cohort', filters) + // Create second cohort with same filter + await createCohort(hub.postgres, team.id, 'Second cohort', filters) + + // Create a matching event + const personId = '550e8400-e29b-41d4-a716-446655440000' + const distinctId = 'test-distinct-dedup' + const eventUuid = 'test-uuid-dedup' + + const messages = [ + { + value: Buffer.from( + JSON.stringify({ + team_id: team.id, + event: '$pageview', + person_id: personId, + distinct_id: distinctId, + properties: JSON.stringify({}), + timestamp: '2025-03-03T10:15:46.319000-08:00', + uuid: eventUuid, + } as RawClickHouseEvent) + ), + } as any, + ] + + // Parse messages + const events = await processor._parseKafkaBatch(messages) + + // Should only create one event despite having two cohorts with same conditionHash + expect(events).toHaveLength(1) + + const preCalculatedEvent = events[0] + expect(preCalculatedEvent.payload.condition).toBe(conditionHash) + expect(preCalculatedEvent.payload.source).toBe(`cohort_filter_${conditionHash}`) + }) + + it('should emit separate events for different cohorts with different conditionHashes', async () => { + // Create two cohorts with different complex filters + const conditionHash1 = 'multi_cohort_hash_001' + const conditionHash2 = 'multi_cohort_hash_002' + + // First cohort: simple pageview behavioral filter + const filters1 = JSON.stringify({ + properties: { + type: 'OR', + values: [ + { + type: 'OR', + values: [ + { + key: '$pageview', + type: 'behavioral', + value: 
'performed_event', + bytecode: TEST_FILTERS.pageview, + negation: false, + event_type: 'events', + conditionHash: conditionHash1, + explicit_datetime: '-30d', + }, + ], + }, + ], + }, + }) + + // Second cohort: complex behavioral filter with event_filters (AND structure) + const filters2 = JSON.stringify({ + properties: { + type: 'OR', + values: [ + { + type: 'AND', + values: [ + { + key: '$pageview', + type: 'behavioral', + value: 'performed_event_multiple', + bytecode: TEST_FILTERS.chromePageview, + negation: false, + operator: 'gte', + event_type: 'events', + conditionHash: conditionHash2, + event_filters: [ + { key: '$browser', type: 'event', value: 'Chrome', operator: 'exact' }, + ], + operator_value: 5, + explicit_datetime: '-30d', + }, + ], + }, + ], + }, + }) + + // Create first cohort (pageview only) + await createCohort(hub.postgres, team.id, 'Pageview cohort', filters1) + // Create second cohort (Chrome + pageview with event filters) + await createCohort(hub.postgres, team.id, 'Chrome pageview cohort', filters2) + + // Create an event that matches both filters + const personId = '550e8400-e29b-41d4-a716-446655440000' + const distinctId = 'test-distinct-multi' + const eventUuid = 'test-uuid-multi' + + const messages = [ + { + value: Buffer.from( + JSON.stringify({ + team_id: team.id, + event: '$pageview', + person_id: personId, + distinct_id: distinctId, + properties: JSON.stringify({ $browser: 'Chrome' }), + timestamp: '2025-03-03T10:15:46.319000-08:00', + uuid: eventUuid, + } as RawClickHouseEvent) + ), + } as any, + ] + + // Parse messages + const events = await processor._parseKafkaBatch(messages) + + // Should create two events - one for each matching cohort filter + expect(events).toHaveLength(2) + + // Sort by condition hash for consistent testing + events.sort((a, b) => a.payload.condition.localeCompare(b.payload.condition)) + + const [event1, event2] = events + + // First event should be for pageview filter + expect(event1.payload.condition).toBe(conditionHash1) + expect(event1.payload.source).toBe(`cohort_filter_${conditionHash1}`) + expect(event1.key).toBe(distinctId) + + // Second event should be for Chrome + pageview filter + expect(event2.payload.condition).toBe(conditionHash2) + expect(event2.payload.source).toBe(`cohort_filter_${conditionHash2}`) + expect(event2.key).toBe(distinctId) + }) + + it('should handle complex billing cohort filter with OR/AND structure', async () => { + const filters = JSON.stringify({ + properties: { + type: 'OR', + values: [ + { + type: 'AND', + values: [ + { + key: 'billing product activated', + type: 'behavioral', + value: 'performed_event', + bytecode: TEST_FILTERS.billingProductActivated, + negation: false, + event_type: 'events', + conditionHash: '2946b8444e88565c', + event_filters: [ + { + key: 'product_key', + type: 'event', + value: ['platform_and_support'], + operator: 'exact', + }, + { + key: 'plans__platform_and_support', + type: 'event', + value: ['teams-20240208'], + operator: 'exact', + }, + ], + explicit_datetime: '-30d', + }, + { + key: 'product unsubscribed', + type: 'behavioral', + value: 'performed_event', + bytecode: TEST_FILTERS.productUnsubscribed, + negation: true, + event_type: 'events', + conditionHash: '4c6bb89ec315ba80', + event_filters: [ + { + key: 'product', + type: 'event', + value: ['platform_and_support'], + operator: 'exact', + }, + ], + explicit_datetime: '-30d', + }, + { + key: 'is_organization_first_user', + type: 'person', + value: ['true'], + bytecode: TEST_FILTERS.isOrgFirstUser, + negation: 
false, + operator: 'exact', + conditionHash: '7937ba56a3e6348a', + }, + ], + }, + ], + }, + }) + + await createCohort(hub.postgres, team.id, 'Billing Product Cohort', filters) + + // Test 1: Event that matches billing product activated filter + const personId1 = '950e8400-e29b-41d4-a716-446655440001' + const distinctId1 = 'billing-cohort-test-1' + const eventUuid1 = 'billing-cohort-uuid-1' + const timestamp1 = '2025-03-03T17:00:00.000000-08:00' + + const messages1 = [ + { + value: Buffer.from( + JSON.stringify({ + team_id: team.id, + event: 'billing product activated', + person_id: personId1, + distinct_id: distinctId1, + properties: JSON.stringify({ + product_key: 'platform_and_support', + plans__platform_and_support: 'teams-20240208', + }), + timestamp: timestamp1, + uuid: eventUuid1, + } as RawClickHouseEvent) + ), + } as any, + ] + + const events1 = await processor._parseKafkaBatch(messages1) + + expect(events1).toHaveLength(1) + + const preCalculatedEvent1 = events1[0] + expect(preCalculatedEvent1.key).toBe(distinctId1) + expect(preCalculatedEvent1.payload).toMatchObject({ + uuid: eventUuid1, + team_id: team.id, + person_id: personId1, + distinct_id: distinctId1, + condition: '2946b8444e88565c', + source: 'cohort_filter_2946b8444e88565c', + }) + }) + + it('should not process person property filters as they are filtered out', async () => { + // Create a cohort with person property filter + const filters = JSON.stringify({ + properties: { + type: 'OR', + values: [ + { + type: 'AND', + values: [ + { + key: 'is_organization_first_user', + type: 'person', // This type is filtered out + value: ['true'], + bytecode: TEST_FILTERS.isOrgFirstUser, + negation: false, + operator: 'exact', + conditionHash: 'person_prop_test_001', + }, + ], + }, + ], + }, + }) + + await createCohort(hub.postgres, team.id, 'First Org User Cohort', filters) + + const personId = '850e8400-e29b-41d4-a716-446655440002' + const distinctId = 'person-cohort-test-1' + const eventUuid = 'person-cohort-uuid-1' + const timestamp = '2025-03-03T18:00:00.000000-08:00' + + const messages = [ + { + value: Buffer.from( + JSON.stringify({ + team_id: team.id, + event: 'any event', + person_id: personId, + distinct_id: distinctId, + properties: JSON.stringify({}), + person_properties: JSON.stringify({ + is_organization_first_user: 'true', + }), + timestamp, + uuid: eventUuid, + } as RawClickHouseEvent) + ), + } as any, + ] + + const events = await processor._parseKafkaBatch(messages) + + // Should NOT create any events since person filters are filtered out + expect(events).toHaveLength(0) + }) + + it('should produce events for negated filters', async () => { + // negated events will produce matching events + const filters = JSON.stringify({ + properties: { + type: 'OR', + values: [ + { + type: 'AND', + values: [ + { + key: 'product unsubscribed', + type: 'behavioral', + value: 'performed_event', + bytecode: TEST_FILTERS.productUnsubscribed, + negation: true, // Negation flag is stored but not processed by consumer + event_type: 'events', + conditionHash: 'negated_unsub_test', + event_filters: [ + { + key: 'product', + type: 'event', + value: ['platform_and_support'], + operator: 'exact', + }, + ], + explicit_datetime: '-30d', + }, + ], + }, + ], + }, + }) + + await createCohort(hub.postgres, team.id, 'Not Unsubscribed Cohort', filters) + + const personId = '750e8400-e29b-41d4-a716-446655440003' + const distinctId = 'negation-test-1' + const eventUuid = 'negation-uuid-1' + const timestamp = '2025-03-03T19:00:00.000000-08:00' + + // Send 
a product unsubscribed event
+            const messages = [
+                {
+                    value: Buffer.from(
+                        JSON.stringify({
+                            team_id: team.id,
+                            event: 'product unsubscribed',
+                            person_id: personId,
+                            distinct_id: distinctId,
+                            properties: JSON.stringify({
+                                product: 'platform_and_support',
+                            }),
+                            timestamp,
+                            uuid: eventUuid,
+                        } as RawClickHouseEvent)
+                    ),
+                } as any,
+            ]
+
+            const events = await processor._parseKafkaBatch(messages)
+
+            // Should create an event because the consumer doesn't handle negation:
+            // it just evaluates the bytecode, which returns true for a matching event
+            expect(events).toHaveLength(1)
+
+            const preCalculatedEvent = events[0]
+            expect(preCalculatedEvent.key).toBe(distinctId)
+            expect(preCalculatedEvent.payload).toMatchObject({
+                uuid: eventUuid,
+                team_id: team.id,
+                person_id: personId,
+                distinct_id: distinctId,
+                condition: 'negated_unsub_test',
+                source: 'cohort_filter_negated_unsub_test',
+            })
+        })
     })
 })
diff --git a/plugin-server/src/cdp/consumers/cdp-behavioural-events.consumer.ts b/plugin-server/src/cdp/consumers/cdp-behavioural-events.consumer.ts
index 0fb1de2ed5..5ee07c6f85 100644
--- a/plugin-server/src/cdp/consumers/cdp-behavioural-events.consumer.ts
+++ b/plugin-server/src/cdp/consumers/cdp-behavioural-events.consumer.ts
@@ -2,7 +2,10 @@ import { Message } from 'node-rdkafka'
 import { Histogram } from 'prom-client'
 
 import { instrumentFn, instrumented } from '~/common/tracing/tracing-utils'
-import { Action, ActionManagerCDP } from '~/utils/action-manager-cdp'
+import {
+    RealtimeSupportedFilter,
+    RealtimeSupportedFilterManagerCDP,
+} from '~/utils/realtime-supported-filter-manager-cdp'
 
 import { KAFKA_CDP_CLICKHOUSE_PREFILTERED_EVENTS, KAFKA_EVENTS_JSON } from '../../config/kafka-topics'
 import { KafkaConsumer } from '../../kafka/consumer'
@@ -39,12 +42,12 @@ export const histogramBatchProcessingSteps = new Histogram({
 export class CdpBehaviouralEventsConsumer extends CdpConsumerBase {
     protected name = 'CdpBehaviouralEventsConsumer'
     private kafkaConsumer: KafkaConsumer
-    private actionManager: ActionManagerCDP
+    private realtimeSupportedFilterManager: RealtimeSupportedFilterManagerCDP
 
     constructor(hub: Hub, topic: string = KAFKA_EVENTS_JSON, groupId: string = 'cdp-behavioural-events-consumer') {
         super(hub)
         this.kafkaConsumer = new KafkaConsumer({ groupId, topic })
-        this.actionManager = new ActionManagerCDP(hub.db.postgres)
+        this.realtimeSupportedFilterManager = new RealtimeSupportedFilterManagerCDP(hub.db.postgres)
     }
 
     @instrumented('cdpBehaviouralEventsConsumer.publishEvents')
@@ -69,24 +72,25 @@ export class CdpBehaviouralEventsConsumer extends CdpConsumerBase {
         }
     }
 
-    // Evaluate if event matches action using bytecode execution
-    private async evaluateEventAgainstAction(
+    // Evaluate if event matches realtime supported filter using bytecode execution
+    private async evaluateEventAgainstRealtimeSupportedFilter(
         filterGlobals: HogFunctionFilterGlobals,
-        action: Action
+        filter: RealtimeSupportedFilter
     ): Promise<boolean> {
-        if (!action.bytecode) {
+        if (!filter.bytecode) {
             return false
         }
 
         try {
-            const { execResult } = await execHog(action.bytecode, {
+            const { execResult } = await execHog(filter.bytecode, {
                 globals: filterGlobals,
             })
 
             return execResult?.result ?? 
false } catch (error) { - logger.error('Error executing action bytecode', { - actionId: action.id, + logger.error('Error executing realtime supported filter bytecode', { + conditionHash: filter.conditionHash, + cohortId: filter.cohort_id, error, }) return false @@ -125,17 +129,17 @@ export class CdpBehaviouralEventsConsumer extends CdpConsumerBase { } } - // Step 2: Fetch all actions for all teams in one query + // Step 2: Fetch all realtime supported filters for all teams in one query const teamIds = Array.from(eventsByTeam.keys()) - const actionsByTeam = await this.actionManager.getActionsForTeams(teamIds) + const filtersByTeam = await this.realtimeSupportedFilterManager.getRealtimeSupportedFiltersForTeams(teamIds) - // Step 3: Process each team's events with their actions + // Step 3: Process each team's events with their realtime supported filters for (const [teamId, teamEvents] of Array.from(eventsByTeam.entries())) { try { - const actions = actionsByTeam[String(teamId)] || [] + const filters = filtersByTeam[String(teamId)] || [] - if (actions.length === 0) { - // Skip teams with no actions + if (filters.length === 0) { + // Skip teams with no realtime supported filters continue } @@ -148,17 +152,20 @@ export class CdpBehaviouralEventsConsumer extends CdpConsumerBase { .replace('T', ' ') .replace('Z', '') - // Convert to filter globals for action evaluation + // Convert to filter globals for filter evaluation const filterGlobals = convertClickhouseRawEventToFilterGlobals(clickHouseEvent) - // Evaluate event against each action for this team - for (const action of actions) { - const matches = await this.evaluateEventAgainstAction(filterGlobals, action) + // Evaluate event against each realtime supported filter for this team + for (const filter of filters) { + const matches = await this.evaluateEventAgainstRealtimeSupportedFilter( + filterGlobals, + filter + ) - // Only publish if event matches the action (don't publish non-matches) + // Only publish if event matches the filter (don't publish non-matches) if (matches) { - // Hash the action bytecode/id as the condition identifier - // This ensures consistent condition hashes for the same action + // Use the filter's conditionHash as the condition identifier + // This ensures consistent condition hashes across cohorts const preCalculatedEvent: ProducedEvent = { key: filterGlobals.distinct_id, @@ -168,8 +175,8 @@ export class CdpBehaviouralEventsConsumer extends CdpConsumerBase { evaluation_timestamp: evaluationTimestamp, person_id: clickHouseEvent.person_id!, distinct_id: filterGlobals.distinct_id, - condition: String(action.id), - source: `action_${action.id}`, + condition: filter.conditionHash, + source: `cohort_filter_${filter.conditionHash}`, }, } diff --git a/plugin-server/src/utils/realtime-supported-filter-manager-cdp.test.ts b/plugin-server/src/utils/realtime-supported-filter-manager-cdp.test.ts new file mode 100644 index 0000000000..084f5330ed --- /dev/null +++ b/plugin-server/src/utils/realtime-supported-filter-manager-cdp.test.ts @@ -0,0 +1,651 @@ +import { + buildInlineFiltersForCohorts, + createCohort, + createTeam, + getFirstTeam, + resetTestDatabase, +} from '../../tests/helpers/sql' +import { defaultConfig } from '../config/config' +import { Hub, Team } from '../types' +import { closeHub, createHub } from './db/hub' +import { PostgresRouter } from './db/postgres' +import { RealtimeSupportedFilterManagerCDP } from './realtime-supported-filter-manager-cdp' + +describe('RealtimeSupportedFilterManagerCDP()', () => { + let 
hub: Hub
+    let realtimeSupportedFilterManager: RealtimeSupportedFilterManagerCDP
+    let postgres: PostgresRouter
+    let teamId: Team['id']
+    let fetchRealtimeSupportedFiltersSpy: jest.SpyInstance
+
+    beforeEach(async () => {
+        const now = Date.now()
+        jest.spyOn(Date, 'now').mockImplementation(() => now)
+
+        hub = await createHub()
+        await resetTestDatabase()
+
+        postgres = new PostgresRouter(defaultConfig)
+        realtimeSupportedFilterManager = new RealtimeSupportedFilterManagerCDP(postgres)
+        const team = await getFirstTeam(hub)
+        teamId = team.id
+        fetchRealtimeSupportedFiltersSpy = jest.spyOn(
+            realtimeSupportedFilterManager as any,
+            'fetchRealtimeSupportedFilters'
+        )
+    })
+
+    afterEach(async () => {
+        await closeHub(hub)
+    })
+
+    describe('getRealtimeSupportedFiltersForTeam()', () => {
+        it('returns empty array if no realtime cohorts exist', async () => {
+            const result = await realtimeSupportedFilterManager.getRealtimeSupportedFiltersForTeam(teamId)
+            expect(result).toEqual([])
+        })
+
+        it('returns realtime supported filters for a team', async () => {
+            const bytecode = ['_H', 1, 32, 'Chrome', 32, '$browser', 32, 'properties', 1, 2, 11]
+            const conditionHash = 'test_hash_001'
+            const filters = buildInlineFiltersForCohorts({ bytecode, conditionHash, type: 'event', key: '$browser' })
+
+            // Create a realtime cohort
+            const cohortId = await createCohort(postgres, teamId, 'Test Cohort', filters)
+
+            const result = await realtimeSupportedFilterManager.getRealtimeSupportedFiltersForTeam(teamId)
+            expect(result).toHaveLength(1)
+            expect(result[0]).toMatchObject({
+                conditionHash: conditionHash,
+                bytecode: bytecode,
+                team_id: teamId,
+                cohort_id: cohortId,
+            })
+        })
+
+        it('filters out deleted cohorts', async () => {
+            const bytecode = ['_H', 1, 32, 'Chrome', 32, '$browser', 32, 'properties', 1, 2, 11]
+            const filters = buildInlineFiltersForCohorts({
+                bytecode,
+                conditionHash: 'test_hash_001',
+                type: 'event',
+                key: '$browser',
+            })
+
+            // Create active cohort
+            const activeCohortId = await createCohort(postgres, teamId, 'Active Cohort', filters)
+
+            // Create deleted cohort
+            await createCohort(postgres, teamId, 'Deleted Cohort', filters, { deleted: true })
+
+            const result = await realtimeSupportedFilterManager.getRealtimeSupportedFiltersForTeam(teamId)
+            expect(result).toHaveLength(1)
+            expect(result[0].cohort_id).toBe(activeCohortId)
+        })
+
+        it('filters out cohorts without filters', async () => {
+            // Create cohort without filters (uses default empty filters)
+            await createCohort(postgres, teamId, 'No Filters Cohort', null)
+
+            const result = await realtimeSupportedFilterManager.getRealtimeSupportedFiltersForTeam(teamId)
+            expect(result).toEqual([])
+        })
+
+        it('filters out non-realtime cohorts', async () => {
+            const bytecode = ['_H', 1, 32, 'test']
+            const filters = buildInlineFiltersForCohorts({ bytecode, conditionHash: 'test_hash_001' })
+
+            // Create behavioral cohort (not realtime)
+            await createCohort(postgres, teamId, 'Behavioral Cohort', filters, { cohort_type: 'behavioral' })
+
+            const result = await realtimeSupportedFilterManager.getRealtimeSupportedFiltersForTeam(teamId)
+            expect(result).toEqual([])
+        })
+
+        it('deduplicates filters by conditionHash across complex nested structures', async () => {
+            const combinedBytecode = [
+                '_H',
+                1,
+                32,
+                '$pageview',
+                32,
+                'event',
+                1,
+                1,
+                11,
+                31,
+                32,
+                '$browser',
+                32,
+                'properties',
+                1,
+                2,
+                12,
+                31,
+                32,
+                '$browser_language',
+                32,
+                'properties',
+                1,
+                2,
+                12,
+                3,
+                2,
+                3,
+                2,
+            ]
+            const conditionHash = 
'bcdc95b22cf3e527' + + // Complex filter structure matching the user's example + const filters = JSON.stringify({ + properties: { + type: 'OR', + values: [ + { + type: 'AND', + values: [ + { + key: '$pageview', + type: 'behavioral', + value: 'performed_event_multiple', + bytecode: combinedBytecode, + negation: false, + operator: 'exact', + event_type: 'events', + conditionHash: conditionHash, + event_filters: [ + { key: '$browser', type: 'event', value: 'is_set', operator: 'is_set' }, + { + key: '$browser_language', + type: 'event', + value: 'is_set', + operator: 'is_set', + }, + ], + operator_value: 5, + explicit_datetime: '-30d', + }, + ], + }, + ], + }, + }) + + // Create two cohorts with the same conditionHash in complex structures + await createCohort(postgres, teamId, 'Cohort 1', filters) + await createCohort(postgres, teamId, 'Cohort 2', filters) + + const result = await realtimeSupportedFilterManager.getRealtimeSupportedFiltersForTeam(teamId) + expect(result).toHaveLength(1) // Should be deduplicated + expect(result[0].conditionHash).toBe(conditionHash) + expect(result[0].bytecode).toEqual(combinedBytecode) + }) + + it('handles multiple filters in single cohort with complex nested structure', async () => { + // Complex filter: behavioral filter with event_filters (combined bytecode) + const behavioralWithEventFilterBytecode = [ + '_H', + 1, + 32, + '$pageview', + 32, + 'event', + 1, + 1, + 11, + 31, + 32, + '$browser', + 32, + 'properties', + 1, + 2, + 12, + 3, + 2, + ] + const pageviewOnlyBytecode = ['_H', 1, 32, '$pageleave', 32, 'event', 1, 1, 11] + + const filters = JSON.stringify({ + properties: { + type: 'OR', + values: [ + { + type: 'AND', + values: [ + { + key: '$pageview', + type: 'behavioral', + value: 'performed_event_multiple', + bytecode: behavioralWithEventFilterBytecode, + negation: false, + operator: 'gte', + event_type: 'events', + conditionHash: '512ef57e6f504fc6', + event_filters: [ + { key: '$browser', type: 'event', value: 'is_set', operator: 'is_set' }, + ], + operator_value: 5, + explicit_datetime: '-30d', + }, + { + key: '$pageleave', + type: 'behavioral', + value: 'performed_event', + bytecode: pageviewOnlyBytecode, + negation: false, + event_type: 'events', + conditionHash: 'e0418e34fcd847e5', + explicit_datetime: '-30d', + }, + ], + }, + ], + }, + }) + + await createCohort(postgres, teamId, 'Multi-Filter Cohort', filters) + + const result = await realtimeSupportedFilterManager.getRealtimeSupportedFiltersForTeam(teamId) + expect(result).toHaveLength(2) + expect(result.map((f) => f.conditionHash).sort()).toEqual(['512ef57e6f504fc6', 'e0418e34fcd847e5']) + // Verify the combined bytecode for the behavioral filter with event_filters + const behavioralFilter = result.find((f) => f.conditionHash === '512ef57e6f504fc6') + expect(behavioralFilter?.bytecode).toEqual(behavioralWithEventFilterBytecode) + }) + + it('filters out person property bytecodes explicitly via type in complex structure', async () => { + const personPropBytecode = ['_H', 1, 31, 32, '$host', 32, 'properties', 32, 'person', 1, 3, 12] + const behavioralBytecode = ['_H', 1, 32, '$pageview', 32, 'event', 1, 1, 11] + + // Complex structure with both person and behavioral filters - only behavioral should be extracted + const filters = JSON.stringify({ + properties: { + type: 'OR', + values: [ + { + type: 'AND', + values: [ + { + key: '$pageview', + type: 'behavioral', + value: 'performed_event', + bytecode: behavioralBytecode, + negation: false, + event_type: 'events', + conditionHash: 
'e0418e34fcd847e5', + explicit_datetime: '-30d', + }, + { + key: '$host', + type: 'person', + bytecode: personPropBytecode, + negation: false, + operator: 'is_set', + conditionHash: '30b9607b69c556bf', + }, + ], + }, + ], + }, + }) + + await createCohort(postgres, teamId, 'Mixed Filters Cohort', filters) + + const result = await realtimeSupportedFilterManager.getRealtimeSupportedFiltersForTeam(teamId) + // Should only return the behavioral filter, not the person property filter + expect(result).toHaveLength(1) + expect(result[0].conditionHash).toBe('e0418e34fcd847e5') + expect(result[0].bytecode).toEqual(behavioralBytecode) + }) + + it('filters out cohort filters and only keeps event filters', async () => { + // This test uses the exact structure from the user's example + const filters = JSON.stringify({ + properties: { + type: 'OR', + values: [ + { + type: 'OR', + values: [ + { + key: 'id', + type: 'cohort', + value: 41, + bytecode: ['_H', 1, 33, 41, 2, 'inCohort', 1], + negation: false, + conditionHash: '5e6d68bd7c7babae', + }, + ], + }, + { + type: 'OR', + values: [ + { + key: '$pageview', + type: 'behavioral', + value: 'performed_event', + bytecode: ['_H', 1, 32, '$pageview', 32, 'event', 1, 1, 11], + negation: false, + event_type: 'events', + conditionHash: 'f9c616030a87e68f', + explicit_datetime: '-30d', + }, + ], + }, + ], + }, + }) + + await createCohort(postgres, teamId, 'Cohort with InCohort Filter', filters) + + const result = await realtimeSupportedFilterManager.getRealtimeSupportedFiltersForTeam(teamId) + + // Should filter out the cohort filter and only return the event filter + expect(result).toHaveLength(1) + expect(result[0].conditionHash).toBe('f9c616030a87e68f') + expect(result[0].bytecode).toEqual(['_H', 1, 32, '$pageview', 32, 'event', 1, 1, 11]) + + // Verify the cohort filter was filtered out + const cohortFilter = result.find((f) => f.conditionHash === '5e6d68bd7c7babae') + expect(cohortFilter).toBeUndefined() + }) + + it('handles complex OR structure with multiple filter groups', async () => { + // Test structure with OR at top level containing multiple groups (matching user's second example) + const pageviewWithBrowserBytecode = [ + '_H', + 1, + 32, + '$pageview', + 32, + 'event', + 1, + 1, + 11, + 31, + 32, + '$browser', + 32, + 'properties', + 1, + 2, + 12, + 3, + 2, + ] + const pageleaveBytecode = ['_H', 1, 32, '$pageleave', 32, 'event', 1, 1, 11] + const groupidentifyBytecode = [ + '_H', + 1, + 32, + '$groupidentify', + 32, + 'event', + 1, + 1, + 11, + 31, + 32, + 'id', + 32, + 'properties', + 1, + 2, + 12, + 3, + 2, + ] + + const filters = JSON.stringify({ + properties: { + type: 'OR', + values: [ + { + type: 'AND', + values: [ + { + key: '$pageview', + type: 'behavioral', + value: 'performed_event_multiple', + bytecode: pageviewWithBrowserBytecode, + negation: false, + operator: 'gte', + event_type: 'events', + conditionHash: '512ef57e6f504fc6', + event_filters: [ + { key: '$browser', type: 'event', value: 'is_set', operator: 'is_set' }, + ], + operator_value: 5, + explicit_datetime: '-30d', + }, + { + key: '$pageleave', + type: 'behavioral', + value: 'performed_event', + bytecode: pageleaveBytecode, + negation: false, + event_type: 'events', + conditionHash: 'e0418e34fcd847e5', + explicit_datetime: '-30d', + }, + ], + }, + { + type: 'OR', + values: [ + { + key: '$groupidentify', + type: 'behavioral', + value: 'performed_event', + bytecode: groupidentifyBytecode, + negation: false, + event_type: 'events', + conditionHash: 'f0bbe0140a9cfe05', + event_filters: 
[{ key: 'id', type: 'event', value: 'is_set', operator: 'is_set' }], + explicit_datetime: '-30d', + }, + ], + }, + ], + }, + }) + + await createCohort(postgres, teamId, 'Complex OR Cohort', filters) + + const result = await realtimeSupportedFilterManager.getRealtimeSupportedFiltersForTeam(teamId) + expect(result).toHaveLength(3) + const hashes = result.map((f) => f.conditionHash).sort() + expect(hashes).toEqual(['512ef57e6f504fc6', 'e0418e34fcd847e5', 'f0bbe0140a9cfe05']) + + // Verify all bytecodes are correctly extracted + expect(result.find((f) => f.conditionHash === '512ef57e6f504fc6')?.bytecode).toEqual( + pageviewWithBrowserBytecode + ) + expect(result.find((f) => f.conditionHash === 'e0418e34fcd847e5')?.bytecode).toEqual(pageleaveBytecode) + expect(result.find((f) => f.conditionHash === 'f0bbe0140a9cfe05')?.bytecode).toEqual(groupidentifyBytecode) + }) + + it('handles malformed filters gracefully', async () => { + const validBytecode = ['_H', 1, 32, 'test'] + const validFilters = buildInlineFiltersForCohorts({ bytecode: validBytecode, conditionHash: 'valid_hash' }) + + // Create cohort with valid filters + await createCohort(postgres, teamId, 'Valid Cohort', validFilters) + + // Create cohort with malformed filters (missing bytecode/conditionHash) + const malformedFilters = JSON.stringify({ + properties: { + type: 'OR', + values: [ + { + type: 'OR', + values: [ + { + key: '$test', + type: 'behavioral', + // Missing bytecode and conditionHash + }, + ], + }, + ], + }, + }) + await createCohort(postgres, teamId, 'Malformed Cohort', malformedFilters) + + const result = await realtimeSupportedFilterManager.getRealtimeSupportedFiltersForTeam(teamId) + expect(result).toHaveLength(1) // Only valid filter should be returned + expect(result[0].conditionHash).toBe('valid_hash') + }) + + it('caches filters for subsequent calls', async () => { + const bytecode = ['_H', 1, 32, 'Chrome', 32, '$browser', 32, 'properties', 1, 2, 11] + const filters = buildInlineFiltersForCohorts({ + bytecode, + conditionHash: 'cached_hash', + type: 'event', + key: '$browser', + }) + + await createCohort(postgres, teamId, 'Cached Cohort', filters) + + // First call + const result1 = await realtimeSupportedFilterManager.getRealtimeSupportedFiltersForTeam(teamId) + expect(result1).toHaveLength(1) + expect(fetchRealtimeSupportedFiltersSpy).toHaveBeenCalledTimes(1) + + // Second call should use cache + const result2 = await realtimeSupportedFilterManager.getRealtimeSupportedFiltersForTeam(teamId) + expect(result2).toHaveLength(1) + expect(fetchRealtimeSupportedFiltersSpy).toHaveBeenCalledTimes(1) + }) + + it('returns empty array for non-existent team', async () => { + const result = await realtimeSupportedFilterManager.getRealtimeSupportedFiltersForTeam(99999) + expect(result).toEqual([]) + }) + }) + + describe('getRealtimeSupportedFiltersForTeams()', () => { + it('returns filters for multiple teams', async () => { + const bytecode = ['_H', 1, 32, 'Chrome', 32, '$browser', 32, 'properties', 1, 2, 11] + const filters1 = buildInlineFiltersForCohorts({ + bytecode, + conditionHash: 'team1_hash', + type: 'event', + key: '$browser', + }) + const filters2 = buildInlineFiltersForCohorts({ + bytecode, + conditionHash: 'team2_hash', + type: 'event', + key: '$browser', + }) + + // Create another team + const team = await hub.teamManager.getTeam(teamId) + const team2Id = await createTeam(postgres, team!.organization_id) + + // Create cohorts for both teams + await createCohort(postgres, teamId, 'Team 1 Cohort', filters1) + await 
createCohort(postgres, team2Id, 'Team 2 Cohort', filters2) + + const result = await realtimeSupportedFilterManager.getRealtimeSupportedFiltersForTeams([teamId, team2Id]) + expect(result[String(teamId)]).toHaveLength(1) + expect(result[String(teamId)]![0].conditionHash).toBe('team1_hash') + expect(result[String(team2Id)]).toHaveLength(1) + expect(result[String(team2Id)]![0].conditionHash).toBe('team2_hash') + }) + + it('returns empty arrays for teams with no realtime cohorts', async () => { + const result = await realtimeSupportedFilterManager.getRealtimeSupportedFiltersForTeams([teamId, 99999]) + expect(result[String(teamId)]).toEqual([]) + expect(result['99999']).toEqual([]) + }) + + it('efficiently loads multiple teams with single database call', async () => { + const bytecode = ['_H', 1, 32, 'Chrome', 32, '$browser', 32, 'properties', 1, 2, 11] + const filters = buildInlineFiltersForCohorts({ + bytecode, + conditionHash: 'test_hash', + type: 'event', + key: '$browser', + }) + + await createCohort(postgres, teamId, 'Test Cohort', filters) + + const promises = [ + realtimeSupportedFilterManager.getRealtimeSupportedFiltersForTeam(teamId), + realtimeSupportedFilterManager.getRealtimeSupportedFiltersForTeam(teamId), + realtimeSupportedFilterManager.getRealtimeSupportedFiltersForTeam(99999), + ] + + const results = await Promise.all(promises) + expect(fetchRealtimeSupportedFiltersSpy).toHaveBeenCalledTimes(1) + expect(results[0]).toHaveLength(1) + expect(results[1]).toHaveLength(1) + expect(results[2]).toHaveLength(0) + }) + + it('deduplicates filters across multiple teams', async () => { + const bytecode = ['_H', 1, 32, 'shared', 32, 'filter'] + const sharedHash = 'shared_condition_hash' + const filters = buildInlineFiltersForCohorts({ bytecode, conditionHash: sharedHash }) + + // Create another team + const team = await hub.teamManager.getTeam(teamId) + const team2Id = await createTeam(postgres, team!.organization_id) + + // Create cohorts with same conditionHash for both teams + await createCohort(postgres, teamId, 'Team 1 Cohort', filters) + await createCohort(postgres, team2Id, 'Team 2 Cohort', filters) + + const result = await realtimeSupportedFilterManager.getRealtimeSupportedFiltersForTeams([teamId, team2Id]) + + // Should be deduplicated - only one filter per team even though they have same hash + expect(result[String(teamId)]).toHaveLength(1) + expect(result[String(team2Id)]).toHaveLength(1) + expect(result[String(teamId)]![0].conditionHash).toBe(sharedHash) + expect(result[String(team2Id)]![0].conditionHash).toBe(sharedHash) + }) + }) + + describe('fetchRealtimeSupportedFilters()', () => { + it('handles empty team ID array', async () => { + const result = await (realtimeSupportedFilterManager as any).fetchRealtimeSupportedFilters([]) + expect(result).toEqual({}) + }) + + it('handles invalid team IDs', async () => { + const result = await (realtimeSupportedFilterManager as any).fetchRealtimeSupportedFilters([ + 'invalid', + 'NaN', + ]) + expect(result).toEqual({}) + }) + + it('orders cohorts by team_id and created_at DESC', async () => { + const bytecode = ['_H', 1, 32, 'test'] + + // Create cohorts with different created_at times + const olderTime = new Date(Date.now() - 3600000).toISOString() // 1 hour ago + const newerTime = new Date().toISOString() + + const olderFilters = buildInlineFiltersForCohorts({ bytecode, conditionHash: 'older_hash' }) + const newerFilters = buildInlineFiltersForCohorts({ bytecode, conditionHash: 'newer_hash' }) + + await createCohort(postgres, teamId, 
'Older Cohort', olderFilters, {
+                created_at: olderTime,
+            })
+
+            await createCohort(postgres, teamId, 'Newer Cohort', newerFilters, {
+                created_at: newerTime,
+            })
+
+            const result = await realtimeSupportedFilterManager.getRealtimeSupportedFiltersForTeam(teamId)
+            expect(result).toHaveLength(2)
+            expect(result[0].conditionHash).toBe('newer_hash') // Should be first due to DESC order
+            expect(result[1].conditionHash).toBe('older_hash')
+        })
+    })
+})
diff --git a/plugin-server/src/utils/realtime-supported-filter-manager-cdp.ts b/plugin-server/src/utils/realtime-supported-filter-manager-cdp.ts
new file mode 100644
index 0000000000..136f62cf4d
--- /dev/null
+++ b/plugin-server/src/utils/realtime-supported-filter-manager-cdp.ts
@@ -0,0 +1,154 @@
+import { PostgresRouter, PostgresUse } from './db/postgres'
+import { LazyLoader } from './lazy-loader'
+
+export interface RealtimeSupportedFilter {
+    conditionHash: string // The 16-char SHA256 hash from the filter
+    bytecode: any // HogQL bytecode for execution
+    team_id: number
+    cohort_id: number // For tracking which cohort this filter belongs to
+}
+
+interface CohortRow {
+    cohort_id: number
+    team_id: number
+    filters: any | null // JSON object (PostgreSQL deserializes JSON/JSONB columns automatically)
+}
+
+export class RealtimeSupportedFilterManagerCDP {
+    private lazyLoader: LazyLoader<RealtimeSupportedFilter[]>
+
+    constructor(private postgres: PostgresRouter) {
+        this.lazyLoader = new LazyLoader({
+            name: 'RealtimeSupportedFilterManagerCDP',
+            refreshAgeMs: 5 * 60 * 1000, // 5 minutes
+            refreshJitterMs: 60 * 1000, // 1 minute
+            loader: async (teamIds: string[]) => {
+                return await this.fetchRealtimeSupportedFilters(teamIds)
+            },
+        })
+    }
+
+    public async getRealtimeSupportedFiltersForTeam(teamId: number): Promise<RealtimeSupportedFilter[]> {
+        const filters = await this.lazyLoader.get(String(teamId))
+        return filters || []
+    }
+
+    public async getRealtimeSupportedFiltersForTeams(
+        teamIds: number[]
+    ): Promise<Record<string, RealtimeSupportedFilter[] | null>> {
+        return this.lazyLoader.getMany(teamIds.map(String))
+    }
+
+    private async fetchRealtimeSupportedFilters(teamIds: string[]): Promise<Record<string, RealtimeSupportedFilter[]>> {
+        const teamIdNumbers = teamIds.map(Number).filter((id) => !isNaN(id))
+
+        if (teamIdNumbers.length === 0) {
+            return {}
+        }
+
+        const result = await this.postgres.query<CohortRow>(
+            PostgresUse.COMMON_READ,
+            `SELECT
+                id as cohort_id,
+                team_id,
+                filters
+            FROM posthog_cohort
+            WHERE team_id = ANY($1)
+                AND deleted = FALSE
+                AND filters IS NOT NULL
+                AND cohort_type = 'realtime'
+            ORDER BY team_id, created_at DESC`,
+            [teamIdNumbers],
+            'fetch-realtime-supported-filters-by-team'
+        )
+
+        // Initialize result record with empty arrays for all requested team IDs
+        const resultRecord: Record<string, RealtimeSupportedFilter[]> = {}
+        for (const teamId of teamIds) {
+            resultRecord[teamId] = []
+        }
+
+        // Process filters from each cohort and deduplicate by conditionHash per team
+        const seenConditionHashesByTeam = new Map<string, Set<string>>()
+
+        result.rows.forEach((cohortRow) => {
+            const teamIdStr = String(cohortRow.team_id)
+
+            if (!resultRecord[teamIdStr]) {
+                resultRecord[teamIdStr] = []
+            }
+
+            if (!seenConditionHashesByTeam.has(teamIdStr)) {
+                seenConditionHashesByTeam.set(teamIdStr, new Set())
+            }
+
+            const teamSeenHashes = seenConditionHashesByTeam.get(teamIdStr)!
+            const filtersJson = cohortRow.filters || {}
+
+            const extracted = this.extractRealtimeFiltersFromFiltersJson(filtersJson, cohortRow, teamSeenHashes)
+            if (extracted.length > 0) {
+                resultRecord[teamIdStr].push(...extracted)
+            }
+        })
+
+        return resultRecord
+    }
+
+    // Extracts realtime-executable filters from a cohort's filters JSON
+    private extractRealtimeFiltersFromFiltersJson(
+        filtersJson: any,
+        cohortRow: CohortRow,
+        teamSeenHashes: Set<string>
+    ): RealtimeSupportedFilter[] {
+        const collected: RealtimeSupportedFilter[] = []
+
+        const visitLeaf = (node: any) => {
+            // Only accept leaf nodes that have inline bytecode and conditionHash
+            if (!node || !node.conditionHash || !node.bytecode) {
+                return
+            }
+
+            // Only accept event filters - skip person and cohort filters
+            // Note: 'behavioral' filters are event-related and should pass through
+            if (node.type === 'person' || node.type === 'cohort') {
+                return
+            }
+
+            const conditionHash = node.conditionHash as string
+            if (teamSeenHashes.has(conditionHash)) {
+                return
+            }
+            teamSeenHashes.add(conditionHash)
+
+            collected.push({
+                conditionHash,
+                bytecode: node.bytecode,
+                team_id: cohortRow.team_id,
+                cohort_id: cohortRow.cohort_id,
+            })
+        }
+
+        if (filtersJson && filtersJson.properties) {
+            this.traverseFilterTree(filtersJson.properties, visitLeaf)
+        }
+
+        return collected
+    }
+
+    // Generic DFS over the filter tree; calls visit on every leaf node
+    private traverseFilterTree(node: any, visit: (leaf: any) => void): void {
+        if (!node) {
+            return
+        }
+        const isGroup = node.type === 'OR' || node.type === 'AND'
+        if (isGroup) {
+            if (Array.isArray(node.values)) {
+                for (const child of node.values) {
+                    this.traverseFilterTree(child, visit)
+                }
+            }
+            return
+        }
+        visit(node)
+    }
+}
diff --git a/plugin-server/tests/helpers/sql.ts b/plugin-server/tests/helpers/sql.ts
index ad2c50b040..a33e37011f 100644
--- a/plugin-server/tests/helpers/sql.ts
+++ b/plugin-server/tests/helpers/sql.ts
@@ -656,3 +656,86 @@ export async function resetBehavioralCohortsDatabase(db: PostgresRouter): Promise<void> {
         'reset-behavioral-cohorts-db'
     )
 }
+
+export const createCohort = async (
+    pg: PostgresRouter,
+    teamId: number,
+    name: string,
+    filters: string | null = null,
+    cohortSettings?: Record<string, any>
+): Promise<number> => {
+    // KLUDGE: auto increment IDs can be racy in tests so we ensure IDs don't clash
+    const id = Math.round(Math.random() * 1000000000)
+    await insertRow(pg, 'posthog_cohort', {
+        id,
+        name,
+        description: `Test cohort: ${name}`,
+        team_id: teamId,
+        deleted: false,
+        filters:
+            filters ||
+            JSON.stringify({
+                properties: {
+                    type: 'AND',
+                    values: [],
+                },
+            }),
+        query: null,
+        version: null,
+        pending_version: null,
+        count: null,
+        is_calculating: false,
+        last_calculation: null,
+        errors_calculating: 0,
+        last_error_at: null,
+        is_static: false,
+        cohort_type: 'realtime',
+        created_at: new Date().toISOString(),
+        created_by_id: commonUserId,
+        groups: JSON.stringify([]),
+        ...cohortSettings,
+    })
+    return id
+}
+
+// Build an inline filters JSON string for cohorts with embedded bytecode and conditionHash
+export const buildInlineFiltersForCohorts = ({
+    bytecode,
+    conditionHash,
+    type = 'behavioral',
+    key = '$test_event',
+    extra,
+}: {
+    bytecode: any[]
+    conditionHash: string
+    type?: string
+    key?: string
+    extra?: Record<string, any>
+}): string => {
+    const filter: any = {
+        key,
+        type,
+        bytecode,
+        conditionHash,
+        ...(extra || {}),
+    }
+
+    if (type === 'behavioral') {
+        filter.value = 'performed_event'
+        filter.event_type = 'events'
+    } else if (type === 'person') 
{
+        filter.operator = 'is_set'
+    }
+
+    return JSON.stringify({
+        properties: {
+            type: 'OR',
+            values: [
+                {
+                    type: 'OR',
+                    values: [filter],
+                },
+            ],
+        },
+    })
+}
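
Reviewer note: the filter extraction is the heart of this change, so below is a standalone, dependency-free TypeScript sketch of what RealtimeSupportedFilterManagerCDP does with a cohort's filters JSON (the traverseFilterTree + extractRealtimeFiltersFromFiltersJson pair, minus the Postgres and LazyLoader plumbing). The function name extractLeaves, the FilterLeaf type, and the sample filter JSON are illustrative only and are not part of this PR.

// Hypothetical sketch mirroring the PR's extraction rules: keep leaves that
// carry inline bytecode + conditionHash, skip person/cohort leaves, and
// deduplicate by conditionHash.
type FilterLeaf = { conditionHash: string; bytecode: unknown[] }

function extractLeaves(filtersJson: any): FilterLeaf[] {
    const seen = new Set<string>()
    const out: FilterLeaf[] = []

    const walk = (node: any): void => {
        if (!node) {
            return
        }
        // OR/AND nodes are groups; recurse into their children
        if ((node.type === 'OR' || node.type === 'AND') && Array.isArray(node.values)) {
            node.values.forEach(walk)
            return
        }
        // Leaf: must carry inline bytecode and a conditionHash
        if (!node.conditionHash || !node.bytecode) {
            return
        }
        // person/cohort leaves are not evaluable per-event; skip them
        if (node.type === 'person' || node.type === 'cohort') {
            return
        }
        // Deduplicate by conditionHash
        if (seen.has(node.conditionHash)) {
            return
        }
        seen.add(node.conditionHash)
        out.push({ conditionHash: node.conditionHash, bytecode: node.bytecode })
    }

    if (filtersJson?.properties) {
        walk(filtersJson.properties)
    }
    return out
}

// Example with an illustrative filter JSON: the cohort leaf is dropped,
// the behavioral leaf survives.
const leaves = extractLeaves({
    properties: {
        type: 'OR',
        values: [
            {
                type: 'AND',
                values: [
                    { key: '$pageview', type: 'behavioral', conditionHash: 'abc123', bytecode: ['_H', 1] },
                    { key: 'id', type: 'cohort', conditionHash: 'def456', bytecode: ['_H', 1] },
                ],
            },
        ],
    },
})
// leaves => [{ conditionHash: 'abc123', bytecode: ['_H', 1] }]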