feat(messaging): Trigger incoming webhook UI (#37798)

This commit is contained in:
Ben White
2025-09-09 17:40:13 +02:00
committed by GitHub
parent bf3f290215
commit 00cce2cde0
41 changed files with 1273 additions and 526 deletions

View File

@@ -208,6 +208,7 @@ export const FEATURE_FLAGS = {
REPLAY_TEMPLATES: 'replay-templates', // owner: @raquelmsmith #team-replay
MESSAGING: 'messaging', // owner @haven #team-messaging
MESSAGING_EARLY_ACCESS: 'messaging-product', // owner @haven #team-messaging
MESSAGING_TRIGGER_WEBHOOK: 'messaging-trigger-webhook', // owner #team-messaging
ENVIRONMENTS_ROLLBACK: 'environments-rollback', // owner: @yasen-posthog #team-platform-features
SELF_SERVE_CREDIT_OVERRIDE: 'self-serve-credit-override', // owner: @zach
CUSTOM_CSS_THEMES: 'custom-css-themes', // owner: @daibhin

View File

@@ -50,10 +50,7 @@ export class FixtureHogFlowBuilder {
name: 'Hog Flow',
team_id: 1,
status: 'active',
trigger: {
type: 'event',
filters: {},
},
trigger: undefined as any,
exit_condition: 'exit_on_conversion',
edges: [],
actions: [],
@@ -61,22 +58,16 @@ export class FixtureHogFlowBuilder {
}
build(): HogFlow {
if (this.hogFlow.actions.length === 0) {
this.withSimpleWorkflow()
}
const triggerAction = findActionByType(this.hogFlow, 'trigger')
this.hogFlow.trigger =
this.hogFlow.trigger ??
(triggerAction
? {
type: triggerAction?.type ?? 'event',
filters: triggerAction?.config?.filters ?? {},
}
: undefined)
this.hogFlow.trigger = this.hogFlow.trigger ?? (triggerAction ? triggerAction.config : undefined)
if (!this.hogFlow.trigger) {
logger.error('[HogFlowBuilder] No trigger action found. Indicates a faulty built workflow')
}
// TODO: Run throught the zod validation?
return this.hogFlow
}
@@ -95,11 +86,6 @@ export class FixtureHogFlowBuilder {
return this
}
withTrigger(trigger: HogFlow['trigger']): this {
this.hogFlow.trigger = trigger
return this
}
withExitCondition(exitCondition: HogFlow['exit_condition']): this {
this.hogFlow.exit_condition = exitCondition
return this
@@ -126,14 +112,14 @@ export class FixtureHogFlowBuilder {
return this
}
withSimpleWorkflow(): this {
withSimpleWorkflow({ trigger }: { trigger?: HogFlow['trigger'] } = {}): this {
return this.withWorkflow({
actions: {
trigger: {
type: 'trigger',
config: {
config: trigger ?? {
type: 'event',
filters: HOG_FILTERS_EXAMPLES.no_filters.filters,
filters: HOG_FILTERS_EXAMPLES.no_filters.filters ?? {},
},
},
exit: {

View File

@@ -8,6 +8,7 @@ import { ClickHousePerson, ClickHouseTimestamp, ProjectId, RawClickHouseEvent, T
import { PostgresRouter } from '../../utils/db/postgres'
import { UUIDT } from '../../utils/utils'
import { CdpInternalEvent } from '../schema'
import { compileHog } from '../templates/compiler'
import {
CyclotronJobInvocationHogFunction,
CyclotronJobQueueKind,
@@ -175,6 +176,9 @@ export const insertHogFunctionTemplate = async (
const template = createHogFunctionTemplate({
...hogFunctionTemplate,
})
if (template.code_language === 'hog') {
template.bytecode = await compileHog(template.code)
}
const res = await insertRow(postgres, 'posthog_hogfunctiontemplate', {
id: randomUUID(),

View File

@@ -12,7 +12,8 @@ import { CdpSourceWebhooksConsumer, SourceWebhookError } from './consumers/cdp-s
import { HogTransformerService } from './hog-transformations/hog-transformer.service'
import { createCdpRedisPool } from './redis'
import { HogExecutorExecuteAsyncOptions, HogExecutorService, MAX_ASYNC_STEPS } from './services/hog-executor.service'
import { HogFlowExecutorService } from './services/hogflows/hogflow-executor.service'
import { HogFlowExecutorService, createHogFlowInvocation } from './services/hogflows/hogflow-executor.service'
import { HogFlowFunctionsService } from './services/hogflows/hogflow-functions.service'
import { HogFlowManagerService } from './services/hogflows/hogflow-manager.service'
import { HogFunctionManagerService } from './services/managers/hog-function-manager.service'
import { HogFunctionTemplateManagerService } from './services/managers/hog-function-template-manager.service'
@@ -40,6 +41,7 @@ export class CdpApi {
private recipientsManager: RecipientsManagerService
private hogFlowExecutor: HogFlowExecutorService
private hogFlowFunctionsService: HogFlowFunctionsService
private hogWatcher: HogWatcherService
private hogTransformer: HogTransformerService
private hogFunctionMonitoringService: HogFunctionMonitoringService
@@ -54,13 +56,15 @@ export class CdpApi {
this.hogFlowManager = new HogFlowManagerService(hub)
this.recipientsManager = new RecipientsManagerService(hub)
this.hogExecutor = new HogExecutorService(hub)
this.hogFlowFunctionsService = new HogFlowFunctionsService(
hub,
this.hogFunctionTemplateManager,
this.hogExecutor
)
this.recipientPreferencesService = new RecipientPreferencesService(this.recipientsManager)
this.recipientTokensService = new RecipientTokensService(hub)
this.hogFlowExecutor = new HogFlowExecutorService(
hub,
this.hogExecutor,
this.hogFunctionTemplateManager,
this.hogFlowFunctionsService,
this.recipientPreferencesService
)
this.nativeDestinationExecutorService = new NativeDestinationExecutorService(hub)
@@ -469,11 +473,7 @@ export class CdpApi {
groups: globals.groups,
})
const invocation = this.hogFlowExecutor.createHogFlowInvocation(
triggerGlobals,
compoundConfiguration,
filterGlobals
)
const invocation = createHogFlowInvocation(triggerGlobals, compoundConfiguration, filterGlobals)
invocation.state.currentAction = current_action_id
? {

View File

@@ -4,6 +4,7 @@ import { logger } from '../../utils/logger'
import { CdpRedis, createCdpRedisPool } from '../redis'
import { HogExecutorService } from '../services/hog-executor.service'
import { HogFlowExecutorService } from '../services/hogflows/hogflow-executor.service'
import { HogFlowFunctionsService } from '../services/hogflows/hogflow-functions.service'
import { HogFlowManagerService } from '../services/hogflows/hogflow-manager.service'
import { LegacyPluginExecutorService } from '../services/legacy-plugin-executor.service'
import { GroupsManagerService } from '../services/managers/groups-manager.service'
@@ -36,6 +37,7 @@ export abstract class CdpConsumerBase {
hogFlowManager: HogFlowManagerService
hogFunctionManager: HogFunctionManagerService
hogFunctionTemplateManager: HogFunctionTemplateManagerService
hogFlowFunctionsService: HogFlowFunctionsService
personsManager: PersonsManagerService
recipientsManager: RecipientsManagerService
@@ -58,13 +60,16 @@ export abstract class CdpConsumerBase {
this.hogMasker = new HogMaskerService(this.redis)
this.hogExecutor = new HogExecutorService(this.hub)
this.hogFunctionTemplateManager = new HogFunctionTemplateManagerService(this.hub)
this.hogFlowFunctionsService = new HogFlowFunctionsService(
this.hub,
this.hogFunctionTemplateManager,
this.hogExecutor
)
this.recipientsManager = new RecipientsManagerService(this.hub)
this.recipientPreferencesService = new RecipientPreferencesService(this.recipientsManager)
this.hogFlowExecutor = new HogFlowExecutorService(
this.hub,
this.hogExecutor,
this.hogFunctionTemplateManager,
this.hogFlowFunctionsService,
this.recipientPreferencesService
)

View File

@@ -530,7 +530,26 @@ describe('hog flow processing', () => {
})
it('should not create hog flow invocations with no filters', async () => {
await insertHogFlow(new FixtureHogFlowBuilder().withTeamId(team.id).build())
const hogFlow = new FixtureHogFlowBuilder().withTeamId(team.id).build()
hogFlow.trigger = {} as any
await insertHogFlow(hogFlow)
const invocations = await processor['createHogFlowInvocations']([globals])
expect(invocations).toHaveLength(0)
})
it('should not create hog flow invocations with webhook triggers', async () => {
const hogFlow = new FixtureHogFlowBuilder()
.withTeamId(team.id)
.withSimpleWorkflow({
trigger: {
type: 'webhook',
template_id: 'test',
inputs: {},
},
})
.build()
await insertHogFlow(hogFlow)
const invocations = await processor['createHogFlowInvocations']([globals])
expect(invocations).toHaveLength(0)
@@ -540,9 +559,11 @@ describe('hog flow processing', () => {
const hogFlow = await insertHogFlow(
new FixtureHogFlowBuilder()
.withTeamId(team.id)
.withTrigger({
type: 'event',
filters: HOG_FILTERS_EXAMPLES.pageview_or_autocapture_filter.filters,
.withSimpleWorkflow({
trigger: {
type: 'event',
filters: HOG_FILTERS_EXAMPLES.pageview_or_autocapture_filter.filters ?? {},
},
})
.build()
)

View File

@@ -7,15 +7,18 @@ import supertest from 'supertest'
import express from 'ultimate-express'
import { setupExpressApp } from '~/api/router'
import { insertHogFunction } from '~/cdp/_tests/fixtures'
import { insertHogFunction, insertHogFunctionTemplate } from '~/cdp/_tests/fixtures'
import { CdpApi } from '~/cdp/cdp-api'
import { template as incomingWebhookTemplate } from '~/cdp/templates/_sources/webhook/incoming_webhook.template'
import { HogFunctionType } from '~/cdp/types'
import { HogFlow } from '~/schema/hogflow'
import { forSnapshot } from '~/tests/helpers/snapshots'
import { getFirstTeam, resetTestDatabase } from '~/tests/helpers/sql'
import { Hub, Team } from '~/types'
import { closeHub, createHub } from '~/utils/db/hub'
import { FixtureHogFlowBuilder } from '../_tests/builders/hogflow.builder'
import { insertHogFlow } from '../_tests/fixtures-hogflows'
import { HogWatcherState } from '../services/monitoring/hog-watcher.service'
import { compileHog } from '../templates/compiler'
import { compileInputs } from '../templates/test/test-helpers'
@@ -79,13 +82,13 @@ describe('SourceWebhooksConsumer', () => {
})
const doRequest = async (options: {
hogFunctionId?: string
webhookId?: string
method?: string
headers?: Record<string, string>
body?: Record<string, any>
}) => {
return supertest(app)
.post(`/public/webhooks/${options.hogFunctionId ?? hogFunction.id}`)
.post(`/public/webhooks/${options.webhookId ?? hogFunction.id}`)
.set('Content-Type', 'application/json')
.set(options.headers ?? {})
.send(options.body)
@@ -94,16 +97,19 @@ describe('SourceWebhooksConsumer', () => {
const waitForBackgroundTasks = async () => {
await api['cdpSourceWebhooksConsumer']['promiseScheduler'].waitForAllSettled()
}
const getLogs = (): string[] => {
const res = mockProducerObserver.getProducedKafkaMessagesForTopic('log_entries_test')
return res.map((x) => x.value.message) as string[]
}
const getMetrics = (): any[] => {
const res = mockProducerObserver.getProducedKafkaMessagesForTopic('clickhouse_app_metrics2_test')
return res.map((x) => x.value) as any[]
}
describe('processWebhook', () => {
const getLogs = (): string[] => {
const res = mockProducerObserver.getProducedKafkaMessagesForTopic('log_entries_test')
return res.map((x) => x.value.message) as string[]
}
describe('hog function processing', () => {
it('should 404 if the hog function does not exist', async () => {
const res = await doRequest({
hogFunctionId: 'non-existent-hog-function-id',
webhookId: 'non-existent-hog-function-id',
})
expect(res.status).toEqual(404)
expect(res.body).toEqual({
@@ -178,6 +184,146 @@ describe('SourceWebhooksConsumer', () => {
})
})
describe('hog flow processing', () => {
let hogFlow: HogFlow
beforeEach(async () => {
const template = await insertHogFunctionTemplate(hub.postgres, incomingWebhookTemplate)
hogFlow = new FixtureHogFlowBuilder()
.withTeamId(team.id)
.withSimpleWorkflow({
trigger: {
type: 'webhook',
template_id: template.template_id,
inputs: {
event: {
value: 'my-event',
bytecode: await compileHog(`return f'my-event'`),
},
distinct_id: {
value: '{request.body.distinct_id}',
bytecode: await compileHog(`return f'{request.body.distinct_id}'`),
},
},
},
})
.build()
await insertHogFlow(hub.postgres, hogFlow)
})
it('should 404 if the hog flow does not exist', async () => {
const res = await doRequest({
webhookId: 'non-existent-hog-flow-id',
})
expect(res.status).toEqual(404)
})
it('should invoke a workflow with the parsed inputs', async () => {
const res = await doRequest({
webhookId: hogFlow.id,
body: {
event: 'my-event',
distinct_id: 'test-distinct-id',
},
})
expect(res.status).toEqual(201)
expect(res.body).toEqual({
status: 'queued',
})
expect(mockExecuteSpy).toHaveBeenCalledTimes(1)
expect(mockQueueInvocationsSpy).toHaveBeenCalledTimes(1)
const call = mockQueueInvocationsSpy.mock.calls[0][0][0]
expect(call.queue).toEqual('hogflow')
expect(call.hogFlow).toMatchObject(hogFlow)
})
it('should add logs and metrics', async () => {
const res = await doRequest({
webhookId: hogFlow.id,
body: {
event: 'my-event',
distinct_id: 'test-distinct-id',
},
})
expect(res.status).toEqual(201)
await waitForBackgroundTasks()
expect(getLogs()).toEqual([expect.stringContaining('[Action:trigger] Function completed in')])
expect(getMetrics()).toEqual([
expect.objectContaining({
metric_kind: 'other',
metric_name: 'triggered',
count: 1,
}),
expect.objectContaining({
metric_kind: 'billing',
metric_name: 'billable_invocation',
count: 1,
}),
])
})
it('should add logs and metrics for a controlled failed hog flow', async () => {
const res = await doRequest({
webhookId: hogFlow.id,
body: {
event: 'my-event',
missing_distinct_id: 'test-distinct-id',
},
})
expect(res.status).toEqual(400)
expect(res.body).toEqual({
error: '"distinct_id" could not be parsed correctly',
})
await waitForBackgroundTasks()
expect(getLogs()).toEqual([
expect.stringContaining('[Action:trigger] Function completed in'),
'[Action:trigger] Responded with response status - 400',
])
expect(getMetrics()).toEqual([
expect.objectContaining({ metric_kind: 'failure', metric_name: 'trigger_failed', count: 1 }),
])
})
it('should add logs and metrics for an uncontrolled failed hog flow', async () => {
// Hacky but otherwise its quite hard to trigger an uncontrolled error
hogFlow = new FixtureHogFlowBuilder()
.withTeamId(team.id)
.withSimpleWorkflow({
trigger: {
type: 'webhook',
template_id: incomingWebhookTemplate.id,
inputs: {
distinct_id: {
value: '{i.do.not.exist}',
bytecode: await compileHog(`return f'{i.do.not.exist}'`),
},
},
},
})
.build()
await insertHogFlow(hub.postgres, hogFlow)
const res = await doRequest({
webhookId: hogFlow.id,
body: {
event: 'my-event',
missing_distinct_id: 'test-distinct-id',
},
})
expect(res.status).toEqual(500)
expect(res.body).toEqual({
status: 'Unhandled error',
})
await waitForBackgroundTasks()
expect(getLogs()).toEqual([
'[Action:trigger] Error triggering flow: Could not execute bytecode for input field: distinct_id',
])
expect(getMetrics()).toEqual([
expect.objectContaining({ metric_kind: 'failure', metric_name: 'trigger_failed', count: 1 }),
])
})
})
describe('hogwatcher', () => {
it('should return a degraded response if the function is degraded', async () => {
await api['cdpSourceWebhooksConsumer']['hogWatcher'].forceStateChange(

View File

@@ -2,20 +2,26 @@ import { DateTime } from 'luxon'
import { ModifiedRequest } from '~/api/router'
import { instrumented } from '~/common/tracing/tracing-utils'
import { HogFlow } from '~/schema/hogflow'
import { HealthCheckResult, HealthCheckResultOk, Hub } from '../../types'
import { logger } from '../../utils/logger'
import { PromiseScheduler } from '../../utils/promise-scheduler'
import { UUID, UUIDT } from '../../utils/utils'
import { createHogFlowInvocation } from '../services/hogflows/hogflow-executor.service'
import { actionIdForLogging } from '../services/hogflows/hogflow-utils'
import { CyclotronJobQueue } from '../services/job-queue/job-queue'
import { HogWatcherState } from '../services/monitoring/hog-watcher.service'
import { HogWatcherFunctionState, HogWatcherState } from '../services/monitoring/hog-watcher.service'
import {
CyclotronJobInvocationHogFunction,
CyclotronJobInvocationResult,
HogFunctionFilterGlobals,
HogFunctionInvocationGlobals,
HogFunctionType,
LogEntryLevel,
MinimalAppMetric,
} from '../types'
import { createAddLogFunction } from '../utils'
import { logEntry } from '../utils'
import { createInvocation, createInvocationResult } from '../utils/invocation-utils'
import { CdpConsumerBase } from './cdp-base.consumer'
@@ -75,48 +81,29 @@ export class CdpSourceWebhooksConsumer extends CdpConsumerBase {
this.cyclotronJobQueue = new CyclotronJobQueue(hub, 'hog')
}
public async getWebhook(webhookId: string): Promise<HogFunctionType | null> {
public async getWebhook(webhookId: string): Promise<{ hogFlow?: HogFlow; hogFunction: HogFunctionType } | null> {
if (!UUID.validateString(webhookId, false)) {
return null
}
// Check for hog functions
const hogFunction = await this.hogFunctionManager.getHogFunction(webhookId)
if (hogFunction?.type !== 'source_webhook' || !hogFunction.enabled) {
return null
if (hogFunction?.type === 'source_webhook' && hogFunction?.enabled) {
return { hogFunction }
}
return hogFunction
// Otherwise check for hog flows
const hogFlow = await this.hogFlowManager.getHogFlow(webhookId)
if (hogFlow && hogFlow.status === 'active' && hogFlow.trigger?.type === 'webhook') {
const hogFunction = await this.hogFlowFunctionsService.buildHogFunction(hogFlow, hogFlow.trigger)
return { hogFlow, hogFunction }
}
return null
}
@instrumented('cdpSourceWebhooksConsumer.processWebhook')
public async processWebhook(
webhookId: string,
req: ModifiedRequest
): Promise<CyclotronJobInvocationResult<CyclotronJobInvocationHogFunction>> {
const [hogFunction, hogFunctionState] = await Promise.all([
this.getWebhook(webhookId),
this.hogWatcher.getCachedEffectiveState(webhookId),
])
if (!hogFunction) {
throw new SourceWebhookError(404, 'Not found')
}
if (hogFunctionState?.state === HogWatcherState.disabled) {
this.hogFunctionMonitoringService.queueAppMetric(
{
team_id: hogFunction.team_id,
app_source_id: hogFunction.id,
metric_kind: 'failure',
metric_name: 'disabled_permanently',
count: 1,
},
'hog_function'
)
throw new SourceWebhookError(429, 'Disabled')
}
private buildRequestGlobals(hogFunction: HogFunctionType, req: ModifiedRequest): HogFunctionInvocationGlobals {
const body: Record<string, any> = req.body
const ipValue = getFirstHeaderValue(req.headers['x-forwarded-for']) || req.socket.remoteAddress || req.ip
@@ -134,7 +121,7 @@ export class CdpSourceWebhooksConsumer extends CdpConsumerBase {
}
}
const globals: HogFunctionInvocationGlobals = {
return {
source: {
name: hogFunction.name ?? `Hog function: ${hogFunction.id}`,
url: `${projectUrl}/functions/${hogFunction.id}`,
@@ -160,13 +147,145 @@ export class CdpSourceWebhooksConsumer extends CdpConsumerBase {
stringBody: req.rawBody ?? '',
},
}
}
private async executeHogFlow(
req: ModifiedRequest,
hogFlow: HogFlow,
hogFunction: HogFunctionType
): Promise<CyclotronJobInvocationResult<CyclotronJobInvocationHogFunction>> {
logger.info('Executing hog flow trigger', {
id: hogFlow.id,
template_id: hogFunction.template_id,
team_id: hogFlow.team_id,
})
const invocationId = new UUIDT().toString()
const triggerActionId = hogFlow.actions.find((action) => action.type === 'trigger')?.id ?? 'trigger_node'
const addLog = (level: LogEntryLevel, message: string) => {
this.hogFunctionMonitoringService.queueLogs(
[
{
team_id: hogFlow.team_id,
log_source: 'hog_flow',
log_source_id: hogFlow.id,
instance_id: invocationId,
...logEntry(level, `${actionIdForLogging({ id: triggerActionId })} ${message}`),
},
],
'hog_flow'
)
}
const addMetric = (metric: Pick<MinimalAppMetric, 'metric_kind' | 'metric_name' | 'count'>) => {
this.hogFunctionMonitoringService.queueAppMetric(
{
team_id: hogFlow.team_id,
app_source_id: hogFlow.id,
...metric,
},
'hog_flow'
)
}
try {
const globals: HogFunctionInvocationGlobals = this.buildRequestGlobals(hogFunction, req)
const globalsWithInputs = await this.hogExecutor.buildInputsWithGlobals(hogFunction, globals)
const invocation = createInvocation(globalsWithInputs, hogFunction)
// Slightly different handling for hog flows
// Run the initial step - this allows functions not using fetches to respond immediately
const functionResult = await this.hogFlowFunctionsService.execute(invocation)
functionResult.logs.forEach((log) => addLog(log.level, log.message))
functionResult.logs = []
// Queue any queued work here. This allows us to enable delayed work like fetching eventually without blocking the API.
if (!functionResult.finished) {
throw new SourceWebhookError(500, 'Delayed processing not supported')
}
const customHttpResponse = getCustomHttpResponse(functionResult)
if (customHttpResponse) {
const level = customHttpResponse.status >= 400 ? 'warn' : 'info'
addLog(level, `Responded with response status - ${customHttpResponse.status}`)
}
const capturedPostHogEvent = functionResult.capturedPostHogEvents[0]
// Add all logs to the result
if (capturedPostHogEvent) {
// Invoke the hogflow
const triggerGlobals: HogFunctionInvocationGlobals = {
...invocation.state.globals,
event: {
...capturedPostHogEvent,
uuid: new UUIDT().toString(),
elements_chain: '',
url: '',
},
}
const hogFlowInvocation = createHogFlowInvocation(
triggerGlobals,
hogFlow,
{} as HogFunctionFilterGlobals
)
addMetric({
metric_kind: 'other',
metric_name: 'triggered',
count: 1,
})
addMetric({
metric_kind: 'billing',
metric_name: 'billable_invocation',
count: 1,
})
await this.cyclotronJobQueue.queueInvocations([hogFlowInvocation])
} else {
addMetric({
metric_kind: 'failure',
metric_name: 'trigger_failed',
count: 1,
})
}
// Always set to false for hog flows as this triggers the flow to continue so we dont want metrics for this
functionResult.finished = false
return functionResult
} catch (error) {
logger.error('Error triggering hog flow', { error })
addMetric({
metric_kind: 'failure',
metric_name: 'trigger_failed',
count: 1,
})
addLog('error', `Error triggering flow: ${error.message}`)
// NOTE: We only return a hog function result. We track out own logs and errors here
return createInvocationResult(
createInvocation({} as any, hogFunction),
{},
{
finished: false,
error: error.message,
}
)
}
}
private async executeHogFunction(
req: ModifiedRequest,
hogFunction: HogFunctionType,
hogFunctionState: HogWatcherFunctionState | null
): Promise<CyclotronJobInvocationResult<CyclotronJobInvocationHogFunction>> {
let result: CyclotronJobInvocationResult<CyclotronJobInvocationHogFunction>
try {
// TODO: Add error handling and logging
const globals: HogFunctionInvocationGlobals = this.buildRequestGlobals(hogFunction, req)
const globalsWithInputs = await this.hogExecutor.buildInputsWithGlobals(hogFunction, globals)
const invocation = createInvocation(globalsWithInputs, hogFunction)
if (hogFunctionState?.state === HogWatcherState.degraded) {
@@ -174,7 +293,7 @@ export class CdpSourceWebhooksConsumer extends CdpConsumerBase {
invocation.queue = 'hog_overflow'
await this.cyclotronJobQueue.queueInvocations([invocation])
result = createInvocationResult(
result = createInvocationResult<CyclotronJobInvocationHogFunction>(
invocation,
{},
{
@@ -200,8 +319,6 @@ export class CdpSourceWebhooksConsumer extends CdpConsumerBase {
// Run the initial step - this allows functions not using fetches to respond immediately
result = await this.hogExecutor.execute(invocation)
const addLog = createAddLogFunction(result.logs)
// Queue any queued work here. This allows us to enable delayed work like fetching eventually without blocking the API.
if (!result.finished) {
await this.cyclotronJobQueue.queueInvocationResults([result])
@@ -210,7 +327,7 @@ export class CdpSourceWebhooksConsumer extends CdpConsumerBase {
const customHttpResponse = getCustomHttpResponse(result)
if (customHttpResponse) {
const level = customHttpResponse.status >= 400 ? 'warn' : 'info'
addLog(level, `Responded with response status - ${customHttpResponse.status}`)
result.logs.push(logEntry(level, `Responded with response status - ${customHttpResponse.status}`))
}
}
} catch (error) {
@@ -226,13 +343,46 @@ export class CdpSourceWebhooksConsumer extends CdpConsumerBase {
)
}
await this.hogFunctionMonitoringService.queueInvocationResults([result])
return result
}
@instrumented('cdpSourceWebhooksConsumer.processWebhook')
public async processWebhook(
webhookId: string,
req: ModifiedRequest
): Promise<CyclotronJobInvocationResult<CyclotronJobInvocationHogFunction>> {
const [webhook, hogFunctionState] = await Promise.all([
this.getWebhook(webhookId),
this.hogWatcher.getCachedEffectiveState(webhookId),
])
if (!webhook) {
throw new SourceWebhookError(404, 'Not found')
}
const { hogFunction, hogFlow } = webhook
if (hogFunctionState?.state === HogWatcherState.disabled) {
this.hogFunctionMonitoringService.queueAppMetric(
{
team_id: hogFunction.team_id,
app_source_id: hogFunction.id,
metric_kind: 'failure',
metric_name: 'disabled_permanently',
count: 1,
},
hogFlow ? 'hog_flow' : 'hog_function'
)
throw new SourceWebhookError(429, 'Disabled')
}
const result = hogFlow
? await this.executeHogFlow(req, hogFlow, hogFunction)
: await this.executeHogFunction(req, hogFunction, hogFunctionState)
void this.promiseScheduler.schedule(
Promise.all([
this.hogFunctionMonitoringService.queueInvocationResults([result]).then(() => {
return this.hogFunctionMonitoringService.flush()
}),
this.hogWatcher.observeResultsBuffered(result),
])
Promise.all([this.hogFunctionMonitoringService.flush(), this.hogWatcher.observeResultsBuffered(result)])
)
return result

View File

@@ -0,0 +1,132 @@
// Jest Snapshot v1, https://jestjs.io/docs/snapshot-testing
exports[`HogFlowManager returns the hog flow 1`] = `
[
{
"abort_action": null,
"actions": [
{
"config": {
"filters": {
"actions": [],
"bytecode": [
"_h",
29,
],
"events": [],
},
"type": "event",
},
"created_at": "CREATED_AT",
"description": "trigger",
"id": "trigger",
"name": "trigger",
"on_error": "continue",
"type": "trigger",
"updated_at": "UPDATED_AT",
},
{
"config": {},
"created_at": "CREATED_AT",
"description": "exit",
"id": "exit",
"name": "exit",
"on_error": "continue",
"type": "exit",
"updated_at": "UPDATED_AT",
},
],
"conversion": null,
"created_at": "CREATED_AT",
"description": "",
"edges": [
{
"from": "trigger",
"to": "exit",
"type": "continue",
},
],
"exit_condition": "exit_on_conversion",
"id": "<REPLACED-UUID-0>",
"name": "Test Hog Flow team 1",
"status": "active",
"team_id": "TEAM_ID",
"trigger": {
"filters": {
"actions": [],
"bytecode": [
"_h",
29,
],
"events": [],
},
"type": "event",
},
"updated_at": "UPDATED_AT",
"version": 1,
},
{
"abort_action": null,
"actions": [
{
"config": {
"filters": {
"actions": [],
"bytecode": [
"_h",
29,
],
"events": [],
},
"type": "event",
},
"created_at": "CREATED_AT",
"description": "trigger",
"id": "trigger",
"name": "trigger",
"on_error": "continue",
"type": "trigger",
"updated_at": "UPDATED_AT",
},
{
"config": {},
"created_at": "CREATED_AT",
"description": "exit",
"id": "exit",
"name": "exit",
"on_error": "continue",
"type": "exit",
"updated_at": "UPDATED_AT",
},
],
"conversion": null,
"created_at": "CREATED_AT",
"description": "",
"edges": [
{
"from": "trigger",
"to": "exit",
"type": "continue",
},
],
"exit_condition": "exit_on_conversion",
"id": "<REPLACED-UUID-1>",
"name": "Test Hog Flow team 1 - other",
"status": "active",
"team_id": "TEAM_ID",
"trigger": {
"filters": {
"actions": [],
"bytecode": [
"_h",
29,
],
"events": [],
},
"type": "event",
},
"updated_at": "UPDATED_AT",
"version": 1,
},
]
`;

View File

@@ -5,7 +5,6 @@ import { DateTime } from 'luxon'
import { FixtureHogFlowBuilder } from '~/cdp/_tests/builders/hogflow.builder'
import { insertHogFunctionTemplate, insertIntegration } from '~/cdp/_tests/fixtures'
import { createExampleHogFlowInvocation } from '~/cdp/_tests/fixtures-hogflows'
import { compileHog } from '~/cdp/templates/compiler'
import { createInvocationResult } from '~/cdp/utils/invocation-utils'
import { getFirstTeam, resetTestDatabase } from '~/tests/helpers/sql'
import { Hub, Team } from '~/types'
@@ -16,6 +15,7 @@ import { CyclotronJobInvocationHogFlow } from '../../../types'
import { HogExecutorService } from '../../hog-executor.service'
import { HogFunctionTemplateManagerService } from '../../managers/hog-function-template-manager.service'
import { RecipientPreferencesService } from '../../messaging/recipient-preferences.service'
import { HogFlowFunctionsService } from '../hogflow-functions.service'
import { findActionByType } from '../hogflow-utils'
import { HogFunctionHandler } from './hog_function'
@@ -25,6 +25,7 @@ describe('HogFunctionHandler', () => {
let hogFunctionHandler: HogFunctionHandler
let mockHogFunctionExecutor: HogExecutorService
let mockHogFunctionTemplateManager: HogFunctionTemplateManagerService
let mockHogFlowFunctionsService: HogFlowFunctionsService
let mockRecipientPreferencesService: RecipientPreferencesService
let invocation: CyclotronJobInvocationHogFlow
@@ -37,23 +38,22 @@ describe('HogFunctionHandler', () => {
mockHogFunctionExecutor = new HogExecutorService(hub)
mockHogFunctionTemplateManager = new HogFunctionTemplateManagerService(hub)
mockHogFlowFunctionsService = new HogFlowFunctionsService(
hub,
mockHogFunctionTemplateManager,
mockHogFunctionExecutor
)
mockRecipientPreferencesService = {
shouldSkipAction: jest.fn().mockResolvedValue(false),
} as any
hogFunctionHandler = new HogFunctionHandler(
hub,
mockHogFunctionExecutor,
mockHogFunctionTemplateManager,
mockRecipientPreferencesService
)
hogFunctionHandler = new HogFunctionHandler(mockHogFlowFunctionsService, mockRecipientPreferencesService)
// Simple hog function that prints the inputs
const exampleHog = `fetch('http://localhost/test', { 'method': 'POST', 'body': inputs })`
const template = await insertHogFunctionTemplate(hub.postgres, {
id: 'template-test-hogflow-executor',
name: 'Test Template',
code: exampleHog,
code: `fetch('http://localhost/test', { 'method': 'POST', 'body': inputs })`,
inputs_schema: [
{
key: 'name',
@@ -66,7 +66,6 @@ describe('HogFunctionHandler', () => {
required: true,
},
],
bytecode: await compileHog(exampleHog),
})
await insertIntegration(hub.postgres, team.id, {

View File

@@ -1,18 +1,14 @@
import { DateTime } from 'luxon'
import { HogFlowAction } from '../../../../schema/hogflow'
import { Hub } from '../../../../types'
import {
CyclotronJobInvocationHogFlow,
CyclotronJobInvocationHogFunction,
CyclotronJobInvocationResult,
HogFunctionInvocationGlobals,
HogFunctionType,
MinimalLogEntry,
} from '../../../types'
import { HogExecutorService } from '../../hog-executor.service'
import { HogFunctionTemplateManagerService } from '../../managers/hog-function-template-manager.service'
import { RecipientPreferencesService } from '../../messaging/recipient-preferences.service'
import { HogFlowFunctionsService } from '../hogflow-functions.service'
import { actionIdForLogging, findContinueAction } from '../hogflow-utils'
import { ActionHandler, ActionHandlerResult } from './action.interface'
@@ -22,9 +18,7 @@ type Action = Extract<HogFlowAction, { type: FunctionActionType }>
export class HogFunctionHandler implements ActionHandler {
constructor(
private hub: Hub,
private hogFunctionExecutor: HogExecutorService,
private hogFunctionTemplateManager: HogFunctionTemplateManagerService,
private hogFlowFunctionsService: HogFlowFunctionsService,
private recipientPreferencesService: RecipientPreferencesService
) {}
@@ -63,53 +57,15 @@ export class HogFunctionHandler implements ActionHandler {
invocation: CyclotronJobInvocationHogFlow,
action: Action
): Promise<CyclotronJobInvocationResult<CyclotronJobInvocationHogFunction>> {
const template = await this.hogFunctionTemplateManager.getHogFunctionTemplate(action.config.template_id)
if (!template) {
throw new Error(`Template '${action.config.template_id}' not found`)
}
const hogFunction: HogFunctionType = {
id: invocation.hogFlow.id,
team_id: invocation.teamId,
name: `${invocation.hogFlow.name} - ${template.name}`,
enabled: true,
type: 'destination',
deleted: false,
hog: '<<TEMPLATE>>',
bytecode: template.bytecode,
inputs: action.config.inputs,
inputs_schema: template.inputs_schema,
created_at: '',
updated_at: '',
}
const teamId = invocation.hogFlow.team_id
const projectUrl = `${this.hub.SITE_URL}/project/${teamId}`
const globals: HogFunctionInvocationGlobals = {
source: {
name: hogFunction.name ?? `Hog function: ${hogFunction.id}`,
url: `${projectUrl}/functions/${hogFunction.id}`,
},
project: {
id: hogFunction.team_id,
name: '',
url: '',
},
event: invocation.state.event,
person: invocation.person,
}
const hogFunctionInvocation: CyclotronJobInvocationHogFunction = {
...invocation,
const hogFunction = await this.hogFlowFunctionsService.buildHogFunction(invocation.hogFlow, action.config)
const hogFunctionInvocation = await this.hogFlowFunctionsService.buildHogFunctionInvocation(
invocation,
hogFunction,
state: invocation.state.currentAction?.hogFunctionState ?? {
globals: await this.hogFunctionExecutor.buildInputsWithGlobals(hogFunction, globals),
timings: [],
attempts: 0,
},
}
{
event: invocation.state.event,
person: invocation.person,
}
)
if (await this.recipientPreferencesService.shouldSkipAction(hogFunctionInvocation, action)) {
return {
@@ -127,6 +83,6 @@ export class HogFunctionHandler implements ActionHandler {
}
}
return this.hogFunctionExecutor.executeWithAsyncFunctions(hogFunctionInvocation)
return this.hogFlowFunctionsService.executeWithAsyncFunctions(hogFunctionInvocation)
}
}

View File

@@ -12,6 +12,10 @@ export class TriggerHandler implements ActionHandler {
invocation: CyclotronJobInvocationHogFlow,
action: Extract<HogFlowAction, { type: 'trigger' }>
): Promise<ActionHandlerResult> {
if (action.config.type === 'webhook') {
return { nextAction: findContinueAction(invocation) }
}
const filterResults = await filterFunctionInstrumented({
fn: invocation.hogFlow,
filters: action.config.filters,

View File

@@ -17,6 +17,7 @@ import { HogFunctionTemplateManagerService } from '../managers/hog-function-temp
import { RecipientsManagerService } from '../managers/recipients-manager.service'
import { RecipientPreferencesService } from '../messaging/recipient-preferences.service'
import { HogFlowExecutorService } from './hogflow-executor.service'
import { HogFlowFunctionsService } from './hogflow-functions.service'
// Mock before importing fetch
jest.mock('~/utils/request', () => {
@@ -56,17 +57,16 @@ describe('Hogflow Executor', () => {
})
const hogExecutor = new HogExecutorService(hub)
const hogFunctionTemplateManager = new HogFunctionTemplateManagerService(hub)
const hogFlowFunctionsService = new HogFlowFunctionsService(hub, hogFunctionTemplateManager, hogExecutor)
const recipientsManager = new RecipientsManagerService(hub)
const recipientPreferencesService = new RecipientPreferencesService(recipientsManager)
const exampleHog = `
print(f'Hello, {inputs.name}!')
print('Fetch 1', fetch('https://posthog.com').status)`
await insertHogFunctionTemplate(hub.postgres, {
id: 'template-test-hogflow-executor',
name: 'Test Template',
code: exampleHog,
code: `
print(f'Hello, {inputs.name}!')
print('Fetch 1', fetch('https://posthog.com').status)`,
inputs_schema: [
{
key: 'name',
@@ -74,20 +74,17 @@ describe('Hogflow Executor', () => {
required: true,
},
],
bytecode: await compileHog(exampleHog),
})
const exampleHogMultiFetch = `
print(f'Hello, {inputs.name}!')
print('Fetch 1', fetch('https://posthog.com').status)
print('Fetch 2', fetch('https://posthog.com').status)
print('Fetch 3', fetch('https://posthog.com').status)
print('All fetches done!')`
await insertHogFunctionTemplate(hub.postgres, {
id: 'template-test-hogflow-executor-async',
name: 'Test template multi fetch',
code: exampleHogMultiFetch,
code: `
print(f'Hello, {inputs.name}!')
print('Fetch 1', fetch('https://posthog.com').status)
print('Fetch 2', fetch('https://posthog.com').status)
print('Fetch 3', fetch('https://posthog.com').status)
print('All fetches done!')`,
inputs_schema: [
{
key: 'name',
@@ -95,10 +92,9 @@ describe('Hogflow Executor', () => {
required: true,
},
],
bytecode: await compileHog(exampleHogMultiFetch),
})
executor = new HogFlowExecutorService(hub, hogExecutor, hogFunctionTemplateManager, recipientPreferencesService)
executor = new HogFlowExecutorService(hogFlowFunctionsService, recipientPreferencesService)
})
describe('general event processing', () => {
@@ -112,7 +108,7 @@ describe('Hogflow Executor', () => {
type: 'trigger',
config: {
type: 'event',
filters: HOG_FILTERS_EXAMPLES.no_filters.filters,
filters: HOG_FILTERS_EXAMPLES.no_filters.filters ?? {},
},
},
@@ -407,7 +403,7 @@ describe('Hogflow Executor', () => {
type: 'trigger',
config: {
type: 'event',
filters: HOG_FILTERS_EXAMPLES.no_filters.filters,
filters: HOG_FILTERS_EXAMPLES.no_filters.filters ?? {},
},
},
delay: {
@@ -440,7 +436,7 @@ describe('Hogflow Executor', () => {
type: 'trigger',
config: {
type: 'event',
filters: HOG_FILTERS_EXAMPLES.pageview_or_autocapture_filter.filters,
filters: HOG_FILTERS_EXAMPLES.pageview_or_autocapture_filter.filters ?? {},
},
},
function_id_1: {
@@ -537,7 +533,7 @@ describe('Hogflow Executor', () => {
hogFlow.exit_condition = 'exit_on_trigger_not_matched'
hogFlow.trigger = {
type: 'event',
filters: HOG_FILTERS_EXAMPLES.pageview_or_autocapture_filter.filters,
filters: HOG_FILTERS_EXAMPLES.pageview_or_autocapture_filter.filters ?? {},
}
const invocation1 = createExampleHogFlowInvocation(hogFlow, {
@@ -575,7 +571,7 @@ describe('Hogflow Executor', () => {
hogFlow.exit_condition = 'exit_on_trigger_not_matched_or_conversion'
hogFlow.trigger = {
type: 'event',
filters: HOG_FILTERS_EXAMPLES.no_filters.filters,
filters: HOG_FILTERS_EXAMPLES.no_filters.filters ?? {},
}
hogFlow.conversion = {
window_minutes: 10,

View File

@@ -1,7 +1,6 @@
import { DateTime } from 'luxon'
import { HogFlow, HogFlowAction } from '../../../schema/hogflow'
import { Hub } from '../../../types'
import { logger } from '../../../utils/logger'
import { UUIDT } from '../../../utils/utils'
import {
@@ -16,8 +15,6 @@ import {
} from '../../types'
import { convertToHogFunctionFilterGlobal, filterFunctionInstrumented } from '../../utils/hog-function-filtering'
import { createInvocationResult } from '../../utils/invocation-utils'
import { HogExecutorService } from '../hog-executor.service'
import { HogFunctionTemplateManagerService } from '../managers/hog-function-template-manager.service'
import { RecipientPreferencesService } from '../messaging/recipient-preferences.service'
import { ActionHandler } from './actions/action.interface'
import { ConditionalBranchHandler } from './actions/conditional_branch'
@@ -27,25 +24,40 @@ import { HogFunctionHandler } from './actions/hog_function'
import { RandomCohortBranchHandler } from './actions/random_cohort_branch'
import { TriggerHandler } from './actions/trigger.handler'
import { WaitUntilTimeWindowHandler } from './actions/wait_until_time_window'
import { HogFlowFunctionsService } from './hogflow-functions.service'
import { actionIdForLogging, ensureCurrentAction, findContinueAction, shouldSkipAction } from './hogflow-utils'
export const MAX_ACTION_STEPS_HARD_LIMIT = 1000
export function createHogFlowInvocation(
globals: HogFunctionInvocationGlobals,
hogFlow: HogFlow,
filterGlobals: HogFunctionFilterGlobals
): CyclotronJobInvocationHogFlow {
return {
id: new UUIDT().toString(),
state: {
event: globals.event,
actionStepCount: 0,
},
teamId: hogFlow.team_id,
functionId: hogFlow.id, // TODO: Include version?
hogFlow,
person: globals.person, // This is outside of state as we don't persist it
filterGlobals,
queue: 'hogflow',
queuePriority: 1,
}
}
export class HogFlowExecutorService {
private readonly actionHandlers: Record<HogFlowAction['type'], ActionHandler>
constructor(
private hub: Hub,
private hogFunctionExecutor: HogExecutorService,
private hogFunctionTemplateManager: HogFunctionTemplateManagerService,
private recipientPreferencesService: RecipientPreferencesService
hogFlowFunctionsService: HogFlowFunctionsService,
recipientPreferencesService: RecipientPreferencesService
) {
const hogFunctionHandler = new HogFunctionHandler(
this.hub,
this.hogFunctionExecutor,
this.hogFunctionTemplateManager,
this.recipientPreferencesService
)
const hogFunctionHandler = new HogFunctionHandler(hogFlowFunctionsService, recipientPreferencesService)
this.actionHandlers = {
trigger: new TriggerHandler(),
@@ -61,27 +73,6 @@ export class HogFlowExecutorService {
}
}
public createHogFlowInvocation(
globals: HogFunctionInvocationGlobals,
hogFlow: HogFlow,
filterGlobals: HogFunctionFilterGlobals
): CyclotronJobInvocationHogFlow {
return {
id: new UUIDT().toString(),
state: {
event: globals.event,
actionStepCount: 0,
},
teamId: hogFlow.team_id,
functionId: hogFlow.id, // TODO: Include version?
hogFlow,
person: globals.person, // This is outside of state as we don't persist it
filterGlobals,
queue: 'hogflow',
queuePriority: 1,
}
}
async buildHogFlowInvocations(
hogFlows: HogFlow[],
triggerGlobals: HogFunctionInvocationGlobals
@@ -115,7 +106,7 @@ export class HogFlowExecutorService {
continue
}
const invocation = this.createHogFlowInvocation(triggerGlobals, hogFlow, filterGlobals)
const invocation = createHogFlowInvocation(triggerGlobals, hogFlow, filterGlobals)
invocations.push(invocation)
}
@@ -181,7 +172,7 @@ export class HogFlowExecutorService {
let conversionMatch: boolean | undefined = undefined
// Use the same filter evaluation as in buildHogFlowInvocations
if (hogFlow.trigger.filters && person) {
if (hogFlow.trigger.type === 'event' && hogFlow.trigger.filters && person) {
const filterResult = await filterFunctionInstrumented({
fn: hogFlow,
filters: hogFlow.trigger.filters,

View File

@@ -0,0 +1,96 @@
import {
CyclotronJobInvocationHogFlow,
CyclotronJobInvocationHogFunction,
CyclotronJobInvocationResult,
HogFunctionInvocationGlobals,
HogFunctionType,
} from '~/cdp/types'
import { HogFlow, HogFlowAction } from '~/schema/hogflow'
import { Hub } from '~/types'
import { HogExecutorService } from '../hog-executor.service'
import { HogFunctionTemplateManagerService } from '../managers/hog-function-template-manager.service'
type FunctionActionType = 'function' | 'function_email' | 'function_sms'
type Action = Extract<HogFlowAction, { type: FunctionActionType }>
// Helper class that can turn a hog flow action into a hog function
export class HogFlowFunctionsService {
constructor(
private hub: Hub,
private hogFunctionTemplateManager: HogFunctionTemplateManagerService,
private hogFunctionExecutor: HogExecutorService
) {}
async buildHogFunction(hogFlow: HogFlow, configuration: Action['config']): Promise<HogFunctionType> {
const template = await this.hogFunctionTemplateManager.getHogFunctionTemplate(configuration.template_id)
if (!template) {
throw new Error(`Template '${configuration.template_id}' not found`)
}
const hogFunction: HogFunctionType = {
id: hogFlow.id,
team_id: hogFlow.team_id,
name: `${hogFlow.name} - ${template.name}`,
enabled: true,
type: template.type,
deleted: false,
hog: '<<TEMPLATE>>',
bytecode: template.bytecode,
inputs: configuration.inputs,
inputs_schema: template.inputs_schema,
created_at: '',
updated_at: '',
}
return hogFunction
}
async buildHogFunctionInvocation(
invocation: CyclotronJobInvocationHogFlow,
hogFunction: HogFunctionType,
globals: Omit<HogFunctionInvocationGlobals, 'source' | 'project'>
): Promise<CyclotronJobInvocationHogFunction> {
const teamId = invocation.hogFlow.team_id
const projectUrl = `${this.hub.SITE_URL}/project/${teamId}`
const globalsWithSource: HogFunctionInvocationGlobals = {
...globals,
source: {
name: hogFunction.name ?? `Hog flow: ${invocation.hogFlow.id}`,
url: `${projectUrl}/messaging/campaigns/${invocation.hogFlow.id}/workflow?node=${hogFunction.id}`,
},
project: {
id: hogFunction.team_id,
name: '',
url: '',
},
}
const hogFunctionInvocation: CyclotronJobInvocationHogFunction = {
...invocation,
hogFunction,
state: invocation.state.currentAction?.hogFunctionState ?? {
globals: await this.hogFunctionExecutor.buildInputsWithGlobals(hogFunction, globalsWithSource),
timings: [],
attempts: 0,
},
}
return hogFunctionInvocation
}
async execute(
invocation: CyclotronJobInvocationHogFunction
): Promise<CyclotronJobInvocationResult<CyclotronJobInvocationHogFunction>> {
return this.hogFunctionExecutor.execute(invocation)
}
async executeWithAsyncFunctions(
invocation: CyclotronJobInvocationHogFunction
): Promise<CyclotronJobInvocationResult<CyclotronJobInvocationHogFunction>> {
return this.hogFunctionExecutor.executeWithAsyncFunctions(invocation)
}
}

View File

@@ -1,5 +1,8 @@
import '~/tests/helpers/mocks/date.mock'
import { FixtureHogFlowBuilder } from '~/cdp/_tests/builders/hogflow.builder'
import { HogFlow } from '~/schema/hogflow'
import { forSnapshot } from '~/tests/helpers/snapshots'
import { createTeam, getTeam, resetTestDatabase } from '~/tests/helpers/sql'
import { Hub } from '~/types'
import { closeHub, createHub } from '~/utils/db/hub'
@@ -70,49 +73,17 @@ describe('HogFlowManager', () => {
it('returns the hog flow', async () => {
let items = await manager.getHogFlowsForTeam(teamId1)
expect(items.map((item) => item.team_id)).toEqual([teamId1, teamId1])
expect(items).toEqual([
{
abort_action: null,
actions: {},
conversion: null,
created_at: expect.any(String),
description: '',
edges: {},
exit_condition: 'exit_on_conversion',
id: hogFlows[0].id,
name: 'Test Hog Flow team 1',
status: 'active',
team_id: teamId1,
trigger: {
filters: {},
type: 'event',
expect(
forSnapshot(items, {
overrides: {
team_id: 'TEAM_ID',
created_at: 'CREATED_AT',
updated_at: 'UPDATED_AT',
},
trigger_masking: null,
updated_at: expect.any(String),
version: 1,
},
{
abort_action: null,
actions: {},
conversion: null,
created_at: expect.any(String),
description: '',
edges: {},
exit_condition: 'exit_on_conversion',
id: hogFlows[1].id,
name: 'Test Hog Flow team 1 - other',
status: 'active',
team_id: teamId1,
trigger: {
filters: {},
type: 'event',
},
trigger_masking: null,
updated_at: expect.any(String),
version: 1,
},
])
})
).toMatchSnapshot()
await hub.db.postgres.query(
PostgresUse.COMMON_WRITE,
@@ -154,7 +125,6 @@ describe('HogFlowManager', () => {
const originalResult = await manager.getHogFlowIdsForTeams([teamId1, teamId2])
expect(originalResult[teamId1]).toHaveLength(2)
// Archive a hog flow
await hub.db.postgres.query(
PostgresUse.COMMON_WRITE,
`UPDATE posthog_hogflow SET status='archived', updated_at = NOW() WHERE id = $1`,
@@ -162,7 +132,6 @@ describe('HogFlowManager', () => {
'testKey'
)
// This is normally dispatched by django
manager['onHogFlowsReloaded'](teamId1, [hogFlows[0].id])
const result = await manager.getHogFlowIdsForTeams([teamId1])

View File

@@ -15,7 +15,6 @@ const HOG_FLOW_FIELDS = [
'created_at',
'updated_at',
'trigger',
'trigger_masking',
'conversion',
'exit_condition',
'edges',

View File

@@ -100,6 +100,6 @@ export async function shouldSkipAction(
}
// Special format which the frontend understands and can render as a link
export const actionIdForLogging = (action: HogFlowAction): string => {
export const actionIdForLogging = (action: Pick<HogFlowAction, 'id'>): string => {
return `[Action:${action.id}]`
}

View File

@@ -5,7 +5,6 @@ import { Hub } from '~/types'
import { closeHub, createHub } from '~/utils/db/hub'
import { insertHogFunctionTemplate } from '../../_tests/fixtures'
import { compileHog } from '../../templates/compiler'
import { HogFunctionTemplateManagerService } from './hog-function-template-manager.service'
describe('HogFunctionTemplateManager', () => {
@@ -32,7 +31,6 @@ describe('HogFunctionTemplateManager', () => {
},
],
code: 'fetch(inputs.url)',
bytecode: await compileHog('fetch(inputs.url)'),
})
)
})

View File

@@ -188,6 +188,7 @@ export type MinimalAppMetric = {
metric_name:
| 'early_exit'
| 'triggered'
| 'trigger_failed'
| 'succeeded'
| 'failed'
| 'filtered'

View File

@@ -12,15 +12,29 @@ const _commonActionFields = {
filters: z.any(), // TODO: Correct to the right type
}
const HogFlowTriggerSchema = z.discriminatedUnion('type', [
z.object({
type: z.literal('event'),
filters: z.object({
events: z.array(z.any()).optional(),
properties: z.array(z.any()).optional(),
actions: z.array(z.any()).optional(),
}),
}),
z.object({
type: z.literal('webhook'),
template_uuid: z.string().uuid().optional(), // May be used later to specify a specific template version
template_id: z.string(),
inputs: z.record(CyclotronInputSchema),
}),
])
const HogFlowActionSchema = z.discriminatedUnion('type', [
// Trigger
z.object({
..._commonActionFields,
type: z.literal('trigger'),
config: z.object({
type: z.literal('event'),
filters: z.any(),
}),
config: HogFlowTriggerSchema,
// A trigger's event filters are stored on the top-level Hogflow object
}),
// Branching
@@ -142,17 +156,7 @@ export const HogFlowSchema = z.object({
version: z.number(),
name: z.string(),
status: z.enum(['active', 'draft', 'archived']),
trigger: z.object({
type: z.literal('event'),
filters: z.any(),
}),
trigger_masking: z
.object({
ttl: z.number(),
hash: z.string(),
threshold: z.number(),
})
.optional(),
trigger: HogFlowTriggerSchema,
conversion: z
.object({
window_minutes: z.number(),

View File

@@ -4,6 +4,8 @@
*
* It does not mock the producer itself, for that see `producer.mock.ts`
*/
import { uncompressSync } from 'snappy'
import { KafkaProducerWrapper, TopicMessage } from '../../../src/kafka/producer'
import { parseJSON } from '../../../src/utils/json-parse'
@@ -23,6 +25,14 @@ export type DecodedKafkaMessage = {
headers?: TopicMessage['messages'][number]['headers']
}
const tryDecompress = (value: string | Buffer): string => {
try {
return uncompressSync(value).toString()
} catch (error) {
return value.toString()
}
}
export class KafkaProducerObserver {
public readonly produceSpy: jest.SpyInstance<
Promise<void>,
@@ -57,7 +67,7 @@ export class KafkaProducerObserver {
topic: topicMessage.topic,
messages: topicMessage.messages.map((message) => ({
key: message.key?.toString() ?? null,
value: message.value ? parseJSON(message.value.toString()) : null,
value: message.value ? parseJSON(tryDecompress(message.value)) : null,
headers: message.headers,
})),
}))

View File

@@ -1,4 +1,5 @@
import json
from typing import Optional, cast
from django.db.models import QuerySet
@@ -25,11 +26,6 @@ from posthog.plugins.plugin_server_api import create_hog_flow_invocation_test
logger = structlog.get_logger(__name__)
class HogFlowTriggerSerializer(serializers.Serializer):
filters = HogFunctionFiltersSerializer()
type = serializers.ChoiceField(choices=["event"], required=True)
class HogFlowConfigFunctionInputsSerializer(serializers.Serializer):
inputs_schema = serializers.ListField(child=InputsSchemaItemSerializer(), required=False)
inputs = InputsSerializer(required=False)
@@ -59,14 +55,20 @@ class HogFlowActionSerializer(serializers.Serializer):
return super().to_internal_value(data)
def validate(self, data):
trigger_is_function = False
if data.get("type") == "trigger":
filters = data.get("config", {}).get("filters", {})
if filters:
serializer = HogFunctionFiltersSerializer(data=filters, context=self.context)
serializer.is_valid(raise_exception=True)
data["config"]["filters"] = serializer.validated_data
if data.get("config", {}).get("type") == "webhook":
trigger_is_function = True
elif data.get("config", {}).get("type") == "event":
filters = data.get("config", {}).get("filters", {})
if filters:
serializer = HogFunctionFiltersSerializer(data=filters, context=self.context)
serializer.is_valid(raise_exception=True)
data["config"]["filters"] = serializer.validated_data
else:
raise serializers.ValidationError({"config": "Invalid trigger type"})
if "function" in data.get("type", ""):
if "function" in data.get("type", "") or trigger_is_function:
template_id = data.get("config", {}).get("template_id", "")
template = HogFunctionTemplate.get_template(template_id)
if not template:
@@ -105,7 +107,6 @@ class HogFlowMinimalSerializer(serializers.ModelSerializer):
"created_by",
"updated_at",
"trigger",
"trigger_masking",
"conversion",
"exit_condition",
"edges",
@@ -116,7 +117,6 @@ class HogFlowMinimalSerializer(serializers.ModelSerializer):
class HogFlowSerializer(HogFlowMinimalSerializer):
trigger = HogFlowTriggerSerializer()
actions = serializers.ListField(child=HogFlowActionSerializer(), required=True)
class Meta:
@@ -131,7 +131,6 @@ class HogFlowSerializer(HogFlowMinimalSerializer):
"created_by",
"updated_at",
"trigger",
"trigger_masking",
"conversion",
"exit_condition",
"edges",
@@ -143,10 +142,22 @@ class HogFlowSerializer(HogFlowMinimalSerializer):
"version",
"created_at",
"created_by",
"trigger_masking",
"abort_action",
]
def validate(self, data):
instance = cast(Optional[HogFlow], self.instance)
actions = data.get("actions", instance.actions if instance else [])
# The trigger is derived from the actions. We can trust the action level validation and pull it out
trigger_actions = [action for action in actions if action.get("type") == "trigger"]
if len(trigger_actions) != 1:
raise serializers.ValidationError({"actions": "Exactly one trigger action is required"})
data["trigger"] = trigger_actions[0]["config"]
return data
def create(self, validated_data: dict, *args, **kwargs) -> HogFlow:
request = self.context["request"]
team_id = self.context["team_id"]

View File

@@ -18,6 +18,17 @@ class TestHogFlowAPI(APIBaseTest):
sync_template_to_db(webhook_template)
def _create_hog_flow_with_action(self, action_config: dict):
trigger_action = {
"id": "trigger_node",
"name": "trigger_1",
"type": "trigger",
"config": {
"type": "event",
"filters": {
"events": [{"id": "$pageview", "name": "$pageview", "type": "events", "order": 0}],
},
},
}
action = {
"id": "action_1",
"name": "action_1",
@@ -27,17 +38,71 @@ class TestHogFlowAPI(APIBaseTest):
hog_flow = {
"name": "Test Flow",
"trigger": {
"type": "event",
"filters": {
"events": [{"id": "$pageview", "name": "$pageview", "type": "events", "order": 0}],
},
},
"actions": [action],
"actions": [trigger_action, action],
}
return hog_flow, action
def test_hog_flow_function_trigger_check(self):
hog_flow = {
"name": "Test Flow",
"actions": [],
}
response = self.client.post(f"/api/projects/{self.team.id}/hog_flows", hog_flow)
assert response.status_code == 400, response.json()
assert response.json() == {
"attr": "actions",
"code": "invalid_input",
"detail": "Exactly one trigger action is required",
"type": "validation_error",
}
def test_hog_flow_function_trigger_copied_from_action(self):
trigger_action = {
"id": "trigger_node",
"name": "trigger_1",
"type": "trigger",
"config": {
"type": "webhook",
"template_id": "template-webhook",
"inputs": {
"url": {"value": "https://example.com"},
},
},
}
hog_flow = {
"name": "Test Flow",
"actions": [trigger_action],
}
response = self.client.post(f"/api/projects/{self.team.id}/hog_flows", hog_flow)
trigger_action_expectation = {
"id": "trigger_node",
"name": "trigger_1",
"description": "",
"on_error": None,
"filters": None,
"type": "trigger",
"config": {
"type": "webhook",
"template_id": "template-webhook",
"inputs": {
"url": {
"value": "https://example.com",
"bytecode": ["_H", 1, 32, "https://example.com"],
"order": 0,
}
},
},
}
assert response.status_code == 201, response.json()
assert response.json()["actions"] == [trigger_action_expectation]
assert response.json()["trigger"] == trigger_action_expectation["config"]
def test_hog_flow_function_validation(self):
hog_flow, action = self._create_hog_flow_with_action(
{
@@ -47,11 +112,10 @@ class TestHogFlowAPI(APIBaseTest):
)
# Check that the template is found but missing required inputs
response = self.client.post(f"/api/projects/{self.team.id}/hog_flows", hog_flow)
assert response.status_code == 400, response.json()
assert response.json() == {
"attr": "actions__0__template_id",
"attr": "actions__1__template_id",
"code": "invalid_input",
"detail": "Template not found",
"type": "validation_error",
@@ -67,7 +131,7 @@ class TestHogFlowAPI(APIBaseTest):
response = self.client.post(f"/api/projects/{self.team.id}/hog_flows", hog_flow)
assert response.status_code == 400, response.json()
assert response.json() == {
"attr": "actions__0__inputs__url",
"attr": "actions__1__inputs__url",
"code": "invalid_input",
"detail": "This field is required.",
"type": "validation_error",
@@ -94,10 +158,32 @@ class TestHogFlowAPI(APIBaseTest):
["_H", 1, 32, "$pageview", 32, "event", 1, 1, 11]
)
assert hog_flow.actions[0]["filters"].get("bytecode") == snapshot(
assert hog_flow.actions[1]["filters"].get("bytecode") == snapshot(
["_H", 1, 32, "custom_event", 32, "event", 1, 1, 11]
)
assert hog_flow.actions[0]["config"]["inputs"] == snapshot(
assert hog_flow.actions[1]["config"]["inputs"] == snapshot(
{"url": {"order": 0, "value": "https://example.com", "bytecode": ["_H", 1, 32, "https://example.com"]}}
)
def test_hog_flow_enable_disable(self):
hog_flow, _ = self._create_hog_flow_with_action(
{
"template_id": "template-webhook",
"inputs": {"url": {"value": "https://example.com"}},
}
)
response = self.client.post(f"/api/projects/{self.team.id}/hog_flows", hog_flow)
assert response.status_code == 201, response.json()
assert response.json()["status"] == "draft"
response = self.client.patch(
f"/api/projects/{self.team.id}/hog_flows/{response.json()['id']}", {"status": "active"}
)
assert response.status_code == 200, response.json()
assert response.json()["status"] == "active"
response = self.client.patch(
f"/api/projects/{self.team.id}/hog_flows/{response.json()['id']}", {"status": "draft"}
)
assert response.status_code == 200, response.json()
assert response.json()["status"] == "draft"

View File

@@ -1,10 +1,9 @@
import '@xyflow/react/dist/style.css'
import { useValues } from 'kea'
import { Form } from 'kea-forms'
import posthog from 'posthog-js'
import { IconBolt, IconLeave, IconPlusSmall, IconTarget } from '@posthog/icons'
import { IconLeave, IconPlusSmall, IconTarget } from '@posthog/icons'
import { LemonButton, LemonLabel, LemonTag, LemonTextArea, lemonToast } from '@posthog/lemon-ui'
import { PropertyFilters } from 'lib/components/PropertyFilters/PropertyFilters'
@@ -16,7 +15,6 @@ import { LemonRadio } from 'lib/lemon-ui/LemonRadio'
import { LemonSelect } from 'lib/lemon-ui/LemonSelect'
import { CampaignLogicProps, campaignLogic } from './campaignLogic'
import { HogFlowFilters } from './hogflows/filters/HogFlowFilters'
export function CampaignOverview(props: CampaignLogicProps): JSX.Element {
return (
@@ -24,7 +22,6 @@ export function CampaignOverview(props: CampaignLogicProps): JSX.Element {
<Form id="campaign-overview" logic={campaignLogic} props={props} formKey="campaign" enableFormOnSubmit>
<div className="flex flex-col flex-wrap gap-4 items-start">
<BasicInfoSection />
<TriggerSection {...props} />
<ConversionGoalSection />
<ExitConditionSection />
</div>
@@ -46,37 +43,6 @@ function BasicInfoSection(): JSX.Element {
)
}
function TriggerSection(props: CampaignLogicProps): JSX.Element {
const logic = campaignLogic(props)
const { campaignValidationErrors } = useValues(logic)
return (
<div className="flex flex-col py-2 w-full">
<div className="flex flex-col">
<span className="flex items-center">
<IconBolt className="text-lg" />
<span className="text-lg font-semibold">Trigger event</span>
</span>
<p className="mb-0">Choose which events or actions will enter a user into the campaign.</p>
</div>
<LemonDivider />
<LemonField name={['trigger', 'filters']} className="max-w-200">
{({ value, onChange }) => (
<HogFlowFilters
filters={value ?? {}}
setFilters={onChange}
typeKey="campaign-trigger"
buttonCopy="Add trigger event"
/>
)}
</LemonField>
{campaignValidationErrors.trigger?.filters && (
<span className="text-danger text-sm mt-2">{campaignValidationErrors.trigger.filters}</span>
)}
</div>
)
}
function ConversionGoalSection(): JSX.Element {
return (
<div className="flex flex-col py-2 w-full">

View File

@@ -74,10 +74,27 @@ export function CampaignsTable(): JSX.Element {
key: 'name',
sorter: (a, b) => (a.name || '').localeCompare(b.name || ''),
render: (_, item) => {
return <LemonTableLink to={urls.messagingCampaign(item.id)} title={item.name} />
return (
<LemonTableLink
to={urls.messagingCampaign(item.id)}
title={item.name}
description={item.description}
/>
)
},
},
{
title: 'Trigger',
width: 0,
render: (_, item) => {
return (
<Link to={urls.messagingCampaign(item.id, 'workflow') + '?node=trigger_node'}>
<LemonTag type="default">{capitalizeFirstLetter(item.trigger?.type ?? 'unknown')}</LemonTag>
</Link>
)
},
},
{
title: 'Actions',
width: 0,
@@ -153,7 +170,7 @@ export function CampaignsTable(): JSX.Element {
dataSource={campaigns}
loading={campaignsLoading}
columns={columns}
defaultSorting={{ columnKey: 'status', order: -1 }}
defaultSorting={{ columnKey: 'status', order: 1 }}
/>
</div>
)

View File

@@ -24,6 +24,8 @@ export interface CampaignLogicProps {
export const TRIGGER_NODE_ID = 'trigger_node'
export const EXIT_NODE_ID = 'exit_node'
export type TriggerAction = Extract<HogFlowAction, { type: 'trigger' }>
const NEW_CAMPAIGN: HogFlow = {
id: 'new',
name: '',
@@ -59,14 +61,6 @@ const NEW_CAMPAIGN: HogFlow = {
type: 'continue',
},
],
trigger: {
type: 'event',
filters: {
events: [],
actions: [],
},
},
trigger_masking: { ttl: 0, hash: '', threshold: 0 },
conversion: { window_minutes: 0, filters: [] },
exit_condition: 'exit_only_at_end',
version: 1,
@@ -81,7 +75,11 @@ export const campaignLogic = kea<campaignLogicType>([
props({ id: 'new' } as CampaignLogicProps),
key((props) => props.id || 'new'),
actions({
setCampaignActionConfig: (actionId: string, config: Partial<HogFlowAction['config']>) => ({ actionId, config }),
partialSetCampaignActionConfig: (actionId: string, config: Partial<HogFlowAction['config']>) => ({
actionId,
config,
}),
setCampaignActionConfig: (actionId: string, config: HogFlowAction['config']) => ({ actionId, config }),
setCampaignAction: (actionId: string, action: HogFlowAction) => ({ actionId, action }),
setCampaignActionEdges: (actionId: string, edges: HogFlow['edges']) => ({ actionId, edges }),
// NOTE: This is a wrapper for setCampaignValues, to get around some weird typegen issues
@@ -114,7 +112,9 @@ export const campaignLogic = kea<campaignLogicType>([
{} as Record<string, HogFunctionTemplateType>,
{
loadHogFunctionTemplatesById: async () => {
const allTemplates = await api.hogFunctions.listTemplates({ types: ['destination'] })
const allTemplates = await api.hogFunctions.listTemplates({
types: ['destination', 'source_webhook'],
})
const allTemplatesById = allTemplates.results.reduce(
(acc, template) => {
@@ -132,16 +132,9 @@ export const campaignLogic = kea<campaignLogicType>([
forms(({ actions, values }) => ({
campaign: {
defaults: NEW_CAMPAIGN,
errors: ({ name, trigger, actions }) => {
errors: ({ name, actions }) => {
const errors = {
name: !name ? 'Name is required' : undefined,
trigger: {
type: trigger.type === 'event' ? undefined : 'Invalid trigger type',
filters:
trigger.filters.events.length === 0 && trigger.filters.actions.length === 0
? 'At least one event or action is required'
: undefined,
},
actions: actions.some((action) => !(values.actionValidationErrorsById[action.id]?.valid ?? true))
? 'Some fields need work'
: undefined,
@@ -215,6 +208,16 @@ export const campaignLogic = kea<campaignLogicType>([
result.valid = configValidation.valid
result.errors = configValidation.errors
}
} else if (action.type === 'trigger') {
// custom validation here that we can't easily express in the schema
if (action.config.type === 'event') {
if (!action.config.filters.events?.length && !action.config.filters.actions?.length) {
result.valid = false
result.errors = {
filters: 'At least one event or action is required',
}
}
}
}
acc[action.id] = result
@@ -224,6 +227,13 @@ export const campaignLogic = kea<campaignLogicType>([
)
},
],
triggerAction: [
(s) => [s.campaign],
(campaign): TriggerAction | null => {
return (campaign.actions.find((action) => action.type === 'trigger') as TriggerAction) ?? null
},
],
}),
listeners(({ actions, values, props }) => ({
loadCampaignSuccess: async ({ originalCampaign }) => {
@@ -265,9 +275,17 @@ export const campaignLogic = kea<campaignLogicType>([
return
}
action.config = { ...action.config, ...config }
action.config = { ...config } as HogFlowAction['config']
actions.setCampaignValues({ actions: [...values.campaign.actions] })
},
partialSetCampaignActionConfig: async ({ actionId, config }) => {
const action = values.campaign.actions.find((action) => action.id === actionId)
if (!action) {
return
}
actions.setCampaignActionConfig(actionId, { ...action.config, ...config } as HogFlowAction['config'])
},
setCampaignAction: async ({ actionId, action }) => {
const newActions = values.campaign.actions.map((a) => (a.id === actionId ? action : a))
actions.setCampaignValues({ actions: newActions })

View File

@@ -60,13 +60,7 @@ export const hogFlowEditorLogic = kea<hogFlowEditorLogicType>([
],
actions: [
campaignLogic(props),
[
'setCampaignInfo',
'setCampaignActionConfig',
'setCampaignAction',
'setCampaignActionEdges',
'loadCampaignSuccess',
],
['setCampaignInfo', 'setCampaignAction', 'setCampaignActionEdges', 'loadCampaignSuccess'],
optOutCategoriesLogic(),
['loadCategories'],
],

View File

@@ -1,14 +1,13 @@
import { useActions, useValues } from 'kea'
import { IconExternal } from '@posthog/icons'
import { LemonBanner, LemonButton, LemonDivider, LemonLabel, LemonSwitch } from '@posthog/lemon-ui'
import { LemonButton, LemonDivider, LemonLabel, LemonSwitch } from '@posthog/lemon-ui'
import { ScrollableShadows } from 'lib/components/ScrollableShadows/ScrollableShadows'
import { urls } from 'scenes/urls'
import { CategorySelect } from 'products/messaging/frontend/OptOuts/CategorySelect'
import { campaignLogic } from '../../campaignLogic'
import { HogFlowFilters } from '../filters/HogFlowFilters'
import { hogFlowEditorLogic } from '../hogFlowEditorLogic'
import { useHogFlowStep } from '../steps/HogFlowSteps'
@@ -18,8 +17,6 @@ import { HogFlowAction } from '../types'
export function HogFlowEditorPanelBuildDetail(): JSX.Element | null {
const { selectedNode, categories, categoriesLoading } = useValues(hogFlowEditorLogic)
const { setCampaignAction } = useActions(hogFlowEditorLogic)
const { actionValidationErrorsById } = useValues(campaignLogic)
const validationResult = actionValidationErrorsById[selectedNode?.id ?? '']
const Step = useHogFlowStep(selectedNode?.data)
@@ -37,15 +34,6 @@ export function HogFlowEditorPanelBuildDetail(): JSX.Element | null {
innerClassName="flex flex-col gap-2 p-3"
styledScrollbars
>
{validationResult?.schema && (
<div>
{Object.values(validationResult.schema.errors).map(({ path, message }) => (
<LemonBanner type="error" key={path.join('.')}>
{path.join('.')}: {message}
</LemonBanner>
))}
</div>
)}
{Step?.renderConfiguration(selectedNode)}
</ScrollableShadows>

View File

@@ -24,7 +24,6 @@ import {
import { CampaignLogicProps, campaignLogic } from '../../../campaignLogic'
import { hogFlowEditorLogic } from '../../hogFlowEditorLogic'
import { HogflowTestResult } from '../../steps/types'
import { HogFlow } from '../../types'
import type { hogFlowEditorTestLogicType } from './hogFlowEditorTestLogicType'
export interface HogflowTestInvocation {
@@ -37,7 +36,7 @@ export const hogFlowEditorTestLogic = kea<hogFlowEditorTestLogicType>([
props({} as CampaignLogicProps),
key((props) => `${props.id}`),
connect((props: CampaignLogicProps) => ({
values: [campaignLogic(props), ['campaign'], hogFlowEditorLogic, ['selectedNodeId']],
values: [campaignLogic(props), ['campaign', 'triggerAction'], hogFlowEditorLogic, ['selectedNodeId']],
actions: [hogFlowEditorLogic, ['setSelectedNodeId']],
})),
actions({
@@ -89,7 +88,7 @@ export const hogFlowEditorTestLogic = kea<hogFlowEditorTestLogicType>([
null as CyclotronJobInvocationGlobals | null,
{
loadSampleGlobals: async () => {
if (!values.campaign.trigger?.filters) {
if (!values.shouldLoadSampleGlobals) {
return null
}
@@ -162,15 +161,25 @@ export const hogFlowEditorTestLogic = kea<hogFlowEditorTestLogicType>([
})),
selectors(() => ({
shouldLoadSampleGlobals: [
(s) => [s.campaign],
(campaign: HogFlow): boolean => {
return !!campaign.trigger?.filters?.events?.length || !!campaign.trigger?.filters?.actions?.length
(s) => [s.triggerAction],
(triggerAction): boolean => {
// Only load samples if the trigger is event
return !!(triggerAction && triggerAction.config.type === 'event')
},
],
// TODO(messaging): DRY up matchingFilters with implementation in hogFunctionConfigurationLogic
matchingFilters: [
(s) => [s.campaign],
(campaign: HogFlow): PropertyGroupFilter => {
(s) => [s.triggerAction],
(triggerAction): PropertyGroupFilter => {
if (!triggerAction || triggerAction.config.type !== 'event') {
return {
type: FilterLogicalOperator.And,
values: [],
}
}
const triggerActionConfig = triggerAction.config
const seriesProperties: PropertyGroupFilterValue = {
type: FilterLogicalOperator.Or,
values: [],
@@ -179,8 +188,8 @@ export const hogFlowEditorTestLogic = kea<hogFlowEditorTestLogicType>([
type: FilterLogicalOperator.And,
values: [seriesProperties],
}
const allPossibleEventFilters = campaign.trigger.filters?.events ?? []
const allPossibleActionFilters = campaign.trigger.filters?.actions ?? []
const allPossibleEventFilters = triggerActionConfig.filters?.events ?? []
const allPossibleActionFilters = triggerActionConfig.filters?.actions ?? []
for (const event of allPossibleEventFilters) {
const eventProperties: AnyPropertyFilter[] = [...(event.properties ?? [])]
@@ -214,12 +223,12 @@ export const hogFlowEditorTestLogic = kea<hogFlowEditorTestLogicType>([
values: actionProperties,
})
}
if ((campaign.trigger.filters?.properties?.length ?? 0) > 0) {
if ((triggerActionConfig.filters?.properties?.length ?? 0) > 0) {
const globalProperties: PropertyGroupFilterValue = {
type: FilterLogicalOperator.And,
values: [],
}
for (const property of campaign.trigger.filters?.properties ?? []) {
for (const property of triggerActionConfig.filters?.properties ?? []) {
globalProperties.values.push(property as AnyPropertyFilter)
}
properties.values.push(globalProperties)

View File

@@ -10,6 +10,7 @@ import { LemonLabel } from 'lib/lemon-ui/LemonLabel'
import { HogFlowFilters } from '../filters/HogFlowFilters'
import { hogFlowEditorLogic } from '../hogFlowEditorLogic'
import { HogFlow, HogFlowAction } from '../types'
import { StepSchemaErrors } from './components/StepSchemaErrors'
export function StepConditionalBranchConfiguration({
node,
@@ -85,6 +86,7 @@ export function StepConditionalBranchConfiguration({
return (
<>
<StepSchemaErrors />
{conditions.map((condition, index) => (
<div key={index} className="flex flex-col gap-2 p-2 rounded border">
<div className="flex justify-between items-center">

View File

@@ -1,9 +1,10 @@
import { Node } from '@xyflow/react'
import { useActions } from 'kea'
import { hogFlowEditorLogic } from '../hogFlowEditorLogic'
import { campaignLogic } from '../../campaignLogic'
import { HogFlowAction } from '../types'
import { HogFlowDuration } from './components/HogFlowDuration'
import { StepSchemaErrors } from './components/StepSchemaErrors'
export function StepDelayConfiguration({
node,
@@ -13,10 +14,12 @@ export function StepDelayConfiguration({
const action = node.data
const { delay_duration } = action.config
const { setCampaignActionConfig } = useActions(hogFlowEditorLogic)
const { setCampaignActionConfig } = useActions(campaignLogic)
return (
<>
<StepSchemaErrors />
<p className="mb-0">Wait for a specified duration.</p>
<HogFlowDuration
value={delay_duration}

View File

@@ -1,73 +1,29 @@
import { useActions, useValues } from 'kea'
import { Form } from 'kea-forms'
import { useEffect } from 'react'
import { Spinner } from '@posthog/lemon-ui'
import { CyclotronJobInputs } from 'lib/components/CyclotronJob/CyclotronJobInputs'
import { CyclotronJobInputType } from '~/types'
import { campaignLogic } from '../../campaignLogic'
import { hogFlowEditorLogic } from '../hogFlowEditorLogic'
import { StepFunctionNode, hogFunctionStepLogic } from './hogFunctionStepLogic'
import { HogFlowFunctionConfiguration } from './components/HogFlowFunctionConfiguration'
import { StepSchemaErrors } from './components/StepSchemaErrors'
import { StepFunctionNode } from './hogFunctionStepLogic'
export function StepFunctionConfiguration({ node }: { node: StepFunctionNode }): JSX.Element {
const { hogFunctionTemplatesById, hogFunctionTemplatesByIdLoading, actionValidationErrorsById } =
useValues(campaignLogic)
const { setCampaignActionConfig } = useActions(hogFlowEditorLogic)
const { actionValidationErrorsById } = useValues(campaignLogic)
const { partialSetCampaignActionConfig } = useActions(campaignLogic)
const templateId = node.data.config.template_id
const template = hogFunctionTemplatesById[templateId]
const stepLogic = hogFunctionStepLogic({ node, template })
const { configuration, configurationValidationErrors } = useValues(stepLogic)
const validationResult = actionValidationErrorsById[node.id]
useEffect(() => {
setCampaignActionConfig(node.id, {
inputs: configuration.inputs as Record<string, CyclotronJobInputType>,
})
}, [configuration.inputs, template, setCampaignActionConfig, node.id])
if (hogFunctionTemplatesByIdLoading) {
return (
<div className="flex justify-center items-center">
<Spinner />
</div>
)
}
if (!template) {
return <div>Template not found!</div>
}
const inputs = node.data.config.inputs as Record<string, CyclotronJobInputType>
return (
<Form logic={hogFunctionStepLogic} props={{ node }} formKey="configuration" className="flex flex-col gap-2">
<CyclotronJobInputs
<>
<StepSchemaErrors />
<HogFlowFunctionConfiguration
templateId={templateId}
inputs={inputs}
setInputs={(inputs) => partialSetCampaignActionConfig(node.id, { inputs })}
errors={validationResult?.errors}
configuration={{
inputs: node.data.config.inputs as Record<string, CyclotronJobInputType>,
inputs_schema: template?.inputs_schema ?? [],
}}
showSource={false}
sampleGlobalsWithInputs={null} // TODO: Load this based on the trigger event
onInputChange={(key, value) => {
setCampaignActionConfig(node.id, {
inputs: { ...node.data.config.inputs, [key]: value },
})
}}
/>
<div className="text-danger flex items-center gap-1 text-sm">
{configurationValidationErrors?.inputs && (
<div>
{Object.entries(configurationValidationErrors.inputs).map(([key, value]) => (
<div key={key}>{value}</div>
))}
</div>
)}
</div>
</Form>
</>
)
}

View File

@@ -9,6 +9,7 @@ import { LemonLabel } from 'lib/lemon-ui/LemonLabel'
import { hogFlowEditorLogic } from '../hogFlowEditorLogic'
import { HogFlow, HogFlowAction } from '../types'
import { StepSchemaErrors } from './components/StepSchemaErrors'
export function StepRandomCohortBranchConfiguration({
node,
@@ -80,6 +81,8 @@ export function StepRandomCohortBranchConfiguration({
return (
<>
<StepSchemaErrors />
{cohorts.map((cohort, index) => (
<div key={index} className="flex flex-col gap-2 p-2 rounded border">
<div className="flex justify-between items-center">

View File

@@ -1,30 +1,162 @@
import { Node } from '@xyflow/react'
import { useActions } from 'kea'
import { useActions, useValues } from 'kea'
import { IconBolt, IconWebhooks } from '@posthog/icons'
import { LemonLabel, LemonSelect } from '@posthog/lemon-ui'
import { CodeSnippet } from 'lib/components/CodeSnippet'
import { useFeatureFlag } from 'lib/hooks/useFeatureFlag'
import { LemonField } from 'lib/lemon-ui/LemonField'
import { publicWebhooksHostOrigin } from 'lib/utils/apiHost'
import { campaignLogic } from '../../campaignLogic'
import { HogFlowFilters } from '../filters/HogFlowFilters'
import { hogFlowEditorLogic } from '../hogFlowEditorLogic'
import { HogFlowAction } from '../types'
import { HogFlowFunctionConfiguration } from './components/HogFlowFunctionConfiguration'
export function StepTriggerConfiguration({
node,
}: {
node: Node<Extract<HogFlowAction, { type: 'trigger' }>>
}): JSX.Element {
const action = node.data
const { filters } = action.config
const { setCampaignActionConfig } = useActions(campaignLogic)
const { actionValidationErrorsById } = useValues(campaignLogic)
const { setCampaignActionConfig } = useActions(hogFlowEditorLogic)
const type = node.data.config.type
const validationResult = actionValidationErrorsById[node.id]
const webhookTriggerEnabled = useFeatureFlag('MESSAGING_TRIGGER_WEBHOOK')
if (!webhookTriggerEnabled && node.data.config.type === 'event') {
return <StepTriggerConfigurationEvents action={node.data} config={node.data.config} />
}
return (
<>
<LemonField.Pure label="Trigger type" error={validationResult?.errors?.type}>
<LemonSelect
options={[
{
label: 'Event',
value: 'event',
icon: <IconBolt />,
labelInMenu: (
<div className="flex flex-col my-1">
<div className="font-semibold">Event</div>
<p className="text-xs text-muted">
Trigger your workflow based on incoming realtime PostHog events
</p>
</div>
),
},
{
label: 'Webhook',
value: 'webhook',
icon: <IconWebhooks />,
labelInMenu: (
<div className="flex flex-col my-1">
<div className="font-semibold">Webhook</div>
<p className="text-xs text-muted">
Trigger your workflow using an incoming HTTP webhook
</p>
</div>
),
},
]}
value={type}
placeholder="Select trigger type"
onChange={(value) => {
value === 'event'
? setCampaignActionConfig(node.id, { type: 'event', filters: {} })
: setCampaignActionConfig(node.id, {
type: 'webhook',
template_id: 'template-source-webhook',
inputs: {},
})
}}
/>
</LemonField.Pure>
{node.data.config.type === 'event' ? (
<StepTriggerConfigurationEvents action={node.data} config={node.data.config} />
) : (
<StepTriggerConfigurationWebhook action={node.data} config={node.data.config} />
)}
</>
)
}
function StepTriggerConfigurationEvents({
action,
config,
}: {
action: Extract<HogFlowAction, { type: 'trigger' }>
config: Extract<HogFlowAction['config'], { type: 'event' }>
}): JSX.Element {
const { setCampaignActionConfig } = useActions(campaignLogic)
const { actionValidationErrorsById } = useValues(campaignLogic)
const validationResult = actionValidationErrorsById[action.id]
return (
<>
<div className="flex flex-col">
<p className="mb-0">Choose which events or actions will enter a user into the campaign.</p>
</div>
<HogFlowFilters
filters={filters ?? {}}
setFilters={(filters) => setCampaignActionConfig(action.id, { filters })}
typeKey="campaign-trigger"
buttonCopy="Add trigger event"
<LemonField.Pure error={validationResult?.errors?.filters}>
<HogFlowFilters
filters={config.filters ?? {}}
setFilters={(filters) => setCampaignActionConfig(action.id, { type: 'event', filters })}
typeKey="campaign-trigger"
buttonCopy="Add trigger event"
/>
</LemonField.Pure>
</>
)
}
function StepTriggerConfigurationWebhook({
action,
config,
}: {
action: Extract<HogFlowAction, { type: 'trigger' }>
config: Extract<HogFlowAction['config'], { type: 'webhook' }>
}): JSX.Element {
const { setCampaignActionConfig } = useActions(campaignLogic)
const { campaign, actionValidationErrorsById } = useValues(campaignLogic)
const validationResult = actionValidationErrorsById[action.id]
return (
<>
<div className="p-2 rounded border deprecated-space-y-2 bg-surface-secondary">
<LemonLabel>Webhook URL</LemonLabel>
{campaign.id === 'new' ? (
<div className="text-xs text-muted italic border rounded p-1 bg-surface-primary">
The webhook URL will be shown here once you save the workflow
</div>
) : (
<CodeSnippet thing="Webhook URL">
{publicWebhooksHostOrigin() + '/public/webhooks/' + campaign.id}
</CodeSnippet>
)}
<p className="text-sm">
The webhook can be called with a POST request and any JSON payload. You can then use the
configuration options to parse the <code>request.body</code> or <code>request.headers</code> to map
to the required fields.
</p>
</div>
<HogFlowFunctionConfiguration
templateId={config.template_id}
inputs={config.inputs}
setInputs={(inputs) =>
setCampaignActionConfig(action.id, {
type: 'webhook',
inputs,
template_id: config.template_id,
template_uuid: config.template_uuid,
})
}
errors={validationResult?.errors}
/>
</>
)

View File

@@ -3,10 +3,11 @@ import { useActions } from 'kea'
import { LemonLabel } from '@posthog/lemon-ui'
import { campaignLogic } from '../../campaignLogic'
import { HogFlowFilters } from '../filters/HogFlowFilters'
import { hogFlowEditorLogic } from '../hogFlowEditorLogic'
import { HogFlowAction } from '../types'
import { HogFlowDuration } from './components/HogFlowDuration'
import { StepSchemaErrors } from './components/StepSchemaErrors'
export function StepWaitUntilConditionConfiguration({
node,
@@ -16,16 +17,18 @@ export function StepWaitUntilConditionConfiguration({
const action = node.data
const { condition, max_wait_duration } = action.config
const { setCampaignActionConfig } = useActions(hogFlowEditorLogic)
const { partialSetCampaignActionConfig } = useActions(campaignLogic)
return (
<>
<StepSchemaErrors />
<div>
<LemonLabel>Wait time</LemonLabel>
<HogFlowDuration
value={max_wait_duration}
onChange={(value) => {
setCampaignActionConfig(action.id, { max_wait_duration: value })
partialSetCampaignActionConfig(action.id, { max_wait_duration: value })
}}
/>
</div>
@@ -34,7 +37,7 @@ export function StepWaitUntilConditionConfiguration({
<LemonLabel>Conditions to wait for</LemonLabel>
<HogFlowFilters
filters={condition.filters ?? {}}
setFilters={(filters) => setCampaignActionConfig(action.id, { condition: { filters } })}
setFilters={(filters) => partialSetCampaignActionConfig(action.id, { condition: { filters } })}
typeKey="campaign-wait-until-condition"
/>
</div>

View File

@@ -9,8 +9,9 @@ import { teamLogic } from 'scenes/teamLogic'
import { WeekdayType } from '~/types'
import { hogFlowEditorLogic } from '../hogFlowEditorLogic'
import { campaignLogic } from '../../campaignLogic'
import { HogFlowAction } from '../types'
import { StepSchemaErrors } from './components/StepSchemaErrors'
type DayConfig = 'any' | 'weekday' | 'weekend' | WeekdayType[]
type TimeConfig = 'any' | [string, string]
@@ -103,7 +104,7 @@ export function StepWaitUntilTimeWindowConfiguration({ node }: { node: Node<Wait
const action = node.data
const { timezone, day, time } = action.config
const { setCampaignActionConfig } = useActions(hogFlowEditorLogic)
const { partialSetCampaignActionConfig } = useActions(campaignLogic)
const { preflight } = useValues(preflightLogic)
const { currentTeam } = useValues(teamLogic)
@@ -119,49 +120,52 @@ export function StepWaitUntilTimeWindowConfiguration({ node }: { node: Node<Wait
if (!preflight?.available_timezones) {
throw new Error('No timezones are available')
}
setCampaignActionConfig(action.id, { timezone: newTimezone[0] })
partialSetCampaignActionConfig(action.id, { timezone: newTimezone[0] })
}
return (
<div className="flex flex-col gap-4">
<div className="flex flex-wrap">
<DayConfiguration
day={day}
isCustomDate={isCustomDate}
onDayChange={(value) => {
const config = getUpdatedDayConfig(value)
setCampaignActionConfig(action.id, config)
}}
onCustomDaysChange={(newDays) =>
setCampaignActionConfig(action.id, { day: [...newDays] as WeekdayType[] })
}
/>
<LemonDivider vertical />
<TimeConfiguration
time={time}
isCustomTime={isCustomTimeRange}
onTimeChange={(value) => {
const config = getUpdatedTimeConfig(value)
setCampaignActionConfig(action.id, config)
}}
onTimeRangeChange={(newTime, index) => {
if (isCustomTimeRange) {
const config = getUpdatedTimeRangeConfig(newTime, index, time)
setCampaignActionConfig(action.id, config)
<>
<StepSchemaErrors />
<div className="flex flex-col gap-4">
<div className="flex flex-wrap">
<DayConfiguration
day={day}
isCustomDate={isCustomDate}
onDayChange={(value) => {
const config = getUpdatedDayConfig(value)
partialSetCampaignActionConfig(action.id, config)
}}
onCustomDaysChange={(newDays) =>
partialSetCampaignActionConfig(action.id, { day: [...newDays] as WeekdayType[] })
}
}}
/>
<LemonDivider vertical />
<TimeConfiguration
time={time}
isCustomTime={isCustomTimeRange}
onTimeChange={(value) => {
const config = getUpdatedTimeConfig(value)
partialSetCampaignActionConfig(action.id, config)
}}
onTimeRangeChange={(newTime, index) => {
if (isCustomTimeRange) {
const config = getUpdatedTimeRangeConfig(newTime, index, time)
partialSetCampaignActionConfig(action.id, config)
}
}}
/>
</div>
<TimezoneConfiguration
timezone={timezone}
currentTeamTimezone={currentTeam?.timezone}
timezoneOptions={timezoneOptions}
onTimezoneChange={handleTimezoneChange}
/>
</div>
<TimezoneConfiguration
timezone={timezone}
currentTeamTimezone={currentTeam?.timezone}
timezoneOptions={timezoneOptions}
onTimezoneChange={handleTimezoneChange}
/>
</div>
</>
)
}

View File

@@ -0,0 +1,57 @@
import { useValues } from 'kea'
import { useEffect } from 'react'
import { Spinner } from '@posthog/lemon-ui'
import { CyclotronJobInputs } from 'lib/components/CyclotronJob/CyclotronJobInputs'
import { templateToConfiguration } from 'scenes/hog-functions/configuration/hogFunctionConfigurationLogic'
import { CyclotronJobInputType } from '~/types'
import { campaignLogic } from '../../../campaignLogic'
export function HogFlowFunctionConfiguration({
templateId,
inputs,
setInputs,
errors,
}: {
templateId: string
inputs: Record<string, CyclotronJobInputType>
setInputs: (inputs: Record<string, CyclotronJobInputType>) => void
errors?: Record<string, string>
}): JSX.Element {
const { hogFunctionTemplatesById, hogFunctionTemplatesByIdLoading } = useValues(campaignLogic)
const template = hogFunctionTemplatesById[templateId]
useEffect(() => {
if (template && Object.keys(inputs ?? {}).length === 0) {
setInputs(templateToConfiguration(template).inputs ?? {})
}
}, [template, setInputs, inputs])
if (hogFunctionTemplatesByIdLoading) {
return (
<div className="flex justify-center items-center">
<Spinner />
</div>
)
}
if (!template) {
return <div>Template not found!</div>
}
return (
<CyclotronJobInputs
errors={errors}
configuration={{
inputs: inputs as Record<string, CyclotronJobInputType>,
inputs_schema: template?.inputs_schema ?? [],
}}
showSource={false}
sampleGlobalsWithInputs={null} // TODO: Load this based on the trigger event
onInputChange={(key, value) => setInputs({ ...inputs, [key]: value })}
/>
)
}

View File

@@ -0,0 +1,26 @@
import { useValues } from 'kea'
import { LemonBanner } from '@posthog/lemon-ui'
import { campaignLogic } from '../../../campaignLogic'
import { hogFlowEditorLogic } from '../../hogFlowEditorLogic'
export function StepSchemaErrors(): JSX.Element | null {
const { selectedNode } = useValues(hogFlowEditorLogic)
const { actionValidationErrorsById } = useValues(campaignLogic)
const validationResult = actionValidationErrorsById[selectedNode?.id ?? '']
if (!validationResult?.schema) {
return null
}
return (
<div className="flex flex-col gap-1">
{Object.values(validationResult.schema.errors).map(({ path, message }) => (
<LemonBanner type="error" key={path.join('.')}>
{path.join('.')}: {message}
</LemonBanner>
))}
</div>
)
}

View File

@@ -34,16 +34,29 @@ const CyclotronInputSchema = z.object({
export type CyclotronInputType = z.infer<typeof CyclotronInputSchema>
export const HogFlowTriggerSchema = z.discriminatedUnion('type', [
z.object({
type: z.literal('event'),
filters: z.object({
events: z.array(z.any()).optional(),
properties: z.array(z.any()).optional(),
actions: z.array(z.any()).optional(),
}),
}),
z.object({
type: z.literal('webhook'),
template_uuid: z.string().uuid().optional(), // May be used later to specify a specific template version
template_id: z.string(),
inputs: z.record(CyclotronInputSchema),
}),
])
export const HogFlowActionSchema = z.discriminatedUnion('type', [
// Trigger
z.object({
..._commonActionFields,
type: z.literal('trigger'),
config: z.object({
type: z.literal('event'),
filters: z.any(),
}),
// A trigger's event filters are stored on the top-level Hogflow object
config: HogFlowTriggerSchema,
}),
// Branching
z.object({

View File

@@ -3,7 +3,7 @@ import { z } from 'zod'
import { CyclotronJobInputsValidationResult } from 'lib/components/CyclotronJob/CyclotronJobInputsValidation'
import { HogFlowActionSchema } from './steps/types'
import { HogFlowActionSchema, HogFlowTriggerSchema } from './steps/types'
const HogFlowEdgeSchema = z.object({
from: z.string(),
@@ -17,18 +17,9 @@ export const HogFlowSchema = z.object({
team_id: z.number(),
version: z.number(),
name: z.string(),
description: z.string().optional(),
status: z.enum(['active', 'draft', 'archived']),
trigger: z.object({
type: z.literal('event'),
filters: z.any(),
}),
trigger_masking: z
.object({
ttl: z.number(),
hash: z.string(),
threshold: z.number(),
})
.optional(),
trigger: HogFlowTriggerSchema.optional(),
conversion: z
.object({
window_minutes: z.number(),