fix(persons): optimization on dual distinct id insertion (#40696)

This commit is contained in:
Nick Best
2025-10-31 08:46:34 -07:00
committed by GitHub
parent 0c30061f8e
commit da6055c77c
4 changed files with 58 additions and 44 deletions

View File

@@ -8,6 +8,7 @@ import { parseJSON } from '../../../../utils/json-parse'
import { NoRowsUpdatedError, UUIDT } from '../../../../utils/utils'
import { PersonPropertiesSizeViolationError } from './person-repository'
import { PostgresPersonRepository } from './postgres-person-repository'
import { fetchDistinctIdValues, fetchDistinctIds } from './test-helpers'
jest.mock('../../../../utils/logger')
@@ -244,6 +245,14 @@ describe('PostgresPersonRepository', () => {
expect(kafkaMessages[0].topic).toBe('clickhouse_person_test')
expect(kafkaMessages[1].topic).toBe('clickhouse_person_distinct_id_test')
expect(kafkaMessages[2].topic).toBe('clickhouse_person_distinct_id_test')
const distinctIds = await fetchDistinctIdValues(hub.db.postgres, person)
expect(distinctIds).toHaveLength(2)
expect(distinctIds).toEqual(expect.arrayContaining(['distinct-1', 'distinct-2']))
const distinctIdRecords = await fetchDistinctIds(hub.db.postgres, person)
expect(distinctIdRecords.find((d) => d.distinct_id === 'distinct-1')?.version).toBe('0')
expect(distinctIdRecords.find((d) => d.distinct_id === 'distinct-2')?.version).toBe('1')
})
it('creates a person without distinct IDs', async () => {

View File

@@ -357,26 +357,32 @@ export class PostgresPersonRepository
const distinctIdVersionStartIndex = columns.length + 1
const distinctIdStartIndex = distinctIdVersionStartIndex + distinctIds.length
const distinctIdsCTE =
distinctIds.length > 0
? `, distinct_ids AS (
INSERT INTO posthog_persondistinctid (distinct_id, person_id, team_id, version)
VALUES ${distinctIds
.map(
// NOTE: Keep this in sync with the posthog_persondistinctid INSERT in
// `addDistinctId`
(_, index) => `(
$${distinctIdStartIndex + index},
(SELECT id FROM inserted_person),
$${teamIdParamIndex},
$${distinctIdVersionStartIndex + index}
)`
)
.join(', ')}
)`
: ''
const query =
`WITH inserted_person AS (
INSERT INTO posthog_person (${columns.join(', ')})
VALUES (${valuePlaceholders})
RETURNING *
)` +
distinctIds
.map(
// NOTE: Keep this in sync with the posthog_persondistinctid INSERT in
// `addDistinctId`
(_, index) => `, distinct_id_${index} AS (
INSERT INTO posthog_persondistinctid (distinct_id, person_id, team_id, version)
VALUES (
$${distinctIdStartIndex + index},
(SELECT id FROM inserted_person),
$${teamIdParamIndex},
$${distinctIdVersionStartIndex + index})
)`
)
.join('') +
distinctIdsCTE +
` SELECT * FROM inserted_person;`
const { rows } = await this.postgres.query<RawPerson>(

View File

@@ -509,10 +509,9 @@ describe('processEvent', () => {
)
expect((await fetchPersons(hub.db.postgres)).length).toBe(1)
expect(await fetchDistinctIdValues(hub.db.postgres, (await fetchPersons(hub.db.postgres))[0])).toEqual([
'new_distinct_id',
'old_distinct_id',
])
const distinctIds = await fetchDistinctIdValues(hub.db.postgres, (await fetchPersons(hub.db.postgres))[0])
expect(distinctIds).toEqual(expect.arrayContaining(['new_distinct_id', 'old_distinct_id']))
expect(distinctIds).toHaveLength(2)
})
test('alias both existing', async () => {
@@ -874,7 +873,9 @@ describe('processEvent', () => {
expect(people.length).toEqual(2)
expect(people[1].team_id).toEqual(team.id)
expect(people[1].properties).toEqual({})
expect(await fetchDistinctIdValues(hub.db.postgres, people[1])).toEqual(['1', '2'])
const distinctIdsForPerson1 = await fetchDistinctIdValues(hub.db.postgres, people[1])
expect(distinctIdsForPerson1).toEqual(expect.arrayContaining(['1', '2']))
expect(distinctIdsForPerson1).toHaveLength(2)
expect(people[0].team_id).toEqual(team2.id)
expect(await fetchDistinctIdValues(hub.db.postgres, people[0])).toEqual(['2'])
})
@@ -964,16 +965,13 @@ describe('processEvent', () => {
// Get pairins of person distinctIds and the events associated with them
const eventsByPerson = await getEventsByPerson(hub)
expect(eventsByPerson).toEqual([
[
[initialDistinctId, anonymousId],
['$identify', 'event 2', '$identify'],
],
[
[p2DistinctId, p2NewDistinctId],
['event 3', '$identify', 'event 4'],
],
])
expect(eventsByPerson).toHaveLength(2)
expect(eventsByPerson[0][0]).toEqual(expect.arrayContaining([initialDistinctId, anonymousId]))
expect(eventsByPerson[0][0]).toHaveLength(2)
expect(eventsByPerson[0][1]).toEqual(['$identify', 'event 2', '$identify'])
expect(eventsByPerson[1][0]).toEqual(expect.arrayContaining([p2DistinctId, p2NewDistinctId]))
expect(eventsByPerson[1][0]).toHaveLength(2)
expect(eventsByPerson[1][1]).toEqual(['event 3', '$identify', 'event 4'])
// Make sure the persons are identified
const persons = await fetchPersons(hub.db.postgres)
@@ -1090,12 +1088,10 @@ describe('processEvent', () => {
const eventsByPerson = await getEventsByPerson(hub)
// There should just be one person, to which all events are associated
expect(eventsByPerson).toEqual([
[
[initialDistinctId, anonymousId],
['$identify', 'anonymous event', '$create_alias'],
],
])
expect(eventsByPerson).toHaveLength(1)
expect(eventsByPerson[0][0]).toEqual(expect.arrayContaining([initialDistinctId, anonymousId]))
expect(eventsByPerson[0][0]).toHaveLength(2)
expect(eventsByPerson[0][1]).toEqual(['$identify', 'anonymous event', '$create_alias'])
// Make sure there is one identified person
const persons = await fetchPersons(hub.db.postgres)
@@ -1118,12 +1114,10 @@ describe('processEvent', () => {
const eventsByPerson = await getEventsByPerson(hub)
// There should just be one person, to which all events are associated
expect(eventsByPerson).toEqual([
[
[initialDistinctId, anonymousId],
['$identify', 'anonymous event', '$create_alias'],
],
])
expect(eventsByPerson).toHaveLength(1)
expect(eventsByPerson[0][0]).toEqual(expect.arrayContaining([initialDistinctId, anonymousId]))
expect(eventsByPerson[0][0]).toHaveLength(2)
expect(eventsByPerson[0][1]).toEqual(['$identify', 'anonymous event', '$create_alias'])
// Make sure there is one identified person
const persons = await fetchPersons(hub.db.postgres)
@@ -1171,7 +1165,10 @@ describe('processEvent', () => {
const eventsByPerson = await getEventsByPerson(hub)
// There should just be one person, to which all events are associated
expect(eventsByPerson).toEqual([[[anonymous1, anonymous2], ['$create_alias']]])
expect(eventsByPerson).toHaveLength(1)
expect(eventsByPerson[0][0]).toEqual(expect.arrayContaining([anonymous1, anonymous2]))
expect(eventsByPerson[0][0]).toHaveLength(2)
expect(eventsByPerson[0][1]).toEqual(['$create_alias'])
const persons = await fetchPersons(hub.db.postgres)
expect(persons.map((person) => person.is_identified)).toEqual([true])

View File

@@ -123,7 +123,8 @@ describe('postgres parity', () => {
},
])
const clickHouseDistinctIds = await clickhouse.fetchDistinctIdValues(person)
expect(clickHouseDistinctIds).toEqual(['distinct1', 'distinct2'])
expect(clickHouseDistinctIds).toEqual(expect.arrayContaining(['distinct1', 'distinct2']))
expect(clickHouseDistinctIds).toHaveLength(2)
const postgresPersons = await fetchPersons(hub.db.postgres)
expect(postgresPersons).toEqual([
@@ -150,7 +151,8 @@ describe('postgres parity', () => {
},
])
const postgresDistinctIds = await fetchDistinctIdValues(hub.db.postgres, person)
expect(postgresDistinctIds).toEqual(['distinct1', 'distinct2'])
expect(postgresDistinctIds).toEqual(expect.arrayContaining(['distinct1', 'distinct2']))
expect(postgresDistinctIds).toHaveLength(2)
const newClickHouseDistinctIdValues = await clickhouse.fetchDistinctIds(person)
expect(newClickHouseDistinctIdValues).toMatchObject([