feat(ingestion): dual write persons repository (#36529)

This commit is contained in:
Nick Best
2025-08-22 13:21:27 -07:00
committed by GitHub
parent fd4a036b4f
commit d57d105033
22 changed files with 6275 additions and 64 deletions

View File

@@ -81,7 +81,7 @@ services:
# (maybe only in Pycharm) keeps many idle transactions open
# and eventually kills postgres, these settings aim to stop that happening.
# They are only for DEV and should not be used in production.
command: postgres -c max_connections=1000 -c idle_in_transaction_session_timeout=300000
command: postgres -c max_connections=1000 -c idle_in_transaction_session_timeout=300000 -c max_prepared_transactions=10
redis:
extends:
file: docker-compose.base.yml

View File

@@ -81,7 +81,7 @@ services:
# (maybe only in Pycharm) keeps many idle transactions open
# and eventually kills postgres, these settings aim to stop that happening.
# They are only for DEV and should not be used in production.
command: postgres -c max_connections=1000 -c idle_in_transaction_session_timeout=300000
command: postgres -c max_connections=1000 -c idle_in_transaction_session_timeout=300000 -c max_prepared_transactions=10
counters_db:
extends:
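
In both dev compose files, the max_prepared_transactions bump is what makes the two-phase commit used by dual write possible: PostgreSQL defaults it to 0, and PREPARE TRANSACTION fails outright at 0. A minimal smoke check (a sketch, assuming the dev credentials above):

import { Client } from 'pg'

// Sketch: PREPARE TRANSACTION errors out unless max_prepared_transactions > 0.
async function assertTwoPhaseCommitAvailable(connectionString: string): Promise<void> {
    const client = new Client({ connectionString })
    await client.connect()
    try {
        await client.query('BEGIN')
        await client.query(`PREPARE TRANSACTION 'smoke-test-2pc'`)
        // The prepared transaction now lives outside this session; clean it up explicitly.
        await client.query(`ROLLBACK PREPARED 'smoke-test-2pc'`)
    } finally {
        await client.end()
    }
}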

View File

@@ -26,13 +26,15 @@
"prettier:check": "prettier --check .",
"prepublishOnly": "pnpm build",
"setup:dev:clickhouse": "cd .. && DEBUG=1 python manage.py migrate_clickhouse",
"setup:test": "cd .. && TEST=1 python manage.py setup_test_environment && cd plugin-server && pnpm run setup:test:cyclotron && pnpm run setup:test:counters",
"setup:test": "cd .. && TEST=1 python manage.py setup_test_environment && cd plugin-server && pnpm run setup:test:cyclotron && pnpm run setup:test:counters && pnpm run setup:test:persons-migration-db:init",
"setup:test:cyclotron": "CYCLOTRON_DATABASE_NAME=test_cyclotron ../rust/bin/migrate-cyclotron",
"setup:test:counters": "TEST=1 COUNTERS_DATABASE_URL='postgres://posthog:posthog@localhost:5432/test_counters' ./bin/migrate-counters.sh",
"migrate:counters": "./bin/migrate-counters.sh",
"migrate:counters:test": "COUNTERS_DATABASE_URL='postgres://posthog:posthog@localhost:5432/test_counters' node-pg-migrate up --migrations-dir src/migrations",
"migrate:counters:down": "node-pg-migrate down --migrations-dir src/migrations",
"migrate:counters:create": "node-pg-migrate create --javascript-file --migrations-dir src/migrations",
"setup:test:persons-migration-db:init": "ts-node src/scripts/setup-persons-migration-db.ts",
"setup:test:persons-migration-db:drop": "ts-node src/scripts/setup-persons-migration-db.ts --drop",
"services:start": "cd .. && docker compose -f docker-compose.dev.yml up",
"services:stop": "cd .. && docker compose -f docker-compose.dev.yml down",
"services:clean": "cd .. && docker compose -f docker-compose.dev.yml rm -v",

View File

@@ -0,0 +1,133 @@
-- Persons DB schema for the test harness's secondary database (the secondary DB used by dual-write)
-- Minimal DDL compatible with PostgresPersonRepository operations, for testing
CREATE TABLE IF NOT EXISTS posthog_person (
id BIGSERIAL PRIMARY KEY,
uuid UUID NOT NULL UNIQUE,
created_at TIMESTAMPTZ NOT NULL,
team_id INTEGER NOT NULL,
properties JSONB NOT NULL DEFAULT '{}'::jsonb,
properties_last_updated_at JSONB NOT NULL DEFAULT '{}'::jsonb,
properties_last_operation JSONB NOT NULL DEFAULT '{}'::jsonb,
is_user_id INTEGER NULL,
is_identified BOOLEAN NOT NULL DEFAULT false,
version BIGINT NULL
);
-- Helpful index for updatePersonAssertVersion
CREATE INDEX IF NOT EXISTS posthog_person_team_uuid_idx
ON posthog_person (team_id, uuid);
-- Distinct IDs
CREATE TABLE IF NOT EXISTS posthog_persondistinctid (
id BIGSERIAL PRIMARY KEY,
distinct_id VARCHAR(400) NOT NULL,
person_id BIGINT NOT NULL,
team_id INTEGER NOT NULL,
version BIGINT NULL
);
-- Add both foreign key constraints to match production schema
-- The deferred constraint needs CASCADE for delete operations to work
-- Drop constraints if they exist first to ensure clean state
DO $$
BEGIN
ALTER TABLE posthog_persondistinctid
DROP CONSTRAINT IF EXISTS posthog_persondistin_person_id_5d655bba_fk_posthog_p;
ALTER TABLE posthog_persondistinctid
DROP CONSTRAINT IF EXISTS posthog_persondistinctid_person_id_5d655bba_fk;
ALTER TABLE posthog_persondistinctid
ADD CONSTRAINT posthog_persondistin_person_id_5d655bba_fk_posthog_p
FOREIGN KEY (person_id)
REFERENCES posthog_person(id)
ON DELETE CASCADE
DEFERRABLE INITIALLY DEFERRED;
ALTER TABLE posthog_persondistinctid
ADD CONSTRAINT posthog_persondistinctid_person_id_5d655bba_fk
FOREIGN KEY (person_id)
REFERENCES posthog_person(id)
ON DELETE CASCADE;
END $$;
-- Create the unique constraint (not just index) to match production
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_constraint
WHERE conname = 'unique distinct_id for team'
AND conrelid = 'posthog_persondistinctid'::regclass
) THEN
ALTER TABLE posthog_persondistinctid
ADD CONSTRAINT "unique distinct_id for team"
UNIQUE (team_id, distinct_id);
END IF;
END $$;
CREATE INDEX IF NOT EXISTS posthog_persondistinctid_person_id_5d655bba
ON posthog_persondistinctid (person_id);
-- Personless distinct IDs (merge queue helpers)
CREATE TABLE IF NOT EXISTS posthog_personlessdistinctid (
team_id INTEGER NOT NULL,
distinct_id TEXT NOT NULL,
is_merged BOOLEAN NOT NULL DEFAULT false,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (team_id, distinct_id)
);
-- Cohort membership by person (only person_id is touched by the repo)
CREATE TABLE IF NOT EXISTS posthog_cohortpeople (
id BIGSERIAL PRIMARY KEY,
cohort_id INTEGER NOT NULL,
person_id BIGINT NOT NULL,
version INTEGER NULL
);
-- Add both foreign key constraints to match production schema
ALTER TABLE posthog_cohortpeople
ADD CONSTRAINT posthog_cohortpeople_person_id_33da7d3f_fk_posthog_person_id
FOREIGN KEY (person_id)
REFERENCES posthog_person(id)
ON DELETE CASCADE
DEFERRABLE INITIALLY DEFERRED;
ALTER TABLE posthog_cohortpeople
ADD CONSTRAINT posthog_cohortpeople_person_id_33da7d3f_fk
FOREIGN KEY (person_id)
REFERENCES posthog_person(id)
ON DELETE CASCADE;
CREATE INDEX IF NOT EXISTS posthog_cohortpeople_person_id_33da7d3f
ON posthog_cohortpeople (person_id);
-- Index from Django model Meta class
CREATE INDEX IF NOT EXISTS posthog_cohortpeople_cohort_person_idx
ON posthog_cohortpeople (cohort_id, person_id);
-- Feature flag hash key overrides (referenced during person merges)
CREATE TABLE IF NOT EXISTS posthog_featureflaghashkeyoverride (
id BIGSERIAL PRIMARY KEY,
team_id INTEGER NOT NULL,
person_id BIGINT NOT NULL,
feature_flag_key TEXT NOT NULL,
hash_key TEXT NOT NULL
);
-- Add both foreign key constraints to match production schema
ALTER TABLE posthog_featureflaghashkeyoverride
ADD CONSTRAINT posthog_featureflagh_person_id_7e517f7c_fk_posthog_p
FOREIGN KEY (person_id)
REFERENCES posthog_person(id)
ON DELETE CASCADE
DEFERRABLE INITIALLY DEFERRED;
ALTER TABLE posthog_featureflaghashkeyoverride
ADD CONSTRAINT posthog_featureflaghashkeyoverride_person_id_7e517f7c_fk
FOREIGN KEY (person_id)
REFERENCES posthog_person(id)
ON DELETE CASCADE;
CREATE INDEX IF NOT EXISTS posthog_featureflaghashkeyoverride_person_id_7e517f7c
ON posthog_featureflaghashkeyoverride (person_id);
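
Each child table carries both a deferred and an immediate foreign key, mirroring the production Django schema; the ON DELETE CASCADE is what lets deletePerson remove a person row and have its distinct ids disappear with it. A quick sketch exercising that path (assumes the test database created by setup-persons-migration-db.ts below):

import { Client } from 'pg'

async function cascadeDeleteSmokeTest(connectionString: string): Promise<void> {
    const client = new Client({ connectionString })
    await client.connect()
    try {
        const { rows } = await client.query(
            `INSERT INTO posthog_person (uuid, created_at, team_id, version)
             VALUES ('00000000-0000-4000-8000-000000000001', now(), 1, 0) RETURNING id`
        )
        await client.query(
            `INSERT INTO posthog_persondistinctid (distinct_id, person_id, team_id, version)
             VALUES ('smoke-distinct-id', $1, 1, 0)`,
            [rows[0].id]
        )
        await client.query(`DELETE FROM posthog_person WHERE id = $1`, [rows[0].id])
        const orphans = await client.query(
            `SELECT 1 FROM posthog_persondistinctid WHERE person_id = $1`,
            [rows[0].id]
        )
        console.assert(orphans.rowCount === 0, 'CASCADE should have removed the distinct id')
    } finally {
        await client.end()
    }
}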

View File

@@ -36,6 +36,12 @@ export function getDefaultConfig(): PluginsServerConfig {
? 'postgres://posthog:posthog@localhost:5432/posthog'
: '',
PERSONS_READONLY_DATABASE_URL: '',
PERSONS_MIGRATION_DATABASE_URL: isTestEnv()
? 'postgres://posthog:posthog@localhost:5432/test_posthog_persons_migration'
: isDevEnv()
? 'postgres://posthog:posthog@localhost:5432/posthog_persons_migration'
: '',
PERSONS_MIGRATION_READONLY_DATABASE_URL: '',
POSTGRES_CONNECTION_POOL_SIZE: 10,
POSTHOG_DB_NAME: null,
POSTHOG_DB_USER: 'postgres',
@@ -285,6 +291,8 @@ export function getDefaultConfig(): PluginsServerConfig {
GROUP_BATCH_WRITING_MAX_CONCURRENT_UPDATES: 10,
GROUP_BATCH_WRITING_OPTIMISTIC_UPDATE_RETRY_INTERVAL_MS: 50,
GROUP_BATCH_WRITING_MAX_OPTIMISTIC_UPDATE_RETRIES: 5,
PERSONS_DUAL_WRITE_ENABLED: false,
PERSONS_DUAL_WRITE_COMPARISON_ENABLED: false,
USE_DYNAMIC_EVENT_INGESTION_RESTRICTION_CONFIG: false,
// Messaging

View File

@@ -31,6 +31,7 @@ import { BatchWritingGroupStore } from '../worker/ingestion/groups/batch-writing
import { GroupStoreForBatch } from '../worker/ingestion/groups/group-store-for-batch.interface'
import { BatchWritingPersonsStore } from '../worker/ingestion/persons/batch-writing-person-store'
import { FlushResult, PersonsStoreForBatch } from '../worker/ingestion/persons/persons-store-for-batch'
import { PostgresDualWritePersonRepository } from '../worker/ingestion/persons/repositories/postgres-dualwrite-person-repository'
import { PostgresPersonRepository } from '../worker/ingestion/persons/repositories/postgres-person-repository'
import { deduplicateEvents } from './deduplication/events'
import { DeduplicationRedis, createDeduplicationRedis } from './deduplication/redis-client'
@@ -152,20 +153,25 @@ export class IngestionConsumer {
this.ingestionWarningLimiter = new MemoryRateLimiter(1, 1.0 / 3600)
this.hogTransformer = new HogTransformerService(hub)
this.personStore = new BatchWritingPersonsStore(
new PostgresPersonRepository(this.hub.db.postgres, {
calculatePropertiesSize: this.hub.PERSON_UPDATE_CALCULATE_PROPERTIES_SIZE,
personPropertiesDbConstraintLimitBytes: this.hub.PERSON_PROPERTIES_DB_CONSTRAINT_LIMIT_BYTES,
personPropertiesTrimTargetBytes: this.hub.PERSON_PROPERTIES_TRIM_TARGET_BYTES,
}),
this.hub.db.kafkaProducer,
{
dbWriteMode: this.hub.PERSON_BATCH_WRITING_DB_WRITE_MODE,
maxConcurrentUpdates: this.hub.PERSON_BATCH_WRITING_MAX_CONCURRENT_UPDATES,
maxOptimisticUpdateRetries: this.hub.PERSON_BATCH_WRITING_MAX_OPTIMISTIC_UPDATE_RETRIES,
optimisticUpdateRetryInterval: this.hub.PERSON_BATCH_WRITING_OPTIMISTIC_UPDATE_RETRY_INTERVAL_MS,
}
)
const personRepositoryOptions = {
calculatePropertiesSize: this.hub.PERSON_UPDATE_CALCULATE_PROPERTIES_SIZE,
comparisonEnabled: this.hub.PERSONS_DUAL_WRITE_COMPARISON_ENABLED,
}
const personRepository = this.hub.PERSONS_DUAL_WRITE_ENABLED
? new PostgresDualWritePersonRepository(
this.hub.db.postgres,
this.hub.db.postgresPersonMigration,
personRepositoryOptions
)
: new PostgresPersonRepository(this.hub.db.postgres, personRepositoryOptions)
this.personStore = new BatchWritingPersonsStore(personRepository, this.hub.db.kafkaProducer, {
dbWriteMode: this.hub.PERSON_BATCH_WRITING_DB_WRITE_MODE,
maxConcurrentUpdates: this.hub.PERSON_BATCH_WRITING_MAX_CONCURRENT_UPDATES,
maxOptimisticUpdateRetries: this.hub.PERSON_BATCH_WRITING_MAX_OPTIMISTIC_UPDATE_RETRIES,
optimisticUpdateRetryInterval: this.hub.PERSON_BATCH_WRITING_OPTIMISTIC_UPDATE_RETRY_INTERVAL_MS,
})
this.groupStore = new BatchWritingGroupStore(this.hub, {
maxConcurrentUpdates: this.hub.GROUP_BATCH_WRITING_MAX_CONCURRENT_UPDATES,

View File

@@ -0,0 +1,102 @@
import fs from 'fs'
import path from 'path'
import { Client } from 'pg'
function parseDb(urlStr: string): { adminUrl: string; dbName: string } {
const u = new URL(urlStr)
const dbName = (u.pathname || '/').replace(/^\//, '') || 'postgres'
const admin = new URL(u.toString())
admin.pathname = '/postgres'
return { adminUrl: admin.toString(), dbName }
}
async function ensureDbExists(adminUrl: string, dbName: string): Promise<void> {
const admin = new Client({ connectionString: adminUrl })
await admin.connect()
try {
const { rows } = await admin.query('SELECT 1 FROM pg_database WHERE datname = $1', [dbName])
if (rows.length === 0) {
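// JSON.stringify(dbName) adds surrounding quotes which are then stripped, so the name is
// interpolated unquoted; fine for the simple lowercase test database names used here.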
await admin.query(`CREATE DATABASE ${JSON.stringify(dbName).replace(/^"|"$/g, '')}`)
}
} finally {
await admin.end()
}
}
async function dropDbIfExists(adminUrl: string, dbName: string): Promise<void> {
const admin = new Client({ connectionString: adminUrl })
await admin.connect()
try {
// terminate existing connections to allow DROP
await admin.query(
`
SELECT pg_terminate_backend(pid)
FROM pg_stat_activity
WHERE datname = $1 AND pid <> pg_backend_pid()
`,
[dbName]
)
await admin.query(`DROP DATABASE IF EXISTS ${JSON.stringify(dbName).replace(/^"|"$/g, '')}`)
} finally {
await admin.end()
}
}
async function applySchema(dbUrl: string, sqlFile: string): Promise<void> {
const sql = fs.readFileSync(sqlFile, 'utf8')
const client = new Client({ connectionString: dbUrl })
await client.connect()
try {
await client.query(sql)
} finally {
await client.end()
}
}
async function checkPreparedTransactions(dbUrl: string): Promise<void> {
const client = new Client({ connectionString: dbUrl })
await client.connect()
try {
const { rows } = await client.query(`SHOW max_prepared_transactions`)
const val = parseInt(rows[0].max_prepared_transactions, 10)
if (!Number.isFinite(val) || val <= 0) {
console.warn(
'Warning: max_prepared_transactions is 0; two-phase commit will not work. Set it > 0 for full dual-write tests.'
)
}
} catch {
// ignore
} finally {
await client.end()
}
}
async function main() {
const defaultUrl = 'postgres://posthog:posthog@localhost:5432/test_posthog_persons_migration'
const dbUrl = process.env.PERSONS_MIGRATION_DATABASE_URL || defaultUrl
const { adminUrl, dbName } = parseDb(dbUrl)
const sqlPath = path.resolve(__dirname, '../../sql/create_persons_tables.sql')
// Always drop and recreate for idempotency
console.log(`Setting up persons migration database: ${dbName}`)
// Drop existing database if it exists
await dropDbIfExists(adminUrl, dbName)
// Create fresh database
await ensureDbExists(adminUrl, dbName)
// Apply schema
await applySchema(dbUrl, sqlPath)
// Check configuration
await checkPreparedTransactions(dbUrl)
console.log(`Database ${dbName} setup completed successfully`)
}
main().catch((err) => {
console.error(err)
process.exit(1)
})

View File

@@ -198,11 +198,15 @@ export interface PluginsServerConfig extends CdpConfig, IngestionConsumerConfig
GROUP_BATCH_WRITING_MAX_CONCURRENT_UPDATES: number // maximum number of concurrent updates to groups table per batch
GROUP_BATCH_WRITING_MAX_OPTIMISTIC_UPDATE_RETRIES: number // maximum number of retries for optimistic update
GROUP_BATCH_WRITING_OPTIMISTIC_UPDATE_RETRY_INTERVAL_MS: number // starting interval for exponential backoff between retries for optimistic update
PERSONS_DUAL_WRITE_ENABLED: boolean // Enable dual-write mode for persons to both primary and migration databases
PERSONS_DUAL_WRITE_COMPARISON_ENABLED: boolean // Enable comparison metrics between primary and secondary DBs during dual-write
TASK_TIMEOUT: number // how many seconds until tasks are timed out
DATABASE_URL: string // Postgres database URL
DATABASE_READONLY_URL: string // Optional read-only replica to the main Postgres database
PERSONS_DATABASE_URL: string // Optional read-write Postgres database for persons
PERSONS_READONLY_DATABASE_URL: string // Optional read-only replica to the persons Postgres database
PERSONS_MIGRATION_DATABASE_URL: string // Read-write Postgres database for persons during dual write/migration
PERSONS_MIGRATION_READONLY_DATABASE_URL: string // Optional read-only replica to the persons Postgres database during dual write/migration
PLUGIN_STORAGE_DATABASE_URL: string // Optional read-write Postgres database for plugin storage
COUNTERS_DATABASE_URL: string // Optional read-write Postgres database for counters
POSTGRES_CONNECTION_POOL_SIZE: number
@@ -389,6 +393,7 @@ export interface Hub extends PluginsServerConfig {
// active connections to Postgres, Redis, Kafka
db: DB
postgres: PostgresRouter
postgresPersonMigration: PostgresRouter
redisPool: GenericPool<Redis>
cookielessRedisPool: GenericPool<Redis>
kafka: Kafka

View File

@@ -126,6 +126,8 @@ export const POSTGRES_UNAVAILABLE_ERROR_MESSAGES = [
export class DB {
/** Postgres connection router for database access. */
postgres: PostgresRouter
/** Postgres connection router for database access for persons migration. */
postgresPersonMigration: PostgresRouter
/** Redis used for various caches. */
redisPool: GenericPool<Redis.Redis>
/** Redis used to store state for cookieless ingestion. */
@@ -142,6 +144,7 @@ export class DB {
constructor(
postgres: PostgresRouter,
postgresPersonMigration: PostgresRouter,
redisPool: GenericPool<Redis.Redis>,
redisPoolCookieless: GenericPool<Redis.Redis>,
kafkaProducer: KafkaProducerWrapper,
@@ -149,6 +152,7 @@ export class DB {
personAndGroupsCacheTtl = 1
) {
this.postgres = postgres
this.postgresPersonMigration = postgresPersonMigration
this.redisPool = redisPool
this.redisPoolCookieless = redisPoolCookieless
this.kafkaProducer = kafkaProducer

View File

@@ -84,6 +84,14 @@ export async function createHub(
logger.info('👍', `Kafka ready`)
const postgres = new PostgresRouter(serverConfig)
// Instantiate a second router for the Persons database migration
const postgresPersonMigration = new PostgresRouter({
...serverConfig,
PERSONS_DATABASE_URL: serverConfig.PERSONS_MIGRATION_DATABASE_URL || serverConfig.PERSONS_DATABASE_URL,
PERSONS_READONLY_DATABASE_URL:
serverConfig.PERSONS_MIGRATION_READONLY_DATABASE_URL || serverConfig.PERSONS_READONLY_DATABASE_URL,
})
// TODO: assert tables are reachable (async calls that cannot be in a constructor)
logger.info('👍', `Postgres Router ready`)
@@ -106,6 +114,7 @@ export async function createHub(
const db = new DB(
postgres,
postgresPersonMigration,
redisPool,
cookielessRedisPool,
kafkaProducer,
@@ -137,6 +146,7 @@ export async function createHub(
capabilities,
db,
postgres,
postgresPersonMigration,
redisPool,
cookielessRedisPool,
kafka,
@@ -186,7 +196,12 @@ export const closeHub = async (hub: Hub): Promise<void> => {
}
logger.info('💤', 'Closing kafka, redis, postgres...')
await hub.pubSub.stop()
await Promise.allSettled([hub.kafkaProducer.disconnect(), hub.redisPool.drain(), hub.postgres?.end()])
await Promise.allSettled([
hub.kafkaProducer.disconnect(),
hub.redisPool.drain(),
hub.postgres?.end(),
hub.postgresPersonMigration?.end(),
])
await hub.redisPool.clear()
await hub.cookielessRedisPool.clear()
logger.info('💤', 'Closing cookieless manager...')

View File

@@ -162,6 +162,10 @@ export class PostgresRouter {
})
}
public async connect(usage: PostgresUse): Promise<PoolClient> {
return await this.pools.get(usage)!.connect()
}
async end(): Promise<void> {
// Close all the connection pools
const uniquePools: Set<Pool> = new Set(this.pools.values())
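
The new connect() exposes a raw PoolClient, which the two-phase coordinator below depends on: PREPARE TRANSACTION must run on the session that opened the transaction, while COMMIT PREPARED can run from any connection. A sketch of that lifecycle (illustrative GID; assumes a configured router):

const client = await router.connect(PostgresUse.PERSONS_WRITE)
try {
    await client.query('BEGIN')
    await client.query(`UPDATE posthog_person SET version = version + 1 WHERE id = $1`, [42])
    // PREPARE must happen on this session; afterwards the tx is detached from the connection.
    await client.query(`PREPARE TRANSACTION 'dualwrite:sketch:1'`)
} finally {
    client.release()
}
// COMMIT PREPARED works from any connection, so a pooled router query is enough.
await router.query(PostgresUse.PERSONS_WRITE, `COMMIT PREPARED 'dualwrite:sketch:1'`, [], '2pc-sketch')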

View File

@@ -0,0 +1,193 @@
import { twoPhaseCommitFailuresCounter } from '~/worker/ingestion/persons/metrics'
import { PostgresUse } from './postgres'
import { TwoPhaseCommitCoordinator } from './two-phase'
type QueryCall = { sql: string; args?: any[] }
class FakePoolClient {
public calls: QueryCall[] = []
constructor(private opts: { failOnPrepare?: boolean; side: 'left' | 'right' }) {}
query(sql: string, args?: any[]): any {
this.calls.push({ sql, args })
if (sql.startsWith('PREPARE TRANSACTION') && this.opts.failOnPrepare) {
return Promise.reject(new Error(`prepare_failed_${this.opts.side}`))
}
// BEGIN / ROLLBACK are always ok in this fake
return Promise.resolve({ rowCount: 0, rows: [] })
}
release(): void {
// no-op
}
}
class FakeRouter {
public client: FakePoolClient
public routerCalls: QueryCall[] = []
constructor(
private side: 'left' | 'right',
private opts: { failOnPrepare?: boolean; failCommitPrepared?: boolean; failRollbackPrepared?: boolean } = {}
) {
this.client = new FakePoolClient({ failOnPrepare: opts.failOnPrepare, side })
}
connect(_use: PostgresUse): FakePoolClient {
return this.client
}
query(_use: PostgresUse, sql: string, args?: any[], _tag?: string): any {
this.routerCalls.push({ sql, args })
if (sql.startsWith('COMMIT PREPARED') && this.opts.failCommitPrepared) {
return Promise.reject(new Error(`commit_failed_${this.side}`))
}
if (sql.startsWith('ROLLBACK PREPARED') && this.opts.failRollbackPrepared) {
return Promise.reject(new Error(`rollback_failed_${this.side}`))
}
return Promise.resolve({ rowCount: 0, rows: [] })
}
}
// Helper to capture metric label+inc pairs
function spyOn2pcFailures() {
const labelsSpy = jest.spyOn(twoPhaseCommitFailuresCounter, 'labels') as any
const calls: Array<{ tag: string; phase: string }> = []
labelsSpy.mockImplementation((tag: string, phase: string) => {
return { inc: jest.fn(() => calls.push({ tag, phase })) }
})
return { labelsSpy, calls }
}
describe('TwoPhaseCommitCoordinator', () => {
afterEach(() => {
jest.restoreAllMocks()
})
test('success path commits both sides', async () => {
const left = new FakeRouter('left')
const right = new FakeRouter('right')
const coord = new TwoPhaseCommitCoordinator({
left: { router: left as any, use: PostgresUse.PERSONS_WRITE, name: 'L' },
right: { router: right as any, use: PostgresUse.PERSONS_WRITE, name: 'R' },
})
const { labelsSpy, calls } = spyOn2pcFailures()
const result = await coord.run('ok', () => Promise.resolve('done'))
expect(result).toBe('done')
// Both sides prepared via client
expect(left.client.calls.find((c) => c.sql.startsWith('PREPARE TRANSACTION'))).toBeTruthy()
expect(right.client.calls.find((c) => c.sql.startsWith('PREPARE TRANSACTION'))).toBeTruthy()
// Both sides committed via router
expect(left.routerCalls.find((c) => c.sql.startsWith('COMMIT PREPARED'))).toBeTruthy()
expect(right.routerCalls.find((c) => c.sql.startsWith('COMMIT PREPARED'))).toBeTruthy()
// No failure metrics
expect(labelsSpy).not.toHaveBeenCalled()
expect(calls.length).toBe(0)
})
test('prepare left fails increments prepare_left_failed and run_failed', async () => {
const left = new FakeRouter('left', { failOnPrepare: true })
const right = new FakeRouter('right')
const coord = new TwoPhaseCommitCoordinator({
left: { router: left as any, use: PostgresUse.PERSONS_WRITE },
right: { router: right as any, use: PostgresUse.PERSONS_WRITE },
})
const { calls } = spyOn2pcFailures()
await expect(coord.run('t1', () => Promise.resolve('x'))).rejects.toThrow(/prepare_failed_left/)
const phases = calls.map((c) => c.phase)
expect(phases).toContain('prepare_left_failed')
expect(phases).toContain('run_failed')
// Right side's prepare succeeded, so it should be rolled back via router
expect(right.routerCalls.find((c) => c.sql.startsWith('ROLLBACK PREPARED'))).toBeTruthy()
})
test('prepare right fails increments prepare_right_failed and run_failed and rolls back left prepared', async () => {
const left = new FakeRouter('left')
const right = new FakeRouter('right', { failOnPrepare: true })
const coord = new TwoPhaseCommitCoordinator({
left: { router: left as any, use: PostgresUse.PERSONS_WRITE },
right: { router: right as any, use: PostgresUse.PERSONS_WRITE },
})
const { calls } = spyOn2pcFailures()
await expect(coord.run('t2', () => Promise.resolve('x'))).rejects.toThrow(/prepare_failed_right/)
const phases = calls.map((c) => c.phase)
expect(phases).toContain('prepare_right_failed')
expect(phases).toContain('run_failed')
// Left was prepared and should have been rolled back via router
expect(left.routerCalls.find((c) => c.sql.startsWith('ROLLBACK PREPARED'))).toBeTruthy()
})
test('commit left fails increments commit_left_failed and run_failed', async () => {
const left = new FakeRouter('left', { failCommitPrepared: true })
const right = new FakeRouter('right')
const coord = new TwoPhaseCommitCoordinator({
left: { router: left as any, use: PostgresUse.PERSONS_WRITE },
right: { router: right as any, use: PostgresUse.PERSONS_WRITE },
})
const { calls } = spyOn2pcFailures()
await expect(coord.run('t3', () => Promise.resolve('x'))).rejects.toThrow(/commit_failed_left/)
const phases = calls.map((c) => c.phase)
expect(phases).toContain('commit_left_failed')
expect(phases).toContain('run_failed')
// After failure, we attempt rollbacks
expect(left.routerCalls.find((c) => c.sql.startsWith('ROLLBACK PREPARED'))).toBeTruthy()
expect(right.routerCalls.find((c) => c.sql.startsWith('ROLLBACK PREPARED'))).toBeTruthy()
})
test('commit right fails increments commit_right_failed and run_failed', async () => {
const left = new FakeRouter('left')
const right = new FakeRouter('right', { failCommitPrepared: true })
const coord = new TwoPhaseCommitCoordinator({
left: { router: left as any, use: PostgresUse.PERSONS_WRITE },
right: { router: right as any, use: PostgresUse.PERSONS_WRITE },
})
const { calls } = spyOn2pcFailures()
await expect(coord.run('t4', () => Promise.resolve('x'))).rejects.toThrow(/commit_failed_right/)
const phases = calls.map((c) => c.phase)
expect(phases).toContain('commit_right_failed')
expect(phases).toContain('run_failed')
// Left side was already committed when right failed, so it should NOT attempt rollback
// (you cannot rollback an already-committed transaction)
expect(left.routerCalls.find((c) => c.sql.startsWith('ROLLBACK PREPARED'))).toBeFalsy()
// Right side's prepared transaction should be rolled back
expect(right.routerCalls.find((c) => c.sql.startsWith('ROLLBACK PREPARED'))).toBeTruthy()
})
test('fn throws increments run_failed and rolls back both local txs', async () => {
const left = new FakeRouter('left')
const right = new FakeRouter('right')
const coord = new TwoPhaseCommitCoordinator({
left: { router: left as any, use: PostgresUse.PERSONS_WRITE },
right: { router: right as any, use: PostgresUse.PERSONS_WRITE },
})
const { calls } = spyOn2pcFailures()
await expect(
coord.run('t5', () => {
throw new Error('boom')
})
).rejects.toThrow('boom')
const phases = calls.map((c) => c.phase)
expect(phases).toContain('run_failed')
// Both sides should have rolled back local txs (not prepared)
expect(left.client.calls.find((c) => c.sql === 'ROLLBACK')).toBeTruthy()
expect(right.client.calls.find((c) => c.sql === 'ROLLBACK')).toBeTruthy()
})
})

View File

@@ -0,0 +1,171 @@
import { PoolClient } from 'pg'
import { twoPhaseCommitFailuresCounter } from '~/worker/ingestion/persons/metrics'
import { logger } from '../logger'
import { instrumentQuery } from '../metrics'
import { PostgresRouter, PostgresUse, TransactionClient } from './postgres'
export type TwoPhaseSides = {
left: { router: PostgresRouter; use: PostgresUse; name?: string }
right: { router: PostgresRouter; use: PostgresUse; name?: string }
}
export class TwoPhaseCommitCoordinator {
constructor(private sides: TwoPhaseSides) {}
private makeGid(tag: string): string {
const ts = Date.now()
const rand = Math.random().toString(36).slice(2, 10)
// GID must be <= 200 chars (Postgres caps prepared-transaction identifiers at 200 bytes)
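// e.g. "dualwrite:createPerson:1724366487000:k3j9x2ab" (callers append ":left"/":right")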
return `dualwrite:${tag}:${ts}:${rand}`
}
async run<T>(tag: string, fn: (leftTx: TransactionClient, rightTx: TransactionClient) => Promise<T>): Promise<T> {
// Each GID is unique per database but shares a root that identifies the tx across both.
// We don't reuse the exact same id on both sides so that a 2PC can span two databases on the
// same cluster/machine. This is helpful in the test harness, where we don't spin up another
// PG instance but just put a second DB on the same instance; with identical ids, the
// transaction ids would clash in that case.
const gidRoot = this.makeGid(tag)
const gidLeft = `${gidRoot}:left`
const gidRight = `${gidRoot}:right`
const gidLeftLiteral = `'${gidLeft.replace(/'/g, "''")}'`
const gidRightLiteral = `'${gidRight.replace(/'/g, "''")}'`
const { left, right } = this.sides
return await instrumentQuery('query.dualwrite_spc', tag, async () => {
let lClient: PoolClient | undefined
let rClient: PoolClient | undefined
let preparedLeft = false
let preparedRight = false
try {
lClient = await left.router.connect(left.use)
rClient = await right.router.connect(right.use)
await Promise.all([lClient?.query('BEGIN'), rClient?.query('BEGIN')])
const result = await fn(
new TransactionClient(left.use, lClient),
new TransactionClient(right.use, rClient)
)
const prepareResults = await Promise.allSettled([
lClient?.query(`PREPARE TRANSACTION ${gidLeftLiteral}`),
rClient?.query(`PREPARE TRANSACTION ${gidRightLiteral}`),
])
if (prepareResults[0].status === 'rejected') {
twoPhaseCommitFailuresCounter.labels(tag, 'prepare_left_failed').inc()
if (prepareResults[1].status === 'fulfilled') {
preparedRight = true
}
throw prepareResults[0].reason
}
preparedLeft = true
if (prepareResults[1].status === 'rejected') {
twoPhaseCommitFailuresCounter.labels(tag, 'prepare_right_failed').inc()
throw prepareResults[1].reason
}
preparedRight = true
// Release the transaction clients back to the connection pool.
// After PREPARE TRANSACTION, the transaction is no longer associated with these connections.
// The prepared transactions now exist as independent entities in PostgreSQL's shared state
// and can be committed or rolled back from ANY connection, not just the original ones.
// This is a key feature of 2PC that enables recovery - if this process crashes after PREPARE,
// another process can still complete the commit using just the transaction IDs.
// Releasing the connections here also improves connection pool efficiency.
lClient?.release()
rClient?.release()
lClient = undefined
rClient = undefined
// COMMIT PREPARED can be executed from any connection, so we use the router to get
// fresh connections. This demonstrates the durability guarantee of 2PC - the prepared
// transactions persist independently of any specific database connection.
try {
await left.router.query(left.use, `COMMIT PREPARED ${gidLeftLiteral}`, [], `2pc-commit-left:${tag}`)
// Once committed, the prepared transaction no longer exists and cannot be rolled back
preparedLeft = false
} catch (e) {
twoPhaseCommitFailuresCounter.labels(tag, 'commit_left_failed').inc()
throw e
}
try {
await right.router.query(
right.use,
`COMMIT PREPARED ${gidRightLiteral}`,
[],
`2pc-commit-right:${tag}`
)
// Once committed, the prepared transaction no longer exists and cannot be rolled back
preparedRight = false
} catch (e) {
twoPhaseCommitFailuresCounter.labels(tag, 'commit_right_failed').inc()
throw e
}
return result
} catch (error) {
try {
if (preparedLeft) {
try {
await left.router.query(
left.use,
`ROLLBACK PREPARED ${gidLeftLiteral}`,
[],
`2pc-rollback-left:${tag}`
)
} catch (e) {
twoPhaseCommitFailuresCounter.labels(tag, 'rollback_left_failed').inc()
throw e
}
} else if (lClient) {
await lClient.query('ROLLBACK')
}
} catch (e) {
logger.error('Failed to rollback/cleanup left side of 2PC', { error: e })
}
try {
if (preparedRight) {
try {
await right.router.query(
right.use,
`ROLLBACK PREPARED ${gidRightLiteral}`,
[],
`2pc-rollback-right:${tag}`
)
} catch (e) {
twoPhaseCommitFailuresCounter.labels(tag, 'rollback_right_failed').inc()
throw e
}
} else if (rClient) {
await rClient.query('ROLLBACK')
}
} catch (e) {
logger.error('Failed to rollback/cleanup right side of 2PC', { error: e })
}
logger.error('Two-phase commit failed', {
tag,
gid: gidRoot,
left: this.sides.left.name ?? 'left',
right: this.sides.right.name ?? 'right',
error,
})
twoPhaseCommitFailuresCounter.labels(tag, 'run_failed').inc()
throw error
} finally {
try {
lClient?.release()
} catch {}
try {
rClient?.release()
} catch {}
}
})
}
}
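
A minimal usage sketch (illustrative routers and SQL; relies on the router's query also accepting a TransactionClient, as the repositories below do):

const coordinator = new TwoPhaseCommitCoordinator({
    left: { router: primaryRouter, use: PostgresUse.PERSONS_WRITE, name: 'primary' },
    right: { router: secondaryRouter, use: PostgresUse.PERSONS_WRITE, name: 'secondary' },
})
// Both updates commit atomically across the two databases, or neither does.
await coordinator.run('markIdentified', async (leftTx, rightTx) => {
    await primaryRouter.query(leftTx, `UPDATE posthog_person SET is_identified = true WHERE id = $1`, [42], 'mark-left')
    await secondaryRouter.query(rightTx, `UPDATE posthog_person SET is_identified = true WHERE id = $1`, [42], 'mark-right')
    return true
})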

View File

@@ -127,6 +127,24 @@ export const personShadowModeReturnIntermediateOutcomeCounter = new Counter({
labelNames: ['method', 'outcome'],
})
export const twoPhaseCommitFailuresCounter = new Counter({
name: 'person_dualwrite_2pc_failures_total',
help: 'Two-phase commit failures for dual-write person repository',
labelNames: ['tag', 'phase'], // phase: fn_failed, prepare_left_failed, prepare_right_failed, commit_left_failed, commit_right_failed, rollback_left_failed, rollback_right_failed, run_failed
})
export const dualWriteComparisonCounter = new Counter({
name: 'person_dualwrite_comparison_total',
help: 'Comparison results between primary and secondary databases in dual-write mode',
labelNames: ['operation', 'comparison_type', 'result'], // operation: createPerson, updatePerson, etc., comparison_type: success_match, data_mismatch, error_mismatch, result: match, mismatch
})
export const dualWriteDataMismatchCounter = new Counter({
name: 'person_dualwrite_data_mismatch_total',
help: 'Detailed data mismatches between primary and secondary databases',
labelNames: ['operation', 'field'], // field: properties, version, is_identified, etc.
})
export function getVersionBucketLabel(version: number): string {
if (version === 0) {
return 'v0'

View File

@@ -0,0 +1,323 @@
import { DateTime } from 'luxon'
import { Properties } from '@posthog/plugin-scaffold'
import { TopicMessage } from '~/kafka/producer'
import { InternalPerson, PropertiesLastOperation, PropertiesLastUpdatedAt, Team } from '~/types'
import { CreatePersonResult, MoveDistinctIdsResult } from '~/utils/db/db'
import { TransactionClient } from '~/utils/db/postgres'
import { dualWriteComparisonCounter, dualWriteDataMismatchCounter } from '../metrics'
import { PersonRepositoryTransaction } from './person-repository-transaction'
import { RawPostgresPersonRepository } from './raw-postgres-person-repository'
export class DualWritePersonRepositoryTransaction implements PersonRepositoryTransaction {
constructor(
private primaryRepo: RawPostgresPersonRepository,
private secondaryRepo: RawPostgresPersonRepository,
private lTx: TransactionClient,
private rTx: TransactionClient,
private comparisonEnabled: boolean = false
) {}
async createPerson(
createdAt: DateTime,
properties: Properties,
propertiesLastUpdatedAt: PropertiesLastUpdatedAt,
propertiesLastOperation: PropertiesLastOperation,
teamId: Team['id'],
isUserId: number | null,
isIdentified: boolean,
uuid: string,
distinctIds?: { distinctId: string; version?: number }[]
): Promise<CreatePersonResult> {
const p = await this.primaryRepo.createPerson(
createdAt,
properties,
propertiesLastUpdatedAt,
propertiesLastOperation,
teamId,
isUserId,
isIdentified,
uuid,
distinctIds,
this.lTx
)
if (!p.success) {
// We need to throw to trigger rollback, but preserve the error type
// so the outer repository can handle it appropriately
const error = new Error(`DualWrite primary create failed`)
;(error as any).result = p
throw error
}
// force same ID on secondary
const forcedId = Number(p.person.id)
const s = await this.secondaryRepo.createPerson(
createdAt,
properties,
propertiesLastUpdatedAt,
propertiesLastOperation,
teamId,
isUserId,
isIdentified,
uuid,
distinctIds,
this.rTx,
forcedId
)
if (!s.success) {
const error = new Error(`DualWrite secondary create failed`)
;(error as any).result = s
throw error
}
// Compare results between primary and secondary
if (this.comparisonEnabled) {
this.compareCreatePersonResults(p, s)
}
return p
}
async updatePerson(
person: InternalPerson,
update: Partial<InternalPerson>,
tag?: string
): Promise<[InternalPerson, TopicMessage[], boolean]> {
// Enforce version parity across primary/secondary: run primary first, then set secondary to primary's new version
const primaryOut = await this.primaryRepo.updatePerson(person, { ...update }, tag, this.lTx)
const primaryUpdated = primaryOut[0]
const secondaryOut = await this.secondaryRepo.updatePerson(
person,
{ ...update, version: primaryUpdated.version },
tag ? `${tag}-secondary` : undefined,
this.rTx
)
if (this.comparisonEnabled) {
this.compareUpdatePersonResults(primaryOut, secondaryOut, tag)
}
return primaryOut
}
async deletePerson(person: InternalPerson): Promise<TopicMessage[]> {
const [p, s] = await Promise.all([
this.primaryRepo.deletePerson(person, this.lTx),
this.secondaryRepo.deletePerson(person, this.rTx),
])
if (this.comparisonEnabled) {
this.compareTopicMessages('deletePerson', p, s)
}
return p
}
async addDistinctId(person: InternalPerson, distinctId: string, version: number): Promise<TopicMessage[]> {
const [p, s] = await Promise.all([
this.primaryRepo.addDistinctId(person, distinctId, version, this.lTx),
this.secondaryRepo.addDistinctId(person, distinctId, version, this.rTx),
])
if (this.comparisonEnabled) {
this.compareTopicMessages('addDistinctId', p, s)
}
return p
}
async moveDistinctIds(
source: InternalPerson,
target: InternalPerson,
limit?: number
): Promise<MoveDistinctIdsResult> {
const [p, s] = await Promise.all([
this.primaryRepo.moveDistinctIds(source, target, limit, this.lTx),
this.secondaryRepo.moveDistinctIds(source, target, limit, this.rTx),
])
// Match the behavior of the direct repository call:
// If both repositories return the same failure result, that's expected behavior
if (!p.success && !s.success && p.error === s.error) {
return p
}
if (p.success !== s.success || (!p.success && !s.success && p.error !== s.error)) {
if (this.comparisonEnabled) {
dualWriteComparisonCounter.inc({
operation: 'moveDistinctIds',
comparison_type: p.success !== s.success ? 'success_mismatch' : 'error_mismatch',
result: 'mismatch',
})
}
// In the direct repository this causes a rollback by returning false from the coordinator;
// in a transaction context we throw to trigger the rollback instead
const pError = !p.success ? p.error : 'none'
const sError = !s.success ? s.error : 'none'
throw new Error(
`DualWrite moveDistinctIds mismatch: primary=${p.success}/${pError}, secondary=${s.success}/${sError}`
)
}
if (this.comparisonEnabled && p.success && s.success) {
this.compareTopicMessages('moveDistinctIds', p.messages || [], s.messages || [])
}
return p
}
async fetchPersonDistinctIds(person: InternalPerson, limit?: number): Promise<string[]> {
// This is a read operation, only use primary
return await this.primaryRepo.fetchPersonDistinctIds(person, limit, this.lTx)
}
async addPersonlessDistinctIdForMerge(teamId: Team['id'], distinctId: string): Promise<boolean> {
const [p, s] = await Promise.all([
this.primaryRepo.addPersonlessDistinctIdForMerge(teamId, distinctId, this.lTx),
this.secondaryRepo.addPersonlessDistinctIdForMerge(teamId, distinctId, this.rTx),
])
if (this.comparisonEnabled) {
if (p !== s) {
dualWriteComparisonCounter.inc({
operation: 'addPersonlessDistinctIdForMerge',
comparison_type: 'boolean_mismatch',
result: 'mismatch',
})
} else {
dualWriteComparisonCounter.inc({
operation: 'addPersonlessDistinctIdForMerge',
comparison_type: 'boolean_match',
result: 'match',
})
}
}
return p
}
async updateCohortsAndFeatureFlagsForMerge(
teamID: Team['id'],
sourcePersonID: InternalPerson['id'],
targetPersonID: InternalPerson['id']
): Promise<void> {
await Promise.all([
this.primaryRepo.updateCohortsAndFeatureFlagsForMerge(teamID, sourcePersonID, targetPersonID, this.lTx),
this.secondaryRepo.updateCohortsAndFeatureFlagsForMerge(teamID, sourcePersonID, targetPersonID, this.rTx),
])
}
private compareCreatePersonResults(primary: CreatePersonResult, secondary: CreatePersonResult): void {
if (primary.success !== secondary.success) {
dualWriteComparisonCounter.inc({
operation: 'createPerson_tx',
comparison_type: 'success_mismatch',
result: 'mismatch',
})
return
}
if (!primary.success || !secondary.success) {
if (!primary.success && !secondary.success && primary.error !== secondary.error) {
dualWriteComparisonCounter.inc({
operation: 'createPerson_tx',
comparison_type: 'error_mismatch',
result: 'mismatch',
})
} else {
dualWriteComparisonCounter.inc({
operation: 'createPerson_tx',
comparison_type: 'error_match',
result: 'match',
})
}
return
}
const p = primary.person
const s = secondary.person
let hasMismatch = false
if (JSON.stringify(p.properties) !== JSON.stringify(s.properties)) {
dualWriteDataMismatchCounter.inc({ operation: 'createPerson_tx', field: 'properties' })
hasMismatch = true
}
if (p.version !== s.version) {
dualWriteDataMismatchCounter.inc({ operation: 'createPerson_tx', field: 'version' })
hasMismatch = true
}
if (p.is_identified !== s.is_identified) {
dualWriteDataMismatchCounter.inc({ operation: 'createPerson_tx', field: 'is_identified' })
hasMismatch = true
}
if (p.is_user_id !== s.is_user_id) {
dualWriteDataMismatchCounter.inc({ operation: 'createPerson_tx', field: 'is_user_id' })
hasMismatch = true
}
dualWriteComparisonCounter.inc({
operation: 'createPerson_tx',
comparison_type: 'data_comparison',
result: hasMismatch ? 'mismatch' : 'match',
})
}
private compareUpdatePersonResults(
primary: [InternalPerson, TopicMessage[], boolean],
secondary: [InternalPerson, TopicMessage[], boolean],
tag?: string
): void {
const [pPerson, pMessages, pChanged] = primary
const [sPerson, sMessages, sChanged] = secondary
let hasMismatch = false
if (JSON.stringify(pPerson.properties) !== JSON.stringify(sPerson.properties)) {
dualWriteDataMismatchCounter.inc({ operation: `updatePerson_tx:${tag ?? 'update'}`, field: 'properties' })
hasMismatch = true
}
if (pPerson.version !== sPerson.version) {
dualWriteDataMismatchCounter.inc({ operation: `updatePerson_tx:${tag ?? 'update'}`, field: 'version' })
hasMismatch = true
}
if (pPerson.is_identified !== sPerson.is_identified) {
dualWriteDataMismatchCounter.inc({
operation: `updatePerson_tx:${tag ?? 'update'}`,
field: 'is_identified',
})
hasMismatch = true
}
if (pChanged !== sChanged) {
dualWriteDataMismatchCounter.inc({ operation: `updatePerson_tx:${tag ?? 'update'}`, field: 'changed_flag' })
hasMismatch = true
}
if (pMessages.length !== sMessages.length) {
dualWriteDataMismatchCounter.inc({
operation: `updatePerson_tx:${tag ?? 'update'}`,
field: 'message_count',
})
hasMismatch = true
}
dualWriteComparisonCounter.inc({
operation: `updatePerson_tx:${tag ?? 'update'}`,
comparison_type: 'data_comparison',
result: hasMismatch ? 'mismatch' : 'match',
})
}
private compareTopicMessages(operation: string, primary: TopicMessage[], secondary: TopicMessage[]): void {
if (primary.length !== secondary.length) {
dualWriteComparisonCounter.inc({
operation: `${operation}_tx`,
comparison_type: 'message_count_mismatch',
result: 'mismatch',
})
} else {
dualWriteComparisonCounter.inc({
operation: `${operation}_tx`,
comparison_type: 'message_count_match',
result: 'match',
})
}
}
}

View File

@@ -0,0 +1,492 @@
import { DateTime } from 'luxon'
import { Properties } from '@posthog/plugin-scaffold'
import { TopicMessage } from '../../../../kafka/producer'
import { InternalPerson, PropertiesLastOperation, PropertiesLastUpdatedAt, Team } from '../../../../types'
import { CreatePersonResult, MoveDistinctIdsResult } from '../../../../utils/db/db'
import { PostgresRouter, PostgresUse } from '../../../../utils/db/postgres'
import { TwoPhaseCommitCoordinator } from '../../../../utils/db/two-phase'
import { logger as _logger } from '../../../../utils/logger'
import { dualWriteComparisonCounter, dualWriteDataMismatchCounter } from '../metrics'
import { PersonUpdate } from '../person-update-batch'
import { DualWritePersonRepositoryTransaction } from './dualwrite-person-repository-transaction'
import { PersonRepository } from './person-repository'
import { PersonRepositoryTransaction } from './person-repository-transaction'
import type { PostgresPersonRepositoryOptions } from './postgres-person-repository'
import { PostgresPersonRepository } from './postgres-person-repository'
import { RawPostgresPersonRepository } from './raw-postgres-person-repository'
export interface PostgresDualWritePersonRepositoryOptions extends PostgresPersonRepositoryOptions {
comparisonEnabled?: boolean
}
export class PostgresDualWritePersonRepository implements PersonRepository {
private coordinator: TwoPhaseCommitCoordinator
private primaryRepo: RawPostgresPersonRepository
private secondaryRepo: RawPostgresPersonRepository
private comparisonEnabled: boolean
constructor(
primaryRouter: PostgresRouter,
secondaryRouter: PostgresRouter,
options?: Partial<PostgresDualWritePersonRepositoryOptions>
) {
this.primaryRepo = new PostgresPersonRepository(primaryRouter, options)
this.secondaryRepo = new PostgresPersonRepository(secondaryRouter, options)
this.comparisonEnabled = options?.comparisonEnabled ?? false
this.coordinator = new TwoPhaseCommitCoordinator({
left: { router: primaryRouter, use: PostgresUse.PERSONS_WRITE, name: 'primary' },
right: { router: secondaryRouter, use: PostgresUse.PERSONS_WRITE, name: 'secondary' },
})
}
// Reads use the primary as the source of truth (the underlying logic decides whether to hit the reader or the writer)
async fetchPerson(
teamId: Team['id'],
distinctId: string,
options?: { forUpdate?: boolean; useReadReplica?: boolean }
): Promise<InternalPerson | undefined> {
return await this.primaryRepo.fetchPerson(teamId, distinctId, options)
}
/*
 * Must have the exact same contract as the single-write repo.
 */
async createPerson(
createdAt: DateTime,
properties: Properties,
propertiesLastUpdatedAt: PropertiesLastUpdatedAt,
propertiesLastOperation: PropertiesLastOperation,
teamId: Team['id'],
isUserId: number | null,
isIdentified: boolean,
uuid: string,
distinctIds?: { distinctId: string; version?: number }[]
): Promise<CreatePersonResult> {
let result!: CreatePersonResult
try {
await this.coordinator.run('createPerson', async (leftTx, rightTx) => {
// Create is serial: create on the primary first, then reuse the DB-generated id on the secondary
const p = await this.primaryRepo.createPerson(
createdAt,
properties,
propertiesLastUpdatedAt,
propertiesLastOperation,
teamId,
isUserId,
isIdentified,
uuid,
distinctIds,
leftTx
)
if (!p.success) {
result = p
throw new Error('DualWrite abort: primary creation conflict')
}
// force same id on secondary
const forcedId = Number(p.person.id)
const s = await this.secondaryRepo.createPerson(
createdAt,
properties,
propertiesLastUpdatedAt,
propertiesLastOperation,
teamId,
isUserId,
isIdentified,
uuid,
distinctIds,
rightTx,
forcedId
)
if (!s.success) {
result = s
throw new Error('DualWrite abort: secondary creation conflict')
}
// Compare results between primary and secondary
if (this.comparisonEnabled) {
this.compareCreatePersonResults(p, s)
}
result = p
return true
})
} catch (err) {
// if we captured a handled conflict from either side, surface it to match single-write behaviour
if (result && !result.success && result.error === 'CreationConflict') {
return result
}
throw err
}
return result
}
async updatePerson(
person: InternalPerson,
update: Partial<InternalPerson>,
tag?: string
): Promise<[InternalPerson, TopicMessage[], boolean]> {
// Enforce version parity across primary/secondary: run primary first, then set secondary to primary's new version
let primaryOut!: [InternalPerson, TopicMessage[], boolean]
await this.coordinator.run(`updatePerson:${tag ?? 'update'}`, async (leftTx, rightTx) => {
const p = await this.primaryRepo.updatePerson(person, { ...update }, tag, leftTx)
primaryOut = p
const primaryUpdated = p[0]
const secondaryUpdate: Partial<InternalPerson> = {
properties: primaryUpdated.properties,
properties_last_updated_at: primaryUpdated.properties_last_updated_at,
properties_last_operation: primaryUpdated.properties_last_operation,
is_identified: primaryUpdated.is_identified,
version: primaryUpdated.version,
}
const secondaryOut = await this.secondaryRepo.updatePerson(
person,
secondaryUpdate,
tag ? `${tag}-secondary` : undefined,
rightTx
)
// Compare results between primary and secondary
if (this.comparisonEnabled) {
this.compareUpdatePersonResults(primaryOut, secondaryOut, tag)
}
return true
})
return primaryOut
}
// No 2PC protection for this method's writes (no transaction clients are passed through); pretty sure it's disabled in production
async updatePersonAssertVersion(personUpdate: PersonUpdate): Promise<[number | undefined, TopicMessage[]]> {
let primaryOut!: [number | undefined, TopicMessage[]]
await this.coordinator.run('updatePersonAssertVersion', async () => {
const p = await this.primaryRepo.updatePersonAssertVersion({ ...personUpdate })
primaryOut = p
// Only perform secondary if the optimistic update succeeded on primary
if (p[0] !== undefined) {
const s = await this.secondaryRepo.updatePersonAssertVersion({ ...personUpdate })
// Compare results
if (this.comparisonEnabled) {
if (p[0] !== s[0]) {
dualWriteComparisonCounter.inc({
operation: 'updatePersonAssertVersion',
comparison_type: 'version_mismatch',
result: 'mismatch',
})
} else {
dualWriteComparisonCounter.inc({
operation: 'updatePersonAssertVersion',
comparison_type: 'version_match',
result: 'match',
})
}
// Compare message counts
this.compareTopicMessages('updatePersonAssertVersion', p[1], s[1])
}
}
return true
})
return primaryOut
}
async deletePerson(person: InternalPerson): Promise<TopicMessage[]> {
let messages!: TopicMessage[]
await this.coordinator.run('deletePerson', async (lTx, rTx) => {
const [p, s] = await Promise.all([
this.primaryRepo.deletePerson(person, lTx),
this.secondaryRepo.deletePerson(person, rTx),
])
if (this.comparisonEnabled) {
this.compareTopicMessages('deletePerson', p, s)
}
messages = p
return true
})
return messages
}
async addDistinctId(person: InternalPerson, distinctId: string, version: number): Promise<TopicMessage[]> {
let messages!: TopicMessage[]
await this.coordinator.run('addDistinctId', async (lTx, rTx) => {
const [p, s] = await Promise.all([
this.primaryRepo.addDistinctId(person, distinctId, version, lTx),
this.secondaryRepo.addDistinctId(person, distinctId, version, rTx),
])
if (this.comparisonEnabled) {
this.compareTopicMessages('addDistinctId', p, s)
}
messages = p
return true
})
return messages
}
async moveDistinctIds(
source: InternalPerson,
target: InternalPerson,
limit?: number
): Promise<MoveDistinctIdsResult> {
let pResult!: MoveDistinctIdsResult
await this.coordinator.run('moveDistinctIds', async (lTx, rTx) => {
const [p, s] = await Promise.all([
this.primaryRepo.moveDistinctIds(source, target, limit, lTx),
this.secondaryRepo.moveDistinctIds(source, target, limit, rTx),
])
// If both repositories return the same failure result, that's expected behavior
// (e.g., both detected that the target person was deleted)
if (!p.success && !s.success && p.error === s.error) {
pResult = p
// return false to roll back the transaction; the operation failed on both sides anyway, so there is likely nothing to undo
return false
}
// If there's a mismatch in success or error type, that's unexpected
if (p.success !== s.success || (!p.success && !s.success && p.error !== s.error)) {
// Emit metric for mismatch
if (this.comparisonEnabled) {
dualWriteComparisonCounter.inc({
operation: 'moveDistinctIds',
comparison_type: p.success !== s.success ? 'success_mismatch' : 'error_mismatch',
result: 'mismatch',
})
}
pResult = p
// make sure we roll back this transaction
return false
}
pResult = p
return p.success
})
return pResult
}
async addPersonlessDistinctId(teamId: Team['id'], distinctId: string): Promise<boolean> {
let isMerged!: boolean
await this.coordinator.run('addPersonlessDistinctId', async (lTx, rTx) => {
const [p, s] = await Promise.all([
this.primaryRepo.addPersonlessDistinctId(teamId, distinctId, lTx),
this.secondaryRepo.addPersonlessDistinctId(teamId, distinctId, rTx),
])
if (this.comparisonEnabled) {
if (p !== s) {
dualWriteComparisonCounter.inc({
operation: 'addPersonlessDistinctId',
comparison_type: 'boolean_mismatch',
result: 'mismatch',
})
} else {
dualWriteComparisonCounter.inc({
operation: 'addPersonlessDistinctId',
comparison_type: 'boolean_match',
result: 'match',
})
}
}
isMerged = p
})
return isMerged
}
async fetchPersonDistinctIds(person: InternalPerson, limit?: number): Promise<string[]> {
// This is a read operation, only use primary
return await this.primaryRepo.fetchPersonDistinctIds(person, limit)
}
async addPersonlessDistinctIdForMerge(teamId: Team['id'], distinctId: string): Promise<boolean> {
let isMerged!: boolean
await this.coordinator.run('addPersonlessDistinctIdForMerge', async (lTx, rTx) => {
const [p, s] = await Promise.all([
this.primaryRepo.addPersonlessDistinctIdForMerge(teamId, distinctId, lTx),
this.secondaryRepo.addPersonlessDistinctIdForMerge(teamId, distinctId, rTx),
])
if (this.comparisonEnabled) {
if (p !== s) {
dualWriteComparisonCounter.inc({
operation: 'addPersonlessDistinctIdForMerge',
comparison_type: 'boolean_mismatch',
result: 'mismatch',
})
} else {
dualWriteComparisonCounter.inc({
operation: 'addPersonlessDistinctIdForMerge',
comparison_type: 'boolean_match',
result: 'match',
})
}
}
isMerged = p
})
return isMerged
}
async personPropertiesSize(personId: string): Promise<number> {
return await this.primaryRepo.personPropertiesSize(personId)
}
async updateCohortsAndFeatureFlagsForMerge(
teamID: Team['id'],
sourcePersonID: InternalPerson['id'],
targetPersonID: InternalPerson['id']
): Promise<void> {
await this.coordinator.run('updateCohortsAndFeatureFlagsForMerge', async (lTx, rTx) => {
await Promise.all([
this.primaryRepo.updateCohortsAndFeatureFlagsForMerge(teamID, sourcePersonID, targetPersonID, lTx),
this.secondaryRepo.updateCohortsAndFeatureFlagsForMerge(teamID, sourcePersonID, targetPersonID, rTx),
])
return true
})
}
async inTransaction<T>(
description: string,
transaction: (tx: PersonRepositoryTransaction) => Promise<T>
): Promise<T> {
// Open a 2PC boundary spanning the entire callback.
let result!: T
try {
await this.coordinator.run(`dual-tx:${description}`, async (lTx, rTx) => {
const txWrapper = new DualWritePersonRepositoryTransaction(
this.primaryRepo,
this.secondaryRepo,
lTx,
rTx,
this.comparisonEnabled
)
result = await transaction(txWrapper)
return true
})
} catch (err: any) {
// Handle special cases where the transaction wrapper throws but we want to return a result
// This matches the behavior of the direct createPerson method
if (err.result && !err.result.success && err.result.error === 'CreationConflict') {
return err.result as T
}
throw err
}
return result
}
private compareCreatePersonResults(primary: CreatePersonResult, secondary: CreatePersonResult): void {
if (primary.success !== secondary.success) {
dualWriteComparisonCounter.inc({
operation: 'createPerson',
comparison_type: 'success_mismatch',
result: 'mismatch',
})
return
}
if (!primary.success || !secondary.success) {
// Both failed, check if error types match
if (!primary.success && !secondary.success && primary.error !== secondary.error) {
dualWriteComparisonCounter.inc({
operation: 'createPerson',
comparison_type: 'error_mismatch',
result: 'mismatch',
})
} else {
dualWriteComparisonCounter.inc({
operation: 'createPerson',
comparison_type: 'error_match',
result: 'match',
})
}
return
}
// Both succeeded, compare person data
const p = primary.person
const s = secondary.person
let hasMismatch = false
if (JSON.stringify(p.properties) !== JSON.stringify(s.properties)) {
dualWriteDataMismatchCounter.inc({ operation: 'createPerson', field: 'properties' })
hasMismatch = true
}
if (p.version !== s.version) {
dualWriteDataMismatchCounter.inc({ operation: 'createPerson', field: 'version' })
hasMismatch = true
}
if (p.is_identified !== s.is_identified) {
dualWriteDataMismatchCounter.inc({ operation: 'createPerson', field: 'is_identified' })
hasMismatch = true
}
if (p.is_user_id !== s.is_user_id) {
dualWriteDataMismatchCounter.inc({ operation: 'createPerson', field: 'is_user_id' })
hasMismatch = true
}
dualWriteComparisonCounter.inc({
operation: 'createPerson',
comparison_type: 'data_comparison',
result: hasMismatch ? 'mismatch' : 'match',
})
}
private compareUpdatePersonResults(
primary: [InternalPerson, TopicMessage[], boolean],
secondary: [InternalPerson, TopicMessage[], boolean],
tag?: string
): void {
const [pPerson, pMessages, pChanged] = primary
const [sPerson, sMessages, sChanged] = secondary
let hasMismatch = false
if (JSON.stringify(pPerson.properties) !== JSON.stringify(sPerson.properties)) {
dualWriteDataMismatchCounter.inc({ operation: `updatePerson:${tag ?? 'update'}`, field: 'properties' })
hasMismatch = true
}
if (pPerson.version !== sPerson.version) {
dualWriteDataMismatchCounter.inc({ operation: `updatePerson:${tag ?? 'update'}`, field: 'version' })
hasMismatch = true
}
if (pPerson.is_identified !== sPerson.is_identified) {
dualWriteDataMismatchCounter.inc({ operation: `updatePerson:${tag ?? 'update'}`, field: 'is_identified' })
hasMismatch = true
}
if (pChanged !== sChanged) {
dualWriteDataMismatchCounter.inc({ operation: `updatePerson:${tag ?? 'update'}`, field: 'changed_flag' })
hasMismatch = true
}
if (pMessages.length !== sMessages.length) {
dualWriteDataMismatchCounter.inc({ operation: `updatePerson:${tag ?? 'update'}`, field: 'message_count' })
hasMismatch = true
}
dualWriteComparisonCounter.inc({
operation: `updatePerson:${tag ?? 'update'}`,
comparison_type: 'data_comparison',
result: hasMismatch ? 'mismatch' : 'match',
})
}
private compareTopicMessages(operation: string, primary: TopicMessage[], secondary: TopicMessage[]): void {
if (primary.length !== secondary.length) {
dualWriteComparisonCounter.inc({
operation,
comparison_type: 'message_count_mismatch',
result: 'mismatch',
})
} else {
dualWriteComparisonCounter.inc({
operation,
comparison_type: 'message_count_match',
result: 'match',
})
}
}
}
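
For reference, a sketch of driving a merge-style sequence through one 2PC boundary via inTransaction (illustrative values; sourcePerson, targetPerson, and teamId are assumed to be in scope):

const repo = new PostgresDualWritePersonRepository(primaryRouter, secondaryRouter, {
    comparisonEnabled: true,
})
const result = await repo.inTransaction('merge-distinct-ids', async (tx) => {
    // Both calls run inside the same pair of prepared transactions on primary and secondary.
    const moved = await tx.moveDistinctIds(sourcePerson, targetPerson)
    await tx.updateCohortsAndFeatureFlagsForMerge(teamId, sourcePerson.id, targetPerson.id)
    return moved
})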

View File

@@ -62,7 +62,8 @@ export class PostgresPersonRepository
private async handleOversizedPersonProperties(
person: InternalPerson,
update: Partial<InternalPerson>
update: Partial<InternalPerson>,
tx?: TransactionClient
): Promise<[InternalPerson, TopicMessage[], boolean]> {
const currentSize = await this.personPropertiesSize(person.id)
@@ -71,7 +72,7 @@ export class PostgresPersonRepository
personPropertiesSizeViolationCounter.inc({
violation_type: 'existing_record_violates_limit',
})
return await this.handleExistingOversizedRecord(person, update)
return await this.handleExistingOversizedRecord(person, update, tx)
} catch (error) {
logger.warn('Failed to handle previously oversized person record', {
team_id: person.team_id,
@@ -107,7 +108,8 @@ export class PostgresPersonRepository
private async handleExistingOversizedRecord(
person: InternalPerson,
update: Partial<InternalPerson>
update: Partial<InternalPerson>,
tx?: TransactionClient
): Promise<[InternalPerson, TopicMessage[], boolean]> {
try {
const trimmedProperties = this.trimPropertiesToFitSize(
@@ -125,7 +127,8 @@ export class PostgresPersonRepository
const [updatedPerson, kafkaMessages, versionDisparity] = await this.updatePerson(
person,
trimmedUpdate,
'oversized_properties_remediation'
'oversized_properties_remediation',
tx
)
oversizedPersonPropertiesTrimmedCounter.inc({ result: 'success' })
return [updatedPerson, kafkaMessages, versionDisparity]
@@ -258,9 +261,11 @@ export class PostgresPersonRepository
isIdentified: boolean,
uuid: string,
distinctIds?: { distinctId: string; version?: number }[],
tx?: TransactionClient
tx?: TransactionClient,
// Used to support dual-write: we force the id a person is created with, so ids
// cannot drift between the primary and secondary databases
forcedId?: number
): Promise<CreatePersonResult> {
distinctIds ||= []
distinctIds = distinctIds || []
for (const distinctId of distinctIds) {
distinctId.version ||= 0
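The forcedId parameter lets the dual-write path pin the secondary row to the id the primary database assigned, so the two BIGSERIAL sequences cannot drift. A hedged sketch of that flow (argument order is inferred from the column list below; primaryRepo, secondaryRepo, and the tx variables are assumed names):

// Create on the primary first, then replay on the secondary with the same id.
const primaryResult = await primaryRepo.createPerson(
    createdAt, properties, {}, {}, teamId, null, isIdentified, uuid, distinctIds, primaryTx
)
if (primaryResult.success) {
    await secondaryRepo.createPerson(
        createdAt, properties, {}, {}, teamId, null, isIdentified, uuid, distinctIds,
        secondaryTx,
        Number(primaryResult.person.id) // forcedId: reuse the primary-assigned id
    )
}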
@@ -270,47 +275,66 @@ export class PostgresPersonRepository
const personVersion = 0
try {
const { rows } = await this.postgres.query<RawPerson>(
tx ?? PostgresUse.PERSONS_WRITE,
const baseColumns = [
'created_at',
'properties',
'properties_last_updated_at',
'properties_last_operation',
'team_id',
'is_user_id',
'is_identified',
'uuid',
'version',
]
const columns = forcedId ? ['id', ...baseColumns] : baseColumns
const valuePlaceholders = columns.map((_, i) => `$${i + 1}`).join(', ')
const baseParams = [
createdAt.toISO(),
sanitizeJsonbValue(properties),
sanitizeJsonbValue(propertiesLastUpdatedAt),
sanitizeJsonbValue(propertiesLastOperation),
teamId,
isUserId,
isIdentified,
uuid,
personVersion,
]
const personParams = forcedId ? [forcedId, ...baseParams] : baseParams
// Find the actual index of team_id in the personParams array (1-indexed for SQL)
const teamIdParamIndex = personParams.indexOf(teamId) + 1
const distinctIdVersionStartIndex = columns.length + 1
const distinctIdStartIndex = distinctIdVersionStartIndex + distinctIds.length
const query =
`WITH inserted_person AS (
INSERT INTO posthog_person (
created_at, properties, properties_last_updated_at,
properties_last_operation, team_id, is_user_id, is_identified, uuid, version
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
INSERT INTO posthog_person (${columns.join(', ')})
VALUES (${valuePlaceholders})
RETURNING *
)` +
distinctIds
.map(
// NOTE: Keep this in sync with the posthog_persondistinctid INSERT in
// `addDistinctId`
(_, index) => `, distinct_id_${index} AS (
distinctIds
.map(
// NOTE: Keep this in sync with the posthog_persondistinctid INSERT in
// `addDistinctId`
(_, index) => `, distinct_id_${index} AS (
INSERT INTO posthog_persondistinctid (distinct_id, person_id, team_id, version)
VALUES (
$${11 + index + distinctIds!.length - 1},
$${distinctIdStartIndex + index},
(SELECT id FROM inserted_person),
$5,
$${10 + index})
$${teamIdParamIndex},
$${distinctIdVersionStartIndex + index})
)`
)
.join('') +
`SELECT * FROM inserted_person;`,
)
.join('') +
` SELECT * FROM inserted_person;`
const { rows } = await this.postgres.query<RawPerson>(
tx ?? PostgresUse.PERSONS_WRITE,
query,
[
createdAt.toISO(),
sanitizeJsonbValue(properties),
sanitizeJsonbValue(propertiesLastUpdatedAt),
sanitizeJsonbValue(propertiesLastOperation),
teamId,
isUserId,
isIdentified,
uuid,
personVersion,
// The copy and reverse here are to maintain compatibility with pre-existing code
// and tests. Postgres appears to assign IDs in reverse order of the INSERTs in the
// CTEs above, so we need to reverse the distinctIds to match the old behavior where
// we would do a round trip for each INSERT. We shouldn't actually depend on the
// `id` column of distinct_ids, so this is just a simple way to keep tests exactly
// the same and prove behavior is the same as before.
...personParams,
...distinctIds
.slice()
.reverse()
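To sanity-check the dynamic placeholder indices above, a worked example with two distinct ids:

// Without forcedId: columns.length = 9 → person values bind $1..$9,
//   teamIdParamIndex = 5 (matching the old hard-coded $5),
//   distinct-id versions bind $10..$11, distinct ids bind $12..$13.
// With forcedId: columns.length = 10 → person values bind $1..$10,
//   teamIdParamIndex = 6 (the leading id column shifts team_id right by one),
//   distinct-id versions bind $11..$12, distinct ids bind $13..$14.
// Note: personParams.indexOf(teamId) assumes teamId's value appears only once
// in personParams (e.g. forcedId never equals teamId), which holds in practice.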
@@ -353,6 +377,7 @@ export class PostgresPersonRepository
} catch (error) {
// Handle constraint violation - another process created the person concurrently
if (error instanceof Error && error.message.includes('unique constraint')) {
// TODO: verify this object actually conforms to CreatePersonResult
return {
success: false,
error: 'CreationConflict',
@@ -587,9 +612,9 @@ export class PostgresPersonRepository
return rows.map((row) => row.distinct_id)
}
async addPersonlessDistinctId(teamId: number, distinctId: string): Promise<boolean> {
async addPersonlessDistinctId(teamId: number, distinctId: string, tx?: TransactionClient): Promise<boolean> {
const result = await this.postgres.query(
PostgresUse.PERSONS_WRITE,
tx ?? PostgresUse.PERSONS_WRITE,
`
INSERT INTO posthog_personlessdistinctid (team_id, distinct_id, is_merged, created_at)
VALUES ($1, $2, false, now())
@@ -606,7 +631,7 @@ export class PostgresPersonRepository
// ON CONFLICT ... DO NOTHING won't give us our RETURNING, so we have to do another SELECT
const existingResult = await this.postgres.query(
PostgresUse.PERSONS_WRITE,
tx ?? PostgresUse.PERSONS_WRITE,
`
SELECT is_merged
FROM posthog_personlessdistinctid
@@ -752,7 +777,7 @@ export class PostgresPersonRepository
return [updatedPerson, [kafkaMessage], versionDisparity > 0]
} catch (error) {
if (this.isPropertiesSizeConstraintViolation(error) && tag !== 'oversized_properties_remediation') {
return await this.handleOversizedPersonProperties(person, update)
return await this.handleOversizedPersonProperties(person, update, tx)
}
// Re-throw other errors

View File

@@ -25,7 +25,8 @@ export interface RawPostgresPersonRepository {
isIdentified: boolean,
uuid: string,
distinctIds?: { distinctId: string; version?: number }[],
tx?: TransactionClient
tx?: TransactionClient,
forcedId?: number
): Promise<CreatePersonResult>
updatePerson(
@@ -54,8 +55,7 @@ export interface RawPostgresPersonRepository {
): Promise<MoveDistinctIdsResult>
fetchPersonDistinctIds(person: InternalPerson, limit?: number, tx?: TransactionClient): Promise<string[]>
addPersonlessDistinctId(teamId: Team['id'], distinctId: string): Promise<boolean>
addPersonlessDistinctId(teamId: Team['id'], distinctId: string, tx?: TransactionClient): Promise<boolean>
addPersonlessDistinctIdForMerge(teamId: Team['id'], distinctId: string, tx?: TransactionClient): Promise<boolean>
personPropertiesSize(personId: string): Promise<number>

View File

@@ -0,0 +1,184 @@
import fs from 'fs'
import { DateTime } from 'luxon'
import path from 'path'
import { Hub, Team } from '~/types'
import { PostgresRouter, PostgresUse } from '~/utils/db/postgres'
import { CreatePersonResult } from '../../../../utils/db/db'
export const TEST_UUIDS = {
single: '11111111-1111-1111-1111-111111111111',
dual: '22222222-2222-2222-2222-222222222222',
}
export const TEST_TIMESTAMP = DateTime.fromISO('2024-01-15T10:30:00.000Z').toUTC()
export async function setupMigrationDb(migrationPostgres: PostgresRouter): Promise<void> {
const drops = [
'posthog_featureflaghashkeyoverride',
'posthog_cohortpeople',
'posthog_persondistinctid',
'posthog_personlessdistinctid',
'posthog_person',
]
for (const table of drops) {
await migrationPostgres.query(
PostgresUse.PERSONS_WRITE,
`DROP TABLE IF EXISTS ${table} CASCADE`,
[],
`drop-${table}`
)
}
const sqlPath = path.resolve(__dirname, '../../../../../sql/create_persons_tables.sql')
const sql = fs.readFileSync(sqlPath, 'utf8')
await migrationPostgres.query(PostgresUse.PERSONS_WRITE, sql, [], 'create-persons-schema-secondary')
}
export async function cleanupPrepared(hub: Hub) {
const routers = [hub.db.postgres, hub.db.postgresPersonMigration]
for (const r of routers) {
const res = await r.query(
PostgresUse.PERSONS_WRITE,
`SELECT gid FROM pg_prepared_xacts WHERE gid LIKE 'dualwrite:%'`,
[],
'list-prepared'
)
for (const row of res.rows) {
await r.query(
PostgresUse.PERSONS_WRITE,
`ROLLBACK PREPARED '${String(row.gid).replace(/'/g, "''")}'`,
[],
'rollback-prepared'
)
}
}
}
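cleanupPrepared sweeps transactions left behind by two-phase commit, which the dual-write path evidently uses to keep both databases atomic (hence max_prepared_transactions in the dev compose files and the 'dualwrite:%' gid filter). A rough sketch of the protocol, assuming a gid of that shape; note the real code must pin each BEGIN/PREPARE to a single connection (its TransactionClient), which the router-level queries shown here do not guarantee:

import { randomUUID } from 'crypto'

// Hypothetical two-phase commit across the primary and secondary persons DBs.
const gid = `dualwrite:${randomUUID()}` // matches the 'dualwrite:%' pattern swept above
// ... BEGIN on both, perform identical writes on both connections ...
await primary.query(PostgresUse.PERSONS_WRITE, `PREPARE TRANSACTION '${gid}'`, [], '2pc-prepare-primary')
await secondary.query(PostgresUse.PERSONS_WRITE, `PREPARE TRANSACTION '${gid}'`, [], '2pc-prepare-secondary')
// Only once both sides have prepared is it safe to commit; a crash between the
// two COMMITs leaves a prepared xact behind, which cleanupPrepared rolls back.
await primary.query(PostgresUse.PERSONS_WRITE, `COMMIT PREPARED '${gid}'`, [], '2pc-commit-primary')
await secondary.query(PostgresUse.PERSONS_WRITE, `COMMIT PREPARED '${gid}'`, [], '2pc-commit-secondary')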
export async function getFirstTeam(postgres: PostgresRouter): Promise<Team> {
const teams = await postgres.query(
PostgresUse.COMMON_WRITE,
'SELECT * FROM posthog_team LIMIT 1',
[],
'getFirstTeam'
)
return teams.rows[0]
}
export async function assertConsistencyAcrossDatabases(
primaryRouter: PostgresRouter,
secondaryRouter: PostgresRouter,
query: string,
params: any[],
primaryTag: string,
secondaryTag: string
) {
const [primary, secondary] = await Promise.all([
primaryRouter.query(PostgresUse.PERSONS_READ, query, params, primaryTag),
secondaryRouter.query(PostgresUse.PERSONS_READ, query, params, secondaryTag),
])
expect(primary.rows).toEqual(secondary.rows)
}
export function mockDatabaseError(
router: PostgresRouter,
error: Error | { message: string; code?: string; constraint?: string },
tagPattern: string | RegExp
) {
const originalQuery = router.query.bind(router)
return jest.spyOn(router, 'query').mockImplementation((use: any, text: any, params: any, tag: string) => {
const shouldThrow =
typeof tagPattern === 'string' ? tag && tag.startsWith(tagPattern) : tag && tagPattern.test(tag)
if (shouldThrow) {
if (error instanceof Error) {
throw error
} else {
const e: any = new Error(error.message)
if (error.code) {
e.code = error.code
}
if ((error as any).constraint) {
e.constraint = (error as any).constraint
}
throw e
}
}
return originalQuery(use, text, params, tag)
})
}
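Example test usage of mockDatabaseError (the tag prefix, constraint name, and call-site arguments are illustrative, not taken from the source):

// Make every query tagged 'insertPerson…' fail with a unique violation, then
// verify the repository converts it into a CreationConflict result.
const spy = mockDatabaseError(
    hub.db.postgres,
    { message: 'duplicate key value violates unique constraint', code: '23505', constraint: 'posthog_person_uuid_key' },
    'insertPerson'
)
const result = await repository.createPerson(/* ...same args as elsewhere in the tests... */)
expect(result.success).toBe(false)
spy.mockRestore() // always restore, or later tests inherit the failure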
export async function assertConsistentDatabaseErrorHandling<T>(
postgres: PostgresRouter,
error: Error | { message: string; code?: string; constraint?: string },
tagPattern: string | RegExp,
singleWriteOperation: () => Promise<T>,
dualWriteOperation: () => Promise<T>,
expectedError?: string | RegExp | ErrorConstructor
) {
const singleSpy = mockDatabaseError(postgres, error, tagPattern)
let singleError: any
try {
await singleWriteOperation()
} catch (e) {
singleError = e
}
singleSpy.mockRestore()
const dualSpy = mockDatabaseError(postgres, error, tagPattern)
let dualError: any
try {
await dualWriteOperation()
} catch (e) {
dualError = e
}
dualSpy.mockRestore()
if (expectedError) {
expect(singleError).toBeDefined()
expect(dualError).toBeDefined()
if (typeof expectedError === 'string') {
expect(singleError.message).toContain(expectedError)
expect(dualError.message).toContain(expectedError)
} else if (expectedError instanceof RegExp) {
expect(singleError.message).toMatch(expectedError)
expect(dualError.message).toMatch(expectedError)
} else {
expect(singleError).toBeInstanceOf(expectedError)
expect(dualError).toBeInstanceOf(expectedError)
}
} else {
expect(singleError).toBeDefined()
expect(dualError).toBeDefined()
expect(singleError.message).toBe(dualError.message)
if ((error as any).code) {
expect(singleError.code).toBe((error as any).code)
expect(dualError.code).toBe((error as any).code)
}
if ((error as any).constraint) {
expect(singleError.constraint).toBe((error as any).constraint)
expect(dualError.constraint).toBe((error as any).constraint)
}
}
}
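Example usage (the repository variables and injected error are illustrative):

// Assert that single-write and dual-write repositories surface an
// admin-initiated disconnect identically.
await assertConsistentDatabaseErrorHandling(
    hub.db.postgres,
    { message: 'terminating connection due to administrator command', code: '57P01' },
    'updatePerson',
    () => singleRepo.updatePerson(person, { is_identified: true }, 'test'),
    () => dualRepo.updatePerson(person, { is_identified: true }, 'test'),
    'terminating connection'
)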
export function assertCreatePersonContractParity(singleResult: CreatePersonResult, dualResult: CreatePersonResult) {
expect(singleResult.success).toBe(true)
expect(dualResult.success).toBe(true)
if (singleResult.success && dualResult.success) {
expect(singleResult.person.properties).toEqual(dualResult.person.properties)
}
}
export function assertCreatePersonConflictContractParity(
singleResult: CreatePersonResult,
dualResult: CreatePersonResult
) {
expect(singleResult.success).toBe(false)
expect(dualResult.success).toBe(false)
if (!singleResult.success && !dualResult.success) {
expect(singleResult.error).toBe(dualResult.error)
expect(singleResult.distinctIds).toEqual(dualResult.distinctIds)
}
}

File diff suppressed because it is too large