Mirror of https://github.com/BillyOutlast/posthog.git
feat: implement blob v2 replay (#29417)
Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
Three binary image files changed (not shown; ~105-108 KiB each).
@@ -249,6 +249,7 @@ export const FEATURE_FLAGS = {
     SUPPORT_FORM_IN_ONBOARDING: 'support-form-in-onboarding', // owner: @joshsny #team-growth
     AI_SETUP_WIZARD: 'ai-setup-wizard', // owner: @joshsny #team-growth
     CRM_BLOCKING_QUERIES: 'crm-blocking-queries', // owner: @danielbachhuber #team-crm
+    RECORDINGS_BLOBBY_V2_REPLAY: 'recordings-blobby-v2-replay', // owner: @pl #team-cdp
 } as const

 export type FeatureFlagKey = (typeof FEATURE_FLAGS)[keyof typeof FEATURE_FLAGS]
@@ -57,6 +57,7 @@ import { createSegments, mapSnapshotsToWindowId } from './utils/segmenter'
 const IS_TEST_MODE = process.env.NODE_ENV === 'test'
 const BUFFER_MS = 60000 // +- before and after start and end of a recording to query for.
 const DEFAULT_REALTIME_POLLING_MILLIS = 3000
+const DEFAULT_V2_POLLING_INTERVAL_MS = 10000
 export const MUTATION_CHUNK_SIZE = 5000 // Maximum number of mutations per chunk

 let postHogEEModule: PostHogEE
@@ -282,6 +283,12 @@ export const parseEncodedSnapshots = async (
         if (typeof l === 'string') {
             // is loaded from blob or realtime storage
             snapshotLine = JSON.parse(l) as EncodedRecordingSnapshot
+            if (Array.isArray(snapshotLine)) {
+                snapshotLine = {
+                    windowId: snapshotLine[0],
+                    data: [snapshotLine[1]],
+                }
+            }
         } else {
             // is loaded from file export
             snapshotLine = l
@@ -455,7 +462,7 @@ export const sessionRecordingDataLogic = kea<sessionRecordingDataLogicType>([
         loadRecordingComments: true,
         maybeLoadRecordingMeta: true,
         loadSnapshots: true,
-        loadSnapshotSources: true,
+        loadSnapshotSources: (breakpointLength?: number) => ({ breakpointLength }),
         loadNextSnapshotSource: true,
         loadSnapshotsForSource: (source: Pick<SessionRecordingSnapshotSource, 'source' | 'blob_key'>) => ({ source }),
         loadEvents: true,
@@ -561,9 +568,18 @@ export const sessionRecordingDataLogic = kea<sessionRecordingDataLogicType>([
         snapshotSources: [
             null as SessionRecordingSnapshotSource[] | null,
             {
-                loadSnapshotSources: async () => {
+                loadSnapshotSources: async ({ breakpointLength }, breakpoint) => {
+                    if (breakpointLength) {
+                        await breakpoint(breakpointLength)
+                    }
                     const response = await api.recordings.listSnapshotSources(props.sessionRecordingId)
-                    return response.sources ?? []
+                    if (!response.sources) {
+                        return []
+                    }
+                    if (values.featureFlags[FEATURE_FLAGS.RECORDINGS_BLOBBY_V2_REPLAY]) {
+                        return response.sources.filter((s) => s.source === SnapshotSourceType.blob_v2)
+                    }
+                    return response.sources.filter((s) => s.source !== SnapshotSourceType.blob_v2)
                 },
             },
         ],
@@ -580,6 +596,8 @@ export const sessionRecordingDataLogic = kea<sessionRecordingDataLogicType>([
                 params = { blob_key: source.blob_key, source: 'blob' }
             } else if (source.source === SnapshotSourceType.realtime) {
                 params = { source: 'realtime', version: '2024-04-30' }
+            } else if (source.source === SnapshotSourceType.blob_v2) {
+                params = { source: 'blob_v2', blob_key: source.blob_key }
             } else {
                 throw new Error(`Unsupported source: ${source.source}`)
             }
@@ -729,7 +747,7 @@ export const sessionRecordingDataLogic = kea<sessionRecordingDataLogicType>([
                     query: hogql`SELECT properties, uuid
                         FROM events
                         -- the timestamp range here is only to avoid querying too much of the events table
-                        -- we don't really care about the absolute value,
+                        -- we don't really care about the absolute value,
                         -- but we do care about whether timezones have an odd impact
                         -- so, we extend the range by a day on each side so that timezones don't cause issues
                         WHERE timestamp > ${dayjs(earliestTimestamp).subtract(1, 'day')}
@@ -852,6 +870,10 @@ export const sessionRecordingDataLogic = kea<sessionRecordingDataLogicType>([
                 return actions.loadSnapshotsForSource(nextSourceToLoad)
             }

+            if (values.snapshotSources?.find((s) => s.source === SnapshotSourceType.blob_v2)) {
+                actions.loadSnapshotSources(DEFAULT_V2_POLLING_INTERVAL_MS)
+            }
+
             // TODO: Move this to a one time check - only report once per recording
             cache.snapshotsLoadDuration = Math.round(performance.now() - cache.snapshotsStartTime)
             actions.reportUsageIfFullyLoaded()
@@ -975,6 +975,7 @@ export const SnapshotSourceType = {
     blob: 'blob',
     realtime: 'realtime',
     file: 'file',
+    blob_v2: 'blob_v2',
 } as const

 export type SnapshotSourceType = (typeof SnapshotSourceType)[keyof typeof SnapshotSourceType]
@@ -991,6 +992,10 @@ export type SessionRecordingSnapshotParams =
           source: 'blob'
           blob_key?: string
       }
+    | {
+          source: 'blob_v2'
+          blob_key?: string
+      }
     | {
           source: 'realtime'
           // originally realtime snapshots were returned in a different format than blob snapshots
@@ -56,3 +56,23 @@ class RecordingMetadata(TypedDict):

 class RecordingMatchingEvents(TypedDict):
     events: list[MatchingSessionRecordingEvent]
+
+
+class RecordingMetadataV2Test(TypedDict):
+    """Metadata for session recordings from the v2 test table.
+
+    Maps to the columns in session_replay_events_v2_test table:
+    - distinct_id
+    - min_first_timestamp (as start_time)
+    - max_last_timestamp (as end_time)
+    - block_first_timestamps
+    - block_last_timestamps
+    - block_urls
+    """
+
+    distinct_id: str
+    start_time: datetime  # min_first_timestamp
+    end_time: datetime  # max_last_timestamp
+    block_first_timestamps: list[datetime]
+    block_last_timestamps: list[datetime]
+    block_urls: list[str]
@@ -0,0 +1,61 @@
+from datetime import datetime
+from typing import Optional
+
+from posthog.clickhouse.client import sync_execute
+from posthog.models.team import Team
+from posthog.session_recordings.models.metadata import RecordingMetadataV2Test
+
+
+class SessionReplayEventsV2Test:
+    def get_metadata(
+        self,
+        session_id: str,
+        team: Team,
+        recording_start_time: Optional[datetime] = None,
+    ) -> Optional[RecordingMetadataV2Test]:
+        query = """
+            SELECT
+                any(distinct_id),
+                min(min_first_timestamp) as start_time,
+                max(max_last_timestamp) as end_time,
+                groupArrayArray(block_first_timestamps) as block_first_timestamps,
+                groupArrayArray(block_last_timestamps) as block_last_timestamps,
+                groupArrayArray(block_urls) as block_urls
+            FROM
+                session_replay_events_v2_test
+            PREWHERE
+                team_id = %(team_id)s
+                AND session_id = %(session_id)s
+                {optional_timestamp_clause}
+            GROUP BY
+                session_id
+        """
+        query = query.format(
+            optional_timestamp_clause=(
+                "AND min_first_timestamp >= %(recording_start_time)s" if recording_start_time else ""
+            )
+        )
+
+        replay_response: list[tuple] = sync_execute(
+            query,
+            {
+                "team_id": team.pk,
+                "session_id": session_id,
+                "recording_start_time": recording_start_time,
+            },
+        )
+
+        if len(replay_response) == 0:
+            return None
+        if len(replay_response) > 1:
+            raise ValueError(f"Multiple sessions found for session_id: {session_id}")
+
+        replay = replay_response[0]
+        return RecordingMetadataV2Test(
+            distinct_id=replay[0],
+            start_time=replay[1],
+            end_time=replay[2],
+            block_first_timestamps=replay[3],
+            block_last_timestamps=replay[4],
+            block_urls=replay[5],
+        )
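Aside (not part of the diff): a minimal sketch, in Python, of how the block arrays returned by get_metadata above are expected to map onto blob_v2 snapshot sources. The helper name build_blob_v2_sources is illustrative; the zip/sort/enumerate shape mirrors the viewset change further below, where a block's blob_key is simply its index after sorting by first timestamp.

def build_blob_v2_sources(metadata: dict) -> list[dict]:
    # Blocks are stored as parallel arrays; pair them up and order by first timestamp.
    blocks = sorted(
        zip(
            metadata["block_first_timestamps"],
            metadata["block_last_timestamps"],
            metadata["block_urls"],
        ),
        key=lambda x: x[0],
    )
    # A v2 source's blob_key is the block's index in this sorted order.
    return [
        {
            "source": "blob_v2",
            "start_timestamp": start,
            "end_timestamp": end,
            "blob_key": str(i),
        }
        for i, (start, end, _url) in enumerate(blocks)
    ]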
@@ -9,7 +9,7 @@ from json import JSONDecodeError
 from typing import Any, Optional, cast, Literal

 from posthoganalytics.ai.openai import OpenAI
-from urllib.parse import urlparse
+from urllib.parse import urlparse, parse_qs

 import posthoganalytics
 import requests
@@ -49,6 +49,7 @@ from posthog.session_recordings.models.session_recording_event import (
 )
 from posthog.session_recordings.queries.session_recording_list_from_query import SessionRecordingListFromQuery
 from posthog.session_recordings.queries.session_replay_events import SessionReplayEvents
+from posthog.session_recordings.queries.session_replay_events_v2_test import SessionReplayEventsV2Test
 from posthog.session_recordings.realtime_snapshots import (
     get_realtime_snapshots,
     publish_subscription,
@@ -66,6 +67,7 @@ from openai.types.chat import (
     ChatCompletionAssistantMessageParam,
 )
 from posthog.session_recordings.utils import clean_prompt_whitespace
+import snappy

 SNAPSHOTS_BY_PERSONAL_API_KEY_COUNTER = Counter(
     "snapshots_personal_api_key_counter",
@@ -642,8 +644,14 @@ class SessionRecordingViewSet(TeamAndOrgViewSetMixin, viewsets.GenericViewSet, U
             return self._send_realtime_snapshots_to_client(recording, request, event_properties)
         elif source == "blob":
             return self._stream_blob_to_client(recording, request, event_properties)
+        elif source == "blob_v2":
+            blob_key = request.GET.get("blob_key")
+            if blob_key:
+                return self._stream_blob_v2_to_client(recording, request, event_properties)
+            else:
+                return self._gather_session_recording_sources(recording)
         else:
-            raise exceptions.ValidationError("Invalid source must be one of [realtime, blob]")
+            raise exceptions.ValidationError("Invalid source must be one of [realtime, blob, blob_v2]")

     def _maybe_report_recording_list_filters_changed(self, request: request.Request, team: Team):
         """
@@ -675,6 +683,26 @@ class SessionRecordingViewSet(TeamAndOrgViewSetMixin, viewsets.GenericViewSet, U
         blob_keys: list[str] | None = None
         blob_prefix = ""

+        v2_metadata = SessionReplayEventsV2Test().get_metadata(str(recording.session_id), self.team)
+        if v2_metadata:
+            blocks = sorted(
+                zip(
+                    v2_metadata["block_first_timestamps"],
+                    v2_metadata["block_last_timestamps"],
+                    v2_metadata["block_urls"],
+                ),
+                key=lambda x: x[0],
+            )
+            for i, (start_timestamp, end_timestamp, _) in enumerate(blocks):
+                sources.append(
+                    {
+                        "source": "blob_v2",
+                        "start_timestamp": start_timestamp,
+                        "end_timestamp": end_timestamp,
+                        "blob_key": str(i),
+                    }
+                )
+            might_have_realtime = False
         if recording.object_storage_path:
             blob_prefix = recording.object_storage_path
             blob_keys = object_storage.list_objects(cast(str, blob_prefix))
@@ -852,6 +880,86 @@ class SessionRecordingViewSet(TeamAndOrgViewSetMixin, viewsets.GenericViewSet, U

         return response

+    def _stream_blob_v2_to_client(
+        self, recording: SessionRecording, request: request.Request, event_properties: dict
+    ) -> HttpResponse:
+        """Stream a v2 session recording blob to the client.
+
+        The blob_key is the block index in the metadata arrays.
+        """
+        blob_key = request.GET.get("blob_key", "")
+        if not blob_key:
+            raise exceptions.ValidationError("Must provide a blob key")
+
+        try:
+            block_index = int(blob_key)
+        except ValueError:
+            raise exceptions.ValidationError("Blob key must be an integer")
+
+        event_properties["source"] = "blob_v2"
+        event_properties["blob_key"] = blob_key
+        posthoganalytics.capture(
+            self._distinct_id_from_request(request),
+            "session recording snapshots v2 loaded",
+            event_properties,
+        )
+
+        with STREAM_RESPONSE_TO_CLIENT_HISTOGRAM.time():
+            # Get metadata for the session
+            metadata = SessionReplayEventsV2Test().get_metadata(recording.session_id, self.team)
+            if not metadata:
+                raise exceptions.NotFound("Session recording not found")
+
+            # Sort blocks by first timestamp
+            blocks = sorted(
+                zip(metadata["block_first_timestamps"], metadata["block_last_timestamps"], metadata["block_urls"]),
+                key=lambda x: x[0],
+            )
+
+            # Validate block index
+            if block_index >= len(blocks):
+                raise exceptions.NotFound("Block index out of range")
+
+            # Get the block URL
+            block_url = blocks[block_index][2]
+            if not block_url:
+                raise exceptions.NotFound("Block URL not found")
+
+            # Parse URL and extract key and byte range
+            parsed_url = urlparse(block_url)
+            key = parsed_url.path.lstrip("/")
+            query_params = parse_qs(parsed_url.query)
+            byte_range = query_params.get("range", [""])[0].replace("bytes=", "")
+            start_byte, end_byte = map(int, byte_range.split("-")) if "-" in byte_range else (None, None)
+
+            # Read and return the specific byte range from the object
+            if start_byte is None or end_byte is None:
+                raise exceptions.NotFound("Invalid byte range")
+
+            expected_length = end_byte - start_byte + 1
+            compressed_block = object_storage.read_bytes(key, first_byte=start_byte, last_byte=end_byte)
+
+            if not compressed_block:
+                raise exceptions.NotFound("Block content not found")
+
+            if len(compressed_block) != expected_length:
+                raise exceptions.APIException(
+                    f"Unexpected data length. Expected {expected_length} bytes, got {len(compressed_block)} bytes."
+                )
+
+            decompressed_block = snappy.decompress(compressed_block).decode("utf-8")
+
+            response = HttpResponse(
+                content=decompressed_block,
+                content_type="application/jsonl",
+            )
+
+            # Set caching headers - blocks are immutable so we can cache for a while
+            response["Cache-Control"] = "max-age=3600"
+            response["Content-Disposition"] = "inline"
+
+            return response
+
     def _send_realtime_snapshots_to_client(
         self, recording: SessionRecording, request: request.Request, event_properties: dict
     ) -> HttpResponse | Response:
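Aside (not part of the diff): a rough sketch of what a client round trip for one v2 block might look like, assuming the usual snapshots endpoint. The URL path, host, and auth header are assumptions for illustration; the source and blob_key query parameters and the JSONL response body come from the changes above.

import json

import requests


def fetch_blob_v2_block(host: str, project_id: int, session_id: str, block_index: int, api_key: str) -> list[dict]:
    # blob_key is the block index; the server looks up the block URL, reads the
    # byte range from object storage, snappy-decompresses it, and returns JSONL.
    resp = requests.get(
        f"{host}/api/projects/{project_id}/session_recordings/{session_id}/snapshots",  # assumed path
        params={"source": "blob_v2", "blob_key": str(block_index)},
        headers={"Authorization": f"Bearer {api_key}"},  # assumed auth scheme
    )
    resp.raise_for_status()
    # One snapshot event per line in the decompressed block
    return [json.loads(line) for line in resp.text.splitlines() if line.strip()]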
@@ -34,7 +34,9 @@ class ObjectStorageClient(metaclass=abc.ABCMeta):
         pass

     @abc.abstractmethod
-    def read_bytes(self, bucket: str, key: str) -> Optional[bytes]:
+    def read_bytes(
+        self, bucket: str, key: str, first_byte: Optional[int] = None, last_byte: Optional[int] = None
+    ) -> Optional[bytes]:
         pass

     @abc.abstractmethod
@@ -64,10 +66,12 @@ class UnavailableStorage(ObjectStorageClient):
         pass

     def read(self, bucket: str, key: str) -> Optional[str]:
-        pass
+        return None

-    def read_bytes(self, bucket: str, key: str) -> Optional[bytes]:
-        pass
+    def read_bytes(
+        self, bucket: str, key: str, first_byte: Optional[int] = None, last_byte: Optional[int] = None
+    ) -> Optional[bytes]:
+        return None

     def tag(self, bucket: str, key: str, tags: dict[str, str]) -> None:
         pass
@@ -127,10 +131,18 @@ class ObjectStorage(ObjectStorageClient):
         else:
             return None

-    def read_bytes(self, bucket: str, key: str) -> Optional[bytes]:
+    def read_bytes(
+        self, bucket: str, key: str, first_byte: Optional[int] = None, last_byte: Optional[int] = None
+    ) -> Optional[bytes]:
         s3_response = {}
         try:
-            s3_response = self.aws_client.get_object(Bucket=bucket, Key=key)
+            kwargs = {"Bucket": bucket, "Key": key}
+            if first_byte is not None and last_byte is not None:
+                kwargs["Range"] = f"bytes={first_byte}-{last_byte}"
+            elif first_byte is not None:
+                kwargs["Range"] = f"bytes={first_byte}-"
+
+            s3_response = self.aws_client.get_object(**kwargs)
             return s3_response["Body"].read()
         except Exception as e:
             logger.exception(
@@ -235,8 +247,11 @@ def read(file_name: str, bucket: str | None = None) -> Optional[str]:
     return object_storage_client().read(bucket=bucket or settings.OBJECT_STORAGE_BUCKET, key=file_name)


-def read_bytes(file_name: str) -> Optional[bytes]:
-    return object_storage_client().read_bytes(bucket=settings.OBJECT_STORAGE_BUCKET, key=file_name)
+def read_bytes(
+    file_name: str, bucket: str | None = None, first_byte: Optional[int] = None, last_byte: Optional[int] = None
+) -> Optional[bytes]:
+    bucket = bucket or settings.OBJECT_STORAGE_BUCKET
+    return object_storage_client().read_bytes(bucket, file_name, first_byte, last_byte)


 def list_objects(prefix: str) -> Optional[list[str]]:
@@ -1,5 +1,7 @@
+import re
 import uuid
 from unittest.mock import patch
+from unittest.mock import MagicMock

 from boto3 import resource
 from botocore.client import Config
@@ -13,10 +15,12 @@ from posthog.settings import (
 from posthog.storage.object_storage import (
     health_check,
     read,
+    read_bytes,
     write,
     get_presigned_url,
     list_objects,
     copy_objects,
+    ObjectStorage,
 )
 from posthog.test.base import APIBaseTest

@@ -39,7 +43,7 @@ class TestStorage(APIBaseTest):
     @patch("posthog.storage.object_storage.client")
     def test_does_not_create_client_if_storage_is_disabled(self, patched_s3_client) -> None:
         with self.settings(OBJECT_STORAGE_ENABLED=False):
-            self.assertFalse(health_check())
+            assert not health_check()
             patched_s3_client.assert_not_called()

     def test_write_and_read_works_with_known_content(self) -> None:
@@ -49,7 +53,7 @@ class TestStorage(APIBaseTest):
             name = f"{session_id}/{0}-{chunk_id}"
             file_name = f"{TEST_BUCKET}/test_write_and_read_works_with_known_content/{name}"
             write(file_name, "my content")
-            self.assertEqual(read(file_name), "my content")
+            assert read(file_name) == "my content"

     def test_write_and_read_works_with_known_byte_content(self) -> None:
         with self.settings(OBJECT_STORAGE_ENABLED=True):
@@ -58,7 +62,7 @@ class TestStorage(APIBaseTest):
             name = f"{session_id}/{0}-{chunk_id}"
             file_name = f"{TEST_BUCKET}/test_write_and_read_works_with_known_content/{name}"
             write(file_name, b"my content")
-            self.assertEqual(read(file_name), "my content")
+            assert read(file_name) == "my content"

     def test_can_generate_presigned_url_for_existing_file(self) -> None:
         with self.settings(OBJECT_STORAGE_ENABLED=True):
@@ -70,9 +74,9 @@ class TestStorage(APIBaseTest):

             presigned_url = get_presigned_url(file_name)
             assert presigned_url is not None
-            self.assertRegex(
-                presigned_url,
+            assert re.match(
                 r"^http://localhost:\d+/posthog/test_storage_bucket/test_can_generate_presigned_url_for_existing_file/.*\?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=.*$",
+                presigned_url,
             )

     def test_can_generate_presigned_url_for_non_existent_file(self) -> None:
@@ -82,9 +86,9 @@ class TestStorage(APIBaseTest):

             presigned_url = get_presigned_url(file_name)
             assert presigned_url is not None
-            self.assertRegex(
-                presigned_url,
+            assert re.match(
                 r"^http://localhost:\d+/posthog/test_storage_bucket/test_can_ignore_presigned_url_for_non_existent_file/.*?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=.*$",
+                presigned_url,
             )

     def test_can_list_objects_with_prefix(self) -> None:
@@ -157,3 +161,40 @@ class TestStorage(APIBaseTest):
                 "test_storage_bucket/a_shared_prefix/b",
                 "test_storage_bucket/a_shared_prefix/c",
             ]
+
+    def test_read_bytes_with_byte_range(self):
+        # Setup
+        mock_client = MagicMock()
+        mock_body = MagicMock()
+
+        # For the first test, return a specific content
+        mock_body.read.return_value = b"test content"
+        mock_client.get_object.return_value = {"Body": mock_body}
+        storage = ObjectStorage(mock_client)
+
+        # Test with both first_byte and last_byte
+        storage.read_bytes("test-bucket", "test-key", first_byte=5, last_byte=10)
+        mock_client.get_object.assert_called_with(Bucket="test-bucket", Key="test-key", Range="bytes=5-10")
+
+        # Test with only first_byte
+        storage.read_bytes("test-bucket", "test-key", first_byte=5)
+        mock_client.get_object.assert_called_with(Bucket="test-bucket", Key="test-key", Range="bytes=5-")
+
+        # Test without byte range
+        storage.read_bytes("test-bucket", "test-key")
+        mock_client.get_object.assert_called_with(Bucket="test-bucket", Key="test-key")
+
+    def test_read_specific_byte_range(self):
+        with self.settings(OBJECT_STORAGE_ENABLED=True):
+            # Setup
+            session_id = str(uuid.uuid4())
+            chunk_id = uuid.uuid4()
+            name = f"{session_id}/{0}-{chunk_id}"
+            file_name = f"{TEST_BUCKET}/test_read_specific_byte_range/{name}"
+            content = b"abcdefghij" * 11  # 110 bytes total
+            write(file_name, content)
+
+            result = read_bytes(file_name, first_byte=91, last_byte=101)
+
+            assert result == b"bcdefghijab"
+            assert len(result) == 11
@@ -125,5 +125,6 @@ grpcio~=1.63.2 # Version constrained so that `deepeval` can be installed in in dev
 tenacity~=8.4.2 # Version constrained so that `deepeval` can be installed in in dev
 markdown-it-py~=3.0.0
 tzlocal~=5.1
+python-snappy~=0.7.3
 cohere==5.14.0
 types-cachetools==5.5.0.20240820
@@ -144,6 +144,8 @@ coloredlogs==14.0
     # via dagster
 conditional-cache==1.2
     # via -r requirements.in
+cramjam==2.9.1
+    # via python-snappy
 cryptography==39.0.2
     # via
     #   -r requirements.in
@@ -341,8 +343,6 @@ graphql-core==3.2.5
     #   graphql-relay
 graphql-relay==3.2.0
     # via graphene
-greenlet==3.1.1
-    # via sqlalchemy
 grpcio==1.63.2
     # via
     #   -r requirements.in
@@ -683,6 +683,8 @@ python-dotenv==0.21.0
     #   pydantic-settings
     #   uvicorn
     #   webdriver-manager
+python-snappy==0.7.3
+    # via -r requirements.in
 python-statsd==2.1.0
     # via django-statsd
 python3-openid==3.1.0