diff --git a/plugin-server/src/cdp/consumers/cdp-cohort-membership.consumer.test.ts b/plugin-server/src/cdp/consumers/cdp-cohort-membership.consumer.test.ts
index c3a862e490..a39a9345e0 100644
--- a/plugin-server/src/cdp/consumers/cdp-cohort-membership.consumer.test.ts
+++ b/plugin-server/src/cdp/consumers/cdp-cohort-membership.consumer.test.ts
@@ -1,10 +1,12 @@
+import { mockProducerObserver } from '~/tests/helpers/mocks/producer.mock'
+
 import { Message } from 'node-rdkafka'
 
 import { resetKafka } from '~/tests/helpers/kafka'
 import { UUIDT } from '~/utils/utils'
 
 import { resetBehavioralCohortsDatabase } from '../../../tests/helpers/sql'
-import { KAFKA_COHORT_MEMBERSHIP_CHANGED } from '../../config/kafka-topics'
+import { KAFKA_COHORT_MEMBERSHIP_CHANGED, KAFKA_COHORT_MEMBERSHIP_CHANGED_TRIGGER } from '../../config/kafka-topics'
 import { Hub } from '../../types'
 import { closeHub, createHub } from '../../utils/db/hub'
 import { PostgresUse } from '../../utils/db/postgres'
@@ -21,6 +23,7 @@ describe('CdpCohortMembershipConsumer', () => {
         await resetKafka()
         hub = await createHub()
         consumer = new CdpCohortMembershipConsumer(hub)
+        await consumer.start()
         await resetBehavioralCohortsDatabase(hub.postgres)
     })
 
@@ -33,6 +36,12 @@ describe('CdpCohortMembershipConsumer', () => {
     const personId1 = new UUIDT().toString()
     const personId2 = new UUIDT().toString()
     const personId3 = new UUIDT().toString()
+
+    beforeEach(() => {
+        // Reset the mock producer before each test to avoid message accumulation
+        mockProducerObserver.resetKafkaProducer()
+    })
+
     it('should process entered and left events and write to PostgreSQL correctly', async () => {
         // Test data using helper functions
         const testEvents = createCohortMembershipEvents([
@@ -61,8 +70,9 @@ describe('CdpCohortMembershipConsumer', () => {
             createKafkaMessage(event, { topic: KAFKA_COHORT_MEMBERSHIP_CHANGED, offset: index })
         )
 
-        // Process the batch of messages
-        await (consumer as any).handleBatch(messages)
+        const cohortMembershipChanges = consumer['_parseAndValidateBatch'](messages)
+        await consumer['persistCohortMembershipChanges'](cohortMembershipChanges)
+        await consumer['publishCohortMembershipTriggers'](cohortMembershipChanges)
 
         // Verify data was written to PostgreSQL
         const result = await hub.postgres.query(
@@ -97,6 +107,22 @@ describe('CdpCohortMembershipConsumer', () => {
             person_id: personId3,
             in_cohort: false,
         })
+
+        // Verify trigger events were published to Kafka
+        const kafkaMessages = mockProducerObserver.getProducedKafkaMessagesForTopic(
+            KAFKA_COHORT_MEMBERSHIP_CHANGED_TRIGGER
+        )
+        expect(kafkaMessages).toHaveLength(3)
+
+        // Verify each published message
+        expect(kafkaMessages[0].key).toBe(personId1)
+        expect(kafkaMessages[0].value).toEqual(testEvents[0])
+
+        expect(kafkaMessages[1].key).toBe(personId2)
+        expect(kafkaMessages[1].value).toEqual(testEvents[1])
+
+        expect(kafkaMessages[2].key).toBe(personId3)
+        expect(kafkaMessages[2].value).toEqual(testEvents[2])
     })
 
     it('should handle complete person lifecycle: enter -> leave -> re-enter cohort', async () => {
@@ -108,9 +134,12 @@ describe('CdpCohortMembershipConsumer', () => {
             cohort_membership_changed: 'entered',
         })
 
-        await (consumer as any).handleBatch([
+        const enterMessages = [
             createKafkaMessage(enterEvent, { topic: KAFKA_COHORT_MEMBERSHIP_CHANGED, offset: 0 }),
-        ])
+        ]
+        const enterChanges = consumer['_parseAndValidateBatch'](enterMessages)
+        await consumer['persistCohortMembershipChanges'](enterChanges)
+        await consumer['publishCohortMembershipTriggers'](enterChanges)
 
         let result = await hub.postgres.query(
             PostgresUse.BEHAVIORAL_COHORTS_RW,
@@ -122,10 +151,18 @@ describe('CdpCohortMembershipConsumer', () => {
         expect(result.rows[0].in_cohort).toBe(true)
         const firstTimestamp = result.rows[0].last_updated
 
+        // Verify first trigger event
+        let kafkaMessages = mockProducerObserver.getProducedKafkaMessagesForTopic(
+            KAFKA_COHORT_MEMBERSHIP_CHANGED_TRIGGER
+        )
+        expect(kafkaMessages).toHaveLength(1)
+        expect(kafkaMessages[0].value).toEqual(enterEvent)
+
         // Wait to ensure timestamp difference
         await new Promise((resolve) => setTimeout(resolve, 10))
 
         // Step 2: Person leaves the cohort
+        mockProducerObserver.resetKafkaProducer()
         const leaveEvent = createCohortMembershipEvent({
             personId: personId1,
             cohortId: 456,
@@ -133,9 +170,12 @@ describe('CdpCohortMembershipConsumer', () => {
             teamId: 1,
             cohort_membership_changed: 'left',
         })
-        await (consumer as any).handleBatch([
+        const leaveMessages = [
             createKafkaMessage(leaveEvent, { topic: KAFKA_COHORT_MEMBERSHIP_CHANGED, offset: 1 }),
-        ])
+        ]
+        const leaveChanges = consumer['_parseAndValidateBatch'](leaveMessages)
+        await consumer['persistCohortMembershipChanges'](leaveChanges)
+        await consumer['publishCohortMembershipTriggers'](leaveChanges)
 
         result = await hub.postgres.query(
             PostgresUse.BEHAVIORAL_COHORTS_RW,
@@ -149,10 +189,18 @@ describe('CdpCohortMembershipConsumer', () => {
         const secondTimestamp = result.rows[0].last_updated
         expect(new Date(secondTimestamp).getTime()).toBeGreaterThan(new Date(firstTimestamp).getTime())
 
+        // Verify leave trigger event
+        kafkaMessages = mockProducerObserver.getProducedKafkaMessagesForTopic(
+            KAFKA_COHORT_MEMBERSHIP_CHANGED_TRIGGER
+        )
+        expect(kafkaMessages).toHaveLength(1)
+        expect(kafkaMessages[0].value).toEqual(leaveEvent)
+
         // Wait to ensure timestamp difference
         await new Promise((resolve) => setTimeout(resolve, 10))
 
         // Step 3: Person re-enters the cohort
+        mockProducerObserver.resetKafkaProducer()
         const reEnterEvent = createCohortMembershipEvent({
             personId: personId1,
             cohortId: 456,
@@ -160,9 +208,12 @@ describe('CdpCohortMembershipConsumer', () => {
             teamId: 1,
             cohort_membership_changed: 'entered',
         })
-        await (consumer as any).handleBatch([
+        const reEnterMessages = [
             createKafkaMessage(reEnterEvent, { topic: KAFKA_COHORT_MEMBERSHIP_CHANGED, offset: 2 }),
-        ])
+        ]
+        const reEnterChanges = consumer['_parseAndValidateBatch'](reEnterMessages)
+        await consumer['persistCohortMembershipChanges'](reEnterChanges)
+        await consumer['publishCohortMembershipTriggers'](reEnterChanges)
 
         result = await hub.postgres.query(
             PostgresUse.BEHAVIORAL_COHORTS_RW,
@@ -175,6 +226,13 @@ describe('CdpCohortMembershipConsumer', () => {
         expect(result.rows[0].in_cohort).toBe(true) // Back in the cohort
         const thirdTimestamp = result.rows[0].last_updated
         expect(new Date(thirdTimestamp).getTime()).toBeGreaterThan(new Date(secondTimestamp).getTime())
+
+        // Verify re-enter trigger event
+        kafkaMessages = mockProducerObserver.getProducedKafkaMessagesForTopic(
+            KAFKA_COHORT_MEMBERSHIP_CHANGED_TRIGGER
+        )
+        expect(kafkaMessages).toHaveLength(1)
+        expect(kafkaMessages[0].value).toEqual(reEnterEvent)
     })
 
     it('should reject entire batch when invalid messages are present', async () => {
@@ -213,7 +271,7 @@ describe('CdpCohortMembershipConsumer', () => {
         ]
 
         // Should throw due to invalid messages in batch
-        await expect((consumer as any).handleBatch(messages)).rejects.toThrow()
+        expect(() => consumer['_parseAndValidateBatch'](messages)).toThrow()
 
         // Verify NO data was inserted
         const result = await hub.postgres.query(
@@ -225,5 +283,55 @@ describe('CdpCohortMembershipConsumer', () => {
         expect(result.rows).toHaveLength(0) // No data should be inserted
     })
+
+    it('should not publish to Kafka when database insertion fails', async () => {
+        const testEvents = createCohortMembershipEvents([
+            {
+                personId: personId1,
+                cohortId: 456,
+                teamId: 1,
+                cohort_membership_changed: 'entered',
+            },
+            {
+                personId: personId2,
+                cohortId: 456,
+                teamId: 1,
+                cohort_membership_changed: 'entered',
+            },
+        ])
+
+        const messages = testEvents.map((event, index) =>
+            createKafkaMessage(event, { topic: KAFKA_COHORT_MEMBERSHIP_CHANGED, offset: index })
+        )
+
+        const cohortMembershipChanges = consumer['_parseAndValidateBatch'](messages)
+
+        // Mock the database query to fail
+        const originalQuery = hub.postgres.query.bind(hub.postgres)
+        hub.postgres.query = jest.fn().mockRejectedValue(new Error('Database connection failed'))
+
+        // Attempt to persist changes (should fail)
+        await expect(consumer['persistCohortMembershipChanges'](cohortMembershipChanges)).rejects.toThrow(
+            'Database connection failed'
+        )
+
+        // Verify NO messages were published to Kafka since DB insertion failed
+        const kafkaMessages = mockProducerObserver.getProducedKafkaMessagesForTopic(
+            KAFKA_COHORT_MEMBERSHIP_CHANGED_TRIGGER
+        )
+        expect(kafkaMessages).toHaveLength(0)
+
+        // Restore original query function
+        hub.postgres.query = originalQuery
+
+        // Verify data was NOT written to PostgreSQL
+        const result = await hub.postgres.query(
+            PostgresUse.BEHAVIORAL_COHORTS_RW,
+            'SELECT * FROM cohort_membership WHERE team_id = $1',
+            [1],
+            'testQuery'
+        )
+        expect(result.rows).toHaveLength(0)
+    })
 })
 })
diff --git a/plugin-server/src/cdp/consumers/cdp-cohort-membership.consumer.ts b/plugin-server/src/cdp/consumers/cdp-cohort-membership.consumer.ts
index 6c921ccac0..369f33d604 100644
--- a/plugin-server/src/cdp/consumers/cdp-cohort-membership.consumer.ts
+++ b/plugin-server/src/cdp/consumers/cdp-cohort-membership.consumer.ts
@@ -1,7 +1,9 @@
 import { Message } from 'node-rdkafka'
 import { z } from 'zod'
 
-import { KAFKA_COHORT_MEMBERSHIP_CHANGED } from '../../config/kafka-topics'
+import { instrumentFn } from '~/common/tracing/tracing-utils'
+
+import { KAFKA_COHORT_MEMBERSHIP_CHANGED, KAFKA_COHORT_MEMBERSHIP_CHANGED_TRIGGER } from '../../config/kafka-topics'
 import { KafkaConsumer } from '../../kafka/consumer'
 import { HealthCheckResult, Hub } from '../../types'
 import { PostgresUse } from '../../utils/db/postgres'
@@ -31,7 +33,23 @@ export class CdpCohortMembershipConsumer extends CdpConsumerBase {
         })
     }
 
-    private async handleBatchCohortMembership(changes: CohortMembershipChange[]): Promise<void> {
+    private async publishCohortMembershipTriggers(changes: CohortMembershipChange[]): Promise<void> {
+        if (!this.kafkaProducer || changes.length === 0) {
+            return
+        }
+
+        const messages = changes.map((change) => ({
+            value: JSON.stringify(change),
+            key: change.personId,
+        }))
+
+        await this.kafkaProducer.queueMessages({
+            topic: KAFKA_COHORT_MEMBERSHIP_CHANGED_TRIGGER,
+            messages,
+        })
+    }
+
+    private async persistCohortMembershipChanges(changes: CohortMembershipChange[]): Promise<void> {
         if (changes.length === 0) {
             return
         }
@@ -76,7 +94,7 @@ export class CdpCohortMembershipConsumer extends CdpConsumerBase {
         }
     }
 
-    private async handleBatch(messages: Message[]): Promise<void> {
+    private _parseAndValidateBatch(messages: Message[]): CohortMembershipChange[] {
         const cohortMembershipChanges: CohortMembershipChange[] = []
 
         // Process and validate all messages
@@ -111,7 +129,7 @@ export class CdpCohortMembershipConsumer extends CdpConsumerBase {
             }
         }
 
-        await this.handleBatchCohortMembership(cohortMembershipChanges)
+        return cohortMembershipChanges
     }
 
     public async start(): Promise<void> {
@@ -124,10 +142,20 @@ export class CdpCohortMembershipConsumer extends CdpConsumerBase {
                 size: messages.length,
             })
 
-            await this.handleBatch(messages)
-        })
+            return instrumentFn('cdpCohortMembershipConsumer.handleEachBatch', async () => {
+                const cohortMembershipChanges = this._parseAndValidateBatch(messages)
 
-        logger.info('✅', `${this.name} started successfully`)
+                // First persist changes to the database
+                await this.persistCohortMembershipChanges(cohortMembershipChanges)
+
+                // Then publish trigger events as a background task
+                const backgroundTask = this.publishCohortMembershipTriggers(cohortMembershipChanges).catch((error) => {
+                    logger.error('Failed to publish cohort membership triggers', { error })
+                })
+
+                return { backgroundTask }
+            })
+        })
     }
 
     public async stop(): Promise<void> {
diff --git a/plugin-server/src/config/kafka-topics.ts b/plugin-server/src/config/kafka-topics.ts
index b32dff5c09..fcd063b5f5 100644
--- a/plugin-server/src/config/kafka-topics.ts
+++ b/plugin-server/src/config/kafka-topics.ts
@@ -48,6 +48,7 @@ export const KAFKA_CDP_INTERNAL_EVENTS = `${prefix}cdp_internal_events${suffix}`
 export const KAFKA_CDP_CLICKHOUSE_BEHAVIORAL_COHORTS_MATCHES = `${prefix}clickhouse_behavioral_cohorts_matches${suffix}`
 export const KAFKA_CDP_CLICKHOUSE_PREFILTERED_EVENTS = `${prefix}clickhouse_prefiltered_events${suffix}`
 export const KAFKA_COHORT_MEMBERSHIP_CHANGED = `${prefix}cohort_membership_changed${suffix}`
+export const KAFKA_COHORT_MEMBERSHIP_CHANGED_TRIGGER = `${prefix}cohort_membership_changed_trigger${suffix}`
 
 // Error tracking topics
 export const KAFKA_ERROR_TRACKING_ISSUE_FINGERPRINT = `${prefix}clickhouse_error_tracking_issue_fingerprint${suffix}`
diff --git a/plugin-server/tests/helpers/kafka.ts b/plugin-server/tests/helpers/kafka.ts
index 23868d55c2..6d2cbc9fb7 100644
--- a/plugin-server/tests/helpers/kafka.ts
+++ b/plugin-server/tests/helpers/kafka.ts
@@ -10,6 +10,7 @@ import {
     KAFKA_CLICKHOUSE_HEATMAP_EVENTS,
     KAFKA_CLICKHOUSE_SESSION_RECORDING_EVENTS,
     KAFKA_COHORT_MEMBERSHIP_CHANGED,
+    KAFKA_COHORT_MEMBERSHIP_CHANGED_TRIGGER,
     KAFKA_ERROR_TRACKING_ISSUE_FINGERPRINT_OVERRIDES,
     KAFKA_EVENTS_DEAD_LETTER_QUEUE,
     KAFKA_EVENTS_JSON,
@@ -61,6 +62,7 @@ export async function resetKafka(extraServerConfig?: Partial