chore(capture_internal): add optional sent_at param; fix event_source tagging (#35523)

This commit is contained in:
Eli Reisman
2025-07-24 10:03:26 -07:00
committed by GitHub
parent 5d238c9d41
commit 7da7d27749
2 changed files with 66 additions and 10 deletions

View File

@@ -45,6 +45,7 @@ def capture_internal(
distinct_id: str,
timestamp: Optional[datetime | str],
properties: dict[str, Any],
sent_at: Optional[datetime | str] = None,
process_person_profile: bool = False,
) -> Response:
"""
@@ -59,6 +60,7 @@ def capture_internal(
distinct_id: the distict ID of the event (optional; required in properties if absent)
timestamp: the timestamp of the event to be published (optional; will be set to now UTC if absent)
properties: event properties to submit with the event (required; can be empty)
sent_at: time the client submitted this event (optional; typically, let capture_internal set this)
process_person_profile: if TRUE, process the person profile for the event according to the caller's settings.
if FALSE, disable person processing for this event.
@@ -78,7 +80,7 @@ def capture_internal(
)
event_payload = prepare_capture_internal_payload(
token, event_name, event_source, distinct_id, timestamp, properties, process_person_profile
token, event_name, event_source, distinct_id, timestamp, properties, sent_at, process_person_profile
)
# determine if this is a recordings or events type, route to correct capture endpoint
@@ -96,7 +98,7 @@ def capture_internal(
),
)
CAPTURE_INTERNAL_EVENT_SUBMITTED_COUNTER.labels(event_source="TODO").inc()
CAPTURE_INTERNAL_EVENT_SUBMITTED_COUNTER.labels(event_source=event_source).inc()
return s.post(
resolved_capture_url,
json=event_payload,
@@ -143,6 +145,7 @@ def capture_batch_internal(
# 1. token should be supplied by caller, and be consistent per batch submitted.
# new capture_internal will attempt to extract from each event if missing
# 2. distinct_id should be present on each event since these can differ within a batch
sent_at: datetime = datetime.now(UTC) # should be same for whole batch
for event in events:
properties: dict[str, Any] = event.get("properties", {})
distinct_id: str = event.get("distinct_id", "")
@@ -156,6 +159,7 @@ def capture_batch_internal(
distinct_id=distinct_id,
timestamp=timestamp,
properties=properties,
sent_at=sent_at,
process_person_profile=process_person_profile,
)
futures.append(future)
@@ -171,6 +175,7 @@ def prepare_capture_internal_payload(
distinct_id: Optional[str],
timestamp: Optional[datetime | str],
properties: dict[str, Any],
sent_at: Optional[datetime | str] = None,
process_person_profile: bool = False,
) -> dict[str, Any]:
# mark event as internal for observability
@@ -198,18 +203,21 @@ def prepare_capture_internal_payload(
if not event_name:
raise CaptureInternalError(f"capture_internal ({event_source}): event name is required")
event_timestamp = datetime.now(UTC).isoformat()
if timestamp:
if isinstance(timestamp, datetime):
event_timestamp = timestamp.replace(tzinfo=UTC).isoformat()
elif isinstance(timestamp, str):
# assume its an ISO8601 string and submit it as-is
event_timestamp = timestamp
# if the timestamp or sent_at fields are strings, assume they are well-formed ISO8601 stamps
if not sent_at:
sent_at = datetime.now(UTC).isoformat()
elif isinstance(sent_at, datetime):
sent_at = sent_at.replace(tzinfo=UTC).isoformat()
if not timestamp:
timestamp = datetime.now(UTC).isoformat()
elif isinstance(timestamp, datetime):
timestamp = timestamp.replace(tzinfo=UTC).isoformat()
return {
"api_key": token,
"timestamp": event_timestamp,
"timestamp": timestamp,
"distinct_id": distinct_id,
"sent_at": sent_at,
"event": event_name,
"properties": properties,
}

View File

@@ -89,6 +89,51 @@ class TestCaptureInternal(BaseTest):
assert spied_calls[0]["event_payload"]["distinct_id"] == distinct_id
assert spied_calls[0]["event_payload"]["api_key"] == token
assert spied_calls[0]["event_payload"]["timestamp"] == timestamp.isoformat()
assert spied_calls[0]["event_payload"].get("sent_at", None) is not None
# note: event payload passed to new capture_internal is mutated. here we inject a source marker
assert spied_calls[0]["event_payload"]["properties"]["capture_internal"] is True
assert len(spied_calls[0]["event_payload"]["properties"]) == len(test_props)
# when process_person_profile is False, new capture_internal explicitly sets this on the event
assert spied_calls[0]["event_payload"]["properties"].get("$process_person_profile", None) is not None
assert spied_calls[0]["event_payload"]["properties"]["$process_person_profile"] is False
@patch("posthog.api.capture.Session")
def test_capture_internal_with_sent_at(self, mock_session_class):
token = "abc123"
distinct_id = "xyz987"
event_name = "test_event"
timestamp = datetime.now(UTC)
test_props = {
"$current_url": "https://example.com",
"$ip": "127.0.0.1",
"$lib": "python",
"$lib_version": "1.0.0",
"$screen_width": 1920,
"$screen_height": 1080,
"some_custom_property": True,
}
spy = InstallCapturePostSpy(mock_session_class)
response = capture_internal(
token=token,
event_name=event_name,
event_source="test_capture_internal",
distinct_id=distinct_id,
timestamp=timestamp,
sent_at=timestamp,
properties=test_props,
)
assert response.status_code == 200
spied_calls = spy.get_calls()
assert len(spied_calls) == 1
assert f"{CAPTURE_INTERNAL_URL}{NEW_ANALYTICS_CAPTURE_ENDPOINT}" in spied_calls[0]["url"]
assert spied_calls[0]["event_payload"]["event"] == event_name
assert spied_calls[0]["event_payload"]["distinct_id"] == distinct_id
assert spied_calls[0]["event_payload"]["api_key"] == token
assert spied_calls[0]["event_payload"]["timestamp"] == timestamp.isoformat()
assert spied_calls[0]["event_payload"]["sent_at"] == timestamp.isoformat()
# note: event payload passed to new capture_internal is mutated. here we inject a source marker
assert spied_calls[0]["event_payload"]["properties"]["capture_internal"] is True
assert len(spied_calls[0]["event_payload"]["properties"]) == len(test_props)
@@ -138,6 +183,7 @@ class TestCaptureInternal(BaseTest):
assert spied_calls[0]["event_payload"]["distinct_id"] == distinct_id
assert spied_calls[0]["event_payload"]["api_key"] == token
assert spied_calls[0]["event_payload"]["timestamp"] == timestamp.isoformat()
assert spied_calls[0]["event_payload"].get("sent_at", None) is not None
assert spied_calls[0]["event_payload"]["properties"]["capture_internal"] is True
assert len(spied_calls[0]["event_payload"]["properties"]) == len(test_props)
# when new capture_internal is called with process_person_profile == True, we don't alter the event payload
@@ -273,6 +319,7 @@ class TestCaptureInternal(BaseTest):
assert spied_calls[0]["event_payload"]["distinct_id"] == distinct_id
assert spied_calls[0]["event_payload"]["api_key"] == token
assert spied_calls[0]["event_payload"]["timestamp"] == timestamp.isoformat()
assert spied_calls[0]["event_payload"].get("sent_at", None) is not None
assert spied_calls[0]["event_payload"]["properties"]["capture_internal"] is True
assert len(spied_calls[0]["event_payload"]["properties"]) == len(test_replay_props)
assert spied_calls[0]["event_payload"]["properties"].get("$process_person_profile", None) is not None
@@ -415,6 +462,7 @@ class TestCaptureInternal(BaseTest):
assert spied_calls[i]["event_payload"]["distinct_id"] == test_events[i]["distinct_id"]
assert spied_calls[i]["event_payload"]["api_key"] == token
assert spied_calls[i]["event_payload"]["timestamp"] == timestamp
assert spied_calls[i]["event_payload"].get("sent_at", None) is not None
assert len(spied_calls[i]["event_payload"]["properties"]) == len(test_events[i]["properties"])
# every event in the batch should be marked asinternally-sourced
assert spied_calls[i]["event_payload"]["properties"]["capture_internal"] is True