fix(cohort): static cohort count based on postgres (#38138)

Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
Gustavo H. Strassburger
2025-09-22 15:54:15 -03:00
committed by GitHub
parent 6746f9f4d5
commit 52dce944f4
17 changed files with 1398 additions and 651 deletions


@@ -2008,15 +2008,6 @@
'''
# ---
# name: TestCohortQuery.test_static_cohort_filter
'''
SELECT count(DISTINCT person_id)
FROM person_static_cohort
WHERE team_id = 99999
AND cohort_id = 99999
'''
# ---
# name: TestCohortQuery.test_static_cohort_filter.1
'''
(SELECT person_static_cohort.person_id AS id
@@ -2034,6 +2025,28 @@
allow_experimental_join_condition=1
'''
# ---
# name: TestCohortQuery.test_static_cohort_filter.1
'''
SELECT person.person_id AS id
FROM
(SELECT *,
id AS person_id
FROM
(SELECT id
FROM person
WHERE team_id = 99999
GROUP BY id
HAVING max(is_deleted) = 0 SETTINGS optimize_aggregation_in_order = 1)) person
WHERE 1 = 1
AND (((id IN
(SELECT person_id as id
FROM person_static_cohort
WHERE cohort_id = 99999
AND team_id = 99999)))) SETTINGS optimize_aggregation_in_order = 1,
join_algorithm = 'auto'
'''
# ---
# name: TestCohortQuery.test_static_cohort_filter.2
'''
@@ -2057,15 +2070,6 @@
'''
# ---
# name: TestCohortQuery.test_static_cohort_filter_with_extra
'''
SELECT count(DISTINCT person_id)
FROM person_static_cohort
WHERE team_id = 99999
AND cohort_id = 99999
'''
# ---
# name: TestCohortQuery.test_static_cohort_filter_with_extra.1
'''
(SELECT person_static_cohort.person_id AS id
@@ -2107,7 +2111,7 @@
allow_experimental_join_condition=1
'''
# ---
-# name: TestCohortQuery.test_static_cohort_filter_with_extra.2
+# name: TestCohortQuery.test_static_cohort_filter_with_extra.1
'''
SELECT if(behavior_query.person_id = '00000000-0000-0000-0000-000000000000', person.person_id, behavior_query.person_id) AS id
@@ -2149,7 +2153,7 @@
join_algorithm = 'auto'
'''
# ---
-# name: TestCohortQuery.test_static_cohort_filter_with_extra.3
+# name: TestCohortQuery.test_static_cohort_filter_with_extra.2
'''
(
(SELECT person_static_cohort.person_id AS id
@@ -2192,6 +2196,48 @@
allow_experimental_join_condition=1
'''
# ---
# name: TestCohortQuery.test_static_cohort_filter_with_extra.3
'''
SELECT if(behavior_query.person_id = '00000000-0000-0000-0000-000000000000', person.person_id, behavior_query.person_id) AS id
FROM
(SELECT if(not(empty(pdi.distinct_id)), pdi.person_id, e.person_id) AS person_id,
countIf(timestamp > now() - INTERVAL 1 week
AND timestamp < now()
AND event = '$pageview'
AND 1=1) > 0 AS performed_event_condition_None_level_level_0_level_1_level_0_0
FROM events e
LEFT OUTER JOIN
(SELECT distinct_id,
argMax(person_id, version) as person_id
FROM person_distinct_id2
WHERE team_id = 99999
GROUP BY distinct_id
HAVING argMax(is_deleted, version) = 0) AS pdi ON e.distinct_id = pdi.distinct_id
WHERE team_id = 99999
AND event IN ['$pageview']
AND timestamp <= now()
AND timestamp >= now() - INTERVAL 1 week
GROUP BY person_id) behavior_query
FULL OUTER JOIN
(SELECT *,
id AS person_id
FROM
(SELECT id
FROM person
WHERE team_id = 99999
GROUP BY id
HAVING max(is_deleted) = 0 SETTINGS optimize_aggregation_in_order = 1)) person ON person.person_id = behavior_query.person_id
WHERE 1 = 1
AND ((((id IN
(SELECT person_id as id
FROM person_static_cohort
WHERE cohort_id = 99999
AND team_id = 99999)))
OR ((coalesce(performed_event_condition_None_level_level_0_level_1_level_0_0, false))))) SETTINGS optimize_aggregation_in_order = 1,
join_algorithm = 'auto'
'''
# ---
# name: TestCohortQuery.test_static_cohort_filter_with_extra.4
'''
@@ -2236,15 +2282,6 @@
# ---
# name: TestCohortQuery.test_unwrapping_static_cohort_filter_hidden_in_layers_of_cohorts
'''
SELECT count(DISTINCT person_id)
FROM person_static_cohort
WHERE team_id = 99999
AND cohort_id = 99999
'''
# ---
# name: TestCohortQuery.test_unwrapping_static_cohort_filter_hidden_in_layers_of_cohorts.1
'''
/* cohort_calculation: */
SELECT count(DISTINCT person_id)
FROM cohortpeople
@@ -2253,7 +2290,7 @@
AND version = 0
'''
# ---
-# name: TestCohortQuery.test_unwrapping_static_cohort_filter_hidden_in_layers_of_cohorts.2
+# name: TestCohortQuery.test_unwrapping_static_cohort_filter_hidden_in_layers_of_cohorts.1
'''
/* cohort_calculation: */
(SELECT cohort_people.person_id AS id
@@ -2301,6 +2338,48 @@
allow_experimental_join_condition=1
'''
# ---
# name: TestCohortQuery.test_unwrapping_static_cohort_filter_hidden_in_layers_of_cohorts.2
'''
/* cohort_calculation: */
SELECT if(behavior_query.person_id = '00000000-0000-0000-0000-000000000000', person.person_id, behavior_query.person_id) AS id
FROM
(SELECT if(not(empty(pdi.distinct_id)), pdi.person_id, e.person_id) AS person_id,
countIf(timestamp > now() - INTERVAL 1 week
AND timestamp < now()
AND event = '$pageview'
AND 1=1) > 0 AS performed_event_condition_None_level_level_0_level_1_0
FROM events e
LEFT OUTER JOIN
(SELECT distinct_id,
argMax(person_id, version) as person_id
FROM person_distinct_id2
WHERE team_id = 99999
GROUP BY distinct_id
HAVING argMax(is_deleted, version) = 0) AS pdi ON e.distinct_id = pdi.distinct_id
WHERE team_id = 99999
AND event IN ['$pageview']
AND timestamp <= now()
AND timestamp >= now() - INTERVAL 1 week
GROUP BY person_id) behavior_query
FULL OUTER JOIN
(SELECT *,
id AS person_id
FROM
(SELECT id
FROM person
WHERE team_id = 99999
GROUP BY id
HAVING max(is_deleted) = 0 SETTINGS optimize_aggregation_in_order = 1)) person ON person.person_id = behavior_query.person_id
WHERE 1 = 1
AND (((id IN
(SELECT person_id
FROM cohortpeople
WHERE cohort_id = 99999
AND team_id = 99999 ))
OR (coalesce(performed_event_condition_None_level_level_0_level_1_0, false)))) SETTINGS optimize_aggregation_in_order = 1,
join_algorithm = 'auto'
'''
# ---
# name: TestCohortQuery.test_unwrapping_static_cohort_filter_hidden_in_layers_of_cohorts.3
'''
/* cohort_calculation: */


@@ -2,7 +2,7 @@ import { useActions, useValues } from 'kea'
import { Form } from 'kea-forms'
import { router } from 'kea-router'
-import { IconCopy, IconMinusSmall, IconPlusSmall, IconTrash } from '@posthog/icons'
+import { IconCopy, IconMinusSmall, IconPlusSmall, IconTrash, IconWarning } from '@posthog/icons'
import { LemonBanner, LemonDivider, LemonFileInput, Link, Tooltip } from '@posthog/lemon-ui'
import { NotFound } from 'lib/components/NotFound'
@@ -35,6 +35,7 @@ import { QueryContext } from '~/queries/types'
import { AddPersonToCohortModal } from './AddPersonToCohortModal'
import { addPersonToCohortModalLogic } from './addPersonToCohortModalLogic'
+import { cohortCountWarningLogic } from './cohortCountWarningLogic'
import { createCohortDataNodeLogicKey } from './cohortUtils'
const RESOURCE_TYPE = 'cohort'
@@ -59,6 +60,8 @@ export function CohortEdit({ id }: CohortLogicProps): JSX.Element {
useValues(logic)
const isNewCohort = cohort.id === 'new' || cohort.id === undefined
const dataNodeLogicKey = createCohortDataNodeLogicKey(cohort.id)
+const warningLogic = cohortCountWarningLogic({ cohort, query, dataNodeLogicKey })
+const { shouldShowCountWarning } = useValues(warningLogic)
const createStaticCohortContext: QueryContext = {
columns: {
@@ -414,10 +417,13 @@ export function CohortEdit({ id }: CohortLogicProps): JSX.Element {
<>
Persons in this cohort
<span className="text-secondary ml-2">
-{!cohort.is_calculating &&
-    cohort.count !== undefined &&
-    `(${cohort.count})`}
+{!cohort.is_calculating && cohort.count != undefined && `(${cohort.count})`}
</span>
+{shouldShowCountWarning && (
+    <Tooltip title="The displayed number of persons is less than the cohort count due to deleted persons. This is expected behavior for dynamic cohorts where persons may be deleted after being counted.">
+        <IconWarning className="text-warning ml-2" />
+    </Tooltip>
+)}
</>
}
description="Persons who match the following criteria will be part of the cohort."


@@ -0,0 +1,257 @@
import { BuiltLogic } from 'kea'

import { useMocks } from '~/mocks/jest'
import { dataNodeLogic } from '~/queries/nodes/DataNode/dataNodeLogic'
import { DataTableNode, NodeKind } from '~/queries/schema/schema-general'
import { initKeaTests } from '~/test/init'
import { mockCohort } from '~/test/mocks'
import { CohortType, PropertyFilterType, PropertyOperator } from '~/types'

import { CohortCountWarningLogicProps, cohortCountWarningLogic } from './cohortCountWarningLogic'
import type { cohortCountWarningLogicType } from './cohortCountWarningLogicType'

const createMockQuery = (cohortId: number): DataTableNode => ({
    kind: NodeKind.DataTableNode,
    source: {
        kind: NodeKind.ActorsQuery,
        fixedProperties: [
            {
                type: PropertyFilterType.Cohort,
                key: 'id',
                value: cohortId,
                operator: PropertyOperator.Exact,
            },
        ],
    },
    full: true,
    showPropertyFilter: false,
    showEventFilter: false,
})

const createMockCohort = (overrides: Partial<CohortType> = {}): CohortType => ({
    ...mockCohort,
    id: 1,
    count: 100,
    is_calculating: false,
    is_static: false,
    ...overrides,
})

describe('cohortCountWarningLogic', () => {
    beforeEach(() => {
        useMocks({
            get: {
                '/api/projects/:team/cohorts': [mockCohort],
                '/api/projects/:team/cohorts/:id': mockCohort,
            },
        })
        initKeaTests()
    })

    afterEach(() => {
        jest.restoreAllMocks()
    })

    const createLogicWithProps = (
        props: Partial<CohortCountWarningLogicProps> = {}
    ): BuiltLogic<cohortCountWarningLogicType> => {
        const defaultProps: CohortCountWarningLogicProps = {
            cohort: createMockCohort(),
            query: createMockQuery(1),
            dataNodeLogicKey: 'test-key',
            ...props,
        }
        return cohortCountWarningLogic(defaultProps)
    }

    describe('shouldShowCountWarning selector', () => {
        it('returns false when cohort has no count', () => {
            const cohort = createMockCohort({ count: undefined })
            const logic = createLogicWithProps({ cohort })
            logic.mount()
            expect(logic.values.shouldShowCountWarning).toBe(false)
        })

        it('returns false when cohort is calculating', () => {
            const cohort = createMockCohort({ is_calculating: true })
            const logic = createLogicWithProps({ cohort })
            logic.mount()
            expect(logic.values.shouldShowCountWarning).toBe(false)
        })

        it('returns false when cohort is new', () => {
            const cohort = createMockCohort({ id: 'new' })
            const logic = createLogicWithProps({ cohort })
            logic.mount()
            expect(logic.values.shouldShowCountWarning).toBe(false)
        })

        it('returns false when cohort is static', () => {
            const cohort = createMockCohort({ is_static: true })
            const logic = createLogicWithProps({ cohort })
            logic.mount()
            expect(logic.values.shouldShowCountWarning).toBe(false)
        })

        it('returns false when there is no response data', () => {
            const logic = createLogicWithProps()
            logic.mount()
            expect(logic.values.shouldShowCountWarning).toBe(false)
        })

        it('returns false when response has more data available', () => {
            const logic = createLogicWithProps()
            logic.mount()
            const mockDataNodeLogic = dataNodeLogic({
                key: 'test-key',
                query: createMockQuery(1),
            })
            mockDataNodeLogic.mount()
            mockDataNodeLogic.actions.loadDataSuccess({
                results: new Array(50),
                hasMore: true,
            })
            expect(logic.values.shouldShowCountWarning).toBe(false)
        })

        it('returns false when displayed count matches cohort count', () => {
            const cohort = createMockCohort({ count: 100 })
            const logic = createLogicWithProps({ cohort })
            logic.mount()
            const mockDataNodeLogic = dataNodeLogic({
                key: 'test-key',
                query: createMockQuery(1),
            })
            mockDataNodeLogic.mount()
            mockDataNodeLogic.actions.loadDataSuccess({
                results: new Array(100),
                hasMore: false,
            })
            expect(logic.values.shouldShowCountWarning).toBe(false)
        })

        it('returns true when displayed count does not match cohort count', () => {
            const cohort = createMockCohort({ count: 100 })
            const logic = createLogicWithProps({ cohort })
            logic.mount()
            const mockDataNodeLogic = dataNodeLogic({
                key: 'test-key',
                query: createMockQuery(1),
            })
            mockDataNodeLogic.mount()
            mockDataNodeLogic.actions.loadDataSuccess({
                results: new Array(85),
                hasMore: false,
            })
            expect(logic.values.shouldShowCountWarning).toBe(true)
        })

        it('handles empty results array', () => {
            const cohort = createMockCohort({ count: 10 })
            const logic = createLogicWithProps({ cohort })
            logic.mount()
            const mockDataNodeLogic = dataNodeLogic({
                key: 'test-key',
                query: createMockQuery(1),
            })
            mockDataNodeLogic.mount()
            mockDataNodeLogic.actions.loadDataSuccess({
                results: [],
                hasMore: false,
            })
            expect(logic.values.shouldShowCountWarning).toBe(true)
        })

        it('handles response without results property', () => {
            const cohort = createMockCohort({ count: 10 })
            const logic = createLogicWithProps({ cohort })
            logic.mount()
            const mockDataNodeLogic = dataNodeLogic({
                key: 'test-key',
                query: createMockQuery(1),
            })
            mockDataNodeLogic.mount()
            mockDataNodeLogic.actions.loadDataSuccess({
                hasMore: false,
            })
            expect(logic.values.shouldShowCountWarning).toBe(true)
        })

        it('correctly identifies discrepancy when persons are deleted', () => {
            const cohort = createMockCohort({ count: 150 })
            const logic = createLogicWithProps({ cohort })
            logic.mount()
            const mockDataNodeLogic = dataNodeLogic({
                key: 'test-key',
                query: createMockQuery(1),
            })
            mockDataNodeLogic.mount()
            mockDataNodeLogic.actions.loadDataSuccess({
                results: new Array(120),
                hasMore: false,
            })
            expect(logic.values.shouldShowCountWarning).toBe(true)
        })
    })

    describe('key generation', () => {
        it('generates unique keys based on cohort id and dataNodeLogicKey', () => {
            const props1 = {
                cohort: createMockCohort({ id: 1 }),
                dataNodeLogicKey: 'key1',
                query: createMockQuery(1),
            }
            const props2 = {
                cohort: createMockCohort({ id: 2 }),
                dataNodeLogicKey: 'key1',
                query: createMockQuery(2),
            }
            const props3 = {
                cohort: createMockCohort({ id: 1 }),
                dataNodeLogicKey: 'key2',
                query: createMockQuery(1),
            }
            const logic1 = cohortCountWarningLogic(props1)
            const logic2 = cohortCountWarningLogic(props2)
            const logic3 = cohortCountWarningLogic(props3)
            expect(logic1).not.toBe(logic2)
            expect(logic1).not.toBe(logic3)
            expect(logic2).not.toBe(logic3)
        })
    })

    describe('dataNodeLogic connection', () => {
        it('connects to the correct dataNodeLogic instance', () => {
            const query = createMockQuery(1)
            const dataNodeLogicKey = 'test-cohort-key'
            const logic = createLogicWithProps({
                query,
                dataNodeLogicKey,
            })
            logic.mount()
            expect(logic.props.dataNodeLogicKey).toBe(dataNodeLogicKey)
            expect(logic.props.query).toBe(query)
        })
    })
})


@@ -0,0 +1,47 @@
import { connect, kea, key, path, props, selectors } from 'kea'

import { dataNodeLogic } from '~/queries/nodes/DataNode/dataNodeLogic'
import { DataTableNode } from '~/queries/schema/schema-general'
import { CohortType } from '~/types'

import type { cohortCountWarningLogicType } from './cohortCountWarningLogicType'

export type CohortCountWarningLogicProps = {
    cohort: CohortType
    query: DataTableNode
    dataNodeLogicKey: string
}

export const cohortCountWarningLogic = kea<cohortCountWarningLogicType>([
    props({} as CohortCountWarningLogicProps),
    key((props) => `cohort-count-warning-${props.cohort.id}-${props.dataNodeLogicKey}`),
    path(['scenes', 'cohorts', 'cohortCountWarningLogic']),
    connect((props: CohortCountWarningLogicProps) => ({
        values: [dataNodeLogic({ key: props.dataNodeLogicKey, query: props.query }), ['response']],
    })),
    selectors(({ props }) => ({
        shouldShowCountWarning: [
            (s) => [s.response],
            (response): boolean => {
                const { cohort } = props
                if (!cohort.count || cohort.is_calculating || cohort.id === 'new' || cohort.is_static) {
                    return false
                }
                if (!response) {
                    return false
                }
                if ((response as any)?.hasMore) {
                    return false
                }
                const displayedCount = (response as any)?.results?.length || 0
                return displayedCount !== cohort.count
            },
        ],
    })),
])
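One design note on the logic above: it is keyed on both the cohort id and the dataNodeLogicKey, so two mounted cohort views never share warning state. The selector also deliberately returns false while response.hasMore is set, since a truncated persons table would make results.length undercount the membership regardless of deletions.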

File diff suppressed because it is too large.


@@ -6509,7 +6509,7 @@ class TestCohortGenerationForFeatureFlag(APIBaseTest, ClickhouseTestMixin):
)
# TODO: Ensure server-side cursors are disabled, since in production we use this with pgbouncer
-with snapshot_postgres_queries_context(self), self.assertNumQueries(23):
+with snapshot_postgres_queries_context(self), self.assertNumQueries(24):
get_cohort_actors_for_feature_flag(cohort.pk, "some-feature2", self.team.pk)
cohort.refresh_from_db()
@@ -6563,7 +6563,7 @@ class TestCohortGenerationForFeatureFlag(APIBaseTest, ClickhouseTestMixin):
)
# Extra queries because each batch adds its own queries
-with snapshot_postgres_queries_context(self), self.assertNumQueries(35):
+with snapshot_postgres_queries_context(self), self.assertNumQueries(37):
get_cohort_actors_for_feature_flag(cohort.pk, "some-feature2", self.team.pk, batchsize=2)
cohort.refresh_from_db()
@@ -6574,7 +6574,7 @@ class TestCohortGenerationForFeatureFlag(APIBaseTest, ClickhouseTestMixin):
self.assertEqual(len(response.json()["results"]), 3, response)
# if the batch is big enough, it's fewer queries
-with self.assertNumQueries(20):
+with self.assertNumQueries(21):
get_cohort_actors_for_feature_flag(cohort.pk, "some-feature2", self.team.pk, batchsize=10)
cohort.refresh_from_db()
@@ -6638,7 +6638,7 @@ class TestCohortGenerationForFeatureFlag(APIBaseTest, ClickhouseTestMixin):
name="some cohort",
)
-with snapshot_postgres_queries_context(self), self.assertNumQueries(20):
+with snapshot_postgres_queries_context(self), self.assertNumQueries(21):
# no queries to evaluate flags, because all evaluated using override properties
get_cohort_actors_for_feature_flag(cohort.pk, "some-feature2", self.team.pk)
@@ -6655,7 +6655,7 @@ class TestCohortGenerationForFeatureFlag(APIBaseTest, ClickhouseTestMixin):
name="some cohort2",
)
-with snapshot_postgres_queries_context(self), self.assertNumQueries(20):
+with snapshot_postgres_queries_context(self), self.assertNumQueries(21):
# person3 doesn't match filter conditions so is pre-filtered out
get_cohort_actors_for_feature_flag(cohort2.pk, "some-feature-new", self.team.pk)
@@ -6749,7 +6749,7 @@ class TestCohortGenerationForFeatureFlag(APIBaseTest, ClickhouseTestMixin):
name="some cohort",
)
-with snapshot_postgres_queries_context(self), self.assertNumQueries(37):
+with snapshot_postgres_queries_context(self), self.assertNumQueries(38):
# forced to evaluate flags by going to db, because cohorts need db query to evaluate
get_cohort_actors_for_feature_flag(cohort.pk, "some-feature-new", self.team.pk)


@@ -1289,15 +1289,6 @@
# ---
# name: TestExperimentQueryRunner.test_query_runner_with_internal_filters_3_cohort_static
'''
SELECT count(DISTINCT person_id)
FROM person_static_cohort
WHERE team_id = 99999
AND cohort_id = 99999
'''
# ---
# name: TestExperimentQueryRunner.test_query_runner_with_internal_filters_3_cohort_static.1
'''
SELECT metric_events.variant AS variant,
count(metric_events.entity_id) AS num_users,
sum(metric_events.value) AS total_sum,
@@ -1356,6 +1347,60 @@
allow_experimental_join_condition=1
'''
# ---
# name: TestExperimentQueryRunner.test_query_runner_with_internal_filters_3_cohort_static.1
'''
SELECT metric_events.variant AS variant,
count(metric_events.entity_id) AS num_users,
sum(metric_events.value) AS total_sum,
sum(power(metric_events.value, 2)) AS total_sum_of_squares
FROM
(SELECT exposures.variant AS variant,
exposures.entity_id AS entity_id,
sum(coalesce(accurateCastOrNull(metric_events.value, 'Float64'), 0)) AS value
FROM
(SELECT if(not(empty(events__override.distinct_id)), events__override.person_id, events.person_id) AS entity_id,
if(ifNull(greater(count(DISTINCT replaceRegexpAll(nullIf(nullIf(JSONExtractRaw(events.properties, '$feature_flag_response'), ''), 'null'), '^"|"$', '')), 1), 0), '$multiple', any(replaceRegexpAll(nullIf(nullIf(JSONExtractRaw(events.properties, '$feature_flag_response'), ''), 'null'), '^"|"$', ''))) AS variant,
min(toTimeZone(events.timestamp, 'UTC')) AS first_exposure_time
FROM events
LEFT OUTER JOIN
(SELECT argMax(person_distinct_id_overrides.person_id, person_distinct_id_overrides.version) AS person_id,
person_distinct_id_overrides.distinct_id AS distinct_id
FROM person_distinct_id_overrides
WHERE equals(person_distinct_id_overrides.team_id, 99999)
GROUP BY person_distinct_id_overrides.distinct_id
HAVING ifNull(equals(argMax(person_distinct_id_overrides.is_deleted, person_distinct_id_overrides.version), 0), 0) SETTINGS optimize_aggregation_in_order=1) AS events__override ON equals(events.distinct_id, events__override.distinct_id)
WHERE and(equals(events.team_id, 99999), greaterOrEquals(toTimeZone(events.timestamp, 'UTC'), toDateTime64('explicit_redacted_timestamp', 6, 'UTC')), lessOrEquals(toTimeZone(events.timestamp, 'UTC'), toDateTime64('explicit_redacted_timestamp', 6, 'UTC')), equals(events.event, '$feature_flag_called'), in(replaceRegexpAll(nullIf(nullIf(JSONExtractRaw(events.properties, '$feature_flag_response'), ''), 'null'), '^"|"$', ''), ['control', 'test']), ifNull(equals(replaceRegexpAll(nullIf(nullIf(JSONExtractRaw(events.properties, '$feature_flag'), ''), 'null'), '^"|"$', ''), 'test-experiment'), 0))
GROUP BY entity_id) AS exposures
LEFT JOIN
(SELECT toTimeZone(events.timestamp, 'UTC') AS timestamp,
if(not(empty(events__override.distinct_id)), events__override.person_id, events.person_id) AS entity_id,
events.event AS event,
1 AS value
FROM events
LEFT OUTER JOIN
(SELECT argMax(person_distinct_id_overrides.person_id, person_distinct_id_overrides.version) AS person_id,
person_distinct_id_overrides.distinct_id AS distinct_id
FROM person_distinct_id_overrides
WHERE equals(person_distinct_id_overrides.team_id, 99999)
GROUP BY person_distinct_id_overrides.distinct_id
HAVING ifNull(equals(argMax(person_distinct_id_overrides.is_deleted, person_distinct_id_overrides.version), 0), 0) SETTINGS optimize_aggregation_in_order=1) AS events__override ON equals(events.distinct_id, events__override.distinct_id)
WHERE and(equals(events.team_id, 99999), greaterOrEquals(toTimeZone(events.timestamp, 'UTC'), toDateTime64('explicit_redacted_timestamp', 6, 'UTC')), less(toTimeZone(events.timestamp, 'UTC'), toDateTime64('explicit_redacted_timestamp', 6, 'UTC')), equals(events.event, '$pageview'))) AS metric_events ON and(equals(toString(exposures.entity_id), toString(metric_events.entity_id)), greaterOrEquals(metric_events.timestamp, exposures.first_exposure_time))
GROUP BY exposures.variant,
exposures.entity_id) AS metric_events
GROUP BY metric_events.variant
LIMIT 100 SETTINGS readonly=2,
max_execution_time=600,
allow_experimental_object_type=1,
format_csv_allow_double_quotes=0,
max_ast_elements=4000000,
max_expanded_ast_elements=4000000,
max_bytes_before_external_group_by=0,
allow_experimental_analyzer=1,
transform_null_in=1,
optimize_min_equality_disjunction_chain_length=4294967295,
allow_experimental_join_condition=1
'''
# ---
# name: TestExperimentQueryRunner.test_query_runner_with_internal_filters_3_cohort_static.2
'''
SELECT metric_events.variant AS variant,


@@ -772,15 +772,6 @@
# ---
# name: TestExperimentQueryRunner.test_query_runner_with_data_warehouse_internal_filters_3_cohort_static
'''
SELECT count(DISTINCT person_id)
FROM person_static_cohort
WHERE team_id = 99999
AND cohort_id = 99999
'''
# ---
# name: TestExperimentQueryRunner.test_query_runner_with_data_warehouse_internal_filters_3_cohort_static.1
'''
SELECT metric_events.variant AS variant,
count(metric_events.entity_id) AS num_users,
sum(metric_events.value) AS total_sum,
@@ -830,6 +821,54 @@
allow_experimental_join_condition=1
'''
# ---
# name: TestExperimentQueryRunner.test_query_runner_with_data_warehouse_internal_filters_3_cohort_static.1
'''
SELECT metric_events.variant AS variant,
count(metric_events.entity_id) AS num_users,
sum(metric_events.value) AS total_sum,
sum(power(metric_events.value, 2)) AS total_sum_of_squares
FROM
(SELECT exposures.variant AS variant,
exposures.entity_id AS entity_id,
sum(coalesce(accurateCastOrNull(metric_events.value, 'Float64'), 0)) AS value
FROM
(SELECT if(not(empty(events__override.distinct_id)), events__override.person_id, events.person_id) AS entity_id,
if(ifNull(greater(count(DISTINCT replaceRegexpAll(nullIf(nullIf(JSONExtractRaw(events.properties, '$feature_flag_response'), ''), 'null'), '^"|"$', '')), 1), 0), '$multiple', any(replaceRegexpAll(nullIf(nullIf(JSONExtractRaw(events.properties, '$feature_flag_response'), ''), 'null'), '^"|"$', ''))) AS variant,
min(toTimeZone(events.timestamp, 'UTC')) AS first_exposure_time,
replaceRegexpAll(nullIf(nullIf(JSONExtractRaw(events.properties, '$user_id'), ''), 'null'), '^"|"$', '') AS exposure_identifier
FROM events
LEFT OUTER JOIN
(SELECT argMax(person_distinct_id_overrides.person_id, person_distinct_id_overrides.version) AS person_id,
person_distinct_id_overrides.distinct_id AS distinct_id
FROM person_distinct_id_overrides
WHERE equals(person_distinct_id_overrides.team_id, 99999)
GROUP BY person_distinct_id_overrides.distinct_id
HAVING ifNull(equals(argMax(person_distinct_id_overrides.is_deleted, person_distinct_id_overrides.version), 0), 0) SETTINGS optimize_aggregation_in_order=1) AS events__override ON equals(events.distinct_id, events__override.distinct_id)
WHERE and(equals(events.team_id, 99999), greaterOrEquals(toTimeZone(events.timestamp, 'UTC'), toDateTime64('explicit_redacted_timestamp', 6, 'UTC')), lessOrEquals(toTimeZone(events.timestamp, 'UTC'), toDateTime64('explicit_redacted_timestamp', 6, 'UTC')), equals(events.event, '$feature_flag_called'), in(replaceRegexpAll(nullIf(nullIf(JSONExtractRaw(events.properties, '$feature_flag_response'), ''), 'null'), '^"|"$', ''), ['control', 'test']), ifNull(equals(replaceRegexpAll(nullIf(nullIf(JSONExtractRaw(events.properties, '$feature_flag'), ''), 'null'), '^"|"$', ''), 'test-experiment'), 0))
GROUP BY entity_id,
replaceRegexpAll(nullIf(nullIf(JSONExtractRaw(events.properties, '$user_id'), ''), 'null'), '^"|"$', '')) AS exposures
LEFT JOIN
(SELECT posthog_test_usage.ds AS timestamp,
posthog_test_usage.userid AS entity_identifier,
posthog_test_usage.usage AS value
FROM s3('http://host.docker.internal:19000/posthog/test_storage_bucket-posthog.hogql.experiments.queryrunner/posthog_test_usage/*.csv', 'object_storage_root_user', 'object_storage_root_password', 'CSVWithNames', '`ds` Date, `id` String, `plan` String, `usage` Float64, `region` String, `userid` String') AS posthog_test_usage
WHERE and(greaterOrEquals(posthog_test_usage.ds, toDateTime64('explicit_redacted_timestamp', 6, 'UTC')), less(posthog_test_usage.ds, toDateTime64('explicit_redacted_timestamp', 6, 'UTC')), 1)) AS metric_events ON and(equals(toString(exposures.exposure_identifier), toString(metric_events.entity_identifier)), greaterOrEquals(metric_events.timestamp, exposures.first_exposure_time))
GROUP BY exposures.variant,
exposures.entity_id) AS metric_events
GROUP BY metric_events.variant
LIMIT 100 SETTINGS readonly=2,
max_execution_time=600,
allow_experimental_object_type=1,
format_csv_allow_double_quotes=0,
max_ast_elements=4000000,
max_expanded_ast_elements=4000000,
max_bytes_before_external_group_by=0,
allow_experimental_analyzer=1,
transform_null_in=1,
optimize_min_equality_disjunction_chain_length=4294967295,
allow_experimental_join_condition=1
'''
# ---
# name: TestExperimentQueryRunner.test_query_runner_with_data_warehouse_internal_filters_3_cohort_static.2
'''
SELECT metric_events.variant AS variant,


@@ -1055,11 +1055,72 @@
# ---
# name: TestFOSSFunnel.test_funnel_with_static_cohort_step_filter
'''
-SELECT count(DISTINCT person_id)
-FROM person_static_cohort
-WHERE team_id = 99999
-AND cohort_id = 99999
SELECT countIf(ifNull(equals(steps, 1), 0)) AS step_1,
countIf(ifNull(equals(steps, 2), 0)) AS step_2,
avg(step_1_average_conversion_time_inner) AS step_1_average_conversion_time,
median(step_1_median_conversion_time_inner) AS step_1_median_conversion_time
FROM
(SELECT aggregation_target AS aggregation_target,
steps AS steps,
avg(step_1_conversion_time) AS step_1_average_conversion_time_inner,
median(step_1_conversion_time) AS step_1_median_conversion_time_inner
FROM
(SELECT aggregation_target AS aggregation_target,
steps AS steps,
max(steps) OVER (PARTITION BY aggregation_target) AS max_steps,
step_1_conversion_time AS step_1_conversion_time
FROM
(SELECT aggregation_target AS aggregation_target,
timestamp AS timestamp,
step_0 AS step_0,
latest_0 AS latest_0,
step_1 AS step_1,
latest_1 AS latest_1,
if(and(ifNull(lessOrEquals(latest_0, latest_1), 0), ifNull(lessOrEquals(latest_1, plus(toTimeZone(latest_0, 'UTC'), toIntervalDay(14))), 0)), 2, 1) AS steps,
if(and(isNotNull(latest_1), ifNull(lessOrEquals(latest_1, plus(toTimeZone(latest_0, 'UTC'), toIntervalDay(14))), 0)), dateDiff('second', latest_0, latest_1), NULL) AS step_1_conversion_time
FROM
(SELECT aggregation_target AS aggregation_target,
timestamp AS timestamp,
step_0 AS step_0,
latest_0 AS latest_0,
step_1 AS step_1,
min(latest_1) OVER (PARTITION BY aggregation_target
ORDER BY timestamp DESC ROWS BETWEEN UNBOUNDED PRECEDING AND 0 PRECEDING) AS latest_1
FROM
(SELECT toTimeZone(e.timestamp, 'UTC') AS timestamp,
if(not(empty(e__override.distinct_id)), e__override.person_id, e.person_id) AS aggregation_target,
if(and(equals(e.event, 'user signed up'), in(if(not(empty(e__override.distinct_id)), e__override.person_id, e.person_id),
(SELECT person_static_cohort.person_id AS person_id
FROM person_static_cohort
WHERE and(equals(person_static_cohort.team_id, 99999), equals(person_static_cohort.cohort_id, 99999))))), 1, 0) AS step_0,
if(ifNull(equals(step_0, 1), 0), timestamp, NULL) AS latest_0,
if(equals(e.event, 'paid'), 1, 0) AS step_1,
if(ifNull(equals(step_1, 1), 0), timestamp, NULL) AS latest_1
FROM events AS e
LEFT OUTER JOIN
(SELECT argMax(person_distinct_id_overrides.person_id, person_distinct_id_overrides.version) AS person_id,
person_distinct_id_overrides.distinct_id AS distinct_id
FROM person_distinct_id_overrides
WHERE equals(person_distinct_id_overrides.team_id, 99999)
GROUP BY person_distinct_id_overrides.distinct_id
HAVING ifNull(equals(argMax(person_distinct_id_overrides.is_deleted, person_distinct_id_overrides.version), 0), 0) SETTINGS optimize_aggregation_in_order=1) AS e__override ON equals(e.distinct_id, e__override.distinct_id)
WHERE and(equals(e.team_id, 99999), and(and(greaterOrEquals(toTimeZone(e.timestamp, 'UTC'), toDateTime64('explicit_redacted_timestamp', 6, 'UTC')), lessOrEquals(toTimeZone(e.timestamp, 'UTC'), toDateTime64('explicit_redacted_timestamp', 6, 'UTC'))), in(e.event, tuple('paid', 'user signed up'))), or(ifNull(equals(step_0, 1), 0), ifNull(equals(step_1, 1), 0)))))
WHERE ifNull(equals(step_0, 1), 0)))
GROUP BY aggregation_target,
steps
HAVING ifNull(equals(steps, max(max_steps)), isNull(steps)
and isNull(max(max_steps))))
LIMIT 100 SETTINGS readonly=2,
max_execution_time=60,
allow_experimental_object_type=1,
format_csv_allow_double_quotes=0,
max_ast_elements=4000000,
max_expanded_ast_elements=4000000,
max_bytes_before_external_group_by=23622320128,
allow_experimental_analyzer=1,
transform_null_in=1,
optimize_min_equality_disjunction_chain_length=4294967295,
allow_experimental_join_condition=1
'''
# ---
# name: TestFOSSFunnel.test_funnel_with_static_cohort_step_filter.1


@@ -772,11 +772,69 @@
# ---
# name: TestFOSSFunnelUDF.test_funnel_with_static_cohort_step_filter
'''
-SELECT count(DISTINCT person_id)
-FROM person_static_cohort
-WHERE team_id = 99999
-AND cohort_id = 99999
SELECT sum(step_1) AS step_1,
sum(step_2) AS step_2,
arrayMap(x -> if(isNaN(x), NULL, x), [avgArrayOrNull(step_1_conversion_times)])[1] AS step_1_average_conversion_time,
arrayMap(x -> if(isNaN(x), NULL, x), [medianArrayOrNull(step_1_conversion_times)])[1] AS step_1_median_conversion_time,
groupArray(row_number) AS row_number,
final_prop AS final_prop
FROM
(SELECT countIf(ifNull(notEquals(bitAnd(steps_bitfield, 1), 0), 1)) AS step_1,
countIf(ifNull(notEquals(bitAnd(steps_bitfield, 2), 0), 1)) AS step_2,
groupArrayIf(timings[1], ifNull(greater(timings[1], 0), 0)) AS step_1_conversion_times,
rowNumberInAllBlocks() AS row_number,
breakdown AS final_prop
FROM
(SELECT arraySort(t -> t.1, groupArray(tuple(accurateCastOrNull(timestamp, 'Float64'), uuid, '', arrayFilter(x -> ifNull(notEquals(x, 0), 1), [multiply(1, step_0), multiply(2, step_1)])))) AS events_array,
[''] AS prop,
arrayJoin(aggregate_funnel_v8(2, 1209600, 'first_touch', 'ordered', prop, [], arrayFilter((x, x_before, x_after) -> not(and(ifNull(lessOrEquals(length(x.4), 1), 0), ifNull(equals(x.4, x_before.4), isNull(x.4)
and isNull(x_before.4)), ifNull(equals(x.4, x_after.4), isNull(x.4)
and isNull(x_after.4)), ifNull(equals(x.3, x_before.3), isNull(x.3)
and isNull(x_before.3)), ifNull(equals(x.3, x_after.3), isNull(x.3)
and isNull(x_after.3)), ifNull(greater(x.1, x_before.1), 0), ifNull(less(x.1, x_after.1), 0))), events_array, arrayRotateRight(events_array, 1), arrayRotateLeft(events_array, 1)))) AS af_tuple,
af_tuple.1 AS step_reached,
plus(af_tuple.1, 1) AS steps,
af_tuple.2 AS breakdown,
af_tuple.3 AS timings,
af_tuple.5 AS steps_bitfield,
aggregation_target AS aggregation_target
FROM
(SELECT toTimeZone(e.timestamp, 'UTC') AS timestamp,
if(not(empty(e__override.distinct_id)), e__override.person_id, e.person_id) AS aggregation_target,
e.uuid AS uuid,
e.`$session_id` AS `$session_id`,
e.`$window_id` AS `$window_id`,
if(and(equals(e.event, 'user signed up'), in(if(not(empty(e__override.distinct_id)), e__override.person_id, e.person_id),
(SELECT person_static_cohort.person_id AS person_id
FROM person_static_cohort
WHERE and(equals(person_static_cohort.team_id, 99999), equals(person_static_cohort.cohort_id, 99999))))), 1, 0) AS step_0,
if(equals(e.event, 'paid'), 1, 0) AS step_1
FROM events AS e
LEFT OUTER JOIN
(SELECT argMax(person_distinct_id_overrides.person_id, person_distinct_id_overrides.version) AS person_id,
person_distinct_id_overrides.distinct_id AS distinct_id
FROM person_distinct_id_overrides
WHERE equals(person_distinct_id_overrides.team_id, 99999)
GROUP BY person_distinct_id_overrides.distinct_id
HAVING ifNull(equals(argMax(person_distinct_id_overrides.is_deleted, person_distinct_id_overrides.version), 0), 0) SETTINGS optimize_aggregation_in_order=1) AS e__override ON equals(e.distinct_id, e__override.distinct_id)
WHERE and(equals(e.team_id, 99999), and(and(greaterOrEquals(toTimeZone(e.timestamp, 'UTC'), toDateTime64('explicit_redacted_timestamp', 6, 'UTC')), lessOrEquals(toTimeZone(e.timestamp, 'UTC'), toDateTime64('explicit_redacted_timestamp', 6, 'UTC'))), in(e.event, tuple('paid', 'user signed up'))), or(ifNull(equals(step_0, 1), 0), ifNull(equals(step_1, 1), 0))))
GROUP BY aggregation_target
HAVING ifNull(greaterOrEquals(step_reached, 0), 0))
GROUP BY breakdown
ORDER BY step_2 DESC, step_1 DESC)
GROUP BY final_prop
LIMIT 100 SETTINGS join_algorithm='auto',
readonly=2,
max_execution_time=60,
allow_experimental_object_type=1,
format_csv_allow_double_quotes=0,
max_ast_elements=4000000,
max_expanded_ast_elements=4000000,
max_bytes_before_external_group_by=23622320128,
allow_experimental_analyzer=1,
transform_null_in=1,
optimize_min_equality_disjunction_chain_length=4294967295,
allow_experimental_join_condition=1
'''
# ---
# name: TestFOSSFunnelUDF.test_funnel_with_static_cohort_step_filter.1


@@ -7,6 +7,8 @@ from django.conf import settings
from django.db import connection, models
from django.db.models import Q, QuerySet
from django.db.models.expressions import F
from django.db.models.signals import post_delete
from django.dispatch import receiver
from django.utils import timezone
import structlog
@@ -583,3 +585,28 @@ class CohortPeople(models.Model):
class Meta:
indexes = [models.Index(fields=["cohort_id", "person_id"])]
@receiver(post_delete, sender=CohortPeople)
def cohort_people_changed(sender, instance: "CohortPeople", **kwargs):
    from posthog.models.cohort.util import get_static_cohort_size

    try:
        cohort_id = instance.cohort_id
        person_uuid = instance.person_id
        cohort = Cohort.objects.get(id=cohort_id)
        cohort.count = get_static_cohort_size(cohort_id=cohort.id, team_id=cohort.team_id)
        cohort.save(update_fields=["count"])
        logger.info(
            "Updated cohort count after CohortPeople change",
            cohort_id=cohort_id,
            person_uuid=person_uuid,
            new_count=cohort.count,
        )
    except Cohort.DoesNotExist:
        logger.warning("Attempted to update count for non-existent cohort", cohort_id=cohort_id)
    except Exception as e:
        logger.exception("Error updating cohort count", cohort_id=cohort_id, person_uuid=person_uuid)
        capture_exception(e)
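To illustrate what the receiver above guarantees, here is a minimal test sketch. It is hypothetical, not part of this commit: the `team` and `person` fixtures and the exact `CohortPeople` fields are assumptions.

from posthog.models.cohort.cohort import Cohort, CohortPeople
from posthog.models.cohort.util import get_static_cohort_size


def test_count_recalculated_when_membership_deleted(team, person):
    cohort = Cohort.objects.create(team=team, name="static cohort", is_static=True)
    # Field names are assumed; the real CohortPeople model may require more fields
    CohortPeople.objects.create(cohort_id=cohort.pk, person_id=person.pk)

    # QuerySet.delete() emits post_delete once per deleted row, so the
    # cohort_people_changed receiver refreshes `count` from Postgres
    CohortPeople.objects.filter(cohort_id=cohort.pk).delete()

    cohort.refresh_from_db()
    assert cohort.count == get_static_cohort_size(cohort_id=cohort.pk, team_id=team.pk) == 0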


@@ -85,11 +85,6 @@ WHERE team_id = %(team_id)s AND cohort_id = %(cohort_id)s
GROUP BY person_id, cohort_id, team_id
"""
-GET_STATIC_COHORT_SIZE_SQL = f"""
-SELECT count(DISTINCT person_id)
-FROM {PERSON_STATIC_COHORT_TABLE}
-WHERE team_id = %(team_id)s AND cohort_id = %(cohort_id)s
-"""
STALE_COHORTPEOPLE = f"""
SELECT team_id, count() AS stale_people_count FROM cohortpeople


@@ -22,13 +22,12 @@ from posthog.clickhouse.query_tagging import Feature, tag_queries, tags_context
from posthog.constants import PropertyOperatorType
from posthog.models import Action, Filter, Team
from posthog.models.action.util import format_action_filter
-from posthog.models.cohort.cohort import Cohort, CohortOrEmpty
+from posthog.models.cohort.cohort import Cohort, CohortOrEmpty, CohortPeople
from posthog.models.cohort.sql import (
CALCULATE_COHORT_PEOPLE_SQL,
GET_COHORT_SIZE_SQL,
GET_COHORTS_BY_PERSON_UUID,
GET_PERSON_ID_BY_PRECALCULATED_COHORT_ID,
-    GET_STATIC_COHORT_SIZE_SQL,
GET_STATIC_COHORTPEOPLE_BY_PERSON_UUID,
RECALCULATE_COHORT_BY_ID,
)
@@ -286,20 +285,10 @@ def insert_static_cohort(person_uuids: list[Optional[uuid.UUID]], cohort_id: int
sync_execute(INSERT_PERSON_STATIC_COHORT, persons)
-def get_static_cohort_size(*, cohort_id: int, team_id: int) -> Optional[int]:
-    tag_queries(cohort_id=cohort_id, team_id=team_id, name="get_static_cohort_size", feature=Feature.COHORT)
-    count_result = sync_execute(
-        GET_STATIC_COHORT_SIZE_SQL,
-        {
-            "cohort_id": cohort_id,
-            "team_id": team_id,
-        },
-    )
-    if count_result and len(count_result) and len(count_result[0]):
-        return count_result[0][0]
-    else:
-        return None
+def get_static_cohort_size(*, cohort_id: int, team_id: int) -> int:
+    count = CohortPeople.objects.filter(cohort_id=cohort_id, person__team_id=team_id).count()
+    return count
def recalculate_cohortpeople(


@@ -1001,10 +1001,76 @@
# name: TestFOSSFunnel.test_funnel_with_static_cohort_step_filter
'''
-SELECT count(DISTINCT person_id)
-FROM person_static_cohort
-WHERE team_id = 99999
-AND cohort_id = 99999
SELECT countIf(steps = 1) step_1,
countIf(steps = 2) step_2,
avg(step_1_average_conversion_time_inner) step_1_average_conversion_time,
median(step_1_median_conversion_time_inner) step_1_median_conversion_time
FROM
(SELECT aggregation_target,
steps,
avg(step_1_conversion_time) step_1_average_conversion_time_inner,
median(step_1_conversion_time) step_1_median_conversion_time_inner
FROM
(SELECT aggregation_target,
steps,
max(steps) over (PARTITION BY aggregation_target) as max_steps,
step_1_conversion_time
FROM
(SELECT *,
if(latest_0 <= latest_1
AND latest_1 <= latest_0 + INTERVAL 14 DAY, 2, 1) AS steps ,
if(isNotNull(latest_1)
AND latest_1 <= latest_0 + INTERVAL 14 DAY, dateDiff('second', toDateTime(latest_0), toDateTime(latest_1)), NULL) step_1_conversion_time
FROM
(SELECT aggregation_target, timestamp, step_0,
latest_0,
step_1,
min(latest_1) over (PARTITION by aggregation_target
ORDER BY timestamp DESC ROWS BETWEEN UNBOUNDED PRECEDING AND 0 PRECEDING) latest_1
FROM
(SELECT e.timestamp as timestamp,
if(notEmpty(pdi.distinct_id), pdi.person_id, e.person_id) as aggregation_target,
if(notEmpty(pdi.distinct_id), pdi.person_id, e.person_id) as person_id,
if(event = 'user signed up'
AND (person_id IN
(SELECT person_id as id
FROM person_static_cohort
WHERE cohort_id = 99999
AND team_id = 99999)), 1, 0) as step_0,
if(step_0 = 1, timestamp, null) as latest_0,
if(event = 'paid', 1, 0) as step_1,
if(step_1 = 1, timestamp, null) as latest_1
FROM events e
LEFT OUTER JOIN
(SELECT distinct_id,
argMax(person_id, version) as person_id
FROM person_distinct_id2
WHERE team_id = 99999
AND distinct_id IN
(SELECT distinct_id
FROM events
WHERE team_id = 99999
AND event IN ['paid', 'user signed up']
AND toTimeZone(timestamp, 'UTC') >= toDateTime('2020-01-01 00:00:00', 'UTC')
AND toTimeZone(timestamp, 'UTC') <= toDateTime('2020-01-14 23:59:59', 'UTC') )
GROUP BY distinct_id
HAVING argMax(is_deleted, version) = 0) AS pdi ON e.distinct_id = pdi.distinct_id
INNER JOIN
(SELECT id
FROM person
WHERE team_id = 99999
GROUP BY id
HAVING max(is_deleted) = 0 SETTINGS optimize_aggregation_in_order = 1) person ON person.id = pdi.person_id
WHERE team_id = 99999
AND event IN ['paid', 'user signed up']
AND toTimeZone(timestamp, 'UTC') >= toDateTime('2020-01-01 00:00:00', 'UTC')
AND toTimeZone(timestamp, 'UTC') <= toDateTime('2020-01-14 23:59:59', 'UTC')
AND (step_0 = 1
OR step_1 = 1) ))
WHERE step_0 = 1 ))
GROUP BY aggregation_target,
steps
HAVING steps = max(max_steps))
'''
# ---
# name: TestFOSSFunnel.test_funnel_with_static_cohort_step_filter.1


@@ -229,11 +229,12 @@
# ---
# name: TestSessionRecordingsListByCohort.test_filter_with_static_and_dynamic_cohort_properties
'''
/* cohort_calculation: */
SELECT count(DISTINCT person_id)
-FROM person_static_cohort
+FROM cohortpeople
WHERE team_id = 99999
AND cohort_id = 99999
+AND version = 0
'''
# ---
# name: TestSessionRecordingsListByCohort.test_filter_with_static_and_dynamic_cohort_properties.1
@@ -247,16 +248,6 @@
'''
# ---
# name: TestSessionRecordingsListByCohort.test_filter_with_static_and_dynamic_cohort_properties.2
'''
/* cohort_calculation: */
SELECT count(DISTINCT person_id)
FROM cohortpeople
WHERE team_id = 99999
AND cohort_id = 99999
AND version = 0
'''
# ---
# name: TestSessionRecordingsListByCohort.test_filter_with_static_and_dynamic_cohort_properties.3
'''
/* cohort_calculation: */
SELECT s.session_id AS session_id,
@@ -305,7 +296,7 @@
allow_experimental_join_condition=1
'''
# ---
-# name: TestSessionRecordingsListByCohort.test_filter_with_static_and_dynamic_cohort_properties.4
+# name: TestSessionRecordingsListByCohort.test_filter_with_static_and_dynamic_cohort_properties.3
'''
/* cohort_calculation: */
SELECT s.session_id AS session_id,
@@ -354,7 +345,7 @@
allow_experimental_join_condition=1
'''
# ---
-# name: TestSessionRecordingsListByCohort.test_filter_with_static_and_dynamic_cohort_properties.5
+# name: TestSessionRecordingsListByCohort.test_filter_with_static_and_dynamic_cohort_properties.4
'''
/* cohort_calculation: */
SELECT s.session_id AS session_id,
@@ -403,7 +394,7 @@
allow_experimental_join_condition=1
'''
# ---
-# name: TestSessionRecordingsListByCohort.test_filter_with_static_and_dynamic_cohort_properties.6
+# name: TestSessionRecordingsListByCohort.test_filter_with_static_and_dynamic_cohort_properties.5
'''
/* cohort_calculation: */
SELECT s.session_id AS session_id,
@@ -452,6 +443,58 @@
allow_experimental_join_condition=1
'''
# ---
# name: TestSessionRecordingsListByCohort.test_filter_with_static_and_dynamic_cohort_properties.6
'''
/* cohort_calculation: */
SELECT s.session_id AS session_id,
any(s.team_id) AS `any(s.team_id)`,
any(s.distinct_id) AS `any(s.distinct_id)`,
min(toTimeZone(s.min_first_timestamp, 'UTC')) AS start_time,
max(toTimeZone(s.max_last_timestamp, 'UTC')) AS end_time,
dateDiff('SECOND', start_time, end_time) AS duration,
argMinMerge(s.first_url) AS first_url,
sum(s.click_count) AS click_count,
sum(s.keypress_count) AS keypress_count,
sum(s.mouse_activity_count) AS mouse_activity_count,
divide(sum(s.active_milliseconds), 1000) AS active_seconds,
minus(duration, active_seconds) AS inactive_seconds,
sum(s.console_log_count) AS console_log_count,
sum(s.console_warn_count) AS console_warn_count,
sum(s.console_error_count) AS console_error_count,
greaterOrEquals(max(toTimeZone(s._timestamp, 'UTC')), toDateTime64('2021-08-21 19:55:00.000000', 6, 'UTC')) AS ongoing,
round(multiply(divide(plus(plus(plus(divide(sum(s.active_milliseconds), 1000), sum(s.click_count)), sum(s.keypress_count)), sum(s.console_error_count)), plus(plus(plus(plus(sum(s.mouse_activity_count), dateDiff('SECOND', start_time, end_time)), sum(s.console_error_count)), sum(s.console_log_count)), sum(s.console_warn_count))), 100), 2) AS activity_score
FROM session_replay_events AS s
WHERE and(equals(s.team_id, 99999), greaterOrEquals(toTimeZone(s.min_first_timestamp, 'UTC'), toDateTime64('2021-07-31 20:00:00.000000', 6, 'UTC')), greaterOrEquals(addDays(dateTrunc('DAY', toTimeZone(s.min_first_timestamp, 'UTC')), 1), minus(toDateTime64('2021-08-21 20:00:00.000000', 6, 'UTC'), toIntervalDay(coalesce(s.retention_period_days, 365)))), greaterOrEquals(toTimeZone(s.min_first_timestamp, 'UTC'), toDateTime64('2021-08-18 00:00:00.000000', 6, 'UTC')), lessOrEquals(toTimeZone(s.min_first_timestamp, 'UTC'), toDateTime64('2021-08-21 20:00:00.000000', 6, 'UTC')), in(s.distinct_id,
(SELECT person_distinct_id2.distinct_id AS distinct_id
FROM person_distinct_id2
WHERE and(equals(person_distinct_id2.team_id, 99999), in(person_distinct_id2.distinct_id,
(SELECT person_distinct_id2.distinct_id AS distinct_id
FROM person_distinct_id2
WHERE and(equals(person_distinct_id2.team_id, 99999), 1, and(notIn(person_distinct_id2.person_id,
(SELECT cohortpeople.person_id AS person_id
FROM cohortpeople
WHERE and(equals(cohortpeople.team_id, 99999), equals(cohortpeople.cohort_id, 99999), equals(cohortpeople.version, 0)))), notIn(person_distinct_id2.person_id,
(SELECT person_static_cohort.person_id AS person_id
FROM person_static_cohort
WHERE and(equals(person_static_cohort.team_id, 99999), equals(person_static_cohort.cohort_id, 99999)))))))))
GROUP BY person_distinct_id2.distinct_id
HAVING ifNull(equals(argMax(person_distinct_id2.is_deleted, person_distinct_id2.version), 0), 0))))
GROUP BY s.session_id
HAVING 1
ORDER BY start_time DESC
LIMIT 51
OFFSET 0 SETTINGS readonly=2,
max_execution_time=60,
allow_experimental_object_type=1,
format_csv_allow_double_quotes=0,
max_ast_elements=4000000,
max_expanded_ast_elements=4000000,
max_bytes_before_external_group_by=0,
transform_null_in=1,
optimize_min_equality_disjunction_chain_length=4294967295,
allow_experimental_join_condition=1
'''
# ---
# name: TestSessionRecordingsListByCohort.test_filter_with_static_and_dynamic_cohort_properties.7
'''
/* cohort_calculation: */


@@ -18,7 +18,7 @@ from posthog.clickhouse.query_tagging import QueryTags, update_tags
from posthog.exceptions_capture import capture_exception
from posthog.models import Cohort
from posthog.models.cohort import CohortOrEmpty
-from posthog.models.cohort.util import get_dependent_cohorts, get_static_cohort_size, sort_cohorts_topologically
+from posthog.models.cohort.util import get_dependent_cohorts, sort_cohorts_topologically
from posthog.models.team.team import Team
from posthog.models.user import User
from posthog.tasks.utils import CeleryQueue
@@ -342,14 +342,6 @@ def insert_cohort_from_query(cohort_id: int, team_id: Optional[int] = None) -> N
if settings.DEBUG:
raise
finally:
-    # Always update the count and cohort state, even if processing failed
-    try:
-        cohort.count = get_static_cohort_size(cohort_id=cohort.id, team_id=cohort.team_id)
-    except Exception as count_err:
-        # If count calculation fails, log the error but don't override the processing error
-        logger.exception("Failed to calculate static cohort size", cohort_id=cohort.id, team_id=team_id)
-        capture_exception(count_err, additional_properties={"cohort_id": cohort.id, "team_id": team_id})
cohort._safe_save_cohort_state(team_id=team_id, processing_error=processing_error)


@@ -836,25 +836,23 @@ def calculate_cohort_test_factory(event_factory: Callable, person_factory: Calla
query={"kind": "HogQLQuery", "query": "SELECT person_id FROM persons LIMIT 10"},
)
-# Mock the query processing to fail but let count calculation succeed
+# Mock the query processing to fail
with (
patch("posthog.api.cohort.insert_cohort_query_actors_into_ch") as mock_insert_ch,
patch("posthog.api.cohort.insert_cohort_people_into_pg") as mock_insert_pg,
patch("posthog.tasks.calculate_cohort.get_static_cohort_size") as mock_get_size,
):
# Make the processing functions throw an exception
mock_insert_ch.side_effect = Exception("Simulated query processing error")
mock_insert_pg.side_effect = Exception("Simulated pg insert error")
-# But make the count calculation return a value
-mock_get_size.return_value = 5
-# This should not raise an exception and should update the count
+# This should not raise an exception and should update the count using PostgreSQL
insert_cohort_from_query(cohort.id, self.team.pk)
-# Verify count was updated despite processing errors
+# Verify count was updated despite processing errors (should be 0 since no people were inserted due to mocked failures)
cohort.refresh_from_db()
self.assertEqual(cohort.count, 5, "Count should be updated even when query processing fails")
self.assertEqual(
cohort.count, 0, "Count should be updated using PostgreSQL even when query processing fails"
)
self.assertFalse(cohort.is_calculating, "Cohort should not be in calculating state")
self.assertGreater(cohort.errors_calculating, 0, "Should have recorded the processing error")