mirror of
https://github.com/BillyOutlast/posthog.git
synced 2026-02-04 03:01:23 +01:00
fix(msg): prevent child workflows from terminating when parent completes (#38487)
This commit is contained in:
@@ -374,7 +374,7 @@
|
||||
FROM person AS where_optimization
|
||||
WHERE and(equals(where_optimization.team_id, 99999), ifNull(equals(replaceRegexpAll(nullIf(nullIf(JSONExtractRaw(where_optimization.properties, 'name'), ''), 'null'), '^"|"$', ''), 'test'), 0)))))
|
||||
GROUP BY person.id
|
||||
HAVING and(ifNull(equals(argMax(person.is_deleted, person.version), 0), 0), ifNull(less(argMax(toTimeZone(person.created_at, 'UTC'), person.version), plus(now64(6, 'UTC'), toIntervalDay(1))), 0))) AS persons
|
||||
HAVING and(equals(argMax(person.is_deleted, person.version), 0), less(argMax(toTimeZone(person.created_at, 'UTC'), person.version), plus(now64(6, 'UTC'), toIntervalDay(1))))) AS persons
|
||||
WHERE ifNull(equals(persons.properties___name, 'test'), 0)
|
||||
ORDER BY persons.id ASC
|
||||
LIMIT 1000000000 SETTINGS optimize_aggregation_in_order=1,
|
||||
@@ -398,7 +398,7 @@
|
||||
FROM person_distinct_id_overrides
|
||||
WHERE equals(person_distinct_id_overrides.team_id, 99999)
|
||||
GROUP BY person_distinct_id_overrides.distinct_id
|
||||
HAVING ifNull(equals(argMax(person_distinct_id_overrides.is_deleted, person_distinct_id_overrides.version), 0), 0) SETTINGS optimize_aggregation_in_order=1) AS e__override ON equals(e.distinct_id, e__override.distinct_id)
|
||||
HAVING equals(argMax(person_distinct_id_overrides.is_deleted, person_distinct_id_overrides.version), 0) SETTINGS optimize_aggregation_in_order=1) AS e__override ON equals(e.distinct_id, e__override.distinct_id)
|
||||
WHERE and(equals(e.team_id, 99999), greaterOrEquals(timestamp, toDateTime64('explicit_redacted_timestamp', 6, 'UTC')), lessOrEquals(timestamp, toDateTime64('today', 6, 'UTC')), equals(e.event, '$pageview')))
|
||||
GROUP BY actor_id) AS source
|
||||
ORDER BY source.id ASC
|
||||
@@ -467,7 +467,7 @@
|
||||
FROM person AS where_optimization
|
||||
WHERE and(equals(where_optimization.team_id, 99999), ifNull(equals(replaceRegexpAll(nullIf(nullIf(JSONExtractRaw(where_optimization.properties, 'name'), ''), 'null'), '^"|"$', ''), 'test'), 0)))))
|
||||
GROUP BY person.id
|
||||
HAVING and(ifNull(equals(argMax(person.is_deleted, person.version), 0), 0), ifNull(less(argMax(toTimeZone(person.created_at, 'UTC'), person.version), plus(now64(6, 'UTC'), toIntervalDay(1))), 0))) AS persons
|
||||
HAVING and(equals(argMax(person.is_deleted, person.version), 0), less(argMax(toTimeZone(person.created_at, 'UTC'), person.version), plus(now64(6, 'UTC'), toIntervalDay(1))))) AS persons
|
||||
WHERE ifNull(equals(persons.properties___name, 'test'), 0)
|
||||
ORDER BY persons.id ASC
|
||||
LIMIT 1000000000 SETTINGS optimize_aggregation_in_order=1,
|
||||
@@ -491,7 +491,7 @@
|
||||
FROM person_distinct_id_overrides
|
||||
WHERE equals(person_distinct_id_overrides.team_id, 99999)
|
||||
GROUP BY person_distinct_id_overrides.distinct_id
|
||||
HAVING ifNull(equals(argMax(person_distinct_id_overrides.is_deleted, person_distinct_id_overrides.version), 0), 0) SETTINGS optimize_aggregation_in_order=1) AS e__override ON equals(e.distinct_id, e__override.distinct_id)
|
||||
HAVING equals(argMax(person_distinct_id_overrides.is_deleted, person_distinct_id_overrides.version), 0) SETTINGS optimize_aggregation_in_order=1) AS e__override ON equals(e.distinct_id, e__override.distinct_id)
|
||||
WHERE and(equals(e.team_id, 99999), greaterOrEquals(timestamp, toDateTime64('explicit_redacted_timestamp', 6, 'UTC')), lessOrEquals(timestamp, toDateTime64('today', 6, 'UTC')), equals(e.event, '$pageview')))
|
||||
GROUP BY actor_id) AS source
|
||||
ORDER BY source.id ASC
|
||||
|
||||
@@ -3,6 +3,7 @@ import datetime as dt
|
||||
import dataclasses
|
||||
from typing import Any, Optional
|
||||
|
||||
import temporalio.common
|
||||
import temporalio.activity
|
||||
import temporalio.workflow
|
||||
from structlog.contextvars import bind_contextvars
|
||||
@@ -143,8 +144,11 @@ class BehavioralCohortsCoordinatorWorkflow(PostHogWorkflow):
|
||||
conditions_per_workflow = math.ceil(total_conditions / inputs.parallelism)
|
||||
conditions_per_workflow = min(conditions_per_workflow, inputs.conditions_per_workflow)
|
||||
|
||||
# Step 3: Import the child workflow inputs
|
||||
from posthog.temporal.messaging.behavioral_cohorts_workflow import BehavioralCohortsWorkflowInputs
|
||||
# Step 3: Import the child workflow inputs and workflow class
|
||||
from posthog.temporal.messaging.behavioral_cohorts_workflow import (
|
||||
BehavioralCohortsWorkflow,
|
||||
BehavioralCohortsWorkflowInputs,
|
||||
)
|
||||
|
||||
# Step 4: Launch child workflows - fire and forget
|
||||
workflows_scheduled = 0
|
||||
@@ -168,11 +172,13 @@ class BehavioralCohortsCoordinatorWorkflow(PostHogWorkflow):
|
||||
)
|
||||
|
||||
# Start child workflow - fire and forget, don't wait for result
|
||||
# Set parent_close_policy to ABANDON so child workflows continue after parent completes
|
||||
await temporalio.workflow.start_child_workflow(
|
||||
"behavioral-cohorts-analysis",
|
||||
BehavioralCohortsWorkflow.run,
|
||||
child_inputs,
|
||||
id=child_id,
|
||||
task_queue=MESSAGING_TASK_QUEUE,
|
||||
parent_close_policy=temporalio.workflow.ParentClosePolicy.ABANDON,
|
||||
)
|
||||
workflows_scheduled += 1
|
||||
|
||||
|
||||
Reference in New Issue
Block a user