diff --git a/dags/backups.py b/dags/backups.py
index 5f172c3969..debdcb7834 100644
--- a/dags/backups.py
+++ b/dags/backups.py
@@ -15,7 +15,7 @@ from dagster_aws.s3 import S3Resource
 
 from posthog.clickhouse.client.connection import NodeRole, Workload
 from posthog.clickhouse.cluster import ClickhouseCluster
-from dags.common import JobOwners
+from dags.common import JobOwners, check_for_concurrent_runs
 
 NO_SHARD_PATH = "noshard"
 
@@ -474,7 +485,21 @@ def prepare_run_config(config: BackupConfig) -> dagster.RunConfig:
     )
 
 
-def run_backup_request(table: str, incremental: bool) -> dagster.RunRequest:
+def run_backup_request(
+    table: str, incremental: bool, context: dagster.ScheduleEvaluationContext
+) -> dagster.RunRequest | dagster.SkipReason:
+    # NOTE(review): a schedule generator that yields this SkipReason alongside
+    # RunRequests for other tables trips Dagster's "received both RunRequest
+    # and SkipReason" validation -- confirm ticks only ever produce one kind,
+    # or filter SkipReasons out before yielding.
+    skip_reason = check_for_concurrent_runs(
+        context,
+        tags={
+            "table": table,
+        },
+    )
+    if skip_reason:
+        return skip_reason
+
     timestamp = datetime.now(UTC)
     config = BackupConfig(
         database=settings.CLICKHOUSE_DATABASE,
@@ -482,6 +496,7 @@ def run_backup_request(table: str, incremental: bool) -> dagster.RunRequest:
         table=table,
         incremental=incremental,
     )
+
     return dagster.RunRequest(
         run_key=f"{timestamp.strftime('%Y%m%d')}-{table}",
         run_config=prepare_run_config(config),
@@ -498,10 +513,10 @@ def run_backup_request(table: str, incremental: bool) -> dagster.RunRequest:
     cron_schedule=settings.CLICKHOUSE_FULL_BACKUP_SCHEDULE,
     default_status=dagster.DefaultScheduleStatus.RUNNING,
 )
-def full_sharded_backup_schedule():
+def full_sharded_backup_schedule(context: dagster.ScheduleEvaluationContext):
     """Launch a full backup for sharded tables"""
     for table in SHARDED_TABLES:
-        yield run_backup_request(table, incremental=False)
+        yield run_backup_request(table, incremental=False, context=context)
 
 
 @dagster.schedule(
@@ -509,10 +524,10 @@ def full_sharded_backup_schedule():
     cron_schedule=settings.CLICKHOUSE_FULL_BACKUP_SCHEDULE,
     default_status=dagster.DefaultScheduleStatus.RUNNING,
 )
-def full_non_sharded_backup_schedule():
+def full_non_sharded_backup_schedule(context: dagster.ScheduleEvaluationContext):
     """Launch a full backup for non-sharded tables"""
     for table in NON_SHARDED_TABLES:
-        yield run_backup_request(table, incremental=False)
+        yield run_backup_request(table, incremental=False, context=context)
 
 
 @dagster.schedule(
@@ -520,10 +535,10 @@ def full_non_sharded_backup_schedule():
     cron_schedule=settings.CLICKHOUSE_INCREMENTAL_BACKUP_SCHEDULE,
     default_status=dagster.DefaultScheduleStatus.RUNNING,
 )
-def incremental_sharded_backup_schedule():
+def incremental_sharded_backup_schedule(context: dagster.ScheduleEvaluationContext):
     """Launch an incremental backup for sharded tables"""
     for table in SHARDED_TABLES:
-        yield run_backup_request(table, incremental=True)
+        yield run_backup_request(table, incremental=True, context=context)
 
 
 @dagster.schedule(
@@ -531,7 +546,7 @@ def incremental_sharded_backup_schedule():
     cron_schedule=settings.CLICKHOUSE_INCREMENTAL_BACKUP_SCHEDULE,
     default_status=dagster.DefaultScheduleStatus.RUNNING,
 )
-def incremental_non_sharded_backup_schedule():
+def incremental_non_sharded_backup_schedule(context: dagster.ScheduleEvaluationContext):
     """Launch an incremental backup for non-sharded tables"""
     for table in NON_SHARDED_TABLES:
-        yield run_backup_request(table, incremental=True)
+        yield run_backup_request(table, incremental=True, context=context)
diff --git a/dags/common/common.py b/dags/common/common.py
index 425e786f54..c4bb6668e4 100644
--- a/dags/common/common.py
+++ b/dags/common/common.py
@@ -1,5 +1,6 @@
 from contextlib import suppress
 from enum import Enum
+from typing import Optional
 
 import dagster
 from clickhouse_driver.errors import Error, ErrorCodes
@@ -137,3 +138,41 @@ def settings_with_log_comment(
     qt = query_tagging.get_query_tags()
     qt.with_dagster(dagster_tags(context))
     return {"log_comment": qt.to_json()}
+
+
+def check_for_concurrent_runs(
+    context: dagster.ScheduleEvaluationContext, tags: dict[str, str]
+) -> Optional[dagster.SkipReason]:
+    """Return a SkipReason when the schedule's job already has an active run
+    matching ``tags``; return None when it is safe to launch a new run.
+    """
+    # NOTE(review): Dagster exposes no public schedule-name accessor on this
+    # context; use getattr so an upgrade that drops the private attribute
+    # degrades to "check skipped" instead of crashing the schedule tick.
+    schedule_name = getattr(context, "_schedule_name", None)
+    if schedule_name is None:
+        context.log.info("Skipping concurrent runs check because schedule name is not available")
+        return None
+
+    # Get the schedule definition from the repository to find the associated job
+    schedule_def = context.repository_def.get_schedule_def(schedule_name)
+    job_name = schedule_def.job_name
+
+    run_records = context.instance.get_run_records(
+        dagster.RunsFilter(
+            job_name=job_name,
+            tags=tags,
+            statuses=[
+                dagster.DagsterRunStatus.QUEUED,
+                dagster.DagsterRunStatus.NOT_STARTED,
+                dagster.DagsterRunStatus.STARTING,
+                dagster.DagsterRunStatus.STARTED,
+            ],
+        )
+    )
+
+    if len(run_records) > 0:
+        context.log.info(f"Skipping {job_name} due to {len(run_records)} active run(s)")
+        return dagster.SkipReason(f"Skipping {job_name} run because another run of the same job is already active")
+
+    return None