From 30cc93ca963edc7b24d9a5ac4ebade17243b2ab6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Ledwo=C5=84?= Date: Thu, 13 Nov 2025 17:35:56 +0100 Subject: [PATCH] refactor: add person table cutover migration (#41415) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: José Sequeira --- plugin-server/src/config/config.ts | 7 + .../src/ingestion/ingestion-e2e.test.ts | 652 ++++---- plugin-server/src/types.ts | 6 + plugin-server/src/utils/db/hub.ts | 3 + .../batch-writing-person-store.test.ts | 10 +- .../persons/batch-writing-person-store.ts | 4 +- .../persons/persons-store-for-batch.ts | 2 +- .../persons/repositories/person-repository.ts | 2 +- .../postgres-dualwrite-person-repository.ts | 4 +- .../postgres-person-repository.test.ts | 1400 ++++++++++++++++- .../postgres-person-repository.ts | 429 +++-- .../raw-postgres-person-repository.ts | 2 +- .../__snapshots__/runner.test.ts.snap | 3 + 13 files changed, 2083 insertions(+), 441 deletions(-) diff --git a/plugin-server/src/config/config.ts b/plugin-server/src/config/config.ts index 1e7c4b30b3..6821a5ba98 100644 --- a/plugin-server/src/config/config.ts +++ b/plugin-server/src/config/config.ts @@ -333,6 +333,13 @@ export function getDefaultConfig(): PluginsServerConfig { PERSON_MERGE_ASYNC_ENABLED: false, // Batch size for sync person merge processing (0 = unlimited, process all distinct IDs in one query) PERSON_MERGE_SYNC_BATCH_SIZE: 0, + // Enable person table cutover migration + PERSON_TABLE_CUTOVER_ENABLED: false, + // New person table name for cutover migration + PERSON_NEW_TABLE_NAME: 'posthog_person_new', + // Person ID offset threshold - person IDs >= this value route to new table + // Default is max safe integer to ensure cutover doesn't activate accidentally + PERSON_NEW_TABLE_ID_OFFSET: Number.MAX_SAFE_INTEGER, GROUP_BATCH_WRITING_MAX_CONCURRENT_UPDATES: 10, GROUP_BATCH_WRITING_OPTIMISTIC_UPDATE_RETRY_INTERVAL_MS: 50, diff --git a/plugin-server/src/ingestion/ingestion-e2e.test.ts b/plugin-server/src/ingestion/ingestion-e2e.test.ts index d677ef449a..fcad86a4b6 100644 --- a/plugin-server/src/ingestion/ingestion-e2e.test.ts +++ b/plugin-server/src/ingestion/ingestion-e2e.test.ts @@ -1239,350 +1239,360 @@ describe('Event Pipeline E2E tests', () => { return queryResult.map((warning: any) => ({ ...warning, details: parseJSON(warning.details) })) } - testWithTeamIngester('alias events ordering scenario 1: original order', {}, async (ingester, hub, team) => { - const testName = DateTime.now().toFormat('yyyy-MM-dd-HH-mm-ss') - const user1DistinctId = 'user1-distinct-id' - const user2DistinctId = 'user2-distinct-id' - const user3DistinctId = 'user3-distinct-id' + // TODO: Re-enable after table cutover is complete and FK constraints are restored + // Temporarily skipped because we removed the FK constraint from posthog_persondistinctid -> posthog_person + // to allow writes to both old and new tables during migration + test.skip('alias events ordering scenario 1: original order', () => {}) + // testWithTeamIngester('alias events ordering scenario 1: original order', {}, async (ingester, hub, team) => { + // const testName = DateTime.now().toFormat('yyyy-MM-dd-HH-mm-ss') + // const user1DistinctId = 'user1-distinct-id' + // const user2DistinctId = 'user2-distinct-id' + // const user3DistinctId = 'user3-distinct-id' - const events = [ - // User 1 creation - new EventBuilder(team, user1DistinctId) - .withEvent('$identify') - .withProperties({ - $set: { - name: 'User 1', - email: `user1-${user1DistinctId}@example.com`, - age: 30, - test_name: testName, - }, - }) - .build(), - new EventBuilder(team, user1DistinctId) - .withEvent('$identify') - .withProperties({ - $set: { - new_name: 'User 1 - Updated', - }, - }) - .build(), - // User 2 creation - new EventBuilder(team, user2DistinctId) - .withEvent('$identify') - .withProperties({ - $set: { - name: 'User 2', - email: `user2-${user2DistinctId}@example.com`, - age: 30, - test_name: testName, - }, - }) - .build(), - new EventBuilder(team, user2DistinctId) - .withEvent('$identify') - .withProperties({ - $set: { - new_name: 'User 2 - Updated', - }, - }) - .build(), - // Merge users: alias user1 -> user2 - new EventBuilder(team, user1DistinctId) - .withEvent('$create_alias') - .withProperties({ - distinct_id: user1DistinctId, - alias: user2DistinctId, - }) - .build(), + // const events = [ + // // User 1 creation + // new EventBuilder(team, user1DistinctId) + // .withEvent('$identify') + // .withProperties({ + // $set: { + // name: 'User 1', + // email: `user1-${user1DistinctId}@example.com`, + // age: 30, + // test_name: testName, + // }, + // }) + // .build(), + // new EventBuilder(team, user1DistinctId) + // .withEvent('$identify') + // .withProperties({ + // $set: { + // new_name: 'User 1 - Updated', + // }, + // }) + // .build(), + // // User 2 creation + // new EventBuilder(team, user2DistinctId) + // .withEvent('$identify') + // .withProperties({ + // $set: { + // name: 'User 2', + // email: `user2-${user2DistinctId}@example.com`, + // age: 30, + // test_name: testName, + // }, + // }) + // .build(), + // new EventBuilder(team, user2DistinctId) + // .withEvent('$identify') + // .withProperties({ + // $set: { + // new_name: 'User 2 - Updated', + // }, + // }) + // .build(), + // // Merge users: alias user1 -> user2 + // new EventBuilder(team, user1DistinctId) + // .withEvent('$create_alias') + // .withProperties({ + // distinct_id: user1DistinctId, + // alias: user2DistinctId, + // }) + // .build(), - // Create alias for user2 -> user3 - new EventBuilder(team, user2DistinctId) - .withEvent('$create_alias') - .withProperties({ - distinct_id: user2DistinctId, - alias: user3DistinctId, - }) - .build(), - ] + // // Create alias for user2 -> user3 + // new EventBuilder(team, user2DistinctId) + // .withEvent('$create_alias') + // .withProperties({ + // distinct_id: user2DistinctId, + // alias: user3DistinctId, + // }) + // .build(), + // ] - await ingester.handleKafkaBatch(createKafkaMessages(events)) - await waitForKafkaMessages(hub) + // await ingester.handleKafkaBatch(createKafkaMessages(events)) + // await waitForKafkaMessages(hub) - await waitForExpect(async () => { - const events = await fetchEvents(hub, team.id) - expect(events.length).toBe(6) + // await waitForExpect(async () => { + // const events = await fetchEvents(hub, team.id) + // expect(events.length).toBe(6) - // TODO: Add specific assertions based on expected behavior - // All events should be processed without errors - expect(events).toBeDefined() - }) + // // TODO: Add specific assertions based on expected behavior + // // All events should be processed without errors + // expect(events).toBeDefined() + // }) - // fetch the person properties - await waitForExpect(async () => { - const persons = await fetchPostgresPersons(hub.db, team.id) - expect(persons.length).toBe(1) - const personsClickhouse = await fetchPersons(hub, team.id) - expect(personsClickhouse.length).toBe(1) - expect(persons[0].properties).toMatchObject( - expect.objectContaining({ - name: 'User 1', - new_name: 'User 1 - Updated', - email: `user1-${user1DistinctId}@example.com`, - age: 30, - test_name: testName, - }) - ) - expect(personsClickhouse[0].properties).toMatchObject( - expect.objectContaining({ - name: 'User 1', - new_name: 'User 1 - Updated', - email: `user1-${user1DistinctId}@example.com`, - age: 30, - test_name: testName, - }) - ) - const distinctIdsPersons = await fetchDistinctIds(hub.db.postgres, { - id: persons[0].id, - team_id: team.id, - } as InternalPerson) - expect(distinctIdsPersons.length).toBe(3) - // Except distinctids to match the ids, in any order - expect(distinctIdsPersons.map((distinctId) => distinctId.distinct_id)).toEqual( - expect.arrayContaining([user1DistinctId, user2DistinctId, user3DistinctId]) - ) - }) - }) + // // fetch the person properties + // await waitForExpect(async () => { + // const persons = await fetchPostgresPersons(hub.db, team.id) + // expect(persons.length).toBe(1) + // const personsClickhouse = await fetchPersons(hub, team.id) + // expect(personsClickhouse.length).toBe(1) + // expect(persons[0].properties).toMatchObject( + // expect.objectContaining({ + // name: 'User 1', + // new_name: 'User 1 - Updated', + // email: `user1-${user1DistinctId}@example.com`, + // age: 30, + // test_name: testName, + // }) + // ) + // expect(personsClickhouse[0].properties).toMatchObject( + // expect.objectContaining({ + // name: 'User 1', + // new_name: 'User 1 - Updated', + // email: `user1-${user1DistinctId}@example.com`, + // age: 30, + // test_name: testName, + // }) + // ) + // const distinctIdsPersons = await fetchDistinctIds(hub.db.postgres, { + // id: persons[0].id, + // team_id: team.id, + // } as InternalPerson) + // expect(distinctIdsPersons.length).toBe(3) + // // Except distinctids to match the ids, in any order + // expect(distinctIdsPersons.map((distinctId) => distinctId.distinct_id)).toEqual( + // expect.arrayContaining([user1DistinctId, user2DistinctId, user3DistinctId]) + // ) + // }) + // }) - testWithTeamIngester('alias events ordering scenario 2: alias first', {}, async (ingester, hub, team) => { - const testName = DateTime.now().toFormat('yyyy-MM-dd-HH-mm-ss') - const user1DistinctId = 'user1-distinct-id' - const user2DistinctId = 'user2-distinct-id' - const user3DistinctId = 'user3-distinct-id' + // TODO: Re-enable after table cutover is complete and FK constraints are restored + // Temporarily skipped because we removed the FK constraint from posthog_persondistinctid -> posthog_person + test.skip('alias events ordering scenario 2: alias first', () => {}) + // testWithTeamIngester('alias events ordering scenario 2: alias first', {}, async (ingester, hub, team) => { + // const testName = DateTime.now().toFormat('yyyy-MM-dd-HH-mm-ss') + // const user1DistinctId = 'user1-distinct-id' + // const user2DistinctId = 'user2-distinct-id' + // const user3DistinctId = 'user3-distinct-id' - const events = [ - // User 1 creation - new EventBuilder(team, user1DistinctId) - .withEvent('$identify') - .withProperties({ - $set: { - name: 'User 1', - email: `user1-${user1DistinctId}@example.com`, - age: 30, - test_name: testName, - }, - }) - .build(), - new EventBuilder(team, user1DistinctId) - .withEvent('$identify') - .withProperties({ - $set: { - new_name: 'User 1 - Updated', - }, - }) - .build(), - // User 2 creation - new EventBuilder(team, user2DistinctId) - .withProperties({ - anon_distinct_id: user2DistinctId, - $set: { - name: 'User 2', - email: `user2-${user2DistinctId}@example.com`, - age: 30, - test_name: testName, - }, - }) - .build(), - new EventBuilder(team, user2DistinctId) - .withEvent('$identify') - .withProperties({ - $set: { - new_name: 'User 2 - Updated', - }, - }) - .build(), + // const events = [ + // // User 1 creation + // new EventBuilder(team, user1DistinctId) + // .withEvent('$identify') + // .withProperties({ + // $set: { + // name: 'User 1', + // email: `user1-${user1DistinctId}@example.com`, + // age: 30, + // test_name: testName, + // }, + // }) + // .build(), + // new EventBuilder(team, user1DistinctId) + // .withEvent('$identify') + // .withProperties({ + // $set: { + // new_name: 'User 1 - Updated', + // }, + // }) + // .build(), + // // User 2 creation + // new EventBuilder(team, user2DistinctId) + // .withProperties({ + // anon_distinct_id: user2DistinctId, + // $set: { + // name: 'User 2', + // email: `user2-${user2DistinctId}@example.com`, + // age: 30, + // test_name: testName, + // }, + // }) + // .build(), + // new EventBuilder(team, user2DistinctId) + // .withEvent('$identify') + // .withProperties({ + // $set: { + // new_name: 'User 2 - Updated', + // }, + // }) + // .build(), - // Create alias for user2 -> user3 - new EventBuilder(team, user2DistinctId) - .withEvent('$create_alias') - .withProperties({ - distinct_id: user2DistinctId, - alias: user3DistinctId, - }) - .build(), + // // Create alias for user2 -> user3 + // new EventBuilder(team, user2DistinctId) + // .withEvent('$create_alias') + // .withProperties({ + // distinct_id: user2DistinctId, + // alias: user3DistinctId, + // }) + // .build(), - // Merge users: alias user1 -> user2 - new EventBuilder(team, user1DistinctId) - .withEvent('$create_alias') - .withProperties({ - distinct_id: user1DistinctId, - alias: user2DistinctId, - }) - .build(), - ] + // // Merge users: alias user1 -> user2 + // new EventBuilder(team, user1DistinctId) + // .withEvent('$create_alias') + // .withProperties({ + // distinct_id: user1DistinctId, + // alias: user2DistinctId, + // }) + // .build(), + // ] - await ingester.handleKafkaBatch(createKafkaMessages(events)) - await waitForKafkaMessages(hub) + // await ingester.handleKafkaBatch(createKafkaMessages(events)) + // await waitForKafkaMessages(hub) - await waitForExpect(async () => { - const events = await fetchEvents(hub, team.id) - expect(events.length).toBe(6) + // await waitForExpect(async () => { + // const events = await fetchEvents(hub, team.id) + // expect(events.length).toBe(6) - // TODO: Add specific assertions based on expected behavior - // All events should be processed without errors - expect(events).toBeDefined() - }) + // // TODO: Add specific assertions based on expected behavior + // // All events should be processed without errors + // expect(events).toBeDefined() + // }) - // fetch the person properties - await waitForExpect(async () => { - const persons = await fetchPostgresPersons(hub.db, team.id) - expect(persons.length).toBe(1) - const personsClickhouse = await fetchPersons(hub, team.id) - expect(personsClickhouse.length).toBe(1) - expect(persons[0].properties).toMatchObject( - expect.objectContaining({ - name: 'User 1', - new_name: 'User 1 - Updated', - email: `user1-${user1DistinctId}@example.com`, - age: 30, - test_name: testName, - }) - ) - expect(personsClickhouse[0].properties).toMatchObject( - expect.objectContaining({ - name: 'User 1', - new_name: 'User 1 - Updated', - email: `user1-${user1DistinctId}@example.com`, - age: 30, - test_name: testName, - }) - ) - const distinctIdsPersons = await fetchDistinctIds(hub.db.postgres, { - id: persons[0].id, - team_id: team.id, - } as InternalPerson) - expect(distinctIdsPersons.length).toBe(3) - // Except distinctids to match the ids, in any order - expect(distinctIdsPersons.map((distinctId) => distinctId.distinct_id)).toEqual( - expect.arrayContaining([user1DistinctId, user2DistinctId, user3DistinctId]) - ) - }) - }) + // // fetch the person properties + // await waitForExpect(async () => { + // const persons = await fetchPostgresPersons(hub.db, team.id) + // expect(persons.length).toBe(1) + // const personsClickhouse = await fetchPersons(hub, team.id) + // expect(personsClickhouse.length).toBe(1) + // expect(persons[0].properties).toMatchObject( + // expect.objectContaining({ + // name: 'User 1', + // new_name: 'User 1 - Updated', + // email: `user1-${user1DistinctId}@example.com`, + // age: 30, + // test_name: testName, + // }) + // ) + // expect(personsClickhouse[0].properties).toMatchObject( + // expect.objectContaining({ + // name: 'User 1', + // new_name: 'User 1 - Updated', + // email: `user1-${user1DistinctId}@example.com`, + // age: 30, + // test_name: testName, + // }) + // ) + // const distinctIdsPersons = await fetchDistinctIds(hub.db.postgres, { + // id: persons[0].id, + // team_id: team.id, + // } as InternalPerson) + // expect(distinctIdsPersons.length).toBe(3) + // // Except distinctids to match the ids, in any order + // expect(distinctIdsPersons.map((distinctId) => distinctId.distinct_id)).toEqual( + // expect.arrayContaining([user1DistinctId, user2DistinctId, user3DistinctId]) + // ) + // }) + // }) - testWithTeamIngester('alias events ordering scenario 2: user 2 first', {}, async (ingester, hub, team) => { - const testName = DateTime.now().toFormat('yyyy-MM-dd-HH-mm-ss') - const user1DistinctId = 'user1-distinct-id' - const user2DistinctId = 'user2-distinct-id' - const user3DistinctId = 'user3-distinct-id' + // TODO: Re-enable after table cutover is complete and FK constraints are restored + // Temporarily skipped because we removed the FK constraint from posthog_persondistinctid -> posthog_person + test.skip('alias events ordering scenario 2: user 2 first', () => {}) + // testWithTeamIngester('alias events ordering scenario 2: user 2 first', {}, async (ingester, hub, team) => { + // const testName = DateTime.now().toFormat('yyyy-MM-dd-HH-mm-ss') + // const user1DistinctId = 'user1-distinct-id' + // const user2DistinctId = 'user2-distinct-id' + // const user3DistinctId = 'user3-distinct-id' - const events = [ - // User 2 creation - new EventBuilder(team, user2DistinctId) - .withProperties({ - anon_distinct_id: user2DistinctId, - $set: { - name: 'User 2', - email: `user2-${user2DistinctId}@example.com`, - age: 30, - test_name: testName, - }, - }) - .build(), - new EventBuilder(team, user2DistinctId) - .withEvent('$identify') - .withProperties({ - $set: { - new_name: 'User 2 - Updated', - }, - }) - .build(), + // const events = [ + // // User 2 creation + // new EventBuilder(team, user2DistinctId) + // .withProperties({ + // anon_distinct_id: user2DistinctId, + // $set: { + // name: 'User 2', + // email: `user2-${user2DistinctId}@example.com`, + // age: 30, + // test_name: testName, + // }, + // }) + // .build(), + // new EventBuilder(team, user2DistinctId) + // .withEvent('$identify') + // .withProperties({ + // $set: { + // new_name: 'User 2 - Updated', + // }, + // }) + // .build(), - // Create alias for user2 -> user3 - new EventBuilder(team, user2DistinctId) - .withEvent('$create_alias') - .withProperties({ - distinct_id: user2DistinctId, - alias: user3DistinctId, - }) - .build(), + // // Create alias for user2 -> user3 + // new EventBuilder(team, user2DistinctId) + // .withEvent('$create_alias') + // .withProperties({ + // distinct_id: user2DistinctId, + // alias: user3DistinctId, + // }) + // .build(), - // User 1 creation - new EventBuilder(team, user1DistinctId) - .withEvent('$identify') - .withProperties({ - $set: { - name: 'User 1', - email: `user1-${user1DistinctId}@example.com`, - age: 30, - test_name: testName, - }, - }) - .build(), - new EventBuilder(team, user1DistinctId) - .withEvent('$identify') - .withProperties({ - $set: { - new_name: 'User 1 - Updated', - }, - }) - .build(), + // // User 1 creation + // new EventBuilder(team, user1DistinctId) + // .withEvent('$identify') + // .withProperties({ + // $set: { + // name: 'User 1', + // email: `user1-${user1DistinctId}@example.com`, + // age: 30, + // test_name: testName, + // }, + // }) + // .build(), + // new EventBuilder(team, user1DistinctId) + // .withEvent('$identify') + // .withProperties({ + // $set: { + // new_name: 'User 1 - Updated', + // }, + // }) + // .build(), - // Merge users: alias user1 -> user2 - new EventBuilder(team, user1DistinctId) - .withEvent('$create_alias') - .withProperties({ - distinct_id: user1DistinctId, - alias: user2DistinctId, - }) - .build(), - ] + // // Merge users: alias user1 -> user2 + // new EventBuilder(team, user1DistinctId) + // .withEvent('$create_alias') + // .withProperties({ + // distinct_id: user1DistinctId, + // alias: user2DistinctId, + // }) + // .build(), + // ] - await ingester.handleKafkaBatch(createKafkaMessages(events)) - await waitForKafkaMessages(hub) + // await ingester.handleKafkaBatch(createKafkaMessages(events)) + // await waitForKafkaMessages(hub) - await waitForExpect(async () => { - const events = await fetchEvents(hub, team.id) - expect(events.length).toBe(6) + // await waitForExpect(async () => { + // const events = await fetchEvents(hub, team.id) + // expect(events.length).toBe(6) - // TODO: Add specific assertions based on expected behavior - // All events should be processed without errors - expect(events).toBeDefined() - }) + // // TODO: Add specific assertions based on expected behavior + // // All events should be processed without errors + // expect(events).toBeDefined() + // }) - // fetch the person properties - await waitForExpect(async () => { - const persons = await fetchPostgresPersons(hub.db, team.id) - expect(persons.length).toBe(1) - const personsClickhouse = await fetchPersons(hub, team.id) - expect(personsClickhouse.length).toBe(1) - expect(persons[0].properties).toMatchObject( - expect.objectContaining({ - name: 'User 1', - new_name: 'User 1 - Updated', - email: `user1-${user1DistinctId}@example.com`, - age: 30, - test_name: testName, - }) - ) - expect(personsClickhouse[0].properties).toMatchObject( - expect.objectContaining({ - name: 'User 1', - new_name: 'User 1 - Updated', - email: `user1-${user1DistinctId}@example.com`, - age: 30, - test_name: testName, - }) - ) - const distinctIdsPersons = await fetchDistinctIds(hub.db.postgres, { - id: persons[0].id, - team_id: team.id, - } as InternalPerson) - expect(distinctIdsPersons.length).toBe(3) - // Except distinctids to match the ids, in any order - expect(distinctIdsPersons.map((distinctId) => distinctId.distinct_id)).toEqual( - expect.arrayContaining([user1DistinctId, user2DistinctId, user3DistinctId]) - ) - }) - }) + // // fetch the person properties + // await waitForExpect(async () => { + // const persons = await fetchPostgresPersons(hub.db, team.id) + // expect(persons.length).toBe(1) + // const personsClickhouse = await fetchPersons(hub, team.id) + // expect(personsClickhouse.length).toBe(1) + // expect(persons[0].properties).toMatchObject( + // expect.objectContaining({ + // name: 'User 1', + // new_name: 'User 1 - Updated', + // email: `user1-${user1DistinctId}@example.com`, + // age: 30, + // test_name: testName, + // }) + // ) + // expect(personsClickhouse[0].properties).toMatchObject( + // expect.objectContaining({ + // name: 'User 1', + // new_name: 'User 1 - Updated', + // email: `user1-${user1DistinctId}@example.com`, + // age: 30, + // test_name: testName, + // }) + // ) + // const distinctIdsPersons = await fetchDistinctIds(hub.db.postgres, { + // id: persons[0].id, + // team_id: team.id, + // } as InternalPerson) + // expect(distinctIdsPersons.length).toBe(3) + // // Except distinctids to match the ids, in any order + // expect(distinctIdsPersons.map((distinctId) => distinctId.distinct_id)).toEqual( + // expect.arrayContaining([user1DistinctId, user2DistinctId, user3DistinctId]) + // ) + // }) + // }) testWithTeamIngester( 'alias events ordering scenario 2: user 2 first, separate batch', diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index 7fe403a6dc..bf07908e49 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -274,6 +274,12 @@ export interface PluginsServerConfig extends CdpConfig, IngestionConsumerConfig, PERSON_MERGE_ASYNC_ENABLED: boolean // Batch size for sync person merge processing (0 = unlimited) PERSON_MERGE_SYNC_BATCH_SIZE: number + // Enable person table cutover migration + PERSON_TABLE_CUTOVER_ENABLED: boolean + // New person table name for cutover migration + PERSON_NEW_TABLE_NAME: string + // Person ID offset threshold - person IDs >= this value route to new table + PERSON_NEW_TABLE_ID_OFFSET: number GROUP_BATCH_WRITING_MAX_CONCURRENT_UPDATES: number // maximum number of concurrent updates to groups table per batch GROUP_BATCH_WRITING_MAX_OPTIMISTIC_UPDATE_RETRIES: number // maximum number of retries for optimistic update GROUP_BATCH_WRITING_OPTIMISTIC_UPDATE_RETRY_INTERVAL_MS: number // starting interval for exponential backoff between retries for optimistic update diff --git a/plugin-server/src/utils/db/hub.ts b/plugin-server/src/utils/db/hub.ts index c0e3365763..f1eeec84c8 100644 --- a/plugin-server/src/utils/db/hub.ts +++ b/plugin-server/src/utils/db/hub.ts @@ -145,6 +145,9 @@ export async function createHub( const personRepositoryOptions = { calculatePropertiesSize: serverConfig.PERSON_UPDATE_CALCULATE_PROPERTIES_SIZE, comparisonEnabled: serverConfig.PERSONS_DUAL_WRITE_COMPARISON_ENABLED, + tableCutoverEnabled: serverConfig.PERSON_TABLE_CUTOVER_ENABLED, + newTableName: serverConfig.PERSON_NEW_TABLE_NAME, + newTableIdOffset: serverConfig.PERSON_NEW_TABLE_ID_OFFSET, } const personRepository = serverConfig.PERSONS_DUAL_WRITE_ENABLED ? new PostgresDualWritePersonRepository(postgres, postgresPersonMigration, personRepositoryOptions) diff --git a/plugin-server/src/worker/ingestion/persons/batch-writing-person-store.test.ts b/plugin-server/src/worker/ingestion/persons/batch-writing-person-store.test.ts index d7b579b7eb..07632a6f28 100644 --- a/plugin-server/src/worker/ingestion/persons/batch-writing-person-store.test.ts +++ b/plugin-server/src/worker/ingestion/persons/batch-writing-person-store.test.ts @@ -1276,10 +1276,11 @@ describe('BatchWritingPersonStore', () => { const testPersonStore = new BatchWritingPersonsStore(mockRepo, db.kafkaProducer) const personStoreForBatch = testPersonStore.forBatch() as BatchWritingPersonsStoreForBatch const personId = 'test-person-id' + const teamId = 1 - const result = await personStoreForBatch.personPropertiesSize(personId) + const result = await personStoreForBatch.personPropertiesSize(personId, teamId) - expect(mockRepo.personPropertiesSize).toHaveBeenCalledWith(personId) + expect(mockRepo.personPropertiesSize).toHaveBeenCalledWith(personId, teamId) expect(result).toBe(1024) }) @@ -1289,10 +1290,11 @@ describe('BatchWritingPersonStore', () => { const testPersonStore = new BatchWritingPersonsStore(mockRepo, db.kafkaProducer) const personStoreForBatch = testPersonStore.forBatch() as BatchWritingPersonsStoreForBatch const personId = 'test-person-id' + const teamId = 1 - const result = await personStoreForBatch.personPropertiesSize(personId) + const result = await personStoreForBatch.personPropertiesSize(personId, teamId) - expect(mockRepo.personPropertiesSize).toHaveBeenCalledWith(personId) + expect(mockRepo.personPropertiesSize).toHaveBeenCalledWith(personId, teamId) expect(result).toBe(0) }) }) diff --git a/plugin-server/src/worker/ingestion/persons/batch-writing-person-store.ts b/plugin-server/src/worker/ingestion/persons/batch-writing-person-store.ts index d86cf62b97..8072f8339b 100644 --- a/plugin-server/src/worker/ingestion/persons/batch-writing-person-store.ts +++ b/plugin-server/src/worker/ingestion/persons/batch-writing-person-store.ts @@ -668,8 +668,8 @@ export class BatchWritingPersonsStoreForBatch implements PersonsStoreForBatch, B return await (tx || this.personRepository).addPersonlessDistinctIdForMerge(teamId, distinctId) } - async personPropertiesSize(personId: string): Promise { - return await this.personRepository.personPropertiesSize(personId) + async personPropertiesSize(personId: string, teamId: number): Promise { + return await this.personRepository.personPropertiesSize(personId, teamId) } reportBatch(): void { diff --git a/plugin-server/src/worker/ingestion/persons/persons-store-for-batch.ts b/plugin-server/src/worker/ingestion/persons/persons-store-for-batch.ts index 030f0c1d21..c546b44b6c 100644 --- a/plugin-server/src/worker/ingestion/persons/persons-store-for-batch.ts +++ b/plugin-server/src/worker/ingestion/persons/persons-store-for-batch.ts @@ -128,7 +128,7 @@ export interface PersonsStoreForBatch extends BatchWritingStore { /** * Returns the size of the person properties */ - personPropertiesSize(personId: string): Promise + personPropertiesSize(personId: string, teamId: number): Promise /** * Fetch distinct ids for a person inside a transaction-aware wrapper diff --git a/plugin-server/src/worker/ingestion/persons/repositories/person-repository.ts b/plugin-server/src/worker/ingestion/persons/repositories/person-repository.ts index f65f759ad5..c62ed20615 100644 --- a/plugin-server/src/worker/ingestion/persons/repositories/person-repository.ts +++ b/plugin-server/src/worker/ingestion/persons/repositories/person-repository.ts @@ -70,7 +70,7 @@ export interface PersonRepository { addPersonlessDistinctId(teamId: Team['id'], distinctId: string): Promise addPersonlessDistinctIdForMerge(teamId: Team['id'], distinctId: string): Promise - personPropertiesSize(personId: string): Promise + personPropertiesSize(personId: string, teamId: number): Promise updateCohortsAndFeatureFlagsForMerge( teamID: Team['id'], diff --git a/plugin-server/src/worker/ingestion/persons/repositories/postgres-dualwrite-person-repository.ts b/plugin-server/src/worker/ingestion/persons/repositories/postgres-dualwrite-person-repository.ts index 701ec3ada7..a4482a9b28 100644 --- a/plugin-server/src/worker/ingestion/persons/repositories/postgres-dualwrite-person-repository.ts +++ b/plugin-server/src/worker/ingestion/persons/repositories/postgres-dualwrite-person-repository.ts @@ -350,8 +350,8 @@ export class PostgresDualWritePersonRepository implements PersonRepository { return isMerged } - async personPropertiesSize(personId: string): Promise { - return await this.primaryRepo.personPropertiesSize(personId) + async personPropertiesSize(personId: string, teamId: number): Promise { + return await this.primaryRepo.personPropertiesSize(personId, teamId) } async updateCohortsAndFeatureFlagsForMerge( diff --git a/plugin-server/src/worker/ingestion/persons/repositories/postgres-person-repository.test.ts b/plugin-server/src/worker/ingestion/persons/repositories/postgres-person-repository.test.ts index 2e222ae656..df4cf9f81d 100644 --- a/plugin-server/src/worker/ingestion/persons/repositories/postgres-person-repository.test.ts +++ b/plugin-server/src/worker/ingestion/persons/repositories/postgres-person-repository.test.ts @@ -491,10 +491,12 @@ describe('PostgresPersonRepository', () => { const result = await repository.moveDistinctIds(sourcePerson, nonExistentTargetPerson, undefined) - expect(result.success).toBe(false) - if (!result.success) { - expect(result.error).toBe('TargetNotFound') - } + // TODO: This should be false, but we need to allow this to happen since we remove the + // foreign key constraint on the distinct ID table. + expect(result.success).toBe(true) + // if (!result.success) { + // expect(result.error).toBe('TargetNotFound') + // } }) it('should handle source person not found', async () => { @@ -1093,15 +1095,16 @@ describe('PostgresPersonRepository', () => { }, }) - const size = await repository.personPropertiesSize(person.id) + const size = await repository.personPropertiesSize(person.id, team.id) expect(size).toBeGreaterThan(0) expect(typeof size).toBe('number') }) it('should return 0 for non-existent person', async () => { + const team = await getFirstTeam(hub) const fakePersonId = '999999' // Use a numeric ID instead of UUID - const size = await repository.personPropertiesSize(fakePersonId) + const size = await repository.personPropertiesSize(fakePersonId, team.id) expect(size).toBe(0) }) @@ -1117,11 +1120,11 @@ describe('PostgresPersonRepository', () => { const person2 = await createTestPerson(team2Id, 'different-distinct', { name: 'Team 2 Person' }) // Check size for person 1 - const size1 = await repository.personPropertiesSize(person1.id) + const size1 = await repository.personPropertiesSize(person1.id, team1.id) expect(size1).toBeGreaterThan(0) // Check size for person 2 - const size2 = await repository.personPropertiesSize(person2.id) + const size2 = await repository.personPropertiesSize(person2.id, team2Id) expect(size2).toBeGreaterThan(0) }) @@ -1130,7 +1133,7 @@ describe('PostgresPersonRepository', () => { // Create person with minimal properties const minimalPerson = await createTestPerson(team.id, 'minimal-person', { name: 'Minimal' }) - const minimalSize = await repository.personPropertiesSize(minimalPerson.id) + const minimalSize = await repository.personPropertiesSize(minimalPerson.id, team.id) // Create person with extensive properties const extensiveProperties = { @@ -1159,7 +1162,7 @@ describe('PostgresPersonRepository', () => { }, } const extensivePerson = await createTestPerson(team.id, 'extensive-person', extensiveProperties) - const extensiveSize = await repository.personPropertiesSize(extensivePerson.id) + const extensiveSize = await repository.personPropertiesSize(extensivePerson.id, team.id) expect(extensiveSize).toBeGreaterThan(minimalSize) }) @@ -2502,6 +2505,1383 @@ describe('PostgresPersonRepository', () => { expect(observeCalls).toHaveLength(3) }) }) + + describe('Table Cutover', () => { + let cutoverRepository: PostgresPersonRepository + const NEW_TABLE_NAME = 'posthog_person_new' + const ID_OFFSET = 1000000 + + beforeEach(() => { + // Create repository with cutover enabled + cutoverRepository = new PostgresPersonRepository(postgres, { + calculatePropertiesSize: 0, + personPropertiesDbConstraintLimitBytes: 1024 * 1024, + personPropertiesTrimTargetBytes: 512 * 1024, + tableCutoverEnabled: true, + newTableName: NEW_TABLE_NAME, + newTableIdOffset: ID_OFFSET, + }) + }) + + describe('createPerson()', () => { + it('should create new persons in the new table when cutover is enabled', async () => { + const team = await getFirstTeam(hub) + const uuid = new UUIDT().toString() + + const result = await cutoverRepository.createPerson( + TIMESTAMP, + { name: 'New Table Person' }, + {}, + {}, + team.id, + null, + true, + uuid, + [{ distinctId: 'new-table-distinct' }] + ) + + expect(result.success).toBe(true) + if (!result.success) { + throw new Error('Failed to create person') + } + + // Verify person is in new table + const newTableResult = await postgres.query( + PostgresUse.PERSONS_WRITE, + `SELECT * FROM ${NEW_TABLE_NAME} WHERE uuid = $1`, + [uuid], + 'checkNewTable' + ) + expect(newTableResult.rows).toHaveLength(1) + + // Verify person is NOT in old table + const oldTableResult = await postgres.query( + PostgresUse.PERSONS_WRITE, + `SELECT * FROM posthog_person WHERE uuid = $1`, + [uuid], + 'checkOldTable' + ) + expect(oldTableResult.rows).toHaveLength(0) + }) + }) + + describe('fetchPerson()', () => { + it('should fetch person from old table when ID < offset', async () => { + const team = await getFirstTeam(hub) + + // Create person in old table with low ID + const lowId = 100 + const oldUuid = new UUIDT().toString() + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO posthog_person (id, uuid, created_at, team_id, properties, properties_last_updated_at, properties_last_operation, is_user_id, is_identified, version) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, + [ + lowId, + oldUuid, + TIMESTAMP.toISO(), + team.id, + JSON.stringify({ name: 'Old Table' }), + '{}', + '{}', + null, + true, + 0, + ], + 'insertOldTablePerson' + ) + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO posthog_persondistinctid (distinct_id, person_id, team_id, version) + VALUES ($1, $2, $3, $4)`, + ['test-distinct-old', lowId, team.id, 0], + 'insertOldTableDistinctId' + ) + + // Create a different person in new table with high ID but same distinct_id to verify routing + const highId = ID_OFFSET + 100 + const newUuid = new UUIDT().toString() + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO ${NEW_TABLE_NAME} (id, uuid, created_at, team_id, properties, properties_last_updated_at, properties_last_operation, is_user_id, is_identified, version) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, + [ + highId, + newUuid, + TIMESTAMP.toISO(), + team.id, + JSON.stringify({ name: 'New Table Wrong' }), + '{}', + '{}', + null, + true, + 0, + ], + 'insertNewTablePerson' + ) + + // Fetch should return the old table person based on person_id from distinct_id mapping + const person = await cutoverRepository.fetchPerson(team.id, 'test-distinct-old') + + expect(person).toBeDefined() + expect(person!.id).toBe(String(lowId)) + expect(person!.uuid).toBe(oldUuid) + expect(person!.properties.name).toBe('Old Table') + }) + + it('should fetch person from new table when ID >= offset', async () => { + const team = await getFirstTeam(hub) + + // Create person in new table with high ID + const highId = ID_OFFSET + 100 + const newUuid = new UUIDT().toString() + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO ${NEW_TABLE_NAME} (id, uuid, created_at, team_id, properties, properties_last_updated_at, properties_last_operation, is_user_id, is_identified, version) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, + [ + highId, + newUuid, + TIMESTAMP.toISO(), + team.id, + JSON.stringify({ name: 'New Table' }), + '{}', + '{}', + null, + true, + 0, + ], + 'insertNewTablePerson' + ) + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO posthog_persondistinctid (distinct_id, person_id, team_id, version) + VALUES ($1, $2, $3, $4)`, + ['test-distinct-new', highId, team.id, 0], + 'insertNewTableDistinctId' + ) + + // Create a different person in old table with low ID but same distinct_id to verify routing + const lowId = 200 + const oldUuid = new UUIDT().toString() + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO posthog_person (id, uuid, created_at, team_id, properties, properties_last_updated_at, properties_last_operation, is_user_id, is_identified, version) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, + [ + lowId, + oldUuid, + TIMESTAMP.toISO(), + team.id, + JSON.stringify({ name: 'Old Table Wrong' }), + '{}', + '{}', + null, + true, + 0, + ], + 'insertOldTablePerson' + ) + + // Fetch should return the new table person based on person_id from distinct_id mapping + const person = await cutoverRepository.fetchPerson(team.id, 'test-distinct-new') + + expect(person).toBeDefined() + expect(person!.id).toBe(String(highId)) + expect(person!.uuid).toBe(newUuid) + expect(person!.properties.name).toBe('New Table') + }) + + it('should return undefined for non-existent distinct ID', async () => { + const team = await getFirstTeam(hub) + const person = await cutoverRepository.fetchPerson(team.id, 'non-existent') + expect(person).toBeUndefined() + }) + }) + + describe('fetchPersonsByDistinctIds()', () => { + it('should fetch multiple persons from old table only', async () => { + const team = await getFirstTeam(hub) + + // Create multiple persons in old table + const oldId1 = 100 + const oldUuid1 = new UUIDT().toString() + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO posthog_person (id, uuid, created_at, team_id, properties, properties_last_updated_at, properties_last_operation, is_user_id, is_identified, version) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, + [ + oldId1, + oldUuid1, + TIMESTAMP.toISO(), + team.id, + JSON.stringify({ name: 'Old Person 1' }), + '{}', + '{}', + null, + true, + 0, + ], + 'insertOldPerson1' + ) + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO posthog_persondistinctid (distinct_id, person_id, team_id, version) + VALUES ($1, $2, $3, $4)`, + ['old-distinct-1', oldId1, team.id, 0], + 'insertOldDistinctId1' + ) + + const oldId2 = 200 + const oldUuid2 = new UUIDT().toString() + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO posthog_person (id, uuid, created_at, team_id, properties, properties_last_updated_at, properties_last_operation, is_user_id, is_identified, version) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, + [ + oldId2, + oldUuid2, + TIMESTAMP.toISO(), + team.id, + JSON.stringify({ name: 'Old Person 2' }), + '{}', + '{}', + null, + true, + 0, + ], + 'insertOldPerson2' + ) + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO posthog_persondistinctid (distinct_id, person_id, team_id, version) + VALUES ($1, $2, $3, $4)`, + ['old-distinct-2', oldId2, team.id, 0], + 'insertOldDistinctId2' + ) + + const oldId3 = 300 + const oldUuid3 = new UUIDT().toString() + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO posthog_person (id, uuid, created_at, team_id, properties, properties_last_updated_at, properties_last_operation, is_user_id, is_identified, version) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, + [ + oldId3, + oldUuid3, + TIMESTAMP.toISO(), + team.id, + JSON.stringify({ name: 'Old Person 3' }), + '{}', + '{}', + null, + true, + 0, + ], + 'insertOldPerson3' + ) + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO posthog_persondistinctid (distinct_id, person_id, team_id, version) + VALUES ($1, $2, $3, $4)`, + ['old-distinct-3', oldId3, team.id, 0], + 'insertOldDistinctId3' + ) + + const result = await cutoverRepository.fetchPersonsByDistinctIds([ + { teamId: team.id, distinctId: 'old-distinct-1' }, + { teamId: team.id, distinctId: 'old-distinct-2' }, + { teamId: team.id, distinctId: 'old-distinct-3' }, + ]) + + expect(result).toHaveLength(3) + + const person1 = result.find((p) => p.distinct_id === 'old-distinct-1') + expect(person1).toBeDefined() + expect(person1!.id).toBe(String(oldId1)) + expect(person1!.uuid).toBe(oldUuid1) + expect(person1!.properties.name).toBe('Old Person 1') + + const person2 = result.find((p) => p.distinct_id === 'old-distinct-2') + expect(person2).toBeDefined() + expect(person2!.id).toBe(String(oldId2)) + expect(person2!.uuid).toBe(oldUuid2) + expect(person2!.properties.name).toBe('Old Person 2') + + const person3 = result.find((p) => p.distinct_id === 'old-distinct-3') + expect(person3).toBeDefined() + expect(person3!.id).toBe(String(oldId3)) + expect(person3!.uuid).toBe(oldUuid3) + expect(person3!.properties.name).toBe('Old Person 3') + }) + + it('should fetch multiple persons from new table only', async () => { + const team = await getFirstTeam(hub) + + // Create multiple persons in new table + const newId1 = ID_OFFSET + 100 + const newUuid1 = new UUIDT().toString() + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO ${NEW_TABLE_NAME} (id, uuid, created_at, team_id, properties, properties_last_updated_at, properties_last_operation, is_user_id, is_identified, version) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, + [ + newId1, + newUuid1, + TIMESTAMP.toISO(), + team.id, + JSON.stringify({ name: 'New Person 1' }), + '{}', + '{}', + null, + true, + 0, + ], + 'insertNewPerson1' + ) + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO posthog_persondistinctid (distinct_id, person_id, team_id, version) + VALUES ($1, $2, $3, $4)`, + ['new-distinct-1', newId1, team.id, 0], + 'insertNewDistinctId1' + ) + + const newId2 = ID_OFFSET + 200 + const newUuid2 = new UUIDT().toString() + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO ${NEW_TABLE_NAME} (id, uuid, created_at, team_id, properties, properties_last_updated_at, properties_last_operation, is_user_id, is_identified, version) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, + [ + newId2, + newUuid2, + TIMESTAMP.toISO(), + team.id, + JSON.stringify({ name: 'New Person 2' }), + '{}', + '{}', + null, + true, + 0, + ], + 'insertNewPerson2' + ) + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO posthog_persondistinctid (distinct_id, person_id, team_id, version) + VALUES ($1, $2, $3, $4)`, + ['new-distinct-2', newId2, team.id, 0], + 'insertNewDistinctId2' + ) + + const newId3 = ID_OFFSET + 300 + const newUuid3 = new UUIDT().toString() + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO ${NEW_TABLE_NAME} (id, uuid, created_at, team_id, properties, properties_last_updated_at, properties_last_operation, is_user_id, is_identified, version) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, + [ + newId3, + newUuid3, + TIMESTAMP.toISO(), + team.id, + JSON.stringify({ name: 'New Person 3' }), + '{}', + '{}', + null, + true, + 0, + ], + 'insertNewPerson3' + ) + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO posthog_persondistinctid (distinct_id, person_id, team_id, version) + VALUES ($1, $2, $3, $4)`, + ['new-distinct-3', newId3, team.id, 0], + 'insertNewDistinctId3' + ) + + const result = await cutoverRepository.fetchPersonsByDistinctIds([ + { teamId: team.id, distinctId: 'new-distinct-1' }, + { teamId: team.id, distinctId: 'new-distinct-2' }, + { teamId: team.id, distinctId: 'new-distinct-3' }, + ]) + + expect(result).toHaveLength(3) + + const person1 = result.find((p) => p.distinct_id === 'new-distinct-1') + expect(person1).toBeDefined() + expect(person1!.id).toBe(String(newId1)) + expect(person1!.uuid).toBe(newUuid1) + expect(person1!.properties.name).toBe('New Person 1') + + const person2 = result.find((p) => p.distinct_id === 'new-distinct-2') + expect(person2).toBeDefined() + expect(person2!.id).toBe(String(newId2)) + expect(person2!.uuid).toBe(newUuid2) + expect(person2!.properties.name).toBe('New Person 2') + + const person3 = result.find((p) => p.distinct_id === 'new-distinct-3') + expect(person3).toBeDefined() + expect(person3!.id).toBe(String(newId3)) + expect(person3!.uuid).toBe(newUuid3) + expect(person3!.properties.name).toBe('New Person 3') + }) + + it('should fetch persons from both tables with mixed IDs', async () => { + const team = await getFirstTeam(hub) + + // Create multiple persons in old table + const oldId1 = 100 + const oldUuid1 = new UUIDT().toString() + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO posthog_person (id, uuid, created_at, team_id, properties, properties_last_updated_at, properties_last_operation, is_user_id, is_identified, version) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, + [ + oldId1, + oldUuid1, + TIMESTAMP.toISO(), + team.id, + JSON.stringify({ name: 'Old 1' }), + '{}', + '{}', + null, + true, + 0, + ], + 'insertOldPerson1' + ) + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO posthog_persondistinctid (distinct_id, person_id, team_id, version) + VALUES ($1, $2, $3, $4)`, + ['mixed-old-1', oldId1, team.id, 0], + 'insertOldDistinctId1' + ) + + const oldId2 = 200 + const oldUuid2 = new UUIDT().toString() + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO posthog_person (id, uuid, created_at, team_id, properties, properties_last_updated_at, properties_last_operation, is_user_id, is_identified, version) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, + [ + oldId2, + oldUuid2, + TIMESTAMP.toISO(), + team.id, + JSON.stringify({ name: 'Old 2' }), + '{}', + '{}', + null, + true, + 0, + ], + 'insertOldPerson2' + ) + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO posthog_persondistinctid (distinct_id, person_id, team_id, version) + VALUES ($1, $2, $3, $4)`, + ['mixed-old-2', oldId2, team.id, 0], + 'insertOldDistinctId2' + ) + + const oldId3 = 300 + const oldUuid3 = new UUIDT().toString() + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO posthog_person (id, uuid, created_at, team_id, properties, properties_last_updated_at, properties_last_operation, is_user_id, is_identified, version) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, + [ + oldId3, + oldUuid3, + TIMESTAMP.toISO(), + team.id, + JSON.stringify({ name: 'Old 3' }), + '{}', + '{}', + null, + true, + 0, + ], + 'insertOldPerson3' + ) + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO posthog_persondistinctid (distinct_id, person_id, team_id, version) + VALUES ($1, $2, $3, $4)`, + ['mixed-old-3', oldId3, team.id, 0], + 'insertOldDistinctId3' + ) + + // Create multiple persons in new table + const newId1 = ID_OFFSET + 100 + const newUuid1 = new UUIDT().toString() + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO ${NEW_TABLE_NAME} (id, uuid, created_at, team_id, properties, properties_last_updated_at, properties_last_operation, is_user_id, is_identified, version) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, + [ + newId1, + newUuid1, + TIMESTAMP.toISO(), + team.id, + JSON.stringify({ name: 'New 1' }), + '{}', + '{}', + null, + true, + 0, + ], + 'insertNewPerson1' + ) + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO posthog_persondistinctid (distinct_id, person_id, team_id, version) + VALUES ($1, $2, $3, $4)`, + ['mixed-new-1', newId1, team.id, 0], + 'insertNewDistinctId1' + ) + + const newId2 = ID_OFFSET + 200 + const newUuid2 = new UUIDT().toString() + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO ${NEW_TABLE_NAME} (id, uuid, created_at, team_id, properties, properties_last_updated_at, properties_last_operation, is_user_id, is_identified, version) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, + [ + newId2, + newUuid2, + TIMESTAMP.toISO(), + team.id, + JSON.stringify({ name: 'New 2' }), + '{}', + '{}', + null, + true, + 0, + ], + 'insertNewPerson2' + ) + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO posthog_persondistinctid (distinct_id, person_id, team_id, version) + VALUES ($1, $2, $3, $4)`, + ['mixed-new-2', newId2, team.id, 0], + 'insertNewDistinctId2' + ) + + const newId3 = ID_OFFSET + 300 + const newUuid3 = new UUIDT().toString() + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO ${NEW_TABLE_NAME} (id, uuid, created_at, team_id, properties, properties_last_updated_at, properties_last_operation, is_user_id, is_identified, version) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, + [ + newId3, + newUuid3, + TIMESTAMP.toISO(), + team.id, + JSON.stringify({ name: 'New 3' }), + '{}', + '{}', + null, + true, + 0, + ], + 'insertNewPerson3' + ) + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO posthog_persondistinctid (distinct_id, person_id, team_id, version) + VALUES ($1, $2, $3, $4)`, + ['mixed-new-3', newId3, team.id, 0], + 'insertNewDistinctId3' + ) + + // Fetch all persons - mix of old and new table persons + const result = await cutoverRepository.fetchPersonsByDistinctIds([ + { teamId: team.id, distinctId: 'mixed-old-1' }, + { teamId: team.id, distinctId: 'mixed-new-1' }, + { teamId: team.id, distinctId: 'mixed-old-2' }, + { teamId: team.id, distinctId: 'mixed-new-2' }, + { teamId: team.id, distinctId: 'mixed-old-3' }, + { teamId: team.id, distinctId: 'mixed-new-3' }, + ]) + + expect(result).toHaveLength(6) + + // Verify old table persons + const oldPerson1 = result.find((p) => p.distinct_id === 'mixed-old-1') + expect(oldPerson1).toBeDefined() + expect(oldPerson1!.id).toBe(String(oldId1)) + expect(oldPerson1!.uuid).toBe(oldUuid1) + expect(oldPerson1!.properties.name).toBe('Old 1') + + const oldPerson2 = result.find((p) => p.distinct_id === 'mixed-old-2') + expect(oldPerson2).toBeDefined() + expect(oldPerson2!.id).toBe(String(oldId2)) + expect(oldPerson2!.uuid).toBe(oldUuid2) + expect(oldPerson2!.properties.name).toBe('Old 2') + + const oldPerson3 = result.find((p) => p.distinct_id === 'mixed-old-3') + expect(oldPerson3).toBeDefined() + expect(oldPerson3!.id).toBe(String(oldId3)) + expect(oldPerson3!.uuid).toBe(oldUuid3) + expect(oldPerson3!.properties.name).toBe('Old 3') + + // Verify new table persons + const newPerson1 = result.find((p) => p.distinct_id === 'mixed-new-1') + expect(newPerson1).toBeDefined() + expect(newPerson1!.id).toBe(String(newId1)) + expect(newPerson1!.uuid).toBe(newUuid1) + expect(newPerson1!.properties.name).toBe('New 1') + + const newPerson2 = result.find((p) => p.distinct_id === 'mixed-new-2') + expect(newPerson2).toBeDefined() + expect(newPerson2!.id).toBe(String(newId2)) + expect(newPerson2!.uuid).toBe(newUuid2) + expect(newPerson2!.properties.name).toBe('New 2') + + const newPerson3 = result.find((p) => p.distinct_id === 'mixed-new-3') + expect(newPerson3).toBeDefined() + expect(newPerson3!.id).toBe(String(newId3)) + expect(newPerson3!.uuid).toBe(newUuid3) + expect(newPerson3!.properties.name).toBe('New 3') + }) + + it('should return empty array when no persons found', async () => { + const team = await getFirstTeam(hub) + const result = await cutoverRepository.fetchPersonsByDistinctIds([ + { teamId: team.id, distinctId: 'non-existent-1' }, + { teamId: team.id, distinctId: 'non-existent-2' }, + ]) + expect(result).toEqual([]) + }) + }) + + describe('updatePerson()', () => { + it('should update person in old table when ID < offset', async () => { + const team = await getFirstTeam(hub) + const lowId = 100 + const uuid = new UUIDT().toString() + + // Insert person in old table + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO posthog_person (id, uuid, created_at, team_id, properties, properties_last_updated_at, properties_last_operation, is_user_id, is_identified, version) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, + [ + lowId, + uuid, + TIMESTAMP.toISO(), + team.id, + JSON.stringify({ name: 'Original Old' }), + '{}', + '{}', + null, + true, + 0, + ], + 'insertOldPerson' + ) + + // Insert person with same ID in new table (to verify it's not updated) + const newTableUuid = new UUIDT().toString() + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO ${NEW_TABLE_NAME} (id, uuid, created_at, team_id, properties, properties_last_updated_at, properties_last_operation, is_user_id, is_identified, version) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, + [ + lowId, + newTableUuid, + TIMESTAMP.toISO(), + team.id, + JSON.stringify({ name: 'Original New' }), + '{}', + '{}', + null, + true, + 0, + ], + 'insertNewPerson' + ) + + const person = { + id: String(lowId), + uuid, + created_at: TIMESTAMP, + team_id: team.id, + properties: { name: 'Original Old' }, + properties_last_updated_at: {}, + properties_last_operation: {}, + is_user_id: null, + is_identified: true, + version: 0, + } + + const [updatedPerson] = await cutoverRepository.updatePerson(person, { + properties: { name: 'Updated Old' }, + properties_last_updated_at: {}, + properties_last_operation: {}, + is_identified: true, + created_at: TIMESTAMP, + }) + + expect(updatedPerson.properties.name).toBe('Updated Old') + expect(updatedPerson.version).toBe(1) + + // Verify update happened in old table + const oldTableResult = await postgres.query( + PostgresUse.PERSONS_WRITE, + `SELECT * FROM posthog_person WHERE id = $1 AND team_id = $2`, + [lowId, team.id], + 'checkOldTableUpdate' + ) + expect(oldTableResult.rows[0].properties.name).toBe('Updated Old') + + // Verify new table was NOT updated + const newTableResult = await postgres.query( + PostgresUse.PERSONS_WRITE, + `SELECT * FROM ${NEW_TABLE_NAME} WHERE id = $1 AND team_id = $2`, + [lowId, team.id], + 'checkNewTableNotUpdated' + ) + expect(newTableResult.rows[0].properties.name).toBe('Original New') + expect(Number(newTableResult.rows[0].version)).toBe(0) + }) + + it('should update person in new table when ID >= offset', async () => { + const team = await getFirstTeam(hub) + const highId = ID_OFFSET + 100 + const uuid = new UUIDT().toString() + + // Insert person in new table + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO ${NEW_TABLE_NAME} (id, uuid, created_at, team_id, properties, properties_last_updated_at, properties_last_operation, is_user_id, is_identified, version) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, + [ + highId, + uuid, + TIMESTAMP.toISO(), + team.id, + JSON.stringify({ name: 'Original New' }), + '{}', + '{}', + null, + true, + 0, + ], + 'insertNewPerson' + ) + + // Insert person with same ID in old table (to verify it's not updated) + const oldTableUuid = new UUIDT().toString() + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO posthog_person (id, uuid, created_at, team_id, properties, properties_last_updated_at, properties_last_operation, is_user_id, is_identified, version) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, + [ + highId, + oldTableUuid, + TIMESTAMP.toISO(), + team.id, + JSON.stringify({ name: 'Original Old' }), + '{}', + '{}', + null, + true, + 0, + ], + 'insertOldPerson' + ) + + const person = { + id: String(highId), + uuid, + created_at: TIMESTAMP, + team_id: team.id, + properties: { name: 'Original New' }, + properties_last_updated_at: {}, + properties_last_operation: {}, + is_user_id: null, + is_identified: true, + version: 0, + } + + const [updatedPerson] = await cutoverRepository.updatePerson(person, { + properties: { name: 'Updated New' }, + properties_last_updated_at: {}, + properties_last_operation: {}, + is_identified: true, + created_at: TIMESTAMP, + }) + + expect(updatedPerson.properties.name).toBe('Updated New') + expect(updatedPerson.version).toBe(1) + + // Verify update happened in new table + const newTableResult = await postgres.query( + PostgresUse.PERSONS_WRITE, + `SELECT * FROM ${NEW_TABLE_NAME} WHERE id = $1 AND team_id = $2`, + [highId, team.id], + 'checkNewTableUpdate' + ) + expect(newTableResult.rows[0].properties.name).toBe('Updated New') + + // Verify old table was NOT updated + const oldTableResult = await postgres.query( + PostgresUse.PERSONS_WRITE, + `SELECT * FROM posthog_person WHERE id = $1 AND team_id = $2`, + [highId, team.id], + 'checkOldTableNotUpdated' + ) + expect(oldTableResult.rows[0].properties.name).toBe('Original Old') + expect(Number(oldTableResult.rows[0].version)).toBe(0) + }) + }) + + describe('deletePerson()', () => { + it('should delete person from old table when ID < offset', async () => { + const team = await getFirstTeam(hub) + const lowId = 100 + const uuid = new UUIDT().toString() + + // Insert person in old table + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO posthog_person (id, uuid, created_at, team_id, properties, properties_last_updated_at, properties_last_operation, is_user_id, is_identified, version) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, + [ + lowId, + uuid, + TIMESTAMP.toISO(), + team.id, + JSON.stringify({ name: 'Old Table' }), + '{}', + '{}', + null, + true, + 0, + ], + 'insertOldPerson' + ) + + // Insert person with same ID in new table (to verify it's not deleted) + const newTableUuid = new UUIDT().toString() + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO ${NEW_TABLE_NAME} (id, uuid, created_at, team_id, properties, properties_last_updated_at, properties_last_operation, is_user_id, is_identified, version) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, + [ + lowId, + newTableUuid, + TIMESTAMP.toISO(), + team.id, + JSON.stringify({ name: 'New Table' }), + '{}', + '{}', + null, + true, + 0, + ], + 'insertNewPerson' + ) + + const person = { + id: String(lowId), + uuid, + created_at: TIMESTAMP, + team_id: team.id, + properties: { name: 'Old Table' }, + properties_last_updated_at: {}, + properties_last_operation: {}, + is_user_id: null, + is_identified: true, + version: 0, + } + + await cutoverRepository.deletePerson(person) + + // Verify person is deleted from old table + const oldTableResult = await postgres.query( + PostgresUse.PERSONS_WRITE, + `SELECT * FROM posthog_person WHERE id = $1 AND team_id = $2`, + [lowId, team.id], + 'checkOldTableDelete' + ) + expect(oldTableResult.rows).toHaveLength(0) + + // Verify person in new table was NOT deleted + const newTableResult = await postgres.query( + PostgresUse.PERSONS_WRITE, + `SELECT * FROM ${NEW_TABLE_NAME} WHERE id = $1 AND team_id = $2`, + [lowId, team.id], + 'checkNewTableNotDeleted' + ) + expect(newTableResult.rows).toHaveLength(1) + expect(newTableResult.rows[0].properties.name).toBe('New Table') + }) + + it('should delete person from new table when ID >= offset', async () => { + const team = await getFirstTeam(hub) + const highId = ID_OFFSET + 100 + const uuid = new UUIDT().toString() + + // Insert person in new table + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO ${NEW_TABLE_NAME} (id, uuid, created_at, team_id, properties, properties_last_updated_at, properties_last_operation, is_user_id, is_identified, version) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, + [ + highId, + uuid, + TIMESTAMP.toISO(), + team.id, + JSON.stringify({ name: 'New Table' }), + '{}', + '{}', + null, + true, + 0, + ], + 'insertNewPerson' + ) + + // Insert person with same ID in old table (to verify it's not deleted) + const oldTableUuid = new UUIDT().toString() + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO posthog_person (id, uuid, created_at, team_id, properties, properties_last_updated_at, properties_last_operation, is_user_id, is_identified, version) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, + [ + highId, + oldTableUuid, + TIMESTAMP.toISO(), + team.id, + JSON.stringify({ name: 'Old Table' }), + '{}', + '{}', + null, + true, + 0, + ], + 'insertOldPerson' + ) + + const person = { + id: String(highId), + uuid, + created_at: TIMESTAMP, + team_id: team.id, + properties: { name: 'New Table' }, + properties_last_updated_at: {}, + properties_last_operation: {}, + is_user_id: null, + is_identified: true, + version: 0, + } + + await cutoverRepository.deletePerson(person) + + // Verify person is deleted from new table + const newTableResult = await postgres.query( + PostgresUse.PERSONS_WRITE, + `SELECT * FROM ${NEW_TABLE_NAME} WHERE id = $1 AND team_id = $2`, + [highId, team.id], + 'checkNewTableDelete' + ) + expect(newTableResult.rows).toHaveLength(0) + + // Verify person in old table was NOT deleted + const oldTableResult = await postgres.query( + PostgresUse.PERSONS_WRITE, + `SELECT * FROM posthog_person WHERE id = $1 AND team_id = $2`, + [highId, team.id], + 'checkOldTableNotDeleted' + ) + expect(oldTableResult.rows).toHaveLength(1) + expect(oldTableResult.rows[0].properties.name).toBe('Old Table') + }) + }) + + describe('addDistinctId()', () => { + it('should add distinct ID to person in old table', async () => { + const team = await getFirstTeam(hub) + + // Create person in old table + const lowId = 100 + const uuid = new UUIDT().toString() + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO posthog_person (id, uuid, created_at, team_id, properties, properties_last_updated_at, properties_last_operation, is_user_id, is_identified, version) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, + [lowId, uuid, TIMESTAMP.toISO(), team.id, JSON.stringify({}), '{}', '{}', null, true, 0], + 'insertOldTablePerson' + ) + + // Add distinct ID + await cutoverRepository.addDistinctId( + { id: String(lowId), uuid, team_id: team.id } as any, + 'new-distinct-id', + 0 + ) + + // Verify distinct ID was added + const distinctIdResult = await postgres.query( + PostgresUse.PERSONS_WRITE, + `SELECT * FROM posthog_persondistinctid WHERE distinct_id = $1 AND team_id = $2`, + ['new-distinct-id', team.id], + 'checkDistinctId' + ) + expect(distinctIdResult.rows).toHaveLength(1) + expect(distinctIdResult.rows[0].person_id).toBe(String(lowId)) + }) + + it('should add distinct ID to person in new table', async () => { + const team = await getFirstTeam(hub) + + // Create person in new table + const highId = ID_OFFSET + 100 + const uuid = new UUIDT().toString() + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO ${NEW_TABLE_NAME} (id, uuid, created_at, team_id, properties, properties_last_updated_at, properties_last_operation, is_user_id, is_identified, version) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, + [highId, uuid, TIMESTAMP.toISO(), team.id, JSON.stringify({}), '{}', '{}', null, true, 0], + 'insertNewTablePerson' + ) + + // Add distinct ID + await cutoverRepository.addDistinctId( + { id: String(highId), uuid, team_id: team.id } as any, + 'new-distinct-id', + 0 + ) + + // Verify distinct ID was added + const distinctIdResult = await postgres.query( + PostgresUse.PERSONS_WRITE, + `SELECT * FROM posthog_persondistinctid WHERE distinct_id = $1 AND team_id = $2`, + ['new-distinct-id', team.id], + 'checkDistinctId' + ) + expect(distinctIdResult.rows).toHaveLength(1) + expect(distinctIdResult.rows[0].person_id).toBe(String(highId)) + }) + }) + + describe('moveDistinctIds()', () => { + it('should move distinct IDs from old table person to old table person', async () => { + const team = await getFirstTeam(hub) + + // Create source person in old table with distinct IDs + const sourceId = 100 + const sourceUuid = new UUIDT().toString() + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO posthog_person (id, uuid, created_at, team_id, properties, properties_last_updated_at, properties_last_operation, is_user_id, is_identified, version) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, + [sourceId, sourceUuid, TIMESTAMP.toISO(), team.id, JSON.stringify({}), '{}', '{}', null, true, 0], + 'insertSourcePerson' + ) + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO posthog_persondistinctid (distinct_id, person_id, team_id, version) VALUES ($1, $2, $3, $4)`, + ['source-distinct-1', sourceId, team.id, 0], + 'insertSourceDistinct1' + ) + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO posthog_persondistinctid (distinct_id, person_id, team_id, version) VALUES ($1, $2, $3, $4)`, + ['source-distinct-2', sourceId, team.id, 0], + 'insertSourceDistinct2' + ) + + // Create target person in old table + const targetId = 200 + const targetUuid = new UUIDT().toString() + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO posthog_person (id, uuid, created_at, team_id, properties, properties_last_updated_at, properties_last_operation, is_user_id, is_identified, version) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, + [targetId, targetUuid, TIMESTAMP.toISO(), team.id, JSON.stringify({}), '{}', '{}', null, true, 0], + 'insertTargetPerson' + ) + + // Move distinct IDs + await cutoverRepository.moveDistinctIds( + { id: String(sourceId), uuid: sourceUuid, team_id: team.id } as any, + { id: String(targetId), uuid: targetUuid, team_id: team.id } as any + ) + + // Verify distinct IDs were moved to target + const movedDistincts = await postgres.query( + PostgresUse.PERSONS_WRITE, + `SELECT * FROM posthog_persondistinctid WHERE person_id = $1 AND team_id = $2 ORDER BY distinct_id`, + [targetId, team.id], + 'checkMovedDistincts' + ) + expect(movedDistincts.rows).toHaveLength(2) + expect(movedDistincts.rows[0].distinct_id).toBe('source-distinct-1') + expect(movedDistincts.rows[1].distinct_id).toBe('source-distinct-2') + + // Verify source person has no distinct IDs + const sourceDistincts = await postgres.query( + PostgresUse.PERSONS_WRITE, + `SELECT * FROM posthog_persondistinctid WHERE person_id = $1 AND team_id = $2`, + [sourceId, team.id], + 'checkSourceDistincts' + ) + expect(sourceDistincts.rows).toHaveLength(0) + }) + + it('should move distinct IDs from new table person to new table person', async () => { + const team = await getFirstTeam(hub) + + // Create source person in new table with distinct IDs + const sourceId = ID_OFFSET + 100 + const sourceUuid = new UUIDT().toString() + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO ${NEW_TABLE_NAME} (id, uuid, created_at, team_id, properties, properties_last_updated_at, properties_last_operation, is_user_id, is_identified, version) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, + [sourceId, sourceUuid, TIMESTAMP.toISO(), team.id, JSON.stringify({}), '{}', '{}', null, true, 0], + 'insertSourcePerson' + ) + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO posthog_persondistinctid (distinct_id, person_id, team_id, version) VALUES ($1, $2, $3, $4)`, + ['source-distinct-1', sourceId, team.id, 0], + 'insertSourceDistinct1' + ) + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO posthog_persondistinctid (distinct_id, person_id, team_id, version) VALUES ($1, $2, $3, $4)`, + ['source-distinct-2', sourceId, team.id, 0], + 'insertSourceDistinct2' + ) + + // Create target person in new table + const targetId = ID_OFFSET + 200 + const targetUuid = new UUIDT().toString() + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO ${NEW_TABLE_NAME} (id, uuid, created_at, team_id, properties, properties_last_updated_at, properties_last_operation, is_user_id, is_identified, version) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, + [targetId, targetUuid, TIMESTAMP.toISO(), team.id, JSON.stringify({}), '{}', '{}', null, true, 0], + 'insertTargetPerson' + ) + + // Move distinct IDs + await cutoverRepository.moveDistinctIds( + { id: String(sourceId), uuid: sourceUuid, team_id: team.id } as any, + { id: String(targetId), uuid: targetUuid, team_id: team.id } as any + ) + + // Verify distinct IDs were moved to target + const movedDistincts = await postgres.query( + PostgresUse.PERSONS_WRITE, + `SELECT * FROM posthog_persondistinctid WHERE person_id = $1 AND team_id = $2 ORDER BY distinct_id`, + [targetId, team.id], + 'checkMovedDistincts' + ) + expect(movedDistincts.rows).toHaveLength(2) + expect(movedDistincts.rows[0].distinct_id).toBe('source-distinct-1') + expect(movedDistincts.rows[1].distinct_id).toBe('source-distinct-2') + + // Verify source person has no distinct IDs + const sourceDistincts = await postgres.query( + PostgresUse.PERSONS_WRITE, + `SELECT * FROM posthog_persondistinctid WHERE person_id = $1 AND team_id = $2`, + [sourceId, team.id], + 'checkSourceDistincts' + ) + expect(sourceDistincts.rows).toHaveLength(0) + }) + + it('should move distinct IDs from old table person to new table person (cross-table merge)', async () => { + const team = await getFirstTeam(hub) + + // Create source person in old table with distinct IDs + const sourceId = 100 + const sourceUuid = new UUIDT().toString() + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO posthog_person (id, uuid, created_at, team_id, properties, properties_last_updated_at, properties_last_operation, is_user_id, is_identified, version) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, + [sourceId, sourceUuid, TIMESTAMP.toISO(), team.id, JSON.stringify({}), '{}', '{}', null, true, 0], + 'insertSourcePerson' + ) + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO posthog_persondistinctid (distinct_id, person_id, team_id, version) VALUES ($1, $2, $3, $4)`, + ['old-distinct-1', sourceId, team.id, 0], + 'insertSourceDistinct1' + ) + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO posthog_persondistinctid (distinct_id, person_id, team_id, version) VALUES ($1, $2, $3, $4)`, + ['old-distinct-2', sourceId, team.id, 0], + 'insertSourceDistinct2' + ) + + // Create target person in new table + const targetId = ID_OFFSET + 100 + const targetUuid = new UUIDT().toString() + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO ${NEW_TABLE_NAME} (id, uuid, created_at, team_id, properties, properties_last_updated_at, properties_last_operation, is_user_id, is_identified, version) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, + [targetId, targetUuid, TIMESTAMP.toISO(), team.id, JSON.stringify({}), '{}', '{}', null, true, 0], + 'insertTargetPerson' + ) + + // Move distinct IDs from old to new table person + await cutoverRepository.moveDistinctIds( + { id: String(sourceId), uuid: sourceUuid, team_id: team.id } as any, + { id: String(targetId), uuid: targetUuid, team_id: team.id } as any + ) + + // Verify distinct IDs were moved to target in new table + const movedDistincts = await postgres.query( + PostgresUse.PERSONS_WRITE, + `SELECT * FROM posthog_persondistinctid WHERE person_id = $1 AND team_id = $2 ORDER BY distinct_id`, + [targetId, team.id], + 'checkMovedDistincts' + ) + expect(movedDistincts.rows).toHaveLength(2) + expect(movedDistincts.rows[0].distinct_id).toBe('old-distinct-1') + expect(movedDistincts.rows[1].distinct_id).toBe('old-distinct-2') + + // Verify source person has no distinct IDs + const sourceDistincts = await postgres.query( + PostgresUse.PERSONS_WRITE, + `SELECT * FROM posthog_persondistinctid WHERE person_id = $1 AND team_id = $2`, + [sourceId, team.id], + 'checkSourceDistincts' + ) + expect(sourceDistincts.rows).toHaveLength(0) + }) + + it('should move distinct IDs from new table person to old table person (cross-table merge)', async () => { + const team = await getFirstTeam(hub) + + // Create source person in new table with distinct IDs + const sourceId = ID_OFFSET + 100 + const sourceUuid = new UUIDT().toString() + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO ${NEW_TABLE_NAME} (id, uuid, created_at, team_id, properties, properties_last_updated_at, properties_last_operation, is_user_id, is_identified, version) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, + [sourceId, sourceUuid, TIMESTAMP.toISO(), team.id, JSON.stringify({}), '{}', '{}', null, true, 0], + 'insertSourcePerson' + ) + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO posthog_persondistinctid (distinct_id, person_id, team_id, version) VALUES ($1, $2, $3, $4)`, + ['new-distinct-1', sourceId, team.id, 0], + 'insertSourceDistinct1' + ) + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO posthog_persondistinctid (distinct_id, person_id, team_id, version) VALUES ($1, $2, $3, $4)`, + ['new-distinct-2', sourceId, team.id, 0], + 'insertSourceDistinct2' + ) + + // Create target person in old table + const targetId = 100 + const targetUuid = new UUIDT().toString() + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO posthog_person (id, uuid, created_at, team_id, properties, properties_last_updated_at, properties_last_operation, is_user_id, is_identified, version) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, + [targetId, targetUuid, TIMESTAMP.toISO(), team.id, JSON.stringify({}), '{}', '{}', null, true, 0], + 'insertTargetPerson' + ) + + // Move distinct IDs from new to old table person + await cutoverRepository.moveDistinctIds( + { id: String(sourceId), uuid: sourceUuid, team_id: team.id } as any, + { id: String(targetId), uuid: targetUuid, team_id: team.id } as any + ) + + // Verify distinct IDs were moved to target in old table + const movedDistincts = await postgres.query( + PostgresUse.PERSONS_WRITE, + `SELECT * FROM posthog_persondistinctid WHERE person_id = $1 AND team_id = $2 ORDER BY distinct_id`, + [targetId, team.id], + 'checkMovedDistincts' + ) + expect(movedDistincts.rows).toHaveLength(2) + expect(movedDistincts.rows[0].distinct_id).toBe('new-distinct-1') + expect(movedDistincts.rows[1].distinct_id).toBe('new-distinct-2') + + // Verify source person has no distinct IDs + const sourceDistincts = await postgres.query( + PostgresUse.PERSONS_WRITE, + `SELECT * FROM posthog_persondistinctid WHERE person_id = $1 AND team_id = $2`, + [sourceId, team.id], + 'checkSourceDistincts' + ) + expect(sourceDistincts.rows).toHaveLength(0) + }) + }) + + describe('Cutover disabled', () => { + it('should use old table when cutover is disabled', async () => { + const disabledRepository = new PostgresPersonRepository(postgres, { + calculatePropertiesSize: 0, + personPropertiesDbConstraintLimitBytes: 1024 * 1024, + personPropertiesTrimTargetBytes: 512 * 1024, + tableCutoverEnabled: false, + newTableName: NEW_TABLE_NAME, + newTableIdOffset: ID_OFFSET, + }) + + const team = await getFirstTeam(hub) + const uuid = new UUIDT().toString() + + const result = await disabledRepository.createPerson( + TIMESTAMP, + { name: 'Disabled Cutover' }, + {}, + {}, + team.id, + null, + true, + uuid, + [{ distinctId: 'disabled-distinct' }] + ) + + expect(result.success).toBe(true) + + // Verify person is in old table + const oldTableResult = await postgres.query( + PostgresUse.PERSONS_WRITE, + `SELECT * FROM posthog_person WHERE uuid = $1`, + [uuid], + 'checkOldTable' + ) + expect(oldTableResult.rows).toHaveLength(1) + + // Verify person is NOT in new table + const newTableResult = await postgres.query( + PostgresUse.PERSONS_WRITE, + `SELECT * FROM ${NEW_TABLE_NAME} WHERE uuid = $1`, + [uuid], + 'checkNewTable' + ) + expect(newTableResult.rows).toHaveLength(0) + }) + }) + }) }) // Helper function from the original test file diff --git a/plugin-server/src/worker/ingestion/persons/repositories/postgres-person-repository.ts b/plugin-server/src/worker/ingestion/persons/repositories/postgres-person-repository.ts index 00a9f91857..1c8c8c6513 100644 --- a/plugin-server/src/worker/ingestion/persons/repositories/postgres-person-repository.ts +++ b/plugin-server/src/worker/ingestion/persons/repositories/postgres-person-repository.ts @@ -46,12 +46,21 @@ export interface PostgresPersonRepositoryOptions { personPropertiesDbConstraintLimitBytes: number /** Target JSON size (stringified) to trim down to when remediating oversized properties */ personPropertiesTrimTargetBytes: number + /** Enable person table cutover migration */ + tableCutoverEnabled?: boolean + /** New person table name for cutover migration */ + newTableName?: string + /** Person ID offset threshold - person IDs >= this value route to new table */ + newTableIdOffset?: number } const DEFAULT_OPTIONS: PostgresPersonRepositoryOptions = { calculatePropertiesSize: 0, personPropertiesDbConstraintLimitBytes: DEFAULT_PERSON_PROPERTIES_DB_CONSTRAINT_LIMIT_BYTES, personPropertiesTrimTargetBytes: DEFAULT_PERSON_PROPERTIES_TRIM_TARGET_BYTES, + tableCutoverEnabled: false, + newTableName: 'posthog_person_new', + newTableIdOffset: Number.MAX_SAFE_INTEGER, } export class PostgresPersonRepository @@ -66,12 +75,30 @@ export class PostgresPersonRepository this.options = { ...DEFAULT_OPTIONS, ...options } } + private getTableName(personId?: string): string { + if (!this.options.tableCutoverEnabled || !this.options.newTableName || !this.options.newTableIdOffset) { + return 'posthog_person' + } + + if (!personId) { + return 'posthog_person' + } + + const numericPersonId = parseInt(personId, 10) + if (isNaN(numericPersonId)) { + return 'posthog_person' + } + + // Always return unsanitized name - callers must sanitize before SQL interpolation + return numericPersonId >= this.options.newTableIdOffset ? this.options.newTableName : 'posthog_person' + } + private async handleOversizedPersonProperties( person: InternalPerson, update: PersonUpdateFields, tx?: TransactionClient ): Promise<[InternalPerson, TopicMessage[], boolean]> { - const currentSize = await this.personPropertiesSize(person.id) + const currentSize = await this.personPropertiesSize(person.id, person.team_id) if (currentSize >= this.options.personPropertiesDbConstraintLimitBytes) { try { @@ -222,38 +249,84 @@ export class PostgresPersonRepository throw new Error("can't enable both forUpdate and useReadReplica in db::fetchPerson") } - let queryString = `SELECT - posthog_person.id, - posthog_person.uuid, - posthog_person.created_at, - posthog_person.team_id, - posthog_person.properties, - posthog_person.properties_last_updated_at, - posthog_person.properties_last_operation, - posthog_person.is_user_id, - posthog_person.version, - posthog_person.is_identified - FROM posthog_person - JOIN posthog_persondistinctid ON (posthog_persondistinctid.person_id = posthog_person.id) - WHERE - posthog_person.team_id = $1 - AND posthog_persondistinctid.team_id = $1 - AND posthog_persondistinctid.distinct_id = $2` - if (options.forUpdate) { - // Locks the teamId and distinctId tied to this personId + this person's info - queryString = queryString.concat(` FOR UPDATE`) - } - const values = [teamId, distinctId] + if (this.options.tableCutoverEnabled && this.options.newTableName && this.options.newTableIdOffset) { + // First, get the person_id from posthog_persondistinctid + const distinctIdQuery = ` + SELECT person_id + FROM posthog_persondistinctid + WHERE team_id = $1 AND distinct_id = $2 + LIMIT 1` - const { rows } = await this.postgres.query( - options.useReadReplica ? PostgresUse.PERSONS_READ : PostgresUse.PERSONS_WRITE, - queryString, - values, - 'fetchPerson' - ) + const { rows: distinctIdRows } = await this.postgres.query<{ person_id: string }>( + options.useReadReplica ? PostgresUse.PERSONS_READ : PostgresUse.PERSONS_WRITE, + distinctIdQuery, + [teamId, distinctId], + 'fetchPersonDistinctIdMapping' + ) - if (rows.length > 0) { - return this.toPerson(rows[0]) + if (distinctIdRows.length === 0) { + return undefined + } + + const personId = distinctIdRows[0].person_id + const tableName = sanitizeSqlIdentifier(this.getTableName(personId)) + const forUpdateClause = options.forUpdate ? ' FOR UPDATE' : '' + + const personQuery = ` + SELECT + id, + uuid, + created_at, + team_id, + properties, + properties_last_updated_at, + properties_last_operation, + is_user_id, + version, + is_identified + FROM ${tableName} + WHERE team_id = $1 AND id = $2${forUpdateClause}` + + const { rows } = await this.postgres.query( + options.useReadReplica ? PostgresUse.PERSONS_READ : PostgresUse.PERSONS_WRITE, + personQuery, + [teamId, personId], + 'fetchPerson' + ) + + if (rows.length > 0) { + return this.toPerson(rows[0]) + } + } else { + const forUpdateClause = options.forUpdate ? ' FOR UPDATE' : '' + const queryString = `SELECT + posthog_person.id, + posthog_person.uuid, + posthog_person.created_at, + posthog_person.team_id, + posthog_person.properties, + posthog_person.properties_last_updated_at, + posthog_person.properties_last_operation, + posthog_person.is_user_id, + posthog_person.version, + posthog_person.is_identified + FROM posthog_person + JOIN posthog_persondistinctid ON (posthog_persondistinctid.person_id = posthog_person.id) + WHERE + posthog_person.team_id = $1 + AND posthog_persondistinctid.team_id = $1 + AND posthog_persondistinctid.distinct_id = $2${forUpdateClause}` + + const { rows } = await this.postgres.query( + options.useReadReplica ? PostgresUse.PERSONS_READ : PostgresUse.PERSONS_WRITE, + queryString, + [teamId, distinctId], + 'fetchPerson' + ) + + if (rows.length > 0) { + return this.toPerson(rows[0]) + } } } @@ -264,45 +337,161 @@ export class PostgresPersonRepository return [] } - // Build the WHERE clause for multiple team_id, distinct_id pairs - const conditions = teamPersons - .map((_, index) => { - const teamIdParam = index * 2 + 1 - const distinctIdParam = index * 2 + 2 - return `(posthog_persondistinctid.team_id = $${teamIdParam} AND posthog_persondistinctid.distinct_id = $${distinctIdParam})` - }) - .join(' OR ') - - const queryString = `SELECT - posthog_person.id, - posthog_person.uuid, - posthog_person.created_at, - posthog_person.team_id, - posthog_person.properties, - posthog_person.properties_last_updated_at, - posthog_person.properties_last_operation, - posthog_person.is_user_id, - posthog_person.version, - posthog_person.is_identified, - posthog_persondistinctid.distinct_id - FROM posthog_person - JOIN posthog_persondistinctid ON (posthog_persondistinctid.person_id = posthog_person.id) - WHERE ${conditions}` - - // Flatten the parameters: [teamId1, distinctId1, teamId2, distinctId2, ...] const params = teamPersons.flatMap((person) => [person.teamId, person.distinctId]) - const { rows } = await this.postgres.query( - PostgresUse.PERSONS_READ, - queryString, - params, - 'fetchPersonsByDistinctIds' - ) + if (this.options.tableCutoverEnabled && this.options.newTableName && this.options.newTableIdOffset) { + // First, get all person_id mappings from posthog_persondistinctid + const conditions = teamPersons + .map((_, index) => { + const teamIdParam = index * 2 + 1 + const distinctIdParam = index * 2 + 2 + return `(team_id = $${teamIdParam} AND distinct_id = $${distinctIdParam})` + }) + .join(' OR ') - return rows.map((row) => ({ - ...this.toPerson(row), - distinct_id: row.distinct_id, - })) + const distinctIdQuery = ` + SELECT person_id, distinct_id, team_id + FROM posthog_persondistinctid + WHERE ${conditions}` + + const { rows: distinctIdRows } = await this.postgres.query<{ + person_id: string + distinct_id: string + team_id: number + }>(PostgresUse.PERSONS_READ, distinctIdQuery, params, 'fetchPersonDistinctIdMappings') + + if (distinctIdRows.length === 0) { + return [] + } + + // Group person IDs by table + const oldTablePersonIds: string[] = [] + const newTablePersonIds: string[] = [] + const personIdToDistinctId = new Map() + + for (const row of distinctIdRows) { + const tableName = this.getTableName(row.person_id) + if (tableName === 'posthog_person') { + oldTablePersonIds.push(row.person_id) + } else { + newTablePersonIds.push(row.person_id) + } + personIdToDistinctId.set(row.person_id, { + distinct_id: row.distinct_id, + team_id: row.team_id, + }) + } + + const allPersons: (RawPerson & { distinct_id: string })[] = [] + + // Fetch from old table if needed + if (oldTablePersonIds.length > 0) { + const oldTableConditions = oldTablePersonIds.map((_, index) => `$${index + 1}`).join(', ') + const oldTableQuery = ` + SELECT + id, + uuid, + created_at, + team_id, + properties, + properties_last_updated_at, + properties_last_operation, + is_user_id, + version, + is_identified + FROM posthog_person + WHERE id IN (${oldTableConditions})` + + const { rows: oldTableRows } = await this.postgres.query( + PostgresUse.PERSONS_READ, + oldTableQuery, + oldTablePersonIds, + 'fetchPersonsFromOldTable' + ) + + for (const row of oldTableRows) { + const mapping = personIdToDistinctId.get(String(row.id)) + if (mapping) { + allPersons.push({ ...row, distinct_id: mapping.distinct_id }) + } + } + } + + // Fetch from new table if needed + if (newTablePersonIds.length > 0) { + const newTableConditions = newTablePersonIds.map((_, index) => `$${index + 1}`).join(', ') + const safeNewTableName = sanitizeSqlIdentifier(this.options.newTableName) + const newTableQuery = ` + SELECT + id, + uuid, + created_at, + team_id, + properties, + properties_last_updated_at, + properties_last_operation, + is_user_id, + version, + is_identified + FROM ${safeNewTableName} + WHERE id IN (${newTableConditions})` + + const { rows: newTableRows } = await this.postgres.query( + PostgresUse.PERSONS_READ, + newTableQuery, + newTablePersonIds, + 'fetchPersonsFromNewTable' + ) + + for (const row of newTableRows) { + const mapping = personIdToDistinctId.get(String(row.id)) + if (mapping) { + allPersons.push({ ...row, distinct_id: mapping.distinct_id }) + } + } + } + + return allPersons.map((row) => ({ + ...this.toPerson(row), + distinct_id: row.distinct_id, + })) + } else { + const conditions = teamPersons + .map((_, index) => { + const teamIdParam = index * 2 + 1 + const distinctIdParam = index * 2 + 2 + return `(posthog_persondistinctid.team_id = $${teamIdParam} AND posthog_persondistinctid.distinct_id = $${distinctIdParam})` + }) + .join(' OR ') + + const queryString = `SELECT + posthog_person.id, + posthog_person.uuid, + posthog_person.created_at, + posthog_person.team_id, + posthog_person.properties, + posthog_person.properties_last_updated_at, + posthog_person.properties_last_operation, + posthog_person.is_user_id, + posthog_person.version, + posthog_person.is_identified, + posthog_persondistinctid.distinct_id + FROM posthog_person + JOIN posthog_persondistinctid ON (posthog_persondistinctid.person_id = posthog_person.id) + WHERE ${conditions}` + + const { rows } = await this.postgres.query( + PostgresUse.PERSONS_READ, + queryString, + params, + 'fetchPersonsByDistinctIds' + ) + + return rows.map((row) => ({ + ...this.toPerson(row), + distinct_id: row.distinct_id, + })) + } } async createPerson( @@ -340,9 +529,28 @@ export class PostgresPersonRepository 'uuid', 'version', ] - const columns = forcedId ? ['id', ...baseColumns] : baseColumns - const valuePlaceholders = columns.map((_, i) => `$${i + 1}`).join(', ') + // When cutover is enabled and no forcedId, we need to explicitly call nextval() for id + // because partitioned tables don't automatically apply DEFAULT values when the column is omitted + const useDefaultId = this.options.tableCutoverEnabled && !forcedId + + let columns: string[] + let valuePlaceholders: string + + if (useDefaultId) { + // Include 'id' in columns but use nextval() to explicitly get next sequence value + // We need this for partitioned tables which don't properly inherit DEFAULT constraints + columns = ['id', ...baseColumns] + valuePlaceholders = `nextval('posthog_person_id_seq'), ${baseColumns.map((_, i) => `$${i + 1}`).join(', ')}` + } else if (forcedId) { + // Include 'id' in columns and use $1 for its value + columns = ['id', ...baseColumns] + valuePlaceholders = columns.map((_, i) => `$${i + 1}`).join(', ') + } else { + // Don't include 'id' - let the table's DEFAULT handle it + columns = baseColumns + valuePlaceholders = columns.map((_, i) => `$${i + 1}`).join(', ') + } // Sanitize and measure JSON field sizes const sanitizedProperties = sanitizeJsonbValue(properties) @@ -381,7 +589,9 @@ export class PostgresPersonRepository // Find the actual index of team_id in the personParams array (1-indexed for SQL) const teamIdParamIndex = personParams.indexOf(teamId) + 1 - const distinctIdVersionStartIndex = columns.length + 1 + // Use personParams.length instead of columns.length because when useDefaultId is true, + // columns includes 'id' but personParams doesn't include an id value + const distinctIdVersionStartIndex = personParams.length + 1 const distinctIdStartIndex = distinctIdVersionStartIndex + distinctIds.length const distinctIdsCTE = @@ -403,9 +613,14 @@ export class PostgresPersonRepository )` : '' + const tableName = + this.options.tableCutoverEnabled && this.options.newTableName && this.options.newTableIdOffset + ? sanitizeSqlIdentifier(this.options.newTableName) + : 'posthog_person' + const query = `WITH inserted_person AS ( - INSERT INTO posthog_person (${columns.join(', ')}) + INSERT INTO ${tableName} (${columns.join(', ')}) VALUES (${valuePlaceholders}) RETURNING * )` + @@ -494,9 +709,10 @@ export class PostgresPersonRepository async deletePerson(person: InternalPerson, tx?: TransactionClient): Promise { let rows: { version: string }[] = [] try { + const tableName = sanitizeSqlIdentifier(this.getTableName(person.id)) const result = await this.postgres.query<{ version: string }>( tx ?? PostgresUse.PERSONS_WRITE, - 'DELETE FROM posthog_person WHERE team_id = $1 AND id = $2 RETURNING version', + `DELETE FROM ${tableName} WHERE team_id = $1 AND id = $2 RETURNING version`, [person.team_id, person.id], 'deletePerson' ) @@ -747,16 +963,19 @@ export class PostgresPersonRepository return result.rows[0].inserted } - async personPropertiesSize(personId: string): Promise { + async personPropertiesSize(personId: string, teamId: number): Promise { + const tableName = sanitizeSqlIdentifier(this.getTableName(personId)) + + // For partitioned tables, we need team_id for efficient querying const queryString = ` SELECT COALESCE(pg_column_size(properties)::bigint, 0::bigint) AS total_props_bytes - FROM posthog_person - WHERE id = $1` + FROM ${tableName} + WHERE team_id = $1 AND id = $2` const { rows } = await this.postgres.query( PostgresUse.PERSONS_READ, queryString, - [personId], + [teamId, personId], 'personPropertiesSize' ) @@ -788,7 +1007,7 @@ export class PostgresPersonRepository return [person, [], false] } - const values = [...updateValues, person.id].map(sanitizeJsonbValue) + const values = [...updateValues].map(sanitizeJsonbValue) // Measure JSON field sizes after sanitization (using already sanitized values) const updateKeys = Object.keys(unparsedUpdate) @@ -805,6 +1024,10 @@ export class PostgresPersonRepository } const calculatePropertiesSize = this.options.calculatePropertiesSize + const tableName = sanitizeSqlIdentifier(this.getTableName(person.id)) + + // Add team_id and person_id to values for WHERE clause (for partitioning) + const allValues = [...values, person.team_id, person.id] /* * Temporarily have two different queries for updatePerson to evaluate the impact of calculating @@ -812,18 +1035,22 @@ export class PostgresPersonRepository * but we can't add that constraint check until we know the impact of adding that constraint check for every update/insert on Persons. * Added benefit, we can get more observability into the sizes of properties field, if we can turn this up to 100% */ - const queryStringWithPropertiesSize = `UPDATE posthog_person SET version = ${versionString}, ${Object.keys( + const updateFieldsCount = Object.values(update).length + const teamIdParamIndex = updateFieldsCount + 1 + const personIdParamIndex = updateFieldsCount + 2 + + const queryStringWithPropertiesSize = `UPDATE ${tableName} SET version = ${versionString}, ${Object.keys( update - ).map((field, index) => `"${sanitizeSqlIdentifier(field)}" = $${index + 1}`)} WHERE id = $${ - Object.values(update).length + 1 - } + ).map( + (field, index) => `"${sanitizeSqlIdentifier(field)}" = $${index + 1}` + )} WHERE team_id = $${teamIdParamIndex} AND id = $${personIdParamIndex} RETURNING *, COALESCE(pg_column_size(properties)::bigint, 0::bigint) as properties_size_bytes /* operation='updatePersonWithPropertiesSize',purpose='${tag || 'update'}' */` // Potentially overriding values badly if there was an update to the person after computing updateValues above - const queryString = `UPDATE posthog_person SET version = ${versionString}, ${Object.keys(update).map( + const queryString = `UPDATE ${tableName} SET version = ${versionString}, ${Object.keys(update).map( (field, index) => `"${sanitizeSqlIdentifier(field)}" = $${index + 1}` - )} WHERE id = $${Object.values(update).length + 1} + )} WHERE team_id = $${teamIdParamIndex} AND id = $${personIdParamIndex} RETURNING * /* operation='updatePerson',purpose='${tag || 'update'}' */` @@ -836,7 +1063,7 @@ export class PostgresPersonRepository const { rows } = await this.postgres.query( tx ?? PostgresUse.PERSONS_WRITE, selectedQueryString, - values, + allValues, `updatePerson${tag ? `-${tag}` : ''}` ) if (rows.length === 0) { @@ -884,27 +1111,31 @@ export class PostgresPersonRepository async updatePersonAssertVersion(personUpdate: PersonUpdate): Promise<[number | undefined, TopicMessage[]]> { try { - const { rows } = await this.postgres.query( - PostgresUse.PERSONS_WRITE, - ` - UPDATE posthog_person SET + const params = [ + JSON.stringify(personUpdate.properties), + JSON.stringify(personUpdate.properties_last_updated_at), + JSON.stringify(personUpdate.properties_last_operation), + personUpdate.is_identified, + personUpdate.team_id, + personUpdate.uuid, + personUpdate.version, + ] + + const tableName = sanitizeSqlIdentifier(this.getTableName(personUpdate.id)) + const queryString = ` + UPDATE ${tableName} SET properties = $1, properties_last_updated_at = $2, properties_last_operation = $3, is_identified = $4, version = COALESCE(version, 0)::numeric + 1 WHERE team_id = $5 AND uuid = $6 AND version = $7 - RETURNING * - `, - [ - JSON.stringify(personUpdate.properties), - JSON.stringify(personUpdate.properties_last_updated_at), - JSON.stringify(personUpdate.properties_last_operation), - personUpdate.is_identified, - personUpdate.team_id, - personUpdate.uuid, - personUpdate.version, - ], + RETURNING *` + + const { rows } = await this.postgres.query( + PostgresUse.PERSONS_WRITE, + queryString, + params, 'updatePersonAssertVersion' ) diff --git a/plugin-server/src/worker/ingestion/persons/repositories/raw-postgres-person-repository.ts b/plugin-server/src/worker/ingestion/persons/repositories/raw-postgres-person-repository.ts index 5e3f54cdd9..ec35d61974 100644 --- a/plugin-server/src/worker/ingestion/persons/repositories/raw-postgres-person-repository.ts +++ b/plugin-server/src/worker/ingestion/persons/repositories/raw-postgres-person-repository.ts @@ -70,7 +70,7 @@ export interface RawPostgresPersonRepository { addPersonlessDistinctId(teamId: Team['id'], distinctId: string, tx?: TransactionClient): Promise addPersonlessDistinctIdForMerge(teamId: Team['id'], distinctId: string, tx?: TransactionClient): Promise - personPropertiesSize(personId: string): Promise + personPropertiesSize(personId: string, teamId: number): Promise updateCohortsAndFeatureFlagsForMerge( teamID: Team['id'], diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/__snapshots__/runner.test.ts.snap b/plugin-server/tests/worker/ingestion/event-pipeline/__snapshots__/runner.test.ts.snap index 55737c3387..c4db6edd4c 100644 --- a/plugin-server/tests/worker/ingestion/event-pipeline/__snapshots__/runner.test.ts.snap +++ b/plugin-server/tests/worker/ingestion/event-pipeline/__snapshots__/runner.test.ts.snap @@ -113,8 +113,11 @@ exports[`EventPipelineRunner runEventPipeline() runs steps 1`] = ` "personRepository": { "options": { "calculatePropertiesSize": 0, + "newTableIdOffset": 9007199254740991, + "newTableName": "posthog_person_new", "personPropertiesDbConstraintLimitBytes": 655360, "personPropertiesTrimTargetBytes": 524288, + "tableCutoverEnabled": false, }, }, "personUpdateCache": {},