From fc60c1537d73be95b8e1c8095009acc51a965ddd Mon Sep 17 00:00:00 2001 From: Meikel Ratz Date: Mon, 13 Oct 2025 10:28:01 +0200 Subject: [PATCH] feat(myke): reverts add cohort membership consumer (#39536) --- .github/workflows/ci-nodejs.yml | 3 - bin/migrate | 7 - bin/migrate-behavioral-cohorts.sh | 37 --- bin/mprocs-minimal.yaml | 3 - bin/mprocs.yaml | 3 - docker-compose.base.yml | 1 - docker-compose.hobby.yml | 1 - plugin-server/.node-pg-migrate.json | 2 +- .../001_create_cohort_membership_tables.js | 42 ---- plugin-server/package.json | 7 +- plugin-server/src/capabilities.ts | 6 - plugin-server/src/cdp/_tests/fixtures.ts | 53 ---- .../cdp-cohort-membership.consumer.test.ts | 229 ------------------ .../cdp-cohort-membership.consumer.ts | 145 ----------- plugin-server/src/config/config.ts | 23 +- plugin-server/src/config/kafka-topics.ts | 1 - .../migrations/001_create_counters_tables.js | 58 +++++ plugin-server/src/server.ts | 9 - plugin-server/src/types.ts | 9 +- plugin-server/src/utils/db/postgres.ts | 18 +- plugin-server/tests/helpers/kafka.ts | 2 - plugin-server/tests/helpers/sql.ts | 21 +- posthog/kafka_client/topics.py | 1 - 23 files changed, 73 insertions(+), 608 deletions(-) delete mode 100755 bin/migrate-behavioral-cohorts.sh delete mode 100644 plugin-server/migrations/001_create_cohort_membership_tables.js delete mode 100644 plugin-server/src/cdp/consumers/cdp-cohort-membership.consumer.test.ts delete mode 100644 plugin-server/src/cdp/consumers/cdp-cohort-membership.consumer.ts create mode 100644 plugin-server/src/migrations/001_create_counters_tables.js diff --git a/.github/workflows/ci-nodejs.yml b/.github/workflows/ci-nodejs.yml index c45bce7280..175df69faa 100644 --- a/.github/workflows/ci-nodejs.yml +++ b/.github/workflows/ci-nodejs.yml @@ -206,15 +206,12 @@ jobs: TEST: 'true' SECRET_KEY: 'abcdef' # unsafe - for testing only DATABASE_URL: 'postgres://posthog:posthog@localhost:5432/posthog' - BEHAVIORAL_COHORTS_DATABASE_URL: 'postgres://posthog:posthog@localhost:5432/behavioral_cohorts' - run: pnpm --filter=@posthog/plugin-server setup:test - name: Test with Jest env: # Below DB name has `test_` prepended, as that's how Django (ran above) creates the test DB DATABASE_URL: 'postgres://posthog:posthog@localhost:5432/test_posthog' - BEHAVIORAL_COHORTS_DATABASE_URL: 'postgres://posthog:posthog@localhost:5432/test_behavioral_cohorts' PERSONS_DATABASE_URL: 'postgres://posthog:posthog@localhost:5432/test_persons' PERSONS_READONLY_DATABASE_URL: 'postgres://posthog:posthog@localhost:5432/test_persons' PERSONS_READONLY_MIGRATION_DATABASE_URL: 'postgres://posthog:posthog@localhost:5432/test_persons_migration' diff --git a/bin/migrate b/bin/migrate index 330b0cec22..b8be0f08c3 100755 --- a/bin/migrate +++ b/bin/migrate @@ -20,13 +20,6 @@ if [ -d "$SCRIPT_DIR/../rust/bin" ]; then fi fi -# Run behavioral cohorts migrations -bash $SCRIPT_DIR/migrate-behavioral-cohorts.sh -if [ $? -ne 0 ]; then - echo "Error in migrate-behavioral-cohorts.sh, exiting." 
- exit 1 -fi - # Create a temporary file for background process status # Ensure cleanup of temp file on script exit trap 'rm -f "$CLICKHOUSE_STATUS" 2>/dev/null' EXIT diff --git a/bin/migrate-behavioral-cohorts.sh b/bin/migrate-behavioral-cohorts.sh deleted file mode 100755 index 22716341eb..0000000000 --- a/bin/migrate-behavioral-cohorts.sh +++ /dev/null @@ -1,37 +0,0 @@ -#!/bin/bash - -# Simple bash script for behavioral cohorts migration -set -e - -# Set database name based on environment -if [[ "${NODE_ENV}" == "test" || "${TEST}" == "1" ]]; then - DB_NAME="test_behavioral_cohorts" -else - DB_NAME="behavioral_cohorts" -fi - -# Use environment variables with defaults -DB_HOST="${POSTGRES_BEHAVIORAL_COHORTS_HOST:-localhost}" -DB_USER="${POSTGRES_BEHAVIORAL_COHORTS_USER:-posthog}" -DB_PASS="${POSTGRES_BEHAVIORAL_COHORTS_PASSWORD:-posthog}" -DB_PORT="5432" - -# Get database URL from environment or construct from defaults -BEHAVIORAL_COHORTS_DB_URL="${BEHAVIORAL_COHORTS_DATABASE_URL:-postgres://${DB_USER}:${DB_PASS}@${DB_HOST}:${DB_PORT}/${DB_NAME}}" - -echo "Performing behavioral cohorts migrations" -echo "Database name: ${DB_NAME}" - -# Create database if it doesn't exist -if [[ -n "${DB_PASS}" ]]; then - export PGPASSWORD="${DB_PASS}" -fi -psql -h "${DB_HOST}" -p "${DB_PORT}" -U "${DB_USER}" -d postgres -c "CREATE DATABASE ${DB_NAME}" 2>/dev/null || true - -# Change to plugin-server directory to ensure correct paths -cd "$(dirname "$0")/../plugin-server" - -# Run migrations -DATABASE_URL="${BEHAVIORAL_COHORTS_DB_URL}" npx node-pg-migrate up - -echo "Behavioral cohorts migrations completed successfully" diff --git a/bin/mprocs-minimal.yaml b/bin/mprocs-minimal.yaml index 2ffcc71bea..7f195f30c7 100644 --- a/bin/mprocs-minimal.yaml +++ b/bin/mprocs-minimal.yaml @@ -54,9 +54,6 @@ procs: migrate-persons-db: shell: 'bin/check_postgres_up posthog_persons && cd rust && DATABASE_URL=postgres://posthog:posthog@localhost:5432/posthog_persons sqlx migrate run --source persons_migrations' - migrate-behavioral-cohorts: - shell: 'bin/check_postgres_up behavioral_cohorts && bin/migrate-behavioral-cohorts.sh' - storybook: shell: 'pnpm --filter=@posthog/storybook install && pnpm run storybook' autostart: false diff --git a/bin/mprocs.yaml b/bin/mprocs.yaml index 21ed2ce5e5..6f0bc52059 100755 --- a/bin/mprocs.yaml +++ b/bin/mprocs.yaml @@ -112,9 +112,6 @@ procs: migrate-persons-db: shell: 'bin/check_postgres_up posthog_persons && cd rust && DATABASE_URL=postgres://posthog:posthog@localhost:5432/posthog_persons sqlx migrate run --source persons_migrations' - migrate-behavioral-cohorts: - shell: 'bin/check_postgres_up behavioral_cohorts && bin/migrate-behavioral-cohorts.sh' - generate-demo-data: shell: |- bin/check_postgres_up && \ diff --git a/docker-compose.base.yml b/docker-compose.base.yml index 86e37f3d5b..06364b00c2 100644 --- a/docker-compose.base.yml +++ b/docker-compose.base.yml @@ -290,7 +290,6 @@ services: environment: DATABASE_URL: 'postgres://posthog:posthog@db:5432/posthog' PERSONS_DATABASE_URL: 'postgres://posthog:posthog@db:5432/posthog' - BEHAVIORAL_COHORTS_DATABASE_URL: 'postgres://posthog:posthog@db:5432/posthog' KAFKA_HOSTS: 'kafka:9092' REDIS_URL: 'redis://redis:6379/' CLICKHOUSE_HOST: 'clickhouse' diff --git a/docker-compose.hobby.yml b/docker-compose.hobby.yml index b2d9cbeb03..6dbd1fc7e4 100644 --- a/docker-compose.hobby.yml +++ b/docker-compose.hobby.yml @@ -139,7 +139,6 @@ services: ENCRYPTION_SALT_KEYS: $ENCRYPTION_SALT_KEYS CYCLOTRON_DATABASE_URL: 
'postgres://posthog:posthog@db:5432/posthog' PERSONS_DATABASE_URL: 'postgres://posthog:posthog@db:5432/posthog' - BEHAVIORAL_COHORTS_DATABASE_URL: 'postgres://posthog:posthog@db:5432/posthog' OTEL_EXPORTER_OTLP_ENDPOINT: '' depends_on: - db diff --git a/plugin-server/.node-pg-migrate.json b/plugin-server/.node-pg-migrate.json index bba33a282c..691ce7e57c 100644 --- a/plugin-server/.node-pg-migrate.json +++ b/plugin-server/.node-pg-migrate.json @@ -1,6 +1,6 @@ { "migrationsTable": "pgmigrations", - "dir": "migrations", + "dir": "src/migrations", "checkOrder": true, "migrationFilenameFormat": "utc", "createMigrationsSchema": false, diff --git a/plugin-server/migrations/001_create_cohort_membership_tables.js b/plugin-server/migrations/001_create_cohort_membership_tables.js deleted file mode 100644 index 2abb2b6169..0000000000 --- a/plugin-server/migrations/001_create_cohort_membership_tables.js +++ /dev/null @@ -1,42 +0,0 @@ -exports.up = (pgm) => { - // Table for tracking cohort membership - replicating ClickHouse structure - pgm.createTable('cohort_membership', { - id: { - type: 'bigserial', - primaryKey: true, - }, - team_id: { - type: 'bigint', - notNull: true, - }, - cohort_id: { - type: 'bigint', - notNull: true, - }, - person_id: { - type: 'uuid', - notNull: true, - }, - in_cohort: { - type: 'boolean', - notNull: true, - }, - last_updated: { - type: 'timestamp', - notNull: true, - default: pgm.func('CURRENT_TIMESTAMP'), - }, - }) - - // Add index on person_id, cohort_id, and team_id for query performance - pgm.createIndex('cohort_membership', ['person_id', 'cohort_id', 'team_id']) - - // Add unique constraint to prevent duplicate entries - pgm.addConstraint('cohort_membership', 'cohort_membership_unique', { - unique: ['team_id', 'cohort_id', 'person_id'], - }) -} - -exports.down = (pgm) => { - pgm.dropTable('cohort_membership') -} diff --git a/plugin-server/package.json b/plugin-server/package.json index 81aefb4f8a..a624c29018 100644 --- a/plugin-server/package.json +++ b/plugin-server/package.json @@ -26,15 +26,10 @@ "prettier:check": "prettier --check .", "prepublishOnly": "pnpm build", "setup:dev:clickhouse": "cd .. && DEBUG=1 python manage.py migrate_clickhouse", - "setup:test": "cd .. && TEST=1 python manage.py setup_test_environment && cd plugin-server && pnpm run setup:test:cyclotron && pnpm run setup:test:persons && pnpm run setup:test:behavioral-cohorts && pnpm run setup:test:persons-migration", + "setup:test": "cd .. && TEST=1 python manage.py setup_test_environment && cd plugin-server && pnpm run setup:test:cyclotron && pnpm run setup:test:persons && pnpm run setup:test:persons-migration", "setup:test:cyclotron": "CYCLOTRON_DATABASE_NAME=test_cyclotron ../rust/bin/migrate-cyclotron", "setup:test:persons": "PERSONS_DATABASE_NAME=test_persons ../rust/bin/migrate-persons", "setup:test:persons-migration": "PERSONS_DATABASE_NAME=test_persons_migration ../rust/bin/migrate-persons", - "setup:test:behavioral-cohorts": "TEST=1 BEHAVIORAL_COHORTS_DATABASE_URL='postgres://posthog:posthog@localhost:5432/test_behavioral_cohorts' ../bin/migrate-behavioral-cohorts.sh", - "migrate:behavioral-cohorts": "../bin/migrate-behavioral-cohorts.sh", - "migrate:behavioral-cohorts:test": "BEHAVIORAL_COHORTS_DATABASE_URL='postgres://posthog:posthog@localhost:5432/test_behavioral_cohorts' node-pg-migrate up", - "migrate:behavioral-cohorts:down": "node-pg-migrate down", - "migrate:behavioral-cohorts:create": "node-pg-migrate create --javascript-file", "services:start": "cd .. 
&& docker compose -f docker-compose.dev.yml up", "services:stop": "cd .. && docker compose -f docker-compose.dev.yml down", "services:clean": "cd .. && docker compose -f docker-compose.dev.yml rm -v", diff --git a/plugin-server/src/capabilities.ts b/plugin-server/src/capabilities.ts index c9584c2322..92de0f890a 100644 --- a/plugin-server/src/capabilities.ts +++ b/plugin-server/src/capabilities.ts @@ -21,7 +21,6 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin cdpCyclotronWorkerHogFlow: true, cdpCyclotronWorkerDelay: true, cdpBehaviouralEvents: true, - cdpCohortMembership: true, cdpApi: true, } @@ -36,7 +35,6 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin cdpCyclotronWorkerHogFlow: true, cdpCyclotronWorkerDelay: true, cdpBehaviouralEvents: true, - cdpCohortMembership: true, cdpApi: true, } @@ -87,10 +85,6 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin return { cdpBehaviouralEvents: true, } - case PluginServerMode.cdp_cohort_membership: - return { - cdpCohortMembership: true, - } case PluginServerMode.cdp_legacy_on_event: return { cdpLegacyOnEvent: true, diff --git a/plugin-server/src/cdp/_tests/fixtures.ts b/plugin-server/src/cdp/_tests/fixtures.ts index ed7c9f3c15..d9117aca7f 100644 --- a/plugin-server/src/cdp/_tests/fixtures.ts +++ b/plugin-server/src/cdp/_tests/fixtures.ts @@ -7,7 +7,6 @@ import { insertRow } from '~/tests/helpers/sql' import { ClickHousePerson, ClickHouseTimestamp, ProjectId, RawClickHouseEvent, Team } from '../../types' import { PostgresRouter } from '../../utils/db/postgres' import { UUIDT } from '../../utils/utils' -import { CohortMembershipChange } from '../consumers/cdp-cohort-membership.consumer' import { CdpInternalEvent } from '../schema' import { compileHog } from '../templates/compiler' import { @@ -287,55 +286,3 @@ export const createExampleInvocation = ( queuePriority: 0, } } - -// Cohort Membership Test Helpers -export const createCohortMembershipEvent = ( - overrides: Partial = {} -): CohortMembershipChange => { - return { - personId: new UUIDT().toString(), - cohortId: 1, - teamId: 1, - cohort_membership_changed: 'entered', - ...overrides, - } -} - -export const createCohortMembershipEvents = (events: Partial[]): CohortMembershipChange[] => { - return events.map((event) => createCohortMembershipEvent(event)) -} - -export interface CohortMembershipRecord { - team_id: number - cohort_id: number - person_id: string - in_cohort: boolean - last_updated?: Date -} - -export const insertCohortMembership = async ( - db: PostgresRouter, - membership: Partial -): Promise => { - const record: CohortMembershipRecord = { - team_id: 1, - cohort_id: 1, - person_id: new UUIDT().toString(), - in_cohort: true, - ...membership, - } - - // insertRow now automatically determines the correct database based on table name - return await insertRow(db, 'cohort_membership', record) -} - -export const insertCohortMemberships = async ( - db: PostgresRouter, - memberships: Partial[] -): Promise => { - const results = [] - for (const membership of memberships) { - results.push(await insertCohortMembership(db, membership)) - } - return results -} diff --git a/plugin-server/src/cdp/consumers/cdp-cohort-membership.consumer.test.ts b/plugin-server/src/cdp/consumers/cdp-cohort-membership.consumer.test.ts deleted file mode 100644 index c3a862e490..0000000000 --- a/plugin-server/src/cdp/consumers/cdp-cohort-membership.consumer.test.ts +++ /dev/null @@ -1,229 +0,0 @@ -import { Message } 
from 'node-rdkafka' - -import { resetKafka } from '~/tests/helpers/kafka' -import { UUIDT } from '~/utils/utils' - -import { resetBehavioralCohortsDatabase } from '../../../tests/helpers/sql' -import { KAFKA_COHORT_MEMBERSHIP_CHANGED } from '../../config/kafka-topics' -import { Hub } from '../../types' -import { closeHub, createHub } from '../../utils/db/hub' -import { PostgresUse } from '../../utils/db/postgres' -import { createCohortMembershipEvent, createCohortMembershipEvents, createKafkaMessage } from '../_tests/fixtures' -import { CdpCohortMembershipConsumer } from './cdp-cohort-membership.consumer' - -jest.setTimeout(20_000) - -describe('CdpCohortMembershipConsumer', () => { - let hub: Hub - let consumer: CdpCohortMembershipConsumer - - beforeEach(async () => { - await resetKafka() - hub = await createHub() - consumer = new CdpCohortMembershipConsumer(hub) - await resetBehavioralCohortsDatabase(hub.postgres) - }) - - afterEach(async () => { - await consumer.stop() - await closeHub(hub) - }) - - describe('end-to-end cohort membership processing', () => { - const personId1 = new UUIDT().toString() - const personId2 = new UUIDT().toString() - const personId3 = new UUIDT().toString() - it('should process entered and left events and write to PostgreSQL correctly', async () => { - // Test data using helper functions - const testEvents = createCohortMembershipEvents([ - { - personId: personId1, - cohortId: 456, - teamId: 1, - cohort_membership_changed: 'entered', - }, - { - personId: personId2, - cohortId: 456, - teamId: 1, - cohort_membership_changed: 'entered', - }, - { - personId: personId3, - cohortId: 457, - teamId: 1, - cohort_membership_changed: 'left', - }, - ]) - - // Create mock Kafka messages - const messages = testEvents.map((event, index) => - createKafkaMessage(event, { topic: KAFKA_COHORT_MEMBERSHIP_CHANGED, offset: index }) - ) - - // Process the batch of messages - await (consumer as any).handleBatch(messages) - - // Verify data was written to PostgreSQL - const result = await hub.postgres.query( - PostgresUse.BEHAVIORAL_COHORTS_RW, - 'SELECT * FROM cohort_membership WHERE team_id = $1 ORDER BY person_id, cohort_id', - [1], - 'testQuery' - ) - - expect(result.rows).toHaveLength(3) - - // Verify first entered event - expect(result.rows[0]).toMatchObject({ - team_id: '1', - cohort_id: '456', - person_id: personId1, - in_cohort: true, - }) - - // Verify second entered event - expect(result.rows[1]).toMatchObject({ - team_id: '1', - cohort_id: '456', - person_id: personId2, - in_cohort: true, - }) - - // Verify left event - expect(result.rows[2]).toMatchObject({ - team_id: '1', - cohort_id: '457', - person_id: personId3, - in_cohort: false, - }) - }) - - it('should handle complete person lifecycle: enter -> leave -> re-enter cohort', async () => { - // Step 1: Person enters the cohort for the first time - const enterEvent = createCohortMembershipEvent({ - personId: personId1, - cohortId: 456, - teamId: 1, - cohort_membership_changed: 'entered', - }) - - await (consumer as any).handleBatch([ - createKafkaMessage(enterEvent, { topic: KAFKA_COHORT_MEMBERSHIP_CHANGED, offset: 0 }), - ]) - - let result = await hub.postgres.query( - PostgresUse.BEHAVIORAL_COHORTS_RW, - 'SELECT * FROM cohort_membership WHERE team_id = $1 AND person_id = $2 AND cohort_id = $3', - [1, personId1, 456], - 'testQuery' - ) - - expect(result.rows[0].in_cohort).toBe(true) - const firstTimestamp = result.rows[0].last_updated - - // Wait to ensure timestamp difference - await new Promise((resolve) => 
setTimeout(resolve, 10)) - - // Step 2: Person leaves the cohort - const leaveEvent = createCohortMembershipEvent({ - personId: personId1, - cohortId: 456, - teamId: 1, - cohort_membership_changed: 'left', - }) - - await (consumer as any).handleBatch([ - createKafkaMessage(leaveEvent, { topic: KAFKA_COHORT_MEMBERSHIP_CHANGED, offset: 1 }), - ]) - - result = await hub.postgres.query( - PostgresUse.BEHAVIORAL_COHORTS_RW, - 'SELECT * FROM cohort_membership WHERE team_id = $1 AND person_id = $2 AND cohort_id = $3', - [1, personId1, 456], - 'testQuery' - ) - - expect(result.rows).toHaveLength(1) // Same record, just updated - expect(result.rows[0].in_cohort).toBe(false) - const secondTimestamp = result.rows[0].last_updated - expect(new Date(secondTimestamp).getTime()).toBeGreaterThan(new Date(firstTimestamp).getTime()) - - // Wait to ensure timestamp difference - await new Promise((resolve) => setTimeout(resolve, 10)) - - // Step 3: Person re-enters the cohort - const reEnterEvent = createCohortMembershipEvent({ - personId: personId1, - cohortId: 456, - teamId: 1, - cohort_membership_changed: 'entered', - }) - - await (consumer as any).handleBatch([ - createKafkaMessage(reEnterEvent, { topic: KAFKA_COHORT_MEMBERSHIP_CHANGED, offset: 2 }), - ]) - - result = await hub.postgres.query( - PostgresUse.BEHAVIORAL_COHORTS_RW, - 'SELECT * FROM cohort_membership WHERE team_id = $1 AND person_id = $2 AND cohort_id = $3', - [1, personId1, 456], - 'testQuery' - ) - - expect(result.rows).toHaveLength(1) // Still same record, just updated again - expect(result.rows[0].in_cohort).toBe(true) // Back in the cohort - const thirdTimestamp = result.rows[0].last_updated - expect(new Date(thirdTimestamp).getTime()).toBeGreaterThan(new Date(secondTimestamp).getTime()) - }) - - it('should reject entire batch when invalid messages are present', async () => { - const validEvent = { - personId: personId1, - cohortId: 456, - teamId: 1, - cohort_membership_changed: 'entered', - } - - const messages: Message[] = [ - // Valid message - createKafkaMessage(validEvent, { topic: KAFKA_COHORT_MEMBERSHIP_CHANGED, offset: 0 }), - // Invalid JSON (manually create this one since it's intentionally malformed) - { - value: Buffer.from('invalid json'), - topic: KAFKA_COHORT_MEMBERSHIP_CHANGED, - partition: 0, - offset: 1, - timestamp: Date.now(), - key: null, - size: 0, - }, - // Missing required fields - createKafkaMessage({ personId: 124 }, { topic: KAFKA_COHORT_MEMBERSHIP_CHANGED, offset: 2 }), - // Empty message (manually create this one since it has null value) - { - value: null, - topic: KAFKA_COHORT_MEMBERSHIP_CHANGED, - partition: 0, - offset: 3, - timestamp: Date.now(), - key: null, - size: 0, - }, - ] - - // Should throw due to invalid messages in batch - await expect((consumer as any).handleBatch(messages)).rejects.toThrow() - - // Verify NO data was inserted - const result = await hub.postgres.query( - PostgresUse.BEHAVIORAL_COHORTS_RW, - 'SELECT * FROM cohort_membership WHERE team_id = $1', - [1], - 'testQuery' - ) - - expect(result.rows).toHaveLength(0) // No data should be inserted - }) - }) -}) diff --git a/plugin-server/src/cdp/consumers/cdp-cohort-membership.consumer.ts b/plugin-server/src/cdp/consumers/cdp-cohort-membership.consumer.ts deleted file mode 100644 index 6c921ccac0..0000000000 --- a/plugin-server/src/cdp/consumers/cdp-cohort-membership.consumer.ts +++ /dev/null @@ -1,145 +0,0 @@ -import { Message } from 'node-rdkafka' -import { z } from 'zod' - -import { KAFKA_COHORT_MEMBERSHIP_CHANGED } from 
'../../config/kafka-topics' -import { KafkaConsumer } from '../../kafka/consumer' -import { HealthCheckResult, Hub } from '../../types' -import { PostgresUse } from '../../utils/db/postgres' -import { parseJSON } from '../../utils/json-parse' -import { logger } from '../../utils/logger' -import { CdpConsumerBase } from './cdp-base.consumer' - -// Zod schema for validation -const CohortMembershipChangeSchema = z.object({ - personId: z.string().uuid(), - cohortId: z.number(), - teamId: z.number(), - cohort_membership_changed: z.enum(['entered', 'left']), -}) - -export type CohortMembershipChange = z.infer - -export class CdpCohortMembershipConsumer extends CdpConsumerBase { - protected name = 'CdpCohortMembershipConsumer' - private kafkaConsumer: KafkaConsumer - - constructor(hub: Hub) { - super(hub) - this.kafkaConsumer = new KafkaConsumer({ - groupId: 'cdp-cohort-membership-consumer', - topic: KAFKA_COHORT_MEMBERSHIP_CHANGED, - }) - } - - private async handleBatchCohortMembership(changes: CohortMembershipChange[]): Promise { - if (changes.length === 0) { - return - } - - try { - // Build the VALUES clause for batch upsert - const values: any[] = [] - const placeholders: string[] = [] - let paramIndex = 1 - - for (const change of changes) { - const inCohort = change.cohort_membership_changed === 'entered' - placeholders.push( - `($${paramIndex}, $${paramIndex + 1}, $${paramIndex + 2}, $${paramIndex + 3}, CURRENT_TIMESTAMP)` - ) - values.push(change.teamId, change.cohortId, change.personId, inCohort) - paramIndex += 4 - } - - // Single batch UPSERT query - handles both entered and left events - const query = ` - INSERT INTO cohort_membership (team_id, cohort_id, person_id, in_cohort, last_updated) - VALUES ${placeholders.join(', ')} - ON CONFLICT (team_id, cohort_id, person_id) - DO UPDATE SET - in_cohort = EXCLUDED.in_cohort, - last_updated = CURRENT_TIMESTAMP - ` - - await this.hub.postgres.query( - PostgresUse.BEHAVIORAL_COHORTS_RW, - query, - values, - 'batchUpsertCohortMembership' - ) - } catch (error) { - logger.error('Failed to process batch cohort membership changes', { - error, - count: changes.length, - }) - throw error - } - } - - private async handleBatch(messages: Message[]): Promise { - const cohortMembershipChanges: CohortMembershipChange[] = [] - - // Process and validate all messages - for (const message of messages) { - try { - const messageValue = message.value?.toString() - if (!messageValue) { - throw new Error('Empty message received') - } - - const parsedMessage = parseJSON(messageValue) - - // Validate using Zod schema - const validationResult = CohortMembershipChangeSchema.safeParse(parsedMessage) - - if (!validationResult.success) { - logger.error('Invalid cohort membership change message', { - errors: validationResult.error.errors, - message: messageValue, - }) - throw new Error(`Invalid cohort membership change message: ${validationResult.error.message}`) - } - - const cohortMembershipChange = validationResult.data - cohortMembershipChanges.push(cohortMembershipChange) - } catch (error) { - logger.error('Error processing cohort membership change message', { - error, - message: message.value?.toString(), - }) - throw error - } - } - - await this.handleBatchCohortMembership(cohortMembershipChanges) - } - - public async start(): Promise { - await super.start() - - logger.info('🚀', `${this.name} starting...`) - - await this.kafkaConsumer.connect(async (messages) => { - logger.info('🔁', `${this.name} - handling batch`, { - size: messages.length, - }) - - await 
this.handleBatch(messages) - }) - - logger.info('✅', `${this.name} started successfully`) - } - - public async stop(): Promise { - logger.info('💤', `Stopping ${this.name}...`) - await this.kafkaConsumer.disconnect() - - // IMPORTANT: super always comes last - await super.stop() - logger.info('💤', `${this.name} stopped!`) - } - - public isHealthy(): HealthCheckResult { - return this.kafkaConsumer.isHealthy() - } -} diff --git a/plugin-server/src/config/config.ts b/plugin-server/src/config/config.ts index fa170e6d52..abf8c5e3c2 100644 --- a/plugin-server/src/config/config.ts +++ b/plugin-server/src/config/config.ts @@ -40,11 +40,6 @@ export function getDefaultConfig(): PluginsServerConfig { : isDevEnv() ? 'postgres://posthog:posthog@localhost:5432/posthog_persons' : '', - BEHAVIORAL_COHORTS_DATABASE_URL: isTestEnv() - ? 'postgres://posthog:posthog@localhost:5432/test_behavioral_cohorts' - : isDevEnv() - ? 'postgres://posthog:posthog@localhost:5432/behavioral_cohorts' - : '', PERSONS_MIGRATION_DATABASE_URL: isTestEnv() ? 'postgres://posthog:posthog@localhost:5432/test_persons_migration' : isDevEnv() @@ -61,9 +56,9 @@ export function getDefaultConfig(): PluginsServerConfig { POSTHOG_DB_PASSWORD: '', POSTHOG_POSTGRES_HOST: 'localhost', POSTHOG_POSTGRES_PORT: 5432, - POSTGRES_BEHAVIORAL_COHORTS_HOST: 'localhost', - POSTGRES_BEHAVIORAL_COHORTS_USER: 'postgres', - POSTGRES_BEHAVIORAL_COHORTS_PASSWORD: '', + POSTGRES_COUNTERS_HOST: 'localhost', + POSTGRES_COUNTERS_USER: 'postgres', + POSTGRES_COUNTERS_PASSWORD: '', EVENT_OVERFLOW_BUCKET_CAPACITY: 1000, EVENT_OVERFLOW_BUCKET_REPLENISH_RATE: 1.0, KAFKA_BATCH_START_LOGGING_ENABLED: false, @@ -395,18 +390,6 @@ export function overrideWithEnv( ).join(', ')}` ) } - - if ( - !newConfig.BEHAVIORAL_COHORTS_DATABASE_URL && - newConfig.POSTGRES_BEHAVIORAL_COHORTS_HOST && - newConfig.POSTGRES_BEHAVIORAL_COHORTS_USER && - newConfig.POSTGRES_BEHAVIORAL_COHORTS_PASSWORD - ) { - const encodedUser = encodeURIComponent(newConfig.POSTGRES_BEHAVIORAL_COHORTS_USER) - const encodedPassword = encodeURIComponent(newConfig.POSTGRES_BEHAVIORAL_COHORTS_PASSWORD) - newConfig.BEHAVIORAL_COHORTS_DATABASE_URL = `postgres://${encodedUser}:${encodedPassword}@${newConfig.POSTGRES_BEHAVIORAL_COHORTS_HOST}:5432/behavioral_cohorts` - } - return newConfig } diff --git a/plugin-server/src/config/kafka-topics.ts b/plugin-server/src/config/kafka-topics.ts index 401611b0fa..adda8d9f71 100644 --- a/plugin-server/src/config/kafka-topics.ts +++ b/plugin-server/src/config/kafka-topics.ts @@ -46,7 +46,6 @@ export const KAFKA_LOG_ENTRIES = `${prefix}log_entries${suffix}` export const KAFKA_CDP_FUNCTION_OVERFLOW = `${prefix}cdp_function_overflow${suffix}` export const KAFKA_CDP_INTERNAL_EVENTS = `${prefix}cdp_internal_events${suffix}` export const KAFKA_CDP_CLICKHOUSE_BEHAVIORAL_COHORTS_MATCHES = `${prefix}clickhouse_behavioral_cohorts_matches${suffix}` -export const KAFKA_COHORT_MEMBERSHIP_CHANGED = `${prefix}cohort_membership_changed${suffix}` // Error tracking topics export const KAFKA_ERROR_TRACKING_ISSUE_FINGERPRINT = `${prefix}clickhouse_error_tracking_issue_fingerprint${suffix}` diff --git a/plugin-server/src/migrations/001_create_counters_tables.js b/plugin-server/src/migrations/001_create_counters_tables.js new file mode 100644 index 0000000000..b913d8df0f --- /dev/null +++ b/plugin-server/src/migrations/001_create_counters_tables.js @@ -0,0 +1,58 @@ +exports.up = (pgm) => { + // Table for person performed events + pgm.createTable('person_performed_events', { + team_id: { + type: 
'integer', + notNull: true, + }, + person_id: { + type: 'uuid', + notNull: true, + }, + event_name: { + type: 'text', + notNull: true, + }, + }) + + // Add composite primary key + pgm.addConstraint('person_performed_events', 'person_performed_events_pkey', { + primaryKey: ['team_id', 'person_id', 'event_name'], + }) + + // Table for behavioural filter matched events + pgm.createTable('behavioural_filter_matched_events', { + team_id: { + type: 'integer', + notNull: true, + }, + person_id: { + type: 'uuid', + notNull: true, + }, + filter_hash: { + type: 'text', + notNull: true, + }, + date: { + type: 'date', + notNull: true, + }, + counter: { + type: 'integer', + notNull: true, + default: 0, + }, + }) + + // Add composite primary key with date first for better cache locality + // Since we only update today's data, this keeps the working set small + pgm.addConstraint('behavioural_filter_matched_events', 'behavioural_filter_matched_events_pkey', { + primaryKey: ['date', 'team_id', 'person_id', 'filter_hash'], + }) +} + +exports.down = (pgm) => { + pgm.dropTable('behavioural_filter_matched_events') + pgm.dropTable('person_performed_events') +} diff --git a/plugin-server/src/server.ts b/plugin-server/src/server.ts index 5bf87a2993..31be22f32f 100644 --- a/plugin-server/src/server.ts +++ b/plugin-server/src/server.ts @@ -11,7 +11,6 @@ import { setupCommonRoutes, setupExpressApp } from './api/router' import { getPluginServerCapabilities } from './capabilities' import { CdpApi } from './cdp/cdp-api' import { CdpBehaviouralEventsConsumer } from './cdp/consumers/cdp-behavioural-events.consumer' -import { CdpCohortMembershipConsumer } from './cdp/consumers/cdp-cohort-membership.consumer' import { CdpCyclotronDelayConsumer } from './cdp/consumers/cdp-cyclotron-delay.consumer' import { CdpCyclotronWorkerHogFlow } from './cdp/consumers/cdp-cyclotron-worker-hogflow.consumer' import { CdpCyclotronWorker } from './cdp/consumers/cdp-cyclotron-worker.consumer' @@ -274,14 +273,6 @@ export class PluginServer { }) } - if (capabilities.cdpCohortMembership) { - serviceLoaders.push(async () => { - const consumer = new CdpCohortMembershipConsumer(hub) - await consumer.start() - return consumer.service - }) - } - const readyServices = await Promise.all(serviceLoaders.map((loader) => loader())) this.services.push(...readyServices) diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index 5db2345c51..fcdad14a7b 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -81,7 +81,6 @@ export enum PluginServerMode { cdp_internal_events = 'cdp-internal-events', cdp_cyclotron_worker = 'cdp-cyclotron-worker', cdp_behavioural_events = 'cdp-behavioural-events', - cdp_cohort_membership = 'cdp-cohort-membership', cdp_cyclotron_worker_hogflow = 'cdp-cyclotron-worker-hogflow', cdp_cyclotron_worker_delay = 'cdp-cyclotron-worker-delay', cdp_api = 'cdp-api', @@ -280,7 +279,6 @@ export interface PluginsServerConfig extends CdpConfig, IngestionConsumerConfig DATABASE_URL: string // Postgres database URL DATABASE_READONLY_URL: string // Optional read-only replica to the main Postgres database PERSONS_DATABASE_URL: string // Optional read-write Postgres database for persons - BEHAVIORAL_COHORTS_DATABASE_URL: string // Optional read-write Postgres database for behavioral cohorts PERSONS_READONLY_DATABASE_URL: string // Optional read-only replica to the persons Postgres database PERSONS_MIGRATION_DATABASE_URL: string // Read-write Postgres database for persons during dual write/migration 
PERSONS_MIGRATION_READONLY_DATABASE_URL: string // Optional read-only replica to the persons Postgres database during dual write/migration @@ -291,9 +289,9 @@ export interface PluginsServerConfig extends CdpConfig, IngestionConsumerConfig POSTHOG_DB_PASSWORD: string POSTHOG_POSTGRES_HOST: string POSTHOG_POSTGRES_PORT: number - POSTGRES_BEHAVIORAL_COHORTS_HOST: string - POSTGRES_BEHAVIORAL_COHORTS_USER: string - POSTGRES_BEHAVIORAL_COHORTS_PASSWORD: string + POSTGRES_COUNTERS_HOST: string + POSTGRES_COUNTERS_USER: string + POSTGRES_COUNTERS_PASSWORD: string CLICKHOUSE_JSON_EVENTS_KAFKA_TOPIC: string CLICKHOUSE_HEATMAPS_KAFKA_TOPIC: string // Redis url pretty much only used locally / self hosted @@ -549,7 +547,6 @@ export interface PluginServerCapabilities { cdpCyclotronWorkerHogFlow?: boolean cdpCyclotronWorkerDelay?: boolean cdpBehaviouralEvents?: boolean - cdpCohortMembership?: boolean cdpApi?: boolean appManagementSingleton?: boolean } diff --git a/plugin-server/src/utils/db/postgres.ts b/plugin-server/src/utils/db/postgres.ts index acf4030fc7..3d978d8a84 100644 --- a/plugin-server/src/utils/db/postgres.ts +++ b/plugin-server/src/utils/db/postgres.ts @@ -17,7 +17,7 @@ export enum PostgresUse { PLUGIN_STORAGE_RW, // Plugin Storage table, no read replica for it PERSONS_READ, // Person database, read replica PERSONS_WRITE, // Person database, write - BEHAVIORAL_COHORTS_RW, // Behavioral cohorts database for behavioral cohorts + COUNTERS_RW, // Counters database for aggregations } export class TransactionClient { @@ -49,7 +49,7 @@ export class PostgresRouter { [PostgresUse.COMMON_READ, commonClient], [PostgresUse.PLUGIN_STORAGE_RW, commonClient], [PostgresUse.PERSONS_WRITE, commonClient], - [PostgresUse.BEHAVIORAL_COHORTS_RW, commonClient], + [PostgresUse.COUNTERS_RW, commonClient], ]) if (serverConfig.DATABASE_READONLY_URL) { @@ -88,20 +88,6 @@ export class PostgresRouter { ) logger.info('👍', `Persons Postgresql ready`) } - - if (serverConfig.BEHAVIORAL_COHORTS_DATABASE_URL) { - logger.info('🤔', `Connecting to behavioral cohorts Postgresql...`) - this.pools.set( - PostgresUse.BEHAVIORAL_COHORTS_RW, - createPostgresPool( - serverConfig.BEHAVIORAL_COHORTS_DATABASE_URL, - serverConfig.POSTGRES_CONNECTION_POOL_SIZE, - app_name - ) - ) - logger.info('👍', `Behavioral cohorts Postgresql ready`) - } - if (serverConfig.PERSONS_READONLY_DATABASE_URL) { logger.info('🤔', `Connecting to persons read-only Postgresql...`) this.pools.set( diff --git a/plugin-server/tests/helpers/kafka.ts b/plugin-server/tests/helpers/kafka.ts index 0f08a8a67d..90b39a18f5 100644 --- a/plugin-server/tests/helpers/kafka.ts +++ b/plugin-server/tests/helpers/kafka.ts @@ -8,7 +8,6 @@ import { KAFKA_CDP_CLICKHOUSE_BEHAVIORAL_COHORTS_MATCHES, KAFKA_CLICKHOUSE_HEATMAP_EVENTS, KAFKA_CLICKHOUSE_SESSION_RECORDING_EVENTS, - KAFKA_COHORT_MEMBERSHIP_CHANGED, KAFKA_ERROR_TRACKING_ISSUE_FINGERPRINT_OVERRIDES, KAFKA_EVENTS_DEAD_LETTER_QUEUE, KAFKA_EVENTS_JSON, @@ -58,7 +57,6 @@ export async function resetKafka(extraServerConfig?: Partial) { @@ -648,11 +637,11 @@ export async function fetchPostgresDistinctIdsForPerson(db: DB, personId: string ) } -export async function resetBehavioralCohortsDatabase(db: PostgresRouter): Promise { +export async function resetCountersDatabase(db: PostgresRouter): Promise { await db.query( - PostgresUse.BEHAVIORAL_COHORTS_RW, - 'TRUNCATE TABLE cohort_membership', + PostgresUse.COUNTERS_RW, + 'TRUNCATE TABLE person_performed_events, behavioural_filter_matched_events', undefined, - 'reset-behavioral-cohorts-db' + 
'reset-counters-db' ) } diff --git a/posthog/kafka_client/topics.py b/posthog/kafka_client/topics.py index 3a080f12f9..d51d63eb15 100644 --- a/posthog/kafka_client/topics.py +++ b/posthog/kafka_client/topics.py @@ -49,4 +49,3 @@ KAFKA_ERROR_TRACKING_ISSUE_FINGERPRINT_EMBEDDINGS = ( KAFKA_DOCUMENT_EMBEDDINGS_TOPIC = f"{KAFKA_PREFIX}clickhouse_document_embeddings{SUFFIX}" KAFKA_CDP_INTERNAL_EVENTS = f"{KAFKA_PREFIX}cdp_internal_events{SUFFIX}" -KAFKA_COHORT_MEMBERSHIP_CHANGED = f"{KAFKA_PREFIX}cohort_membership_changed{SUFFIX}"
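
Appendix (illustrative, not part of the patch): a minimal sketch of how the two counters tables created by src/migrations/001_create_counters_tables.js might be written to, reusing the batched INSERT ... ON CONFLICT pattern of the reverted cohort-membership consumer and the PostgresRouter.query(use, query, values, tag) signature visible in this diff (node-pg-migrate now picks these migrations up from the "dir": "src/migrations" set in .node-pg-migrate.json). The function names, import path, and input shapes below are assumptions; only the table and column names come from the migration above.

import { PostgresRouter, PostgresUse } from './utils/db/postgres' // hypothetical path; adjust to the caller's location

// Assumed input shape for one behavioural-filter match.
interface FilterMatchIncrement {
    teamId: number
    personId: string // person UUID
    filterHash: string
    date: string // 'YYYY-MM-DD', matching the DATE column
}

// Marks that a person has performed an event at least once. The composite
// primary key (team_id, person_id, event_name) makes the table a set, so
// repeat inserts can simply be ignored.
export async function recordPersonPerformedEvent(
    db: PostgresRouter,
    teamId: number,
    personId: string,
    eventName: string
): Promise<void> {
    await db.query(
        PostgresUse.COUNTERS_RW,
        `INSERT INTO person_performed_events (team_id, person_id, event_name)
         VALUES ($1, $2, $3)
         ON CONFLICT (team_id, person_id, event_name) DO NOTHING`,
        [teamId, personId, eventName],
        'insertPersonPerformedEvent'
    )
}

// Increments per-day match counters in one batched upsert, mirroring the
// reverted consumer's batchUpsertCohortMembership. Assumes each
// (date, team, person, filter) key appears at most once per batch, since
// ON CONFLICT DO UPDATE cannot touch the same row twice in one statement.
export async function recordFilterMatches(db: PostgresRouter, matches: FilterMatchIncrement[]): Promise<void> {
    if (matches.length === 0) {
        return
    }

    const values: any[] = []
    const placeholders: string[] = []
    let paramIndex = 1
    for (const match of matches) {
        placeholders.push(`($${paramIndex}, $${paramIndex + 1}, $${paramIndex + 2}, $${paramIndex + 3}, 1)`)
        values.push(match.date, match.teamId, match.personId, match.filterHash)
        paramIndex += 4
    }

    const query = `
        INSERT INTO behavioural_filter_matched_events (date, team_id, person_id, filter_hash, counter)
        VALUES ${placeholders.join(', ')}
        ON CONFLICT (date, team_id, person_id, filter_hash)
        DO UPDATE SET counter = behavioural_filter_matched_events.counter + EXCLUDED.counter
    `
    await db.query(PostgresUse.COUNTERS_RW, query, values, 'batchUpsertFilterMatchCounters')
}

Keying behavioural_filter_matched_events on (date, team_id, person_id, filter_hash) with date first matches the migration's comment: only today's rows are updated, so the hot working set stays small and cache-local.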