feat(batch-exports): Added usage report for rows exported (#37241)

Co-authored-by: pawel-cebula <pawel.c@posthog.com>
This commit is contained in:
Ben White
2025-09-01 20:54:08 +02:00
committed by GitHub
parent f8f99147e7
commit 66a6aadd54
7 changed files with 56 additions and 1 deletions

View File

@@ -330,6 +330,7 @@ class BillingManager:
surveys=usage_summary.get("surveys", {}),
rows_synced=usage_summary.get("rows_synced", {}),
cdp_invocations=usage_summary.get("cdp_invocations", {}),
rows_exported=usage_summary.get("rows_exported", {}),
feature_flag_requests=usage_summary.get("feature_flag_requests", {}),
api_queries_read_bytes=usage_summary.get("api_queries_read_bytes", {}),
llm_events=usage_summary.get("llm_events", {}),

View File

@@ -26,6 +26,7 @@ from posthog.tasks.usage_report import (
get_teams_with_exceptions_captured_in_period,
get_teams_with_feature_flag_requests_count_in_period,
get_teams_with_recording_count_in_period,
get_teams_with_rows_exported_in_period,
get_teams_with_rows_synced_in_period,
get_teams_with_survey_responses_count_in_period,
)
@@ -64,6 +65,7 @@ class QuotaResource(Enum):
SURVEYS = "surveys"
LLM_EVENTS = "llm_events"
CDP_INVOCATIONS = "cdp_invocations"
ROWS_EXPORTED = "rows_exported"
class QuotaLimitingCaches(Enum):
@@ -81,6 +83,7 @@ OVERAGE_BUFFER = {
QuotaResource.SURVEYS: 0,
QuotaResource.LLM_EVENTS: 0,
QuotaResource.CDP_INVOCATIONS: 0,
QuotaResource.ROWS_EXPORTED: 0,
}
TRUST_SCORE_KEYS = {
@@ -93,6 +96,7 @@ TRUST_SCORE_KEYS = {
QuotaResource.SURVEYS: "surveys",
QuotaResource.LLM_EVENTS: "llm_events",
QuotaResource.CDP_INVOCATIONS: "cdp_invocations",
QuotaResource.ROWS_EXPORTED: "rows_exported",
}
@@ -106,6 +110,7 @@ class UsageCounters(TypedDict):
surveys: int
llm_events: int
cdp_invocations: int
rows_exported: int
# -------------------------------------------------------------------------------------------------
@@ -608,6 +613,9 @@ def update_all_orgs_billing_quotas(
"teams_with_cdp_invocations_metrics": convert_team_usage_rows_to_dict(
get_teams_with_cdp_billable_invocations_in_period(period_start, period_end)
),
"teams_with_rows_exported_in_period": convert_team_usage_rows_to_dict(
get_teams_with_rows_exported_in_period(period_start, period_end)
),
"teams_with_survey_responses_count_in_period": convert_team_usage_rows_to_dict(
get_teams_with_survey_responses_count_in_period(period_start, period_end)
),
@@ -647,6 +655,7 @@ def update_all_orgs_billing_quotas(
surveys=all_data["teams_with_survey_responses_count_in_period"].get(team.id, 0),
llm_events=all_data["teams_with_ai_event_count_in_period"].get(team.id, 0),
cdp_invocations=all_data["teams_with_cdp_invocations_metrics"].get(team.id, 0),
rows_exported=all_data["teams_with_rows_exported_in_period"].get(team.id, 0),
)
org_id = str(team.organization.id)
@@ -742,6 +751,7 @@ def update_all_orgs_billing_quotas(
"quota_limited_surveys": quota_limited_orgs["surveys"].get(org_id, None),
"quota_limited_llm_events": quota_limited_orgs["llm_events"].get(org_id, None),
"quota_limited_cdp_invocations": quota_limited_orgs["cdp_invocations"].get(org_id, None),
"quota_limited_rows_exported": quota_limited_orgs["rows_exported"].get(org_id, None),
}
report_organization_action(

View File

@@ -151,6 +151,7 @@ class TestBillingManager(BaseTest):
"quota_limiting_suspended_until": 1611705600,
},
"rows_synced": {"usage": 45, "limit": 500, "todays_usage": 5},
"rows_exported": {"usage": 10, "limit": 1000, "todays_usage": 5},
"feature_flag_requests": {"usage": 25, "limit": 300, "todays_usage": 5},
"api_queries_read_bytes": {"usage": 1000, "limit": 1000000, "todays_usage": 500},
"llm_events": {"usage": 50, "limit": 1000, "todays_usage": 2},
@@ -178,6 +179,7 @@ class TestBillingManager(BaseTest):
"exceptions": {"usage": 10, "limit": 100},
"recordings": {"usage": 15, "limit": 100},
"rows_synced": {"usage": 45, "limit": 500},
"rows_exported": {"usage": 10, "limit": 1000},
"feature_flag_requests": {"usage": 25, "limit": 300},
"api_queries_read_bytes": {"usage": 1000, "limit": 1000000},
"llm_events": {"usage": 50, "limit": 1000},
@@ -214,6 +216,7 @@ class TestBillingManager(BaseTest):
"quota_limiting_suspended_until": 1611705600,
},
"rows_synced": {"usage": 45, "limit": 500, "todays_usage": 5},
"rows_exported": {"usage": 10, "limit": 1000, "todays_usage": 5},
"feature_flag_requests": {"usage": 25, "limit": 300, "todays_usage": 5},
"llm_events": {"usage": 50, "limit": 1000, "todays_usage": 2},
"period": ["2024-01-01T00:00:00Z", "2024-01-31T23:59:59Z"],

View File

@@ -47,6 +47,7 @@ class TestQuotaLimiting(BaseTest):
self.redis_client.delete(f"@posthog/quota-limits/rows_synced")
self.redis_client.delete(f"@posthog/quota-limits/api_queries_read_bytes")
self.redis_client.delete(f"@posthog/quota-limits/surveys")
self.redis_client.delete(f"@posthog/quota-limits/rows_exported")
self.redis_client.delete(f"@posthog/quota-limits/llm_events")
self.redis_client.delete(f"@posthog/quota-limits/cdp_invocations")
self.redis_client.delete(f"@posthog/quota-limiting-suspended/events")
@@ -55,6 +56,7 @@ class TestQuotaLimiting(BaseTest):
self.redis_client.delete(f"@posthog/quota-limiting-suspended/rows_synced")
self.redis_client.delete(f"@posthog/quota-limiting-suspended/api_queries_read_bytes")
self.redis_client.delete(f"@posthog/quota-limiting-suspended/surveys")
self.redis_client.delete(f"@posthog/quota-limiting-suspended/rows_exported")
self.redis_client.delete(f"@posthog/quota-limiting-suspended/llm_events")
self.redis_client.delete(f"@posthog/quota-limiting-suspended/cdp_invocations")
@@ -121,6 +123,7 @@ class TestQuotaLimiting(BaseTest):
assert quota_limited_orgs["feature_flag_requests"] == {}
assert quota_limited_orgs["api_queries_read_bytes"] == {}
assert quota_limited_orgs["surveys"] == {}
assert quota_limited_orgs["rows_exported"] == {}
assert quota_limiting_suspended_orgs["events"] == {}
assert quota_limiting_suspended_orgs["exceptions"] == {}
assert quota_limiting_suspended_orgs["recordings"] == {}
@@ -128,6 +131,7 @@ class TestQuotaLimiting(BaseTest):
assert quota_limiting_suspended_orgs["feature_flag_requests"] == {}
assert quota_limiting_suspended_orgs["api_queries_read_bytes"] == {}
assert quota_limiting_suspended_orgs["surveys"] == {}
assert quota_limiting_suspended_orgs["rows_exported"] == {}
assert self.redis_client.zrange(f"@posthog/quota-limits/events", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limits/exceptions", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limits/recordings", 0, -1) == []
@@ -135,6 +139,7 @@ class TestQuotaLimiting(BaseTest):
assert self.redis_client.zrange(f"@posthog/quota-limits/feature_flag_requests", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limits/api_queries_read_bytes", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limits/surveys", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limits/rows_exported", 0, -1) == []
patch_capture.reset_mock()
# Add this org to the redis cache.
@@ -179,6 +184,7 @@ class TestQuotaLimiting(BaseTest):
assert quota_limited_orgs["feature_flag_requests"] == {}
assert quota_limited_orgs["api_queries_read_bytes"] == {}
assert quota_limited_orgs["surveys"] == {}
assert quota_limited_orgs["rows_exported"] == {}
assert quota_limiting_suspended_orgs["events"] == {}
assert quota_limiting_suspended_orgs["exceptions"] == {}
assert quota_limiting_suspended_orgs["recordings"] == {}
@@ -186,6 +192,7 @@ class TestQuotaLimiting(BaseTest):
assert quota_limiting_suspended_orgs["feature_flag_requests"] == {}
assert quota_limiting_suspended_orgs["api_queries_read_bytes"] == {}
assert quota_limiting_suspended_orgs["surveys"] == {}
assert quota_limiting_suspended_orgs["rows_exported"] == {}
assert self.redis_client.zrange(f"@posthog/quota-limits/events", 0, -1) == [self.team.api_token.encode("UTF-8")]
assert self.redis_client.zrange(f"@posthog/quota-limits/exceptions", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limits/recordings", 0, -1) == []
@@ -193,6 +200,7 @@ class TestQuotaLimiting(BaseTest):
assert self.redis_client.zrange(f"@posthog/quota-limits/feature_flag_requests", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limits/api_queries_read_bytes", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limits/surveys", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limits/rows_exported", 0, -1) == []
@patch("posthoganalytics.capture")
@patch("posthoganalytics.feature_enabled", return_value=True)
@@ -212,7 +220,7 @@ class TestQuotaLimiting(BaseTest):
self.organization.save()
time.sleep(1)
with self.assertNumQueries(3):
with self.assertNumQueries(4):
quota_limited_orgs, quota_limiting_suspended_orgs = update_all_orgs_billing_quotas()
# Shouldn't be called due to lazy evaluation of the conditional
patch_feature_enabled.assert_not_called()
@@ -224,6 +232,7 @@ class TestQuotaLimiting(BaseTest):
assert quota_limited_orgs["feature_flag_requests"] == {}
assert quota_limited_orgs["api_queries_read_bytes"] == {}
assert quota_limited_orgs["surveys"] == {}
assert quota_limited_orgs["rows_exported"] == {}
assert quota_limiting_suspended_orgs["events"] == {}
assert quota_limiting_suspended_orgs["exceptions"] == {}
assert quota_limiting_suspended_orgs["recordings"] == {}
@@ -231,6 +240,7 @@ class TestQuotaLimiting(BaseTest):
assert quota_limiting_suspended_orgs["feature_flag_requests"] == {}
assert quota_limiting_suspended_orgs["api_queries_read_bytes"] == {}
assert quota_limiting_suspended_orgs["surveys"] == {}
assert quota_limiting_suspended_orgs["rows_exported"] == {}
assert self.redis_client.zrange(f"@posthog/quota-limits/events", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limits/exceptions", 0, -1) == []
@@ -239,6 +249,7 @@ class TestQuotaLimiting(BaseTest):
assert self.redis_client.zrange(f"@posthog/quota-limits/feature_flag_requests", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limits/api_queries_read_bytes", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limits/surveys", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limits/rows_exported", 0, -1) == []
def test_billing_rate_limit_not_set_if_missing_org_usage(self) -> None:
with self.settings(USE_TZ=False):
@@ -279,6 +290,7 @@ class TestQuotaLimiting(BaseTest):
assert self.redis_client.zrange(f"@posthog/quota-limits/feature_flag_requests", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limits/api_queries_read_bytes", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limits/surveys", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limits/rows_exported", 0, -1) == []
@patch("posthoganalytics.capture")
def test_billing_rate_limit(self, patch_capture) -> None:
@@ -345,6 +357,7 @@ class TestQuotaLimiting(BaseTest):
"quota_limited_surveys": None,
"quota_limited_llm_events": None,
"quota_limited_cdp_invocations": None,
"quota_limited_rows_exported": None,
}
assert org_action_call.kwargs.get("groups") == {
"instance": "http://localhost:8010",
@@ -373,6 +386,7 @@ class TestQuotaLimiting(BaseTest):
"feature_flags": 0,
TRUST_SCORE_KEYS[QuotaResource.API_QUERIES]: 0,
"surveys": 0,
"rows_exported": 0,
}
self.organization.save()
quota_limited_orgs, quota_limiting_suspended_orgs = update_all_orgs_billing_quotas()

View File

@@ -43,6 +43,7 @@ class OrganizationUsageInfo(TypedDict):
surveys: Optional[OrganizationUsageResource]
rows_synced: Optional[OrganizationUsageResource]
cdp_invocations: Optional[OrganizationUsageResource]
rows_exported: Optional[OrganizationUsageResource]
feature_flag_requests: Optional[OrganizationUsageResource]
api_queries_read_bytes: Optional[OrganizationUsageResource]
llm_events: Optional[OrganizationUsageResource]

View File

@@ -613,6 +613,7 @@ class TestUsageReport(APIBaseTest, ClickhouseTestMixin, ClickhouseDestroyTablesM
"hog_function_calls_in_period": 0,
"hog_function_fetch_calls_in_period": 0,
"cdp_billable_invocations_in_period": 0,
"rows_exported_in_period": 0,
"date": "2022-01-09",
"organization_id": str(self.organization.id),
"organization_name": "Test",
@@ -678,6 +679,7 @@ class TestUsageReport(APIBaseTest, ClickhouseTestMixin, ClickhouseDestroyTablesM
"hog_function_calls_in_period": 0,
"hog_function_fetch_calls_in_period": 0,
"cdp_billable_invocations_in_period": 0,
"rows_exported_in_period": 0,
"ai_event_count_in_period": 1,
},
str(self.org_1_team_2.id): {
@@ -738,6 +740,7 @@ class TestUsageReport(APIBaseTest, ClickhouseTestMixin, ClickhouseDestroyTablesM
"hog_function_calls_in_period": 0,
"hog_function_fetch_calls_in_period": 0,
"cdp_billable_invocations_in_period": 0,
"rows_exported_in_period": 0,
"ai_event_count_in_period": 0,
},
},
@@ -821,6 +824,7 @@ class TestUsageReport(APIBaseTest, ClickhouseTestMixin, ClickhouseDestroyTablesM
"hog_function_calls_in_period": 0,
"hog_function_fetch_calls_in_period": 0,
"cdp_billable_invocations_in_period": 0,
"rows_exported_in_period": 0,
"ai_event_count_in_period": 0,
"date": "2022-01-09",
"organization_id": str(self.org_2.id),
@@ -889,6 +893,7 @@ class TestUsageReport(APIBaseTest, ClickhouseTestMixin, ClickhouseDestroyTablesM
"hog_function_calls_in_period": 0,
"hog_function_fetch_calls_in_period": 0,
"cdp_billable_invocations_in_period": 0,
"rows_exported_in_period": 0,
"ai_event_count_in_period": 0,
}
},

View File

@@ -25,6 +25,7 @@ from retry import retry
from posthog.schema import AIEventType
from posthog import version_requirement
from posthog.batch_exports.models import BatchExportRun
from posthog.clickhouse.client import sync_execute
from posthog.clickhouse.client.connection import Workload
from posthog.clickhouse.query_tagging import tags_context
@@ -141,6 +142,7 @@ class UsageReportCounters:
active_external_data_schemas_in_period: int
# Batch Exports metadata
rows_exported_in_period: int
active_batch_exports_in_period: int
dwh_total_storage_in_s3_in_mib: float
@@ -930,6 +932,22 @@ def get_teams_with_rows_synced_in_period(begin: datetime, end: datetime) -> list
)
@timed_log()
@retry(tries=QUERY_RETRIES, delay=QUERY_RETRY_DELAY, backoff=QUERY_RETRY_BACKOFF)
def get_teams_with_rows_exported_in_period(begin: datetime, end: datetime) -> list:
return list(
BatchExportRun.objects.filter(
finished_at__gte=begin,
finished_at__lte=end,
status=BatchExportRun.Status.COMPLETED,
batch_export__model=BatchExport.Model.EVENTS,
batch_export__deleted=False,
)
.values("batch_export__team_id")
.annotate(total=Sum("records_completed"))
)
@timed_log()
@retry(tries=QUERY_RETRIES, delay=QUERY_RETRY_DELAY, backoff=QUERY_RETRY_BACKOFF)
def get_teams_with_active_external_data_schemas_in_period() -> list:
@@ -1178,6 +1196,7 @@ def has_non_zero_usage(report: FullUsageReport) -> bool:
or report.local_evaluation_requests_count_in_period > 0
or report.survey_responses_count_in_period > 0
or report.rows_synced_in_period > 0
or report.rows_exported_in_period > 0
or report.exceptions_captured_in_period > 0
)
@@ -1381,6 +1400,7 @@ def _get_all_usage_data(period_start: datetime, period_end: datetime) -> dict[st
period_start, period_end
),
"teams_with_rows_synced_in_period": get_teams_with_rows_synced_in_period(period_start, period_end),
"teams_with_rows_exported_in_period": get_teams_with_rows_exported_in_period(period_start, period_end),
"teams_with_active_external_data_schemas_in_period": get_teams_with_active_external_data_schemas_in_period(),
"teams_with_active_batch_exports_in_period": get_teams_with_active_batch_exports_in_period(),
"teams_with_dwh_tables_storage_in_s3_in_mib": get_teams_with_dwh_tables_storage_in_s3(),
@@ -1480,6 +1500,7 @@ def _get_team_report(all_data: dict[str, Any], team: Team) -> UsageReportCounter
event_explorer_api_duration_ms=all_data["teams_with_event_explorer_api_duration_ms"].get(team.id, 0),
survey_responses_count_in_period=all_data["teams_with_survey_responses_count_in_period"].get(team.id, 0),
rows_synced_in_period=all_data["teams_with_rows_synced_in_period"].get(team.id, 0),
rows_exported_in_period=all_data["teams_with_rows_exported_in_period"].get(team.id, 0),
active_external_data_schemas_in_period=all_data["teams_with_active_external_data_schemas_in_period"].get(
team.id, 0
),