diff --git a/ee/clickhouse/queries/test/__snapshots__/test_cohort_query.ambr b/ee/clickhouse/queries/test/__snapshots__/test_cohort_query.ambr index a3141f1ead..400cc3e155 100644 --- a/ee/clickhouse/queries/test/__snapshots__/test_cohort_query.ambr +++ b/ee/clickhouse/queries/test/__snapshots__/test_cohort_query.ambr @@ -374,7 +374,7 @@ FROM person AS where_optimization WHERE and(equals(where_optimization.team_id, 99999), ifNull(equals(replaceRegexpAll(nullIf(nullIf(JSONExtractRaw(where_optimization.properties, 'name'), ''), 'null'), '^"|"$', ''), 'test'), 0))))) GROUP BY person.id - HAVING and(ifNull(equals(argMax(person.is_deleted, person.version), 0), 0), ifNull(less(argMax(toTimeZone(person.created_at, 'UTC'), person.version), plus(now64(6, 'UTC'), toIntervalDay(1))), 0))) AS persons + HAVING and(equals(argMax(person.is_deleted, person.version), 0), less(argMax(toTimeZone(person.created_at, 'UTC'), person.version), plus(now64(6, 'UTC'), toIntervalDay(1))))) AS persons WHERE ifNull(equals(persons.properties___name, 'test'), 0) ORDER BY persons.id ASC LIMIT 1000000000 SETTINGS optimize_aggregation_in_order=1, @@ -398,7 +398,7 @@ FROM person_distinct_id_overrides WHERE equals(person_distinct_id_overrides.team_id, 99999) GROUP BY person_distinct_id_overrides.distinct_id - HAVING ifNull(equals(argMax(person_distinct_id_overrides.is_deleted, person_distinct_id_overrides.version), 0), 0) SETTINGS optimize_aggregation_in_order=1) AS e__override ON equals(e.distinct_id, e__override.distinct_id) + HAVING equals(argMax(person_distinct_id_overrides.is_deleted, person_distinct_id_overrides.version), 0) SETTINGS optimize_aggregation_in_order=1) AS e__override ON equals(e.distinct_id, e__override.distinct_id) WHERE and(equals(e.team_id, 99999), greaterOrEquals(timestamp, toDateTime64('explicit_redacted_timestamp', 6, 'UTC')), lessOrEquals(timestamp, toDateTime64('today', 6, 'UTC')), equals(e.event, '$pageview'))) GROUP BY actor_id) AS source ORDER BY source.id ASC @@ -467,7 +467,7 @@ FROM person AS where_optimization WHERE and(equals(where_optimization.team_id, 99999), ifNull(equals(replaceRegexpAll(nullIf(nullIf(JSONExtractRaw(where_optimization.properties, 'name'), ''), 'null'), '^"|"$', ''), 'test'), 0))))) GROUP BY person.id - HAVING and(ifNull(equals(argMax(person.is_deleted, person.version), 0), 0), ifNull(less(argMax(toTimeZone(person.created_at, 'UTC'), person.version), plus(now64(6, 'UTC'), toIntervalDay(1))), 0))) AS persons + HAVING and(equals(argMax(person.is_deleted, person.version), 0), less(argMax(toTimeZone(person.created_at, 'UTC'), person.version), plus(now64(6, 'UTC'), toIntervalDay(1))))) AS persons WHERE ifNull(equals(persons.properties___name, 'test'), 0) ORDER BY persons.id ASC LIMIT 1000000000 SETTINGS optimize_aggregation_in_order=1, @@ -491,7 +491,7 @@ FROM person_distinct_id_overrides WHERE equals(person_distinct_id_overrides.team_id, 99999) GROUP BY person_distinct_id_overrides.distinct_id - HAVING ifNull(equals(argMax(person_distinct_id_overrides.is_deleted, person_distinct_id_overrides.version), 0), 0) SETTINGS optimize_aggregation_in_order=1) AS e__override ON equals(e.distinct_id, e__override.distinct_id) + HAVING equals(argMax(person_distinct_id_overrides.is_deleted, person_distinct_id_overrides.version), 0) SETTINGS optimize_aggregation_in_order=1) AS e__override ON equals(e.distinct_id, e__override.distinct_id) WHERE and(equals(e.team_id, 99999), greaterOrEquals(timestamp, toDateTime64('explicit_redacted_timestamp', 6, 'UTC')), lessOrEquals(timestamp, toDateTime64('today', 6, 'UTC')), equals(e.event, '$pageview'))) GROUP BY actor_id) AS source ORDER BY source.id ASC diff --git a/posthog/temporal/messaging/behavioral_cohorts_workflow_coordinator.py b/posthog/temporal/messaging/behavioral_cohorts_workflow_coordinator.py index 093ed83c47..e2d59e8bea 100644 --- a/posthog/temporal/messaging/behavioral_cohorts_workflow_coordinator.py +++ b/posthog/temporal/messaging/behavioral_cohorts_workflow_coordinator.py @@ -3,6 +3,7 @@ import datetime as dt import dataclasses from typing import Any, Optional +import temporalio.common import temporalio.activity import temporalio.workflow from structlog.contextvars import bind_contextvars @@ -143,8 +144,11 @@ class BehavioralCohortsCoordinatorWorkflow(PostHogWorkflow): conditions_per_workflow = math.ceil(total_conditions / inputs.parallelism) conditions_per_workflow = min(conditions_per_workflow, inputs.conditions_per_workflow) - # Step 3: Import the child workflow inputs - from posthog.temporal.messaging.behavioral_cohorts_workflow import BehavioralCohortsWorkflowInputs + # Step 3: Import the child workflow inputs and workflow class + from posthog.temporal.messaging.behavioral_cohorts_workflow import ( + BehavioralCohortsWorkflow, + BehavioralCohortsWorkflowInputs, + ) # Step 4: Launch child workflows - fire and forget workflows_scheduled = 0 @@ -168,11 +172,13 @@ class BehavioralCohortsCoordinatorWorkflow(PostHogWorkflow): ) # Start child workflow - fire and forget, don't wait for result + # Set parent_close_policy to ABANDON so child workflows continue after parent completes await temporalio.workflow.start_child_workflow( - "behavioral-cohorts-analysis", + BehavioralCohortsWorkflow.run, child_inputs, id=child_id, task_queue=MESSAGING_TASK_QUEUE, + parent_close_policy=temporalio.workflow.ParentClosePolicy.ABANDON, ) workflows_scheduled += 1