feat: presigned upload flow (#33746)

David Newell
2025-06-17 16:52:32 +01:00
committed by GitHub
parent eb89e9075b
commit 8c2ef1c587
13 changed files with 459 additions and 136 deletions

cli/Cargo.lock (generated)

@@ -229,9 +229,9 @@ dependencies = [
[[package]]
name = "cc"
version = "1.2.26"
version = "1.2.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "956a5e21988b87f372569b66183b78babf23ebc2e744b733e4350a752c4dafac"
checksum = "d487aa071b5f64da6f19a3e848e3578944b726ee5a4854b82172f02aa876bfdc"
dependencies = [
"shlex",
]
@@ -509,7 +509,7 @@ dependencies = [
"libc",
"option-ext",
"redox_users",
"windows-sys 0.60.1",
"windows-sys 0.60.2",
]
[[package]]
@@ -889,7 +889,7 @@ dependencies = [
"http 1.3.1",
"hyper 1.6.0",
"hyper-util",
"rustls 0.23.27",
"rustls 0.23.28",
"rustls-pki-types",
"tokio",
"tokio-rustls 0.26.2",
@@ -1171,9 +1171,9 @@ checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe"
[[package]]
name = "libc"
version = "0.2.172"
version = "0.2.173"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa"
checksum = "d8cfeafaffdbc32176b64fb251369d52ea9f0a8fbc6f8759edffef7b525d64bb"
[[package]]
name = "libredox"
@@ -1463,7 +1463,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "posthog-cli"
version = "0.2.0"
version = "0.3.0"
dependencies = [
"anyhow",
"clap",
@@ -1553,7 +1553,7 @@ dependencies = [
"quinn-proto",
"quinn-udp",
"rustc-hash",
"rustls 0.23.27",
"rustls 0.23.28",
"socket2",
"thiserror 2.0.12",
"tokio",
@@ -1573,7 +1573,7 @@ dependencies = [
"rand",
"ring",
"rustc-hash",
"rustls 0.23.27",
"rustls 0.23.28",
"rustls-pki-types",
"slab",
"thiserror 2.0.12",
@@ -1607,9 +1607,9 @@ dependencies = [
[[package]]
name = "r-efi"
version = "5.2.0"
version = "5.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5"
checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f"
[[package]]
name = "radium"
@@ -1669,9 +1669,9 @@ dependencies = [
[[package]]
name = "redox_syscall"
version = "0.5.12"
version = "0.5.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "928fca9cf2aa042393a8325b9ead81d2f0df4cb12e1e24cef072922ccd99c5af"
checksum = "0d04b7d0ee6b4a0207a0a7adb104d23ecb0b47d6beae7152d0fa34b692b29fd6"
dependencies = [
"bitflags 2.9.1",
]
@@ -1795,7 +1795,7 @@ dependencies = [
"percent-encoding",
"pin-project-lite",
"quinn",
"rustls 0.23.27",
"rustls 0.23.28",
"rustls-pki-types",
"serde",
"serde_json",
@@ -1879,9 +1879,9 @@ dependencies = [
[[package]]
name = "rustls"
version = "0.23.27"
version = "0.23.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "730944ca083c1c233a75c09f199e973ca499344a2b7ba9e755c457e86fb4a321"
checksum = "7160e3e10bf4535308537f3c4e1641468cd0e485175d6163087c0393c7d46643"
dependencies = [
"once_cell",
"ring",
@@ -2077,12 +2077,9 @@ dependencies = [
[[package]]
name = "slab"
version = "0.4.9"
version = "0.4.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67"
dependencies = [
"autocfg",
]
checksum = "04dc19736151f35336d325007ac991178d504a119863a2fcb3758cdb5e52c50d"
[[package]]
name = "smallvec"
@@ -2187,9 +2184,9 @@ checksum = "b7401a30af6cb5818bb64852270bb722533397edcfc7344954a38f420819ece2"
[[package]]
name = "syn"
version = "2.0.102"
version = "2.0.103"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6397daf94fa90f058bd0fd88429dd9e5738999cca8d701813c80723add80462"
checksum = "e4307e30089d6fd6aff212f2da3a1f9e32f3223b1f010fb09b7c95f90f3ca1e8"
dependencies = [
"proc-macro2",
"quote",
@@ -2333,12 +2330,11 @@ dependencies = [
[[package]]
name = "thread_local"
version = "1.1.8"
version = "1.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c"
checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185"
dependencies = [
"cfg-if",
"once_cell",
]
[[package]]
@@ -2397,7 +2393,7 @@ version = "0.26.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e727b36a1a0e8b74c376ac2211e40c2c8af09fb4013c60d910495810f008e9b"
dependencies = [
"rustls 0.23.27",
"rustls 0.23.28",
"tokio",
]
@@ -2869,9 +2865,9 @@ dependencies = [
[[package]]
name = "windows-link"
version = "0.1.2"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3bfe459f85da17560875b8bf1423d6f113b7a87a5d942e7da0ac71be7c61f8b"
checksum = "5e6ad25900d524eaabdbbb96d20b4311e1e7ae1699af4fb28c17ae66c80d798a"
[[package]]
name = "windows-result"
@@ -2920,11 +2916,11 @@ dependencies = [
[[package]]
name = "windows-sys"
version = "0.60.1"
version = "0.60.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b36e9ed89376c545e20cbf5a13c306b49106b21b9d1d4f9cb9a1cb6b1e9ee06a"
checksum = "f2f500e4d28234f72040990ec9d39e3a6b950f9f22d3dba18416c35882612bcb"
dependencies = [
"windows-targets 0.53.1",
"windows-targets 0.53.2",
]
[[package]]
@@ -2960,9 +2956,9 @@ dependencies = [
[[package]]
name = "windows-targets"
version = "0.53.1"
version = "0.53.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30357ec391cde730f8fbfcdc29adc47518b06504528df977ab5af02ef23fdee9"
checksum = "c66f69fcc9ce11da9966ddb31a40968cad001c5bedeb5c2b82ede4253ab48aef"
dependencies = [
"windows_aarch64_gnullvm 0.53.0",
"windows_aarch64_msvc 0.53.0",


@@ -1,6 +1,6 @@
[package]
name = "posthog-cli"
version = "0.2.0"
version = "0.3.0"
authors = [
"David <david@posthog.com>",
"Olly <oliver@posthog.com>",


@@ -1,15 +1,38 @@
use core::str;
use std::collections::HashMap;
use std::path::PathBuf;
-use anyhow::{anyhow, Context, Ok, Result};
+use anyhow::{bail, Context, Ok, Result};
use reqwest::blocking::multipart::{Form, Part};
use reqwest::blocking::Client;
use serde::{Deserialize, Serialize};
use sha2::Digest;
-use tracing::info;
+use tracing::{info, warn};
use crate::utils::auth::load_token;
use crate::utils::posthog::capture_command_invoked;
use crate::utils::release::{create_release, CreateReleaseResponse};
use crate::utils::sourcemaps::{read_pairs, ChunkUpload, SourcePair};
const MAX_FILE_SIZE: usize = 100 * 1024 * 1024; // 100 MB
#[derive(Debug, Deserialize)]
struct StartUploadResponseData {
presigned_url: PresignedUrl,
symbol_set_id: String,
}
#[derive(Deserialize, Debug)]
pub struct PresignedUrl {
pub url: String,
pub fields: HashMap<String, String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct FinishUploadRequest {
pub content_hash: String,
}
pub fn upload(
host: Option<String>,
directory: &PathBuf,
@@ -22,7 +45,7 @@ pub fn upload(
let capture_handle = capture_command_invoked("sourcemap_upload", Some(&token.env_id));
-let url = format!(
+let base_url = format!(
"{}/api/environments/{}/error_tracking/symbol_sets",
host, token.env_id
);
@@ -45,13 +68,13 @@ pub fn upload(
&host,
&token,
Some(directory.clone()),
-Some(content_hash(&uploads)),
+Some(content_hash(uploads.iter().map(|upload| &upload.data))),
project,
version,
)
.context("While creating release")?;
-upload_chunks(&url, &token.token, uploads, release.as_ref())?;
+upload_chunks(&base_url, &token.token, uploads, release.as_ref())?;
if delete_after {
delete_files(sourcemap_paths).context("While deleting sourcemaps")?;
@@ -71,7 +94,7 @@ fn collect_uploads(pairs: Vec<SourcePair>) -> Result<Vec<ChunkUpload>> {
}
fn upload_chunks(
-url: &str,
+base_url: &str,
token: &str,
uploads: Vec<ChunkUpload>,
release: Option<&CreateReleaseResponse>,
@@ -81,36 +104,119 @@ fn upload_chunks(
for upload in uploads {
info!("Uploading chunk {}", upload.chunk_id);
-let mut params: Vec<(&'static str, &str)> =
-vec![("chunk_id", &upload.chunk_id), ("multipart", "true")];
-if let Some(id) = &release_id {
-params.push(("release_id", id));
+let upload_size = upload.data.len();
+if upload_size > MAX_FILE_SIZE {
+warn!(
+"Skipping chunk {} because the file size is too large ({})",
+upload.chunk_id, upload_size
+);
+continue;
}
-let part = reqwest::blocking::multipart::Part::bytes(upload.data).file_name("file");
-let form = reqwest::blocking::multipart::Form::new().part("file", part);
+let upload_response =
+start_upload(&client, base_url, token, &upload.chunk_id, &release_id)?;
-let res = client
-.post(url)
-.multipart(form)
-.header("Authorization", format!("Bearer {}", token))
-.query(&params)
-.send()
-.context(format!("While uploading chunk to {}", url))?;
+let content_hash = content_hash([&upload.data]);
-if !res.status().is_success() {
-return Err(anyhow!("Failed to upload chunk: {:?}", res)
-.context(format!("Chunk id: {}", upload.chunk_id)));
-}
+upload_to_s3(&client, upload_response.presigned_url, upload.data)?;
+finish_upload(
+&client,
+base_url,
+token,
+upload_response.symbol_set_id,
+content_hash,
+)?;
}
Ok(())
}
-fn content_hash(uploads: &[ChunkUpload]) -> String {
fn start_upload(
client: &Client,
base_url: &str,
auth_token: &str,
chunk_id: &str,
release_id: &Option<String>,
) -> Result<StartUploadResponseData> {
let start_upload_url: String = format!("{}{}", base_url, "/start_upload");
let mut params = vec![("chunk_id", chunk_id)];
if let Some(id) = release_id {
params.push(("release_id", id));
}
let res = client
.post(&start_upload_url)
.header("Authorization", format!("Bearer {}", auth_token))
.query(&params)
.send()
.context(format!("While starting upload to {}", start_upload_url))?;
if !res.status().is_success() {
bail!("Failed to start upload: {:?}", res);
}
let data: StartUploadResponseData = res.json()?;
Ok(data)
}
fn upload_to_s3(client: &Client, presigned_url: PresignedUrl, data: Vec<u8>) -> Result<()> {
let mut form = Form::new();
for (key, value) in presigned_url.fields {
form = form.text(key.clone(), value.clone());
}
let part = Part::bytes(data);
form = form.part("file", part);
let res = client
.post(&presigned_url.url)
.multipart(form)
.send()
.context(format!("While uploading chunk to {}", presigned_url.url))?;
if !res.status().is_success() {
bail!("Failed to upload chunk: {:?}", res);
}
Ok(())
}
fn finish_upload(
client: &Client,
base_url: &str,
auth_token: &str,
symbol_set_id: String,
content_hash: String,
) -> Result<()> {
let finish_upload_url: String = format!("{}/{}/{}", base_url, symbol_set_id, "finish_upload");
let request = FinishUploadRequest { content_hash };
let res = client
.put(finish_upload_url)
.header("Authorization", format!("Bearer {}", auth_token))
.header("Content-Type", "application/json")
.json(&request)
.send()
.context(format!("While finishing upload to {}", base_url))?;
if !res.status().is_success() {
bail!("Failed to finish upload: {:?}", res);
}
Ok(())
}
fn content_hash<Iter, Item>(upload_data: Iter) -> String
where
Iter: IntoIterator<Item = Item>,
Item: AsRef<[u8]>,
{
let mut hasher = sha2::Sha512::new();
-for upload in uploads {
-hasher.update(&upload.data);
+for data in upload_data {
+hasher.update(data.as_ref());
}
format!("{:x}", hasher.finalize())
}
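
Taken together, the CLI swaps the old single multipart POST for a three-step handshake: start_upload mints a presigned POST plus a symbol set id, the chunk goes straight to object storage, and finish_upload records the content hash. A minimal sketch of the same flow in Python for illustration — the host, environment id, and token below are assumptions, not real values:

import hashlib

import requests

BASE = "https://us.posthog.com/api/environments/1/error_tracking/symbol_sets"  # assumed host/env
HEADERS = {"Authorization": "Bearer phx_example_token"}  # assumed personal API key


def upload_chunk(chunk_id: str, data: bytes) -> None:
    # 1. Ask PostHog for a presigned POST and the new symbol set's id.
    res = requests.post(f"{BASE}/start_upload", params={"chunk_id": chunk_id}, headers=HEADERS)
    res.raise_for_status()
    body = res.json()
    presigned, symbol_set_id = body["presigned_url"], body["symbol_set_id"]

    # 2. Upload straight to object storage: the presigned fields become
    #    ordinary form fields, with the file as the final multipart part.
    s3_res = requests.post(presigned["url"], data=presigned["fields"], files={"file": data})
    s3_res.raise_for_status()

    # 3. Report the SHA-512 content hash to mark the symbol set usable.
    done = requests.put(
        f"{BASE}/{symbol_set_id}/finish_upload",
        json={"content_hash": hashlib.sha512(data).hexdigest()},
        headers=HEADERS,
    )
    done.raise_for_status()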


@@ -1,4 +1,4 @@
-from typing import Any
+from typing import Any, Optional
from django.core.files.uploadedfile import UploadedFile
from posthog.models.team.team import Team
@@ -8,7 +8,7 @@ import hashlib
from rest_framework import serializers, viewsets, status, request
from rest_framework.response import Response
from rest_framework.exceptions import ValidationError
-from rest_framework.parsers import MultiPartParser, FileUploadParser
+from rest_framework.parsers import MultiPartParser, FileUploadParser, JSONParser
from django.http import JsonResponse
from django.conf import settings
@@ -44,7 +44,7 @@ from posthog.tasks.email import send_error_tracking_issue_assigned
from posthog.hogql.compiler.bytecode import create_bytecode
from posthog.schema import PropertyGroupFilterValue
-ONE_GIGABYTE = 1024 * 1024 * 1024
+ONE_HUNDRED_MEGABYTES = 1024 * 1024 * 100
JS_DATA_MAGIC = b"posthog_error_tracking"
JS_DATA_VERSION = 1
JS_DATA_TYPE_SOURCE_AND_MAP = 2
@@ -433,6 +433,7 @@ class ErrorTrackingSymbolSetViewSet(TeamAndOrgViewSetMixin, viewsets.ModelViewSe
queryset = ErrorTrackingSymbolSet.objects.all()
serializer_class = ErrorTrackingSymbolSetSerializer
parser_classes = [MultiPartParser, FileUploadParser]
scope_object_write_actions = ["start_upload", "finish_upload"]
def safely_get_queryset(self, queryset):
queryset = queryset.filter(team_id=self.team.id)
@@ -474,6 +475,8 @@ class ErrorTrackingSymbolSetViewSet(TeamAndOrgViewSetMixin, viewsets.ModelViewSe
# pull the symbol set reference from the query params
chunk_id = request.query_params.get("chunk_id", None)
multipart = request.query_params.get("multipart", False)
release_id = request.query_params.get("release_id", None)
if not chunk_id:
return Response({"detail": "chunk_id query parameter is required"}, status=status.HTTP_400_BAD_REQUEST)
@@ -487,34 +490,78 @@ class ErrorTrackingSymbolSetViewSet(TeamAndOrgViewSetMixin, viewsets.ModelViewSe
data = request.data["file"].read()
(storage_ptr, content_hash) = upload_content(bytearray(data))
-release_id = request.query_params.get("release_id", None)
-if release_id:
-objects = ErrorTrackingRelease.objects.all().filter(team=self.team, id=release_id)
-if len(objects) < 1:
-raise ValueError(f"Unknown release: {release_id}")
-release = objects[0]
-else:
-release = None
-with transaction.atomic():
-# Use update_or_create for proper upsert behavior
-symbol_set, created = ErrorTrackingSymbolSet.objects.update_or_create(
-team=self.team,
-ref=chunk_id,
-release=release,
-defaults={
-"storage_ptr": storage_ptr,
-"content_hash": content_hash,
-"failure_reason": None,
-},
-)
-# Delete any existing frames associated with this symbol set
-ErrorTrackingStackFrame.objects.filter(team=self.team, symbol_set=symbol_set).delete()
+create_symbol_set(chunk_id, self.team, release_id, storage_ptr, content_hash)
return Response({"ok": True}, status=status.HTTP_201_CREATED)
@action(methods=["POST"], detail=False)
def start_upload(self, request, **kwargs):
chunk_id = request.query_params.get("chunk_id", None)
release_id = request.query_params.get("release_id", None)
if not settings.OBJECT_STORAGE_ENABLED:
raise ValidationError(
code="object_storage_required",
detail="Object storage must be available to allow source map uploads.",
)
if not chunk_id:
return Response({"detail": "chunk_id query parameter is required"}, status=status.HTTP_400_BAD_REQUEST)
file_key = generate_symbol_set_file_key()
presigned_url = object_storage.get_presigned_post(
file_key=file_key,
conditions=[["content-length-range", 0, ONE_HUNDRED_MEGABYTES]],
expiration=60,
)
symbol_set = create_symbol_set(chunk_id, self.team, release_id, file_key)
return Response(
{"presigned_url": presigned_url, "symbol_set_id": str(symbol_set.pk)}, status=status.HTTP_201_CREATED
)
@action(methods=["PUT"], detail=True, parser_classes=[JSONParser])
def finish_upload(self, request, **kwargs):
content_hash = request.data.get("content_hash")
if not content_hash:
raise ValidationError(
code="content_hash_required",
detail="A content hash must be provided to complete symbol set upload.",
)
if not settings.OBJECT_STORAGE_ENABLED:
raise ValidationError(
code="object_storage_required",
detail="Object storage must be available to allow source map uploads.",
)
symbol_set = self.get_object()
s3_upload = object_storage.head_object(file_key=symbol_set.storage_ptr)
if s3_upload:
content_length = s3_upload.get("ContentLength")
if not content_length or content_length > ONE_HUNDRED_MEGABYTES:
symbol_set.delete()
raise ValidationError(
code="file_too_large",
detail="The uploaded symbol set file was too large.",
)
else:
raise ValidationError(
code="file_not_found",
detail="No file has been uploaded for the symbol set.",
)
if not symbol_set.content_hash:
symbol_set.content_hash = content_hash
symbol_set.save()
return Response({"success": True}, status=status.HTTP_200_OK)
class ErrorTrackingAssignmentRuleSerializer(serializers.ModelSerializer):
assignee = serializers.SerializerMethodField()
@@ -705,6 +752,36 @@ class ErrorTrackingSuppressionRuleViewSet(TeamAndOrgViewSetMixin, viewsets.Model
return Response(serializer.data, status=status.HTTP_201_CREATED)
def create_symbol_set(
chunk_id: str, team: Team, release_id: str | None, storage_ptr: str, content_hash: Optional[str] = None
):
if release_id:
objects = ErrorTrackingRelease.objects.all().filter(team=team, id=release_id)
if len(objects) < 1:
raise ValueError(f"Unknown release: {release_id}")
release = objects[0]
else:
release = None
with transaction.atomic():
# Use update_or_create for proper upsert behavior
symbol_set, created = ErrorTrackingSymbolSet.objects.update_or_create(
team=team,
ref=chunk_id,
release=release,
defaults={
"storage_ptr": storage_ptr,
"content_hash": content_hash,
"failure_reason": None,
},
)
# Delete any existing frames associated with this symbol set
ErrorTrackingStackFrame.objects.filter(team=team, symbol_set=symbol_set).delete()
return symbol_set
def upload_symbol_set(minified: UploadedFile, source_map: UploadedFile) -> tuple[str, str]:
js_data = construct_js_data_object(minified.read(), source_map.read())
return upload_content(js_data)
@@ -713,22 +790,21 @@ def upload_symbol_set(minified: UploadedFile, source_map: UploadedFile) -> tuple
def upload_content(content: bytearray) -> tuple[str, str]:
content_hash = hashlib.sha512(content).hexdigest()
-if settings.OBJECT_STORAGE_ENABLED:
-# TODO - maybe a gigabyte is too much?
-if len(content) > ONE_GIGABYTE:
-raise ValidationError(
-code="file_too_large", detail="Combined source map and symbol set must be less than 1 gigabyte"
-)
-upload_path = f"{settings.OBJECT_STORAGE_ERROR_TRACKING_SOURCE_MAPS_FOLDER}/{str(uuid7())}"
-object_storage.write(upload_path, bytes(content))
-return (upload_path, content_hash)
-else:
if not settings.OBJECT_STORAGE_ENABLED:
raise ValidationError(
code="object_storage_required",
detail="Object storage must be available to allow source map uploads.",
)
if len(content) > ONE_HUNDRED_MEGABYTES:
raise ValidationError(
code="file_too_large", detail="Combined source map and symbol set must be less than 100MB"
)
upload_path = generate_symbol_set_file_key()
object_storage.write(upload_path, bytes(content))
return (upload_path, content_hash)
def construct_js_data_object(minified: bytes, source_map: bytes) -> bytearray:
# See rust/cymbal/hacks/js_data.rs
@@ -769,3 +845,7 @@ def validate_bytecode(bytecode: list[Any]) -> None:
def get_suppression_rules(team: Team):
return list(ErrorTrackingSuppressionRule.objects.filter(team=team).values_list("filters", flat=True))
def generate_symbol_set_file_key():
return f"{settings.OBJECT_STORAGE_ERROR_TRACKING_SOURCE_MAPS_FOLDER}/{str(uuid7())}"


@@ -1,6 +1,6 @@
import os
from boto3 import resource
from unittest.mock import patch
from rest_framework import status
from freezegun import freeze_time
from django.test import override_settings
@@ -165,6 +165,63 @@ class TestErrorTracking(APIBaseTest):
assert ErrorTrackingIssueFingerprintV2.objects.filter(fingerprint="fingerprint_two", version=1).exists()
assert ErrorTrackingIssue.objects.count() == 1
def test_can_start_symbol_set_upload(self) -> None:
chunk_id = uuid7()
response = self.client.post(
f"/api/environments/{self.team.id}/error_tracking/symbol_sets/start_upload?chunk_id={chunk_id}"
)
response_json = response.json()
assert response_json["presigned_url"] is not None
symbol_set = ErrorTrackingSymbolSet.objects.get(id=response_json["symbol_set_id"])
assert symbol_set.content_hash is None
def test_finish_upload_fails_if_file_not_found(self):
symbol_set = ErrorTrackingSymbolSet.objects.create(
team=self.team, ref=str(uuid7()), storage_ptr=f"symbolsets/{uuid7()}"
)
response = self.client.put(
f"/api/environments/{self.team.id}/error_tracking/symbol_sets/{symbol_set.pk}/finish_upload",
data={"content_hash": "this_is_a_content_hash"},
)
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert response.json()["code"] == "file_not_found"
@patch("posthog.storage.object_storage._client")
def test_finish_upload_fails_if_uploaded_file_is_too_large(self, patched_s3_client):
patched_s3_client.head_object.return_value = {"ContentLength": 1073741824} # 1GB
symbol_set = ErrorTrackingSymbolSet.objects.create(
team=self.team, ref=str(uuid7()), storage_ptr=f"symbolsets/{uuid7()}"
)
response = self.client.put(
f"/api/environments/{self.team.id}/error_tracking/symbol_sets/{symbol_set.pk}/finish_upload",
data={"content_hash": "this_is_a_content_hash"},
)
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert response.json()["code"] == "file_too_large"
@patch("posthog.storage.object_storage._client")
def test_finish_upload_updates_the_content_hash(self, patched_s3_client):
patched_s3_client.head_object.return_value = {"ContentLength": 1048576} # 1MB
symbol_set = ErrorTrackingSymbolSet.objects.create(
team=self.team, ref=str(uuid7()), storage_ptr=f"symbolsets/{uuid7()}"
)
response = self.client.put(
f"/api/environments/{self.team.id}/error_tracking/symbol_sets/{symbol_set.pk}/finish_upload",
data={"content_hash": "this_is_a_content_hash"},
)
symbol_set.refresh_from_db()
assert response.status_code == status.HTTP_200_OK
assert symbol_set.content_hash == "this_is_a_content_hash"
def test_can_upload_a_source_map(self) -> None:
with self.settings(OBJECT_STORAGE_ENABLED=True, OBJECT_STORAGE_ERROR_TRACKING_SOURCE_MAPS_FOLDER=TEST_BUCKET):
symbol_set = ErrorTrackingSymbolSet.objects.create(


@@ -1,5 +1,5 @@
import abc
-from typing import Optional, Union
+from typing import Optional, Union, Any
import structlog
from boto3 import client
@@ -21,10 +21,20 @@ class ObjectStorageClient(metaclass=abc.ABCMeta):
def head_bucket(self, bucket: str) -> bool:
pass
@abc.abstractmethod
def head_object(self, bucket: str, file_key: str) -> Optional[dict]:
pass
@abc.abstractmethod
def get_presigned_url(self, bucket: str, file_key: str, expiration: int = 3600) -> Optional[str]:
pass
@abc.abstractmethod
def get_presigned_post(
self, bucket: str, file_key: str, conditions: list[Any], expiration: int = 3600
) -> Optional[dict]:
pass
@abc.abstractmethod
def list_objects(self, bucket: str, prefix: str) -> Optional[list[str]]:
pass
@@ -61,9 +71,17 @@ class UnavailableStorage(ObjectStorageClient):
def head_bucket(self, bucket: str):
return False
def head_object(self, bucket: str, file_key: str):
return None
def get_presigned_url(self, bucket: str, file_key: str, expiration: int = 3600) -> Optional[str]:
pass
def get_presigned_post(
self, bucket: str, file_key: str, conditions: list[Any], expiration: int = 3600
) -> Optional[dict]:
pass
def list_objects(self, bucket: str, prefix: str) -> Optional[list[str]]:
pass
@@ -97,6 +115,13 @@ class ObjectStorage(ObjectStorageClient):
logger.warn("object_storage.health_check_failed", bucket=bucket, error=e)
return False
def head_object(self, bucket: str, file_key) -> Optional[dict]:
try:
return self.aws_client.head_object(Bucket=bucket, Key=file_key)
except Exception as e:
logger.warn("object_storage.head_object_failed", bucket=bucket, file_key=file_key, error=e)
return None
def get_presigned_url(self, bucket: str, file_key: str, expiration: int = 3600) -> Optional[str]:
try:
return self.aws_client.generate_presigned_url(
@@ -110,6 +135,18 @@ class ObjectStorage(ObjectStorageClient):
capture_exception(e)
return None
def get_presigned_post(
self, bucket: str, file_key: str, conditions: list[Any], expiration: int = 3600
) -> Optional[dict]:
try:
return self.aws_client.generate_presigned_post(
bucket, file_key, Conditions=conditions, ExpiresIn=expiration
)
except Exception as e:
logger.exception("object_storage.get_presigned_post_failed", file_name=file_key, error=e)
capture_exception(e)
return None
def list_objects(self, bucket: str, prefix: str) -> Optional[list[str]]:
try:
s3_response = self.aws_client.list_objects_v2(Bucket=bucket, Prefix=prefix)
@@ -281,5 +318,15 @@ def get_presigned_url(file_key: str, expiration: int = 3600) -> Optional[str]:
)
def get_presigned_post(file_key: str, conditions: list[Any], expiration: int = 3600) -> Optional[dict]:
return object_storage_client().get_presigned_post(
bucket=settings.OBJECT_STORAGE_BUCKET, file_key=file_key, conditions=conditions, expiration=expiration
)
def head_object(file_key: str, bucket: str = settings.OBJECT_STORAGE_BUCKET) -> Optional[dict]:
return object_storage_client().head_object(file_key=file_key, bucket=bucket)
def health_check() -> bool:
return object_storage_client().head_bucket(bucket=settings.OBJECT_STORAGE_BUCKET)
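
The two storage primitives the new endpoints lean on are boto3's generate_presigned_post and head_object. A minimal sketch, assuming a local MinIO-style endpoint, bucket, and key:

import boto3

s3 = boto3.client("s3", endpoint_url="http://localhost:19000")  # assumed local object storage

# What get_presigned_post() wraps: a URL plus form fields, with the size
# limit enforced by S3 itself through the content-length-range condition.
post = s3.generate_presigned_post(
    "posthog",                  # assumed bucket
    "symbol_sets/example-key",  # assumed file key
    Conditions=[["content-length-range", 0, 100 * 1024 * 1024]],
    ExpiresIn=60,
)
print(post["url"], post["fields"])

# What head_object() wraps: finish_upload re-checks the uploaded size
# server-side rather than trusting the client.
head = s3.head_object(Bucket="posthog", Key="symbol_sets/example-key")
assert head["ContentLength"] <= 100 * 1024 * 1024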


@@ -17,6 +17,7 @@ from posthog.storage.object_storage import (
read,
write,
get_presigned_url,
get_presigned_post,
list_objects,
copy_objects,
ObjectStorage,
@@ -90,6 +91,18 @@ class TestStorage(APIBaseTest):
presigned_url,
)
def test_can_generate_presigned_post_url(self) -> None:
with self.settings(OBJECT_STORAGE_ENABLED=True):
file_name = f"{TEST_BUCKET}/test_can_generate_presigned_upload_url/{uuid.uuid4()}"
presigned_url = get_presigned_post(file_name, conditions=[])
assert presigned_url is not None
assert "fields" in presigned_url
assert re.match(
r"^http://localhost:\d+/posthog",
presigned_url["url"],
)
def test_can_list_objects_with_prefix(self) -> None:
with self.settings(OBJECT_STORAGE_ENABLED=True):
shared_prefix = "a_shared_prefix"


@@ -218,18 +218,27 @@ const CountColumn = ({ record, columnName }: { record: unknown; columnName: stri
const Header = (): JSX.Element => {
const { user } = useValues(userLogic)
const onClick = (): void => {
setInterval(() => {
throw new Error('Kaboom !')
}, 100)
}
return (
<PageHeader
buttons={
<>
{user?.is_staff ? (
-<LemonButton
-onClick={() => {
-posthog.captureException(new Error('Kaboom !'))
-}}
->
-Send an exception
-</LemonButton>
+<>
+<LemonButton
+onClick={() => {
+posthog.captureException(new Error('Kaboom !'))
+}}
+>
+Send an exception
+</LemonButton>
+<LemonButton onClick={onClick}>Start exception loop</LemonButton>
+</>
) : null}
<LemonButton to="https://posthog.com/docs/error-tracking" type="secondary" targetBlank>
Documentation


@@ -0,0 +1,35 @@
{
"db_name": "PostgreSQL",
"query": "SELECT group_type, team_id, group_type_index FROM posthog_grouptypemapping\n WHERE (group_type, team_id) = ANY(SELECT * FROM UNNEST($1::text[], $2::int[]))",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "group_type",
"type_info": "Varchar"
},
{
"ordinal": 1,
"name": "team_id",
"type_info": "Int4"
},
{
"ordinal": 2,
"name": "group_type_index",
"type_info": "Int4"
}
],
"parameters": {
"Left": [
"TextArray",
"Int4Array"
]
},
"nullable": [
false,
false,
false
]
},
"hash": "4343f5351a5751f5c36aca99d8d19e7516ff5074840d1a0dad1842075d877388"
}
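
This replaces the per-row lookup below with a single round trip: UNNEST zips the two input arrays into (group_type, team_id) pairs. The same pattern from Python, with an illustrative DSN and inputs (psycopg 3):

import psycopg

with psycopg.connect("dbname=posthog") as conn:  # assumed local DSN
    rows = conn.execute(
        """
        SELECT group_type, team_id, group_type_index
        FROM posthog_grouptypemapping
        WHERE (group_type, team_id) = ANY(SELECT * FROM UNNEST(%s::text[], %s::int[]))
        """,
        (["organization", "project"], [1, 2]),  # illustrative batch of lookups
    ).fetchall()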


@@ -1,23 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "SELECT group_type_index FROM posthog_grouptypemapping WHERE group_type = $1 AND team_id = $2",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "group_type_index",
"type_info": "Int4"
}
],
"parameters": {
"Left": [
"Text",
"Int4"
]
},
"nullable": [
false
]
},
"hash": "c6ff00fcbbc77c8f5c1b3fe2f3352ea79485e403b9e17b6c37259ea0612065ee"
}


@@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "SELECT id, team_id, ref as set_ref, storage_ptr, created_at, failure_reason, content_hash, last_used\n FROM posthog_errortrackingsymbolset\n WHERE team_id = $1 AND ref = $2",
"query": "SELECT id, team_id, ref as set_ref, storage_ptr, created_at, failure_reason, content_hash, last_used\n FROM posthog_errortrackingsymbolset\n WHERE (content_hash is not null OR storage_ptr is null) AND team_id = $1 AND ref = $2",
"describe": {
"columns": [
{
@@ -61,5 +61,5 @@
true
]
},
"hash": "f389911c2ee51d01e5dae92f6ba120cf2186a728ba52924d36eeb22377351dcf"
"hash": "da4bb58032057ce9d01119f6ae68e6591135f7d0fab4e2d2d004bf7e2e9d2461"
}


@@ -267,7 +267,7 @@ mod test {
storage_ptr: Some(chunk_id.clone()),
failure_reason: None,
created_at: Utc::now(),
-content_hash: None,
+content_hash: Some("fake-hash".to_string()),
last_used: Some(Utc::now()),
};


@@ -262,11 +262,14 @@ impl SymbolSetRecord {
team_id: i32,
set_ref: &str,
) -> Result<Option<Self>, UnhandledError> {
// Query looks a bit odd. Symbol sets are usable by cymbal if they have no storage ptr (indicating an
// unfound symbol set) or if they have a content hash (indicating a fully saved symbol set). The in-between
// states (storage_ptr is not null AND content_hash is null) indicate an ongoing upload.
let mut record = sqlx::query_as!(
SymbolSetRecord,
r#"SELECT id, team_id, ref as set_ref, storage_ptr, created_at, failure_reason, content_hash, last_used
FROM posthog_errortrackingsymbolset
-WHERE team_id = $1 AND ref = $2"#,
+WHERE (content_hash is not null OR storage_ptr is null) AND team_id = $1 AND ref = $2"#,
team_id,
set_ref
)
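
The added WHERE clause encodes the three states from the comment above. As a plain predicate — a sketch in Python mirroring the record's fields, not code from the repo:

def symbol_set_is_usable(storage_ptr, content_hash) -> bool:
    # storage_ptr is None   -> recorded lookup failure; usable as a negative cache
    # content_hash present  -> upload finished and verified; usable
    # pointer without hash  -> upload still in flight; cymbal skips it for now
    return storage_ptr is None or content_hash is not None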