mirror of
https://github.com/BillyOutlast/posthog.git
synced 2026-02-04 03:01:23 +01:00
fix: use par-iter for s3 uploads (#38174)
This commit is contained in:
46
cli/Cargo.lock
generated
46
cli/Cargo.lock
generated
@@ -358,6 +358,31 @@ dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-deque"
|
||||
version = "0.8.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51"
|
||||
dependencies = [
|
||||
"crossbeam-epoch",
|
||||
"crossbeam-utils",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-epoch"
|
||||
version = "0.9.18"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e"
|
||||
dependencies = [
|
||||
"crossbeam-utils",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-utils"
|
||||
version = "0.8.21"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28"
|
||||
|
||||
[[package]]
|
||||
name = "crossterm"
|
||||
version = "0.25.0"
|
||||
@@ -1511,6 +1536,7 @@ dependencies = [
|
||||
"posthog-rs",
|
||||
"posthog-symbol-data",
|
||||
"ratatui",
|
||||
"rayon",
|
||||
"reqwest 0.12.23",
|
||||
"serde",
|
||||
"serde_json",
|
||||
@@ -1704,6 +1730,26 @@ dependencies = [
|
||||
"unicode-width 0.2.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rayon"
|
||||
version = "1.11.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "368f01d005bf8fd9b1206fb6fa653e6c4a81ceb1466406b81792d87c5677a58f"
|
||||
dependencies = [
|
||||
"either",
|
||||
"rayon-core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rayon-core"
|
||||
version = "1.13.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "22e18b0f0062d30d4230b2e85ff77fdfe4326feb054b9783a3460d8435c8ab91"
|
||||
dependencies = [
|
||||
"crossbeam-deque",
|
||||
"crossbeam-utils",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "redox_syscall"
|
||||
version = "0.5.17"
|
||||
|
||||
@@ -47,6 +47,7 @@ tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
|
||||
posthog-rs = { version = "0.3.5", default-features = false }
|
||||
sha2 = "0.10.9"
|
||||
urlencoding = "2.1.3"
|
||||
rayon = "1.11.0"
|
||||
|
||||
[dev-dependencies]
|
||||
test-log = "0.2.17"
|
||||
|
||||
@@ -3,6 +3,7 @@ use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use anyhow::{anyhow, bail, Context, Ok, Result};
|
||||
use rayon::iter::{IntoParallelIterator, ParallelIterator};
|
||||
use reqwest::blocking::multipart::{Form, Part};
|
||||
use reqwest::blocking::Client;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -160,23 +161,27 @@ fn upload_chunks(
|
||||
|
||||
let start_response = start_upload(&client, base_url, token, chunk_ids, &release_id)?;
|
||||
|
||||
let mut id_map: HashMap<_, _> = uploads
|
||||
let id_map: HashMap<_, _> = uploads
|
||||
.into_iter()
|
||||
.map(|u| (u.chunk_id.clone(), u))
|
||||
.collect();
|
||||
|
||||
let mut content_hashes = HashMap::new();
|
||||
let res: Result<HashMap<String, String>> = start_response
|
||||
.id_map
|
||||
.into_par_iter()
|
||||
.map(|(chunk_id, data)| {
|
||||
info!("Uploading chunk {}", chunk_id);
|
||||
let upload = id_map.get(&chunk_id).ok_or(anyhow!(
|
||||
"Got a chunk ID back from posthog that we didn't expect!"
|
||||
))?;
|
||||
|
||||
for (chunk_id, data) in start_response.id_map.into_iter() {
|
||||
info!("Uploading chunk {}", chunk_id);
|
||||
let upload = id_map.remove(&chunk_id).ok_or(anyhow!(
|
||||
"Got a chunk ID back from posthog that we didn't expect!"
|
||||
))?;
|
||||
let content_hash = content_hash([&upload.data]);
|
||||
upload_to_s3(&client, data.presigned_url.clone(), &upload.data)?;
|
||||
Ok((data.symbol_set_id, content_hash))
|
||||
})
|
||||
.collect();
|
||||
|
||||
let content_hash = content_hash([&upload.data]);
|
||||
upload_to_s3(&client, data.presigned_url.clone(), upload.data)?;
|
||||
content_hashes.insert(data.symbol_set_id.clone(), content_hash);
|
||||
}
|
||||
let content_hashes = res?;
|
||||
|
||||
finish_upload(&client, base_url, token, content_hashes)?;
|
||||
|
||||
@@ -211,7 +216,7 @@ fn start_upload(
|
||||
Ok(res.json()?)
|
||||
}
|
||||
|
||||
fn upload_to_s3(client: &Client, presigned_url: PresignedUrl, data: Vec<u8>) -> Result<()> {
|
||||
fn upload_to_s3(client: &Client, presigned_url: PresignedUrl, data: &[u8]) -> Result<()> {
|
||||
let mut last_err = None;
|
||||
let mut delay = std::time::Duration::from_millis(500);
|
||||
for attempt in 1..=3 {
|
||||
@@ -219,7 +224,7 @@ fn upload_to_s3(client: &Client, presigned_url: PresignedUrl, data: Vec<u8>) ->
|
||||
for (key, value) in &presigned_url.fields {
|
||||
form = form.text(key.clone(), value.clone());
|
||||
}
|
||||
let part = Part::bytes(data.clone());
|
||||
let part = Part::bytes(data.to_vec());
|
||||
form = form.part("file", part);
|
||||
|
||||
let res = client.post(&presigned_url.url).multipart(form).send();
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use posthog_cli::{cmd, utils::posthog::init_posthog};
|
||||
use rayon::ThreadPoolBuilder;
|
||||
use tracing::{error, info};
|
||||
|
||||
fn main() {
|
||||
@@ -10,6 +11,12 @@ fn main() {
|
||||
)
|
||||
.finish();
|
||||
|
||||
// Init the rayon thread pool
|
||||
ThreadPoolBuilder::new()
|
||||
.num_threads(10)
|
||||
.build_global()
|
||||
.expect("We successfully install a global thread pool");
|
||||
|
||||
tracing::subscriber::set_global_default(subscriber).expect("Failed to set tracing subscriber");
|
||||
|
||||
init_posthog();
|
||||
|
||||
Reference in New Issue
Block a user