From 8c2ef1c587c8b0949c43489bc8e0f94bdf84b10c Mon Sep 17 00:00:00 2001 From: David Newell Date: Tue, 17 Jun 2025 16:52:32 +0100 Subject: [PATCH] feat: presigned upload flow (#33746) --- cli/Cargo.lock | 64 ++++--- cli/Cargo.toml | 2 +- cli/src/commands/sourcemap/upload.rs | 158 +++++++++++++++--- posthog/api/error_tracking.py | 158 +++++++++++++----- posthog/api/test/test_error_tracking.py | 59 ++++++- posthog/storage/object_storage.py | 49 +++++- posthog/storage/test/test_object_storage.py | 13 ++ .../frontend/ErrorTrackingScene.tsx | 23 ++- ...19e7516ff5074840d1a0dad1842075d877388.json | 35 ++++ ...52ea79485e403b9e17b6c37259ea0612065ee.json | 23 --- ...e6591135f7d0fab4e2d2d004bf7e2e9d2461.json} | 4 +- rust/cymbal/src/symbol_store/chunk_id.rs | 2 +- rust/cymbal/src/symbol_store/saving.rs | 5 +- 13 files changed, 459 insertions(+), 136 deletions(-) create mode 100644 rust/.sqlx/query-4343f5351a5751f5c36aca99d8d19e7516ff5074840d1a0dad1842075d877388.json delete mode 100644 rust/.sqlx/query-c6ff00fcbbc77c8f5c1b3fe2f3352ea79485e403b9e17b6c37259ea0612065ee.json rename rust/.sqlx/{query-f389911c2ee51d01e5dae92f6ba120cf2186a728ba52924d36eeb22377351dcf.json => query-da4bb58032057ce9d01119f6ae68e6591135f7d0fab4e2d2d004bf7e2e9d2461.json} (84%) diff --git a/cli/Cargo.lock b/cli/Cargo.lock index ce8e158692..613748be08 100644 --- a/cli/Cargo.lock +++ b/cli/Cargo.lock @@ -229,9 +229,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.26" +version = "1.2.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "956a5e21988b87f372569b66183b78babf23ebc2e744b733e4350a752c4dafac" +checksum = "d487aa071b5f64da6f19a3e848e3578944b726ee5a4854b82172f02aa876bfdc" dependencies = [ "shlex", ] @@ -509,7 +509,7 @@ dependencies = [ "libc", "option-ext", "redox_users", - "windows-sys 0.60.1", + "windows-sys 0.60.2", ] [[package]] @@ -889,7 +889,7 @@ dependencies = [ "http 1.3.1", "hyper 1.6.0", "hyper-util", - "rustls 0.23.27", + "rustls 0.23.28", "rustls-pki-types", "tokio", "tokio-rustls 0.26.2", @@ -1171,9 +1171,9 @@ checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" [[package]] name = "libc" -version = "0.2.172" +version = "0.2.173" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa" +checksum = "d8cfeafaffdbc32176b64fb251369d52ea9f0a8fbc6f8759edffef7b525d64bb" [[package]] name = "libredox" @@ -1463,7 +1463,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "posthog-cli" -version = "0.2.0" +version = "0.3.0" dependencies = [ "anyhow", "clap", @@ -1553,7 +1553,7 @@ dependencies = [ "quinn-proto", "quinn-udp", "rustc-hash", - "rustls 0.23.27", + "rustls 0.23.28", "socket2", "thiserror 2.0.12", "tokio", @@ -1573,7 +1573,7 @@ dependencies = [ "rand", "ring", "rustc-hash", - "rustls 0.23.27", + "rustls 0.23.28", "rustls-pki-types", "slab", "thiserror 2.0.12", @@ -1607,9 +1607,9 @@ dependencies = [ [[package]] name = "r-efi" -version = "5.2.0" +version = "5.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5" +checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" [[package]] name = "radium" @@ -1669,9 +1669,9 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.5.12" +version = "0.5.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"928fca9cf2aa042393a8325b9ead81d2f0df4cb12e1e24cef072922ccd99c5af" +checksum = "0d04b7d0ee6b4a0207a0a7adb104d23ecb0b47d6beae7152d0fa34b692b29fd6" dependencies = [ "bitflags 2.9.1", ] @@ -1795,7 +1795,7 @@ dependencies = [ "percent-encoding", "pin-project-lite", "quinn", - "rustls 0.23.27", + "rustls 0.23.28", "rustls-pki-types", "serde", "serde_json", @@ -1879,9 +1879,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.27" +version = "0.23.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "730944ca083c1c233a75c09f199e973ca499344a2b7ba9e755c457e86fb4a321" +checksum = "7160e3e10bf4535308537f3c4e1641468cd0e485175d6163087c0393c7d46643" dependencies = [ "once_cell", "ring", @@ -2077,12 +2077,9 @@ dependencies = [ [[package]] name = "slab" -version = "0.4.9" +version = "0.4.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" -dependencies = [ - "autocfg", -] +checksum = "04dc19736151f35336d325007ac991178d504a119863a2fcb3758cdb5e52c50d" [[package]] name = "smallvec" @@ -2187,9 +2184,9 @@ checksum = "b7401a30af6cb5818bb64852270bb722533397edcfc7344954a38f420819ece2" [[package]] name = "syn" -version = "2.0.102" +version = "2.0.103" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6397daf94fa90f058bd0fd88429dd9e5738999cca8d701813c80723add80462" +checksum = "e4307e30089d6fd6aff212f2da3a1f9e32f3223b1f010fb09b7c95f90f3ca1e8" dependencies = [ "proc-macro2", "quote", @@ -2333,12 +2330,11 @@ dependencies = [ [[package]] name = "thread_local" -version = "1.1.8" +version = "1.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c" +checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185" dependencies = [ "cfg-if", - "once_cell", ] [[package]] @@ -2397,7 +2393,7 @@ version = "0.26.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e727b36a1a0e8b74c376ac2211e40c2c8af09fb4013c60d910495810f008e9b" dependencies = [ - "rustls 0.23.27", + "rustls 0.23.28", "tokio", ] @@ -2869,9 +2865,9 @@ dependencies = [ [[package]] name = "windows-link" -version = "0.1.2" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3bfe459f85da17560875b8bf1423d6f113b7a87a5d942e7da0ac71be7c61f8b" +checksum = "5e6ad25900d524eaabdbbb96d20b4311e1e7ae1699af4fb28c17ae66c80d798a" [[package]] name = "windows-result" @@ -2920,11 +2916,11 @@ dependencies = [ [[package]] name = "windows-sys" -version = "0.60.1" +version = "0.60.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b36e9ed89376c545e20cbf5a13c306b49106b21b9d1d4f9cb9a1cb6b1e9ee06a" +checksum = "f2f500e4d28234f72040990ec9d39e3a6b950f9f22d3dba18416c35882612bcb" dependencies = [ - "windows-targets 0.53.1", + "windows-targets 0.53.2", ] [[package]] @@ -2960,9 +2956,9 @@ dependencies = [ [[package]] name = "windows-targets" -version = "0.53.1" +version = "0.53.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30357ec391cde730f8fbfcdc29adc47518b06504528df977ab5af02ef23fdee9" +checksum = "c66f69fcc9ce11da9966ddb31a40968cad001c5bedeb5c2b82ede4253ab48aef" dependencies = [ "windows_aarch64_gnullvm 0.53.0", "windows_aarch64_msvc 0.53.0", diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 1f86b844ff..3e69281e28 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -1,6 +1,6 @@ [package] 
name = "posthog-cli" -version = "0.2.0" +version = "0.3.0" authors = [ "David ", "Olly ", diff --git a/cli/src/commands/sourcemap/upload.rs b/cli/src/commands/sourcemap/upload.rs index c4ccd5c42f..a8868ff7a9 100644 --- a/cli/src/commands/sourcemap/upload.rs +++ b/cli/src/commands/sourcemap/upload.rs @@ -1,15 +1,38 @@ use core::str; +use std::collections::HashMap; use std::path::PathBuf; -use anyhow::{anyhow, Context, Ok, Result}; +use anyhow::{bail, Context, Ok, Result}; +use reqwest::blocking::multipart::{Form, Part}; +use reqwest::blocking::Client; +use serde::{Deserialize, Serialize}; use sha2::Digest; -use tracing::info; +use tracing::{info, warn}; use crate::utils::auth::load_token; use crate::utils::posthog::capture_command_invoked; use crate::utils::release::{create_release, CreateReleaseResponse}; use crate::utils::sourcemaps::{read_pairs, ChunkUpload, SourcePair}; +const MAX_FILE_SIZE: usize = 100 * 1024 * 1024; // 100 MB + +#[derive(Debug, Deserialize)] +struct StartUploadResponseData { + presigned_url: PresignedUrl, + symbol_set_id: String, +} + +#[derive(Deserialize, Debug)] +pub struct PresignedUrl { + pub url: String, + pub fields: HashMap, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct FinishUploadRequest { + pub content_hash: String, +} + pub fn upload( host: Option, directory: &PathBuf, @@ -22,7 +45,7 @@ pub fn upload( let capture_handle = capture_command_invoked("sourcemap_upload", Some(&token.env_id)); - let url = format!( + let base_url = format!( "{}/api/environments/{}/error_tracking/symbol_sets", host, token.env_id ); @@ -45,13 +68,13 @@ pub fn upload( &host, &token, Some(directory.clone()), - Some(content_hash(&uploads)), + Some(content_hash(uploads.iter().map(|upload| &upload.data))), project, version, ) .context("While creating release")?; - upload_chunks(&url, &token.token, uploads, release.as_ref())?; + upload_chunks(&base_url, &token.token, uploads, release.as_ref())?; if delete_after { delete_files(sourcemap_paths).context("While deleting sourcemaps")?; @@ -71,7 +94,7 @@ fn collect_uploads(pairs: Vec) -> Result> { } fn upload_chunks( - url: &str, + base_url: &str, token: &str, uploads: Vec, release: Option<&CreateReleaseResponse>, @@ -81,36 +104,119 @@ fn upload_chunks( for upload in uploads { info!("Uploading chunk {}", upload.chunk_id); - let mut params: Vec<(&'static str, &str)> = - vec![("chunk_id", &upload.chunk_id), ("multipart", "true")]; - if let Some(id) = &release_id { - params.push(("release_id", id)); + let upload_size = upload.data.len(); + if upload_size > MAX_FILE_SIZE { + warn!( + "Skipping chunk {} because the file size is too large ({})", + upload.chunk_id, upload_size + ); + continue; } - let part = reqwest::blocking::multipart::Part::bytes(upload.data).file_name("file"); - let form = reqwest::blocking::multipart::Form::new().part("file", part); + let upload_response = + start_upload(&client, base_url, token, &upload.chunk_id, &release_id)?; - let res = client - .post(url) - .multipart(form) - .header("Authorization", format!("Bearer {}", token)) - .query(¶ms) - .send() - .context(format!("While uploading chunk to {}", url))?; + let content_hash = content_hash([&upload.data]); - if !res.status().is_success() { - return Err(anyhow!("Failed to upload chunk: {:?}", res) - .context(format!("Chunk id: {}", upload.chunk_id))); - } + upload_to_s3(&client, upload_response.presigned_url, upload.data)?; + + finish_upload( + &client, + base_url, + token, + upload_response.symbol_set_id, + content_hash, + )?; } Ok(()) } -fn 
content_hash(uploads: &[ChunkUpload]) -> String { +fn start_upload( + client: &Client, + base_url: &str, + auth_token: &str, + chunk_id: &str, + release_id: &Option, +) -> Result { + let start_upload_url: String = format!("{}{}", base_url, "/start_upload"); + + let mut params = vec![("chunk_id", chunk_id)]; + if let Some(id) = release_id { + params.push(("release_id", id)); + } + + let res = client + .post(&start_upload_url) + .header("Authorization", format!("Bearer {}", auth_token)) + .query(¶ms) + .send() + .context(format!("While starting upload to {}", start_upload_url))?; + + if !res.status().is_success() { + bail!("Failed to start upload: {:?}", res); + } + + let data: StartUploadResponseData = res.json()?; + Ok(data) +} + +fn upload_to_s3(client: &Client, presigned_url: PresignedUrl, data: Vec) -> Result<()> { + let mut form = Form::new(); + + for (key, value) in presigned_url.fields { + form = form.text(key.clone(), value.clone()); + } + + let part = Part::bytes(data); + form = form.part("file", part); + + let res = client + .post(&presigned_url.url) + .multipart(form) + .send() + .context(format!("While uploading chunk to {}", presigned_url.url))?; + + if !res.status().is_success() { + bail!("Failed to upload chunk: {:?}", res); + } + + Ok(()) +} + +fn finish_upload( + client: &Client, + base_url: &str, + auth_token: &str, + symbol_set_id: String, + content_hash: String, +) -> Result<()> { + let finish_upload_url: String = format!("{}/{}/{}", base_url, symbol_set_id, "finish_upload"); + let request = FinishUploadRequest { content_hash }; + + let res = client + .put(finish_upload_url) + .header("Authorization", format!("Bearer {}", auth_token)) + .header("Content-Type", "application/json") + .json(&request) + .send() + .context(format!("While finishing upload to {}", base_url))?; + + if !res.status().is_success() { + bail!("Failed to finish upload: {:?}", res); + } + + Ok(()) +} + +fn content_hash(upload_data: Iter) -> String +where + Iter: IntoIterator, + Item: AsRef<[u8]>, +{ let mut hasher = sha2::Sha512::new(); - for upload in uploads { - hasher.update(&upload.data); + for data in upload_data { + hasher.update(data.as_ref()); } format!("{:x}", hasher.finalize()) } diff --git a/posthog/api/error_tracking.py b/posthog/api/error_tracking.py index b9938c048a..c7a09a3d2d 100644 --- a/posthog/api/error_tracking.py +++ b/posthog/api/error_tracking.py @@ -1,4 +1,4 @@ -from typing import Any +from typing import Any, Optional from django.core.files.uploadedfile import UploadedFile from posthog.models.team.team import Team @@ -8,7 +8,7 @@ import hashlib from rest_framework import serializers, viewsets, status, request from rest_framework.response import Response from rest_framework.exceptions import ValidationError -from rest_framework.parsers import MultiPartParser, FileUploadParser +from rest_framework.parsers import MultiPartParser, FileUploadParser, JSONParser from django.http import JsonResponse from django.conf import settings @@ -44,7 +44,7 @@ from posthog.tasks.email import send_error_tracking_issue_assigned from posthog.hogql.compiler.bytecode import create_bytecode from posthog.schema import PropertyGroupFilterValue -ONE_GIGABYTE = 1024 * 1024 * 1024 +ONE_HUNDRED_MEGABYTES = 1024 * 1024 * 100 JS_DATA_MAGIC = b"posthog_error_tracking" JS_DATA_VERSION = 1 JS_DATA_TYPE_SOURCE_AND_MAP = 2 @@ -433,6 +433,7 @@ class ErrorTrackingSymbolSetViewSet(TeamAndOrgViewSetMixin, viewsets.ModelViewSe queryset = ErrorTrackingSymbolSet.objects.all() serializer_class = 
ErrorTrackingSymbolSetSerializer parser_classes = [MultiPartParser, FileUploadParser] + scope_object_write_actions = ["start_upload", "finish_upload"] def safely_get_queryset(self, queryset): queryset = queryset.filter(team_id=self.team.id) @@ -474,6 +475,8 @@ class ErrorTrackingSymbolSetViewSet(TeamAndOrgViewSetMixin, viewsets.ModelViewSe # pull the symbol set reference from the query params chunk_id = request.query_params.get("chunk_id", None) multipart = request.query_params.get("multipart", False) + release_id = request.query_params.get("release_id", None) + if not chunk_id: return Response({"detail": "chunk_id query parameter is required"}, status=status.HTTP_400_BAD_REQUEST) @@ -487,34 +490,78 @@ class ErrorTrackingSymbolSetViewSet(TeamAndOrgViewSetMixin, viewsets.ModelViewSe data = request.data["file"].read() (storage_ptr, content_hash) = upload_content(bytearray(data)) - - release_id = request.query_params.get("release_id", None) - if release_id: - objects = ErrorTrackingRelease.objects.all().filter(team=self.team, id=release_id) - if len(objects) < 1: - raise ValueError(f"Unknown release: {release_id}") - release = objects[0] - else: - release = None - - with transaction.atomic(): - # Use update_or_create for proper upsert behavior - symbol_set, created = ErrorTrackingSymbolSet.objects.update_or_create( - team=self.team, - ref=chunk_id, - release=release, - defaults={ - "storage_ptr": storage_ptr, - "content_hash": content_hash, - "failure_reason": None, - }, - ) - - # Delete any existing frames associated with this symbol set - ErrorTrackingStackFrame.objects.filter(team=self.team, symbol_set=symbol_set).delete() + create_symbol_set(chunk_id, self.team, release_id, storage_ptr, content_hash) return Response({"ok": True}, status=status.HTTP_201_CREATED) + @action(methods=["POST"], detail=False) + def start_upload(self, request, **kwargs): + chunk_id = request.query_params.get("chunk_id", None) + release_id = request.query_params.get("release_id", None) + + if not settings.OBJECT_STORAGE_ENABLED: + raise ValidationError( + code="object_storage_required", + detail="Object storage must be available to allow source map uploads.", + ) + + if not chunk_id: + return Response({"detail": "chunk_id query parameter is required"}, status=status.HTTP_400_BAD_REQUEST) + + file_key = generate_symbol_set_file_key() + presigned_url = object_storage.get_presigned_post( + file_key=file_key, + conditions=[["content-length-range", 0, ONE_HUNDRED_MEGABYTES]], + expiration=60, + ) + + symbol_set = create_symbol_set(chunk_id, self.team, release_id, file_key) + + return Response( + {"presigned_url": presigned_url, "symbol_set_id": str(symbol_set.pk)}, status=status.HTTP_201_CREATED + ) + + @action(methods=["PUT"], detail=True, parser_classes=[JSONParser]) + def finish_upload(self, request, **kwargs): + content_hash = request.data.get("content_hash") + + if not content_hash: + raise ValidationError( + code="content_hash_required", + detail="A content hash must be provided to complete symbol set upload.", + ) + + if not settings.OBJECT_STORAGE_ENABLED: + raise ValidationError( + code="object_storage_required", + detail="Object storage must be available to allow source map uploads.", + ) + + symbol_set = self.get_object() + s3_upload = object_storage.head_object(file_key=symbol_set.storage_ptr) + + if s3_upload: + content_length = s3_upload.get("ContentLength") + + if not content_length or content_length > ONE_HUNDRED_MEGABYTES: + symbol_set.delete() + + raise ValidationError( + code="file_too_large", + 
detail="The uploaded symbol set file was too large.", + ) + else: + raise ValidationError( + code="file_not_found", + detail="No file has been uploaded for the symbol set.", + ) + + if not symbol_set.content_hash: + symbol_set.content_hash = content_hash + symbol_set.save() + + return Response({"success": True}, status=status.HTTP_200_OK) + class ErrorTrackingAssignmentRuleSerializer(serializers.ModelSerializer): assignee = serializers.SerializerMethodField() @@ -705,6 +752,36 @@ class ErrorTrackingSuppressionRuleViewSet(TeamAndOrgViewSetMixin, viewsets.Model return Response(serializer.data, status=status.HTTP_201_CREATED) +def create_symbol_set( + chunk_id: str, team: Team, release_id: str | None, storage_ptr: str, content_hash: Optional[str] = None +): + if release_id: + objects = ErrorTrackingRelease.objects.all().filter(team=team, id=release_id) + if len(objects) < 1: + raise ValueError(f"Unknown release: {release_id}") + release = objects[0] + else: + release = None + + with transaction.atomic(): + # Use update_or_create for proper upsert behavior + symbol_set, created = ErrorTrackingSymbolSet.objects.update_or_create( + team=team, + ref=chunk_id, + release=release, + defaults={ + "storage_ptr": storage_ptr, + "content_hash": content_hash, + "failure_reason": None, + }, + ) + + # Delete any existing frames associated with this symbol set + ErrorTrackingStackFrame.objects.filter(team=team, symbol_set=symbol_set).delete() + + return symbol_set + + def upload_symbol_set(minified: UploadedFile, source_map: UploadedFile) -> tuple[str, str]: js_data = construct_js_data_object(minified.read(), source_map.read()) return upload_content(js_data) @@ -713,22 +790,21 @@ def upload_symbol_set(minified: UploadedFile, source_map: UploadedFile) -> tuple def upload_content(content: bytearray) -> tuple[str, str]: content_hash = hashlib.sha512(content).hexdigest() - if settings.OBJECT_STORAGE_ENABLED: - # TODO - maybe a gigabyte is too much? 
- if len(content) > ONE_GIGABYTE: - raise ValidationError( - code="file_too_large", detail="Combined source map and symbol set must be less than 1 gigabyte" - ) - - upload_path = f"{settings.OBJECT_STORAGE_ERROR_TRACKING_SOURCE_MAPS_FOLDER}/{str(uuid7())}" - object_storage.write(upload_path, bytes(content)) - return (upload_path, content_hash) - else: + if not settings.OBJECT_STORAGE_ENABLED: raise ValidationError( code="object_storage_required", detail="Object storage must be available to allow source map uploads.", ) + if len(content) > ONE_HUNDRED_MEGABYTES: + raise ValidationError( + code="file_too_large", detail="Combined source map and symbol set must be less than 100MB" + ) + + upload_path = generate_symbol_set_file_key() + object_storage.write(upload_path, bytes(content)) + return (upload_path, content_hash) + def construct_js_data_object(minified: bytes, source_map: bytes) -> bytearray: # See rust/cymbal/hacks/js_data.rs @@ -769,3 +845,7 @@ def validate_bytecode(bytecode: list[Any]) -> None: def get_suppression_rules(team: Team): return list(ErrorTrackingSuppressionRule.objects.filter(team=team).values_list("filters", flat=True)) + + +def generate_symbol_set_file_key(): + return f"{settings.OBJECT_STORAGE_ERROR_TRACKING_SOURCE_MAPS_FOLDER}/{str(uuid7())}" diff --git a/posthog/api/test/test_error_tracking.py b/posthog/api/test/test_error_tracking.py index c07d707510..0f330d61be 100644 --- a/posthog/api/test/test_error_tracking.py +++ b/posthog/api/test/test_error_tracking.py @@ -1,6 +1,6 @@ import os from boto3 import resource - +from unittest.mock import patch from rest_framework import status from freezegun import freeze_time from django.test import override_settings @@ -165,6 +165,63 @@ class TestErrorTracking(APIBaseTest): assert ErrorTrackingIssueFingerprintV2.objects.filter(fingerprint="fingerprint_two", version=1).exists() assert ErrorTrackingIssue.objects.count() == 1 + def test_can_start_symbol_set_upload(self) -> None: + chunk_id = uuid7() + response = self.client.post( + f"/api/environments/{self.team.id}/error_tracking/symbol_sets/start_upload?chunk_id={chunk_id}" + ) + response_json = response.json() + + assert response_json["presigned_url"] is not None + + symbol_set = ErrorTrackingSymbolSet.objects.get(id=response_json["symbol_set_id"]) + assert symbol_set.content_hash is None + + def test_finish_upload_fails_if_file_not_found(self): + symbol_set = ErrorTrackingSymbolSet.objects.create( + team=self.team, ref=str(uuid7()), storage_ptr=f"symbolsets/{uuid7()}" + ) + + response = self.client.put( + f"/api/environments/{self.team.id}/error_tracking/symbol_sets/{symbol_set.pk}/finish_upload", + data={"content_hash": "this_is_a_content_hash"}, + ) + + assert response.status_code == status.HTTP_400_BAD_REQUEST + assert response.json()["code"] == "file_not_found" + + @patch("posthog.storage.object_storage._client") + def test_finish_upload_fails_if_uploaded_file_is_too_large(self, patched_s3_client): + patched_s3_client.head_object.return_value = {"ContentLength": 1073741824} # 1GB + symbol_set = ErrorTrackingSymbolSet.objects.create( + team=self.team, ref=str(uuid7()), storage_ptr=f"symbolsets/{uuid7()}" + ) + + response = self.client.put( + f"/api/environments/{self.team.id}/error_tracking/symbol_sets/{symbol_set.pk}/finish_upload", + data={"content_hash": "this_is_a_content_hash"}, + ) + + assert response.status_code == status.HTTP_400_BAD_REQUEST + assert response.json()["code"] == "file_too_large" + + @patch("posthog.storage.object_storage._client") + def 
test_finish_upload_updates_the_content_hash(self, patched_s3_client): + patched_s3_client.head_object.return_value = {"ContentLength": 1048576} # 1MB + symbol_set = ErrorTrackingSymbolSet.objects.create( + team=self.team, ref=str(uuid7()), storage_ptr=f"symbolsets/{uuid7()}" + ) + + response = self.client.put( + f"/api/environments/{self.team.id}/error_tracking/symbol_sets/{symbol_set.pk}/finish_upload", + data={"content_hash": "this_is_a_content_hash"}, + ) + + symbol_set.refresh_from_db() + + assert response.status_code == status.HTTP_200_OK + assert symbol_set.content_hash == "this_is_a_content_hash" + def test_can_upload_a_source_map(self) -> None: with self.settings(OBJECT_STORAGE_ENABLED=True, OBJECT_STORAGE_ERROR_TRACKING_SOURCE_MAPS_FOLDER=TEST_BUCKET): symbol_set = ErrorTrackingSymbolSet.objects.create( diff --git a/posthog/storage/object_storage.py b/posthog/storage/object_storage.py index 1033e07a57..1200c596bc 100644 --- a/posthog/storage/object_storage.py +++ b/posthog/storage/object_storage.py @@ -1,5 +1,5 @@ import abc -from typing import Optional, Union +from typing import Optional, Union, Any import structlog from boto3 import client @@ -21,10 +21,20 @@ class ObjectStorageClient(metaclass=abc.ABCMeta): def head_bucket(self, bucket: str) -> bool: pass + @abc.abstractmethod + def head_object(self, bucket: str, file_key: str) -> Optional[dict]: + pass + @abc.abstractmethod def get_presigned_url(self, bucket: str, file_key: str, expiration: int = 3600) -> Optional[str]: pass + @abc.abstractmethod + def get_presigned_post( + self, bucket: str, file_key: str, conditions: list[Any], expiration: int = 3600 + ) -> Optional[dict]: + pass + @abc.abstractmethod def list_objects(self, bucket: str, prefix: str) -> Optional[list[str]]: pass @@ -61,9 +71,17 @@ class UnavailableStorage(ObjectStorageClient): def head_bucket(self, bucket: str): return False + def head_object(self, bucket: str, file_key: str): + return None + def get_presigned_url(self, bucket: str, file_key: str, expiration: int = 3600) -> Optional[str]: pass + def get_presigned_post( + self, bucket: str, file_key: str, conditions: list[Any], expiration: int = 3600 + ) -> Optional[dict]: + pass + def list_objects(self, bucket: str, prefix: str) -> Optional[list[str]]: pass @@ -97,6 +115,13 @@ class ObjectStorage(ObjectStorageClient): logger.warn("object_storage.health_check_failed", bucket=bucket, error=e) return False + def head_object(self, bucket: str, file_key) -> Optional[dict]: + try: + return self.aws_client.head_object(Bucket=bucket, Key=file_key) + except Exception as e: + logger.warn("object_storage.head_object_failed", bucket=bucket, file_key=file_key, error=e) + return None + def get_presigned_url(self, bucket: str, file_key: str, expiration: int = 3600) -> Optional[str]: try: return self.aws_client.generate_presigned_url( @@ -110,6 +135,18 @@ class ObjectStorage(ObjectStorageClient): capture_exception(e) return None + def get_presigned_post( + self, bucket: str, file_key: str, conditions: list[Any], expiration: int = 3600 + ) -> Optional[dict]: + try: + return self.aws_client.generate_presigned_post( + bucket, file_key, Conditions=conditions, ExpiresIn=expiration + ) + except Exception as e: + logger.exception("object_storage.get_presigned_post_failed", file_name=file_key, error=e) + capture_exception(e) + return None + def list_objects(self, bucket: str, prefix: str) -> Optional[list[str]]: try: s3_response = self.aws_client.list_objects_v2(Bucket=bucket, Prefix=prefix) @@ -281,5 +318,15 @@ def 
get_presigned_url(file_key: str, expiration: int = 3600) -> Optional[str]: ) +def get_presigned_post(file_key: str, conditions: list[Any], expiration: int = 3600) -> Optional[dict]: + return object_storage_client().get_presigned_post( + bucket=settings.OBJECT_STORAGE_BUCKET, file_key=file_key, conditions=conditions, expiration=expiration + ) + + +def head_object(file_key: str, bucket: str = settings.OBJECT_STORAGE_BUCKET) -> Optional[dict]: + return object_storage_client().head_object(file_key=file_key, bucket=bucket) + + def health_check() -> bool: return object_storage_client().head_bucket(bucket=settings.OBJECT_STORAGE_BUCKET) diff --git a/posthog/storage/test/test_object_storage.py b/posthog/storage/test/test_object_storage.py index 7f8adb1bdd..48b62a41ab 100644 --- a/posthog/storage/test/test_object_storage.py +++ b/posthog/storage/test/test_object_storage.py @@ -17,6 +17,7 @@ from posthog.storage.object_storage import ( read, write, get_presigned_url, + get_presigned_post, list_objects, copy_objects, ObjectStorage, @@ -90,6 +91,18 @@ class TestStorage(APIBaseTest): presigned_url, ) + def test_can_generate_presigned_post_url(self) -> None: + with self.settings(OBJECT_STORAGE_ENABLED=True): + file_name = f"{TEST_BUCKET}/test_can_generate_presigned_upload_url/{uuid.uuid4()}" + + presigned_url = get_presigned_post(file_name, conditions=[]) + assert presigned_url is not None + assert "fields" in presigned_url + assert re.match( + r"^http://localhost:\d+/posthog", + presigned_url["url"], + ) + def test_can_list_objects_with_prefix(self) -> None: with self.settings(OBJECT_STORAGE_ENABLED=True): shared_prefix = "a_shared_prefix" diff --git a/products/error_tracking/frontend/ErrorTrackingScene.tsx b/products/error_tracking/frontend/ErrorTrackingScene.tsx index 7387926dc1..98319d0b0b 100644 --- a/products/error_tracking/frontend/ErrorTrackingScene.tsx +++ b/products/error_tracking/frontend/ErrorTrackingScene.tsx @@ -218,18 +218,27 @@ const CountColumn = ({ record, columnName }: { record: unknown; columnName: stri const Header = (): JSX.Element => { const { user } = useValues(userLogic) + const onClick = (): void => { + setInterval(() => { + throw new Error('Kaboom !') + }, 100) + } + return ( {user?.is_staff ? 
(
-                    <LemonButton
-                        onClick={() => {
-                            posthog.captureException(new Error('Kaboom !'))
-                        }}
-                    >
-                        Send an exception
-                    </LemonButton>
+                    <>
+                        <LemonButton
+                            onClick={() => {
+                                posthog.captureException(new Error('Kaboom !'))
+                            }}
+                        >
+                            Send an exception
+                        </LemonButton>
+                        <LemonButton onClick={onClick}>Start exception loop</LemonButton>
+                    </>
                 ) : null}
                 Documentation
diff --git a/rust/.sqlx/query-4343f5351a5751f5c36aca99d8d19e7516ff5074840d1a0dad1842075d877388.json b/rust/.sqlx/query-4343f5351a5751f5c36aca99d8d19e7516ff5074840d1a0dad1842075d877388.json
new file mode 100644
index 0000000000..d6f9b078a4
--- /dev/null
+++ b/rust/.sqlx/query-4343f5351a5751f5c36aca99d8d19e7516ff5074840d1a0dad1842075d877388.json
@@ -0,0 +1,35 @@
+{
+    "db_name": "PostgreSQL",
+    "query": "SELECT group_type, team_id, group_type_index FROM posthog_grouptypemapping\n        WHERE (group_type, team_id) = ANY(SELECT * FROM UNNEST($1::text[], $2::int[]))",
+    "describe": {
+        "columns": [
+            {
+                "ordinal": 0,
+                "name": "group_type",
+                "type_info": "Varchar"
+            },
+            {
+                "ordinal": 1,
+                "name": "team_id",
+                "type_info": "Int4"
+            },
+            {
+                "ordinal": 2,
+                "name": "group_type_index",
+                "type_info": "Int4"
+            }
+        ],
+        "parameters": {
+            "Left": [
+                "TextArray",
+                "Int4Array"
+            ]
+        },
+        "nullable": [
+            false,
+            false,
+            false
+        ]
+    },
+    "hash": "4343f5351a5751f5c36aca99d8d19e7516ff5074840d1a0dad1842075d877388"
+}
diff --git a/rust/.sqlx/query-c6ff00fcbbc77c8f5c1b3fe2f3352ea79485e403b9e17b6c37259ea0612065ee.json b/rust/.sqlx/query-c6ff00fcbbc77c8f5c1b3fe2f3352ea79485e403b9e17b6c37259ea0612065ee.json
deleted file mode 100644
index 608008e09a..0000000000
--- a/rust/.sqlx/query-c6ff00fcbbc77c8f5c1b3fe2f3352ea79485e403b9e17b6c37259ea0612065ee.json
+++ /dev/null
@@ -1,23 +0,0 @@
-{
-    "db_name": "PostgreSQL",
-    "query": "SELECT group_type_index FROM posthog_grouptypemapping WHERE group_type = $1 AND team_id = $2",
-    "describe": {
-        "columns": [
-            {
-                "ordinal": 0,
-                "name": "group_type_index",
-                "type_info": "Int4"
-            }
-        ],
-        "parameters": {
-            "Left": [
-                "Text",
-                "Int4"
-            ]
-        },
-        "nullable": [
-            false
-        ]
-    },
-    "hash": "c6ff00fcbbc77c8f5c1b3fe2f3352ea79485e403b9e17b6c37259ea0612065ee"
-}
diff --git a/rust/.sqlx/query-f389911c2ee51d01e5dae92f6ba120cf2186a728ba52924d36eeb22377351dcf.json b/rust/.sqlx/query-da4bb58032057ce9d01119f6ae68e6591135f7d0fab4e2d2d004bf7e2e9d2461.json
index e1b4f9e35b..4bcbc41fad 100644
--- a/rust/.sqlx/query-f389911c2ee51d01e5dae92f6ba120cf2186a728ba52924d36eeb22377351dcf.json
+++ b/rust/.sqlx/query-da4bb58032057ce9d01119f6ae68e6591135f7d0fab4e2d2d004bf7e2e9d2461.json
@@ -1,6 +1,6 @@
 {
     "db_name": "PostgreSQL",
-    "query": "SELECT id, team_id, ref as set_ref, storage_ptr, created_at, failure_reason, content_hash, last_used\n        FROM posthog_errortrackingsymbolset\n        WHERE team_id = $1 AND ref = $2",
+    "query": "SELECT id, team_id, ref as set_ref, storage_ptr, created_at, failure_reason, content_hash, last_used\n        FROM posthog_errortrackingsymbolset\n        WHERE (content_hash is not null OR storage_ptr is null) AND team_id = $1 AND ref = $2",
     "describe": {
         "columns": [
             {
@@ -61,5 +61,5 @@
             true
         ]
     },
-    "hash": "f389911c2ee51d01e5dae92f6ba120cf2186a728ba52924d36eeb22377351dcf"
+    "hash": "da4bb58032057ce9d01119f6ae68e6591135f7d0fab4e2d2d004bf7e2e9d2461"
 }
diff --git a/rust/cymbal/src/symbol_store/chunk_id.rs b/rust/cymbal/src/symbol_store/chunk_id.rs
index a9e0aa260e..d65e020ad0 100644
--- a/rust/cymbal/src/symbol_store/chunk_id.rs
+++ b/rust/cymbal/src/symbol_store/chunk_id.rs
@@ -267,7 +267,7 @@ mod test {
             storage_ptr: Some(chunk_id.clone()),
             failure_reason: None,
             created_at: Utc::now(),
-            content_hash: None,
+            content_hash: Some("fake-hash".to_string()),
             last_used: Some(Utc::now()),
         };
 
diff --git a/rust/cymbal/src/symbol_store/saving.rs b/rust/cymbal/src/symbol_store/saving.rs
index 0a83c43e13..915cf042e7 100644
--- a/rust/cymbal/src/symbol_store/saving.rs
+++ b/rust/cymbal/src/symbol_store/saving.rs
@@ -262,11 +262,14 @@ impl SymbolSetRecord {
         team_id: i32,
         set_ref: &str,
     ) -> Result<Option<Self>, UnhandledError> {
+        // Query looks a bit odd. Symbol sets are usable by cymbal if they have no storage ptr (indicating an
+        // unfound symbol set) or if they have a content hash (indicating a fully saved symbol set). The in-between
+        // states (storage_ptr is not null AND content_hash is null) indicate an ongoing upload.
         let mut record = sqlx::query_as!(
             SymbolSetRecord,
             r#"SELECT id, team_id, ref as set_ref, storage_ptr, created_at, failure_reason, content_hash, last_used
             FROM posthog_errortrackingsymbolset
-            WHERE team_id = $1 AND ref = $2"#,
+            WHERE (content_hash is not null OR storage_ptr is null) AND team_id = $1 AND ref = $2"#,
             team_id,
             set_ref
         )
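
The net effect of the patch: the CLI's single multipart POST to the symbol_sets endpoint is replaced by a three-step presigned flow. start_upload creates a pending symbol set row (storage_ptr set, content_hash still null) and returns a presigned POST plus the new row's id; the client then sends the bytes directly to object storage; finish_upload lets the server re-check the object's size via head_object and persist the SHA-512 content hash that makes the set usable by cymbal. Below is a minimal sketch of the client side in Python — the endpoint paths, query parameters, response shape, size limit, and hash algorithm are taken from the patch above, while the requests usage and the placeholder host, environment id, and token are illustrative assumptions, not part of the change.

    import hashlib
    import requests

    # Assumptions for illustration only: host, environment id, and token are placeholders.
    HOST = "https://us.posthog.com"
    ENV_ID = "12345"
    TOKEN = "phx_personal_api_key"
    BASE = f"{HOST}/api/environments/{ENV_ID}/error_tracking/symbol_sets"
    HEADERS = {"Authorization": f"Bearer {TOKEN}"}

    def upload_chunk(chunk_id: str, data: bytes, release_id: str | None = None) -> None:
        # Step 1: start_upload creates a pending symbol set row and returns a
        # presigned POST plus the row's id.
        params = {"chunk_id": chunk_id}
        if release_id:
            params["release_id"] = release_id
        res = requests.post(f"{BASE}/start_upload", params=params, headers=HEADERS)
        res.raise_for_status()
        body = res.json()
        presigned, symbol_set_id = body["presigned_url"], body["symbol_set_id"]

        # Step 2: upload straight to object storage. The presigned fields go in as
        # ordinary form fields, with the file part last; the POST policy's
        # content-length-range condition enforces the 100 MB cap server-side.
        res = requests.post(presigned["url"], data=presigned["fields"], files={"file": data})
        res.raise_for_status()

        # Step 3: finish_upload. The server HEADs the object to re-check the size,
        # then stores the SHA-512 hash, marking the set as usable by cymbal.
        content_hash = hashlib.sha512(data).hexdigest()
        res = requests.put(
            f"{BASE}/{symbol_set_id}/finish_upload",
            json={"content_hash": content_hash},
            headers=HEADERS,
        )
        res.raise_for_status()

A row whose upload never completes is left with storage_ptr set and content_hash null, which is exactly the in-between state the amended cymbal query above filters out.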