fix(MA): grouping by campaign and source for all CGs (#38663)

Co-authored-by: Javier Bahamondes <javierbahamondes@Javiers-MacBook-Pro.local>
This commit is contained in:
Javier Bahamondes
2025-09-26 10:53:45 -03:00
committed by GitHub
parent 7ff33d484a
commit 6ba87c2b0d
10 changed files with 1424 additions and 505 deletions

View File

@@ -0,0 +1,319 @@
# serializer version: 1
# name: TestConversionGoalsAggregator.test_multiple_goals_sql_snapshot
'''
SELECT
campaign,
source,
sum(conversion_0) AS conversion_0,
sum(conversion_1) AS conversion_1,
sum(conversion_2) AS conversion_2
FROM
(
SELECT
if(notEmpty(campaign_name), campaign_name, 'organic') AS campaign,
if(notEmpty(source_name), source_name, 'organic') AS source,
count() AS conversion_0,
0 AS conversion_1,
0 AS conversion_2
FROM
(
SELECT
person_id,
if(notEmpty(conversion_campaign), conversion_campaign, if(notEmpty(fallback_campaign), fallback_campaign, '')) AS campaign_name,
if(notEmpty(conversion_source), conversion_source, if(notEmpty(fallback_source), fallback_source, '')) AS source_name,
1 AS conversion_value
FROM
(
SELECT
person_id,
conversion_timestamps[i] AS conversion_time,
conversion_math_values[i] AS conversion_math_value,
conversion_campaigns[i] AS conversion_campaign,
conversion_sources[i] AS conversion_source,
arrayMax(arrayFilter(x -> and(lessOrEquals(x, conversion_timestamps[i]), greaterOrEquals(x, minus(conversion_timestamps[i], 31536000))), utm_timestamps)) AS last_utm_timestamp,
if(isNotNull(last_utm_timestamp), utm_campaigns[indexOf(utm_timestamps, last_utm_timestamp)], '') AS fallback_campaign,
if(isNotNull(last_utm_timestamp), utm_sources[indexOf(utm_timestamps, last_utm_timestamp)], '') AS fallback_source
FROM
(
SELECT
events.person_id,
arrayFilter(x -> greater(x, 0), groupArray(if(equals(events.event, 'purchase'), toUnixTimestamp(events.timestamp), 0))) AS conversion_timestamps,
arrayFilter(x -> greater(x, 0), groupArray(if(equals(events.event, 'purchase'), toFloat(1), 0))) AS conversion_math_values,
arrayFilter(x -> notEmpty(toString(x)), groupArray(if(equals(events.event, 'purchase'), toString(ifNull(events.properties.utm_campaign, '')), ''))) AS conversion_campaigns,
arrayFilter(x -> notEmpty(toString(x)), groupArray(if(equals(events.event, 'purchase'), toString(ifNull(events.properties.utm_source, '')), ''))) AS conversion_sources,
arrayFilter(x -> greater(x, 0), groupArray(if(and(equals(events.event, '$pageview'), notEmpty(toString(ifNull(events.properties.utm_campaign, ''))), notEmpty(toString(ifNull(events.properties.utm_source, '')))), toUnixTimestamp(events.timestamp), 0))) AS utm_timestamps,
arrayFilter(x -> notEmpty(x), groupArray(if(and(equals(events.event, '$pageview'), notEmpty(toString(ifNull(events.properties.utm_campaign, ''))), notEmpty(toString(ifNull(events.properties.utm_source, '')))), toString(ifNull(events.properties.utm_campaign, '')), ''))) AS utm_campaigns,
arrayFilter(x -> notEmpty(x), groupArray(if(and(equals(events.event, '$pageview'), notEmpty(toString(ifNull(events.properties.utm_campaign, ''))), notEmpty(toString(ifNull(events.properties.utm_source, '')))), toString(ifNull(events.properties.utm_source, '')), ''))) AS utm_sources
FROM
events
WHERE
or(and(equals(events.event, 'purchase'), greaterOrEquals(events.timestamp, toDateTime('2023-01-01')), less(events.timestamp, toDateTime('2023-02-01'))), and(equals(events.event, '$pageview'), notEmpty(toString(ifNull(events.properties.utm_campaign, ''))), notEmpty(toString(ifNull(events.properties.utm_source, ''))), greaterOrEquals(events.timestamp, minus(toDateTime('2023-01-01'), toIntervalSecond(31536000)))))
GROUP BY
events.person_id
HAVING
greater(length(conversion_timestamps), 0))
ARRAY JOIN arrayEnumerate(conversion_timestamps) AS i)) AS attributed_conversions
GROUP BY
campaign,
source
UNION ALL
SELECT
if(notEmpty(campaign_name), campaign_name, 'organic') AS campaign,
if(notEmpty(source_name), source_name, 'organic') AS source,
0 AS conversion_0,
count() AS conversion_1,
0 AS conversion_2
FROM
(
SELECT
person_id,
if(notEmpty(conversion_campaign), conversion_campaign, if(notEmpty(fallback_campaign), fallback_campaign, '')) AS campaign_name,
if(notEmpty(conversion_source), conversion_source, if(notEmpty(fallback_source), fallback_source, '')) AS source_name,
1 AS conversion_value
FROM
(
SELECT
person_id,
conversion_timestamps[i] AS conversion_time,
conversion_math_values[i] AS conversion_math_value,
conversion_campaigns[i] AS conversion_campaign,
conversion_sources[i] AS conversion_source,
arrayMax(arrayFilter(x -> and(lessOrEquals(x, conversion_timestamps[i]), greaterOrEquals(x, minus(conversion_timestamps[i], 31536000))), utm_timestamps)) AS last_utm_timestamp,
if(isNotNull(last_utm_timestamp), utm_campaigns[indexOf(utm_timestamps, last_utm_timestamp)], '') AS fallback_campaign,
if(isNotNull(last_utm_timestamp), utm_sources[indexOf(utm_timestamps, last_utm_timestamp)], '') AS fallback_source
FROM
(
SELECT
events.person_id,
arrayFilter(x -> greater(x, 0), groupArray(if(equals(events.event, 'sign_up'), toUnixTimestamp(events.timestamp), 0))) AS conversion_timestamps,
arrayFilter(x -> greater(x, 0), groupArray(if(equals(events.event, 'sign_up'), toFloat(1), 0))) AS conversion_math_values,
arrayFilter(x -> notEmpty(toString(x)), groupArray(if(equals(events.event, 'sign_up'), toString(ifNull(events.properties.utm_campaign, '')), ''))) AS conversion_campaigns,
arrayFilter(x -> notEmpty(toString(x)), groupArray(if(equals(events.event, 'sign_up'), toString(ifNull(events.properties.utm_source, '')), ''))) AS conversion_sources,
arrayFilter(x -> greater(x, 0), groupArray(if(and(equals(events.event, '$pageview'), notEmpty(toString(ifNull(events.properties.utm_campaign, ''))), notEmpty(toString(ifNull(events.properties.utm_source, '')))), toUnixTimestamp(events.timestamp), 0))) AS utm_timestamps,
arrayFilter(x -> notEmpty(x), groupArray(if(and(equals(events.event, '$pageview'), notEmpty(toString(ifNull(events.properties.utm_campaign, ''))), notEmpty(toString(ifNull(events.properties.utm_source, '')))), toString(ifNull(events.properties.utm_campaign, '')), ''))) AS utm_campaigns,
arrayFilter(x -> notEmpty(x), groupArray(if(and(equals(events.event, '$pageview'), notEmpty(toString(ifNull(events.properties.utm_campaign, ''))), notEmpty(toString(ifNull(events.properties.utm_source, '')))), toString(ifNull(events.properties.utm_source, '')), ''))) AS utm_sources
FROM
events
WHERE
or(and(equals(events.event, 'sign_up'), greaterOrEquals(events.timestamp, toDateTime('2023-01-01')), less(events.timestamp, toDateTime('2023-02-01'))), and(equals(events.event, '$pageview'), notEmpty(toString(ifNull(events.properties.utm_campaign, ''))), notEmpty(toString(ifNull(events.properties.utm_source, ''))), greaterOrEquals(events.timestamp, minus(toDateTime('2023-01-01'), toIntervalSecond(31536000)))))
GROUP BY
events.person_id
HAVING
greater(length(conversion_timestamps), 0))
ARRAY JOIN arrayEnumerate(conversion_timestamps) AS i)) AS attributed_conversions
GROUP BY
campaign,
source
UNION ALL
SELECT
if(notEmpty(campaign_name), campaign_name, 'organic') AS campaign,
if(notEmpty(source_name), source_name, 'organic') AS source,
0 AS conversion_0,
0 AS conversion_1,
count() AS conversion_2
FROM
(
SELECT
person_id,
if(notEmpty(conversion_campaign), conversion_campaign, if(notEmpty(fallback_campaign), fallback_campaign, '')) AS campaign_name,
if(notEmpty(conversion_source), conversion_source, if(notEmpty(fallback_source), fallback_source, '')) AS source_name,
1 AS conversion_value
FROM
(
SELECT
person_id,
conversion_timestamps[i] AS conversion_time,
conversion_math_values[i] AS conversion_math_value,
conversion_campaigns[i] AS conversion_campaign,
conversion_sources[i] AS conversion_source,
arrayMax(arrayFilter(x -> and(lessOrEquals(x, conversion_timestamps[i]), greaterOrEquals(x, minus(conversion_timestamps[i], 31536000))), utm_timestamps)) AS last_utm_timestamp,
if(isNotNull(last_utm_timestamp), utm_campaigns[indexOf(utm_timestamps, last_utm_timestamp)], '') AS fallback_campaign,
if(isNotNull(last_utm_timestamp), utm_sources[indexOf(utm_timestamps, last_utm_timestamp)], '') AS fallback_source
FROM
(
SELECT
events.person_id,
arrayFilter(x -> greater(x, 0), groupArray(if(equals(events.event, 'login'), toUnixTimestamp(events.timestamp), 0))) AS conversion_timestamps,
arrayFilter(x -> greater(x, 0), groupArray(if(equals(events.event, 'login'), toFloat(1), 0))) AS conversion_math_values,
arrayFilter(x -> notEmpty(toString(x)), groupArray(if(equals(events.event, 'login'), toString(ifNull(events.properties.utm_campaign, '')), ''))) AS conversion_campaigns,
arrayFilter(x -> notEmpty(toString(x)), groupArray(if(equals(events.event, 'login'), toString(ifNull(events.properties.utm_source, '')), ''))) AS conversion_sources,
arrayFilter(x -> greater(x, 0), groupArray(if(and(equals(events.event, '$pageview'), notEmpty(toString(ifNull(events.properties.utm_campaign, ''))), notEmpty(toString(ifNull(events.properties.utm_source, '')))), toUnixTimestamp(events.timestamp), 0))) AS utm_timestamps,
arrayFilter(x -> notEmpty(x), groupArray(if(and(equals(events.event, '$pageview'), notEmpty(toString(ifNull(events.properties.utm_campaign, ''))), notEmpty(toString(ifNull(events.properties.utm_source, '')))), toString(ifNull(events.properties.utm_campaign, '')), ''))) AS utm_campaigns,
arrayFilter(x -> notEmpty(x), groupArray(if(and(equals(events.event, '$pageview'), notEmpty(toString(ifNull(events.properties.utm_campaign, ''))), notEmpty(toString(ifNull(events.properties.utm_source, '')))), toString(ifNull(events.properties.utm_source, '')), ''))) AS utm_sources
FROM
events
WHERE
or(and(equals(events.event, 'login'), greaterOrEquals(events.timestamp, toDateTime('2023-01-01')), less(events.timestamp, toDateTime('2023-02-01'))), and(equals(events.event, '$pageview'), notEmpty(toString(ifNull(events.properties.utm_campaign, ''))), notEmpty(toString(ifNull(events.properties.utm_source, ''))), greaterOrEquals(events.timestamp, minus(toDateTime('2023-01-01'), toIntervalSecond(31536000)))))
GROUP BY
events.person_id
HAVING
greater(length(conversion_timestamps), 0))
ARRAY JOIN arrayEnumerate(conversion_timestamps) AS i)) AS attributed_conversions
GROUP BY
campaign,
source)
GROUP BY
campaign,
source
LIMIT 100
'''
# ---
# name: TestConversionGoalsAggregator.test_unified_cte_sql_snapshot
'''
SELECT
campaign,
source,
sum(conversion_0) AS conversion_0,
sum(conversion_1) AS conversion_1
FROM
(
SELECT
if(notEmpty(campaign_name), campaign_name, 'organic') AS campaign,
if(notEmpty(source_name), source_name, 'organic') AS source,
count() AS conversion_0,
0 AS conversion_1
FROM
(
SELECT
person_id,
if(notEmpty(conversion_campaign), conversion_campaign, if(notEmpty(fallback_campaign), fallback_campaign, '')) AS campaign_name,
if(notEmpty(conversion_source), conversion_source, if(notEmpty(fallback_source), fallback_source, '')) AS source_name,
1 AS conversion_value
FROM
(
SELECT
person_id,
conversion_timestamps[i] AS conversion_time,
conversion_math_values[i] AS conversion_math_value,
conversion_campaigns[i] AS conversion_campaign,
conversion_sources[i] AS conversion_source,
arrayMax(arrayFilter(x -> and(lessOrEquals(x, conversion_timestamps[i]), greaterOrEquals(x, minus(conversion_timestamps[i], 31536000))), utm_timestamps)) AS last_utm_timestamp,
if(isNotNull(last_utm_timestamp), utm_campaigns[indexOf(utm_timestamps, last_utm_timestamp)], '') AS fallback_campaign,
if(isNotNull(last_utm_timestamp), utm_sources[indexOf(utm_timestamps, last_utm_timestamp)], '') AS fallback_source
FROM
(
SELECT
events.person_id,
arrayFilter(x -> greater(x, 0), groupArray(if(equals(events.event, 'sign_up'), toUnixTimestamp(events.timestamp), 0))) AS conversion_timestamps,
arrayFilter(x -> greater(x, 0), groupArray(if(equals(events.event, 'sign_up'), toFloat(1), 0))) AS conversion_math_values,
arrayFilter(x -> notEmpty(toString(x)), groupArray(if(equals(events.event, 'sign_up'), toString(ifNull(events.properties.utm_campaign, '')), ''))) AS conversion_campaigns,
arrayFilter(x -> notEmpty(toString(x)), groupArray(if(equals(events.event, 'sign_up'), toString(ifNull(events.properties.utm_source, '')), ''))) AS conversion_sources,
arrayFilter(x -> greater(x, 0), groupArray(if(and(equals(events.event, '$pageview'), notEmpty(toString(ifNull(events.properties.utm_campaign, ''))), notEmpty(toString(ifNull(events.properties.utm_source, '')))), toUnixTimestamp(events.timestamp), 0))) AS utm_timestamps,
arrayFilter(x -> notEmpty(x), groupArray(if(and(equals(events.event, '$pageview'), notEmpty(toString(ifNull(events.properties.utm_campaign, ''))), notEmpty(toString(ifNull(events.properties.utm_source, '')))), toString(ifNull(events.properties.utm_campaign, '')), ''))) AS utm_campaigns,
arrayFilter(x -> notEmpty(x), groupArray(if(and(equals(events.event, '$pageview'), notEmpty(toString(ifNull(events.properties.utm_campaign, ''))), notEmpty(toString(ifNull(events.properties.utm_source, '')))), toString(ifNull(events.properties.utm_source, '')), ''))) AS utm_sources
FROM
events
WHERE
or(and(equals(events.event, 'sign_up'), greaterOrEquals(events.timestamp, toDateTime('2023-01-01')), less(events.timestamp, toDateTime('2023-02-01'))), and(equals(events.event, '$pageview'), notEmpty(toString(ifNull(events.properties.utm_campaign, ''))), notEmpty(toString(ifNull(events.properties.utm_source, ''))), greaterOrEquals(events.timestamp, minus(toDateTime('2023-01-01'), toIntervalSecond(31536000)))))
GROUP BY
events.person_id
HAVING
greater(length(conversion_timestamps), 0))
ARRAY JOIN arrayEnumerate(conversion_timestamps) AS i)) AS attributed_conversions
GROUP BY
campaign,
source
UNION ALL
SELECT
if(notEmpty(campaign_name), campaign_name, 'organic') AS campaign,
if(notEmpty(source_name), source_name, 'organic') AS source,
0 AS conversion_0,
count() AS conversion_1
FROM
(
SELECT
person_id,
if(notEmpty(conversion_campaign), conversion_campaign, if(notEmpty(fallback_campaign), fallback_campaign, '')) AS campaign_name,
if(notEmpty(conversion_source), conversion_source, if(notEmpty(fallback_source), fallback_source, '')) AS source_name,
1 AS conversion_value
FROM
(
SELECT
person_id,
conversion_timestamps[i] AS conversion_time,
conversion_math_values[i] AS conversion_math_value,
conversion_campaigns[i] AS conversion_campaign,
conversion_sources[i] AS conversion_source,
arrayMax(arrayFilter(x -> and(lessOrEquals(x, conversion_timestamps[i]), greaterOrEquals(x, minus(conversion_timestamps[i], 31536000))), utm_timestamps)) AS last_utm_timestamp,
if(isNotNull(last_utm_timestamp), utm_campaigns[indexOf(utm_timestamps, last_utm_timestamp)], '') AS fallback_campaign,
if(isNotNull(last_utm_timestamp), utm_sources[indexOf(utm_timestamps, last_utm_timestamp)], '') AS fallback_source
FROM
(
SELECT
events.person_id,
arrayFilter(x -> greater(x, 0), groupArray(if(equals(events.event, 'purchase'), toUnixTimestamp(events.timestamp), 0))) AS conversion_timestamps,
arrayFilter(x -> greater(x, 0), groupArray(if(equals(events.event, 'purchase'), toFloat(1), 0))) AS conversion_math_values,
arrayFilter(x -> notEmpty(toString(x)), groupArray(if(equals(events.event, 'purchase'), toString(ifNull(events.properties.utm_campaign, '')), ''))) AS conversion_campaigns,
arrayFilter(x -> notEmpty(toString(x)), groupArray(if(equals(events.event, 'purchase'), toString(ifNull(events.properties.utm_source, '')), ''))) AS conversion_sources,
arrayFilter(x -> greater(x, 0), groupArray(if(and(equals(events.event, '$pageview'), notEmpty(toString(ifNull(events.properties.utm_campaign, ''))), notEmpty(toString(ifNull(events.properties.utm_source, '')))), toUnixTimestamp(events.timestamp), 0))) AS utm_timestamps,
arrayFilter(x -> notEmpty(x), groupArray(if(and(equals(events.event, '$pageview'), notEmpty(toString(ifNull(events.properties.utm_campaign, ''))), notEmpty(toString(ifNull(events.properties.utm_source, '')))), toString(ifNull(events.properties.utm_campaign, '')), ''))) AS utm_campaigns,
arrayFilter(x -> notEmpty(x), groupArray(if(and(equals(events.event, '$pageview'), notEmpty(toString(ifNull(events.properties.utm_campaign, ''))), notEmpty(toString(ifNull(events.properties.utm_source, '')))), toString(ifNull(events.properties.utm_source, '')), ''))) AS utm_sources
FROM
events
WHERE
or(and(equals(events.event, 'purchase'), greaterOrEquals(events.timestamp, toDateTime('2023-01-01')), less(events.timestamp, toDateTime('2023-02-01'))), and(equals(events.event, '$pageview'), notEmpty(toString(ifNull(events.properties.utm_campaign, ''))), notEmpty(toString(ifNull(events.properties.utm_source, ''))), greaterOrEquals(events.timestamp, minus(toDateTime('2023-01-01'), toIntervalSecond(31536000)))))
GROUP BY
events.person_id
HAVING
greater(length(conversion_timestamps), 0))
ARRAY JOIN arrayEnumerate(conversion_timestamps) AS i)) AS attributed_conversions
GROUP BY
campaign,
source)
GROUP BY
campaign,
source
LIMIT 100
'''
# ---

View File

@@ -58,8 +58,8 @@
round(divide(campaign_costs.total_cost, nullif(campaign_costs.total_clicks, 0)), 2) AS CPC,
round(multiply(divide(campaign_costs.total_clicks, nullif(campaign_costs.total_impressions, 0)), 100), 2) AS CTR,
round(campaign_costs.total_reported_conversions, 2) AS `Reported Conversion`,
cg_0.conversion_0 AS `Sign Up Conversions`,
round(divide(campaign_costs.total_cost, nullif(cg_0.conversion_0, 0)), 2) AS `Cost per Sign Up Conversions`
ucg.conversion_0 AS `Sign Up Conversions`,
round(divide(campaign_costs.total_cost, nullif(ucg.conversion_0, 0)), 2) AS `Cost per Sign Up Conversions`
FROM
(
@@ -92,58 +92,69 @@
source) AS campaign_costs
LEFT JOIN (
SELECT
if(notEmpty(campaign_name), campaign_name, 'organic') AS campaign,
if(notEmpty(source_name), source_name, 'organic') AS source,
count() AS conversion_0
campaign,
source,
sum(conversion_0) AS conversion_0
FROM
(
SELECT
person_id,
if(notEmpty(conversion_campaign), conversion_campaign, if(notEmpty(fallback_campaign), fallback_campaign, '')) AS campaign_name,
if(notEmpty(conversion_source), conversion_source, if(notEmpty(fallback_source), fallback_source, '')) AS source_name,
1 AS conversion_value
if(notEmpty(campaign_name), campaign_name, 'organic') AS campaign,
if(notEmpty(source_name), source_name, 'organic') AS source,
count() AS conversion_0
FROM
(
SELECT
person_id,
conversion_timestamps[i] AS conversion_time,
conversion_math_values[i] AS conversion_math_value,
conversion_campaigns[i] AS conversion_campaign,
conversion_sources[i] AS conversion_source,
arrayMax(arrayFilter(x -> and(lessOrEquals(x, conversion_timestamps[i]), greaterOrEquals(x, minus(conversion_timestamps[i], 31536000))), utm_timestamps)) AS last_utm_timestamp,
if(isNotNull(last_utm_timestamp), utm_campaigns[indexOf(utm_timestamps, last_utm_timestamp)], '') AS fallback_campaign,
if(isNotNull(last_utm_timestamp), utm_sources[indexOf(utm_timestamps, last_utm_timestamp)], '') AS fallback_source
if(notEmpty(conversion_campaign), conversion_campaign, if(notEmpty(fallback_campaign), fallback_campaign, '')) AS campaign_name,
if(notEmpty(conversion_source), conversion_source, if(notEmpty(fallback_source), fallback_source, '')) AS source_name,
1 AS conversion_value
FROM
(
SELECT
events.person_id,
arrayFilter(x -> greater(x, 0), groupArray(if(equals(event, 'test_event'), toUnixTimestamp(events.timestamp), 0))) AS conversion_timestamps,
arrayFilter(x -> greater(x, 0), groupArray(if(equals(event, 'test_event'), toFloat(1), 0))) AS conversion_math_values,
arrayFilter(x -> notEmpty(toString(x)), groupArray(if(equals(event, 'test_event'), toString(ifNull(events.properties.utm_campaign, '')), ''))) AS conversion_campaigns,
arrayFilter(x -> notEmpty(toString(x)), groupArray(if(equals(event, 'test_event'), toString(ifNull(events.properties.utm_source, '')), ''))) AS conversion_sources,
arrayFilter(x -> greater(x, 0), groupArray(if(and(equals(events.event, '$pageview'), notEmpty(toString(ifNull(events.properties.utm_campaign, ''))), notEmpty(toString(ifNull(events.properties.utm_source, '')))), toUnixTimestamp(events.timestamp), 0))) AS utm_timestamps,
arrayFilter(x -> notEmpty(x), groupArray(if(and(equals(events.event, '$pageview'), notEmpty(toString(ifNull(events.properties.utm_campaign, ''))), notEmpty(toString(ifNull(events.properties.utm_source, '')))), toString(ifNull(events.properties.utm_campaign, '')), ''))) AS utm_campaigns,
arrayFilter(x -> notEmpty(x), groupArray(if(and(equals(events.event, '$pageview'), notEmpty(toString(ifNull(events.properties.utm_campaign, ''))), notEmpty(toString(ifNull(events.properties.utm_source, '')))), toString(ifNull(events.properties.utm_source, '')), ''))) AS utm_sources
person_id,
conversion_timestamps[i] AS conversion_time,
conversion_math_values[i] AS conversion_math_value,
conversion_campaigns[i] AS conversion_campaign,
conversion_sources[i] AS conversion_source,
arrayMax(arrayFilter(x -> and(lessOrEquals(x, conversion_timestamps[i]), greaterOrEquals(x, minus(conversion_timestamps[i], 31536000))), utm_timestamps)) AS last_utm_timestamp,
if(isNotNull(last_utm_timestamp), utm_campaigns[indexOf(utm_timestamps, last_utm_timestamp)], '') AS fallback_campaign,
if(isNotNull(last_utm_timestamp), utm_sources[indexOf(utm_timestamps, last_utm_timestamp)], '') AS fallback_source
FROM
events
(
SELECT
events.person_id,
arrayFilter(x -> greater(x, 0), groupArray(if(equals(event, 'test_event'), toUnixTimestamp(events.timestamp), 0))) AS conversion_timestamps,
arrayFilter(x -> greater(x, 0), groupArray(if(equals(event, 'test_event'), toFloat(1), 0))) AS conversion_math_values,
arrayFilter(x -> notEmpty(toString(x)), groupArray(if(equals(event, 'test_event'), toString(ifNull(events.properties.utm_campaign, '')), ''))) AS conversion_campaigns,
arrayFilter(x -> notEmpty(toString(x)), groupArray(if(equals(event, 'test_event'), toString(ifNull(events.properties.utm_source, '')), ''))) AS conversion_sources,
arrayFilter(x -> greater(x, 0), groupArray(if(and(equals(events.event, '$pageview'), notEmpty(toString(ifNull(events.properties.utm_campaign, ''))), notEmpty(toString(ifNull(events.properties.utm_source, '')))), toUnixTimestamp(events.timestamp), 0))) AS utm_timestamps,
arrayFilter(x -> notEmpty(x), groupArray(if(and(equals(events.event, '$pageview'), notEmpty(toString(ifNull(events.properties.utm_campaign, ''))), notEmpty(toString(ifNull(events.properties.utm_source, '')))), toString(ifNull(events.properties.utm_campaign, '')), ''))) AS utm_campaigns,
arrayFilter(x -> notEmpty(x), groupArray(if(and(equals(events.event, '$pageview'), notEmpty(toString(ifNull(events.properties.utm_campaign, ''))), notEmpty(toString(ifNull(events.properties.utm_source, '')))), toString(ifNull(events.properties.utm_source, '')), ''))) AS utm_sources
FROM
events
WHERE
or(and(equals(event, 'test_event'), greaterOrEquals(events.timestamp, toDateTime('2024-01-01 00:00:00')), lessOrEquals(events.timestamp, toDateTime('2024-12-31 23:59:59'))), and(equals(events.event, '$pageview'), notEmpty(toString(ifNull(events.properties.utm_campaign, ''))), notEmpty(toString(ifNull(events.properties.utm_source, ''))), greaterOrEquals(events.timestamp, minus(toDateTime('2024-01-01 00:00:00'), toIntervalSecond(31536000))), lessOrEquals(events.timestamp, toDateTime('2024-12-31 23:59:59'))))
or(and(equals(event, 'test_event'), greaterOrEquals(events.timestamp, toDateTime('2024-01-01 00:00:00')), lessOrEquals(events.timestamp, toDateTime('2024-12-31 23:59:59'))), and(equals(events.event, '$pageview'), notEmpty(toString(ifNull(events.properties.utm_campaign, ''))), notEmpty(toString(ifNull(events.properties.utm_source, ''))), greaterOrEquals(events.timestamp, minus(toDateTime('2024-01-01 00:00:00'), toIntervalSecond(31536000))), lessOrEquals(events.timestamp, toDateTime('2024-12-31 23:59:59'))))
GROUP BY
events.person_id
events.person_id
HAVING
greater(length(conversion_timestamps), 0))
ARRAY JOIN arrayEnumerate(conversion_timestamps) AS i)) AS attributed_conversions
greater(length(conversion_timestamps), 0))
ARRAY JOIN arrayEnumerate(conversion_timestamps) AS i)) AS attributed_conversions
GROUP BY
campaign,
source)
GROUP BY
campaign,
source) AS cg_0 ON and(equals(campaign_costs.campaign, cg_0.campaign), equals(campaign_costs.source, cg_0.source))
source) AS ucg ON and(equals(campaign_costs.campaign, ucg.campaign), equals(campaign_costs.source, ucg.source))
ORDER BY
Cost DESC

View File

@@ -72,8 +72,8 @@
round(divide(campaign_costs.total_cost, nullif(campaign_costs.total_clicks, 0)), 2) AS CPC,
round(multiply(divide(campaign_costs.total_clicks, nullif(campaign_costs.total_impressions, 0)), 100), 2) AS CTR,
round(campaign_costs.total_reported_conversions, 2) AS `Reported Conversion`,
cg_0.conversion_0 AS `Sign Up Conversions`,
round(divide(campaign_costs.total_cost, nullif(cg_0.conversion_0, 0)), 2) AS `Cost per Sign Up Conversions`
ucg.conversion_0 AS `Sign Up Conversions`,
round(divide(campaign_costs.total_cost, nullif(ucg.conversion_0, 0)), 2) AS `Cost per Sign Up Conversions`
FROM
(
@@ -106,58 +106,69 @@
source) AS campaign_costs
LEFT JOIN (
SELECT
if(notEmpty(campaign_name), campaign_name, 'organic') AS campaign,
if(notEmpty(source_name), source_name, 'organic') AS source,
count() AS conversion_0
campaign,
source,
sum(conversion_0) AS conversion_0
FROM
(
SELECT
person_id,
if(notEmpty(conversion_campaign), conversion_campaign, if(notEmpty(fallback_campaign), fallback_campaign, '')) AS campaign_name,
if(notEmpty(conversion_source), conversion_source, if(notEmpty(fallback_source), fallback_source, '')) AS source_name,
1 AS conversion_value
if(notEmpty(campaign_name), campaign_name, 'organic') AS campaign,
if(notEmpty(source_name), source_name, 'organic') AS source,
count() AS conversion_0
FROM
(
SELECT
person_id,
conversion_timestamps[i] AS conversion_time,
conversion_math_values[i] AS conversion_math_value,
conversion_campaigns[i] AS conversion_campaign,
conversion_sources[i] AS conversion_source,
arrayMax(arrayFilter(x -> and(lessOrEquals(x, conversion_timestamps[i]), greaterOrEquals(x, minus(conversion_timestamps[i], 31536000))), utm_timestamps)) AS last_utm_timestamp,
if(isNotNull(last_utm_timestamp), utm_campaigns[indexOf(utm_timestamps, last_utm_timestamp)], '') AS fallback_campaign,
if(isNotNull(last_utm_timestamp), utm_sources[indexOf(utm_timestamps, last_utm_timestamp)], '') AS fallback_source
if(notEmpty(conversion_campaign), conversion_campaign, if(notEmpty(fallback_campaign), fallback_campaign, '')) AS campaign_name,
if(notEmpty(conversion_source), conversion_source, if(notEmpty(fallback_source), fallback_source, '')) AS source_name,
1 AS conversion_value
FROM
(
SELECT
events.person_id,
arrayFilter(x -> greater(x, 0), groupArray(if(equals(event, 'test_event'), toUnixTimestamp(events.timestamp), 0))) AS conversion_timestamps,
arrayFilter(x -> greater(x, 0), groupArray(if(equals(event, 'test_event'), toFloat(1), 0))) AS conversion_math_values,
arrayFilter(x -> notEmpty(toString(x)), groupArray(if(equals(event, 'test_event'), toString(ifNull(events.properties.utm_campaign, '')), ''))) AS conversion_campaigns,
arrayFilter(x -> notEmpty(toString(x)), groupArray(if(equals(event, 'test_event'), toString(ifNull(events.properties.utm_source, '')), ''))) AS conversion_sources,
arrayFilter(x -> greater(x, 0), groupArray(if(and(equals(events.event, '$pageview'), notEmpty(toString(ifNull(events.properties.utm_campaign, ''))), notEmpty(toString(ifNull(events.properties.utm_source, '')))), toUnixTimestamp(events.timestamp), 0))) AS utm_timestamps,
arrayFilter(x -> notEmpty(x), groupArray(if(and(equals(events.event, '$pageview'), notEmpty(toString(ifNull(events.properties.utm_campaign, ''))), notEmpty(toString(ifNull(events.properties.utm_source, '')))), toString(ifNull(events.properties.utm_campaign, '')), ''))) AS utm_campaigns,
arrayFilter(x -> notEmpty(x), groupArray(if(and(equals(events.event, '$pageview'), notEmpty(toString(ifNull(events.properties.utm_campaign, ''))), notEmpty(toString(ifNull(events.properties.utm_source, '')))), toString(ifNull(events.properties.utm_source, '')), ''))) AS utm_sources
person_id,
conversion_timestamps[i] AS conversion_time,
conversion_math_values[i] AS conversion_math_value,
conversion_campaigns[i] AS conversion_campaign,
conversion_sources[i] AS conversion_source,
arrayMax(arrayFilter(x -> and(lessOrEquals(x, conversion_timestamps[i]), greaterOrEquals(x, minus(conversion_timestamps[i], 31536000))), utm_timestamps)) AS last_utm_timestamp,
if(isNotNull(last_utm_timestamp), utm_campaigns[indexOf(utm_timestamps, last_utm_timestamp)], '') AS fallback_campaign,
if(isNotNull(last_utm_timestamp), utm_sources[indexOf(utm_timestamps, last_utm_timestamp)], '') AS fallback_source
FROM
events
(
SELECT
events.person_id,
arrayFilter(x -> greater(x, 0), groupArray(if(equals(event, 'test_event'), toUnixTimestamp(events.timestamp), 0))) AS conversion_timestamps,
arrayFilter(x -> greater(x, 0), groupArray(if(equals(event, 'test_event'), toFloat(1), 0))) AS conversion_math_values,
arrayFilter(x -> notEmpty(toString(x)), groupArray(if(equals(event, 'test_event'), toString(ifNull(events.properties.utm_campaign, '')), ''))) AS conversion_campaigns,
arrayFilter(x -> notEmpty(toString(x)), groupArray(if(equals(event, 'test_event'), toString(ifNull(events.properties.utm_source, '')), ''))) AS conversion_sources,
arrayFilter(x -> greater(x, 0), groupArray(if(and(equals(events.event, '$pageview'), notEmpty(toString(ifNull(events.properties.utm_campaign, ''))), notEmpty(toString(ifNull(events.properties.utm_source, '')))), toUnixTimestamp(events.timestamp), 0))) AS utm_timestamps,
arrayFilter(x -> notEmpty(x), groupArray(if(and(equals(events.event, '$pageview'), notEmpty(toString(ifNull(events.properties.utm_campaign, ''))), notEmpty(toString(ifNull(events.properties.utm_source, '')))), toString(ifNull(events.properties.utm_campaign, '')), ''))) AS utm_campaigns,
arrayFilter(x -> notEmpty(x), groupArray(if(and(equals(events.event, '$pageview'), notEmpty(toString(ifNull(events.properties.utm_campaign, ''))), notEmpty(toString(ifNull(events.properties.utm_source, '')))), toString(ifNull(events.properties.utm_source, '')), ''))) AS utm_sources
FROM
events
WHERE
or(and(equals(event, 'test_event'), greaterOrEquals(events.timestamp, toDateTime('2024-11-01 00:00:00')), lessOrEquals(events.timestamp, toDateTime('2024-12-31 23:59:59'))), and(equals(events.event, '$pageview'), notEmpty(toString(ifNull(events.properties.utm_campaign, ''))), notEmpty(toString(ifNull(events.properties.utm_source, ''))), greaterOrEquals(events.timestamp, minus(toDateTime('2024-11-01 00:00:00'), toIntervalSecond(31536000))), lessOrEquals(events.timestamp, toDateTime('2024-12-31 23:59:59'))))
or(and(equals(event, 'test_event'), greaterOrEquals(events.timestamp, toDateTime('2024-11-01 00:00:00')), lessOrEquals(events.timestamp, toDateTime('2024-12-31 23:59:59'))), and(equals(events.event, '$pageview'), notEmpty(toString(ifNull(events.properties.utm_campaign, ''))), notEmpty(toString(ifNull(events.properties.utm_source, ''))), greaterOrEquals(events.timestamp, minus(toDateTime('2024-11-01 00:00:00'), toIntervalSecond(31536000))), lessOrEquals(events.timestamp, toDateTime('2024-12-31 23:59:59'))))
GROUP BY
events.person_id
events.person_id
HAVING
greater(length(conversion_timestamps), 0))
ARRAY JOIN arrayEnumerate(conversion_timestamps) AS i)) AS attributed_conversions
greater(length(conversion_timestamps), 0))
ARRAY JOIN arrayEnumerate(conversion_timestamps) AS i)) AS attributed_conversions
GROUP BY
campaign,
source)
GROUP BY
campaign,
source) AS cg_0 ON and(equals(campaign_costs.campaign, cg_0.campaign), equals(campaign_costs.source, cg_0.source))) AS current_period
source) AS ucg ON and(equals(campaign_costs.campaign, ucg.campaign), equals(campaign_costs.source, ucg.source))) AS current_period
LEFT JOIN (
SELECT
campaign_costs.campaign AS Campaign,
@@ -168,8 +179,8 @@
round(divide(campaign_costs.total_cost, nullif(campaign_costs.total_clicks, 0)), 2) AS CPC,
round(multiply(divide(campaign_costs.total_clicks, nullif(campaign_costs.total_impressions, 0)), 100), 2) AS CTR,
round(campaign_costs.total_reported_conversions, 2) AS `Reported Conversion`,
cg_0.conversion_0 AS `Sign Up Conversions`,
round(divide(campaign_costs.total_cost, nullif(cg_0.conversion_0, 0)), 2) AS `Cost per Sign Up Conversions`
ucg.conversion_0 AS `Sign Up Conversions`,
round(divide(campaign_costs.total_cost, nullif(ucg.conversion_0, 0)), 2) AS `Cost per Sign Up Conversions`
FROM
(
@@ -202,58 +213,69 @@
source) AS campaign_costs
LEFT JOIN (
SELECT
if(notEmpty(campaign_name), campaign_name, 'organic') AS campaign,
if(notEmpty(source_name), source_name, 'organic') AS source,
count() AS conversion_0
campaign,
source,
sum(conversion_0) AS conversion_0
FROM
(
SELECT
person_id,
if(notEmpty(conversion_campaign), conversion_campaign, if(notEmpty(fallback_campaign), fallback_campaign, '')) AS campaign_name,
if(notEmpty(conversion_source), conversion_source, if(notEmpty(fallback_source), fallback_source, '')) AS source_name,
1 AS conversion_value
if(notEmpty(campaign_name), campaign_name, 'organic') AS campaign,
if(notEmpty(source_name), source_name, 'organic') AS source,
count() AS conversion_0
FROM
(
SELECT
person_id,
conversion_timestamps[i] AS conversion_time,
conversion_math_values[i] AS conversion_math_value,
conversion_campaigns[i] AS conversion_campaign,
conversion_sources[i] AS conversion_source,
arrayMax(arrayFilter(x -> and(lessOrEquals(x, conversion_timestamps[i]), greaterOrEquals(x, minus(conversion_timestamps[i], 31536000))), utm_timestamps)) AS last_utm_timestamp,
if(isNotNull(last_utm_timestamp), utm_campaigns[indexOf(utm_timestamps, last_utm_timestamp)], '') AS fallback_campaign,
if(isNotNull(last_utm_timestamp), utm_sources[indexOf(utm_timestamps, last_utm_timestamp)], '') AS fallback_source
if(notEmpty(conversion_campaign), conversion_campaign, if(notEmpty(fallback_campaign), fallback_campaign, '')) AS campaign_name,
if(notEmpty(conversion_source), conversion_source, if(notEmpty(fallback_source), fallback_source, '')) AS source_name,
1 AS conversion_value
FROM
(
SELECT
events.person_id,
arrayFilter(x -> greater(x, 0), groupArray(if(equals(event, 'test_event'), toUnixTimestamp(events.timestamp), 0))) AS conversion_timestamps,
arrayFilter(x -> greater(x, 0), groupArray(if(equals(event, 'test_event'), toFloat(1), 0))) AS conversion_math_values,
arrayFilter(x -> notEmpty(toString(x)), groupArray(if(equals(event, 'test_event'), toString(ifNull(events.properties.utm_campaign, '')), ''))) AS conversion_campaigns,
arrayFilter(x -> notEmpty(toString(x)), groupArray(if(equals(event, 'test_event'), toString(ifNull(events.properties.utm_source, '')), ''))) AS conversion_sources,
arrayFilter(x -> greater(x, 0), groupArray(if(and(equals(events.event, '$pageview'), notEmpty(toString(ifNull(events.properties.utm_campaign, ''))), notEmpty(toString(ifNull(events.properties.utm_source, '')))), toUnixTimestamp(events.timestamp), 0))) AS utm_timestamps,
arrayFilter(x -> notEmpty(x), groupArray(if(and(equals(events.event, '$pageview'), notEmpty(toString(ifNull(events.properties.utm_campaign, ''))), notEmpty(toString(ifNull(events.properties.utm_source, '')))), toString(ifNull(events.properties.utm_campaign, '')), ''))) AS utm_campaigns,
arrayFilter(x -> notEmpty(x), groupArray(if(and(equals(events.event, '$pageview'), notEmpty(toString(ifNull(events.properties.utm_campaign, ''))), notEmpty(toString(ifNull(events.properties.utm_source, '')))), toString(ifNull(events.properties.utm_source, '')), ''))) AS utm_sources
person_id,
conversion_timestamps[i] AS conversion_time,
conversion_math_values[i] AS conversion_math_value,
conversion_campaigns[i] AS conversion_campaign,
conversion_sources[i] AS conversion_source,
arrayMax(arrayFilter(x -> and(lessOrEquals(x, conversion_timestamps[i]), greaterOrEquals(x, minus(conversion_timestamps[i], 31536000))), utm_timestamps)) AS last_utm_timestamp,
if(isNotNull(last_utm_timestamp), utm_campaigns[indexOf(utm_timestamps, last_utm_timestamp)], '') AS fallback_campaign,
if(isNotNull(last_utm_timestamp), utm_sources[indexOf(utm_timestamps, last_utm_timestamp)], '') AS fallback_source
FROM
events
(
SELECT
events.person_id,
arrayFilter(x -> greater(x, 0), groupArray(if(equals(event, 'test_event'), toUnixTimestamp(events.timestamp), 0))) AS conversion_timestamps,
arrayFilter(x -> greater(x, 0), groupArray(if(equals(event, 'test_event'), toFloat(1), 0))) AS conversion_math_values,
arrayFilter(x -> notEmpty(toString(x)), groupArray(if(equals(event, 'test_event'), toString(ifNull(events.properties.utm_campaign, '')), ''))) AS conversion_campaigns,
arrayFilter(x -> notEmpty(toString(x)), groupArray(if(equals(event, 'test_event'), toString(ifNull(events.properties.utm_source, '')), ''))) AS conversion_sources,
arrayFilter(x -> greater(x, 0), groupArray(if(and(equals(events.event, '$pageview'), notEmpty(toString(ifNull(events.properties.utm_campaign, ''))), notEmpty(toString(ifNull(events.properties.utm_source, '')))), toUnixTimestamp(events.timestamp), 0))) AS utm_timestamps,
arrayFilter(x -> notEmpty(x), groupArray(if(and(equals(events.event, '$pageview'), notEmpty(toString(ifNull(events.properties.utm_campaign, ''))), notEmpty(toString(ifNull(events.properties.utm_source, '')))), toString(ifNull(events.properties.utm_campaign, '')), ''))) AS utm_campaigns,
arrayFilter(x -> notEmpty(x), groupArray(if(and(equals(events.event, '$pageview'), notEmpty(toString(ifNull(events.properties.utm_campaign, ''))), notEmpty(toString(ifNull(events.properties.utm_source, '')))), toString(ifNull(events.properties.utm_source, '')), ''))) AS utm_sources
FROM
events
WHERE
or(and(equals(event, 'test_event'), greaterOrEquals(events.timestamp, toDateTime('2024-09-01 00:00:00')), lessOrEquals(events.timestamp, toDateTime('2024-10-31 23:59:59'))), and(equals(events.event, '$pageview'), notEmpty(toString(ifNull(events.properties.utm_campaign, ''))), notEmpty(toString(ifNull(events.properties.utm_source, ''))), greaterOrEquals(events.timestamp, minus(toDateTime('2024-09-01 00:00:00'), toIntervalSecond(31536000))), lessOrEquals(events.timestamp, toDateTime('2024-10-31 23:59:59'))))
or(and(equals(event, 'test_event'), greaterOrEquals(events.timestamp, toDateTime('2024-09-01 00:00:00')), lessOrEquals(events.timestamp, toDateTime('2024-10-31 23:59:59'))), and(equals(events.event, '$pageview'), notEmpty(toString(ifNull(events.properties.utm_campaign, ''))), notEmpty(toString(ifNull(events.properties.utm_source, ''))), greaterOrEquals(events.timestamp, minus(toDateTime('2024-09-01 00:00:00'), toIntervalSecond(31536000))), lessOrEquals(events.timestamp, toDateTime('2024-10-31 23:59:59'))))
GROUP BY
events.person_id
events.person_id
HAVING
greater(length(conversion_timestamps), 0))
ARRAY JOIN arrayEnumerate(conversion_timestamps) AS i)) AS attributed_conversions
greater(length(conversion_timestamps), 0))
ARRAY JOIN arrayEnumerate(conversion_timestamps) AS i)) AS attributed_conversions
GROUP BY
campaign,
source)
GROUP BY
campaign,
source) AS cg_0 ON and(equals(campaign_costs.campaign, cg_0.campaign), equals(campaign_costs.source, cg_0.source))) AS previous_period ON and(equals(current_period.Campaign, previous_period.Campaign), equals(current_period.Source, previous_period.Source))
source) AS ucg ON and(equals(campaign_costs.campaign, ucg.campaign), equals(campaign_costs.source, ucg.source))) AS previous_period ON and(equals(current_period.Campaign, previous_period.Campaign), equals(current_period.Source, previous_period.Source))
ORDER BY
Cost DESC

View File

@@ -28,6 +28,7 @@ DEFAULT_DISTINCT_ID_FIELD = "distinct_id"
# CTE names
CAMPAIGN_COST_CTE_NAME = "campaign_costs"
UNIFIED_CONVERSION_GOALS_CTE_ALIAS = "ucg"
# Prefixes for table names
CONVERSION_GOAL_PREFIX_ABBREVIATION = "cg_"

View File

@@ -1,5 +1,4 @@
from dataclasses import dataclass
from enum import Enum
from typing import Optional, Union
from posthog.schema import (
@@ -7,7 +6,6 @@ from posthog.schema import (
ConversionGoalFilter1,
ConversionGoalFilter2,
ConversionGoalFilter3,
MarketingAnalyticsHelperForColumnNames,
PropertyMathType,
)
@@ -16,27 +14,11 @@ from posthog.hogql.property import action_to_expr, property_to_expr
from posthog.models import Action, Team
from .adapters.base import MarketingSourceAdapter
from .constants import (
CAMPAIGN_COST_CTE_NAME,
CONVERSION_GOAL_PREFIX,
CONVERSION_GOAL_PREFIX_ABBREVIATION,
DECIMAL_PRECISION,
DEFAULT_DISTINCT_ID_FIELD,
ORGANIC_CAMPAIGN,
ORGANIC_SOURCE,
TOTAL_COST_FIELD,
)
from .marketing_analytics_config import MarketingAnalyticsConfig
MAX_ATTRIBUTION_WINDOW_DAYS = 365 # let's start with a year window for the conversions
DAY_IN_SECONDS = 86400
class AttributionModeOperator(Enum):
LAST_TOUCH = "arrayMax"
FIRST_TOUCH = "arrayMin"
@dataclass
class ConversionGoalProcessor:
"""
@@ -50,8 +32,7 @@ class ConversionGoalProcessor:
goal: Union[ConversionGoalFilter1, ConversionGoalFilter2, ConversionGoalFilter3]
index: int
team: Team
attribution_window_days: int = MAX_ATTRIBUTION_WINDOW_DAYS
attribution_mode: str = AttributionModeOperator.LAST_TOUCH.value
config: MarketingAnalyticsConfig
def get_cte_name(self) -> str:
"""Get unique CTE name for this conversion goal"""
@@ -99,9 +80,9 @@ class ConversionGoalProcessor:
"""Build DAU (Daily Active Users) select expression"""
if self.goal.kind == "DataWarehouseNode":
schema_map = self.goal.schema_map
distinct_id_field = schema_map.get("distinct_id_field", DEFAULT_DISTINCT_ID_FIELD)
distinct_id_field = schema_map.get("distinct_id_field", self.config.default_distinct_id_field)
return ast.Call(name="uniq", args=[ast.Field(chain=[distinct_id_field])])
return ast.Call(name="uniq", args=[ast.Field(chain=["events", DEFAULT_DISTINCT_ID_FIELD])])
return ast.Call(name="uniq", args=[ast.Field(chain=["events", self.config.default_distinct_id_field])])
def _build_sum_select(self) -> ast.Expr:
"""Build SUM aggregation select expression"""
@@ -118,7 +99,7 @@ class ConversionGoalProcessor:
name="round",
args=[
ast.Call(name="sum", args=[ast.Call(name="toFloat", args=[property_field])]),
ast.Constant(value=DECIMAL_PRECISION),
ast.Constant(value=self.config.decimal_precision),
],
)
@@ -163,7 +144,7 @@ class ConversionGoalProcessor:
def _generate_array_based_query(self, additional_conditions: list[ast.Expr]) -> ast.SelectQuery:
"""Generate array-based query with attribution logic for Events/Actions"""
if self.attribution_window_days > 0:
if self.config.max_attribution_window_days > 0:
return self._generate_funnel_query(additional_conditions)
return self._generate_direct_query(additional_conditions)
@@ -177,7 +158,7 @@ class ConversionGoalProcessor:
where_conditions.extend(additional_conditions)
# Build nested query structure for attribution
attribution_window_seconds = self.attribution_window_days * DAY_IN_SECONDS
attribution_window_seconds = self.config.max_attribution_window_days * DAY_IN_SECONDS
array_collection = self._build_array_collection_subquery(conversion_event, where_conditions)
array_join = self._build_array_join_subquery(array_collection, attribution_window_seconds)
attribution = self._build_attribution_logic_subquery(array_join)
@@ -257,7 +238,7 @@ class ConversionGoalProcessor:
self._build_pageview_event_filter(date_conditions, utm_campaign_field, utm_source_field),
]
)
elif self.goal.kind == "ActionsNode" and self.attribution_window_days > 0:
elif self.goal.kind == "ActionsNode" and self.config.max_attribution_window_days > 0:
# For ActionsNode with attribution, we need both action events and pageview events
action_conditions = self.get_base_where_conditions()
action_filter = self._build_action_event_filter(action_conditions, date_conditions)
@@ -331,7 +312,7 @@ class ConversionGoalProcessor:
]
# Apply extended date conditions for pageviews (attribution window)
attribution_window_seconds = self.attribution_window_days * DAY_IN_SECONDS
attribution_window_seconds = self.config.max_attribution_window_days * DAY_IN_SECONDS
for date_condition in date_conditions:
if isinstance(date_condition, ast.CompareOperation):
if date_condition.op == ast.CompareOperationOp.GtEq:
@@ -651,7 +632,7 @@ class ConversionGoalProcessor:
return ast.Alias(
alias="last_utm_timestamp",
expr=ast.Call(
name=self.attribution_mode,
name=self.config.default_attribution_mode,
args=[
ast.Call(
name="arrayFilter",
@@ -772,15 +753,15 @@ class ConversionGoalProcessor:
"""Build final aggregation query with organic defaults"""
select_columns: list[ast.Expr] = [
ast.Alias(
alias=MarketingSourceAdapter.campaign_name_field,
expr=self._build_organic_default_expr("campaign_name", ORGANIC_CAMPAIGN),
alias=self.config.campaign_field,
expr=self._build_organic_default_expr("campaign_name", self.config.organic_campaign),
),
ast.Alias(
alias=MarketingSourceAdapter.source_name_field,
expr=self._build_organic_default_expr("source_name", ORGANIC_SOURCE),
alias=self.config.source_field,
expr=self._build_organic_default_expr("source_name", self.config.organic_source),
),
ast.Alias(
alias=CONVERSION_GOAL_PREFIX + str(self.index),
alias=self.config.get_conversion_goal_column_name(self.index),
expr=self._get_aggregation_expr(),
),
]
@@ -788,10 +769,7 @@ class ConversionGoalProcessor:
return ast.SelectQuery(
select=select_columns,
select_from=ast.JoinExpr(table=attribution_query, alias="attributed_conversions"),
group_by=[
ast.Field(chain=[MarketingSourceAdapter.campaign_name_field]),
ast.Field(chain=[MarketingSourceAdapter.source_name_field]),
],
group_by=[ast.Field(chain=[field]) for field in self.config.group_by_fields],
)
def _build_organic_default_expr(self, field_name: str, default_value: str) -> ast.Call:
@@ -834,15 +812,17 @@ class ConversionGoalProcessor:
# Build SELECT columns with organic defaults
select_columns: list[ast.Expr] = [
ast.Alias(
alias=MarketingSourceAdapter.campaign_name_field,
expr=ast.Call(name="coalesce", args=[utm_campaign_expr, ast.Constant(value=ORGANIC_CAMPAIGN)]),
alias=self.config.campaign_field,
expr=ast.Call(
name="coalesce", args=[utm_campaign_expr, ast.Constant(value=self.config.organic_campaign)]
),
),
ast.Alias(
alias=MarketingSourceAdapter.source_name_field,
expr=ast.Call(name="coalesce", args=[utm_source_expr, ast.Constant(value=ORGANIC_SOURCE)]),
alias=self.config.source_field,
expr=ast.Call(name="coalesce", args=[utm_source_expr, ast.Constant(value=self.config.organic_source)]),
),
ast.Alias(
alias=CONVERSION_GOAL_PREFIX + str(self.index),
alias=self.config.get_conversion_goal_column_name(self.index),
expr=select_field,
),
]
@@ -856,10 +836,7 @@ class ConversionGoalProcessor:
select=select_columns,
select_from=ast.JoinExpr(table=ast.Field(chain=[table])),
where=where_expr,
group_by=[
ast.Field(chain=[MarketingSourceAdapter.campaign_name_field]),
ast.Field(chain=[MarketingSourceAdapter.source_name_field]),
],
group_by=[ast.Field(chain=[field]) for field in self.config.group_by_fields],
)
def generate_cte_query_expr(self, additional_conditions: list[ast.Expr]) -> ast.Expr:
@@ -871,19 +848,19 @@ class ConversionGoalProcessor:
def generate_join_clause(self, use_full_outer_join: bool = False) -> ast.JoinExpr:
"""Generate JOIN clause for this conversion goal"""
cte_name = self.get_cte_name()
alias = CONVERSION_GOAL_PREFIX_ABBREVIATION + str(self.index)
alias = self.config.get_conversion_goal_alias(self.index)
join_condition = ast.And(
exprs=[
ast.CompareOperation(
left=ast.Field(chain=[CAMPAIGN_COST_CTE_NAME, MarketingSourceAdapter.campaign_name_field]),
left=ast.Field(chain=self.config.get_campaign_cost_field_chain(self.config.campaign_field)),
op=ast.CompareOperationOp.Eq,
right=ast.Field(chain=[alias, MarketingSourceAdapter.campaign_name_field]),
right=ast.Field(chain=[alias, self.config.campaign_field]),
),
ast.CompareOperation(
left=ast.Field(chain=[CAMPAIGN_COST_CTE_NAME, MarketingSourceAdapter.source_name_field]),
left=ast.Field(chain=self.config.get_campaign_cost_field_chain(self.config.source_field)),
op=ast.CompareOperationOp.Eq,
right=ast.Field(chain=[alias, MarketingSourceAdapter.source_name_field]),
right=ast.Field(chain=[alias, self.config.source_field]),
),
]
)
@@ -899,14 +876,14 @@ class ConversionGoalProcessor:
def generate_select_columns(self) -> list[ast.Alias]:
"""Generate SELECT columns for this conversion goal"""
goal_name = self.goal.conversion_goal_name
alias_prefix = CONVERSION_GOAL_PREFIX_ABBREVIATION + str(self.index)
alias_prefix = self.config.get_conversion_goal_alias(self.index)
conversion_goal_field = ast.Field(chain=[alias_prefix, CONVERSION_GOAL_PREFIX + str(self.index)])
conversion_goal_field = ast.Field(chain=[alias_prefix, self.config.get_conversion_goal_column_name(self.index)])
conversion_goal_alias = ast.Alias(alias=goal_name, expr=conversion_goal_field)
# Cost per conversion calculation
cost_field = ast.Field(chain=[CAMPAIGN_COST_CTE_NAME, TOTAL_COST_FIELD])
goal_field = ast.Field(chain=[alias_prefix, CONVERSION_GOAL_PREFIX + str(self.index)])
cost_field = ast.Field(chain=self.config.get_campaign_cost_field_chain(self.config.total_cost_field))
goal_field = ast.Field(chain=[alias_prefix, self.config.get_conversion_goal_column_name(self.index)])
cost_per_goal_expr = ast.Call(
name="round",
@@ -916,12 +893,12 @@ class ConversionGoalProcessor:
op=ast.ArithmeticOperationOp.Div,
right=ast.Call(name="nullif", args=[goal_field, ast.Constant(value=0)]),
),
ast.Constant(value=DECIMAL_PRECISION),
ast.Constant(value=self.config.decimal_precision),
],
)
cost_per_goal_alias = ast.Alias(
alias=f"{MarketingAnalyticsHelperForColumnNames.COST_PER} {goal_name}",
alias=f"{self.config.cost_per_prefix} {goal_name}",
expr=cost_per_goal_expr,
)

View File

@@ -0,0 +1,209 @@
from posthog.hogql import ast
from posthog.hogql_queries.utils.query_date_range import QueryDateRange
from .conversion_goal_processor import ConversionGoalProcessor
from .marketing_analytics_config import MarketingAnalyticsConfig
class ConversionGoalsAggregator:
    """
    Builds one unified CTE that holds every conversion goal, grouped by campaign and source.

    Each processor's query is widened to one column per conversion goal (its own value, zero
    for every other goal), the widened queries are combined with UNION ALL, and an outer query
    sums each goal column per campaign/source pair. The class also produces the SELECT
    expressions the main query uses to read from that CTE.
    """

    def __init__(self, processors: list[ConversionGoalProcessor], config: MarketingAnalyticsConfig):
        self.processors = processors
        self.config = config

    def generate_unified_cte(self, date_range: QueryDateRange, additional_conditions_getter) -> ast.CTE:
        """
        Generate a single CTE that contains all conversion goals aggregated by campaign/source.

        Args:
            date_range: Date range forwarded to ``additional_conditions_getter``.
            additional_conditions_getter: Callable invoked once per processor with the keyword
                arguments ``date_range``, ``include_date_range``, ``date_field`` and
                ``use_date_not_datetime``; its result is passed to the processor's
                ``generate_cte_query``.

        Returns:
            A subquery CTE named ``unified_conversion_goals``.

        Raises:
            ValueError: If the aggregator has no processors.
        """
        if not self.processors:
            raise ValueError("Cannot create unified CTE without conversion goal processors")

        # Step 1: one widened subquery per conversion goal
        conversion_subqueries = []
        for processor in self.processors:
            conversion_subqueries.append(
                self._build_goal_subquery(processor, date_range, additional_conditions_getter)
            )

        # Step 2: UNION ALL the individual queries (a single query needs no set operation)
        if len(conversion_subqueries) == 1:
            union_query: ast.SelectQuery | ast.SelectSetQuery = conversion_subqueries[0]
        else:
            union_query = ast.SelectSetQuery.create_from_queries(conversion_subqueries, "UNION ALL")

        # Step 3: sum every conversion goal column by campaign/source
        final_select: list[ast.Expr] = [ast.Field(chain=[field]) for field in self.config.group_by_fields]
        for processor in self.processors:
            column_name = self.config.get_conversion_goal_column_name(processor.index)
            final_select.append(
                ast.Alias(
                    alias=column_name,
                    expr=ast.Call(name="sum", args=[ast.Field(chain=[column_name])]),
                )
            )

        final_query = ast.SelectQuery(
            select=final_select,
            select_from=ast.JoinExpr(table=union_query),
            group_by=[ast.Field(chain=[field]) for field in self.config.group_by_fields],
        )

        return ast.CTE(name="unified_conversion_goals", expr=final_query, cte_type="subquery")

    def _build_goal_subquery(
        self, processor: ConversionGoalProcessor, date_range: QueryDateRange, additional_conditions_getter
    ) -> ast.SelectQuery:
        """Build one processor's query widened to a column per conversion goal."""
        additional_conditions = additional_conditions_getter(
            date_range=date_range,
            include_date_range=True,
            date_field=processor.get_date_field(),
            use_date_not_datetime=True,
        )

        # The processor's select list is read positionally: campaign, source, goal value
        base_query = processor.generate_cte_query(additional_conditions)

        enhanced_select = [
            base_query.select[0],  # campaign
            base_query.select[1],  # source
        ]

        # Every subquery exposes the same goal columns so the UNION ALL branches line up;
        # only the column owned by this processor carries a real value.
        for other in self.processors:
            column_name = self.config.get_conversion_goal_column_name(other.index)
            if other.index == processor.index:
                # Unwrap the processor's alias to avoid double aliasing
                conversion_expr = base_query.select[2]
                if isinstance(conversion_expr, ast.Alias):
                    conversion_expr = conversion_expr.expr
                enhanced_select.append(ast.Alias(alias=column_name, expr=conversion_expr))
            else:
                enhanced_select.append(ast.Alias(alias=column_name, expr=ast.Constant(value=0)))

        return ast.SelectQuery(
            select=enhanced_select,
            select_from=base_query.select_from,
            where=base_query.where,
            group_by=base_query.group_by,
            having=base_query.having,
            array_join_op=base_query.array_join_op,
            array_join_list=base_query.array_join_list,
        )

    def _unified_goal_field(self, index: int) -> ast.Field:
        """Return a fresh field node pointing at the goal column of the unified CTE alias."""
        return ast.Field(
            chain=self.config.get_unified_conversion_field_chain(self.config.get_conversion_goal_column_name(index))
        )

    def get_conversion_goal_columns(self) -> dict[str, ast.Alias]:
        """
        Get the column mappings for accessing conversion goals from the unified CTE.

        Returns:
            A dict keyed by output column name with two entries per processor: the goal value
            (keyed by the goal name) and the cost per goal (keyed by
            ``"<cost_per_prefix> <goal name>"``).
        """
        columns: dict[str, ast.Alias] = {}

        for processor in self.processors:
            goal_name = processor.goal.conversion_goal_name
            cost_per_goal_name = f"{self.config.cost_per_prefix} {goal_name}"

            # Conversion goal column
            conversion_goal_alias = ast.Alias(alias=goal_name, expr=self._unified_goal_field(processor.index))

            # Cost per conversion column: total cost / goal value, where nullif turns a zero
            # goal value into NULL instead of dividing by zero.
            cost_per_goal_alias = ast.Alias(
                alias=cost_per_goal_name,
                expr=ast.Call(
                    name="round",
                    args=[
                        ast.ArithmeticOperation(
                            left=ast.Field(
                                chain=self.config.get_campaign_cost_field_chain(self.config.total_cost_field)
                            ),
                            op=ast.ArithmeticOperationOp.Div,
                            right=ast.Call(
                                name="nullif",
                                args=[self._unified_goal_field(processor.index), ast.Constant(value=0)],
                            ),
                        ),
                        # Use the configured precision, matching ConversionGoalProcessor
                        ast.Constant(value=self.config.decimal_precision),
                    ],
                ),
            )

            columns[goal_name] = conversion_goal_alias
            columns[cost_per_goal_name] = cost_per_goal_alias

        return columns

    def _coalesce_with_organic(self, field_name: str, organic_value: str) -> ast.Call:
        """Build coalesce(nullif(costs.field, ''), nullif(unified.field, ''), organic_value)."""
        return ast.Call(
            name="coalesce",
            args=[
                ast.Call(
                    name="nullif",
                    args=[
                        ast.Field(chain=self.config.get_campaign_cost_field_chain(field_name)),
                        ast.Constant(value=""),
                    ],
                ),
                ast.Call(
                    name="nullif",
                    args=[
                        ast.Field(chain=self.config.get_unified_conversion_field_chain(field_name)),
                        ast.Constant(value=""),
                    ],
                ),
                ast.Constant(value=organic_value),
            ],
        )

    def get_coalesce_fallback_columns(self) -> dict[str, ast.Expr]:
        """
        Get COALESCE columns that fall back to unified conversion goals for campaign/source.

        Each column prefers the campaign costs value, then the unified conversion goals value,
        then the configured organic default; empty strings are treated as missing.

        Returns:
            A dict keyed by the configured campaign and source column aliases.
        """
        campaign_field, source_field = self.config.group_by_fields

        return {
            self.config.campaign_column_alias: ast.Alias(
                alias=self.config.campaign_column_alias,
                expr=self._coalesce_with_organic(campaign_field, self.config.organic_campaign),
            ),
            self.config.source_column_alias: ast.Alias(
                alias=self.config.source_column_alias,
                expr=self._coalesce_with_organic(source_field, self.config.organic_source),
            ),
        }

View File

@@ -0,0 +1,90 @@
from dataclasses import dataclass
from enum import Enum
from posthog.schema import MarketingAnalyticsBaseColumns, MarketingAnalyticsHelperForColumnNames
from .adapters.base import MarketingSourceAdapter
from .constants import (
CAMPAIGN_COST_CTE_NAME,
CONVERSION_GOAL_PREFIX,
CONVERSION_GOAL_PREFIX_ABBREVIATION,
DECIMAL_PRECISION,
DEFAULT_DISTINCT_ID_FIELD,
ORGANIC_CAMPAIGN,
ORGANIC_SOURCE,
TOTAL_CLICKS_FIELD,
TOTAL_COST_FIELD,
TOTAL_IMPRESSIONS_FIELD,
TOTAL_REPORTED_CONVERSION_FIELD,
UNIFIED_CONVERSION_GOALS_CTE_ALIAS,
)
class AttributionModeOperator(Enum):
    """
    Attribution modes for conversions.

    Each member's value is the name of the array function used when the
    attribution query is built.
    """

    # Last touch: the maximum (latest) value of the array
    LAST_TOUCH = "arrayMax"
    # First touch: the minimum (earliest) value of the array
    FIRST_TOUCH = "arrayMin"
@dataclass
class MarketingAnalyticsConfig:
    """
    Single home for the names, prefixes, defaults and attribution settings used by the
    marketing analytics query builders. Builders receive an instance of this class
    rather than importing the constants themselves, so individual values can be
    overridden (for example in tests).
    """

    # Names used to reference the CTEs inside generated queries
    campaign_costs_cte_name: str = CAMPAIGN_COST_CTE_NAME
    unified_conversion_goals_cte_alias: str = UNIFIED_CONVERSION_GOALS_CTE_ALIAS

    # Fields the queries group by
    campaign_field: str = MarketingSourceAdapter.campaign_name_field
    source_field: str = MarketingSourceAdapter.source_name_field

    # Aliases of the campaign/source output columns
    campaign_column_alias: str = MarketingAnalyticsBaseColumns.CAMPAIGN
    source_column_alias: str = MarketingAnalyticsBaseColumns.SOURCE

    # Prefixes used when deriving column and alias names
    conversion_goal_prefix: str = CONVERSION_GOAL_PREFIX
    conversion_goal_abbreviation: str = CONVERSION_GOAL_PREFIX_ABBREVIATION
    cost_per_prefix: str = MarketingAnalyticsHelperForColumnNames.COST_PER

    # Values substituted when a campaign or source is missing
    organic_campaign: str = ORGANIC_CAMPAIGN
    organic_source: str = ORGANIC_SOURCE

    # Names of aggregated and identifier fields
    total_cost_field: str = TOTAL_COST_FIELD
    total_clicks_field: str = TOTAL_CLICKS_FIELD
    total_impressions_field: str = TOTAL_IMPRESSIONS_FIELD
    total_reported_conversions_field: str = TOTAL_REPORTED_CONVERSION_FIELD
    default_distinct_id_field: str = DEFAULT_DISTINCT_ID_FIELD

    # Number of decimals used when rounding
    decimal_precision: int = DECIMAL_PRECISION

    # Attribution window length in days, and the default attribution mode
    max_attribution_window_days: int = 365
    default_attribution_mode: str = AttributionModeOperator.LAST_TOUCH.value

    @property
    def group_by_fields(self) -> list[str]:
        """Campaign field followed by source field, as a new list on every access"""
        fields = [self.campaign_field]
        fields.append(self.source_field)
        return fields

    def get_campaign_cost_field_chain(self, field_name: str) -> list[str | int]:
        """Field chain addressing `field_name` on the campaign costs CTE"""
        chain: list[str | int] = [self.campaign_costs_cte_name]
        chain.append(field_name)
        return chain

    def get_unified_conversion_field_chain(self, field_name: str) -> list[str | int]:
        """Field chain addressing `field_name` on the unified conversion goals alias"""
        chain: list[str | int] = [self.unified_conversion_goals_cte_alias]
        chain.append(field_name)
        return chain

    def get_conversion_goal_column_name(self, index: int) -> str:
        """Column name for the conversion goal at `index`: prefix followed by the index"""
        return self.conversion_goal_prefix + str(index)

    def get_conversion_goal_alias(self, index: int) -> str:
        """Alias for the conversion goal at `index`: abbreviation followed by the index"""
        return self.conversion_goal_abbreviation + str(index)

View File

@@ -1,6 +1,6 @@
from datetime import datetime
from functools import cached_property
from typing import Literal, cast
from typing import Literal, Optional, cast
import structlog
@@ -29,21 +29,10 @@ from posthog.models.team.team import DEFAULT_CURRENCY
from .adapters.base import MarketingSourceAdapter, QueryContext
from .adapters.factory import MarketingSourceFactory
from .constants import (
BASE_COLUMN_MAPPING,
CAMPAIGN_COST_CTE_NAME,
CONVERSION_GOAL_PREFIX_ABBREVIATION,
DEFAULT_LIMIT,
ORGANIC_CAMPAIGN,
ORGANIC_SOURCE,
PAGINATION_EXTRA,
TOTAL_CLICKS_FIELD,
TOTAL_COST_FIELD,
TOTAL_IMPRESSIONS_FIELD,
TOTAL_REPORTED_CONVERSION_FIELD,
to_marketing_analytics_data,
)
from .constants import BASE_COLUMN_MAPPING, DEFAULT_LIMIT, PAGINATION_EXTRA, to_marketing_analytics_data
from .conversion_goal_processor import ConversionGoalProcessor
from .conversion_goals_aggregator import ConversionGoalsAggregator
from .marketing_analytics_config import MarketingAnalyticsConfig
from .utils import convert_team_conversion_goals_to_objects
logger = structlog.get_logger(__name__)
@@ -58,6 +47,8 @@ class MarketingAnalyticsTableQueryRunner(AnalyticsQueryRunner[MarketingAnalytics
self.paginator = HogQLHasMorePaginator.from_limit_context(
limit_context=self.limit_context, limit=self.query.limit, offset=self.query.offset
)
# Initialize configuration
self.config = MarketingAnalyticsConfig()
@cached_property
def query_date_range(self):
@@ -115,34 +106,26 @@ class MarketingAnalyticsTableQueryRunner(AnalyticsQueryRunner[MarketingAnalytics
) -> ast.SelectQuery:
"""Build the complete query with CTEs using AST expressions"""
# Create conversion goals aggregator if needed
conversion_aggregator = ConversionGoalsAggregator(processors, self.config) if processors else None
# Build the main SELECT query
main_query = self._build_select_query(processors)
main_query = self._build_select_query(conversion_aggregator)
# Build CTEs as a dictionary
ctes: dict[str, ast.CTE] = {}
# Add campaign_costs CTE
campaign_cost_select = self._build_campaign_cost_select(union_query_string)
campaign_cost_cte = ast.CTE(name=CAMPAIGN_COST_CTE_NAME, expr=campaign_cost_select, cte_type="subquery")
ctes[CAMPAIGN_COST_CTE_NAME] = campaign_cost_cte
campaign_cost_cte = ast.CTE(
name=self.config.campaign_costs_cte_name, expr=campaign_cost_select, cte_type="subquery"
)
ctes[self.config.campaign_costs_cte_name] = campaign_cost_cte
# Add conversion goal CTEs if any
if processors:
for processor in processors:
# Build additional conditions (date range and global filters)
date_field = processor.get_date_field()
additional_conditions = self._get_where_conditions(
date_range=date_range,
include_date_range=True,
date_field=date_field,
use_date_not_datetime=True, # Conversion goals use toDate instead of toDateTime
)
# Generate CTE
cte_alias = processor.generate_cte_query_expr(additional_conditions)
cte_name = processor.get_cte_name()
cte = ast.CTE(name=cte_name, expr=cte_alias.expr, cte_type="subquery")
ctes[cte_name] = cte
# Add unified conversion goal CTE if any
if conversion_aggregator:
unified_cte = conversion_aggregator.generate_unified_cte(date_range, self._get_where_conditions)
ctes["unified_conversion_goals"] = unified_cte
# Add CTEs to the main query
main_query.ctes = ctes
@@ -347,22 +330,22 @@ class MarketingAnalyticsTableQueryRunner(AnalyticsQueryRunner[MarketingAnalytics
"""Build the campaign_costs CTE SELECT query"""
# Build SELECT columns for the CTE
select_columns: list[ast.Expr] = [
ast.Field(chain=[MarketingSourceAdapter.campaign_name_field]),
ast.Field(chain=[MarketingSourceAdapter.source_name_field]),
ast.Field(chain=[self.config.campaign_field]),
ast.Field(chain=[self.config.source_field]),
ast.Alias(
alias=TOTAL_COST_FIELD,
alias=self.config.total_cost_field,
expr=ast.Call(name="sum", args=[ast.Field(chain=[MarketingSourceAdapter.cost_field])]),
),
ast.Alias(
alias=TOTAL_CLICKS_FIELD,
alias=self.config.total_clicks_field,
expr=ast.Call(name="sum", args=[ast.Field(chain=[MarketingSourceAdapter.clicks_field])]),
),
ast.Alias(
alias=TOTAL_IMPRESSIONS_FIELD,
alias=self.config.total_impressions_field,
expr=ast.Call(name="sum", args=[ast.Field(chain=[MarketingSourceAdapter.impressions_field])]),
),
ast.Alias(
alias=TOTAL_REPORTED_CONVERSION_FIELD,
alias=self.config.total_reported_conversions_field,
expr=ast.Call(name="sum", args=[ast.Field(chain=[MarketingSourceAdapter.reported_conversion_field])]),
),
]
@@ -371,101 +354,72 @@ class MarketingAnalyticsTableQueryRunner(AnalyticsQueryRunner[MarketingAnalytics
union_subquery = parse_select(union_query_string)
union_join_expr = ast.JoinExpr(table=union_subquery)
# Build GROUP BY
group_by_exprs: list[ast.Expr] = [
ast.Field(chain=[MarketingSourceAdapter.campaign_name_field]),
ast.Field(chain=[MarketingSourceAdapter.source_name_field]),
]
# Build GROUP BY using configuration
group_by_exprs: list[ast.Expr] = [ast.Field(chain=[field]) for field in self.config.group_by_fields]
# Build the CTE SELECT query
return ast.SelectQuery(select=select_columns, select_from=union_join_expr, group_by=group_by_exprs)
def _build_select_columns_mapping(self, processors: list[ConversionGoalProcessor]) -> dict[str, ast.Expr]:
def _build_select_columns_mapping(
self, conversion_aggregator: Optional[ConversionGoalsAggregator] = None
) -> dict[str, ast.Expr]:
all_columns: dict[str, ast.Expr] = {str(k): v for k, v in BASE_COLUMN_MAPPING.items()}
# For FULL OUTER JOIN: use COALESCE to show conversion goal UTM values when campaign costs are empty
if processors and self.query.includeAllConversions:
all_columns.update(self._build_coalesce_campaign_source_columns(processors))
# Add conversion goal columns using the aggregator
if conversion_aggregator:
# For FULL OUTER JOIN: use COALESCE to show conversion goal UTM values when campaign costs are empty
if self.query.includeAllConversions:
coalesce_columns = conversion_aggregator.get_coalesce_fallback_columns()
all_columns.update(coalesce_columns)
for processor in processors:
conversion_goal_expr, cost_per_goal_expr = processor.generate_select_columns()
all_columns.update(
{conversion_goal_expr.alias: conversion_goal_expr, cost_per_goal_expr.alias: cost_per_goal_expr}
)
# Add conversion goal columns
conversion_columns = conversion_aggregator.get_conversion_goal_columns()
all_columns.update(conversion_columns)
return all_columns
def _build_coalesce_campaign_source_columns(self, processors: list[ConversionGoalProcessor]) -> dict[str, ast.Expr]:
"""Build COALESCE expressions for campaign and source that fall back to conversion goal values"""
campaign_args: list[ast.Expr] = [
ast.Call(
name="nullif",
args=[
ast.Field(chain=[CAMPAIGN_COST_CTE_NAME, MarketingSourceAdapter.campaign_name_field]),
ast.Constant(value=""),
],
)
]
source_args: list[ast.Expr] = [
ast.Call(
name="nullif",
args=[
ast.Field(chain=[CAMPAIGN_COST_CTE_NAME, MarketingSourceAdapter.source_name_field]),
ast.Constant(value=""),
],
)
]
# Add conversion goal fallbacks
for processor in processors:
alias_prefix = CONVERSION_GOAL_PREFIX_ABBREVIATION + str(processor.index)
campaign_args.append(
ast.Call(
name="nullif",
args=[
ast.Field(chain=[alias_prefix, MarketingSourceAdapter.campaign_name_field]),
ast.Constant(value=""),
],
)
)
source_args.append(
ast.Call(
name="nullif",
args=[
ast.Field(chain=[alias_prefix, MarketingSourceAdapter.source_name_field]),
ast.Constant(value=""),
],
)
)
# Add organic defaults
campaign_args.append(ast.Constant(value=ORGANIC_CAMPAIGN))
source_args.append(ast.Constant(value=ORGANIC_SOURCE))
return {
str(MarketingAnalyticsBaseColumns.CAMPAIGN): ast.Alias(
alias=MarketingAnalyticsBaseColumns.CAMPAIGN, expr=ast.Call(name="coalesce", args=campaign_args)
),
str(MarketingAnalyticsBaseColumns.SOURCE): ast.Alias(
alias=MarketingAnalyticsBaseColumns.SOURCE, expr=ast.Call(name="coalesce", args=source_args)
),
}
def _build_select_query(self, processors: list) -> ast.SelectQuery:
def _build_select_query(self, conversion_aggregator: Optional[ConversionGoalsAggregator] = None) -> ast.SelectQuery:
"""Build the complete SELECT query with base columns and conversion goal columns"""
# Get conversion goal components (processors already created and passed in)
conversion_columns_mapping = self._build_select_columns_mapping(processors)
if processors:
conversion_joins = self._generate_conversion_goal_joins_from_processors(processors)
else:
conversion_joins = []
# Get conversion goal components
conversion_columns_mapping = self._build_select_columns_mapping(conversion_aggregator)
# Create the FROM clause with base table
from_clause = ast.JoinExpr(table=ast.Field(chain=[CAMPAIGN_COST_CTE_NAME]))
from_clause = ast.JoinExpr(table=ast.Field(chain=[self.config.campaign_costs_cte_name]))
# Add conversion goal joins
if conversion_joins:
from_clause = self._append_joins(from_clause, conversion_joins)
# Add single unified conversion goals join if we have conversion goals
if conversion_aggregator:
join_type = "FULL OUTER JOIN" if self.query.includeAllConversions else "LEFT JOIN"
unified_join = ast.JoinExpr(
join_type=join_type,
table=ast.Field(chain=["unified_conversion_goals"]),
alias=self.config.unified_conversion_goals_cte_alias,
constraint=ast.JoinConstraint(
expr=ast.And(
exprs=[
ast.CompareOperation(
left=ast.Field(
chain=self.config.get_campaign_cost_field_chain(self.config.campaign_field)
),
op=ast.CompareOperationOp.Eq,
right=ast.Field(
chain=self.config.get_unified_conversion_field_chain(self.config.campaign_field)
),
),
ast.CompareOperation(
left=ast.Field(
chain=self.config.get_campaign_cost_field_chain(self.config.source_field)
),
op=ast.CompareOperationOp.Eq,
right=ast.Field(
chain=self.config.get_unified_conversion_field_chain(self.config.source_field)
),
),
]
),
constraint_type="ON",
),
)
from_clause.next_join = unified_join
return ast.SelectQuery(
select=list(conversion_columns_mapping.values()),
@@ -537,23 +491,12 @@ class MarketingAnalyticsTableQueryRunner(AnalyticsQueryRunner[MarketingAnalytics
in self.query.select
)
if should_create:
processor = ConversionGoalProcessor(goal=conversion_goal, index=index, team=self.team)
processor = ConversionGoalProcessor(
goal=conversion_goal, index=index, team=self.team, config=self.config
)
processors.append(processor)
return processors
def _generate_conversion_goal_joins_from_processors(self, processors: list) -> list[ast.JoinExpr]:
"""Generate JOIN clauses for conversion goals"""
if not processors:
return []
joins = []
for processor in processors:
# Let the processor generate its own JOIN clause
join_clause = processor.generate_join_clause(use_full_outer_join=bool(self.query.includeAllConversions))
joins.append(join_clause)
return joins
def _generate_conversion_goal_selects_from_processors(
self, processors: list[ConversionGoalProcessor]
) -> list[ast.Expr]:

View File

@@ -21,10 +21,13 @@ from posthog.hogql.test.utils import pretty_print_in_tests
from posthog.models import Action
from products.marketing_analytics.backend.hogql_queries.conversion_goal_processor import (
AttributionModeOperator,
ConversionGoalProcessor,
add_conversion_goal_property_filters,
)
from products.marketing_analytics.backend.hogql_queries.marketing_analytics_config import (
AttributionModeOperator,
MarketingAnalyticsConfig,
)
def _create_action(**kwargs):
@@ -75,6 +78,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
def setUp(self):
super().setUp()
self.date_range = DateRange(date_from="2023-01-01", date_to="2023-01-31")
self.config = MarketingAnalyticsConfig()
# No shared test data - each test creates its own isolated data
def _create_test_data(self):
@@ -135,7 +139,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
# Test basic getters
assert processor.get_cte_name() == "signup_goal"
@@ -157,7 +161,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
# Test various index values
for index in [0, 1, 5, 10]:
processor = ConversionGoalProcessor(goal=goal, index=index, team=self.team)
processor = ConversionGoalProcessor(goal=goal, index=index, team=self.team, config=self.config)
join_clause = processor.generate_join_clause()
assert join_clause.alias == f"cg_{index}"
@@ -176,7 +180,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
assert processor.get_table_name() == "events"
conditions = processor.get_base_where_conditions()
@@ -195,7 +199,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
assert processor.get_table_name() == "events"
conditions = processor.get_base_where_conditions()
@@ -221,7 +225,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
},
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
assert processor.get_table_name() == "warehouse_table"
assert processor.get_date_field() == "event_timestamp"
@@ -282,7 +286,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions = [
ast.CompareOperation(
@@ -370,7 +374,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions = [
ast.CompareOperation(
@@ -453,7 +457,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions = [
ast.CompareOperation(
@@ -531,7 +535,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions = [
ast.CompareOperation(
@@ -603,7 +607,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions = [
ast.CompareOperation(
@@ -691,7 +695,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions = [
ast.CompareOperation(
@@ -772,7 +776,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
# Apply property filters to additional conditions (same pattern as working test)
additional_conditions = [
@@ -862,7 +866,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
# Apply property filters to additional conditions (same pattern as working test)
additional_conditions = [
@@ -912,7 +916,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
utm_campaign, utm_source = processor.get_utm_expressions()
assert utm_campaign.chain == ["events", "properties", "utm_campaign"]
@@ -938,7 +942,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
},
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
utm_campaign, utm_source = processor.get_utm_expressions()
assert utm_campaign.chain == ["campaign_field"]
@@ -955,7 +959,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "custom_campaign_field", "utm_source_name": "custom_source_field"},
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
utm_campaign, utm_source = processor.get_utm_expressions()
assert utm_campaign.chain == ["events", "properties", "custom_campaign_field"]
@@ -972,7 +976,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={}, # Empty schema map
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
# Should handle missing schema gracefully and fallback to defaults
utm_campaign, utm_source = processor.get_utm_expressions()
@@ -1026,7 +1030,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "my_campaign_field", "utm_source_name": "my_source_field"},
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions = [
ast.CompareOperation(
@@ -1077,7 +1081,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
join_clause = processor.generate_join_clause()
assert join_clause.join_type == "LEFT JOIN"
@@ -1096,7 +1100,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
select_columns = processor.generate_select_columns()
assert len(select_columns) == 2
@@ -1124,7 +1128,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
with pytest.raises(Action.DoesNotExist):
processor.get_base_where_conditions()
@@ -1141,7 +1145,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
# Should handle gracefully by ignoring irrelevant math_property for DAU
select_field = processor.get_select_field()
@@ -1192,7 +1196,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions = [
ast.CompareOperation(
@@ -1240,7 +1244,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
# Test that query executes successfully with very long goal name
additional_conditions = [
@@ -1304,7 +1308,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
# Test that query correctly matches only the event with special characters
additional_conditions = [
@@ -1372,7 +1376,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "营销活动", "utm_source_name": "来源"}, # Chinese property names
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
# Test that query executes successfully with Unicode property names
additional_conditions = [
@@ -1437,7 +1441,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
# Test that query executes and finds the conversion despite complex timeline
additional_conditions = [
@@ -1482,7 +1486,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions = [
ast.CompareOperation(
@@ -1511,7 +1515,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions = [
ast.CompareOperation(
@@ -1539,7 +1543,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions = [
ast.CompareOperation(
@@ -1565,7 +1569,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
# Test all major components can be generated without errors
assert processor.get_select_field() is not None
@@ -1614,7 +1618,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
# Execute query and verify attribution
additional_conditions = [
@@ -1669,7 +1673,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions = [
ast.CompareOperation(
@@ -1740,11 +1744,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(
goal=goal,
index=0,
team=self.team,
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions = [
ast.CompareOperation(
@@ -1812,11 +1812,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(
goal=goal,
index=0,
team=self.team,
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions = [
ast.CompareOperation(
@@ -1903,7 +1899,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions = [
ast.CompareOperation(
@@ -1973,11 +1969,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(
goal=goal,
index=0,
team=self.team,
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions = [
ast.CompareOperation(
@@ -2045,7 +2037,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions = [
ast.CompareOperation(
@@ -2114,9 +2106,11 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team)
# Set attribution mode to first touch
processor.attribution_mode = AttributionModeOperator.FIRST_TOUCH.value
# Create config with first-touch attribution mode
first_touch_config = MarketingAnalyticsConfig()
first_touch_config.default_attribution_mode = AttributionModeOperator.FIRST_TOUCH.value
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=first_touch_config)
additional_conditions = [
ast.CompareOperation(
@@ -2210,7 +2204,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions = [
ast.CompareOperation(
@@ -2274,11 +2268,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(
goal=goal,
index=0,
team=self.team,
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions = [
ast.CompareOperation(
@@ -2367,11 +2357,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
)
# Test April conversion attribution (should use spring_sale)
processor_april = ConversionGoalProcessor(
goal=goal,
index=0,
team=self.team,
)
processor_april = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions_april = [
ast.CompareOperation(
@@ -2405,11 +2391,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
assert count == 1, f"Expected 1 conversion attributed to spring_sale, got {count}"
# Test May conversion attribution (should use mothers_day)
processor_may = ConversionGoalProcessor(
goal=goal,
index=0,
team=self.team,
)
processor_may = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions_may = [
ast.CompareOperation(
@@ -2442,11 +2424,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
assert may_count == 2, f"Expected 2 conversions attributed to mothers_day, got {may_count}"
# Test June conversion attribution (should still use mothers_day - no new ads)
processor_june = ConversionGoalProcessor(
goal=goal,
index=0,
team=self.team,
)
processor_june = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions_june = [
ast.CompareOperation(
@@ -2523,7 +2501,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions = [
ast.CompareOperation(
@@ -2582,7 +2560,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions = [
ast.CompareOperation(
@@ -2640,7 +2618,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions = [
ast.CompareOperation(
@@ -2697,7 +2675,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions = [
ast.CompareOperation(
@@ -2768,7 +2746,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions = [
ast.CompareOperation(
@@ -2839,11 +2817,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(
goal=goal,
index=0,
team=self.team,
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions = [
ast.CompareOperation(
@@ -2949,11 +2923,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(
goal=goal,
index=0,
team=self.team,
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions = [
ast.CompareOperation(
@@ -3024,11 +2994,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(
goal=goal,
index=0,
team=self.team,
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions = [
ast.CompareOperation(
@@ -3098,11 +3064,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(
goal=goal,
index=0,
team=self.team,
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions = [
ast.CompareOperation(
@@ -3191,11 +3153,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(
goal=goal,
index=0,
team=self.team,
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions = [
ast.CompareOperation(
@@ -3295,11 +3253,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(
goal=goal,
index=0,
team=self.team,
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions = [
ast.CompareOperation(
@@ -3370,11 +3324,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
)
# Note: Query range EXCLUDES the March UTM touchpoint
processor = ConversionGoalProcessor(
goal=goal,
index=0,
team=self.team,
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions = [ # May only!
ast.CompareOperation(
@@ -3451,12 +3401,8 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
)
# Test conversion within 30-day attribution window (should attribute)
processor_within = ConversionGoalProcessor(
goal=goal,
index=0,
team=self.team,
)
processor_within.attribution_window_days = 30
processor_within = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
processor_within.config.max_attribution_window_days = 30
additional_conditions_within = [
ast.CompareOperation(
@@ -3484,11 +3430,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
assert within_count == 1, f"Expected 1 conversion within window, got {within_count}"
# Test conversion beyond 30-day attribution window (should not attribute)
processor_beyond = ConversionGoalProcessor(
goal=goal,
index=0,
team=self.team,
)
processor_beyond = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions_beyond = [
ast.CompareOperation(
@@ -3497,7 +3439,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
right=ast.Call(name="toDate", args=[ast.Constant(value="2023-02-01")]),
),
]
processor_beyond.attribution_window_days = 30
processor_beyond.config.max_attribution_window_days = 30
cte_query_beyond = processor_beyond.generate_cte_query(additional_conditions_beyond)
response_beyond = execute_hogql_query(query=cte_query_beyond, team=self.team)
@@ -3552,12 +3494,8 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(
goal=goal,
index=0,
team=self.team,
)
processor.attribution_window_days = 10
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
processor.config.max_attribution_window_days = 10
additional_conditions = [
ast.CompareOperation(
left=ast.Field(chain=["events", "timestamp"]),
@@ -3639,11 +3577,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(
goal=goal,
index=0,
team=self.team,
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions = [
ast.CompareOperation(
@@ -3713,7 +3647,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions = [
ast.CompareOperation(
@@ -3784,7 +3718,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions = [
ast.CompareOperation(
@@ -3850,11 +3784,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(
goal=goal,
index=0,
team=self.team,
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions = [
ast.CompareOperation(
@@ -3914,11 +3844,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(
goal=goal,
index=0,
team=self.team,
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions = [
ast.CompareOperation(
@@ -3997,11 +3923,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(
goal=goal,
index=0,
team=self.team,
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions = [
ast.CompareOperation(
@@ -4079,11 +4001,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(
goal=goal,
index=0,
team=self.team,
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions = [
ast.CompareOperation(
@@ -4171,11 +4089,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(
goal=goal,
index=0,
team=self.team,
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions = [
ast.CompareOperation(
@@ -4304,11 +4218,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
)
# Test first purchase attribution (February 17)
processor_first = ConversionGoalProcessor(
goal=goal,
index=0,
team=self.team,
)
processor_first = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions_first = [
ast.CompareOperation(
@@ -4340,11 +4250,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
assert conversion_count == 1, f"Expected 1 conversion for first purchase, got {conversion_count}"
# Test both purchases together (full timeline attribution)
processor_full = ConversionGoalProcessor(
goal=goal,
index=0,
team=self.team,
)
processor_full = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions_full = [
ast.CompareOperation(
@@ -4435,11 +4341,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(
goal=goal,
index=0,
team=self.team,
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions = [
ast.CompareOperation(
@@ -4542,11 +4444,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(
goal=goal,
index=0,
team=self.team,
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions = [
ast.CompareOperation(
@@ -4622,12 +4520,8 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(
goal=goal,
index=0,
team=self.team,
)
processor.attribution_window_days = 180 # 6 month attribution window
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
processor.config.max_attribution_window_days = 180 # 6 month attribution window
additional_conditions = [
ast.CompareOperation(
@@ -4705,12 +4599,8 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(
goal=goal,
index=0,
team=self.team,
)
processor.attribution_window_days = 180 # 6 month attribution window
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
processor.config.max_attribution_window_days = 180 # 6 month attribution window
additional_conditions = [
ast.CompareOperation(
@@ -4805,12 +4695,8 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(
goal=goal,
index=0,
team=self.team,
)
processor.attribution_window_days = 30 # 30-day attribution window
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
processor.config.max_attribution_window_days = 30 # 30-day attribution window
# Query range: June 2 to July 2 (includes both conversions)
additional_conditions = [
@@ -4905,7 +4791,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions = [
ast.CompareOperation(
@@ -5010,7 +4896,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor_total = ConversionGoalProcessor(goal=goal_total, index=0, team=self.team)
processor_total = ConversionGoalProcessor(goal=goal_total, index=0, team=self.team, config=self.config)
additional_conditions = [
ast.CompareOperation(
@@ -5039,7 +4925,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor_dau = ConversionGoalProcessor(goal=goal_dau, index=0, team=self.team)
processor_dau = ConversionGoalProcessor(goal=goal_dau, index=0, team=self.team, config=self.config)
cte_query_dau = processor_dau.generate_cte_query(additional_conditions)
response_dau = execute_hogql_query(query=cte_query_dau, team=self.team)
@@ -5136,7 +5022,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions = [
ast.CompareOperation(
@@ -5200,7 +5086,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions = [
ast.CompareOperation(
@@ -5261,7 +5147,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
additional_conditions = [
ast.CompareOperation(
left=ast.Field(chain=["events", "timestamp"]),
@@ -5352,8 +5238,8 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
]
# Execute queries
events_processor = ConversionGoalProcessor(goal=events_goal, index=0, team=self.team)
actions_processor = ConversionGoalProcessor(goal=actions_goal, index=1, team=self.team)
events_processor = ConversionGoalProcessor(goal=events_goal, index=0, team=self.team, config=self.config)
actions_processor = ConversionGoalProcessor(goal=actions_goal, index=1, team=self.team, config=self.config)
events_query = events_processor.generate_cte_query(date_conditions)
actions_query = actions_processor.generate_cte_query(date_conditions)
@@ -5418,7 +5304,7 @@ class TestConversionGoalProcessor(ClickhouseTestMixin, BaseTest):
schema_map={"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"},
)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team)
processor = ConversionGoalProcessor(goal=goal, index=0, team=self.team, config=self.config)
date_conditions = [
ast.CompareOperation(

View File

@@ -0,0 +1,461 @@
from datetime import datetime
import pytest
from posthog.test.base import BaseTest, ClickhouseTestMixin
from posthog.schema import BaseMathType, ConversionGoalFilter1, ConversionGoalFilter2, DateRange, NodeKind
from posthog.hogql import ast
from posthog.hogql.query import execute_hogql_query
from posthog.hogql.test.utils import pretty_print_in_tests
from posthog.hogql_queries.utils.query_date_range import QueryDateRange
from posthog.models import Action
from .conversion_goal_processor import ConversionGoalProcessor
from .conversion_goals_aggregator import ConversionGoalsAggregator
from .marketing_analytics_config import MarketingAnalyticsConfig
class TestConversionGoalsAggregator(ClickhouseTestMixin, BaseTest):
def setUp(self):
    """Create the shared marketing config and a fixed January 2023 date range."""
    super().setUp()
    self.config = MarketingAnalyticsConfig()

    january_2023 = DateRange(date_from="2023-01-01", date_to="2023-01-31")
    # "now" is fixed at the last second of the range.
    frozen_now = datetime(2023, 1, 31, 23, 59, 59)
    self.date_range = QueryDateRange(
        date_range=january_2023,
        team=self.team,
        interval=None,
        now=frozen_now,
    )
def _create_test_conversion_goal(
    self,
    goal_id: str,
    goal_name: str,
    event_name: str | None = None,
    math: BaseMathType = BaseMathType.TOTAL,
    math_property: str | None = None,
) -> ConversionGoalFilter1:
    """Build an events-node conversion goal with the standard UTM schema map.

    When ``event_name`` is falsy, the event is derived from ``goal_name`` by
    lower-casing it and replacing spaces with underscores.
    """
    if event_name:
        resolved_event = event_name
    else:
        resolved_event = goal_name.lower().replace(" ", "_")

    utm_schema = {"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"}
    return ConversionGoalFilter1(
        kind=NodeKind.EVENTS_NODE,
        event=resolved_event,
        conversion_goal_id=goal_id,
        conversion_goal_name=goal_name,
        math=math,
        math_property=math_property,
        schema_map=utm_schema,
    )
def _create_test_action_goal(self, action: Action, goal_id: str, goal_name: str) -> ConversionGoalFilter2:
    """Build an actions-node conversion goal (TOTAL math) that points at ``action``."""
    utm_schema = {"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"}
    # The filter receives the action id as a string.
    action_id = str(action.id)
    return ConversionGoalFilter2(
        kind=NodeKind.ACTIONS_NODE,
        id=action_id,
        conversion_goal_id=goal_id,
        conversion_goal_name=goal_name,
        math=BaseMathType.TOTAL,
        schema_map=utm_schema,
    )
def _create_test_processor(self, goal, index: int) -> ConversionGoalProcessor:
    """Wrap ``goal`` in a processor bound to the test team and the shared config."""
    processor = ConversionGoalProcessor(
        goal=goal,
        index=index,
        team=self.team,
        config=self.config,
    )
    return processor
def _create_mock_additional_conditions_getter(self):
    """Return a callable that ignores its keyword arguments and yields date filters.

    Every call builds two fresh conditions on ``events.timestamp``:
    ``>= toDate('2023-01-01')`` and ``< toDate('2023-02-01')``.
    """

    def _timestamp_bound(operator, day):
        # One comparison of events.timestamp against toDate(<day>).
        return ast.CompareOperation(
            left=ast.Field(chain=["events", "timestamp"]),
            op=operator,
            right=ast.Call(name="toDate", args=[ast.Constant(value=day)]),
        )

    def mock_getter(**kwargs):
        return [
            _timestamp_bound(ast.CompareOperationOp.GtEq, "2023-01-01"),
            _timestamp_bound(ast.CompareOperationOp.Lt, "2023-02-01"),
        ]

    return mock_getter
def test_initialization_basic(self):
    """Two processors are kept with their indices and the config is stored as passed."""
    sign_ups = self._create_test_processor(self._create_test_conversion_goal("goal1", "Sign Ups"), 0)
    purchases = self._create_test_processor(self._create_test_conversion_goal("goal2", "Purchases"), 1)

    aggregator = ConversionGoalsAggregator(processors=[sign_ups, purchases], config=self.config)

    assert len(aggregator.processors) == 2
    assert aggregator.config == self.config
    for position, stored in enumerate(aggregator.processors):
        assert stored.index == position
def test_initialization_empty_processors(self):
    """An aggregator can be constructed with an empty processor list."""
    empty_aggregator = ConversionGoalsAggregator(config=self.config, processors=[])

    assert len(empty_aggregator.processors) == 0
    assert empty_aggregator.config == self.config
def test_initialization_single_processor(self):
    """A lone processor is stored with its goal name and index intact."""
    only_processor = self._create_test_processor(
        self._create_test_conversion_goal("single_goal", "Single Goal"),
        0,
    )

    aggregator = ConversionGoalsAggregator(processors=[only_processor], config=self.config)

    assert len(aggregator.processors) == 1
    stored = aggregator.processors[0]
    assert stored.goal.conversion_goal_name == "Single Goal"
    assert stored.index == 0
def test_unified_cte_empty_processors_error(self):
    """Generating the unified CTE with no processors raises ``ValueError``."""
    empty_aggregator = ConversionGoalsAggregator(processors=[], config=self.config)
    conditions_getter = self._create_mock_additional_conditions_getter()
    expected_message = "Cannot create unified CTE without conversion goal processors"

    with pytest.raises(ValueError, match=expected_message):
        empty_aggregator.generate_unified_cte(self.date_range, conditions_getter)
def test_unified_cte_single_processor(self):
    """One goal yields a subquery CTE with three select columns grouped by two."""
    processor = self._create_test_processor(
        self._create_test_conversion_goal("single_cte", "Single CTE Test"),
        0,
    )
    aggregator = ConversionGoalsAggregator(processors=[processor], config=self.config)
    conditions_getter = self._create_mock_additional_conditions_getter()

    unified = aggregator.generate_unified_cte(self.date_range, conditions_getter)

    # Shape of the CTE wrapper.
    assert isinstance(unified, ast.CTE)
    assert unified.name == "unified_conversion_goals"
    assert unified.cte_type == "subquery"
    assert isinstance(unified.expr, ast.SelectQuery)

    # Shape of the aggregated select inside it.
    select_query = unified.expr
    assert isinstance(select_query, ast.SelectQuery)
    assert len(select_query.select) == 3
    assert select_query.group_by is not None
    assert len(select_query.group_by) == 2

    # The third select column is the conversion goal column for index 0.
    goal_column = select_query.select[2]
    assert isinstance(goal_column, ast.Alias)
    assert goal_column.alias == self.config.get_conversion_goal_column_name(0)
def test_unified_cte_multiple_processors(self):
    """Three goals yield five select columns; each goal column is a ``sum`` alias."""
    goal_specs = [
        ("multi_goal1", "Goal 1"),
        ("multi_goal2", "Goal 2"),
        ("multi_goal3", "Goal 3"),
    ]
    goals = [self._create_test_conversion_goal(goal_id, name) for goal_id, name in goal_specs]
    processors = [self._create_test_processor(goal, position) for position, goal in enumerate(goals)]
    aggregator = ConversionGoalsAggregator(processors=processors, config=self.config)
    conditions_getter = self._create_mock_additional_conditions_getter()

    unified = aggregator.generate_unified_cte(self.date_range, conditions_getter)

    assert isinstance(unified, ast.CTE)
    assert unified.name == "unified_conversion_goals"

    select_query = unified.expr
    assert isinstance(select_query, ast.SelectQuery)
    assert len(select_query.select) == 5
    assert select_query.group_by is not None
    assert len(select_query.group_by) == 2

    # Everything after the first two select columns is a per-goal aggregate.
    goal_columns = select_query.select[2:]
    assert len(goal_columns) == 3
    for position, goal_column in enumerate(goal_columns):
        assert isinstance(goal_column, ast.Alias)
        assert goal_column.alias == self.config.get_conversion_goal_column_name(position)
        assert isinstance(goal_column.expr, ast.Call)
        assert goal_column.expr.name == "sum"
@pytest.mark.usefixtures("unittest_snapshot")
def test_unified_cte_sql_snapshot(self):
    """Run the unified CTE query for two goals and compare its HogQL with the snapshot."""
    sign_up_goal = self._create_test_conversion_goal("snapshot_goal1", "Snapshot Goal 1", "sign_up")
    purchase_goal = self._create_test_conversion_goal("snapshot_goal2", "Snapshot Goal 2", "purchase")
    processors = [
        self._create_test_processor(sign_up_goal, 0),
        self._create_test_processor(purchase_goal, 1),
    ]
    aggregator = ConversionGoalsAggregator(processors=processors, config=self.config)
    conditions_getter = self._create_mock_additional_conditions_getter()

    unified = aggregator.generate_unified_cte(self.date_range, conditions_getter)
    result = execute_hogql_query(query=unified.expr, team=self.team)

    printed_hogql = pretty_print_in_tests(result.hogql, self.team.pk)
    assert printed_hogql == self.snapshot
def test_unified_cte_ast_structure(self):
    """Two goals yield four select columns read from a ``SelectSetQuery`` source."""
    sign_up_goal = self._create_test_conversion_goal("ast_goal1", "AST Goal 1", "sign_up")
    purchase_goal = self._create_test_conversion_goal("ast_goal2", "AST Goal 2", "purchase")
    processors = [
        self._create_test_processor(sign_up_goal, 0),
        self._create_test_processor(purchase_goal, 1),
    ]
    aggregator = ConversionGoalsAggregator(processors=processors, config=self.config)
    conditions_getter = self._create_mock_additional_conditions_getter()

    unified = aggregator.generate_unified_cte(self.date_range, conditions_getter)

    assert isinstance(unified, ast.CTE)
    assert unified.name == "unified_conversion_goals"
    assert isinstance(unified.expr, ast.SelectQuery)

    select_query = unified.expr
    assert isinstance(select_query, ast.SelectQuery)
    assert len(select_query.select) == 4
    assert select_query.group_by is not None
    assert len(select_query.group_by) == 2

    # The outer select reads from a join expression whose table is a select-set query.
    source = select_query.select_from
    assert isinstance(source, ast.JoinExpr)
    assert isinstance(source.table, ast.SelectSetQuery)
def test_conversion_goal_columns_single(self):
    """A single goal exposes a value column and a matching cost-per column."""
    processor = self._create_test_processor(
        self._create_test_conversion_goal("columns_test", "Columns Test"),
        0,
    )
    aggregator = ConversionGoalsAggregator(processors=[processor], config=self.config)

    columns = aggregator.get_conversion_goal_columns()
    assert len(columns) == 2

    # Value column: an alias over a field on the unified conversion CTE.
    value_alias = columns["Columns Test"]
    assert isinstance(value_alias, ast.Alias)
    assert value_alias.alias == "Columns Test"
    assert isinstance(value_alias.expr, ast.Field)
    goal_column_name = self.config.get_conversion_goal_column_name(0)
    assert value_alias.expr.chain == self.config.get_unified_conversion_field_chain(goal_column_name)

    # Cost column: an alias over a round(...) call.
    cost_name = f"{self.config.cost_per_prefix} Columns Test"
    cost_alias = columns[cost_name]
    assert isinstance(cost_alias, ast.Alias)
    assert cost_alias.alias == cost_name
    assert isinstance(cost_alias.expr, ast.Call)
    assert cost_alias.expr.name == "round"
def test_conversion_goal_columns_multiple(self):
    """Three goals expose six columns; each cost column is ``round`` of a division."""
    goal_names = ["Goal Alpha", "Goal Beta", "Goal Gamma"]
    goal_ids = ["col_goal1", "col_goal2", "col_goal3"]
    goals = [self._create_test_conversion_goal(goal_id, name) for goal_id, name in zip(goal_ids, goal_names)]
    processors = [self._create_test_processor(goal, position) for position, goal in enumerate(goals)]
    aggregator = ConversionGoalsAggregator(processors=processors, config=self.config)

    columns = aggregator.get_conversion_goal_columns()
    assert len(columns) == 6

    cost_names = [f"{self.config.cost_per_prefix} {name}" for name in goal_names]

    # Every goal contributes both a value column and a cost-per column.
    for name, cost_name in zip(goal_names, cost_names):
        assert name in columns
        assert cost_name in columns

    # Each cost column is round(<a / b>, <second arg>).
    for cost_name in cost_names:
        cost_alias = columns[cost_name]
        assert isinstance(cost_alias.expr, ast.Call)
        assert cost_alias.expr.name == "round"
        round_args = cost_alias.expr.args
        assert len(round_args) == 2
        ratio = round_args[0]
        assert isinstance(ratio, ast.ArithmeticOperation)
        assert ratio.op == ast.ArithmeticOperationOp.Div
def test_coalesce_fallback_columns(self):
    """Campaign and source fallbacks are three-argument ``coalesce`` aliases."""
    processor = self._create_test_processor(
        self._create_test_conversion_goal("fallback_test", "Fallback Test"),
        0,
    )
    aggregator = ConversionGoalsAggregator(processors=[processor], config=self.config)

    fallback_columns = aggregator.get_coalesce_fallback_columns()

    campaign_alias = self.config.campaign_column_alias
    source_alias = self.config.source_column_alias
    assert len(fallback_columns) == 2
    assert campaign_alias in fallback_columns
    assert source_alias in fallback_columns

    # Both columns share the same structure, so they are checked with one loop.
    for alias_name in (campaign_alias, source_alias):
        column = fallback_columns[alias_name]
        assert isinstance(column, ast.Alias)
        assert column.alias == alias_name
        assert isinstance(column.expr, ast.Call)
        assert column.expr.name == "coalesce"
        assert len(column.expr.args) == 3
def test_coalesce_nullif_logic(self):
    """The campaign fallback is ``coalesce(nullif(_, ''), nullif(_, ''), <organic>)``."""
    processor = self._create_test_processor(
        self._create_test_conversion_goal("nullif_test", "NULLIF Test"),
        0,
    )
    aggregator = ConversionGoalsAggregator(processors=[processor], config=self.config)

    campaign_column = aggregator.get_coalesce_fallback_columns()[self.config.campaign_column_alias]
    assert isinstance(campaign_column, ast.Alias)
    assert isinstance(campaign_column.expr, ast.Call)
    coalesce_args = campaign_column.expr.args

    # The first two arguments are nullif(<expr>, '') calls.
    for position in (0, 1):
        wrapped = coalesce_args[position]
        assert isinstance(wrapped, ast.Call)
        assert wrapped.name == "nullif"
        assert len(wrapped.args) == 2
        empty_marker = wrapped.args[1]
        assert isinstance(empty_marker, ast.Constant)
        assert empty_marker.value == ""

    # The last argument is the constant organic campaign name from the config.
    default_value = coalesce_args[2]
    assert isinstance(default_value, ast.Constant)
    assert default_value.value == self.config.organic_campaign
def test_integration_events_mix(self):
    """Two event goals produce the unified CTE plus value and cost columns for each."""
    sign_up_goal = self._create_test_conversion_goal("integration_events1", "Events Goal 1", "sign_up")
    purchase_goal = self._create_test_conversion_goal("integration_events2", "Events Goal 2", "purchase")
    processors = [
        self._create_test_processor(sign_up_goal, 0),
        self._create_test_processor(purchase_goal, 1),
    ]
    aggregator = ConversionGoalsAggregator(processors=processors, config=self.config)
    conditions_getter = self._create_mock_additional_conditions_getter()

    unified = aggregator.generate_unified_cte(self.date_range, conditions_getter)
    assert isinstance(unified, ast.CTE)
    assert unified.name == "unified_conversion_goals"

    columns = aggregator.get_conversion_goal_columns()
    # Value columns first, then the cost-per columns.
    for goal_name in ("Events Goal 1", "Events Goal 2"):
        assert goal_name in columns
    for goal_name in ("Events Goal 1", "Events Goal 2"):
        assert f"{self.config.cost_per_prefix} {goal_name}" in columns
def test_different_math_types(self):
    """Goals with TOTAL and DAU math still yield five select columns and six goal columns."""
    goal_specs = [
        ("math_total", "Total Goal", "sign_up", BaseMathType.TOTAL),
        ("math_dau", "DAU Goal", "login", BaseMathType.DAU),
        ("math_total2", "Total Goal 2", "purchase", BaseMathType.TOTAL),
    ]
    goals = [
        self._create_test_conversion_goal(goal_id, name, event, math_type)
        for goal_id, name, event, math_type in goal_specs
    ]
    processors = [self._create_test_processor(goal, position) for position, goal in enumerate(goals)]
    aggregator = ConversionGoalsAggregator(processors=processors, config=self.config)
    conditions_getter = self._create_mock_additional_conditions_getter()

    unified = aggregator.generate_unified_cte(self.date_range, conditions_getter)
    assert isinstance(unified, ast.CTE)

    select_query = unified.expr
    assert isinstance(select_query, ast.SelectQuery)
    assert len(select_query.select) == 5

    goal_columns = aggregator.get_conversion_goal_columns()
    assert len(goal_columns) == 6
@pytest.mark.usefixtures("unittest_snapshot")
def test_multiple_goals_sql_snapshot(self):
    """Run the unified CTE query for three goals and compare its HogQL with the snapshot."""
    purchase_goal = self._create_test_conversion_goal("multi_goal1", "Multi Goal 1", "purchase")
    sign_up_goal = self._create_test_conversion_goal("multi_goal2", "Multi Goal 2", "sign_up")
    login_goal = self._create_test_conversion_goal("multi_goal3", "Multi Goal 3", "login")
    processors = [
        self._create_test_processor(purchase_goal, 0),
        self._create_test_processor(sign_up_goal, 1),
        self._create_test_processor(login_goal, 2),
    ]
    aggregator = ConversionGoalsAggregator(processors=processors, config=self.config)
    conditions_getter = self._create_mock_additional_conditions_getter()

    unified = aggregator.generate_unified_cte(self.date_range, conditions_getter)
    result = execute_hogql_query(query=unified.expr, team=self.team)

    printed_hogql = pretty_print_in_tests(result.hogql, self.team.pk)
    assert printed_hogql == self.snapshot
def test_aggregation_ast_validation(self):
    """The per-goal select aliases follow the config's column names for indices 0 to 2."""
    purchase_goal = self._create_test_conversion_goal("simple_events1", "Simple Events 1", "purchase")
    sign_up_goal = self._create_test_conversion_goal("simple_events2", "Simple Events 2", "sign_up")
    login_goal = self._create_test_conversion_goal("simple_events3", "Simple Events 3", "login")
    processors = [
        self._create_test_processor(purchase_goal, 0),
        self._create_test_processor(sign_up_goal, 1),
        self._create_test_processor(login_goal, 2),
    ]
    aggregator = ConversionGoalsAggregator(processors=processors, config=self.config)
    conditions_getter = self._create_mock_additional_conditions_getter()

    unified = aggregator.generate_unified_cte(self.date_range, conditions_getter)
    assert isinstance(unified, ast.CTE)
    assert unified.name == "unified_conversion_goals"

    select_query = unified.expr
    assert isinstance(select_query, ast.SelectQuery)
    assert len(select_query.select) == 5
    assert select_query.group_by is not None
    assert len(select_query.group_by) == 2

    # Only Alias nodes contribute a name; any other node type is left out of the list.
    found_aliases = []
    for goal_column in select_query.select[2:]:
        if isinstance(goal_column, ast.Alias):
            found_aliases.append(goal_column.alias)

    wanted_aliases = [self.config.get_conversion_goal_column_name(position) for position in range(3)]
    assert found_aliases == wanted_aliases
def test_empty_goal_name(self):
    """A goal with an empty name is keyed by ``""`` and by the bare cost prefix plus a space."""
    utm_schema = {"utm_campaign_name": "utm_campaign", "utm_source_name": "utm_source"}
    nameless_goal = ConversionGoalFilter1(
        kind=NodeKind.EVENTS_NODE,
        event="test_event",
        conversion_goal_id="empty_name_test",
        conversion_goal_name="",
        math=BaseMathType.TOTAL,
        schema_map=utm_schema,
    )
    aggregator = ConversionGoalsAggregator(
        processors=[self._create_test_processor(nameless_goal, 0)],
        config=self.config,
    )

    columns = aggregator.get_conversion_goal_columns()

    assert "" in columns
    # The cost key keeps the separating space even though the goal name is empty.
    assert f"{self.config.cost_per_prefix} " in columns
def test_high_processor_indices(self):
    """A processor index of 999 is carried through to the conversion column alias."""
    large_index = 999
    processor = self._create_test_processor(
        self._create_test_conversion_goal("high_index_test", "High Index Test"),
        large_index,
    )
    aggregator = ConversionGoalsAggregator(processors=[processor], config=self.config)
    conditions_getter = self._create_mock_additional_conditions_getter()

    unified = aggregator.generate_unified_cte(self.date_range, conditions_getter)
    assert isinstance(unified, ast.CTE)

    select_query = unified.expr
    assert isinstance(select_query, ast.SelectQuery)

    goal_column = select_query.select[2]
    assert isinstance(goal_column, ast.Alias)
    assert goal_column.alias == self.config.get_conversion_goal_column_name(large_index)
def test_duplicate_goal_names(self):
    """Two goals that share a name still expose that name and its cost-per column."""
    shared_name = "Duplicate Goal"
    first_goal = self._create_test_conversion_goal("dup1", shared_name)
    second_goal = self._create_test_conversion_goal("dup2", shared_name)
    processors = [
        self._create_test_processor(first_goal, 0),
        self._create_test_processor(second_goal, 1),
    ]
    aggregator = ConversionGoalsAggregator(processors=processors, config=self.config)

    columns = aggregator.get_conversion_goal_columns()

    # Only presence is checked; the number of resulting columns is not asserted.
    assert "Duplicate Goal" in columns
    assert f"{self.config.cost_per_prefix} Duplicate Goal" in columns
def test_many_processors(self):
    """Ten goals yield 12 select columns, 20 goal columns and 2 fallback columns."""
    processors = [
        self._create_test_processor(
            self._create_test_conversion_goal(f"perf_goal_{position}", f"Performance Goal {position}"),
            position,
        )
        for position in range(10)
    ]
    aggregator = ConversionGoalsAggregator(processors=processors, config=self.config)
    conditions_getter = self._create_mock_additional_conditions_getter()

    unified = aggregator.generate_unified_cte(self.date_range, conditions_getter)
    assert isinstance(unified, ast.CTE)

    select_query = unified.expr
    assert isinstance(select_query, ast.SelectQuery)
    assert len(select_query.select) == 12

    goal_columns = aggregator.get_conversion_goal_columns()
    assert len(goal_columns) == 20

    coalesce_columns = aggregator.get_coalesce_fallback_columns()
    assert len(coalesce_columns) == 2
def test_attribution_compatibility(self):
goal = self._create_test_conversion_goal("attribution_test", "Attribution Test", "purchase")
processor = self._create_test_processor(goal, 0)
aggregator = ConversionGoalsAggregator(processors=[processor], config=self.config)
additional_conditions_getter = self._create_mock_additional_conditions_getter()
cte = aggregator.generate_unified_cte(self.date_range, additional_conditions_getter)
assert isinstance(cte, ast.CTE)
assert cte.name == "unified_conversion_goals"