feat: Add Dagster jobs to compute exchange rates (#29495)

Rafael Audibert
2025-03-06 11:22:29 -03:00
committed by GitHub
parent bd37883896
commit 487488d858
14 changed files with 811 additions and 125 deletions

.dagster_home/.gitkeep Normal file

.gitignore vendored

@@ -96,3 +96,6 @@ playwright-report/
test-results/
playwright/playwright-report/
playwright/test-results/
.dagster_home/*
!.dagster_home/.gitkeep


@@ -1,17 +1,99 @@
# PostHog Dagster DAGs
This directory contains [Dagster](https://dagster.io/) data pipelines (DAGs) for PostHog. Dagster is a data orchestration framework that allows us to define, schedule, and monitor data workflows.
## What is Dagster?
Dagster is an open-source data orchestration tool designed to help you define and execute data pipelines. Key concepts include (see the short sketch after this list):
- **Assets**: Data artifacts that your pipelines produce and consume (e.g., tables, files)
- **Ops**: Individual units of computation (functions)
- **Jobs**: Collections of ops that are executed together
- **Resources**: Shared infrastructure and connections (e.g. database connections)
- **Schedules**: Time-based triggers for jobs
- **Sensors**: Event-based triggers for jobs
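As a rough illustration of how these concepts map onto code (every name below is made up for the example and does not exist in this repository):
```python
import dagster


@dagster.asset  # Asset: a data artifact the pipeline produces
def example_numbers() -> list[int]:
    return [1, 2, 3]


@dagster.op  # Op: a single unit of computation
def sum_numbers(context: dagster.OpExecutionContext) -> int:
    total = sum([1, 2, 3])
    context.log.info(f"total={total}")
    return total


@dagster.job  # Job: a collection of ops executed together
def example_job():
    sum_numbers()


# Schedule: a time-based trigger for the job (sensors are the event-based equivalent)
example_schedule = dagster.ScheduleDefinition(job=example_job, cron_schedule="0 * * * *")
```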
## Project Structure
- `definitions.py`: Main Dagster definition file that defines assets, jobs, schedules, sensors, and resources
- `common.py`: Shared utilities and resources
- Individual DAG files (e.g., `exchange_rate.py`, `deletes.py`, `person_overrides.py`)
- `tests/`: Tests for the DAGs
## Local Development
### Environment Setup
Dagster uses the `DAGSTER_HOME` environment variable to determine where to store instance configuration, logs, and other local artifacts. If it is not set, Dagster uses a temporary folder that is erased when you bring `dagster dev` down.
```bash
# Set DAGSTER_HOME to a directory of your choice
export DAGSTER_HOME=/path/to/your/dagster/home
```
For consistency with the PostHog development environment, you might want to set this to a subdirectory within your project:
```bash
export DAGSTER_HOME=$(pwd)/.dagster_home
```
You can add this to your shell profile if you want to always store your assets, or to your local `.env` file which will be automatically detected by `dagster dev`.
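For example, a `.env` entry might look like this (the path is a placeholder; `dagster dev` picks up a `.env` file from the directory it is started in):
```bash
# Example .env entry, adjust the path to your checkout
DAGSTER_HOME=/path/to/posthog/.dagster_home
```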
### Running the Development Server
To run the Dagster development server locally:
```bash
# Important: Set DEBUG=1 when running locally to use local resources
DEBUG=1 dagster dev
```
Setting `DEBUG=1` is critical: it makes `definitions.py` pick the local resource definitions (filesystem IO manager and a dummy Slack resource) instead of the production ones.
The Dagster UI will be available at http://localhost:3000 by default, where you can:
- Browse assets, jobs, and schedules
- Manually trigger job runs (a CLI alternative is sketched after this list)
- View execution logs and status
- Debug pipeline issues
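If you prefer the command line, individual asset partitions can also be materialized without the UI. A sketch, assuming the definitions module is `dags.definitions` and the standard Dagster CLI flags:
```bash
DEBUG=1 dagster asset materialize -m dags.definitions \
  --select daily_exchange_rates --partition 2025-01-01
```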
## Adding New DAGs
When adding a new DAG (a minimal sketch follows this list):
1. Create a new Python file for your DAG
2. Define your assets, ops, and jobs
3. Import and register them in `definitions.py`
4. Add appropriate tests in the `tests/` directory
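A hypothetical `dags/my_pipeline.py` (the file, asset, and job names here are illustrative only) could look roughly like this:
```python
# dags/my_pipeline.py (hypothetical example)
import dagster

from dags.common import JobOwners


@dagster.asset
def my_source_data() -> list[dict]:
    # Fetch or compute the data this pipeline is responsible for
    return [{"id": 1}, {"id": 2}]


my_pipeline_job = dagster.define_asset_job(
    name="my_pipeline_job",
    selection=[my_source_data.key],
    tags={"owner": JobOwners.TEAM_WEB_ANALYTICS.value},
)
```
You would then import the module in `definitions.py`, add the asset to `assets=[...]` and the job to `jobs=[...]` (mirroring how `exchange_rate` is registered there), and add tests under `dags/tests/`.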
## Running Tests
Tests are implemented using pytest. The following command will run all DAG tests:
```bash
# From the project root
pytest dags/
```
To run a specific test file:
```bash
pytest dags/tests/test_exchange_rate.py
```
To run a specific test:
```bash
pytest dags/tests/test_exchange_rate.py::test_name
```
Add `-v` for verbose output:
```bash
pytest -v dags/tests/test_exchange_rate.py
```
## Additional Resources
- [Dagster Documentation](https://docs.dagster.io/)
- [PostHog Documentation](https://posthog.com/docs)


@@ -1,26 +1,24 @@
from dagster import (
Config,
MaterializeResult,
asset,
)
import dagster
from posthog.clickhouse.client import sync_execute # noqa
class ClickHouseConfig(Config):
class ClickHouseConfig(dagster.Config):
result_path: str = "/tmp/clickhouse_version.txt"
@asset
def get_clickhouse_version(config: ClickHouseConfig) -> MaterializeResult:
@dagster.asset
def get_clickhouse_version(config: ClickHouseConfig) -> dagster.MaterializeResult:
version = sync_execute("SELECT version()")[0][0]
with open(config.result_path, "w") as f:
f.write(version)
return MaterializeResult(metadata={"version": version})
return dagster.MaterializeResult(metadata={"version": version})
@asset(deps=[get_clickhouse_version])
@dagster.asset(deps=[get_clickhouse_version])
def print_clickhouse_version(config: ClickHouseConfig):
with open(config.result_path) as f:
print(f.read()) # noqa
return MaterializeResult(metadata={"version": config.result_path})
return dagster.MaterializeResult(metadata={"version": config.result_path})


@@ -12,6 +12,7 @@ from posthog.clickhouse.cluster import (
class JobOwners(str, Enum):
TEAM_CLICKHOUSE = "team-clickhouse"
TEAM_WEB_ANALYTICS = "team-web-analytics"
class ClickhouseClusterResource(dagster.ConfigurableResource):


@@ -1,29 +1,21 @@
from dagster import (
DagsterRunStatus,
Definitions,
EnvVar,
ResourceDefinition,
RunRequest,
ScheduleDefinition,
fs_io_manager,
load_assets_from_modules,
run_status_sensor,
)
import dagster
import dagster_slack
from dagster_aws.s3.io_manager import s3_pickle_io_manager
from dagster_aws.s3.resources import s3_resource
from dagster_slack import SlackResource
from django.conf import settings
from dags.slack_alerts import notify_slack_on_failure
from . import ch_examples, deletes, materialized_columns, orm_examples, person_overrides, export_query_logs_to_s3
from .common import ClickhouseClusterResource
all_assets = load_assets_from_modules([ch_examples, orm_examples])
env = "local" if settings.DEBUG else "prod"
from dags.common import ClickhouseClusterResource
from dags import (
ch_examples,
deletes,
exchange_rate,
export_query_logs_to_s3,
materialized_columns,
orm_examples,
person_overrides,
slack_alerts,
)
# Define resources for different environments
resources_by_env = {
@@ -34,57 +26,50 @@ resources_by_env = {
),
"s3": s3_resource,
# Using EnvVar instead of the Django setting to ensure that the token is not leaked anywhere in the Dagster UI
"slack": SlackResource(token=EnvVar("SLACK_TOKEN")),
"slack": dagster_slack.SlackResource(token=dagster.EnvVar("SLACK_TOKEN")),
},
"local": {
"cluster": ClickhouseClusterResource.configure_at_launch(),
"io_manager": fs_io_manager,
"slack": ResourceDefinition.none_resource(description="Dummy Slack resource for local development"),
"io_manager": dagster.fs_io_manager,
"slack": dagster.ResourceDefinition.none_resource(description="Dummy Slack resource for local development"),
},
}
# Get resources for current environment, fallback to local if env not found
env = "local" if settings.DEBUG else "prod"
resources = resources_by_env.get(env, resources_by_env["local"])
# Schedule to run squash at 10 PM on Saturdays
squash_schedule = ScheduleDefinition(
job=person_overrides.squash_person_overrides,
cron_schedule="0 22 * * 6", # At 22:00 (10 PM) on Saturday
execution_timezone="UTC",
name="squash_person_overrides_schedule",
)
# Schedule to run query logs export at 1 AM daily
query_logs_export_schedule = ScheduleDefinition(
job=export_query_logs_to_s3.export_query_logs_to_s3,
cron_schedule="0 1 * * *", # At 01:00 (1 AM) every day
execution_timezone="UTC",
name="query_logs_export_schedule",
)
@run_status_sensor(
run_status=DagsterRunStatus.SUCCESS,
monitored_jobs=[person_overrides.squash_person_overrides],
request_job=deletes.deletes_job,
)
def run_deletes_after_squash(context):
return RunRequest(run_key=None)
defs = Definitions(
assets=all_assets,
defs = dagster.Definitions(
assets=[
ch_examples.get_clickhouse_version,
ch_examples.print_clickhouse_version,
exchange_rate.daily_exchange_rates,
exchange_rate.hourly_exchange_rates,
exchange_rate.daily_exchange_rates_in_clickhouse,
exchange_rate.hourly_exchange_rates_in_clickhouse,
orm_examples.pending_deletions,
orm_examples.process_pending_deletions,
],
jobs=[
deletes.deletes_job,
exchange_rate.daily_exchange_rates_job,
exchange_rate.hourly_exchange_rates_job,
export_query_logs_to_s3.export_query_logs_to_s3,
materialized_columns.materialize_column,
person_overrides.cleanup_orphaned_person_overrides_snapshot,
person_overrides.squash_person_overrides,
export_query_logs_to_s3.export_query_logs_to_s3,
],
schedules=[squash_schedule, query_logs_export_schedule],
sensors=[run_deletes_after_squash, notify_slack_on_failure],
schedules=[
exchange_rate.daily_exchange_rates_schedule,
exchange_rate.hourly_exchange_rates_schedule,
export_query_logs_to_s3.query_logs_export_schedule,
person_overrides.squash_schedule,
],
sensors=[
deletes.run_deletes_after_squash,
slack_alerts.notify_slack_on_failure,
],
resources=resources,
)


@@ -3,20 +3,12 @@ import time
from clickhouse_driver.client import Client
from datetime import datetime
from dataclasses import dataclass
from dagster import (
op,
job,
OpExecutionContext,
Config,
MetadataValue,
ResourceParam,
)
import dagster
from django.conf import settings
from functools import partial
import uuid
from django.utils import timezone
from dags.common import JobOwners
from posthog.clickhouse.cluster import (
ClickhouseCluster,
Mutation,
@@ -28,8 +20,11 @@ from posthog.models.async_deletion import AsyncDeletion, DeletionType
from posthog.models.event.sql import EVENTS_DATA_TABLE
from posthog.models.person.sql import PERSON_DISTINCT_ID_OVERRIDES_TABLE
from dags.common import JobOwners
from dags.person_overrides import squash_person_overrides
class DeleteConfig(Config):
class DeleteConfig(dagster.Config):
team_id: int | None = pydantic.Field(
default=None, description="The team ID to delete events for. If not provided, all teams will be deleted :fire:"
)
@@ -264,9 +259,9 @@ class PendingDeletesDictionary:
)
@op
@dagster.op
def get_oldest_person_override_timestamp(
cluster: ResourceParam[ClickhouseCluster],
cluster: dagster.ResourceParam[ClickhouseCluster],
) -> datetime:
"""Get the oldest person override timestamp from the person_distinct_id_overrides table."""
@@ -277,10 +272,10 @@ def get_oldest_person_override_timestamp(
return result
@op
@dagster.op
def create_pending_person_deletions_table(
config: DeleteConfig,
cluster: ResourceParam[ClickhouseCluster],
cluster: dagster.ResourceParam[ClickhouseCluster],
oldest_person_override_timestamp: datetime,
) -> PendingPersonEventDeletesTable:
"""
@@ -298,10 +293,10 @@ def create_pending_person_deletions_table(
return table
@op
@dagster.op
def create_reporting_pending_person_deletions_table(
config: DeleteConfig,
cluster: ResourceParam[ClickhouseCluster],
cluster: dagster.ResourceParam[ClickhouseCluster],
) -> PendingPersonEventDeletesTable:
"""Create a merge tree table in ClickHouse to store pending deletes."""
table = PendingPersonEventDeletesTable(
@@ -314,11 +309,11 @@ def create_reporting_pending_person_deletions_table(
return table
@op
@dagster.op
def load_pending_person_deletions(
context: OpExecutionContext,
context: dagster.OpExecutionContext,
create_pending_person_deletions_table: PendingPersonEventDeletesTable,
cluster: ResourceParam[ClickhouseCluster],
cluster: dagster.ResourceParam[ClickhouseCluster],
cleanup_delete_assets: bool | None = None,
) -> PendingPersonEventDeletesTable:
"""Query postgres using django ORM to get pending person deletions and insert directly into ClickHouse."""
@@ -373,18 +368,18 @@ def load_pending_person_deletions(
context.add_output_metadata(
{
"total_rows": MetadataValue.int(total_rows),
"table_name": MetadataValue.text(create_pending_person_deletions_table.table_name),
"total_rows": dagster.MetadataValue.int(total_rows),
"table_name": dagster.MetadataValue.text(create_pending_person_deletions_table.table_name),
}
)
return create_pending_person_deletions_table
@op
@dagster.op
def create_deletes_dict(
load_pending_person_deletions: PendingPersonEventDeletesTable,
config: DeleteConfig,
cluster: ResourceParam[ClickhouseCluster],
cluster: dagster.ResourceParam[ClickhouseCluster],
) -> PendingDeletesDictionary:
"""Create a dictionary in ClickHouse to store pending event deletions."""
@@ -410,9 +405,9 @@ def create_deletes_dict(
return del_dict
@op
@dagster.op
def load_and_verify_deletes_dictionary(
cluster: ResourceParam[ClickhouseCluster],
cluster: dagster.ResourceParam[ClickhouseCluster],
dictionary: PendingDeletesDictionary,
) -> PendingDeletesDictionary:
"""Load the dictionary data on all hosts in the cluster, and ensure all hosts have identical data."""
@@ -421,10 +416,10 @@ def load_and_verify_deletes_dictionary(
return dictionary
@op
@dagster.op
def delete_person_events(
context: OpExecutionContext,
cluster: ResourceParam[ClickhouseCluster],
context: dagster.OpExecutionContext,
cluster: dagster.ResourceParam[ClickhouseCluster],
load_and_verify_deletes_dictionary: PendingDeletesDictionary,
) -> tuple[PendingDeletesDictionary, ShardMutations]:
"""Delete events from sharded_events table for persons pending deletion."""
@@ -442,12 +437,14 @@ def delete_person_events(
all_zero = all(count == 0 for count in count_result.values())
if all_zero:
context.add_output_metadata({"events_deleted": MetadataValue.int(0), "message": "No pending deletions found"})
context.add_output_metadata(
{"events_deleted": dagster.MetadataValue.int(0), "message": "No pending deletions found"}
)
return (load_and_verify_deletes_dictionary, {})
context.add_output_metadata(
{
"events_deleted": MetadataValue.int(sum(count_result.values())),
"events_deleted": dagster.MetadataValue.int(sum(count_result.values())),
}
)
@@ -463,10 +460,10 @@ def delete_person_events(
return (load_and_verify_deletes_dictionary, shard_mutations)
@op
@dagster.op
def wait_for_delete_mutations(
context: OpExecutionContext,
cluster: ResourceParam[ClickhouseCluster],
context: dagster.OpExecutionContext,
cluster: dagster.ResourceParam[ClickhouseCluster],
delete_person_events: tuple[PendingDeletesDictionary, ShardMutations],
) -> PendingDeletesDictionary:
pending_deletes_dict, shard_mutations = delete_person_events
@@ -476,9 +473,9 @@ def wait_for_delete_mutations(
return pending_deletes_dict
@op
@dagster.op
def cleanup_delete_assets(
cluster: ResourceParam[ClickhouseCluster],
cluster: dagster.ResourceParam[ClickhouseCluster],
config: DeleteConfig,
create_pending_person_deletions_table: PendingPersonEventDeletesTable,
create_deletes_dict: PendingDeletesDictionary,
@@ -513,7 +510,7 @@ def cleanup_delete_assets(
return True
@job(tags={"owner": JobOwners.TEAM_CLICKHOUSE.value})
@dagster.job(tags={"owner": JobOwners.TEAM_CLICKHOUSE.value})
def deletes_job():
"""Job that handles deletion of person events."""
oldest_override_timestamp = get_oldest_person_override_timestamp()
@@ -526,3 +523,12 @@ def deletes_job():
waited_mutation = wait_for_delete_mutations(delete_events)
cleaned = cleanup_delete_assets(person_table, create_deletes_dict_op, waited_mutation)
load_pending_person_deletions(report_person_table, cleaned)
@dagster.run_status_sensor(
run_status=dagster.DagsterRunStatus.SUCCESS,
monitored_jobs=[squash_person_overrides],
request_job=deletes_job,
)
def run_deletes_after_squash(context):
return dagster.RunRequest(run_key=None)

dags/exchange_rate.py Normal file

@@ -0,0 +1,322 @@
import os
import requests
import datetime
from typing import Any
import dagster
from clickhouse_driver import Client
from posthog.clickhouse.cluster import ClickhouseCluster
from posthog.models.exchange_rate.sql import (
EXCHANGE_RATE_DICTIONARY_NAME,
EXCHANGE_RATE_DATA_BACKFILL_SQL,
)
from posthog.models.exchange_rate.currencies import SUPPORTED_CURRENCY_CODES
from dags.common import JobOwners
OPEN_EXCHANGE_RATES_API_BASE_URL = "https://openexchangerates.org/api"
class ExchangeRateConfig(dagster.Config):
"""Configuration for the exchange rate API."""
# NOTE: For local development, you can add this key to a `.env` file in the root of the project
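# e.g. OPEN_EXCHANGE_RATES_APP_ID=<your-app-id> (placeholder value for illustration only)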
app_id: str = os.environ.get("OPEN_EXCHANGE_RATES_APP_ID", "")
api_base_url: str = OPEN_EXCHANGE_RATES_API_BASE_URL
# We'll have one partition for each day, starting from 2025-01-01 for the daily job
# And one partition for hourly updates for the hourly job
DAILY_PARTITION_DEFINITION = dagster.DailyPartitionsDefinition(start_date="2025-01-01")
HOURLY_PARTITION_DEFINITION = dagster.HourlyPartitionsDefinition(start_date="2025-01-01-00:00Z")
def get_date_partition_from_hourly_partition(hourly_partition: str) -> str:
"""
Convert an hourly partition key to a daily partition key.
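E.g. "2025-01-01-08:00" becomes "2025-01-01".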
"""
return "-".join(hourly_partition.split("-", 3)[0:3])
@dagster.op(
retry_policy=dagster.RetryPolicy(
max_retries=5,
delay=0.2, # 200ms
backoff=dagster.Backoff.EXPONENTIAL,
jitter=dagster.Jitter.PLUS_MINUS,
)
)
def fetch_exchange_rates(
context: dagster.OpExecutionContext, date_str: str, app_id: str, api_base_url: str
) -> dict[str, Any]:
"""
Fetches exchange rates from the Open Exchange Rates API for a specific date.
"""
# Construct the API URL
url = f"{api_base_url}/historical/{date_str}.json"
# Prepare query parameters
params = {"app_id": app_id}
# Make the API request
context.log.info(f"Fetching exchange rates for {date_str} with params {params}")
response = requests.get(url, params=params)
if response.status_code != 200:
error_msg = f"Failed to fetch exchange rates: {response.status_code} - {response.text}"
context.log.error(error_msg)
raise Exception(error_msg)
# Parse the response
data = response.json()
# Log some information about the fetched data
context.log.info(f"Successfully fetched exchange rates for {date_str}")
context.log.info(f"Base currency: {data.get('base')}")
context.log.info(f"Number of rates: {len(data.get('rates', {}))}")
if not data.get("rates"):
raise Exception(f"No rates found for {date_str}")
return data.get("rates")
@dagster.asset(partitions_def=DAILY_PARTITION_DEFINITION)
def daily_exchange_rates(
context: dagster.AssetExecutionContext, config: ExchangeRateConfig
) -> dagster.Output[dict[str, Any]]:
"""
Fetches exchange rates from the Open Exchange Rates API for a specific date.
The date is determined by the partition key, which is in the format %Y-%m-%d.
"""
date_str = context.partition_key
app_id = config.app_id
api_base_url = config.api_base_url
if not app_id:
raise ValueError("Open Exchange Rates API key (app_id) is required")
rates = fetch_exchange_rates(
context=dagster.build_op_context(), date_str=date_str, app_id=app_id, api_base_url=api_base_url
)
return dagster.Output(
value=rates,
metadata={
"date_str": date_str,
"rates_count": len(rates),
"rates": rates,
},
)
@dagster.asset(partitions_def=HOURLY_PARTITION_DEFINITION)
def hourly_exchange_rates(
context: dagster.AssetExecutionContext, config: ExchangeRateConfig
) -> dagster.Output[dict[str, Any]]:
"""
Fetches exchange rates from the Open Exchange Rates API for a specific hour.
The date is determined by the partition key, which is in the format %Y-%m-%d-%H:%M.
"""
# Convert hourly partition key to daily format because we always fetch information for the day
date_str = get_date_partition_from_hourly_partition(context.partition_key)
app_id = config.app_id
api_base_url = config.api_base_url
if not app_id:
raise ValueError("Open Exchange Rates API key (app_id) is required")
rates = fetch_exchange_rates(
context=dagster.build_op_context(), date_str=date_str, app_id=app_id, api_base_url=api_base_url
)
return dagster.Output(
value=rates,
metadata={
"date_str": date_str,
"rates_count": len(rates),
"rates": rates,
},
)
@dagster.op
def store_exchange_rates_in_clickhouse(
context: dagster.OpExecutionContext,
date_str: str,
exchange_rates: dict[str, Any],
cluster: dagster.ResourceParam[ClickhouseCluster],
) -> tuple[list[dict[str, Any]], list[tuple[str, str, Any]]]:
"""
Stores exchange rates data in ClickHouse.
"""
# Transform data into rows for ClickHouse
rows = [
{"date": date_str, "currency": currency, "rate": rate}
for currency, rate in exchange_rates.items()
if currency in SUPPORTED_CURRENCY_CODES
]
# Log information about the data being stored
context.log.info(f"Storing {len(rows)} exchange rates for {date_str} in ClickHouse")
# Prepare values for batch insert
# Use toDate() to cast the string date to a ClickHouse Date type
values = [(row["date"], row["currency"], row["rate"]) for row in rows]
# Execute the insert if there are values to insert
if values:
# Batch insert all values
def insert(client: Client) -> bool:
try:
client.execute(EXCHANGE_RATE_DATA_BACKFILL_SQL(exchange_rates=values))
context.log.info("Successfully inserted exchange rates")
return True
except Exception as e:
context.log.warning(f"Failed to insert exchange rates: {e}")
return False
# Simply ask the dictionary to be reloaded with the new data
def reload_dict(client: Client) -> bool:
try:
client.execute(f"SYSTEM RELOAD DICTIONARY {EXCHANGE_RATE_DICTIONARY_NAME}")
context.log.info("Successfully reloaded exchange_rate_dict dictionary")
return True
except Exception as e:
context.log.warning(f"Failed to reload exchange_rate_dict dictionary: {e}")
return False
insert_results = cluster.map_all_hosts(insert).result()
reload_results = cluster.map_all_hosts(reload_dict).result()
if not all(insert_results.values()):
raise Exception("Failed to insert some exchange rates")
if not all(reload_results.values()):
raise Exception("Failed to reload some exchange_rate_dict dictionary")
else:
context.log.warning(f"No exchange rates to store for {date_str}")
return (rows, values)
@dagster.asset(
partitions_def=DAILY_PARTITION_DEFINITION,
ins={"exchange_rates": dagster.AssetIn(key=daily_exchange_rates.key)},
)
def daily_exchange_rates_in_clickhouse(
context: dagster.AssetExecutionContext,
exchange_rates: dict[str, Any],
cluster: dagster.ResourceParam[ClickhouseCluster],
) -> dagster.MaterializeResult:
"""
Stores exchange rates data in ClickHouse.
The base currency is always USD as per the table design.
"""
# Extract data from the input
date_str = context.partition_key
# Store the rates in ClickHouse
rows, values = store_exchange_rates_in_clickhouse(
context=dagster.build_op_context(), date_str=date_str, exchange_rates=exchange_rates, cluster=cluster
)
# Calculate some statistics for metadata
currencies_count = len(rows)
min_rate = min(row["rate"] for row in rows) if rows else 0.0
max_rate = max(row["rate"] for row in rows) if rows else 0.0
avg_rate = sum(row["rate"] for row in rows) / len(rows) if rows else 0.0
# Return the rows with metadata
return dagster.MaterializeResult(
metadata={
"date": dagster.MetadataValue.text(date_str),
"base_currency": dagster.MetadataValue.text("USD"), # Always USD as per table design
"currencies_count": dagster.MetadataValue.int(currencies_count),
"min_rate": dagster.MetadataValue.float(min_rate),
"max_rate": dagster.MetadataValue.float(max_rate),
"avg_rate": dagster.MetadataValue.float(avg_rate),
"values": dagster.MetadataValue.json(values),
}
)
@dagster.asset(
partitions_def=HOURLY_PARTITION_DEFINITION,
ins={"exchange_rates": dagster.AssetIn(key=hourly_exchange_rates.key)},
)
def hourly_exchange_rates_in_clickhouse(
context: dagster.AssetExecutionContext,
exchange_rates: dict[str, Any],
cluster: dagster.ResourceParam[ClickhouseCluster],
) -> dagster.MaterializeResult:
"""
Stores exchange rates data in ClickHouse.
The base currency is always USD as per the table design.
"""
# Extract data from the input
date_str = get_date_partition_from_hourly_partition(context.partition_key)
# Store the rates in ClickHouse
rows, values = store_exchange_rates_in_clickhouse(
context=dagster.build_op_context(), date_str=date_str, exchange_rates=exchange_rates, cluster=cluster
)
# Calculate some statistics for metadata
currencies_count = len(rows)
min_rate = min(row["rate"] for row in rows) if rows else 0.0
max_rate = max(row["rate"] for row in rows) if rows else 0.0
avg_rate = sum(row["rate"] for row in rows) / len(rows) if rows else 0.0
# Return the rows with metadata
return dagster.MaterializeResult(
metadata={
"date": dagster.MetadataValue.text(date_str),
"base_currency": dagster.MetadataValue.text("USD"), # Always USD as per table design
"currencies_count": dagster.MetadataValue.int(currencies_count),
"min_rate": dagster.MetadataValue.float(min_rate),
"max_rate": dagster.MetadataValue.float(max_rate),
"avg_rate": dagster.MetadataValue.float(avg_rate),
"values": dagster.MetadataValue.json(values),
}
)
# Create jobs from the assets
daily_exchange_rates_job = dagster.define_asset_job(
name="daily_exchange_rates_job",
selection=[daily_exchange_rates.key, daily_exchange_rates_in_clickhouse.key],
tags={"owner": JobOwners.TEAM_WEB_ANALYTICS.value},
)
hourly_exchange_rates_job = dagster.define_asset_job(
name="hourly_exchange_rates_job",
selection=[hourly_exchange_rates.key, hourly_exchange_rates_in_clickhouse.key],
tags={"owner": JobOwners.TEAM_WEB_ANALYTICS.value},
)
# Create daily/hourly schedules with different cron schedules
@dagster.schedule(
job=daily_exchange_rates_job,
cron_schedule="28 0 * * *", # Run at 00:28 AM every day, random minute to avoid peak load
)
def daily_exchange_rates_schedule(context):
"""Process previous day's exchange rates data."""
# Calculate the previous day's date
previous_day = context.scheduled_execution_time.date() - datetime.timedelta(days=1)
timestamp = previous_day.strftime("%Y-%m-%d")
return dagster.RunRequest(run_key=timestamp, partition_key=timestamp)
@dagster.schedule(
job=hourly_exchange_rates_job,
cron_schedule="45 * * * *", # Run every hour at XX:45, random minute to avoid peak load at the top of the hour
)
def hourly_exchange_rates_schedule(context):
"""Process current day's exchange rates data for this hour."""
current_day = context.scheduled_execution_time
timestamp = current_day.strftime("%Y-%m-%d-%H:%M")
return dagster.RunRequest(run_key=timestamp, partition_key=timestamp)


@@ -218,3 +218,12 @@ def export_query_logs(
@dagster.job(resource_defs={"cluster": ClickhouseClusterResource()})
def export_query_logs_to_s3():
export_query_logs()
# Schedule to run query logs export at 1 AM daily
query_logs_export_schedule = dagster.ScheduleDefinition(
job=export_query_logs_to_s3,
cron_schedule="0 1 * * *", # At 01:00 (1 AM) every day
execution_timezone="UTC",
name="query_logs_export_schedule",
)


@@ -1,10 +1,10 @@
from dagster import asset
import dagster
from django.db.models import Q
from posthog.models.async_deletion import AsyncDeletion, DeletionType
@asset
@dagster.asset
def pending_deletions() -> list[AsyncDeletion]:
"""
Asset that fetches pending async deletions from Django ORM.
@@ -16,7 +16,7 @@ def pending_deletions() -> list[AsyncDeletion]:
return list(pending_deletions)
@asset(deps=[pending_deletions])
@dagster.asset(deps=[pending_deletions])
def process_pending_deletions(pending_deletions: list[AsyncDeletion]) -> None:
"""
Asset that prints out pending deletions.


@@ -396,3 +396,12 @@ def cleanup_orphaned_person_overrides_snapshot():
"""
dictionary = get_existing_dictionary_for_run_id()
cleanup_snapshot_resources(dictionary)
# Schedule to run squash at 10 PM on Saturdays
squash_schedule = dagster.ScheduleDefinition(
job=squash_person_overrides,
cron_schedule="0 22 * * 6", # At 22:00 (10 PM) on Saturday
execution_timezone="UTC",
name="squash_person_overrides_schedule",
)


@@ -1,20 +1,18 @@
from dagster import (
DefaultSensorStatus,
RunFailureSensorContext,
run_failure_sensor,
)
from dagster_slack import SlackResource
import dagster
import dagster_slack
from django.conf import settings
from dags.common import JobOwners
notification_channel_per_team = {
JobOwners.TEAM_CLICKHOUSE.value: "#alerts-clickhouse",
JobOwners.TEAM_WEB_ANALYTICS.value: "#alerts-web-analytics",
}
@run_failure_sensor(default_status=DefaultSensorStatus.RUNNING)
def notify_slack_on_failure(context: RunFailureSensorContext, slack: SlackResource):
@dagster.run_failure_sensor(default_status=dagster.DefaultSensorStatus.RUNNING)
def notify_slack_on_failure(context: dagster.RunFailureSensorContext, slack: dagster_slack.SlackResource):
"""Send a notification to Slack when any job fails."""
# Get the failed run
failed_run = context.dagster_run


@@ -0,0 +1,273 @@
import datetime
from unittest import mock
import pytest
import responses
from freezegun import freeze_time
import dagster
from dagster import build_op_context
from dags.exchange_rate import (
get_date_partition_from_hourly_partition,
fetch_exchange_rates,
daily_exchange_rates,
hourly_exchange_rates,
store_exchange_rates_in_clickhouse,
daily_exchange_rates_in_clickhouse,
hourly_exchange_rates_in_clickhouse,
daily_exchange_rates_schedule,
hourly_exchange_rates_schedule,
ExchangeRateConfig,
OPEN_EXCHANGE_RATES_API_BASE_URL,
)
# Sample exchange rate data for testing
SAMPLE_EXCHANGE_RATES = {
"EUR": 0.85,
"GBP": 0.75,
"JPY": 110.0,
"CAD": 1.25,
"AUD": 1.35,
}
class TestExchangeRateUtils:
def test_get_date_partition_from_hourly_partition(self):
# Test converting hourly partition to daily partition
assert get_date_partition_from_hourly_partition("2023-01-15-08:00") == "2023-01-15"
assert get_date_partition_from_hourly_partition("2023-01-16-23:59") == "2023-01-16"
assert get_date_partition_from_hourly_partition("2023-01-17-00:00Z") == "2023-01-17"
class TestExchangeRateAPI:
@responses.activate
def test_fetch_exchange_rates_success(self):
# Mock the API response
date_str = "2023-01-15"
app_id = "test_app_id"
api_url = f"{OPEN_EXCHANGE_RATES_API_BASE_URL}/historical/{date_str}.json"
responses.add(
responses.GET,
api_url,
json={"base": "USD", "rates": SAMPLE_EXCHANGE_RATES},
status=200,
)
# Create a Dagster context
context = build_op_context()
# Call the function
result = fetch_exchange_rates(context, date_str, app_id, OPEN_EXCHANGE_RATES_API_BASE_URL)
# Verify the result
assert result == SAMPLE_EXCHANGE_RATES
assert len(responses.calls) == 1
assert f"app_id={app_id}" in responses.calls[0].request.url
@responses.activate
def test_fetch_exchange_rates_api_error(self):
# Mock API error response
date_str = "2023-01-15"
app_id = "test_app_id"
api_url = f"{OPEN_EXCHANGE_RATES_API_BASE_URL}/historical/{date_str}.json"
responses.add(
responses.GET,
api_url,
json={"error": "Invalid API key"},
status=401,
)
# Create a Dagster context
context = build_op_context()
# Verify the function raises an exception
with pytest.raises(Exception, match="Failed to fetch exchange rates"):
fetch_exchange_rates(context, date_str, app_id, OPEN_EXCHANGE_RATES_API_BASE_URL)
@responses.activate
def test_fetch_exchange_rates_empty_rates(self):
# Mock API response with empty rates
date_str = "2023-01-15"
app_id = "test_app_id"
api_url = f"{OPEN_EXCHANGE_RATES_API_BASE_URL}/historical/{date_str}.json"
responses.add(
responses.GET,
api_url,
json={"base": "USD", "rates": {}},
status=200,
)
# Create a Dagster context
context = build_op_context()
# Verify the function raises an exception
with pytest.raises(Exception, match="No rates found"):
fetch_exchange_rates(context, date_str, app_id, OPEN_EXCHANGE_RATES_API_BASE_URL)
class TestExchangeRateAssets:
@responses.activate
def test_daily_exchange_rates(self):
# Mock the API response
date_str = "2023-01-15"
app_id = "test_app_id"
api_url = f"{OPEN_EXCHANGE_RATES_API_BASE_URL}/historical/{date_str}.json"
responses.add(
responses.GET,
api_url,
json={"base": "USD", "rates": SAMPLE_EXCHANGE_RATES},
status=200,
)
# Create config and context
config = ExchangeRateConfig(app_id=app_id)
context = dagster.build_asset_context(partition_key=date_str)
# Call the asset
result = daily_exchange_rates(context=context, config=config)
# Verify the result
assert result.value == SAMPLE_EXCHANGE_RATES
@responses.activate
def test_hourly_exchange_rates(self):
# Mock the API response
date_str = "2023-01-15"
hourly_partition = f"{date_str}-10:00"
app_id = "test_app_id"
api_url = f"{OPEN_EXCHANGE_RATES_API_BASE_URL}/historical/{date_str}.json"
responses.add(
responses.GET,
api_url,
json={"base": "USD", "rates": SAMPLE_EXCHANGE_RATES},
status=200,
)
# Create config and context
config = ExchangeRateConfig(app_id=app_id)
context = dagster.build_asset_context(partition_key=hourly_partition)
# Call the asset
result = hourly_exchange_rates(context=context, config=config)
# Verify the result
assert result.value == SAMPLE_EXCHANGE_RATES
def test_daily_exchange_rates_missing_app_id(self):
# Create context with empty app_id
config = ExchangeRateConfig(app_id="")
context = dagster.build_asset_context(partition_key="2023-01-15")
# Verify the asset raises an exception
with pytest.raises(ValueError, match="Open Exchange Rates API key"):
daily_exchange_rates(context=context, config=config)
def test_hourly_exchange_rates_missing_app_id(self):
# Create context with empty app_id
config = ExchangeRateConfig(app_id="")
context = dagster.build_asset_context(partition_key="2023-01-15-10:00")
# Verify the asset raises an exception
with pytest.raises(ValueError, match="Open Exchange Rates API key"):
hourly_exchange_rates(context=context, config=config)
class TestExchangeRateClickhouse:
@pytest.fixture
def mock_clickhouse_cluster(self):
mock_cluster = mock.MagicMock()
mock_cluster.map_all_hosts.return_value.result.return_value = {"host1": True}
return mock_cluster
def test_store_exchange_rates_in_clickhouse(self, mock_clickhouse_cluster):
# Create context
context = build_op_context()
date_str = "2023-01-15"
# Call the op
rows, values = store_exchange_rates_in_clickhouse(
context=context, date_str=date_str, exchange_rates=SAMPLE_EXCHANGE_RATES, cluster=mock_clickhouse_cluster
)
# Verify results
assert len(rows) == len(SAMPLE_EXCHANGE_RATES)
assert all(row["date"] == date_str for row in rows)
assert all(row["currency"] in SAMPLE_EXCHANGE_RATES for row in rows)
assert all(row["rate"] in SAMPLE_EXCHANGE_RATES.values() for row in rows)
# Assert values generated by the op
assert [(date_str, currency, rate) for currency, rate in SAMPLE_EXCHANGE_RATES.items()] == values
# Verify cluster calls
assert mock_clickhouse_cluster.map_all_hosts.call_count == 2
def test_daily_exchange_rates_in_clickhouse(self, mock_clickhouse_cluster):
# Create context
context = dagster.build_asset_context(partition_key="2023-01-15")
# Call the asset
result = daily_exchange_rates_in_clickhouse(
context=context, exchange_rates=SAMPLE_EXCHANGE_RATES, cluster=mock_clickhouse_cluster
)
# Verify result is a MaterializeResult with correct metadata
assert isinstance(result, dagster.MaterializeResult)
assert result.metadata["date"].value == "2023-01-15"
assert result.metadata["base_currency"].value == "USD"
assert result.metadata["currencies_count"].value == len(SAMPLE_EXCHANGE_RATES)
assert result.metadata["min_rate"].value == min(SAMPLE_EXCHANGE_RATES.values())
assert result.metadata["max_rate"].value == max(SAMPLE_EXCHANGE_RATES.values())
def test_hourly_exchange_rates_in_clickhouse(self, mock_clickhouse_cluster):
# Create context
context = dagster.build_asset_context(partition_key="2023-01-15-10:00")
# Call the asset
result = hourly_exchange_rates_in_clickhouse(
context=context, exchange_rates=SAMPLE_EXCHANGE_RATES, cluster=mock_clickhouse_cluster
)
# Verify result is a MaterializeResult with correct metadata
assert isinstance(result, dagster.MaterializeResult)
assert result.metadata["date"].value == "2023-01-15"
assert result.metadata["base_currency"].value == "USD"
assert result.metadata["currencies_count"].value == len(SAMPLE_EXCHANGE_RATES)
assert result.metadata["min_rate"].value == min(SAMPLE_EXCHANGE_RATES.values())
assert result.metadata["max_rate"].value == max(SAMPLE_EXCHANGE_RATES.values())
class TestExchangeRateSchedules:
@freeze_time("2023-01-15 01:30:00")
def test_daily_exchange_rates_schedule(self):
# Mock the scheduled execution context
context = dagster.build_schedule_context(scheduled_execution_time=datetime.datetime(2023, 1, 15, 1, 30))
# Call the schedule
result = daily_exchange_rates_schedule(context=context)
# Verify result is a RunRequest with correct partition key
assert isinstance(result, dagster.RunRequest)
# Scheduled on the 15th, should use the previous day: 2023-01-14
assert result.partition_key == "2023-01-14"
assert result.run_key == "2023-01-14"
@freeze_time("2023-01-15 10:00:00")
def test_hourly_exchange_rates_schedule(self):
# Mock the scheduled execution context
context = dagster.build_schedule_context(scheduled_execution_time=datetime.datetime(2023, 1, 15, 10, 0))
# Call the schedule
result = hourly_exchange_rates_schedule(context=context)
# Verify result is a RunRequest with correct partition key
assert isinstance(result, dagster.RunRequest)
# Should be the current day and hour: 2023-01-15-10:00
assert result.partition_key == "2023-01-15-10:00"
assert result.run_key == "2023-01-15-10:00"


@@ -131,10 +131,10 @@ def EXCHANGE_RATE_DATA_BACKFILL_SQL(exchange_rates=None):
if exchange_rates is None:
exchange_rates = HISTORICAL_EXCHANGE_RATE_TUPLES()
values = ",\n".join(f"('{currency}', {rate}, toDate('{date}'))" for date, currency, rate in exchange_rates)
values = ",\n".join(f"(toDate('{date}'), '{currency}', {rate})" for date, currency, rate in exchange_rates)
return f"""
INSERT INTO exchange_rate (currency, rate, date) VALUES
INSERT INTO {EXCHANGE_RATE_TABLE_NAME} (date, currency, rate) VALUES
{values};"""