From 3cad99de44e1f05d91fca7e95c0045b5546b71f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Wed, 2 Jul 2025 10:31:12 +0200 Subject: [PATCH] feat: Add execution time tracker for batch exports (#34084) --- posthog/temporal/batch_exports/metrics.py | 209 ++++++++++++++++++ .../temporal/batch_exports/s3_batch_export.py | 100 +++++---- posthog/temporal/batch_exports/transformer.py | 31 ++- .../test_s3_batch_export_workflow.py | 2 +- 4 files changed, 283 insertions(+), 59 deletions(-) diff --git a/posthog/temporal/batch_exports/metrics.py b/posthog/temporal/batch_exports/metrics.py index 8de87df38a..b80e267320 100644 --- a/posthog/temporal/batch_exports/metrics.py +++ b/posthog/temporal/batch_exports/metrics.py @@ -1,6 +1,12 @@ +import datetime as dt +import time +import typing + from temporalio import activity, workflow from temporalio.common import MetricCounter +from posthog.temporal.common.logger import get_internal_logger + def get_rows_exported_metric() -> MetricCounter: return activity.metric_meter().create_counter("batch_export_rows_exported", "Number of rows exported.") @@ -22,3 +28,206 @@ def get_export_finished_metric(status: str) -> MetricCounter: "batch_export_finished", "Number of batch exports finished, for any reason (including failure)." ) ) + + +Attributes = dict[str, str | int | float | bool] + + +class ExecutionTimeRecorder: + def __init__( + self, + histogram_name: str, + /, + description: str | None = None, + histogram_attributes: Attributes | None = None, + log: bool = True, + log_message: str = "Finished %(name)s with status '%(status)s' in %(duration_seconds)ds", + log_name: str | None = None, + log_attributes: Attributes | None = None, + ) -> None: + """Context manager to record execution time to a histogram metric. + + This can be used from within a workflow or an activity. + + Attributes: + histogram_name: A name for the histogram metric. + description: Description to use for the metric. + histogram_attributes: Mapping of any attributes to add to meter. + This tracker already adds common attributes like 'workflow_id' + or 'activity_type'. Moreover, a 'status' will be added to + indicate if an exception was raised within the block ('FAILED') + or not ('COMPLETED'). + log: Whether to additionally log the execution time. + log_message: Use a custom log message. + log_name: Provide an alternative name for the log line instead of + using the histogram name. + log_attributes: Mapping of additional attributes available to pass + to the logger. + """ + + self.histogram_name = histogram_name + self.description = description + self.histogram_attributes = histogram_attributes + self.log = log + self.log_message = log_message + self.log_name = log_name + self.log_attributes = log_attributes + self.bytes_processed: None | int = None + + self._start_counter: float | None = None + + def __enter__(self) -> typing.Self: + """Start the counter and return.""" + self._start_counter = time.perf_counter() + return self + + def __exit__(self, exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback) -> None: + """Record execution time on exiting. + + No exceptions from within the context are handled in this method. + Exception information is used to set status. 
+ """ + if not self._start_counter: + raise RuntimeError("Start counter not initialized, did you call `__enter__`?") + + start_counter = self._start_counter + end_counter = time.perf_counter() + delta_milli_seconds = int((end_counter - start_counter) * 1000) + delta = dt.timedelta(milliseconds=delta_milli_seconds) + + attributes = get_attributes(self.histogram_attributes) + if exc_value is not None: + attributes["status"] = "FAILED" + attributes["exception"] = str(exc_value) + else: + attributes["status"] = "COMPLETED" + attributes["exception"] = "" + + meter = get_metric_meter(attributes) + hist = meter.create_histogram_timedelta(name=self.histogram_name, description=self.description, unit="ms") + try: + hist.record(value=delta) + except Exception: + logger = get_internal_logger() + logger.exception("Failed to record execution time to histogram '%s'", self.histogram_name) + + if self.log: + log_execution_time( + self.log_message, + name=self.log_name or self.histogram_name, + delta=delta, + status="FAILED" if exc_value else "COMPLETED", + bytes_processed=self.bytes_processed, + extra_arguments=self.log_attributes, + ) + + self.reset() + + def add_bytes_processed(self, bytes_processed: int) -> int: + """Add to bytes processed, returning the total so far.""" + if self.bytes_processed is None: + self.bytes_processed = bytes_processed + else: + self.bytes_processed += bytes_processed + + return self.bytes_processed + + def reset(self): + """Reset counter and bytes processed.""" + self._start_counter = None + self.bytes_processed = None + + +def get_metric_meter(additional_attributes: Attributes | None = None): + """Return a meter depending on in which context we are.""" + if activity.in_activity(): + meter = activity.metric_meter() + else: + try: + meter = workflow.metric_meter() + except Exception: + raise RuntimeError("Not within workflow or activity context") + + if additional_attributes: + meter = meter.with_additional_attributes(additional_attributes) + + return meter + + +def get_attributes(additional_attributes: Attributes | None = None) -> Attributes: + """Return attributes depending on in which context we are.""" + if activity.in_activity(): + attributes = get_activity_attributes() + else: + try: + attributes = get_workflow_attributes() + except Exception: + attributes = {} + + if additional_attributes: + attributes = {**attributes, **additional_attributes} + + return attributes + + +def get_activity_attributes() -> Attributes: + """Return basic Temporal.io activity attributes.""" + info = activity.info() + + return { + "workflow_namespace": info.workflow_namespace, + "workflow_type": info.workflow_type, + "activity_type": info.activity_type, + } + + +def get_workflow_attributes() -> Attributes: + """Return basic Temporal.io workflow attributes.""" + info = workflow.info() + + return { + "workflow_namespace": info.namespace, + "workflow_type": info.workflow_type, + } + + +def log_execution_time( + log_message: str, + name: str, + delta: dt.timedelta, + status: str, + bytes_processed: None | int = None, + extra_arguments: Attributes | None = None, +): + """Log execution time.""" + logger = get_internal_logger() + + duration_seconds = delta.total_seconds() + + if bytes_processed is not None: + mb_processed = bytes_processed / 1024 / 1024 + + if duration_seconds > 0: + bytes_per_second = bytes_processed / duration_seconds + mb_per_second = mb_processed / duration_seconds + else: + bytes_per_second = float("inf") + mb_per_second = float("inf") + else: + mb_processed = None + 
bytes_per_second = None + mb_per_second = None + + arguments = { + "name": name, + "status": status, + "duration_seconds": duration_seconds, + "bytes_processed": bytes_processed, + "mb_processed": mb_processed, + "bytes_per_second": bytes_per_second, + "mb_per_second": mb_per_second, + } + if extra_arguments: + arguments = {**arguments, **extra_arguments} + + logger.info(log_message, arguments) diff --git a/posthog/temporal/batch_exports/s3_batch_export.py b/posthog/temporal/batch_exports/s3_batch_export.py index 33a96f7b98..d8258b9aea 100644 --- a/posthog/temporal/batch_exports/s3_batch_export.py +++ b/posthog/temporal/batch_exports/s3_batch_export.py @@ -7,7 +7,6 @@ import io import json import operator import posixpath -import time import typing import aioboto3 @@ -48,6 +47,7 @@ from posthog.temporal.batch_exports.heartbeat import ( HeartbeatParseError, should_resume_from_activity_heartbeat, ) +from posthog.temporal.batch_exports.metrics import ExecutionTimeRecorder from posthog.temporal.batch_exports.pre_export_stage import ( ProducerFromInternalS3Stage, execute_batch_export_insert_activity_using_s3_stage, @@ -1164,62 +1164,64 @@ class ConcurrentS3Consumer(ConsumerFromStage): response: UploadPartOutputTypeDef | None = None attempt = 0 - while response is None: - upload_start = time.time() - try: - response = await client.upload_part( - Bucket=self.s3_inputs.bucket_name, - Key=current_key, - PartNumber=part_number, - UploadId=self.upload_id, - Body=data, - ) + with ExecutionTimeRecorder( + "s3_batch_export_upload_part_duration", + description="Total duration of the upload of a part of a multi-part upload", + log_message=( + "Finished uploading file number %(file_number)d part %(part_number)d" + " with upload id '%(upload_id)s' with status '%(status)s'." 
+ " File size: %(mb_processed).2f MB, upload time: %(duration_seconds)d" + " seconds, speed: %(mb_per_second).2f MB/s" + ), + log_attributes={ + "file_number": self.current_file_index, + "upload_id": self.upload_id, + "part_number": part_number, + }, + ) as recorder: + recorder.add_bytes_processed(len(data)) - except botocore.exceptions.ClientError as err: - error_code = err.response.get("Error", {}).get("Code", None) - attempt += 1 - - await self.logger.ainfo( - "Caught ClientError while uploading file %s part %s: %s (attempt %s/%s)", - self.current_file_index, - part_number, - error_code, - attempt, - max_attempts, - ) - - if error_code is not None and error_code == "RequestTimeout": - if attempt >= max_attempts: - raise IntermittentUploadPartTimeoutError(part_number=part_number) from err - - retry_delay = min( - max_retry_delay, initial_retry_delay * (attempt**exponential_backoff_coefficient) + while response is None: + try: + response = await client.upload_part( + Bucket=self.s3_inputs.bucket_name, + Key=current_key, + PartNumber=part_number, + UploadId=self.upload_id, + Body=data, ) - await self.logger.ainfo("Retrying part %s upload in %s seconds", part_number, retry_delay) - await asyncio.sleep(retry_delay) - continue - else: - raise - upload_time = time.time() - upload_start + except botocore.exceptions.ClientError as err: + error_code = err.response.get("Error", {}).get("Code", None) + attempt += 1 + + await self.logger.ainfo( + "Caught ClientError while uploading file %s part %s: %s (attempt %s/%s)", + self.current_file_index, + part_number, + error_code, + attempt, + max_attempts, + ) + + if error_code is not None and error_code == "RequestTimeout": + if attempt >= max_attempts: + raise IntermittentUploadPartTimeoutError(part_number=part_number) from err + + retry_delay = min( + max_retry_delay, initial_retry_delay * (attempt**exponential_backoff_coefficient) + ) + await self.logger.ainfo("Retrying part %s upload in %s seconds", part_number, retry_delay) + await asyncio.sleep(retry_delay) + continue + else: + raise + part_info: CompletedPartTypeDef = {"ETag": response["ETag"], "PartNumber": part_number} # Store completed part info self.completed_parts[part_number] = part_info - # Calculate transfer speed - part_size_mb = len(data) / (1024 * 1024) - upload_speed_mbps = part_size_mb / upload_time if upload_time > 0 else 0 - - await self.logger.ainfo( - "Finished uploading file number %s part %s with upload id %s. File size: %.2f MB, upload time: %.2fs, speed: %.2f MB/s", - self.current_file_index, - part_number, - self.upload_id, - part_size_mb, - upload_time, - upload_speed_mbps, - ) return part_info except Exception: diff --git a/posthog/temporal/batch_exports/transformer.py b/posthog/temporal/batch_exports/transformer.py index fe0105d535..93c508e7cb 100644 --- a/posthog/temporal/batch_exports/transformer.py +++ b/posthog/temporal/batch_exports/transformer.py @@ -15,6 +15,8 @@ import pyarrow.parquet as pq import structlog from django.conf import settings +from posthog.temporal.batch_exports.metrics import ExecutionTimeRecorder + logger = structlog.get_logger() @@ -373,19 +375,30 @@ class ParquetStreamTransformer: current_file_size = 0 async for record_batch in record_batches: - # Running write in a thread to yield control back to event loop. 
- chunk = await asyncio.to_thread(self.write_record_batch, record_batch) + with ExecutionTimeRecorder( + "parquet_stream_transformer_record_batch_transform_duration", + description="Duration to transform a record batch into Parquet bytes.", + log_message=( + "Processed record batch with %(num_records)d records to parquet." + " Record batch size: %(mb_processed).2f MB, process time:" + " %(duration_seconds)d seconds, speed: %(mb_per_second).2f MB/s" + ), + log_attributes={"num_records": record_batch.num_rows}, + ) as recorder: + recorder.add_bytes_processed(record_batch.nbytes) + # Running write in a thread to yield control back to event loop. + chunk = await asyncio.to_thread(self.write_record_batch, record_batch) - yield Chunk(chunk, False) + yield Chunk(chunk, False) - if max_file_size_bytes and current_file_size + len(chunk) > max_file_size_bytes: - footer = await asyncio.to_thread(self.finish_parquet_file) + if max_file_size_bytes and current_file_size + len(chunk) > max_file_size_bytes: + footer = await asyncio.to_thread(self.finish_parquet_file) - yield Chunk(footer, True) - current_file_size = 0 + yield Chunk(footer, True) + current_file_size = 0 - else: - current_file_size += len(chunk) + else: + current_file_size += len(chunk) footer = await asyncio.to_thread(self.finish_parquet_file) yield Chunk(footer, True) diff --git a/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py index de48979d40..d287daf8e8 100644 --- a/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py @@ -930,7 +930,7 @@ async def _run_s3_batch_export_workflow( @pytest.mark.parametrize("model", TEST_S3_MODELS) @pytest.mark.parametrize("compression", [None], indirect=True) @pytest.mark.parametrize("exclude_events", [None], indirect=True) -@pytest.mark.parametrize("file_format", ["JSONLines"], indirect=True) +@pytest.mark.parametrize("file_format", ["Parquet"], indirect=True) @pytest.mark.parametrize("use_internal_s3_stage", [True, False]) async def test_s3_export_workflow_with_minio_bucket_with_various_intervals_and_models( clickhouse_client,
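For reference, a minimal usage sketch of the `ExecutionTimeRecorder` introduced in posthog/temporal/batch_exports/metrics.py above, assuming it runs inside a Temporal activity (outside an activity or workflow context, `get_metric_meter` raises `RuntimeError`). The activity name, histogram name, and the `destination` log attribute below are illustrative placeholders, not identifiers defined by this patch:

    # Hypothetical activity showing how the recorder could wrap a unit of work.
    from temporalio import activity

    from posthog.temporal.batch_exports.metrics import ExecutionTimeRecorder


    @activity.defn
    async def copy_events_activity(payload: bytes) -> int:
        """Pretend to export `payload` somewhere, recording how long it took."""
        with ExecutionTimeRecorder(
            "example_copy_events_duration",  # histogram metric name (illustrative)
            description="Duration of the example copy step.",
            log_message=(
                "Finished %(name)s with status '%(status)s' in %(duration_seconds)ds"
                " (%(mb_per_second).2f MB/s)"
            ),
            log_attributes={"destination": "example"},
        ) as recorder:
            # Feeding the recorder the byte count is what populates the
            # throughput fields (mb_processed, bytes_per_second, mb_per_second)
            # available to the custom log message.
            recorder.add_bytes_processed(len(payload))
            ...  # do the actual work; an exception here records status 'FAILED'

        return len(payload)

This mirrors how the S3 part upload and the Parquet record-batch transform call sites in this patch use `add_bytes_processed` before doing the timed work, so the emitted log line can report both duration and transfer speed.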