revert: revert: Revert person properties updates refactor (#10349)

* Revert "revert: Revert person properties updates refactor (#10348)"

This reverts commit 6b3c4691b3.

* sanitizeEvent -> normalizeEvent

* Ensure we handle property updates from within plugins, test

Co-authored-by: Karl-Aksel Puulmann <oxymaccy@gmail.com>
This commit is contained in:
Michael Matloka
2022-06-20 08:49:11 +02:00
committed by GitHub
parent db554333eb
commit 313226838c
29 changed files with 858 additions and 482 deletions

View File

@@ -7,4 +7,5 @@ module.exports = {
coverageProvider: 'v8',
setupFilesAfterEnv: ['./jest.setup.fetch-mock.js'],
testMatch: ['<rootDir>/tests/**/*.test.ts'],
testTimeout: 60000,
}

View File

@@ -2,8 +2,8 @@ import { PluginEvent } from '@posthog/plugin-scaffold'
import { EachBatchPayload, KafkaMessage } from 'kafkajs'
import { Hub, WorkerMethods } from '../../../types'
import { normalizeEvent } from '../../../utils/event'
import { status } from '../../../utils/status'
import { sanitizeEvent } from '../../../utils/utils'
import { groupIntoBatches } from '../../../utils/utils'
import { KafkaQueue } from '../kafka-queue'
import { eachBatch } from './each-batch'
@@ -12,7 +12,7 @@ export function formPluginEvent(message: KafkaMessage): PluginEvent {
// TODO: inefficient to do this twice?
const { data: dataStr, ...rawEvent } = JSON.parse(message.value!.toString())
const combinedEvent = { ...rawEvent, ...JSON.parse(dataStr) }
const event: PluginEvent = sanitizeEvent({
const event: PluginEvent = normalizeEvent({
...combinedEvent,
site_url: combinedEvent.site_url || null,
ip: combinedEvent.ip || null,

View File

@@ -26,6 +26,7 @@ import { ActionManager } from './worker/ingestion/action-manager'
import { ActionMatcher } from './worker/ingestion/action-matcher'
import { HookCommander } from './worker/ingestion/hooks'
import { OrganizationManager } from './worker/ingestion/organization-manager'
import { PersonManager } from './worker/ingestion/person-manager'
import { EventsProcessor } from './worker/ingestion/process-event'
import { SiteUrlManager } from './worker/ingestion/site-url-manager'
import { TeamManager } from './worker/ingestion/team-manager'
@@ -187,6 +188,7 @@ export interface Hub extends PluginsServerConfig {
actionMatcher: ActionMatcher
hookCannon: HookCommander
eventsProcessor: EventsProcessor
personManager: PersonManager
jobQueueManager: JobQueueManager
siteUrlManager: SiteUrlManager
// diagnostics

View File

@@ -20,6 +20,7 @@ import { ActionManager } from '../../worker/ingestion/action-manager'
import { ActionMatcher } from '../../worker/ingestion/action-matcher'
import { HookCommander } from '../../worker/ingestion/hooks'
import { OrganizationManager } from '../../worker/ingestion/organization-manager'
import { PersonManager } from '../../worker/ingestion/person-manager'
import { EventsProcessor } from '../../worker/ingestion/process-event'
import { SiteUrlManager } from '../../worker/ingestion/site-url-manager'
import { TeamManager } from '../../worker/ingestion/team-manager'
@@ -251,6 +252,7 @@ export async function createHub(
// :TODO: This is only used on worker threads, not main
hub.eventsProcessor = new EventsProcessor(hub as Hub)
hub.personManager = new PersonManager(hub as Hub)
hub.jobQueueManager = new JobQueueManager(hub as Hub)
hub.hookCannon = new HookCommander(db, teamManager, organizationManager, siteUrlManager, statsd)

View File

@@ -1,7 +1,8 @@
import { ProcessedPluginEvent } from '@posthog/plugin-scaffold'
import { PluginEvent, ProcessedPluginEvent } from '@posthog/plugin-scaffold'
import { ClickhouseEventKafka, IngestionEvent } from '../types'
import { chainToElements } from './db/elements-chain'
import { personInitialAndUTMProperties } from './db/utils'
import { clickHouseTimestampToISO } from './utils'
export function convertToProcessedPluginEvent(event: IngestionEvent): ProcessedPluginEvent {
@@ -33,3 +34,20 @@ export function convertToIngestionEvent(event: ClickhouseEventKafka): IngestionE
elementsList: event.elements_chain ? chainToElements(event.elements_chain) : [],
}
}
/**
 * Normalizes a raw incoming event in place and returns the same object.
 *
 * - Coerces `distinct_id` to a string (capture may send numbers).
 * - Folds any top-level `$set` / `$set_once` into `properties`, with the
 *   top-level values winning on key clashes.
 * - For everything except `$snapshot` events, runs the properties through
 *   `personInitialAndUTMProperties` (adds `$initial_*` / UTM person props).
 */
export function normalizeEvent(event: PluginEvent): PluginEvent {
    event.distinct_id = event.distinct_id?.toString()

    const properties = event.properties ?? {}
    if (event['$set']) {
        properties['$set'] = { ...properties['$set'], ...event['$set'] }
    }
    if (event['$set_once']) {
        properties['$set_once'] = { ...properties['$set_once'], ...event['$set_once'] }
    }
    // Snapshot payloads are recording data, not person data — leave them untouched.
    event.properties = event.event === '$snapshot' ? properties : personInitialAndUTMProperties(properties)
    return event
}

View File

@@ -1,5 +1,4 @@
import Piscina from '@posthog/piscina'
import { PluginEvent } from '@posthog/plugin-scaffold'
import * as Sentry from '@sentry/node'
import AdmZip from 'adm-zip'
import { randomBytes } from 'crypto'
@@ -480,11 +479,6 @@ export function createPostgresPool(
return pgPool
}
/**
 * Sanitizes an event in place and returns the same object.
 * Currently this only coerces `distinct_id` to a string (capture may send
 * numeric ids); a nullish `distinct_id` becomes `undefined`, mirroring
 * optional-chained `toString()`.
 */
export function sanitizeEvent(event: PluginEvent): PluginEvent {
    const rawDistinctId = event.distinct_id
    event.distinct_id = rawDistinctId == null ? undefined : rawDistinctId.toString()
    return event
}
export function getPiscinaStats(piscina: Piscina): Record<string, number> {
return {
utilization: (piscina.utilization || 0) * 100,

View File

@@ -1,18 +1,18 @@
import { PluginEvent } from '@posthog/plugin-scaffold'
import { DateTime } from 'luxon'
import { normalizeEvent } from '../../../utils/event'
import { parseEventTimestamp } from '../timestamps'
import { EventPipelineRunner, StepResult } from './runner'
export async function prepareEventStep(runner: EventPipelineRunner, event: PluginEvent): Promise<StepResult> {
const { ip, site_url, team_id, now, sent_at, uuid } = event
const distinctId = String(event.distinct_id)
// :TRICKY: plugins might have modified the event, so re-sanitize
const { ip, site_url, team_id, uuid } = normalizeEvent(event)
const preIngestionEvent = await runner.hub.eventsProcessor.processEvent(
distinctId,
String(event.distinct_id),
ip,
event,
team_id,
DateTime.fromISO(now),
sent_at ? DateTime.fromISO(sent_at) : null,
parseEventTimestamp(event, runner.hub.statsd),
uuid! // it will throw if it's undefined,
)

View File

@@ -5,11 +5,11 @@ import { Hub, IngestionEvent, PreIngestionEvent } from '../../../types'
import { timeoutGuard } from '../../../utils/db/utils'
import { status } from '../../../utils/status'
import { generateEventDeadLetterQueueMessage } from '../utils'
import { createEventStep } from './createEventStep'
import { emitToBufferStep } from './emitToBufferStep'
import { pluginsProcessEventStep } from './pluginsProcessEventStep'
import { prepareEventStep } from './prepareEventStep'
import { runAsyncHandlersStep } from './runAsyncHandlersStep'
import { pluginsProcessEventStep } from './1-pluginsProcessEventStep'
import { prepareEventStep } from './2-prepareEventStep'
import { emitToBufferStep } from './3-emitToBufferStep'
import { createEventStep } from './4-createEventStep'
import { runAsyncHandlersStep } from './5-runAsyncHandlersStep'
export type StepParameters<T extends (...args: any[]) => any> = T extends (
runner: EventPipelineRunner,

View File

@@ -1,4 +1,4 @@
import { Properties } from '@posthog/plugin-scaffold'
import { PluginEvent, Properties } from '@posthog/plugin-scaffold'
import * as Sentry from '@sentry/node'
import equal from 'fast-deep-equal'
import { StatsD } from 'hot-shots'
@@ -8,6 +8,7 @@ import { DatabaseError } from 'pg'
import { Person, PropertyUpdateOperation } from '../../types'
import { DB } from '../../utils/db/db'
import { timeoutGuard } from '../../utils/db/utils'
import { status } from '../../utils/status'
import { UUIDT } from '../../utils/utils'
import { PersonManager } from './person-manager'
@@ -36,7 +37,11 @@ const isDistinctIdIllegal = (id: string): boolean => {
}
// This class is responsible for creating/updating a single person through the process-event pipeline
export class PersonStateManager {
export class PersonState {
event: PluginEvent
distinctId: string
teamId: number
eventProperties: Properties
timestamp: DateTime
newUuid: string
@@ -44,45 +49,75 @@ export class PersonStateManager {
private statsd: StatsD | undefined
private personManager: PersonManager
constructor(timestamp: DateTime, db: DB, statsd: StatsD | undefined, personManager: PersonManager) {
constructor(
event: PluginEvent,
teamId: number,
distinctId: string,
timestamp: DateTime,
db: DB,
statsd: StatsD | undefined,
personManager: PersonManager,
uuid?: UUIDT
) {
this.event = event
this.distinctId = distinctId
this.teamId = teamId
this.eventProperties = event.properties!
this.timestamp = timestamp
this.newUuid = new UUIDT().toString()
this.newUuid = (uuid || new UUIDT()).toString()
this.db = db
this.statsd = statsd
this.personManager = personManager
}
async createPersonIfDistinctIdIsNew(
teamId: number,
distinctId: string,
timestamp: DateTime,
personUuid: string,
properties?: Properties,
propertiesOnce?: Properties
): Promise<boolean> {
const isNewPerson = await this.personManager.isNewPerson(this.db, teamId, distinctId)
async update(): Promise<Person | undefined> {
await this.handleIdentifyOrAlias()
return await this.updateProperties()
}
async updateProperties(): Promise<Person | undefined> {
const createdPerson = await this.createPersonIfDistinctIdIsNew()
if (
!createdPerson &&
(this.eventProperties['$set'] || this.eventProperties['$set_once'] || this.eventProperties['$unset'])
) {
return await this.updatePersonProperties()
}
return createdPerson
}
private async createPersonIfDistinctIdIsNew(): Promise<Person | undefined> {
const isNewPerson = await this.personManager.isNewPerson(this.db, this.teamId, this.distinctId)
if (isNewPerson) {
const properties = this.eventProperties['$set'] || {}
const propertiesOnce = this.eventProperties['$set_once'] || {}
// Catch race condition where in between getting and creating, another request already created this user
try {
await this.createPerson(
timestamp,
return await this.createPerson(
this.timestamp,
properties || {},
propertiesOnce || {},
teamId,
this.teamId,
null,
false,
personUuid.toString(),
[distinctId]
this.newUuid,
[this.distinctId]
)
return true
} catch (error) {
if (!error.message || !error.message.includes('duplicate key value violates unique constraint')) {
Sentry.captureException(error, { extra: { teamId, distinctId, timestamp, personUuid } })
Sentry.captureException(error, {
extra: {
teamId: this.teamId,
distinctId: this.distinctId,
timestamp: this.timestamp,
personUuid: this.newUuid,
},
})
}
}
}
return false
return undefined
}
private async createPerson(
@@ -120,30 +155,40 @@ export class PersonStateManager {
)
}
async updatePersonProperties(
teamId: number,
distinctId: string,
properties: Properties,
propertiesOnce: Properties,
unsetProperties: Array<string>
): Promise<void> {
const personFound = await this.db.fetchPerson(teamId, distinctId)
private async updatePersonProperties(): Promise<Person> {
const personFound = await this.db.fetchPerson(this.teamId, this.distinctId)
if (!personFound) {
this.statsd?.increment('person_not_found', { teamId: String(teamId), key: 'update' })
this.statsd?.increment('person_not_found', { teamId: String(this.teamId), key: 'update' })
throw new Error(
`Could not find person with distinct id "${distinctId}" in team "${teamId}" to update properties`
`Could not find person with distinct id "${this.distinctId}" in team "${this.teamId}" to update properties`
)
}
const updatedProperties = this.updatedPersonProperties(personFound.properties || {})
const arePersonsEqual = equal(personFound.properties, updatedProperties)
if (arePersonsEqual) {
return personFound
}
return await this.db.updatePersonDeprecated(personFound, { properties: updatedProperties })
}
private updatedPersonProperties(personProperties: Properties): Properties {
const updatedProperties = { ...personProperties }
const properties: Properties = this.eventProperties['$set'] || {}
const propertiesOnce: Properties = this.eventProperties['$set_once'] || {}
const unsetProperties: Array<string> = this.eventProperties['$unset'] || []
// Figure out which properties we are actually setting
const updatedProperties: Properties = { ...personFound.properties }
Object.entries(propertiesOnce).map(([key, value]) => {
if (typeof personFound?.properties[key] === 'undefined') {
if (typeof personProperties[key] === 'undefined') {
updatedProperties[key] = value
}
})
Object.entries(properties).map(([key, value]) => {
if (personFound?.properties[key] !== value) {
if (personProperties[key] !== value) {
updatedProperties[key] = value
}
})
@@ -152,32 +197,34 @@ export class PersonStateManager {
delete updatedProperties[propertyKey]
})
const arePersonsEqual = equal(personFound.properties, updatedProperties)
if (arePersonsEqual) {
return
}
await this.db.updatePersonDeprecated(personFound, { properties: updatedProperties })
return updatedProperties
}
// Alias & merge
async handleIdentifyOrAlias(
event: string,
properties: Properties,
distinctId: string,
teamId: number,
timestamp: DateTime
): Promise<void> {
if (isDistinctIdIllegal(distinctId)) {
this.statsd?.increment(`illegal_distinct_ids.total`, { distinctId })
async handleIdentifyOrAlias(): Promise<void> {
if (isDistinctIdIllegal(this.distinctId)) {
this.statsd?.increment(`illegal_distinct_ids.total`, { distinctId: this.distinctId })
return
}
if (event === '$create_alias') {
await this.merge(properties['alias'], distinctId, teamId, timestamp, false)
} else if (event === '$identify' && properties['$anon_distinct_id']) {
await this.merge(properties['$anon_distinct_id'], distinctId, teamId, timestamp, true)
const timeout = timeoutGuard('Still running "handleIdentifyOrAlias". Timeout warning after 30 sec!')
try {
if (this.event.event === '$create_alias') {
await this.merge(this.eventProperties['alias'], this.distinctId, this.teamId, this.timestamp, false)
} else if (this.event.event === '$identify' && this.eventProperties['$anon_distinct_id']) {
await this.merge(
this.eventProperties['$anon_distinct_id'],
this.distinctId,
this.teamId,
this.timestamp,
true
)
}
} catch (e) {
console.error('handleIdentifyOrAlias failed', e, this.event)
} finally {
clearTimeout(timeout)
}
}
@@ -212,9 +259,12 @@ export class PersonStateManager {
const oldPerson = await this.db.fetchPerson(teamId, previousDistinctId)
const newPerson = await this.db.fetchPerson(teamId, distinctId)
let updateAtEnd = false
if (oldPerson && !newPerson) {
try {
await this.db.addDistinctId(oldPerson, distinctId)
updateAtEnd = true
// Catch race case when somebody already added this distinct_id between .get and .addDistinctId
} catch {
// integrity error
@@ -233,6 +283,7 @@ export class PersonStateManager {
} else if (!oldPerson && newPerson) {
try {
await this.db.addDistinctId(newPerson, previousDistinctId)
updateAtEnd = true
// Catch race case when somebody already added this distinct_id between .get and .addDistinctId
} catch {
// integrity error
@@ -250,10 +301,16 @@ export class PersonStateManager {
}
} else if (!oldPerson && !newPerson) {
try {
await this.createPerson(timestamp, {}, {}, teamId, null, shouldIdentifyPerson, new UUIDT().toString(), [
distinctId,
previousDistinctId,
])
await this.createPerson(
timestamp,
this.eventProperties['$set'] || {},
this.eventProperties['$set_once'] || {},
teamId,
null,
shouldIdentifyPerson,
this.newUuid,
[distinctId, previousDistinctId]
)
} catch {
// Catch race condition where in between getting and creating,
// another request already created this person
@@ -276,6 +333,7 @@ export class PersonStateManager {
if (isIdentifyCallToMergeAnIdentifiedUser) {
status.warn('🤔', 'refused to merge an already identified user via an $identify call')
updateAtEnd = true
} else {
await this.mergePeople({
totalMergeAttempts,
@@ -287,9 +345,12 @@ export class PersonStateManager {
timestamp: timestamp,
})
}
} else {
updateAtEnd = true
}
if (shouldIdentifyPerson) {
// :KLUDGE: Only update isIdentified once, avoid recursively calling it or when not needed
if (shouldIdentifyPerson && updateAtEnd) {
await this.setIsIdentified(teamId, distinctId)
}
}
@@ -342,8 +403,8 @@ export class PersonStateManager {
mergeInto,
{
created_at: firstSeen,
properties: mergeInto.properties,
is_identified: mergeInto.is_identified || otherPerson.is_identified,
properties: this.updatedPersonProperties(mergeInto.properties),
is_identified: mergeInto.is_identified || otherPerson.is_identified || shouldIdentifyPerson,
},
client
)

View File

@@ -1,8 +1,7 @@
import ClickHouse from '@posthog/clickhouse'
import { PluginEvent, Properties } from '@posthog/plugin-scaffold'
import * as Sentry from '@sentry/node'
import crypto from 'crypto'
import { DateTime, Duration } from 'luxon'
import { DateTime } from 'luxon'
import { Event as EventProto, IEvent } from '../../config/idl/protos'
import { KAFKA_EVENTS, KAFKA_SESSION_RECORDING_EVENTS } from '../../config/kafka-topics'
@@ -19,22 +18,14 @@ import {
import { DB, GroupIdentifier } from '../../utils/db/db'
import { elementsToString, extractElements } from '../../utils/db/elements-chain'
import { KafkaProducerWrapper } from '../../utils/db/kafka-producer-wrapper'
import {
personInitialAndUTMProperties,
safeClickhouseString,
sanitizeEventName,
timeoutGuard,
} from '../../utils/db/utils'
import { status } from '../../utils/status'
import { safeClickhouseString, sanitizeEventName, timeoutGuard } from '../../utils/db/utils'
import { castTimestampOrNow, UUID } from '../../utils/utils'
import { KAFKA_BUFFER } from './../../config/kafka-topics'
import { GroupTypeManager } from './group-type-manager'
import { addGroupProperties } from './groups'
import { PersonManager } from './person-manager'
import { PersonStateManager } from './person-state-manager'
import { PersonState } from './person-state'
import { upsertGroup } from './properties-updater'
import { TeamManager } from './team-manager'
import { parseDate } from './utils'
export interface EventProcessingResult {
event: IEvent | SessionRecordingEvent | PostgresSessionRecordingEvent
@@ -48,7 +39,6 @@ export class EventsProcessor {
clickhouse: ClickHouse
kafkaProducer: KafkaProducerWrapper
teamManager: TeamManager
personManager: PersonManager
groupTypeManager: GroupTypeManager
clickhouseExternalSchemasDisabledTeams: Set<number>
@@ -58,7 +48,6 @@ export class EventsProcessor {
this.clickhouse = pluginsServer.clickhouse
this.kafkaProducer = pluginsServer.kafkaProducer
this.teamManager = pluginsServer.teamManager
this.personManager = new PersonManager(pluginsServer)
this.groupTypeManager = new GroupTypeManager(pluginsServer.db, this.teamManager, pluginsServer.SITE_URL)
this.clickhouseExternalSchemasDisabledTeams = new Set(
pluginsServer.CLICKHOUSE_DISABLE_EXTERNAL_SCHEMAS_TEAMS.split(',').filter(String).map(Number)
@@ -70,8 +59,7 @@ export class EventsProcessor {
ip: string | null,
data: PluginEvent,
teamId: number,
now: DateTime,
sentAt: DateTime | null,
timestamp: DateTime,
eventUuid: string
): Promise<PreIngestionEvent | null> {
if (!UUID.validateString(eventUuid, false)) {
@@ -84,43 +72,25 @@ export class EventsProcessor {
let result: PreIngestionEvent | null = null
try {
// Sanitize values, even though `sanitizeEvent` should have gotten to them
const properties: Properties = data.properties ?? {}
if (data['$set']) {
properties['$set'] = { ...properties['$set'], ...data['$set'] }
}
if (data['$set_once']) {
properties['$set_once'] = { ...properties['$set_once'], ...data['$set_once'] }
}
// TODO: we should just handle all person's related changes together not here and in capture separately
const parsedTs = this.handleTimestamp(data, now, sentAt)
const ts = parsedTs.isValid ? parsedTs : DateTime.now()
if (!parsedTs.isValid) {
this.pluginsServer.statsd?.increment('process_event_invalid_timestamp', { teamId: String(teamId) })
}
const timeout1 = timeoutGuard('Still running "handleIdentifyOrAlias". Timeout warning after 30 sec!', {
eventUuid,
})
// We know `normalizeEvent` has been called here.
const properties: Properties = data.properties!
const team = await this.teamManager.fetchTeam(teamId)
if (!team) {
throw new Error(`No team found with ID ${teamId}. Can't ingest event.`)
}
const personStateManager = new PersonStateManager(
ts,
const personState = new PersonState(
data,
teamId,
distinctId,
timestamp,
this.db,
this.pluginsServer.statsd,
this.personManager
this.pluginsServer.personManager
)
try {
await personStateManager.handleIdentifyOrAlias(data['event'], properties, distinctId, teamId, ts)
} catch (e) {
console.error('handleIdentifyOrAlias failed', e, data)
} finally {
clearTimeout(timeout1)
}
await personState.update()
if (data['event'] === '$snapshot') {
if (team.session_recording_opt_in) {
@@ -130,13 +100,12 @@ export class EventsProcessor {
)
try {
result = await this.createSessionRecordingEvent(
personStateManager,
eventUuid,
teamId,
distinctId,
properties['$session_id'],
properties['$window_id'],
ts,
timestamp,
properties['$snapshot_data'],
properties,
ip
@@ -152,16 +121,7 @@ export class EventsProcessor {
} else {
const timeout3 = timeoutGuard('Still running "capture". Timeout warning after 30 sec!', { eventUuid })
try {
result = await this.capture(
personStateManager,
eventUuid,
ip,
team,
data['event'],
distinctId,
properties,
ts
)
result = await this.capture(eventUuid, ip, team, data['event'], distinctId, properties, timestamp)
this.pluginsServer.statsd?.timing('kafka_queue.single_save.standard', singleSaveTimer, {
team_id: teamId.toString(),
})
@@ -175,28 +135,6 @@ export class EventsProcessor {
return result
}
public handleTimestamp(data: PluginEvent, now: DateTime, sentAt: DateTime | null): DateTime {
if (data['timestamp']) {
if (sentAt) {
// sent_at - timestamp == now - x
// x = now + (timestamp - sent_at)
try {
// timestamp and sent_at must both be in the same format: either both with or both without timezones
// otherwise we can't get a diff to add to now
return now.plus(parseDate(data['timestamp']).diff(sentAt))
} catch (error) {
status.error('⚠️', 'Error when handling timestamp:', error)
Sentry.captureException(error, { extra: { data, now, sentAt } })
}
}
return parseDate(data['timestamp'])
}
if (data['offset']) {
return now.minus(Duration.fromMillis(data['offset']))
}
return now
}
public clickhouseExternalSchemasEnabled(teamId: number): boolean {
if (this.pluginsServer.CLICKHOUSE_DISABLE_EXTERNAL_SCHEMAS) {
return false
@@ -205,7 +143,6 @@ export class EventsProcessor {
}
private async capture(
personStateManager: PersonStateManager,
eventUuid: string,
ip: string | null,
team: Team,
@@ -227,32 +164,11 @@ export class EventsProcessor {
properties['$ip'] = ip
}
properties = personInitialAndUTMProperties(properties)
await this.teamManager.updateEventNamesAndProperties(team.id, event, properties)
properties = await addGroupProperties(team.id, properties, this.groupTypeManager)
const createdNewPersonWithProperties = await personStateManager.createPersonIfDistinctIdIsNew(
team.id,
distinctId,
timestamp,
personStateManager.newUuid,
properties['$set'],
properties['$set_once']
)
if (event === '$groupidentify') {
await this.upsertGroup(team.id, properties, timestamp)
} else if (
!createdNewPersonWithProperties &&
(properties['$set'] || properties['$set_once'] || properties['$unset'])
) {
await personStateManager.updatePersonProperties(
team.id,
distinctId,
properties['$set'] || {},
properties['$set_once'] || {},
properties['$unset'] || []
)
}
return {
@@ -356,7 +272,6 @@ export class EventsProcessor {
}
private async createSessionRecordingEvent(
personStateManager: PersonStateManager,
uuid: string,
team_id: number,
distinct_id: string,
@@ -372,13 +287,6 @@ export class EventsProcessor {
this.kafkaProducer ? TimestampFormat.ClickHouse : TimestampFormat.ISO
)
await personStateManager.createPersonIfDistinctIdIsNew(
team_id,
distinct_id,
timestamp,
personStateManager.newUuid
)
const data: SessionRecordingEvent = {
uuid,
team_id: team_id,

View File

@@ -0,0 +1,48 @@
import { PluginEvent } from '@posthog/plugin-scaffold'
import * as Sentry from '@sentry/node'
import { StatsD } from 'hot-shots'
import { DateTime, Duration } from 'luxon'
import { status } from '../../utils/status'
/**
 * Resolves the effective timestamp for an event from its `timestamp`,
 * `offset`, `now` and `sent_at` fields (see `handleTimestamp` for the
 * precedence rules). If the resolved timestamp is invalid, increments the
 * `process_event_invalid_timestamp` metric and falls back to the current time.
 */
export function parseEventTimestamp(data: PluginEvent, statsd?: StatsD | undefined): DateTime {
    const eventNow = DateTime.fromISO(data['now'])
    const eventSentAt = data['sent_at'] ? DateTime.fromISO(data['sent_at']) : null

    const candidate = handleTimestamp(data, eventNow, eventSentAt)
    if (candidate.isValid) {
        return candidate
    }
    statsd?.increment('process_event_invalid_timestamp', { teamId: String(data['team_id']) })
    return DateTime.now()
}
/**
 * Computes the event timestamp with clock-skew correction:
 * - With both `timestamp` and `sent_at`: shift `now` by the client-side delta
 *   (sent_at - timestamp == now - x  =>  x = now + (timestamp - sent_at)).
 * - With only `timestamp`: parse it directly.
 * - With `offset`: subtract the millisecond offset from `now`.
 * - Otherwise: use `now` as-is.
 */
function handleTimestamp(data: PluginEvent, now: DateTime, sentAt: DateTime | null): DateTime {
    const rawTimestamp = data['timestamp']
    if (!rawTimestamp) {
        return data['offset'] ? now.minus(Duration.fromMillis(data['offset'])) : now
    }
    if (sentAt) {
        try {
            // timestamp and sent_at must both be in the same format (both with
            // or both without timezones), otherwise no diff can be added to now.
            return now.plus(parseDate(rawTimestamp).diff(sentAt))
        } catch (error) {
            status.error('⚠️', 'Error when handling timestamp:', error)
            Sentry.captureException(error, { extra: { data, now, sentAt } })
            // Fall through to parsing the timestamp without skew correction.
        }
    }
    return parseDate(rawTimestamp)
}
/**
 * Parses a date string into a Luxon DateTime, preferring the lenient native
 * `Date` parser and falling back to Luxon's stricter ISO 8601 parser when the
 * native result is invalid.
 */
export function parseDate(supposedIsoString: string): DateTime {
    const nativeDate = new Date(supposedIsoString)
    const nativeParseFailed = Number.isNaN(nativeDate.getTime())
    return nativeParseFailed ? DateTime.fromISO(supposedIsoString) : DateTime.fromJSDate(nativeDate)
}

View File

@@ -54,11 +54,3 @@ export function generateEventDeadLetterQueueMessage(
}
return message
}
/**
 * Parses a date string into a Luxon DateTime. Tries the lenient native `Date`
 * parser first; only if that yields an invalid date does it fall back to
 * Luxon's stricter ISO 8601 parser.
 */
export function parseDate(supposedIsoString: string): DateTime {
    const jsDate = new Date(supposedIsoString)
    if (!Number.isNaN(jsDate.getTime())) {
        return DateTime.fromJSDate(jsDate)
    }
    return DateTime.fromISO(supposedIsoString)
}

View File

@@ -17,6 +17,7 @@ import {
Team,
} from '../../src/types'
import { createHub } from '../../src/utils/db/hub'
import { personInitialAndUTMProperties } from '../../src/utils/db/utils'
import { posthog } from '../../src/utils/posthog'
import { UUIDT } from '../../src/utils/utils'
import { EventPipelineRunner } from '../../src/worker/ingestion/event-pipeline/runner'
@@ -103,11 +104,10 @@ async function processEvent(
_siteUrl: string,
data: PluginEvent,
teamId: number,
now: DateTime,
sentAt: DateTime | null,
timestamp: DateTime,
eventUuid: string
): Promise<PreIngestionEvent | null> {
const response = await eventsProcessor.processEvent(distinctId, ip, data, teamId, now, sentAt, eventUuid)
const response = await eventsProcessor.processEvent(distinctId, ip, data, teamId, timestamp, eventUuid)
if (response) {
await eventsProcessor.createEvent(response)
}
@@ -209,7 +209,6 @@ test('merge people', async () => {
} as any as PluginEvent,
team.id,
now,
now,
new UUIDT().toString()
)
@@ -229,7 +228,6 @@ test('merge people', async () => {
} as any as PluginEvent,
team.id,
now,
now,
new UUIDT().toString()
)
@@ -258,37 +256,37 @@ test('capture new person', async () => {
expect(await hub.db.fetchEventDefinitions()).toEqual([])
expect(await hub.db.fetchPropertyDefinitions()).toEqual([])
const properties = personInitialAndUTMProperties({
distinct_id: 2,
token: team.api_token,
$browser: 'Chrome',
$current_url: 'https://test.com',
$os: 'Mac OS X',
$browser_version: '95',
$initial_referring_domain: 'https://google.com',
$initial_referrer_url: 'https://google.com/?q=posthog',
utm_medium: 'twitter',
gclid: 'GOOGLE ADS ID',
$elements: [
{ tag_name: 'a', nth_child: 1, nth_of_type: 2, attr__class: 'btn btn-sm' },
{ tag_name: 'div', nth_child: 1, nth_of_type: 2, $el_text: '💻' },
],
})
await processEvent(
'2',
'127.0.0.1',
'',
{
event: '$autocapture',
properties: {
distinct_id: 2,
token: team.api_token,
$browser: 'Chrome',
$current_url: 'https://test.com',
$os: 'Mac OS X',
$browser_version: '95',
$initial_referring_domain: 'https://google.com',
$initial_referrer_url: 'https://google.com/?q=posthog',
utm_medium: 'twitter',
gclid: 'GOOGLE ADS ID',
$elements: [
{ tag_name: 'a', nth_child: 1, nth_of_type: 2, attr__class: 'btn btn-sm' },
{ tag_name: 'div', nth_child: 1, nth_of_type: 2, $el_text: '💻' },
],
},
properties,
} as any as PluginEvent,
team.id,
now,
now,
new UUIDT().toString()
)
let persons = await hub.db.fetchPersons()
let events = await hub.db.fetchEvents()
expect(persons[0].version).toEqual(0)
expect(persons[0].created_at).toEqual(now)
let expectedProps = {
@@ -303,12 +301,14 @@ test('capture new person', async () => {
}
expect(persons[0].properties).toEqual(expectedProps)
await delayUntilEventIngested(() => hub.db.fetchEvents(), 1)
await delayUntilEventIngested(() => hub.db.fetchPersons(Database.ClickHouse), 1)
const chPeople = await hub.db.fetchPersons(Database.ClickHouse)
expect(chPeople.length).toEqual(1)
expect(JSON.parse(chPeople[0].properties)).toEqual(expectedProps)
expect(chPeople[0].created_at).toEqual(now.toFormat('yyyy-MM-dd HH:mm:ss.000'))
let events = await hub.db.fetchEvents()
expect(events[0].properties).toEqual({
$ip: '127.0.0.1',
$os: 'Mac OS X',
@@ -340,7 +340,7 @@ test('capture new person', async () => {
'',
{
event: '$autocapture',
properties: {
properties: personInitialAndUTMProperties({
distinct_id: 2,
token: team.api_token,
utm_medium: 'instagram',
@@ -351,11 +351,10 @@ test('capture new person', async () => {
{ tag_name: 'a', nth_child: 1, nth_of_type: 2, attr__class: 'btn btn-sm' },
{ tag_name: 'div', nth_child: 1, nth_of_type: 2, $el_text: '💻' },
],
},
}),
} as any as PluginEvent,
team.id,
DateTime.now(),
DateTime.now(),
new UUIDT().toString()
)
@@ -412,7 +411,7 @@ test('capture new person', async () => {
'',
{
event: '$autocapture',
properties: {
properties: personInitialAndUTMProperties({
distinct_id: 2,
token: team.api_token,
utm_medium: 'instagram',
@@ -423,11 +422,10 @@ test('capture new person', async () => {
{ tag_name: 'a', nth_child: 1, nth_of_type: 2, attr__class: 'btn btn-sm' },
{ tag_name: 'div', nth_child: 1, nth_of_type: 2, $el_text: '💻' },
],
},
}),
} as any as PluginEvent,
team.id,
DateTime.now(),
DateTime.now(),
new UUIDT().toString()
)
@@ -581,32 +579,6 @@ test('capture new person', async () => {
])
})
test('initial current domain regression test', async () => {
// we weren't capturing $initial_current_url if no utm tags were set
await processEvent(
'2',
'127.0.0.1',
'',
{
event: '$pageview',
properties: {
distinct_id: 2,
token: team.api_token,
$current_url: 'https://test.com',
},
} as any as PluginEvent,
team.id,
now,
now,
new UUIDT().toString()
)
const persons = await hub.db.fetchPersons()
expect(persons[0].properties).toEqual({
$initial_current_url: 'https://test.com',
})
})
test('capture bad team', async () => {
await expect(async () => {
await processEvent(
@@ -619,7 +591,6 @@ test('capture bad team', async () => {
} as any as PluginEvent,
1337,
now,
now,
new UUIDT().toString()
)
}).rejects.toThrowError("No team found with ID 1337. Can't ingest event.")
@@ -638,7 +609,6 @@ test('capture no element', async () => {
} as any as PluginEvent,
team.id,
now,
now,
new UUIDT().toString()
)
@@ -647,94 +617,6 @@ test('capture no element', async () => {
expect(event.event).toBe('$pageview')
})
test('capture sent_at', async () => {
await createPerson(hub, team, ['asdfasdfasdf'])
const rightNow = DateTime.utc()
const tomorrow = rightNow.plus({ days: 1, hours: 2 })
const tomorrowSentAt = rightNow.plus({ days: 1, hours: 2, minutes: 10 })
await processEvent(
'movie played',
'',
'',
{
event: '$pageview',
timestamp: tomorrow.toISO(),
properties: { distinct_id: 'asdfasdfasdf', token: team.api_token },
} as any as PluginEvent,
team.id,
rightNow,
tomorrowSentAt,
new UUIDT().toString()
)
const [event] = await hub.db.fetchEvents()
const eventSecondsBeforeNow = rightNow.diff(DateTime.fromISO(event.timestamp), 'seconds').seconds
expect(eventSecondsBeforeNow).toBeGreaterThan(590)
expect(eventSecondsBeforeNow).toBeLessThan(610)
})
test('capture sent_at no timezones', async () => {
await createPerson(hub, team, ['asdfasdfasdf'])
const rightNow = DateTime.utc()
const tomorrow = rightNow.plus({ days: 1, hours: 2 }).setZone('UTC+4')
const tomorrowSentAt = rightNow.plus({ days: 1, hours: 2, minutes: 10 }).setZone('UTC+4')
// TODO: not sure if this is correct?
// tomorrow = tomorrow.replace(tzinfo=None)
// tomorrow_sent_at = tomorrow_sent_at.replace(tzinfo=None)
await processEvent(
'movie played',
'',
'',
{
event: '$pageview',
timestamp: tomorrow,
properties: { distinct_id: 'asdfasdfasdf', token: team.api_token },
} as any as PluginEvent,
team.id,
rightNow,
tomorrowSentAt,
new UUIDT().toString()
)
const [event] = await hub.db.fetchEvents()
const eventSecondsBeforeNow = rightNow.diff(DateTime.fromISO(event.timestamp), 'seconds').seconds
expect(eventSecondsBeforeNow).toBeGreaterThan(590)
expect(eventSecondsBeforeNow).toBeLessThan(610)
})
test('capture no sent_at', async () => {
await createPerson(hub, team, ['asdfasdfasdf'])
const rightNow = DateTime.utc()
const tomorrow = rightNow.plus({ days: 1, hours: 2 })
await processEvent(
'movie played',
'',
'',
{
event: '$pageview',
timestamp: tomorrow.toISO(),
properties: { distinct_id: 'asdfasdfasdf', token: team.api_token },
} as any as PluginEvent,
team.id,
rightNow,
null,
new UUIDT().toString()
)
const [event] = await hub.db.fetchEvents()
const difference = tomorrow.diff(DateTime.fromISO(event.timestamp), 'seconds').seconds
expect(difference).toBeLessThan(1)
})
test('ip none', async () => {
await createPerson(hub, team, ['asdfasdfasdf'])
@@ -748,7 +630,6 @@ test('ip none', async () => {
} as any as PluginEvent,
team.id,
now,
now,
new UUIDT().toString()
)
const [event] = await hub.db.fetchEvents()
@@ -768,7 +649,6 @@ test('ip capture', async () => {
} as any as PluginEvent,
team.id,
now,
now,
new UUIDT().toString()
)
const [event] = await hub.db.fetchEvents()
@@ -788,7 +668,6 @@ test('ip override', async () => {
} as any as PluginEvent,
team.id,
now,
now,
new UUIDT().toString()
)
@@ -810,7 +689,6 @@ test('anonymized ip capture', async () => {
} as any as PluginEvent,
team.id,
now,
now,
new UUIDT().toString()
)
@@ -831,7 +709,6 @@ test('alias', async () => {
} as any as PluginEvent,
team.id,
now,
now,
new UUIDT().toString()
)
@@ -855,7 +732,6 @@ test('alias reverse', async () => {
} as any as PluginEvent,
team.id,
now,
now,
new UUIDT().toString()
)
@@ -879,7 +755,6 @@ test('alias twice', async () => {
} as any as PluginEvent,
team.id,
now,
now,
new UUIDT().toString()
)
@@ -903,7 +778,6 @@ test('alias twice', async () => {
} as any as PluginEvent,
team.id,
now,
now,
new UUIDT().toString()
)
expect((await hub.db.fetchEvents()).length).toBe(2)
@@ -926,7 +800,6 @@ test('alias before person', async () => {
} as any as PluginEvent,
team.id,
now,
now,
new UUIDT().toString()
)
@@ -952,7 +825,6 @@ test('alias both existing', async () => {
} as any as PluginEvent,
team.id,
now,
now,
new UUIDT().toString()
)
@@ -963,44 +835,6 @@ test('alias both existing', async () => {
])
})
test('offset timestamp', async () => {
now = DateTime.fromISO('2020-01-01T12:00:05.200Z')
await processEvent(
'distinct_id1',
'',
'',
{ offset: 150, event: '$autocapture', distinct_id: 'distinct_id1' } as any as PluginEvent,
team.id,
now,
now,
new UUIDT().toString()
)
expect((await hub.db.fetchEvents()).length).toBe(1)
const [event] = await hub.db.fetchEvents()
expect(event.timestamp).toEqual('2020-01-01T12:00:05.050Z')
})
test('offset timestamp no sent_at', async () => {
now = DateTime.fromISO('2020-01-01T12:00:05.200Z')
await processEvent(
'distinct_id1',
'',
'',
{ offset: 150, event: '$autocapture', distinct_id: 'distinct_id1' } as any as PluginEvent,
team.id,
now,
null,
new UUIDT().toString()
)
expect((await hub.db.fetchEvents()).length).toBe(1)
const [event] = await hub.db.fetchEvents()
expect(event.timestamp).toEqual('2020-01-01T12:00:05.050Z')
})
test('alias merge properties', async () => {
await createPerson(hub, team, ['new_distinct_id'], {
key_on_both: 'new value both',
@@ -1021,7 +855,6 @@ test('alias merge properties', async () => {
} as any as PluginEvent,
team.id,
now,
now,
new UUIDT().toString()
)
@@ -1060,7 +893,6 @@ test('long htext', async () => {
} as any as PluginEvent,
team.id,
now,
now,
new UUIDT().toString()
)
@@ -1090,7 +922,6 @@ test('capture first team event', async () => {
} as any as PluginEvent,
team.id,
now,
now,
new UUIDT().toString()
)
@@ -1124,7 +955,6 @@ it('snapshot event not stored if session recording disabled', async () => {
} as any as PluginEvent,
team.id,
now,
now,
new UUIDT().toString()
)
await delayUntilEventIngested(() => hub.db.fetchSessionRecordingEvents())
@@ -1146,7 +976,6 @@ test('snapshot event stored as session_recording_event', async () => {
} as any as PluginEvent,
team.id,
now,
now,
new UUIDT().toString()
)
await delayUntilEventIngested(() => hub.db.fetchSessionRecordingEvents())
@@ -1173,7 +1002,6 @@ test('$snapshot event creates new person if needed', async () => {
} as any as PluginEvent,
team.id,
now,
now,
new UUIDT().toString()
)
await delayUntilEventIngested(() => hub.db.fetchPersons())
@@ -1202,7 +1030,6 @@ test('identify set', async () => {
} as any as PluginEvent,
team.id,
ts_before,
ts_before,
new UUIDT().toString()
)
@@ -1230,7 +1057,6 @@ test('identify set', async () => {
} as any as PluginEvent,
team.id,
ts_after,
ts_after,
new UUIDT().toString()
)
expect((await hub.db.fetchEvents()).length).toBe(2)
@@ -1255,7 +1081,6 @@ test('identify set_once', async () => {
} as any as PluginEvent,
team.id,
now,
now,
new UUIDT().toString()
)
@@ -1283,7 +1108,6 @@ test('identify set_once', async () => {
} as any as PluginEvent,
team.id,
now,
now,
new UUIDT().toString()
)
expect((await hub.db.fetchEvents()).length).toBe(2)
@@ -1313,7 +1137,6 @@ test('identify with illegal (generic) id', async () => {
} as any as PluginEvent,
team.id,
now,
now,
new UUIDT().toString()
)
}
@@ -1363,7 +1186,6 @@ test('Alias with illegal (generic) id', async () => {
} as any as PluginEvent,
team.id,
now,
now,
new UUIDT().toString()
)
// person with illegal id got created but not merged
@@ -1388,7 +1210,6 @@ test('distinct with anonymous_id', async () => {
} as any as PluginEvent,
team.id,
now,
now,
new UUIDT().toString()
)
@@ -1416,7 +1237,6 @@ test('distinct with anonymous_id', async () => {
} as any as PluginEvent,
team.id,
now,
now,
new UUIDT().toString()
)
})
@@ -1444,7 +1264,6 @@ test('distinct with anonymous_id which was already created', async () => {
} as any as PluginEvent,
team.id,
now,
now,
new UUIDT().toString()
)
@@ -1471,7 +1290,6 @@ test('identify with the same distinct_id as anon_distinct_id', async () => {
} as any as PluginEvent,
team.id,
now,
now,
new UUIDT().toString()
)
@@ -1498,7 +1316,6 @@ test('distinct with multiple anonymous_ids which were already created', async ()
} as any as PluginEvent,
team.id,
now,
now,
new UUIDT().toString()
)
@@ -1524,7 +1341,6 @@ test('distinct with multiple anonymous_ids which were already created', async ()
} as any as PluginEvent,
team.id,
now,
now,
new UUIDT().toString()
)
@@ -1566,7 +1382,6 @@ test('distinct team leakage', async () => {
} as any as PluginEvent,
team.id,
now,
now,
new UUIDT().toString()
)
@@ -1889,7 +1704,6 @@ test('team event_properties', async () => {
{ event: 'purchase', properties: { price: 299.99, name: 'AirPods Pro' } } as any as PluginEvent,
team.id,
now,
now,
new UUIDT().toString()
)
@@ -1970,7 +1784,6 @@ test('event name object json', async () => {
{ event: { 'event name': 'as object' }, properties: {} } as any as PluginEvent,
team.id,
now,
now,
new UUIDT().toString()
)
const [event] = await hub.db.fetchEvents()
@@ -1985,7 +1798,6 @@ test('event name array json', async () => {
{ event: ['event name', 'a list'], properties: {} } as any as PluginEvent,
team.id,
now,
now,
new UUIDT().toString()
)
const [event] = await hub.db.fetchEvents()
@@ -2000,7 +1812,6 @@ test('long event name substr', async () => {
{ event: 'E'.repeat(300), properties: { price: 299.99, name: 'AirPods Pro' } } as any as PluginEvent,
team.id,
DateTime.utc(),
DateTime.utc(),
new UUIDT().toString()
)
@@ -2017,7 +1828,6 @@ test('throws with bad uuid', async () => {
{ event: 'E', properties: { price: 299.99, name: 'AirPods Pro' } } as any as PluginEvent,
team.id,
DateTime.utc(),
DateTime.utc(),
'this is not an uuid'
)
).rejects.toEqual(new Error('Not a valid UUID: "this is not an uuid"'))
@@ -2030,7 +1840,6 @@ test('throws with bad uuid', async () => {
{ event: 'E', properties: { price: 299.99, name: 'AirPods Pro' } } as any as PluginEvent,
team.id,
DateTime.utc(),
DateTime.utc(),
null as any
)
).rejects.toEqual(new Error('Not a valid UUID: "null"'))
@@ -2053,7 +1862,6 @@ test('any event can do $set on props (user exists)', async () => {
} as any as PluginEvent,
team.id,
now,
now,
new UUIDT().toString()
)
@@ -2082,7 +1890,6 @@ test('any event can do $set on props (new user)', async () => {
} as any as PluginEvent,
team.id,
now,
now,
new UUIDT().toString()
)
@@ -2113,7 +1920,6 @@ test('any event can do $set_once on props', async () => {
} as any as PluginEvent,
team.id,
now,
now,
new UUIDT().toString()
)
@@ -2140,7 +1946,6 @@ test('any event can do $set_once on props', async () => {
} as any as PluginEvent,
team.id,
now,
now,
new UUIDT().toString()
)
expect((await hub.db.fetchEvents()).length).toBe(2)
@@ -2148,34 +1953,27 @@ test('any event can do $set_once on props', async () => {
expect(person2.properties).toEqual({ a_prop: 'test-1', b_prop: 'test-2b', c_prop: 'test-1' })
})
test('$set and $set_once merge with properties', async () => {
test('$set and $set_once', async () => {
await processEvent(
'distinct_id1',
'',
'',
{
event: 'some_event',
$set: { key1: 'value1', key2: 'value2' },
$set_once: { key1_once: 'value1', key2_once: 'value2' },
properties: {
token: team.api_token,
distinct_id: 'distinct_id1',
$set: { key2: 'value3', key3: 'value4' },
$set_once: { key2_once: 'value3', key3_once: 'value4' },
$set: { key1: 'value1', key2: 'value2', key3: 'value4' },
$set_once: { key1_once: 'value1', key2_once: 'value2', key3_once: 'value4' },
},
} as any as PluginEvent,
team.id,
now,
now,
new UUIDT().toString()
)
expect((await hub.db.fetchEvents()).length).toBe(1)
const [event] = await hub.db.fetchEvents()
expect(event.properties['$set']).toEqual({ key1: 'value1', key2: 'value2', key3: 'value4' })
expect(event.properties['$set_once']).toEqual({ key1_once: 'value1', key2_once: 'value2', key3_once: 'value4' })
const [person] = await hub.db.fetchPersons()
expect(await hub.db.fetchDistinctIdValues(person)).toEqual(['distinct_id1'])
expect(person.properties).toEqual({
@@ -2209,7 +2007,6 @@ test('groupidentify', async () => {
} as any as PluginEvent,
team.id,
now,
now,
new UUIDT().toString()
)
@@ -2273,7 +2070,6 @@ test('$groupidentify updating properties', async () => {
} as any as PluginEvent,
team.id,
next,
next,
new UUIDT().toString()
)
@@ -2330,7 +2126,6 @@ test('person and group properties on events', async () => {
} as any as PluginEvent,
team.id,
now,
now,
new UUIDT().toString()
)
await processEvent(
@@ -2351,7 +2146,6 @@ test('person and group properties on events', async () => {
} as any as PluginEvent,
team.id,
now,
now,
new UUIDT().toString()
)
await processEvent(
@@ -2370,7 +2164,6 @@ test('person and group properties on events', async () => {
} as any as PluginEvent,
team.id,
now,
now,
new UUIDT().toString()
)
@@ -2401,7 +2194,6 @@ test('set and set_once on the same key', async () => {
} as any as PluginEvent,
team.id,
now,
now,
new UUIDT().toString()
)
expect((await hub.db.fetchEvents()).length).toBe(1)
@@ -2432,7 +2224,6 @@ test('$unset person property', async () => {
} as any as PluginEvent,
team.id,
now,
now,
new UUIDT().toString()
)
expect((await hub.db.fetchEvents()).length).toBe(1)
@@ -2491,12 +2282,13 @@ describe('ingestion in any order', () => {
'',
{
event: 'some_event',
$set: set,
$set_once: setOnce,
properties: {
$set: set,
$set_once: setOnce,
},
} as any as PluginEvent,
team.id,
ts,
ts,
new UUIDT().toString()
)
}

View File

@@ -16,7 +16,6 @@ import {
UUID,
UUIDT,
} from '../src/utils/utils'
import { parseDate } from '../src/worker/ingestion/utils'
// .zip in Base64: github repo posthog/helloworldplugin
const zip =
@@ -375,32 +374,6 @@ describe('utils', () => {
})
})
describe('parseDate', () => {
const timestamps = [
'2021-10-29',
'2021-10-29 00:00:00',
'2021-10-29 00:00:00.000000',
'2021-10-29T00:00:00.000Z',
'2021-10-29 00:00:00+00:00',
'2021-10-29T00:00:00.000-00:00',
'2021-10-29T00:00:00.000',
'2021-10-29T00:00:00.000+00:00',
'2021-W43-5',
'2021-302',
]
test.each(timestamps)('parses %s', (timestamp) => {
const parsedTimestamp = parseDate(timestamp)
expect(parsedTimestamp.year).toBe(2021)
expect(parsedTimestamp.month).toBe(10)
expect(parsedTimestamp.day).toBe(29)
expect(parsedTimestamp.hour).toBe(0)
expect(parsedTimestamp.minute).toBe(0)
expect(parsedTimestamp.second).toBe(0)
expect(parsedTimestamp.millisecond).toBe(0)
})
})
describe('safeClickhouseString', () => {
// includes real data
const validStrings = [

View File

@@ -0,0 +1,62 @@
import { personInitialAndUTMProperties } from '../../../src/utils/db/utils'
describe('personInitialAndUTMProperties()', () => {
it('adds initial and utm properties', () => {
const properties = {
distinct_id: 2,
$browser: 'Chrome',
$current_url: 'https://test.com',
$os: 'Mac OS X',
$browser_version: '95',
$initial_referring_domain: 'https://google.com',
$initial_referrer_url: 'https://google.com/?q=posthog',
utm_medium: 'twitter',
gclid: 'GOOGLE ADS ID',
$elements: [
{ tag_name: 'a', nth_child: 1, nth_of_type: 2, attr__class: 'btn btn-sm' },
{ tag_name: 'div', nth_child: 1, nth_of_type: 2, $el_text: '💻' },
],
}
expect(personInitialAndUTMProperties(properties)).toEqual({
distinct_id: 2,
$browser: 'Chrome',
$current_url: 'https://test.com',
$os: 'Mac OS X',
$browser_version: '95',
$initial_referring_domain: 'https://google.com',
$initial_referrer_url: 'https://google.com/?q=posthog',
utm_medium: 'twitter',
gclid: 'GOOGLE ADS ID',
$elements: [
{
tag_name: 'a',
nth_child: 1,
nth_of_type: 2,
attr__class: 'btn btn-sm',
},
{ tag_name: 'div', nth_child: 1, nth_of_type: 2, $el_text: '💻' },
],
$set: { utm_medium: 'twitter', gclid: 'GOOGLE ADS ID' },
$set_once: {
$initial_browser: 'Chrome',
$initial_current_url: 'https://test.com',
$initial_os: 'Mac OS X',
$initial_browser_version: '95',
$initial_utm_medium: 'twitter',
$initial_gclid: 'GOOGLE ADS ID',
},
})
})
it('initial current domain regression test', () => {
const properties = {
$current_url: 'https://test.com',
}
expect(personInitialAndUTMProperties(properties)).toEqual({
$current_url: 'https://test.com',
$set_once: { $initial_current_url: 'https://test.com' },
})
})
})

View File

@@ -0,0 +1,43 @@
import { normalizeEvent } from '../../src/utils/event'
describe('normalizeEvent()', () => {
describe('distinctId', () => {
test.each([
{ distinctId: 'abc', expected: 'abc' },
{ distinctId: 123, expected: '123' },
{ distinctId: true, expected: 'true' },
])('$distinctId', ({ distinctId, expected }) => {
const event = { distinct_id: distinctId }
expect(normalizeEvent(event as any).distinct_id).toBe(expected)
})
})
it('adds missing properties', () => {
const event = { distinct_id: 'something' }
expect(normalizeEvent(event as any).properties).toEqual({})
const event2 = { distinct_id: 'something', properties: { a: 1 } }
expect(normalizeEvent(event2 as any).properties).toEqual({ a: 1 })
})
it('combines .properties $set and $set_once with top-level $set and $set_once', () => {
const event = {
event: 'some_event',
$set: { key1: 'value1', key2: 'value2' },
$set_once: { key1_once: 'value1', key2_once: 'value2' },
properties: {
distinct_id: 'distinct_id1',
$set: { key2: 'value3', key3: 'value4' },
$set_once: { key2_once: 'value3', key3_once: 'value4' },
},
}
const sanitized = normalizeEvent(event as any)
expect(sanitized.properties!['$set']).toEqual({ key1: 'value1', key2: 'value2', key3: 'value4' })
expect(sanitized.properties!['$set_once']).toEqual({
key1_once: 'value1',
key2_once: 'value2',
key3_once: 'value4',
})
})
})

View File

@@ -1,5 +1,5 @@
import { PreIngestionEvent } from '../../../../src/types'
import { createEventStep } from '../../../../src/worker/ingestion/event-pipeline/createEventStep'
import { createEventStep } from '../../../../src/worker/ingestion/event-pipeline/4-createEventStep'
jest.mock('../../../../src/worker/plugins/run')

View File

@@ -4,7 +4,7 @@ import { Person, PreIngestionEvent } from '../../../../src/types'
import {
emitToBufferStep,
shouldSendEventToBuffer,
} from '../../../../src/worker/ingestion/event-pipeline/emitToBufferStep'
} from '../../../../src/worker/ingestion/event-pipeline/3-emitToBufferStep'
const now = DateTime.fromISO('2020-01-01T12:00:05.200Z')

View File

@@ -1,10 +1,12 @@
import { PluginEvent } from '@posthog/plugin-scaffold/src/types'
import { PluginEvent } from '@posthog/plugin-scaffold'
import fetch from 'node-fetch'
import { Hook, Hub } from '../../../../src/types'
import { createHub } from '../../../../src/utils/db/hub'
import { UUIDT } from '../../../../src/utils/utils'
import { EventPipelineRunner } from '../../../../src/worker/ingestion/event-pipeline/runner'
import { setupPlugins } from '../../../../src/worker/plugins/setup'
import { delayUntilEventIngested, resetTestDatabaseClickhouse } from '../../../helpers/clickhouse'
import { commonUserId } from '../../../helpers/plugins'
import { insertRow, resetTestDatabase } from '../../../helpers/sql'
@@ -18,6 +20,7 @@ describe('Event Pipeline integration test', () => {
beforeEach(async () => {
await resetTestDatabase()
await resetTestDatabaseClickhouse()
;[hub, closeServer] = await createHub()
})
@@ -25,6 +28,72 @@ describe('Event Pipeline integration test', () => {
await closeServer()
})
it('handles plugins setting properties', async () => {
await resetTestDatabase(`
function processEvent (event) {
event.properties = {
...event.properties,
$browser: 'Chrome',
processed: 'hell yes'
}
event.$set = {
...event.$set,
personProp: 'value'
}
return event
}
`)
await setupPlugins(hub)
const event: PluginEvent = {
event: 'xyz',
properties: { foo: 'bar' },
$set: { personProp: 1, anotherValue: 2 },
timestamp: new Date().toISOString(),
now: new Date().toISOString(),
team_id: 2,
distinct_id: 'abc',
ip: null,
site_url: 'https://example.com',
uuid: new UUIDT().toString(),
}
await ingestEvent(event)
const events = await delayUntilEventIngested(() => hub.db.fetchEvents())
const persons = await delayUntilEventIngested(() => hub.db.fetchPersons())
expect(events.length).toEqual(1)
expect(events[0]).toEqual(
expect.objectContaining({
uuid: event.uuid,
event: 'xyz',
team_id: 2,
timestamp: event.timestamp,
// :KLUDGE: Ignore properties like $plugins_succeeded, etc
properties: expect.objectContaining({
foo: 'bar',
$browser: 'Chrome',
processed: 'hell yes',
$set: {
personProp: 'value',
anotherValue: 2,
},
$set_once: {
$initial_browser: 'Chrome',
},
}),
})
)
expect(persons.length).toEqual(1)
expect(persons[0].properties).toEqual({
$initial_browser: 'Chrome',
personProp: 'value',
anotherValue: 2,
})
})
it('fires a webhook', async () => {
await hub.db.postgresQuery(
`UPDATE posthog_team SET slack_incoming_webhook = 'https://webhook.example.com/'`,

View File

@@ -1,6 +1,6 @@
import { PluginEvent } from '@posthog/plugin-scaffold'
import { pluginsProcessEventStep } from '../../../../src/worker/ingestion/event-pipeline/pluginsProcessEventStep'
import { pluginsProcessEventStep } from '../../../../src/worker/ingestion/event-pipeline/1-pluginsProcessEventStep'
import { runProcessEvent } from '../../../../src/worker/plugins/run'
jest.mock('../../../../src/worker/plugins/run')

View File

@@ -4,7 +4,7 @@ import { DateTime } from 'luxon'
import { Hub } from '../../../../src/types'
import { createHub } from '../../../../src/utils/db/hub'
import { UUIDT } from '../../../../src/utils/utils'
import { prepareEventStep } from '../../../../src/worker/ingestion/event-pipeline/prepareEventStep'
import { prepareEventStep } from '../../../../src/worker/ingestion/event-pipeline/2-prepareEventStep'
import { resetTestDatabase } from '../../../helpers/sql'
jest.mock('../../../../src/utils/status')
@@ -105,4 +105,40 @@ describe('prepareEventStep()', () => {
expect(response).toEqual(null)
expect(hub.db.kafkaProducer!.queueMessage).not.toHaveBeenCalled()
})
it('re-normalizes event after plugins have been run', async () => {
const response = await prepareEventStep(runner, {
...pluginEvent,
properties: {
$browser: 'Chrome',
},
$set: {
someProp: 'value',
},
})
expect(response).toEqual([
'emitToBufferStep',
{
distinctId: 'my_id',
elementsList: [],
event: 'default event',
eventUuid: '017ef865-19da-0000-3b60-1506093bf40f',
ip: '127.0.0.1',
properties: {
$ip: '127.0.0.1',
$browser: 'Chrome',
$set: {
someProp: 'value',
},
$set_once: {
$initial_browser: 'Chrome',
},
},
teamId: 2,
timestamp: expect.any(DateTime),
},
])
expect(hub.db.kafkaProducer!.queueMessage).toHaveBeenCalled()
})
})

View File

@@ -1,6 +1,6 @@
import { PreIngestionEvent } from '../../../../src/types'
import { convertToProcessedPluginEvent } from '../../../../src/utils/event'
import { runAsyncHandlersStep } from '../../../../src/worker/ingestion/event-pipeline/runAsyncHandlersStep'
import { runAsyncHandlersStep } from '../../../../src/worker/ingestion/event-pipeline/5-runAsyncHandlersStep'
import { runOnAction, runOnEvent, runOnSnapshot } from '../../../../src/worker/plugins/run'
jest.mock('../../../../src/worker/plugins/run')

View File

@@ -1,11 +1,11 @@
import { PluginEvent } from '@posthog/plugin-scaffold'
import { PreIngestionEvent } from '../../../../src/types'
import { createEventStep } from '../../../../src/worker/ingestion/event-pipeline/createEventStep'
import { emitToBufferStep } from '../../../../src/worker/ingestion/event-pipeline/emitToBufferStep'
import { pluginsProcessEventStep } from '../../../../src/worker/ingestion/event-pipeline/pluginsProcessEventStep'
import { prepareEventStep } from '../../../../src/worker/ingestion/event-pipeline/prepareEventStep'
import { runAsyncHandlersStep } from '../../../../src/worker/ingestion/event-pipeline/runAsyncHandlersStep'
import { pluginsProcessEventStep } from '../../../../src/worker/ingestion/event-pipeline/1-pluginsProcessEventStep'
import { prepareEventStep } from '../../../../src/worker/ingestion/event-pipeline/2-prepareEventStep'
import { emitToBufferStep } from '../../../../src/worker/ingestion/event-pipeline/3-emitToBufferStep'
import { createEventStep } from '../../../../src/worker/ingestion/event-pipeline/4-createEventStep'
import { runAsyncHandlersStep } from '../../../../src/worker/ingestion/event-pipeline/5-runAsyncHandlersStep'
import {
EventPipelineRunner,
EventPipelineStepsType,
@@ -16,11 +16,11 @@ import {
import { generateEventDeadLetterQueueMessage } from '../../../../src/worker/ingestion/utils'
jest.mock('../../../../src/utils/status')
jest.mock('../../../../src/worker/ingestion/event-pipeline/createEventStep')
jest.mock('../../../../src/worker/ingestion/event-pipeline/emitToBufferStep')
jest.mock('../../../../src/worker/ingestion/event-pipeline/pluginsProcessEventStep')
jest.mock('../../../../src/worker/ingestion/event-pipeline/prepareEventStep')
jest.mock('../../../../src/worker/ingestion/event-pipeline/runAsyncHandlersStep')
jest.mock('../../../../src/worker/ingestion/event-pipeline/1-pluginsProcessEventStep')
jest.mock('../../../../src/worker/ingestion/event-pipeline/2-prepareEventStep')
jest.mock('../../../../src/worker/ingestion/event-pipeline/3-emitToBufferStep')
jest.mock('../../../../src/worker/ingestion/event-pipeline/4-createEventStep')
jest.mock('../../../../src/worker/ingestion/event-pipeline/5-runAsyncHandlersStep')
jest.mock('../../../../src/worker/ingestion/utils')
class TestEventPipelineRunner extends EventPipelineRunner {

View File

@@ -0,0 +1,264 @@
import { PluginEvent } from '@posthog/plugin-scaffold'
import { DateTime } from 'luxon'
import { Hub } from '../../../src/types'
import { createHub } from '../../../src/utils/db/hub'
import { UUIDT } from '../../../src/utils/utils'
import { PersonState } from '../../../src/worker/ingestion/person-state'
import { delayUntilEventIngested, resetTestDatabaseClickhouse } from '../../helpers/clickhouse'
import { resetTestDatabase } from '../../helpers/sql'
jest.mock('../../../src/utils/status')
jest.setTimeout(60000) // 60 sec timeout
const timestamp = DateTime.fromISO('2020-01-01T12:00:05.200Z').toUTC()
const uuid = new UUIDT()
const uuid2 = new UUIDT()
describe('PersonState.update()', () => {
let hub: Hub
let closeHub: () => Promise<void>
beforeEach(async () => {
await resetTestDatabase()
await resetTestDatabaseClickhouse()
;[hub, closeHub] = await createHub({})
// Avoid collapsing merge tree causing race conditions!
await hub.db.clickhouseQuery('SYSTEM STOP MERGES')
})
afterEach(async () => {
await closeHub()
await hub.db.clickhouseQuery('SYSTEM START MERGES')
})
function personState(event: Partial<PluginEvent>) {
const fullEvent = {
team_id: 2,
properties: {},
...event,
}
return new PersonState(
fullEvent as any,
2,
event.distinct_id!,
timestamp,
hub.db,
hub.statsd,
hub.personManager,
uuid
)
}
async function fetchPersonsRows(options: { final?: boolean } = {}) {
const query = `SELECT * FROM person ${options.final ? 'FINAL' : ''}`
return (await hub.db.clickhouseQuery(query)).data
}
it('creates person if theyre new', async () => {
const createdPerson = await personState({ event: '$pageview', distinct_id: 'new-user' }).update()
expect(createdPerson).toEqual(
expect.objectContaining({
id: expect.any(Number),
uuid: uuid.toString(),
properties: {},
created_at: timestamp,
version: 0,
})
)
const clickhousePersons = await delayUntilEventIngested(fetchPersonsRows)
expect(clickhousePersons.length).toEqual(1)
expect(clickhousePersons[0]).toEqual(
expect.objectContaining({
id: uuid.toString(),
properties: '{}',
created_at: '2020-01-01 12:00:05.000',
version: 0,
})
)
})
it('creates person with properties', async () => {
const createdPerson = await personState({
event: '$pageview',
distinct_id: 'new-user',
properties: {
$set_once: { a: 1, b: 2 },
$set: { b: 3, c: 4 },
},
}).update()
expect(createdPerson).toEqual(
expect.objectContaining({
id: expect.any(Number),
uuid: uuid.toString(),
properties: { a: 1, b: 3, c: 4 },
created_at: timestamp,
version: 0,
})
)
const clickhousePersons = await delayUntilEventIngested(fetchPersonsRows)
expect(clickhousePersons.length).toEqual(1)
expect(clickhousePersons[0]).toEqual(
expect.objectContaining({
id: uuid.toString(),
properties: JSON.stringify({ a: 1, b: 3, c: 4 }),
created_at: '2020-01-01 12:00:05.000',
version: 0,
})
)
})
// This is a regression test
it('creates person on $identify event', async () => {
const createdPerson = await personState({
event: '$identify',
distinct_id: 'new-user',
properties: {
$set: { foo: 'bar' },
$anon_distinct_id: 'old-user',
},
}).update()
expect(createdPerson).toEqual(
expect.objectContaining({
id: expect.any(Number),
uuid: uuid.toString(),
properties: { foo: 'bar' },
created_at: timestamp,
version: 0,
})
)
const clickhousePersons = await delayUntilEventIngested(fetchPersonsRows)
expect(clickhousePersons.length).toEqual(1)
expect(clickhousePersons[0]).toEqual(
expect.objectContaining({
id: uuid.toString(),
properties: JSON.stringify({ foo: 'bar' }),
created_at: '2020-01-01 12:00:05.000',
version: 0,
})
)
})
it('merges people on $identify event', async () => {
await hub.db.createPerson(timestamp, { a: 1, b: 2 }, {}, {}, 2, null, false, uuid.toString(), ['old-user'])
await hub.db.createPerson(timestamp, { b: 3, c: 4 }, {}, {}, 2, null, false, uuid2.toString(), ['new-user'])
await personState({
event: '$identify',
distinct_id: 'new-user',
properties: {
$anon_distinct_id: 'old-user',
},
}).update()
await hub.db.kafkaProducer.flush()
const persons = await hub.db.fetchPersons()
expect(persons.length).toEqual(1)
expect(persons[0]).toEqual(
expect.objectContaining({
id: expect.any(Number),
uuid: uuid2.toString(),
properties: { a: 1, b: 3, c: 4 },
created_at: timestamp,
is_identified: true,
version: 1,
})
)
const distinctIds = await hub.db.fetchDistinctIdValues(persons[0])
expect(distinctIds).toEqual(expect.arrayContaining(['new-user', 'old-user']))
const clickhousePersons = await delayUntilEventIngested(() => fetchPersonsRows({ final: true }), 2)
expect(clickhousePersons.length).toEqual(2)
expect(clickhousePersons).toEqual(
expect.arrayContaining([
expect.objectContaining({
id: uuid2.toString(),
properties: JSON.stringify({ a: 1, b: 3, c: 4 }),
is_deleted: 0,
created_at: '2020-01-01 12:00:05.000',
version: 1,
}),
expect.objectContaining({
id: uuid.toString(),
is_deleted: 1,
version: 100,
}),
])
)
})
it('does not merge already identified users', async () => {
await hub.db.createPerson(timestamp, { a: 1, b: 2 }, {}, {}, 2, null, true, uuid.toString(), ['old-user'])
await hub.db.createPerson(timestamp, { b: 3, c: 4, d: 5 }, {}, {}, 2, null, false, uuid2.toString(), [
'new-user',
])
await personState({
event: '$identify',
distinct_id: 'new-user',
properties: {
$anon_distinct_id: 'old-user',
},
}).update()
const persons = await hub.db.fetchPersons()
expect(persons.length).toEqual(2)
})
it('merges people on $identify event and updates properties with $set/$set_once', async () => {
await hub.db.createPerson(timestamp, { a: 1, b: 2 }, {}, {}, 2, null, false, uuid.toString(), ['old-user'])
await hub.db.createPerson(timestamp, { b: 3, c: 4, d: 5 }, {}, {}, 2, null, false, uuid2.toString(), [
'new-user',
])
await personState({
event: '$identify',
distinct_id: 'new-user',
properties: {
$set: { d: 6, e: 7 },
$set_once: { a: 8, f: 9 },
$anon_distinct_id: 'old-user',
},
}).update()
await hub.db.kafkaProducer.flush()
const persons = await hub.db.fetchPersons()
expect(persons.length).toEqual(1)
expect(persons[0]).toEqual(
expect.objectContaining({
id: expect.any(Number),
uuid: uuid2.toString(),
properties: { a: 1, b: 3, c: 4, d: 6, e: 7, f: 9 },
created_at: timestamp,
is_identified: true,
version: 1,
})
)
const distinctIds = await hub.db.fetchDistinctIdValues(persons[0])
expect(distinctIds).toEqual(expect.arrayContaining(['new-user', 'old-user']))
const clickhousePersons = await delayUntilEventIngested(() => fetchPersonsRows({ final: true }), 2)
expect(clickhousePersons.length).toEqual(2)
expect(clickhousePersons).toEqual(
expect.arrayContaining([
expect.objectContaining({
id: uuid2.toString(),
properties: JSON.stringify({ a: 1, b: 3, c: 4, d: 6, e: 7, f: 9 }),
is_deleted: 0,
created_at: '2020-01-01 12:00:05.000',
version: 1,
}),
expect.objectContaining({
id: uuid.toString(),
is_deleted: 1,
version: 100,
}),
])
)
})
})

View File

@@ -0,0 +1,111 @@
import { PluginEvent } from '@posthog/plugin-scaffold'
import { DateTime } from 'luxon'
import { parseDate, parseEventTimestamp } from '../../../src/worker/ingestion/timestamps'
describe('parseDate()', () => {
const timestamps = [
'2021-10-29',
'2021-10-29 00:00:00',
'2021-10-29 00:00:00.000000',
'2021-10-29T00:00:00.000Z',
'2021-10-29 00:00:00+00:00',
'2021-10-29T00:00:00.000-00:00',
'2021-10-29T00:00:00.000',
'2021-10-29T00:00:00.000+00:00',
'2021-W43-5',
'2021-302',
]
test.each(timestamps)('parses %s', (timestamp) => {
const parsedTimestamp = parseDate(timestamp)
expect(parsedTimestamp.year).toBe(2021)
expect(parsedTimestamp.month).toBe(10)
expect(parsedTimestamp.day).toBe(29)
expect(parsedTimestamp.hour).toBe(0)
expect(parsedTimestamp.minute).toBe(0)
expect(parsedTimestamp.second).toBe(0)
expect(parsedTimestamp.millisecond).toBe(0)
})
})
describe('parseEventTimestamp()', () => {
it('captures sent_at', () => {
const rightNow = DateTime.utc()
const tomorrow = rightNow.plus({ days: 1, hours: 2 })
const tomorrowSentAt = rightNow.plus({ days: 1, hours: 2, minutes: 10 })
const event = {
timestamp: tomorrow.toISO(),
now: rightNow,
sent_at: tomorrowSentAt,
} as any as PluginEvent
const timestamp = parseEventTimestamp(event)
const eventSecondsBeforeNow = rightNow.diff(timestamp, 'seconds').seconds
expect(eventSecondsBeforeNow).toBeGreaterThan(590)
expect(eventSecondsBeforeNow).toBeLessThan(610)
})
it('captures sent_at with no timezones', () => {
const rightNow = DateTime.utc()
const tomorrow = rightNow.plus({ days: 1, hours: 2 }).setZone('UTC+4')
const tomorrowSentAt = rightNow.plus({ days: 1, hours: 2, minutes: 10 }).setZone('UTC+4')
// TODO: not sure if this is correct?
// tomorrow = tomorrow.replace(tzinfo=None)
// tomorrow_sent_at = tomorrow_sent_at.replace(tzinfo=None)
const event = {
timestamp: tomorrow,
now: rightNow,
sent_at: tomorrowSentAt,
} as any as PluginEvent
const timestamp = parseEventTimestamp(event)
const eventSecondsBeforeNow = rightNow.diff(timestamp, 'seconds').seconds
expect(eventSecondsBeforeNow).toBeGreaterThan(590)
expect(eventSecondsBeforeNow).toBeLessThan(610)
})
it('captures with no sent_at', () => {
const rightNow = DateTime.utc()
const tomorrow = rightNow.plus({ days: 1, hours: 2 })
const event = {
timestamp: tomorrow,
now: rightNow,
} as any as PluginEvent
const timestamp = parseEventTimestamp(event)
const difference = tomorrow.diff(timestamp, 'seconds').seconds
expect(difference).toBeLessThan(1)
})
it('works with offset timestamp', () => {
const now = DateTime.fromISO('2020-01-01T12:00:05.200Z')
const event = {
offset: 150,
now,
sent_at: now,
} as any as PluginEvent
const timestamp = parseEventTimestamp(event)
expect(timestamp.toUTC().toISO()).toEqual('2020-01-01T12:00:05.050Z')
})
it('works with offset timestamp and no sent_at', () => {
const now = DateTime.fromISO('2020-01-01T12:00:05.200Z')
const event = {
offset: 150,
now,
} as any as PluginEvent
const timestamp = parseEventTimestamp(event)
expect(timestamp.toUTC().toISO()).toEqual('2020-01-01T12:00:05.050Z')
})
})