mirror of
https://github.com/BillyOutlast/posthog.git
synced 2026-02-04 03:01:23 +01:00
feat(msg): add cassandra for behavioural cohorts (#35492)
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
This commit is contained in:
1
.flox/env/manifest.toml
vendored
1
.flox/env/manifest.toml
vendored
@@ -44,6 +44,7 @@ mprocs = { pkg-path = "mprocs" }
|
||||
cmake = { pkg-path = "cmake", version = "3.31.5", pkg-group = "cmake" }
|
||||
sqlx-cli = { pkg-path = "sqlx-cli", version = "0.8.3" } # sqlx
|
||||
postgresql = { pkg-path = "postgresql_14" } # psql
|
||||
cassandra = { pkg-path = "cassandra" } # cqlsh
|
||||
|
||||
# Set environment variables in the `[vars]` section. These variables may not
|
||||
# reference one another, and are added to the environment without first
|
||||
|
||||
8
.github/workflows/ci-plugin-server.yml
vendored
8
.github/workflows/ci-plugin-server.yml
vendored
@@ -127,6 +127,8 @@ jobs:
|
||||
CLICKHOUSE_HOST: 'localhost'
|
||||
CLICKHOUSE_DATABASE: 'posthog_test'
|
||||
KAFKA_HOSTS: 'kafka:9092'
|
||||
CASSANDRA_HOST: 'localhost'
|
||||
CASSANDRA_KEYSPACE: 'test_posthog'
|
||||
|
||||
steps:
|
||||
- name: Code check out
|
||||
@@ -206,7 +208,7 @@ jobs:
|
||||
if: needs.changes.outputs.plugin-server == 'true'
|
||||
run: pnpm --filter=@posthog/plugin-server... install --frozen-lockfile
|
||||
|
||||
- name: Wait for Clickhouse, Redis & Kafka
|
||||
- name: Wait for Clickhouse, Redis, Kafka
|
||||
if: needs.changes.outputs.plugin-server == 'true'
|
||||
run: |
|
||||
docker compose -f docker-compose.dev.yml up kafka redis clickhouse -d --wait
|
||||
@@ -219,6 +221,8 @@ jobs:
|
||||
SECRET_KEY: 'abcdef' # unsafe - for testing only
|
||||
DATABASE_URL: 'postgres://posthog:posthog@localhost:5432/posthog'
|
||||
PERSONS_DATABASE_URL: 'postgres://posthog:posthog@localhost:5432/posthog'
|
||||
CASSANDRA_HOST: 'localhost'
|
||||
CASSANDRA_KEYSPACE: 'test_posthog'
|
||||
run: pnpm --filter=@posthog/plugin-server setup:test
|
||||
|
||||
- name: Test with Jest
|
||||
@@ -232,6 +236,8 @@ jobs:
|
||||
SHARD_INDEX: ${{ matrix.shard }}
|
||||
SHARD_COUNT: 3
|
||||
LOG_LEVEL: info
|
||||
CASSANDRA_HOST: 'localhost'
|
||||
CASSANDRA_KEYSPACE: 'test_posthog'
|
||||
run: bin/turbo run test --filter=@posthog/plugin-server
|
||||
|
||||
posthog-analytics:
|
||||
|
||||
@@ -390,6 +390,23 @@ services:
|
||||
networks:
|
||||
- otel_network
|
||||
|
||||
cassandra:
|
||||
image: cassandra:4.1
|
||||
restart: on-failure
|
||||
environment:
|
||||
- HEAP_NEWSIZE=128M
|
||||
- MAX_HEAP_SIZE=512M
|
||||
healthcheck:
|
||||
test: ['CMD-SHELL', 'cqlsh -e "SELECT now() FROM system.local;" || exit 1']
|
||||
interval: 30s
|
||||
timeout: 10s
|
||||
retries: 5
|
||||
volumes:
|
||||
- cassandra_data:/var/lib/cassandra
|
||||
|
||||
networks:
|
||||
otel_network:
|
||||
driver: bridge
|
||||
|
||||
volumes:
|
||||
cassandra_data:
|
||||
|
||||
@@ -258,6 +258,13 @@ services:
|
||||
file: docker-compose.base.yml
|
||||
service: jaeger
|
||||
|
||||
cassandra:
|
||||
extends:
|
||||
file: docker-compose.base.yml
|
||||
service: cassandra
|
||||
ports:
|
||||
- '9042:9042'
|
||||
|
||||
networks:
|
||||
otel_network:
|
||||
driver: bridge
|
||||
|
||||
@@ -114,3 +114,10 @@ services:
|
||||
depends_on:
|
||||
- redis
|
||||
- db
|
||||
|
||||
cassandra:
|
||||
extends:
|
||||
file: docker-compose.base.yml
|
||||
service: cassandra
|
||||
ports:
|
||||
- '9042:9042'
|
||||
|
||||
@@ -238,6 +238,16 @@ services:
|
||||
file: docker-compose.base.yml
|
||||
service: jaeger
|
||||
|
||||
cassandra:
|
||||
extends:
|
||||
file: docker-compose.base.yml
|
||||
service: cassandra
|
||||
ports:
|
||||
- '9042:9042'
|
||||
|
||||
networks:
|
||||
otel_network:
|
||||
driver: bridge
|
||||
|
||||
volumes:
|
||||
cassandra_data:
|
||||
|
||||
@@ -283,6 +283,13 @@ services:
|
||||
- db
|
||||
- redis
|
||||
|
||||
cassandra:
|
||||
extends:
|
||||
file: docker-compose.base.yml
|
||||
service: cassandra
|
||||
volumes:
|
||||
- cassandra-data:/var/lib/cassandra
|
||||
|
||||
volumes:
|
||||
zookeeper-data:
|
||||
zookeeper-datalog:
|
||||
@@ -295,3 +302,4 @@ volumes:
|
||||
redis-data:
|
||||
redis7-data:
|
||||
kafka-data:
|
||||
cassandra-data:
|
||||
|
||||
50
plugin-server/bin/migrate-cassandra
Executable file
50
plugin-server/bin/migrate-cassandra
Executable file
@@ -0,0 +1,50 @@
|
||||
#!/bin/bash
#
# Run Cassandra schema migrations for the plugin-server.
# Waits for Cassandra to accept TCP connections, then delegates to the
# Node.js migration runner in plugin-server/cassandra/migrate.js.
#
# Env vars (all optional):
#   CASSANDRA_HOST     - host to connect to (default: localhost)
#   CASSANDRA_PORT     - port to connect to (default: 9042)
#   CASSANDRA_KEYSPACE - keyspace to migrate (default: posthog)

set -euo pipefail

SCRIPT_DIR=$(dirname "$(readlink -f "$0")")

# Set default environment variables following Cyclotron pattern
CASSANDRA_HOST=${CASSANDRA_HOST:-localhost}
CASSANDRA_PORT=${CASSANDRA_PORT:-9042}
CASSANDRA_KEYSPACE=${CASSANDRA_KEYSPACE:-posthog}

echo "Performing Cassandra migrations for keyspace $CASSANDRA_KEYSPACE (HOST=$CASSANDRA_HOST:$CASSANDRA_PORT)"

cd "$SCRIPT_DIR/.."

# Wait for Cassandra to be ready: poll the TCP port with a short timeout.
echo "Waiting for Cassandra to be ready..."
max_attempts=30
attempt=0

while [ "$attempt" -lt "$max_attempts" ]; do
    if timeout 3 bash -c "</dev/tcp/$CASSANDRA_HOST/$CASSANDRA_PORT" > /dev/null 2>&1; then
        echo "Cassandra port is open!"
        # Give Cassandra a bit more time to fully initialize after port opens
        sleep 3
        echo "Cassandra is ready!"
        break
    fi

    attempt=$((attempt + 1))
    echo "Attempt $attempt/$max_attempts: Cassandra not ready, waiting 2 seconds..."
    sleep 2
done

# attempt only reaches max_attempts when every probe failed (success breaks
# out of the loop before the final increment).
if [ "$attempt" -eq "$max_attempts" ]; then
    echo "ERROR: Cassandra failed to become ready after $max_attempts attempts" >&2
    exit 1
fi

# Export environment variables for the Node.js migration script
export CASSANDRA_HOST
export CASSANDRA_PORT
export CASSANDRA_KEYSPACE

# Run migrations using our custom Node.js script
echo "Running migrations using custom migration runner..."
cd "$SCRIPT_DIR/../cassandra"
node migrate.js

echo "Cassandra migrations completed successfully!"
|
||||
12
plugin-server/cassandra/cassandra-migrate.ci.json
Normal file
12
plugin-server/cassandra/cassandra-migrate.ci.json
Normal file
@@ -0,0 +1,12 @@
|
||||
{
|
||||
"migrationsDir": "./cassandra/migrations",
|
||||
"hosts": ["cassandra"],
|
||||
"port": 9042,
|
||||
"keyspace": "posthog",
|
||||
"consistency": "LOCAL_ONE",
|
||||
"replication": {
|
||||
"class": "SimpleStrategy",
|
||||
"replication_factor": 1
|
||||
},
|
||||
"createKeyspace": true
|
||||
}
|
||||
12
plugin-server/cassandra/cassandra-migrate.json
Normal file
12
plugin-server/cassandra/cassandra-migrate.json
Normal file
@@ -0,0 +1,12 @@
|
||||
{
|
||||
"migrationsDir": "./cassandra/migrations",
|
||||
"hosts": ["localhost"],
|
||||
"port": 9042,
|
||||
"keyspace": "posthog",
|
||||
"consistency": "LOCAL_ONE",
|
||||
"replication": {
|
||||
"class": "SimpleStrategy",
|
||||
"replication_factor": 1
|
||||
},
|
||||
"createKeyspace": false
|
||||
}
|
||||
172
plugin-server/cassandra/migrate.js
Executable file
172
plugin-server/cassandra/migrate.js
Executable file
@@ -0,0 +1,172 @@
|
||||
#!/usr/bin/env node

// Cassandra migration runner for the plugin-server.
// Applies the *.cql files in ./migrations in lexical order, tracking which
// have already run in a `migration_history` table.

const cassandra = require('cassandra-driver')
const fs = require('fs')
const path = require('path')

// Connection configuration, driven by the same env vars that
// bin/migrate-cassandra exports: CASSANDRA_HOST, CASSANDRA_PORT, CASSANDRA_KEYSPACE.
const config = {
    contactPoints: [process.env.CASSANDRA_HOST || 'localhost'],
    localDataCenter: 'datacenter1',
    keyspace: process.env.CASSANDRA_KEYSPACE || 'posthog',
    // Honor CASSANDRA_PORT — the wrapper script exports it, but it was
    // previously ignored and the driver always used the default port.
    protocolOptions: { port: parseInt(process.env.CASSANDRA_PORT || '9042', 10) },
}

const client = new cassandra.Client(config)
|
||||
|
||||
// Ensure the target keyspace exists, creating it with SimpleStrategy
// (replication factor 1) if missing. Uses a dedicated keyspace-less client
// because the main client is bound to the (possibly not-yet-existing) keyspace.
async function createKeyspace() {
    const systemClient = new cassandra.Client({
        contactPoints: config.contactPoints,
        localDataCenter: 'datacenter1',
    })

    try {
        await systemClient.connect()
        console.log("Creating keyspace if it doesn't exist...")

        const createKeyspaceCql = `
            CREATE KEYSPACE IF NOT EXISTS ${config.keyspace}
            WITH REPLICATION = {
                'class': 'SimpleStrategy',
                'replication_factor': 1
            }
        `
        await systemClient.execute(createKeyspaceCql)

        console.log(`Keyspace '${config.keyspace}' is ready`)
    } finally {
        // Always release the temporary connection, even on failure.
        await systemClient.shutdown()
    }
}
|
||||
|
||||
// Create the `migration_history` tracking table if it does not already exist.
// First probes system_schema for a friendlier log message; if that probe
// fails (e.g. restricted permissions), falls back to issuing the idempotent
// CREATE TABLE IF NOT EXISTS directly.
async function createMigrationTable() {
    // Single definition of the DDL, shared by both code paths below
    // (previously duplicated verbatim in the try and catch branches).
    const createTableCql = `
        CREATE TABLE IF NOT EXISTS migration_history (
            filename TEXT,
            executed_at TIMESTAMP,
            PRIMARY KEY (filename)
        )
    `

    try {
        // Check if table already exists
        const result = await client.execute(
            'SELECT table_name FROM system_schema.tables WHERE keyspace_name = ? AND table_name = ?',
            [config.keyspace, 'migration_history']
        )

        if (result.rows.length > 0) {
            console.log('⏭️ Migration tracking table already exists, skipping creation')
        } else {
            console.log('Creating migration tracking table...')
            await client.execute(createTableCql)
            console.log('✅ Migration tracking table created')
        }
    } catch (error) {
        // Fallback to simple creation if the system-tables query fails.
        // NOTE(review): the probe error is intentionally not re-thrown — a
        // failure of the CREATE itself still propagates to the caller.
        console.log('Creating migration tracking table...')
        await client.execute(createTableCql)
    }
}
|
||||
|
||||
// Return the set of migration filenames already recorded in
// `migration_history`. Yields an empty set when the table does not exist
// yet (e.g. the very first run).
async function getExecutedMigrations() {
    try {
        const result = await client.execute('SELECT filename FROM migration_history')
        const applied = new Set()
        for (const row of result.rows) {
            applied.add(row.filename)
        }
        return applied
    } catch (error) {
        // Table might not exist yet
        return new Set()
    }
}
|
||||
|
||||
// Execute a single .cql migration file, then record it in migration_history.
// `content` may contain several semicolon-separated statements; `--` comment
// lines are stripped first.
async function executeMigration(filename, content) {
    console.log(`Executing migration: ${filename}`)

    // Remove comments and split content by semicolons to handle multiple statements.
    // NOTE(review): this naive split does not handle semicolons inside string
    // literals — acceptable for the simple DDL files in ./migrations.
    const cleanContent = content
        .split('\n')
        .filter((line) => !line.trim().startsWith('--'))
        .join('\n')

    const statements = cleanContent
        .split(';')
        .map((stmt) => stmt.trim())
        .filter((stmt) => stmt.length > 0)

    for (const statement of statements) {
        console.log(`Executing statement: ${statement.substring(0, 50)}...`)
        await client.execute(statement)
    }

    // Record the migration as executed
    await client.execute('INSERT INTO migration_history (filename, executed_at) VALUES (?, ?)', [filename, new Date()])

    console.log(`✅ Migration ${filename} completed`)
}
|
||||
|
||||
// Orchestrate the full migration run: ensure keyspace + tracking table,
// discover pending .cql files, and apply them in lexical order.
// On failure, logs the error and marks the process as failed.
async function runMigrations() {
    try {
        // Create keyspace first
        await createKeyspace()

        // Connect to the keyspace
        await client.connect()
        console.log(`Connected to Cassandra keyspace: ${config.keyspace}`)

        // Create migration tracking table
        await createMigrationTable()

        // Get list of executed migrations
        const executedMigrations = await getExecutedMigrations()

        // Read migration files
        const migrationsDir = path.join(__dirname, 'migrations')
        if (!fs.existsSync(migrationsDir)) {
            console.log('No migrations directory found, creating one...')
            fs.mkdirSync(migrationsDir, { recursive: true })
            console.log('✅ All migrations completed (no migration files found)')
            return
        }

        // Lexical sort gives deterministic ordering for NNN_-prefixed files.
        const migrationFiles = fs
            .readdirSync(migrationsDir)
            .filter((file) => file.endsWith('.cql'))
            .sort()

        if (migrationFiles.length === 0) {
            console.log('✅ All migrations completed (no migration files found)')
            return
        }

        // Execute pending migrations
        let executed = 0
        for (const filename of migrationFiles) {
            if (!executedMigrations.has(filename)) {
                const filePath = path.join(migrationsDir, filename)
                const content = fs.readFileSync(filePath, 'utf8')
                await executeMigration(filename, content)
                executed++
            } else {
                console.log(`⏭️ Skipping already executed migration: ${filename}`)
            }
        }

        if (executed === 0) {
            console.log('✅ All migrations are up to date')
        } else {
            console.log(`✅ Executed ${executed} migration(s) successfully`)
        }
    } catch (error) {
        console.error('❌ Migration failed:', error.message)
        // Use exitCode rather than process.exit(1): process.exit would
        // terminate immediately and skip the client shutdown in `finally`.
        process.exitCode = 1
    } finally {
        await client.shutdown()
    }
}
|
||||
|
||||
// Entry point: run the migrations. Any rejection escaping runMigrations is
// unexpected; log it and mark the process as failed (the previous bare
// .catch(console.error) would log but still exit with status 0).
runMigrations().catch((error) => {
    console.error(error)
    process.exitCode = 1
})
|
||||
10
plugin-server/cassandra/migrations/001_initial_setup.cql
Normal file
10
plugin-server/cassandra/migrations/001_initial_setup.cql
Normal file
@@ -0,0 +1,10 @@
|
||||
-- Initial setup migration for PostHog Cassandra keyspace
-- This migration creates the keyspace (handled automatically by the runner)
-- and serves as a marker for the first migration

-- Create a simple test table to verify the migration system works.
-- The table is intentionally trivial; it carries no application data.
CREATE TABLE IF NOT EXISTS migration_test (
    id UUID PRIMARY KEY,
    created_at TIMESTAMP,
    message TEXT
);
|
||||
@@ -0,0 +1,19 @@
|
||||
-- Migration: Create behavioral event counters table
-- Created: 2025-07-23
-- Description: Table to store real-time behavioral event match counters
-- Supports the RFC for real-time behavioral filtering (cohorts)

-- Create the behavioral event counters table
-- This table stores counters for action filter matches with the pattern:
-- team_id:filter_hash:person_id:date
-- Partition key is team_id; (filter_hash, person_id, date) are clustering
-- columns, so all counters for a team live in one partition.
-- count is a Cassandra COUNTER column (presumably incremented via
-- UPDATE ... SET count = count + N by the writer — confirm against the
-- repository code).
CREATE TABLE IF NOT EXISTS behavioral_event_counters (
    team_id INT,
    filter_hash TEXT,
    person_id UUID,
    date DATE,
    count COUNTER,
    PRIMARY KEY (team_id, filter_hash, person_id, date)
) WITH CLUSTERING ORDER BY (filter_hash ASC, person_id ASC, date DESC);

-- Note: Cannot create secondary index on counter table
-- Time-range queries should use the primary key (team_id, filter_hash, person_id, date)
||||
@@ -23,15 +23,19 @@
|
||||
"prettier:check": "prettier --check .",
|
||||
"prepublishOnly": "pnpm build",
|
||||
"setup:dev:clickhouse": "cd .. && DEBUG=1 python manage.py migrate_clickhouse",
|
||||
"setup:test": "cd .. && TEST=1 python manage.py setup_test_environment && cd plugin-server && pnpm run setup:test:cyclotron",
|
||||
"setup:test": "cd .. && TEST=1 python manage.py setup_test_environment && cd plugin-server && pnpm run setup:test:cyclotron && pnpm run setup:test:cassandra",
|
||||
"setup:test:cyclotron": "CYCLOTRON_DATABASE_NAME=test_cyclotron ../rust/bin/migrate-cyclotron",
|
||||
"setup:test:cassandra": "CASSANDRA_KEYSPACE=test_posthog ./bin/migrate-cassandra",
|
||||
"services:start": "cd .. && docker compose -f docker-compose.dev.yml up",
|
||||
"services:stop": "cd .. && docker compose -f docker-compose.dev.yml down",
|
||||
"services:clean": "cd .. && docker compose -f docker-compose.dev.yml rm -v",
|
||||
"services": "pnpm services:stop && pnpm services:clean && pnpm services:start",
|
||||
"build:cyclotron": "pnpm --filter=@posthog/cyclotron package",
|
||||
"update-ai-costs": "ts-node src/ingestion/ai-costs/scripts/update-ai-costs.ts",
|
||||
"sync-segment-icons": "DATABASE_URL=something ts-node src/cdp/segment/sync-segment-icons.ts"
|
||||
"sync-segment-icons": "DATABASE_URL=something ts-node src/cdp/segment/sync-segment-icons.ts",
|
||||
"cassandra:migrate": "./bin/migrate-cassandra",
|
||||
"cassandra:rollback": "cd cassandra && cassandra-migrate down",
|
||||
"cassandra:create": "cd cassandra && cassandra-migrate create"
|
||||
},
|
||||
"bin": {
|
||||
"posthog-plugin-server": "bin/posthog-plugin-server"
|
||||
@@ -62,6 +66,8 @@
|
||||
"@types/tail": "^2.2.1",
|
||||
"asn1.js": "^5.4.1",
|
||||
"aws-sdk": "^2.927.0",
|
||||
"cassandra-driver": "^4.8.0",
|
||||
"cassandra-migrate": "^1.2.333",
|
||||
"dayjs": "1.11.11",
|
||||
"detect-browser": "^5.3.0",
|
||||
"escape-string-regexp": "^4.0.0",
|
||||
|
||||
@@ -1,199 +1,460 @@
|
||||
import { Client as CassandraClient } from 'cassandra-driver'
|
||||
import { createHash } from 'crypto'
|
||||
|
||||
import { truncateBehavioralCounters } from '../../../tests/helpers/cassandra'
|
||||
import { createAction, getFirstTeam, resetTestDatabase } from '../../../tests/helpers/sql'
|
||||
import { Hub, RawClickHouseEvent, Team } from '../../types'
|
||||
import { BehavioralCounterRepository } from '../../utils/db/cassandra/behavioural-counter.repository'
|
||||
import { closeHub, createHub } from '../../utils/db/hub'
|
||||
import { createIncomingEvent } from '../_tests/fixtures'
|
||||
import { convertClickhouseRawEventToFilterGlobals } from '../utils/hog-function-filtering'
|
||||
import { BehavioralEvent, CdpBehaviouralEventsConsumer } from './cdp-behavioural-events.consumer'
|
||||
import { BehavioralEvent, CdpBehaviouralEventsConsumer, counterEventsDropped } from './cdp-behavioural-events.consumer'
|
||||
|
||||
class TestCdpBehaviouralEventsConsumer extends CdpBehaviouralEventsConsumer {
|
||||
public getCassandraClient(): CassandraClient | null {
|
||||
return this.cassandra
|
||||
}
|
||||
|
||||
public getBehavioralCounterRepository(): BehavioralCounterRepository | null {
|
||||
return this.behavioralCounterRepository
|
||||
}
|
||||
}
|
||||
|
||||
jest.setTimeout(5000)
|
||||
|
||||
const TEST_FILTERS = {
|
||||
// Simple pageview event filter: event == '$pageview'
|
||||
pageview: ['_H', 1, 32, '$pageview', 32, 'event', 1, 1, 11],
|
||||
|
||||
// Chrome browser AND pageview event filter: properties.$browser == 'Chrome' AND event == '$pageview'
|
||||
chromePageview: [
|
||||
'_H',
|
||||
1,
|
||||
32,
|
||||
'Chrome',
|
||||
32,
|
||||
'$browser',
|
||||
32,
|
||||
'properties',
|
||||
1,
|
||||
2,
|
||||
11,
|
||||
32,
|
||||
'$pageview',
|
||||
32,
|
||||
'event',
|
||||
1,
|
||||
1,
|
||||
11,
|
||||
3,
|
||||
2,
|
||||
4,
|
||||
1,
|
||||
],
|
||||
|
||||
// Complex filter: pageview event AND (Chrome browser contains match OR has IP property)
|
||||
complexChromeWithIp: [
|
||||
'_H',
|
||||
1,
|
||||
32,
|
||||
'$pageview',
|
||||
32,
|
||||
'event',
|
||||
1,
|
||||
1,
|
||||
11,
|
||||
32,
|
||||
'%Chrome%',
|
||||
32,
|
||||
'$browser',
|
||||
32,
|
||||
'properties',
|
||||
1,
|
||||
2,
|
||||
2,
|
||||
'toString',
|
||||
1,
|
||||
18,
|
||||
3,
|
||||
2,
|
||||
32,
|
||||
'$pageview',
|
||||
32,
|
||||
'event',
|
||||
1,
|
||||
1,
|
||||
11,
|
||||
31,
|
||||
32,
|
||||
'$ip',
|
||||
32,
|
||||
'properties',
|
||||
1,
|
||||
2,
|
||||
12,
|
||||
3,
|
||||
2,
|
||||
4,
|
||||
2,
|
||||
],
|
||||
}
|
||||
|
||||
describe('CdpBehaviouralEventsConsumer', () => {
|
||||
let processor: CdpBehaviouralEventsConsumer
|
||||
let hub: Hub
|
||||
let team: Team
|
||||
|
||||
beforeEach(async () => {
|
||||
// Helper function to setup test environment with Cassandra enabled
|
||||
async function setupWithCassandraEnabled() {
|
||||
await resetTestDatabase()
|
||||
hub = await createHub()
|
||||
team = await getFirstTeam(hub)
|
||||
processor = new CdpBehaviouralEventsConsumer(hub)
|
||||
const hub = await createHub({ WRITE_BEHAVIOURAL_COUNTERS_TO_CASSANDRA: true })
|
||||
const team = await getFirstTeam(hub)
|
||||
const processor = new TestCdpBehaviouralEventsConsumer(hub)
|
||||
const cassandra = processor.getCassandraClient()
|
||||
|
||||
if (!cassandra) {
|
||||
throw new Error('Cassandra client should be initialized when flag is enabled')
|
||||
}
|
||||
|
||||
await cassandra.connect()
|
||||
const repository = processor.getBehavioralCounterRepository()!
|
||||
await truncateBehavioralCounters(cassandra)
|
||||
|
||||
return { hub, team, processor, cassandra, repository }
|
||||
}
|
||||
|
||||
// Helper function to setup test environment with Cassandra disabled
|
||||
async function setupWithCassandraDisabled() {
|
||||
await resetTestDatabase()
|
||||
const hub = await createHub({ WRITE_BEHAVIOURAL_COUNTERS_TO_CASSANDRA: false })
|
||||
const team = await getFirstTeam(hub)
|
||||
const processor = new TestCdpBehaviouralEventsConsumer(hub)
|
||||
|
||||
// Processor should not have initialized Cassandra
|
||||
expect(processor.getCassandraClient()).toBeNull()
|
||||
expect(processor.getBehavioralCounterRepository()).toBeNull()
|
||||
|
||||
return { hub, team, processor }
|
||||
}
|
||||
|
||||
describe('with Cassandra enabled', () => {
|
||||
let processor: TestCdpBehaviouralEventsConsumer
|
||||
let hub: Hub
|
||||
let team: Team
|
||||
let cassandra: CassandraClient
|
||||
let repository: BehavioralCounterRepository
|
||||
|
||||
beforeEach(async () => {
|
||||
const setup = await setupWithCassandraEnabled()
|
||||
hub = setup.hub
|
||||
team = setup.team
|
||||
processor = setup.processor
|
||||
cassandra = setup.cassandra
|
||||
repository = setup.repository
|
||||
})
|
||||
|
||||
afterEach(async () => {
|
||||
await cassandra.shutdown()
|
||||
await closeHub(hub)
|
||||
jest.restoreAllMocks()
|
||||
})
|
||||
|
||||
describe('action matching with actual database', () => {
|
||||
it('should match action when event matches bytecode filter', async () => {
|
||||
// Create an action with Chrome + pageview filter
|
||||
await createAction(hub.postgres, team.id, 'Test action', TEST_FILTERS.chromePageview)
|
||||
|
||||
// Create a matching event
|
||||
const matchingEvent = createIncomingEvent(team.id, {
|
||||
event: '$pageview',
|
||||
properties: JSON.stringify({ $browser: 'Chrome' }),
|
||||
} as RawClickHouseEvent)
|
||||
|
||||
const filterGlobals = convertClickhouseRawEventToFilterGlobals(matchingEvent)
|
||||
const behavioralEvent: BehavioralEvent = {
|
||||
teamId: team.id,
|
||||
filterGlobals,
|
||||
personId: '550e8400-e29b-41d4-a716-446655440000',
|
||||
}
|
||||
|
||||
// Verify the action was loaded
|
||||
const actions = await hub.actionManagerCDP.getActionsForTeam(team.id)
|
||||
expect(actions).toHaveLength(1)
|
||||
expect(actions[0].name).toBe('Test action')
|
||||
|
||||
// Test processEvent directly and verify it returns 1 for matching event
|
||||
const counterUpdates: any[] = []
|
||||
const result = await (processor as any).processEvent(behavioralEvent, counterUpdates)
|
||||
expect(result).toBe(1)
|
||||
})
|
||||
|
||||
it('should not match action when event does not match bytecode filter', async () => {
|
||||
// Create an action with Chrome + pageview filter
|
||||
await createAction(hub.postgres, team.id, 'Test action', TEST_FILTERS.chromePageview)
|
||||
|
||||
// Create a non-matching event
|
||||
const nonMatchingEvent = createIncomingEvent(team.id, {
|
||||
event: '$pageview',
|
||||
properties: JSON.stringify({ $browser: 'Firefox' }), // Different browser
|
||||
} as RawClickHouseEvent)
|
||||
|
||||
const filterGlobals = convertClickhouseRawEventToFilterGlobals(nonMatchingEvent)
|
||||
const behavioralEvent: BehavioralEvent = {
|
||||
teamId: team.id,
|
||||
filterGlobals,
|
||||
personId: '550e8400-e29b-41d4-a716-446655440000',
|
||||
}
|
||||
|
||||
// Verify the action was loaded
|
||||
const actions = await hub.actionManagerCDP.getActionsForTeam(team.id)
|
||||
expect(actions).toHaveLength(1)
|
||||
expect(actions[0].name).toBe('Test action')
|
||||
|
||||
// Test processEvent directly and verify it returns 0 for non-matching event
|
||||
const counterUpdates: any[] = []
|
||||
const result = await (processor as any).processEvent(behavioralEvent, counterUpdates)
|
||||
expect(result).toBe(0)
|
||||
})
|
||||
|
||||
it('should return count of matched actions when multiple actions match', async () => {
|
||||
// Create multiple actions with different filters
|
||||
await createAction(hub.postgres, team.id, 'Pageview action', TEST_FILTERS.pageview)
|
||||
await createAction(hub.postgres, team.id, 'Complex filter action', TEST_FILTERS.complexChromeWithIp)
|
||||
|
||||
// Create an event that matches both actions
|
||||
const matchingEvent = createIncomingEvent(team.id, {
|
||||
event: '$pageview',
|
||||
properties: JSON.stringify({ $browser: 'Chrome', $ip: '127.0.0.1' }),
|
||||
} as RawClickHouseEvent)
|
||||
|
||||
const filterGlobals = convertClickhouseRawEventToFilterGlobals(matchingEvent)
|
||||
const behavioralEvent: BehavioralEvent = {
|
||||
teamId: team.id,
|
||||
filterGlobals,
|
||||
personId: '550e8400-e29b-41d4-a716-446655440000',
|
||||
}
|
||||
|
||||
// Test processEvent directly and verify it returns 2 for both matching actions
|
||||
const counterUpdates: any[] = []
|
||||
const result = await (processor as any).processEvent(behavioralEvent, counterUpdates)
|
||||
expect(result).toBe(2)
|
||||
})
|
||||
})
|
||||
|
||||
describe('Cassandra behavioral counter writes', () => {
|
||||
it('should write counter to Cassandra when action matches', async () => {
|
||||
// Arrange
|
||||
const personId = '550e8400-e29b-41d4-a716-446655440000'
|
||||
|
||||
await createAction(hub.postgres, team.id, 'Test action', TEST_FILTERS.chromePageview)
|
||||
|
||||
// Create a matching event with person ID
|
||||
const matchingEvent = createIncomingEvent(team.id, {
|
||||
event: '$pageview',
|
||||
properties: JSON.stringify({ $browser: 'Chrome' }),
|
||||
person_id: personId,
|
||||
} as RawClickHouseEvent)
|
||||
|
||||
const filterGlobals = convertClickhouseRawEventToFilterGlobals(matchingEvent)
|
||||
const behavioralEvent: BehavioralEvent = {
|
||||
teamId: team.id,
|
||||
filterGlobals,
|
||||
personId,
|
||||
}
|
||||
|
||||
// Act
|
||||
await processor.processBatch([behavioralEvent])
|
||||
|
||||
// Assert - check that the counter was written to Cassandra
|
||||
const actions = await hub.actionManagerCDP.getActionsForTeam(team.id)
|
||||
const action = actions[0]
|
||||
const filterHash = createHash('sha256')
|
||||
.update(JSON.stringify(action.bytecode))
|
||||
.digest('hex')
|
||||
.substring(0, 16)
|
||||
const today = new Date().toISOString().split('T')[0]
|
||||
|
||||
const counter = await repository.getCounter({
|
||||
teamId: team.id,
|
||||
filterHash,
|
||||
personId,
|
||||
date: today,
|
||||
})
|
||||
|
||||
expect(counter).not.toBeNull()
|
||||
expect(counter!.count).toBe(1)
|
||||
})
|
||||
|
||||
it('should increment existing counter', async () => {
|
||||
// Arrange
|
||||
const personId = '550e8400-e29b-41d4-a716-446655440000'
|
||||
|
||||
await createAction(hub.postgres, team.id, 'Test action', TEST_FILTERS.chromePageview)
|
||||
|
||||
const matchingEvent = createIncomingEvent(team.id, {
|
||||
event: '$pageview',
|
||||
properties: JSON.stringify({ $browser: 'Chrome' }),
|
||||
person_id: personId,
|
||||
} as RawClickHouseEvent)
|
||||
|
||||
const filterGlobals = convertClickhouseRawEventToFilterGlobals(matchingEvent)
|
||||
const behavioralEvent: BehavioralEvent = {
|
||||
teamId: team.id,
|
||||
filterGlobals,
|
||||
personId,
|
||||
}
|
||||
|
||||
// Act - process event twice
|
||||
await processor.processBatch([behavioralEvent])
|
||||
await processor.processBatch([behavioralEvent])
|
||||
|
||||
// Assert - check that the counter was incremented
|
||||
const actions = await hub.actionManagerCDP.getActionsForTeam(team.id)
|
||||
const action = actions[0]
|
||||
const filterHash = createHash('sha256')
|
||||
.update(JSON.stringify(action.bytecode))
|
||||
.digest('hex')
|
||||
.substring(0, 16)
|
||||
const today = new Date().toISOString().split('T')[0]
|
||||
|
||||
const counter = await repository.getCounter({
|
||||
teamId: team.id,
|
||||
filterHash,
|
||||
personId,
|
||||
date: today,
|
||||
})
|
||||
|
||||
expect(counter).not.toBeNull()
|
||||
expect(counter!.count).toBe(2)
|
||||
})
|
||||
|
||||
it('should not write counter when event does not match', async () => {
|
||||
// Arrange
|
||||
const personId = '550e8400-e29b-41d4-a716-446655440000'
|
||||
|
||||
await createAction(hub.postgres, team.id, 'Test action', TEST_FILTERS.chromePageview)
|
||||
|
||||
// Create a non-matching event
|
||||
const nonMatchingEvent = createIncomingEvent(team.id, {
|
||||
event: '$pageview',
|
||||
properties: JSON.stringify({ $browser: 'Firefox' }), // Different browser
|
||||
person_id: personId,
|
||||
} as RawClickHouseEvent)
|
||||
|
||||
const filterGlobals = convertClickhouseRawEventToFilterGlobals(nonMatchingEvent)
|
||||
const behavioralEvent: BehavioralEvent = {
|
||||
teamId: team.id,
|
||||
filterGlobals,
|
||||
personId,
|
||||
}
|
||||
|
||||
// Act
|
||||
await processor.processBatch([behavioralEvent])
|
||||
|
||||
// Assert - check that no counter was written to Cassandra
|
||||
const actions = await hub.actionManagerCDP.getActionsForTeam(team.id)
|
||||
const action = actions[0]
|
||||
const filterHash = createHash('sha256')
|
||||
.update(JSON.stringify(action.bytecode))
|
||||
.digest('hex')
|
||||
.substring(0, 16)
|
||||
const today = new Date().toISOString().split('T')[0]
|
||||
|
||||
const counter = await repository.getCounter({
|
||||
teamId: team.id,
|
||||
filterHash,
|
||||
personId,
|
||||
date: today,
|
||||
})
|
||||
|
||||
expect(counter).toBeNull()
|
||||
})
|
||||
|
||||
it('should drop events with missing person ID at parsing stage', async () => {
|
||||
// Create a raw event without person_id (simulating what comes from Kafka)
|
||||
const rawEventWithoutPersonId = {
|
||||
uuid: '12345',
|
||||
event: '$pageview',
|
||||
team_id: team.id,
|
||||
properties: JSON.stringify({ $browser: 'Chrome' }),
|
||||
// person_id is undefined
|
||||
}
|
||||
|
||||
// Get initial metric value
|
||||
const initialDroppedCount = await counterEventsDropped.get()
|
||||
const initialMissingPersonIdCount =
|
||||
initialDroppedCount.values.find((v) => v.labels.reason === 'missing_person_id')?.value || 0
|
||||
|
||||
const messages = [
|
||||
{
|
||||
value: Buffer.from(JSON.stringify(rawEventWithoutPersonId)),
|
||||
},
|
||||
] as any[]
|
||||
|
||||
// Act - parse the batch (should drop the event)
|
||||
const parsedEvents = await (processor as any)._parseKafkaBatch(messages)
|
||||
|
||||
// Assert - no events should be parsed due to missing person_id
|
||||
expect(parsedEvents).toHaveLength(0)
|
||||
|
||||
// Assert - metric should be incremented
|
||||
const finalDroppedCount = await counterEventsDropped.get()
|
||||
const finalMissingPersonIdCount =
|
||||
finalDroppedCount.values.find((v) => v.labels.reason === 'missing_person_id')?.value || 0
|
||||
expect(finalMissingPersonIdCount).toBe(initialMissingPersonIdCount + 1)
|
||||
|
||||
// Assert - no counter should be written to Cassandra since event was dropped
|
||||
const counters = await repository.getCountersForTeam(team.id)
|
||||
|
||||
expect(counters).toHaveLength(0)
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
afterEach(async () => {
|
||||
await closeHub(hub)
|
||||
jest.restoreAllMocks()
|
||||
})
|
||||
describe('with Cassandra disabled', () => {
|
||||
let processor: TestCdpBehaviouralEventsConsumer
|
||||
let hub: Hub
|
||||
let team: Team
|
||||
|
||||
describe('action matching with actual database', () => {
|
||||
it('should match action when event matches bytecode filter', async () => {
|
||||
// Create an action with bytecode
|
||||
const bytecode = [
|
||||
'_H',
|
||||
1,
|
||||
32,
|
||||
'Chrome',
|
||||
32,
|
||||
'$browser',
|
||||
32,
|
||||
'properties',
|
||||
1,
|
||||
2,
|
||||
11,
|
||||
32,
|
||||
'$pageview',
|
||||
32,
|
||||
'event',
|
||||
1,
|
||||
1,
|
||||
11,
|
||||
3,
|
||||
2,
|
||||
4,
|
||||
1,
|
||||
]
|
||||
beforeEach(async () => {
|
||||
const setup = await setupWithCassandraDisabled()
|
||||
hub = setup.hub
|
||||
team = setup.team
|
||||
processor = setup.processor
|
||||
})
|
||||
|
||||
await createAction(hub.postgres, team.id, 'Test action', bytecode)
|
||||
afterEach(async () => {
|
||||
await closeHub(hub)
|
||||
jest.restoreAllMocks()
|
||||
})
|
||||
|
||||
// Create a matching event
|
||||
it('should not write to Cassandra when WRITE_BEHAVIOURAL_COUNTERS_TO_CASSANDRA is false', async () => {
|
||||
// Arrange
|
||||
const personId = '550e8400-e29b-41d4-a716-446655440000'
|
||||
|
||||
await createAction(hub.postgres, team.id, 'Test action', TEST_FILTERS.chromePageview)
|
||||
|
||||
// Create a matching event with person ID
|
||||
const matchingEvent = createIncomingEvent(team.id, {
|
||||
event: '$pageview',
|
||||
properties: JSON.stringify({ $browser: 'Chrome' }),
|
||||
person_id: personId,
|
||||
} as RawClickHouseEvent)
|
||||
|
||||
const filterGlobals = convertClickhouseRawEventToFilterGlobals(matchingEvent)
|
||||
const behavioralEvent: BehavioralEvent = {
|
||||
teamId: team.id,
|
||||
filterGlobals,
|
||||
personId,
|
||||
}
|
||||
|
||||
// Verify the action was loaded
|
||||
const actions = await hub.actionManagerCDP.getActionsForTeam(team.id)
|
||||
expect(actions).toHaveLength(1)
|
||||
expect(actions[0].name).toBe('Test action')
|
||||
// Spy on the writeBehavioralCounters method to ensure it's never called when Cassandra is disabled
|
||||
const writeSpy = jest.spyOn(processor as any, 'writeBehavioralCounters')
|
||||
|
||||
// Test processEvent directly and verify it returns 1 for matching event
|
||||
const result = await (processor as any).processEvent(behavioralEvent)
|
||||
expect(result).toBe(1)
|
||||
})
|
||||
// Act
|
||||
await processor.processBatch([behavioralEvent])
|
||||
|
||||
it('should not match action when event does not match bytecode filter', async () => {
|
||||
// Create an action with bytecode
|
||||
const bytecode = [
|
||||
'_H',
|
||||
1,
|
||||
32,
|
||||
'Chrome',
|
||||
32,
|
||||
'$browser',
|
||||
32,
|
||||
'properties',
|
||||
1,
|
||||
2,
|
||||
11,
|
||||
32,
|
||||
'$pageview',
|
||||
32,
|
||||
'event',
|
||||
1,
|
||||
1,
|
||||
11,
|
||||
3,
|
||||
2,
|
||||
4,
|
||||
1,
|
||||
]
|
||||
// Assert - writeBehavioralCounters should never be called when Cassandra is disabled
|
||||
expect(writeSpy).toHaveBeenCalledTimes(0)
|
||||
|
||||
await createAction(hub.postgres, team.id, 'Test action', bytecode)
|
||||
|
||||
// Create a non-matching event
|
||||
const nonMatchingEvent = createIncomingEvent(team.id, {
|
||||
event: '$pageview',
|
||||
properties: JSON.stringify({ $browser: 'Firefox' }), // Different browser
|
||||
} as RawClickHouseEvent)
|
||||
|
||||
const filterGlobals = convertClickhouseRawEventToFilterGlobals(nonMatchingEvent)
|
||||
const behavioralEvent: BehavioralEvent = {
|
||||
teamId: team.id,
|
||||
filterGlobals,
|
||||
}
|
||||
|
||||
// Verify the action was loaded
|
||||
const actions = await hub.actionManagerCDP.getActionsForTeam(team.id)
|
||||
expect(actions).toHaveLength(1)
|
||||
expect(actions[0].name).toBe('Test action')
|
||||
|
||||
// Test processEvent directly and verify it returns 0 for non-matching event
|
||||
const result = await (processor as any).processEvent(behavioralEvent)
|
||||
expect(result).toBe(0)
|
||||
})
|
||||
|
||||
it('should return count of matched actions when multiple actions match', async () => {
|
||||
// Create multiple actions with different bytecode
|
||||
const pageViewBytecode = ['_H', 1, 32, '$pageview', 32, 'event', 1, 1, 11]
|
||||
|
||||
const filterBytecode = [
|
||||
'_H',
|
||||
1,
|
||||
32,
|
||||
'$pageview',
|
||||
32,
|
||||
'event',
|
||||
1,
|
||||
1,
|
||||
11,
|
||||
32,
|
||||
'%Chrome%',
|
||||
32,
|
||||
'$browser',
|
||||
32,
|
||||
'properties',
|
||||
1,
|
||||
2,
|
||||
2,
|
||||
'toString',
|
||||
1,
|
||||
18,
|
||||
3,
|
||||
2,
|
||||
32,
|
||||
'$pageview',
|
||||
32,
|
||||
'event',
|
||||
1,
|
||||
1,
|
||||
11,
|
||||
31,
|
||||
32,
|
||||
'$ip',
|
||||
32,
|
||||
'properties',
|
||||
1,
|
||||
2,
|
||||
12,
|
||||
3,
|
||||
2,
|
||||
4,
|
||||
2,
|
||||
]
|
||||
|
||||
await createAction(hub.postgres, team.id, 'Pageview action', pageViewBytecode)
|
||||
await createAction(hub.postgres, team.id, 'Filter action', filterBytecode)
|
||||
|
||||
// Create an event that matches both actions
|
||||
const matchingEvent = createIncomingEvent(team.id, {
|
||||
event: '$pageview',
|
||||
properties: JSON.stringify({ $browser: 'Chrome', $ip: '127.0.0.1' }),
|
||||
} as RawClickHouseEvent)
|
||||
|
||||
const filterGlobals = convertClickhouseRawEventToFilterGlobals(matchingEvent)
|
||||
const behavioralEvent: BehavioralEvent = {
|
||||
teamId: team.id,
|
||||
filterGlobals,
|
||||
}
|
||||
|
||||
// Test processEvent directly and verify it returns 2 for both matching actions
|
||||
const result = await (processor as any).processEvent(behavioralEvent)
|
||||
expect(result).toBe(2)
|
||||
// Double-check repository is still null
|
||||
expect(processor.getBehavioralCounterRepository()).toBeNull()
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
import { Client as CassandraClient } from 'cassandra-driver'
|
||||
import { createHash } from 'crypto'
|
||||
import { Message } from 'node-rdkafka'
|
||||
import { Counter } from 'prom-client'
|
||||
|
||||
@@ -6,6 +8,7 @@ import { KafkaConsumer } from '../../kafka/consumer'
|
||||
import { runInstrumentedFunction } from '../../main/utils'
|
||||
import { Hub, RawClickHouseEvent } from '../../types'
|
||||
import { Action } from '../../utils/action-manager-cdp'
|
||||
import { BehavioralCounterRepository, CounterUpdate } from '../../utils/db/cassandra/behavioural-counter.repository'
|
||||
import { parseJSON } from '../../utils/json-parse'
|
||||
import { logger } from '../../utils/logger'
|
||||
import { HogFunctionFilterGlobals } from '../types'
|
||||
@@ -16,6 +19,7 @@ import { CdpConsumerBase } from './cdp-base.consumer'
|
||||
export type BehavioralEvent = {
|
||||
teamId: number
|
||||
filterGlobals: HogFunctionFilterGlobals
|
||||
personId: string
|
||||
}
|
||||
|
||||
export const counterParseError = new Counter({
|
||||
@@ -24,6 +28,12 @@ export const counterParseError = new Counter({
|
||||
labelNames: ['error'],
|
||||
})
|
||||
|
||||
export const counterEventsDropped = new Counter({
|
||||
name: 'cdp_behavioural_events_dropped_total',
|
||||
help: 'Total number of events dropped due to missing personId or other validation errors',
|
||||
labelNames: ['reason'],
|
||||
})
|
||||
|
||||
export const counterEventsConsumed = new Counter({
|
||||
name: 'cdp_behavioural_events_consumed_total',
|
||||
help: 'Total number of events consumed by the behavioural consumer',
|
||||
@@ -37,10 +47,30 @@ export const counterEventsMatchedTotal = new Counter({
|
||||
export class CdpBehaviouralEventsConsumer extends CdpConsumerBase {
|
||||
protected name = 'CdpBehaviouralEventsConsumer'
|
||||
protected kafkaConsumer: KafkaConsumer
|
||||
protected cassandra: CassandraClient | null
|
||||
protected behavioralCounterRepository: BehavioralCounterRepository | null
|
||||
private filterHashCache = new Map<string, string>()
|
||||
|
||||
constructor(hub: Hub, topic: string = KAFKA_EVENTS_JSON, groupId: string = 'cdp-behavioural-events-consumer') {
|
||||
super(hub)
|
||||
this.kafkaConsumer = new KafkaConsumer({ groupId, topic })
|
||||
|
||||
// Only initialize Cassandra client if the feature is enabled
|
||||
if (hub.WRITE_BEHAVIOURAL_COUNTERS_TO_CASSANDRA) {
|
||||
this.cassandra = new CassandraClient({
|
||||
contactPoints: [hub.CASSANDRA_HOST],
|
||||
localDataCenter: 'datacenter1',
|
||||
keyspace: hub.CASSANDRA_KEYSPACE,
|
||||
credentials:
|
||||
hub.CASSANDRA_USER && hub.CASSANDRA_PASSWORD
|
||||
? { username: hub.CASSANDRA_USER, password: hub.CASSANDRA_PASSWORD }
|
||||
: undefined,
|
||||
})
|
||||
this.behavioralCounterRepository = new BehavioralCounterRepository(this.cassandra)
|
||||
} else {
|
||||
this.cassandra = null
|
||||
this.behavioralCounterRepository = null
|
||||
}
|
||||
}
|
||||
|
||||
public async processBatch(events: BehavioralEvent[]): Promise<void> {
|
||||
@@ -51,17 +81,23 @@ export class CdpBehaviouralEventsConsumer extends CdpConsumerBase {
|
||||
|
||||
// Track events consumed and matched (absolute numbers)
|
||||
let eventsMatched = 0
|
||||
const counterUpdates: CounterUpdate[] = []
|
||||
|
||||
const results = await Promise.all(events.map((event) => this.processEvent(event)))
|
||||
const results = await Promise.all(events.map((event) => this.processEvent(event, counterUpdates)))
|
||||
eventsMatched = results.reduce((sum, count) => sum + count, 0)
|
||||
|
||||
// Batch write all counter updates
|
||||
if (counterUpdates.length > 0 && this.hub.WRITE_BEHAVIOURAL_COUNTERS_TO_CASSANDRA && this.cassandra) {
|
||||
await this.writeBehavioralCounters(counterUpdates)
|
||||
}
|
||||
|
||||
// Update metrics with absolute numbers
|
||||
counterEventsConsumed.inc(events.length)
|
||||
counterEventsMatchedTotal.inc(eventsMatched)
|
||||
})
|
||||
}
|
||||
|
||||
private async processEvent(event: BehavioralEvent): Promise<number> {
|
||||
private async processEvent(event: BehavioralEvent, counterUpdates: CounterUpdate[]): Promise<number> {
|
||||
try {
|
||||
const actions = await this.loadActionsForTeam(event.teamId)
|
||||
|
||||
@@ -71,7 +107,7 @@ export class CdpBehaviouralEventsConsumer extends CdpConsumerBase {
|
||||
}
|
||||
|
||||
const results = await Promise.all(
|
||||
actions.map((action) => this.doesEventMatchAction(event.filterGlobals, action))
|
||||
actions.map((action) => this.doesEventMatchAction(event, action, counterUpdates))
|
||||
)
|
||||
|
||||
return results.filter(Boolean).length
|
||||
@@ -94,7 +130,11 @@ export class CdpBehaviouralEventsConsumer extends CdpConsumerBase {
|
||||
}
|
||||
}
|
||||
|
||||
private async doesEventMatchAction(filterGlobals: HogFunctionFilterGlobals, action: Action): Promise<boolean> {
|
||||
private async doesEventMatchAction(
|
||||
event: BehavioralEvent,
|
||||
action: Action,
|
||||
counterUpdates: CounterUpdate[]
|
||||
): Promise<boolean> {
|
||||
if (!action.bytecode) {
|
||||
return false
|
||||
}
|
||||
@@ -102,7 +142,7 @@ export class CdpBehaviouralEventsConsumer extends CdpConsumerBase {
|
||||
try {
|
||||
// Execute bytecode directly with the filter globals
|
||||
const execHogOutcome = await execHog(action.bytecode, {
|
||||
globals: filterGlobals,
|
||||
globals: event.filterGlobals,
|
||||
telemetry: false,
|
||||
})
|
||||
|
||||
@@ -113,6 +153,17 @@ export class CdpBehaviouralEventsConsumer extends CdpConsumerBase {
|
||||
const matchedFilter =
|
||||
typeof execHogOutcome.execResult.result === 'boolean' && execHogOutcome.execResult.result
|
||||
|
||||
if (matchedFilter) {
|
||||
const filterHash = this.createFilterHash(action.bytecode!)
|
||||
const date = new Date().toISOString().split('T')[0]
|
||||
counterUpdates.push({
|
||||
teamId: event.teamId,
|
||||
filterHash,
|
||||
personId: event.personId,
|
||||
date,
|
||||
})
|
||||
}
|
||||
|
||||
return matchedFilter
|
||||
} catch (error) {
|
||||
logger.error('Error executing action bytecode', {
|
||||
@@ -123,6 +174,33 @@ export class CdpBehaviouralEventsConsumer extends CdpConsumerBase {
|
||||
}
|
||||
}
|
||||
|
||||
private async writeBehavioralCounters(updates: CounterUpdate[]): Promise<void> {
|
||||
if (!this.behavioralCounterRepository) {
|
||||
logger.warn('Behavioral counter repository not initialized, skipping counter writes')
|
||||
return
|
||||
}
|
||||
|
||||
try {
|
||||
await this.behavioralCounterRepository.batchIncrementCounters(updates)
|
||||
} catch (error) {
|
||||
logger.error('Error batch writing behavioral counters', { error, updateCount: updates.length })
|
||||
}
|
||||
}
|
||||
|
||||
private createFilterHash(bytecode: any): string {
|
||||
const data = typeof bytecode === 'string' ? bytecode : JSON.stringify(bytecode)
|
||||
|
||||
// Check cache first
|
||||
if (this.filterHashCache.has(data)) {
|
||||
return this.filterHashCache.get(data)!
|
||||
}
|
||||
|
||||
// Calculate hash and cache it
|
||||
const hash = createHash('sha256').update(data).digest('hex').substring(0, 16)
|
||||
this.filterHashCache.set(data, hash)
|
||||
return hash
|
||||
}
|
||||
|
||||
// This consumer always parses from kafka
|
||||
public async _parseKafkaBatch(messages: Message[]): Promise<BehavioralEvent[]> {
|
||||
return await this.runWithHeartbeat(() =>
|
||||
@@ -135,12 +213,23 @@ export class CdpBehaviouralEventsConsumer extends CdpConsumerBase {
|
||||
try {
|
||||
const clickHouseEvent = parseJSON(message.value!.toString()) as RawClickHouseEvent
|
||||
|
||||
if (!clickHouseEvent.person_id) {
|
||||
logger.error('Dropping event: missing person_id', {
|
||||
teamId: clickHouseEvent.team_id,
|
||||
event: clickHouseEvent.event,
|
||||
uuid: clickHouseEvent.uuid,
|
||||
})
|
||||
counterEventsDropped.labels({ reason: 'missing_person_id' }).inc()
|
||||
return
|
||||
}
|
||||
|
||||
// Convert directly to filter globals
|
||||
const filterGlobals = convertClickhouseRawEventToFilterGlobals(clickHouseEvent)
|
||||
|
||||
events.push({
|
||||
teamId: clickHouseEvent.team_id,
|
||||
filterGlobals,
|
||||
personId: clickHouseEvent.person_id,
|
||||
})
|
||||
} catch (e) {
|
||||
logger.error('Error parsing message', e)
|
||||
@@ -157,6 +246,16 @@ export class CdpBehaviouralEventsConsumer extends CdpConsumerBase {
|
||||
|
||||
public async start(): Promise<void> {
|
||||
await super.start()
|
||||
|
||||
// Only connect to Cassandra if initialized
|
||||
if (this.cassandra) {
|
||||
logger.info('🤔', `Connecting to Cassandra...`)
|
||||
await this.cassandra.connect()
|
||||
logger.info('👍', `Cassandra ready`)
|
||||
} else {
|
||||
logger.info('ℹ️', `Cassandra disabled, skipping connection`)
|
||||
}
|
||||
|
||||
// Start consuming messages
|
||||
await this.kafkaConsumer.connect(async (messages) => {
|
||||
logger.info('🔁', `${this.name} - handling batch`, {
|
||||
@@ -175,6 +274,12 @@ export class CdpBehaviouralEventsConsumer extends CdpConsumerBase {
|
||||
public async stop(): Promise<void> {
|
||||
logger.info('💤', 'Stopping behavioural events consumer...')
|
||||
await this.kafkaConsumer.disconnect()
|
||||
|
||||
// Only shutdown Cassandra if it was initialized
|
||||
if (this.cassandra) {
|
||||
await this.cassandra.shutdown()
|
||||
}
|
||||
|
||||
// IMPORTANT: super always comes last
|
||||
await super.stop()
|
||||
logger.info('💤', 'Behavioural events consumer stopped!')
|
||||
|
||||
@@ -45,6 +45,12 @@ export function getDefaultConfig(): PluginsServerConfig {
|
||||
CLICKHOUSE_PASSWORD: null,
|
||||
CLICKHOUSE_CA: null,
|
||||
CLICKHOUSE_SECURE: false,
|
||||
CASSANDRA_HOST: 'localhost',
|
||||
CASSANDRA_PORT: 9042,
|
||||
CASSANDRA_KEYSPACE: isTestEnv() ? 'test_posthog' : 'posthog',
|
||||
CASSANDRA_USER: null,
|
||||
CASSANDRA_PASSWORD: null,
|
||||
WRITE_BEHAVIOURAL_COUNTERS_TO_CASSANDRA: false,
|
||||
EVENT_OVERFLOW_BUCKET_CAPACITY: 1000,
|
||||
EVENT_OVERFLOW_BUCKET_REPLENISH_RATE: 1.0,
|
||||
KAFKA_BATCH_START_LOGGING_ENABLED: false,
|
||||
|
||||
@@ -203,6 +203,12 @@ export interface PluginsServerConfig extends CdpConfig, IngestionConsumerConfig
|
||||
CLICKHOUSE_CA: string | null // ClickHouse CA certs
|
||||
CLICKHOUSE_SECURE: boolean // whether to secure ClickHouse connection
|
||||
CLICKHOUSE_JSON_EVENTS_KAFKA_TOPIC: string // (advanced) topic to send events for clickhouse ingestion
|
||||
CASSANDRA_HOST: string
|
||||
CASSANDRA_PORT: number
|
||||
CASSANDRA_KEYSPACE: string
|
||||
CASSANDRA_USER: string | null
|
||||
CASSANDRA_PASSWORD: string | null
|
||||
WRITE_BEHAVIOURAL_COUNTERS_TO_CASSANDRA: boolean
|
||||
CLICKHOUSE_HEATMAPS_KAFKA_TOPIC: string // (advanced) topic to send heatmap data for clickhouse ingestion
|
||||
EXCEPTIONS_SYMBOLIFICATION_KAFKA_TOPIC: string // (advanced) topic to send exception event data for stack trace processing
|
||||
// Redis url pretty much only used locally / self hosted
|
||||
|
||||
@@ -0,0 +1,86 @@
|
||||
import { Client as CassandraClient, types as CassandraTypes } from 'cassandra-driver'
|
||||
|
||||
export interface BehavioralCounterRow {
|
||||
team_id: number
|
||||
filter_hash: string
|
||||
person_id: string
|
||||
date: string
|
||||
count: number
|
||||
}
|
||||
|
||||
export interface CounterUpdate {
|
||||
teamId: number
|
||||
filterHash: string
|
||||
personId: string
|
||||
date: string
|
||||
}
|
||||
|
||||
export class BehavioralCounterRepository {
|
||||
constructor(private cassandra: CassandraClient) {}
|
||||
|
||||
/**
|
||||
* Maps a Cassandra row to a BehavioralCounterRow
|
||||
*/
|
||||
private mapRowToBehavioralCounter(row: any): BehavioralCounterRow {
|
||||
return {
|
||||
team_id: row.team_id,
|
||||
filter_hash: row.filter_hash,
|
||||
person_id: row.person_id.toString(),
|
||||
date: row.date,
|
||||
count: row.count.toNumber(),
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets a specific behavioral counter
|
||||
*/
|
||||
async getCounter(params: {
|
||||
teamId: number
|
||||
filterHash: string
|
||||
personId: string
|
||||
date: string
|
||||
}): Promise<BehavioralCounterRow | null> {
|
||||
const { teamId, filterHash, personId, date } = params
|
||||
const result = await this.cassandra.execute(
|
||||
'SELECT team_id, filter_hash, person_id, date, count FROM behavioral_event_counters WHERE team_id = ? AND filter_hash = ? AND person_id = ? AND date = ?',
|
||||
[teamId, filterHash, CassandraTypes.Uuid.fromString(personId), date],
|
||||
{ prepare: true }
|
||||
)
|
||||
|
||||
if (result.rows.length === 0) {
|
||||
return null
|
||||
}
|
||||
|
||||
const row = result.rows[0]
|
||||
return this.mapRowToBehavioralCounter(row)
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets all behavioral counters for a team
|
||||
*/
|
||||
async getCountersForTeam(teamId: number): Promise<BehavioralCounterRow[]> {
|
||||
const result = await this.cassandra.execute(
|
||||
'SELECT team_id, filter_hash, person_id, date, count FROM behavioral_event_counters WHERE team_id = ?',
|
||||
[teamId],
|
||||
{ prepare: true }
|
||||
)
|
||||
|
||||
return result.rows.map((row) => this.mapRowToBehavioralCounter(row))
|
||||
}
|
||||
|
||||
/**
|
||||
* Batch increments multiple behavioral counters
|
||||
*/
|
||||
async batchIncrementCounters(updates: CounterUpdate[]): Promise<void> {
|
||||
if (updates.length === 0) {
|
||||
return
|
||||
}
|
||||
|
||||
const batch = updates.map((update) => ({
|
||||
query: 'UPDATE behavioral_event_counters SET count = count + 1 WHERE team_id = ? AND filter_hash = ? AND person_id = ? AND date = ?',
|
||||
params: [update.teamId, update.filterHash, CassandraTypes.Uuid.fromString(update.personId), update.date],
|
||||
}))
|
||||
|
||||
await this.cassandra.batch(batch, { prepare: true, logged: false })
|
||||
}
|
||||
}
|
||||
12
plugin-server/tests/helpers/cassandra.ts
Normal file
12
plugin-server/tests/helpers/cassandra.ts
Normal file
@@ -0,0 +1,12 @@
|
||||
import { Client as CassandraClient } from 'cassandra-driver'
|
||||
|
||||
/**
|
||||
* Test helper functions for Cassandra operations
|
||||
*/
|
||||
|
||||
/**
|
||||
* Truncates the behavioral_event_counters table (useful for tests)
|
||||
*/
|
||||
export async function truncateBehavioralCounters(cassandra: CassandraClient): Promise<void> {
|
||||
await cassandra.execute('TRUNCATE behavioral_event_counters')
|
||||
}
|
||||
109
pnpm-lock.yaml
generated
109
pnpm-lock.yaml
generated
@@ -1299,6 +1299,12 @@ importers:
|
||||
aws-sdk:
|
||||
specifier: ^2.927.0
|
||||
version: 2.1366.0
|
||||
cassandra-driver:
|
||||
specifier: ^4.8.0
|
||||
version: 4.8.0
|
||||
cassandra-migrate:
|
||||
specifier: ^1.2.333
|
||||
version: 1.2.333
|
||||
dayjs:
|
||||
specifier: 1.11.11
|
||||
version: 1.11.11(patch_hash=lbfir4woetqmvzqg7l4q5mjtfq)
|
||||
@@ -8370,6 +8376,10 @@ packages:
|
||||
resolution: {integrity: sha512-4B/qKCfeE/ODUaAUpSwfzazo5x29WD4r3vXiWsB7I2mSDAihwEqKO+g8GELZUQSSAo5e1XTYh3ZVfLyxBc12nA==}
|
||||
engines: {node: '>= 10.0.0'}
|
||||
|
||||
adm-zip@0.5.16:
|
||||
resolution: {integrity: sha512-TGw5yVi4saajsSEgz25grObGHEUaDrniwvA2qwSC060KfqGPdglhvPMA2lPIoxs3PQIItj2iag35fONcQqgUaQ==}
|
||||
engines: {node: '>=12.0'}
|
||||
|
||||
agent-base@5.1.1:
|
||||
resolution: {integrity: sha512-TMeqbNl2fMW0nMjTEPOwe3J/PRFP4vqeoNuQMG0HlMrtm5QxKqdvAkZ1pRBQ/ulIyDD5Yq0nJ7YbdD8ey0TO3g==}
|
||||
engines: {node: '>= 6.0.0'}
|
||||
@@ -8620,6 +8630,9 @@ packages:
|
||||
async-retry@1.3.3:
|
||||
resolution: {integrity: sha512-wfr/jstw9xNi/0teMHrRW7dsz3Lt5ARhYNZ2ewpadnhaIp5mbALhOAP+EAdsC7t4Z6wqsDVv9+W6gm1Dk9mEyw==}
|
||||
|
||||
async@0.9.2:
|
||||
resolution: {integrity: sha512-l6ToIJIotphWahxxHyzK9bnLR6kM4jJIIgLShZeqLY7iboHoGkdgFl7W2/Ivi4SkMJYGKqW8vSuk0uKUj6qsSw==}
|
||||
|
||||
async@3.2.4:
|
||||
resolution: {integrity: sha512-iAB+JbDEGXhyIUavoDl9WP/Jj106Kz9DEn1DPgYw5ruDn0e3Wgi3sKFm55sASdGBNOQB8F59d9qQ7deqrHA8wQ==}
|
||||
|
||||
@@ -9060,6 +9073,22 @@ packages:
|
||||
caseless@0.12.0:
|
||||
resolution: {integrity: sha512-4tYFyifaFfGacoiObjJegolkwSU4xQNGbVgUiNYVUxbQ2x2lUsFvY4hVgVzGiIe6WLOPqycWXA40l+PWsxthUw==}
|
||||
|
||||
cassandra-driver@3.6.0:
|
||||
resolution: {integrity: sha512-CkN3V+oPaF5RvakUjD3uUjEm8f6U8S0aT1+YqeQsVT3UDpPT2K8SOdNDEHA1KjamakHch6zkDgHph1xWyqBGGw==}
|
||||
engines: {node: '>=4'}
|
||||
|
||||
cassandra-driver@4.8.0:
|
||||
resolution: {integrity: sha512-HritfMGq9V7SuESeSodHvArs0mLuMk7uh+7hQK2lqdvXrvm50aWxb4RPxkK3mPDdsgHjJ427xNRFITMH2ei+Sw==}
|
||||
engines: {node: '>=18'}
|
||||
|
||||
cassandra-migrate@1.2.333:
|
||||
resolution: {integrity: sha512-4uizL1SxK8tfpg18+qMuJge+VQQIlo4Or8fYA/xI6b2sdyXbCwTTLu31jJx1rXtmgoP/W5F4aQ3TfGeFX9YQIA==}
|
||||
hasBin: true
|
||||
|
||||
catharsis@0.9.0:
|
||||
resolution: {integrity: sha512-prMTQVpcns/tzFgFVkVp6ak6RykZyWb3gu8ckUpd6YkTlacOd3DXGJjIpD4Q6zJirizvaiAjSSHlOsA+6sNh2A==}
|
||||
engines: {node: '>= 10'}
|
||||
|
||||
chalk-template@1.1.0:
|
||||
resolution: {integrity: sha512-T2VJbcDuZQ0Tb2EWwSotMPJjgpy1/tGee1BTpUNsGZ/qgNjV2t7Mvu+d4600U564nbLesN1x2dPL+xii174Ekg==}
|
||||
engines: {node: '>=14.16'}
|
||||
@@ -12830,6 +12859,16 @@ packages:
|
||||
long@5.3.2:
|
||||
resolution: {integrity: sha512-mNAgZ1GmyNhD7AuqnTG3/VQ26o760+ZYBPKjPvugO8+nLbYfX6TVpJPseBvopbdY+qpZ/lKUnmEc1LeZYS3QAA==}
|
||||
|
||||
long@2.4.0:
|
||||
resolution: {integrity: sha512-ijUtjmO/n2A5PaosNG9ZGDsQ3vxJg7ZW8vsY8Kp0f2yIZWhSJvjmegV7t+9RPQKxKrvj8yKGehhS+po14hPLGQ==}
|
||||
engines: {node: '>=0.6'}
|
||||
|
||||
long@4.0.0:
|
||||
resolution: {integrity: sha512-XsP+KhQif4bjX1kbuSiySJFNAehNxgLb6hPRGJ9QsUr8ajHkuXGdrHmFUTUUXhDwVX2R5bY4JNZEwbUiMhV+MA==}
|
||||
|
||||
long@5.2.4:
|
||||
resolution: {integrity: sha512-qtzLbJE8hq7VabR3mISmVGtoXP8KGc2Z/AT8OuqlYD7JTR3oqrgwdjnk07wpj1twXxYmgDXgoKVWUG/fReSzHg==}
|
||||
|
||||
loose-envify@1.4.0:
|
||||
resolution: {integrity: sha512-lyuxPGr/Wfhrlem2CL/UcnUc1zcqKAImBDzukY7Y5F/yQiNdko6+fRLevlw1HgMySw7f611UIY408EtxRSoK3Q==}
|
||||
hasBin: true
|
||||
@@ -25677,6 +25716,8 @@ snapshots:
|
||||
|
||||
address@1.2.2: {}
|
||||
|
||||
adm-zip@0.5.16: {}
|
||||
|
||||
agent-base@5.1.1: {}
|
||||
|
||||
agent-base@6.0.2:
|
||||
@@ -25995,6 +26036,8 @@ snapshots:
|
||||
dependencies:
|
||||
retry: 0.13.1
|
||||
|
||||
async@0.9.2: {}
|
||||
|
||||
async@3.2.4: {}
|
||||
|
||||
asynckit@0.4.0: {}
|
||||
@@ -26639,6 +26682,26 @@ snapshots:
|
||||
|
||||
caseless@0.12.0: {}
|
||||
|
||||
cassandra-driver@3.6.0:
|
||||
dependencies:
|
||||
long: 2.4.0
|
||||
|
||||
cassandra-driver@4.8.0:
|
||||
dependencies:
|
||||
'@types/node': 18.18.4
|
||||
adm-zip: 0.5.16
|
||||
long: 5.2.4
|
||||
|
||||
cassandra-migrate@1.2.333:
|
||||
dependencies:
|
||||
async: 0.9.2
|
||||
cassandra-driver: 3.6.0
|
||||
commander: 2.20.3
|
||||
|
||||
catharsis@0.9.0:
|
||||
dependencies:
|
||||
lodash: 4.17.21
|
||||
|
||||
chalk-template@1.1.0:
|
||||
dependencies:
|
||||
chalk: 5.4.1
|
||||
@@ -30339,6 +30402,25 @@ snapshots:
|
||||
- supports-color
|
||||
- ts-node
|
||||
|
||||
jest-cli@29.7.0(@types/node@22.15.17)(ts-node@10.9.1(@swc/core@1.11.4(@swc/helpers@0.5.15))(@types/node@22.15.17)(typescript@5.2.2)):
|
||||
dependencies:
|
||||
'@jest/core': 29.7.0(ts-node@10.9.1(@swc/core@1.11.4(@swc/helpers@0.5.15))(@types/node@22.15.17)(typescript@5.2.2))
|
||||
'@jest/test-result': 29.7.0
|
||||
'@jest/types': 29.6.3
|
||||
chalk: 4.1.2
|
||||
create-jest: 29.7.0(@types/node@22.15.17)(ts-node@10.9.1(@swc/core@1.11.4(@swc/helpers@0.5.15))(@types/node@22.15.17)(typescript@5.2.2))
|
||||
exit: 0.1.2
|
||||
import-local: 3.1.0
|
||||
jest-config: 29.7.0(@types/node@22.15.17)(ts-node@10.9.1(@swc/core@1.11.4(@swc/helpers@0.5.15))(@types/node@22.15.17)(typescript@5.2.2))
|
||||
jest-util: 29.7.0
|
||||
jest-validate: 29.7.0
|
||||
yargs: 17.7.1
|
||||
transitivePeerDependencies:
|
||||
- '@types/node'
|
||||
- babel-plugin-macros
|
||||
- supports-color
|
||||
- ts-node
|
||||
|
||||
jest-cli@29.7.0(@types/node@24.0.15)(ts-node@10.9.1(@swc/core@1.11.4(@swc/helpers@0.5.15))(@types/node@24.0.15)(typescript@5.2.2)):
|
||||
dependencies:
|
||||
'@jest/core': 29.7.0(ts-node@10.9.1(@swc/core@1.11.4(@swc/helpers@0.5.15))(@types/node@24.0.15)(typescript@5.2.2))
|
||||
@@ -31381,6 +31463,12 @@ snapshots:
|
||||
|
||||
long@5.3.2: {}
|
||||
|
||||
long@2.4.0: {}
|
||||
|
||||
long@4.0.0: {}
|
||||
|
||||
long@5.2.4: {}
|
||||
|
||||
loose-envify@1.4.0:
|
||||
dependencies:
|
||||
js-tokens: 4.0.0
|
||||
@@ -36035,6 +36123,27 @@ snapshots:
|
||||
optionalDependencies:
|
||||
'@swc/core': 1.11.4(@swc/helpers@0.5.15)
|
||||
|
||||
ts-node@10.9.1(@swc/core@1.11.4(@swc/helpers@0.5.15))(@types/node@22.15.17)(typescript@5.2.2):
|
||||
dependencies:
|
||||
'@cspotcode/source-map-support': 0.8.1
|
||||
'@tsconfig/node10': 1.0.9
|
||||
'@tsconfig/node12': 1.0.11
|
||||
'@tsconfig/node14': 1.0.3
|
||||
'@tsconfig/node16': 1.0.3
|
||||
'@types/node': 22.15.17
|
||||
acorn: 8.10.0
|
||||
acorn-walk: 8.2.0
|
||||
arg: 4.1.3
|
||||
create-require: 1.1.1
|
||||
diff: 4.0.2
|
||||
make-error: 1.3.6
|
||||
typescript: 5.2.2
|
||||
v8-compile-cache-lib: 3.0.1
|
||||
yn: 3.1.1
|
||||
optionalDependencies:
|
||||
'@swc/core': 1.11.4(@swc/helpers@0.5.15)
|
||||
optional: true
|
||||
|
||||
ts-node@10.9.1(@swc/core@1.11.4(@swc/helpers@0.5.15))(@types/node@24.0.15)(typescript@5.2.2):
|
||||
dependencies:
|
||||
'@cspotcode/source-map-support': 0.8.1
|
||||
|
||||
Reference in New Issue
Block a user