mirror of
https://github.com/BillyOutlast/posthog.git
synced 2026-02-04 03:01:23 +01:00
fix(err): uploads with fewer api calls (#34564)
This commit is contained in:
6
cli/Cargo.lock
generated
6
cli/Cargo.lock
generated
@@ -229,9 +229,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "cc"
|
||||
version = "1.2.27"
|
||||
version = "1.2.28"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d487aa071b5f64da6f19a3e848e3578944b726ee5a4854b82172f02aa876bfdc"
|
||||
checksum = "4ad45f4f74e4e20eaa392913b7b33a7091c87e59628f4dd27888205ad888843c"
|
||||
dependencies = [
|
||||
"shlex",
|
||||
]
|
||||
@@ -1474,7 +1474,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
|
||||
|
||||
[[package]]
|
||||
name = "posthog-cli"
|
||||
version = "0.3.2"
|
||||
version = "0.3.3"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"clap",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "posthog-cli"
|
||||
version = "0.3.2"
|
||||
version = "0.3.3"
|
||||
authors = [
|
||||
"David <david@posthog.com>",
|
||||
"Olly <oliver@posthog.com>",
|
||||
|
||||
@@ -2,7 +2,7 @@ use core::str;
|
||||
use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use anyhow::{bail, Context, Ok, Result};
|
||||
use anyhow::{anyhow, bail, Context, Ok, Result};
|
||||
use reqwest::blocking::multipart::{Form, Part};
|
||||
use reqwest::blocking::Client;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -16,21 +16,32 @@ use crate::utils::sourcemaps::{read_pairs, ChunkUpload, SourcePair};
|
||||
|
||||
const MAX_FILE_SIZE: usize = 100 * 1024 * 1024; // 100 MB
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
struct StartUploadResponseData {
|
||||
presigned_url: PresignedUrl,
|
||||
symbol_set_id: String,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug)]
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
pub struct PresignedUrl {
|
||||
pub url: String,
|
||||
pub fields: HashMap<String, String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
struct FinishUploadRequest {
|
||||
pub content_hash: String,
|
||||
struct BulkUploadStartRequest {
|
||||
release_id: Option<String>,
|
||||
chunk_ids: Vec<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
struct BulkUploadStartResponse {
|
||||
id_map: HashMap<String, StartUploadResponseData>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
struct BulkUploadFinishRequest {
|
||||
content_hashes: HashMap<String, String>,
|
||||
}
|
||||
|
||||
pub fn upload(
|
||||
@@ -101,34 +112,47 @@ fn upload_chunks(
|
||||
) -> Result<()> {
|
||||
let client = reqwest::blocking::Client::new();
|
||||
let release_id = release.map(|r| r.id.to_string());
|
||||
for upload in uploads {
|
||||
info!("Uploading chunk {}", upload.chunk_id);
|
||||
let chunk_ids = uploads
|
||||
.iter()
|
||||
.filter(|u| {
|
||||
if u.data.len() > MAX_FILE_SIZE {
|
||||
warn!(
|
||||
"Skipping chunk {} because the file size is too large ({})",
|
||||
u.chunk_id,
|
||||
u.data.len()
|
||||
);
|
||||
false
|
||||
} else {
|
||||
true
|
||||
}
|
||||
})
|
||||
.map(|u| u.chunk_id.clone())
|
||||
.collect::<Vec<String>>();
|
||||
|
||||
let upload_size = upload.data.len();
|
||||
if upload_size > MAX_FILE_SIZE {
|
||||
warn!(
|
||||
"Skipping chunk {} because the file size is too large ({})",
|
||||
upload.chunk_id, upload_size
|
||||
);
|
||||
continue;
|
||||
}
|
||||
let start_response = start_upload(&client, base_url, token, chunk_ids, &release_id)?;
|
||||
|
||||
let upload_response =
|
||||
start_upload(&client, base_url, token, &upload.chunk_id, &release_id)?;
|
||||
let mut id_map: HashMap<_, _> = uploads
|
||||
.into_iter()
|
||||
.map(|u| (u.chunk_id.clone(), u))
|
||||
.collect();
|
||||
|
||||
let mut content_hashes = HashMap::new();
|
||||
|
||||
for (chunk_id, data) in start_response.id_map.into_iter() {
|
||||
info!("Uploading chunk {}", chunk_id);
|
||||
let upload = id_map.remove(&chunk_id).ok_or(anyhow!(
|
||||
"Got a chunk ID back from posthog that we didn't expect!"
|
||||
))?;
|
||||
|
||||
let content_hash = content_hash([&upload.data]);
|
||||
|
||||
upload_to_s3(&client, upload_response.presigned_url, upload.data)?;
|
||||
upload_to_s3(&client, data.presigned_url.clone(), upload.data)?;
|
||||
|
||||
finish_upload(
|
||||
&client,
|
||||
base_url,
|
||||
token,
|
||||
upload_response.symbol_set_id,
|
||||
content_hash,
|
||||
)?;
|
||||
content_hashes.insert(data.symbol_set_id.clone(), content_hash);
|
||||
}
|
||||
|
||||
finish_upload(&client, base_url, token, content_hashes)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -136,20 +160,20 @@ fn start_upload(
|
||||
client: &Client,
|
||||
base_url: &str,
|
||||
auth_token: &str,
|
||||
chunk_id: &str,
|
||||
chunk_ids: Vec<String>,
|
||||
release_id: &Option<String>,
|
||||
) -> Result<StartUploadResponseData> {
|
||||
let start_upload_url: String = format!("{}{}", base_url, "/start_upload");
|
||||
) -> Result<BulkUploadStartResponse> {
|
||||
let start_upload_url: String = format!("{}{}", base_url, "/bulk_start_upload");
|
||||
|
||||
let mut params = vec![("chunk_id", chunk_id)];
|
||||
if let Some(id) = release_id {
|
||||
params.push(("release_id", id));
|
||||
}
|
||||
let request = BulkUploadStartRequest {
|
||||
chunk_ids,
|
||||
release_id: release_id.clone(),
|
||||
};
|
||||
|
||||
let res = client
|
||||
.post(&start_upload_url)
|
||||
.header("Authorization", format!("Bearer {}", auth_token))
|
||||
.query(¶ms)
|
||||
.json(&request)
|
||||
.send()
|
||||
.context(format!("While starting upload to {}", start_upload_url))?;
|
||||
|
||||
@@ -157,8 +181,7 @@ fn start_upload(
|
||||
bail!("Failed to start upload: {:?}", res);
|
||||
}
|
||||
|
||||
let data: StartUploadResponseData = res.json()?;
|
||||
Ok(data)
|
||||
Ok(res.json()?)
|
||||
}
|
||||
|
||||
fn upload_to_s3(client: &Client, presigned_url: PresignedUrl, data: Vec<u8>) -> Result<()> {
|
||||
@@ -188,14 +211,13 @@ fn finish_upload(
|
||||
client: &Client,
|
||||
base_url: &str,
|
||||
auth_token: &str,
|
||||
symbol_set_id: String,
|
||||
content_hash: String,
|
||||
content_hashes: HashMap<String, String>,
|
||||
) -> Result<()> {
|
||||
let finish_upload_url: String = format!("{}/{}/{}", base_url, symbol_set_id, "finish_upload");
|
||||
let request = FinishUploadRequest { content_hash };
|
||||
let finish_upload_url: String = format!("{}/{}", base_url, "bulk_finish_upload");
|
||||
let request = BulkUploadFinishRequest { content_hashes };
|
||||
|
||||
let res = client
|
||||
.put(finish_upload_url)
|
||||
.post(finish_upload_url)
|
||||
.header("Authorization", format!("Bearer {}", auth_token))
|
||||
.header("Content-Type", "application/json")
|
||||
.json(&request)
|
||||
|
||||
Reference in New Issue
Block a user