feat(realtime-cohorts): publish cohort membership changed entry to kafka (#40784)

Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>

Author: Meikel Ratz (committed by GitHub)
Date: 2025-11-06 13:05:45 +01:00
Parent: 1d1a794e14
Commit: ce1b794687

5 changed files with 157 additions and 17 deletions
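At a high level, the change splits the consumer's batch handler into three steps: parse and validate the batch, persist the changes to Postgres, and then publish one trigger message per change (keyed by personId) to the new cohort_membership_changed_trigger topic as a background task. Below is a minimal, self-contained sketch of that flow; the names mirror the diff that follows, but the Postgres and Kafka calls are illustrative stubs, not the real plugin-server clients.

```ts
// Minimal sketch of the flow this commit introduces; the real implementation
// is CdpCohortMembershipConsumer in the diff below. DB/Kafka calls are stubs.
type CohortMembershipChange = {
    personId: string
    cohortId: number
    teamId: number
    cohort_membership_changed: 'entered' | 'left'
}

const KAFKA_COHORT_MEMBERSHIP_CHANGED_TRIGGER = 'cohort_membership_changed_trigger'

async function persistToPostgres(changes: CohortMembershipChange[]): Promise<void> {
    // stub: the real consumer writes to the cohort_membership table
}

async function queueKafkaMessages(topic: string, messages: { key: string; value: string }[]): Promise<void> {
    // stub: the real consumer uses the shared kafkaProducer
}

async function handleEachBatch(changes: CohortMembershipChange[]): Promise<{ backgroundTask: Promise<void> }> {
    // Persist first: if the DB write throws, no trigger is ever published
    await persistToPostgres(changes)

    // Then publish trigger events, keyed by personId, without blocking the batch
    const backgroundTask = queueKafkaMessages(
        KAFKA_COHORT_MEMBERSHIP_CHANGED_TRIGGER,
        changes.map((change) => ({ key: change.personId, value: JSON.stringify(change) }))
    ).catch((error) => console.error('Failed to publish cohort membership triggers', error))

    return { backgroundTask }
}
```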


@@ -1,10 +1,12 @@
+import { mockProducerObserver } from '~/tests/helpers/mocks/producer.mock'
 import { Message } from 'node-rdkafka'
 import { resetKafka } from '~/tests/helpers/kafka'
 import { UUIDT } from '~/utils/utils'
 import { resetBehavioralCohortsDatabase } from '../../../tests/helpers/sql'
-import { KAFKA_COHORT_MEMBERSHIP_CHANGED } from '../../config/kafka-topics'
+import { KAFKA_COHORT_MEMBERSHIP_CHANGED, KAFKA_COHORT_MEMBERSHIP_CHANGED_TRIGGER } from '../../config/kafka-topics'
 import { Hub } from '../../types'
 import { closeHub, createHub } from '../../utils/db/hub'
 import { PostgresUse } from '../../utils/db/postgres'
@@ -21,6 +23,7 @@ describe('CdpCohortMembershipConsumer', () => {
     await resetKafka()
     hub = await createHub()
     consumer = new CdpCohortMembershipConsumer(hub)
+    await consumer.start()
     await resetBehavioralCohortsDatabase(hub.postgres)
 })
@@ -33,6 +36,12 @@ describe('CdpCohortMembershipConsumer', () => {
     const personId1 = new UUIDT().toString()
     const personId2 = new UUIDT().toString()
     const personId3 = new UUIDT().toString()

+    beforeEach(() => {
+        // Reset the mock producer before each test to avoid message accumulation
+        mockProducerObserver.resetKafkaProducer()
+    })

     it('should process entered and left events and write to PostgreSQL correctly', async () => {
         // Test data using helper functions
         const testEvents = createCohortMembershipEvents([
@@ -61,8 +70,9 @@ describe('CdpCohortMembershipConsumer', () => {
             createKafkaMessage(event, { topic: KAFKA_COHORT_MEMBERSHIP_CHANGED, offset: index })
         )

         // Process the batch of messages
-        await (consumer as any).handleBatch(messages)
+        const cohortMembershipChanges = consumer['_parseAndValidateBatch'](messages)
+        await consumer['persistCohortMembershipChanges'](cohortMembershipChanges)
+        await consumer['publishCohortMembershipTriggers'](cohortMembershipChanges)

         // Verify data was written to PostgreSQL
         const result = await hub.postgres.query(
@@ -97,6 +107,22 @@ describe('CdpCohortMembershipConsumer', () => {
             person_id: personId3,
             in_cohort: false,
         })

+        // Verify trigger events were published to Kafka
+        const kafkaMessages = mockProducerObserver.getProducedKafkaMessagesForTopic(
+            KAFKA_COHORT_MEMBERSHIP_CHANGED_TRIGGER
+        )
+        expect(kafkaMessages).toHaveLength(3)

+        // Verify each published message
+        expect(kafkaMessages[0].key).toBe(personId1)
+        expect(kafkaMessages[0].value).toEqual(testEvents[0])
+        expect(kafkaMessages[1].key).toBe(personId2)
+        expect(kafkaMessages[1].value).toEqual(testEvents[1])
+        expect(kafkaMessages[2].key).toBe(personId3)
+        expect(kafkaMessages[2].value).toEqual(testEvents[2])
     })

     it('should handle complete person lifecycle: enter -> leave -> re-enter cohort', async () => {
@@ -108,9 +134,12 @@ describe('CdpCohortMembershipConsumer', () => {
             cohort_membership_changed: 'entered',
         })

-        await (consumer as any).handleBatch([
+        const enterMessages = [
             createKafkaMessage(enterEvent, { topic: KAFKA_COHORT_MEMBERSHIP_CHANGED, offset: 0 }),
-        ])
+        ]
+        const enterChanges = consumer['_parseAndValidateBatch'](enterMessages)
+        await consumer['persistCohortMembershipChanges'](enterChanges)
+        await consumer['publishCohortMembershipTriggers'](enterChanges)

         let result = await hub.postgres.query(
             PostgresUse.BEHAVIORAL_COHORTS_RW,
@@ -122,10 +151,18 @@ describe('CdpCohortMembershipConsumer', () => {
         expect(result.rows[0].in_cohort).toBe(true)
         const firstTimestamp = result.rows[0].last_updated

+        // Verify first trigger event
+        let kafkaMessages = mockProducerObserver.getProducedKafkaMessagesForTopic(
+            KAFKA_COHORT_MEMBERSHIP_CHANGED_TRIGGER
+        )
+        expect(kafkaMessages).toHaveLength(1)
+        expect(kafkaMessages[0].value).toEqual(enterEvent)

         // Wait to ensure timestamp difference
         await new Promise((resolve) => setTimeout(resolve, 10))

         // Step 2: Person leaves the cohort
+        mockProducerObserver.resetKafkaProducer()
         const leaveEvent = createCohortMembershipEvent({
             personId: personId1,
             cohortId: 456,
@@ -133,9 +170,12 @@ describe('CdpCohortMembershipConsumer', () => {
             cohort_membership_changed: 'left',
         })

-        await (consumer as any).handleBatch([
+        const leaveMessages = [
             createKafkaMessage(leaveEvent, { topic: KAFKA_COHORT_MEMBERSHIP_CHANGED, offset: 1 }),
-        ])
+        ]
+        const leaveChanges = consumer['_parseAndValidateBatch'](leaveMessages)
+        await consumer['persistCohortMembershipChanges'](leaveChanges)
+        await consumer['publishCohortMembershipTriggers'](leaveChanges)

         result = await hub.postgres.query(
             PostgresUse.BEHAVIORAL_COHORTS_RW,
@@ -149,10 +189,18 @@ describe('CdpCohortMembershipConsumer', () => {
         const secondTimestamp = result.rows[0].last_updated
         expect(new Date(secondTimestamp).getTime()).toBeGreaterThan(new Date(firstTimestamp).getTime())

+        // Verify leave trigger event
+        kafkaMessages = mockProducerObserver.getProducedKafkaMessagesForTopic(
+            KAFKA_COHORT_MEMBERSHIP_CHANGED_TRIGGER
+        )
+        expect(kafkaMessages).toHaveLength(1)
+        expect(kafkaMessages[0].value).toEqual(leaveEvent)

         // Wait to ensure timestamp difference
         await new Promise((resolve) => setTimeout(resolve, 10))

         // Step 3: Person re-enters the cohort
+        mockProducerObserver.resetKafkaProducer()
         const reEnterEvent = createCohortMembershipEvent({
             personId: personId1,
             cohortId: 456,
@@ -160,9 +208,12 @@ describe('CdpCohortMembershipConsumer', () => {
             cohort_membership_changed: 'entered',
         })

-        await (consumer as any).handleBatch([
+        const reEnterMessages = [
             createKafkaMessage(reEnterEvent, { topic: KAFKA_COHORT_MEMBERSHIP_CHANGED, offset: 2 }),
-        ])
+        ]
+        const reEnterChanges = consumer['_parseAndValidateBatch'](reEnterMessages)
+        await consumer['persistCohortMembershipChanges'](reEnterChanges)
+        await consumer['publishCohortMembershipTriggers'](reEnterChanges)

         result = await hub.postgres.query(
             PostgresUse.BEHAVIORAL_COHORTS_RW,
@@ -175,6 +226,13 @@ describe('CdpCohortMembershipConsumer', () => {
         expect(result.rows[0].in_cohort).toBe(true) // Back in the cohort
         const thirdTimestamp = result.rows[0].last_updated
         expect(new Date(thirdTimestamp).getTime()).toBeGreaterThan(new Date(secondTimestamp).getTime())

+        // Verify re-enter trigger event
+        kafkaMessages = mockProducerObserver.getProducedKafkaMessagesForTopic(
+            KAFKA_COHORT_MEMBERSHIP_CHANGED_TRIGGER
+        )
+        expect(kafkaMessages).toHaveLength(1)
+        expect(kafkaMessages[0].value).toEqual(reEnterEvent)
     })

     it('should reject entire batch when invalid messages are present', async () => {
@@ -213,7 +271,7 @@ describe('CdpCohortMembershipConsumer', () => {
         ]

         // Should throw due to invalid messages in batch
-        await expect((consumer as any).handleBatch(messages)).rejects.toThrow()
+        expect(() => consumer['_parseAndValidateBatch'](messages)).toThrow()

         // Verify NO data was inserted
         const result = await hub.postgres.query(
@@ -225,5 +283,55 @@ describe('CdpCohortMembershipConsumer', () => {
         expect(result.rows).toHaveLength(0) // No data should be inserted
     })

+    it('should not publish to Kafka when database insertion fails', async () => {
+        const testEvents = createCohortMembershipEvents([
+            {
+                personId: personId1,
+                cohortId: 456,
+                teamId: 1,
+                cohort_membership_changed: 'entered',
+            },
+            {
+                personId: personId2,
+                cohortId: 456,
+                teamId: 1,
+                cohort_membership_changed: 'entered',
+            },
+        ])
+        const messages = testEvents.map((event, index) =>
+            createKafkaMessage(event, { topic: KAFKA_COHORT_MEMBERSHIP_CHANGED, offset: index })
+        )
+        const cohortMembershipChanges = consumer['_parseAndValidateBatch'](messages)
+
+        // Mock the database query to fail
+        const originalQuery = hub.postgres.query.bind(hub.postgres)
+        hub.postgres.query = jest.fn().mockRejectedValue(new Error('Database connection failed'))
+
+        // Attempt to persist changes (should fail)
+        await expect(consumer['persistCohortMembershipChanges'](cohortMembershipChanges)).rejects.toThrow(
+            'Database connection failed'
+        )
+
+        // Verify NO messages were published to Kafka since DB insertion failed
+        const kafkaMessages = mockProducerObserver.getProducedKafkaMessagesForTopic(
+            KAFKA_COHORT_MEMBERSHIP_CHANGED_TRIGGER
+        )
+        expect(kafkaMessages).toHaveLength(0)
+
+        // Restore original query function
+        hub.postgres.query = originalQuery
+
+        // Verify data was NOT written to PostgreSQL
+        const result = await hub.postgres.query(
+            PostgresUse.BEHAVIORAL_COHORTS_RW,
+            'SELECT * FROM cohort_membership WHERE team_id = $1',
+            [1],
+            'testQuery'
+        )
+        expect(result.rows).toHaveLength(0)
+    })
     })
 })
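The Kafka assertions above go through the shared mockProducerObserver test helper. Judging from its use here (message values compared with toEqual against the original event objects), it records produced messages per topic and parses JSON values back into objects. A hypothetical minimal stand-in, covering only the two methods these tests call plus an assumed record() hook, might look like this; the real helper in the PostHog test suite may differ:

```ts
// Hypothetical stand-in for the mockProducerObserver helper used above.
type ProducedMessage = { topic: string; key: string; value: unknown }

class MockProducerObserver {
    private produced: ProducedMessage[] = []

    // Assumed hook: a mocked Kafka producer calls this instead of a real broker
    record(topic: string, key: string, rawValue: string): void {
        this.produced.push({ topic, key, value: JSON.parse(rawValue) })
    }

    getProducedKafkaMessagesForTopic(topic: string): ProducedMessage[] {
        return this.produced.filter((message) => message.topic === topic)
    }

    resetKafkaProducer(): void {
        this.produced = []
    }
}

export const mockProducerObserver = new MockProducerObserver()
```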


@@ -1,7 +1,9 @@
 import { Message } from 'node-rdkafka'
 import { z } from 'zod'
-import { KAFKA_COHORT_MEMBERSHIP_CHANGED } from '../../config/kafka-topics'
+import { instrumentFn } from '~/common/tracing/tracing-utils'
+import { KAFKA_COHORT_MEMBERSHIP_CHANGED, KAFKA_COHORT_MEMBERSHIP_CHANGED_TRIGGER } from '../../config/kafka-topics'
 import { KafkaConsumer } from '../../kafka/consumer'
 import { HealthCheckResult, Hub } from '../../types'
 import { PostgresUse } from '../../utils/db/postgres'
@@ -31,7 +33,23 @@ export class CdpCohortMembershipConsumer extends CdpConsumerBase {
         })
     }

-    private async handleBatchCohortMembership(changes: CohortMembershipChange[]): Promise<void> {
+    private async publishCohortMembershipTriggers(changes: CohortMembershipChange[]): Promise<void> {
+        if (!this.kafkaProducer || changes.length === 0) {
+            return
+        }
+
+        const messages = changes.map((change) => ({
+            value: JSON.stringify(change),
+            key: change.personId,
+        }))
+
+        await this.kafkaProducer.queueMessages({
+            topic: KAFKA_COHORT_MEMBERSHIP_CHANGED_TRIGGER,
+            messages,
+        })
+    }
+
+    private async persistCohortMembershipChanges(changes: CohortMembershipChange[]): Promise<void> {
         if (changes.length === 0) {
             return
         }
@@ -76,7 +94,7 @@ export class CdpCohortMembershipConsumer extends CdpConsumerBase {
         }
     }

-    private async handleBatch(messages: Message[]): Promise<void> {
+    private _parseAndValidateBatch(messages: Message[]): CohortMembershipChange[] {
         const cohortMembershipChanges: CohortMembershipChange[] = []

         // Process and validate all messages
@@ -111,7 +129,7 @@ export class CdpCohortMembershipConsumer extends CdpConsumerBase {
             }
         }

-        await this.handleBatchCohortMembership(cohortMembershipChanges)
+        return cohortMembershipChanges
     }

     public async start(): Promise<void> {
@@ -124,10 +142,20 @@ export class CdpCohortMembershipConsumer extends CdpConsumerBase {
                 size: messages.length,
             })

-            await this.handleBatch(messages)
+            return instrumentFn('cdpCohortMembershipConsumer.handleEachBatch', async () => {
+                const cohortMembershipChanges = this._parseAndValidateBatch(messages)
+
+                // First persist changes to the database
+                await this.persistCohortMembershipChanges(cohortMembershipChanges)
+
+                // Then publish trigger events as a background task
+                const backgroundTask = this.publishCohortMembershipTriggers(cohortMembershipChanges).catch((error) => {
+                    logger.error('Failed to publish cohort membership triggers', { error })
+                })
+
+                return { backgroundTask }
+            })
         })

         logger.info('✅', `${this.name} started successfully`)
     }

     public async stop(): Promise<void> {
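Returning { backgroundTask } from the batch callback lets the consumer finish the batch once the Postgres write has succeeded, while the trigger publish completes asynchronously (presumably so offset commits are not held up by the producer). The .catch is what keeps a failed publish from surfacing as an unhandled rejection. A toy illustration of the pattern, independent of the consumer code above:

```ts
// Toy illustration (not PostHog code) of the background-task pattern used above.
async function flakyPublish(): Promise<void> {
    throw new Error('broker unavailable')
}

async function processBatch(): Promise<{ backgroundTask: Promise<void> }> {
    // ... the database write would happen here, awaited ...

    // Without the .catch, this rejected promise could later surface as an
    // unhandled rejection; with it, the failure is logged and the batch
    // still completes.
    const backgroundTask = flakyPublish().catch((error) => {
        console.error('Failed to publish cohort membership triggers', error)
    })
    return { backgroundTask }
}

async function main(): Promise<void> {
    const { backgroundTask } = await processBatch()
    // The caller may still await the background task later, e.g. before
    // taking the next batch or shutting down
    await backgroundTask
}

void main()
```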


@@ -48,6 +48,7 @@ export const KAFKA_CDP_INTERNAL_EVENTS = `${prefix}cdp_internal_events${suffix}`
 export const KAFKA_CDP_CLICKHOUSE_BEHAVIORAL_COHORTS_MATCHES = `${prefix}clickhouse_behavioral_cohorts_matches${suffix}`
 export const KAFKA_CDP_CLICKHOUSE_PREFILTERED_EVENTS = `${prefix}clickhouse_prefiltered_events${suffix}`
 export const KAFKA_COHORT_MEMBERSHIP_CHANGED = `${prefix}cohort_membership_changed${suffix}`
+export const KAFKA_COHORT_MEMBERSHIP_CHANGED_TRIGGER = `${prefix}cohort_membership_changed_trigger${suffix}`

 // Error tracking topics
 export const KAFKA_ERROR_TRACKING_ISSUE_FINGERPRINT = `${prefix}clickhouse_error_tracking_issue_fingerprint${suffix}`


@@ -10,6 +10,7 @@ import {
     KAFKA_CLICKHOUSE_HEATMAP_EVENTS,
     KAFKA_CLICKHOUSE_SESSION_RECORDING_EVENTS,
     KAFKA_COHORT_MEMBERSHIP_CHANGED,
+    KAFKA_COHORT_MEMBERSHIP_CHANGED_TRIGGER,
     KAFKA_ERROR_TRACKING_ISSUE_FINGERPRINT_OVERRIDES,
     KAFKA_EVENTS_DEAD_LETTER_QUEUE,
     KAFKA_EVENTS_JSON,
@@ -61,6 +62,7 @@ export async function resetKafka(extraServerConfig?: Partial<PluginsServerConfig
         KAFKA_CDP_CLICKHOUSE_BEHAVIORAL_COHORTS_MATCHES,
         KAFKA_CDP_CLICKHOUSE_PREFILTERED_EVENTS,
         KAFKA_COHORT_MEMBERSHIP_CHANGED,
+        KAFKA_COHORT_MEMBERSHIP_CHANGED_TRIGGER,
     ])
 }


@@ -51,3 +51,4 @@ KAFKA_DOCUMENT_EMBEDDINGS_TOPIC = f"{KAFKA_PREFIX}clickhouse_document_embeddings
 KAFKA_CDP_INTERNAL_EVENTS = f"{KAFKA_PREFIX}cdp_internal_events{SUFFIX}"
 KAFKA_COHORT_MEMBERSHIP_CHANGED = f"{KAFKA_PREFIX}cohort_membership_changed{SUFFIX}"
+KAFKA_COHORT_MEMBERSHIP_CHANGED_TRIGGER = f"{KAFKA_PREFIX}cohort_membership_changed_trigger{SUFFIX}"