feat: Add execution time tracker for batch exports (#34084)

Tomás Farías Santana
2025-07-02 10:31:12 +02:00
committed by GitHub
parent 70be666442
commit 3cad99de44
4 changed files with 283 additions and 59 deletions

View File

@@ -1,6 +1,12 @@
+import datetime as dt
+import time
+import typing
+
 from temporalio import activity, workflow
 from temporalio.common import MetricCounter
+
+from posthog.temporal.common.logger import get_internal_logger
 
 
 def get_rows_exported_metric() -> MetricCounter:
     return activity.metric_meter().create_counter("batch_export_rows_exported", "Number of rows exported.")
@@ -22,3 +28,206 @@ def get_export_finished_metric(status: str) -> MetricCounter:
"batch_export_finished", "Number of batch exports finished, for any reason (including failure)."
)
)
+
+
+Attributes = dict[str, str | int | float | bool]
+
+
+class ExecutionTimeRecorder:
+    def __init__(
+        self,
+        histogram_name: str,
+        /,
+        description: str | None = None,
+        histogram_attributes: Attributes | None = None,
+        log: bool = True,
+        log_message: str = "Finished %(name)s with status '%(status)s' in %(duration_seconds)ds",
+        log_name: str | None = None,
+        log_attributes: Attributes | None = None,
+    ) -> None:
+        """Context manager to record execution time to a histogram metric.
+
+        This can be used from within a workflow or an activity.
+
+        Attributes:
+            histogram_name: A name for the histogram metric.
+            description: Description to use for the metric.
+            histogram_attributes: Mapping of any attributes to add to the meter.
+                This tracker already adds common attributes like 'workflow_type'
+                or 'activity_type'. Moreover, a 'status' will be added to
+                indicate whether an exception was raised within the block
+                ('FAILED') or not ('COMPLETED').
+            log: Whether to additionally log the execution time.
+            log_message: A custom log message template.
+            log_name: An alternative name to use in the log line instead of
+                the histogram name.
+            log_attributes: Mapping of additional attributes to pass to the
+                logger.
+        """
+        self.histogram_name = histogram_name
+        self.description = description
+        self.histogram_attributes = histogram_attributes
+        self.log = log
+        self.log_message = log_message
+        self.log_name = log_name
+        self.log_attributes = log_attributes
+        self.bytes_processed: None | int = None
+        self._start_counter: float | None = None
+
+    def __enter__(self) -> typing.Self:
+        """Start the counter and return."""
+        self._start_counter = time.perf_counter()
+        return self
+
+    def __exit__(self, exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback) -> None:
+        """Record execution time on exiting.
+
+        No exceptions from within the context are handled in this method.
+        Exception information is only used to set the status.
+        """
+        if self._start_counter is None:
+            raise RuntimeError("Start counter not initialized, did you call `__enter__`?")
+
+        start_counter = self._start_counter
+        end_counter = time.perf_counter()
+        delta_milliseconds = int((end_counter - start_counter) * 1000)
+        delta = dt.timedelta(milliseconds=delta_milliseconds)
+
+        attributes = get_attributes(self.histogram_attributes)
+        if exc_value is not None:
+            attributes["status"] = "FAILED"
+            attributes["exception"] = str(exc_value)
+        else:
+            attributes["status"] = "COMPLETED"
+            attributes["exception"] = ""
+
+        meter = get_metric_meter(attributes)
+        hist = meter.create_histogram_timedelta(name=self.histogram_name, description=self.description, unit="ms")
+
+        try:
+            hist.record(value=delta)
+        except Exception:
+            logger = get_internal_logger()
+            logger.exception("Failed to record execution time to histogram '%s'", self.histogram_name)
+
+        if self.log:
+            log_execution_time(
+                self.log_message,
+                name=self.log_name or self.histogram_name,
+                delta=delta,
+                status="FAILED" if exc_value else "COMPLETED",
+                bytes_processed=self.bytes_processed,
+                extra_arguments=self.log_attributes,
+            )
+
+        self.reset()
+
+    def add_bytes_processed(self, bytes_processed: int) -> int:
+        """Add to bytes processed, returning the total so far."""
+        if self.bytes_processed is None:
+            self.bytes_processed = bytes_processed
+        else:
+            self.bytes_processed += bytes_processed
+        return self.bytes_processed
+
+    def reset(self) -> None:
+        """Reset the counter and bytes processed."""
+        self._start_counter = None
+        self.bytes_processed = None
+
+
+def get_metric_meter(additional_attributes: Attributes | None = None):
+    """Return a metric meter for the current context (activity or workflow)."""
+    if activity.in_activity():
+        meter = activity.metric_meter()
+    else:
+        try:
+            meter = workflow.metric_meter()
+        except Exception as err:
+            raise RuntimeError("Not within workflow or activity context") from err
+
+    if additional_attributes:
+        meter = meter.with_additional_attributes(additional_attributes)
+
+    return meter
+
+
+def get_attributes(additional_attributes: Attributes | None = None) -> Attributes:
+    """Return metric attributes for the current context (activity or workflow)."""
+    if activity.in_activity():
+        attributes = get_activity_attributes()
+    else:
+        try:
+            attributes = get_workflow_attributes()
+        except Exception:
+            attributes = {}
+
+    if additional_attributes:
+        attributes = {**attributes, **additional_attributes}
+
+    return attributes
+
+
+def get_activity_attributes() -> Attributes:
+    """Return basic Temporal.io activity attributes."""
+    info = activity.info()
+    return {
+        "workflow_namespace": info.workflow_namespace,
+        "workflow_type": info.workflow_type,
+        "activity_type": info.activity_type,
+    }
+
+
+def get_workflow_attributes() -> Attributes:
+    """Return basic Temporal.io workflow attributes."""
+    info = workflow.info()
+    return {
+        "workflow_namespace": info.namespace,
+        "workflow_type": info.workflow_type,
+    }
+
+
+def log_execution_time(
+    log_message: str,
+    name: str,
+    delta: dt.timedelta,
+    status: str,
+    bytes_processed: None | int = None,
+    extra_arguments: Attributes | None = None,
+):
+    """Log execution time."""
+    logger = get_internal_logger()
+    duration_seconds = delta.total_seconds()
+
+    if bytes_processed is not None:
+        mb_processed = bytes_processed / 1024 / 1024
+
+        if duration_seconds > 0:
+            bytes_per_second = bytes_processed / duration_seconds
+            mb_per_second = mb_processed / duration_seconds
+        else:
+            bytes_per_second = float("inf")
+            mb_per_second = float("inf")
+    else:
+        mb_processed = None
+        bytes_per_second = None
+        mb_per_second = None
+
+    arguments = {
+        "name": name,
+        "status": status,
+        "duration_seconds": duration_seconds,
+        "bytes_processed": bytes_processed,
+        "mb_processed": mb_processed,
+        "bytes_per_second": bytes_per_second,
+        "mb_per_second": mb_per_second,
+    }
+    if extra_arguments:
+        arguments = {**arguments, **extra_arguments}
+
+    logger.info(log_message, arguments)
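
For reference, a minimal usage sketch of the recorder inside a hypothetical activity (the activity and the do_upload call are illustrative only, not part of this commit):

    from temporalio import activity

    @activity.defn
    async def upload_chunk(data: bytes) -> None:
        # Records the elapsed time to the histogram and logs a summary line on exit.
        with ExecutionTimeRecorder(
            "upload_chunk_duration",  # hypothetical histogram name
            description="Duration of a single chunk upload.",
        ) as recorder:
            # Feeding bytes lets the log line report MB and MB/s.
            recorder.add_bytes_processed(len(data))
            await do_upload(data)  # hypothetical upload call

If the block raises, __exit__ still records the duration, tagged with status 'FAILED' and the exception string, and lets the exception propagate.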

View File

@@ -7,7 +7,6 @@ import io
 import json
 import operator
 import posixpath
-import time
 import typing
 
 import aioboto3
@@ -48,6 +47,7 @@ from posthog.temporal.batch_exports.heartbeat import (
     HeartbeatParseError,
     should_resume_from_activity_heartbeat,
 )
+from posthog.temporal.batch_exports.metrics import ExecutionTimeRecorder
 from posthog.temporal.batch_exports.pre_export_stage import (
     ProducerFromInternalS3Stage,
     execute_batch_export_insert_activity_using_s3_stage,
@@ -1164,62 +1164,64 @@ class ConcurrentS3Consumer(ConsumerFromStage):
     response: UploadPartOutputTypeDef | None = None
     attempt = 0
-    while response is None:
-        upload_start = time.time()
-        try:
-            response = await client.upload_part(
-                Bucket=self.s3_inputs.bucket_name,
-                Key=current_key,
-                PartNumber=part_number,
-                UploadId=self.upload_id,
-                Body=data,
-            )
-        except botocore.exceptions.ClientError as err:
-            error_code = err.response.get("Error", {}).get("Code", None)
-            attempt += 1
-            await self.logger.ainfo(
-                "Caught ClientError while uploading file %s part %s: %s (attempt %s/%s)",
-                self.current_file_index,
-                part_number,
-                error_code,
-                attempt,
-                max_attempts,
-            )
-            if error_code is not None and error_code == "RequestTimeout":
-                if attempt >= max_attempts:
-                    raise IntermittentUploadPartTimeoutError(part_number=part_number) from err
-                retry_delay = min(
-                    max_retry_delay, initial_retry_delay * (attempt**exponential_backoff_coefficient)
-                )
-                await self.logger.ainfo("Retrying part %s upload in %s seconds", part_number, retry_delay)
-                await asyncio.sleep(retry_delay)
-                continue
-            else:
-                raise
-        upload_time = time.time() - upload_start
+    with ExecutionTimeRecorder(
+        "s3_batch_export_upload_part_duration",
+        description="Total duration of the upload of a part of a multi-part upload",
+        log_message=(
+            "Finished uploading file number %(file_number)d part %(part_number)d"
+            " with upload id '%(upload_id)s' with status '%(status)s'."
+            " File size: %(mb_processed).2f MB, upload time: %(duration_seconds)d"
+            " seconds, speed: %(mb_per_second).2f MB/s"
+        ),
+        log_attributes={
+            "file_number": self.current_file_index,
+            "upload_id": self.upload_id,
+            "part_number": part_number,
+        },
+    ) as recorder:
+        recorder.add_bytes_processed(len(data))
+        while response is None:
+            try:
+                response = await client.upload_part(
+                    Bucket=self.s3_inputs.bucket_name,
+                    Key=current_key,
+                    PartNumber=part_number,
+                    UploadId=self.upload_id,
+                    Body=data,
+                )
+            except botocore.exceptions.ClientError as err:
+                error_code = err.response.get("Error", {}).get("Code", None)
+                attempt += 1
+                await self.logger.ainfo(
+                    "Caught ClientError while uploading file %s part %s: %s (attempt %s/%s)",
+                    self.current_file_index,
+                    part_number,
+                    error_code,
+                    attempt,
+                    max_attempts,
+                )
+                if error_code is not None and error_code == "RequestTimeout":
+                    if attempt >= max_attempts:
+                        raise IntermittentUploadPartTimeoutError(part_number=part_number) from err
+                    retry_delay = min(
+                        max_retry_delay, initial_retry_delay * (attempt**exponential_backoff_coefficient)
+                    )
+                    await self.logger.ainfo("Retrying part %s upload in %s seconds", part_number, retry_delay)
+                    await asyncio.sleep(retry_delay)
+                    continue
+                else:
+                    raise
 
     part_info: CompletedPartTypeDef = {"ETag": response["ETag"], "PartNumber": part_number}
     # Store completed part info
     self.completed_parts[part_number] = part_info
-    # Calculate transfer speed
-    part_size_mb = len(data) / (1024 * 1024)
-    upload_speed_mbps = part_size_mb / upload_time if upload_time > 0 else 0
-    await self.logger.ainfo(
-        "Finished uploading file number %s part %s with upload id %s. File size: %.2f MB, upload time: %.2fs, speed: %.2f MB/s",
-        self.current_file_index,
-        part_number,
-        self.upload_id,
-        part_size_mb,
-        upload_time,
-        upload_speed_mbps,
-    )
 
     return part_info
 except Exception:
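
Note the retry delay grows polynomially in the attempt number, despite the name exponential_backoff_coefficient. A quick sketch of the schedule the formula yields, with illustrative values for the three knobs (the real values are parameters of the upload routine and are not shown in this diff):

    initial_retry_delay = 2.0
    max_retry_delay = 60.0
    exponential_backoff_coefficient = 2.0

    for attempt in range(1, 7):
        delay = min(max_retry_delay, initial_retry_delay * (attempt**exponential_backoff_coefficient))
        print(f"attempt {attempt}: retry in {delay:.0f}s")
    # attempts 1..6 -> 2s, 8s, 18s, 32s, 50s, 60s (capped by max_retry_delay)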

View File

@@ -15,6 +15,8 @@ import pyarrow.parquet as pq
 import structlog
 from django.conf import settings
 
+from posthog.temporal.batch_exports.metrics import ExecutionTimeRecorder
+
 logger = structlog.get_logger()
@@ -373,19 +375,30 @@ class ParquetStreamTransformer:
     current_file_size = 0
 
     async for record_batch in record_batches:
-        # Running write in a thread to yield control back to event loop.
-        chunk = await asyncio.to_thread(self.write_record_batch, record_batch)
+        with ExecutionTimeRecorder(
+            "parquet_stream_transformer_record_batch_transform_duration",
+            description="Duration to transform a record batch into Parquet bytes.",
+            log_message=(
+                "Processed record batch with %(num_records)d records to parquet."
+                " Record batch size: %(mb_processed).2f MB, process time:"
+                " %(duration_seconds)d seconds, speed: %(mb_per_second).2f MB/s"
+            ),
+            log_attributes={"num_records": record_batch.num_rows},
+        ) as recorder:
+            recorder.add_bytes_processed(record_batch.nbytes)
+            # Running write in a thread to yield control back to event loop.
+            chunk = await asyncio.to_thread(self.write_record_batch, record_batch)
 
         yield Chunk(chunk, False)
 
         if max_file_size_bytes and current_file_size + len(chunk) > max_file_size_bytes:
             footer = await asyncio.to_thread(self.finish_parquet_file)
             yield Chunk(footer, True)
             current_file_size = 0
         else:
             current_file_size += len(chunk)
 
     footer = await asyncio.to_thread(self.finish_parquet_file)
     yield Chunk(footer, True)
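
The MB and MB/s figures in that log line are computed by log_execution_time from the bytes fed in via add_bytes_processed. A toy check of the arithmetic (values made up):

    bytes_processed = 64 * 1024 * 1024  # pretend a 64 MiB record batch
    duration_seconds = 2.0
    mb_processed = bytes_processed / 1024 / 1024     # 64.0
    mb_per_second = mb_processed / duration_seconds  # 32.0 -> "speed: 32.00 MB/s"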

View File

@@ -930,7 +930,7 @@ async def _run_s3_batch_export_workflow(
@pytest.mark.parametrize("model", TEST_S3_MODELS)
@pytest.mark.parametrize("compression", [None], indirect=True)
@pytest.mark.parametrize("exclude_events", [None], indirect=True)
@pytest.mark.parametrize("file_format", ["JSONLines"], indirect=True)
@pytest.mark.parametrize("file_format", ["Parquet"], indirect=True)
@pytest.mark.parametrize("use_internal_s3_stage", [True, False])
async def test_s3_export_workflow_with_minio_bucket_with_various_intervals_and_models(
clickhouse_client,