From 95f9d563556a24e010ce3e4ce658123fcb966eeb Mon Sep 17 00:00:00 2001 From: Oliver Browne Date: Thu, 13 Nov 2025 19:34:42 +0200 Subject: [PATCH] fix(flags): use shared person struct to read partitioned table (#41444) Co-authored-by: Dylan Martin --- rust/common/types/src/person.rs | 42 +++++-------- rust/cymbal/src/pipeline/person.rs | 17 ++--- .../src/flags/flag_matching_utils.rs | 63 ++++++------------- rust/feature-flags/src/utils/test_utils.rs | 22 ++----- 4 files changed, 48 insertions(+), 96 deletions(-) diff --git a/rust/common/types/src/person.rs b/rust/common/types/src/person.rs index 8171171f46..c1b5b0dc97 100644 --- a/rust/common/types/src/person.rs +++ b/rust/common/types/src/person.rs @@ -1,7 +1,7 @@ use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use serde_json::Value; -use sqlx::Postgres; +use sqlx::PgConnection; use uuid::Uuid; pub type PersonId = i64; @@ -19,14 +19,11 @@ pub struct Person { } impl Person { - pub async fn from_distinct_id<'c, E>( - e: E, + pub async fn from_distinct_id( + e: &mut PgConnection, team_id: i32, distinct_id: &str, - ) -> Result, sqlx::Error> - where - E: sqlx::Executor<'c, Database = Postgres> + Clone, - { + ) -> Result, sqlx::Error> { if let Some(res) = sqlx::query_as!( Person, r#" @@ -43,7 +40,7 @@ impl Person { distinct_id, team_id ) - .fetch_optional(e.clone()) + .fetch_optional(&mut *e) .await? { return Ok(Some(res)); } @@ -51,14 +48,11 @@ impl Person { Self::from_distinct_id_legacy(e, team_id, distinct_id).await } - pub async fn from_distinct_id_no_props<'c, E>( - e: E, + pub async fn from_distinct_id_no_props( + e: &mut PgConnection, team_id: i32, distinct_id: &str, - ) -> Result, sqlx::Error> - where - E: sqlx::Executor<'c, Database = Postgres> + Clone, - { + ) -> Result, sqlx::Error> { if let Some(res) = sqlx::query_as!( Person, r#" @@ -75,7 +69,7 @@ impl Person { distinct_id, team_id ) - .fetch_optional(e.clone()) + .fetch_optional(&mut *e) .await? { return Ok(Some(res)); } @@ -83,14 +77,11 @@ impl Person { Self::from_distinct_id_no_props_legacy(e, team_id, distinct_id).await } - async fn from_distinct_id_legacy<'c, E>( - e: E, + async fn from_distinct_id_legacy( + e: &mut PgConnection, team_id: i32, distinct_id: &str, - ) -> Result, sqlx::Error> - where - E: sqlx::Executor<'c, Database = Postgres>, - { + ) -> Result, sqlx::Error> { sqlx::query_as!( Person, r#" @@ -111,14 +102,11 @@ impl Person { .await } - async fn from_distinct_id_no_props_legacy<'c, E>( - e: E, + async fn from_distinct_id_no_props_legacy( + e: &mut PgConnection, team_id: i32, distinct_id: &str, - ) -> Result, sqlx::Error> - where - E: sqlx::Executor<'c, Database = Postgres>, - { + ) -> Result, sqlx::Error> { sqlx::query_as!( Person, r#" diff --git a/rust/cymbal/src/pipeline/person.rs b/rust/cymbal/src/pipeline/person.rs index 9719bed6fc..42c330c504 100644 --- a/rust/cymbal/src/pipeline/person.rs +++ b/rust/cymbal/src/pipeline/person.rs @@ -38,7 +38,15 @@ pub async fn add_person_properties( let m_distinct_id = distinct_id.clone(); let team_id = event.team_id; let fut = async move { - let res = Person::from_distinct_id(&m_context.persons_pool, team_id, &m_distinct_id) + let mut conn = match m_context.persons_pool.acquire().await { + Ok(conn) => conn, + Err(e) => { + tracing::error!("Failed to acquire connection to persons pool: {:?}", e); + return Err(e); + } + }; + + let res = Person::from_distinct_id(&mut conn, team_id, &m_distinct_id) .await .map_err(|e| { tracing::error!("Failed to fetch person {}, {:?}", m_distinct_id, e); @@ -50,12 +58,7 @@ pub async fn add_person_properties( Err(sqlx::Error::ColumnDecode { .. }) => { // If we failed to decode the person properties, we just put an empty property set on // the event, so e.g. counting exceptions by person still works - Person::from_distinct_id_no_props( - &m_context.persons_pool, - team_id, - &m_distinct_id, - ) - .await + Person::from_distinct_id_no_props(&mut conn, team_id, &m_distinct_id).await } Err(e) => Err(e), } diff --git a/rust/feature-flags/src/flags/flag_matching_utils.rs b/rust/feature-flags/src/flags/flag_matching_utils.rs index cdf096506e..c23ae2135e 100644 --- a/rust/feature-flags/src/flags/flag_matching_utils.rs +++ b/rust/feature-flags/src/flags/flag_matching_utils.rs @@ -11,7 +11,7 @@ use crate::database::{ get_connection_with_metrics, get_writer_connection_with_metrics, PostgresRouter, }; use common_database::PostgresReader; -use common_types::{PersonId, ProjectId, TeamId}; +use common_types::{Person, PersonId, ProjectId, TeamId}; use serde_json::Value; use sha1::{Digest, Sha1}; use sqlx::{Acquire, Row}; @@ -150,25 +150,11 @@ pub async fn fetch_and_locally_cache_all_relevant_properties( // 2. the distinct_id is associated with an anonymous or cookieless user. In that case, it's fine to not return a person ID and to never return person properties. This is handled by just // returning an empty HashMap for person properties whenever I actually need them, and then obviously any condition that depends on person properties will return false. // That's fine though, we shouldn't error out just because we can't find a person ID. - let person_query = r#" - SELECT DISTINCT ON (ppd.distinct_id) - p.id as person_id, - p.properties as person_properties - FROM posthog_persondistinctid ppd - INNER JOIN posthog_person p - ON p.id = ppd.person_id - AND p.team_id = ppd.team_id - WHERE ppd.distinct_id = $1 - AND ppd.team_id = $2 - "#; - let person_query_start = Instant::now(); let person_query_timer = common_metrics::timing_guard(FLAG_PERSON_QUERY_TIME, &query_labels); - let (person_id, person_props): (Option, Option) = sqlx::query_as(person_query) - .bind(&distinct_id) - .bind(team_id) - .fetch_optional(&mut *conn) - .await? + let person = Person::from_distinct_id(&mut conn, team_id, &distinct_id).await?; + let (person_id, person_props) = person + .map(|p| (Some(p.id), Some(p.properties))) .unwrap_or((None, None)); person_query_timer.fin(); @@ -390,16 +376,6 @@ fn are_overrides_useful_for_flag( .any(|filter| overrides.contains_key(&filter.key)) } -/// Check if a FlagError contains a foreign key constraint violation -fn flag_error_is_foreign_key_constraint(error: &FlagError) -> bool { - match error { - FlagError::DatabaseError(sqlx_error, _) => { - common_database::is_foreign_key_constraint_error(sqlx_error) - } - _ => false, - } -} - /// Determines if a FlagError should trigger a retry fn should_retry_on_error(error: &FlagError) -> bool { match error { @@ -686,9 +662,8 @@ pub async fn set_feature_flag_hash_key_overrides( ) .await; - // Only retry on foreign key constraint errors (person deletion race condition) match &result { - Err(e) if flag_error_is_foreign_key_constraint(e) => { + Err(e) => { // Track error classification classify_and_track_error(e, "set_hash_key_overrides", true); @@ -709,19 +684,12 @@ pub async fn set_feature_flag_hash_key_overrides( team_id = %team_id, distinct_ids = ?distinct_ids, error = ?e, - "Hash key override setting failed due to person deletion, will retry" + "Hash key override setting failed, will retry" ); // Return error to trigger retry result } - // For other errors, don't retry - return immediately to stop retrying - Err(e) => { - // Track error classification for non-retried errors - classify_and_track_error(e, "set_hash_key_overrides", false); - - result - } // Success case - return the result Ok(_) => result, } @@ -758,7 +726,10 @@ async fn try_set_feature_flag_hash_key_overrides( ON existing.person_id = p.person_id AND existing.team_id = p.team_id WHERE p.team_id = $1 AND p.distinct_id = ANY($2) - AND EXISTS (SELECT 1 FROM posthog_person WHERE id = p.person_id AND team_id = p.team_id) + AND ( + EXISTS (SELECT 1 FROM posthog_person_new WHERE id = p.person_id AND team_id = p.team_id) + OR EXISTS (SELECT 1 FROM posthog_person WHERE id = p.person_id AND team_id = p.team_id) + ) "#; // Query 2: Get all active feature flags with experience continuity (non-person pool) @@ -1007,9 +978,8 @@ pub async fn should_write_hash_key_override( let result = try_should_write_hash_key_override(router, team_id, &distinct_ids, project_id).await; - // Only retry on foreign key constraint errors (person deletion race condition) match &result { - Err(e) if flag_error_is_foreign_key_constraint(e) => { + Err(e) => { // Increment retry counter for monitoring common_metrics::inc( FLAG_HASH_KEY_RETRIES_COUNTER, @@ -1027,14 +997,12 @@ pub async fn should_write_hash_key_override( team_id = %team_id, distinct_id = %distinct_id, error = ?e, - "Hash key override check failed due to person deletion, will retry" + "Hash key override check failed, will retry" ); // Return error to trigger retry result } - // For other errors, don't retry - return immediately to stop retrying - Err(_) => result, // Success case - return the result Ok(_) => result, } @@ -1058,7 +1026,12 @@ async fn try_should_write_hash_key_override( FROM posthog_persondistinctid p LEFT JOIN posthog_featureflaghashkeyoverride existing ON existing.person_id = p.person_id AND existing.team_id = p.team_id - WHERE p.team_id = $1 AND p.distinct_id = ANY($2) + WHERE p.team_id = $1 + AND p.distinct_id = ANY($2) + AND ( + EXISTS (SELECT 1 FROM posthog_person_new WHERE id = p.person_id AND team_id = p.team_id) + OR EXISTS (SELECT 1 FROM posthog_person WHERE id = p.person_id AND team_id = p.team_id) + ) "#; // Query 2: Get feature flags from non-person pool diff --git a/rust/feature-flags/src/utils/test_utils.rs b/rust/feature-flags/src/utils/test_utils.rs index a5e7690bfa..48792bba5c 100644 --- a/rust/feature-flags/src/utils/test_utils.rs +++ b/rust/feature-flags/src/utils/test_utils.rs @@ -12,7 +12,7 @@ use anyhow::Error; use axum::async_trait; use common_database::{get_pool, Client, CustomDatabaseError}; use common_redis::{Client as RedisClientTrait, RedisClient}; -use common_types::{PersonId, ProjectId, TeamId}; +use common_types::{Person, PersonId, ProjectId, TeamId}; use rand::{distributions::Alphanumeric, Rng}; use serde_json::{json, Value}; use sqlx::{pool::PoolConnection, Error as SqlxError, Postgres, Row}; @@ -557,22 +557,10 @@ pub async fn get_person_id_by_distinct_id( distinct_id: &str, ) -> Result { let mut conn = client.get_connection().await?; - let row: (PersonId,) = sqlx::query_as( - r#"SELECT id FROM posthog_person - WHERE team_id = $1 AND id = ( - SELECT person_id FROM posthog_persondistinctid - WHERE team_id = $1 AND distinct_id = $2 - LIMIT 1 - ) - LIMIT 1"#, - ) - .bind(team_id) - .bind(distinct_id) - .fetch_one(&mut *conn) - .await - .map_err(|_| anyhow::anyhow!("Person not found"))?; - - Ok(row.0) + Person::from_distinct_id(&mut conn, team_id, distinct_id) + .await? + .map(|p| p.id) + .ok_or_else(|| anyhow::anyhow!("Person not found")) } pub async fn add_person_to_cohort(