Files
posthog/plugin-server/tests/main/process-event.test.ts

1533 lines
56 KiB
TypeScript

/*
This file contains a bunch of legacy E2E tests mixed with unit tests.
Rather than add tests here, consider improving event-pipeline-integration test suite or adding
unit tests to appropriate classes/functions.
*/
import { KafkaProducerObserver } from '~/tests/helpers/mocks/producer.spy'
import { DateTime } from 'luxon'
import { Properties } from '@posthog/plugin-scaffold'
import { PluginEvent } from '@posthog/plugin-scaffold/src/types'
import { KAFKA_GROUPS } from '~/config/kafka-topics'
import { createRedis } from '~/utils/db/redis'
import { parseRawClickHouseEvent } from '~/utils/event'
import { captureTeamEvent } from '~/utils/posthog'
import { BatchWritingGroupStoreForBatch } from '~/worker/ingestion/groups/batch-writing-group-store'
import { BatchWritingPersonsStoreForBatch } from '~/worker/ingestion/persons/batch-writing-person-store'
import { PersonsStoreForBatch } from '~/worker/ingestion/persons/persons-store-for-batch'
import { createEmitEventStep } from '../../src/ingestion/event-processing/emit-event-step'
import { isOkResult } from '../../src/ingestion/pipelines/results'
import { ClickHouseEvent, Hub, Person, PluginsServerConfig, Team } from '../../src/types'
import { closeHub, createHub } from '../../src/utils/db/hub'
import { PostgresUse } from '../../src/utils/db/postgres'
import { UUIDT } from '../../src/utils/utils'
import { EventPipelineRunner } from '../../src/worker/ingestion/event-pipeline/runner'
import { PostgresPersonRepository } from '../../src/worker/ingestion/persons/repositories/postgres-person-repository'
import { fetchDistinctIdValues, fetchPersons } from '../../src/worker/ingestion/persons/repositories/test-helpers'
import { EventsProcessor } from '../../src/worker/ingestion/process-event'
import { resetKafka } from '../helpers/kafka'
import { createUserTeamAndOrganization, getFirstTeam, getTeams, resetTestDatabase } from '../helpers/sql'
jest.mock('../../src/utils/logger')
jest.setTimeout(600000) // 600 sec timeout.
jest.mock('../../src/utils/posthog', () => ({
...jest.requireActual('../../src/utils/posthog'),
captureTeamEvent: jest.fn(),
}))
export async function createPerson(
server: Hub,
team: Team,
distinctIds: string[],
properties: Record<string, any> = {}
): Promise<Person> {
const personRepository = new PostgresPersonRepository(server.db.postgres)
const result = await personRepository.createPerson(
DateTime.utc(),
properties,
{},
{},
team.id,
null,
false,
new UUIDT().toString(),
distinctIds.map((distinctId) => ({ distinctId }))
)
if (!result.success) {
throw new Error('Failed to create person')
}
await server.db.kafkaProducer.queueMessages(result.messages)
return result.person
}
async function flushPersonStoreToKafka(hub: Hub, personStore: PersonsStoreForBatch, kafkaAcks: Promise<unknown>[]) {
const kafkaMessages = await personStore.flush()
await hub.db.kafkaProducer.queueMessages(kafkaMessages.map((message) => message.topicMessage))
await hub.db.kafkaProducer.flush()
await Promise.all(kafkaAcks)
return kafkaMessages
}
const TEST_CONFIG: Partial<PluginsServerConfig> = {
LOG_LEVEL: 'info',
}
describe('processEvent', () => {
let team: Team
let hub: Hub
let personRepository: PostgresPersonRepository
let eventsProcessor: EventsProcessor
let now = DateTime.utc()
async function processEvent(
distinctId: string,
ip: string | null,
_siteUrl: string,
data: Partial<PluginEvent>,
teamId: number,
timestamp: DateTime,
eventUuid: string
): Promise<void> {
const pluginEvent: PluginEvent = {
distinct_id: distinctId,
site_url: _siteUrl,
team_id: teamId,
timestamp: timestamp.toUTC().toISO(),
now: timestamp.toUTC().toISO(),
ip: ip,
uuid: eventUuid,
...data,
} as any as PluginEvent
const personsStoreForBatch = new BatchWritingPersonsStoreForBatch(
new PostgresPersonRepository(hub.db.postgres),
hub.db.kafkaProducer
)
const groupStoreForBatch = new BatchWritingGroupStoreForBatch(
hub.db,
hub.groupRepository,
hub.clickhouseGroupRepository
)
const runner = new EventPipelineRunner(hub, pluginEvent, null, personsStoreForBatch, groupStoreForBatch)
const res = await runner.runEventPipeline(pluginEvent, team)
if (isOkResult(res)) {
// Use emit event step to emit the event
const emitEventStep = createEmitEventStep({
kafkaProducer: hub.kafkaProducer,
clickhouseJsonEventsTopic: hub.CLICKHOUSE_JSON_EVENTS_KAFKA_TOPIC,
})
const emitResult = await emitEventStep(res.value)
// Handle side effects using side effect handling pipeline
if (isOkResult(emitResult) && emitResult.sideEffects.length > 0) {
await Promise.allSettled(emitResult.sideEffects)
}
await flushPersonStoreToKafka(hub, personsStoreForBatch, res.sideEffects ?? [])
}
await groupStoreForBatch.flush()
}
// Simple client used to simulate sending events
// Use state object to simulate stateful clients that keep track of old
// distinct id, starting with an anonymous one. I've taken posthog-js as
// the reference implementation.
let state = { currentDistinctId: 'anonymous_id' }
let mockProducerObserver: KafkaProducerObserver
beforeAll(async () => {
await resetKafka(TEST_CONFIG)
})
beforeEach(async () => {
const testCode = `
function processEvent (event, meta) {
event.properties["somewhere"] = "over the rainbow";
return event
}
`
await resetTestDatabase(testCode, TEST_CONFIG)
hub = await createHub({ ...TEST_CONFIG })
mockProducerObserver = new KafkaProducerObserver(hub.kafkaProducer)
mockProducerObserver.resetKafkaProducer()
personRepository = new PostgresPersonRepository(hub.db.postgres)
eventsProcessor = new EventsProcessor(hub)
team = await getFirstTeam(hub)
now = DateTime.utc()
const redis = await createRedis(hub, 'ingestion')
const hooksCacheKey = `@posthog/plugin-server/hooks/${team.id}`
await redis.del(hooksCacheKey)
await redis.quit()
// Always start with an anonymous state
state = { currentDistinctId: 'anonymous_id' }
})
afterEach(async () => {
await closeHub(hub)
})
const getEventsFromKafka = (): Record<string, any>[] => {
const events = mockProducerObserver
.getProducedKafkaMessagesForTopic(hub.CLICKHOUSE_JSON_EVENTS_KAFKA_TOPIC)
.map((x) => parseRawClickHouseEvent(x.value as any))
return events
}
type EventsByPerson = [string[], string[]]
const getEventsByPerson = async (hub: Hub): Promise<EventsByPerson[]> => {
// Helper function to retrieve events paired with their associated distinct
// ids
const persons = await fetchPersons(hub.db.postgres)
const events = getEventsFromKafka()
return await Promise.all(
persons
.sort((p1, p2) => p1.created_at.diff(p2.created_at).toMillis())
.map(async (person) => {
const distinctIds = await fetchDistinctIdValues(hub.db.postgres, person)
return [
distinctIds,
(events as ClickHouseEvent[])
.filter((event) => distinctIds.includes(event.distinct_id))
.sort((e1, e2) => e1.timestamp.diff(e2.timestamp).toMillis())
.map((event) => event.event),
] as EventsByPerson
})
)
}
const capture = async (
hub: Hub,
eventName: string,
properties: any = {},
personRepository?: PostgresPersonRepository
) => {
const event = {
event: eventName,
distinct_id: properties.distinct_id ?? state.currentDistinctId,
properties: properties,
now: new Date().toISOString(),
sent_at: new Date().toISOString(),
ip: '127.0.0.1',
site_url: 'https://posthog.com',
team_id: team.id,
uuid: new UUIDT().toString(),
}
const personsStoreForBatch = new BatchWritingPersonsStoreForBatch(
personRepository || new PostgresPersonRepository(hub.db.postgres),
hub.db.kafkaProducer
)
const groupStoreForBatch = new BatchWritingGroupStoreForBatch(
hub.db,
hub.groupRepository,
hub.clickhouseGroupRepository
)
const runner = new EventPipelineRunner(hub, event, null, personsStoreForBatch, groupStoreForBatch)
const res = await runner.runEventPipeline(event, team)
if (isOkResult(res)) {
// Use emit event step to emit the event
const emitEventStep = createEmitEventStep({
kafkaProducer: hub.kafkaProducer,
clickhouseJsonEventsTopic: hub.CLICKHOUSE_JSON_EVENTS_KAFKA_TOPIC,
})
const emitResult = await emitEventStep(res.value)
// Handle side effects using side effect handling pipeline
if (isOkResult(emitResult) && emitResult.sideEffects.length > 0) {
await Promise.allSettled(emitResult.sideEffects)
}
await flushPersonStoreToKafka(hub, personsStoreForBatch, res.sideEffects ?? [])
}
await groupStoreForBatch.flush()
}
const identify = async (hub: Hub, distinctId: string, personRepository?: PostgresPersonRepository) => {
// Update currentDistinctId state immediately, as the event will be
// dispatch asynchronously
const currentDistinctId = state.currentDistinctId
state.currentDistinctId = distinctId
await capture(
hub,
'$identify',
{
// posthog-js will send the previous distinct id as
// $anon_distinct_id
$anon_distinct_id: currentDistinctId,
distinct_id: distinctId,
},
personRepository
)
}
const alias = async (hub: Hub, alias: string, distinctId: string) => {
await capture(hub, '$create_alias', { alias, disinct_id: distinctId })
}
test('capture bad team', async () => {
const groupStoreForBatch = new BatchWritingGroupStoreForBatch(
hub.db,
hub.groupRepository,
hub.clickhouseGroupRepository
)
await expect(
eventsProcessor.processEvent(
'asdfasdfasdf',
{
event: '$pageview',
properties: { distinct_id: 'asdfasdfasdf', token: team.api_token },
} as any as PluginEvent,
1337,
now,
new UUIDT().toString(),
false,
groupStoreForBatch
)
).rejects.toThrow("No team found with ID 1337. Can't ingest event.")
})
test('ip none', async () => {
await createPerson(hub, team, ['asdfasdfasdf'])
await processEvent(
'asdfasdfasdf',
null,
'',
{
event: '$pageview',
properties: { distinct_id: 'asdfasdfasdf', token: team.api_token },
} as any as PluginEvent,
team.id,
now,
new UUIDT().toString()
)
const [event] = getEventsFromKafka()
expect(Object.keys(event.properties)).not.toContain('$ip')
})
test('ip capture', async () => {
await createPerson(hub, team, ['asdfasdfasdf'])
await processEvent(
'asdfasdfasdf',
'11.12.13.14',
'',
{
event: '$pageview',
properties: { distinct_id: 'asdfasdfasdf', token: team.api_token },
} as any as PluginEvent,
team.id,
now,
new UUIDT().toString()
)
const [event] = getEventsFromKafka()
expect(event.properties['$ip']).toBe('11.12.13.14')
})
test('ip override', async () => {
await createPerson(hub, team, ['asdfasdfasdf'])
await processEvent(
'asdfasdfasdf',
'11.12.13.14',
'',
{
event: '$pageview',
properties: { $ip: '1.0.0.1', distinct_id: 'asdfasdfasdf', token: team.api_token },
} as any as PluginEvent,
team.id,
now,
new UUIDT().toString()
)
const [event] = getEventsFromKafka()
expect(event.properties['$ip']).toBe('1.0.0.1')
})
test('anonymized ip capture', async () => {
await hub.db.postgres.query(
PostgresUse.COMMON_WRITE,
'update posthog_team set anonymize_ips = $1',
[true],
'testTag'
)
await createPerson(hub, team, ['asdfasdfasdf'])
await processEvent(
'asdfasdfasdf',
'11.12.13.14',
'',
{
event: '$pageview',
properties: { distinct_id: 'asdfasdfasdf', token: team.api_token },
} as any as PluginEvent,
team.id,
now,
new UUIDT().toString()
)
const [event] = getEventsFromKafka()
expect(event.properties['$ip']).not.toBeTruthy()
})
test('merge_dangerously', async () => {
await createPerson(hub, team, ['old_distinct_id'])
await processEvent(
'new_distinct_id',
'',
'',
{
event: '$merge_dangerously',
properties: { distinct_id: 'new_distinct_id', token: team.api_token, alias: 'old_distinct_id' },
} as any as PluginEvent,
team.id,
now,
new UUIDT().toString()
)
expect(await fetchDistinctIdValues(hub.db.postgres, (await fetchPersons(hub.db.postgres))[0])).toEqual([
'old_distinct_id',
'new_distinct_id',
])
})
test('alias', async () => {
await createPerson(hub, team, ['old_distinct_id'])
await processEvent(
'new_distinct_id',
'',
'',
{
event: '$create_alias',
properties: { distinct_id: 'new_distinct_id', token: team.api_token, alias: 'old_distinct_id' },
} as any as PluginEvent,
team.id,
now,
new UUIDT().toString()
)
expect(await fetchDistinctIdValues(hub.db.postgres, (await fetchPersons(hub.db.postgres))[0])).toEqual([
'old_distinct_id',
'new_distinct_id',
])
})
test('alias reverse', async () => {
await createPerson(hub, team, ['old_distinct_id'])
await processEvent(
'old_distinct_id',
'',
'',
{
event: '$create_alias',
properties: { distinct_id: 'old_distinct_id', token: team.api_token, alias: 'new_distinct_id' },
} as any as PluginEvent,
team.id,
now,
new UUIDT().toString()
)
expect(await fetchDistinctIdValues(hub.db.postgres, (await fetchPersons(hub.db.postgres))[0])).toEqual([
'old_distinct_id',
'new_distinct_id',
])
})
test('alias twice', async () => {
await createPerson(hub, team, ['old_distinct_id'])
await processEvent(
'new_distinct_id',
'',
'',
{
event: '$create_alias',
properties: { distinct_id: 'new_distinct_id', token: team.api_token, alias: 'old_distinct_id' },
} as any as PluginEvent,
team.id,
now,
new UUIDT().toString()
)
expect((await fetchPersons(hub.db.postgres)).length).toBe(1)
expect(await fetchDistinctIdValues(hub.db.postgres, (await fetchPersons(hub.db.postgres))[0])).toEqual([
'old_distinct_id',
'new_distinct_id',
])
await createPerson(hub, team, ['old_distinct_id_2'])
expect((await fetchPersons(hub.db.postgres)).length).toBe(2)
await processEvent(
'new_distinct_id',
'',
'',
{
event: '$create_alias',
properties: { distinct_id: 'new_distinct_id', token: team.api_token, alias: 'old_distinct_id_2' },
} as any as PluginEvent,
team.id,
now,
new UUIDT().toString()
)
expect((await fetchPersons(hub.db.postgres)).length).toBe(1)
expect(await fetchDistinctIdValues(hub.db.postgres, (await fetchPersons(hub.db.postgres))[0])).toEqual([
'old_distinct_id',
'new_distinct_id',
'old_distinct_id_2',
])
})
test('alias before person', async () => {
await processEvent(
'new_distinct_id',
'',
'',
{
event: '$create_alias',
properties: { distinct_id: 'new_distinct_id', token: team.api_token, alias: 'old_distinct_id' },
} as any as PluginEvent,
team.id,
now,
new UUIDT().toString()
)
expect((await fetchPersons(hub.db.postgres)).length).toBe(1)
const distinctIds = await fetchDistinctIdValues(hub.db.postgres, (await fetchPersons(hub.db.postgres))[0])
expect(distinctIds).toEqual(expect.arrayContaining(['new_distinct_id', 'old_distinct_id']))
expect(distinctIds).toHaveLength(2)
})
test('alias both existing', async () => {
await createPerson(hub, team, ['old_distinct_id'])
await createPerson(hub, team, ['new_distinct_id'])
await processEvent(
'new_distinct_id',
'',
'',
{
event: '$create_alias',
properties: { distinct_id: 'new_distinct_id', token: team.api_token, alias: 'old_distinct_id' },
} as any as PluginEvent,
team.id,
now,
new UUIDT().toString()
)
expect(await fetchDistinctIdValues(hub.db.postgres, (await fetchPersons(hub.db.postgres))[0])).toEqual([
'old_distinct_id',
'new_distinct_id',
])
})
test('alias merge properties', async () => {
await createPerson(hub, team, ['new_distinct_id'], {
key_on_both: 'new value both',
key_on_new: 'new value',
})
await createPerson(hub, team, ['old_distinct_id'], {
key_on_both: 'old value both',
key_on_old: 'old value',
})
await processEvent(
'new_distinct_id',
'',
'',
{
event: '$create_alias',
properties: { distinct_id: 'new_distinct_id', token: team.api_token, alias: 'old_distinct_id' },
} as any as PluginEvent,
team.id,
now,
new UUIDT().toString()
)
expect((await fetchPersons(hub.db.postgres)).length).toBe(1)
const [person] = await fetchPersons(hub.db.postgres)
expect((await fetchDistinctIdValues(hub.db.postgres, person)).sort()).toEqual([
'new_distinct_id',
'old_distinct_id',
])
expect(person.properties).toEqual({
key_on_both: 'new value both',
key_on_new: 'new value',
key_on_old: 'old value',
})
})
test('long htext', async () => {
await processEvent(
'new_distinct_id',
'',
'',
{
event: '$autocapture',
properties: {
distinct_id: 'new_distinct_id',
token: team.api_token,
$elements: [
{
tag_name: 'a',
$el_text: 'a'.repeat(2050),
attr__href: 'a'.repeat(2050),
nth_child: 1,
nth_of_type: 2,
attr__class: 'btn btn-sm',
},
],
},
} as any as PluginEvent,
team.id,
now,
new UUIDT().toString()
)
const [event] = getEventsFromKafka()
const [element] = event.elements_chain!
expect(element.href?.length).toEqual(2048)
expect(element.text?.length).toEqual(400)
})
test('capture first team event', async () => {
await hub.db.postgres.query(
PostgresUse.COMMON_WRITE,
`UPDATE posthog_team
SET ingested_event = $1
WHERE id = $2`,
[false, team.id],
'testTag'
)
await processEvent(
'2',
'',
'',
{
event: '$autocapture',
properties: {
distinct_id: 1,
token: team.api_token,
$elements: [{ tag_name: 'a', nth_child: 1, nth_of_type: 2, attr__class: 'btn btn-sm' }],
},
} as any as PluginEvent,
team.id,
now,
new UUIDT().toString()
)
expect(captureTeamEvent).toHaveBeenCalledWith(
expect.objectContaining({ uuid: team.uuid, organization_id: team.organization_id }),
'first team event ingested',
{ host: undefined, realm: undefined, sdk: undefined },
'plugin_test_user_distinct_id_1001'
)
team = await getFirstTeam(hub)
expect(team.ingested_event).toEqual(true)
const [event] = getEventsFromKafka()
const elements = event.elements_chain!
expect(elements.length).toEqual(1)
})
test('identify with illegal (generic) id', async () => {
await createPerson(hub, team, ['im an anonymous id'])
expect((await fetchPersons(hub.db.postgres)).length).toBe(1)
const createPersonAndSendIdentify = async (distinctId: string): Promise<void> => {
await createPerson(hub, team, [distinctId])
await processEvent(
distinctId,
'',
'',
{
event: '$identify',
properties: {
token: team.api_token,
distinct_id: distinctId,
$anon_distinct_id: 'im an anonymous id',
},
} as any as PluginEvent,
team.id,
now,
new UUIDT().toString()
)
}
// try to merge, the merge should fail
await createPersonAndSendIdentify('distinctId')
expect((await fetchPersons(hub.db.postgres)).length).toBe(2)
await createPersonAndSendIdentify(' ')
expect((await fetchPersons(hub.db.postgres)).length).toBe(3)
await createPersonAndSendIdentify('NaN')
expect((await fetchPersons(hub.db.postgres)).length).toBe(4)
await createPersonAndSendIdentify('undefined')
expect((await fetchPersons(hub.db.postgres)).length).toBe(5)
await createPersonAndSendIdentify('None')
expect((await fetchPersons(hub.db.postgres)).length).toBe(6)
await createPersonAndSendIdentify('0')
expect((await fetchPersons(hub.db.postgres)).length).toBe(7)
// 'Nan' is an allowed id, so the merge should work
// as such, no extra person is created
await createPersonAndSendIdentify('Nan')
expect((await fetchPersons(hub.db.postgres)).length).toBe(7)
})
test('Alias with illegal (generic) id', async () => {
const legal_id = 'user123'
const illegal_id = 'null'
await createPerson(hub, team, [legal_id])
expect((await fetchPersons(hub.db.postgres)).length).toBe(1)
await processEvent(
illegal_id,
'',
'',
{
event: '$create_alias',
properties: {
token: team.api_token,
distinct_id: legal_id,
alias: illegal_id,
},
} as any as PluginEvent,
team.id,
now,
new UUIDT().toString()
)
// person with illegal id got created but not merged
expect((await fetchPersons(hub.db.postgres)).length).toBe(2)
})
// This case is likely to happen after signup, for example:
// 1. User browses website with anonymous_id
// 2. User signs up, triggers event with their new_distinct_id (creating a new Person)
// 3. In the frontend, try to alias anonymous_id with new_distinct_id
// Result should be that we end up with one Person with both ID's
test('distinct with anonymous_id which was already created', async () => {
await createPerson(hub, team, ['anonymous_id'])
await createPerson(hub, team, ['new_distinct_id'], { email: 'someone@gmail.com' })
await processEvent(
'new_distinct_id',
'',
'',
{
event: '$identify',
properties: {
$anon_distinct_id: 'anonymous_id',
token: team.api_token,
distinct_id: 'new_distinct_id',
},
} as any as PluginEvent,
team.id,
now,
new UUIDT().toString()
)
const [person] = await fetchPersons(hub.db.postgres)
expect(await fetchDistinctIdValues(hub.db.postgres, person)).toEqual(['anonymous_id', 'new_distinct_id'])
expect(person.properties['email']).toEqual('someone@gmail.com')
expect(person.is_identified).toEqual(true)
})
test('identify with the same distinct_id as anon_distinct_id', async () => {
await createPerson(hub, team, ['anonymous_id'])
await processEvent(
'anonymous_id',
'',
'',
{
event: '$identify',
properties: {
$anon_distinct_id: 'anonymous_id',
token: team.api_token,
distinct_id: 'anonymous_id',
},
} as any as PluginEvent,
team.id,
now,
new UUIDT().toString()
)
const [person] = await fetchPersons(hub.db.postgres)
expect(await fetchDistinctIdValues(hub.db.postgres, person)).toEqual(['anonymous_id'])
expect(person.is_identified).toEqual(false)
})
test('distinct with multiple anonymous_ids which were already created', async () => {
await createPerson(hub, team, ['anonymous_id'])
await createPerson(hub, team, ['new_distinct_id'], { email: 'someone@gmail.com' })
await processEvent(
'new_distinct_id',
'',
'',
{
event: '$identify',
properties: {
$anon_distinct_id: 'anonymous_id',
token: team.api_token,
distinct_id: 'new_distinct_id',
},
} as any as PluginEvent,
team.id,
now,
new UUIDT().toString()
)
const persons1 = await fetchPersons(hub.db.postgres)
expect(persons1.length).toBe(1)
expect(await fetchDistinctIdValues(hub.db.postgres, persons1[0])).toEqual(['anonymous_id', 'new_distinct_id'])
expect(persons1[0].properties['email']).toEqual('someone@gmail.com')
expect(persons1[0].is_identified).toEqual(true)
await createPerson(hub, team, ['anonymous_id_2'])
await processEvent(
'new_distinct_id',
'',
'',
{
event: '$identify',
properties: {
$anon_distinct_id: 'anonymous_id_2',
token: team.api_token,
distinct_id: 'new_distinct_id',
},
} as any as PluginEvent,
team.id,
now,
new UUIDT().toString()
)
const persons2 = await fetchPersons(hub.db.postgres)
expect(persons2.length).toBe(1)
expect(await fetchDistinctIdValues(hub.db.postgres, persons2[0])).toEqual([
'anonymous_id',
'new_distinct_id',
'anonymous_id_2',
])
expect(persons2[0].properties['email']).toEqual('someone@gmail.com')
expect(persons2[0].is_identified).toEqual(true)
})
test('distinct team leakage', async () => {
await createUserTeamAndOrganization(
hub.postgres,
3,
1002,
'a73fc995-a63f-4e4e-bf65-2a5e9f93b2b1',
'01774e2f-0d01-0000-ee94-9a238640c6ee',
'0174f81e-36f5-0000-7ef8-cc26c1fbab1c'
)
const team2 = (await getTeams(hub))[1]
await createPerson(hub, team2, ['2'], { email: 'team2@gmail.com' })
await createPerson(hub, team, ['1', '2'])
await processEvent(
'2',
'',
'',
{
event: '$identify',
properties: {
$anon_distinct_id: '1',
token: team.api_token,
distinct_id: '2',
},
} as any as PluginEvent,
team.id,
now,
new UUIDT().toString()
)
const people = (await fetchPersons(hub.db.postgres)).sort((p1, p2) => p2.team_id - p1.team_id)
expect(people.length).toEqual(2)
expect(people[1].team_id).toEqual(team.id)
expect(people[1].properties).toEqual({})
const distinctIdsForPerson1 = await fetchDistinctIdValues(hub.db.postgres, people[1])
expect(distinctIdsForPerson1).toEqual(expect.arrayContaining(['1', '2']))
expect(distinctIdsForPerson1).toHaveLength(2)
expect(people[0].team_id).toEqual(team2.id)
expect(await fetchDistinctIdValues(hub.db.postgres, people[0])).toEqual(['2'])
})
describe('when handling $identify', () => {
test('we do not alias users if distinct id changes but we are already identified', async () => {
// This test is in reference to
// https://github.com/PostHog/posthog/issues/5527 , where we were
// correctly identifying that an anonymous user before login should be
// aliased to the user they subsequently login as, but incorrectly
// aliasing on subsequent $identify events. The anonymous case is
// special as we want to alias to a known user, but otherwise we
// shouldn't be doing so.
const anonymousId = 'anonymous_id'
const initialDistinctId = 'initial_distinct_id'
const p2DistinctId = 'p2_distinct_id'
const p2NewDistinctId = 'new_distinct_id'
// Play out a sequence of events that should result in two users being
// identified, with the first to events associated with one user, and
// the third with another.
await capture(hub, 'event 1')
await identify(hub, initialDistinctId)
await capture(hub, 'event 2')
state.currentDistinctId = p2DistinctId
await capture(hub, 'event 3')
await identify(hub, p2NewDistinctId)
await capture(hub, 'event 4')
// Let's also make sure that we do not alias when switching back to
// initialDistictId
await identify(hub, initialDistinctId)
// Get pairins of person distinctIds and the events associated with them
const eventsByPerson = await getEventsByPerson(hub)
expect(eventsByPerson).toEqual([
[
[anonymousId, initialDistinctId],
['event 1', '$identify', 'event 2', '$identify'],
],
[
[p2DistinctId, p2NewDistinctId],
['event 3', '$identify', 'event 4'],
],
])
// Make sure the persons are identified
const persons = await fetchPersons(hub.db.postgres)
expect(persons.map((person) => person.is_identified)).toEqual([true, true])
})
test('we do not alias users if distinct id changes but we are already identified, with no anonymous event', async () => {
// This test is in reference to
// https://github.com/PostHog/posthog/issues/5527 , where we were
// correctly identifying that an anonymous user before login should be
// aliased to the user they subsequently login as, but incorrectly
// aliasing on subsequent $identify events. The anonymous case is
// special as we want to alias to a known user, but otherwise we
// shouldn't be doing so. This test is similar to the previous one,
// except it does not include an initial anonymous event.
const anonymousId = 'anonymous_id'
const initialDistinctId = 'initial_distinct_id'
const p2DistinctId = 'p2_distinct_id'
const p2NewDistinctId = 'new_distinct_id'
// Play out a sequence of events that should result in two users being
// identified, with the first to events associated with one user, and
// the third with another.
await identify(hub, initialDistinctId)
await capture(hub, 'event 2')
state.currentDistinctId = p2DistinctId
await capture(hub, 'event 3')
await identify(hub, p2NewDistinctId)
await capture(hub, 'event 4')
// Let's also make sure that we do not alias when switching back to
// initialDistictId
await identify(hub, initialDistinctId)
// Get pairins of person distinctIds and the events associated with them
const eventsByPerson = await getEventsByPerson(hub)
expect(eventsByPerson).toHaveLength(2)
expect(eventsByPerson[0][0]).toEqual(expect.arrayContaining([initialDistinctId, anonymousId]))
expect(eventsByPerson[0][0]).toHaveLength(2)
expect(eventsByPerson[0][1]).toEqual(['$identify', 'event 2', '$identify'])
expect(eventsByPerson[1][0]).toEqual(expect.arrayContaining([p2DistinctId, p2NewDistinctId]))
expect(eventsByPerson[1][0]).toHaveLength(2)
expect(eventsByPerson[1][1]).toEqual(['event 3', '$identify', 'event 4'])
// Make sure the persons are identified
const persons = await fetchPersons(hub.db.postgres)
expect(persons.map((person) => person.is_identified)).toEqual([true, true])
})
test('we do not leave things in inconsistent state if $identify is run concurrently', async () => {
// There are a few places where we have the pattern of:
//
// 1. fetch from postgres
// 2. check rows match condition
// 3. perform update
//
// This test is designed to check the specific case where, in
// handling we are creating an unidentified user, then updating this
// user to have is_identified = true. Since we are using the
// is_identified to decide on if we will merge persons, we want to
// make sure we guard against this race condition. The scenario is:
//
// 1. initiate identify for 'distinct-id'
// 2. once person for distinct-id has been created, initiate
// identify for 'new-distinct-id'
// 3. check that the persons remain distinct
// Check the db is empty to start with
expect(await fetchPersons(hub.db.postgres)).toEqual([])
const anonymousId = 'anonymous_id'
const initialDistinctId = 'initial-distinct-id'
const newDistinctId = 'new-distinct-id'
state.currentDistinctId = newDistinctId
await capture(hub, 'some event')
state.currentDistinctId = anonymousId
// Hook into createPerson, which is as of writing called from
// alias. Here we simply call identify again and wait on it
// completing before continuing with the first identify.
const originalCreatePerson = personRepository.createPerson.bind(personRepository)
const createPersonMock = jest.fn(async (...args) => {
// We need to slice off the txn arg, or else we conflict with the `identify` below.
// @ts-expect-error because TS is crazy, this is valid
const result = await originalCreatePerson(...args.slice(0, -1))
if (createPersonMock.mock.calls.length === 1) {
// On second invocation, make another identify call
await identify(hub, newDistinctId, personRepository)
}
return result
})
personRepository.createPerson = createPersonMock
// set the first identify going
await identify(hub, initialDistinctId, personRepository)
// Let's first just make sure `updatePerson` was called, as a way of
// checking that our mocking was actually invoked
expect(personRepository.createPerson).toHaveBeenCalled()
// Now make sure that we have one person in the db that has been
// identified
const persons = await fetchPersons(hub.db.postgres)
expect(persons.length).toEqual(2)
expect(persons.map((person) => person.is_identified)).toEqual([true, true])
})
})
describe('when handling $create_alias', () => {
test('we can alias an identified person to an identified person', async () => {
const anonymousId = 'anonymous_id'
const identifiedId1 = 'identified_id1'
const identifiedId2 = 'identified_id2'
// anonymous_id -> identified_id1
await identify(hub, identifiedId1)
state.currentDistinctId = identifiedId1
await capture(hub, 'some event')
await identify(hub, identifiedId2)
await alias(hub, identifiedId1, identifiedId2)
// Get pairings of person distinctIds and the events associated with them
const eventsByPerson = await getEventsByPerson(hub)
// There should just be one person, to which all events are associated
expect(eventsByPerson).toEqual([
[
expect.arrayContaining([anonymousId, identifiedId1, identifiedId2]),
['$identify', 'some event', '$identify', '$create_alias'],
],
])
// Make sure there is one identified person
const persons = await fetchPersons(hub.db.postgres)
expect(persons.map((person) => person.is_identified)).toEqual([true])
})
test('we can alias an anonymous person to an identified person', async () => {
const anonymousId = 'anonymous_id'
const initialDistinctId = 'initial_distinct_id'
// Identify one person, then become anonymous
await identify(hub, initialDistinctId)
state.currentDistinctId = anonymousId
await capture(hub, 'anonymous event')
// Then try to alias them
await alias(hub, anonymousId, initialDistinctId)
// Get pairings of person distinctIds and the events associated with them
const eventsByPerson = await getEventsByPerson(hub)
// There should just be one person, to which all events are associated
expect(eventsByPerson).toHaveLength(1)
expect(eventsByPerson[0][0]).toEqual(expect.arrayContaining([initialDistinctId, anonymousId]))
expect(eventsByPerson[0][0]).toHaveLength(2)
expect(eventsByPerson[0][1]).toEqual(['$identify', 'anonymous event', '$create_alias'])
// Make sure there is one identified person
const persons = await fetchPersons(hub.db.postgres)
expect(persons.map((person) => person.is_identified)).toEqual([true])
})
test('we can alias an identified person to an anonymous person', async () => {
const anonymousId = 'anonymous_id'
const initialDistinctId = 'initial_distinct_id'
// Identify one person, then become anonymous
await identify(hub, initialDistinctId)
state.currentDistinctId = anonymousId
await capture(hub, 'anonymous event')
// Then try to alias them
await alias(hub, initialDistinctId, anonymousId)
// Get pairings of person distinctIds and the events associated with them
const eventsByPerson = await getEventsByPerson(hub)
// There should just be one person, to which all events are associated
expect(eventsByPerson).toHaveLength(1)
expect(eventsByPerson[0][0]).toEqual(expect.arrayContaining([initialDistinctId, anonymousId]))
expect(eventsByPerson[0][0]).toHaveLength(2)
expect(eventsByPerson[0][1]).toEqual(['$identify', 'anonymous event', '$create_alias'])
// Make sure there is one identified person
const persons = await fetchPersons(hub.db.postgres)
expect(persons.map((person) => person.is_identified)).toEqual([true])
})
test('we can alias an anonymous person to an anonymous person', async () => {
const anonymous1 = 'anonymous-1'
const anonymous2 = 'anonymous-2'
// Identify one person, then become anonymous
state.currentDistinctId = anonymous1
await capture(hub, 'anonymous event 1')
state.currentDistinctId = anonymous2
await capture(hub, 'anonymous event 2')
// Then try to alias them
await alias(hub, anonymous1, anonymous2)
// Get pairings of person distinctIds and the events associated with them
const eventsByPerson = await getEventsByPerson(hub)
// There should just be one person, to which all events are associated
expect(eventsByPerson).toEqual([
[
[anonymous1, anonymous2],
['anonymous event 1', 'anonymous event 2', '$create_alias'],
],
])
// Make sure there is one identified person
const persons = await fetchPersons(hub.db.postgres)
expect(persons.map((person) => person.is_identified)).toEqual([true])
})
test('we can alias two non-existent persons', async () => {
const anonymous1 = 'anonymous-1'
const anonymous2 = 'anonymous-2'
// Then try to alias them
state.currentDistinctId = anonymous1
await alias(hub, anonymous2, anonymous1)
// Get pairings of person distinctIds and the events associated with them
const eventsByPerson = await getEventsByPerson(hub)
// There should just be one person, to which all events are associated
expect(eventsByPerson).toHaveLength(1)
expect(eventsByPerson[0][0]).toEqual(expect.arrayContaining([anonymous1, anonymous2]))
expect(eventsByPerson[0][0]).toHaveLength(2)
expect(eventsByPerson[0][1]).toEqual(['$create_alias'])
const persons = await fetchPersons(hub.db.postgres)
expect(persons.map((person) => person.is_identified)).toEqual([true])
})
})
test('event name object json', async () => {
await processEvent(
'xxx',
'',
'',
{ event: { 'event name': 'as object' }, properties: {} } as any as PluginEvent,
team.id,
now,
new UUIDT().toString()
)
const [event] = getEventsFromKafka()
expect(event.event).toEqual('{"event name":"as object"}')
})
test('event name array json', async () => {
await processEvent(
'xxx',
'',
'',
{ event: ['event name', 'a list'], properties: {} } as any as PluginEvent,
team.id,
now,
new UUIDT().toString()
)
const [event] = getEventsFromKafka()
expect(event.event).toEqual('["event name","a list"]')
})
test('long event name substr', async () => {
await processEvent(
'xxx',
'',
'',
{ event: 'E'.repeat(300), properties: { price: 299.99, name: 'AirPods Pro' } } as any as PluginEvent,
team.id,
DateTime.utc(),
new UUIDT().toString()
)
const [event] = getEventsFromKafka()
expect(event.event?.length).toBe(200)
})
test('groupidentify without group_type ingests event', async () => {
await createPerson(hub, team, ['distinct_id1'])
await processEvent(
'distinct_id1',
'',
'',
{
event: '$groupidentify',
properties: {
token: team.api_token,
distinct_id: 'distinct_id1',
$group_key: 'org::5',
$group_set: {
foo: 'bar',
},
},
} as any as PluginEvent,
team.id,
now,
new UUIDT().toString()
)
})
test('$groupidentify updating properties', async () => {
const next: DateTime = now.plus({ minutes: 1 })
await createPerson(hub, team, ['distinct_id1'])
await hub.groupRepository.insertGroup(team.id, 0, 'org::5', { a: 1, b: 2 }, now, {}, {})
await processEvent(
'distinct_id1',
'',
'',
{
event: '$groupidentify',
properties: {
token: team.api_token,
distinct_id: 'distinct_id1',
$group_type: 'organization',
$group_key: 'org::5',
$group_set: {
foo: 'bar',
a: 3,
},
},
} as any as PluginEvent,
team.id,
next,
new UUIDT().toString()
)
expect(mockProducerObserver.getProducedKafkaMessagesForTopic(KAFKA_GROUPS)[0].value).toEqual({
group_key: 'org::5',
group_properties: JSON.stringify({ a: 3, b: 2, foo: 'bar' }),
group_type_index: 0,
team_id: team.id,
created_at: expect.any(String),
version: 2,
})
const group = await hub.groupRepository.fetchGroup(team.id, 0, 'org::5')
expect(group).toEqual({
id: expect.any(Number),
team_id: team.id,
group_type_index: 0,
group_key: 'org::5',
group_properties: { a: 3, b: 2, foo: 'bar' },
created_at: now,
properties_last_updated_at: {},
properties_last_operation: {},
version: 2,
})
})
test('person and group properties on events', async () => {
await createPerson(hub, team, ['distinct_id1'], { pineapple: 'on', pizza: 1 })
await processEvent(
'distinct_id1',
'',
'',
{
event: '$groupidentify',
properties: {
token: team.api_token,
distinct_id: 'distinct_id1',
$group_type: 'organization',
$group_key: 'org:5',
$group_set: {
foo: 'bar',
},
},
} as any as PluginEvent,
team.id,
now,
new UUIDT().toString()
)
await processEvent(
'distinct_id1',
'',
'',
{
event: '$groupidentify',
properties: {
token: team.api_token,
distinct_id: 'distinct_id1',
$group_type: 'second',
$group_key: 'second_key',
$group_set: {
pineapple: 'yummy',
},
},
} as any as PluginEvent,
team.id,
now,
new UUIDT().toString()
)
await processEvent(
'distinct_id1',
'',
'',
{
event: 'test event',
properties: {
token: team.api_token,
distinct_id: 'distinct_id1',
$set: { new: 5 },
$group_0: 'org:5',
$group_1: 'second_key',
},
} as any as PluginEvent,
team.id,
now,
new UUIDT().toString()
)
const events = getEventsFromKafka()
const event = [...events].find((e: any) => e['event'] === 'test event')
expect(event?.person_properties).toEqual({ pineapple: 'on', pizza: 1, new: 5 })
expect(event?.properties.$group_0).toEqual('org:5')
expect(event?.properties.$group_1).toEqual('second_key')
expect(event?.group0_properties).toEqual({}) // We stopped writing these to the event as queries don't use them
expect(event?.group1_properties).toEqual({}) // We stopped writing these to the event as queries don't use them
})
test('set and set_once on the same key', async () => {
await createPerson(hub, team, ['distinct_id1'])
await processEvent(
'distinct_id1',
'',
'',
{
event: 'some_event',
properties: {
token: team.api_token,
distinct_id: 'distinct_id1',
$set: { a_prop: 'test-set' },
$set_once: { a_prop: 'test-set_once' },
},
} as any as PluginEvent,
team.id,
now,
new UUIDT().toString()
)
const [event] = getEventsFromKafka()
expect(event.properties['$set']).toEqual({ a_prop: 'test-set' })
expect(event.properties['$set_once']).toEqual({ a_prop: 'test-set_once' })
const [person] = await fetchPersons(hub.db.postgres)
expect(await fetchDistinctIdValues(hub.db.postgres, person)).toEqual(['distinct_id1'])
expect(person.properties).toEqual({ a_prop: 'test-set' })
})
test('$unset person property', async () => {
await createPerson(hub, team, ['distinct_id1'], { a: 1, b: 2, c: 3 })
await processEvent(
'distinct_id1',
'',
'',
{
event: 'some_event',
properties: {
token: team.api_token,
distinct_id: 'distinct_id1',
$unset: ['a', 'c'],
},
} as any as PluginEvent,
team.id,
now,
new UUIDT().toString()
)
const [event] = getEventsFromKafka()
expect(event.properties['$unset']).toEqual(['a', 'c'])
const [person] = await fetchPersons(hub.db.postgres)
expect(await fetchDistinctIdValues(hub.db.postgres, person)).toEqual(['distinct_id1'])
expect(person.properties).toEqual({ b: 2 })
})
test('$unset person empty set ignored', async () => {
await createPerson(hub, team, ['distinct_id1'], { a: 1, b: 2, c: 3 })
await processEvent(
'distinct_id1',
'',
'',
{
event: 'some_event',
properties: {
token: team.api_token,
distinct_id: 'distinct_id1',
$unset: {},
},
} as any as PluginEvent,
team.id,
now,
new UUIDT().toString()
)
const [event] = getEventsFromKafka()
expect(event.properties['$unset']).toEqual({})
const [person] = await fetchPersons(hub.db.postgres)
expect(await fetchDistinctIdValues(hub.db.postgres, person)).toEqual(['distinct_id1'])
expect(person.properties).toEqual({ a: 1, b: 2, c: 3 })
})
describe('ingestion in any order', () => {
const ts0: DateTime = now
const ts1: DateTime = now.plus({ minutes: 1 })
const ts2: DateTime = now.plus({ minutes: 2 })
const ts3: DateTime = now.plus({ minutes: 3 })
// key encodes when the value is updated, e.g. s0 means only set call for the 0th event
// s03o23 means via a set in events number 0 and 3 plus via set_once on 2nd and 3rd event
// the value corresponds to which call updated it + random letter (same letter for the same key)
// the letter is for verifying we update the right key only
const set0: Properties = { s0123o0123: 's0a', s02o13: 's0b', s013: 's0e' }
const setOnce0: Properties = { s0123o0123: 'o0a', s13o02: 'o0g', o023: 'o0f' }
const set1: Properties = { s0123o0123: 's1a', s13o02: 's1g', s1: 's1c', s013: 's1e' }
const setOnce1: Properties = { s0123o0123: 'o1a', s02o13: 'o1b', o1: 'o1d' }
const set2: Properties = { s0123o0123: 's2a', s02o13: 's2b' }
const setOnce2: Properties = { s0123o0123: 'o2a', s13o02: 'o2g', o023: 'o2f' }
const set3: Properties = { s0123o0123: 's3a', s13o02: 's3g', s013: 's3e' }
const setOnce3: Properties = { s0123o0123: 'o3a', s02o13: 'o3b', o023: 'o3f' }
beforeEach(async () => {
await createPerson(hub, team, ['distinct_id1'])
})
async function verifyPersonPropertiesSetCorrectly() {
const [person] = await fetchPersons(hub.db.postgres)
expect(await fetchDistinctIdValues(hub.db.postgres, person)).toEqual(['distinct_id1'])
expect(person.properties).toEqual({
s0123o0123: 's3a',
s02o13: 's2b',
s1: 's1c',
o1: 'o1d',
s013: 's3e',
o023: 'o0f',
s13o02: 's3g',
})
expect(person.version).toEqual(4)
}
async function runProcessEvent(set: Properties, setOnce: Properties, ts: DateTime) {
await processEvent(
'distinct_id1',
'',
'',
{
event: 'some_event',
properties: {
$set: set,
$set_once: setOnce,
},
} as any as PluginEvent,
team.id,
ts,
new UUIDT().toString()
)
}
async function ingest0() {
await runProcessEvent(set0, setOnce0, ts0)
}
async function ingest1() {
await runProcessEvent(set1, setOnce1, ts1)
}
async function ingest2() {
await runProcessEvent(set2, setOnce2, ts2)
}
async function ingest3() {
await runProcessEvent(set3, setOnce3, ts3)
}
test('ingestion in order', async () => {
await ingest0()
await ingest1()
await ingest2()
await ingest3()
await verifyPersonPropertiesSetCorrectly()
})
})
})