diff --git a/ee/billing/billing_manager.py b/ee/billing/billing_manager.py index c4bd824a0e..f38b01acd6 100644 --- a/ee/billing/billing_manager.py +++ b/ee/billing/billing_manager.py @@ -330,6 +330,7 @@ class BillingManager: surveys=usage_summary.get("surveys", {}), rows_synced=usage_summary.get("rows_synced", {}), cdp_invocations=usage_summary.get("cdp_invocations", {}), + rows_exported=usage_summary.get("rows_exported", {}), feature_flag_requests=usage_summary.get("feature_flag_requests", {}), api_queries_read_bytes=usage_summary.get("api_queries_read_bytes", {}), llm_events=usage_summary.get("llm_events", {}), diff --git a/ee/billing/quota_limiting.py b/ee/billing/quota_limiting.py index e7459e3398..d2a916776a 100644 --- a/ee/billing/quota_limiting.py +++ b/ee/billing/quota_limiting.py @@ -26,6 +26,7 @@ from posthog.tasks.usage_report import ( get_teams_with_exceptions_captured_in_period, get_teams_with_feature_flag_requests_count_in_period, get_teams_with_recording_count_in_period, + get_teams_with_rows_exported_in_period, get_teams_with_rows_synced_in_period, get_teams_with_survey_responses_count_in_period, ) @@ -64,6 +65,7 @@ class QuotaResource(Enum): SURVEYS = "surveys" LLM_EVENTS = "llm_events" CDP_INVOCATIONS = "cdp_invocations" + ROWS_EXPORTED = "rows_exported" class QuotaLimitingCaches(Enum): @@ -81,6 +83,7 @@ OVERAGE_BUFFER = { QuotaResource.SURVEYS: 0, QuotaResource.LLM_EVENTS: 0, QuotaResource.CDP_INVOCATIONS: 0, + QuotaResource.ROWS_EXPORTED: 0, } TRUST_SCORE_KEYS = { @@ -93,6 +96,7 @@ TRUST_SCORE_KEYS = { QuotaResource.SURVEYS: "surveys", QuotaResource.LLM_EVENTS: "llm_events", QuotaResource.CDP_INVOCATIONS: "cdp_invocations", + QuotaResource.ROWS_EXPORTED: "rows_exported", } @@ -106,6 +110,7 @@ class UsageCounters(TypedDict): surveys: int llm_events: int cdp_invocations: int + rows_exported: int # ------------------------------------------------------------------------------------------------- @@ -608,6 +613,9 @@ def update_all_orgs_billing_quotas( "teams_with_cdp_invocations_metrics": convert_team_usage_rows_to_dict( get_teams_with_cdp_billable_invocations_in_period(period_start, period_end) ), + "teams_with_rows_exported_in_period": convert_team_usage_rows_to_dict( + get_teams_with_rows_exported_in_period(period_start, period_end) + ), "teams_with_survey_responses_count_in_period": convert_team_usage_rows_to_dict( get_teams_with_survey_responses_count_in_period(period_start, period_end) ), @@ -647,6 +655,7 @@ def update_all_orgs_billing_quotas( surveys=all_data["teams_with_survey_responses_count_in_period"].get(team.id, 0), llm_events=all_data["teams_with_ai_event_count_in_period"].get(team.id, 0), cdp_invocations=all_data["teams_with_cdp_invocations_metrics"].get(team.id, 0), + rows_exported=all_data["teams_with_rows_exported_in_period"].get(team.id, 0), ) org_id = str(team.organization.id) @@ -742,6 +751,7 @@ def update_all_orgs_billing_quotas( "quota_limited_surveys": quota_limited_orgs["surveys"].get(org_id, None), "quota_limited_llm_events": quota_limited_orgs["llm_events"].get(org_id, None), "quota_limited_cdp_invocations": quota_limited_orgs["cdp_invocations"].get(org_id, None), + "quota_limited_rows_exported": quota_limited_orgs["rows_exported"].get(org_id, None), } report_organization_action( diff --git a/ee/billing/test/test_billing_manager.py b/ee/billing/test/test_billing_manager.py index c1d7b30cc5..0f9f83e1cd 100644 --- a/ee/billing/test/test_billing_manager.py +++ b/ee/billing/test/test_billing_manager.py @@ -151,6 +151,7 @@ class TestBillingManager(BaseTest): "quota_limiting_suspended_until": 1611705600, }, "rows_synced": {"usage": 45, "limit": 500, "todays_usage": 5}, + "rows_exported": {"usage": 10, "limit": 1000, "todays_usage": 5}, "feature_flag_requests": {"usage": 25, "limit": 300, "todays_usage": 5}, "api_queries_read_bytes": {"usage": 1000, "limit": 1000000, "todays_usage": 500}, "llm_events": {"usage": 50, "limit": 1000, "todays_usage": 2}, @@ -178,6 +179,7 @@ class TestBillingManager(BaseTest): "exceptions": {"usage": 10, "limit": 100}, "recordings": {"usage": 15, "limit": 100}, "rows_synced": {"usage": 45, "limit": 500}, + "rows_exported": {"usage": 10, "limit": 1000}, "feature_flag_requests": {"usage": 25, "limit": 300}, "api_queries_read_bytes": {"usage": 1000, "limit": 1000000}, "llm_events": {"usage": 50, "limit": 1000}, @@ -214,6 +216,7 @@ class TestBillingManager(BaseTest): "quota_limiting_suspended_until": 1611705600, }, "rows_synced": {"usage": 45, "limit": 500, "todays_usage": 5}, + "rows_exported": {"usage": 10, "limit": 1000, "todays_usage": 5}, "feature_flag_requests": {"usage": 25, "limit": 300, "todays_usage": 5}, "llm_events": {"usage": 50, "limit": 1000, "todays_usage": 2}, "period": ["2024-01-01T00:00:00Z", "2024-01-31T23:59:59Z"], diff --git a/ee/billing/test/test_quota_limiting.py b/ee/billing/test/test_quota_limiting.py index 8c2ed989e5..dfbc39f939 100644 --- a/ee/billing/test/test_quota_limiting.py +++ b/ee/billing/test/test_quota_limiting.py @@ -47,6 +47,7 @@ class TestQuotaLimiting(BaseTest): self.redis_client.delete(f"@posthog/quota-limits/rows_synced") self.redis_client.delete(f"@posthog/quota-limits/api_queries_read_bytes") self.redis_client.delete(f"@posthog/quota-limits/surveys") + self.redis_client.delete(f"@posthog/quota-limits/rows_exported") self.redis_client.delete(f"@posthog/quota-limits/llm_events") self.redis_client.delete(f"@posthog/quota-limits/cdp_invocations") self.redis_client.delete(f"@posthog/quota-limiting-suspended/events") @@ -55,6 +56,7 @@ class TestQuotaLimiting(BaseTest): self.redis_client.delete(f"@posthog/quota-limiting-suspended/rows_synced") self.redis_client.delete(f"@posthog/quota-limiting-suspended/api_queries_read_bytes") self.redis_client.delete(f"@posthog/quota-limiting-suspended/surveys") + self.redis_client.delete(f"@posthog/quota-limiting-suspended/rows_exported") self.redis_client.delete(f"@posthog/quota-limiting-suspended/llm_events") self.redis_client.delete(f"@posthog/quota-limiting-suspended/cdp_invocations") @@ -121,6 +123,7 @@ class TestQuotaLimiting(BaseTest): assert quota_limited_orgs["feature_flag_requests"] == {} assert quota_limited_orgs["api_queries_read_bytes"] == {} assert quota_limited_orgs["surveys"] == {} + assert quota_limited_orgs["rows_exported"] == {} assert quota_limiting_suspended_orgs["events"] == {} assert quota_limiting_suspended_orgs["exceptions"] == {} assert quota_limiting_suspended_orgs["recordings"] == {} @@ -128,6 +131,7 @@ class TestQuotaLimiting(BaseTest): assert quota_limiting_suspended_orgs["feature_flag_requests"] == {} assert quota_limiting_suspended_orgs["api_queries_read_bytes"] == {} assert quota_limiting_suspended_orgs["surveys"] == {} + assert quota_limiting_suspended_orgs["rows_exported"] == {} assert self.redis_client.zrange(f"@posthog/quota-limits/events", 0, -1) == [] assert self.redis_client.zrange(f"@posthog/quota-limits/exceptions", 0, -1) == [] assert self.redis_client.zrange(f"@posthog/quota-limits/recordings", 0, -1) == [] @@ -135,6 +139,7 @@ class TestQuotaLimiting(BaseTest): assert self.redis_client.zrange(f"@posthog/quota-limits/feature_flag_requests", 0, -1) == [] assert self.redis_client.zrange(f"@posthog/quota-limits/api_queries_read_bytes", 0, -1) == [] assert self.redis_client.zrange(f"@posthog/quota-limits/surveys", 0, -1) == [] + assert self.redis_client.zrange(f"@posthog/quota-limits/rows_exported", 0, -1) == [] patch_capture.reset_mock() # Add this org to the redis cache. @@ -179,6 +184,7 @@ class TestQuotaLimiting(BaseTest): assert quota_limited_orgs["feature_flag_requests"] == {} assert quota_limited_orgs["api_queries_read_bytes"] == {} assert quota_limited_orgs["surveys"] == {} + assert quota_limited_orgs["rows_exported"] == {} assert quota_limiting_suspended_orgs["events"] == {} assert quota_limiting_suspended_orgs["exceptions"] == {} assert quota_limiting_suspended_orgs["recordings"] == {} @@ -186,6 +192,7 @@ class TestQuotaLimiting(BaseTest): assert quota_limiting_suspended_orgs["feature_flag_requests"] == {} assert quota_limiting_suspended_orgs["api_queries_read_bytes"] == {} assert quota_limiting_suspended_orgs["surveys"] == {} + assert quota_limiting_suspended_orgs["rows_exported"] == {} assert self.redis_client.zrange(f"@posthog/quota-limits/events", 0, -1) == [self.team.api_token.encode("UTF-8")] assert self.redis_client.zrange(f"@posthog/quota-limits/exceptions", 0, -1) == [] assert self.redis_client.zrange(f"@posthog/quota-limits/recordings", 0, -1) == [] @@ -193,6 +200,7 @@ class TestQuotaLimiting(BaseTest): assert self.redis_client.zrange(f"@posthog/quota-limits/feature_flag_requests", 0, -1) == [] assert self.redis_client.zrange(f"@posthog/quota-limits/api_queries_read_bytes", 0, -1) == [] assert self.redis_client.zrange(f"@posthog/quota-limits/surveys", 0, -1) == [] + assert self.redis_client.zrange(f"@posthog/quota-limits/rows_exported", 0, -1) == [] @patch("posthoganalytics.capture") @patch("posthoganalytics.feature_enabled", return_value=True) @@ -212,7 +220,7 @@ class TestQuotaLimiting(BaseTest): self.organization.save() time.sleep(1) - with self.assertNumQueries(3): + with self.assertNumQueries(4): quota_limited_orgs, quota_limiting_suspended_orgs = update_all_orgs_billing_quotas() # Shouldn't be called due to lazy evaluation of the conditional patch_feature_enabled.assert_not_called() @@ -224,6 +232,7 @@ class TestQuotaLimiting(BaseTest): assert quota_limited_orgs["feature_flag_requests"] == {} assert quota_limited_orgs["api_queries_read_bytes"] == {} assert quota_limited_orgs["surveys"] == {} + assert quota_limited_orgs["rows_exported"] == {} assert quota_limiting_suspended_orgs["events"] == {} assert quota_limiting_suspended_orgs["exceptions"] == {} assert quota_limiting_suspended_orgs["recordings"] == {} @@ -231,6 +240,7 @@ class TestQuotaLimiting(BaseTest): assert quota_limiting_suspended_orgs["feature_flag_requests"] == {} assert quota_limiting_suspended_orgs["api_queries_read_bytes"] == {} assert quota_limiting_suspended_orgs["surveys"] == {} + assert quota_limiting_suspended_orgs["rows_exported"] == {} assert self.redis_client.zrange(f"@posthog/quota-limits/events", 0, -1) == [] assert self.redis_client.zrange(f"@posthog/quota-limits/exceptions", 0, -1) == [] @@ -239,6 +249,7 @@ class TestQuotaLimiting(BaseTest): assert self.redis_client.zrange(f"@posthog/quota-limits/feature_flag_requests", 0, -1) == [] assert self.redis_client.zrange(f"@posthog/quota-limits/api_queries_read_bytes", 0, -1) == [] assert self.redis_client.zrange(f"@posthog/quota-limits/surveys", 0, -1) == [] + assert self.redis_client.zrange(f"@posthog/quota-limits/rows_exported", 0, -1) == [] def test_billing_rate_limit_not_set_if_missing_org_usage(self) -> None: with self.settings(USE_TZ=False): @@ -279,6 +290,7 @@ class TestQuotaLimiting(BaseTest): assert self.redis_client.zrange(f"@posthog/quota-limits/feature_flag_requests", 0, -1) == [] assert self.redis_client.zrange(f"@posthog/quota-limits/api_queries_read_bytes", 0, -1) == [] assert self.redis_client.zrange(f"@posthog/quota-limits/surveys", 0, -1) == [] + assert self.redis_client.zrange(f"@posthog/quota-limits/rows_exported", 0, -1) == [] @patch("posthoganalytics.capture") def test_billing_rate_limit(self, patch_capture) -> None: @@ -345,6 +357,7 @@ class TestQuotaLimiting(BaseTest): "quota_limited_surveys": None, "quota_limited_llm_events": None, "quota_limited_cdp_invocations": None, + "quota_limited_rows_exported": None, } assert org_action_call.kwargs.get("groups") == { "instance": "http://localhost:8010", @@ -373,6 +386,7 @@ class TestQuotaLimiting(BaseTest): "feature_flags": 0, TRUST_SCORE_KEYS[QuotaResource.API_QUERIES]: 0, "surveys": 0, + "rows_exported": 0, } self.organization.save() quota_limited_orgs, quota_limiting_suspended_orgs = update_all_orgs_billing_quotas() diff --git a/posthog/models/organization.py b/posthog/models/organization.py index 4e54bd903a..6dedf7aef9 100644 --- a/posthog/models/organization.py +++ b/posthog/models/organization.py @@ -43,6 +43,7 @@ class OrganizationUsageInfo(TypedDict): surveys: Optional[OrganizationUsageResource] rows_synced: Optional[OrganizationUsageResource] cdp_invocations: Optional[OrganizationUsageResource] + rows_exported: Optional[OrganizationUsageResource] feature_flag_requests: Optional[OrganizationUsageResource] api_queries_read_bytes: Optional[OrganizationUsageResource] llm_events: Optional[OrganizationUsageResource] diff --git a/posthog/tasks/test/test_usage_report.py b/posthog/tasks/test/test_usage_report.py index 5d55c651d6..c2de652e49 100644 --- a/posthog/tasks/test/test_usage_report.py +++ b/posthog/tasks/test/test_usage_report.py @@ -613,6 +613,7 @@ class TestUsageReport(APIBaseTest, ClickhouseTestMixin, ClickhouseDestroyTablesM "hog_function_calls_in_period": 0, "hog_function_fetch_calls_in_period": 0, "cdp_billable_invocations_in_period": 0, + "rows_exported_in_period": 0, "date": "2022-01-09", "organization_id": str(self.organization.id), "organization_name": "Test", @@ -678,6 +679,7 @@ class TestUsageReport(APIBaseTest, ClickhouseTestMixin, ClickhouseDestroyTablesM "hog_function_calls_in_period": 0, "hog_function_fetch_calls_in_period": 0, "cdp_billable_invocations_in_period": 0, + "rows_exported_in_period": 0, "ai_event_count_in_period": 1, }, str(self.org_1_team_2.id): { @@ -738,6 +740,7 @@ class TestUsageReport(APIBaseTest, ClickhouseTestMixin, ClickhouseDestroyTablesM "hog_function_calls_in_period": 0, "hog_function_fetch_calls_in_period": 0, "cdp_billable_invocations_in_period": 0, + "rows_exported_in_period": 0, "ai_event_count_in_period": 0, }, }, @@ -821,6 +824,7 @@ class TestUsageReport(APIBaseTest, ClickhouseTestMixin, ClickhouseDestroyTablesM "hog_function_calls_in_period": 0, "hog_function_fetch_calls_in_period": 0, "cdp_billable_invocations_in_period": 0, + "rows_exported_in_period": 0, "ai_event_count_in_period": 0, "date": "2022-01-09", "organization_id": str(self.org_2.id), @@ -889,6 +893,7 @@ class TestUsageReport(APIBaseTest, ClickhouseTestMixin, ClickhouseDestroyTablesM "hog_function_calls_in_period": 0, "hog_function_fetch_calls_in_period": 0, "cdp_billable_invocations_in_period": 0, + "rows_exported_in_period": 0, "ai_event_count_in_period": 0, } }, diff --git a/posthog/tasks/usage_report.py b/posthog/tasks/usage_report.py index 9ab0779302..263d2c14cf 100644 --- a/posthog/tasks/usage_report.py +++ b/posthog/tasks/usage_report.py @@ -25,6 +25,7 @@ from retry import retry from posthog.schema import AIEventType from posthog import version_requirement +from posthog.batch_exports.models import BatchExportRun from posthog.clickhouse.client import sync_execute from posthog.clickhouse.client.connection import Workload from posthog.clickhouse.query_tagging import tags_context @@ -141,6 +142,7 @@ class UsageReportCounters: active_external_data_schemas_in_period: int # Batch Exports metadata + rows_exported_in_period: int active_batch_exports_in_period: int dwh_total_storage_in_s3_in_mib: float @@ -930,6 +932,22 @@ def get_teams_with_rows_synced_in_period(begin: datetime, end: datetime) -> list ) +@timed_log() +@retry(tries=QUERY_RETRIES, delay=QUERY_RETRY_DELAY, backoff=QUERY_RETRY_BACKOFF) +def get_teams_with_rows_exported_in_period(begin: datetime, end: datetime) -> list: + return list( + BatchExportRun.objects.filter( + finished_at__gte=begin, + finished_at__lte=end, + status=BatchExportRun.Status.COMPLETED, + batch_export__model=BatchExport.Model.EVENTS, + batch_export__deleted=False, + ) + .values("batch_export__team_id") + .annotate(total=Sum("records_completed")) + ) + + @timed_log() @retry(tries=QUERY_RETRIES, delay=QUERY_RETRY_DELAY, backoff=QUERY_RETRY_BACKOFF) def get_teams_with_active_external_data_schemas_in_period() -> list: @@ -1178,6 +1196,7 @@ def has_non_zero_usage(report: FullUsageReport) -> bool: or report.local_evaluation_requests_count_in_period > 0 or report.survey_responses_count_in_period > 0 or report.rows_synced_in_period > 0 + or report.rows_exported_in_period > 0 or report.exceptions_captured_in_period > 0 ) @@ -1381,6 +1400,7 @@ def _get_all_usage_data(period_start: datetime, period_end: datetime) -> dict[st period_start, period_end ), "teams_with_rows_synced_in_period": get_teams_with_rows_synced_in_period(period_start, period_end), + "teams_with_rows_exported_in_period": get_teams_with_rows_exported_in_period(period_start, period_end), "teams_with_active_external_data_schemas_in_period": get_teams_with_active_external_data_schemas_in_period(), "teams_with_active_batch_exports_in_period": get_teams_with_active_batch_exports_in_period(), "teams_with_dwh_tables_storage_in_s3_in_mib": get_teams_with_dwh_tables_storage_in_s3(), @@ -1480,6 +1500,7 @@ def _get_team_report(all_data: dict[str, Any], team: Team) -> UsageReportCounter event_explorer_api_duration_ms=all_data["teams_with_event_explorer_api_duration_ms"].get(team.id, 0), survey_responses_count_in_period=all_data["teams_with_survey_responses_count_in_period"].get(team.id, 0), rows_synced_in_period=all_data["teams_with_rows_synced_in_period"].get(team.id, 0), + rows_exported_in_period=all_data["teams_with_rows_exported_in_period"].get(team.id, 0), active_external_data_schemas_in_period=all_data["teams_with_active_external_data_schemas_in_period"].get( team.id, 0 ),