diff --git a/plugin-server/src/worker/ingestion/persons/repositories/postgres-person-repository.test.ts b/plugin-server/src/worker/ingestion/persons/repositories/postgres-person-repository.test.ts index 601aab36ce..7b4ec5bd7d 100644 --- a/plugin-server/src/worker/ingestion/persons/repositories/postgres-person-repository.test.ts +++ b/plugin-server/src/worker/ingestion/persons/repositories/postgres-person-repository.test.ts @@ -2740,7 +2740,7 @@ describe('PostgresPersonRepository', () => { expect(person!.__useNewTable).toBe(true) }) - it('should check old table and mark __useNewTable=false when person only in old table', async () => { + it('should opportunistically copy person from old table to new table and mark __useNewTable=true', async () => { const team = await getFirstTeam(hub) // Create person in old table only @@ -2772,14 +2772,177 @@ describe('PostgresPersonRepository', () => { 'insertDistinctId' ) - // Fetch should check new table first (not found), then old table + // Verify person doesn't exist in new table yet + const { rows: beforeRows } = await postgres.query( + PostgresUse.PERSONS_READ, + 'SELECT * FROM posthog_person_new WHERE id = $1 AND team_id = $2', + [oldId, team.id], + 'checkNewTableBefore' + ) + expect(beforeRows.length).toBe(0) + + // Fetch should check new table first (not found), find in old table, then copy to new table const person = await cutoverRepository.fetchPerson(team.id, 'test-in-old') expect(person).toBeDefined() expect(person!.id).toBe(String(oldId)) expect(person!.uuid).toBe(uuid) expect(person!.properties.name).toBe('In Old Table') - expect(person!.__useNewTable).toBe(false) + // After opportunistic copy, __useNewTable should be true + expect(person!.__useNewTable).toBe(true) + + // Verify person was copied to new table + const { rows: afterRows } = await postgres.query( + PostgresUse.PERSONS_READ, + 'SELECT * FROM posthog_person_new WHERE id = $1 AND team_id = $2', + [oldId, team.id], + 'checkNewTableAfter' + ) + expect(afterRows.length).toBe(1) + expect(afterRows[0].id).toBe(String(oldId)) + expect(afterRows[0].uuid).toBe(uuid) + expect(afterRows[0].properties).toEqual({ name: 'In Old Table' }) + }) + + it('should handle ON CONFLICT gracefully when person already exists in new table', async () => { + const team = await getFirstTeam(hub) + + // Create person in BOTH old and new tables (simulating a partial migration state) + const personId = 100 + const uuid = new UUIDT().toString() + + // Insert in old table + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO posthog_person (id, uuid, created_at, team_id, properties, properties_last_updated_at, properties_last_operation, is_user_id, is_identified, version) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, + [ + personId, + uuid, + TIMESTAMP.toISO(), + team.id, + JSON.stringify({ name: 'In Old Table' }), + '{}', + '{}', + null, + true, + 5, + ], + 'insertOldTablePerson' + ) + + // Insert in new table with different version (newer) + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO posthog_person_new (id, uuid, created_at, team_id, properties, properties_last_updated_at, properties_last_operation, is_user_id, is_identified, version) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, + [ + personId, + uuid, + TIMESTAMP.toISO(), + team.id, + JSON.stringify({ name: 'Already in New Table' }), + '{}', + '{}', + null, + true, + 10, + ], + 'insertNewTablePerson' + ) + + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO posthog_persondistinctid (distinct_id, person_id, team_id, version) + VALUES ($1, $2, $3, $4)`, + ['test-conflict', personId, team.id, 0], + 'insertDistinctId' + ) + + // Fetch should find in new table first (not attempt copy from old) + const person = await cutoverRepository.fetchPerson(team.id, 'test-conflict') + + expect(person).toBeDefined() + expect(person!.__useNewTable).toBe(true) + expect(person!.properties.name).toBe('Already in New Table') + expect(person!.version).toBe(10) // New table version, not old table version + + // Verify new table data is unchanged (ON CONFLICT DO NOTHING didn't overwrite) + const { rows } = await postgres.query( + PostgresUse.PERSONS_READ, + 'SELECT properties, version FROM posthog_person_new WHERE id = $1 AND team_id = $2', + [personId, team.id], + 'checkNewTableUnchanged' + ) + expect(rows.length).toBe(1) + expect(rows[0].properties).toEqual({ name: 'Already in New Table' }) + expect(rows[0].version).toBe('10') + }) + + it('should route subsequent updates to new table after opportunistic copy', async () => { + const team = await getFirstTeam(hub) + + // Create person in old table only + const oldId = 100 + const uuid = new UUIDT().toString() + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO posthog_person (id, uuid, created_at, team_id, properties, properties_last_updated_at, properties_last_operation, is_user_id, is_identified, version) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, + [ + oldId, + uuid, + TIMESTAMP.toISO(), + team.id, + JSON.stringify({ name: 'Original' }), + '{}', + '{}', + null, + true, + 0, + ], + 'insertOldTablePerson' + ) + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO posthog_persondistinctid (distinct_id, person_id, team_id, version) + VALUES ($1, $2, $3, $4)`, + ['test-update-after-copy', oldId, team.id, 0], + 'insertDistinctId' + ) + + // First fetch - triggers opportunistic copy + const person = await cutoverRepository.fetchPerson(team.id, 'test-update-after-copy') + expect(person!.__useNewTable).toBe(true) + + // Update the person - should go to new table (avoiding slow old table trigger) + await cutoverRepository.updatePerson(person!, { + properties: { name: 'Updated in New Table' }, + properties_last_updated_at: {}, + properties_last_operation: {}, + is_identified: true, + created_at: TIMESTAMP, + }) + + // Verify update went to new table + const { rows: newTableRows } = await postgres.query( + PostgresUse.PERSONS_READ, + 'SELECT properties FROM posthog_person_new WHERE id = $1 AND team_id = $2', + [oldId, team.id], + 'checkNewTableUpdate' + ) + expect(newTableRows.length).toBe(1) + expect(newTableRows[0].properties).toEqual({ name: 'Updated in New Table' }) + + // Verify old table was NOT updated (still has original value) + const { rows: oldTableRows } = await postgres.query( + PostgresUse.PERSONS_READ, + 'SELECT properties FROM posthog_person WHERE id = $1 AND team_id = $2', + [oldId, team.id], + 'checkOldTableNotUpdated' + ) + expect(oldTableRows.length).toBe(1) + expect(oldTableRows[0].properties).toEqual({ name: 'Original' }) }) it('should route updates to new table when __useNewTable=true', async () => { @@ -4095,6 +4258,77 @@ describe('PostgresPersonRepository', () => { ) expect(sourceDistincts.rows).toHaveLength(0) }) + + it('should handle full merge with __useNewTable routing (fetch + delete)', async () => { + const team = await getFirstTeam(hub) + + // Create source person in old table + const sourceId = 100 + const sourceUuid = new UUIDT().toString() + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO posthog_person (id, uuid, created_at, team_id, properties, properties_last_updated_at, properties_last_operation, is_user_id, is_identified, version) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, + [sourceId, sourceUuid, TIMESTAMP.toISO(), team.id, JSON.stringify({}), '{}', '{}', null, true, 0], + 'insertSourcePerson' + ) + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO posthog_persondistinctid (distinct_id, person_id, team_id, version) VALUES ($1, $2, $3, $4)`, + ['source-distinct', sourceId, team.id, 0], + 'insertSourceDistinct' + ) + + // Create target person in new table + const targetId = 200 + const targetUuid = new UUIDT().toString() + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO posthog_person_new (id, uuid, created_at, team_id, properties, properties_last_updated_at, properties_last_operation, is_user_id, is_identified, version) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, + [targetId, targetUuid, TIMESTAMP.toISO(), team.id, JSON.stringify({}), '{}', '{}', null, true, 0], + 'insertTargetPerson' + ) + await postgres.query( + PostgresUse.PERSONS_WRITE, + `INSERT INTO posthog_persondistinctid (distinct_id, person_id, team_id, version) VALUES ($1, $2, $3, $4)`, + ['target-distinct', targetId, team.id, 0], + 'insertTargetDistinct' + ) + + // Fetch both persons + // With opportunistic copy, both will have __useNewTable=true after fetch + const sourcePerson = await cutoverRepository.fetchPerson(team.id, 'source-distinct') + const targetPerson = await cutoverRepository.fetchPerson(team.id, 'target-distinct') + + // After opportunistic copy, source is now in new table too + expect(sourcePerson!.__useNewTable).toBe(true) // copied to new table + expect(targetPerson!.__useNewTable).toBe(true) // already in new table + + // Move distinct IDs from source to target + await cutoverRepository.moveDistinctIds(sourcePerson!, targetPerson!) + + // Delete source person - should route to NEW table based on __useNewTable flag + await cutoverRepository.deletePerson(sourcePerson!) + + // Verify source was deleted from new table + const { rows: newTableRows } = await postgres.query( + PostgresUse.PERSONS_READ, + 'SELECT * FROM posthog_person_new WHERE id = $1 AND team_id = $2', + [sourceId, team.id], + 'verifySourceDeletedFromNew' + ) + expect(newTableRows.length).toBe(0) + + // Verify target still exists in new table + const { rows: targetRows } = await postgres.query( + PostgresUse.PERSONS_READ, + 'SELECT * FROM posthog_person_new WHERE id = $1 AND team_id = $2', + [targetId, team.id], + 'verifyTargetStillExists' + ) + expect(targetRows.length).toBe(1) + }) }) describe('Cutover disabled', () => { diff --git a/plugin-server/src/worker/ingestion/persons/repositories/postgres-person-repository.ts b/plugin-server/src/worker/ingestion/persons/repositories/postgres-person-repository.ts index d36ef2c7b0..a663805f21 100644 --- a/plugin-server/src/worker/ingestion/persons/repositories/postgres-person-repository.ts +++ b/plugin-server/src/worker/ingestion/persons/repositories/postgres-person-repository.ts @@ -333,8 +333,63 @@ export class PostgresPersonRepository if (oldTableRows.length > 0) { const person = this.toPerson(oldTableRows[0]) - // Mark that this person exists in the old table - ;(person as any).__useNewTable = false + + // Opportunistically copy person to new table + // This allows all future operations to go directly to new table (avoiding slow triggers) + // Skip copy when using read replica to maintain read-only intent + if (!options.useReadReplica) { + try { + const copyQuery = ` + INSERT INTO ${newTableName} ( + id, + uuid, + created_at, + team_id, + properties, + properties_last_updated_at, + properties_last_operation, + is_user_id, + version, + is_identified + ) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) + ON CONFLICT (team_id, id) DO NOTHING + RETURNING id` + + await this.postgres.query( + PostgresUse.PERSONS_WRITE, + copyQuery, + [ + person.id, + person.uuid, + person.created_at.toISO(), + person.team_id, + sanitizeJsonbValue(person.properties), + sanitizeJsonbValue(person.properties_last_updated_at), + sanitizeJsonbValue(person.properties_last_operation), + person.is_user_id, + person.version, + person.is_identified, + ], + 'copyPersonToNewTable' + ) + + // Person is now in new table, future operations can use it + ;(person as any).__useNewTable = true + } catch (error) { + // If copy fails for any reason, log but continue with old table routing + logger.warn('Failed to copy person to new table', { + error: error instanceof Error ? error.message : String(error), + person_id: person.id, + team_id: person.team_id, + }) + ;(person as any).__useNewTable = false + } + } else { + // When using read replica, don't attempt write operation + ;(person as any).__useNewTable = false + } + return person } } else { @@ -404,7 +459,7 @@ export class PostgresPersonRepository return [] } - // Group person IDs by table + // Group person IDs by table using ID-based routing const oldTablePersonIds: string[] = [] const newTablePersonIds: string[] = [] const personIdToDistinctId = new Map()