feat(data-warehouse): external data job rewrite (#21494)

* WIP

* Reworked the worker to self-manage creating schema schedules and to use async Temporal calls

* Added schema status and used it for the job status

* Fixed existing tests

* Added new tests to cover check_schedule_activity

* Updated the source API to trigger active schemas

* Added master changes for stripe source

* Updated mypy

* add blank to field

* update migrations

* update mypy

* fix types

* Update query snapshots

* Update query snapshots

* fix types

* update mypy

* type ignore

* add comment

* add default args, fix missing schema sync creation, add deletion logic

* remove defaults

* add blank

* cleanup

* add failsafe

* update reload logic

* create new schemas if triggered between reloads

* add schema off check

---------

Co-authored-by: eric <eeoneric@gmail.com>
Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
Author: Tom Owers
Date: 2024-04-17 22:08:37 +01:00
Committed by: GitHub
Parent: fff6720947
Commit: b2773cb011
23 changed files with 788 additions and 510 deletions
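
The headline change above is that external data jobs are now scheduled per schema rather than per source: a run triggered at the source level creates or triggers a schedule for each active schema and then retires the source-level schedule, while a run triggered with a schema id imports just that schema. The toy, self-contained sketch below illustrates that decision; the stubbed helpers, the in-memory schedule set, and the simplified signature are illustrative stand-ins, not the real implementations in the diff that follows.

import asyncio
import dataclasses
import uuid


@dataclasses.dataclass
class ExternalDataWorkflowInputs:
    team_id: int
    external_data_source_id: uuid.UUID
    external_data_schema_id: uuid.UUID | None = None


# In-memory stand-ins for the async Temporal schedule helpers used by check_schedule_activity.
EXISTING_SCHEDULES: set[uuid.UUID] = set()


async def a_external_data_workflow_exists(schema_id: uuid.UUID) -> bool:
    return schema_id in EXISTING_SCHEDULES


async def a_trigger_external_data_workflow(schema_id: uuid.UUID) -> None:
    print(f"triggering existing schedule for schema {schema_id}")


async def a_sync_external_data_job_workflow(schema_id: uuid.UUID, create: bool = False) -> None:
    EXISTING_SCHEDULES.add(schema_id)
    print(f"created schedule for schema {schema_id}")


async def a_delete_external_data_schedule(source_id: uuid.UUID) -> None:
    print(f"deleted source-level schedule {source_id}")


async def check_schedule(inputs: ExternalDataWorkflowInputs, active_schema_ids: list[uuid.UUID]) -> bool:
    """Return True when the workflow should exit after fanning out to per-schema schedules."""
    if inputs.external_data_schema_id is None:
        # Source-level trigger: ensure every active schema has its own schedule,
        # then delete the source-level schedule in favour of the schema schedules.
        for schema_id in active_schema_ids:
            if await a_external_data_workflow_exists(schema_id):
                await a_trigger_external_data_workflow(schema_id)
            else:
                await a_sync_external_data_job_workflow(schema_id, create=True)
        await a_delete_external_data_schedule(inputs.external_data_source_id)
        return True
    # Schema-level trigger: carry on and import this single schema.
    return False


if __name__ == "__main__":
    inputs = ExternalDataWorkflowInputs(team_id=1, external_data_source_id=uuid.uuid4())
    print(asyncio.run(check_schedule(inputs, [uuid.uuid4(), uuid.uuid4()])))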

View File

@@ -1,6 +1,5 @@
import { LemonButton } from '@posthog/lemon-ui'
import { useActions, useValues } from 'kea'
import { router } from 'kea-router'
import { PageHeader } from 'lib/components/PageHeader'
import { FEATURE_FLAGS } from 'lib/constants'
import { featureFlagLogic } from 'lib/logic/featureFlagLogic'
@@ -10,7 +9,6 @@ import stripeLogo from 'public/stripe-logo.svg'
import zendeskLogo from 'public/zendesk-logo.png'
import { useCallback } from 'react'
import { SceneExport } from 'scenes/sceneTypes'
import { urls } from 'scenes/urls'
import { SourceConfig } from '~/types'
@@ -26,7 +24,7 @@ export const scene: SceneExport = {
}
export function NewSourceWizard(): JSX.Element {
const { modalTitle, modalCaption } = useValues(sourceWizardLogic)
const { onBack, onSubmit, closeWizard, cancelWizard } = useActions(sourceWizardLogic)
const { onBack, onSubmit, closeWizard } = useActions(sourceWizardLogic)
const { currentStep, isLoading, canGoBack, canGoNext, nextButtonText, showSkipButton } =
useValues(sourceWizardLogic)
@@ -65,17 +63,17 @@ export function NewSourceWizard(): JSX.Element {
)
}, [currentStep, isLoading, canGoNext, canGoBack, nextButtonText, showSkipButton])
const onCancel = (): void => {
cancelWizard()
router.actions.push(urls.dataWarehouse())
}
return (
<>
<PageHeader
buttons={
<>
<LemonButton type="secondary" center data-attr="source-form-cancel-button" onClick={onCancel}>
<LemonButton
type="secondary"
center
data-attr="source-form-cancel-button"
onClick={closeWizard}
>
Cancel
</LemonButton>
</>

View File

@@ -424,6 +424,7 @@ export const sourceWizardLogic = kea<sourceWizardLogicType>([
}
},
closeWizard: () => {
actions.onClear()
actions.clearSource()
actions.loadSources(null)
router.actions.push(urls.dataWarehouseSettings())

View File

@@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name
ee: 0016_rolemembership_organization_member
otp_static: 0002_throttling
otp_totp: 0002_auto_20190420_0723
posthog: 0401_experiment_exposure_cohort
posthog: 0402_externaldatajob_schema
sessions: 0001_initial
social_django: 0010_uid_db_index
two_factor: 0007_auto_20201201_1019

View File

@@ -2,7 +2,6 @@ posthog/temporal/common/utils.py:0: error: Argument 1 to "abstractclassmethod" h
posthog/temporal/common/utils.py:0: note: This is likely because "from_activity" has named arguments: "cls". Consider marking them positional-only
posthog/temporal/common/utils.py:0: error: Argument 2 to "__get__" of "classmethod" has incompatible type "type[HeartbeatType]"; expected "type[Never]" [arg-type]
posthog/temporal/data_imports/pipelines/zendesk/talk_api.py:0: error: Incompatible types in assignment (expression has type "None", variable has type "str") [assignment]
posthog/hogql/modifiers.py:0: error: Incompatible types in assignment (expression has type "PersonOnEventsMode", variable has type "PersonsOnEventsMode | None") [assignment]
posthog/hogql/database/argmax.py:0: error: Argument "chain" to "Field" has incompatible type "list[str]"; expected "list[str | int]" [arg-type]
posthog/hogql/database/argmax.py:0: note: "List" is invariant -- see https://mypy.readthedocs.io/en/stable/common_issues.html#variance
posthog/hogql/database/argmax.py:0: note: Consider using "Sequence" instead, which is covariant
@@ -134,6 +133,7 @@ posthog/hogql_queries/legacy_compatibility/filter_to_query.py:0: error: Dict ent
posthog/hogql_queries/legacy_compatibility/filter_to_query.py:0: error: Dict entry 0 has incompatible type "str": "LifecycleFilter"; expected "str": "TrendsFilter" [dict-item]
posthog/hogql_queries/legacy_compatibility/filter_to_query.py:0: error: Dict entry 0 has incompatible type "str": "StickinessFilter"; expected "str": "TrendsFilter" [dict-item]
posthog/hogql_queries/legacy_compatibility/feature_flag.py:0: error: Item "AnonymousUser" of "User | AnonymousUser" has no attribute "email" [union-attr]
posthog/hogql/modifiers.py:0: error: Incompatible types in assignment (expression has type "PersonOnEventsMode", variable has type "PersonsOnEventsMode | None") [assignment]
posthog/api/utils.py:0: error: Incompatible types in assignment (expression has type "type[EventDefinition]", variable has type "type[EnterpriseEventDefinition]") [assignment]
posthog/api/utils.py:0: error: Argument 1 to "UUID" has incompatible type "int | str"; expected "str | None" [arg-type]
ee/billing/quota_limiting.py:0: error: List comprehension has incompatible type List[int]; expected List[str] [misc]
@@ -167,13 +167,6 @@ posthog/hogql_queries/insights/trends/aggregation_operations.py:0: error: Item "
posthog/hogql_queries/insights/trends/aggregation_operations.py:0: error: Item "None" of "list[Expr] | Any | None" has no attribute "append" [union-attr]
ee/billing/billing_manager.py:0: error: TypedDict "CustomerInfo" has no key "available_product_features" [typeddict-item]
ee/billing/billing_manager.py:0: note: Did you mean "available_features"?
posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "DateTime | Date | datetime | date | str | float | int | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type]
posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "str | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type]
posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "DateTime | Date | datetime | date | str | float | int | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type]
posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "str | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type]
posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "DateTime | Date | datetime | date | str | float | int | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type]
posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Item "None" of "DateTime | None" has no attribute "int_timestamp" [union-attr]
posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "str | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type]
posthog/hogql/resolver.py:0: error: Argument 1 of "visit" is incompatible with supertype "Visitor"; supertype defines the argument type as "AST" [override]
posthog/hogql/resolver.py:0: note: This violates the Liskov substitution principle
posthog/hogql/resolver.py:0: note: See https://mypy.readthedocs.io/en/stable/common_issues.html#incompatible-overrides
@@ -233,9 +226,6 @@ posthog/hogql/resolver.py:0: error: Argument 1 to "get_child" of "Type" has inco
posthog/hogql/resolver.py:0: error: Incompatible types in assignment (expression has type "Expr", variable has type "Alias") [assignment]
posthog/hogql/resolver.py:0: error: Argument "alias" to "Alias" has incompatible type "str | int"; expected "str" [arg-type]
posthog/hogql/resolver.py:0: error: Argument 1 to "join" of "str" has incompatible type "list[str | int]"; expected "Iterable[str]" [arg-type]
posthog/temporal/data_imports/external_data_job.py:0: error: Argument "team_id" has incompatible type "int"; expected "str" [arg-type]
posthog/temporal/data_imports/external_data_job.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/temporal/data_imports/external_data_job.py:0: error: Argument "team_id" has incompatible type "int"; expected "str" [arg-type]
posthog/hogql/transforms/lazy_tables.py:0: error: Incompatible default for argument "context" (default has type "None", argument has type "HogQLContext") [assignment]
posthog/hogql/transforms/lazy_tables.py:0: note: PEP 484 prohibits implicit Optional. Accordingly, mypy has changed its default to no_implicit_optional=True
posthog/hogql/transforms/lazy_tables.py:0: note: Use https://github.com/hauntsaninja/no_implicit_optional to automatically upgrade your codebase
@@ -577,6 +567,15 @@ posthog/hogql/database/schema/event_sessions.py:0: error: Statement is unreachab
posthog/api/organization_member.py:0: error: Metaclass conflict: the metaclass of a derived class must be a (non-strict) subclass of the metaclasses of all its bases [misc]
ee/api/role.py:0: error: Metaclass conflict: the metaclass of a derived class must be a (non-strict) subclass of the metaclasses of all its bases [misc]
ee/clickhouse/views/insights.py:0: error: Metaclass conflict: the metaclass of a derived class must be a (non-strict) subclass of the metaclasses of all its bases [misc]
posthog/temporal/data_imports/workflow_activities/create_job_model.py:0: error: Argument 6 has incompatible type "ExternalDataSchema"; expected "str" [arg-type]
posthog/temporal/data_imports/workflow_activities/create_job_model.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "DateTime | Date | datetime | date | str | float | int | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type]
posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "str | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type]
posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "DateTime | Date | datetime | date | str | float | int | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type]
posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "str | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type]
posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "DateTime | Date | datetime | date | str | float | int | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type]
posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Item "None" of "DateTime | None" has no attribute "int_timestamp" [union-attr]
posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "str | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type]
posthog/queries/trends/test/test_person.py:0: error: "str" has no attribute "get" [attr-defined]
posthog/queries/trends/test/test_person.py:0: error: Invalid index type "int" for "HttpResponse"; expected type "str | bytes" [index]
posthog/queries/trends/test/test_person.py:0: error: "str" has no attribute "get" [attr-defined]

View File

@@ -0,0 +1,25 @@
# Generated by Django 4.1.13 on 2024-04-15 14:32
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
("posthog", "0401_experiment_exposure_cohort"),
]
operations = [
migrations.AddField(
model_name="externaldatajob",
name="schema",
field=models.ForeignKey(
blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to="posthog.externaldataschema"
),
),
migrations.AddField(
model_name="externaldataschema",
name="status",
field=models.CharField(max_length=400, null=True, blank=True),
),
]
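
The new schema foreign key on ExternalDataJob, together with the status field on ExternalDataSchema, is what lets a schema's status mirror its job status elsewhere in this diff. A hedged sketch of the kind of per-schema lookup the FK enables (the created_at ordering is an assumption about the model, not shown in this migration):

from posthog.warehouse.models import ExternalDataJob, ExternalDataSchema


def latest_job_for_schema(schema: ExternalDataSchema) -> ExternalDataJob | None:
    # The new ExternalDataJob.schema FK means job rows can be filtered per schema.
    return (
        ExternalDataJob.objects.filter(team_id=schema.team_id, schema=schema)
        .order_by("-created_at")  # assumes the model has a created_at timestamp
        .first()
    )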

View File

@@ -49,3 +49,16 @@ async def sync_connect() -> Client:
settings.TEMPORAL_CLIENT_KEY,
)
return client
async def async_connect() -> Client:
"""Asynchronous connect to Temporal and return a Client."""
client = await connect(
settings.TEMPORAL_HOST,
settings.TEMPORAL_PORT,
settings.TEMPORAL_NAMESPACE,
settings.TEMPORAL_CLIENT_ROOT_CA,
settings.TEMPORAL_CLIENT_CERT,
settings.TEMPORAL_CLIENT_KEY,
)
return client

View File

@@ -12,6 +12,15 @@ async def create_schedule(temporal: Client, id: str, schedule: Schedule, trigger
)
async def a_create_schedule(temporal: Client, id: str, schedule: Schedule, trigger_immediately: bool = False):
"""Async create a Temporal Schedule."""
return await temporal.create_schedule(
id=id,
schedule=schedule,
trigger_immediately=trigger_immediately,
)
@async_to_sync
async def update_schedule(temporal: Client, id: str, schedule: Schedule) -> None:
"""Update a Temporal Schedule."""
@@ -25,6 +34,18 @@ async def update_schedule(temporal: Client, id: str, schedule: Schedule) -> None
)
async def a_update_schedule(temporal: Client, id: str, schedule: Schedule) -> None:
"""Async update a Temporal Schedule."""
handle = temporal.get_schedule_handle(id)
async def updater(_: ScheduleUpdateInput) -> ScheduleUpdate:
return ScheduleUpdate(schedule=schedule)
return await handle.update(
updater=updater,
)
@async_to_sync
async def unpause_schedule(temporal: Client, schedule_id: str, note: str | None = None) -> None:
"""Unpause a Temporal Schedule."""
@@ -39,6 +60,12 @@ async def delete_schedule(temporal: Client, schedule_id: str) -> None:
await handle.delete()
async def a_delete_schedule(temporal: Client, schedule_id: str) -> None:
"""Async delete a Temporal Schedule."""
handle = temporal.get_schedule_handle(schedule_id)
await handle.delete()
@async_to_sync
async def describe_schedule(temporal: Client, schedule_id: str):
"""Describe a Temporal Schedule."""
@@ -55,6 +82,21 @@ async def pause_schedule(temporal: Client, schedule_id: str, note: str | None =
@async_to_sync
async def trigger_schedule(temporal: Client, schedule_id: str, note: str | None = None) -> None:
"""Pause a Temporal Schedule."""
"""Trigger a Temporal Schedule."""
handle = temporal.get_schedule_handle(schedule_id)
await handle.trigger()
async def a_trigger_schedule(temporal: Client, schedule_id: str, note: str | None = None) -> None:
"""Trigger a Temporal Schedule."""
handle = temporal.get_schedule_handle(schedule_id)
await handle.trigger()
async def a_schedule_exists(temporal: Client, schedule_id: str) -> bool:
"""Check whether a schedule exists."""
try:
await temporal.get_schedule_handle(schedule_id).describe()
return True
except:
return False
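
For the worker to manage schedules from inside activities, the helpers above now have awaitable counterparts. A minimal usage sketch follows, assuming the async helpers and the async_connect factory are importable alongside their sync versions; the module paths, the workflow name, and the task queue string are assumptions made for illustration, and Django settings must already be configured for async_connect to work.

import asyncio
import uuid
from datetime import timedelta

from temporalio.client import (
    Schedule,
    ScheduleActionStartWorkflow,
    ScheduleIntervalSpec,
    ScheduleSpec,
)

from posthog.temporal.common.client import async_connect  # assumed module path
from posthog.temporal.common.schedule import (  # assumed module path
    a_create_schedule,
    a_schedule_exists,
    a_trigger_schedule,
)
from posthog.temporal.utils import ExternalDataWorkflowInputs


async def ensure_schema_schedule(team_id: int, source_id: uuid.UUID, schema_id: uuid.UUID) -> None:
    client = await async_connect()
    schedule_id = str(schema_id)

    if await a_schedule_exists(client, schedule_id):
        # A schedule already exists for this schema: just kick off a run now.
        await a_trigger_schedule(client, schedule_id=schedule_id)
        return

    # Otherwise create a daily schedule keyed on the schema id.
    schedule = Schedule(
        action=ScheduleActionStartWorkflow(
            "external-data-job",  # workflow name: assumption for illustration
            ExternalDataWorkflowInputs(
                team_id=team_id,
                external_data_source_id=source_id,
                external_data_schema_id=schema_id,
            ),
            id=schedule_id,
            task_queue="data-warehouse-task-queue",  # assumed queue name
        ),
        spec=ScheduleSpec(intervals=[ScheduleIntervalSpec(every=timedelta(hours=24))]),
    )
    await a_create_schedule(client, id=schedule_id, schedule=schedule, trigger_immediately=True)


if __name__ == "__main__":
    asyncio.run(ensure_schema_schedule(1, uuid.uuid4(), uuid.uuid4()))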

View File

@@ -1,18 +1,20 @@
from posthog.temporal.data_imports.external_data_job import (
ExternalDataJobWorkflow,
create_external_data_job_model,
create_external_data_job_model_activity,
create_source_templates,
run_external_data_job,
import_data_activity,
update_external_data_job_model,
validate_schema_activity,
check_schedule_activity,
)
WORKFLOWS = [ExternalDataJobWorkflow]
ACTIVITIES = [
create_external_data_job_model,
create_external_data_job_model_activity,
update_external_data_job_model,
run_external_data_job,
import_data_activity,
validate_schema_activity,
create_source_templates,
check_schedule_activity,
]

View File

@@ -10,81 +10,32 @@ from temporalio.common import RetryPolicy
# TODO: remove dependency
from posthog.temporal.batch_exports.base import PostHogWorkflow
from posthog.temporal.data_imports.pipelines.helpers import aupdate_job_count
from posthog.temporal.data_imports.pipelines.schemas import PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING
from posthog.temporal.data_imports.pipelines.zendesk.credentials import ZendeskCredentialsToken
from posthog.temporal.utils import ExternalDataWorkflowInputs
from posthog.temporal.data_imports.workflow_activities.create_job_model import (
CreateExternalDataJobModelActivityInputs,
create_external_data_job_model_activity,
)
from posthog.temporal.data_imports.workflow_activities.import_data import ImportDataActivityInputs, import_data_activity
from posthog.warehouse.data_load.service import (
a_delete_external_data_schedule,
a_external_data_workflow_exists,
a_sync_external_data_job_workflow,
a_trigger_external_data_workflow,
)
from posthog.warehouse.data_load.source_templates import create_warehouse_templates_for_source
from posthog.warehouse.data_load.validate_schema import validate_schema_and_update_table
from posthog.temporal.data_imports.pipelines.pipeline import DataImportPipeline, PipelineInputs
from posthog.warehouse.external_data_source.jobs import (
create_external_data_job,
update_external_job_status,
)
from posthog.warehouse.models import (
ExternalDataJob,
get_active_schemas_for_source_id,
sync_old_schemas_with_new_schemas,
ExternalDataSource,
get_external_data_job,
aget_schema_by_id,
)
from posthog.warehouse.models.external_data_schema import get_postgres_schemas
from posthog.temporal.common.logger import bind_temporal_worker_logger
from posthog.utils import get_instance_region
from typing import Dict, Tuple
import asyncio
from django.conf import settings
from django.utils import timezone
@dataclasses.dataclass
class CreateExternalDataJobInputs:
team_id: int
external_data_source_id: uuid.UUID
@activity.defn
async def create_external_data_job_model(inputs: CreateExternalDataJobInputs) -> Tuple[str, list[Tuple[str, str]]]:
run = await sync_to_async(create_external_data_job)(
team_id=inputs.team_id,
external_data_source_id=inputs.external_data_source_id,
workflow_id=activity.info().workflow_id,
)
source = await sync_to_async(ExternalDataSource.objects.get)(
team_id=inputs.team_id, id=inputs.external_data_source_id
)
source.status = "Running"
await sync_to_async(source.save)()
if source.source_type == ExternalDataSource.Type.POSTGRES:
host = source.job_inputs.get("host")
port = source.job_inputs.get("port")
user = source.job_inputs.get("user")
password = source.job_inputs.get("password")
database = source.job_inputs.get("database")
schema = source.job_inputs.get("schema")
schemas_to_sync = await sync_to_async(get_postgres_schemas)(host, port, database, user, password, schema)
else:
schemas_to_sync = list(PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING.get(source.source_type, ()))
await sync_to_async(sync_old_schemas_with_new_schemas)( # type: ignore
schemas_to_sync,
source_id=inputs.external_data_source_id,
team_id=inputs.team_id,
)
schemas = await sync_to_async(get_active_schemas_for_source_id)(
team_id=inputs.team_id, source_id=inputs.external_data_source_id
)
logger = await bind_temporal_worker_logger(team_id=inputs.team_id)
logger.info(
f"Created external data job with for external data source {inputs.external_data_source_id}",
)
return str(run.id), schemas
from typing import Dict
@dataclasses.dataclass
@@ -115,7 +66,7 @@ async def update_external_data_job_model(inputs: UpdateExternalDataJobStatusInpu
class ValidateSchemaInputs:
run_id: str
team_id: int
schemas: list[Tuple[str, str]]
schema_id: uuid.UUID
table_schema: TSchemaTables
table_row_counts: Dict[str, int]
@@ -125,7 +76,7 @@ async def validate_schema_activity(inputs: ValidateSchemaInputs) -> None:
await validate_schema_and_update_table(
run_id=inputs.run_id,
team_id=inputs.team_id,
schemas=inputs.schemas,
schema_id=inputs.schema_id,
table_schema=inputs.table_schema,
table_row_counts=inputs.table_row_counts,
)
@@ -147,141 +98,36 @@ async def create_source_templates(inputs: CreateSourceTemplateInputs) -> None:
await create_warehouse_templates_for_source(team_id=inputs.team_id, run_id=inputs.run_id)
@dataclasses.dataclass
class ExternalDataWorkflowInputs:
team_id: int
external_data_source_id: uuid.UUID
@dataclasses.dataclass
class ExternalDataJobInputs:
team_id: int
source_id: uuid.UUID
run_id: str
schemas: list[Tuple[str, str]]
@activity.defn
async def run_external_data_job(inputs: ExternalDataJobInputs) -> Tuple[TSchemaTables, Dict[str, int]]: # noqa: F821
model: ExternalDataJob = await get_external_data_job(
job_id=inputs.run_id,
)
async def check_schedule_activity(inputs: ExternalDataWorkflowInputs) -> bool:
logger = await bind_temporal_worker_logger(team_id=inputs.team_id)
job_inputs = PipelineInputs(
source_id=inputs.source_id,
schemas=inputs.schemas,
run_id=inputs.run_id,
team_id=inputs.team_id,
job_type=model.pipeline.source_type,
dataset_name=model.folder_path,
)
endpoints = [schema[1] for schema in inputs.schemas]
source = None
if model.pipeline.source_type == ExternalDataSource.Type.STRIPE:
from posthog.temporal.data_imports.pipelines.stripe.helpers import stripe_source
stripe_secret_key = model.pipeline.job_inputs.get("stripe_secret_key", None)
account_id = model.pipeline.job_inputs.get("stripe_account_id", None)
# Kludge: account_id should be checked here too, but we can deal with nulls
# until we require re-updating account_ids in Stripe so they're all stored
if not stripe_secret_key:
raise ValueError(f"Stripe secret key not found for job {model.id}")
# Hacky just for specific user
region = get_instance_region()
if region == "EU" and inputs.team_id == 11870:
prev_day = timezone.now() - dt.timedelta(days=1)
start_date = prev_day.replace(hour=0, minute=0, second=0, microsecond=0)
end_date = start_date + dt.timedelta(1)
else:
start_date = None
end_date = None
source = stripe_source(
api_key=stripe_secret_key,
account_id=account_id,
endpoints=tuple(endpoints),
team_id=inputs.team_id,
job_id=inputs.run_id,
start_date=start_date,
end_date=end_date,
# Creates schedules for all schemas if they don't exist yet, and then removes the source-level schedule
if inputs.external_data_schema_id is None:
logger.info("Schema ID is none, creating schedules for schemas...")
schemas = await get_active_schemas_for_source_id(
team_id=inputs.team_id, source_id=inputs.external_data_source_id
)
elif model.pipeline.source_type == ExternalDataSource.Type.HUBSPOT:
from posthog.temporal.data_imports.pipelines.hubspot.auth import refresh_access_token
from posthog.temporal.data_imports.pipelines.hubspot import hubspot
for schema in schemas:
if await a_external_data_workflow_exists(schema.id):
await a_trigger_external_data_workflow(schema)
logger.info(f"Schedule exists for schema {schema.id}. Triggered schedule")
else:
await a_sync_external_data_job_workflow(schema, create=True)
logger.info(f"Created schedule for schema {schema.id}")
# Delete the source schedule in favour of the schema schedules
await a_delete_external_data_schedule(ExternalDataSource(id=inputs.external_data_source_id))
logger.info(f"Deleted schedule for source {inputs.external_data_source_id}")
return True
hubspot_access_code = model.pipeline.job_inputs.get("hubspot_secret_key", None)
refresh_token = model.pipeline.job_inputs.get("hubspot_refresh_token", None)
if not refresh_token:
raise ValueError(f"Hubspot refresh token not found for job {model.id}")
schema_model = await aget_schema_by_id(inputs.external_data_schema_id, inputs.team_id)
if not hubspot_access_code:
hubspot_access_code = refresh_access_token(refresh_token)
# schema turned off so don't sync
if schema_model and not schema_model.should_sync:
return True
source = hubspot(
api_key=hubspot_access_code,
refresh_token=refresh_token,
endpoints=tuple(endpoints),
)
elif model.pipeline.source_type == ExternalDataSource.Type.POSTGRES:
from posthog.temporal.data_imports.pipelines.postgres import postgres_source
host = model.pipeline.job_inputs.get("host")
port = model.pipeline.job_inputs.get("port")
user = model.pipeline.job_inputs.get("user")
password = model.pipeline.job_inputs.get("password")
database = model.pipeline.job_inputs.get("database")
schema = model.pipeline.job_inputs.get("schema")
source = postgres_source(
host=host,
port=port,
user=user,
password=password,
database=database,
sslmode="prefer" if settings.TEST or settings.DEBUG else "require",
schema=schema,
table_names=endpoints,
)
elif model.pipeline.source_type == ExternalDataSource.Type.ZENDESK:
from posthog.temporal.data_imports.pipelines.zendesk.helpers import zendesk_support
credentials = ZendeskCredentialsToken()
credentials.token = model.pipeline.job_inputs.get("zendesk_api_key")
credentials.subdomain = model.pipeline.job_inputs.get("zendesk_subdomain")
credentials.email = model.pipeline.job_inputs.get("zendesk_email_address")
data_support = zendesk_support(credentials=credentials, endpoints=tuple(endpoints), team_id=inputs.team_id)
# Uncomment to support zendesk chat and talk
# data_chat = zendesk_chat()
# data_talk = zendesk_talk()
source = data_support
else:
raise ValueError(f"Source type {model.pipeline.source_type} not supported")
# Temp background heartbeat for now
async def heartbeat() -> None:
while True:
await asyncio.sleep(10)
activity.heartbeat()
heartbeat_task = asyncio.create_task(heartbeat())
try:
table_row_counts = await DataImportPipeline(job_inputs, source, logger).run()
total_rows_synced = sum(table_row_counts.values())
await aupdate_job_count(inputs.run_id, inputs.team_id, total_rows_synced)
finally:
heartbeat_task.cancel()
await asyncio.wait([heartbeat_task])
return source.schema.tables, table_row_counts
logger.info("Schema ID is set. Continuing...")
return False
# TODO: update retry policies
@@ -296,14 +142,30 @@ class ExternalDataJobWorkflow(PostHogWorkflow):
async def run(self, inputs: ExternalDataWorkflowInputs):
logger = await bind_temporal_worker_logger(team_id=inputs.team_id)
# create external data job and trigger activity
create_external_data_job_inputs = CreateExternalDataJobInputs(
team_id=inputs.team_id,
external_data_source_id=inputs.external_data_source_id,
should_exit = await workflow.execute_activity(
check_schedule_activity,
inputs,
start_to_close_timeout=dt.timedelta(minutes=1),
retry_policy=RetryPolicy(
initial_interval=dt.timedelta(seconds=10),
maximum_interval=dt.timedelta(seconds=60),
maximum_attempts=0,
non_retryable_error_types=["NotNullViolation", "IntegrityError"],
),
)
run_id, schemas = await workflow.execute_activity(
create_external_data_job_model,
if should_exit:
return
assert inputs.external_data_schema_id is not None
# create external data job and trigger activity
create_external_data_job_inputs = CreateExternalDataJobModelActivityInputs(
team_id=inputs.team_id, schema_id=inputs.external_data_schema_id, source_id=inputs.external_data_source_id
)
run_id = await workflow.execute_activity(
create_external_data_job_model_activity,
create_external_data_job_inputs,
start_to_close_timeout=dt.timedelta(minutes=1),
retry_policy=RetryPolicy(
@@ -319,15 +181,15 @@ class ExternalDataJobWorkflow(PostHogWorkflow):
)
try:
job_inputs = ExternalDataJobInputs(
source_id=inputs.external_data_source_id,
job_inputs = ImportDataActivityInputs(
team_id=inputs.team_id,
run_id=run_id,
schemas=schemas,
schema_id=inputs.external_data_schema_id,
source_id=inputs.external_data_source_id,
)
table_schemas, table_row_counts = await workflow.execute_activity(
run_external_data_job,
import_data_activity,
job_inputs,
start_to_close_timeout=dt.timedelta(hours=30),
retry_policy=RetryPolicy(maximum_attempts=5),
@@ -338,7 +200,7 @@ class ExternalDataJobWorkflow(PostHogWorkflow):
validate_inputs = ValidateSchemaInputs(
run_id=run_id,
team_id=inputs.team_id,
schemas=schemas,
schema_id=inputs.external_data_schema_id,
table_schema=table_schemas,
table_row_counts=table_row_counts,
)

View File

@@ -17,7 +17,7 @@ from dlt.sources import DltSource
class PipelineInputs:
source_id: UUID
run_id: str
schemas: list[tuple[str, str]]
schema_id: UUID
dataset_name: str
job_type: str
team_id: int
@@ -68,13 +68,6 @@ class DataImportPipeline:
dataset_name=self.inputs.dataset_name,
)
def _get_schemas(self):
if not self.inputs.schemas:
self.logger.info(f"No schemas found for source id {self.inputs.source_id}")
return None
return self.inputs.schemas
def _run(self) -> Dict[str, int]:
pipeline = self._create_pipeline()
pipeline.run(self.source, loader_file_format=self.loader_file_format)
@@ -86,10 +79,6 @@ class DataImportPipeline:
return dict(filtered_rows)
async def run(self) -> Dict[str, int]:
schemas = self._get_schemas()
if not schemas:
return {}
try:
return await asyncio.to_thread(self._run)
except PipelineStepFailed:

View File

@@ -117,8 +117,8 @@ class SqlDatabaseTableConfiguration(BaseConfiguration):
class SqlTableResourceConfiguration(BaseConfiguration):
credentials: ConnectionStringCredentials
table: str
incremental: Optional[dlt.sources.incremental] = None
schema: Optional[str]
incremental: Optional[dlt.sources.incremental] = None
__source_name__ = "sql_database"

View File

@@ -13,7 +13,7 @@ class ZendeskCredentialsBase(CredentialsConfiguration):
The Base version of all the ZendeskCredential classes.
"""
subdomain: str
subdomain: str = ""
__config_gen_annotations__: ClassVar[List[str]] = []
@@ -23,7 +23,7 @@ class ZendeskCredentialsEmailPass(ZendeskCredentialsBase):
This class is used to store credentials for Email + Password Authentication
"""
email: str
email: str = ""
password: TSecretValue
@@ -42,7 +42,7 @@ class ZendeskCredentialsToken(ZendeskCredentialsBase):
This class is used to store credentials for Token Authentication
"""
email: str
email: str = ""
token: TSecretValue

View File

@@ -0,0 +1,68 @@
import dataclasses
import uuid
from asgiref.sync import sync_to_async
from temporalio import activity
# TODO: remove dependency
from posthog.temporal.data_imports.pipelines.schemas import PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING
from posthog.warehouse.external_data_source.jobs import (
create_external_data_job,
)
from posthog.warehouse.models import (
sync_old_schemas_with_new_schemas,
ExternalDataSource,
)
from posthog.warehouse.models.external_data_schema import ExternalDataSchema, get_postgres_schemas
from posthog.temporal.common.logger import bind_temporal_worker_logger
@dataclasses.dataclass
class CreateExternalDataJobModelActivityInputs:
team_id: int
schema_id: uuid.UUID
source_id: uuid.UUID
@activity.defn
async def create_external_data_job_model_activity(inputs: CreateExternalDataJobModelActivityInputs) -> str:
run = await sync_to_async(create_external_data_job)(
team_id=inputs.team_id,
external_data_source_id=inputs.source_id,
external_data_schema_id=inputs.schema_id,
workflow_id=activity.info().workflow_id,
)
schema = await sync_to_async(ExternalDataSchema.objects.get)(team_id=inputs.team_id, id=inputs.schema_id)
schema.status = ExternalDataSchema.Status.RUNNING
await sync_to_async(schema.save)()
source = await sync_to_async(ExternalDataSource.objects.get)(team_id=inputs.team_id, id=inputs.source_id)
if source.source_type == ExternalDataSource.Type.POSTGRES:
host = source.job_inputs.get("host")
port = source.job_inputs.get("port")
user = source.job_inputs.get("user")
password = source.job_inputs.get("password")
database = source.job_inputs.get("database")
schema = source.job_inputs.get("schema")
schemas_to_sync = await sync_to_async(get_postgres_schemas)(host, port, database, user, password, schema)
else:
schemas_to_sync = list(PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING.get(source.source_type, ()))
# TODO: this could cause a race condition where each schema worker creates the missing schema
await sync_to_async(sync_old_schemas_with_new_schemas)( # type: ignore
schemas_to_sync,
source_id=inputs.source_id,
team_id=inputs.team_id,
)
logger = await bind_temporal_worker_logger(team_id=inputs.team_id)
logger.info(
f"Created external data job with for external data source {inputs.source_id}",
)
return str(run.id)

View File

@@ -0,0 +1,159 @@
import dataclasses
import datetime as dt
import uuid
from dlt.common.schema.typing import TSchemaTables
from temporalio import activity
# TODO: remove dependency
from posthog.temporal.data_imports.pipelines.helpers import aupdate_job_count
from posthog.temporal.data_imports.pipelines.zendesk.credentials import ZendeskCredentialsToken
from posthog.temporal.data_imports.pipelines.pipeline import DataImportPipeline, PipelineInputs
from posthog.utils import get_instance_region
from posthog.warehouse.models import (
ExternalDataJob,
ExternalDataSource,
get_external_data_job,
)
from posthog.temporal.common.logger import bind_temporal_worker_logger
from typing import Dict, Tuple
import asyncio
from django.conf import settings
from django.utils import timezone
from posthog.warehouse.models.external_data_schema import ExternalDataSchema, aget_schema_by_id
@dataclasses.dataclass
class ImportDataActivityInputs:
team_id: int
schema_id: uuid.UUID
source_id: uuid.UUID
run_id: str
@activity.defn
async def import_data_activity(inputs: ImportDataActivityInputs) -> Tuple[TSchemaTables, Dict[str, int]]: # noqa: F821
model: ExternalDataJob = await get_external_data_job(
job_id=inputs.run_id,
)
logger = await bind_temporal_worker_logger(team_id=inputs.team_id)
job_inputs = PipelineInputs(
source_id=inputs.source_id,
schema_id=inputs.schema_id,
run_id=inputs.run_id,
team_id=inputs.team_id,
job_type=model.pipeline.source_type,
dataset_name=model.folder_path,
)
schema: ExternalDataSchema = await aget_schema_by_id(inputs.schema_id, inputs.team_id)
endpoints = [schema.name]
source = None
if model.pipeline.source_type == ExternalDataSource.Type.STRIPE:
from posthog.temporal.data_imports.pipelines.stripe.helpers import stripe_source
stripe_secret_key = model.pipeline.job_inputs.get("stripe_secret_key", None)
account_id = model.pipeline.job_inputs.get("stripe_account_id", None)
# Kludge: account_id should be checked here too, but we can deal with nulls
# until we require re-updating account_ids in Stripe so they're all stored
if not stripe_secret_key:
raise ValueError(f"Stripe secret key not found for job {model.id}")
# Hacky just for specific user
region = get_instance_region()
if region == "EU" and inputs.team_id == 11870:
prev_day = timezone.now() - dt.timedelta(days=1)
start_date = prev_day.replace(hour=0, minute=0, second=0, microsecond=0)
end_date = start_date + dt.timedelta(1)
else:
start_date = None
end_date = None
source = stripe_source(
api_key=stripe_secret_key,
account_id=account_id,
endpoints=tuple(endpoints),
team_id=inputs.team_id,
job_id=inputs.run_id,
start_date=start_date,
end_date=end_date,
)
elif model.pipeline.source_type == ExternalDataSource.Type.HUBSPOT:
from posthog.temporal.data_imports.pipelines.hubspot.auth import refresh_access_token
from posthog.temporal.data_imports.pipelines.hubspot import hubspot
hubspot_access_code = model.pipeline.job_inputs.get("hubspot_secret_key", None)
refresh_token = model.pipeline.job_inputs.get("hubspot_refresh_token", None)
if not refresh_token:
raise ValueError(f"Hubspot refresh token not found for job {model.id}")
if not hubspot_access_code:
hubspot_access_code = refresh_access_token(refresh_token)
source = hubspot(
api_key=hubspot_access_code,
refresh_token=refresh_token,
endpoints=tuple(endpoints),
)
elif model.pipeline.source_type == ExternalDataSource.Type.POSTGRES:
from posthog.temporal.data_imports.pipelines.postgres import postgres_source
host = model.pipeline.job_inputs.get("host")
port = model.pipeline.job_inputs.get("port")
user = model.pipeline.job_inputs.get("user")
password = model.pipeline.job_inputs.get("password")
database = model.pipeline.job_inputs.get("database")
pg_schema = model.pipeline.job_inputs.get("schema")
source = postgres_source(
host=host,
port=port,
user=user,
password=password,
database=database,
sslmode="prefer" if settings.TEST or settings.DEBUG else "require",
schema=pg_schema,
table_names=endpoints,
)
elif model.pipeline.source_type == ExternalDataSource.Type.ZENDESK:
from posthog.temporal.data_imports.pipelines.zendesk.helpers import zendesk_support
# NOTE: this line errors on CI mypy but not locally. Putting arguments within the function causes the opposite error
credentials = ZendeskCredentialsToken()
credentials.token = model.pipeline.job_inputs.get("zendesk_api_key")
credentials.subdomain = model.pipeline.job_inputs.get("zendesk_subdomain")
credentials.email = model.pipeline.job_inputs.get("zendesk_email_address")
data_support = zendesk_support(credentials=credentials, endpoints=tuple(endpoints), team_id=inputs.team_id)
# Uncomment to support zendesk chat and talk
# data_chat = zendesk_chat()
# data_talk = zendesk_talk()
source = data_support
else:
raise ValueError(f"Source type {model.pipeline.source_type} not supported")
# Temp background heartbeat for now
async def heartbeat() -> None:
while True:
await asyncio.sleep(10)
activity.heartbeat()
heartbeat_task = asyncio.create_task(heartbeat())
try:
table_row_counts = await DataImportPipeline(job_inputs, source, logger).run()
total_rows_synced = sum(table_row_counts.values())
await aupdate_job_count(inputs.run_id, inputs.team_id, total_rows_synced)
finally:
heartbeat_task.cancel()
await asyncio.wait([heartbeat_task])
return source.schema.tables, table_row_counts

View File

@@ -6,21 +6,23 @@ from asgiref.sync import sync_to_async
from django.test import override_settings
from posthog.temporal.data_imports.external_data_job import (
CreateExternalDataJobInputs,
UpdateExternalDataJobStatusInputs,
ValidateSchemaInputs,
create_external_data_job,
create_external_data_job_model,
check_schedule_activity,
create_source_templates,
run_external_data_job,
update_external_data_job_model,
validate_schema_activity,
)
from posthog.temporal.data_imports.external_data_job import (
ExternalDataJobWorkflow,
ExternalDataJobInputs,
ExternalDataWorkflowInputs,
)
from posthog.temporal.data_imports.workflow_activities.create_job_model import (
CreateExternalDataJobModelActivityInputs,
create_external_data_job_model_activity,
)
from posthog.temporal.data_imports.workflow_activities.import_data import ImportDataActivityInputs, import_data_activity
from posthog.warehouse.external_data_source.jobs import create_external_data_job
from posthog.warehouse.models import (
get_latest_run_if_exists,
DataWarehouseTable,
@@ -146,13 +148,16 @@ async def test_create_external_job_activity(activity_environment, team, **kwargs
source_type="Stripe",
)
inputs = CreateExternalDataJobInputs(team_id=team.id, external_data_source_id=new_source.pk)
test_1_schema = await _create_schema("test-1", new_source, team)
run_id, schemas = await activity_environment.run(create_external_data_job_model, inputs)
inputs = CreateExternalDataJobModelActivityInputs(
team_id=team.id, source_id=new_source.pk, schema_id=test_1_schema.id
)
run_id = await activity_environment.run(create_external_data_job_model_activity, inputs)
runs = ExternalDataJob.objects.filter(id=run_id)
assert await sync_to_async(runs.exists)()
assert len(schemas) == 0
@pytest.mark.django_db(transaction=True)
@@ -167,26 +172,18 @@ async def test_create_external_job_activity_schemas_exist(activity_environment,
source_type="Stripe",
)
await sync_to_async(ExternalDataSchema.objects.create)(
schema = await sync_to_async(ExternalDataSchema.objects.create)(
name=PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING[new_source.source_type][0],
team_id=team.id,
source_id=new_source.pk,
)
await sync_to_async(ExternalDataSchema.objects.create)(
name=PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING[new_source.source_type][1],
team_id=team.id,
source_id=new_source.pk,
should_sync=False,
)
inputs = CreateExternalDataJobModelActivityInputs(team_id=team.id, source_id=new_source.pk, schema_id=schema.id)
inputs = CreateExternalDataJobInputs(team_id=team.id, external_data_source_id=new_source.pk)
run_id, schemas = await activity_environment.run(create_external_data_job_model, inputs)
run_id = await activity_environment.run(create_external_data_job_model_activity, inputs)
runs = ExternalDataJob.objects.filter(id=run_id)
assert await sync_to_async(runs.exists)()
assert len(schemas) == 1
@pytest.mark.django_db(transaction=True)
@@ -201,22 +198,16 @@ async def test_create_external_job_activity_update_schemas(activity_environment,
source_type="Stripe",
)
await sync_to_async(ExternalDataSchema.objects.create)(
schema = await sync_to_async(ExternalDataSchema.objects.create)(
name=PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING[new_source.source_type][0],
team_id=team.id,
source_id=new_source.pk,
should_sync=True,
)
await sync_to_async(ExternalDataSchema.objects.create)(
name=PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING[new_source.source_type][1],
team_id=team.id,
source_id=new_source.pk,
)
inputs = CreateExternalDataJobModelActivityInputs(team_id=team.id, source_id=new_source.pk, schema_id=schema.id)
inputs = CreateExternalDataJobInputs(team_id=team.id, external_data_source_id=new_source.pk)
run_id, schemas = await activity_environment.run(create_external_data_job_model, inputs)
run_id = await activity_environment.run(create_external_data_job_model_activity, inputs)
runs = ExternalDataJob.objects.filter(id=run_id)
assert await sync_to_async(runs.exists)()
@@ -241,8 +232,18 @@ async def test_update_external_job_activity(activity_environment, team, **kwargs
source_type="Stripe",
)
schema = await sync_to_async(ExternalDataSchema.objects.create)(
name=PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING[new_source.source_type][0],
team_id=team.id,
source_id=new_source.pk,
should_sync=True,
)
new_job = await sync_to_async(create_external_data_job)(
team_id=team.id, external_data_source_id=new_source.pk, workflow_id=activity_environment.info.workflow_id
team_id=team.id,
external_data_source_id=new_source.pk,
workflow_id=activity_environment.info.workflow_id,
external_data_schema_id=schema.id,
)
inputs = UpdateExternalDataJobStatusInputs(
@@ -255,8 +256,10 @@ async def test_update_external_job_activity(activity_environment, team, **kwargs
await activity_environment.run(update_external_data_job_model, inputs)
await sync_to_async(new_job.refresh_from_db)()
await sync_to_async(schema.refresh_from_db)()
assert new_job.status == ExternalDataJob.Status.COMPLETED
assert schema.status == ExternalDataJob.Status.COMPLETED
@pytest.mark.django_db(transaction=True)
@@ -283,13 +286,12 @@ async def test_run_stripe_job(activity_environment, team, minio_client, **kwargs
new_job = await sync_to_async(ExternalDataJob.objects.filter(id=new_job.id).prefetch_related("pipeline").get)()
customer_schema = await _create_schema("Customer", new_source, team)
schemas = [(customer_schema.id, "Customer")]
inputs = ExternalDataJobInputs(
inputs = ImportDataActivityInputs(
team_id=team.id,
run_id=new_job.pk,
source_id=new_source.pk,
schemas=schemas,
schema_id=customer_schema.id,
)
return new_job, inputs
@@ -314,14 +316,13 @@ async def test_run_stripe_job(activity_environment, team, minio_client, **kwargs
new_job = await sync_to_async(ExternalDataJob.objects.filter(id=new_job.id).prefetch_related("pipeline").get)()
customer_schema = await _create_schema("Customer", new_source, team)
invoice_schema = await _create_schema("Invoice", new_source, team)
schemas = [(customer_schema.id, "Customer"), (invoice_schema.id, "Invoice")]
inputs = ExternalDataJobInputs(
inputs = ImportDataActivityInputs(
team_id=team.id,
run_id=new_job.pk,
source_id=new_source.pk,
schemas=schemas,
schema_id=invoice_schema.id,
)
return new_job, inputs
@@ -356,26 +357,18 @@ async def test_run_stripe_job(activity_environment, team, minio_client, **kwargs
"has_more": False,
}
await asyncio.gather(
activity_environment.run(run_external_data_job, job_1_inputs),
activity_environment.run(run_external_data_job, job_2_inputs),
activity_environment.run(import_data_activity, job_1_inputs),
activity_environment.run(import_data_activity, job_2_inputs),
)
job_1_customer_objects = await minio_client.list_objects_v2(
Bucket=BUCKET_NAME, Prefix=f"{job_1.folder_path}/customer/"
)
job_1_invoice_objects = await minio_client.list_objects_v2(
Bucket=BUCKET_NAME, Prefix=f"{job_1.folder_path}/invoice/"
)
assert len(job_1_customer_objects["Contents"]) == 1
assert job_1_invoice_objects.get("Contents", None) is None
job_2_customer_objects = await minio_client.list_objects_v2(
Bucket=BUCKET_NAME, Prefix=f"{job_2.folder_path}/customer/"
)
job_2_invoice_objects = await minio_client.list_objects_v2(
Bucket=BUCKET_NAME, Prefix=f"{job_2.folder_path}/invoice/"
)
assert len(job_2_customer_objects["Contents"]) == 1
assert len(job_2_invoice_objects["Contents"]) == 1
@@ -405,12 +398,12 @@ async def test_run_stripe_job_cancelled(activity_environment, team, minio_client
new_job = await sync_to_async(ExternalDataJob.objects.filter(id=new_job.id).prefetch_related("pipeline").get)()
customer_schema = await _create_schema("Customer", new_source, team)
schemas = [(customer_schema.id, "Customer")]
inputs = ExternalDataJobInputs(
inputs = ImportDataActivityInputs(
team_id=team.id,
run_id=new_job.pk,
source_id=new_source.pk,
schemas=schemas,
schema_id=customer_schema.id,
)
return new_job, inputs
@@ -432,7 +425,7 @@ async def test_run_stripe_job_cancelled(activity_environment, team, minio_client
"has_more": True,
}
await asyncio.gather(
activity_environment.run(run_external_data_job, job_1_inputs),
activity_environment.run(import_data_activity, job_1_inputs),
)
job_1_customer_objects = await minio_client.list_objects_v2(
@@ -470,12 +463,12 @@ async def test_run_stripe_job_row_count_update(activity_environment, team, minio
new_job = await sync_to_async(ExternalDataJob.objects.filter(id=new_job.id).prefetch_related("pipeline").get)()
customer_schema = await _create_schema("Customer", new_source, team)
schemas = [(customer_schema.id, "Customer")]
inputs = ExternalDataJobInputs(
inputs = ImportDataActivityInputs(
team_id=team.id,
run_id=new_job.pk,
source_id=new_source.pk,
schemas=schemas,
schema_id=customer_schema.id,
)
return new_job, inputs
@@ -499,7 +492,7 @@ async def test_run_stripe_job_row_count_update(activity_environment, team, minio
"has_more": False,
}
await asyncio.gather(
activity_environment.run(run_external_data_job, job_1_inputs),
activity_environment.run(import_data_activity, job_1_inputs),
)
job_1_customer_objects = await minio_client.list_objects_v2(
@@ -533,17 +526,6 @@ async def test_validate_schema_and_update_table_activity(activity_environment, t
)
test_1_schema = await _create_schema("test-1", new_source, team)
test_2_schema = await _create_schema("test-2", new_source, team)
test_3_schema = await _create_schema("test-3", new_source, team)
test_4_schema = await _create_schema("test-4", new_source, team)
test_5_schema = await _create_schema("test-5", new_source, team)
schemas = [
(test_1_schema.id, "test-1"),
(test_2_schema.id, "test-2"),
(test_3_schema.id, "test-3"),
(test_4_schema.id, "test-4"),
(test_5_schema.id, "test-5"),
]
with mock.patch(
"posthog.warehouse.models.table.DataWarehouseTable.get_columns"
@@ -554,21 +536,17 @@ async def test_validate_schema_and_update_table_activity(activity_environment, t
ValidateSchemaInputs(
run_id=new_job.pk,
team_id=team.id,
schemas=schemas,
schema_id=test_1_schema.id,
table_schema={
"test-1": {"name": "test-1", "resource": "test-1", "columns": {"id": {"data_type": "text"}}},
"test-2": {"name": "test-2", "resource": "test-2", "columns": {"id": {"data_type": "text"}}},
"test-3": {"name": "test-3", "resource": "test-3", "columns": {"id": {"data_type": "text"}}},
"test-4": {"name": "test-4", "resource": "test-4", "columns": {"id": {"data_type": "text"}}},
"test-5": {"name": "test-5", "resource": "test-5", "columns": {"id": {"data_type": "text"}}},
},
table_row_counts={},
),
)
assert mock_get_columns.call_count == 10
assert mock_get_columns.call_count == 2
assert (
await sync_to_async(DataWarehouseTable.objects.filter(external_data_source_id=new_source.pk).count)() == 5
await sync_to_async(DataWarehouseTable.objects.filter(external_data_source_id=new_source.pk).count)() == 1
)
@@ -618,17 +596,6 @@ async def test_validate_schema_and_update_table_activity_with_existing(activity_
)
test_1_schema = await _create_schema("test-1", new_source, team, table_id=existing_table.id)
test_2_schema = await _create_schema("test-2", new_source, team)
test_3_schema = await _create_schema("test-3", new_source, team)
test_4_schema = await _create_schema("test-4", new_source, team)
test_5_schema = await _create_schema("test-5", new_source, team)
schemas = [
(test_1_schema.id, "test-1"),
(test_2_schema.id, "test-2"),
(test_3_schema.id, "test-3"),
(test_4_schema.id, "test-4"),
(test_5_schema.id, "test-5"),
]
with mock.patch(
"posthog.warehouse.models.table.DataWarehouseTable.get_columns"
@@ -639,21 +606,17 @@ async def test_validate_schema_and_update_table_activity_with_existing(activity_
ValidateSchemaInputs(
run_id=new_job.pk,
team_id=team.id,
schemas=schemas,
schema_id=test_1_schema.id,
table_schema={
"test-1": {"name": "test-1", "resource": "test-1", "columns": {"id": {"data_type": "text"}}},
"test-2": {"name": "test-2", "resource": "test-2", "columns": {"id": {"data_type": "text"}}},
"test-3": {"name": "test-3", "resource": "test-3", "columns": {"id": {"data_type": "text"}}},
"test-4": {"name": "test-4", "resource": "test-4", "columns": {"id": {"data_type": "text"}}},
"test-5": {"name": "test-5", "resource": "test-5", "columns": {"id": {"data_type": "text"}}},
},
table_row_counts={},
),
)
assert mock_get_columns.call_count == 10
assert mock_get_columns.call_count == 2
assert (
await sync_to_async(DataWarehouseTable.objects.filter(external_data_source_id=new_source.pk).count)() == 5
await sync_to_async(DataWarehouseTable.objects.filter(external_data_source_id=new_source.pk).count)() == 1
)
@@ -699,34 +662,27 @@ async def test_validate_schema_and_update_table_activity_half_run(activity_envir
]
broken_schema = await _create_schema("broken_schema", new_source, team)
test_schema = await _create_schema("test_schema", new_source, team)
schemas = [(broken_schema.id, "broken_schema"), (test_schema.id, "test_schema")]
await activity_environment.run(
validate_schema_activity,
ValidateSchemaInputs(
run_id=new_job.pk,
team_id=team.id,
schemas=schemas,
schema_id=broken_schema.id,
table_schema={
"broken_schema": {
"name": "broken_schema",
"resource": "broken_schema",
"columns": {"id": {"data_type": "text"}},
},
"test_schema": {
"name": "test_schema",
"resource": "test_schema",
"columns": {"id": {"data_type": "text"}},
},
},
table_row_counts={},
),
)
assert mock_get_columns.call_count == 1
assert mock_get_columns.call_count == 0
assert (
await sync_to_async(DataWarehouseTable.objects.filter(external_data_source_id=new_source.pk).count)() == 1
await sync_to_async(DataWarehouseTable.objects.filter(external_data_source_id=new_source.pk).count)() == 0
)
@@ -751,17 +707,6 @@ async def test_create_schema_activity(activity_environment, team, **kwargs):
)
test_1_schema = await _create_schema("test-1", new_source, team)
test_2_schema = await _create_schema("test-2", new_source, team)
test_3_schema = await _create_schema("test-3", new_source, team)
test_4_schema = await _create_schema("test-4", new_source, team)
test_5_schema = await _create_schema("test-5", new_source, team)
schemas = [
(test_1_schema.id, "test-1"),
(test_2_schema.id, "test-2"),
(test_3_schema.id, "test-3"),
(test_4_schema.id, "test-4"),
(test_5_schema.id, "test-5"),
]
with mock.patch(
"posthog.warehouse.models.table.DataWarehouseTable.get_columns"
@@ -772,74 +717,18 @@ async def test_create_schema_activity(activity_environment, team, **kwargs):
ValidateSchemaInputs(
run_id=new_job.pk,
team_id=team.id,
schemas=schemas,
schema_id=test_1_schema.id,
table_schema={
"test-1": {"name": "test-1", "resource": "test-1", "columns": {"id": {"data_type": "text"}}},
"test-2": {"name": "test-2", "resource": "test-2", "columns": {"id": {"data_type": "text"}}},
"test-3": {"name": "test-3", "resource": "test-3", "columns": {"id": {"data_type": "text"}}},
"test-4": {"name": "test-4", "resource": "test-4", "columns": {"id": {"data_type": "text"}}},
"test-5": {"name": "test-5", "resource": "test-5", "columns": {"id": {"data_type": "text"}}},
},
table_row_counts={},
),
)
assert mock_get_columns.call_count == 10
assert mock_get_columns.call_count == 2
all_tables = DataWarehouseTable.objects.all()
table_length = await sync_to_async(len)(all_tables)
assert table_length == 5
@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_external_data_job_workflow_blank(team, **kwargs):
"""
Test workflow with no schema.
Smoke test for making sure all activities run.
"""
new_source = await sync_to_async(ExternalDataSource.objects.create)(
source_id=uuid.uuid4(),
connection_id=uuid.uuid4(),
destination_id=uuid.uuid4(),
team=team,
status="running",
source_type="Stripe",
job_inputs={"stripe_secret_key": "test-key"},
)
workflow_id = str(uuid.uuid4())
inputs = ExternalDataWorkflowInputs(
team_id=team.id,
external_data_source_id=new_source.pk,
)
with override_settings(AIRBYTE_BUCKET_KEY="test-key", AIRBYTE_BUCKET_SECRET="test-secret"):
with mock.patch.dict(PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING, {ExternalDataSource.Type.STRIPE: ()}):
async with await WorkflowEnvironment.start_time_skipping() as activity_environment:
async with Worker(
activity_environment.client,
task_queue=DATA_WAREHOUSE_TASK_QUEUE,
workflows=[ExternalDataJobWorkflow],
activities=[
create_external_data_job_model,
update_external_data_job_model,
run_external_data_job,
validate_schema_activity,
create_source_templates,
],
workflow_runner=UnsandboxedWorkflowRunner(),
):
await activity_environment.client.execute_workflow(
ExternalDataJobWorkflow.run,
inputs,
id=workflow_id,
task_queue=DATA_WAREHOUSE_TASK_QUEUE,
retry_policy=RetryPolicy(maximum_attempts=1),
)
run = await get_latest_run_if_exists(team_id=team.pk, pipeline_id=new_source.pk)
assert run is not None
assert run.status == ExternalDataJob.Status.COMPLETED
assert table_length == 1
@pytest.mark.django_db(transaction=True)
@@ -858,20 +747,19 @@ async def test_external_data_job_workflow_with_schema(team, **kwargs):
job_inputs={"stripe_secret_key": "test-key"},
)
schema = await sync_to_async(ExternalDataSchema.objects.create)(
name="Customer",
team_id=team.id,
source_id=new_source.pk,
)
workflow_id = str(uuid.uuid4())
inputs = ExternalDataWorkflowInputs(
team_id=team.id,
external_data_source_id=new_source.pk,
external_data_schema_id=schema.id,
)
schemas = PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING[new_source.source_type]
for schema in schemas:
await sync_to_async(ExternalDataSchema.objects.create)(
name=schema,
team_id=team.id,
source_id=new_source.pk,
)
async def mock_async_func(inputs):
return {}
@@ -885,9 +773,10 @@ async def test_external_data_job_workflow_with_schema(team, **kwargs):
task_queue=DATA_WAREHOUSE_TASK_QUEUE,
workflows=[ExternalDataJobWorkflow],
activities=[
create_external_data_job_model,
check_schedule_activity,
create_external_data_job_model_activity,
update_external_data_job_model,
run_external_data_job,
import_data_activity,
validate_schema_activity,
create_source_templates,
],
@@ -906,9 +795,7 @@ async def test_external_data_job_workflow_with_schema(team, **kwargs):
assert run is not None
assert run.status == ExternalDataJob.Status.COMPLETED
assert await sync_to_async(DataWarehouseTable.objects.filter(external_data_source_id=new_source.pk).count)() == len(
PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING[new_source.source_type]
)
assert await sync_to_async(DataWarehouseTable.objects.filter(external_data_source_id=new_source.pk).count)() == 1
@pytest.mark.django_db(transaction=True)
@@ -952,12 +839,9 @@ async def test_run_postgres_job(
new_job = await sync_to_async(ExternalDataJob.objects.filter(id=new_job.id).prefetch_related("pipeline").get)()
posthog_test_schema = await _create_schema("posthog_test", new_source, team)
schemas = [(posthog_test_schema.id, "posthog_test")]
inputs = ExternalDataJobInputs(
team_id=team.id,
run_id=new_job.pk,
source_id=new_source.pk,
schemas=schemas,
inputs = ImportDataActivityInputs(
team_id=team.id, run_id=new_job.pk, source_id=new_source.pk, schema_id=posthog_test_schema.id
)
return new_job, inputs
@@ -970,10 +854,117 @@ async def test_run_postgres_job(
AIRBYTE_BUCKET_SECRET=settings.OBJECT_STORAGE_SECRET_ACCESS_KEY,
):
await asyncio.gather(
activity_environment.run(run_external_data_job, job_1_inputs),
activity_environment.run(import_data_activity, job_1_inputs),
)
job_1_team_objects = await minio_client.list_objects_v2(
Bucket=BUCKET_NAME, Prefix=f"{job_1.folder_path}/posthog_test/"
)
assert len(job_1_team_objects["Contents"]) == 1
@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_check_schedule_activity_with_schema_id(activity_environment, team, **kwargs):
new_source = await sync_to_async(ExternalDataSource.objects.create)(
source_id=uuid.uuid4(),
connection_id=uuid.uuid4(),
destination_id=uuid.uuid4(),
team=team,
status="running",
source_type="Stripe",
job_inputs={"stripe_secret_key": "test-key"},
)
test_1_schema = await _create_schema("test-1", new_source, team)
should_exit = await activity_environment.run(
check_schedule_activity,
ExternalDataWorkflowInputs(
team_id=team.id,
external_data_source_id=new_source.id,
external_data_schema_id=test_1_schema.id,
),
)
assert should_exit is False
@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_check_schedule_activity_with_missing_schema_id_but_with_schedule(activity_environment, team, **kwargs):
new_source = await sync_to_async(ExternalDataSource.objects.create)(
source_id=uuid.uuid4(),
connection_id=uuid.uuid4(),
destination_id=uuid.uuid4(),
team=team,
status="running",
source_type="Stripe",
job_inputs={"stripe_secret_key": "test-key"},
)
await sync_to_async(ExternalDataSchema.objects.create)(
name="test-1",
team_id=team.id,
source_id=new_source.pk,
should_sync=True,
)
with mock.patch(
"posthog.temporal.data_imports.external_data_job.a_external_data_workflow_exists", return_value=True
), mock.patch(
"posthog.temporal.data_imports.external_data_job.a_delete_external_data_schedule", return_value=True
), mock.patch(
"posthog.temporal.data_imports.external_data_job.a_trigger_external_data_workflow"
) as mock_a_trigger_external_data_workflow:
should_exit = await activity_environment.run(
check_schedule_activity,
ExternalDataWorkflowInputs(
team_id=team.id,
external_data_source_id=new_source.id,
external_data_schema_id=None,
),
)
assert should_exit is True
assert mock_a_trigger_external_data_workflow.call_count == 1
@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_check_schedule_activity_with_missing_schema_id_and_no_schedule(activity_environment, team, **kwargs):
new_source = await sync_to_async(ExternalDataSource.objects.create)(
source_id=uuid.uuid4(),
connection_id=uuid.uuid4(),
destination_id=uuid.uuid4(),
team=team,
status="running",
source_type="Stripe",
job_inputs={"stripe_secret_key": "test-key"},
)
await sync_to_async(ExternalDataSchema.objects.create)(
name="test-1",
team_id=team.id,
source_id=new_source.pk,
should_sync=True,
)
with mock.patch(
"posthog.temporal.data_imports.external_data_job.a_external_data_workflow_exists", return_value=False
), mock.patch(
"posthog.temporal.data_imports.external_data_job.a_delete_external_data_schedule", return_value=True
), mock.patch(
"posthog.temporal.data_imports.external_data_job.a_sync_external_data_job_workflow"
) as mock_a_sync_external_data_job_workflow:
should_exit = await activity_environment.run(
check_schedule_activity,
ExternalDataWorkflowInputs(
team_id=team.id,
external_data_source_id=new_source.id,
external_data_schema_id=None,
),
)
assert should_exit is True
assert mock_a_sync_external_data_job_workflow.call_count == 1

posthog/temporal/utils.py Normal file
View File

@@ -0,0 +1,10 @@
import dataclasses
import uuid
# Dataclass lives here to avoid a circular import
@dataclasses.dataclass
class ExternalDataWorkflowInputs:
team_id: int
external_data_source_id: uuid.UUID
external_data_schema_id: uuid.UUID | None = None
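# Illustrative usage (not part of this change): a per-schema run is started with
# ExternalDataWorkflowInputs(team_id=1, external_data_source_id=source.pk, external_data_schema_id=schema.pk);
# passing external_data_schema_id=None takes the legacy per-source path, which
# check_schedule_activity resolves by triggering or creating the per-schema schedules.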

View File

@@ -17,6 +17,7 @@ from posthog.warehouse.data_load.service import (
cancel_external_data_workflow,
delete_data_import_folder,
is_any_external_data_job_paused,
trigger_external_data_source_workflow,
)
from posthog.warehouse.models import ExternalDataSource, ExternalDataSchema, ExternalDataJob
from posthog.warehouse.api.external_data_schema import ExternalDataSchemaSerializer
@@ -41,6 +42,7 @@ class ExternalDataSourceSerializers(serializers.ModelSerializer):
account_id = serializers.CharField(write_only=True)
client_secret = serializers.CharField(write_only=True)
last_run_at = serializers.SerializerMethodField(read_only=True)
status = serializers.SerializerMethodField(read_only=True)
schemas = serializers.SerializerMethodField(read_only=True)
class Meta:
@@ -68,6 +70,28 @@ class ExternalDataSourceSerializers(serializers.ModelSerializer):
return latest_completed_run.created_at if latest_completed_run else None
def get_status(self, instance: ExternalDataSource) -> str:
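# Roll the per-schema statuses up into a single source-level status; errors take
# precedence, then cancelled, paused, running, and completed.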
active_schemas: List[ExternalDataSchema] = list(instance.schemas.filter(should_sync=True).all())
any_failures = any(schema.status == ExternalDataSchema.Status.ERROR for schema in active_schemas)
any_cancelled = any(schema.status == ExternalDataSchema.Status.CANCELLED for schema in active_schemas)
any_paused = any(schema.status == ExternalDataSchema.Status.PAUSED for schema in active_schemas)
any_running = any(schema.status == ExternalDataSchema.Status.RUNNING for schema in active_schemas)
any_completed = any(schema.status == ExternalDataSchema.Status.COMPLETED for schema in active_schemas)
if any_failures:
return ExternalDataSchema.Status.ERROR
elif any_cancelled:
return ExternalDataSchema.Status.CANCELLED
elif any_paused:
return ExternalDataSchema.Status.PAUSED
elif any_running:
return ExternalDataSchema.Status.RUNNING
elif any_completed:
return ExternalDataSchema.Status.COMPLETED
else:
# Fallback while migrating from the source to the schema as the source of truth for syncs
return instance.status
def get_schemas(self, instance: ExternalDataSource):
schemas = instance.schemas.order_by("name").all()
return ExternalDataSchemaSerializer(schemas, many=True, read_only=True, context=self.context).data
@@ -169,13 +193,20 @@ class ExternalDataSourceViewSet(TeamAndOrgViewSetMixin, viewsets.ModelViewSet):
disabled_schemas = [schema for schema in default_schemas if schema not in enabled_schemas]
active_schemas: List[ExternalDataSchema] = []
for schema in enabled_schemas:
ExternalDataSchema.objects.create(name=schema, team=self.team, source=new_source_model, should_sync=True)
active_schemas.append(
ExternalDataSchema.objects.create(
name=schema, team=self.team, source=new_source_model, should_sync=True
)
)
for schema in disabled_schemas:
ExternalDataSchema.objects.create(name=schema, team=self.team, source=new_source_model, should_sync=False)
try:
sync_external_data_job_workflow(new_source_model, create=True)
for active_schema in active_schemas:
sync_external_data_job_workflow(active_schema, create=True)
except Exception as e:
# Log error but don't fail because the source model was already created
logger.exception("Could not trigger external data job", exc_info=e)
@@ -331,7 +362,12 @@ class ExternalDataSourceViewSet(TeamAndOrgViewSetMixin, viewsets.ModelViewSet):
)
pass
delete_external_data_schedule(instance)
for schema in ExternalDataSchema.objects.filter(
team_id=self.team_id, source_id=instance.id, should_sync=True
).all():
delete_external_data_schedule(str(schema.id))
delete_external_data_schedule(str(instance.id))
return super().destroy(request, *args, **kwargs)
@action(methods=["POST"], detail=True)
@@ -345,12 +381,23 @@ class ExternalDataSourceViewSet(TeamAndOrgViewSetMixin, viewsets.ModelViewSet):
)
try:
trigger_external_data_workflow(instance)
trigger_external_data_source_workflow(instance)
except temporalio.service.RPCError:
# If the source-level schedule has been removed, trigger the per-schema schedules instead
for schema in ExternalDataSchema.objects.filter(
team_id=self.team_id, source_id=instance.id, should_sync=True
).all():
try:
trigger_external_data_workflow(schema)
except temporalio.service.RPCError as e:
# schedule doesn't exist
if e.message == "sql: no rows in result set":
sync_external_data_job_workflow(schema, create=True)
except Exception as e:
logger.exception(f"Could not trigger external data job for schema {schema.name}", exc_info=e)
except temporalio.service.RPCError as e:
# schedule doesn't exist
if e.message == "sql: no rows in result set":
sync_external_data_job_workflow(instance, create=True)
except Exception as e:
logger.exception("Could not trigger external data job", exc_info=e)
raise

View File

@@ -12,18 +12,21 @@ from temporalio.client import (
)
from posthog.constants import DATA_WAREHOUSE_TASK_QUEUE
from posthog.temporal.common.client import sync_connect
from posthog.temporal.common.client import async_connect, sync_connect
from posthog.temporal.common.schedule import (
a_create_schedule,
a_delete_schedule,
a_trigger_schedule,
a_update_schedule,
create_schedule,
pause_schedule,
a_schedule_exists,
trigger_schedule,
update_schedule,
delete_schedule,
unpause_schedule,
)
from posthog.temporal.data_imports.external_data_job import (
ExternalDataWorkflowInputs,
)
from posthog.temporal.utils import ExternalDataWorkflowInputs
from posthog.warehouse.models import ExternalDataSource
import temporalio
from temporalio.client import Client as TemporalClient
@@ -32,48 +35,86 @@ from asgiref.sync import async_to_sync
from django.conf import settings
import s3fs
from posthog.warehouse.models.external_data_schema import ExternalDataSchema
def sync_external_data_job_workflow(
external_data_source: ExternalDataSource, create: bool = False
) -> ExternalDataSource:
temporal = sync_connect()
def get_sync_schedule(external_data_schema: ExternalDataSchema):
inputs = ExternalDataWorkflowInputs(
team_id=external_data_source.team.id,
external_data_source_id=external_data_source.pk,
team_id=external_data_schema.team_id,
external_data_schema_id=external_data_schema.id,
external_data_source_id=external_data_schema.source_id,
)
schedule = Schedule(
return Schedule(
action=ScheduleActionStartWorkflow(
"external-data-job",
asdict(inputs),
id=str(external_data_source.pk),
id=str(external_data_schema.id),
task_queue=str(DATA_WAREHOUSE_TASK_QUEUE),
),
spec=ScheduleSpec(
intervals=[
ScheduleIntervalSpec(
every=timedelta(hours=24), offset=timedelta(hours=external_data_source.created_at.hour)
every=timedelta(hours=24), offset=timedelta(hours=external_data_schema.created_at.hour)
)
],
jitter=timedelta(hours=2),
),
state=ScheduleState(note=f"Schedule for external data source: {external_data_source.pk}"),
state=ScheduleState(note=f"Schedule for external data source: {external_data_schema.pk}"),
policy=SchedulePolicy(overlap=ScheduleOverlapPolicy.SKIP),
)
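# Each schema now owns a daily schedule: every 24 hours, offset by the hour the schema
# was created and jittered by up to 2 hours to spread load; overlapping runs are skipped.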
def sync_external_data_job_workflow(
external_data_schema: ExternalDataSchema, create: bool = False
) -> ExternalDataSchema:
temporal = sync_connect()
schedule = get_sync_schedule(external_data_schema)
if create:
create_schedule(temporal, id=str(external_data_source.id), schedule=schedule, trigger_immediately=True)
create_schedule(temporal, id=str(external_data_schema.id), schedule=schedule, trigger_immediately=True)
else:
update_schedule(temporal, id=str(external_data_source.id), schedule=schedule)
update_schedule(temporal, id=str(external_data_schema.id), schedule=schedule)
return external_data_source
return external_data_schema
def trigger_external_data_workflow(external_data_source: ExternalDataSource):
async def a_sync_external_data_job_workflow(
external_data_schema: ExternalDataSchema, create: bool = False
) -> ExternalDataSchema:
temporal = await async_connect()
schedule = get_sync_schedule(external_data_schema)
if create:
await a_create_schedule(temporal, id=str(external_data_schema.id), schedule=schedule, trigger_immediately=True)
else:
await a_update_schedule(temporal, id=str(external_data_schema.id), schedule=schedule)
return external_data_schema
def trigger_external_data_source_workflow(external_data_source: ExternalDataSource):
temporal = sync_connect()
trigger_schedule(temporal, schedule_id=str(external_data_source.id))
def trigger_external_data_workflow(external_data_schema: ExternalDataSchema):
temporal = sync_connect()
trigger_schedule(temporal, schedule_id=str(external_data_schema.id))
async def a_trigger_external_data_workflow(external_data_schema: ExternalDataSchema):
temporal = await async_connect()
await a_trigger_schedule(temporal, schedule_id=str(external_data_schema.id))
async def a_external_data_workflow_exists(id: str) -> bool:
temporal = await async_connect()
return await a_schedule_exists(temporal, schedule_id=id)
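# Used by check_schedule_activity to decide whether existing schedules should be triggered
# or new per-schema schedules created when no schema id is supplied (see the tests above).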
def pause_external_data_schedule(external_data_source: ExternalDataSource):
temporal = sync_connect()
pause_schedule(temporal, schedule_id=str(external_data_source.id))
@@ -84,10 +125,21 @@ def unpause_external_data_schedule(external_data_source: ExternalDataSource):
unpause_schedule(temporal, schedule_id=str(external_data_source.id))
def delete_external_data_schedule(external_data_source: ExternalDataSource):
def delete_external_data_schedule(schedule_id: str):
temporal = sync_connect()
try:
delete_schedule(temporal, schedule_id=str(external_data_source.id))
delete_schedule(temporal, schedule_id=schedule_id)
except temporalio.service.RPCError as e:
# Swallow error if schedule does not exist already
if e.status == temporalio.service.RPCStatusCode.NOT_FOUND:
return
raise
async def a_delete_external_data_schedule(external_data_source: ExternalDataSource):
temporal = await async_connect()
try:
await a_delete_schedule(temporal, schedule_id=str(external_data_source.id))
except temporalio.service.RPCError as e:
# Swallow error if schedule does not exist already
if e.status == temporalio.service.RPCStatusCode.NOT_FOUND:

View File

@@ -1,3 +1,4 @@
import uuid
from django.conf import settings
from dlt.common.schema.typing import TSchemaTables
from dlt.common.data_types.typing import TDataType
@@ -26,8 +27,9 @@ from posthog.warehouse.models.external_data_job import ExternalDataJob
from posthog.temporal.common.logger import bind_temporal_worker_logger
from clickhouse_driver.errors import ServerException
from asgiref.sync import sync_to_async
from typing import Dict, Tuple, Type
from typing import Dict, Type
from posthog.utils import camel_to_snake_case
from posthog.warehouse.models.external_data_schema import ExternalDataSchema
def dlt_to_hogql_type(dlt_type: TDataType | None) -> str:
@@ -91,7 +93,7 @@ async def validate_schema(
async def validate_schema_and_update_table(
run_id: str,
team_id: int,
schemas: list[Tuple[str, str]],
schema_id: uuid.UUID,
table_schema: TSchemaTables,
table_row_counts: Dict[str, int],
) -> None:
@@ -103,51 +105,40 @@ async def validate_schema_and_update_table(
Arguments:
run_id: The id of the external data job
team_id: The id of the team
schemas: The list of schemas that have been synced by the external data job
schema_id: The id of the schema this data job relates to
table_schema: The DLT schema from the data load stage
table_row_counts: The count of synced rows from DLT
"""
logger = await bind_temporal_worker_logger(team_id=team_id)
job: ExternalDataJob = await get_external_data_job(job_id=run_id)
last_successful_job: ExternalDataJob | None = await get_latest_run_if_exists(job.team_id, job.pipeline_id)
last_successful_job: ExternalDataJob | None = await get_latest_run_if_exists(team_id, job.pipeline_id)
credential: DataWarehouseCredential = await get_or_create_datawarehouse_credential(
team_id=job.team_id,
team_id=team_id,
access_key=settings.AIRBYTE_BUCKET_KEY,
access_secret=settings.AIRBYTE_BUCKET_SECRET,
)
for _schema in schemas:
_schema_id = _schema[0]
_schema_name = _schema[1]
external_data_schema: ExternalDataSchema = await aget_schema_by_id(schema_id, team_id)
table_name = f"{job.pipeline.prefix or ''}{job.pipeline.source_type}_{_schema_name}".lower()
new_url_pattern = job.url_pattern_by_schema(camel_to_snake_case(_schema_name))
row_count = table_row_counts.get(_schema_name, 0)
_schema_id = external_data_schema.id
_schema_name: str = external_data_schema.name
# Check
try:
data = await validate_schema(
credential=credential,
table_name=table_name,
new_url_pattern=new_url_pattern,
team_id=team_id,
row_count=row_count,
)
except ServerException as err:
if err.code == 636:
logger.exception(
f"Data Warehouse: No data for schema {_schema_name} for external data job {job.pk}",
exc_info=err,
)
continue
except Exception as e:
# TODO: handle other exceptions here
logger.exception(
f"Data Warehouse: Could not validate schema for external data job {job.pk}",
exc_info=e,
)
continue
table_name = f"{job.pipeline.prefix or ''}{job.pipeline.source_type}_{_schema_name}".lower()
new_url_pattern = job.url_pattern_by_schema(camel_to_snake_case(_schema_name))
row_count = table_row_counts.get(_schema_name.lower(), 0)
# Check
try:
data = await validate_schema(
credential=credential,
table_name=table_name,
new_url_pattern=new_url_pattern,
team_id=team_id,
row_count=row_count,
)
# create or update
table_created = None
@@ -190,13 +181,26 @@ async def validate_schema_and_update_table(
await asave_datawarehousetable(table_created)
# schema could have been deleted by this point
schema_model = await aget_schema_by_id(schema_id=_schema_id, team_id=job.team_id)
schema_model = await aget_schema_by_id(schema_id=_schema_id, team_id=team_id)
if schema_model:
schema_model.table = table_created
schema_model.last_synced_at = job.created_at
await asave_external_data_schema(schema_model)
except ServerException as err:
if err.code == 636:
logger.exception(
f"Data Warehouse: No data for schema {_schema_name} for external data job {job.pk}",
exc_info=err,
)
except Exception as e:
# TODO: handle other exceptions here
logger.exception(
f"Data Warehouse: Could not validate schema for external data job {job.pk}",
exc_info=e,
)
if last_successful_job:
try:
last_successful_job.delete_data_in_bucket()
@@ -205,4 +209,3 @@ async def validate_schema_and_update_table(
f"Data Warehouse: Could not delete deprecated data source {last_successful_job.pk}",
exc_info=e,
)
pass

View File

@@ -1,6 +1,7 @@
from uuid import UUID
from posthog.warehouse.models.external_data_job import ExternalDataJob
from posthog.warehouse.models.external_data_schema import ExternalDataSchema
from posthog.warehouse.models.external_data_source import ExternalDataSource
@@ -10,12 +11,14 @@ def get_external_data_source(team_id: str, external_data_source_id: str) -> Exte
def create_external_data_job(
external_data_source_id: UUID,
external_data_schema_id: UUID,
workflow_id: str,
team_id: str,
team_id: int,
) -> ExternalDataJob:
job = ExternalDataJob.objects.create(
team_id=team_id,
pipeline_id=external_data_source_id,
schema_id=external_data_schema_id,
status=ExternalDataJob.Status.RUNNING,
rows_synced=0,
workflow_id=workflow_id,
@@ -24,15 +27,15 @@ def create_external_data_job(
return job
def update_external_job_status(run_id: UUID, team_id: str, status: str, latest_error: str | None) -> ExternalDataJob:
def update_external_job_status(run_id: UUID, team_id: int, status: str, latest_error: str | None) -> ExternalDataJob:
model = ExternalDataJob.objects.get(id=run_id, team_id=team_id)
model.status = status
model.latest_error = latest_error
model.save()
pipeline = ExternalDataSource.objects.get(id=model.pipeline_id, team_id=team_id)
pipeline.status = status
pipeline.save()
schema = ExternalDataSchema.objects.get(id=model.schema_id, team_id=team_id)
schema.status = status
schema.save()
model.refresh_from_db()

View File

@@ -16,6 +16,9 @@ class ExternalDataJob(CreatedMetaFields, UUIDModel):
team: models.ForeignKey = models.ForeignKey(Team, on_delete=models.CASCADE)
pipeline: models.ForeignKey = models.ForeignKey("posthog.ExternalDataSource", on_delete=models.CASCADE)
schema: models.ForeignKey = models.ForeignKey(
"posthog.ExternalDataSchema", on_delete=models.CASCADE, null=True, blank=True
)
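# Presumably kept nullable/blank so jobs created before 0402_externaldatajob_schema,
# which predate per-schema tracking, remain valid.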
status: models.CharField = models.CharField(max_length=400)
rows_synced: models.BigIntegerField = models.BigIntegerField(null=True, blank=True)
latest_error: models.TextField = models.TextField(

View File

@@ -10,6 +10,13 @@ from posthog.warehouse.util import database_sync_to_async
class ExternalDataSchema(CreatedMetaFields, UUIDModel):
class Status(models.TextChoices):
RUNNING = "Running", "Running"
PAUSED = "Paused", "Paused"
ERROR = "Error", "Error"
COMPLETED = "Completed", "Completed"
CANCELLED = "Cancelled", "Cancelled"
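# Written by update_external_job_status, which copies the job's status onto its schema,
# and rolled up into the source-level status by the API serializer.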
name: models.CharField = models.CharField(max_length=400)
team: models.ForeignKey = models.ForeignKey(Team, on_delete=models.CASCADE)
source: models.ForeignKey = models.ForeignKey(
@@ -22,6 +29,7 @@ class ExternalDataSchema(CreatedMetaFields, UUIDModel):
latest_error: models.TextField = models.TextField(
null=True, help_text="The latest error that occurred when syncing this schema."
)
status: models.CharField = models.CharField(max_length=400, null=True, blank=True)
last_synced_at: models.DateTimeField = models.DateTimeField(null=True, blank=True)
__repr__ = sane_repr("name")
@@ -47,19 +55,20 @@ def aget_schema_by_id(schema_id: str, team_id: int) -> ExternalDataSchema | None
return ExternalDataSchema.objects.get(id=schema_id, team_id=team_id)
@database_sync_to_async
def get_active_schemas_for_source_id(source_id: uuid.UUID, team_id: int):
schemas = ExternalDataSchema.objects.filter(team_id=team_id, source_id=source_id, should_sync=True).values().all()
return [(val["id"], val["name"]) for val in schemas]
return list(ExternalDataSchema.objects.filter(team_id=team_id, source_id=source_id, should_sync=True).all())
def get_all_schemas_for_source_id(source_id: uuid.UUID, team_id: int):
schemas = ExternalDataSchema.objects.filter(team_id=team_id, source_id=source_id).values().all()
return [val["name"] for val in schemas]
return list(ExternalDataSchema.objects.filter(team_id=team_id, source_id=source_id).all())
def sync_old_schemas_with_new_schemas(new_schemas: list, source_id: uuid.UUID, team_id: int):
old_schemas = get_all_schemas_for_source_id(source_id=source_id, team_id=team_id)
schemas_to_create = [schema for schema in new_schemas if schema not in old_schemas]
old_schemas_names = [schema.name for schema in old_schemas]
schemas_to_create = [schema for schema in new_schemas if schema not in old_schemas_names]
for schema in schemas_to_create:
ExternalDataSchema.objects.create(name=schema, team_id=team_id, source_id=source_id, should_sync=False)
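# Schemas discovered between reloads are created with should_sync=False, so they stay
# off until explicitly enabled.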

View File

@@ -25,6 +25,8 @@ class ExternalDataSource(CreatedMetaFields, UUIDModel):
connection_id: models.CharField = models.CharField(max_length=400)
destination_id: models.CharField = models.CharField(max_length=400, null=True, blank=True)
team: models.ForeignKey = models.ForeignKey(Team, on_delete=models.CASCADE)
# `status` is deprecated in favour of external_data_schema.status
status: models.CharField = models.CharField(max_length=400)
source_type: models.CharField = models.CharField(max_length=128, choices=Type.choices)
job_inputs: encrypted_fields.fields.EncryptedJSONField = encrypted_fields.fields.EncryptedJSONField(