diff --git a/rust/.sqlx/query-02e68ffe7fc5077208c044a563e8291075aafe18c3cbd6459e46578a54b86696.json b/rust/.sqlx/query-02e68ffe7fc5077208c044a563e8291075aafe18c3cbd6459e46578a54b86696.json deleted file mode 100644 index 276348f1fb..0000000000 --- a/rust/.sqlx/query-02e68ffe7fc5077208c044a563e8291075aafe18c3cbd6459e46578a54b86696.json +++ /dev/null @@ -1,76 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n WITH next_job AS (\n SELECT *, lease_id as previous_lease_id\n FROM posthog_batchimport\n WHERE status = 'running' AND coalesce(leased_until, now()) <= now()\n ORDER BY created_at\n LIMIT 1\n FOR UPDATE SKIP LOCKED\n )\n UPDATE posthog_batchimport\n SET\n status = 'running',\n leased_until = now() + interval '30 minutes', -- We lease for a long time because job init can be quite slow\n lease_id = $1\n FROM next_job\n WHERE posthog_batchimport.id = next_job.id\n RETURNING\n posthog_batchimport.id,\n posthog_batchimport.team_id,\n posthog_batchimport.created_at,\n posthog_batchimport.updated_at,\n posthog_batchimport.status_message,\n posthog_batchimport.display_status_message,\n posthog_batchimport.state,\n posthog_batchimport.import_config,\n posthog_batchimport.secrets,\n next_job.previous_lease_id\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "id", - "type_info": "Uuid" - }, - { - "ordinal": 1, - "name": "team_id", - "type_info": "Int4" - }, - { - "ordinal": 2, - "name": "created_at", - "type_info": "Timestamptz" - }, - { - "ordinal": 3, - "name": "updated_at", - "type_info": "Timestamptz" - }, - { - "ordinal": 4, - "name": "status_message", - "type_info": "Text" - }, - { - "ordinal": 5, - "name": "display_status_message", - "type_info": "Text" - }, - { - "ordinal": 6, - "name": "state", - "type_info": "Jsonb" - }, - { - "ordinal": 7, - "name": "import_config", - "type_info": "Jsonb" - }, - { - "ordinal": 8, - "name": "secrets", - "type_info": "Text" - }, - { - "ordinal": 9, - "name": "previous_lease_id", - "type_info": "Text" - } - ], - 
"parameters": { - "Left": [ - "Text" - ] - }, - "nullable": [ - false, - false, - false, - false, - true, - true, - true, - false, - false, - true - ] - }, - "hash": "02e68ffe7fc5077208c044a563e8291075aafe18c3cbd6459e46578a54b86696" -} diff --git a/rust/.sqlx/query-bdf5288f88ee3c0d914c26d8e7c01af95c5802d8b0f703eb74df2c5084c76f00.json b/rust/.sqlx/query-bdf5288f88ee3c0d914c26d8e7c01af95c5802d8b0f703eb74df2c5084c76f00.json new file mode 100644 index 0000000000..19588465e6 --- /dev/null +++ b/rust/.sqlx/query-bdf5288f88ee3c0d914c26d8e7c01af95c5802d8b0f703eb74df2c5084c76f00.json @@ -0,0 +1,88 @@ +{ + "db_name": "PostgreSQL", + "query": "\n WITH next_job AS (\n SELECT *, lease_id as previous_lease_id\n FROM posthog_batchimport\n WHERE status = 'running' AND (leased_until IS NULL OR leased_until <= now())\n ORDER BY created_at\n LIMIT 1\n FOR UPDATE SKIP LOCKED\n )\n UPDATE posthog_batchimport\n SET\n status = 'running',\n leased_until = now() + interval '30 minutes', -- We lease for a long time because job init can be quite slow\n lease_id = $1\n FROM next_job\n WHERE posthog_batchimport.id = next_job.id\n RETURNING\n posthog_batchimport.id,\n posthog_batchimport.team_id,\n posthog_batchimport.created_at,\n posthog_batchimport.updated_at,\n posthog_batchimport.status_message,\n posthog_batchimport.display_status_message,\n posthog_batchimport.state,\n posthog_batchimport.import_config,\n posthog_batchimport.secrets,\n posthog_batchimport.backoff_attempt,\n posthog_batchimport.backoff_until,\n next_job.previous_lease_id\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Uuid" + }, + { + "ordinal": 1, + "name": "team_id", + "type_info": "Int4" + }, + { + "ordinal": 2, + "name": "created_at", + "type_info": "Timestamptz" + }, + { + "ordinal": 3, + "name": "updated_at", + "type_info": "Timestamptz" + }, + { + "ordinal": 4, + "name": "status_message", + "type_info": "Text" + }, + { + "ordinal": 5, + "name": "display_status_message", + 
"type_info": "Text" + }, + { + "ordinal": 6, + "name": "state", + "type_info": "Jsonb" + }, + { + "ordinal": 7, + "name": "import_config", + "type_info": "Jsonb" + }, + { + "ordinal": 8, + "name": "secrets", + "type_info": "Text" + }, + { + "ordinal": 9, + "name": "backoff_attempt", + "type_info": "Int4" + }, + { + "ordinal": 10, + "name": "backoff_until", + "type_info": "Timestamptz" + }, + { + "ordinal": 11, + "name": "previous_lease_id", + "type_info": "Text" + } + ], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [ + false, + false, + false, + false, + true, + true, + true, + false, + false, + false, + true, + true + ] + }, + "hash": "bdf5288f88ee3c0d914c26d8e7c01af95c5802d8b0f703eb74df2c5084c76f00" +} diff --git a/rust/.sqlx/query-c6bafa6b2143db5827d8eae0dc805265345689ce3104081e1c430998dc2e7196.json b/rust/.sqlx/query-c6bafa6b2143db5827d8eae0dc805265345689ce3104081e1c430998dc2e7196.json new file mode 100644 index 0000000000..209a388d44 --- /dev/null +++ b/rust/.sqlx/query-c6bafa6b2143db5827d8eae0dc805265345689ce3104081e1c430998dc2e7196.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "\n DELETE FROM posthog_errortrackingsymbolset WHERE id = $1\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [] + }, + "hash": "c6bafa6b2143db5827d8eae0dc805265345689ce3104081e1c430998dc2e7196" +} diff --git a/rust/.sqlx/query-ca831f6430f5371ad11d3741b99d5451c96af96d9321c3cd56db4a8cb4993f9e.json b/rust/.sqlx/query-ca831f6430f5371ad11d3741b99d5451c96af96d9321c3cd56db4a8cb4993f9e.json deleted file mode 100644 index e57383fada..0000000000 --- a/rust/.sqlx/query-ca831f6430f5371ad11d3741b99d5451c96af96d9321c3cd56db4a8cb4993f9e.json +++ /dev/null @@ -1,23 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n SELECT redirect_url\n FROM posthog_link\n WHERE short_code = $1 AND short_link_domain = $2\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "redirect_url", - "type_info": "Varchar" - } - 
], - "parameters": { - "Left": [ - "Text", - "Text" - ] - }, - "nullable": [ - false - ] - }, - "hash": "ca831f6430f5371ad11d3741b99d5451c96af96d9321c3cd56db4a8cb4993f9e" -}
diff --git a/rust/cymbal/src/posthog_utils.rs b/rust/cymbal/src/posthog_utils.rs
index 06aee78379..ea883ee9b5 100644
--- a/rust/cymbal/src/posthog_utils.rs
+++ b/rust/cymbal/src/posthog_utils.rs
@@ -5,6 +5,7 @@ use uuid::Uuid;
 const ISSUE_CREATED: &str = "error_tracking_issue_created";
 const ISSUE_REOPENED: &str = "error_tracking_issue_reopened";
 const SYMBOL_SET_SAVED: &str = "error_tracking_symbol_set_saved";
+const SYMBOL_SET_DELETED: &str = "error_tracking_symbol_set_deleted";
 
 pub fn capture_issue_created(team_id: i32, issue_id: Uuid) {
     let mut event = Event::new_anon(ISSUE_CREATED);
@@ -29,6 +30,25 @@ pub fn capture_symbol_set_saved(team_id: i32, set_ref: &str, storage_ptr: &str,
     spawning_capture(event);
 }
 
+/// Reports that a symbol set record was deleted, as an anonymous
+/// `error_tracking_symbol_set_deleted` event.
+///
+/// `storage_ptr` is attached as a property only when one is supplied. The event is handed to
+/// [`spawning_capture`], so this function does not wait for it to be sent.
+///
+/// # Panics
+///
+/// Panics if a property cannot be inserted into the event.
+pub fn capture_symbol_set_deleted(team_id: i32, set_ref: &str, storage_ptr: Option<&str>) {
+    let mut deleted = Event::new_anon(SYMBOL_SET_DELETED);
+    deleted.insert_prop("team_id", team_id).unwrap();
+    deleted.insert_prop("set_ref", set_ref).unwrap();
+    if let Some(location) = storage_ptr {
+        deleted.insert_prop("storage_ptr", location).unwrap();
+    }
+    spawning_capture(deleted);
+}
+
 pub fn spawning_capture(event: Event) {
     tokio::spawn(async move {
         if let Err(e) = posthog_rs::capture(event).await {
diff --git a/rust/cymbal/src/symbol_store/chunk_id.rs b/rust/cymbal/src/symbol_store/chunk_id.rs index c3a65e4041..8c5a9254b3 100644 --- a/rust/cymbal/src/symbol_store/chunk_id.rs +++ b/rust/cymbal/src/symbol_store/chunk_id.rs @@ -86,23 +86,27 @@ where // If we failed to parse this chunk's data in the past, we should not try again.
// Note that in situations where we're running beneath a `Saving` layer, we'll // never reach this point, but we still handle the case for correctness sake - if let Some(failure_reason) = record.failure_reason { + if let Some(failure_reason) = &record.failure_reason { counter!(CHUNK_ID_FAILURE_FETCHED).increment(1); let error: FrameError = - serde_json::from_str(&failure_reason).map_err(UnhandledError::from)?; + serde_json::from_str(failure_reason).map_err(UnhandledError::from)?; return Err(error.into()); } - let Some(storage_ptr) = record.storage_ptr else { + let Some(storage_ptr) = &record.storage_ptr else { // It's never valid to have no failure reason and no storage pointer - if we hit this case, just panic error!("No storage pointer found for chunk id {}", id); panic!("No storage pointer found for chunk id {id}"); }; - self.client - .get(&self.bucket, &storage_ptr) - .await - .map_err(|e| e.into()) + let Ok(data) = self.client.get(&self.bucket, storage_ptr).await else { + let mut record = record; + record.delete(&self.pool).await?; + // This is kind-of false - the actual problem is missing data in s3, with a record that exists, rather than no record being found for + // a given chunk id - but it's close enough that it's fine for a temporary fix. 
+ return Err(FrameError::MissingChunkIdData(record.set_ref).into()); + }; + Ok(data) } } diff --git a/rust/cymbal/src/symbol_store/saving.rs b/rust/cymbal/src/symbol_store/saving.rs index 5b55a55157..fffeafe238 100644 --- a/rust/cymbal/src/symbol_store/saving.rs +++ b/rust/cymbal/src/symbol_store/saving.rs @@ -15,7 +15,7 @@ use crate::{ SAVE_SYMBOL_SET, SYMBOL_SET_DB_FETCHES, SYMBOL_SET_DB_HITS, SYMBOL_SET_DB_MISSES, SYMBOL_SET_FETCH_RETRY, SYMBOL_SET_SAVED, }, - posthog_utils::capture_symbol_set_saved, + posthog_utils::{capture_symbol_set_deleted, capture_symbol_set_saved}, }; use super::{Fetcher, Parser, S3Client}; @@ -167,13 +167,19 @@ where if let Some(record) = SymbolSetRecord::load(&self.pool, team_id, &set_ref).await? { metrics::counter!(SYMBOL_SET_DB_HITS).increment(1); - if let Some(storage_ptr) = record.storage_ptr { + if let Some(storage_ptr) = &record.storage_ptr { info!("Found s3 saved symbol set data for {}", set_ref); - let data = self.s3_client.get(&self.bucket, &storage_ptr).await?; + let Ok(data) = self.s3_client.get(&self.bucket, storage_ptr).await else { + let mut record = record; + record.delete(&self.pool).await?; + // This is kind-of false - the actual problem is missing data in s3, with a record that exists, rather than no record being found for + // a given chunk id - but it's close enough that it's fine for a temporary fix. 
+ return Err(FrameError::MissingChunkIdData(record.set_ref).into()); + }; metrics::counter!(SAVED_SYMBOL_SET_LOADED).increment(1); return Ok(Saveable { data, - storage_ptr: Some(storage_ptr), + storage_ptr: Some(storage_ptr.clone()), team_id, set_ref, });
@@ -343,6 +349,32 @@ impl SymbolSetRecord {
 
         Ok(())
     }
+
+    /// Best-effort removal of this record's row from `posthog_errortrackingsymbolset`.
+    ///
+    /// A failed `DELETE` is deliberately not propagated, so this always returns `Ok(())`. The
+    /// deletion event is captured only when the statement actually removed a row, so a failed
+    /// delete, or a row that was already gone, is not reported as a deletion.
+    pub async fn delete<'c, E>(&mut self, e: E) -> Result<(), UnhandledError>
+    where
+        E: sqlx::Executor<'c, Database = sqlx::Postgres>,
+    {
+        let outcome = sqlx::query!(
+            r#"
+            DELETE FROM posthog_errortrackingsymbolset WHERE id = $1
+            "#,
+            self.id
+        )
+        .execute(e)
+        .await; // We don't really care if this fails, since it's a robustness thing anyway
+
+        // Only report a deletion that really happened.
+        if matches!(&outcome, Ok(done) if done.rows_affected() > 0) {
+            capture_symbol_set_deleted(self.team_id, &self.set_ref, self.storage_ptr.as_deref());
+        }
+
+        Ok(())
+    }
 }
 
 #[cfg(test)]