feat(myke): reverts add cohort membership consumer (#39536)

.github/workflows/ci-nodejs.yml
@@ -206,15 +206,12 @@ jobs:
          TEST: 'true'
          SECRET_KEY: 'abcdef' # unsafe - for testing only
          DATABASE_URL: 'postgres://posthog:posthog@localhost:5432/posthog'
          BEHAVIORAL_COHORTS_DATABASE_URL: 'postgres://posthog:posthog@localhost:5432/behavioral_cohorts'

        run: pnpm --filter=@posthog/plugin-server setup:test

      - name: Test with Jest
        env:
          # Below DB name has `test_` prepended, as that's how Django (ran above) creates the test DB
          DATABASE_URL: 'postgres://posthog:posthog@localhost:5432/test_posthog'
          BEHAVIORAL_COHORTS_DATABASE_URL: 'postgres://posthog:posthog@localhost:5432/test_behavioral_cohorts'
          PERSONS_DATABASE_URL: 'postgres://posthog:posthog@localhost:5432/test_persons'
          PERSONS_READONLY_DATABASE_URL: 'postgres://posthog:posthog@localhost:5432/test_persons'
          PERSONS_READONLY_MIGRATION_DATABASE_URL: 'postgres://posthog:posthog@localhost:5432/test_persons_migration'
@@ -20,13 +20,6 @@ if [ -d "$SCRIPT_DIR/../rust/bin" ]; then
    fi
fi

# Run behavioral cohorts migrations
bash $SCRIPT_DIR/migrate-behavioral-cohorts.sh
if [ $? -ne 0 ]; then
    echo "Error in migrate-behavioral-cohorts.sh, exiting."
    exit 1
fi

# Create a temporary file for background process status
# Ensure cleanup of temp file on script exit
trap 'rm -f "$CLICKHOUSE_STATUS" 2>/dev/null' EXIT
@@ -1,37 +0,0 @@
#!/bin/bash

# Simple bash script for behavioral cohorts migration
set -e

# Set database name based on environment
if [[ "${NODE_ENV}" == "test" || "${TEST}" == "1" ]]; then
    DB_NAME="test_behavioral_cohorts"
else
    DB_NAME="behavioral_cohorts"
fi

# Use environment variables with defaults
DB_HOST="${POSTGRES_BEHAVIORAL_COHORTS_HOST:-localhost}"
DB_USER="${POSTGRES_BEHAVIORAL_COHORTS_USER:-posthog}"
DB_PASS="${POSTGRES_BEHAVIORAL_COHORTS_PASSWORD:-posthog}"
DB_PORT="5432"

# Get database URL from environment or construct from defaults
BEHAVIORAL_COHORTS_DB_URL="${BEHAVIORAL_COHORTS_DATABASE_URL:-postgres://${DB_USER}:${DB_PASS}@${DB_HOST}:${DB_PORT}/${DB_NAME}}"

echo "Performing behavioral cohorts migrations"
echo "Database name: ${DB_NAME}"

# Create database if it doesn't exist
if [[ -n "${DB_PASS}" ]]; then
    export PGPASSWORD="${DB_PASS}"
fi
psql -h "${DB_HOST}" -p "${DB_PORT}" -U "${DB_USER}" -d postgres -c "CREATE DATABASE ${DB_NAME}" 2>/dev/null || true

# Change to plugin-server directory to ensure correct paths
cd "$(dirname "$0")/../plugin-server"

# Run migrations
DATABASE_URL="${BEHAVIORAL_COHORTS_DB_URL}" npx node-pg-migrate up

echo "Behavioral cohorts migrations completed successfully"
@@ -54,9 +54,6 @@ procs:
  migrate-persons-db:
    shell: 'bin/check_postgres_up posthog_persons && cd rust && DATABASE_URL=postgres://posthog:posthog@localhost:5432/posthog_persons sqlx migrate run --source persons_migrations'

  migrate-behavioral-cohorts:
    shell: 'bin/check_postgres_up behavioral_cohorts && bin/migrate-behavioral-cohorts.sh'

  storybook:
    shell: 'pnpm --filter=@posthog/storybook install && pnpm run storybook'
    autostart: false

@@ -112,9 +112,6 @@ procs:
  migrate-persons-db:
    shell: 'bin/check_postgres_up posthog_persons && cd rust && DATABASE_URL=postgres://posthog:posthog@localhost:5432/posthog_persons sqlx migrate run --source persons_migrations'

  migrate-behavioral-cohorts:
    shell: 'bin/check_postgres_up behavioral_cohorts && bin/migrate-behavioral-cohorts.sh'

  generate-demo-data:
    shell: |-
      bin/check_postgres_up && \
@@ -290,7 +290,6 @@ services:
    environment:
      DATABASE_URL: 'postgres://posthog:posthog@db:5432/posthog'
      PERSONS_DATABASE_URL: 'postgres://posthog:posthog@db:5432/posthog'
      BEHAVIORAL_COHORTS_DATABASE_URL: 'postgres://posthog:posthog@db:5432/posthog'
      KAFKA_HOSTS: 'kafka:9092'
      REDIS_URL: 'redis://redis:6379/'
      CLICKHOUSE_HOST: 'clickhouse'

@@ -139,7 +139,6 @@ services:
      ENCRYPTION_SALT_KEYS: $ENCRYPTION_SALT_KEYS
      CYCLOTRON_DATABASE_URL: 'postgres://posthog:posthog@db:5432/posthog'
      PERSONS_DATABASE_URL: 'postgres://posthog:posthog@db:5432/posthog'
      BEHAVIORAL_COHORTS_DATABASE_URL: 'postgres://posthog:posthog@db:5432/posthog'
      OTEL_EXPORTER_OTLP_ENDPOINT: ''
    depends_on:
      - db
@@ -1,6 +1,6 @@
{
    "migrationsTable": "pgmigrations",
    "dir": "migrations",
    "dir": "src/migrations",
    "checkOrder": true,
    "migrationFilenameFormat": "utc",
    "createMigrationsSchema": false,
@@ -1,42 +0,0 @@
exports.up = (pgm) => {
    // Table for tracking cohort membership - replicating ClickHouse structure
    pgm.createTable('cohort_membership', {
        id: {
            type: 'bigserial',
            primaryKey: true,
        },
        team_id: {
            type: 'bigint',
            notNull: true,
        },
        cohort_id: {
            type: 'bigint',
            notNull: true,
        },
        person_id: {
            type: 'uuid',
            notNull: true,
        },
        in_cohort: {
            type: 'boolean',
            notNull: true,
        },
        last_updated: {
            type: 'timestamp',
            notNull: true,
            default: pgm.func('CURRENT_TIMESTAMP'),
        },
    })

    // Add index on person_id, cohort_id, and team_id for query performance
    pgm.createIndex('cohort_membership', ['person_id', 'cohort_id', 'team_id'])

    // Add unique constraint to prevent duplicate entries
    pgm.addConstraint('cohort_membership', 'cohort_membership_unique', {
        unique: ['team_id', 'cohort_id', 'person_id'],
    })
}

exports.down = (pgm) => {
    pgm.dropTable('cohort_membership')
}
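The `cohort_membership_unique` constraint above is what the (now reverted) consumer's `ON CONFLICT` upsert keys on. A minimal sketch of that dependency, written against the plain `pg` client rather than anything from this repository:

```typescript
import { Client } from 'pg'

// Sketch only: upserting one membership row. ON CONFLICT (team_id, cohort_id, person_id)
// requires exactly the unique constraint defined in the migration above.
async function upsertMembership(
    client: Client,
    teamId: number,
    cohortId: number,
    personId: string,
    inCohort: boolean
): Promise<void> {
    await client.query(
        `INSERT INTO cohort_membership (team_id, cohort_id, person_id, in_cohort, last_updated)
         VALUES ($1, $2, $3, $4, CURRENT_TIMESTAMP)
         ON CONFLICT (team_id, cohort_id, person_id)
         DO UPDATE SET in_cohort = EXCLUDED.in_cohort, last_updated = CURRENT_TIMESTAMP`,
        [teamId, cohortId, personId, inCohort]
    )
}
```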
@@ -26,15 +26,10 @@
        "prettier:check": "prettier --check .",
        "prepublishOnly": "pnpm build",
        "setup:dev:clickhouse": "cd .. && DEBUG=1 python manage.py migrate_clickhouse",
        "setup:test": "cd .. && TEST=1 python manage.py setup_test_environment && cd plugin-server && pnpm run setup:test:cyclotron && pnpm run setup:test:persons && pnpm run setup:test:behavioral-cohorts && pnpm run setup:test:persons-migration",
        "setup:test": "cd .. && TEST=1 python manage.py setup_test_environment && cd plugin-server && pnpm run setup:test:cyclotron && pnpm run setup:test:persons && pnpm run setup:test:persons-migration",
        "setup:test:cyclotron": "CYCLOTRON_DATABASE_NAME=test_cyclotron ../rust/bin/migrate-cyclotron",
        "setup:test:persons": "PERSONS_DATABASE_NAME=test_persons ../rust/bin/migrate-persons",
        "setup:test:persons-migration": "PERSONS_DATABASE_NAME=test_persons_migration ../rust/bin/migrate-persons",
        "setup:test:behavioral-cohorts": "TEST=1 BEHAVIORAL_COHORTS_DATABASE_URL='postgres://posthog:posthog@localhost:5432/test_behavioral_cohorts' ../bin/migrate-behavioral-cohorts.sh",
        "migrate:behavioral-cohorts": "../bin/migrate-behavioral-cohorts.sh",
        "migrate:behavioral-cohorts:test": "BEHAVIORAL_COHORTS_DATABASE_URL='postgres://posthog:posthog@localhost:5432/test_behavioral_cohorts' node-pg-migrate up",
        "migrate:behavioral-cohorts:down": "node-pg-migrate down",
        "migrate:behavioral-cohorts:create": "node-pg-migrate create --javascript-file",
        "services:start": "cd .. && docker compose -f docker-compose.dev.yml up",
        "services:stop": "cd .. && docker compose -f docker-compose.dev.yml down",
        "services:clean": "cd .. && docker compose -f docker-compose.dev.yml rm -v",
@@ -21,7 +21,6 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
            cdpCyclotronWorkerHogFlow: true,
            cdpCyclotronWorkerDelay: true,
            cdpBehaviouralEvents: true,
            cdpCohortMembership: true,
            cdpApi: true,
        }

@@ -36,7 +35,6 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
            cdpCyclotronWorkerHogFlow: true,
            cdpCyclotronWorkerDelay: true,
            cdpBehaviouralEvents: true,
            cdpCohortMembership: true,
            cdpApi: true,
        }

@@ -87,10 +85,6 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
            return {
                cdpBehaviouralEvents: true,
            }
        case PluginServerMode.cdp_cohort_membership:
            return {
                cdpCohortMembership: true,
            }
        case PluginServerMode.cdp_legacy_on_event:
            return {
                cdpLegacyOnEvent: true,
@@ -7,7 +7,6 @@ import { insertRow } from '~/tests/helpers/sql'
import { ClickHousePerson, ClickHouseTimestamp, ProjectId, RawClickHouseEvent, Team } from '../../types'
import { PostgresRouter } from '../../utils/db/postgres'
import { UUIDT } from '../../utils/utils'
import { CohortMembershipChange } from '../consumers/cdp-cohort-membership.consumer'
import { CdpInternalEvent } from '../schema'
import { compileHog } from '../templates/compiler'
import {
@@ -287,55 +286,3 @@ export const createExampleInvocation = (
        queuePriority: 0,
    }
}

// Cohort Membership Test Helpers
export const createCohortMembershipEvent = (
    overrides: Partial<CohortMembershipChange> = {}
): CohortMembershipChange => {
    return {
        personId: new UUIDT().toString(),
        cohortId: 1,
        teamId: 1,
        cohort_membership_changed: 'entered',
        ...overrides,
    }
}

export const createCohortMembershipEvents = (events: Partial<CohortMembershipChange>[]): CohortMembershipChange[] => {
    return events.map((event) => createCohortMembershipEvent(event))
}

export interface CohortMembershipRecord {
    team_id: number
    cohort_id: number
    person_id: string
    in_cohort: boolean
    last_updated?: Date
}

export const insertCohortMembership = async (
    db: PostgresRouter,
    membership: Partial<CohortMembershipRecord>
): Promise<CohortMembershipRecord> => {
    const record: CohortMembershipRecord = {
        team_id: 1,
        cohort_id: 1,
        person_id: new UUIDT().toString(),
        in_cohort: true,
        ...membership,
    }

    // insertRow now automatically determines the correct database based on table name
    return await insertRow(db, 'cohort_membership', record)
}

export const insertCohortMemberships = async (
    db: PostgresRouter,
    memberships: Partial<CohortMembershipRecord>[]
): Promise<CohortMembershipRecord[]> => {
    const results = []
    for (const membership of memberships) {
        results.push(await insertCohortMembership(db, membership))
    }
    return results
}
@@ -1,229 +0,0 @@
import { Message } from 'node-rdkafka'

import { resetKafka } from '~/tests/helpers/kafka'
import { UUIDT } from '~/utils/utils'

import { resetBehavioralCohortsDatabase } from '../../../tests/helpers/sql'
import { KAFKA_COHORT_MEMBERSHIP_CHANGED } from '../../config/kafka-topics'
import { Hub } from '../../types'
import { closeHub, createHub } from '../../utils/db/hub'
import { PostgresUse } from '../../utils/db/postgres'
import { createCohortMembershipEvent, createCohortMembershipEvents, createKafkaMessage } from '../_tests/fixtures'
import { CdpCohortMembershipConsumer } from './cdp-cohort-membership.consumer'

jest.setTimeout(20_000)

describe('CdpCohortMembershipConsumer', () => {
    let hub: Hub
    let consumer: CdpCohortMembershipConsumer

    beforeEach(async () => {
        await resetKafka()
        hub = await createHub()
        consumer = new CdpCohortMembershipConsumer(hub)
        await resetBehavioralCohortsDatabase(hub.postgres)
    })

    afterEach(async () => {
        await consumer.stop()
        await closeHub(hub)
    })

    describe('end-to-end cohort membership processing', () => {
        const personId1 = new UUIDT().toString()
        const personId2 = new UUIDT().toString()
        const personId3 = new UUIDT().toString()
        it('should process entered and left events and write to PostgreSQL correctly', async () => {
            // Test data using helper functions
            const testEvents = createCohortMembershipEvents([
                {
                    personId: personId1,
                    cohortId: 456,
                    teamId: 1,
                    cohort_membership_changed: 'entered',
                },
                {
                    personId: personId2,
                    cohortId: 456,
                    teamId: 1,
                    cohort_membership_changed: 'entered',
                },
                {
                    personId: personId3,
                    cohortId: 457,
                    teamId: 1,
                    cohort_membership_changed: 'left',
                },
            ])

            // Create mock Kafka messages
            const messages = testEvents.map((event, index) =>
                createKafkaMessage(event, { topic: KAFKA_COHORT_MEMBERSHIP_CHANGED, offset: index })
            )

            // Process the batch of messages
            await (consumer as any).handleBatch(messages)

            // Verify data was written to PostgreSQL
            const result = await hub.postgres.query(
                PostgresUse.BEHAVIORAL_COHORTS_RW,
                'SELECT * FROM cohort_membership WHERE team_id = $1 ORDER BY person_id, cohort_id',
                [1],
                'testQuery'
            )

            expect(result.rows).toHaveLength(3)

            // Verify first entered event
            expect(result.rows[0]).toMatchObject({
                team_id: '1',
                cohort_id: '456',
                person_id: personId1,
                in_cohort: true,
            })

            // Verify second entered event
            expect(result.rows[1]).toMatchObject({
                team_id: '1',
                cohort_id: '456',
                person_id: personId2,
                in_cohort: true,
            })

            // Verify left event
            expect(result.rows[2]).toMatchObject({
                team_id: '1',
                cohort_id: '457',
                person_id: personId3,
                in_cohort: false,
            })
        })

        it('should handle complete person lifecycle: enter -> leave -> re-enter cohort', async () => {
            // Step 1: Person enters the cohort for the first time
            const enterEvent = createCohortMembershipEvent({
                personId: personId1,
                cohortId: 456,
                teamId: 1,
                cohort_membership_changed: 'entered',
            })

            await (consumer as any).handleBatch([
                createKafkaMessage(enterEvent, { topic: KAFKA_COHORT_MEMBERSHIP_CHANGED, offset: 0 }),
            ])

            let result = await hub.postgres.query(
                PostgresUse.BEHAVIORAL_COHORTS_RW,
                'SELECT * FROM cohort_membership WHERE team_id = $1 AND person_id = $2 AND cohort_id = $3',
                [1, personId1, 456],
                'testQuery'
            )

            expect(result.rows[0].in_cohort).toBe(true)
            const firstTimestamp = result.rows[0].last_updated

            // Wait to ensure timestamp difference
            await new Promise((resolve) => setTimeout(resolve, 10))

            // Step 2: Person leaves the cohort
            const leaveEvent = createCohortMembershipEvent({
                personId: personId1,
                cohortId: 456,
                teamId: 1,
                cohort_membership_changed: 'left',
            })

            await (consumer as any).handleBatch([
                createKafkaMessage(leaveEvent, { topic: KAFKA_COHORT_MEMBERSHIP_CHANGED, offset: 1 }),
            ])

            result = await hub.postgres.query(
                PostgresUse.BEHAVIORAL_COHORTS_RW,
                'SELECT * FROM cohort_membership WHERE team_id = $1 AND person_id = $2 AND cohort_id = $3',
                [1, personId1, 456],
                'testQuery'
            )

            expect(result.rows).toHaveLength(1) // Same record, just updated
            expect(result.rows[0].in_cohort).toBe(false)
            const secondTimestamp = result.rows[0].last_updated
            expect(new Date(secondTimestamp).getTime()).toBeGreaterThan(new Date(firstTimestamp).getTime())

            // Wait to ensure timestamp difference
            await new Promise((resolve) => setTimeout(resolve, 10))

            // Step 3: Person re-enters the cohort
            const reEnterEvent = createCohortMembershipEvent({
                personId: personId1,
                cohortId: 456,
                teamId: 1,
                cohort_membership_changed: 'entered',
            })

            await (consumer as any).handleBatch([
                createKafkaMessage(reEnterEvent, { topic: KAFKA_COHORT_MEMBERSHIP_CHANGED, offset: 2 }),
            ])

            result = await hub.postgres.query(
                PostgresUse.BEHAVIORAL_COHORTS_RW,
                'SELECT * FROM cohort_membership WHERE team_id = $1 AND person_id = $2 AND cohort_id = $3',
                [1, personId1, 456],
                'testQuery'
            )

            expect(result.rows).toHaveLength(1) // Still same record, just updated again
            expect(result.rows[0].in_cohort).toBe(true) // Back in the cohort
            const thirdTimestamp = result.rows[0].last_updated
            expect(new Date(thirdTimestamp).getTime()).toBeGreaterThan(new Date(secondTimestamp).getTime())
        })

        it('should reject entire batch when invalid messages are present', async () => {
            const validEvent = {
                personId: personId1,
                cohortId: 456,
                teamId: 1,
                cohort_membership_changed: 'entered',
            }

            const messages: Message[] = [
                // Valid message
                createKafkaMessage(validEvent, { topic: KAFKA_COHORT_MEMBERSHIP_CHANGED, offset: 0 }),
                // Invalid JSON (manually create this one since it's intentionally malformed)
                {
                    value: Buffer.from('invalid json'),
                    topic: KAFKA_COHORT_MEMBERSHIP_CHANGED,
                    partition: 0,
                    offset: 1,
                    timestamp: Date.now(),
                    key: null,
                    size: 0,
                },
                // Missing required fields
                createKafkaMessage({ personId: 124 }, { topic: KAFKA_COHORT_MEMBERSHIP_CHANGED, offset: 2 }),
                // Empty message (manually create this one since it has null value)
                {
                    value: null,
                    topic: KAFKA_COHORT_MEMBERSHIP_CHANGED,
                    partition: 0,
                    offset: 3,
                    timestamp: Date.now(),
                    key: null,
                    size: 0,
                },
            ]

            // Should throw due to invalid messages in batch
            await expect((consumer as any).handleBatch(messages)).rejects.toThrow()

            // Verify NO data was inserted
            const result = await hub.postgres.query(
                PostgresUse.BEHAVIORAL_COHORTS_RW,
                'SELECT * FROM cohort_membership WHERE team_id = $1',
                [1],
                'testQuery'
            )

            expect(result.rows).toHaveLength(0) // No data should be inserted
        })
    })
})
@@ -1,145 +0,0 @@
import { Message } from 'node-rdkafka'
import { z } from 'zod'

import { KAFKA_COHORT_MEMBERSHIP_CHANGED } from '../../config/kafka-topics'
import { KafkaConsumer } from '../../kafka/consumer'
import { HealthCheckResult, Hub } from '../../types'
import { PostgresUse } from '../../utils/db/postgres'
import { parseJSON } from '../../utils/json-parse'
import { logger } from '../../utils/logger'
import { CdpConsumerBase } from './cdp-base.consumer'

// Zod schema for validation
const CohortMembershipChangeSchema = z.object({
    personId: z.string().uuid(),
    cohortId: z.number(),
    teamId: z.number(),
    cohort_membership_changed: z.enum(['entered', 'left']),
})

export type CohortMembershipChange = z.infer<typeof CohortMembershipChangeSchema>

export class CdpCohortMembershipConsumer extends CdpConsumerBase {
    protected name = 'CdpCohortMembershipConsumer'
    private kafkaConsumer: KafkaConsumer

    constructor(hub: Hub) {
        super(hub)
        this.kafkaConsumer = new KafkaConsumer({
            groupId: 'cdp-cohort-membership-consumer',
            topic: KAFKA_COHORT_MEMBERSHIP_CHANGED,
        })
    }

    private async handleBatchCohortMembership(changes: CohortMembershipChange[]): Promise<void> {
        if (changes.length === 0) {
            return
        }

        try {
            // Build the VALUES clause for batch upsert
            const values: any[] = []
            const placeholders: string[] = []
            let paramIndex = 1

            for (const change of changes) {
                const inCohort = change.cohort_membership_changed === 'entered'
                placeholders.push(
                    `($${paramIndex}, $${paramIndex + 1}, $${paramIndex + 2}, $${paramIndex + 3}, CURRENT_TIMESTAMP)`
                )
                values.push(change.teamId, change.cohortId, change.personId, inCohort)
                paramIndex += 4
            }

            // Single batch UPSERT query - handles both entered and left events
            const query = `
                INSERT INTO cohort_membership (team_id, cohort_id, person_id, in_cohort, last_updated)
                VALUES ${placeholders.join(', ')}
                ON CONFLICT (team_id, cohort_id, person_id)
                DO UPDATE SET
                    in_cohort = EXCLUDED.in_cohort,
                    last_updated = CURRENT_TIMESTAMP
            `

            await this.hub.postgres.query(
                PostgresUse.BEHAVIORAL_COHORTS_RW,
                query,
                values,
                'batchUpsertCohortMembership'
            )
        } catch (error) {
            logger.error('Failed to process batch cohort membership changes', {
                error,
                count: changes.length,
            })
            throw error
        }
    }

    private async handleBatch(messages: Message[]): Promise<void> {
        const cohortMembershipChanges: CohortMembershipChange[] = []

        // Process and validate all messages
        for (const message of messages) {
            try {
                const messageValue = message.value?.toString()
                if (!messageValue) {
                    throw new Error('Empty message received')
                }

                const parsedMessage = parseJSON(messageValue)

                // Validate using Zod schema
                const validationResult = CohortMembershipChangeSchema.safeParse(parsedMessage)

                if (!validationResult.success) {
                    logger.error('Invalid cohort membership change message', {
                        errors: validationResult.error.errors,
                        message: messageValue,
                    })
                    throw new Error(`Invalid cohort membership change message: ${validationResult.error.message}`)
                }

                const cohortMembershipChange = validationResult.data
                cohortMembershipChanges.push(cohortMembershipChange)
            } catch (error) {
                logger.error('Error processing cohort membership change message', {
                    error,
                    message: message.value?.toString(),
                })
                throw error
            }
        }

        await this.handleBatchCohortMembership(cohortMembershipChanges)
    }

    public async start(): Promise<void> {
        await super.start()

        logger.info('🚀', `${this.name} starting...`)

        await this.kafkaConsumer.connect(async (messages) => {
            logger.info('🔁', `${this.name} - handling batch`, {
                size: messages.length,
            })

            await this.handleBatch(messages)
        })

        logger.info('✅', `${this.name} started successfully`)
    }

    public async stop(): Promise<void> {
        logger.info('💤', `Stopping ${this.name}...`)
        await this.kafkaConsumer.disconnect()

        // IMPORTANT: super always comes last
        await super.stop()
        logger.info('💤', `${this.name} stopped!`)
    }

    public isHealthy(): HealthCheckResult {
        return this.kafkaConsumer.isHealthy()
    }
}
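For reference, the placeholder loop in `handleBatchCohortMembership` groups parameters four at a time, one group per change. A purely illustrative sketch (values made up) of what the generated statement and parameter array look like for a batch of two changes, one `entered` and one `left`:

```typescript
// Illustrative only: the shape produced by the loop above for two changes.
const generatedQuery = `
    INSERT INTO cohort_membership (team_id, cohort_id, person_id, in_cohort, last_updated)
    VALUES ($1, $2, $3, $4, CURRENT_TIMESTAMP), ($5, $6, $7, $8, CURRENT_TIMESTAMP)
    ON CONFLICT (team_id, cohort_id, person_id)
    DO UPDATE SET in_cohort = EXCLUDED.in_cohort, last_updated = CURRENT_TIMESTAMP
`
// One group of four parameters per change: teamId, cohortId, personId, inCohort.
const generatedValues = [
    1, 456, '11111111-1111-1111-1111-111111111111', true,
    1, 457, '22222222-2222-2222-2222-222222222222', false,
]
```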
@@ -40,11 +40,6 @@ export function getDefaultConfig(): PluginsServerConfig {
            : isDevEnv()
              ? 'postgres://posthog:posthog@localhost:5432/posthog_persons'
              : '',
        BEHAVIORAL_COHORTS_DATABASE_URL: isTestEnv()
            ? 'postgres://posthog:posthog@localhost:5432/test_behavioral_cohorts'
            : isDevEnv()
              ? 'postgres://posthog:posthog@localhost:5432/behavioral_cohorts'
              : '',
        PERSONS_MIGRATION_DATABASE_URL: isTestEnv()
            ? 'postgres://posthog:posthog@localhost:5432/test_persons_migration'
            : isDevEnv()
@@ -61,9 +56,9 @@ export function getDefaultConfig(): PluginsServerConfig {
        POSTHOG_DB_PASSWORD: '',
        POSTHOG_POSTGRES_HOST: 'localhost',
        POSTHOG_POSTGRES_PORT: 5432,
        POSTGRES_BEHAVIORAL_COHORTS_HOST: 'localhost',
        POSTGRES_BEHAVIORAL_COHORTS_USER: 'postgres',
        POSTGRES_BEHAVIORAL_COHORTS_PASSWORD: '',
        POSTGRES_COUNTERS_HOST: 'localhost',
        POSTGRES_COUNTERS_USER: 'postgres',
        POSTGRES_COUNTERS_PASSWORD: '',
        EVENT_OVERFLOW_BUCKET_CAPACITY: 1000,
        EVENT_OVERFLOW_BUCKET_REPLENISH_RATE: 1.0,
        KAFKA_BATCH_START_LOGGING_ENABLED: false,
@@ -395,18 +390,6 @@ export function overrideWithEnv(
            ).join(', ')}`
        )
    }

    if (
        !newConfig.BEHAVIORAL_COHORTS_DATABASE_URL &&
        newConfig.POSTGRES_BEHAVIORAL_COHORTS_HOST &&
        newConfig.POSTGRES_BEHAVIORAL_COHORTS_USER &&
        newConfig.POSTGRES_BEHAVIORAL_COHORTS_PASSWORD
    ) {
        const encodedUser = encodeURIComponent(newConfig.POSTGRES_BEHAVIORAL_COHORTS_USER)
        const encodedPassword = encodeURIComponent(newConfig.POSTGRES_BEHAVIORAL_COHORTS_PASSWORD)
        newConfig.BEHAVIORAL_COHORTS_DATABASE_URL = `postgres://${encodedUser}:${encodedPassword}@${newConfig.POSTGRES_BEHAVIORAL_COHORTS_HOST}:5432/behavioral_cohorts`
    }

    return newConfig
}
@@ -46,7 +46,6 @@ export const KAFKA_LOG_ENTRIES = `${prefix}log_entries${suffix}`
export const KAFKA_CDP_FUNCTION_OVERFLOW = `${prefix}cdp_function_overflow${suffix}`
export const KAFKA_CDP_INTERNAL_EVENTS = `${prefix}cdp_internal_events${suffix}`
export const KAFKA_CDP_CLICKHOUSE_BEHAVIORAL_COHORTS_MATCHES = `${prefix}clickhouse_behavioral_cohorts_matches${suffix}`
export const KAFKA_COHORT_MEMBERSHIP_CHANGED = `${prefix}cohort_membership_changed${suffix}`

// Error tracking topics
export const KAFKA_ERROR_TRACKING_ISSUE_FINGERPRINT = `${prefix}clickhouse_error_tracking_issue_fingerprint${suffix}`
plugin-server/src/migrations/001_create_counters_tables.js (new file, 58 lines)
@@ -0,0 +1,58 @@
exports.up = (pgm) => {
    // Table for person performed events
    pgm.createTable('person_performed_events', {
        team_id: {
            type: 'integer',
            notNull: true,
        },
        person_id: {
            type: 'uuid',
            notNull: true,
        },
        event_name: {
            type: 'text',
            notNull: true,
        },
    })

    // Add composite primary key
    pgm.addConstraint('person_performed_events', 'person_performed_events_pkey', {
        primaryKey: ['team_id', 'person_id', 'event_name'],
    })

    // Table for behavioural filter matched events
    pgm.createTable('behavioural_filter_matched_events', {
        team_id: {
            type: 'integer',
            notNull: true,
        },
        person_id: {
            type: 'uuid',
            notNull: true,
        },
        filter_hash: {
            type: 'text',
            notNull: true,
        },
        date: {
            type: 'date',
            notNull: true,
        },
        counter: {
            type: 'integer',
            notNull: true,
            default: 0,
        },
    })

    // Add composite primary key with date first for better cache locality
    // Since we only update today's data, this keeps the working set small
    pgm.addConstraint('behavioural_filter_matched_events', 'behavioural_filter_matched_events_pkey', {
        primaryKey: ['date', 'team_id', 'person_id', 'filter_hash'],
    })
}

exports.down = (pgm) => {
    pgm.dropTable('behavioural_filter_matched_events')
    pgm.dropTable('person_performed_events')
}
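The date-first primary key above is there so that day-level counter updates only ever touch today's rows. A hypothetical increment-upsert against that key, not part of this PR and written against the plain `pg` client, showing the kind of write the table shape supports:

```typescript
import { Client } from 'pg'

// Sketch only: bump today's counter for a (team, person, filter_hash) combination.
// ON CONFLICT targets exactly the composite primary key defined in the migration above.
async function bumpFilterMatch(
    client: Client,
    teamId: number,
    personId: string,
    filterHash: string
): Promise<void> {
    await client.query(
        `INSERT INTO behavioural_filter_matched_events (team_id, person_id, filter_hash, date, counter)
         VALUES ($1, $2, $3, CURRENT_DATE, 1)
         ON CONFLICT (date, team_id, person_id, filter_hash)
         DO UPDATE SET counter = behavioural_filter_matched_events.counter + 1`,
        [teamId, personId, filterHash]
    )
}
```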
@@ -11,7 +11,6 @@ import { setupCommonRoutes, setupExpressApp } from './api/router'
import { getPluginServerCapabilities } from './capabilities'
import { CdpApi } from './cdp/cdp-api'
import { CdpBehaviouralEventsConsumer } from './cdp/consumers/cdp-behavioural-events.consumer'
import { CdpCohortMembershipConsumer } from './cdp/consumers/cdp-cohort-membership.consumer'
import { CdpCyclotronDelayConsumer } from './cdp/consumers/cdp-cyclotron-delay.consumer'
import { CdpCyclotronWorkerHogFlow } from './cdp/consumers/cdp-cyclotron-worker-hogflow.consumer'
import { CdpCyclotronWorker } from './cdp/consumers/cdp-cyclotron-worker.consumer'
@@ -274,14 +273,6 @@ export class PluginServer {
            })
        }

        if (capabilities.cdpCohortMembership) {
            serviceLoaders.push(async () => {
                const consumer = new CdpCohortMembershipConsumer(hub)
                await consumer.start()
                return consumer.service
            })
        }

        const readyServices = await Promise.all(serviceLoaders.map((loader) => loader()))
        this.services.push(...readyServices)
@@ -81,7 +81,6 @@ export enum PluginServerMode {
    cdp_internal_events = 'cdp-internal-events',
    cdp_cyclotron_worker = 'cdp-cyclotron-worker',
    cdp_behavioural_events = 'cdp-behavioural-events',
    cdp_cohort_membership = 'cdp-cohort-membership',
    cdp_cyclotron_worker_hogflow = 'cdp-cyclotron-worker-hogflow',
    cdp_cyclotron_worker_delay = 'cdp-cyclotron-worker-delay',
    cdp_api = 'cdp-api',
@@ -280,7 +279,6 @@ export interface PluginsServerConfig extends CdpConfig, IngestionConsumerConfig
    DATABASE_URL: string // Postgres database URL
    DATABASE_READONLY_URL: string // Optional read-only replica to the main Postgres database
    PERSONS_DATABASE_URL: string // Optional read-write Postgres database for persons
    BEHAVIORAL_COHORTS_DATABASE_URL: string // Optional read-write Postgres database for behavioral cohorts
    PERSONS_READONLY_DATABASE_URL: string // Optional read-only replica to the persons Postgres database
    PERSONS_MIGRATION_DATABASE_URL: string // Read-write Postgres database for persons during dual write/migration
    PERSONS_MIGRATION_READONLY_DATABASE_URL: string // Optional read-only replica to the persons Postgres database during dual write/migration
@@ -291,9 +289,9 @@ export interface PluginsServerConfig extends CdpConfig, IngestionConsumerConfig
    POSTHOG_DB_PASSWORD: string
    POSTHOG_POSTGRES_HOST: string
    POSTHOG_POSTGRES_PORT: number
    POSTGRES_BEHAVIORAL_COHORTS_HOST: string
    POSTGRES_BEHAVIORAL_COHORTS_USER: string
    POSTGRES_BEHAVIORAL_COHORTS_PASSWORD: string
    POSTGRES_COUNTERS_HOST: string
    POSTGRES_COUNTERS_USER: string
    POSTGRES_COUNTERS_PASSWORD: string
    CLICKHOUSE_JSON_EVENTS_KAFKA_TOPIC: string
    CLICKHOUSE_HEATMAPS_KAFKA_TOPIC: string
    // Redis url pretty much only used locally / self hosted
@@ -549,7 +547,6 @@ export interface PluginServerCapabilities {
    cdpCyclotronWorkerHogFlow?: boolean
    cdpCyclotronWorkerDelay?: boolean
    cdpBehaviouralEvents?: boolean
    cdpCohortMembership?: boolean
    cdpApi?: boolean
    appManagementSingleton?: boolean
}
@@ -17,7 +17,7 @@ export enum PostgresUse {
    PLUGIN_STORAGE_RW, // Plugin Storage table, no read replica for it
    PERSONS_READ, // Person database, read replica
    PERSONS_WRITE, // Person database, write
    BEHAVIORAL_COHORTS_RW, // Behavioral cohorts database for behavioral cohorts
    COUNTERS_RW, // Counters database for aggregations
}

export class TransactionClient {
@@ -49,7 +49,7 @@ export class PostgresRouter {
            [PostgresUse.COMMON_READ, commonClient],
            [PostgresUse.PLUGIN_STORAGE_RW, commonClient],
            [PostgresUse.PERSONS_WRITE, commonClient],
            [PostgresUse.BEHAVIORAL_COHORTS_RW, commonClient],
            [PostgresUse.COUNTERS_RW, commonClient],
        ])

        if (serverConfig.DATABASE_READONLY_URL) {
@@ -88,20 +88,6 @@ export class PostgresRouter {
            )
            logger.info('👍', `Persons Postgresql ready`)
        }

        if (serverConfig.BEHAVIORAL_COHORTS_DATABASE_URL) {
            logger.info('🤔', `Connecting to behavioral cohorts Postgresql...`)
            this.pools.set(
                PostgresUse.BEHAVIORAL_COHORTS_RW,
                createPostgresPool(
                    serverConfig.BEHAVIORAL_COHORTS_DATABASE_URL,
                    serverConfig.POSTGRES_CONNECTION_POOL_SIZE,
                    app_name
                )
            )
            logger.info('👍', `Behavioral cohorts Postgresql ready`)
        }

        if (serverConfig.PERSONS_READONLY_DATABASE_URL) {
            logger.info('🤔', `Connecting to persons read-only Postgresql...`)
            this.pools.set(
@@ -8,7 +8,6 @@ import {
    KAFKA_CDP_CLICKHOUSE_BEHAVIORAL_COHORTS_MATCHES,
    KAFKA_CLICKHOUSE_HEATMAP_EVENTS,
    KAFKA_CLICKHOUSE_SESSION_RECORDING_EVENTS,
    KAFKA_COHORT_MEMBERSHIP_CHANGED,
    KAFKA_ERROR_TRACKING_ISSUE_FINGERPRINT_OVERRIDES,
    KAFKA_EVENTS_DEAD_LETTER_QUEUE,
    KAFKA_EVENTS_JSON,
@@ -58,7 +57,6 @@ export async function resetKafka(extraServerConfig?: Partial<PluginsServerConfig
        KAFKA_EVENTS_RECENT_JSON,
        KAFKA_ERROR_TRACKING_ISSUE_FINGERPRINT_OVERRIDES,
        KAFKA_CDP_CLICKHOUSE_BEHAVIORAL_COHORTS_MATCHES,
        KAFKA_COHORT_MEMBERSHIP_CHANGED,
    ])
}
@@ -218,20 +218,9 @@ export async function resetTestDatabase(

// Helper function to determine which database a table belongs to
function getPostgresUseForTable(table: string): PostgresUse {
    // Behavioral cohorts tables
    if (table === 'cohort_membership') {
        return PostgresUse.BEHAVIORAL_COHORTS_RW
    }

    // Persons-related tables
    const personsTablesRegex =
        /^posthog_(person|persondistinctid|personlessdistinctid|personoverridemapping|personoverride|pendingpersonoverride|flatpersonoverride|featureflaghashkeyoverride|cohortpeople|group|grouptypemapping)$/
    if (personsTablesRegex.test(table)) {
        return PostgresUse.PERSONS_WRITE
    }

    // Default to common tables
    return PostgresUse.COMMON_WRITE
    return personsTablesRegex.test(table) ? PostgresUse.PERSONS_WRITE : PostgresUse.COMMON_WRITE
}

export async function insertRow(db: PostgresRouter, table: string, objectProvided: Record<string, any>) {
@@ -648,11 +637,11 @@ export async function fetchPostgresDistinctIdsForPerson(db: DB, personId: string
    )
}

export async function resetBehavioralCohortsDatabase(db: PostgresRouter): Promise<void> {
export async function resetCountersDatabase(db: PostgresRouter): Promise<void> {
    await db.query(
        PostgresUse.BEHAVIORAL_COHORTS_RW,
        'TRUNCATE TABLE cohort_membership',
        PostgresUse.COUNTERS_RW,
        'TRUNCATE TABLE person_performed_events, behavioural_filter_matched_events',
        undefined,
        'reset-behavioral-cohorts-db'
        'reset-counters-db'
    )
}
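After the revert, `getPostgresUseForTable` collapses back to the persons-vs-common split. A sketch, not the repository's actual `insertRow`, of how a helper like it can use that routing to pick a pool before building its INSERT; the four-argument `query(use, sql, params, tag)` call mirrors the `PostgresRouter` usage visible elsewhere in this diff, and the imports are assumed to come from the helpers shown above:

```typescript
// Assumes PostgresRouter / PostgresUse from utils/db/postgres and the
// getPostgresUseForTable helper from tests/helpers/sql, both shown in this diff.
async function insertRowSketch(
    db: PostgresRouter,
    table: string,
    row: Record<string, any>
): Promise<Record<string, any>> {
    // Route the write to the persons or common pool based on the table name.
    const use = getPostgresUseForTable(table)
    const columns = Object.keys(row)
    const placeholders = columns.map((_, i) => `$${i + 1}`).join(', ')
    const sql = `INSERT INTO ${table} (${columns.join(', ')}) VALUES (${placeholders}) RETURNING *`
    const result = await db.query(use, sql, Object.values(row), `insert-${table}`)
    return result.rows[0]
}
```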
@@ -49,4 +49,3 @@ KAFKA_ERROR_TRACKING_ISSUE_FINGERPRINT_EMBEDDINGS = (
KAFKA_DOCUMENT_EMBEDDINGS_TOPIC = f"{KAFKA_PREFIX}clickhouse_document_embeddings{SUFFIX}"

KAFKA_CDP_INTERNAL_EVENTS = f"{KAFKA_PREFIX}cdp_internal_events{SUFFIX}"
KAFKA_COHORT_MEMBERSHIP_CHANGED = f"{KAFKA_PREFIX}cohort_membership_changed{SUFFIX}"