diff --git a/.vscode/launch.json b/.vscode/launch.json index 1127b0b7d9..1d02b35884 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -303,19 +303,8 @@ "id": "temporalTaskQueue", "type": "pickString", "description": "Task queue name", - "options": [ - "general-purpose-task-queue", - "max-ai-task-queue", - "batch-exports-task-queue", - "data-warehouse-task-queue", - "data-warehouse-compaction-task-queue", - "data-modeling-task-queue", - "tasks-task-queue", - "billing-task-queue", - "video-export-task-queue", - "session-replay-task-queue" - ], - "default": "general-purpose-task-queue" + "options": ["development-task-queue"], + "default": "development-task-queue" } ], "compounds": [ diff --git a/bin/mprocs-with-logging.yaml b/bin/mprocs-with-logging.yaml index 2b0afd9039..d809964640 100644 --- a/bin/mprocs-with-logging.yaml +++ b/bin/mprocs-with-logging.yaml @@ -15,42 +15,8 @@ procs: shell: './bin/start-frontend 2>&1 | tee /tmp/posthog-frontend.log' autorestart: true - temporal-worker-general-purpose: - shell: 'bin/check_kafka_clickhouse_up && bin/check_temporal_up && python manage.py start_temporal_worker --task-queue general-purpose-task-queue 2>&1 | tee /tmp/posthog-temporal-worker-general-purpose.log' - - temporal-worker-batch-exports: - # added a sleep to give the docker stuff time to start - shell: 'bin/check_kafka_clickhouse_up && bin/check_temporal_up && python manage.py start_temporal_worker --task-queue batch-exports-task-queue --metrics-port 8002 2>&1 | tee /tmp/posthog-temporal-worker-batch-exports.log' - - temporal-worker-data-warehouse: - shell: 'bin/check_kafka_clickhouse_up && bin/check_temporal_up && python manage.py start_temporal_worker --task-queue data-warehouse-task-queue --metrics-port 8003 2>&1 | tee /tmp/posthog-temporal-worker-data-warehouse.log' - - temporal-worker-data-warehouse-compaction: - shell: 'bin/check_kafka_clickhouse_up && bin/check_temporal_up && python manage.py start_temporal_worker --task-queue data-warehouse-compaction-task-queue --metrics-port 8004 2>&1 | tee /tmp/posthog-temporal-worker-data-warehouse-compaction.log' - - temporal-worker-data-modeling: - shell: 'bin/check_kafka_clickhouse_up && bin/check_temporal_up && python manage.py start_temporal_worker --task-queue data-modeling-task-queue --metrics-port 8005 2>&1 | tee /tmp/posthog-temporal-worker-data-modeling.log' - - temporal-worker-max-ai: - shell: 'bin/check_kafka_clickhouse_up && bin/check_temporal_up && nodemon -w common -w dags -w ee -w posthog -w products -w pyproject.toml -e py --signal SIGTERM --exec "python manage.py start_temporal_worker --task-queue max-ai-task-queue --metrics-port 8006" 2>&1 | tee /tmp/posthog-temporal-worker-max-ai.log' - - temporal-worker-tasks-agent: - shell: 'bin/check_kafka_clickhouse_up && bin/check_temporal_up && python manage.py start_temporal_worker --task-queue tasks-task-queue --metrics-port 8007 2>&1 | tee /tmp/posthog-temporal-worker-tasks-agent.log' - - temporal-worker-billing: - shell: 'bin/check_kafka_clickhouse_up && bin/check_temporal_up && python manage.py start_temporal_worker --task-queue billing-task-queue --metrics-port 8008 2>&1 | tee /tmp/posthog-temporal-worker-billing.log' - - temporal-worker-video-export: - shell: 'bin/check_video_deps && bin/check_kafka_clickhouse_up && bin/check_temporal_up && python manage.py start_temporal_worker --task-queue video-export-task-queue --metrics-port 8009 2>&1 | tee /tmp/posthog-temporal-worker-video-export.log' - - temporal-worker-session-replay: - shell: 'bin/check_kafka_clickhouse_up && bin/check_temporal_up && python manage.py start_temporal_worker --task-queue session-replay-task-queue --metrics-port 8010 2>&1 | tee /tmp/posthog-temporal-worker-session-replay.log' - - temporal-worker-analytics-platform: - shell: 'bin/check_kafka_clickhouse_up && bin/check_temporal_up && python manage.py start_temporal_worker --task-queue analytics-platform-task-queue --metrics-port 8011 2>&1 | tee /tmp/posthog-temporal-worker-analytics-platform.log' - - temporal-worker-weekly-digest: - shell: 'bin/check_kafka_clickhouse_up && bin/check_temporal_up && python manage.py start_temporal_worker --task-queue weekly-digest-task-queue --metrics-port 8012 --use-pydantic-converter 2>&1 | tee /tmp/posthog-temporal-worker-weekly-digest.log' + temporal-worker: + shell: 'bin/check_kafka_clickhouse_up && bin/check_temporal_up && python manage.py start_temporal_worker --task-queue development-task-queue 2>&1 | tee /tmp/posthog-temporal-worker.log' dagster: shell: |- diff --git a/bin/mprocs.yaml b/bin/mprocs.yaml index ede6aed077..a40da0e455 100755 --- a/bin/mprocs.yaml +++ b/bin/mprocs.yaml @@ -15,42 +15,8 @@ procs: shell: './bin/start-frontend' autorestart: true - temporal-worker-general-purpose: - shell: 'bin/check_kafka_clickhouse_up && bin/check_temporal_up && python manage.py start_temporal_worker --task-queue general-purpose-task-queue' - - temporal-worker-batch-exports: - # added a sleep to give the docker stuff time to start - shell: 'bin/check_kafka_clickhouse_up && bin/check_temporal_up && python manage.py start_temporal_worker --task-queue batch-exports-task-queue --metrics-port 8002' - - temporal-worker-data-warehouse: - shell: 'bin/check_kafka_clickhouse_up && bin/check_temporal_up && python manage.py start_temporal_worker --task-queue data-warehouse-task-queue --metrics-port 8003' - - temporal-worker-data-warehouse-compaction: - shell: 'bin/check_kafka_clickhouse_up && bin/check_temporal_up && python manage.py start_temporal_worker --task-queue data-warehouse-compaction-task-queue --metrics-port 8004' - - temporal-worker-data-modeling: - shell: 'bin/check_kafka_clickhouse_up && bin/check_temporal_up && python manage.py start_temporal_worker --task-queue data-modeling-task-queue --metrics-port 8005' - - temporal-worker-max-ai: - shell: 'bin/check_kafka_clickhouse_up && bin/check_temporal_up && nodemon -w common -w dags -w ee -w posthog -w products -w pyproject.toml -e py --signal SIGTERM --exec "python manage.py start_temporal_worker --task-queue max-ai-task-queue --metrics-port 8006"' - - temporal-worker-tasks-agent: - shell: 'bin/check_kafka_clickhouse_up && bin/check_temporal_up && python manage.py start_temporal_worker --task-queue tasks-task-queue --metrics-port 8007' - - temporal-worker-billing: - shell: 'bin/check_kafka_clickhouse_up && bin/check_temporal_up && python manage.py start_temporal_worker --task-queue billing-task-queue --metrics-port 8008' - - temporal-worker-video-export: - shell: 'bin/check_video_deps && bin/check_kafka_clickhouse_up && bin/check_temporal_up && python manage.py start_temporal_worker --task-queue video-export-task-queue --metrics-port 8009' - - temporal-worker-session-replay: - shell: 'bin/check_kafka_clickhouse_up && bin/check_temporal_up && python manage.py start_temporal_worker --task-queue session-replay-task-queue --metrics-port 8010' - - temporal-worker-analytics-platform: - shell: 'bin/check_kafka_clickhouse_up && bin/check_temporal_up && python manage.py start_temporal_worker --task-queue analytics-platform-task-queue --metrics-port 8011' - - temporal-worker-weekly-digest: - shell: 'bin/check_kafka_clickhouse_up && bin/check_temporal_up && python manage.py start_temporal_worker --task-queue weekly-digest-task-queue --metrics-port 8012 --use-pydantic-converter' + temporal-worker: + shell: 'bin/check_kafka_clickhouse_up && bin/check_temporal_up && python manage.py start_temporal_worker --task-queue development-task-queue' dagster: shell: |- diff --git a/ee/api/subscription.py b/ee/api/subscription.py index 57fbc7a983..7c436a477e 100644 --- a/ee/api/subscription.py +++ b/ee/api/subscription.py @@ -2,6 +2,7 @@ import uuid import asyncio from typing import Any +from django.conf import settings from django.db.models import QuerySet from django.http import HttpRequest, JsonResponse @@ -12,7 +13,7 @@ from rest_framework.exceptions import ValidationError from posthog.api.forbid_destroy_model import ForbidDestroyModel from posthog.api.routing import TeamAndOrgViewSetMixin from posthog.api.shared import UserBasicSerializer -from posthog.constants import ANALYTICS_PLATFORM_TASK_QUEUE, AvailableFeature +from posthog.constants import AvailableFeature from posthog.models.subscription import Subscription, unsubscribe_using_token from posthog.permissions import PremiumFeaturePermission from posthog.temporal.common.client import sync_connect @@ -98,7 +99,7 @@ class SubscriptionSerializer(serializers.ModelSerializer): invite_message=invite_message, ), id=workflow_id, - task_queue=ANALYTICS_PLATFORM_TASK_QUEUE, + task_queue=settings.ANALYTICS_PLATFORM_TASK_QUEUE, ) ) @@ -123,7 +124,7 @@ class SubscriptionSerializer(serializers.ModelSerializer): invite_message=invite_message, ), id=workflow_id, - task_queue=ANALYTICS_PLATFORM_TASK_QUEUE, + task_queue=settings.ANALYTICS_PLATFORM_TASK_QUEUE, ) ) diff --git a/ee/hogai/stream/conversation_stream.py b/ee/hogai/stream/conversation_stream.py index 8e85eaf695..ee5a4b7ace 100644 --- a/ee/hogai/stream/conversation_stream.py +++ b/ee/hogai/stream/conversation_stream.py @@ -3,13 +3,14 @@ from collections.abc import AsyncGenerator from typing import Any from uuid import uuid4 +from django.conf import settings + import structlog from temporalio.client import WorkflowExecutionStatus, WorkflowHandle from temporalio.common import WorkflowIDConflictPolicy, WorkflowIDReusePolicy from posthog.schema import AssistantEventType, FailureMessage -from posthog.constants import MAX_AI_TASK_QUEUE from posthog.temporal.ai.conversation import ( AssistantConversationRunnerWorkflow, AssistantConversationRunnerWorkflowInputs, @@ -76,7 +77,7 @@ class ConversationStreamManager: AssistantConversationRunnerWorkflow.run, workflow_inputs, id=self._workflow_id, - task_queue=MAX_AI_TASK_QUEUE, + task_queue=settings.MAX_AI_TASK_QUEUE, id_conflict_policy=WorkflowIDConflictPolicy.USE_EXISTING, id_reuse_policy=WorkflowIDReusePolicy.ALLOW_DUPLICATE, ) diff --git a/ee/hogai/stream/test/test_conversation_stream.py b/ee/hogai/stream/test/test_conversation_stream.py index 75e0070c4c..f63ed9a138 100644 --- a/ee/hogai/stream/test/test_conversation_stream.py +++ b/ee/hogai/stream/test/test_conversation_stream.py @@ -4,11 +4,12 @@ from uuid import uuid4 from posthog.test.base import BaseTest from unittest.mock import AsyncMock, Mock, patch +from django.conf import settings + from temporalio.client import WorkflowExecutionStatus from posthog.schema import AssistantEventType, AssistantMessage, HumanMessage -from posthog.constants import MAX_AI_TASK_QUEUE from posthog.temporal.ai.conversation import ( AssistantConversationRunnerWorkflow, AssistantConversationRunnerWorkflowInputs, @@ -90,7 +91,7 @@ class TestConversationStreamManager(BaseTest): self.assertEqual(call_args[0][1], workflow_inputs) # Check keyword arguments - self.assertEqual(call_args[1]["task_queue"], MAX_AI_TASK_QUEUE) + self.assertEqual(call_args[1]["task_queue"], settings.MAX_AI_TASK_QUEUE) self.assertIn("conversation-", call_args[1]["id"]) @patch("ee.hogai.stream.conversation_stream.async_connect") diff --git a/playwright/__snapshots__/insight-sharing-password-login.png b/playwright/__snapshots__/insight-sharing-password-login.png index ae135c0f23..b2b426c0e4 100644 Binary files a/playwright/__snapshots__/insight-sharing-password-login.png and b/playwright/__snapshots__/insight-sharing-password-login.png differ diff --git a/playwright/__snapshots__/pageview-trends-insight.png b/playwright/__snapshots__/pageview-trends-insight.png index c41c9f22a7..7ade46cb4f 100644 Binary files a/playwright/__snapshots__/pageview-trends-insight.png and b/playwright/__snapshots__/pageview-trends-insight.png differ diff --git a/plugin-server/src/llm-analytics/services/temporal.service.ts b/plugin-server/src/llm-analytics/services/temporal.service.ts index 39b5527e74..871218d546 100644 --- a/plugin-server/src/llm-analytics/services/temporal.service.ts +++ b/plugin-server/src/llm-analytics/services/temporal.service.ts @@ -3,9 +3,10 @@ import fs from 'fs/promises' import { Counter } from 'prom-client' import { Hub } from '../../types' +import { isDevEnv } from '../../utils/env-utils' import { logger } from '../../utils/logger' -const EVALUATION_TASK_QUEUE = 'general-purpose-task-queue' +const EVALUATION_TASK_QUEUE = isDevEnv() ? 'development-task-queue' : 'general-purpose-task-queue' const temporalWorkflowsStarted = new Counter({ name: 'evaluation_run_workflows_started', diff --git a/posthog/api/exports.py b/posthog/api/exports.py index a8643a1019..bc79586620 100644 --- a/posthog/api/exports.py +++ b/posthog/api/exports.py @@ -2,6 +2,7 @@ import threading from datetime import timedelta from typing import Any +from django.conf import settings from django.http import HttpResponse from django.utils.timezone import now @@ -16,7 +17,6 @@ from temporalio.common import RetryPolicy, WorkflowIDReusePolicy from posthog.api.routing import TeamAndOrgViewSetMixin from posthog.api.utils import action -from posthog.constants import VIDEO_EXPORT_TASK_QUEUE from posthog.event_usage import groups from posthog.models import Insight, User from posthog.models.activity_logging.activity_log import Change, Detail, log_activity @@ -188,7 +188,7 @@ class ExportedAssetSerializer(serializers.ModelSerializer): VideoExportWorkflow.run, VideoExportInputs(exported_asset_id=instance.id), id=f"export-video-{instance.id}", - task_queue=VIDEO_EXPORT_TASK_QUEUE, + task_queue=settings.VIDEO_EXPORT_TASK_QUEUE, retry_policy=RetryPolicy(maximum_attempts=int(TEMPORAL_WORKFLOW_MAX_ATTEMPTS)), id_reuse_policy=WorkflowIDReusePolicy.ALLOW_DUPLICATE_FAILED_ONLY, ) diff --git a/posthog/api/person.py b/posthog/api/person.py index af876ecbe1..2893c473f4 100644 --- a/posthog/api/person.py +++ b/posthog/api/person.py @@ -6,6 +6,7 @@ from collections.abc import Callable from datetime import UTC, datetime, timedelta from typing import Any, List, Optional, TypeVar, Union, cast # noqa: UP035 +from django.conf import settings from django.db.models import Prefetch from django.shortcuts import get_object_or_404 @@ -31,7 +32,7 @@ from posthog.api.documentation import PersonPropertiesSerializer, extend_schema from posthog.api.insight import capture_legacy_api_call from posthog.api.routing import TeamAndOrgViewSetMixin from posthog.api.utils import action, format_paginated_url, get_pk_or_uuid, get_target_entity -from posthog.constants import INSIGHT_FUNNELS, LIMIT, OFFSET, SESSION_REPLAY_TASK_QUEUE, FunnelVizType +from posthog.constants import INSIGHT_FUNNELS, LIMIT, OFFSET, FunnelVizType from posthog.decorators import cached_by_filters from posthog.logging.timing import timed from posthog.metrics import LABEL_TEAM_ID @@ -879,7 +880,7 @@ class PersonViewSet(TeamAndOrgViewSetMixin, viewsets.ModelViewSet): "delete-recordings-with-person", input, id=workflow_id, - task_queue=SESSION_REPLAY_TASK_QUEUE, + task_queue=settings.SESSION_REPLAY_TASK_QUEUE, retry_policy=common.RetryPolicy( maximum_attempts=2, initial_interval=timedelta(minutes=1), diff --git a/posthog/api/proxy_record.py b/posthog/api/proxy_record.py index 6072237238..c2183d4bd7 100644 --- a/posthog/api/proxy_record.py +++ b/posthog/api/proxy_record.py @@ -9,7 +9,6 @@ from rest_framework.response import Response from rest_framework.viewsets import ModelViewSet from posthog.api.routing import TeamAndOrgViewSetMixin -from posthog.constants import GENERAL_PURPOSE_TASK_QUEUE from posthog.event_usage import groups from posthog.models import ProxyRecord from posthog.models.organization import Organization @@ -89,7 +88,7 @@ class ProxyRecordViewset(TeamAndOrgViewSetMixin, ModelViewSet): "create-proxy", inputs, id=workflow_id, - task_queue=GENERAL_PURPOSE_TASK_QUEUE, + task_queue=settings.GENERAL_PURPOSE_TASK_QUEUE, ) ) @@ -119,7 +118,7 @@ class ProxyRecordViewset(TeamAndOrgViewSetMixin, ModelViewSet): "delete-proxy", inputs, id=workflow_id, - task_queue=GENERAL_PURPOSE_TASK_QUEUE, + task_queue=settings.GENERAL_PURPOSE_TASK_QUEUE, ) ) record.status = ProxyRecord.Status.DELETING diff --git a/posthog/api/test/batch_exports/operations.py b/posthog/api/test/batch_exports/operations.py index 4f2c297110..6105783298 100644 --- a/posthog/api/test/batch_exports/operations.py +++ b/posthog/api/test/batch_exports/operations.py @@ -4,6 +4,7 @@ import datetime as dt import threading from contextlib import contextmanager +from django.conf import settings from django.test.client import Client as TestClient import temporalio.client @@ -13,7 +14,6 @@ from rest_framework import status from temporalio.client import Client as TemporalClient from temporalio.worker import Worker -from posthog import constants from posthog.models.utils import UUIDT from products.batch_exports.backend.temporal import ACTIVITIES, WORKFLOWS @@ -64,7 +64,7 @@ class ThreadedWorker(Worker): def start_test_worker(temporal: TemporalClient): with ThreadedWorker( client=temporal, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, workflows=WORKFLOWS, activities=ACTIVITIES, workflow_runner=temporalio.worker.UnsandboxedWorkflowRunner(), diff --git a/posthog/api/test/test_person.py b/posthog/api/test/test_person.py index 0bfee3a96a..a38ee8d4a9 100644 --- a/posthog/api/test/test_person.py +++ b/posthog/api/test/test_person.py @@ -17,6 +17,7 @@ from posthog.test.base import ( from unittest import mock from unittest.mock import patch +from django.conf import settings from django.utils import timezone from flaky import flaky @@ -25,7 +26,6 @@ from temporalio import common import posthog.models.person.deletion from posthog.clickhouse.client import sync_execute -from posthog.constants import SESSION_REPLAY_TASK_QUEUE from posthog.models import Cohort, Organization, Person, PropertyDefinition, Team from posthog.models.async_deletion import AsyncDeletion, DeletionType from posthog.models.person import PersonDistinctId @@ -410,7 +410,7 @@ class TestPerson(ClickhouseTestMixin, APIBaseTest): team_id=self.team.id, ), id=f"delete-recordings-with-person-{person.uuid}-1234", - task_queue=SESSION_REPLAY_TASK_QUEUE, + task_queue=settings.SESSION_REPLAY_TASK_QUEUE, retry_policy=common.RetryPolicy( initial_interval=timedelta(seconds=60), backoff_coefficient=2.0, @@ -451,7 +451,7 @@ class TestPerson(ClickhouseTestMixin, APIBaseTest): team_id=self.team.id, ), id=f"delete-recordings-with-person-{person.uuid}-1234", - task_queue=SESSION_REPLAY_TASK_QUEUE, + task_queue=settings.SESSION_REPLAY_TASK_QUEUE, retry_policy=common.RetryPolicy( initial_interval=timedelta(seconds=60), backoff_coefficient=2.0, diff --git a/posthog/batch_exports/service.py b/posthog/batch_exports/service.py index 814c2268e2..bb80a629ad 100644 --- a/posthog/batch_exports/service.py +++ b/posthog/batch_exports/service.py @@ -5,6 +5,8 @@ import collections.abc from dataclasses import asdict, dataclass, fields from uuid import UUID +from django.conf import settings + import structlog import temporalio import temporalio.common @@ -25,7 +27,6 @@ from posthog.hogql.hogql import HogQLContext from posthog.batch_exports.models import BatchExport, BatchExportBackfill, BatchExportDestination, BatchExportRun from posthog.clickhouse.client import sync_execute -from posthog.constants import BATCH_EXPORTS_TASK_QUEUE, SYNC_BATCH_EXPORTS_TASK_QUEUE from posthog.temporal.common.client import sync_connect from posthog.temporal.common.schedule import ( a_pause_schedule, @@ -613,7 +614,7 @@ async def start_backfill_batch_export_workflow( "backfill-batch-export", inputs, id=workflow_id, - task_queue=BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, ) return workflow_id @@ -735,7 +736,11 @@ def sync_batch_export(batch_export: BatchExport, created: bool): destination_config_fields = {field.name for field in fields(workflow_inputs)} destination_config = {k: v for k, v in batch_export.destination.config.items() if k in destination_config_fields} - task_queue = SYNC_BATCH_EXPORTS_TASK_QUEUE if batch_export.destination.type == "HTTP" else BATCH_EXPORTS_TASK_QUEUE + task_queue = ( + settings.SYNC_BATCH_EXPORTS_TASK_QUEUE + if batch_export.destination.type == "HTTP" + else settings.BATCH_EXPORTS_TASK_QUEUE + ) context = HogQLContext( team_id=batch_export.team.id, diff --git a/posthog/constants.py b/posthog/constants.py index cae4bbf31c..474269540c 100644 --- a/posthog/constants.py +++ b/posthog/constants.py @@ -313,21 +313,6 @@ SURVEY_TARGETING_FLAG_PREFIX = "survey-targeting-" GENERATED_DASHBOARD_PREFIX = "Generated Dashboard" ENRICHED_DASHBOARD_INSIGHT_IDENTIFIER = "Feature Viewed" -DATA_WAREHOUSE_TASK_QUEUE = "data-warehouse-task-queue" -MAX_AI_TASK_QUEUE = "max-ai-task-queue" -DATA_WAREHOUSE_COMPACTION_TASK_QUEUE = "data-warehouse-compaction-task-queue" -BATCH_EXPORTS_TASK_QUEUE = "batch-exports-task-queue" -DATA_MODELING_TASK_QUEUE = "data-modeling-task-queue" -SYNC_BATCH_EXPORTS_TASK_QUEUE = "no-sandbox-python-django" -GENERAL_PURPOSE_TASK_QUEUE = "general-purpose-task-queue" -TASKS_TASK_QUEUE = "tasks-task-queue" -TEST_TASK_QUEUE = "test-task-queue" -BILLING_TASK_QUEUE = "billing-task-queue" -VIDEO_EXPORT_TASK_QUEUE = "video-export-task-queue" -MESSAGING_TASK_QUEUE = "messaging-task-queue" -ANALYTICS_PLATFORM_TASK_QUEUE = "analytics-platform-task-queue" -SESSION_REPLAY_TASK_QUEUE = "session-replay-task-queue" -WEEKLY_DIGEST_TASK_QUEUE = "weekly-digest-task-queue" PERMITTED_FORUM_DOMAINS = ["localhost", "posthog.com"] diff --git a/posthog/management/commands/analyze_realtime_cohort_calculation.py b/posthog/management/commands/analyze_realtime_cohort_calculation.py index 2d78aae5d4..f2a1622aa6 100644 --- a/posthog/management/commands/analyze_realtime_cohort_calculation.py +++ b/posthog/management/commands/analyze_realtime_cohort_calculation.py @@ -2,12 +2,12 @@ import time import asyncio import logging +from django.conf import settings from django.core.management.base import BaseCommand import structlog from temporalio.common import WorkflowIDReusePolicy -from posthog.constants import MESSAGING_TASK_QUEUE from posthog.temporal.common.client import async_connect from posthog.temporal.messaging.realtime_cohort_calculation_workflow_coordinator import ( RealtimeCohortCalculationCoordinatorWorkflowInputs, @@ -119,7 +119,7 @@ class Command(BaseCommand): inputs, id=workflow_id, id_reuse_policy=WorkflowIDReusePolicy.ALLOW_DUPLICATE_FAILED_ONLY, - task_queue=MESSAGING_TASK_QUEUE, + task_queue=settings.MESSAGING_TASK_QUEUE, ) logger.info(f"Workflow {workflow_id} started successfully") diff --git a/posthog/management/commands/start_temporal_worker.py b/posthog/management/commands/start_temporal_worker.py index 46d872b93d..b963f494c2 100644 --- a/posthog/management/commands/start_temporal_worker.py +++ b/posthog/management/commands/start_temporal_worker.py @@ -1,35 +1,22 @@ import signal +import typing as t import asyncio import datetime as dt import functools import faulthandler +from collections import defaultdict import structlog from temporalio import workflow from temporalio.worker import Worker +from posthog.temporal.common.base import PostHogWorkflow + with workflow.unsafe.imports_passed_through(): from django.conf import settings from django.core.management.base import BaseCommand from posthog.clickhouse.query_tagging import tag_queries -from posthog.constants import ( - ANALYTICS_PLATFORM_TASK_QUEUE, - BATCH_EXPORTS_TASK_QUEUE, - BILLING_TASK_QUEUE, - DATA_MODELING_TASK_QUEUE, - DATA_WAREHOUSE_COMPACTION_TASK_QUEUE, - DATA_WAREHOUSE_TASK_QUEUE, - GENERAL_PURPOSE_TASK_QUEUE, - MAX_AI_TASK_QUEUE, - MESSAGING_TASK_QUEUE, - SESSION_REPLAY_TASK_QUEUE, - SYNC_BATCH_EXPORTS_TASK_QUEUE, - TASKS_TASK_QUEUE, - TEST_TASK_QUEUE, - VIDEO_EXPORT_TASK_QUEUE, - WEEKLY_DIGEST_TASK_QUEUE, -) from posthog.temporal.ai import ( ACTIVITIES as AI_ACTIVITIES, WORKFLOWS as AI_WORKFLOWS, @@ -110,56 +97,116 @@ from products.tasks.backend.temporal import ( WORKFLOWS as TASKS_WORKFLOWS, ) -# Workflow and activity index -WORKFLOWS_DICT = { - SYNC_BATCH_EXPORTS_TASK_QUEUE: BATCH_EXPORTS_WORKFLOWS, - BATCH_EXPORTS_TASK_QUEUE: BATCH_EXPORTS_WORKFLOWS, - DATA_WAREHOUSE_TASK_QUEUE: DATA_SYNC_WORKFLOWS + DATA_MODELING_WORKFLOWS, - DATA_WAREHOUSE_COMPACTION_TASK_QUEUE: DATA_SYNC_WORKFLOWS + DATA_MODELING_WORKFLOWS, - DATA_MODELING_TASK_QUEUE: DATA_MODELING_WORKFLOWS, - GENERAL_PURPOSE_TASK_QUEUE: PROXY_SERVICE_WORKFLOWS - + DELETE_PERSONS_WORKFLOWS - + USAGE_REPORTS_WORKFLOWS - + SALESFORCE_ENRICHMENT_WORKFLOWS - + PRODUCT_ANALYTICS_WORKFLOWS - + LLM_ANALYTICS_WORKFLOWS, - ANALYTICS_PLATFORM_TASK_QUEUE: SUBSCRIPTION_WORKFLOWS, - TASKS_TASK_QUEUE: TASKS_WORKFLOWS, - MAX_AI_TASK_QUEUE: AI_WORKFLOWS, - TEST_TASK_QUEUE: TEST_WORKFLOWS, - BILLING_TASK_QUEUE: QUOTA_LIMITING_WORKFLOWS + SALESFORCE_ENRICHMENT_WORKFLOWS, - VIDEO_EXPORT_TASK_QUEUE: VIDEO_EXPORT_WORKFLOWS, - SESSION_REPLAY_TASK_QUEUE: DELETE_RECORDING_WORKFLOWS + ENFORCE_MAX_REPLAY_RETENTION_WORKFLOWS, - MESSAGING_TASK_QUEUE: MESSAGING_WORKFLOWS, - WEEKLY_DIGEST_TASK_QUEUE: WEEKLY_DIGEST_WORKFLOWS, -} -ACTIVITIES_DICT = { - SYNC_BATCH_EXPORTS_TASK_QUEUE: BATCH_EXPORTS_ACTIVITIES, - BATCH_EXPORTS_TASK_QUEUE: BATCH_EXPORTS_ACTIVITIES, - DATA_WAREHOUSE_TASK_QUEUE: DATA_SYNC_ACTIVITIES + DATA_MODELING_ACTIVITIES, - DATA_WAREHOUSE_COMPACTION_TASK_QUEUE: DATA_SYNC_ACTIVITIES + DATA_MODELING_ACTIVITIES, - DATA_MODELING_TASK_QUEUE: DATA_MODELING_ACTIVITIES, - GENERAL_PURPOSE_TASK_QUEUE: PROXY_SERVICE_ACTIVITIES - + DELETE_PERSONS_ACTIVITIES - + USAGE_REPORTS_ACTIVITIES - + QUOTA_LIMITING_ACTIVITIES - + SALESFORCE_ENRICHMENT_ACTIVITIES - + PRODUCT_ANALYTICS_ACTIVITIES - + LLM_ANALYTICS_ACTIVITIES, - ANALYTICS_PLATFORM_TASK_QUEUE: SUBSCRIPTION_ACTIVITIES, - TASKS_TASK_QUEUE: TASKS_ACTIVITIES, - MAX_AI_TASK_QUEUE: AI_ACTIVITIES, - TEST_TASK_QUEUE: TEST_ACTIVITIES, - BILLING_TASK_QUEUE: QUOTA_LIMITING_ACTIVITIES + SALESFORCE_ENRICHMENT_ACTIVITIES, - VIDEO_EXPORT_TASK_QUEUE: VIDEO_EXPORT_ACTIVITIES, - SESSION_REPLAY_TASK_QUEUE: DELETE_RECORDING_ACTIVITIES + ENFORCE_MAX_REPLAY_RETENTION_ACTIVITIES, - MESSAGING_TASK_QUEUE: MESSAGING_ACTIVITIES, - WEEKLY_DIGEST_TASK_QUEUE: WEEKLY_DIGEST_ACTIVITIES, -} +_task_queue_specs = [ + ( + settings.SYNC_BATCH_EXPORTS_TASK_QUEUE, + BATCH_EXPORTS_WORKFLOWS, + BATCH_EXPORTS_ACTIVITIES, + ), + ( + settings.BATCH_EXPORTS_TASK_QUEUE, + BATCH_EXPORTS_WORKFLOWS, + BATCH_EXPORTS_ACTIVITIES, + ), + ( + settings.DATA_WAREHOUSE_TASK_QUEUE, + DATA_SYNC_WORKFLOWS + DATA_MODELING_WORKFLOWS, + DATA_SYNC_ACTIVITIES + DATA_MODELING_ACTIVITIES, + ), + ( + settings.DATA_WAREHOUSE_COMPACTION_TASK_QUEUE, + DATA_SYNC_WORKFLOWS + DATA_MODELING_WORKFLOWS, + DATA_SYNC_ACTIVITIES + DATA_MODELING_ACTIVITIES, + ), + ( + settings.DATA_MODELING_TASK_QUEUE, + DATA_MODELING_WORKFLOWS, + DATA_MODELING_ACTIVITIES, + ), + ( + settings.GENERAL_PURPOSE_TASK_QUEUE, + PROXY_SERVICE_WORKFLOWS + + DELETE_PERSONS_WORKFLOWS + + USAGE_REPORTS_WORKFLOWS + + SALESFORCE_ENRICHMENT_WORKFLOWS + + PRODUCT_ANALYTICS_WORKFLOWS + + LLM_ANALYTICS_WORKFLOWS, + PROXY_SERVICE_ACTIVITIES + + DELETE_PERSONS_ACTIVITIES + + USAGE_REPORTS_ACTIVITIES + + QUOTA_LIMITING_ACTIVITIES + + SALESFORCE_ENRICHMENT_ACTIVITIES + + PRODUCT_ANALYTICS_ACTIVITIES + + LLM_ANALYTICS_ACTIVITIES, + ), + ( + settings.ANALYTICS_PLATFORM_TASK_QUEUE, + SUBSCRIPTION_WORKFLOWS, + SUBSCRIPTION_ACTIVITIES, + ), + ( + settings.TASKS_TASK_QUEUE, + TASKS_WORKFLOWS, + TASKS_ACTIVITIES, + ), + ( + settings.MAX_AI_TASK_QUEUE, + AI_WORKFLOWS, + AI_ACTIVITIES, + ), + ( + settings.TEST_TASK_QUEUE, + TEST_WORKFLOWS, + TEST_ACTIVITIES, + ), + ( + settings.BILLING_TASK_QUEUE, + QUOTA_LIMITING_WORKFLOWS + SALESFORCE_ENRICHMENT_WORKFLOWS, + QUOTA_LIMITING_ACTIVITIES + SALESFORCE_ENRICHMENT_ACTIVITIES, + ), + ( + settings.VIDEO_EXPORT_TASK_QUEUE, + VIDEO_EXPORT_WORKFLOWS, + VIDEO_EXPORT_ACTIVITIES, + ), + ( + settings.SESSION_REPLAY_TASK_QUEUE, + DELETE_RECORDING_WORKFLOWS + ENFORCE_MAX_REPLAY_RETENTION_WORKFLOWS, + DELETE_RECORDING_ACTIVITIES + ENFORCE_MAX_REPLAY_RETENTION_ACTIVITIES, + ), + ( + settings.MESSAGING_TASK_QUEUE, + MESSAGING_WORKFLOWS, + MESSAGING_ACTIVITIES, + ), + ( + settings.WEEKLY_DIGEST_TASK_QUEUE, + WEEKLY_DIGEST_WORKFLOWS, + WEEKLY_DIGEST_ACTIVITIES, + ), +] -TASK_QUEUE_METRIC_PREFIXES = { - BATCH_EXPORTS_TASK_QUEUE: "batch_exports_", -} +# Note: When running locally, many task queues resolve to the same queue name. +# If we used plain dict literals, later entries would overwrite earlier ones for +# the same queue. We aggregate with defaultdict(set) so all workflows/activities +# registered for a shared queue name are combined, ensuring the worker registers +# everything it should. +_workflows: defaultdict[str, set[type[PostHogWorkflow]]] = defaultdict(set) +_activities: defaultdict[str, set[t.Callable[..., t.Any]]] = defaultdict(set) +for task_queue_name, workflows_for_queue, activities_for_queue in _task_queue_specs: + _workflows[task_queue_name].update(workflows_for_queue) # type: ignore + _activities[task_queue_name].update(activities_for_queue) + +WORKFLOWS_DICT = _workflows +ACTIVITIES_DICT = _activities + + +if settings.DEBUG: + TASK_QUEUE_METRIC_PREFIXES = {} +else: + TASK_QUEUE_METRIC_PREFIXES = { + settings.BATCH_EXPORTS_TASK_QUEUE: "batch_exports_", + } LOGGER = get_logger(__name__) @@ -244,8 +291,8 @@ class Command(BaseCommand): use_pydantic_converter = options["use_pydantic_converter"] try: - workflows = WORKFLOWS_DICT[task_queue] - activities = ACTIVITIES_DICT[task_queue] + workflows = list(WORKFLOWS_DICT[task_queue]) + activities = list(ACTIVITIES_DICT[task_queue]) except KeyError: raise ValueError(f'Task queue "{task_queue}" not found in WORKFLOWS_DICT or ACTIVITIES_DICT') @@ -301,7 +348,7 @@ class Command(BaseCommand): server_root_ca_cert=server_root_ca_cert, client_cert=client_cert, client_key=client_key, - workflows=workflows, # type: ignore + workflows=workflows, activities=activities, graceful_shutdown_timeout=( dt.timedelta(seconds=graceful_shutdown_timeout_seconds) diff --git a/posthog/settings/temporal.py b/posthog/settings/temporal.py index 0af8ba2f33..43394db2c6 100644 --- a/posthog/settings/temporal.py +++ b/posthog/settings/temporal.py @@ -1,9 +1,9 @@ import os +from posthog.settings.base_variables import DEBUG from posthog.settings.utils import get_from_env TEMPORAL_NAMESPACE: str = os.getenv("TEMPORAL_NAMESPACE", "default") -TEMPORAL_TASK_QUEUE: str = os.getenv("TEMPORAL_TASK_QUEUE", "general-purpose-task-queue") TEMPORAL_HOST: str = os.getenv("TEMPORAL_HOST", "127.0.0.1") TEMPORAL_PORT: str = os.getenv("TEMPORAL_PORT", "7233") TEMPORAL_CLIENT_ROOT_CA: str | None = os.getenv("TEMPORAL_CLIENT_ROOT_CA", None) @@ -38,3 +38,32 @@ CLICKHOUSE_MAX_BLOCK_SIZE_DEFAULT: int = get_from_env("CLICKHOUSE_MAX_BLOCK_SIZE CLICKHOUSE_MAX_BLOCK_SIZE_OVERRIDES: dict[int, int] = dict( [map(int, o.split(":")) for o in os.getenv("CLICKHOUSE_MAX_BLOCK_SIZE_OVERRIDES", "").split(",") if o] # type: ignore ) + + +# Temporal task queues +# Temporal has a limitation where a worker can only listen to a single queue. +# To avoid running multiple workers, when running locally (DEBUG=True), we use a single queue for all tasks. +# In production (DEBUG=False), we use separate queues for each worker type. +def _set_temporal_task_queue(task_queue: str) -> str: + if DEBUG: + return "development-task-queue" + return task_queue + + +default_task_queue = os.getenv("TEMPORAL_TASK_QUEUE", "general-purpose-task-queue") +TEMPORAL_TASK_QUEUE: str = _set_temporal_task_queue(default_task_queue) +DATA_WAREHOUSE_TASK_QUEUE = _set_temporal_task_queue("data-warehouse-task-queue") +MAX_AI_TASK_QUEUE = _set_temporal_task_queue("max-ai-task-queue") +DATA_WAREHOUSE_COMPACTION_TASK_QUEUE = _set_temporal_task_queue("data-warehouse-compaction-task-queue") +BATCH_EXPORTS_TASK_QUEUE = _set_temporal_task_queue("batch-exports-task-queue") +DATA_MODELING_TASK_QUEUE = _set_temporal_task_queue("data-modeling-task-queue") +SYNC_BATCH_EXPORTS_TASK_QUEUE = _set_temporal_task_queue("no-sandbox-python-django") +GENERAL_PURPOSE_TASK_QUEUE = _set_temporal_task_queue("general-purpose-task-queue") +TASKS_TASK_QUEUE = _set_temporal_task_queue("tasks-task-queue") +TEST_TASK_QUEUE = _set_temporal_task_queue("test-task-queue") +BILLING_TASK_QUEUE = _set_temporal_task_queue("billing-task-queue") +VIDEO_EXPORT_TASK_QUEUE = _set_temporal_task_queue("video-export-task-queue") +MESSAGING_TASK_QUEUE = _set_temporal_task_queue("messaging-task-queue") +ANALYTICS_PLATFORM_TASK_QUEUE = _set_temporal_task_queue("analytics-platform-task-queue") +SESSION_REPLAY_TASK_QUEUE = _set_temporal_task_queue("session-replay-task-queue") +WEEKLY_DIGEST_TASK_QUEUE = _set_temporal_task_queue("weekly-digest-task-queue") diff --git a/posthog/temporal/ai/session_summary/summarize_session.py b/posthog/temporal/ai/session_summary/summarize_session.py index e85310fe93..ad76f73df5 100644 --- a/posthog/temporal/ai/session_summary/summarize_session.py +++ b/posthog/temporal/ai/session_summary/summarize_session.py @@ -16,7 +16,6 @@ from temporalio.client import WorkflowExecutionStatus, WorkflowHandle from temporalio.common import RetryPolicy, WorkflowIDReusePolicy from temporalio.exceptions import ApplicationError -from posthog import constants from posthog.models.team.team import Team from posthog.models.user import User from posthog.redis import get_client @@ -365,7 +364,7 @@ async def _execute_single_session_summary_workflow(inputs: SingleSessionSummaryI inputs, id=workflow_id, id_reuse_policy=WorkflowIDReusePolicy.ALLOW_DUPLICATE_FAILED_ONLY, - task_queue=constants.MAX_AI_TASK_QUEUE, + task_queue=settings.MAX_AI_TASK_QUEUE, retry_policy=retry_policy, ) @@ -381,7 +380,7 @@ async def _start_single_session_summary_workflow_stream( inputs, id=workflow_id, id_reuse_policy=WorkflowIDReusePolicy.ALLOW_DUPLICATE_FAILED_ONLY, - task_queue=constants.MAX_AI_TASK_QUEUE, + task_queue=settings.MAX_AI_TASK_QUEUE, retry_policy=retry_policy, ) return handle diff --git a/posthog/temporal/ai/session_summary/summarize_session_group.py b/posthog/temporal/ai/session_summary/summarize_session_group.py index 8755d9472f..f620f6cca8 100644 --- a/posthog/temporal/ai/session_summary/summarize_session_group.py +++ b/posthog/temporal/ai/session_summary/summarize_session_group.py @@ -17,7 +17,6 @@ from temporalio.exceptions import ApplicationError from posthog.schema import CachedSessionBatchEventsQueryResponse -from posthog import constants from posthog.hogql_queries.ai.session_batch_events_query_runner import ( SessionBatchEventsQueryRunner, create_session_batch_events_query, @@ -582,7 +581,7 @@ async def _start_session_group_summary_workflow( inputs, id=workflow_id, id_reuse_policy=WorkflowIDReusePolicy.ALLOW_DUPLICATE_FAILED_ONLY, - task_queue=constants.MAX_AI_TASK_QUEUE, + task_queue=settings.MAX_AI_TASK_QUEUE, retry_policy=retry_policy, ) diff --git a/posthog/temporal/data_imports/deltalake_compaction_job.py b/posthog/temporal/data_imports/deltalake_compaction_job.py index ecd1a38e66..a8dadfaf43 100644 --- a/posthog/temporal/data_imports/deltalake_compaction_job.py +++ b/posthog/temporal/data_imports/deltalake_compaction_job.py @@ -4,6 +4,7 @@ import asyncio import datetime as dt import dataclasses +from django.conf import settings from django.db import close_old_connections from structlog.contextvars import bind_contextvars @@ -12,7 +13,6 @@ from temporalio import activity, workflow from temporalio.common import RetryPolicy from temporalio.exceptions import WorkflowAlreadyStartedError -from posthog.constants import DATA_WAREHOUSE_COMPACTION_TASK_QUEUE from posthog.exceptions_capture import capture_exception from posthog.settings import DEBUG, TEST from posthog.temporal.common.base import PostHogWorkflow @@ -37,7 +37,7 @@ def trigger_compaction_job(job: ExternalDataJob, schema: ExternalDataSchema, log DeltalakeCompactionJobWorkflowInputs(team_id=job.team_id, external_data_job_id=job.id) ), id=workflow_id, - task_queue=str(DATA_WAREHOUSE_COMPACTION_TASK_QUEUE), + task_queue=str(settings.DATA_WAREHOUSE_COMPACTION_TASK_QUEUE), retry_policy=RetryPolicy( maximum_attempts=1, non_retryable_error_types=["NondeterminismError"], diff --git a/posthog/temporal/messaging/realtime_cohort_calculation_workflow_coordinator.py b/posthog/temporal/messaging/realtime_cohort_calculation_workflow_coordinator.py index 146c24e141..acc85f6df1 100644 --- a/posthog/temporal/messaging/realtime_cohort_calculation_workflow_coordinator.py +++ b/posthog/temporal/messaging/realtime_cohort_calculation_workflow_coordinator.py @@ -3,11 +3,12 @@ import datetime as dt import dataclasses from typing import Any, TypedDict +from django.conf import settings + import temporalio.common import temporalio.activity import temporalio.workflow -from posthog.constants import MESSAGING_TASK_QUEUE from posthog.models.action import Action from posthog.sync import database_sync_to_async from posthog.temporal.common.base import PostHogWorkflow @@ -154,7 +155,7 @@ class RealtimeCohortCalculationCoordinatorWorkflow(PostHogWorkflow): RealtimeCohortCalculationWorkflow.run, config["inputs"], id=config["id"], - task_queue=MESSAGING_TASK_QUEUE, + task_queue=settings.MESSAGING_TASK_QUEUE, parent_close_policy=temporalio.workflow.ParentClosePolicy.ABANDON, ) workflows_scheduled += 1 diff --git a/posthog/temporal/proxy_service/create.py b/posthog/temporal/proxy_service/create.py index 2e5dc3666a..dd26bc8aa2 100644 --- a/posthog/temporal/proxy_service/create.py +++ b/posthog/temporal/proxy_service/create.py @@ -7,6 +7,8 @@ import datetime as dt import ipaddress from dataclasses import asdict, dataclass +from django.conf import settings + import grpc.aio import requests import dns.resolver @@ -22,7 +24,6 @@ from temporalio.client import ( ) from temporalio.exceptions import ActivityError, ApplicationError, RetryState -from posthog.constants import GENERAL_PURPOSE_TASK_QUEUE from posthog.models import ProxyRecord from posthog.temporal.common.base import PostHogWorkflow from posthog.temporal.common.client import async_connect @@ -249,7 +250,7 @@ async def schedule_monitor_job(inputs: ScheduleMonitorJobInputs): ) ), id=f"monitor-proxy-{inputs.proxy_record_id}", - task_queue=GENERAL_PURPOSE_TASK_QUEUE, + task_queue=settings.GENERAL_PURPOSE_TASK_QUEUE, retry_policy=temporalio.common.RetryPolicy( initial_interval=dt.timedelta(seconds=30), maximum_interval=dt.timedelta(minutes=5), diff --git a/posthog/temporal/schedule.py b/posthog/temporal/schedule.py index 02c48e1d53..c2d9589f3d 100644 --- a/posthog/temporal/schedule.py +++ b/posthog/temporal/schedule.py @@ -18,13 +18,6 @@ from temporalio.client import ( ScheduleSpec, ) -from posthog.constants import ( - ANALYTICS_PLATFORM_TASK_QUEUE, - BILLING_TASK_QUEUE, - GENERAL_PURPOSE_TASK_QUEUE, - MAX_AI_TASK_QUEUE, - SESSION_REPLAY_TASK_QUEUE, -) from posthog.hogql_queries.ai.vector_search_query_runner import LATEST_ACTIONS_EMBEDDING_VERSION from posthog.temporal.ai import SyncVectorsInputs from posthog.temporal.ai.sync_vectors import EmbeddingVersion @@ -47,7 +40,7 @@ async def create_sync_vectors_schedule(client: Client): "ai-sync-vectors", asdict(SyncVectorsInputs(embedding_versions=EmbeddingVersion(actions=LATEST_ACTIONS_EMBEDDING_VERSION))), id="ai-sync-vectors-schedule", - task_queue=MAX_AI_TASK_QUEUE, + task_queue=settings.MAX_AI_TASK_QUEUE, ), spec=ScheduleSpec(intervals=[ScheduleIntervalSpec(every=timedelta(minutes=30))]), ) @@ -67,7 +60,7 @@ async def create_run_quota_limiting_schedule(client: Client): "run-quota-limiting", asdict(RunQuotaLimitingInputs()), id="run-quota-limiting-schedule", - task_queue=BILLING_TASK_QUEUE, + task_queue=settings.BILLING_TASK_QUEUE, ), spec=ScheduleSpec(cron_expressions=["10,25,40,55 * * * *"]), # Run at minutes 10, 25, 40, and 55 of every hour ) @@ -90,7 +83,7 @@ async def create_schedule_all_subscriptions_schedule(client: Client): "schedule-all-subscriptions", asdict(ScheduleAllSubscriptionsWorkflowInputs()), id="schedule-all-subscriptions-schedule", - task_queue=ANALYTICS_PLATFORM_TASK_QUEUE, + task_queue=settings.ANALYTICS_PLATFORM_TASK_QUEUE, ), spec=ScheduleSpec(cron_expressions=["55 * * * *"]), # Run at minute 55 of every hour ) @@ -116,7 +109,7 @@ async def create_upgrade_queries_schedule(client: Client): "upgrade-queries", asdict(UpgradeQueriesWorkflowInputs()), id="upgrade-queries-schedule", - task_queue=GENERAL_PURPOSE_TASK_QUEUE, + task_queue=settings.GENERAL_PURPOSE_TASK_QUEUE, ), spec=ScheduleSpec(intervals=[ScheduleIntervalSpec(every=timedelta(hours=6))]), ) @@ -137,7 +130,7 @@ async def create_salesforce_enrichment_schedule(client: Client): "salesforce-enrichment-async", SalesforceEnrichmentInputs(chunk_size=DEFAULT_CHUNK_SIZE), id="salesforce-enrichment-schedule", - task_queue=BILLING_TASK_QUEUE, + task_queue=settings.BILLING_TASK_QUEUE, ), spec=ScheduleSpec( calendars=[ @@ -168,7 +161,7 @@ async def create_enforce_max_replay_retention_schedule(client: Client): "enforce-max-replay-retention", EnforceMaxReplayRetentionInput(dry_run=False), id="enforce-max-replay-retention-schedule", - task_queue=SESSION_REPLAY_TASK_QUEUE, + task_queue=settings.SESSION_REPLAY_TASK_QUEUE, retry_policy=common.RetryPolicy( maximum_attempts=1, ), diff --git a/posthog/temporal/tests/ai/test_summarize_session.py b/posthog/temporal/tests/ai/test_summarize_session.py index bda45ac252..a224dbd416 100644 --- a/posthog/temporal/tests/ai/test_summarize_session.py +++ b/posthog/temporal/tests/ai/test_summarize_session.py @@ -9,6 +9,8 @@ from typing import Any import pytest from unittest.mock import AsyncMock, MagicMock, patch +from django.conf import settings + from openai.types.chat.chat_completion_chunk import ChatCompletionChunk, Choice, ChoiceDelta from openai.types.completion_usage import CompletionUsage from pytest_mock import MockerFixture @@ -17,7 +19,6 @@ from temporalio.exceptions import ApplicationError from temporalio.testing import WorkflowEnvironment from temporalio.worker import UnsandboxedWorkflowRunner, Worker -from posthog import constants from posthog.models import Team from posthog.models.user import User from posthog.sync import database_sync_to_async @@ -298,7 +299,7 @@ class TestSummarizeSingleSessionStreamWorkflow: try: async with Worker( activity_environment.client, - task_queue=constants.MAX_AI_TASK_QUEUE, + task_queue=settings.MAX_AI_TASK_QUEUE, workflows=WORKFLOWS, activities=[stream_llm_single_session_summary_activity, fetch_session_data_activity], workflow_runner=UnsandboxedWorkflowRunner(), diff --git a/posthog/temporal/tests/ai/test_summarize_session_group.py b/posthog/temporal/tests/ai/test_summarize_session_group.py index adc12a6506..635f986a46 100644 --- a/posthog/temporal/tests/ai/test_summarize_session_group.py +++ b/posthog/temporal/tests/ai/test_summarize_session_group.py @@ -10,6 +10,8 @@ from typing import Any import pytest from unittest.mock import AsyncMock, MagicMock, patch +from django.conf import settings + from openai.types.chat.chat_completion import ChatCompletion, ChatCompletionMessage, Choice from pytest_mock import MockerFixture from temporalio.client import WorkflowExecutionStatus @@ -17,7 +19,6 @@ from temporalio.exceptions import ApplicationError from temporalio.testing import WorkflowEnvironment from temporalio.worker import UnsandboxedWorkflowRunner, Worker -from posthog import constants from posthog.models import Team from posthog.models.user import User from posthog.redis import get_async_client @@ -923,7 +924,7 @@ class TestSummarizeSessionGroupWorkflow: async with await WorkflowEnvironment.start_time_skipping() as activity_environment: async with Worker( activity_environment.client, - task_queue=constants.GENERAL_PURPOSE_TASK_QUEUE, + task_queue=settings.GENERAL_PURPOSE_TASK_QUEUE, workflows=WORKFLOWS, activities=[ get_llm_single_session_summary_activity, diff --git a/posthog/temporal/tests/data_imports/conftest.py b/posthog/temporal/tests/data_imports/conftest.py index 2c57e958eb..022878cf9c 100644 --- a/posthog/temporal/tests/data_imports/conftest.py +++ b/posthog/temporal/tests/data_imports/conftest.py @@ -21,7 +21,6 @@ from posthog.schema import HogQLQueryResponse from posthog.hogql.query import execute_hogql_query -from posthog.constants import DATA_WAREHOUSE_TASK_QUEUE from posthog.temporal.data_imports.external_data_job import ExternalDataJobWorkflow from posthog.temporal.data_imports.settings import ACTIVITIES from posthog.temporal.utils import ExternalDataWorkflowInputs @@ -116,7 +115,7 @@ async def run_external_data_job_workflow( async with await WorkflowEnvironment.start_time_skipping() as activity_environment: async with Worker( activity_environment.client, - task_queue=DATA_WAREHOUSE_TASK_QUEUE, + task_queue=settings.DATA_WAREHOUSE_TASK_QUEUE, workflows=[ExternalDataJobWorkflow], activities=ACTIVITIES, # type: ignore workflow_runner=UnsandboxedWorkflowRunner(), @@ -127,7 +126,7 @@ async def run_external_data_job_workflow( ExternalDataJobWorkflow.run, inputs, id=workflow_id, - task_queue=DATA_WAREHOUSE_TASK_QUEUE, + task_queue=settings.DATA_WAREHOUSE_TASK_QUEUE, retry_policy=RetryPolicy(maximum_attempts=1), ) diff --git a/posthog/temporal/tests/data_imports/test_compaction_job.py b/posthog/temporal/tests/data_imports/test_compaction_job.py index 409fcd71c6..b486af260e 100644 --- a/posthog/temporal/tests/data_imports/test_compaction_job.py +++ b/posthog/temporal/tests/data_imports/test_compaction_job.py @@ -17,7 +17,6 @@ from temporalio.common import RetryPolicy from temporalio.testing import WorkflowEnvironment from temporalio.worker import UnsandboxedWorkflowRunner, Worker -from posthog.constants import DATA_WAREHOUSE_COMPACTION_TASK_QUEUE from posthog.models.team.team import Team from posthog.temporal.data_imports.deltalake_compaction_job import DeltalakeCompactionJobWorkflowInputs from posthog.temporal.data_imports.pipelines.pipeline.delta_table_helper import DeltaTableHelper @@ -106,7 +105,7 @@ async def _run(team: Team): async with await WorkflowEnvironment.start_time_skipping() as activity_environment: async with Worker( activity_environment.client, - task_queue=DATA_WAREHOUSE_COMPACTION_TASK_QUEUE, + task_queue=settings.DATA_WAREHOUSE_COMPACTION_TASK_QUEUE, workflows=[DeltalakeCompactionJobWorkflow], activities=ACTIVITIES, # type: ignore workflow_runner=UnsandboxedWorkflowRunner(), @@ -117,7 +116,7 @@ async def _run(team: Team): DeltalakeCompactionJobWorkflow.run, inputs, id=str(uuid.uuid4()), - task_queue=DATA_WAREHOUSE_COMPACTION_TASK_QUEUE, + task_queue=settings.DATA_WAREHOUSE_COMPACTION_TASK_QUEUE, retry_policy=RetryPolicy(maximum_attempts=1), ) diff --git a/posthog/temporal/tests/data_imports/test_end_to_end.py b/posthog/temporal/tests/data_imports/test_end_to_end.py index da40714181..3bdbffb2c1 100644 --- a/posthog/temporal/tests/data_imports/test_end_to_end.py +++ b/posthog/temporal/tests/data_imports/test_end_to_end.py @@ -40,7 +40,6 @@ from posthog.schema import ( from posthog.hogql.modifiers import create_default_modifiers_for_team from posthog.hogql.query import execute_hogql_query -from posthog.constants import DATA_WAREHOUSE_TASK_QUEUE from posthog.hogql_queries.insights.funnels.funnel import Funnel from posthog.hogql_queries.insights.funnels.funnel_query_context import FunnelQueryContext from posthog.models import DataWarehouseTable @@ -339,7 +338,7 @@ async def _execute_run(workflow_id: str, inputs: ExternalDataWorkflowInputs, moc async with await WorkflowEnvironment.start_time_skipping() as activity_environment: async with Worker( activity_environment.client, - task_queue=DATA_WAREHOUSE_TASK_QUEUE, + task_queue=settings.DATA_WAREHOUSE_TASK_QUEUE, workflows=[ExternalDataJobWorkflow], activities=ACTIVITIES, # type: ignore workflow_runner=UnsandboxedWorkflowRunner(), @@ -350,7 +349,7 @@ async def _execute_run(workflow_id: str, inputs: ExternalDataWorkflowInputs, moc ExternalDataJobWorkflow.run, inputs, id=workflow_id, - task_queue=DATA_WAREHOUSE_TASK_QUEUE, + task_queue=settings.DATA_WAREHOUSE_TASK_QUEUE, retry_policy=RetryPolicy(maximum_attempts=1), ) diff --git a/posthog/temporal/tests/data_modeling/test_run_workflow.py b/posthog/temporal/tests/data_modeling/test_run_workflow.py index 4be2762c8e..1ca5d333d7 100644 --- a/posthog/temporal/tests/data_modeling/test_run_workflow.py +++ b/posthog/temporal/tests/data_modeling/test_run_workflow.py @@ -23,7 +23,6 @@ from asgiref.sync import sync_to_async from posthog.hogql.database.database import Database from posthog.hogql.query import execute_hogql_query -from posthog import constants from posthog.models import Team from posthog.models.event.util import bulk_create_events from posthog.sync import database_sync_to_async @@ -765,7 +764,7 @@ async def test_run_workflow_with_minio_bucket( ): async with temporalio.worker.Worker( temporal_client, - task_queue=constants.DATA_MODELING_TASK_QUEUE, + task_queue=settings.DATA_MODELING_TASK_QUEUE, workflows=[RunWorkflow], activities=[ start_run_activity, @@ -784,7 +783,7 @@ async def test_run_workflow_with_minio_bucket( RunWorkflow.run, inputs, id=workflow_id, - task_queue=constants.DATA_MODELING_TASK_QUEUE, + task_queue=settings.DATA_MODELING_TASK_QUEUE, retry_policy=temporalio.common.RetryPolicy(maximum_attempts=1), execution_timeout=dt.timedelta(seconds=30), ) @@ -883,7 +882,7 @@ async def test_run_workflow_with_minio_bucket_with_errors( ): async with temporalio.worker.Worker( temporal_client, - task_queue=constants.DATA_MODELING_TASK_QUEUE, + task_queue=settings.DATA_MODELING_TASK_QUEUE, workflows=[RunWorkflow], activities=[ start_run_activity, @@ -902,7 +901,7 @@ async def test_run_workflow_with_minio_bucket_with_errors( RunWorkflow.run, inputs, id=workflow_id, - task_queue=constants.DATA_MODELING_TASK_QUEUE, + task_queue=settings.DATA_MODELING_TASK_QUEUE, retry_policy=temporalio.common.RetryPolicy(maximum_attempts=1), execution_timeout=dt.timedelta(seconds=30), ) @@ -939,7 +938,7 @@ async def test_run_workflow_revert_materialization( ): async with temporalio.worker.Worker( temporal_client, - task_queue=constants.DATA_MODELING_TASK_QUEUE, + task_queue=settings.DATA_MODELING_TASK_QUEUE, workflows=[RunWorkflow], activities=[ start_run_activity, @@ -958,7 +957,7 @@ async def test_run_workflow_revert_materialization( RunWorkflow.run, inputs, id=workflow_id, - task_queue=constants.DATA_MODELING_TASK_QUEUE, + task_queue=settings.DATA_MODELING_TASK_QUEUE, retry_policy=temporalio.common.RetryPolicy(maximum_attempts=1), execution_timeout=dt.timedelta(seconds=30), ) diff --git a/posthog/temporal/tests/external_data/test_external_data_job.py b/posthog/temporal/tests/external_data/test_external_data_job.py index ce34cf81ce..16a982b1c2 100644 --- a/posthog/temporal/tests/external_data/test_external_data_job.py +++ b/posthog/temporal/tests/external_data/test_external_data_job.py @@ -17,7 +17,6 @@ from temporalio.common import RetryPolicy from temporalio.testing import WorkflowEnvironment from temporalio.worker import UnsandboxedWorkflowRunner, Worker -from posthog.constants import DATA_WAREHOUSE_TASK_QUEUE from posthog.models import Team from posthog.temporal.data_imports.external_data_job import ( Any_Source_Errors, @@ -691,7 +690,7 @@ async def test_external_data_job_workflow_with_schema(team, **kwargs): async with await WorkflowEnvironment.start_time_skipping() as activity_environment: async with Worker( activity_environment.client, - task_queue=DATA_WAREHOUSE_TASK_QUEUE, + task_queue=settings.DATA_WAREHOUSE_TASK_QUEUE, workflows=[ExternalDataJobWorkflow], activities=[ create_external_data_job_model_activity, @@ -710,7 +709,7 @@ async def test_external_data_job_workflow_with_schema(team, **kwargs): ExternalDataJobWorkflow.run, inputs, id=workflow_id, - task_queue=DATA_WAREHOUSE_TASK_QUEUE, + task_queue=settings.DATA_WAREHOUSE_TASK_QUEUE, retry_policy=RetryPolicy(maximum_attempts=1), ) diff --git a/posthog/warehouse/data_load/saved_query_service.py b/posthog/warehouse/data_load/saved_query_service.py index 3c85eef167..c6f4b44061 100644 --- a/posthog/warehouse/data_load/saved_query_service.py +++ b/posthog/warehouse/data_load/saved_query_service.py @@ -3,6 +3,7 @@ from dataclasses import asdict from datetime import timedelta from typing import TYPE_CHECKING +from django.conf import settings from django.db import transaction import temporalio @@ -17,7 +18,6 @@ from temporalio.client import ( ) from temporalio.common import RetryPolicy -from posthog.constants import DATA_MODELING_TASK_QUEUE from posthog.temporal.common.client import sync_connect from posthog.temporal.common.schedule import ( create_schedule, @@ -60,7 +60,7 @@ def get_saved_query_schedule(saved_query: "DataWarehouseSavedQuery") -> Schedule "data-modeling-run", asdict(inputs), id=str(saved_query.id), - task_queue=str(DATA_MODELING_TASK_QUEUE), + task_queue=str(settings.DATA_MODELING_TASK_QUEUE), retry_policy=RetryPolicy( initial_interval=timedelta(seconds=10), maximum_interval=timedelta(seconds=60), diff --git a/posthog/warehouse/data_load/service.py b/posthog/warehouse/data_load/service.py index e7738d5022..df80bc22c1 100644 --- a/posthog/warehouse/data_load/service.py +++ b/posthog/warehouse/data_load/service.py @@ -19,7 +19,6 @@ from temporalio.client import ( ) from temporalio.common import RetryPolicy -from posthog.constants import DATA_WAREHOUSE_TASK_QUEUE from posthog.temporal.common.client import async_connect, sync_connect from posthog.temporal.common.schedule import ( a_create_schedule, @@ -84,7 +83,7 @@ def to_temporal_schedule( "external-data-job", asdict(inputs), id=str(external_data_schema.id), - task_queue=str(DATA_WAREHOUSE_TASK_QUEUE), + task_queue=str(settings.DATA_WAREHOUSE_TASK_QUEUE), retry_policy=RetryPolicy( initial_interval=timedelta(seconds=10), maximum_interval=timedelta(seconds=60), diff --git a/products/batch_exports/backend/tests/temporal/conftest.py b/products/batch_exports/backend/tests/temporal/conftest.py index 0092a49ae3..a350feb398 100644 --- a/products/batch_exports/backend/tests/temporal/conftest.py +++ b/products/batch_exports/backend/tests/temporal/conftest.py @@ -14,7 +14,6 @@ from infi.clickhouse_orm import Database from psycopg import sql from temporalio.testing import ActivityEnvironment -from posthog import constants from posthog.conftest import create_clickhouse_tables from posthog.models import Organization, Team from posthog.models.utils import uuid7 @@ -288,7 +287,7 @@ async def setup_postgres_test_db(postgres_config): async def temporal_worker(temporal_client, workflows, activities): worker = temporalio.worker.Worker( temporal_client, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, workflows=workflows, activities=activities, interceptors=[BatchExportsMetricsInterceptor()], diff --git a/products/batch_exports/backend/tests/temporal/destinations/base_destination_tests.py b/products/batch_exports/backend/tests/temporal/destinations/base_destination_tests.py index 2668e2b1a9..e5be6e5624 100644 --- a/products/batch_exports/backend/tests/temporal/destinations/base_destination_tests.py +++ b/products/batch_exports/backend/tests/temporal/destinations/base_destination_tests.py @@ -14,12 +14,13 @@ from collections.abc import AsyncGenerator, Callable import pytest +from django.conf import settings + from temporalio.client import WorkflowFailureError from temporalio.common import RetryPolicy from temporalio.testing import WorkflowEnvironment from temporalio.worker import UnsandboxedWorkflowRunner, Worker -from posthog import constants from posthog.batch_exports.models import BatchExport from posthog.batch_exports.service import ( BackfillDetails, @@ -164,7 +165,7 @@ class BaseDestinationTest(ABC): async with await WorkflowEnvironment.start_time_skipping() as activity_environment: async with Worker( activity_environment.client, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, workflows=[self.workflow_class], activities=[ start_batch_export_run, @@ -180,7 +181,7 @@ class BaseDestinationTest(ABC): self.workflow_class.run, # type: ignore[attr-defined] inputs, id=workflow_id, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, retry_policy=RetryPolicy(maximum_attempts=1), execution_timeout=dt.timedelta(minutes=5), ) @@ -190,7 +191,7 @@ class BaseDestinationTest(ABC): self.workflow_class.run, # type: ignore[attr-defined] inputs, id=workflow_id, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, retry_policy=RetryPolicy(maximum_attempts=1), execution_timeout=dt.timedelta(minutes=5), ) diff --git a/products/batch_exports/backend/tests/temporal/destinations/bigquery/test_workflow.py b/products/batch_exports/backend/tests/temporal/destinations/bigquery/test_workflow.py index 97e26522a7..bc4e716364 100644 --- a/products/batch_exports/backend/tests/temporal/destinations/bigquery/test_workflow.py +++ b/products/batch_exports/backend/tests/temporal/destinations/bigquery/test_workflow.py @@ -15,6 +15,7 @@ import datetime as dt import pytest import unittest.mock +from django.conf import settings from django.test import override_settings from temporalio import activity @@ -29,7 +30,6 @@ from posthog.batch_exports.service import ( BatchExportSchema, BigQueryBatchExportInputs, ) -from posthog.constants import BATCH_EXPORTS_TASK_QUEUE from posthog.temporal.tests.utils.models import acreate_batch_export, adelete_batch_export, afetch_batch_export_runs from products.batch_exports.backend.temporal.batch_exports import finish_batch_export_run, start_batch_export_run @@ -153,7 +153,7 @@ async def test_bigquery_export_workflow( async with await WorkflowEnvironment.start_time_skipping() as activity_environment: async with Worker( activity_environment.client, - task_queue=BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, workflows=[BigQueryBatchExportWorkflow], activities=[ start_batch_export_run, @@ -168,7 +168,7 @@ async def test_bigquery_export_workflow( BigQueryBatchExportWorkflow.run, inputs, id=workflow_id, - task_queue=BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, retry_policy=RetryPolicy(maximum_attempts=1), execution_timeout=dt.timedelta(seconds=60), ) @@ -249,7 +249,7 @@ async def test_bigquery_export_workflow_without_events( async with await WorkflowEnvironment.start_time_skipping() as activity_environment: async with Worker( activity_environment.client, - task_queue=BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, workflows=[BigQueryBatchExportWorkflow], activities=[ start_batch_export_run, @@ -264,7 +264,7 @@ async def test_bigquery_export_workflow_without_events( BigQueryBatchExportWorkflow.run, inputs, id=workflow_id, - task_queue=BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, retry_policy=RetryPolicy(maximum_attempts=1), execution_timeout=dt.timedelta(seconds=10), ) @@ -334,7 +334,7 @@ async def test_bigquery_export_workflow_backfill_earliest_persons( async with await WorkflowEnvironment.start_time_skipping() as activity_environment: async with Worker( activity_environment.client, - task_queue=BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, workflows=[BigQueryBatchExportWorkflow], activities=[ start_batch_export_run, @@ -349,7 +349,7 @@ async def test_bigquery_export_workflow_backfill_earliest_persons( BigQueryBatchExportWorkflow.run, inputs, id=workflow_id, - task_queue=BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, retry_policy=RetryPolicy(maximum_attempts=1), execution_timeout=dt.timedelta(minutes=10), ) @@ -401,7 +401,7 @@ async def test_bigquery_export_workflow_handles_unexpected_insert_activity_error async with await WorkflowEnvironment.start_time_skipping() as activity_environment: async with Worker( activity_environment.client, - task_queue=BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, workflows=[BigQueryBatchExportWorkflow], activities=[ mocked_start_batch_export_run, @@ -427,7 +427,7 @@ async def test_bigquery_export_workflow_handles_unexpected_insert_activity_error BigQueryBatchExportWorkflow.run, inputs, id=workflow_id, - task_queue=BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, retry_policy=RetryPolicy(maximum_attempts=1), execution_timeout=dt.timedelta(seconds=20), ) @@ -470,7 +470,7 @@ async def test_bigquery_export_workflow_handles_insert_activity_non_retryable_er async with await WorkflowEnvironment.start_time_skipping() as activity_environment: async with Worker( activity_environment.client, - task_queue=BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, workflows=[BigQueryBatchExportWorkflow], activities=[ mocked_start_batch_export_run, @@ -495,7 +495,7 @@ async def test_bigquery_export_workflow_handles_insert_activity_non_retryable_er BigQueryBatchExportWorkflow.run, inputs, id=workflow_id, - task_queue=BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, retry_policy=RetryPolicy(maximum_attempts=1), ) @@ -537,7 +537,7 @@ async def test_bigquery_export_workflow_handles_cancellation( async with await WorkflowEnvironment.start_time_skipping() as activity_environment: async with Worker( activity_environment.client, - task_queue=BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, workflows=[BigQueryBatchExportWorkflow], activities=[ mocked_start_batch_export_run, @@ -551,7 +551,7 @@ async def test_bigquery_export_workflow_handles_cancellation( BigQueryBatchExportWorkflow.run, inputs, id=workflow_id, - task_queue=BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, retry_policy=RetryPolicy(maximum_attempts=1), ) diff --git a/products/batch_exports/backend/tests/temporal/destinations/redshift/test_workflow.py b/products/batch_exports/backend/tests/temporal/destinations/redshift/test_workflow.py index a2e1128843..cb18dd4b1b 100644 --- a/products/batch_exports/backend/tests/temporal/destinations/redshift/test_workflow.py +++ b/products/batch_exports/backend/tests/temporal/destinations/redshift/test_workflow.py @@ -4,6 +4,7 @@ import datetime as dt import pytest import unittest.mock +from django.conf import settings from django.test import override_settings from psycopg import sql @@ -12,7 +13,6 @@ from temporalio.common import RetryPolicy from temporalio.testing import WorkflowEnvironment from temporalio.worker import UnsandboxedWorkflowRunner, Worker -from posthog import constants from posthog.batch_exports.service import BatchExportModel, BatchExportSchema, RedshiftCopyInputs from posthog.temporal.tests.utils.models import acreate_batch_export, adelete_batch_export, afetch_batch_export_runs @@ -178,7 +178,7 @@ async def test_redshift_export_workflow( async with await WorkflowEnvironment.start_time_skipping() as activity_environment: async with Worker( activity_environment.client, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, workflows=[RedshiftBatchExportWorkflow], activities=[ start_batch_export_run, @@ -195,7 +195,7 @@ async def test_redshift_export_workflow( RedshiftBatchExportWorkflow.run, inputs, id=workflow_id, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, retry_policy=RetryPolicy(maximum_attempts=1), execution_timeout=dt.timedelta(seconds=20), ) @@ -261,7 +261,7 @@ async def test_redshift_export_workflow_handles_unexpected_insert_activity_error async with await WorkflowEnvironment.start_time_skipping() as activity_environment: async with Worker( activity_environment.client, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, workflows=[RedshiftBatchExportWorkflow], activities=[ mocked_start_batch_export_run, @@ -288,7 +288,7 @@ async def test_redshift_export_workflow_handles_unexpected_insert_activity_error RedshiftBatchExportWorkflow.run, inputs, id=workflow_id, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, retry_policy=RetryPolicy(maximum_attempts=1), execution_timeout=dt.timedelta(seconds=20), ) @@ -332,7 +332,7 @@ async def test_redshift_export_workflow_handles_insert_activity_non_retryable_er async with await WorkflowEnvironment.start_time_skipping() as activity_environment: async with Worker( activity_environment.client, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, workflows=[RedshiftBatchExportWorkflow], activities=[ mocked_start_batch_export_run, @@ -358,7 +358,7 @@ async def test_redshift_export_workflow_handles_insert_activity_non_retryable_er RedshiftBatchExportWorkflow.run, inputs, id=workflow_id, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, retry_policy=RetryPolicy(maximum_attempts=1), ) @@ -442,7 +442,7 @@ async def test_redshift_export_workflow_handles_undefined_function_error( async with await WorkflowEnvironment.start_time_skipping() as activity_environment: async with Worker( activity_environment.client, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, workflows=[RedshiftBatchExportWorkflow], activities=[ start_batch_export_run, @@ -459,7 +459,7 @@ async def test_redshift_export_workflow_handles_undefined_function_error( RedshiftBatchExportWorkflow.run, inputs, id=workflow_id, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, retry_policy=RetryPolicy(maximum_attempts=1), execution_timeout=dt.timedelta(seconds=20), ) @@ -510,7 +510,7 @@ async def test_redshift_export_workflow_handles_undefined_function_error( async with await WorkflowEnvironment.start_time_skipping() as activity_environment: async with Worker( activity_environment.client, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, workflows=[RedshiftBatchExportWorkflow], activities=[ start_batch_export_run, @@ -527,7 +527,7 @@ async def test_redshift_export_workflow_handles_undefined_function_error( RedshiftBatchExportWorkflow.run, inputs, id=workflow_id, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, retry_policy=RetryPolicy(maximum_attempts=1), execution_timeout=dt.timedelta(seconds=20), ) diff --git a/products/batch_exports/backend/tests/temporal/destinations/s3/test_workflow_error_handling.py b/products/batch_exports/backend/tests/temporal/destinations/s3/test_workflow_error_handling.py index 34d5a22222..e3965d01be 100644 --- a/products/batch_exports/backend/tests/temporal/destinations/s3/test_workflow_error_handling.py +++ b/products/batch_exports/backend/tests/temporal/destinations/s3/test_workflow_error_handling.py @@ -7,6 +7,8 @@ import contextlib import pytest from unittest import mock +from django.conf import settings + import aioboto3 import botocore.exceptions from temporalio import activity @@ -16,7 +18,6 @@ from temporalio.testing import WorkflowEnvironment from temporalio.worker import UnsandboxedWorkflowRunner, Worker from types_aiobotocore_s3.client import S3Client -from posthog import constants from posthog.batch_exports.service import BatchExportModel from posthog.temporal.tests.utils.models import afetch_batch_export_runs @@ -69,7 +70,7 @@ async def test_s3_export_workflow_handles_unexpected_insert_activity_errors(atea async with await WorkflowEnvironment.start_time_skipping() as activity_environment: async with Worker( activity_environment.client, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, workflows=[S3BatchExportWorkflow], activities=[ mocked_start_batch_export_run, @@ -88,7 +89,7 @@ async def test_s3_export_workflow_handles_unexpected_insert_activity_errors(atea S3BatchExportWorkflow.run, inputs, id=workflow_id, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, retry_policy=RetryPolicy(maximum_attempts=1), ) @@ -133,7 +134,7 @@ async def test_s3_export_workflow_handles_insert_activity_non_retryable_errors(a async with await WorkflowEnvironment.start_time_skipping() as activity_environment: async with Worker( activity_environment.client, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, workflows=[S3BatchExportWorkflow], activities=[ mocked_start_batch_export_run, @@ -151,7 +152,7 @@ async def test_s3_export_workflow_handles_insert_activity_non_retryable_errors(a S3BatchExportWorkflow.run, inputs, id=workflow_id, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, retry_policy=RetryPolicy(maximum_attempts=1), ) @@ -192,7 +193,7 @@ async def test_s3_export_workflow_handles_cancellation(ateam, s3_batch_export, i async with await WorkflowEnvironment.start_time_skipping() as activity_environment: async with Worker( activity_environment.client, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, workflows=[S3BatchExportWorkflow], activities=[ mocked_start_batch_export_run, @@ -206,7 +207,7 @@ async def test_s3_export_workflow_handles_cancellation(ateam, s3_batch_export, i S3BatchExportWorkflow.run, inputs, id=workflow_id, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, retry_policy=RetryPolicy(maximum_attempts=1), ) await asyncio.sleep(5) @@ -286,7 +287,7 @@ async def test_s3_export_workflow_with_request_timeouts( await WorkflowEnvironment.start_time_skipping() as activity_environment, Worker( activity_environment.client, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, workflows=[S3BatchExportWorkflow], activities=[ mocked_start_batch_export_run, @@ -308,7 +309,7 @@ async def test_s3_export_workflow_with_request_timeouts( S3BatchExportWorkflow.run, inputs, id=workflow_id, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, retry_policy=RetryPolicy(maximum_attempts=2), execution_timeout=dt.timedelta(minutes=2), ) diff --git a/products/batch_exports/backend/tests/temporal/destinations/s3/utils.py b/products/batch_exports/backend/tests/temporal/destinations/s3/utils.py index 177278fb1d..1ac32a322a 100644 --- a/products/batch_exports/backend/tests/temporal/destinations/s3/utils.py +++ b/products/batch_exports/backend/tests/temporal/destinations/s3/utils.py @@ -13,7 +13,6 @@ from temporalio.common import RetryPolicy from temporalio.testing import WorkflowEnvironment from temporalio.worker import UnsandboxedWorkflowRunner, Worker -from posthog import constants from posthog.batch_exports.service import BackfillDetails, BatchExportModel, BatchExportSchema from posthog.temporal.common.clickhouse import ClickHouseClient from posthog.temporal.tests.utils.models import afetch_batch_export_runs @@ -348,7 +347,7 @@ async def run_s3_batch_export_workflow( async with await WorkflowEnvironment.start_time_skipping() as activity_environment: async with Worker( activity_environment.client, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, workflows=[S3BatchExportWorkflow], activities=[ start_batch_export_run, @@ -362,7 +361,7 @@ async def run_s3_batch_export_workflow( S3BatchExportWorkflow.run, inputs, id=workflow_id, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, retry_policy=RetryPolicy(maximum_attempts=1), execution_timeout=dt.timedelta(minutes=10), ) diff --git a/products/batch_exports/backend/tests/temporal/destinations/snowflake/test_workflow.py b/products/batch_exports/backend/tests/temporal/destinations/snowflake/test_workflow.py index b7270fa437..4cb93d023b 100644 --- a/products/batch_exports/backend/tests/temporal/destinations/snowflake/test_workflow.py +++ b/products/batch_exports/backend/tests/temporal/destinations/snowflake/test_workflow.py @@ -11,6 +11,7 @@ from uuid import uuid4 import pytest import unittest.mock +from django.conf import settings from django.test import override_settings from temporalio import activity @@ -20,7 +21,6 @@ from temporalio.exceptions import ActivityError, ApplicationError from temporalio.testing import WorkflowEnvironment from temporalio.worker import UnsandboxedWorkflowRunner, Worker -from posthog import constants from posthog.batch_exports.models import BatchExport, BatchExportRun from posthog.temporal.tests.utils.events import generate_test_events_in_clickhouse from posthog.temporal.tests.utils.models import afetch_batch_export_runs @@ -65,7 +65,7 @@ async def _run_workflow( await WorkflowEnvironment.start_time_skipping() as activity_environment, Worker( activity_environment.client, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, workflows=[SnowflakeBatchExportWorkflow], activities=[ start_batch_export_run, @@ -81,7 +81,7 @@ async def _run_workflow( inputs, id=workflow_id, execution_timeout=dt.timedelta(seconds=10), - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, retry_policy=RetryPolicy(maximum_attempts=1), ) @@ -372,7 +372,7 @@ async def test_snowflake_export_workflow_handles_cancellation_mocked(ateam, snow await WorkflowEnvironment.start_time_skipping() as activity_environment, Worker( activity_environment.client, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, workflows=[SnowflakeBatchExportWorkflow], activities=[ mocked_start_batch_export_run, @@ -387,7 +387,7 @@ async def test_snowflake_export_workflow_handles_cancellation_mocked(ateam, snow SnowflakeBatchExportWorkflow.run, inputs, id=workflow_id, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, retry_policy=RetryPolicy(maximum_attempts=1), ) await asyncio.sleep(5) diff --git a/products/batch_exports/backend/tests/temporal/destinations/snowflake/test_workflow_e2e.py b/products/batch_exports/backend/tests/temporal/destinations/snowflake/test_workflow_e2e.py index 153848e66a..eb56198065 100644 --- a/products/batch_exports/backend/tests/temporal/destinations/snowflake/test_workflow_e2e.py +++ b/products/batch_exports/backend/tests/temporal/destinations/snowflake/test_workflow_e2e.py @@ -10,6 +10,7 @@ from uuid import uuid4 import pytest +from django.conf import settings from django.test import override_settings from temporalio.client import WorkflowFailureError @@ -17,7 +18,6 @@ from temporalio.common import RetryPolicy from temporalio.testing import WorkflowEnvironment from temporalio.worker import UnsandboxedWorkflowRunner, Worker -from posthog import constants from posthog.batch_exports.service import BackfillDetails, BatchExportModel, BatchExportSchema from posthog.temporal.tests.utils.events import generate_test_events_in_clickhouse from posthog.temporal.tests.utils.models import afetch_batch_export_runs @@ -80,7 +80,7 @@ async def _run_workflow( await WorkflowEnvironment.start_time_skipping() as activity_environment, Worker( activity_environment.client, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, workflows=[SnowflakeBatchExportWorkflow], activities=[ start_batch_export_run, @@ -96,7 +96,7 @@ async def _run_workflow( SnowflakeBatchExportWorkflow.run, inputs, id=workflow_id, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, retry_policy=RetryPolicy(maximum_attempts=1), execution_timeout=execution_timeout, ) @@ -313,7 +313,7 @@ async def test_snowflake_export_workflow_handles_cancellation( await WorkflowEnvironment.start_time_skipping() as activity_environment, Worker( activity_environment.client, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, workflows=[SnowflakeBatchExportWorkflow], activities=[ start_batch_export_run, @@ -330,7 +330,7 @@ async def test_snowflake_export_workflow_handles_cancellation( SnowflakeBatchExportWorkflow.run, inputs, id=workflow_id, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, retry_policy=RetryPolicy(maximum_attempts=1), ) diff --git a/products/batch_exports/backend/tests/temporal/destinations/test_postgres_batch_export_workflow.py b/products/batch_exports/backend/tests/temporal/destinations/test_postgres_batch_export_workflow.py index ad8cb7aede..0335bb15ca 100644 --- a/products/batch_exports/backend/tests/temporal/destinations/test_postgres_batch_export_workflow.py +++ b/products/batch_exports/backend/tests/temporal/destinations/test_postgres_batch_export_workflow.py @@ -22,7 +22,6 @@ from temporalio.common import RetryPolicy from temporalio.testing import WorkflowEnvironment from temporalio.worker import UnsandboxedWorkflowRunner, Worker -from posthog import constants from posthog.batch_exports.service import BackfillDetails, BatchExportModel, BatchExportSchema from posthog.temporal.common.clickhouse import ClickHouseClient from posthog.temporal.tests.utils.events import generate_test_events_in_clickhouse @@ -796,7 +795,7 @@ async def test_postgres_export_workflow( async with await WorkflowEnvironment.start_time_skipping() as activity_environment: async with Worker( activity_environment.client, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, workflows=[PostgresBatchExportWorkflow], activities=[ start_batch_export_run, @@ -810,7 +809,7 @@ async def test_postgres_export_workflow( PostgresBatchExportWorkflow.run, inputs, id=workflow_id, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, retry_policy=RetryPolicy(maximum_attempts=1), execution_timeout=dt.timedelta(seconds=10), ) @@ -892,7 +891,7 @@ async def test_postgres_export_workflow_without_events( async with await WorkflowEnvironment.start_time_skipping() as activity_environment: async with Worker( activity_environment.client, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, workflows=[PostgresBatchExportWorkflow], activities=[ start_batch_export_run, @@ -906,7 +905,7 @@ async def test_postgres_export_workflow_without_events( PostgresBatchExportWorkflow.run, inputs, id=workflow_id, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, retry_policy=RetryPolicy(maximum_attempts=1), execution_timeout=dt.timedelta(seconds=10), ) @@ -973,7 +972,7 @@ async def test_postgres_export_workflow_backfill_earliest_persons( async with await WorkflowEnvironment.start_time_skipping() as activity_environment: async with Worker( activity_environment.client, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, workflows=[PostgresBatchExportWorkflow], activities=[ start_batch_export_run, @@ -986,7 +985,7 @@ async def test_postgres_export_workflow_backfill_earliest_persons( PostgresBatchExportWorkflow.run, inputs, id=workflow_id, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, retry_policy=RetryPolicy(maximum_attempts=1), execution_timeout=dt.timedelta(minutes=10), ) @@ -1037,7 +1036,7 @@ async def test_postgres_export_workflow_handles_unexpected_insert_activity_error async with await WorkflowEnvironment.start_time_skipping() as activity_environment: async with Worker( activity_environment.client, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, workflows=[PostgresBatchExportWorkflow], activities=[ mocked_start_batch_export_run, @@ -1055,7 +1054,7 @@ async def test_postgres_export_workflow_handles_unexpected_insert_activity_error PostgresBatchExportWorkflow.run, inputs, id=workflow_id, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, retry_policy=RetryPolicy(maximum_attempts=1), ) @@ -1095,7 +1094,7 @@ async def test_postgres_export_workflow_handles_insert_activity_non_retryable_er async with await WorkflowEnvironment.start_time_skipping() as activity_environment: async with Worker( activity_environment.client, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, workflows=[PostgresBatchExportWorkflow], activities=[ mocked_start_batch_export_run, @@ -1112,7 +1111,7 @@ async def test_postgres_export_workflow_handles_insert_activity_non_retryable_er PostgresBatchExportWorkflow.run, inputs, id=workflow_id, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, retry_policy=RetryPolicy(maximum_attempts=1), ) @@ -1147,7 +1146,7 @@ async def test_postgres_export_workflow_handles_cancellation(ateam, postgres_bat async with await WorkflowEnvironment.start_time_skipping() as activity_environment: async with Worker( activity_environment.client, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, workflows=[PostgresBatchExportWorkflow], activities=[ mocked_start_batch_export_run, @@ -1160,7 +1159,7 @@ async def test_postgres_export_workflow_handles_cancellation(ateam, postgres_bat PostgresBatchExportWorkflow.run, inputs, id=workflow_id, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, retry_policy=RetryPolicy(maximum_attempts=1), ) await asyncio.sleep(5) @@ -1310,7 +1309,7 @@ async def test_postgres_export_workflow_with_many_files( async with await WorkflowEnvironment.start_time_skipping() as activity_environment: async with Worker( activity_environment.client, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, workflows=[PostgresBatchExportWorkflow], activities=[ start_batch_export_run, @@ -1326,7 +1325,7 @@ async def test_postgres_export_workflow_with_many_files( PostgresBatchExportWorkflow.run, inputs, id=workflow_id, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, retry_policy=RetryPolicy(maximum_attempts=1), execution_timeout=dt.timedelta(minutes=2), ) diff --git a/products/batch_exports/backend/tests/temporal/pipeline/test_entrypoint.py b/products/batch_exports/backend/tests/temporal/pipeline/test_entrypoint.py index 54aff9a23c..a1ca954767 100644 --- a/products/batch_exports/backend/tests/temporal/pipeline/test_entrypoint.py +++ b/products/batch_exports/backend/tests/temporal/pipeline/test_entrypoint.py @@ -5,13 +5,14 @@ from dataclasses import dataclass import pytest +from django.conf import settings + from temporalio import activity, workflow from temporalio.client import WorkflowFailureError from temporalio.common import RetryPolicy from temporalio.testing import WorkflowEnvironment from temporalio.worker import UnsandboxedWorkflowRunner, Worker -from posthog import constants from posthog.batch_exports.service import BaseBatchExportInputs, BatchExportInsertInputs, BatchExportModel from posthog.models import BatchExport, BatchExportDestination from posthog.temporal.common.base import PostHogWorkflow @@ -163,7 +164,7 @@ class TestErrorHandling: async with await WorkflowEnvironment.start_time_skipping() as activity_environment: async with Worker( activity_environment.client, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, workflows=[DummyExportWorkflow], activities=[ start_batch_export_run, @@ -180,7 +181,7 @@ class TestErrorHandling: DummyExportWorkflow.run, inputs, id=workflow_id, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, retry_policy=RetryPolicy(maximum_attempts=1), execution_timeout=dt.timedelta(minutes=1), ) @@ -189,7 +190,7 @@ class TestErrorHandling: DummyExportWorkflow.run, inputs, id=workflow_id, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, retry_policy=RetryPolicy(maximum_attempts=1), execution_timeout=dt.timedelta(minutes=1), ) diff --git a/products/batch_exports/backend/tests/temporal/test_backfill_batch_export.py b/products/batch_exports/backend/tests/temporal/test_backfill_batch_export.py index 72f77a47af..0c796749a1 100644 --- a/products/batch_exports/backend/tests/temporal/test_backfill_batch_export.py +++ b/products/batch_exports/backend/tests/temporal/test_backfill_batch_export.py @@ -18,7 +18,6 @@ import temporalio.exceptions from asgiref.sync import sync_to_async from flaky import flaky -from posthog import constants from posthog.models import Team from posthog.temporal.tests.utils.datetimes import date_range from posthog.temporal.tests.utils.events import generate_test_events_in_clickhouse @@ -311,7 +310,7 @@ async def test_backfill_batch_export_workflow(temporal_worker, temporal_schedule BackfillBatchExportWorkflow.run, inputs, id=workflow_id, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, execution_timeout=dt.timedelta(minutes=1), retry_policy=temporalio.common.RetryPolicy(maximum_attempts=1), ) @@ -402,7 +401,7 @@ async def test_backfill_batch_export_workflow_no_end_at( BackfillBatchExportWorkflow.run, inputs, id=workflow_id, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, execution_timeout=dt.timedelta(minutes=1), retry_policy=temporalio.common.RetryPolicy(maximum_attempts=1), ) @@ -490,7 +489,7 @@ async def test_backfill_batch_export_workflow_fails_when_schedule_deleted( BackfillBatchExportWorkflow.run, inputs, id=workflow_id, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, execution_timeout=dt.timedelta(seconds=20), retry_policy=temporalio.common.RetryPolicy(maximum_attempts=1), ) @@ -533,7 +532,7 @@ async def test_backfill_batch_export_workflow_fails_when_schedule_deleted_after_ BackfillBatchExportWorkflow.run, inputs, id=workflow_id, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, execution_timeout=dt.timedelta(seconds=20), retry_policy=temporalio.common.RetryPolicy(maximum_attempts=1), ) @@ -620,7 +619,7 @@ async def test_backfill_batch_export_workflow_is_cancelled_on_repeated_failures( BackfillBatchExportWorkflow.run, inputs, id=backfill_id, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, execution_timeout=dt.timedelta(minutes=2), retry_policy=temporalio.common.RetryPolicy(maximum_attempts=1), ) @@ -691,7 +690,7 @@ async def test_backfill_utc_batch_export_workflow_with_timezone_aware_bounds( BackfillBatchExportWorkflow.run, inputs, id=workflow_id, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, execution_timeout=dt.timedelta(minutes=1), retry_policy=temporalio.common.RetryPolicy(maximum_attempts=1), ) @@ -787,7 +786,7 @@ async def test_backfill_aware_batch_export_workflow_with_timezone_aware_bounds( BackfillBatchExportWorkflow.run, inputs, id=workflow_id, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, execution_timeout=dt.timedelta(minutes=1), retry_policy=temporalio.common.RetryPolicy(maximum_attempts=1), ) @@ -858,7 +857,7 @@ async def test_backfill_batch_export_workflow_no_start_at(temporal_worker, tempo BackfillBatchExportWorkflow.run, inputs, id=workflow_id, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, execution_timeout=dt.timedelta(minutes=1), retry_policy=temporalio.common.RetryPolicy(maximum_attempts=1), ) diff --git a/products/batch_exports/backend/tests/temporal/test_metrics.py b/products/batch_exports/backend/tests/temporal/test_metrics.py index fd804b0e4d..9eaa52a4cc 100644 --- a/products/batch_exports/backend/tests/temporal/test_metrics.py +++ b/products/batch_exports/backend/tests/temporal/test_metrics.py @@ -13,7 +13,6 @@ from structlog.testing import capture_logs from temporalio.common import RetryPolicy from posthog.batch_exports.service import BatchExportModel -from posthog.constants import BATCH_EXPORTS_TASK_QUEUE from posthog.temporal.tests.utils.models import acreate_batch_export, adelete_batch_export from products.batch_exports.backend.temporal.destinations.postgres_batch_export import PostgresBatchExportInputs @@ -106,7 +105,7 @@ async def test_interceptor_calls_histogram_metrics( "postgres-export", inputs, id=workflow_id, - task_queue=BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, execution_timeout=dt.timedelta(seconds=10), retry_policy=RetryPolicy(maximum_attempts=1), ) diff --git a/products/batch_exports/backend/tests/temporal/test_monitoring.py b/products/batch_exports/backend/tests/temporal/test_monitoring.py index 9fc87429a3..d33f7ce299 100644 --- a/products/batch_exports/backend/tests/temporal/test_monitoring.py +++ b/products/batch_exports/backend/tests/temporal/test_monitoring.py @@ -5,11 +5,12 @@ import pytest from freezegun import freeze_time from unittest.mock import MagicMock, patch +from django.conf import settings + from temporalio.common import RetryPolicy from temporalio.testing import WorkflowEnvironment from temporalio.worker import UnsandboxedWorkflowRunner, Worker -from posthog import constants from posthog.batch_exports.models import BatchExportRun from posthog.batch_exports.service import afetch_batch_export_runs_in_range from posthog.temporal.tests.utils.models import acreate_batch_export, adelete_batch_export, afetch_batch_export_runs @@ -133,7 +134,7 @@ async def _run_workflow(batch_export): async with await WorkflowEnvironment.start_time_skipping() as activity_environment: async with Worker( activity_environment.client, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, workflows=[BatchExportMonitoringWorkflow], activities=[ get_batch_export, @@ -148,7 +149,7 @@ async def _run_workflow(batch_export): BatchExportMonitoringWorkflow.run, inputs, id=workflow_id, - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + task_queue=settings.BATCH_EXPORTS_TASK_QUEUE, retry_policy=RetryPolicy(maximum_attempts=1), execution_timeout=dt.timedelta(seconds=30), ) diff --git a/products/llm_analytics/backend/api/evaluation_runs.py b/products/llm_analytics/backend/api/evaluation_runs.py index 677b3cc1de..8fc33a37cb 100644 --- a/products/llm_analytics/backend/api/evaluation_runs.py +++ b/products/llm_analytics/backend/api/evaluation_runs.py @@ -1,6 +1,8 @@ import time import asyncio +from django.conf import settings + import structlog from rest_framework import serializers, viewsets from rest_framework.permissions import IsAuthenticated @@ -9,7 +11,6 @@ from rest_framework.response import Response from temporalio.common import RetryPolicy, WorkflowIDReusePolicy from posthog.api.routing import TeamAndOrgViewSetMixin -from posthog.constants import GENERAL_PURPOSE_TASK_QUEUE from posthog.temporal.common.client import sync_connect from posthog.temporal.llm_analytics.run_evaluation import RunEvaluationInputs @@ -64,7 +65,7 @@ class EvaluationRunViewSet(TeamAndOrgViewSetMixin, viewsets.ViewSet): "run-evaluation", inputs, id=workflow_id, - task_queue=GENERAL_PURPOSE_TASK_QUEUE, + task_queue=settings.GENERAL_PURPOSE_TASK_QUEUE, id_reuse_policy=WorkflowIDReusePolicy.ALLOW_DUPLICATE, retry_policy=RetryPolicy(maximum_attempts=3), ) diff --git a/products/llm_analytics/backend/api/test/test_evaluation_runs.py b/products/llm_analytics/backend/api/test/test_evaluation_runs.py index 01849b141a..4f3b531cb4 100644 --- a/products/llm_analytics/backend/api/test/test_evaluation_runs.py +++ b/products/llm_analytics/backend/api/test/test_evaluation_runs.py @@ -3,6 +3,8 @@ import uuid from posthog.test.base import APIBaseTest from unittest.mock import AsyncMock, MagicMock, patch +from django.conf import settings + from rest_framework import status from ...models.evaluations import Evaluation @@ -52,7 +54,7 @@ class TestEvaluationRunViewSet(APIBaseTest): call_args = mock_client.start_workflow.call_args assert call_args[0][0] == "run-evaluation" # workflow name - assert call_args[1]["task_queue"] == "general-purpose-task-queue" + assert call_args[1]["task_queue"] == settings.GENERAL_PURPOSE_TASK_QUEUE def test_create_evaluation_run_invalid_evaluation(self): """Test creating evaluation run with non-existent evaluation""" diff --git a/products/tasks/backend/temporal/client.py b/products/tasks/backend/temporal/client.py index c9a02e7798..c1440363bc 100644 --- a/products/tasks/backend/temporal/client.py +++ b/products/tasks/backend/temporal/client.py @@ -4,10 +4,11 @@ import asyncio import logging from typing import Optional +from django.conf import settings + import posthoganalytics from temporalio.common import RetryPolicy, WorkflowIDReusePolicy -from posthog.constants import TASKS_TASK_QUEUE from posthog.models.team.team import Team from posthog.models.user import User from posthog.temporal.common.client import async_connect @@ -31,7 +32,7 @@ async def _execute_task_processing_workflow(task_id: str, team_id: int, user_id: workflow_input, id=workflow_id, id_reuse_policy=WorkflowIDReusePolicy.ALLOW_DUPLICATE_FAILED_ONLY, - task_queue=TASKS_TASK_QUEUE, + task_queue=settings.TASKS_TASK_QUEUE, retry_policy=retry_policy, ) diff --git a/products/tasks/backend/temporal/process_task/tests/test_workflow.py b/products/tasks/backend/temporal/process_task/tests/test_workflow.py index beb50a4a9f..8ec353a5e5 100644 --- a/products/tasks/backend/temporal/process_task/tests/test_workflow.py +++ b/products/tasks/backend/temporal/process_task/tests/test_workflow.py @@ -7,13 +7,13 @@ from typing import cast import pytest from unittest.mock import patch +from django.conf import settings + from asgiref.sync import sync_to_async from temporalio.common import RetryPolicy from temporalio.testing import WorkflowEnvironment from temporalio.worker import UnsandboxedWorkflowRunner, Worker -from posthog import constants - from products.tasks.backend.models import SandboxSnapshot from products.tasks.backend.services.sandbox_environment import SandboxEnvironment, SandboxEnvironmentStatus from products.tasks.backend.temporal.process_task.activities.check_snapshot_exists_for_repository import ( @@ -69,7 +69,7 @@ class TestProcessTaskWorkflow: await WorkflowEnvironment.start_time_skipping() as env, Worker( env.client, - task_queue=constants.TASKS_TASK_QUEUE, + task_queue=settings.TASKS_TASK_QUEUE, workflows=[ProcessTaskWorkflow], activities=[ get_task_details, @@ -94,7 +94,7 @@ class TestProcessTaskWorkflow: ProcessTaskWorkflow.run, task_id, id=workflow_id, - task_queue=constants.TASKS_TASK_QUEUE, + task_queue=settings.TASKS_TASK_QUEUE, retry_policy=RetryPolicy(maximum_attempts=1), execution_timeout=timedelta(minutes=60), )