diff --git a/posthog/settings/batch_exports.py b/posthog/settings/batch_exports.py index 30d9b80aa0..064878547e 100644 --- a/posthog/settings/batch_exports.py +++ b/posthog/settings/batch_exports.py @@ -15,6 +15,9 @@ BATCH_EXPORT_SNOWFLAKE_RECORD_BATCH_QUEUE_MAX_SIZE_BYTES: int = get_from_env( BATCH_EXPORT_SNOWFLAKE_USE_STAGE_TEAM_IDS: list[str] = get_list( os.getenv("BATCH_EXPORT_SNOWFLAKE_USE_STAGE_TEAM_IDS", "") ) +BATCH_EXPORT_SNOWFLAKE_USE_INTERNAL_STAGE_ROLLOUT_PERCENTAGE: int = get_from_env( + "BATCH_EXPORT_SNOWFLAKE_USE_INTERNAL_STAGE_ROLLOUT_PERCENTAGE", 0, type_cast=int +) BATCH_EXPORT_POSTGRES_UPLOAD_CHUNK_SIZE_BYTES: int = 1024 * 1024 * 50 # 50MB BATCH_EXPORT_POSTGRES_RECORD_BATCH_QUEUE_MAX_SIZE_BYTES: int = get_from_env( diff --git a/products/batch_exports/backend/temporal/destinations/snowflake_batch_export.py b/products/batch_exports/backend/temporal/destinations/snowflake_batch_export.py index 49b66a59a1..225a74f4b1 100644 --- a/products/batch_exports/backend/temporal/destinations/snowflake_batch_export.py +++ b/products/batch_exports/backend/temporal/destinations/snowflake_batch_export.py @@ -1375,8 +1375,10 @@ class SnowflakeBatchExportWorkflow(PostHogWorkflow): destination_default_fields=snowflake_default_fields(), ) - # Use stage consumer for specific team IDs, otherwise use the original activity - if str(inputs.team_id) in settings.BATCH_EXPORT_SNOWFLAKE_USE_STAGE_TEAM_IDS: + if ( + str(inputs.team_id) in settings.BATCH_EXPORT_SNOWFLAKE_USE_STAGE_TEAM_IDS + or inputs.team_id % 100 < settings.BATCH_EXPORT_SNOWFLAKE_USE_INTERNAL_STAGE_ROLLOUT_PERCENTAGE + ): await execute_batch_export_using_internal_stage( insert_into_snowflake_activity_from_stage, insert_inputs,