fix(temporal): use database_sync_to_async for realtime cohort calculation (#40381)

This commit is contained in:
Meikel Ratz
2025-10-27 11:16:33 +01:00
committed by GitHub
parent 0e2b8ae4ff
commit 7aa9ef7294
2 changed files with 20 additions and 13 deletions

View File

@@ -12,6 +12,7 @@ from posthog.clickhouse.query_tagging import Feature, Product, tags_context
from posthog.kafka_client.client import KafkaProducer
from posthog.kafka_client.topics import KAFKA_COHORT_MEMBERSHIP_CHANGED
from posthog.models.action import Action
from posthog.sync import database_sync_to_async
from posthog.temporal.common.base import PostHogWorkflow
from posthog.temporal.common.clickhouse import get_client
from posthog.temporal.common.heartbeat import Heartbeater
@@ -56,18 +57,22 @@ async def process_realtime_cohort_calculation_activity(inputs: RealtimeCohortCal
if not isinstance(inputs.min_matches, int) or inputs.min_matches < 0:
raise ValueError(f"Invalid min_matches value: {inputs.min_matches}")
# Only get actions that are not deleted and have bytecode
# Only fetch the fields we need for efficiency
queryset = Action.objects.filter(deleted=False, bytecode__isnull=False).only("id", "team_id")
# Synchronous helper wrapped with database_sync_to_async so the ORM query
# built below can be awaited from the surrounding async activity.
@database_sync_to_async
def get_actions():
# Only get actions that are not deleted and have bytecode
# Only fetch the fields we need for efficiency ("id" and "team_id")
queryset = Action.objects.filter(deleted=False, bytecode__isnull=False).only("id", "team_id")
# Apply pagination: a truthy inputs.limit selects the window
# [offset, offset + limit) over actions ordered by id; otherwise everything
# from inputs.offset onward is selected.
# NOTE(review): the no-limit branch slices without order_by("id"), so unless
# the model defines a default ordering the offset is applied to an
# unspecified row order — confirm this is intended.
queryset = (
queryset.order_by("id")[inputs.offset : inputs.offset + inputs.limit]
if inputs.limit
else queryset[inputs.offset :]
)
# Apply pagination
queryset = (
queryset.order_by("id")[inputs.offset : inputs.offset + inputs.limit]
if inputs.limit
else queryset[inputs.offset :]
)
actions: list[Action] = list(queryset)
return list(queryset)
actions: list[Action] = await get_actions()
actions_count = 0

View File

@@ -64,10 +64,12 @@ async def get_realtime_cohort_calculation_count_activity(
) -> RealtimeCohortCalculationCountResult:
"""Get the total count of actions with bytecode."""
# Only get actions that are not deleted and have bytecode
queryset = Action.objects.filter(deleted=False, bytecode__isnull=False)
# Synchronous helper wrapped with database_sync_to_async so the count query
# can be awaited from the surrounding async activity.
@database_sync_to_async
def get_action_count():
# Only get actions that are not deleted and have bytecode
# (deleted=False, bytecode__isnull=False); returns that queryset's .count().
return Action.objects.filter(deleted=False, bytecode__isnull=False).count()
count = await database_sync_to_async(queryset.count)()
count = await get_action_count()
return RealtimeCohortCalculationCountResult(count=count)