From 990bd6cf34374c66f2f4ae3097b5dd5a2a3b4e52 Mon Sep 17 00:00:00 2001 From: DecDuck Date: Sat, 20 Dec 2025 01:11:05 +1100 Subject: [PATCH] feat: depot API --- Cargo.lock | 199 ++++++++++++++++++++++++++++++++++++++++++---- Cargo.toml | 5 +- docs/protocol.md | 8 ++ docs/structure.md | 6 ++ src/download.rs | 95 +++++++++------------- src/handlers.rs | 118 +++++++++++++++++++++++++-- src/lib.rs | 5 -- src/main.rs | 33 ++------ src/manifest.rs | 11 --- src/remote.rs | 195 ++++++++++++++++++++++----------------------- src/serve.rs | 81 +++++++++++-------- src/state.rs | 32 ++------ src/token.rs | 55 ++----------- 13 files changed, 514 insertions(+), 329 deletions(-) create mode 100644 docs/protocol.md create mode 100644 docs/structure.md delete mode 100644 src/manifest.rs diff --git a/Cargo.lock b/Cargo.lock index 24eadec..dab0e33 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,17 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "aes" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0" +dependencies = [ + "cfg-if", + "cipher", + "cpufeatures", +] + [[package]] name = "aho-corasick" version = "1.1.4" @@ -192,6 +203,15 @@ version = "2.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "812e12b5285cc515a9c72a5c1d3b6d46a19dac5acfef5265968c166106e31dd3" +[[package]] +name = "block-buffer" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" +dependencies = [ + "generic-array", +] + [[package]] name = "bumpalo" version = "3.19.0" @@ -259,6 +279,16 @@ dependencies = [ "half", ] +[[package]] +name = "cipher" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad" +dependencies = [ + "crypto-common", + "inout", +] + [[package]] name = "clap" version = "4.5.53" @@ -293,6 +323,15 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "cpufeatures" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280" +dependencies = [ + "libc", +] + [[package]] name = "criterion" version = "0.8.0" @@ -360,6 +399,25 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5" +[[package]] +name = "crypto-common" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78c8292055d1c1df0cce5d180393dc8cce0abec0a7102adb6c7b1eef6016d60a" +dependencies = [ + "generic-array", + "typenum", +] + +[[package]] +name = "ctr" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0369ee1ad671834580515889b80f2ea915f23b8be8d0daa4bbaf2ac5c7590835" +dependencies = [ + "cipher", +] + [[package]] name = "dashmap" version = "6.1.0" @@ -417,6 +475,16 @@ dependencies = [ "powerfmt", ] +[[package]] +name = "digest" +version = "0.10.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" +dependencies = [ + "block-buffer", + "crypto-common", +] + [[package]] name = "displaydoc" version = "0.2.5" @@ -430,19 +498,24 @@ dependencies = [ [[package]] name = "droplet-rs" -version = "0.9.2" -source = "git+https://github.com/Drop-OSS/droplet-rs.git#d8f37886d479d8d9fec7e51523628e863306dddb" +version = "0.12.2" +source = "git+https://github.com/Drop-OSS/droplet-rs.git#05f7027b362fcc72b7cc621df8b5b0850b6cf082" dependencies = [ "anyhow", "async-trait", "dyn-clone", + "futures", + "getrandom 0.3.4", "hex", + "humansize", "rcgen", "ring", + "serde", + "serde_json", + "sha2", "time", - "time-macros", "tokio", - "webpki", + "uuid", "x509-parser 0.17.0", ] @@ -489,6 +562,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.31" @@ -496,6 +584,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -504,6 +593,23 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" +[[package]] +name = "futures-executor" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" + [[package]] name = "futures-macro" version = "0.3.31" @@ -533,14 +639,28 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ + "futures-channel", "futures-core", + "futures-io", "futures-macro", + "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", "slab", ] +[[package]] +name = "generic-array" +version = "0.14.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" +dependencies = [ + "typenum", + "version_check", +] + [[package]] name = "getrandom" version = "0.2.16" @@ -636,6 +756,15 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "humansize" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6cb51c9a029ddc91b07a787f1d86b53ccfa49b0e86688c946ebe8d3555685dd7" +dependencies = [ + "libm", +] + [[package]] name = "hyper" version = "1.8.1" @@ -801,6 +930,15 @@ dependencies = [ "icu_properties", ] +[[package]] +name = "inout" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "879f10e63c20629ecabbb64a8010319738c66a5cd0c29b02d63d272b03751d01" +dependencies = [ + "generic-array", +] + [[package]] name = "ipnet" version = "2.11.0" @@ -854,6 +992,12 @@ version = "0.2.178" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37c93d8daa9d8a012fd8ab92f088405fb202ea0b6ab73ee2482ae66af4f42091" +[[package]] +name = "libm" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9fbbcab51052fe104eb5e5d351cf728d30a5be1fe14d9be8a3b097481fb97de" + [[package]] name = "linux-raw-sys" version = "0.11.0" @@ -1487,6 +1631,17 @@ dependencies = [ "serde", ] +[[package]] +name = "sha2" +version = "0.10.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "shlex" version = "1.3.0" @@ -1751,10 +1906,13 @@ dependencies = [ name = "torrential" version = "0.1.0" dependencies = [ + "aes", "anyhow", "async-trait", "axum", + "bytes", "criterion", + "ctr", "dashmap", "droplet-rs", "futures-util", @@ -1842,6 +2000,12 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "typenum" +version = "1.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb" + [[package]] name = "unicode-ident" version = "1.0.22" @@ -1872,6 +2036,23 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" +[[package]] +name = "uuid" +version = "1.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2e054861b4bd027cd373e18e8d8d8e6548085000e41290d95ce0c373a654b4a" +dependencies = [ + "getrandom 0.3.4", + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "version_check" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" + [[package]] name = "walkdir" version = "2.5.0" @@ -1984,16 +2165,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "webpki" -version = "0.22.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed63aea5ce73d0ff405984102c42de94fc55a6b75765d621c65262469b3c9b53" -dependencies = [ - "ring", - "untrusted", -] - [[package]] name = "webpki-roots" version = "1.0.4" diff --git a/Cargo.toml b/Cargo.toml index 88c95fe..8ae8aba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,7 +23,7 @@ simple_logger = { version = "5.1.0", default-features = false, features = [ "colors", ] } tokio = { version = "*", features = ["rt-multi-thread", "sync"] } -droplet-rs = { git = "https://github.com/Drop-OSS/droplet-rs.git" } +droplet-rs = { git="https://github.com/Drop-OSS/droplet-rs.git" } dashmap = "6.1.0" anyhow = "1.0.100" serde_json = "1.0.145" @@ -31,6 +31,9 @@ url = { version = "2.5.7", default-features = false } tokio-util = { version = "0.7.17", features = ["io"] } async-trait = "0.1.89" futures-util = "0.3.31" +ctr = "0.9.2" +aes = "0.8.4" +bytes = "*" [lints.clippy] pedantic = { level = "warn", priority = -1 } diff --git a/docs/protocol.md b/docs/protocol.md new file mode 100644 index 0000000..f2e2bfa --- /dev/null +++ b/docs/protocol.md @@ -0,0 +1,8 @@ +# Protocol + +`torrential` implements the Depot API as defined by https://developer.droposs.org/web/depot, and it prefixed with `/api/v1/depot` for NGINX proxying. + +It also has the following endpoints, only accessible by the Drop server for security reasons: + - `/key` for sharing the authentication key from the Drop server to torrential + - `/invalidate` for pre-emptivel clearing the download context cache. Contexts are automatically cleared regardless, so this endpoint failing is not a hard error on the Drop side + - `/healthcheck`. Does healthcheck. \ No newline at end of file diff --git a/docs/structure.md b/docs/structure.md new file mode 100644 index 0000000..0985d4f --- /dev/null +++ b/docs/structure.md @@ -0,0 +1,6 @@ +# Structure +Torrential is a typical Rust project. Source files are in `src/`. + +`handlers.rs` contains most non-download endpoint handlers. `serve.rs` contains the download endpoint handler. + +`remote.rs` handles communciating with the Drop server. \ No newline at end of file diff --git a/src/download.rs b/src/download.rs index 0f3f8b7..7c62ef9 100644 --- a/src/download.rs +++ b/src/download.rs @@ -1,16 +1,21 @@ -use std::{collections::HashMap, hash::RandomState, time::Instant}; +use std::{path::PathBuf, time::Instant}; -use droplet_rs::versions::{create_backend_constructor, types::VersionBackend}; +use anyhow::anyhow; +use droplet_rs::{ + manifest::Manifest, + versions::{create_backend_constructor, types::VersionBackend}, +}; +use log::{info, warn}; use reqwest::StatusCode; use crate::{ - remote::{ContextResponseBody, LibraryBackend, ContextProvider}, + remote::{LibraryBackend, VersionResponseBody, fetch_version_data}, state::AppInitData, util::ErrorOption, }; pub struct DownloadContext { - pub(crate) chunk_lookup_table: HashMap, + pub(crate) manifest: Manifest, pub(crate) backend: Box, last_access: Instant, } @@ -24,33 +29,16 @@ impl DownloadContext { } pub async fn create_download_context( - metadata_provider: &dyn ContextProvider, - backend_factory: &dyn BackendFactory, init_data: &AppInitData, game_id: String, version_name: String, ) -> Result { - let context = metadata_provider - .fetch_context(init_data.token(), game_id, version_name.clone()) - .await?; + let version_data = fetch_version_data(init_data, game_id, version_name.clone()).await?; - let backend = backend_factory.create_backend(init_data, &context, &version_name)?; - - let mut chunk_lookup_table = HashMap::with_capacity_and_hasher( - context.manifest.values().map(|v| v.ids.len()).sum(), - RandomState::default(), - ); - - for (path, file_chunks) in context.manifest { - let mut start = 0; - for (chunk, length) in file_chunks.ids.into_iter().zip(file_chunks.lengths) { - chunk_lookup_table.insert(chunk, (path.clone(), start, start + length)); - start += length; - } - } + let backend = create_backend(&version_data)?; let download_context = DownloadContext { - chunk_lookup_table, + manifest: version_data.manifest, backend, last_access: Instant::now(), }; @@ -58,39 +46,34 @@ pub async fn create_download_context( Ok(download_context) } -pub trait BackendFactory: Send + Sync { - fn create_backend( - &self, - init_data: &AppInitData, - context: &ContextResponseBody, - version_name: &String, - ) -> Result, StatusCode>; -} +fn create_backend( + version_data: &VersionResponseBody, +) -> Result, StatusCode> { + let base_path = version_data + .library + .options + .get("baseDir") + .unwrap() + .as_str() + .unwrap(); -pub struct DropBackendFactory; -impl BackendFactory for DropBackendFactory { - fn create_backend( - &self, - init_data: &AppInitData, - context: &ContextResponseBody, - version_name: &String, - ) -> Result, StatusCode> { - let (version_path, backend) = init_data - .libraries() - .get(&context.library_id) - .ok_or(StatusCode::NOT_FOUND)?; + let version_path = PathBuf::from(base_path); + let version_path = version_path.join(version_data.library_path.clone()); + let version_path = match version_data.library.backend { + LibraryBackend::Filesystem => version_path.join(version_data.version_path.clone()), + LibraryBackend::FlatFilesystem => version_path, + }; - let version_path = version_path.join(&context.library_path); - let version_path = match backend { - LibraryBackend::Filesystem => version_path.join(version_name), - LibraryBackend::FlatFilesystem => version_path, - }; - - let backend = - create_backend_constructor(&version_path).ok_or(StatusCode::INTERNAL_SERVER_ERROR)?; - - // TODO: Not eat this error - let backend = backend().map_err(|_e| StatusCode::INTERNAL_SERVER_ERROR)?; - Ok(backend) + if !version_path.exists() { + warn!("{} path doesn't exist for version", version_path.display()); + return Err(StatusCode::INTERNAL_SERVER_ERROR); } + + let backend = + create_backend_constructor(&version_path).ok_or(StatusCode::INTERNAL_SERVER_ERROR)?; + + let backend = backend() + .inspect_err(|err| warn!("failed to create version backend: {:?}", err)) + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + Ok(backend) } diff --git a/src/handlers.rs b/src/handlers.rs index f7b0fe1..636a8de 100644 --- a/src/handlers.rs +++ b/src/handlers.rs @@ -1,10 +1,22 @@ -use std::sync::Arc; +use std::{ + collections::HashMap, + sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, + }, + task::Poll, +}; -use axum::{Json, extract::State}; -use reqwest::StatusCode; -use serde::Deserialize; +use axum::{Json, body::Body, extract::State, http::{HeaderMap, HeaderValue}, response::IntoResponse}; +use bytes::BufMut; +use reqwest::{StatusCode, header::CONTENT_TYPE}; +use serde::{Deserialize, Serialize}; +use serde_json::json; +use std::io::Write; +use tokio::io::AsyncRead; +use tokio_util::io::ReaderStream; -use crate::state::AppState; +use crate::{remote::fetch_instance_games, state::AppState}; pub async fn healthcheck(State(state): State>) -> StatusCode { let initialised = state.token.initialized(); @@ -16,14 +28,104 @@ pub async fn healthcheck(State(state): State>) -> StatusCode { #[derive(Deserialize)] pub struct InvalidateBody { - game_id: String, - version_name: String, + game: String, + version: String, } pub async fn invalidate( State(state): State>, Json(payload): Json, ) -> StatusCode { - state.context_cache.remove(&(payload.game_id, payload.version_name)); + state.context_cache.remove(&(payload.game, payload.version)); StatusCode::OK } + +struct SpeedtestStream { + remaining: usize, +} + +impl SpeedtestStream { + pub fn new() -> Self { + SpeedtestStream { + remaining: 1024 * 1024 * 50, + } + } + fn content_length(&self) -> usize { + self.remaining + } +} +const ZERO: [u8; 1024] = [0u8; _]; +impl AsyncRead for SpeedtestStream { + fn poll_read( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + if self.remaining > 0 { + let mut writer = buf.writer(); + + let amount = writer.write(&ZERO); + match amount { + Ok(amount) => self.remaining -= amount, + Err(err) => return Poll::Ready(Err(err)), + }; + }; + return Poll::Ready(Ok(())); + } +} + +pub async fn speedtest() -> Result { + let speedtest = SpeedtestStream::new(); + let ct = speedtest.content_length(); + let speedtest_stream = ReaderStream::new(speedtest); + let body = Body::from_stream(speedtest_stream); + + let mut headers = HeaderMap::new(); + headers.insert("Content-Type", "application/octet-stream".parse().unwrap()); + headers.insert("Content-Length", ct.into()); + + Ok((headers, body)) +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +struct GameData { + version_id: String, + compression: String, +} + +#[derive(Serialize)] +struct Manifest { + content: HashMap>, +} + +pub async fn manifest( + State(state): State>, +) -> Result { + let games = fetch_instance_games( + state + .token + .get() + .ok_or(StatusCode::from_u16(503).unwrap())?, + ) + .await?; + + let mut content = HashMap::new(); + for game in games { + content.insert( + game.id, + game.versions + .into_iter() + .map(|v| GameData { + version_id: v.version_id, + compression: "none".to_owned(), + }) + .collect::>(), + ); + } + + let mut headers = HeaderMap::new(); + headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json")); + + Ok((headers, json!(Manifest { content }).to_string())) +} diff --git a/src/lib.rs b/src/lib.rs index 4a28441..e97b94f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,17 +2,12 @@ use tokio::sync::Semaphore; mod download; pub mod serve; pub mod handlers; -mod manifest; mod remote; pub mod state; mod token; mod util; pub use download::DownloadContext; -pub use download::{BackendFactory, DropBackendFactory}; -pub use remote::{ - DropLibraryProvider, DropContextProvider, LibraryConfigurationProvider, ContextProvider, -}; pub use token::set_token; static GLOBAL_CONTEXT_SEMAPHORE: Semaphore = Semaphore::const_new(1); diff --git a/src/main.rs b/src/main.rs index c1ac1b6..14fd39c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,16 +4,14 @@ use std::{ }; use axum::{ - Router, + Router, handler, routing::{get, post}, }; use dashmap::DashMap; use log::info; use simple_logger::SimpleLogger; use tokio::{runtime::Handle, sync::OnceCell}; -use torrential::{ - DropBackendFactory, DropContextProvider, DropLibraryProvider, handlers, serve, set_token, state::AppState -}; +use torrential::{handlers, serve, set_token, state::AppState}; use url::Url; #[tokio::main] @@ -21,21 +19,16 @@ async fn main() { initialise_logger(); if let Ok(working_directory) = std::env::var("WORKING_DIRECTORY") { + info!("moving to working directory {}", working_directory); set_current_dir(working_directory).expect("failed to change working directory"); } let metrics = Handle::current().metrics(); info!("using {} threads", metrics.num_workers()); - let remote_url = get_remote_url(); - let shared_state = Arc::new(AppState { token: OnceCell::new(), context_cache: DashMap::new(), - - metadata_provider: Arc::new(DropContextProvider::new(remote_url.clone())), - backend_factory: Arc::new(DropBackendFactory), - library_provider: Arc::new(DropLibraryProvider::new(remote_url)), }); let app = setup_app(shared_state); @@ -46,12 +39,14 @@ async fn main() { fn setup_app(shared_state: Arc) -> Router { Router::new() .route( - "/api/v1/depot/{game_id}/{version_name}/{*chunk_ids}", + "/api/v1/depot/content/{game_id}/{version_name}/{chunk_id}", get(serve::serve_file), ) - .route("/token", post(set_token)) + .route("/api/v1/depot/manifest.json", get(handlers::manifest)) + .route("/api/v1/depot/speedtest", get(handlers::speedtest)) + .route("/key", post(set_token)) .route("/healthcheck", get(handlers::healthcheck)) - .route("/invalid", post(handlers::invalidate)) + .route("/invalidate", post(handlers::invalidate)) .with_state(shared_state) } @@ -67,15 +62,3 @@ fn initialise_logger() { .init() .unwrap(); } - -fn get_remote_url() -> Url { - let user_provided = env::var("DROP_SERVER_URL"); - let url = Url::parse( - user_provided - .as_ref() - .map_or("http://localhost:3000", |v| v), - ) - .expect("failed to parse URL"); - info!("using Drop server url {url}"); - url -} diff --git a/src/manifest.rs b/src/manifest.rs deleted file mode 100644 index f928f23..0000000 --- a/src/manifest.rs +++ /dev/null @@ -1,11 +0,0 @@ -use std::collections::HashMap; - -use serde::Deserialize; - -#[derive(Deserialize)] -pub struct DropChunk { - pub ids: Vec, - pub lengths: Vec, -} - -pub type DropletManifest = HashMap; diff --git a/src/remote.rs b/src/remote.rs index 0b3a0be..61838ba 100644 --- a/src/remote.rs +++ b/src/remote.rs @@ -1,21 +1,44 @@ +use std::{env, sync::LazyLock}; + use anyhow::{Result, anyhow}; use async_trait::async_trait; -use reqwest::StatusCode; +use droplet_rs::manifest::Manifest; +use log::info; +use reqwest::{Client, ClientBuilder, StatusCode}; use serde::{Deserialize, Serialize}; use url::Url; -use crate::{manifest::DropletManifest, util::ErrorOption}; +use crate::{state::AppInitData, util::ErrorOption}; + +static CLIENT: LazyLock = LazyLock::new(|| { + ClientBuilder::new() + .build() + .expect("failed to build client") +}); + +static REMOTE_URL: LazyLock = LazyLock::new(|| { + let user_provided = env::var("DROP_SERVER_URL"); + let url = Url::parse( + user_provided + .as_ref() + .map_or("http://localhost:3000", |v| v), + ) + .expect("failed to parse URL"); + info!("using Drop server url {}", url); + url +}); #[derive(Deserialize)] #[serde(rename_all = "camelCase")] -pub struct ContextResponseBody { - pub manifest: DropletManifest, - pub library_id: String, +pub struct VersionResponseBody { + pub manifest: Manifest, + pub library: LibrarySource, pub library_path: String, + pub version_path: String, } #[derive(Serialize)] -pub struct ContextQuery { +pub struct VersionQuery { game: String, version: String, } @@ -34,110 +57,78 @@ pub struct LibrarySource { pub backend: LibraryBackend, } -pub struct DropContextProvider { - client: reqwest::Client, - base_url: Url, -} -impl DropContextProvider { - pub fn new(url: Url) -> Self { - Self { - client: reqwest::Client::new(), - base_url: url, - } - } -} -#[async_trait] -impl ContextProvider for DropContextProvider { - async fn fetch_context( - &self, - token: String, - game_id: String, - version_name: String, - ) -> Result { - let context_response = self - .client - .get(self.base_url.join("/api/v1/admin/depot/context")?) - .query(&ContextQuery { - game: game_id, - version: version_name, - }) - .header("Authorization", format!("Bearer {token}")) - .send() - .await?; +pub async fn fetch_version_data( + init_data: &AppInitData, + game_id: String, + version_id: String, +) -> Result { + let version_data_response = CLIENT + .get(REMOTE_URL.join("/api/v1/admin/depot/manifest")?) + .query(&VersionQuery { + game: game_id, + version: version_id, + }) + .header("Authorization", format!("Bearer {}", init_data.key)) + .send() + .await?; - if !context_response.status().is_success() { - if context_response.status() == StatusCode::BAD_REQUEST { - return Err(StatusCode::NOT_FOUND.into()); - } - - return Err(anyhow!( - "Fetching context failed with non-success code: {}, {}", - context_response.status(), - context_response - .text() - .await - .unwrap_or("(failed to read body)".to_owned()) - ) - .into()); + if !version_data_response.status().is_success() { + if version_data_response.status() == StatusCode::BAD_REQUEST { + return Err(StatusCode::NOT_FOUND.into()); } - let context: ContextResponseBody = context_response.json().await?; - - Ok(context) + return Err(anyhow!( + "Fetching context failed with non-success code: {}, {}", + version_data_response.status(), + version_data_response + .text() + .await + .unwrap_or("(failed to read body)".to_owned()) + ) + .into()); } + + let version_data: VersionResponseBody = version_data_response.json().await?; + + Ok(version_data) } -#[async_trait] -pub trait ContextProvider: Send + Sync { - /// Fetches the manifest for a specific game version. - async fn fetch_context( - &self, - token: String, - game_id: String, - version_name: String, - ) -> Result; +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct SkeletonVersion { + pub version_id: String, } -#[async_trait] -pub trait LibraryConfigurationProvider: Send + Sync { - async fn fetch_sources(&self, token: &String) -> anyhow::Result>; +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct SkeletonGame { + pub id: String, + pub versions: Vec, } -pub struct DropLibraryProvider { - client: reqwest::Client, - base_url: Url, -} -impl DropLibraryProvider { - pub fn new(url: Url) -> Self { - Self { - client: reqwest::Client::new(), - base_url: url, - } - } -} - -#[async_trait] -impl LibraryConfigurationProvider for DropLibraryProvider { - async fn fetch_sources(&self, token: &String) -> anyhow::Result> { - let source_response = self - .client - .get(self.base_url.join("/api/v1/admin/library/sources")?) - .header("Authorization", format!("Bearer {token}")) - .send() - .await?; - - if !source_response.status().is_success() { - return Err(anyhow!( - "Fetching library sources failed with non-success code: {}, {}", - source_response.status(), - source_response - .text() - .await - .unwrap_or("(failed to read body)".to_owned()) - )); - } - - let library_sources: Vec = source_response.json().await?; - - Ok(library_sources) + +pub async fn fetch_instance_games( + init_data: &AppInitData, +) -> Result, ErrorOption> { + let context_response = CLIENT + .get(REMOTE_URL.join("/api/v1/admin/depot/versions")?) + .header("Authorization", format!("Bearer {}", init_data.key)) + .send() + .await?; + + if !context_response.status().is_success() { + + return Err(anyhow!( + "Fetching instance games failed with non-success code: {}, {}", + context_response.status(), + context_response + .text() + .await + .unwrap_or("(failed to read body)".to_owned()) + ) + .into()); } + + let games: Vec = context_response.json().await?; + + Ok(games) } diff --git a/src/serve.rs b/src/serve.rs index b16c961..20bf3b5 100644 --- a/src/serve.rs +++ b/src/serve.rs @@ -1,58 +1,72 @@ -use std::sync::Arc; +use std::{io::Error, rc::Rc, sync::Arc}; +use aes::cipher::{KeyIvInit, StreamCipher}; use axum::{ body::Body, extract::{Path, State}, http::HeaderMap, response::{AppendHeaders, IntoResponse}, }; +use bytes::Bytes; use dashmap::{DashMap, mapref::one::RefMut}; -use droplet_rs::versions::types::{MinimumFileObject, VersionFile}; +use droplet_rs::{ + manifest::ChunkData, + versions::types::{MinimumFileObject, VersionFile}, +}; +use futures_util::{StreamExt, stream}; use log::{error, info}; use reqwest::{StatusCode, header}; use tokio::sync::SemaphorePermit; use tokio_util::io::ReaderStream; -use futures_util::{StreamExt as _, stream}; - use crate::{ DownloadContext, GLOBAL_CONTEXT_SEMAPHORE, download::create_download_context, state::AppState, }; +type Aes128Ctr64LE = ctr::Ctr64LE; + pub async fn serve_file( State(state): State>, - Path((game_id, version_name, chunk_ids)): Path<(String, String, String)>, + Path((game_id, version_name, chunk_id)): Path<(String, String, String)>, ) -> Result { let context_cache = &state.context_cache; - let mut context = get_or_generate_context(&state, context_cache, game_id, version_name).await?; + let mut context = get_or_create_context(&state, context_cache, game_id, version_name).await?; context.reset_last_access(); - let chunk_ids = chunk_ids.split("/").collect::>(); - let mut streams = Vec::with_capacity(chunk_ids.len()); - let mut content_lengths = Vec::with_capacity(chunk_ids.len()); - let mut total_size = 0; - for chunk_id in chunk_ids { - let (relative_filename, start, end) = lookup_chunk(chunk_id, &context)?; - let reader = get_file_reader(&mut context, relative_filename, start, end).await?; + let chunk_data = lookup_chunk(&chunk_id, &context)?; + let mut streams = Vec::with_capacity(chunk_data.files.len()); + let mut content_length = 0; + + for file_entry in &chunk_data.files { + let reader = get_file_reader( + &mut context, + file_entry.filename.clone(), + file_entry.start, + file_entry.start + file_entry.length, + ) + .await?; let stream = ReaderStream::new(reader); streams.push(stream); - content_lengths.push((end - start).to_string()); - - total_size += end - start; + content_length += file_entry.length; } let stream = stream::iter(streams).flatten(); - let body: Body = Body::from_stream(stream); + let mut cipher = Aes128Ctr64LE::new(&context.manifest.key.into(), &chunk_data.iv.into()); + let encrypted_stream = stream.chunks(3).map(move |raw| -> Result { + let data: Result, Error> = raw.into_iter().collect(); + let mut data = data?.concat(); + + cipher.apply_keystream(&mut data); + + Ok(data.into()) + }); + let body: Body = Body::from_stream(encrypted_stream); let mut headers = HeaderMap::new(); headers.insert("Content-Type", "application/octet-stream".parse().unwrap()); - headers.insert("Content-Length", total_size.to_string().parse().unwrap()); - headers.insert( - "Content-Lengths", - content_lengths.join(",").parse().unwrap(), - ); + headers.insert("Content-Length", content_length.into()); Ok((headers, body)) } @@ -62,12 +76,16 @@ async fn acquire_permit<'a>() -> SemaphorePermit<'a> { .await .expect("failed to acquire semaphore"); } +/** + * Needs to be cloned for reference reasons + */ fn lookup_chunk( chunk_id: &str, context: &RefMut<'_, (String, String), DownloadContext>, -) -> Result<(String, usize, usize), StatusCode> { +) -> Result { context - .chunk_lookup_table + .manifest + .chunks .get(chunk_id) .cloned() .ok_or(StatusCode::NOT_FOUND) @@ -91,11 +109,11 @@ async fn get_file_reader( ) .await .map_err(|v| { - error!("reader error: {v:?}"); + error!("reader error for '{}': {v:?}", relative_filename); StatusCode::INTERNAL_SERVER_ERROR }) } -async fn get_or_generate_context<'a>( +async fn get_or_create_context<'a>( state: &Arc, context_cache: &'a DashMap<(String, String), DownloadContext>, game_id: String, @@ -114,14 +132,9 @@ async fn get_or_generate_context<'a>( Ok(already_done) } else { info!("generating context for {}...", game_id); - let context_result = create_download_context( - &*state.metadata_provider, - &*state.backend_factory, - initialisation_data, - game_id.clone(), - version_name.clone(), - ) - .await?; + let context_result = + create_download_context(initialisation_data, game_id.clone(), version_name.clone()) + .await?; state.context_cache.insert(key.clone(), context_result); diff --git a/src/state.rs b/src/state.rs index e6c9394..3cfd35c 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,38 +1,18 @@ -use std::{collections::HashMap, path::PathBuf, sync::Arc}; +use std::{collections::HashMap, path::PathBuf}; use dashmap::DashMap; use tokio::sync::OnceCell; -use crate::{ - BackendFactory, DownloadContext, LibraryConfigurationProvider, ContextProvider, - remote::LibraryBackend, -}; +use crate:: + DownloadContext +; pub struct AppState { pub token: OnceCell, pub context_cache: DashMap<(String, String), DownloadContext>, - - pub metadata_provider: Arc, - pub backend_factory: Arc, - pub library_provider: Arc, } #[derive(Debug)] pub struct AppInitData { - token: String, - libraries: HashMap, -} -impl AppInitData { - pub fn new(token: String, libraries: HashMap) -> Self { - Self { token, libraries } - } - pub fn token(&self) -> String { - self.token.clone() - } - pub fn set_token(&mut self, token: String) { - self.token = token - } - pub fn libraries(&self) -> &HashMap { - &self.libraries - } -} + pub key: String, +} \ No newline at end of file diff --git a/src/token.rs b/src/token.rs index 971d73a..2b46ccf 100644 --- a/src/token.rs +++ b/src/token.rs @@ -1,16 +1,15 @@ -use std::{collections::HashMap, path::PathBuf, str::FromStr, sync::Arc}; +use std::sync::Arc; use axum::{Json, extract::State}; use log::{error, info}; use reqwest::StatusCode; use serde::Deserialize; -use crate::remote::{self, LibraryBackend, LibrarySource}; use crate::state::{AppInitData, AppState}; #[derive(Deserialize)] pub struct TokenPayload { - token: String, + key: String, } pub async fn set_token( @@ -21,20 +20,9 @@ pub async fn set_token( return Ok(StatusCode::OK); } - let token = payload.token; + let key = payload.key; - let library_sources = state - .library_provider - .fetch_sources(&token) - .await - .map_err(|v| { - error!("{v:?}"); - StatusCode::INTERNAL_SERVER_ERROR - })?; - - let valid_library_sources = filter_library_sources(library_sources); - - set_generated_token(&state, token, valid_library_sources)?; + set_depot_key(&state, key)?; info!("connected to drop server successfully"); @@ -44,48 +32,21 @@ pub async fn set_token( fn check_token_exists(state: &Arc, payload: &TokenPayload) -> bool { if let Some(existing_data) = state.token.get() { assert!( - *existing_data.token() == payload.token, + *existing_data.key == payload.key, "already set up but provided with a different token" ); return true; } false } -fn filter_library_sources( - library_sources: Vec, -) -> HashMap { - library_sources - .into_iter() - .filter(|v| { - matches!( - v.backend, - remote::LibraryBackend::Filesystem | remote::LibraryBackend::FlatFilesystem - ) - }) - .map(|v| { - let path = PathBuf::from_str( - v.options - .as_object() - .unwrap() - .get("baseDir") - .unwrap() - .as_str() - .unwrap(), - ) - .unwrap(); - (v.id, (path, v.backend)) - }) - .collect() -} -fn set_generated_token( +fn set_depot_key( state: &Arc, - token: String, - libraries: HashMap, + key: String ) -> Result<(), StatusCode> { state .token - .set(AppInitData::new(token, libraries)) + .set(AppInitData { key }) .map_err(|err| { error!("failed to set token: {err:?}"); StatusCode::INTERNAL_SERVER_ERROR