diff --git a/cli/Cargo.lock b/cli/Cargo.lock
index a58892ad42..f01ac284f9 100644
--- a/cli/Cargo.lock
+++ b/cli/Cargo.lock
@@ -229,9 +229,9 @@ dependencies = [
 
 [[package]]
 name = "cc"
-version = "1.2.27"
+version = "1.2.28"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d487aa071b5f64da6f19a3e848e3578944b726ee5a4854b82172f02aa876bfdc"
+checksum = "4ad45f4f74e4e20eaa392913b7b33a7091c87e59628f4dd27888205ad888843c"
 dependencies = [
  "shlex",
 ]
@@ -1474,7 +1474,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
 
 [[package]]
 name = "posthog-cli"
-version = "0.3.2"
+version = "0.3.3"
 dependencies = [
  "anyhow",
  "clap",
diff --git a/cli/Cargo.toml b/cli/Cargo.toml
index 1643109de1..1dd5c78bb9 100644
--- a/cli/Cargo.toml
+++ b/cli/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "posthog-cli"
-version = "0.3.2"
+version = "0.3.3"
 authors = [
     "David ",
     "Olly ",
diff --git a/cli/src/commands/sourcemap/upload.rs b/cli/src/commands/sourcemap/upload.rs
index a8868ff7a9..00b36e9b33 100644
--- a/cli/src/commands/sourcemap/upload.rs
+++ b/cli/src/commands/sourcemap/upload.rs
@@ -2,7 +2,7 @@ use core::str;
 use std::collections::HashMap;
 use std::path::PathBuf;
 
-use anyhow::{bail, Context, Ok, Result};
+use anyhow::{anyhow, bail, Context, Ok, Result};
 use reqwest::blocking::multipart::{Form, Part};
 use reqwest::blocking::Client;
 use serde::{Deserialize, Serialize};
@@ -16,21 +16,32 @@ use crate::utils::sourcemaps::{read_pairs, ChunkUpload, SourcePair};
 
 const MAX_FILE_SIZE: usize = 100 * 1024 * 1024; // 100 MB
 
-#[derive(Debug, Deserialize)]
+#[derive(Debug, Serialize, Deserialize, Clone)]
 struct StartUploadResponseData {
     presigned_url: PresignedUrl,
     symbol_set_id: String,
 }
 
-#[derive(Deserialize, Debug)]
+#[derive(Serialize, Deserialize, Debug, Clone)]
 pub struct PresignedUrl {
     pub url: String,
     pub fields: HashMap<String, String>,
 }
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
-struct FinishUploadRequest {
-    pub content_hash: String,
+struct BulkUploadStartRequest {
+    release_id: Option<String>,
+    chunk_ids: Vec<String>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+struct BulkUploadStartResponse {
+    id_map: HashMap<String, StartUploadResponseData>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+struct BulkUploadFinishRequest {
+    content_hashes: HashMap<String, String>,
 }
 
 pub fn upload(
@@ -101,34 +112,47 @@ fn upload_chunks(
 ) -> Result<()> {
     let client = reqwest::blocking::Client::new();
     let release_id = release.map(|r| r.id.to_string());
 
-    for upload in uploads {
-        info!("Uploading chunk {}", upload.chunk_id);
+    let chunk_ids = uploads
+        .iter()
+        .filter(|u| {
+            if u.data.len() > MAX_FILE_SIZE {
+                warn!(
+                    "Skipping chunk {} because the file size is too large ({})",
+                    u.chunk_id,
+                    u.data.len()
+                );
+                false
+            } else {
+                true
+            }
+        })
+        .map(|u| u.chunk_id.clone())
+        .collect::<Vec<_>>();
 
-        let upload_size = upload.data.len();
-        if upload_size > MAX_FILE_SIZE {
-            warn!(
-                "Skipping chunk {} because the file size is too large ({})",
-                upload.chunk_id, upload_size
-            );
-            continue;
-        }
+    let start_response = start_upload(&client, base_url, token, chunk_ids, &release_id)?;
 
-        let upload_response =
-            start_upload(&client, base_url, token, &upload.chunk_id, &release_id)?;
+    let mut id_map: HashMap<_, _> = uploads
+        .into_iter()
+        .map(|u| (u.chunk_id.clone(), u))
+        .collect();
+
+    let mut content_hashes = HashMap::new();
+
+    for (chunk_id, data) in start_response.id_map.into_iter() {
+        info!("Uploading chunk {}", chunk_id);
+        let upload = id_map.remove(&chunk_id).ok_or(anyhow!(
+            "Got a chunk ID back from posthog that we didn't expect!"
+        ))?;
 
         let content_hash = content_hash([&upload.data]);
-        upload_to_s3(&client, upload_response.presigned_url, upload.data)?;
+        upload_to_s3(&client, data.presigned_url.clone(), upload.data)?;
 
-        finish_upload(
-            &client,
-            base_url,
-            token,
-            upload_response.symbol_set_id,
-            content_hash,
-        )?;
+        content_hashes.insert(data.symbol_set_id.clone(), content_hash);
     }
 
+    finish_upload(&client, base_url, token, content_hashes)?;
+
     Ok(())
 }
 
@@ -136,20 +160,20 @@ fn start_upload(
     client: &Client,
     base_url: &str,
     auth_token: &str,
-    chunk_id: &str,
+    chunk_ids: Vec<String>,
     release_id: &Option<String>,
-) -> Result<StartUploadResponseData> {
-    let start_upload_url: String = format!("{}{}", base_url, "/start_upload");
+) -> Result<BulkUploadStartResponse> {
+    let start_upload_url: String = format!("{}{}", base_url, "/bulk_start_upload");
 
-    let mut params = vec![("chunk_id", chunk_id)];
-    if let Some(id) = release_id {
-        params.push(("release_id", id));
-    }
+    let request = BulkUploadStartRequest {
+        chunk_ids,
+        release_id: release_id.clone(),
+    };
 
     let res = client
         .post(&start_upload_url)
         .header("Authorization", format!("Bearer {}", auth_token))
-        .query(&params)
+        .json(&request)
         .send()
         .context(format!("While starting upload to {}", start_upload_url))?;
 
@@ -157,8 +181,7 @@
         bail!("Failed to start upload: {:?}", res);
     }
 
-    let data: StartUploadResponseData = res.json()?;
-    Ok(data)
+    Ok(res.json()?)
 }
 
 fn upload_to_s3(client: &Client, presigned_url: PresignedUrl, data: Vec<u8>) -> Result<()> {
@@ -188,14 +211,13 @@ fn finish_upload(
     client: &Client,
     base_url: &str,
     auth_token: &str,
-    symbol_set_id: String,
-    content_hash: String,
+    content_hashes: HashMap<String, String>,
 ) -> Result<()> {
-    let finish_upload_url: String = format!("{}/{}/{}", base_url, symbol_set_id, "finish_upload");
-    let request = FinishUploadRequest { content_hash };
+    let finish_upload_url: String = format!("{}/{}", base_url, "bulk_finish_upload");
+    let request = BulkUploadFinishRequest { content_hashes };
 
     let res = client
-        .put(finish_upload_url)
+        .post(finish_upload_url)
         .header("Authorization", format!("Bearer {}", auth_token))
         .header("Content-Type", "application/json")
        .json(&request)
diff --git a/posthog/api/error_tracking.py b/posthog/api/error_tracking.py
index 8c5e9c7232..363d818915 100644
--- a/posthog/api/error_tracking.py
+++ b/posthog/api/error_tracking.py
@@ -432,7 +432,15 @@ class ErrorTrackingSymbolSetViewSet(TeamAndOrgViewSetMixin, viewsets.ModelViewSe
     queryset = ErrorTrackingSymbolSet.objects.all()
     serializer_class = ErrorTrackingSymbolSetSerializer
     parser_classes = [MultiPartParser, FileUploadParser]
-    scope_object_write_actions = ["start_upload", "finish_upload", "destroy", "update", "create"]
+    scope_object_write_actions = [
+        "bulk_start_upload",
+        "bulk_finish_upload",
+        "start_upload",
+        "finish_upload",
+        "destroy",
+        "update",
+        "create",
+    ]
 
     def safely_get_queryset(self, queryset):
         queryset = queryset.filter(team_id=self.team.id)
@@ -561,6 +569,89 @@ class ErrorTrackingSymbolSetViewSet(TeamAndOrgViewSetMixin, viewsets.ModelViewSe
 
         return Response({"success": True}, status=status.HTTP_200_OK)
 
+    @action(methods=["POST"], detail=False, parser_classes=[JSONParser])
+    def bulk_start_upload(self, request, **kwargs):
+        # Extract a list of chunk IDs from the request json
+        chunk_ids = request.data.get("chunk_ids")
+        # Grab the release ID from the request json
+        release_id = request.data.get("release_id", None)
+        if not chunk_ids:
+            return Response({"detail": "chunk_ids query parameter is required"}, status=status.HTTP_400_BAD_REQUEST)
+
+        if not settings.OBJECT_STORAGE_ENABLED:
+            raise ValidationError(
+                code="object_storage_required",
+                detail="Object storage must be available to allow source map uploads.",
+            )
+
+        # For each of the chunk IDs, make a new symbol set and presigned URL
+        id_url_map = {}
+        for chunk_id in chunk_ids:
+            file_key = generate_symbol_set_file_key()
+            presigned_url = object_storage.get_presigned_post(
+                file_key=file_key,
+                conditions=[["content-length-range", 0, ONE_HUNDRED_MEGABYTES]],
+                expiration=60,
+            )
+            symbol_set = create_symbol_set(chunk_id, self.team, release_id, file_key)
+            id_url_map[chunk_id] = {"presigned_url": presigned_url, "symbol_set_id": str(symbol_set.pk)}
+
+        return Response({"id_map": id_url_map}, status=status.HTTP_201_CREATED)
+
+    @action(methods=["POST"], detail=False, parser_classes=[JSONParser])
+    def bulk_finish_upload(self, request, **kwargs):
+        # Get the map of symbol_set_id:content_hashes
+        content_hashes = request.data.get("content_hashes", {})
+        if not content_hashes:
+            return Response(
+                {"detail": "content_hashes query parameter is required"}, status=status.HTTP_400_BAD_REQUEST
+            )
+
+        if not settings.OBJECT_STORAGE_ENABLED:
+            raise ValidationError(
+                code="object_storage_required",
+                detail="Object storage must be available to allow source map uploads.",
+            )
+
+        try:
+            for symbol_set_id, content_hash in content_hashes.items():
+                symbol_set = ErrorTrackingSymbolSet.objects.get(id=symbol_set_id, team=self.team)
+                s3_upload = None
+                if symbol_set.storage_ptr:
+                    s3_upload = object_storage.head_object(file_key=symbol_set.storage_ptr)
+
+                if s3_upload:
+                    content_length = s3_upload.get("ContentLength")
+
+                    if not content_length or content_length > ONE_HUNDRED_MEGABYTES:
+                        symbol_set.delete()
+
+                        raise ValidationError(
+                            code="file_too_large",
+                            detail="The uploaded symbol set file was too large.",
+                        )
+                else:
+                    raise ValidationError(
+                        code="file_not_found",
+                        detail="No file has been uploaded for the symbol set.",
+                    )
+
+                if not symbol_set.content_hash:
+                    symbol_set.content_hash = content_hash
+                    symbol_set.save()
+        except Exception:
+            for id in content_hashes.keys():
+                # Try to clean up the symbol sets preemptively if the upload fails
+                try:
+                    symbol_set = ErrorTrackingSymbolSet.objects.all().filter(id=id, team=self.team).get()
+                    symbol_set.delete()
+                except Exception:
+                    pass
+
+            raise
+
+        return Response({"success": True}, status=status.HTTP_201_CREATED)
+
 
 class ErrorTrackingAssignmentRuleSerializer(serializers.ModelSerializer):
     assignee = serializers.SerializerMethodField()
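
Note (illustrative sketch, not part of the patch): the change replaces the old two-requests-per-chunk flow with a single bulk negotiation: the client sends every chunk ID to bulk_start_upload, uploads each chunk directly to object storage with the returned presigned POST, then confirms all uploads at once via bulk_finish_upload. The Python below sketches that client-side flow under assumptions: BASE_URL, TOKEN, the chunk bytes, the "file" part name, and the sha256 hash format are placeholders for illustration only; just the JSON shapes come from the diff.

# Sketch of the bulk upload flow; values marked "placeholder" are assumptions.
import hashlib
import requests

BASE_URL = "https://example.posthog.test/error_tracking/symbol_sets"  # placeholder base URL
TOKEN = "personal-api-key"  # placeholder token
chunks = {"chunk-id-1": b"...minified source and sourcemap bytes..."}  # placeholder chunk data

# 1. bulk_start_upload: send every chunk ID once; get back a presigned POST and
#    a symbol set ID per chunk, keyed by chunk ID under "id_map".
start = requests.post(
    f"{BASE_URL}/bulk_start_upload",
    headers={"Authorization": f"Bearer {TOKEN}"},
    json={"chunk_ids": list(chunks), "release_id": None},
)
start.raise_for_status()
id_map = start.json()["id_map"]

# 2. Upload each chunk straight to object storage using its presigned POST,
#    collecting a content hash keyed by symbol set ID.
content_hashes = {}
for chunk_id, entry in id_map.items():
    presigned = entry["presigned_url"]
    requests.post(
        presigned["url"],
        data=presigned["fields"],
        files={"file": chunks[chunk_id]},  # part name "file" is an assumption
    ).raise_for_status()
    # Plain sha256 hex digest as a stand-in; the CLI's content_hash format may differ.
    content_hashes[entry["symbol_set_id"]] = hashlib.sha256(chunks[chunk_id]).hexdigest()

# 3. bulk_finish_upload: confirm every upload in one request so the server can
#    verify the stored objects and persist the content hashes.
requests.post(
    f"{BASE_URL}/bulk_finish_upload",
    headers={"Authorization": f"Bearer {TOKEN}"},
    json={"content_hashes": content_hashes},
).raise_for_status()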