mirror of
https://github.com/BillyOutlast/posthog.git
synced 2026-02-04 03:01:23 +01:00
fix(flags): use shared person struct to read partitioned table (#41444)
Co-authored-by: Dylan Martin <dylan@posthog.com>
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
use chrono::{DateTime, Utc};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
use sqlx::Postgres;
|
||||
use sqlx::PgConnection;
|
||||
use uuid::Uuid;
|
||||
|
||||
pub type PersonId = i64;
|
||||
@@ -19,14 +19,11 @@ pub struct Person {
|
||||
}
|
||||
|
||||
impl Person {
|
||||
pub async fn from_distinct_id<'c, E>(
|
||||
e: E,
|
||||
pub async fn from_distinct_id(
|
||||
e: &mut PgConnection,
|
||||
team_id: i32,
|
||||
distinct_id: &str,
|
||||
) -> Result<Option<Person>, sqlx::Error>
|
||||
where
|
||||
E: sqlx::Executor<'c, Database = Postgres> + Clone,
|
||||
{
|
||||
) -> Result<Option<Person>, sqlx::Error> {
|
||||
if let Some(res) = sqlx::query_as!(
|
||||
Person,
|
||||
r#"
|
||||
@@ -43,7 +40,7 @@ impl Person {
|
||||
distinct_id,
|
||||
team_id
|
||||
)
|
||||
.fetch_optional(e.clone())
|
||||
.fetch_optional(&mut *e)
|
||||
.await? {
|
||||
return Ok(Some(res));
|
||||
}
|
||||
@@ -51,14 +48,11 @@ impl Person {
|
||||
Self::from_distinct_id_legacy(e, team_id, distinct_id).await
|
||||
}
|
||||
|
||||
pub async fn from_distinct_id_no_props<'c, E>(
|
||||
e: E,
|
||||
pub async fn from_distinct_id_no_props(
|
||||
e: &mut PgConnection,
|
||||
team_id: i32,
|
||||
distinct_id: &str,
|
||||
) -> Result<Option<Person>, sqlx::Error>
|
||||
where
|
||||
E: sqlx::Executor<'c, Database = Postgres> + Clone,
|
||||
{
|
||||
) -> Result<Option<Person>, sqlx::Error> {
|
||||
if let Some(res) = sqlx::query_as!(
|
||||
Person,
|
||||
r#"
|
||||
@@ -75,7 +69,7 @@ impl Person {
|
||||
distinct_id,
|
||||
team_id
|
||||
)
|
||||
.fetch_optional(e.clone())
|
||||
.fetch_optional(&mut *e)
|
||||
.await? {
|
||||
return Ok(Some(res));
|
||||
}
|
||||
@@ -83,14 +77,11 @@ impl Person {
|
||||
Self::from_distinct_id_no_props_legacy(e, team_id, distinct_id).await
|
||||
}
|
||||
|
||||
async fn from_distinct_id_legacy<'c, E>(
|
||||
e: E,
|
||||
async fn from_distinct_id_legacy(
|
||||
e: &mut PgConnection,
|
||||
team_id: i32,
|
||||
distinct_id: &str,
|
||||
) -> Result<Option<Person>, sqlx::Error>
|
||||
where
|
||||
E: sqlx::Executor<'c, Database = Postgres>,
|
||||
{
|
||||
) -> Result<Option<Person>, sqlx::Error> {
|
||||
sqlx::query_as!(
|
||||
Person,
|
||||
r#"
|
||||
@@ -111,14 +102,11 @@ impl Person {
|
||||
.await
|
||||
}
|
||||
|
||||
async fn from_distinct_id_no_props_legacy<'c, E>(
|
||||
e: E,
|
||||
async fn from_distinct_id_no_props_legacy(
|
||||
e: &mut PgConnection,
|
||||
team_id: i32,
|
||||
distinct_id: &str,
|
||||
) -> Result<Option<Person>, sqlx::Error>
|
||||
where
|
||||
E: sqlx::Executor<'c, Database = Postgres>,
|
||||
{
|
||||
) -> Result<Option<Person>, sqlx::Error> {
|
||||
sqlx::query_as!(
|
||||
Person,
|
||||
r#"
|
||||
|
||||
@@ -38,7 +38,15 @@ pub async fn add_person_properties(
|
||||
let m_distinct_id = distinct_id.clone();
|
||||
let team_id = event.team_id;
|
||||
let fut = async move {
|
||||
let res = Person::from_distinct_id(&m_context.persons_pool, team_id, &m_distinct_id)
|
||||
let mut conn = match m_context.persons_pool.acquire().await {
|
||||
Ok(conn) => conn,
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to acquire connection to persons pool: {:?}", e);
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
|
||||
let res = Person::from_distinct_id(&mut conn, team_id, &m_distinct_id)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
tracing::error!("Failed to fetch person {}, {:?}", m_distinct_id, e);
|
||||
@@ -50,12 +58,7 @@ pub async fn add_person_properties(
|
||||
Err(sqlx::Error::ColumnDecode { .. }) => {
|
||||
// If we failed to decode the person properties, we just put an empty property set on
|
||||
// the event, so e.g. counting exceptions by person still works
|
||||
Person::from_distinct_id_no_props(
|
||||
&m_context.persons_pool,
|
||||
team_id,
|
||||
&m_distinct_id,
|
||||
)
|
||||
.await
|
||||
Person::from_distinct_id_no_props(&mut conn, team_id, &m_distinct_id).await
|
||||
}
|
||||
Err(e) => Err(e),
|
||||
}
|
||||
|
||||
@@ -11,7 +11,7 @@ use crate::database::{
|
||||
get_connection_with_metrics, get_writer_connection_with_metrics, PostgresRouter,
|
||||
};
|
||||
use common_database::PostgresReader;
|
||||
use common_types::{PersonId, ProjectId, TeamId};
|
||||
use common_types::{Person, PersonId, ProjectId, TeamId};
|
||||
use serde_json::Value;
|
||||
use sha1::{Digest, Sha1};
|
||||
use sqlx::{Acquire, Row};
|
||||
@@ -150,25 +150,11 @@ pub async fn fetch_and_locally_cache_all_relevant_properties(
|
||||
// 2. the distinct_id is associated with an anonymous or cookieless user. In that case, it's fine to not return a person ID and to never return person properties. This is handled by just
|
||||
// returning an empty HashMap for person properties whenever I actually need them, and then obviously any condition that depends on person properties will return false.
|
||||
// That's fine though, we shouldn't error out just because we can't find a person ID.
|
||||
let person_query = r#"
|
||||
SELECT DISTINCT ON (ppd.distinct_id)
|
||||
p.id as person_id,
|
||||
p.properties as person_properties
|
||||
FROM posthog_persondistinctid ppd
|
||||
INNER JOIN posthog_person p
|
||||
ON p.id = ppd.person_id
|
||||
AND p.team_id = ppd.team_id
|
||||
WHERE ppd.distinct_id = $1
|
||||
AND ppd.team_id = $2
|
||||
"#;
|
||||
|
||||
let person_query_start = Instant::now();
|
||||
let person_query_timer = common_metrics::timing_guard(FLAG_PERSON_QUERY_TIME, &query_labels);
|
||||
let (person_id, person_props): (Option<PersonId>, Option<Value>) = sqlx::query_as(person_query)
|
||||
.bind(&distinct_id)
|
||||
.bind(team_id)
|
||||
.fetch_optional(&mut *conn)
|
||||
.await?
|
||||
let person = Person::from_distinct_id(&mut conn, team_id, &distinct_id).await?;
|
||||
let (person_id, person_props) = person
|
||||
.map(|p| (Some(p.id), Some(p.properties)))
|
||||
.unwrap_or((None, None));
|
||||
person_query_timer.fin();
|
||||
|
||||
@@ -390,16 +376,6 @@ fn are_overrides_useful_for_flag(
|
||||
.any(|filter| overrides.contains_key(&filter.key))
|
||||
}
|
||||
|
||||
/// Check if a FlagError contains a foreign key constraint violation
|
||||
fn flag_error_is_foreign_key_constraint(error: &FlagError) -> bool {
|
||||
match error {
|
||||
FlagError::DatabaseError(sqlx_error, _) => {
|
||||
common_database::is_foreign_key_constraint_error(sqlx_error)
|
||||
}
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Determines if a FlagError should trigger a retry
|
||||
fn should_retry_on_error(error: &FlagError) -> bool {
|
||||
match error {
|
||||
@@ -686,9 +662,8 @@ pub async fn set_feature_flag_hash_key_overrides(
|
||||
)
|
||||
.await;
|
||||
|
||||
// Only retry on foreign key constraint errors (person deletion race condition)
|
||||
match &result {
|
||||
Err(e) if flag_error_is_foreign_key_constraint(e) => {
|
||||
Err(e) => {
|
||||
// Track error classification
|
||||
classify_and_track_error(e, "set_hash_key_overrides", true);
|
||||
|
||||
@@ -709,19 +684,12 @@ pub async fn set_feature_flag_hash_key_overrides(
|
||||
team_id = %team_id,
|
||||
distinct_ids = ?distinct_ids,
|
||||
error = ?e,
|
||||
"Hash key override setting failed due to person deletion, will retry"
|
||||
"Hash key override setting failed, will retry"
|
||||
);
|
||||
|
||||
// Return error to trigger retry
|
||||
result
|
||||
}
|
||||
// For other errors, don't retry - return immediately to stop retrying
|
||||
Err(e) => {
|
||||
// Track error classification for non-retried errors
|
||||
classify_and_track_error(e, "set_hash_key_overrides", false);
|
||||
|
||||
result
|
||||
}
|
||||
// Success case - return the result
|
||||
Ok(_) => result,
|
||||
}
|
||||
@@ -758,7 +726,10 @@ async fn try_set_feature_flag_hash_key_overrides(
|
||||
ON existing.person_id = p.person_id AND existing.team_id = p.team_id
|
||||
WHERE p.team_id = $1
|
||||
AND p.distinct_id = ANY($2)
|
||||
AND EXISTS (SELECT 1 FROM posthog_person WHERE id = p.person_id AND team_id = p.team_id)
|
||||
AND (
|
||||
EXISTS (SELECT 1 FROM posthog_person_new WHERE id = p.person_id AND team_id = p.team_id)
|
||||
OR EXISTS (SELECT 1 FROM posthog_person WHERE id = p.person_id AND team_id = p.team_id)
|
||||
)
|
||||
"#;
|
||||
|
||||
// Query 2: Get all active feature flags with experience continuity (non-person pool)
|
||||
@@ -1007,9 +978,8 @@ pub async fn should_write_hash_key_override(
|
||||
let result =
|
||||
try_should_write_hash_key_override(router, team_id, &distinct_ids, project_id).await;
|
||||
|
||||
// Only retry on foreign key constraint errors (person deletion race condition)
|
||||
match &result {
|
||||
Err(e) if flag_error_is_foreign_key_constraint(e) => {
|
||||
Err(e) => {
|
||||
// Increment retry counter for monitoring
|
||||
common_metrics::inc(
|
||||
FLAG_HASH_KEY_RETRIES_COUNTER,
|
||||
@@ -1027,14 +997,12 @@ pub async fn should_write_hash_key_override(
|
||||
team_id = %team_id,
|
||||
distinct_id = %distinct_id,
|
||||
error = ?e,
|
||||
"Hash key override check failed due to person deletion, will retry"
|
||||
"Hash key override check failed, will retry"
|
||||
);
|
||||
|
||||
// Return error to trigger retry
|
||||
result
|
||||
}
|
||||
// For other errors, don't retry - return immediately to stop retrying
|
||||
Err(_) => result,
|
||||
// Success case - return the result
|
||||
Ok(_) => result,
|
||||
}
|
||||
@@ -1058,7 +1026,12 @@ async fn try_should_write_hash_key_override(
|
||||
FROM posthog_persondistinctid p
|
||||
LEFT JOIN posthog_featureflaghashkeyoverride existing
|
||||
ON existing.person_id = p.person_id AND existing.team_id = p.team_id
|
||||
WHERE p.team_id = $1 AND p.distinct_id = ANY($2)
|
||||
WHERE p.team_id = $1
|
||||
AND p.distinct_id = ANY($2)
|
||||
AND (
|
||||
EXISTS (SELECT 1 FROM posthog_person_new WHERE id = p.person_id AND team_id = p.team_id)
|
||||
OR EXISTS (SELECT 1 FROM posthog_person WHERE id = p.person_id AND team_id = p.team_id)
|
||||
)
|
||||
"#;
|
||||
|
||||
// Query 2: Get feature flags from non-person pool
|
||||
|
||||
@@ -12,7 +12,7 @@ use anyhow::Error;
|
||||
use axum::async_trait;
|
||||
use common_database::{get_pool, Client, CustomDatabaseError};
|
||||
use common_redis::{Client as RedisClientTrait, RedisClient};
|
||||
use common_types::{PersonId, ProjectId, TeamId};
|
||||
use common_types::{Person, PersonId, ProjectId, TeamId};
|
||||
use rand::{distributions::Alphanumeric, Rng};
|
||||
use serde_json::{json, Value};
|
||||
use sqlx::{pool::PoolConnection, Error as SqlxError, Postgres, Row};
|
||||
@@ -557,22 +557,10 @@ pub async fn get_person_id_by_distinct_id(
|
||||
distinct_id: &str,
|
||||
) -> Result<PersonId, Error> {
|
||||
let mut conn = client.get_connection().await?;
|
||||
let row: (PersonId,) = sqlx::query_as(
|
||||
r#"SELECT id FROM posthog_person
|
||||
WHERE team_id = $1 AND id = (
|
||||
SELECT person_id FROM posthog_persondistinctid
|
||||
WHERE team_id = $1 AND distinct_id = $2
|
||||
LIMIT 1
|
||||
)
|
||||
LIMIT 1"#,
|
||||
)
|
||||
.bind(team_id)
|
||||
.bind(distinct_id)
|
||||
.fetch_one(&mut *conn)
|
||||
.await
|
||||
.map_err(|_| anyhow::anyhow!("Person not found"))?;
|
||||
|
||||
Ok(row.0)
|
||||
Person::from_distinct_id(&mut conn, team_id, distinct_id)
|
||||
.await?
|
||||
.map(|p| p.id)
|
||||
.ok_or_else(|| anyhow::anyhow!("Person not found"))
|
||||
}
|
||||
|
||||
pub async fn add_person_to_cohort(
|
||||
|
||||
Reference in New Issue
Block a user