fix(err): delete symbol sets we fail to fetch (#37441)

This commit is contained in:
Oliver Browne
2025-09-01 17:42:39 +03:00
committed by GitHub
parent 0ec0250902
commit 3382260051
7 changed files with 152 additions and 110 deletions

View File

@@ -1,76 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "\n WITH next_job AS (\n SELECT *, lease_id as previous_lease_id\n FROM posthog_batchimport\n WHERE status = 'running' AND coalesce(leased_until, now()) <= now()\n ORDER BY created_at\n LIMIT 1\n FOR UPDATE SKIP LOCKED\n )\n UPDATE posthog_batchimport\n SET\n status = 'running',\n leased_until = now() + interval '30 minutes', -- We lease for a long time because job init can be quite slow\n lease_id = $1\n FROM next_job\n WHERE posthog_batchimport.id = next_job.id\n RETURNING\n posthog_batchimport.id,\n posthog_batchimport.team_id,\n posthog_batchimport.created_at,\n posthog_batchimport.updated_at,\n posthog_batchimport.status_message,\n posthog_batchimport.display_status_message,\n posthog_batchimport.state,\n posthog_batchimport.import_config,\n posthog_batchimport.secrets,\n next_job.previous_lease_id\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "id",
"type_info": "Uuid"
},
{
"ordinal": 1,
"name": "team_id",
"type_info": "Int4"
},
{
"ordinal": 2,
"name": "created_at",
"type_info": "Timestamptz"
},
{
"ordinal": 3,
"name": "updated_at",
"type_info": "Timestamptz"
},
{
"ordinal": 4,
"name": "status_message",
"type_info": "Text"
},
{
"ordinal": 5,
"name": "display_status_message",
"type_info": "Text"
},
{
"ordinal": 6,
"name": "state",
"type_info": "Jsonb"
},
{
"ordinal": 7,
"name": "import_config",
"type_info": "Jsonb"
},
{
"ordinal": 8,
"name": "secrets",
"type_info": "Text"
},
{
"ordinal": 9,
"name": "previous_lease_id",
"type_info": "Text"
}
],
"parameters": {
"Left": [
"Text"
]
},
"nullable": [
false,
false,
false,
false,
true,
true,
true,
false,
false,
true
]
},
"hash": "02e68ffe7fc5077208c044a563e8291075aafe18c3cbd6459e46578a54b86696"
}

View File

@@ -0,0 +1,88 @@
{
"db_name": "PostgreSQL",
"query": "\n WITH next_job AS (\n SELECT *, lease_id as previous_lease_id\n FROM posthog_batchimport\n WHERE status = 'running' AND (leased_until IS NULL OR leased_until <= now())\n ORDER BY created_at\n LIMIT 1\n FOR UPDATE SKIP LOCKED\n )\n UPDATE posthog_batchimport\n SET\n status = 'running',\n leased_until = now() + interval '30 minutes', -- We lease for a long time because job init can be quite slow\n lease_id = $1\n FROM next_job\n WHERE posthog_batchimport.id = next_job.id\n RETURNING\n posthog_batchimport.id,\n posthog_batchimport.team_id,\n posthog_batchimport.created_at,\n posthog_batchimport.updated_at,\n posthog_batchimport.status_message,\n posthog_batchimport.display_status_message,\n posthog_batchimport.state,\n posthog_batchimport.import_config,\n posthog_batchimport.secrets,\n posthog_batchimport.backoff_attempt,\n posthog_batchimport.backoff_until,\n next_job.previous_lease_id\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "id",
"type_info": "Uuid"
},
{
"ordinal": 1,
"name": "team_id",
"type_info": "Int4"
},
{
"ordinal": 2,
"name": "created_at",
"type_info": "Timestamptz"
},
{
"ordinal": 3,
"name": "updated_at",
"type_info": "Timestamptz"
},
{
"ordinal": 4,
"name": "status_message",
"type_info": "Text"
},
{
"ordinal": 5,
"name": "display_status_message",
"type_info": "Text"
},
{
"ordinal": 6,
"name": "state",
"type_info": "Jsonb"
},
{
"ordinal": 7,
"name": "import_config",
"type_info": "Jsonb"
},
{
"ordinal": 8,
"name": "secrets",
"type_info": "Text"
},
{
"ordinal": 9,
"name": "backoff_attempt",
"type_info": "Int4"
},
{
"ordinal": 10,
"name": "backoff_until",
"type_info": "Timestamptz"
},
{
"ordinal": 11,
"name": "previous_lease_id",
"type_info": "Text"
}
],
"parameters": {
"Left": [
"Text"
]
},
"nullable": [
false,
false,
false,
false,
true,
true,
true,
false,
false,
false,
true,
true
]
},
"hash": "bdf5288f88ee3c0d914c26d8e7c01af95c5802d8b0f703eb74df2c5084c76f00"
}

View File

@@ -0,0 +1,14 @@
{
"db_name": "PostgreSQL",
"query": "\n DELETE FROM posthog_errortrackingsymbolset WHERE id = $1\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Uuid"
]
},
"nullable": []
},
"hash": "c6bafa6b2143db5827d8eae0dc805265345689ce3104081e1c430998dc2e7196"
}

View File

@@ -1,23 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT redirect_url\n FROM posthog_link\n WHERE short_code = $1 AND short_link_domain = $2\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "redirect_url",
"type_info": "Varchar"
}
],
"parameters": {
"Left": [
"Text",
"Text"
]
},
"nullable": [
false
]
},
"hash": "ca831f6430f5371ad11d3741b99d5451c96af96d9321c3cd56db4a8cb4993f9e"
}

View File

@@ -5,6 +5,7 @@ use uuid::Uuid;
const ISSUE_CREATED: &str = "error_tracking_issue_created";
const ISSUE_REOPENED: &str = "error_tracking_issue_reopened";
const SYMBOL_SET_SAVED: &str = "error_tracking_symbol_set_saved";
const SYMBOL_SET_DELETED: &str = "error_tracking_symbol_set_deleted";
pub fn capture_issue_created(team_id: i32, issue_id: Uuid) {
let mut event = Event::new_anon(ISSUE_CREATED);
@@ -29,6 +30,16 @@ pub fn capture_symbol_set_saved(team_id: i32, set_ref: &str, storage_ptr: &str,
spawning_capture(event);
}
/// Emits an analytics event recording that a symbol set record was deleted.
///
/// Mirrors `capture_symbol_set_saved`, except the storage pointer is optional
/// because a record may be deleted before any data was ever persisted.
pub fn capture_symbol_set_deleted(team_id: i32, set_ref: &str, storage_ptr: Option<&str>) {
    let mut event = Event::new_anon(SYMBOL_SET_DELETED);
    event.insert_prop("team_id", team_id).unwrap();
    event.insert_prop("set_ref", set_ref).unwrap();
    // Only attach the pointer when one exists; absent pointers are simply omitted.
    match storage_ptr {
        Some(pointer) => {
            event.insert_prop("storage_ptr", pointer).unwrap();
        }
        None => {}
    }
    spawning_capture(event);
}
pub fn spawning_capture(event: Event) {
tokio::spawn(async move {
if let Err(e) = posthog_rs::capture(event).await {

View File

@@ -86,23 +86,27 @@ where
// If we failed to parse this chunk's data in the past, we should not try again.
// Note that in situations where we're running beneath a `Saving` layer, we'll
// never reach this point, but we still handle the case for correctness sake
if let Some(failure_reason) = record.failure_reason {
if let Some(failure_reason) = &record.failure_reason {
counter!(CHUNK_ID_FAILURE_FETCHED).increment(1);
let error: FrameError =
serde_json::from_str(&failure_reason).map_err(UnhandledError::from)?;
serde_json::from_str(failure_reason).map_err(UnhandledError::from)?;
return Err(error.into());
}
let Some(storage_ptr) = record.storage_ptr else {
let Some(storage_ptr) = &record.storage_ptr else {
// It's never valid to have no failure reason and no storage pointer - if we hit this case, just panic
error!("No storage pointer found for chunk id {}", id);
panic!("No storage pointer found for chunk id {id}");
};
self.client
.get(&self.bucket, &storage_ptr)
.await
.map_err(|e| e.into())
let Ok(data) = self.client.get(&self.bucket, storage_ptr).await else {
let mut record = record;
record.delete(&self.pool).await?;
// This is kind-of false - the actual problem is missing data in s3, with a record that exists, rather than no record being found for
// a given chunk id - but it's close enough that it's fine for a temporary fix.
return Err(FrameError::MissingChunkIdData(record.set_ref).into());
};
Ok(data)
}
}

View File

@@ -15,7 +15,7 @@ use crate::{
SAVE_SYMBOL_SET, SYMBOL_SET_DB_FETCHES, SYMBOL_SET_DB_HITS, SYMBOL_SET_DB_MISSES,
SYMBOL_SET_FETCH_RETRY, SYMBOL_SET_SAVED,
},
posthog_utils::capture_symbol_set_saved,
posthog_utils::{capture_symbol_set_deleted, capture_symbol_set_saved},
};
use super::{Fetcher, Parser, S3Client};
@@ -167,13 +167,19 @@ where
if let Some(record) = SymbolSetRecord::load(&self.pool, team_id, &set_ref).await? {
metrics::counter!(SYMBOL_SET_DB_HITS).increment(1);
if let Some(storage_ptr) = record.storage_ptr {
if let Some(storage_ptr) = &record.storage_ptr {
info!("Found s3 saved symbol set data for {}", set_ref);
let data = self.s3_client.get(&self.bucket, &storage_ptr).await?;
let Ok(data) = self.s3_client.get(&self.bucket, storage_ptr).await else {
let mut record = record;
record.delete(&self.pool).await?;
// This is kind-of false - the actual problem is missing data in s3, with a record that exists, rather than no record being found for
// a given chunk id - but it's close enough that it's fine for a temporary fix.
return Err(FrameError::MissingChunkIdData(record.set_ref).into());
};
metrics::counter!(SAVED_SYMBOL_SET_LOADED).increment(1);
return Ok(Saveable {
data,
storage_ptr: Some(storage_ptr),
storage_ptr: Some(storage_ptr.clone()),
team_id,
set_ref,
});
@@ -343,6 +349,24 @@ impl SymbolSetRecord {
Ok(())
}
/// Best-effort removal of this symbol set record from Postgres.
///
/// Called when the record's backing data cannot be fetched (e.g. the S3
/// object behind `storage_ptr` is gone), so the stale row stops poisoning
/// future lookups. Database errors are deliberately swallowed and this
/// always returns `Ok(())`; a deletion analytics event is fired either way.
/// NOTE(review): takes `&mut self` but does not mutate — presumably to
/// signal the record is logically consumed; confirm before relaxing.
pub async fn delete<'c, E>(&mut self, e: E) -> Result<(), UnhandledError>
where
E: sqlx::Executor<'c, Database = sqlx::Postgres>,
{
let _ignored = sqlx::query!(
r#"
DELETE FROM posthog_errortrackingsymbolset WHERE id = $1
"#,
self.id
)
.execute(e)
.await; // We don't really care if this fails, since it's a robustness thing anyway
// Report the deletion even if the DELETE itself failed — the event marks intent.
capture_symbol_set_deleted(self.team_id, &self.set_ref, self.storage_ptr.as_deref());
Ok(())
}
}
#[cfg(test)]