feat(cutover): Copy persons on read (#41529)

This commit is contained in:
José Sequeira
2025-11-14 15:34:45 +01:00
committed by GitHub
parent e510d5ad38
commit db6fdd1f04
2 changed files with 295 additions and 6 deletions

View File

@@ -2740,7 +2740,7 @@ describe('PostgresPersonRepository', () => {
expect(person!.__useNewTable).toBe(true)
})
it('should check old table and mark __useNewTable=false when person only in old table', async () => {
it('should opportunistically copy person from old table to new table and mark __useNewTable=true', async () => {
const team = await getFirstTeam(hub)
// Create person in old table only
@@ -2772,14 +2772,177 @@ describe('PostgresPersonRepository', () => {
'insertDistinctId'
)
// Fetch should check new table first (not found), then old table
// Verify person doesn't exist in new table yet
const { rows: beforeRows } = await postgres.query(
PostgresUse.PERSONS_READ,
'SELECT * FROM posthog_person_new WHERE id = $1 AND team_id = $2',
[oldId, team.id],
'checkNewTableBefore'
)
expect(beforeRows.length).toBe(0)
// Fetch should check new table first (not found), find in old table, then copy to new table
const person = await cutoverRepository.fetchPerson(team.id, 'test-in-old')
expect(person).toBeDefined()
expect(person!.id).toBe(String(oldId))
expect(person!.uuid).toBe(uuid)
expect(person!.properties.name).toBe('In Old Table')
expect(person!.__useNewTable).toBe(false)
// After opportunistic copy, __useNewTable should be true
expect(person!.__useNewTable).toBe(true)
// Verify person was copied to new table
const { rows: afterRows } = await postgres.query(
PostgresUse.PERSONS_READ,
'SELECT * FROM posthog_person_new WHERE id = $1 AND team_id = $2',
[oldId, team.id],
'checkNewTableAfter'
)
expect(afterRows.length).toBe(1)
expect(afterRows[0].id).toBe(String(oldId))
expect(afterRows[0].uuid).toBe(uuid)
expect(afterRows[0].properties).toEqual({ name: 'In Old Table' })
})
it('should handle ON CONFLICT gracefully when person already exists in new table', async () => {
const team = await getFirstTeam(hub)
// Create person in BOTH old and new tables (simulating a partial migration state)
const personId = 100
const uuid = new UUIDT().toString()
// Insert in old table
await postgres.query(
PostgresUse.PERSONS_WRITE,
`INSERT INTO posthog_person (id, uuid, created_at, team_id, properties, properties_last_updated_at, properties_last_operation, is_user_id, is_identified, version)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`,
[
personId,
uuid,
TIMESTAMP.toISO(),
team.id,
JSON.stringify({ name: 'In Old Table' }),
'{}',
'{}',
null,
true,
5,
],
'insertOldTablePerson'
)
// Insert in new table with different version (newer)
await postgres.query(
PostgresUse.PERSONS_WRITE,
`INSERT INTO posthog_person_new (id, uuid, created_at, team_id, properties, properties_last_updated_at, properties_last_operation, is_user_id, is_identified, version)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`,
[
personId,
uuid,
TIMESTAMP.toISO(),
team.id,
JSON.stringify({ name: 'Already in New Table' }),
'{}',
'{}',
null,
true,
10,
],
'insertNewTablePerson'
)
await postgres.query(
PostgresUse.PERSONS_WRITE,
`INSERT INTO posthog_persondistinctid (distinct_id, person_id, team_id, version)
VALUES ($1, $2, $3, $4)`,
['test-conflict', personId, team.id, 0],
'insertDistinctId'
)
// Fetch should find in new table first (not attempt copy from old)
const person = await cutoverRepository.fetchPerson(team.id, 'test-conflict')
expect(person).toBeDefined()
expect(person!.__useNewTable).toBe(true)
expect(person!.properties.name).toBe('Already in New Table')
expect(person!.version).toBe(10) // New table version, not old table version
// Verify new table data is unchanged (ON CONFLICT DO NOTHING didn't overwrite)
const { rows } = await postgres.query(
PostgresUse.PERSONS_READ,
'SELECT properties, version FROM posthog_person_new WHERE id = $1 AND team_id = $2',
[personId, team.id],
'checkNewTableUnchanged'
)
expect(rows.length).toBe(1)
expect(rows[0].properties).toEqual({ name: 'Already in New Table' })
expect(rows[0].version).toBe('10')
})
it('should route subsequent updates to new table after opportunistic copy', async () => {
const team = await getFirstTeam(hub)
// Create person in old table only
const oldId = 100
const uuid = new UUIDT().toString()
await postgres.query(
PostgresUse.PERSONS_WRITE,
`INSERT INTO posthog_person (id, uuid, created_at, team_id, properties, properties_last_updated_at, properties_last_operation, is_user_id, is_identified, version)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`,
[
oldId,
uuid,
TIMESTAMP.toISO(),
team.id,
JSON.stringify({ name: 'Original' }),
'{}',
'{}',
null,
true,
0,
],
'insertOldTablePerson'
)
await postgres.query(
PostgresUse.PERSONS_WRITE,
`INSERT INTO posthog_persondistinctid (distinct_id, person_id, team_id, version)
VALUES ($1, $2, $3, $4)`,
['test-update-after-copy', oldId, team.id, 0],
'insertDistinctId'
)
// First fetch - triggers opportunistic copy
const person = await cutoverRepository.fetchPerson(team.id, 'test-update-after-copy')
expect(person!.__useNewTable).toBe(true)
// Update the person - should go to new table (avoiding slow old table trigger)
await cutoverRepository.updatePerson(person!, {
properties: { name: 'Updated in New Table' },
properties_last_updated_at: {},
properties_last_operation: {},
is_identified: true,
created_at: TIMESTAMP,
})
// Verify update went to new table
const { rows: newTableRows } = await postgres.query(
PostgresUse.PERSONS_READ,
'SELECT properties FROM posthog_person_new WHERE id = $1 AND team_id = $2',
[oldId, team.id],
'checkNewTableUpdate'
)
expect(newTableRows.length).toBe(1)
expect(newTableRows[0].properties).toEqual({ name: 'Updated in New Table' })
// Verify old table was NOT updated (still has original value)
const { rows: oldTableRows } = await postgres.query(
PostgresUse.PERSONS_READ,
'SELECT properties FROM posthog_person WHERE id = $1 AND team_id = $2',
[oldId, team.id],
'checkOldTableNotUpdated'
)
expect(oldTableRows.length).toBe(1)
expect(oldTableRows[0].properties).toEqual({ name: 'Original' })
})
it('should route updates to new table when __useNewTable=true', async () => {
@@ -4095,6 +4258,77 @@ describe('PostgresPersonRepository', () => {
)
expect(sourceDistincts.rows).toHaveLength(0)
})
it('should handle full merge with __useNewTable routing (fetch + delete)', async () => {
const team = await getFirstTeam(hub)
// Create source person in old table
const sourceId = 100
const sourceUuid = new UUIDT().toString()
await postgres.query(
PostgresUse.PERSONS_WRITE,
`INSERT INTO posthog_person (id, uuid, created_at, team_id, properties, properties_last_updated_at, properties_last_operation, is_user_id, is_identified, version)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`,
[sourceId, sourceUuid, TIMESTAMP.toISO(), team.id, JSON.stringify({}), '{}', '{}', null, true, 0],
'insertSourcePerson'
)
await postgres.query(
PostgresUse.PERSONS_WRITE,
`INSERT INTO posthog_persondistinctid (distinct_id, person_id, team_id, version) VALUES ($1, $2, $3, $4)`,
['source-distinct', sourceId, team.id, 0],
'insertSourceDistinct'
)
// Create target person in new table
const targetId = 200
const targetUuid = new UUIDT().toString()
await postgres.query(
PostgresUse.PERSONS_WRITE,
`INSERT INTO posthog_person_new (id, uuid, created_at, team_id, properties, properties_last_updated_at, properties_last_operation, is_user_id, is_identified, version)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`,
[targetId, targetUuid, TIMESTAMP.toISO(), team.id, JSON.stringify({}), '{}', '{}', null, true, 0],
'insertTargetPerson'
)
await postgres.query(
PostgresUse.PERSONS_WRITE,
`INSERT INTO posthog_persondistinctid (distinct_id, person_id, team_id, version) VALUES ($1, $2, $3, $4)`,
['target-distinct', targetId, team.id, 0],
'insertTargetDistinct'
)
// Fetch both persons
// With opportunistic copy, both will have __useNewTable=true after fetch
const sourcePerson = await cutoverRepository.fetchPerson(team.id, 'source-distinct')
const targetPerson = await cutoverRepository.fetchPerson(team.id, 'target-distinct')
// After opportunistic copy, source is now in new table too
expect(sourcePerson!.__useNewTable).toBe(true) // copied to new table
expect(targetPerson!.__useNewTable).toBe(true) // already in new table
// Move distinct IDs from source to target
await cutoverRepository.moveDistinctIds(sourcePerson!, targetPerson!)
// Delete source person - should route to NEW table based on __useNewTable flag
await cutoverRepository.deletePerson(sourcePerson!)
// Verify source was deleted from new table
const { rows: newTableRows } = await postgres.query(
PostgresUse.PERSONS_READ,
'SELECT * FROM posthog_person_new WHERE id = $1 AND team_id = $2',
[sourceId, team.id],
'verifySourceDeletedFromNew'
)
expect(newTableRows.length).toBe(0)
// Verify target still exists in new table
const { rows: targetRows } = await postgres.query(
PostgresUse.PERSONS_READ,
'SELECT * FROM posthog_person_new WHERE id = $1 AND team_id = $2',
[targetId, team.id],
'verifyTargetStillExists'
)
expect(targetRows.length).toBe(1)
})
})
describe('Cutover disabled', () => {

View File

@@ -333,8 +333,63 @@ export class PostgresPersonRepository
if (oldTableRows.length > 0) {
const person = this.toPerson(oldTableRows[0])
// Mark that this person exists in the old table
;(person as any).__useNewTable = false
// Opportunistically copy person to new table
// This allows all future operations to go directly to new table (avoiding slow triggers)
// Skip copy when using read replica to maintain read-only intent
if (!options.useReadReplica) {
try {
const copyQuery = `
INSERT INTO ${newTableName} (
id,
uuid,
created_at,
team_id,
properties,
properties_last_updated_at,
properties_last_operation,
is_user_id,
version,
is_identified
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
ON CONFLICT (team_id, id) DO NOTHING
RETURNING id`
await this.postgres.query(
PostgresUse.PERSONS_WRITE,
copyQuery,
[
person.id,
person.uuid,
person.created_at.toISO(),
person.team_id,
sanitizeJsonbValue(person.properties),
sanitizeJsonbValue(person.properties_last_updated_at),
sanitizeJsonbValue(person.properties_last_operation),
person.is_user_id,
person.version,
person.is_identified,
],
'copyPersonToNewTable'
)
// Person is now in new table, future operations can use it
;(person as any).__useNewTable = true
} catch (error) {
// If copy fails for any reason, log but continue with old table routing
logger.warn('Failed to copy person to new table', {
error: error instanceof Error ? error.message : String(error),
person_id: person.id,
team_id: person.team_id,
})
;(person as any).__useNewTable = false
}
} else {
// When using read replica, don't attempt write operation
;(person as any).__useNewTable = false
}
return person
}
} else {
@@ -404,7 +459,7 @@ export class PostgresPersonRepository
return []
}
// Group person IDs by table
// Group person IDs by table using ID-based routing
const oldTablePersonIds: string[] = []
const newTablePersonIds: string[] = []
const personIdToDistinctId = new Map<string, { distinct_id: string; team_id: number }>()