mirror of
https://github.com/BillyOutlast/posthog.git
synced 2026-02-04 03:01:23 +01:00
chore(persons): adds env var for json size trimming (#36415)
This commit is contained in:
@@ -273,7 +273,10 @@ export function getDefaultConfig(): PluginsServerConfig {
|
||||
PERSON_BATCH_WRITING_MAX_OPTIMISTIC_UPDATE_RETRIES: 5,
|
||||
PERSON_BATCH_WRITING_OPTIMISTIC_UPDATE_RETRY_INTERVAL_MS: 50,
|
||||
PERSON_UPDATE_CALCULATE_PROPERTIES_SIZE: 0,
|
||||
PERSON_PROPERTIES_SIZE_LIMIT: 1024 * 1024, // 1MB default
|
||||
// The DB constraint check compares pg_column_size(properties) against this limit; default is 512 KiB + 128 KiB = 655360 bytes
|
||||
PERSON_PROPERTIES_DB_CONSTRAINT_LIMIT_BYTES: 655360,
|
||||
// The trim target is the customer-facing limit (512 KiB = 524288 bytes)
|
||||
PERSON_PROPERTIES_TRIM_TARGET_BYTES: 512 * 1024,
|
||||
GROUP_BATCH_WRITING_MAX_CONCURRENT_UPDATES: 10,
|
||||
GROUP_BATCH_WRITING_OPTIMISTIC_UPDATE_RETRY_INTERVAL_MS: 50,
|
||||
GROUP_BATCH_WRITING_MAX_OPTIMISTIC_UPDATE_RETRIES: 5,
|
||||
|
||||
@@ -154,7 +154,8 @@ export class IngestionConsumer {
|
||||
this.personStore = new BatchWritingPersonsStore(
|
||||
new PostgresPersonRepository(this.hub.db.postgres, {
|
||||
calculatePropertiesSize: this.hub.PERSON_UPDATE_CALCULATE_PROPERTIES_SIZE,
|
||||
personPropertiesSizeLimit: this.hub.PERSON_PROPERTIES_SIZE_LIMIT,
|
||||
personPropertiesDbConstraintLimitBytes: this.hub.PERSON_PROPERTIES_DB_CONSTRAINT_LIMIT_BYTES,
|
||||
personPropertiesTrimTargetBytes: this.hub.PERSON_PROPERTIES_TRIM_TARGET_BYTES,
|
||||
}),
|
||||
this.hub.db.kafkaProducer,
|
||||
{
|
||||
|
||||
@@ -186,7 +186,8 @@ export interface PluginsServerConfig extends CdpConfig, IngestionConsumerConfig
|
||||
PERSON_BATCH_WRITING_MAX_OPTIMISTIC_UPDATE_RETRIES: number // maximum number of retries for optimistic update
|
||||
PERSON_BATCH_WRITING_OPTIMISTIC_UPDATE_RETRY_INTERVAL_MS: number // starting interval for exponential backoff between retries for optimistic update
|
||||
PERSON_UPDATE_CALCULATE_PROPERTIES_SIZE: number
|
||||
PERSON_PROPERTIES_SIZE_LIMIT: number // maximum size in bytes for person properties JSON
|
||||
PERSON_PROPERTIES_DB_CONSTRAINT_LIMIT_BYTES: number // maximum size in bytes for person properties JSON as stored, checked via pg_column_size(properties)
|
||||
PERSON_PROPERTIES_TRIM_TARGET_BYTES: number // target size in bytes we trim JSON to before writing (customer-facing 512kb)
|
||||
GROUP_BATCH_WRITING_MAX_CONCURRENT_UPDATES: number // maximum number of concurrent updates to groups table per batch
|
||||
GROUP_BATCH_WRITING_MAX_OPTIMISTIC_UPDATE_RETRIES: number // maximum number of retries for optimistic update
|
||||
GROUP_BATCH_WRITING_OPTIMISTIC_UPDATE_RETRY_INTERVAL_MS: number // starting interval for exponential backoff between retries for optimistic update
|
||||
|
||||
@@ -22,7 +22,8 @@ describe('PostgresPersonRepository', () => {
|
||||
postgres = hub.db.postgres
|
||||
repository = new PostgresPersonRepository(postgres, {
|
||||
calculatePropertiesSize: 0,
|
||||
personPropertiesSizeLimit: 1024 * 1024, // 1MB for tests
|
||||
personPropertiesDbConstraintLimitBytes: 1024 * 1024, // 1MB for tests
|
||||
personPropertiesTrimTargetBytes: 512 * 1024,
|
||||
})
|
||||
|
||||
const redis = await hub.redisPool.acquire()
|
||||
@@ -1141,7 +1142,8 @@ describe('PostgresPersonRepository', () => {
|
||||
beforeEach(() => {
|
||||
oversizedRepository = new PostgresPersonRepository(postgres, {
|
||||
calculatePropertiesSize: 0,
|
||||
personPropertiesSizeLimit: 50,
|
||||
personPropertiesDbConstraintLimitBytes: 50,
|
||||
personPropertiesTrimTargetBytes: 25,
|
||||
})
|
||||
})
|
||||
|
||||
@@ -1730,11 +1732,13 @@ describe('PostgresPersonRepository', () => {
|
||||
|
||||
const repositoryWithCalculation = new PostgresPersonRepository(postgres, {
|
||||
calculatePropertiesSize: 100,
|
||||
personPropertiesSizeLimit: 1024 * 1024,
|
||||
personPropertiesDbConstraintLimitBytes: 1024 * 1024,
|
||||
personPropertiesTrimTargetBytes: 512 * 1024,
|
||||
})
|
||||
const repositoryWithoutCalculation = new PostgresPersonRepository(postgres, {
|
||||
calculatePropertiesSize: 0,
|
||||
personPropertiesSizeLimit: 1024 * 1024,
|
||||
personPropertiesDbConstraintLimitBytes: 1024 * 1024,
|
||||
personPropertiesTrimTargetBytes: 512 * 1024,
|
||||
})
|
||||
|
||||
const update = {
|
||||
@@ -1778,11 +1782,13 @@ describe('PostgresPersonRepository', () => {
|
||||
|
||||
const repositoryWithCalculation = new PostgresPersonRepository(postgres, {
|
||||
calculatePropertiesSize: 100,
|
||||
personPropertiesSizeLimit: 1024 * 1024,
|
||||
personPropertiesDbConstraintLimitBytes: 1024 * 1024,
|
||||
personPropertiesTrimTargetBytes: 512 * 1024,
|
||||
})
|
||||
const repositoryWithoutCalculation = new PostgresPersonRepository(postgres, {
|
||||
calculatePropertiesSize: 0,
|
||||
personPropertiesSizeLimit: 1024 * 1024,
|
||||
personPropertiesDbConstraintLimitBytes: 1024 * 1024,
|
||||
personPropertiesTrimTargetBytes: 512 * 1024,
|
||||
})
|
||||
|
||||
const createPersonUpdate = (person: InternalPerson, distinctId: string) => ({
|
||||
@@ -1826,7 +1832,8 @@ describe('PostgresPersonRepository', () => {
|
||||
const team = await getFirstTeam(hub)
|
||||
const defaultRepository = new PostgresPersonRepository(postgres, {
|
||||
calculatePropertiesSize: 0,
|
||||
personPropertiesSizeLimit: 1024 * 1024,
|
||||
personPropertiesDbConstraintLimitBytes: 1024 * 1024,
|
||||
personPropertiesTrimTargetBytes: 512 * 1024,
|
||||
})
|
||||
|
||||
const person = await createTestPerson(team.id, 'test-default', { name: 'John' })
|
||||
|
||||
@@ -30,16 +30,21 @@ import { PersonRepositoryTransaction } from './person-repository-transaction'
|
||||
import { PostgresPersonRepositoryTransaction } from './postgres-person-repository-transaction'
|
||||
import { RawPostgresPersonRepository } from './raw-postgres-person-repository'
|
||||
|
||||
// Default byte sizes used to populate DEFAULT_OPTIONS below.
// 512 KiB (524288 bytes).
const DEFAULT_PERSON_PROPERTIES_SIZE_LIMIT = 512 * 1024
|
||||
// 512 KiB (524288 bytes): default size that oversized properties are trimmed down to.
const DEFAULT_PERSON_PROPERTIES_TRIM_TARGET_BYTES = 512 * 1024
|
||||
// 655360 bytes = 640 KiB (512 KiB + 128 KiB): default limit compared against pg_column_size(properties).
const DEFAULT_PERSON_PROPERTIES_DB_CONSTRAINT_LIMIT_BYTES = 655360
|
||||
|
||||
/** Options passed to PostgresPersonRepository that govern person properties size handling. */
export interface PostgresPersonRepositoryOptions {
|
||||
/**
 * NOTE(review): semantics are not visible here; callers in this change pass 0 and 100,
 * and the config value comes from PERSON_UPDATE_CALCULATE_PROPERTIES_SIZE — confirm
 * whether this is a percentage / sampling rate.
 */
calculatePropertiesSize: number
|
||||
/** Maximum size in bytes for person properties JSON (config: PERSON_PROPERTIES_SIZE_LIMIT) */
personPropertiesSizeLimit: number
|
||||
/** Limit used when comparing pg_column_size(properties) to decide whether to remediate */
|
||||
personPropertiesDbConstraintLimitBytes: number
|
||||
/** Target JSON size (stringified) to trim down to when remediating oversized properties */
|
||||
personPropertiesTrimTargetBytes: number
|
||||
}
|
||||
|
||||
// Default option set assembled from the DEFAULT_PERSON_PROPERTIES_* constants above.
// NOTE(review): presumably the fallback when PostgresPersonRepository is constructed
// without explicit options — the consumer is not visible in this hunk; confirm.
const DEFAULT_OPTIONS: PostgresPersonRepositoryOptions = {
|
||||
calculatePropertiesSize: 0,
|
||||
personPropertiesSizeLimit: DEFAULT_PERSON_PROPERTIES_SIZE_LIMIT,
|
||||
personPropertiesDbConstraintLimitBytes: DEFAULT_PERSON_PROPERTIES_DB_CONSTRAINT_LIMIT_BYTES,
|
||||
personPropertiesTrimTargetBytes: DEFAULT_PERSON_PROPERTIES_TRIM_TARGET_BYTES,
|
||||
}
|
||||
|
||||
export class PostgresPersonRepository
|
||||
@@ -60,7 +65,7 @@ export class PostgresPersonRepository
|
||||
): Promise<[InternalPerson, TopicMessage[], boolean]> {
|
||||
const currentSize = await this.personPropertiesSize(person.id)
|
||||
|
||||
if (currentSize >= this.options.personPropertiesSizeLimit) {
|
||||
if (currentSize >= this.options.personPropertiesDbConstraintLimitBytes) {
|
||||
try {
|
||||
personPropertiesSizeViolationCounter.inc({
|
||||
violation_type: 'existing_record_violates_limit',
|
||||
@@ -85,7 +90,7 @@ export class PostgresPersonRepository
|
||||
violation_type: 'attempt_to_violate_limit',
|
||||
})
|
||||
|
||||
logger.warn('Rejecting person properties create/update, exceeds size limit', {
|
||||
logger.warn('Rejecting person properties create/update, exceed size limit', {
|
||||
team_id: person.team_id,
|
||||
person_id: person.id,
|
||||
violation_type: 'attempt_to_violate_limit',
|
||||
@@ -108,7 +113,8 @@ export class PostgresPersonRepository
|
||||
// NOTE: we exclude the properties in the update and just try to trim the existing properties for simplicity
|
||||
// we are throwing data away either way
|
||||
person.properties,
|
||||
this.options.personPropertiesSizeLimit
|
||||
this.options.personPropertiesTrimTargetBytes,
|
||||
{ teamId: person.team_id, personId: person.id }
|
||||
)
|
||||
|
||||
const trimmedUpdate: Partial<InternalPerson> = {
|
||||
@@ -146,7 +152,11 @@ export class PostgresPersonRepository
|
||||
}
|
||||
}
|
||||
|
||||
private trimPropertiesToFitSize(properties: Record<string, any>, targetSizeBytes: number): Record<string, any> {
|
||||
private trimPropertiesToFitSize(
|
||||
properties: Record<string, any>,
|
||||
targetSizeBytes: number,
|
||||
context?: { teamId: number; personId: string }
|
||||
): Record<string, any> {
|
||||
const trimmedProperties = { ...properties }
|
||||
|
||||
let currentSizeBytes = Buffer.byteLength(JSON.stringify(trimmedProperties), 'utf8')
|
||||
@@ -187,6 +197,8 @@ export class PostgresPersonRepository
|
||||
target_size_bytes: targetSizeBytes,
|
||||
properties_removed: removedCount,
|
||||
final_property_count: Object.keys(trimmedProperties).length,
|
||||
team_id: context?.teamId,
|
||||
person_id: context?.personId,
|
||||
})
|
||||
return trimmedProperties
|
||||
}
|
||||
|
||||
@@ -111,7 +111,8 @@ exports[`EventPipelineRunner runEventPipeline() runs steps 1`] = `
|
||||
"personRepository": {
|
||||
"options": {
|
||||
"calculatePropertiesSize": 0,
|
||||
"personPropertiesSizeLimit": 524288,
|
||||
"personPropertiesDbConstraintLimitBytes": 655360,
|
||||
"personPropertiesTrimTargetBytes": 524288,
|
||||
},
|
||||
},
|
||||
"personUpdateCache": {},
|
||||
|
||||
Reference in New Issue
Block a user