chore(plugin-server): Don't update immutable person attributes (#41250)

This commit is contained in:
José Sequeira
2025-11-11 19:21:30 +01:00
committed by GitHub
parent a42810b923
commit a036f3c822
15 changed files with 234 additions and 108 deletions

View File

@@ -1008,6 +1008,16 @@ export interface InternalPerson extends BasePerson {
version: number
}
/** Mutable fields that can be updated on a Person via updatePerson. */
export interface PersonUpdateFields {
properties: Properties
properties_last_updated_at: PropertiesLastUpdatedAt
properties_last_operation: PropertiesLastOperation | null
is_identified: boolean
created_at: DateTime
version?: number // Optional: allows forcing a specific version (used for dual-write sync)
}
/** Person model exposed outside of person-specific DB logic. */
export interface Person {
team_id: number

View File

@@ -942,13 +942,14 @@ describe('BatchWritingPersonStore', () => {
prop_from_distinctId2: 'value2',
},
}),
// Only mutable fields should be in the update object
expect.objectContaining({
id: sharedPerson.id,
properties: {
initial_prop: 'initial_value',
prop_from_distinctId1: 'value1',
prop_from_distinctId2: 'value2',
},
is_identified: expect.any(Boolean),
}),
'updatePersonNoAssert'
)

View File

@@ -1045,8 +1045,14 @@ export class BatchWritingPersonsStoreForBatch implements PersonsStoreForBatch, B
this.incrementDatabaseOperation(operation as MethodName, personUpdate.distinct_id)
// Convert PersonUpdate back to InternalPerson for database call
const person = toInternalPerson(personUpdate)
// Create update object without version field (updatePerson handles version internally)
const { version, ...updateFields } = person
// Always pass all mutable fields for consistent query plans
const updateFields = {
properties: person.properties,
properties_last_updated_at: person.properties_last_updated_at,
properties_last_operation: person.properties_last_operation,
is_identified: person.is_identified,
created_at: person.created_at,
}
this.incrementCount('updatePersonNoAssert', personUpdate.distinct_id)
this.incrementDatabaseOperation('updatePersonNoAssert', personUpdate.distinct_id)

View File

@@ -3,7 +3,7 @@ import { DateTime } from 'luxon'
import { Properties } from '@posthog/plugin-scaffold'
import { TopicMessage } from '~/kafka/producer'
import { InternalPerson, PropertiesLastOperation, PropertiesLastUpdatedAt, Team } from '~/types'
import { InternalPerson, PersonUpdateFields, PropertiesLastOperation, PropertiesLastUpdatedAt, Team } from '~/types'
import { CreatePersonResult, MoveDistinctIdsResult } from '~/utils/db/db'
import { TransactionClient } from '~/utils/db/postgres'
@@ -81,7 +81,7 @@ export class DualWritePersonRepositoryTransaction implements PersonRepositoryTra
async updatePerson(
person: InternalPerson,
update: Partial<InternalPerson>,
update: PersonUpdateFields,
tag?: string
): Promise<[InternalPerson, TopicMessage[], boolean]> {
// Enforce version parity across primary/secondary: run primary first, then set secondary to primary's new version

View File

@@ -3,7 +3,13 @@ import { DateTime } from 'luxon'
import { Properties } from '@posthog/plugin-scaffold'
import { TopicMessage } from '../../../../kafka/producer'
import { InternalPerson, PropertiesLastOperation, PropertiesLastUpdatedAt, Team } from '../../../../types'
import {
InternalPerson,
PersonUpdateFields,
PropertiesLastOperation,
PropertiesLastUpdatedAt,
Team,
} from '../../../../types'
import { CreatePersonResult, MoveDistinctIdsResult } from '../../../../utils/db/db'
export interface PersonRepositoryTransaction {
@@ -21,7 +27,7 @@ export interface PersonRepositoryTransaction {
updatePerson(
person: InternalPerson,
update: Partial<InternalPerson>,
update: PersonUpdateFields,
tag?: string
): Promise<[InternalPerson, TopicMessage[], boolean]>

View File

@@ -3,7 +3,14 @@ import { DateTime } from 'luxon'
import { Properties } from '@posthog/plugin-scaffold'
import { TopicMessage } from '../../../../kafka/producer'
import { InternalPerson, PropertiesLastOperation, PropertiesLastUpdatedAt, Team, TeamId } from '../../../../types'
import {
InternalPerson,
PersonUpdateFields,
PropertiesLastOperation,
PropertiesLastUpdatedAt,
Team,
TeamId,
} from '../../../../types'
import { CreatePersonResult } from '../../../../utils/db/db'
import { PersonUpdate } from '../person-update-batch'
import { PersonRepositoryTransaction } from './person-repository-transaction'
@@ -50,7 +57,7 @@ export interface PersonRepository {
updatePerson(
person: InternalPerson,
update: Partial<InternalPerson>,
update: PersonUpdateFields,
tag?: string
): Promise<[InternalPerson, TopicMessage[], boolean]>

View File

@@ -9,6 +9,7 @@ import { PostgresDualWritePersonRepository } from './postgres-dualwrite-person-r
import {
assertConsistencyAcrossDatabases,
cleanupPrepared,
createPersonUpdateFields,
getFirstTeam,
mockDatabaseError,
setupMigrationDb,
@@ -278,7 +279,10 @@ describe('PostgresDualWritePersonRepository 2PC Dual-Write Tests', () => {
[{ distinctId: 'dw-2' }]
)) as any
const [updated] = await repository.updatePerson(person, { properties: { name: 'B' } })
const [updated] = await repository.updatePerson(
person,
createPersonUpdateFields(person, { properties: { name: 'B' } })
)
const primary = await postgres.query(
PostgresUse.PERSONS_READ,
@@ -308,9 +312,9 @@ describe('PostgresDualWritePersonRepository 2PC Dual-Write Tests', () => {
.spyOn((repository as any).secondaryRepo, 'updatePerson')
.mockRejectedValue(new Error('simulated secondary failure'))
await expect(repository.updatePerson(person, { properties: { y: 2 } }, 'test-fail')).rejects.toThrow(
'simulated secondary failure'
)
await expect(
repository.updatePerson(person, createPersonUpdateFields(person, { properties: { y: 2 } }), 'test-fail')
).rejects.toThrow('simulated secondary failure')
spy.mockRestore()
@@ -349,9 +353,9 @@ describe('PostgresDualWritePersonRepository 2PC Dual-Write Tests', () => {
const mockSpy = mockDatabaseError(postgres, new Error('primary update failed'), 'updatePerson')
await expect(repository.updatePerson(person, { properties: { name: 'Updated' } })).rejects.toThrow(
'primary update failed'
)
await expect(
repository.updatePerson(person, createPersonUpdateFields(person, { properties: { name: 'Updated' } }))
).rejects.toThrow('primary update failed')
mockSpy.mockRestore()
@@ -392,9 +396,9 @@ describe('PostgresDualWritePersonRepository 2PC Dual-Write Tests', () => {
const mockSpy = mockDatabaseError(migrationPostgres, new Error('secondary update failed'), 'updatePerson')
await expect(repository.updatePerson(person, { properties: { name: 'Updated' } })).rejects.toThrow(
'secondary update failed'
)
await expect(
repository.updatePerson(person, createPersonUpdateFields(person, { properties: { name: 'Updated' } }))
).rejects.toThrow('secondary update failed')
mockSpy.mockRestore()
@@ -1393,9 +1397,12 @@ describe('PostgresDualWritePersonRepository 2PC Dual-Write Tests', () => {
await tx.addDistinctId(createResult.person, 'tx-did-2', 1)
// Update the person
const [updatedPerson] = await tx.updatePerson(createResult.person, {
properties: { name: 'Updated Name', age: 26 },
})
const [updatedPerson] = await tx.updatePerson(
createResult.person,
createPersonUpdateFields(createResult.person, {
properties: { name: 'Updated Name', age: 26 },
})
)
return updatedPerson
})
@@ -1728,7 +1735,10 @@ describe('PostgresDualWritePersonRepository 2PC Dual-Write Tests', () => {
throw new Error('Failed to create person')
}
await tx.updatePerson(createResult.person, { properties: { updated: true } })
await tx.updatePerson(
createResult.person,
createPersonUpdateFields(createResult.person, { properties: { updated: true } })
)
return createResult.person
})
@@ -2087,9 +2097,12 @@ describe('PostgresDualWritePersonRepository 2PC Dual-Write Tests', () => {
// Now use it within a transaction
const txResult = await repository.inTransaction('test-mixed-calls', async (tx) => {
// Update the person created outside
const [updated] = await tx.updatePerson(outsidePerson, {
properties: { location: 'updated-inside', new_prop: 'added' },
})
const [updated] = await tx.updatePerson(
outsidePerson,
createPersonUpdateFields(outsidePerson, {
properties: { location: 'updated-inside', new_prop: 'added' },
})
)
// Add a distinct ID
await tx.addDistinctId(updated, 'added-in-tx', 1)
@@ -2141,7 +2154,10 @@ describe('PostgresDualWritePersonRepository 2PC Dual-Write Tests', () => {
)) as any
const result = await repository.inTransaction('test-version-sync', async (tx) => {
const [updatedPerson] = await tx.updatePerson(person, { properties: { updated: 'value' } })
const [updatedPerson] = await tx.updatePerson(
person,
createPersonUpdateFields(person, { properties: { updated: 'value' } })
)
return updatedPerson
})

View File

@@ -3,7 +3,14 @@ import { DateTime } from 'luxon'
import { Properties } from '@posthog/plugin-scaffold'
import { TopicMessage } from '../../../../kafka/producer'
import { InternalPerson, PropertiesLastOperation, PropertiesLastUpdatedAt, Team, TeamId } from '../../../../types'
import {
InternalPerson,
PersonUpdateFields,
PropertiesLastOperation,
PropertiesLastUpdatedAt,
Team,
TeamId,
} from '../../../../types'
import { CreatePersonResult, MoveDistinctIdsResult } from '../../../../utils/db/db'
import { PostgresRouter, PostgresUse } from '../../../../utils/db/postgres'
import { TwoPhaseCommitCoordinator } from '../../../../utils/db/two-phase'
@@ -132,7 +139,7 @@ export class PostgresDualWritePersonRepository implements PersonRepository {
async updatePerson(
person: InternalPerson,
update: Partial<InternalPerson>,
update: PersonUpdateFields,
tag?: string
): Promise<[InternalPerson, TopicMessage[], boolean]> {
// Enforce version parity across primary/secondary: run primary first, then set secondary to primary's new version
@@ -142,11 +149,12 @@ export class PostgresDualWritePersonRepository implements PersonRepository {
primaryOut = p
const primaryUpdated = p[0]
const secondaryUpdate: Partial<InternalPerson> = {
const secondaryUpdate: PersonUpdateFields = {
properties: primaryUpdated.properties,
properties_last_updated_at: primaryUpdated.properties_last_updated_at,
properties_last_operation: primaryUpdated.properties_last_operation,
is_identified: primaryUpdated.is_identified,
created_at: primaryUpdated.created_at,
version: primaryUpdated.version,
}

View File

@@ -3,7 +3,13 @@ import { DateTime } from 'luxon'
import { Properties } from '@posthog/plugin-scaffold'
import { TopicMessage } from '../../../../kafka/producer'
import { InternalPerson, PropertiesLastOperation, PropertiesLastUpdatedAt, Team } from '../../../../types'
import {
InternalPerson,
PersonUpdateFields,
PropertiesLastOperation,
PropertiesLastUpdatedAt,
Team,
} from '../../../../types'
import { CreatePersonResult, MoveDistinctIdsResult } from '../../../../utils/db/db'
import { TransactionClient } from '../../../../utils/db/postgres'
import { PersonRepositoryTransaction } from './person-repository-transaction'
@@ -42,7 +48,7 @@ export class PostgresPersonRepositoryTransaction implements PersonRepositoryTran
async updatePerson(
person: InternalPerson,
update: Partial<InternalPerson>,
update: PersonUpdateFields,
tag?: string
): Promise<[InternalPerson, TopicMessage[], boolean]> {
return await this.repository.updatePerson(person, update, tag, this.transaction)

View File

@@ -8,7 +8,7 @@ import { parseJSON } from '../../../../utils/json-parse'
import { NoRowsUpdatedError, UUIDT } from '../../../../utils/utils'
import { PersonPropertiesSizeViolationError } from './person-repository'
import { PostgresPersonRepository } from './postgres-person-repository'
import { fetchDistinctIdValues, fetchDistinctIds } from './test-helpers'
import { createPersonUpdateFields, fetchDistinctIdValues, fetchDistinctIds } from './test-helpers'
jest.mock('../../../../utils/logger')
@@ -1171,7 +1171,10 @@ describe('PostgresPersonRepository', () => {
const person = await createTestPerson(team.id, 'test-distinct', { name: 'John', age: 25 })
const update = { properties: { name: 'Jane', age: 30, city: 'New York' } }
const [updatedPerson, messages, versionDisparity] = await repository.updatePerson(person, update)
const [updatedPerson, messages, versionDisparity] = await repository.updatePerson(
person,
createPersonUpdateFields(person, update)
)
expect(updatedPerson.properties).toEqual({ name: 'Jane', age: 30, city: 'New York' })
expect(updatedPerson.version).toBe(person.version + 1)
@@ -1184,23 +1187,15 @@ describe('PostgresPersonRepository', () => {
expect(fetchedPerson?.version).toBe(person.version + 1)
})
it('should handle empty update gracefully', async () => {
const team = await getFirstTeam(hub)
const person = await createTestPerson(team.id, 'test-distinct', { name: 'John' })
const [updatedPerson, messages, versionDisparity] = await repository.updatePerson(person, {})
expect(updatedPerson).toEqual(person)
expect(messages).toHaveLength(0)
expect(versionDisparity).toBe(false)
})
it('should update is_identified field', async () => {
const team = await getFirstTeam(hub)
const person = await createTestPerson(team.id, 'test-distinct', { name: 'John' })
const update = { is_identified: true }
const [updatedPerson, messages] = await repository.updatePerson(person, update)
const [updatedPerson, messages] = await repository.updatePerson(
person,
createPersonUpdateFields(person, update)
)
expect(updatedPerson.is_identified).toBe(true)
expect(updatedPerson.version).toBe(person.version + 1)
@@ -1213,11 +1208,17 @@ describe('PostgresPersonRepository', () => {
// First update
const update1 = { properties: { name: 'Jane' } }
const [updatedPerson1, _messages1] = await repository.updatePerson(person, update1)
const [updatedPerson1, _messages1] = await repository.updatePerson(
person,
createPersonUpdateFields(person, update1)
)
// Second update with the updated person (should succeed since we're using the latest version)
const update2 = { properties: { age: 30 } }
const [updatedPerson2, messages2] = await repository.updatePerson(updatedPerson1, update2)
const [updatedPerson2, messages2] = await repository.updatePerson(
updatedPerson1,
createPersonUpdateFields(updatedPerson1, update2)
)
// updatePerson replaces properties entirely, so we expect only the age property
expect(updatedPerson2.properties).toEqual({ age: 30 })
@@ -1231,7 +1232,12 @@ describe('PostgresPersonRepository', () => {
await postgres.transaction(PostgresUse.PERSONS_WRITE, 'test-transaction', async (tx) => {
const update = { properties: { name: 'Jane' } }
const [updatedPerson, messages] = await repository.updatePerson(person, update, 'tx', tx)
const [updatedPerson, messages] = await repository.updatePerson(
person,
createPersonUpdateFields(person, update),
'tx',
tx
)
expect(updatedPerson.properties).toEqual({ name: 'Jane' })
expect(messages).toHaveLength(1)
@@ -1247,7 +1253,11 @@ describe('PostgresPersonRepository', () => {
const person = await createTestPerson(team.id, 'test-distinct', { name: 'John' })
const update = { properties: { name: 'Jane' } }
const [updatedPerson, messages] = await repository.updatePerson(person, update, 'test-tag')
const [updatedPerson, messages] = await repository.updatePerson(
person,
createPersonUpdateFields(person, update),
'test-tag'
)
expect(updatedPerson.properties).toEqual({ name: 'Jane' })
expect(messages).toHaveLength(1)
@@ -1269,7 +1279,9 @@ describe('PostgresPersonRepository', () => {
}
const update = { properties: { name: 'Jane' } }
await expect(repository.updatePerson(nonExistentPerson, update)).rejects.toThrow(NoRowsUpdatedError)
await expect(
repository.updatePerson(nonExistentPerson, createPersonUpdateFields(nonExistentPerson, update))
).rejects.toThrow(NoRowsUpdatedError)
})
it('should handle updatePersonAssertVersion with optimistic concurrency control', async () => {
@@ -1734,7 +1746,10 @@ describe('PostgresPersonRepository', () => {
},
}
const [updatedPerson, messages] = await oversizedRepository.updatePerson(normalPerson, oversizedUpdate)
const [updatedPerson, messages] = await oversizedRepository.updatePerson(
normalPerson,
createPersonUpdateFields(normalPerson, oversizedUpdate)
)
expect(updatedPerson).toBeDefined()
expect(updatedPerson.version).toBe(normalPerson.version + 1)
@@ -1769,12 +1784,18 @@ describe('PostgresPersonRepository', () => {
},
}
await expect(oversizedRepository.updatePerson(normalPerson, oversizedUpdate)).rejects.toThrow(
PersonPropertiesSizeViolationError
)
await expect(oversizedRepository.updatePerson(normalPerson, oversizedUpdate)).rejects.toThrow(
'Person properties update would exceed size limit'
)
await expect(
oversizedRepository.updatePerson(
normalPerson,
createPersonUpdateFields(normalPerson, oversizedUpdate)
)
).rejects.toThrow(PersonPropertiesSizeViolationError)
await expect(
oversizedRepository.updatePerson(
normalPerson,
createPersonUpdateFields(normalPerson, oversizedUpdate)
)
).rejects.toThrow('Person properties update would exceed size limit')
mockPersonPropertiesSize.mockRestore()
mockQuery.mockRestore()
@@ -1816,14 +1837,20 @@ describe('PostgresPersonRepository', () => {
},
}
await expect(oversizedRepository.updatePerson(normalPerson, oversizedUpdate)).rejects.toThrow(
PersonPropertiesSizeViolationError
)
await expect(
oversizedRepository.updatePerson(
normalPerson,
createPersonUpdateFields(normalPerson, oversizedUpdate)
)
).rejects.toThrow(PersonPropertiesSizeViolationError)
updateCallCount = 0
await expect(oversizedRepository.updatePerson(normalPerson, oversizedUpdate)).rejects.toThrow(
'Person properties update failed after trying to trim oversized properties'
)
await expect(
oversizedRepository.updatePerson(
normalPerson,
createPersonUpdateFields(normalPerson, oversizedUpdate)
)
).rejects.toThrow('Person properties update failed after trying to trim oversized properties')
mockPersonPropertiesSize.mockRestore()
mockQuery.mockRestore()
@@ -1883,16 +1910,16 @@ describe('PostgresPersonRepository', () => {
},
}
await expect(oversizedRepository.updatePerson(oversizedPerson, update)).rejects.toThrow(
PersonPropertiesSizeViolationError
)
await expect(
oversizedRepository.updatePerson(oversizedPerson, createPersonUpdateFields(oversizedPerson, update))
).rejects.toThrow(PersonPropertiesSizeViolationError)
expect(updateCallCount).toBe(2)
updateCallCount = 0
await expect(oversizedRepository.updatePerson(oversizedPerson, update)).rejects.toThrow(
'Person properties update failed after trying to trim oversized properties'
)
await expect(
oversizedRepository.updatePerson(oversizedPerson, createPersonUpdateFields(oversizedPerson, update))
).rejects.toThrow('Person properties update failed after trying to trim oversized properties')
expect(updateCallCount).toBe(2)
@@ -2050,7 +2077,7 @@ describe('PostgresPersonRepository', () => {
}
try {
await oversizedRepository.updatePerson(person, oversizedUpdate)
await oversizedRepository.updatePerson(person, createPersonUpdateFields(person, oversizedUpdate))
expect(mockInc).toHaveBeenCalledWith({ result: 'success' })
} catch (error) {}
@@ -2164,12 +2191,12 @@ describe('PostgresPersonRepository', () => {
const [updatedPerson1, messages1, versionDisparity1] = await repositoryWithCalculation.updatePerson(
person1,
update,
createPersonUpdateFields(person1, update),
'test-with-logging'
)
const [updatedPerson2, messages2, versionDisparity2] = await repositoryWithoutCalculation.updatePerson(
person2,
update,
createPersonUpdateFields(person2, update),
'test-without-logging'
)
@@ -2253,7 +2280,10 @@ describe('PostgresPersonRepository', () => {
const person = await createTestPerson(team.id, 'test-default', { name: 'John' })
const update = { properties: { name: 'Jane', city: 'Boston' } }
const [updatedPerson, messages, versionDisparity] = await defaultRepository.updatePerson(person, update)
const [updatedPerson, messages, versionDisparity] = await defaultRepository.updatePerson(
person,
createPersonUpdateFields(person, update)
)
expect(updatedPerson.properties).toEqual({ name: 'Jane', city: 'Boston' })
expect(updatedPerson.version).toBe(person.version + 1)
@@ -2359,7 +2389,7 @@ describe('PostgresPersonRepository', () => {
const expectedPropertiesLastUpdatedAtSize = JSON.stringify(update.properties_last_updated_at).length
const expectedPropertiesLastOperationSize = JSON.stringify(update.properties_last_operation).length
await repository.updatePerson(person, update)
await repository.updatePerson(person, createPersonUpdateFields(person, update))
// Verify metrics were recorded for all updated fields (3 calls total)
expect(observeCalls).toHaveLength(3)
@@ -2398,14 +2428,15 @@ describe('PostgresPersonRepository', () => {
const expectedPropertiesSize = JSON.stringify(update.properties).length
await repository.updatePerson(person, update)
await repository.updatePerson(person, createPersonUpdateFields(person, update))
// Only properties metric should be recorded (1 call only)
expect(observeCalls).toHaveLength(1)
// Since we always pass all fields for consistent query plans, all 3 JSONB fields are tracked
expect(observeCalls).toHaveLength(3)
expect(observeCalls[0].labels.operation).toBe('updatePerson')
expect(observeCalls[0].labels.field).toBe('properties')
expect(observeCalls[0].value).toBe(expectedPropertiesSize)
const propertiesCall = observeCalls.find((c) => c.labels.field === 'properties')
expect(propertiesCall).toBeDefined()
expect(propertiesCall!.labels.operation).toBe('updatePerson')
expect(propertiesCall!.value).toBe(expectedPropertiesSize)
})
it('should handle large properties correctly', async () => {
@@ -2461,13 +2492,14 @@ describe('PostgresPersonRepository', () => {
// Clear observe calls from createTestPerson
observeCalls = []
// Empty update
// Empty update - but helper fills in all fields from person
const update = {}
await repository.updatePerson(person, update)
await repository.updatePerson(person, createPersonUpdateFields(person, update))
// No metrics should be recorded for empty update
expect(observeCalls).toHaveLength(0)
// Since we always pass all fields for consistent query plans, all 3 JSONB fields are tracked
// even though the values haven't changed from the person object
expect(observeCalls).toHaveLength(3)
})
})
})

View File

@@ -8,6 +8,7 @@ import { TopicMessage } from '../../../../kafka/producer'
import {
InternalPerson,
PersonDistinctId,
PersonUpdateFields,
PropertiesLastOperation,
PropertiesLastUpdatedAt,
RawPerson,
@@ -67,7 +68,7 @@ export class PostgresPersonRepository
private async handleOversizedPersonProperties(
person: InternalPerson,
update: Partial<InternalPerson>,
update: PersonUpdateFields,
tx?: TransactionClient
): Promise<[InternalPerson, TopicMessage[], boolean]> {
const currentSize = await this.personPropertiesSize(person.id)
@@ -113,7 +114,7 @@ export class PostgresPersonRepository
private async handleExistingOversizedRecord(
person: InternalPerson,
update: Partial<InternalPerson>,
update: PersonUpdateFields,
tx?: TransactionClient
): Promise<[InternalPerson, TopicMessage[], boolean]> {
try {
@@ -125,7 +126,7 @@ export class PostgresPersonRepository
{ teamId: person.team_id, personId: person.id }
)
const trimmedUpdate: Partial<InternalPerson> = {
const trimmedUpdate: PersonUpdateFields = {
...update,
properties: trimmedProperties,
}
@@ -769,7 +770,7 @@ export class PostgresPersonRepository
async updatePerson(
person: InternalPerson,
update: Partial<InternalPerson>,
update: PersonUpdateFields,
tag?: string,
tx?: TransactionClient
): Promise<[InternalPerson, TopicMessage[], boolean]> {

View File

@@ -3,7 +3,14 @@ import { DateTime } from 'luxon'
import { Properties } from '@posthog/plugin-scaffold'
import { TopicMessage } from '../../../../kafka/producer'
import { InternalPerson, PropertiesLastOperation, PropertiesLastUpdatedAt, Team, TeamId } from '../../../../types'
import {
InternalPerson,
PersonUpdateFields,
PropertiesLastOperation,
PropertiesLastUpdatedAt,
Team,
TeamId,
} from '../../../../types'
import { CreatePersonResult, MoveDistinctIdsResult } from '../../../../utils/db/db'
import { TransactionClient } from '../../../../utils/db/postgres'
import { PersonUpdate } from '../person-update-batch'
@@ -36,7 +43,7 @@ export interface RawPostgresPersonRepository {
updatePerson(
person: InternalPerson,
update: Partial<InternalPerson>,
update: PersonUpdateFields,
tag?: string,
tx?: TransactionClient
): Promise<[InternalPerson, TopicMessage[], boolean]>

View File

@@ -22,6 +22,7 @@ import {
assertCreatePersonConflictContractParity,
assertCreatePersonContractParity,
cleanupPrepared,
createPersonUpdateFields,
getFirstTeam,
mockDatabaseError,
setupMigrationDb,
@@ -303,12 +304,12 @@ describe('Postgres Single Write - Postgres Dual Write Compatibility', () => {
const singleUpdateResult = await singleWriteRepository.updatePerson(
singleCreatePersonResult.person,
{ properties: { name: 'B' } },
createPersonUpdateFields(singleCreatePersonResult.person, { properties: { name: 'B' } }),
'single-update'
)
const dualUpdateResult = await dualWriteRepository.updatePerson(
dualCreatePersonResult.person,
{ properties: { name: 'B' } },
createPersonUpdateFields(dualCreatePersonResult.person, { properties: { name: 'B' } }),
'dual-update'
)
assertUpdatePersonContractParity(singleUpdateResult, dualUpdateResult)
@@ -333,13 +334,13 @@ describe('Postgres Single Write - Postgres Dual Write Compatibility', () => {
() =>
singleWriteRepository.updatePerson(
singleCreatePersonResult.person,
{ properties: { name: 'B' } },
createPersonUpdateFields(singleCreatePersonResult.person, { properties: { name: 'B' } }),
'single-update'
),
() =>
dualWriteRepository.updatePerson(
dualCreatePersonResult.person,
{ properties: { name: 'B' } },
createPersonUpdateFields(dualCreatePersonResult.person, { properties: { name: 'B' } }),
'dual-update'
),
'unhandled database error'
@@ -366,13 +367,13 @@ describe('Postgres Single Write - Postgres Dual Write Compatibility', () => {
() =>
singleWriteRepository.updatePerson(
singleCreatePersonResult.person,
{ properties: { name: 'B' } },
createPersonUpdateFields(singleCreatePersonResult.person, { properties: { name: 'B' } }),
'single-update'
),
() =>
dualWriteRepository.updatePerson(
dualCreatePersonResult.person,
{ properties: { name: 'B' } },
createPersonUpdateFields(dualCreatePersonResult.person, { properties: { name: 'B' } }),
'dual-update'
),
PersonPropertiesSizeViolationError as any
@@ -428,12 +429,12 @@ describe('Postgres Single Write - Postgres Dual Write Compatibility', () => {
}
const singleUpdateResult = await singleWriteRepository.updatePerson(
singleCreatePersonResult.person,
updateToApply,
createPersonUpdateFields(singleCreatePersonResult.person, updateToApply),
'single-update'
)
const dualUpdateResult = await dualWriteRepository.updatePerson(
dualCreatePersonResult.person,
updateToApply,
createPersonUpdateFields(dualCreatePersonResult.person, updateToApply),
'dual-update'
)

View File

@@ -2,7 +2,7 @@ import fs from 'fs'
import { DateTime } from 'luxon'
import path from 'path'
import { Hub, InternalPerson, PersonDistinctId, RawPerson, Team } from '~/types'
import { Hub, InternalPerson, PersonDistinctId, PersonUpdateFields, RawPerson, Team } from '~/types'
import { PostgresRouter, PostgresUse } from '~/utils/db/postgres'
import { CreatePersonResult } from '../../../../utils/db/db'
@@ -217,3 +217,21 @@ function toPerson(row: RawPerson): InternalPerson {
version: Number(row.version || 0),
}
}
/**
* Helper to create PersonUpdateFields for tests with all required fields.
* Pass partial updates and it will fill in defaults from the person object.
*/
export function createPersonUpdateFields(
person: InternalPerson,
updates: Partial<PersonUpdateFields>
): PersonUpdateFields {
return {
properties: updates.properties ?? person.properties,
properties_last_updated_at: updates.properties_last_updated_at ?? person.properties_last_updated_at,
properties_last_operation: updates.properties_last_operation ?? person.properties_last_operation,
is_identified: updates.is_identified ?? person.is_identified,
created_at: updates.created_at ?? person.created_at,
...(updates.version !== undefined && { version: updates.version }),
}
}

View File

@@ -13,6 +13,7 @@ import { parseJSON } from '../../../src/utils/json-parse'
import { UUIDT, castTimestampOrNow } from '../../../src/utils/utils'
import { PostgresPersonRepository } from '../../../src/worker/ingestion/persons/repositories/postgres-person-repository'
import {
createPersonUpdateFields,
fetchDistinctIdValues,
fetchDistinctIds,
fetchPersons,
@@ -200,10 +201,13 @@ describe('postgres parity', () => {
await clickhouse.delayUntilEventIngested(() => clickhouse.fetchDistinctIdValues(person), 2)
// update properties and set is_identified to true
const [_p, kafkaMessagesUpdate] = await personRepository.updatePerson(person, {
properties: { replacedUserProp: 'propValue' },
is_identified: true,
})
const [_p, kafkaMessagesUpdate] = await personRepository.updatePerson(
person,
createPersonUpdateFields(person, {
properties: { replacedUserProp: 'propValue' },
is_identified: true,
})
)
await hub.db.kafkaProducer.queueMessages(kafkaMessagesUpdate)
await clickhouse.delayUntilEventIngested(async () =>
@@ -227,10 +231,13 @@ describe('postgres parity', () => {
// update date and boolean to false
const randomDate = DateTime.utc().minus(100000).setZone('UTC')
const [updatedPerson, kafkaMessages2] = await personRepository.updatePerson(person, {
created_at: randomDate,
is_identified: false,
})
const [updatedPerson, kafkaMessages2] = await personRepository.updatePerson(
person,
createPersonUpdateFields(person, {
created_at: randomDate,
is_identified: false,
})
)
await hub.db.kafkaProducer.queueMessages(kafkaMessages2)