From a036f3c822ab5c7521c84002290d208496de49c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Sequeira?= Date: Tue, 11 Nov 2025 19:21:30 +0100 Subject: [PATCH] chore(plugin-server): Don't update immutable person attributes (#41250) --- plugin-server/src/types.ts | 10 ++ .../batch-writing-person-store.test.ts | 3 +- .../persons/batch-writing-person-store.ts | 10 +- ...dualwrite-person-repository-transaction.ts | 4 +- .../person-repository-transaction.ts | 10 +- .../persons/repositories/person-repository.ts | 11 +- ...es-dualwrite-person-repository-2pc.test.ts | 52 ++++--- .../postgres-dualwrite-person-repository.ts | 14 +- .../postgres-person-repository-transaction.ts | 10 +- .../postgres-person-repository.test.ts | 138 +++++++++++------- .../postgres-person-repository.ts | 9 +- .../raw-postgres-person-repository.ts | 11 +- .../singlewrite-dualwrite-compat.test.ts | 17 ++- .../persons/repositories/test-helpers.ts | 20 ++- .../worker/ingestion/postgres-parity.test.ts | 23 ++- 15 files changed, 234 insertions(+), 108 deletions(-) diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index 3c2131e756..7fe403a6dc 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -1008,6 +1008,16 @@ export interface InternalPerson extends BasePerson { version: number } +/** Mutable fields that can be updated on a Person via updatePerson. */ +export interface PersonUpdateFields { + properties: Properties + properties_last_updated_at: PropertiesLastUpdatedAt + properties_last_operation: PropertiesLastOperation | null + is_identified: boolean + created_at: DateTime + version?: number // Optional: allows forcing a specific version (used for dual-write sync) +} + /** Person model exposed outside of person-specific DB logic. */ export interface Person { team_id: number diff --git a/plugin-server/src/worker/ingestion/persons/batch-writing-person-store.test.ts b/plugin-server/src/worker/ingestion/persons/batch-writing-person-store.test.ts index 9b78a3676d..d7b579b7eb 100644 --- a/plugin-server/src/worker/ingestion/persons/batch-writing-person-store.test.ts +++ b/plugin-server/src/worker/ingestion/persons/batch-writing-person-store.test.ts @@ -942,13 +942,14 @@ describe('BatchWritingPersonStore', () => { prop_from_distinctId2: 'value2', }, }), + // Only mutable fields should be in the update object expect.objectContaining({ - id: sharedPerson.id, properties: { initial_prop: 'initial_value', prop_from_distinctId1: 'value1', prop_from_distinctId2: 'value2', }, + is_identified: expect.any(Boolean), }), 'updatePersonNoAssert' ) diff --git a/plugin-server/src/worker/ingestion/persons/batch-writing-person-store.ts b/plugin-server/src/worker/ingestion/persons/batch-writing-person-store.ts index 7c697511b3..d86cf62b97 100644 --- a/plugin-server/src/worker/ingestion/persons/batch-writing-person-store.ts +++ b/plugin-server/src/worker/ingestion/persons/batch-writing-person-store.ts @@ -1045,8 +1045,14 @@ export class BatchWritingPersonsStoreForBatch implements PersonsStoreForBatch, B this.incrementDatabaseOperation(operation as MethodName, personUpdate.distinct_id) // Convert PersonUpdate back to InternalPerson for database call const person = toInternalPerson(personUpdate) - // Create update object without version field (updatePerson handles version internally) - const { version, ...updateFields } = person + // Always pass all mutable fields for consistent query plans + const updateFields = { + properties: person.properties, + properties_last_updated_at: person.properties_last_updated_at, + properties_last_operation: person.properties_last_operation, + is_identified: person.is_identified, + created_at: person.created_at, + } this.incrementCount('updatePersonNoAssert', personUpdate.distinct_id) this.incrementDatabaseOperation('updatePersonNoAssert', personUpdate.distinct_id) diff --git a/plugin-server/src/worker/ingestion/persons/repositories/dualwrite-person-repository-transaction.ts b/plugin-server/src/worker/ingestion/persons/repositories/dualwrite-person-repository-transaction.ts index a2bb839f41..562d5291f1 100644 --- a/plugin-server/src/worker/ingestion/persons/repositories/dualwrite-person-repository-transaction.ts +++ b/plugin-server/src/worker/ingestion/persons/repositories/dualwrite-person-repository-transaction.ts @@ -3,7 +3,7 @@ import { DateTime } from 'luxon' import { Properties } from '@posthog/plugin-scaffold' import { TopicMessage } from '~/kafka/producer' -import { InternalPerson, PropertiesLastOperation, PropertiesLastUpdatedAt, Team } from '~/types' +import { InternalPerson, PersonUpdateFields, PropertiesLastOperation, PropertiesLastUpdatedAt, Team } from '~/types' import { CreatePersonResult, MoveDistinctIdsResult } from '~/utils/db/db' import { TransactionClient } from '~/utils/db/postgres' @@ -81,7 +81,7 @@ export class DualWritePersonRepositoryTransaction implements PersonRepositoryTra async updatePerson( person: InternalPerson, - update: Partial, + update: PersonUpdateFields, tag?: string ): Promise<[InternalPerson, TopicMessage[], boolean]> { // Enforce version parity across primary/secondary: run primary first, then set secondary to primary's new version diff --git a/plugin-server/src/worker/ingestion/persons/repositories/person-repository-transaction.ts b/plugin-server/src/worker/ingestion/persons/repositories/person-repository-transaction.ts index 6c1d1b1ba1..c887f24605 100644 --- a/plugin-server/src/worker/ingestion/persons/repositories/person-repository-transaction.ts +++ b/plugin-server/src/worker/ingestion/persons/repositories/person-repository-transaction.ts @@ -3,7 +3,13 @@ import { DateTime } from 'luxon' import { Properties } from '@posthog/plugin-scaffold' import { TopicMessage } from '../../../../kafka/producer' -import { InternalPerson, PropertiesLastOperation, PropertiesLastUpdatedAt, Team } from '../../../../types' +import { + InternalPerson, + PersonUpdateFields, + PropertiesLastOperation, + PropertiesLastUpdatedAt, + Team, +} from '../../../../types' import { CreatePersonResult, MoveDistinctIdsResult } from '../../../../utils/db/db' export interface PersonRepositoryTransaction { @@ -21,7 +27,7 @@ export interface PersonRepositoryTransaction { updatePerson( person: InternalPerson, - update: Partial, + update: PersonUpdateFields, tag?: string ): Promise<[InternalPerson, TopicMessage[], boolean]> diff --git a/plugin-server/src/worker/ingestion/persons/repositories/person-repository.ts b/plugin-server/src/worker/ingestion/persons/repositories/person-repository.ts index 8640664788..f65f759ad5 100644 --- a/plugin-server/src/worker/ingestion/persons/repositories/person-repository.ts +++ b/plugin-server/src/worker/ingestion/persons/repositories/person-repository.ts @@ -3,7 +3,14 @@ import { DateTime } from 'luxon' import { Properties } from '@posthog/plugin-scaffold' import { TopicMessage } from '../../../../kafka/producer' -import { InternalPerson, PropertiesLastOperation, PropertiesLastUpdatedAt, Team, TeamId } from '../../../../types' +import { + InternalPerson, + PersonUpdateFields, + PropertiesLastOperation, + PropertiesLastUpdatedAt, + Team, + TeamId, +} from '../../../../types' import { CreatePersonResult } from '../../../../utils/db/db' import { PersonUpdate } from '../person-update-batch' import { PersonRepositoryTransaction } from './person-repository-transaction' @@ -50,7 +57,7 @@ export interface PersonRepository { updatePerson( person: InternalPerson, - update: Partial, + update: PersonUpdateFields, tag?: string ): Promise<[InternalPerson, TopicMessage[], boolean]> diff --git a/plugin-server/src/worker/ingestion/persons/repositories/postgres-dualwrite-person-repository-2pc.test.ts b/plugin-server/src/worker/ingestion/persons/repositories/postgres-dualwrite-person-repository-2pc.test.ts index 4de2a4c851..b69f9b3939 100644 --- a/plugin-server/src/worker/ingestion/persons/repositories/postgres-dualwrite-person-repository-2pc.test.ts +++ b/plugin-server/src/worker/ingestion/persons/repositories/postgres-dualwrite-person-repository-2pc.test.ts @@ -9,6 +9,7 @@ import { PostgresDualWritePersonRepository } from './postgres-dualwrite-person-r import { assertConsistencyAcrossDatabases, cleanupPrepared, + createPersonUpdateFields, getFirstTeam, mockDatabaseError, setupMigrationDb, @@ -278,7 +279,10 @@ describe('PostgresDualWritePersonRepository 2PC Dual-Write Tests', () => { [{ distinctId: 'dw-2' }] )) as any - const [updated] = await repository.updatePerson(person, { properties: { name: 'B' } }) + const [updated] = await repository.updatePerson( + person, + createPersonUpdateFields(person, { properties: { name: 'B' } }) + ) const primary = await postgres.query( PostgresUse.PERSONS_READ, @@ -308,9 +312,9 @@ describe('PostgresDualWritePersonRepository 2PC Dual-Write Tests', () => { .spyOn((repository as any).secondaryRepo, 'updatePerson') .mockRejectedValue(new Error('simulated secondary failure')) - await expect(repository.updatePerson(person, { properties: { y: 2 } }, 'test-fail')).rejects.toThrow( - 'simulated secondary failure' - ) + await expect( + repository.updatePerson(person, createPersonUpdateFields(person, { properties: { y: 2 } }), 'test-fail') + ).rejects.toThrow('simulated secondary failure') spy.mockRestore() @@ -349,9 +353,9 @@ describe('PostgresDualWritePersonRepository 2PC Dual-Write Tests', () => { const mockSpy = mockDatabaseError(postgres, new Error('primary update failed'), 'updatePerson') - await expect(repository.updatePerson(person, { properties: { name: 'Updated' } })).rejects.toThrow( - 'primary update failed' - ) + await expect( + repository.updatePerson(person, createPersonUpdateFields(person, { properties: { name: 'Updated' } })) + ).rejects.toThrow('primary update failed') mockSpy.mockRestore() @@ -392,9 +396,9 @@ describe('PostgresDualWritePersonRepository 2PC Dual-Write Tests', () => { const mockSpy = mockDatabaseError(migrationPostgres, new Error('secondary update failed'), 'updatePerson') - await expect(repository.updatePerson(person, { properties: { name: 'Updated' } })).rejects.toThrow( - 'secondary update failed' - ) + await expect( + repository.updatePerson(person, createPersonUpdateFields(person, { properties: { name: 'Updated' } })) + ).rejects.toThrow('secondary update failed') mockSpy.mockRestore() @@ -1393,9 +1397,12 @@ describe('PostgresDualWritePersonRepository 2PC Dual-Write Tests', () => { await tx.addDistinctId(createResult.person, 'tx-did-2', 1) // Update the person - const [updatedPerson] = await tx.updatePerson(createResult.person, { - properties: { name: 'Updated Name', age: 26 }, - }) + const [updatedPerson] = await tx.updatePerson( + createResult.person, + createPersonUpdateFields(createResult.person, { + properties: { name: 'Updated Name', age: 26 }, + }) + ) return updatedPerson }) @@ -1728,7 +1735,10 @@ describe('PostgresDualWritePersonRepository 2PC Dual-Write Tests', () => { throw new Error('Failed to create person') } - await tx.updatePerson(createResult.person, { properties: { updated: true } }) + await tx.updatePerson( + createResult.person, + createPersonUpdateFields(createResult.person, { properties: { updated: true } }) + ) return createResult.person }) @@ -2087,9 +2097,12 @@ describe('PostgresDualWritePersonRepository 2PC Dual-Write Tests', () => { // Now use it within a transaction const txResult = await repository.inTransaction('test-mixed-calls', async (tx) => { // Update the person created outside - const [updated] = await tx.updatePerson(outsidePerson, { - properties: { location: 'updated-inside', new_prop: 'added' }, - }) + const [updated] = await tx.updatePerson( + outsidePerson, + createPersonUpdateFields(outsidePerson, { + properties: { location: 'updated-inside', new_prop: 'added' }, + }) + ) // Add a distinct ID await tx.addDistinctId(updated, 'added-in-tx', 1) @@ -2141,7 +2154,10 @@ describe('PostgresDualWritePersonRepository 2PC Dual-Write Tests', () => { )) as any const result = await repository.inTransaction('test-version-sync', async (tx) => { - const [updatedPerson] = await tx.updatePerson(person, { properties: { updated: 'value' } }) + const [updatedPerson] = await tx.updatePerson( + person, + createPersonUpdateFields(person, { properties: { updated: 'value' } }) + ) return updatedPerson }) diff --git a/plugin-server/src/worker/ingestion/persons/repositories/postgres-dualwrite-person-repository.ts b/plugin-server/src/worker/ingestion/persons/repositories/postgres-dualwrite-person-repository.ts index dac85eb6fb..701ec3ada7 100644 --- a/plugin-server/src/worker/ingestion/persons/repositories/postgres-dualwrite-person-repository.ts +++ b/plugin-server/src/worker/ingestion/persons/repositories/postgres-dualwrite-person-repository.ts @@ -3,7 +3,14 @@ import { DateTime } from 'luxon' import { Properties } from '@posthog/plugin-scaffold' import { TopicMessage } from '../../../../kafka/producer' -import { InternalPerson, PropertiesLastOperation, PropertiesLastUpdatedAt, Team, TeamId } from '../../../../types' +import { + InternalPerson, + PersonUpdateFields, + PropertiesLastOperation, + PropertiesLastUpdatedAt, + Team, + TeamId, +} from '../../../../types' import { CreatePersonResult, MoveDistinctIdsResult } from '../../../../utils/db/db' import { PostgresRouter, PostgresUse } from '../../../../utils/db/postgres' import { TwoPhaseCommitCoordinator } from '../../../../utils/db/two-phase' @@ -132,7 +139,7 @@ export class PostgresDualWritePersonRepository implements PersonRepository { async updatePerson( person: InternalPerson, - update: Partial, + update: PersonUpdateFields, tag?: string ): Promise<[InternalPerson, TopicMessage[], boolean]> { // Enforce version parity across primary/secondary: run primary first, then set secondary to primary's new version @@ -142,11 +149,12 @@ export class PostgresDualWritePersonRepository implements PersonRepository { primaryOut = p const primaryUpdated = p[0] - const secondaryUpdate: Partial = { + const secondaryUpdate: PersonUpdateFields = { properties: primaryUpdated.properties, properties_last_updated_at: primaryUpdated.properties_last_updated_at, properties_last_operation: primaryUpdated.properties_last_operation, is_identified: primaryUpdated.is_identified, + created_at: primaryUpdated.created_at, version: primaryUpdated.version, } diff --git a/plugin-server/src/worker/ingestion/persons/repositories/postgres-person-repository-transaction.ts b/plugin-server/src/worker/ingestion/persons/repositories/postgres-person-repository-transaction.ts index 5bfd05d617..679c35db93 100644 --- a/plugin-server/src/worker/ingestion/persons/repositories/postgres-person-repository-transaction.ts +++ b/plugin-server/src/worker/ingestion/persons/repositories/postgres-person-repository-transaction.ts @@ -3,7 +3,13 @@ import { DateTime } from 'luxon' import { Properties } from '@posthog/plugin-scaffold' import { TopicMessage } from '../../../../kafka/producer' -import { InternalPerson, PropertiesLastOperation, PropertiesLastUpdatedAt, Team } from '../../../../types' +import { + InternalPerson, + PersonUpdateFields, + PropertiesLastOperation, + PropertiesLastUpdatedAt, + Team, +} from '../../../../types' import { CreatePersonResult, MoveDistinctIdsResult } from '../../../../utils/db/db' import { TransactionClient } from '../../../../utils/db/postgres' import { PersonRepositoryTransaction } from './person-repository-transaction' @@ -42,7 +48,7 @@ export class PostgresPersonRepositoryTransaction implements PersonRepositoryTran async updatePerson( person: InternalPerson, - update: Partial, + update: PersonUpdateFields, tag?: string ): Promise<[InternalPerson, TopicMessage[], boolean]> { return await this.repository.updatePerson(person, update, tag, this.transaction) diff --git a/plugin-server/src/worker/ingestion/persons/repositories/postgres-person-repository.test.ts b/plugin-server/src/worker/ingestion/persons/repositories/postgres-person-repository.test.ts index b86fd88c21..2e222ae656 100644 --- a/plugin-server/src/worker/ingestion/persons/repositories/postgres-person-repository.test.ts +++ b/plugin-server/src/worker/ingestion/persons/repositories/postgres-person-repository.test.ts @@ -8,7 +8,7 @@ import { parseJSON } from '../../../../utils/json-parse' import { NoRowsUpdatedError, UUIDT } from '../../../../utils/utils' import { PersonPropertiesSizeViolationError } from './person-repository' import { PostgresPersonRepository } from './postgres-person-repository' -import { fetchDistinctIdValues, fetchDistinctIds } from './test-helpers' +import { createPersonUpdateFields, fetchDistinctIdValues, fetchDistinctIds } from './test-helpers' jest.mock('../../../../utils/logger') @@ -1171,7 +1171,10 @@ describe('PostgresPersonRepository', () => { const person = await createTestPerson(team.id, 'test-distinct', { name: 'John', age: 25 }) const update = { properties: { name: 'Jane', age: 30, city: 'New York' } } - const [updatedPerson, messages, versionDisparity] = await repository.updatePerson(person, update) + const [updatedPerson, messages, versionDisparity] = await repository.updatePerson( + person, + createPersonUpdateFields(person, update) + ) expect(updatedPerson.properties).toEqual({ name: 'Jane', age: 30, city: 'New York' }) expect(updatedPerson.version).toBe(person.version + 1) @@ -1184,23 +1187,15 @@ describe('PostgresPersonRepository', () => { expect(fetchedPerson?.version).toBe(person.version + 1) }) - it('should handle empty update gracefully', async () => { - const team = await getFirstTeam(hub) - const person = await createTestPerson(team.id, 'test-distinct', { name: 'John' }) - - const [updatedPerson, messages, versionDisparity] = await repository.updatePerson(person, {}) - - expect(updatedPerson).toEqual(person) - expect(messages).toHaveLength(0) - expect(versionDisparity).toBe(false) - }) - it('should update is_identified field', async () => { const team = await getFirstTeam(hub) const person = await createTestPerson(team.id, 'test-distinct', { name: 'John' }) const update = { is_identified: true } - const [updatedPerson, messages] = await repository.updatePerson(person, update) + const [updatedPerson, messages] = await repository.updatePerson( + person, + createPersonUpdateFields(person, update) + ) expect(updatedPerson.is_identified).toBe(true) expect(updatedPerson.version).toBe(person.version + 1) @@ -1213,11 +1208,17 @@ describe('PostgresPersonRepository', () => { // First update const update1 = { properties: { name: 'Jane' } } - const [updatedPerson1, _messages1] = await repository.updatePerson(person, update1) + const [updatedPerson1, _messages1] = await repository.updatePerson( + person, + createPersonUpdateFields(person, update1) + ) // Second update with the updated person (should succeed since we're using the latest version) const update2 = { properties: { age: 30 } } - const [updatedPerson2, messages2] = await repository.updatePerson(updatedPerson1, update2) + const [updatedPerson2, messages2] = await repository.updatePerson( + updatedPerson1, + createPersonUpdateFields(updatedPerson1, update2) + ) // updatePerson replaces properties entirely, so we expect only the age property expect(updatedPerson2.properties).toEqual({ age: 30 }) @@ -1231,7 +1232,12 @@ describe('PostgresPersonRepository', () => { await postgres.transaction(PostgresUse.PERSONS_WRITE, 'test-transaction', async (tx) => { const update = { properties: { name: 'Jane' } } - const [updatedPerson, messages] = await repository.updatePerson(person, update, 'tx', tx) + const [updatedPerson, messages] = await repository.updatePerson( + person, + createPersonUpdateFields(person, update), + 'tx', + tx + ) expect(updatedPerson.properties).toEqual({ name: 'Jane' }) expect(messages).toHaveLength(1) @@ -1247,7 +1253,11 @@ describe('PostgresPersonRepository', () => { const person = await createTestPerson(team.id, 'test-distinct', { name: 'John' }) const update = { properties: { name: 'Jane' } } - const [updatedPerson, messages] = await repository.updatePerson(person, update, 'test-tag') + const [updatedPerson, messages] = await repository.updatePerson( + person, + createPersonUpdateFields(person, update), + 'test-tag' + ) expect(updatedPerson.properties).toEqual({ name: 'Jane' }) expect(messages).toHaveLength(1) @@ -1269,7 +1279,9 @@ describe('PostgresPersonRepository', () => { } const update = { properties: { name: 'Jane' } } - await expect(repository.updatePerson(nonExistentPerson, update)).rejects.toThrow(NoRowsUpdatedError) + await expect( + repository.updatePerson(nonExistentPerson, createPersonUpdateFields(nonExistentPerson, update)) + ).rejects.toThrow(NoRowsUpdatedError) }) it('should handle updatePersonAssertVersion with optimistic concurrency control', async () => { @@ -1734,7 +1746,10 @@ describe('PostgresPersonRepository', () => { }, } - const [updatedPerson, messages] = await oversizedRepository.updatePerson(normalPerson, oversizedUpdate) + const [updatedPerson, messages] = await oversizedRepository.updatePerson( + normalPerson, + createPersonUpdateFields(normalPerson, oversizedUpdate) + ) expect(updatedPerson).toBeDefined() expect(updatedPerson.version).toBe(normalPerson.version + 1) @@ -1769,12 +1784,18 @@ describe('PostgresPersonRepository', () => { }, } - await expect(oversizedRepository.updatePerson(normalPerson, oversizedUpdate)).rejects.toThrow( - PersonPropertiesSizeViolationError - ) - await expect(oversizedRepository.updatePerson(normalPerson, oversizedUpdate)).rejects.toThrow( - 'Person properties update would exceed size limit' - ) + await expect( + oversizedRepository.updatePerson( + normalPerson, + createPersonUpdateFields(normalPerson, oversizedUpdate) + ) + ).rejects.toThrow(PersonPropertiesSizeViolationError) + await expect( + oversizedRepository.updatePerson( + normalPerson, + createPersonUpdateFields(normalPerson, oversizedUpdate) + ) + ).rejects.toThrow('Person properties update would exceed size limit') mockPersonPropertiesSize.mockRestore() mockQuery.mockRestore() @@ -1816,14 +1837,20 @@ describe('PostgresPersonRepository', () => { }, } - await expect(oversizedRepository.updatePerson(normalPerson, oversizedUpdate)).rejects.toThrow( - PersonPropertiesSizeViolationError - ) + await expect( + oversizedRepository.updatePerson( + normalPerson, + createPersonUpdateFields(normalPerson, oversizedUpdate) + ) + ).rejects.toThrow(PersonPropertiesSizeViolationError) updateCallCount = 0 - await expect(oversizedRepository.updatePerson(normalPerson, oversizedUpdate)).rejects.toThrow( - 'Person properties update failed after trying to trim oversized properties' - ) + await expect( + oversizedRepository.updatePerson( + normalPerson, + createPersonUpdateFields(normalPerson, oversizedUpdate) + ) + ).rejects.toThrow('Person properties update failed after trying to trim oversized properties') mockPersonPropertiesSize.mockRestore() mockQuery.mockRestore() @@ -1883,16 +1910,16 @@ describe('PostgresPersonRepository', () => { }, } - await expect(oversizedRepository.updatePerson(oversizedPerson, update)).rejects.toThrow( - PersonPropertiesSizeViolationError - ) + await expect( + oversizedRepository.updatePerson(oversizedPerson, createPersonUpdateFields(oversizedPerson, update)) + ).rejects.toThrow(PersonPropertiesSizeViolationError) expect(updateCallCount).toBe(2) updateCallCount = 0 - await expect(oversizedRepository.updatePerson(oversizedPerson, update)).rejects.toThrow( - 'Person properties update failed after trying to trim oversized properties' - ) + await expect( + oversizedRepository.updatePerson(oversizedPerson, createPersonUpdateFields(oversizedPerson, update)) + ).rejects.toThrow('Person properties update failed after trying to trim oversized properties') expect(updateCallCount).toBe(2) @@ -2050,7 +2077,7 @@ describe('PostgresPersonRepository', () => { } try { - await oversizedRepository.updatePerson(person, oversizedUpdate) + await oversizedRepository.updatePerson(person, createPersonUpdateFields(person, oversizedUpdate)) expect(mockInc).toHaveBeenCalledWith({ result: 'success' }) } catch (error) {} @@ -2164,12 +2191,12 @@ describe('PostgresPersonRepository', () => { const [updatedPerson1, messages1, versionDisparity1] = await repositoryWithCalculation.updatePerson( person1, - update, + createPersonUpdateFields(person1, update), 'test-with-logging' ) const [updatedPerson2, messages2, versionDisparity2] = await repositoryWithoutCalculation.updatePerson( person2, - update, + createPersonUpdateFields(person2, update), 'test-without-logging' ) @@ -2253,7 +2280,10 @@ describe('PostgresPersonRepository', () => { const person = await createTestPerson(team.id, 'test-default', { name: 'John' }) const update = { properties: { name: 'Jane', city: 'Boston' } } - const [updatedPerson, messages, versionDisparity] = await defaultRepository.updatePerson(person, update) + const [updatedPerson, messages, versionDisparity] = await defaultRepository.updatePerson( + person, + createPersonUpdateFields(person, update) + ) expect(updatedPerson.properties).toEqual({ name: 'Jane', city: 'Boston' }) expect(updatedPerson.version).toBe(person.version + 1) @@ -2359,7 +2389,7 @@ describe('PostgresPersonRepository', () => { const expectedPropertiesLastUpdatedAtSize = JSON.stringify(update.properties_last_updated_at).length const expectedPropertiesLastOperationSize = JSON.stringify(update.properties_last_operation).length - await repository.updatePerson(person, update) + await repository.updatePerson(person, createPersonUpdateFields(person, update)) // Verify metrics were recorded for all updated fields (3 calls total) expect(observeCalls).toHaveLength(3) @@ -2398,14 +2428,15 @@ describe('PostgresPersonRepository', () => { const expectedPropertiesSize = JSON.stringify(update.properties).length - await repository.updatePerson(person, update) + await repository.updatePerson(person, createPersonUpdateFields(person, update)) - // Only properties metric should be recorded (1 call only) - expect(observeCalls).toHaveLength(1) + // Since we always pass all fields for consistent query plans, all 3 JSONB fields are tracked + expect(observeCalls).toHaveLength(3) - expect(observeCalls[0].labels.operation).toBe('updatePerson') - expect(observeCalls[0].labels.field).toBe('properties') - expect(observeCalls[0].value).toBe(expectedPropertiesSize) + const propertiesCall = observeCalls.find((c) => c.labels.field === 'properties') + expect(propertiesCall).toBeDefined() + expect(propertiesCall!.labels.operation).toBe('updatePerson') + expect(propertiesCall!.value).toBe(expectedPropertiesSize) }) it('should handle large properties correctly', async () => { @@ -2461,13 +2492,14 @@ describe('PostgresPersonRepository', () => { // Clear observe calls from createTestPerson observeCalls = [] - // Empty update + // Empty update - but helper fills in all fields from person const update = {} - await repository.updatePerson(person, update) + await repository.updatePerson(person, createPersonUpdateFields(person, update)) - // No metrics should be recorded for empty update - expect(observeCalls).toHaveLength(0) + // Since we always pass all fields for consistent query plans, all 3 JSONB fields are tracked + // even though the values haven't changed from the person object + expect(observeCalls).toHaveLength(3) }) }) }) diff --git a/plugin-server/src/worker/ingestion/persons/repositories/postgres-person-repository.ts b/plugin-server/src/worker/ingestion/persons/repositories/postgres-person-repository.ts index 5b26ce5464..00a9f91857 100644 --- a/plugin-server/src/worker/ingestion/persons/repositories/postgres-person-repository.ts +++ b/plugin-server/src/worker/ingestion/persons/repositories/postgres-person-repository.ts @@ -8,6 +8,7 @@ import { TopicMessage } from '../../../../kafka/producer' import { InternalPerson, PersonDistinctId, + PersonUpdateFields, PropertiesLastOperation, PropertiesLastUpdatedAt, RawPerson, @@ -67,7 +68,7 @@ export class PostgresPersonRepository private async handleOversizedPersonProperties( person: InternalPerson, - update: Partial, + update: PersonUpdateFields, tx?: TransactionClient ): Promise<[InternalPerson, TopicMessage[], boolean]> { const currentSize = await this.personPropertiesSize(person.id) @@ -113,7 +114,7 @@ export class PostgresPersonRepository private async handleExistingOversizedRecord( person: InternalPerson, - update: Partial, + update: PersonUpdateFields, tx?: TransactionClient ): Promise<[InternalPerson, TopicMessage[], boolean]> { try { @@ -125,7 +126,7 @@ export class PostgresPersonRepository { teamId: person.team_id, personId: person.id } ) - const trimmedUpdate: Partial = { + const trimmedUpdate: PersonUpdateFields = { ...update, properties: trimmedProperties, } @@ -769,7 +770,7 @@ export class PostgresPersonRepository async updatePerson( person: InternalPerson, - update: Partial, + update: PersonUpdateFields, tag?: string, tx?: TransactionClient ): Promise<[InternalPerson, TopicMessage[], boolean]> { diff --git a/plugin-server/src/worker/ingestion/persons/repositories/raw-postgres-person-repository.ts b/plugin-server/src/worker/ingestion/persons/repositories/raw-postgres-person-repository.ts index a6df5c2c66..5e3f54cdd9 100644 --- a/plugin-server/src/worker/ingestion/persons/repositories/raw-postgres-person-repository.ts +++ b/plugin-server/src/worker/ingestion/persons/repositories/raw-postgres-person-repository.ts @@ -3,7 +3,14 @@ import { DateTime } from 'luxon' import { Properties } from '@posthog/plugin-scaffold' import { TopicMessage } from '../../../../kafka/producer' -import { InternalPerson, PropertiesLastOperation, PropertiesLastUpdatedAt, Team, TeamId } from '../../../../types' +import { + InternalPerson, + PersonUpdateFields, + PropertiesLastOperation, + PropertiesLastUpdatedAt, + Team, + TeamId, +} from '../../../../types' import { CreatePersonResult, MoveDistinctIdsResult } from '../../../../utils/db/db' import { TransactionClient } from '../../../../utils/db/postgres' import { PersonUpdate } from '../person-update-batch' @@ -36,7 +43,7 @@ export interface RawPostgresPersonRepository { updatePerson( person: InternalPerson, - update: Partial, + update: PersonUpdateFields, tag?: string, tx?: TransactionClient ): Promise<[InternalPerson, TopicMessage[], boolean]> diff --git a/plugin-server/src/worker/ingestion/persons/repositories/singlewrite-dualwrite-compat.test.ts b/plugin-server/src/worker/ingestion/persons/repositories/singlewrite-dualwrite-compat.test.ts index 078414a0a7..b648cb3fef 100644 --- a/plugin-server/src/worker/ingestion/persons/repositories/singlewrite-dualwrite-compat.test.ts +++ b/plugin-server/src/worker/ingestion/persons/repositories/singlewrite-dualwrite-compat.test.ts @@ -22,6 +22,7 @@ import { assertCreatePersonConflictContractParity, assertCreatePersonContractParity, cleanupPrepared, + createPersonUpdateFields, getFirstTeam, mockDatabaseError, setupMigrationDb, @@ -303,12 +304,12 @@ describe('Postgres Single Write - Postgres Dual Write Compatibility', () => { const singleUpdateResult = await singleWriteRepository.updatePerson( singleCreatePersonResult.person, - { properties: { name: 'B' } }, + createPersonUpdateFields(singleCreatePersonResult.person, { properties: { name: 'B' } }), 'single-update' ) const dualUpdateResult = await dualWriteRepository.updatePerson( dualCreatePersonResult.person, - { properties: { name: 'B' } }, + createPersonUpdateFields(dualCreatePersonResult.person, { properties: { name: 'B' } }), 'dual-update' ) assertUpdatePersonContractParity(singleUpdateResult, dualUpdateResult) @@ -333,13 +334,13 @@ describe('Postgres Single Write - Postgres Dual Write Compatibility', () => { () => singleWriteRepository.updatePerson( singleCreatePersonResult.person, - { properties: { name: 'B' } }, + createPersonUpdateFields(singleCreatePersonResult.person, { properties: { name: 'B' } }), 'single-update' ), () => dualWriteRepository.updatePerson( dualCreatePersonResult.person, - { properties: { name: 'B' } }, + createPersonUpdateFields(dualCreatePersonResult.person, { properties: { name: 'B' } }), 'dual-update' ), 'unhandled database error' @@ -366,13 +367,13 @@ describe('Postgres Single Write - Postgres Dual Write Compatibility', () => { () => singleWriteRepository.updatePerson( singleCreatePersonResult.person, - { properties: { name: 'B' } }, + createPersonUpdateFields(singleCreatePersonResult.person, { properties: { name: 'B' } }), 'single-update' ), () => dualWriteRepository.updatePerson( dualCreatePersonResult.person, - { properties: { name: 'B' } }, + createPersonUpdateFields(dualCreatePersonResult.person, { properties: { name: 'B' } }), 'dual-update' ), PersonPropertiesSizeViolationError as any @@ -428,12 +429,12 @@ describe('Postgres Single Write - Postgres Dual Write Compatibility', () => { } const singleUpdateResult = await singleWriteRepository.updatePerson( singleCreatePersonResult.person, - updateToApply, + createPersonUpdateFields(singleCreatePersonResult.person, updateToApply), 'single-update' ) const dualUpdateResult = await dualWriteRepository.updatePerson( dualCreatePersonResult.person, - updateToApply, + createPersonUpdateFields(dualCreatePersonResult.person, updateToApply), 'dual-update' ) diff --git a/plugin-server/src/worker/ingestion/persons/repositories/test-helpers.ts b/plugin-server/src/worker/ingestion/persons/repositories/test-helpers.ts index 0294d89b0f..9b80083d12 100644 --- a/plugin-server/src/worker/ingestion/persons/repositories/test-helpers.ts +++ b/plugin-server/src/worker/ingestion/persons/repositories/test-helpers.ts @@ -2,7 +2,7 @@ import fs from 'fs' import { DateTime } from 'luxon' import path from 'path' -import { Hub, InternalPerson, PersonDistinctId, RawPerson, Team } from '~/types' +import { Hub, InternalPerson, PersonDistinctId, PersonUpdateFields, RawPerson, Team } from '~/types' import { PostgresRouter, PostgresUse } from '~/utils/db/postgres' import { CreatePersonResult } from '../../../../utils/db/db' @@ -217,3 +217,21 @@ function toPerson(row: RawPerson): InternalPerson { version: Number(row.version || 0), } } + +/** + * Helper to create PersonUpdateFields for tests with all required fields. + * Pass partial updates and it will fill in defaults from the person object. + */ +export function createPersonUpdateFields( + person: InternalPerson, + updates: Partial +): PersonUpdateFields { + return { + properties: updates.properties ?? person.properties, + properties_last_updated_at: updates.properties_last_updated_at ?? person.properties_last_updated_at, + properties_last_operation: updates.properties_last_operation ?? person.properties_last_operation, + is_identified: updates.is_identified ?? person.is_identified, + created_at: updates.created_at ?? person.created_at, + ...(updates.version !== undefined && { version: updates.version }), + } +} diff --git a/plugin-server/tests/worker/ingestion/postgres-parity.test.ts b/plugin-server/tests/worker/ingestion/postgres-parity.test.ts index 3ac0d56374..1a225f4979 100644 --- a/plugin-server/tests/worker/ingestion/postgres-parity.test.ts +++ b/plugin-server/tests/worker/ingestion/postgres-parity.test.ts @@ -13,6 +13,7 @@ import { parseJSON } from '../../../src/utils/json-parse' import { UUIDT, castTimestampOrNow } from '../../../src/utils/utils' import { PostgresPersonRepository } from '../../../src/worker/ingestion/persons/repositories/postgres-person-repository' import { + createPersonUpdateFields, fetchDistinctIdValues, fetchDistinctIds, fetchPersons, @@ -200,10 +201,13 @@ describe('postgres parity', () => { await clickhouse.delayUntilEventIngested(() => clickhouse.fetchDistinctIdValues(person), 2) // update properties and set is_identified to true - const [_p, kafkaMessagesUpdate] = await personRepository.updatePerson(person, { - properties: { replacedUserProp: 'propValue' }, - is_identified: true, - }) + const [_p, kafkaMessagesUpdate] = await personRepository.updatePerson( + person, + createPersonUpdateFields(person, { + properties: { replacedUserProp: 'propValue' }, + is_identified: true, + }) + ) await hub.db.kafkaProducer.queueMessages(kafkaMessagesUpdate) await clickhouse.delayUntilEventIngested(async () => @@ -227,10 +231,13 @@ describe('postgres parity', () => { // update date and boolean to false const randomDate = DateTime.utc().minus(100000).setZone('UTC') - const [updatedPerson, kafkaMessages2] = await personRepository.updatePerson(person, { - created_at: randomDate, - is_identified: false, - }) + const [updatedPerson, kafkaMessages2] = await personRepository.updatePerson( + person, + createPersonUpdateFields(person, { + created_at: randomDate, + is_identified: false, + }) + ) await hub.db.kafkaProducer.queueMessages(kafkaMessages2)