import re

from django.conf import settings

import dagster
import dagster_slack
from dagster import DagsterRunStatus, RunsFilter

from dags.common import JobOwners

notification_channel_per_team = {
    JobOwners.TEAM_ANALYTICS_PLATFORM.value: "#alerts-analytics-platform",
    JobOwners.TEAM_CLICKHOUSE.value: "#alerts-clickhouse",
    JobOwners.TEAM_DATA_WAREHOUSE.value: "#alerts-data-warehouse",
    JobOwners.TEAM_ERROR_TRACKING.value: "#alerts-error-tracking",
    JobOwners.TEAM_EXPERIMENTS.value: "#alerts-experiments-dagster",
    JobOwners.TEAM_GROWTH.value: "#alerts-growth",
    JobOwners.TEAM_INGESTION.value: "#alerts-ingestion",
    JobOwners.TEAM_MAX_AI.value: "#alerts-max-ai",
    JobOwners.TEAM_REVENUE_ANALYTICS.value: "#alerts-revenue-analytics",
    JobOwners.TEAM_WEB_ANALYTICS.value: "#alerts-web-analytics",
}

CONSECUTIVE_FAILURE_THRESHOLDS = {
    "web_pre_aggregate_current_day_hourly_job": 3,
    "web_pre_aggregate_job": 3,
    "web_pre_aggregate_daily_job": 3,
}

def get_job_owner_for_alert(failed_run: dagster.DagsterRun, error_message: str) -> str:
    """Determine the correct job owner for alert routing, with special handling for asset jobs."""
    job_name = failed_run.job_name
    job_owner = failed_run.tags.get("owner", "unknown")

    # Special handling for manually launched asset jobs
    if job_name == "__ASSET_JOB":
        # Check if the error message contains web_ prefixed failed steps
        # Pattern: "Steps failed: ['web_analytics_bounces_hourly', 'web_analytics_stats_table_hourly']"
        web_step_pattern = r"Steps failed:.*?\[([^\]]+)\]"
        match = re.search(web_step_pattern, error_message)

        if match:
            steps_text = match.group(1)
            # Check if any step starts with 'web_'
            if re.search(r"'web_[^']*'", steps_text):
                return JobOwners.TEAM_WEB_ANALYTICS.value

    return job_owner

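# Illustrative example (the failure message below is a made-up sample, not taken
# from a real run): for a manually launched "__ASSET_JOB" run whose error reads
#
#     Steps failed: ['web_analytics_bounces_hourly', 'person_overrides_snapshot']
#
# the regex above captures "'web_analytics_bounces_hourly', 'person_overrides_snapshot'",
# and because one step name starts with "web_", the alert is routed to
# TEAM_WEB_ANALYTICS regardless of the run's "owner" tag.
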
def should_suppress_alert(context: dagster.RunFailureSensorContext, job_name: str, threshold: int) -> bool:
    """Return True (suppress the alert) unless the job's most recent `threshold` runs all failed."""
    try:
        run_records = context.instance.get_run_records(
            RunsFilter(
                job_name=job_name,
            ),
            limit=threshold,
        )

        if len(run_records) < threshold:
            context.log.info(
                f"Job {job_name} has {len(run_records)} run(s), suppressing alert until {threshold} consecutive failures"
            )
            return True

        all_failed = all(record.dagster_run.status == DagsterRunStatus.FAILURE for record in run_records)

        if all_failed:
            context.log.warning(f"Job {job_name} has {threshold} consecutive failures, sending alert")
            return False
        else:
            context.log.info(f"Job {job_name} does not have {threshold} consecutive failures, suppressing alert")
            return True

    except Exception as e:
        # If we fail to check run history, err on the side of sending the alert
        context.log.exception(f"Failed to check consecutive failures for {job_name}: {str(e)}")
        return False

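# Note on thresholds (behavioral summary, not from the original comments): with a
# threshold of 3, the first and second consecutive failures of a configured job
# are suppressed; an alert is only sent once the three most recent runs are all
# FAILURE. A successful run in between means subsequent failures are again
# suppressed until three consecutive failures accumulate.
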
@dagster.run_failure_sensor(default_status=dagster.DefaultSensorStatus.RUNNING, monitor_all_code_locations=True)
def notify_slack_on_failure(context: dagster.RunFailureSensorContext, slack: dagster_slack.SlackResource):
    """Send a notification to Slack when any job fails."""
    # Get the failed run
    failed_run = context.dagster_run
    job_name = failed_run.job_name
    run_id = failed_run.run_id
    error = context.failure_event.message if context.failure_event.message else "Unknown error"
    job_owner = get_job_owner_for_alert(failed_run, error)
    tags = failed_run.tags

    # Only send notifications in prod environment
    if not settings.CLOUD_DEPLOYMENT:
        context.log.info("Skipping Slack notification in non-prod environment")
        return

    if tags.get("disable_slack_notifications"):
        context.log.debug("Skipping Slack notification for %s, notifications are disabled", job_name)
        return

    # Check if this job has a consecutive failure threshold configured
    threshold = CONSECUTIVE_FAILURE_THRESHOLDS.get(job_name, 1)
    if threshold > 1:
        if should_suppress_alert(context, job_name, threshold):
            return

    # Construct Dagster URL based on environment
    dagster_domain = settings.DAGSTER_DOMAIN if settings.DAGSTER_DOMAIN else "dagster.localhost"
    run_url = f"https://{dagster_domain}/runs/{run_id}"

    environment = (
        f"{settings.CLOUD_DEPLOYMENT} :flag-{settings.CLOUD_DEPLOYMENT}:" if settings.CLOUD_DEPLOYMENT else "unknown"
    )
    blocks = [
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": f"❌ *Dagster job `{job_name}` failed*\n\n*Run ID*: `{run_id}`\n*Run URL*: <{run_url}|View in Dagster>\n*Tags*: {tags}",
            },
        },
        {"type": "section", "text": {"type": "mrkdwn", "text": f"*Error*:\n```{error}```"}},
        {
            "type": "context",
            "elements": [{"type": "mrkdwn", "text": f"Environment: {environment}"}],
        },
    ]

    try:
        slack.get_client().chat_postMessage(
            channel=notification_channel_per_team.get(job_owner, settings.DAGSTER_DEFAULT_SLACK_ALERTS_CHANNEL),
            blocks=blocks,
        )
        context.log.info(f"Sent Slack notification for failed job {job_name} to {job_owner} team")
    except Exception as e:
        context.log.exception(f"Failed to send Slack notification: {str(e)}")
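
# Wiring sketch (assumption: the code location that registers this sensor is not
# shown in this module). The sensor expects a resource under the key "slack" that
# matches its `slack: dagster_slack.SlackResource` parameter, e.g.:
#
#     defs = dagster.Definitions(
#         sensors=[notify_slack_on_failure],
#         resources={
#             "slack": dagster_slack.SlackResource(token=dagster.EnvVar("SLACK_BOT_TOKEN")),
#         },
#     )
#
# The "SLACK_BOT_TOKEN" env var name is illustrative only.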