feat(messaging): add cohort membership consumer and migrations (#39561)
.github/workflows/ci-nodejs.yml (vendored)
@@ -206,12 +206,15 @@ jobs:
          TEST: 'true'
          SECRET_KEY: 'abcdef' # unsafe - for testing only
          DATABASE_URL: 'postgres://posthog:posthog@localhost:5432/posthog'
          BEHAVIORAL_COHORTS_DATABASE_URL: 'postgres://posthog:posthog@localhost:5432/test_behavioral_cohorts'

        run: pnpm --filter=@posthog/plugin-server setup:test

      - name: Test with Jest
        env:
          # Below DB name has `test_` prepended, as that's how Django (ran above) creates the test DB
          DATABASE_URL: 'postgres://posthog:posthog@localhost:5432/test_posthog'
          BEHAVIORAL_COHORTS_DATABASE_URL: 'postgres://posthog:posthog@localhost:5432/test_behavioral_cohorts'
          PERSONS_DATABASE_URL: 'postgres://posthog:posthog@localhost:5432/test_persons'
          PERSONS_READONLY_DATABASE_URL: 'postgres://posthog:posthog@localhost:5432/test_persons'
          PERSONS_READONLY_MIGRATION_DATABASE_URL: 'postgres://posthog:posthog@localhost:5432/test_persons_migration'
@@ -18,6 +18,13 @@ if [ -d "$SCRIPT_DIR/../rust/bin" ]; then
        echo "Error in rust/bin/migrate-persons, exiting."
        exit 1
    fi

    echo "Running behavioral cohorts migrations via external non-django migrator..."
    bash $SCRIPT_DIR/../rust/bin/migrate-behavioral-cohorts
    if [ $? -ne 0 ]; then
        echo "Error in rust/bin/migrate-behavioral-cohorts, exiting."
        exit 1
    fi
fi

# Create a temporary file for background process status
@@ -54,6 +54,9 @@ procs:
    migrate-persons-db:
        shell: 'bin/check_postgres_up posthog_persons && cd rust && DATABASE_URL=postgres://posthog:posthog@localhost:5432/posthog_persons sqlx migrate run --source persons_migrations'

    migrate-behavioral-cohorts:
        shell: 'bin/check_postgres_up behavioral_cohorts && rust/bin/migrate-behavioral-cohorts'

    storybook:
        shell: 'pnpm --filter=@posthog/storybook install && pnpm run storybook'
        autostart: false
@@ -112,6 +112,9 @@ procs:
    migrate-persons-db:
        shell: 'bin/check_postgres_up posthog_persons && cd rust && DATABASE_URL=postgres://posthog:posthog@localhost:5432/posthog_persons sqlx migrate run --source persons_migrations'

    migrate-behavioral-cohorts:
        shell: 'bin/check_postgres_up behavioral_cohorts && rust/bin/migrate-behavioral-cohorts'

    generate-demo-data:
        shell: |-
            bin/check_postgres_up && \
@@ -290,6 +290,7 @@ services:
        environment:
            DATABASE_URL: 'postgres://posthog:posthog@db:5432/posthog'
            PERSONS_DATABASE_URL: 'postgres://posthog:posthog@db:5432/posthog'
            BEHAVIORAL_COHORTS_DATABASE_URL: 'postgres://posthog:posthog@db:5432/posthog'
            KAFKA_HOSTS: 'kafka:9092'
            REDIS_URL: 'redis://redis:6379/'
            CLICKHOUSE_HOST: 'clickhouse'
@@ -139,6 +139,7 @@ services:
            ENCRYPTION_SALT_KEYS: $ENCRYPTION_SALT_KEYS
            CYCLOTRON_DATABASE_URL: 'postgres://posthog:posthog@db:5432/posthog'
            PERSONS_DATABASE_URL: 'postgres://posthog:posthog@db:5432/posthog'
            BEHAVIORAL_COHORTS_DATABASE_URL: 'postgres://posthog:posthog@db:5432/posthog'
            OTEL_EXPORTER_OTLP_ENDPOINT: ''
        depends_on:
            - db
@@ -1,8 +0,0 @@
{
    "migrationsTable": "pgmigrations",
    "dir": "src/migrations",
    "checkOrder": true,
    "migrationFilenameFormat": "utc",
    "createMigrationsSchema": false,
    "createSchema": false
}
@@ -26,10 +26,12 @@
        "prettier:check": "prettier --check .",
        "prepublishOnly": "pnpm build",
        "setup:dev:clickhouse": "cd .. && DEBUG=1 python manage.py migrate_clickhouse",
-       "setup:test": "cd .. && TEST=1 python manage.py setup_test_environment && cd plugin-server && pnpm run setup:test:cyclotron && pnpm run setup:test:persons && pnpm run setup:test:persons-migration",
-       "setup:test:cyclotron": "CYCLOTRON_DATABASE_NAME=test_cyclotron ../rust/bin/migrate-cyclotron",
-       "setup:test:persons": "PERSONS_DATABASE_NAME=test_persons ../rust/bin/migrate-persons",
-       "setup:test:persons-migration": "PERSONS_DATABASE_NAME=test_persons_migration ../rust/bin/migrate-persons",
+       "setup:test": "cd .. && TEST=1 python manage.py setup_test_environment && cd plugin-server && pnpm run setup:test:rust",
+       "setup:test:rust": "CYCLOTRON_DATABASE_NAME=test_cyclotron PERSONS_DATABASE_NAME=test_persons BEHAVIORAL_COHORTS_DATABASE_NAME=test_behavioral_cohorts ../rust/bin/migrate-all all && PERSONS_DATABASE_NAME=test_persons_migration ../rust/bin/migrate-persons",
+       "migrate:persons": "../rust/bin/migrate-persons",
+       "migrate:cyclotron": "../rust/bin/migrate-cyclotron",
+       "migrate:behavioral-cohorts": "../rust/bin/migrate-behavioral-cohorts",
+       "migrate:rust": "../rust/bin/migrate-all all",
        "services:start": "cd .. && docker compose -f docker-compose.dev.yml up",
        "services:stop": "cd .. && docker compose -f docker-compose.dev.yml down",
        "services:clean": "cd .. && docker compose -f docker-compose.dev.yml rm -v",
@@ -162,7 +164,6 @@
        "eslint-plugin-node": "^11.1.0",
        "eslint-plugin-promise": "^6.1.1",
        "jest": "^30.0.0",
-       "node-pg-migrate": "^8.0.3",
        "pino-pretty": "^9.1.0",
        "prettier": "^3.6.2",
        "supertest": "^7.0.0",
@@ -21,6 +21,7 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
        cdpCyclotronWorkerHogFlow: true,
        cdpCyclotronWorkerDelay: true,
        cdpBehaviouralEvents: true,
+       cdpCohortMembership: true,
        cdpApi: true,
    }

@@ -35,6 +36,7 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
        cdpCyclotronWorkerHogFlow: true,
        cdpCyclotronWorkerDelay: true,
        cdpBehaviouralEvents: true,
+       cdpCohortMembership: true,
        cdpApi: true,
    }
@@ -85,6 +87,10 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
            return {
                cdpBehaviouralEvents: true,
            }
+       case PluginServerMode.cdp_cohort_membership:
+           return {
+               cdpCohortMembership: true,
+           }
        case PluginServerMode.cdp_legacy_on_event:
            return {
                cdpLegacyOnEvent: true,
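The new mode maps one-to-one onto the new capability. A minimal sketch of how this resolves, assuming the server reads PLUGIN_SERVER_MODE off the config as the switch above suggests (the one-field config literal is illustrative, not a real production config):

```typescript
import { getPluginServerCapabilities } from './capabilities'
import { PluginServerMode, PluginsServerConfig } from './types'

// Hypothetical: a config pinning the server to the new dedicated mode.
const config = { PLUGIN_SERVER_MODE: PluginServerMode.cdp_cohort_membership } as unknown as PluginsServerConfig

// Per the switch above, this resolves to { cdpCohortMembership: true },
// so the PluginServer boots only the cohort membership consumer.
const capabilities = getPluginServerCapabilities(config)
```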
@@ -7,6 +7,7 @@ import { insertRow } from '~/tests/helpers/sql'
import { ClickHousePerson, ClickHouseTimestamp, ProjectId, RawClickHouseEvent, Team } from '../../types'
import { PostgresRouter } from '../../utils/db/postgres'
import { UUIDT } from '../../utils/utils'
+import { CohortMembershipChange } from '../consumers/cdp-cohort-membership.consumer'
import { CdpInternalEvent } from '../schema'
import { compileHog } from '../templates/compiler'
import {

@@ -286,3 +287,55 @@ export const createExampleInvocation = (
        queuePriority: 0,
    }
}

// Cohort Membership Test Helpers
export const createCohortMembershipEvent = (
    overrides: Partial<CohortMembershipChange> = {}
): CohortMembershipChange => {
    return {
        personId: new UUIDT().toString(),
        cohortId: 1,
        teamId: 1,
        cohort_membership_changed: 'entered',
        ...overrides,
    }
}

export const createCohortMembershipEvents = (events: Partial<CohortMembershipChange>[]): CohortMembershipChange[] => {
    return events.map((event) => createCohortMembershipEvent(event))
}

export interface CohortMembershipRecord {
    team_id: number
    cohort_id: number
    person_id: string
    in_cohort: boolean
    last_updated?: Date
}

export const insertCohortMembership = async (
    db: PostgresRouter,
    membership: Partial<CohortMembershipRecord>
): Promise<CohortMembershipRecord> => {
    const record: CohortMembershipRecord = {
        team_id: 1,
        cohort_id: 1,
        person_id: new UUIDT().toString(),
        in_cohort: true,
        ...membership,
    }

    // insertRow now automatically determines the correct database based on table name
    return await insertRow(db, 'cohort_membership', record)
}

export const insertCohortMemberships = async (
    db: PostgresRouter,
    memberships: Partial<CohortMembershipRecord>[]
): Promise<CohortMembershipRecord[]> => {
    const results = []
    for (const membership of memberships) {
        results.push(await insertCohortMembership(db, membership))
    }
    return results
}
@@ -0,0 +1,229 @@
import { Message } from 'node-rdkafka'

import { resetKafka } from '~/tests/helpers/kafka'
import { UUIDT } from '~/utils/utils'

import { resetBehavioralCohortsDatabase } from '../../../tests/helpers/sql'
import { KAFKA_COHORT_MEMBERSHIP_CHANGED } from '../../config/kafka-topics'
import { Hub } from '../../types'
import { closeHub, createHub } from '../../utils/db/hub'
import { PostgresUse } from '../../utils/db/postgres'
import { createCohortMembershipEvent, createCohortMembershipEvents, createKafkaMessage } from '../_tests/fixtures'
import { CdpCohortMembershipConsumer } from './cdp-cohort-membership.consumer'

jest.setTimeout(20_000)

describe('CdpCohortMembershipConsumer', () => {
    let hub: Hub
    let consumer: CdpCohortMembershipConsumer

    beforeEach(async () => {
        await resetKafka()
        hub = await createHub()
        consumer = new CdpCohortMembershipConsumer(hub)
        await resetBehavioralCohortsDatabase(hub.postgres)
    })

    afterEach(async () => {
        await consumer.stop()
        await closeHub(hub)
    })

    describe('end-to-end cohort membership processing', () => {
        const personId1 = new UUIDT().toString()
        const personId2 = new UUIDT().toString()
        const personId3 = new UUIDT().toString()

        it('should process entered and left events and write to PostgreSQL correctly', async () => {
            // Test data using helper functions
            const testEvents = createCohortMembershipEvents([
                {
                    personId: personId1,
                    cohortId: 456,
                    teamId: 1,
                    cohort_membership_changed: 'entered',
                },
                {
                    personId: personId2,
                    cohortId: 456,
                    teamId: 1,
                    cohort_membership_changed: 'entered',
                },
                {
                    personId: personId3,
                    cohortId: 457,
                    teamId: 1,
                    cohort_membership_changed: 'left',
                },
            ])

            // Create mock Kafka messages
            const messages = testEvents.map((event, index) =>
                createKafkaMessage(event, { topic: KAFKA_COHORT_MEMBERSHIP_CHANGED, offset: index })
            )

            // Process the batch of messages
            await (consumer as any).handleBatch(messages)

            // Verify data was written to PostgreSQL
            const result = await hub.postgres.query(
                PostgresUse.BEHAVIORAL_COHORTS_RW,
                'SELECT * FROM cohort_membership WHERE team_id = $1 ORDER BY person_id, cohort_id',
                [1],
                'testQuery'
            )

            expect(result.rows).toHaveLength(3)

            // Verify first entered event
            expect(result.rows[0]).toMatchObject({
                team_id: '1',
                cohort_id: '456',
                person_id: personId1,
                in_cohort: true,
            })

            // Verify second entered event
            expect(result.rows[1]).toMatchObject({
                team_id: '1',
                cohort_id: '456',
                person_id: personId2,
                in_cohort: true,
            })

            // Verify left event
            expect(result.rows[2]).toMatchObject({
                team_id: '1',
                cohort_id: '457',
                person_id: personId3,
                in_cohort: false,
            })
        })

        it('should handle complete person lifecycle: enter -> leave -> re-enter cohort', async () => {
            // Step 1: Person enters the cohort for the first time
            const enterEvent = createCohortMembershipEvent({
                personId: personId1,
                cohortId: 456,
                teamId: 1,
                cohort_membership_changed: 'entered',
            })

            await (consumer as any).handleBatch([
                createKafkaMessage(enterEvent, { topic: KAFKA_COHORT_MEMBERSHIP_CHANGED, offset: 0 }),
            ])

            let result = await hub.postgres.query(
                PostgresUse.BEHAVIORAL_COHORTS_RW,
                'SELECT * FROM cohort_membership WHERE team_id = $1 AND person_id = $2 AND cohort_id = $3',
                [1, personId1, 456],
                'testQuery'
            )

            expect(result.rows[0].in_cohort).toBe(true)
            const firstTimestamp = result.rows[0].last_updated

            // Wait to ensure timestamp difference
            await new Promise((resolve) => setTimeout(resolve, 10))

            // Step 2: Person leaves the cohort
            const leaveEvent = createCohortMembershipEvent({
                personId: personId1,
                cohortId: 456,
                teamId: 1,
                cohort_membership_changed: 'left',
            })

            await (consumer as any).handleBatch([
                createKafkaMessage(leaveEvent, { topic: KAFKA_COHORT_MEMBERSHIP_CHANGED, offset: 1 }),
            ])

            result = await hub.postgres.query(
                PostgresUse.BEHAVIORAL_COHORTS_RW,
                'SELECT * FROM cohort_membership WHERE team_id = $1 AND person_id = $2 AND cohort_id = $3',
                [1, personId1, 456],
                'testQuery'
            )

            expect(result.rows).toHaveLength(1) // Same record, just updated
            expect(result.rows[0].in_cohort).toBe(false)
            const secondTimestamp = result.rows[0].last_updated
            expect(new Date(secondTimestamp).getTime()).toBeGreaterThan(new Date(firstTimestamp).getTime())

            // Wait to ensure timestamp difference
            await new Promise((resolve) => setTimeout(resolve, 10))

            // Step 3: Person re-enters the cohort
            const reEnterEvent = createCohortMembershipEvent({
                personId: personId1,
                cohortId: 456,
                teamId: 1,
                cohort_membership_changed: 'entered',
            })

            await (consumer as any).handleBatch([
                createKafkaMessage(reEnterEvent, { topic: KAFKA_COHORT_MEMBERSHIP_CHANGED, offset: 2 }),
            ])

            result = await hub.postgres.query(
                PostgresUse.BEHAVIORAL_COHORTS_RW,
                'SELECT * FROM cohort_membership WHERE team_id = $1 AND person_id = $2 AND cohort_id = $3',
                [1, personId1, 456],
                'testQuery'
            )

            expect(result.rows).toHaveLength(1) // Still same record, just updated again
            expect(result.rows[0].in_cohort).toBe(true) // Back in the cohort
            const thirdTimestamp = result.rows[0].last_updated
            expect(new Date(thirdTimestamp).getTime()).toBeGreaterThan(new Date(secondTimestamp).getTime())
        })

        it('should reject entire batch when invalid messages are present', async () => {
            const validEvent = {
                personId: personId1,
                cohortId: 456,
                teamId: 1,
                cohort_membership_changed: 'entered',
            }

            const messages: Message[] = [
                // Valid message
                createKafkaMessage(validEvent, { topic: KAFKA_COHORT_MEMBERSHIP_CHANGED, offset: 0 }),
                // Invalid JSON (manually create this one since it's intentionally malformed)
                {
                    value: Buffer.from('invalid json'),
                    topic: KAFKA_COHORT_MEMBERSHIP_CHANGED,
                    partition: 0,
                    offset: 1,
                    timestamp: Date.now(),
                    key: null,
                    size: 0,
                },
                // Missing required fields
                createKafkaMessage({ personId: 124 }, { topic: KAFKA_COHORT_MEMBERSHIP_CHANGED, offset: 2 }),
                // Empty message (manually create this one since it has null value)
                {
                    value: null,
                    topic: KAFKA_COHORT_MEMBERSHIP_CHANGED,
                    partition: 0,
                    offset: 3,
                    timestamp: Date.now(),
                    key: null,
                    size: 0,
                },
            ]

            // Should throw due to invalid messages in batch
            await expect((consumer as any).handleBatch(messages)).rejects.toThrow()

            // Verify NO data was inserted
            const result = await hub.postgres.query(
                PostgresUse.BEHAVIORAL_COHORTS_RW,
                'SELECT * FROM cohort_membership WHERE team_id = $1',
                [1],
                'testQuery'
            )

            expect(result.rows).toHaveLength(0) // No data should be inserted
        })
    })
})
@@ -0,0 +1,145 @@
import { Message } from 'node-rdkafka'
import { z } from 'zod'

import { KAFKA_COHORT_MEMBERSHIP_CHANGED } from '../../config/kafka-topics'
import { KafkaConsumer } from '../../kafka/consumer'
import { HealthCheckResult, Hub } from '../../types'
import { PostgresUse } from '../../utils/db/postgres'
import { parseJSON } from '../../utils/json-parse'
import { logger } from '../../utils/logger'
import { CdpConsumerBase } from './cdp-base.consumer'

// Zod schema for validation
const CohortMembershipChangeSchema = z.object({
    personId: z.string().uuid(),
    cohortId: z.number(),
    teamId: z.number(),
    cohort_membership_changed: z.enum(['entered', 'left']),
})

export type CohortMembershipChange = z.infer<typeof CohortMembershipChangeSchema>

export class CdpCohortMembershipConsumer extends CdpConsumerBase {
    protected name = 'CdpCohortMembershipConsumer'
    private kafkaConsumer: KafkaConsumer

    constructor(hub: Hub) {
        super(hub)
        this.kafkaConsumer = new KafkaConsumer({
            groupId: 'cdp-cohort-membership-consumer',
            topic: KAFKA_COHORT_MEMBERSHIP_CHANGED,
        })
    }

    private async handleBatchCohortMembership(changes: CohortMembershipChange[]): Promise<void> {
        if (changes.length === 0) {
            return
        }

        try {
            // Build the VALUES clause for batch upsert
            const values: any[] = []
            const placeholders: string[] = []
            let paramIndex = 1

            for (const change of changes) {
                const inCohort = change.cohort_membership_changed === 'entered'
                placeholders.push(
                    `($${paramIndex}, $${paramIndex + 1}, $${paramIndex + 2}, $${paramIndex + 3}, CURRENT_TIMESTAMP)`
                )
                values.push(change.teamId, change.cohortId, change.personId, inCohort)
                paramIndex += 4
            }

            // Single batch UPSERT query - handles both entered and left events
            const query = `
                INSERT INTO cohort_membership (team_id, cohort_id, person_id, in_cohort, last_updated)
                VALUES ${placeholders.join(', ')}
                ON CONFLICT (team_id, cohort_id, person_id)
                DO UPDATE SET
                    in_cohort = EXCLUDED.in_cohort,
                    last_updated = CURRENT_TIMESTAMP
            `

            await this.hub.postgres.query(
                PostgresUse.BEHAVIORAL_COHORTS_RW,
                query,
                values,
                'batchUpsertCohortMembership'
            )
        } catch (error) {
            logger.error('Failed to process batch cohort membership changes', {
                error,
                count: changes.length,
            })
            throw error
        }
    }

    private async handleBatch(messages: Message[]): Promise<void> {
        const cohortMembershipChanges: CohortMembershipChange[] = []

        // Process and validate all messages
        for (const message of messages) {
            try {
                const messageValue = message.value?.toString()
                if (!messageValue) {
                    throw new Error('Empty message received')
                }

                const parsedMessage = parseJSON(messageValue)

                // Validate using Zod schema
                const validationResult = CohortMembershipChangeSchema.safeParse(parsedMessage)

                if (!validationResult.success) {
                    logger.error('Invalid cohort membership change message', {
                        errors: validationResult.error.errors,
                        message: messageValue,
                    })
                    throw new Error(`Invalid cohort membership change message: ${validationResult.error.message}`)
                }

                const cohortMembershipChange = validationResult.data
                cohortMembershipChanges.push(cohortMembershipChange)
            } catch (error) {
                logger.error('Error processing cohort membership change message', {
                    error,
                    message: message.value?.toString(),
                })
                throw error
            }
        }

        await this.handleBatchCohortMembership(cohortMembershipChanges)
    }

    public async start(): Promise<void> {
        await super.start()

        logger.info('🚀', `${this.name} starting...`)

        await this.kafkaConsumer.connect(async (messages) => {
            logger.info('🔁', `${this.name} - handling batch`, {
                size: messages.length,
            })

            await this.handleBatch(messages)
        })

        logger.info('✅', `${this.name} started successfully`)
    }

    public async stop(): Promise<void> {
        logger.info('💤', `Stopping ${this.name}...`)
        await this.kafkaConsumer.disconnect()

        // IMPORTANT: super always comes last
        await super.stop()
        logger.info('💤', `${this.name} stopped!`)
    }

    public isHealthy(): HealthCheckResult {
        return this.kafkaConsumer.isHealthy()
    }
}
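To exercise the consumer end to end, a message on the new topic must pass CohortMembershipChangeSchema. A hedged sketch of a producer using kafkajs (kafkajs, the broker address, and the unprefixed topic name are assumptions for illustration; the plugin server itself consumes via node-rdkafka):

```typescript
import { Kafka } from 'kafkajs'

// Assumption: a locally reachable broker; with empty prefix/suffix,
// KAFKA_COHORT_MEMBERSHIP_CHANGED resolves to 'cohort_membership_changed'.
const kafka = new Kafka({ brokers: ['localhost:9092'] })

async function publishExample(): Promise<void> {
    const producer = kafka.producer()
    await producer.connect()
    await producer.send({
        topic: 'cohort_membership_changed',
        messages: [
            {
                // Must satisfy CohortMembershipChangeSchema: a UUID personId,
                // numeric cohortId/teamId, and 'entered' or 'left'.
                value: JSON.stringify({
                    personId: '0f8fad5b-d9cb-469f-a165-70867728950e',
                    cohortId: 456,
                    teamId: 1,
                    cohort_membership_changed: 'entered',
                }),
            },
        ],
    })
    await producer.disconnect()
}
```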
@@ -40,6 +40,11 @@ export function getDefaultConfig(): PluginsServerConfig {
            : isDevEnv()
              ? 'postgres://posthog:posthog@localhost:5432/posthog_persons'
              : '',
+       BEHAVIORAL_COHORTS_DATABASE_URL: isTestEnv()
+           ? 'postgres://posthog:posthog@localhost:5432/test_behavioral_cohorts'
+           : isDevEnv()
+             ? 'postgres://posthog:posthog@localhost:5432/behavioral_cohorts'
+             : '',
        PERSONS_MIGRATION_DATABASE_URL: isTestEnv()
            ? 'postgres://posthog:posthog@localhost:5432/test_persons_migration'
            : isDevEnv()

@@ -56,9 +61,9 @@ export function getDefaultConfig(): PluginsServerConfig {
        POSTHOG_DB_PASSWORD: '',
        POSTHOG_POSTGRES_HOST: 'localhost',
        POSTHOG_POSTGRES_PORT: 5432,
-       POSTGRES_COUNTERS_HOST: 'localhost',
-       POSTGRES_COUNTERS_USER: 'postgres',
-       POSTGRES_COUNTERS_PASSWORD: '',
+       POSTGRES_BEHAVIORAL_COHORTS_HOST: 'localhost',
+       POSTGRES_BEHAVIORAL_COHORTS_USER: 'postgres',
+       POSTGRES_BEHAVIORAL_COHORTS_PASSWORD: '',
        EVENT_OVERFLOW_BUCKET_CAPACITY: 1000,
        EVENT_OVERFLOW_BUCKET_REPLENISH_RATE: 1.0,
        KAFKA_BATCH_START_LOGGING_ENABLED: false,

@@ -390,6 +395,18 @@ export function overrideWithEnv(
                ).join(', ')}`
        )
    }

+   if (
+       !newConfig.BEHAVIORAL_COHORTS_DATABASE_URL &&
+       newConfig.POSTGRES_BEHAVIORAL_COHORTS_HOST &&
+       newConfig.POSTGRES_BEHAVIORAL_COHORTS_USER &&
+       newConfig.POSTGRES_BEHAVIORAL_COHORTS_PASSWORD
+   ) {
+       const encodedUser = encodeURIComponent(newConfig.POSTGRES_BEHAVIORAL_COHORTS_USER)
+       const encodedPassword = encodeURIComponent(newConfig.POSTGRES_BEHAVIORAL_COHORTS_PASSWORD)
+       newConfig.BEHAVIORAL_COHORTS_DATABASE_URL = `postgres://${encodedUser}:${encodedPassword}@${newConfig.POSTGRES_BEHAVIORAL_COHORTS_HOST}:5432/behavioral_cohorts`
+   }

    return newConfig
}
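The encodeURIComponent calls in the fallback above matter when credentials contain URL-reserved characters. A small standalone illustration (the credential values are made up):

```typescript
// Hypothetical credentials containing reserved characters.
const user = 'svc@posthog'
const password = 'p:ss/w@rd'

// Unencoded, the '@' and ':' would break URL parsing; encoded, the URL round-trips cleanly.
const url = `postgres://${encodeURIComponent(user)}:${encodeURIComponent(password)}@localhost:5432/behavioral_cohorts`
const parsed = new URL(url)
console.log(parsed.username, parsed.password)
// => 'svc%40posthog' 'p%3Ass%2Fw%40rd' (decodeURIComponent recovers the originals)
```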
@@ -46,6 +46,7 @@ export const KAFKA_LOG_ENTRIES = `${prefix}log_entries${suffix}`
export const KAFKA_CDP_FUNCTION_OVERFLOW = `${prefix}cdp_function_overflow${suffix}`
export const KAFKA_CDP_INTERNAL_EVENTS = `${prefix}cdp_internal_events${suffix}`
export const KAFKA_CDP_CLICKHOUSE_BEHAVIORAL_COHORTS_MATCHES = `${prefix}clickhouse_behavioral_cohorts_matches${suffix}`
+export const KAFKA_COHORT_MEMBERSHIP_CHANGED = `${prefix}cohort_membership_changed${suffix}`

// Error tracking topics
export const KAFKA_ERROR_TRACKING_ISSUE_FINGERPRINT = `${prefix}clickhouse_error_tracking_issue_fingerprint${suffix}`
@@ -1,58 +0,0 @@
exports.up = (pgm) => {
    // Table for person performed events
    pgm.createTable('person_performed_events', {
        team_id: {
            type: 'integer',
            notNull: true,
        },
        person_id: {
            type: 'uuid',
            notNull: true,
        },
        event_name: {
            type: 'text',
            notNull: true,
        },
    })

    // Add composite primary key
    pgm.addConstraint('person_performed_events', 'person_performed_events_pkey', {
        primaryKey: ['team_id', 'person_id', 'event_name'],
    })

    // Table for behavioural filter matched events
    pgm.createTable('behavioural_filter_matched_events', {
        team_id: {
            type: 'integer',
            notNull: true,
        },
        person_id: {
            type: 'uuid',
            notNull: true,
        },
        filter_hash: {
            type: 'text',
            notNull: true,
        },
        date: {
            type: 'date',
            notNull: true,
        },
        counter: {
            type: 'integer',
            notNull: true,
            default: 0,
        },
    })

    // Add composite primary key with date first for better cache locality
    // Since we only update today's data, this keeps the working set small
    pgm.addConstraint('behavioural_filter_matched_events', 'behavioural_filter_matched_events_pkey', {
        primaryKey: ['date', 'team_id', 'person_id', 'filter_hash'],
    })
}

exports.down = (pgm) => {
    pgm.dropTable('behavioural_filter_matched_events')
    pgm.dropTable('person_performed_events')
}
@@ -11,6 +11,7 @@ import { setupCommonRoutes, setupExpressApp } from './api/router'
import { getPluginServerCapabilities } from './capabilities'
import { CdpApi } from './cdp/cdp-api'
import { CdpBehaviouralEventsConsumer } from './cdp/consumers/cdp-behavioural-events.consumer'
+import { CdpCohortMembershipConsumer } from './cdp/consumers/cdp-cohort-membership.consumer'
import { CdpCyclotronDelayConsumer } from './cdp/consumers/cdp-cyclotron-delay.consumer'
import { CdpCyclotronWorkerHogFlow } from './cdp/consumers/cdp-cyclotron-worker-hogflow.consumer'
import { CdpCyclotronWorker } from './cdp/consumers/cdp-cyclotron-worker.consumer'

@@ -273,6 +274,14 @@ export class PluginServer {
            })
        }

+       if (capabilities.cdpCohortMembership) {
+           serviceLoaders.push(async () => {
+               const consumer = new CdpCohortMembershipConsumer(hub)
+               await consumer.start()
+               return consumer.service
+           })
+       }

        const readyServices = await Promise.all(serviceLoaders.map((loader) => loader()))
        this.services.push(...readyServices)
@@ -81,6 +81,7 @@ export enum PluginServerMode {
    cdp_internal_events = 'cdp-internal-events',
    cdp_cyclotron_worker = 'cdp-cyclotron-worker',
    cdp_behavioural_events = 'cdp-behavioural-events',
+   cdp_cohort_membership = 'cdp-cohort-membership',
    cdp_cyclotron_worker_hogflow = 'cdp-cyclotron-worker-hogflow',
    cdp_cyclotron_worker_delay = 'cdp-cyclotron-worker-delay',
    cdp_api = 'cdp-api',
@@ -279,6 +280,7 @@ export interface PluginsServerConfig extends CdpConfig, IngestionConsumerConfig
    DATABASE_URL: string // Postgres database URL
    DATABASE_READONLY_URL: string // Optional read-only replica to the main Postgres database
    PERSONS_DATABASE_URL: string // Optional read-write Postgres database for persons
+   BEHAVIORAL_COHORTS_DATABASE_URL: string // Optional read-write Postgres database for behavioral cohorts
    PERSONS_READONLY_DATABASE_URL: string // Optional read-only replica to the persons Postgres database
    PERSONS_MIGRATION_DATABASE_URL: string // Read-write Postgres database for persons during dual write/migration
    PERSONS_MIGRATION_READONLY_DATABASE_URL: string // Optional read-only replica to the persons Postgres database during dual write/migration

@@ -289,9 +291,9 @@ export interface PluginsServerConfig extends CdpConfig, IngestionConsumerConfig
    POSTHOG_DB_PASSWORD: string
    POSTHOG_POSTGRES_HOST: string
    POSTHOG_POSTGRES_PORT: number
-   POSTGRES_COUNTERS_HOST: string
-   POSTGRES_COUNTERS_USER: string
-   POSTGRES_COUNTERS_PASSWORD: string
+   POSTGRES_BEHAVIORAL_COHORTS_HOST: string
+   POSTGRES_BEHAVIORAL_COHORTS_USER: string
+   POSTGRES_BEHAVIORAL_COHORTS_PASSWORD: string
    CLICKHOUSE_JSON_EVENTS_KAFKA_TOPIC: string
    CLICKHOUSE_HEATMAPS_KAFKA_TOPIC: string
    // Redis url pretty much only used locally / self hosted

@@ -547,6 +549,7 @@ export interface PluginServerCapabilities {
    cdpCyclotronWorkerHogFlow?: boolean
    cdpCyclotronWorkerDelay?: boolean
    cdpBehaviouralEvents?: boolean
+   cdpCohortMembership?: boolean
    cdpApi?: boolean
    appManagementSingleton?: boolean
}
@@ -17,7 +17,7 @@ export enum PostgresUse {
    PLUGIN_STORAGE_RW, // Plugin Storage table, no read replica for it
    PERSONS_READ, // Person database, read replica
    PERSONS_WRITE, // Person database, write
-   COUNTERS_RW, // Counters database for aggregations
+   BEHAVIORAL_COHORTS_RW, // Behavioral cohorts database for behavioral cohorts
}

export class TransactionClient {

@@ -49,7 +49,7 @@ export class PostgresRouter {
        [PostgresUse.COMMON_READ, commonClient],
        [PostgresUse.PLUGIN_STORAGE_RW, commonClient],
        [PostgresUse.PERSONS_WRITE, commonClient],
-       [PostgresUse.COUNTERS_RW, commonClient],
+       [PostgresUse.BEHAVIORAL_COHORTS_RW, commonClient],
    ])

    if (serverConfig.DATABASE_READONLY_URL) {

@@ -88,6 +88,20 @@ export class PostgresRouter {
        )
        logger.info('👍', `Persons Postgresql ready`)
    }

+   if (serverConfig.BEHAVIORAL_COHORTS_DATABASE_URL) {
+       logger.info('🤔', `Connecting to behavioral cohorts Postgresql...`)
+       this.pools.set(
+           PostgresUse.BEHAVIORAL_COHORTS_RW,
+           createPostgresPool(
+               serverConfig.BEHAVIORAL_COHORTS_DATABASE_URL,
+               serverConfig.POSTGRES_CONNECTION_POOL_SIZE,
+               app_name
+           )
+       )
+       logger.info('👍', `Behavioral cohorts Postgresql ready`)
+   }

    if (serverConfig.PERSONS_READONLY_DATABASE_URL) {
        logger.info('🤔', `Connecting to persons read-only Postgresql...`)
        this.pools.set(
@@ -8,6 +8,7 @@ import {
    KAFKA_CDP_CLICKHOUSE_BEHAVIORAL_COHORTS_MATCHES,
    KAFKA_CLICKHOUSE_HEATMAP_EVENTS,
    KAFKA_CLICKHOUSE_SESSION_RECORDING_EVENTS,
+   KAFKA_COHORT_MEMBERSHIP_CHANGED,
    KAFKA_ERROR_TRACKING_ISSUE_FINGERPRINT_OVERRIDES,
    KAFKA_EVENTS_DEAD_LETTER_QUEUE,
    KAFKA_EVENTS_JSON,

@@ -57,6 +58,7 @@ export async function resetKafka(extraServerConfig?: Partial<PluginsServerConfig
        KAFKA_EVENTS_RECENT_JSON,
        KAFKA_ERROR_TRACKING_ISSUE_FINGERPRINT_OVERRIDES,
        KAFKA_CDP_CLICKHOUSE_BEHAVIORAL_COHORTS_MATCHES,
+       KAFKA_COHORT_MEMBERSHIP_CHANGED,
    ])
}
@@ -218,9 +218,20 @@ export async function resetTestDatabase(

// Helper function to determine which database a table belongs to
function getPostgresUseForTable(table: string): PostgresUse {
+   // Behavioral cohorts tables
+   if (table === 'cohort_membership') {
+       return PostgresUse.BEHAVIORAL_COHORTS_RW
+   }
+
    // Persons-related tables
    const personsTablesRegex =
        /^posthog_(person|persondistinctid|personlessdistinctid|personoverridemapping|personoverride|pendingpersonoverride|flatpersonoverride|featureflaghashkeyoverride|cohortpeople|group|grouptypemapping)$/
-   return personsTablesRegex.test(table) ? PostgresUse.PERSONS_WRITE : PostgresUse.COMMON_WRITE
+   if (personsTablesRegex.test(table)) {
+       return PostgresUse.PERSONS_WRITE
+   }
+
+   // Default to common tables
+   return PostgresUse.COMMON_WRITE
}

export async function insertRow(db: PostgresRouter, table: string, objectProvided: Record<string, any>) {

@@ -637,11 +648,11 @@ export async function fetchPostgresDistinctIdsForPerson(db: DB, personId: string
    )
}

-export async function resetCountersDatabase(db: PostgresRouter): Promise<void> {
+export async function resetBehavioralCohortsDatabase(db: PostgresRouter): Promise<void> {
    await db.query(
-       PostgresUse.COUNTERS_RW,
-       'TRUNCATE TABLE person_performed_events, behavioural_filter_matched_events',
+       PostgresUse.BEHAVIORAL_COHORTS_RW,
+       'TRUNCATE TABLE cohort_membership',
        undefined,
-       'reset-counters-db'
+       'reset-behavioral-cohorts-db'
    )
}
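For illustration, the routing rule above restated as a standalone, runnable snippet (getPostgresUseForTable itself is not exported; the names here are local stand-ins and the persons regex is abbreviated):

```typescript
// Standalone restatement of the routing rule, for illustration only.
enum Use { COMMON_WRITE, PERSONS_WRITE, BEHAVIORAL_COHORTS_RW }

function routeTable(table: string): Use {
    // The cohort membership table lives in the behavioral cohorts database.
    if (table === 'cohort_membership') {
        return Use.BEHAVIORAL_COHORTS_RW
    }
    // Persons-related tables route to the persons database (abbreviated pattern).
    const personsTablesRegex = /^posthog_(person|persondistinctid)$/
    return personsTablesRegex.test(table) ? Use.PERSONS_WRITE : Use.COMMON_WRITE
}

console.log(routeTable('cohort_membership')) // => Use.BEHAVIORAL_COHORTS_RW
console.log(routeTable('posthog_person'))    // => Use.PERSONS_WRITE
console.log(routeTable('posthog_team'))      // => Use.COMMON_WRITE
```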
pnpm-lock.yaml (generated)
@@ -1533,9 +1533,6 @@ importers:
      jest:
        specifier: ^30.0.0
        version: 30.0.5(@types/node@22.15.17)(esbuild-register@3.5.0(esbuild@0.25.10))(ts-node@10.9.1(@swc/core@1.11.4(@swc/helpers@0.5.15))(@types/node@22.15.17)(typescript@5.2.2))
-     node-pg-migrate:
-       specifier: ^8.0.3
-       version: 8.0.3(@types/pg@8.6.6)(pg@8.10.0)
      pino-pretty:
        specifier: ^9.1.0
        version: 9.4.0

@@ -15324,17 +15321,6 @@ packages:
  node-int64@0.4.0:
    resolution: {integrity: sha512-O5lz91xSOeoXP6DulyHfllpq+Eg00MWitZIbtPfoSEvqIHdl5gfcY6hYzDWnj0qD5tz52PI08u9qUvSVeUBeHw==}

- node-pg-migrate@8.0.3:
-   resolution: {integrity: sha512-oKzZyzTULTryO1jehX19VnyPCGf3G/3oWZg3gODphvID56T0WjPOShTVPVnxGdlcueaIW3uAVrr7M8xLZq5TcA==}
-   engines: {node: '>=20.11.0'}
-   hasBin: true
-   peerDependencies:
-     '@types/pg': '>=6.0.0 <9.0.0'
-     pg: '>=4.3.0 <9.0.0'
-   peerDependenciesMeta:
-     '@types/pg':
-       optional: true

  node-preload@0.2.1:
    resolution: {integrity: sha512-RM5oyBy45cLEoHqCeh+MNuFAxO0vTFBLskvQbOKnEE7YTTSN4tbN8QWDIPQ6L+WvKsB/qLEGpYe2ZZ9d4W9OIQ==}
    engines: {node: '>=8'}

@@ -36830,14 +36816,6 @@ snapshots:
  node-int64@0.4.0: {}

- node-pg-migrate@8.0.3(@types/pg@8.6.6)(pg@8.10.0):
-   dependencies:
-     glob: 11.0.0
-     pg: 8.10.0
-     yargs: 17.7.2
-   optionalDependencies:
-     '@types/pg': 8.6.6

  node-preload@0.2.1:
    dependencies:
      process-on-spawn: 1.0.0
@@ -50,3 +50,4 @@ KAFKA_ERROR_TRACKING_ISSUE_FINGERPRINT_EMBEDDINGS = (
KAFKA_DOCUMENT_EMBEDDINGS_TOPIC = f"{KAFKA_PREFIX}clickhouse_document_embeddings{SUFFIX}"

KAFKA_CDP_INTERNAL_EVENTS = f"{KAFKA_PREFIX}cdp_internal_events{SUFFIX}"
+KAFKA_COHORT_MEMBERSHIP_CHANGED = f"{KAFKA_PREFIX}cohort_membership_changed{SUFFIX}"
@@ -19,14 +19,17 @@ WORKDIR /migrations

COPY ./persons_migrations ./persons_migrations
COPY ./cyclotron-core/migrations ./cyclotron-core/migrations
+COPY ./behavioral_cohorts_migrations ./behavioral_cohorts_migrations

COPY ./bin/migrate-persons ./bin/migrate-persons
COPY ./bin/migrate-cyclotron ./bin/migrate-cyclotron
+COPY ./bin/migrate-behavioral-cohorts ./bin/migrate-behavioral-cohorts
COPY ./bin/migrate-all ./bin/migrate-all

RUN chmod +x /migrations/bin/migrate-all && \
    chmod +x /migrations/bin/migrate-persons && \
-   chmod +x /migrations/bin/migrate-cyclotron
+   chmod +x /migrations/bin/migrate-cyclotron && \
+   chmod +x /migrations/bin/migrate-behavioral-cohorts

ENTRYPOINT ["/migrations/bin/migrate-all"]
@@ -0,0 +1,29 @@
-- Create cohort membership table for behavioral cohorts
-- This table tracks which persons are members of which cohorts

CREATE TABLE IF NOT EXISTS cohort_membership (
    id BIGSERIAL PRIMARY KEY,
    team_id BIGINT NOT NULL,
    cohort_id BIGINT NOT NULL,
    person_id UUID NOT NULL,
    in_cohort BOOLEAN NOT NULL,
    last_updated TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

-- Add index on person_id, cohort_id, and team_id for query performance
CREATE INDEX IF NOT EXISTS idx_cohort_membership_lookup
    ON cohort_membership (person_id, cohort_id, team_id);

-- Add unique constraint to prevent duplicate entries
DO $$
BEGIN
    IF NOT EXISTS (
        SELECT 1 FROM pg_constraint
        WHERE conname = 'cohort_membership_unique'
        AND conrelid = 'cohort_membership'::regclass
    ) THEN
        ALTER TABLE cohort_membership
        ADD CONSTRAINT cohort_membership_unique
        UNIQUE (team_id, cohort_id, person_id);
    END IF;
END $$;
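The unique constraint on (team_id, cohort_id, person_id) is the conflict target for the consumer's batch upsert, so a point-in-time membership check is a single-row lookup. A minimal sketch using the same PostgresRouter.query signature the consumer and tests use (the helper name is illustrative):

```typescript
import { PostgresRouter, PostgresUse } from '../../utils/db/postgres'

// Hypothetical lookup: is this person currently in this cohort?
async function isInCohort(db: PostgresRouter, teamId: number, cohortId: number, personId: string): Promise<boolean> {
    const result = await db.query(
        PostgresUse.BEHAVIORAL_COHORTS_RW,
        'SELECT in_cohort FROM cohort_membership WHERE team_id = $1 AND cohort_id = $2 AND person_id = $3',
        [teamId, cohortId, personId],
        'isInCohortLookup'
    )
    // No row means the person has never entered the cohort.
    return result.rows[0]?.in_cohort ?? false
}
```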
@@ -42,6 +42,28 @@ run_cyclotron_migrations() {
    echo "Cyclotron migrations completed successfully"
}

+run_behavioral_cohorts_migrations() {
+    echo "Running behavioral cohorts migrations..."
+
+    BEHAVIORAL_COHORTS_DATABASE_NAME=${BEHAVIORAL_COHORTS_DATABASE_NAME:-behavioral_cohorts}
+
+    # Use environment variables with defaults
+    DB_HOST="${POSTGRES_BEHAVIORAL_COHORTS_HOST:-localhost}"
+    DB_USER="${POSTGRES_BEHAVIORAL_COHORTS_USER:-posthog}"
+    DB_PASS="${POSTGRES_BEHAVIORAL_COHORTS_PASSWORD:-posthog}"
+
+    BEHAVIORAL_COHORTS_DATABASE_URL=${BEHAVIORAL_COHORTS_DATABASE_URL:-postgres://${DB_USER}:${DB_PASS}@${DB_HOST}:5432/$BEHAVIORAL_COHORTS_DATABASE_NAME}
+
+    echo "Performing behavioral cohorts migrations for $BEHAVIORAL_COHORTS_DATABASE_URL"
+    echo "Database name: ${BEHAVIORAL_COHORTS_DATABASE_NAME}"
+
+    sqlx database create -D "$BEHAVIORAL_COHORTS_DATABASE_URL"
+
+    sqlx migrate run -D "$BEHAVIORAL_COHORTS_DATABASE_URL" --source "$MIGRATIONS_BASE/behavioral_cohorts_migrations/"
+
+    echo "Behavioral cohorts migrations completed successfully"
+}

case "$MIGRATION_TYPE" in
    persons)
        run_persons_migrations

@@ -49,15 +71,20 @@ case "$MIGRATION_TYPE" in
    cyclotron)
        run_cyclotron_migrations
        ;;
+   behavioral-cohorts)
+       run_behavioral_cohorts_migrations
+       ;;
    all)
        run_persons_migrations
        run_cyclotron_migrations
+       run_behavioral_cohorts_migrations
        ;;
    *)
-       echo "Usage: $0 [all|persons|cyclotron]"
-       echo " all - Run both persons and cyclotron migrations (default)"
+       echo "Usage: $0 [all|persons|cyclotron|behavioral-cohorts]"
+       echo " all - Run all migrations (default)"
        echo " persons - Run only persons migrations"
        echo " cyclotron - Run only cyclotron migrations"
+       echo " behavioral-cohorts - Run only behavioral cohorts migrations"
        exit 1
        ;;
esac
rust/bin/migrate-behavioral-cohorts (new executable file)
@@ -0,0 +1,4 @@
#!/bin/sh
SCRIPT_DIR=$(dirname "$(readlink -f "$0")")

exec "$SCRIPT_DIR/migrate-all" behavioral-cohorts