diff --git a/.dagster_home/.gitkeep b/.dagster_home/.gitkeep
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/.gitignore b/.gitignore
index 1fd79ca086..a5498db7c1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -96,3 +96,6 @@ playwright-report/
 test-results/
 playwright/playwright-report/
 playwright/test-results/
+
+.dagster_home/*
+!.dagster_home/.gitkeep
\ No newline at end of file
diff --git a/dags/README.md b/dags/README.md
index 9ae1c5e5e1..3d7d5a6707 100644
--- a/dags/README.md
+++ b/dags/README.md
@@ -1,17 +1,99 @@
-# Dagster
+# PostHog Dagster DAGs
 
-## Running locally
+This directory contains [Dagster](https://dagster.io/) data pipelines (DAGs) for PostHog. Dagster is a data orchestration framework that allows us to define, schedule, and monitor data workflows.
 
-You'll need to set DAGSTER_HOME
+
+## What is Dagster?
+
+Dagster is an open-source data orchestration tool designed to help you define and execute data pipelines. Key concepts include:
+
+- **Assets**: Data artifacts that your pipelines produce and consume (e.g., tables, files)
+- **Ops**: Individual units of computation (functions)
+- **Jobs**: Collections of ops that are executed together
+- **Resources**: Shared infrastructure and connections (e.g., database connections)
+- **Schedules**: Time-based triggers for jobs
+- **Sensors**: Event-based triggers for jobs
+
+## Project Structure
+
+- `definitions.py`: Main Dagster definition file that defines assets, jobs, schedules, sensors, and resources
+- `common.py`: Shared utilities and resources
+- Individual DAG files (e.g., `exchange_rate.py`, `deletes.py`, `person_overrides.py`)
+- `tests/`: Tests for the DAGs
+
+## Local Development
+
+### Environment Setup
+
+Dagster uses the `DAGSTER_HOME` environment variable to determine where to store instance configuration, logs, and other local artifacts. If it is not set, Dagster will use a temporary folder that is erased when you bring `dagster dev` down.
 
-Easiest is to just start jobs from your cli
 ```bash
-dagster job execute -m dags.export_query_logs_to_s3 --config dags/query_log_example.yaml
+# Set DAGSTER_HOME to a directory of your choice
+export DAGSTER_HOME=/path/to/your/dagster/home
 ```
 
-You can also run the interface
+For consistency with the PostHog development environment, you might want to set this to a subdirectory within your project:
+
 ```bash
-dagster dev
+export DAGSTER_HOME=$(pwd)/.dagster_home
 ```
 
-By default this will run on http://127.0.0.1:3000/
+You can add this to your shell profile if you want your local Dagster state to persist between sessions, or to your local `.env` file, which `dagster dev` picks up automatically.
+
+### Running the Development Server
+
+To run the Dagster development server locally:
+
+```bash
+# Important: Set DEBUG=1 when running locally to use local resources
+DEBUG=1 dagster dev
+```
+
+Setting `DEBUG=1` is required: it makes Dagster use the local resources defined in `definitions.py` (the filesystem IO manager and a dummy Slack resource) instead of the production ones.
+
+The Dagster UI will be available at http://localhost:3000 by default, where you can:
+
+- Browse assets, jobs, and schedules
+- Manually trigger job runs
+- View execution logs and status
+- Debug pipeline issues
+
+## Adding New DAGs
+
+When adding a new DAG:
+
+1. Create a new Python file for your DAG
+2. Define your assets, ops, and jobs
+3. Import and register them in `definitions.py`
+4. Add appropriate tests in the `tests/` directory
+
+## Running Tests
+
+Tests are implemented using pytest.
The following command will run all DAG tests: + +```bash +# From the project root +pytest dags/ +``` + +To run a specific test file: + +```bash +pytest dags/tests/test_exchange_rate.py +``` + +To run a specific test: + +```bash +pytest dags/tests/test_exchange_rate.py::test_name +``` + +Add `-v` for verbose output: + +```bash +pytest -v dags/tests/test_exchange_rate.py +``` + +## Additional Resources + +- [Dagster Documentation](https://docs.dagster.io/) +- [PostHog Documentation](https://posthog.com/docs) diff --git a/dags/ch_examples.py b/dags/ch_examples.py index 5db3d3b370..0c89edbc1c 100644 --- a/dags/ch_examples.py +++ b/dags/ch_examples.py @@ -1,26 +1,24 @@ -from dagster import ( - Config, - MaterializeResult, - asset, -) +import dagster from posthog.clickhouse.client import sync_execute # noqa -class ClickHouseConfig(Config): +class ClickHouseConfig(dagster.Config): result_path: str = "/tmp/clickhouse_version.txt" -@asset -def get_clickhouse_version(config: ClickHouseConfig) -> MaterializeResult: +@dagster.asset +def get_clickhouse_version(config: ClickHouseConfig) -> dagster.MaterializeResult: version = sync_execute("SELECT version()")[0][0] with open(config.result_path, "w") as f: f.write(version) - return MaterializeResult(metadata={"version": version}) + + return dagster.MaterializeResult(metadata={"version": version}) -@asset(deps=[get_clickhouse_version]) +@dagster.asset(deps=[get_clickhouse_version]) def print_clickhouse_version(config: ClickHouseConfig): with open(config.result_path) as f: print(f.read()) # noqa - return MaterializeResult(metadata={"version": config.result_path}) + + return dagster.MaterializeResult(metadata={"version": config.result_path}) diff --git a/dags/common.py b/dags/common.py index d405dbc237..2ccd94765e 100644 --- a/dags/common.py +++ b/dags/common.py @@ -12,6 +12,7 @@ from posthog.clickhouse.cluster import ( class JobOwners(str, Enum): TEAM_CLICKHOUSE = "team-clickhouse" + TEAM_WEB_ANALYTICS = "team-web-analytics" class ClickhouseClusterResource(dagster.ConfigurableResource): diff --git a/dags/definitions.py b/dags/definitions.py index 3433b2e883..35b07ac870 100644 --- a/dags/definitions.py +++ b/dags/definitions.py @@ -1,29 +1,21 @@ -from dagster import ( - DagsterRunStatus, - Definitions, - EnvVar, - ResourceDefinition, - RunRequest, - ScheduleDefinition, - fs_io_manager, - load_assets_from_modules, - run_status_sensor, -) +import dagster +import dagster_slack + from dagster_aws.s3.io_manager import s3_pickle_io_manager from dagster_aws.s3.resources import s3_resource -from dagster_slack import SlackResource from django.conf import settings -from dags.slack_alerts import notify_slack_on_failure - -from . 
import ch_examples, deletes, materialized_columns, orm_examples, person_overrides, export_query_logs_to_s3 -from .common import ClickhouseClusterResource - -all_assets = load_assets_from_modules([ch_examples, orm_examples]) - - -env = "local" if settings.DEBUG else "prod" - +from dags.common import ClickhouseClusterResource +from dags import ( + ch_examples, + deletes, + exchange_rate, + export_query_logs_to_s3, + materialized_columns, + orm_examples, + person_overrides, + slack_alerts, +) # Define resources for different environments resources_by_env = { @@ -34,57 +26,50 @@ resources_by_env = { ), "s3": s3_resource, # Using EnvVar instead of the Django setting to ensure that the token is not leaked anywhere in the Dagster UI - "slack": SlackResource(token=EnvVar("SLACK_TOKEN")), + "slack": dagster_slack.SlackResource(token=dagster.EnvVar("SLACK_TOKEN")), }, "local": { "cluster": ClickhouseClusterResource.configure_at_launch(), - "io_manager": fs_io_manager, - "slack": ResourceDefinition.none_resource(description="Dummy Slack resource for local development"), + "io_manager": dagster.fs_io_manager, + "slack": dagster.ResourceDefinition.none_resource(description="Dummy Slack resource for local development"), }, } # Get resources for current environment, fallback to local if env not found +env = "local" if settings.DEBUG else "prod" resources = resources_by_env.get(env, resources_by_env["local"]) - -# Schedule to run squash at 10 PM on Saturdays -squash_schedule = ScheduleDefinition( - job=person_overrides.squash_person_overrides, - cron_schedule="0 22 * * 6", # At 22:00 (10 PM) on Saturday - execution_timezone="UTC", - name="squash_person_overrides_schedule", -) - -# Schedule to run query logs export at 1 AM daily -query_logs_export_schedule = ScheduleDefinition( - job=export_query_logs_to_s3.export_query_logs_to_s3, - cron_schedule="0 1 * * *", # At 01:00 (1 AM) every day - execution_timezone="UTC", - name="query_logs_export_schedule", -) - - -@run_status_sensor( - run_status=DagsterRunStatus.SUCCESS, - monitored_jobs=[person_overrides.squash_person_overrides], - request_job=deletes.deletes_job, -) -def run_deletes_after_squash(context): - return RunRequest(run_key=None) - - -defs = Definitions( - assets=all_assets, +defs = dagster.Definitions( + assets=[ + ch_examples.get_clickhouse_version, + ch_examples.print_clickhouse_version, + exchange_rate.daily_exchange_rates, + exchange_rate.hourly_exchange_rates, + exchange_rate.daily_exchange_rates_in_clickhouse, + exchange_rate.hourly_exchange_rates_in_clickhouse, + orm_examples.pending_deletions, + orm_examples.process_pending_deletions, + ], jobs=[ deletes.deletes_job, + exchange_rate.daily_exchange_rates_job, + exchange_rate.hourly_exchange_rates_job, + export_query_logs_to_s3.export_query_logs_to_s3, materialized_columns.materialize_column, person_overrides.cleanup_orphaned_person_overrides_snapshot, person_overrides.squash_person_overrides, - export_query_logs_to_s3.export_query_logs_to_s3, ], - schedules=[squash_schedule, query_logs_export_schedule], - sensors=[run_deletes_after_squash, notify_slack_on_failure], + schedules=[ + exchange_rate.daily_exchange_rates_schedule, + exchange_rate.hourly_exchange_rates_schedule, + export_query_logs_to_s3.query_logs_export_schedule, + person_overrides.squash_schedule, + ], + sensors=[ + deletes.run_deletes_after_squash, + slack_alerts.notify_slack_on_failure, + ], resources=resources, ) diff --git a/dags/deletes.py b/dags/deletes.py index 60595207bd..6283fe3169 100644 --- a/dags/deletes.py +++ 
b/dags/deletes.py @@ -3,20 +3,12 @@ import time from clickhouse_driver.client import Client from datetime import datetime from dataclasses import dataclass -from dagster import ( - op, - job, - OpExecutionContext, - Config, - MetadataValue, - ResourceParam, -) +import dagster from django.conf import settings from functools import partial import uuid from django.utils import timezone -from dags.common import JobOwners from posthog.clickhouse.cluster import ( ClickhouseCluster, Mutation, @@ -28,8 +20,11 @@ from posthog.models.async_deletion import AsyncDeletion, DeletionType from posthog.models.event.sql import EVENTS_DATA_TABLE from posthog.models.person.sql import PERSON_DISTINCT_ID_OVERRIDES_TABLE +from dags.common import JobOwners +from dags.person_overrides import squash_person_overrides -class DeleteConfig(Config): + +class DeleteConfig(dagster.Config): team_id: int | None = pydantic.Field( default=None, description="The team ID to delete events for. If not provided, all teams will be deleted :fire:" ) @@ -264,9 +259,9 @@ class PendingDeletesDictionary: ) -@op +@dagster.op def get_oldest_person_override_timestamp( - cluster: ResourceParam[ClickhouseCluster], + cluster: dagster.ResourceParam[ClickhouseCluster], ) -> datetime: """Get the oldest person override timestamp from the person_distinct_id_overrides table.""" @@ -277,10 +272,10 @@ def get_oldest_person_override_timestamp( return result -@op +@dagster.op def create_pending_person_deletions_table( config: DeleteConfig, - cluster: ResourceParam[ClickhouseCluster], + cluster: dagster.ResourceParam[ClickhouseCluster], oldest_person_override_timestamp: datetime, ) -> PendingPersonEventDeletesTable: """ @@ -298,10 +293,10 @@ def create_pending_person_deletions_table( return table -@op +@dagster.op def create_reporting_pending_person_deletions_table( config: DeleteConfig, - cluster: ResourceParam[ClickhouseCluster], + cluster: dagster.ResourceParam[ClickhouseCluster], ) -> PendingPersonEventDeletesTable: """Create a merge tree table in ClickHouse to store pending deletes.""" table = PendingPersonEventDeletesTable( @@ -314,11 +309,11 @@ def create_reporting_pending_person_deletions_table( return table -@op +@dagster.op def load_pending_person_deletions( - context: OpExecutionContext, + context: dagster.OpExecutionContext, create_pending_person_deletions_table: PendingPersonEventDeletesTable, - cluster: ResourceParam[ClickhouseCluster], + cluster: dagster.ResourceParam[ClickhouseCluster], cleanup_delete_assets: bool | None = None, ) -> PendingPersonEventDeletesTable: """Query postgres using django ORM to get pending person deletions and insert directly into ClickHouse.""" @@ -373,18 +368,18 @@ def load_pending_person_deletions( context.add_output_metadata( { - "total_rows": MetadataValue.int(total_rows), - "table_name": MetadataValue.text(create_pending_person_deletions_table.table_name), + "total_rows": dagster.MetadataValue.int(total_rows), + "table_name": dagster.MetadataValue.text(create_pending_person_deletions_table.table_name), } ) return create_pending_person_deletions_table -@op +@dagster.op def create_deletes_dict( load_pending_person_deletions: PendingPersonEventDeletesTable, config: DeleteConfig, - cluster: ResourceParam[ClickhouseCluster], + cluster: dagster.ResourceParam[ClickhouseCluster], ) -> PendingDeletesDictionary: """Create a dictionary in ClickHouse to store pending event deletions.""" @@ -410,9 +405,9 @@ def create_deletes_dict( return del_dict -@op +@dagster.op def load_and_verify_deletes_dictionary( - cluster: 
ResourceParam[ClickhouseCluster], + cluster: dagster.ResourceParam[ClickhouseCluster], dictionary: PendingDeletesDictionary, ) -> PendingDeletesDictionary: """Load the dictionary data on all hosts in the cluster, and ensure all hosts have identical data.""" @@ -421,10 +416,10 @@ def load_and_verify_deletes_dictionary( return dictionary -@op +@dagster.op def delete_person_events( - context: OpExecutionContext, - cluster: ResourceParam[ClickhouseCluster], + context: dagster.OpExecutionContext, + cluster: dagster.ResourceParam[ClickhouseCluster], load_and_verify_deletes_dictionary: PendingDeletesDictionary, ) -> tuple[PendingDeletesDictionary, ShardMutations]: """Delete events from sharded_events table for persons pending deletion.""" @@ -442,12 +437,14 @@ def delete_person_events( all_zero = all(count == 0 for count in count_result.values()) if all_zero: - context.add_output_metadata({"events_deleted": MetadataValue.int(0), "message": "No pending deletions found"}) + context.add_output_metadata( + {"events_deleted": dagster.MetadataValue.int(0), "message": "No pending deletions found"} + ) return (load_and_verify_deletes_dictionary, {}) context.add_output_metadata( { - "events_deleted": MetadataValue.int(sum(count_result.values())), + "events_deleted": dagster.MetadataValue.int(sum(count_result.values())), } ) @@ -463,10 +460,10 @@ def delete_person_events( return (load_and_verify_deletes_dictionary, shard_mutations) -@op +@dagster.op def wait_for_delete_mutations( - context: OpExecutionContext, - cluster: ResourceParam[ClickhouseCluster], + context: dagster.OpExecutionContext, + cluster: dagster.ResourceParam[ClickhouseCluster], delete_person_events: tuple[PendingDeletesDictionary, ShardMutations], ) -> PendingDeletesDictionary: pending_deletes_dict, shard_mutations = delete_person_events @@ -476,9 +473,9 @@ def wait_for_delete_mutations( return pending_deletes_dict -@op +@dagster.op def cleanup_delete_assets( - cluster: ResourceParam[ClickhouseCluster], + cluster: dagster.ResourceParam[ClickhouseCluster], config: DeleteConfig, create_pending_person_deletions_table: PendingPersonEventDeletesTable, create_deletes_dict: PendingDeletesDictionary, @@ -513,7 +510,7 @@ def cleanup_delete_assets( return True -@job(tags={"owner": JobOwners.TEAM_CLICKHOUSE.value}) +@dagster.job(tags={"owner": JobOwners.TEAM_CLICKHOUSE.value}) def deletes_job(): """Job that handles deletion of person events.""" oldest_override_timestamp = get_oldest_person_override_timestamp() @@ -526,3 +523,12 @@ def deletes_job(): waited_mutation = wait_for_delete_mutations(delete_events) cleaned = cleanup_delete_assets(person_table, create_deletes_dict_op, waited_mutation) load_pending_person_deletions(report_person_table, cleaned) + + +@dagster.run_status_sensor( + run_status=dagster.DagsterRunStatus.SUCCESS, + monitored_jobs=[squash_person_overrides], + request_job=deletes_job, +) +def run_deletes_after_squash(context): + return dagster.RunRequest(run_key=None) diff --git a/dags/exchange_rate.py b/dags/exchange_rate.py new file mode 100644 index 0000000000..1766081960 --- /dev/null +++ b/dags/exchange_rate.py @@ -0,0 +1,322 @@ +import os +import requests +import datetime +from typing import Any + +import dagster + +from clickhouse_driver import Client +from posthog.clickhouse.cluster import ClickhouseCluster +from posthog.models.exchange_rate.sql import ( + EXCHANGE_RATE_DICTIONARY_NAME, + EXCHANGE_RATE_DATA_BACKFILL_SQL, +) +from posthog.models.exchange_rate.currencies import SUPPORTED_CURRENCY_CODES + +from dags.common import 
JobOwners + +OPEN_EXCHANGE_RATES_API_BASE_URL = "https://openexchangerates.org/api" + + +class ExchangeRateConfig(dagster.Config): + """Configuration for the exchange rate API.""" + + # NOTE: For local development, you can add this key to a `.env` file in the root of the project + app_id: str = os.environ.get("OPEN_EXCHANGE_RATES_APP_ID", "") + api_base_url: str = OPEN_EXCHANGE_RATES_API_BASE_URL + + +# We'll have one partition for each day, starting from 2025-01-01 for the daily job +# And one partition for hourly updates for the hourly job +DAILY_PARTITION_DEFINITION = dagster.DailyPartitionsDefinition(start_date="2025-01-01") +HOURLY_PARTITION_DEFINITION = dagster.HourlyPartitionsDefinition(start_date="2025-01-01-00:00Z") + + +def get_date_partition_from_hourly_partition(hourly_partition: str) -> str: + """ + Convert a hourly partition key to a daily partition key. + """ + return "-".join(hourly_partition.split("-", 3)[0:3]) + + +@dagster.op( + retry_policy=dagster.RetryPolicy( + max_retries=5, + delay=0.2, # 200ms + backoff=dagster.Backoff.EXPONENTIAL, + jitter=dagster.Jitter.PLUS_MINUS, + ) +) +def fetch_exchange_rates( + context: dagster.OpExecutionContext, date_str: str, app_id: str, api_base_url: str +) -> dict[str, Any]: + """ + Fetches exchange rates from the Open Exchange Rates API for a specific date. + """ + # Construct the API URL + url = f"{api_base_url}/historical/{date_str}.json" + + # Prepare query parameters + params = {"app_id": app_id} + + # Make the API request + context.log.info(f"Fetching exchange rates for {date_str} with params {params}") + response = requests.get(url, params=params) + + if response.status_code != 200: + error_msg = f"Failed to fetch exchange rates: {response.status_code} - {response.text}" + context.log.error(error_msg) + raise Exception(error_msg) + + # Parse the response + data = response.json() + + # Log some information about the fetched data + context.log.info(f"Successfully fetched exchange rates for {date_str}") + context.log.info(f"Base currency: {data.get('base')}") + context.log.info(f"Number of rates: {len(data.get('rates', {}))}") + + if not data.get("rates"): + raise Exception(f"No rates found for {date_str}") + + return data.get("rates") + + +@dagster.asset(partitions_def=DAILY_PARTITION_DEFINITION) +def daily_exchange_rates( + context: dagster.AssetExecutionContext, config: ExchangeRateConfig +) -> dagster.Output[dict[str, Any]]: + """ + Fetches exchange rates from the Open Exchange Rates API for a specific date. + The date is determined by the partition key, which is in the format %Y-%m-%d. + """ + date_str = context.partition_key + app_id = config.app_id + api_base_url = config.api_base_url + + if not app_id: + raise ValueError("Open Exchange Rates API key (app_id) is required") + + rates = fetch_exchange_rates( + context=dagster.build_op_context(), date_str=date_str, app_id=app_id, api_base_url=api_base_url + ) + + return dagster.Output( + value=rates, + metadata={ + "date_str": date_str, + "rates_count": len(rates), + "rates": rates, + }, + ) + + +@dagster.asset(partitions_def=HOURLY_PARTITION_DEFINITION) +def hourly_exchange_rates( + context: dagster.AssetExecutionContext, config: ExchangeRateConfig +) -> dagster.Output[dict[str, Any]]: + """ + Fetches exchange rates from the Open Exchange Rates API for a specific hour. + The date is determined by the partition key, which is in the format %Y-%m-%d-%H:%M. 
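+    For example, the hourly partition key 2025-01-15-08:00 resolves to the date 2025-01-15,
+    so every hourly run within the same day re-fetches that day's rates.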
+ """ + # Convert hourly partition key to daily format because we always fetch information for the day + date_str = get_date_partition_from_hourly_partition(context.partition_key) + app_id = config.app_id + api_base_url = config.api_base_url + + if not app_id: + raise ValueError("Open Exchange Rates API key (app_id) is required") + + rates = fetch_exchange_rates( + context=dagster.build_op_context(), date_str=date_str, app_id=app_id, api_base_url=api_base_url + ) + + return dagster.Output( + value=rates, + metadata={ + "date_str": date_str, + "rates_count": len(rates), + "rates": rates, + }, + ) + + +@dagster.op +def store_exchange_rates_in_clickhouse( + context: dagster.OpExecutionContext, + date_str: str, + exchange_rates: dict[str, Any], + cluster: dagster.ResourceParam[ClickhouseCluster], +) -> tuple[list[dict[str, Any]], list[tuple[str, str, Any]]]: + """ + Stores exchange rates data in ClickHouse. + """ + # Transform data into rows for ClickHouse + rows = [ + {"date": date_str, "currency": currency, "rate": rate} + for currency, rate in exchange_rates.items() + if currency in SUPPORTED_CURRENCY_CODES + ] + + # Log information about the data being stored + context.log.info(f"Storing {len(rows)} exchange rates for {date_str} in ClickHouse") + + # Prepare values for batch insert + # Use toDate() to cast the string date to a ClickHouse Date type + values = [(row["date"], row["currency"], row["rate"]) for row in rows] + + # Execute the insert if there are values to insert + if values: + # Batch insert all values + def insert(client: Client) -> bool: + try: + client.execute(EXCHANGE_RATE_DATA_BACKFILL_SQL(exchange_rates=values)) + context.log.info("Successfully inserted exchange rates") + return True + except Exception as e: + context.log.warning(f"Failed to insert exchange rates: {e}") + return False + + # Simply ask the dictionary to be reloaded with the new data + def reload_dict(client: Client) -> bool: + try: + client.execute(f"SYSTEM RELOAD DICTIONARY {EXCHANGE_RATE_DICTIONARY_NAME}") + context.log.info("Successfully reloaded exchange_rate_dict dictionary") + return True + except Exception as e: + context.log.warning(f"Failed to reload exchange_rate_dict dictionary: {e}") + return False + + insert_results = cluster.map_all_hosts(insert).result() + reload_results = cluster.map_all_hosts(reload_dict).result() + + if not all(insert_results.values()): + raise Exception("Failed to insert some exchange rates") + + if not all(reload_results.values()): + raise Exception("Failed to reload some exchange_rate_dict dictionary") + + else: + context.log.warning(f"No exchange rates to store for {date_str}") + + return (rows, values) + + +@dagster.asset( + partitions_def=DAILY_PARTITION_DEFINITION, + ins={"exchange_rates": dagster.AssetIn(key=daily_exchange_rates.key)}, +) +def daily_exchange_rates_in_clickhouse( + context: dagster.AssetExecutionContext, + exchange_rates: dict[str, Any], + cluster: dagster.ResourceParam[ClickhouseCluster], +) -> dagster.MaterializeResult: + """ + Stores exchange rates data in ClickHouse. + The base currency is always USD as per the table design. 
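+    Rows are written as (date, currency, rate) tuples, and the exchange rate dictionary is
+    reloaded afterwards so that lookups pick up the newly inserted values.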
+ """ + # Extract data from the input + date_str = context.partition_key + + # Store the rates in ClickHouse + rows, values = store_exchange_rates_in_clickhouse( + context=dagster.build_op_context(), date_str=date_str, exchange_rates=exchange_rates, cluster=cluster + ) + + # Calculate some statistics for metadata + currencies_count = len(rows) + min_rate = min(row["rate"] for row in rows) if rows else 0.0 + max_rate = max(row["rate"] for row in rows) if rows else 0.0 + avg_rate = sum(row["rate"] for row in rows) / len(rows) if rows else 0.0 + + # Return the rows with metadata + return dagster.MaterializeResult( + metadata={ + "date": dagster.MetadataValue.text(date_str), + "base_currency": dagster.MetadataValue.text("USD"), # Always USD as per table design + "currencies_count": dagster.MetadataValue.int(currencies_count), + "min_rate": dagster.MetadataValue.float(min_rate), + "max_rate": dagster.MetadataValue.float(max_rate), + "avg_rate": dagster.MetadataValue.float(avg_rate), + "values": dagster.MetadataValue.json(values), + } + ) + + +@dagster.asset( + partitions_def=HOURLY_PARTITION_DEFINITION, + ins={"exchange_rates": dagster.AssetIn(key=hourly_exchange_rates.key)}, +) +def hourly_exchange_rates_in_clickhouse( + context: dagster.AssetExecutionContext, + exchange_rates: dict[str, Any], + cluster: dagster.ResourceParam[ClickhouseCluster], +) -> dagster.MaterializeResult: + """ + Stores exchange rates data in ClickHouse. + The base currency is always USD as per the table design. + """ + # Extract data from the input + date_str = get_date_partition_from_hourly_partition(context.partition_key) + + # Store the rates in ClickHouse + rows, values = store_exchange_rates_in_clickhouse( + context=dagster.build_op_context(), date_str=date_str, exchange_rates=exchange_rates, cluster=cluster + ) + + # Calculate some statistics for metadata + currencies_count = len(rows) + min_rate = min(row["rate"] for row in rows) if rows else 0.0 + max_rate = max(row["rate"] for row in rows) if rows else 0.0 + avg_rate = sum(row["rate"] for row in rows) / len(rows) if rows else 0.0 + + # Return the rows with metadata + return dagster.MaterializeResult( + metadata={ + "date": dagster.MetadataValue.text(date_str), + "base_currency": dagster.MetadataValue.text("USD"), # Always USD as per table design + "currencies_count": dagster.MetadataValue.int(currencies_count), + "min_rate": dagster.MetadataValue.float(min_rate), + "max_rate": dagster.MetadataValue.float(max_rate), + "avg_rate": dagster.MetadataValue.float(avg_rate), + "values": dagster.MetadataValue.json(values), + } + ) + + +# Create jobs from the assets +daily_exchange_rates_job = dagster.define_asset_job( + name="daily_exchange_rates_job", + selection=[daily_exchange_rates.key, daily_exchange_rates_in_clickhouse.key], + tags={"owner": JobOwners.TEAM_WEB_ANALYTICS.value}, +) + +hourly_exchange_rates_job = dagster.define_asset_job( + name="hourly_exchange_rates_job", + selection=[hourly_exchange_rates.key, hourly_exchange_rates_in_clickhouse.key], + tags={"owner": JobOwners.TEAM_WEB_ANALYTICS.value}, +) + + +# Create daily/hourly schedules with different cron schedules +@dagster.schedule( + job=daily_exchange_rates_job, + cron_schedule="28 0 * * *", # Run at 00:28 AM every day, random minute to avoid peak load +) +def daily_exchange_rates_schedule(context): + """Process previous day's exchange rates data.""" + # Calculate the previous day's date + previous_day = context.scheduled_execution_time.date() - datetime.timedelta(days=1) + timestamp = 
previous_day.strftime("%Y-%m-%d") + return dagster.RunRequest(run_key=timestamp, partition_key=timestamp) + + +@dagster.schedule( + job=hourly_exchange_rates_job, + cron_schedule="45 * * * *", # Run every hour at XX:45, random minute to avoid peak load at the top of the hour +) +def hourly_exchange_rates_schedule(context): + """Process current day's exchange rates data for this hour.""" + current_day = context.scheduled_execution_time + timestamp = current_day.strftime("%Y-%m-%d-%H:%M") + return dagster.RunRequest(run_key=timestamp, partition_key=timestamp) diff --git a/dags/export_query_logs_to_s3.py b/dags/export_query_logs_to_s3.py index c717978a1e..a4af9f0714 100644 --- a/dags/export_query_logs_to_s3.py +++ b/dags/export_query_logs_to_s3.py @@ -218,3 +218,12 @@ def export_query_logs( @dagster.job(resource_defs={"cluster": ClickhouseClusterResource()}) def export_query_logs_to_s3(): export_query_logs() + + +# Schedule to run query logs export at 1 AM daily +query_logs_export_schedule = dagster.ScheduleDefinition( + job=export_query_logs_to_s3, + cron_schedule="0 1 * * *", # At 01:00 (1 AM) every day + execution_timezone="UTC", + name="query_logs_export_schedule", +) diff --git a/dags/orm_examples.py b/dags/orm_examples.py index c4615bee24..432c7ff3f5 100644 --- a/dags/orm_examples.py +++ b/dags/orm_examples.py @@ -1,10 +1,10 @@ -from dagster import asset +import dagster from django.db.models import Q from posthog.models.async_deletion import AsyncDeletion, DeletionType -@asset +@dagster.asset def pending_deletions() -> list[AsyncDeletion]: """ Asset that fetches pending async deletions from Django ORM. @@ -16,7 +16,7 @@ def pending_deletions() -> list[AsyncDeletion]: return list(pending_deletions) -@asset(deps=[pending_deletions]) +@dagster.asset(deps=[pending_deletions]) def process_pending_deletions(pending_deletions: list[AsyncDeletion]) -> None: """ Asset that prints out pending deletions. 
diff --git a/dags/person_overrides.py b/dags/person_overrides.py index 0224be4ba3..11c7b92420 100644 --- a/dags/person_overrides.py +++ b/dags/person_overrides.py @@ -396,3 +396,12 @@ def cleanup_orphaned_person_overrides_snapshot(): """ dictionary = get_existing_dictionary_for_run_id() cleanup_snapshot_resources(dictionary) + + +# Schedule to run squash at 10 PM on Saturdays +squash_schedule = dagster.ScheduleDefinition( + job=squash_person_overrides, + cron_schedule="0 22 * * 6", # At 22:00 (10 PM) on Saturday + execution_timezone="UTC", + name="squash_person_overrides_schedule", +) diff --git a/dags/slack_alerts.py b/dags/slack_alerts.py index 8751484071..46cfd69f2a 100644 --- a/dags/slack_alerts.py +++ b/dags/slack_alerts.py @@ -1,20 +1,18 @@ -from dagster import ( - DefaultSensorStatus, - RunFailureSensorContext, - run_failure_sensor, -) -from dagster_slack import SlackResource +import dagster +import dagster_slack + from django.conf import settings from dags.common import JobOwners notification_channel_per_team = { JobOwners.TEAM_CLICKHOUSE.value: "#alerts-clickhouse", + JobOwners.TEAM_WEB_ANALYTICS.value: "#alerts-web-analytics", } -@run_failure_sensor(default_status=DefaultSensorStatus.RUNNING) -def notify_slack_on_failure(context: RunFailureSensorContext, slack: SlackResource): +@dagster.run_failure_sensor(default_status=dagster.DefaultSensorStatus.RUNNING) +def notify_slack_on_failure(context: dagster.RunFailureSensorContext, slack: dagster_slack.SlackResource): """Send a notification to Slack when any job fails.""" # Get the failed run failed_run = context.dagster_run diff --git a/dags/tests/test_exchange_rate.py b/dags/tests/test_exchange_rate.py new file mode 100644 index 0000000000..d3aeab7e8a --- /dev/null +++ b/dags/tests/test_exchange_rate.py @@ -0,0 +1,273 @@ +import datetime +from unittest import mock +import pytest +import responses +from freezegun import freeze_time + +import dagster +from dagster import build_op_context + +from dags.exchange_rate import ( + get_date_partition_from_hourly_partition, + fetch_exchange_rates, + daily_exchange_rates, + hourly_exchange_rates, + store_exchange_rates_in_clickhouse, + daily_exchange_rates_in_clickhouse, + hourly_exchange_rates_in_clickhouse, + daily_exchange_rates_schedule, + hourly_exchange_rates_schedule, + ExchangeRateConfig, + OPEN_EXCHANGE_RATES_API_BASE_URL, +) + +# Sample exchange rate data for testing +SAMPLE_EXCHANGE_RATES = { + "EUR": 0.85, + "GBP": 0.75, + "JPY": 110.0, + "CAD": 1.25, + "AUD": 1.35, +} + + +class TestExchangeRateUtils: + def test_get_date_partition_from_hourly_partition(self): + # Test converting hourly partition to daily partition + assert get_date_partition_from_hourly_partition("2023-01-15-08:00") == "2023-01-15" + assert get_date_partition_from_hourly_partition("2023-01-16-23:59") == "2023-01-16" + assert get_date_partition_from_hourly_partition("2023-01-17-00:00Z") == "2023-01-17" + + +class TestExchangeRateAPI: + @responses.activate + def test_fetch_exchange_rates_success(self): + # Mock the API response + date_str = "2023-01-15" + app_id = "test_app_id" + api_url = f"{OPEN_EXCHANGE_RATES_API_BASE_URL}/historical/{date_str}.json" + + responses.add( + responses.GET, + api_url, + json={"base": "USD", "rates": SAMPLE_EXCHANGE_RATES}, + status=200, + ) + + # Create a Dagster context + context = build_op_context() + + # Call the function + result = fetch_exchange_rates(context, date_str, app_id, OPEN_EXCHANGE_RATES_API_BASE_URL) + + # Verify the result + assert result == SAMPLE_EXCHANGE_RATES + 
assert len(responses.calls) == 1 + assert f"app_id={app_id}" in responses.calls[0].request.url + + @responses.activate + def test_fetch_exchange_rates_api_error(self): + # Mock API error response + date_str = "2023-01-15" + app_id = "test_app_id" + api_url = f"{OPEN_EXCHANGE_RATES_API_BASE_URL}/historical/{date_str}.json" + + responses.add( + responses.GET, + api_url, + json={"error": "Invalid API key"}, + status=401, + ) + + # Create a Dagster context + context = build_op_context() + + # Verify the function raises an exception + with pytest.raises(Exception, match="Failed to fetch exchange rates"): + fetch_exchange_rates(context, date_str, app_id, OPEN_EXCHANGE_RATES_API_BASE_URL) + + @responses.activate + def test_fetch_exchange_rates_empty_rates(self): + # Mock API response with empty rates + date_str = "2023-01-15" + app_id = "test_app_id" + api_url = f"{OPEN_EXCHANGE_RATES_API_BASE_URL}/historical/{date_str}.json" + + responses.add( + responses.GET, + api_url, + json={"base": "USD", "rates": {}}, + status=200, + ) + + # Create a Dagster context + context = build_op_context() + + # Verify the function raises an exception + with pytest.raises(Exception, match="No rates found"): + fetch_exchange_rates(context, date_str, app_id, OPEN_EXCHANGE_RATES_API_BASE_URL) + + +class TestExchangeRateAssets: + @responses.activate + def test_daily_exchange_rates(self): + # Mock the API response + date_str = "2023-01-15" + app_id = "test_app_id" + api_url = f"{OPEN_EXCHANGE_RATES_API_BASE_URL}/historical/{date_str}.json" + + responses.add( + responses.GET, + api_url, + json={"base": "USD", "rates": SAMPLE_EXCHANGE_RATES}, + status=200, + ) + + # Create config and context + config = ExchangeRateConfig(app_id=app_id) + context = dagster.build_asset_context(partition_key=date_str) + + # Call the asset + result = daily_exchange_rates(context=context, config=config) + + # Verify the result + assert result.value == SAMPLE_EXCHANGE_RATES + + @responses.activate + def test_hourly_exchange_rates(self): + # Mock the API response + date_str = "2023-01-15" + hourly_partition = f"{date_str}-10:00" + app_id = "test_app_id" + api_url = f"{OPEN_EXCHANGE_RATES_API_BASE_URL}/historical/{date_str}.json" + + responses.add( + responses.GET, + api_url, + json={"base": "USD", "rates": SAMPLE_EXCHANGE_RATES}, + status=200, + ) + + # Create config and context + config = ExchangeRateConfig(app_id=app_id) + context = dagster.build_asset_context(partition_key=hourly_partition) + + # Call the asset + result = hourly_exchange_rates(context=context, config=config) + + # Verify the result + assert result.value == SAMPLE_EXCHANGE_RATES + + def test_daily_exchange_rates_missing_app_id(self): + # Create context with empty app_id + config = ExchangeRateConfig(app_id="") + context = dagster.build_asset_context(partition_key="2023-01-15") + + # Verify the asset raises an exception + with pytest.raises(ValueError, match="Open Exchange Rates API key"): + daily_exchange_rates(context=context, config=config) + + def test_hourly_exchange_rates_missing_app_id(self): + # Create context with empty app_id + config = ExchangeRateConfig(app_id="") + context = dagster.build_asset_context(partition_key="2023-01-15-10:00") + + # Verify the asset raises an exception + with pytest.raises(ValueError, match="Open Exchange Rates API key"): + hourly_exchange_rates(context=context, config=config) + + +class TestExchangeRateClickhouse: + @pytest.fixture + def mock_clickhouse_cluster(self): + mock_cluster = mock.MagicMock() + 
mock_cluster.map_all_hosts.return_value.result.return_value = {"host1": True} + return mock_cluster + + def test_store_exchange_rates_in_clickhouse(self, mock_clickhouse_cluster): + # Create context + context = build_op_context() + date_str = "2023-01-15" + + # Call the op + rows, values = store_exchange_rates_in_clickhouse( + context=context, date_str=date_str, exchange_rates=SAMPLE_EXCHANGE_RATES, cluster=mock_clickhouse_cluster + ) + + # Verify results + assert len(rows) == len(SAMPLE_EXCHANGE_RATES) + assert all(row["date"] == date_str for row in rows) + assert all(row["currency"] in SAMPLE_EXCHANGE_RATES for row in rows) + assert all(row["rate"] in SAMPLE_EXCHANGE_RATES.values() for row in rows) + + # Assert values generated by the op + assert [(date_str, currency, rate) for currency, rate in SAMPLE_EXCHANGE_RATES.items()] == values + + # Verify cluster calls + assert mock_clickhouse_cluster.map_all_hosts.call_count == 2 + + def test_daily_exchange_rates_in_clickhouse(self, mock_clickhouse_cluster): + # Create context + context = dagster.build_asset_context(partition_key="2023-01-15") + + # Call the asset + result = daily_exchange_rates_in_clickhouse( + context=context, exchange_rates=SAMPLE_EXCHANGE_RATES, cluster=mock_clickhouse_cluster + ) + + # Verify result is a MaterializeResult with correct metadata + assert isinstance(result, dagster.MaterializeResult) + assert result.metadata["date"].value == "2023-01-15" + assert result.metadata["base_currency"].value == "USD" + assert result.metadata["currencies_count"].value == len(SAMPLE_EXCHANGE_RATES) + assert result.metadata["min_rate"].value == min(SAMPLE_EXCHANGE_RATES.values()) + assert result.metadata["max_rate"].value == max(SAMPLE_EXCHANGE_RATES.values()) + + def test_hourly_exchange_rates_in_clickhouse(self, mock_clickhouse_cluster): + # Create context + context = dagster.build_asset_context(partition_key="2023-01-15-10:00") + + # Call the asset + result = hourly_exchange_rates_in_clickhouse( + context=context, exchange_rates=SAMPLE_EXCHANGE_RATES, cluster=mock_clickhouse_cluster + ) + + # Verify result is a MaterializeResult with correct metadata + assert isinstance(result, dagster.MaterializeResult) + assert result.metadata["date"].value == "2023-01-15" + assert result.metadata["base_currency"].value == "USD" + assert result.metadata["currencies_count"].value == len(SAMPLE_EXCHANGE_RATES) + assert result.metadata["min_rate"].value == min(SAMPLE_EXCHANGE_RATES.values()) + assert result.metadata["max_rate"].value == max(SAMPLE_EXCHANGE_RATES.values()) + + +class TestExchangeRateSchedules: + @freeze_time("2023-01-15 01:30:00") + def test_daily_exchange_rates_schedule(self): + # Mock the scheduled execution context + context = dagster.build_schedule_context(scheduled_execution_time=datetime.datetime(2023, 1, 15, 1, 30)) + + # Call the schedule + result = daily_exchange_rates_schedule(context=context) + + # Verify result is a RunRequest with correct partition key + assert isinstance(result, dagster.RunRequest) + + # Scheduled on the 15th, should use the previous day: 2023-01-14 + assert result.partition_key == "2023-01-14" + assert result.run_key == "2023-01-14" + + @freeze_time("2023-01-15 10:00:00") + def test_hourly_exchange_rates_schedule(self): + # Mock the scheduled execution context + context = dagster.build_schedule_context(scheduled_execution_time=datetime.datetime(2023, 1, 15, 10, 0)) + + # Call the schedule + result = hourly_exchange_rates_schedule(context=context) + + # Verify result is a RunRequest with correct 
partition key + assert isinstance(result, dagster.RunRequest) + + # Should be the current day and hour: 2023-01-15-10:00 + assert result.partition_key == "2023-01-15-10:00" + assert result.run_key == "2023-01-15-10:00" diff --git a/posthog/models/exchange_rate/sql.py b/posthog/models/exchange_rate/sql.py index 13a2e158c2..0945208395 100644 --- a/posthog/models/exchange_rate/sql.py +++ b/posthog/models/exchange_rate/sql.py @@ -131,10 +131,10 @@ def EXCHANGE_RATE_DATA_BACKFILL_SQL(exchange_rates=None): if exchange_rates is None: exchange_rates = HISTORICAL_EXCHANGE_RATE_TUPLES() - values = ",\n".join(f"('{currency}', {rate}, toDate('{date}'))" for date, currency, rate in exchange_rates) + values = ",\n".join(f"(toDate('{date}'), '{currency}', {rate})" for date, currency, rate in exchange_rates) return f""" -INSERT INTO exchange_rate (currency, rate, date) VALUES +INSERT INTO {EXCHANGE_RATE_TABLE_NAME} (date, currency, rate) VALUES {values};"""
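
For reference, a minimal sketch (not part of the patch above) of the SQL the reordered `EXCHANGE_RATE_DATA_BACKFILL_SQL` helper should now generate, assuming `EXCHANGE_RATE_TABLE_NAME` resolves to `exchange_rate`; the rates are made-up sample values:

```python
# Hypothetical usage of the backfill helper changed above; the sample rates are invented.
from posthog.models.exchange_rate.sql import EXCHANGE_RATE_DATA_BACKFILL_SQL

# Tuples follow the (date, currency, rate) order produced in dags/exchange_rate.py
rates = [("2025-01-15", "EUR", 0.92), ("2025-01-15", "GBP", 0.79)]
print(EXCHANGE_RATE_DATA_BACKFILL_SQL(exchange_rates=rates))
# Expected output (assuming EXCHANGE_RATE_TABLE_NAME == "exchange_rate"):
#
# INSERT INTO exchange_rate (date, currency, rate) VALUES
# (toDate('2025-01-15'), 'EUR', 0.92),
# (toDate('2025-01-15'), 'GBP', 0.79);
```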