chore: skip backup runs if one is already executing (#40943)

This commit is contained in:
Daniel Escribano
2025-11-05 16:40:38 +01:00
committed by GitHub
parent e8c2da408f
commit 9a6eda96eb
2 changed files with 56 additions and 10 deletions

View File

@@ -15,7 +15,7 @@ from dagster_aws.s3 import S3Resource
from posthog.clickhouse.client.connection import NodeRole, Workload
from posthog.clickhouse.cluster import ClickhouseCluster
from dags.common import JobOwners
from dags.common import JobOwners, check_for_concurrent_runs
NO_SHARD_PATH = "noshard"
@@ -474,7 +474,18 @@ def prepare_run_config(config: BackupConfig) -> dagster.RunConfig:
)
def run_backup_request(table: str, incremental: bool) -> dagster.RunRequest:
def run_backup_request(
table: str, incremental: bool, context: dagster.ScheduleEvaluationContext
) -> dagster.RunRequest | dagster.SkipReason:
skip_reason = check_for_concurrent_runs(
context,
tags={
"table": table,
},
)
if skip_reason:
return skip_reason
timestamp = datetime.now(UTC)
config = BackupConfig(
database=settings.CLICKHOUSE_DATABASE,
@@ -482,6 +493,7 @@ def run_backup_request(table: str, incremental: bool) -> dagster.RunRequest:
table=table,
incremental=incremental,
)
return dagster.RunRequest(
run_key=f"{timestamp.strftime('%Y%m%d')}-{table}",
run_config=prepare_run_config(config),
@@ -498,10 +510,10 @@ def run_backup_request(table: str, incremental: bool) -> dagster.RunRequest:
cron_schedule=settings.CLICKHOUSE_FULL_BACKUP_SCHEDULE,
default_status=dagster.DefaultScheduleStatus.RUNNING,
)
def full_sharded_backup_schedule():
def full_sharded_backup_schedule(context: dagster.ScheduleEvaluationContext):
"""Launch a full backup for sharded tables"""
for table in SHARDED_TABLES:
yield run_backup_request(table, incremental=False)
yield run_backup_request(table, incremental=False, context=context)
@dagster.schedule(
@@ -509,10 +521,10 @@ def full_sharded_backup_schedule():
cron_schedule=settings.CLICKHOUSE_FULL_BACKUP_SCHEDULE,
default_status=dagster.DefaultScheduleStatus.RUNNING,
)
def full_non_sharded_backup_schedule():
def full_non_sharded_backup_schedule(context: dagster.ScheduleEvaluationContext):
"""Launch a full backup for non-sharded tables"""
for table in NON_SHARDED_TABLES:
yield run_backup_request(table, incremental=False)
yield run_backup_request(table, incremental=False, context=context)
@dagster.schedule(
@@ -520,10 +532,10 @@ def full_non_sharded_backup_schedule():
cron_schedule=settings.CLICKHOUSE_INCREMENTAL_BACKUP_SCHEDULE,
default_status=dagster.DefaultScheduleStatus.RUNNING,
)
def incremental_sharded_backup_schedule():
def incremental_sharded_backup_schedule(context: dagster.ScheduleEvaluationContext):
"""Launch an incremental backup for sharded tables"""
for table in SHARDED_TABLES:
yield run_backup_request(table, incremental=True)
yield run_backup_request(table, incremental=True, context=context)
@dagster.schedule(
@@ -531,7 +543,7 @@ def incremental_sharded_backup_schedule():
cron_schedule=settings.CLICKHOUSE_INCREMENTAL_BACKUP_SCHEDULE,
default_status=dagster.DefaultScheduleStatus.RUNNING,
)
def incremental_non_sharded_backup_schedule():
def incremental_non_sharded_backup_schedule(context: dagster.ScheduleEvaluationContext):
"""Launch an incremental backup for non-sharded tables"""
for table in NON_SHARDED_TABLES:
yield run_backup_request(table, incremental=True)
yield run_backup_request(table, incremental=True, context=context)

View File

@@ -1,5 +1,6 @@
from contextlib import suppress
from enum import Enum
from typing import Optional
import dagster
from clickhouse_driver.errors import Error, ErrorCodes
@@ -137,3 +138,36 @@ def settings_with_log_comment(
qt = query_tagging.get_query_tags()
qt.with_dagster(dagster_tags(context))
return {"log_comment": qt.to_json()}
def check_for_concurrent_runs(
    context: dagster.ScheduleEvaluationContext, tags: dict[str, str]
) -> Optional[dagster.SkipReason]:
    """Return a ``SkipReason`` if the schedule's job already has an active run matching ``tags``.

    Used by backup schedules to avoid launching a second backup of the same
    table while a previous one is still in flight.

    Args:
        context: The schedule evaluation context supplied by Dagster.
        tags: Run tags that an existing run must match (e.g. ``{"table": ...}``)
            to be considered a concurrent run of the same work.

    Returns:
        A ``dagster.SkipReason`` when at least one matching run is queued or
        running, otherwise ``None`` (meaning it is safe to launch a new run).
    """
    # `_schedule_name` is a private Dagster attribute; use getattr so that if a
    # future Dagster version removes it we degrade to "check unavailable"
    # (schedule still fires) instead of crashing with AttributeError.
    schedule_name = getattr(context, "_schedule_name", None)
    if schedule_name is None:
        context.log.info("Skipping concurrent runs check because schedule name is not available")
        return None

    # Resolve the job associated with this schedule so we can filter run
    # records down to runs of the same job.
    schedule_def = context.repository_def.get_schedule_def(schedule_name)
    job_name = schedule_def.job_name

    # Any run that is queued, waiting to start, starting, or started counts
    # as "active" for the purposes of skipping.
    run_records = context.instance.get_run_records(
        dagster.RunsFilter(
            job_name=job_name,
            tags=tags,
            statuses=[
                dagster.DagsterRunStatus.QUEUED,
                dagster.DagsterRunStatus.NOT_STARTED,
                dagster.DagsterRunStatus.STARTING,
                dagster.DagsterRunStatus.STARTED,
            ],
        )
    )

    if run_records:
        context.log.info(f"Skipping {job_name} due to {len(run_records)} active run(s)")
        return dagster.SkipReason(f"Skipping {job_name} run because another run of the same job is already active")

    return None