feat: depot API

This commit is contained in:
DecDuck
2025-12-20 01:11:05 +11:00
parent 28dfa09084
commit 990bd6cf34
13 changed files with 514 additions and 329 deletions

199
Cargo.lock generated
View File

@@ -2,6 +2,17 @@
# It is not intended for manual editing.
version = 4
[[package]]
name = "aes"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0"
dependencies = [
"cfg-if",
"cipher",
"cpufeatures",
]
[[package]]
name = "aho-corasick"
version = "1.1.4"
@@ -192,6 +203,15 @@ version = "2.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "812e12b5285cc515a9c72a5c1d3b6d46a19dac5acfef5265968c166106e31dd3"
[[package]]
name = "block-buffer"
version = "0.10.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71"
dependencies = [
"generic-array",
]
[[package]]
name = "bumpalo"
version = "3.19.0"
@@ -259,6 +279,16 @@ dependencies = [
"half",
]
[[package]]
name = "cipher"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad"
dependencies = [
"crypto-common",
"inout",
]
[[package]]
name = "clap"
version = "4.5.53"
@@ -293,6 +323,15 @@ dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "cpufeatures"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280"
dependencies = [
"libc",
]
[[package]]
name = "criterion"
version = "0.8.0"
@@ -360,6 +399,25 @@ version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5"
[[package]]
name = "crypto-common"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78c8292055d1c1df0cce5d180393dc8cce0abec0a7102adb6c7b1eef6016d60a"
dependencies = [
"generic-array",
"typenum",
]
[[package]]
name = "ctr"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0369ee1ad671834580515889b80f2ea915f23b8be8d0daa4bbaf2ac5c7590835"
dependencies = [
"cipher",
]
[[package]]
name = "dashmap"
version = "6.1.0"
@@ -417,6 +475,16 @@ dependencies = [
"powerfmt",
]
[[package]]
name = "digest"
version = "0.10.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292"
dependencies = [
"block-buffer",
"crypto-common",
]
[[package]]
name = "displaydoc"
version = "0.2.5"
@@ -430,19 +498,24 @@ dependencies = [
[[package]]
name = "droplet-rs"
version = "0.9.2"
source = "git+https://github.com/Drop-OSS/droplet-rs.git#d8f37886d479d8d9fec7e51523628e863306dddb"
version = "0.12.2"
source = "git+https://github.com/Drop-OSS/droplet-rs.git#05f7027b362fcc72b7cc621df8b5b0850b6cf082"
dependencies = [
"anyhow",
"async-trait",
"dyn-clone",
"futures",
"getrandom 0.3.4",
"hex",
"humansize",
"rcgen",
"ring",
"serde",
"serde_json",
"sha2",
"time",
"time-macros",
"tokio",
"webpki",
"uuid",
"x509-parser 0.17.0",
]
@@ -489,6 +562,21 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "futures"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.31"
@@ -496,6 +584,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10"
dependencies = [
"futures-core",
"futures-sink",
]
[[package]]
@@ -504,6 +593,23 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
[[package]]
name = "futures-executor"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
[[package]]
name = "futures-macro"
version = "0.3.31"
@@ -533,14 +639,28 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
"pin-project-lite",
"pin-utils",
"slab",
]
[[package]]
name = "generic-array"
version = "0.14.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a"
dependencies = [
"typenum",
"version_check",
]
[[package]]
name = "getrandom"
version = "0.2.16"
@@ -636,6 +756,15 @@ version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
[[package]]
name = "humansize"
version = "2.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6cb51c9a029ddc91b07a787f1d86b53ccfa49b0e86688c946ebe8d3555685dd7"
dependencies = [
"libm",
]
[[package]]
name = "hyper"
version = "1.8.1"
@@ -801,6 +930,15 @@ dependencies = [
"icu_properties",
]
[[package]]
name = "inout"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "879f10e63c20629ecabbb64a8010319738c66a5cd0c29b02d63d272b03751d01"
dependencies = [
"generic-array",
]
[[package]]
name = "ipnet"
version = "2.11.0"
@@ -854,6 +992,12 @@ version = "0.2.178"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37c93d8daa9d8a012fd8ab92f088405fb202ea0b6ab73ee2482ae66af4f42091"
[[package]]
name = "libm"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9fbbcab51052fe104eb5e5d351cf728d30a5be1fe14d9be8a3b097481fb97de"
[[package]]
name = "linux-raw-sys"
version = "0.11.0"
@@ -1487,6 +1631,17 @@ dependencies = [
"serde",
]
[[package]]
name = "sha2"
version = "0.10.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283"
dependencies = [
"cfg-if",
"cpufeatures",
"digest",
]
[[package]]
name = "shlex"
version = "1.3.0"
@@ -1751,10 +1906,13 @@ dependencies = [
name = "torrential"
version = "0.1.0"
dependencies = [
"aes",
"anyhow",
"async-trait",
"axum",
"bytes",
"criterion",
"ctr",
"dashmap",
"droplet-rs",
"futures-util",
@@ -1842,6 +2000,12 @@ version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"
[[package]]
name = "typenum"
version = "1.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb"
[[package]]
name = "unicode-ident"
version = "1.0.22"
@@ -1872,6 +2036,23 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be"
[[package]]
name = "uuid"
version = "1.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2e054861b4bd027cd373e18e8d8d8e6548085000e41290d95ce0c373a654b4a"
dependencies = [
"getrandom 0.3.4",
"js-sys",
"wasm-bindgen",
]
[[package]]
name = "version_check"
version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a"
[[package]]
name = "walkdir"
version = "2.5.0"
@@ -1984,16 +2165,6 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "webpki"
version = "0.22.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed63aea5ce73d0ff405984102c42de94fc55a6b75765d621c65262469b3c9b53"
dependencies = [
"ring",
"untrusted",
]
[[package]]
name = "webpki-roots"
version = "1.0.4"

View File

@@ -23,7 +23,7 @@ simple_logger = { version = "5.1.0", default-features = false, features = [
"colors",
] }
tokio = { version = "*", features = ["rt-multi-thread", "sync"] }
droplet-rs = { git = "https://github.com/Drop-OSS/droplet-rs.git" }
droplet-rs = { git="https://github.com/Drop-OSS/droplet-rs.git" }
dashmap = "6.1.0"
anyhow = "1.0.100"
serde_json = "1.0.145"
@@ -31,6 +31,9 @@ url = { version = "2.5.7", default-features = false }
tokio-util = { version = "0.7.17", features = ["io"] }
async-trait = "0.1.89"
futures-util = "0.3.31"
ctr = "0.9.2"
aes = "0.8.4"
bytes = "*"
[lints.clippy]
pedantic = { level = "warn", priority = -1 }

8
docs/protocol.md Normal file
View File

@@ -0,0 +1,8 @@
# Protocol
`torrential` implements the Depot API as defined by https://developer.droposs.org/web/depot, and it prefixed with `/api/v1/depot` for NGINX proxying.
It also has the following endpoints, only accessible by the Drop server for security reasons:
- `/key` for sharing the authentication key from the Drop server to torrential
- `/invalidate` for pre-emptivel clearing the download context cache. Contexts are automatically cleared regardless, so this endpoint failing is not a hard error on the Drop side
- `/healthcheck`. Does healthcheck.

6
docs/structure.md Normal file
View File

@@ -0,0 +1,6 @@
# Structure
Torrential is a typical Rust project. Source files are in `src/`.
`handlers.rs` contains most non-download endpoint handlers. `serve.rs` contains the download endpoint handler.
`remote.rs` handles communciating with the Drop server.

View File

@@ -1,16 +1,21 @@
use std::{collections::HashMap, hash::RandomState, time::Instant};
use std::{path::PathBuf, time::Instant};
use droplet_rs::versions::{create_backend_constructor, types::VersionBackend};
use anyhow::anyhow;
use droplet_rs::{
manifest::Manifest,
versions::{create_backend_constructor, types::VersionBackend},
};
use log::{info, warn};
use reqwest::StatusCode;
use crate::{
remote::{ContextResponseBody, LibraryBackend, ContextProvider},
remote::{LibraryBackend, VersionResponseBody, fetch_version_data},
state::AppInitData,
util::ErrorOption,
};
pub struct DownloadContext {
pub(crate) chunk_lookup_table: HashMap<String, (String, usize, usize)>,
pub(crate) manifest: Manifest,
pub(crate) backend: Box<dyn VersionBackend + Send + Sync + 'static>,
last_access: Instant,
}
@@ -24,33 +29,16 @@ impl DownloadContext {
}
pub async fn create_download_context(
metadata_provider: &dyn ContextProvider,
backend_factory: &dyn BackendFactory,
init_data: &AppInitData,
game_id: String,
version_name: String,
) -> Result<DownloadContext, ErrorOption> {
let context = metadata_provider
.fetch_context(init_data.token(), game_id, version_name.clone())
.await?;
let version_data = fetch_version_data(init_data, game_id, version_name.clone()).await?;
let backend = backend_factory.create_backend(init_data, &context, &version_name)?;
let mut chunk_lookup_table = HashMap::with_capacity_and_hasher(
context.manifest.values().map(|v| v.ids.len()).sum(),
RandomState::default(),
);
for (path, file_chunks) in context.manifest {
let mut start = 0;
for (chunk, length) in file_chunks.ids.into_iter().zip(file_chunks.lengths) {
chunk_lookup_table.insert(chunk, (path.clone(), start, start + length));
start += length;
}
}
let backend = create_backend(&version_data)?;
let download_context = DownloadContext {
chunk_lookup_table,
manifest: version_data.manifest,
backend,
last_access: Instant::now(),
};
@@ -58,39 +46,34 @@ pub async fn create_download_context(
Ok(download_context)
}
pub trait BackendFactory: Send + Sync {
fn create_backend(
&self,
init_data: &AppInitData,
context: &ContextResponseBody,
version_name: &String,
) -> Result<Box<dyn VersionBackend + Send + Sync>, StatusCode>;
}
fn create_backend(
version_data: &VersionResponseBody,
) -> Result<Box<dyn VersionBackend + Send + Sync>, StatusCode> {
let base_path = version_data
.library
.options
.get("baseDir")
.unwrap()
.as_str()
.unwrap();
pub struct DropBackendFactory;
impl BackendFactory for DropBackendFactory {
fn create_backend(
&self,
init_data: &AppInitData,
context: &ContextResponseBody,
version_name: &String,
) -> Result<Box<dyn VersionBackend + Send + Sync>, StatusCode> {
let (version_path, backend) = init_data
.libraries()
.get(&context.library_id)
.ok_or(StatusCode::NOT_FOUND)?;
let version_path = version_path.join(&context.library_path);
let version_path = match backend {
LibraryBackend::Filesystem => version_path.join(version_name),
let version_path = PathBuf::from(base_path);
let version_path = version_path.join(version_data.library_path.clone());
let version_path = match version_data.library.backend {
LibraryBackend::Filesystem => version_path.join(version_data.version_path.clone()),
LibraryBackend::FlatFilesystem => version_path,
};
if !version_path.exists() {
warn!("{} path doesn't exist for version", version_path.display());
return Err(StatusCode::INTERNAL_SERVER_ERROR);
}
let backend =
create_backend_constructor(&version_path).ok_or(StatusCode::INTERNAL_SERVER_ERROR)?;
// TODO: Not eat this error
let backend = backend().map_err(|_e| StatusCode::INTERNAL_SERVER_ERROR)?;
let backend = backend()
.inspect_err(|err| warn!("failed to create version backend: {:?}", err))
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
Ok(backend)
}
}

View File

@@ -1,10 +1,22 @@
use std::sync::Arc;
use std::{
collections::HashMap,
sync::{
Arc,
atomic::{AtomicUsize, Ordering},
},
task::Poll,
};
use axum::{Json, extract::State};
use reqwest::StatusCode;
use serde::Deserialize;
use axum::{Json, body::Body, extract::State, http::{HeaderMap, HeaderValue}, response::IntoResponse};
use bytes::BufMut;
use reqwest::{StatusCode, header::CONTENT_TYPE};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::io::Write;
use tokio::io::AsyncRead;
use tokio_util::io::ReaderStream;
use crate::state::AppState;
use crate::{remote::fetch_instance_games, state::AppState};
pub async fn healthcheck(State(state): State<Arc<AppState>>) -> StatusCode {
let initialised = state.token.initialized();
@@ -16,14 +28,104 @@ pub async fn healthcheck(State(state): State<Arc<AppState>>) -> StatusCode {
#[derive(Deserialize)]
pub struct InvalidateBody {
game_id: String,
version_name: String,
game: String,
version: String,
}
pub async fn invalidate(
State(state): State<Arc<AppState>>,
Json(payload): Json<InvalidateBody>,
) -> StatusCode {
state.context_cache.remove(&(payload.game_id, payload.version_name));
state.context_cache.remove(&(payload.game, payload.version));
StatusCode::OK
}
struct SpeedtestStream {
remaining: usize,
}
impl SpeedtestStream {
pub fn new() -> Self {
SpeedtestStream {
remaining: 1024 * 1024 * 50,
}
}
fn content_length(&self) -> usize {
self.remaining
}
}
const ZERO: [u8; 1024] = [0u8; _];
impl AsyncRead for SpeedtestStream {
fn poll_read(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
if self.remaining > 0 {
let mut writer = buf.writer();
let amount = writer.write(&ZERO);
match amount {
Ok(amount) => self.remaining -= amount,
Err(err) => return Poll::Ready(Err(err)),
};
};
return Poll::Ready(Ok(()));
}
}
pub async fn speedtest() -> Result<impl IntoResponse, StatusCode> {
let speedtest = SpeedtestStream::new();
let ct = speedtest.content_length();
let speedtest_stream = ReaderStream::new(speedtest);
let body = Body::from_stream(speedtest_stream);
let mut headers = HeaderMap::new();
headers.insert("Content-Type", "application/octet-stream".parse().unwrap());
headers.insert("Content-Length", ct.into());
Ok((headers, body))
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct GameData {
version_id: String,
compression: String,
}
#[derive(Serialize)]
struct Manifest {
content: HashMap<String, Vec<GameData>>,
}
pub async fn manifest(
State(state): State<Arc<AppState>>,
) -> Result<impl IntoResponse, StatusCode> {
let games = fetch_instance_games(
state
.token
.get()
.ok_or(StatusCode::from_u16(503).unwrap())?,
)
.await?;
let mut content = HashMap::new();
for game in games {
content.insert(
game.id,
game.versions
.into_iter()
.map(|v| GameData {
version_id: v.version_id,
compression: "none".to_owned(),
})
.collect::<Vec<GameData>>(),
);
}
let mut headers = HeaderMap::new();
headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
Ok((headers, json!(Manifest { content }).to_string()))
}

View File

@@ -2,17 +2,12 @@ use tokio::sync::Semaphore;
mod download;
pub mod serve;
pub mod handlers;
mod manifest;
mod remote;
pub mod state;
mod token;
mod util;
pub use download::DownloadContext;
pub use download::{BackendFactory, DropBackendFactory};
pub use remote::{
DropLibraryProvider, DropContextProvider, LibraryConfigurationProvider, ContextProvider,
};
pub use token::set_token;
static GLOBAL_CONTEXT_SEMAPHORE: Semaphore = Semaphore::const_new(1);

View File

@@ -4,16 +4,14 @@ use std::{
};
use axum::{
Router,
Router, handler,
routing::{get, post},
};
use dashmap::DashMap;
use log::info;
use simple_logger::SimpleLogger;
use tokio::{runtime::Handle, sync::OnceCell};
use torrential::{
DropBackendFactory, DropContextProvider, DropLibraryProvider, handlers, serve, set_token, state::AppState
};
use torrential::{handlers, serve, set_token, state::AppState};
use url::Url;
#[tokio::main]
@@ -21,21 +19,16 @@ async fn main() {
initialise_logger();
if let Ok(working_directory) = std::env::var("WORKING_DIRECTORY") {
info!("moving to working directory {}", working_directory);
set_current_dir(working_directory).expect("failed to change working directory");
}
let metrics = Handle::current().metrics();
info!("using {} threads", metrics.num_workers());
let remote_url = get_remote_url();
let shared_state = Arc::new(AppState {
token: OnceCell::new(),
context_cache: DashMap::new(),
metadata_provider: Arc::new(DropContextProvider::new(remote_url.clone())),
backend_factory: Arc::new(DropBackendFactory),
library_provider: Arc::new(DropLibraryProvider::new(remote_url)),
});
let app = setup_app(shared_state);
@@ -46,12 +39,14 @@ async fn main() {
fn setup_app(shared_state: Arc<AppState>) -> Router {
Router::new()
.route(
"/api/v1/depot/{game_id}/{version_name}/{*chunk_ids}",
"/api/v1/depot/content/{game_id}/{version_name}/{chunk_id}",
get(serve::serve_file),
)
.route("/token", post(set_token))
.route("/api/v1/depot/manifest.json", get(handlers::manifest))
.route("/api/v1/depot/speedtest", get(handlers::speedtest))
.route("/key", post(set_token))
.route("/healthcheck", get(handlers::healthcheck))
.route("/invalid", post(handlers::invalidate))
.route("/invalidate", post(handlers::invalidate))
.with_state(shared_state)
}
@@ -67,15 +62,3 @@ fn initialise_logger() {
.init()
.unwrap();
}
fn get_remote_url() -> Url {
let user_provided = env::var("DROP_SERVER_URL");
let url = Url::parse(
user_provided
.as_ref()
.map_or("http://localhost:3000", |v| v),
)
.expect("failed to parse URL");
info!("using Drop server url {url}");
url
}

View File

@@ -1,11 +0,0 @@
use std::collections::HashMap;
use serde::Deserialize;
#[derive(Deserialize)]
pub struct DropChunk {
pub ids: Vec<String>,
pub lengths: Vec<usize>,
}
pub type DropletManifest = HashMap<String, DropChunk>;

View File

@@ -1,21 +1,44 @@
use std::{env, sync::LazyLock};
use anyhow::{Result, anyhow};
use async_trait::async_trait;
use reqwest::StatusCode;
use droplet_rs::manifest::Manifest;
use log::info;
use reqwest::{Client, ClientBuilder, StatusCode};
use serde::{Deserialize, Serialize};
use url::Url;
use crate::{manifest::DropletManifest, util::ErrorOption};
use crate::{state::AppInitData, util::ErrorOption};
static CLIENT: LazyLock<Client> = LazyLock::new(|| {
ClientBuilder::new()
.build()
.expect("failed to build client")
});
static REMOTE_URL: LazyLock<Url> = LazyLock::new(|| {
let user_provided = env::var("DROP_SERVER_URL");
let url = Url::parse(
user_provided
.as_ref()
.map_or("http://localhost:3000", |v| v),
)
.expect("failed to parse URL");
info!("using Drop server url {}", url);
url
});
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ContextResponseBody {
pub manifest: DropletManifest,
pub library_id: String,
pub struct VersionResponseBody {
pub manifest: Manifest,
pub library: LibrarySource,
pub library_path: String,
pub version_path: String,
}
#[derive(Serialize)]
pub struct ContextQuery {
pub struct VersionQuery {
game: String,
version: String,
}
@@ -34,44 +57,68 @@ pub struct LibrarySource {
pub backend: LibraryBackend,
}
pub struct DropContextProvider {
client: reqwest::Client,
base_url: Url,
}
impl DropContextProvider {
pub fn new(url: Url) -> Self {
Self {
client: reqwest::Client::new(),
base_url: url,
}
}
}
#[async_trait]
impl ContextProvider for DropContextProvider {
async fn fetch_context(
&self,
token: String,
pub async fn fetch_version_data(
init_data: &AppInitData,
game_id: String,
version_name: String,
) -> Result<ContextResponseBody, ErrorOption> {
let context_response = self
.client
.get(self.base_url.join("/api/v1/admin/depot/context")?)
.query(&ContextQuery {
version_id: String,
) -> Result<VersionResponseBody, ErrorOption> {
let version_data_response = CLIENT
.get(REMOTE_URL.join("/api/v1/admin/depot/manifest")?)
.query(&VersionQuery {
game: game_id,
version: version_name,
version: version_id,
})
.header("Authorization", format!("Bearer {token}"))
.header("Authorization", format!("Bearer {}", init_data.key))
.send()
.await?;
if !context_response.status().is_success() {
if context_response.status() == StatusCode::BAD_REQUEST {
if !version_data_response.status().is_success() {
if version_data_response.status() == StatusCode::BAD_REQUEST {
return Err(StatusCode::NOT_FOUND.into());
}
return Err(anyhow!(
"Fetching context failed with non-success code: {}, {}",
version_data_response.status(),
version_data_response
.text()
.await
.unwrap_or("(failed to read body)".to_owned())
)
.into());
}
let version_data: VersionResponseBody = version_data_response.json().await?;
Ok(version_data)
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SkeletonVersion {
pub version_id: String,
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SkeletonGame {
pub id: String,
pub versions: Vec<SkeletonVersion>,
}
pub async fn fetch_instance_games(
init_data: &AppInitData,
) -> Result<Vec<SkeletonGame>, ErrorOption> {
let context_response = CLIENT
.get(REMOTE_URL.join("/api/v1/admin/depot/versions")?)
.header("Authorization", format!("Bearer {}", init_data.key))
.send()
.await?;
if !context_response.status().is_success() {
return Err(anyhow!(
"Fetching instance games failed with non-success code: {}, {}",
context_response.status(),
context_response
.text()
@@ -81,63 +128,7 @@ impl ContextProvider for DropContextProvider {
.into());
}
let context: ContextResponseBody = context_response.json().await?;
let games: Vec<SkeletonGame> = context_response.json().await?;
Ok(context)
}
}
#[async_trait]
pub trait ContextProvider: Send + Sync {
/// Fetches the manifest for a specific game version.
async fn fetch_context(
&self,
token: String,
game_id: String,
version_name: String,
) -> Result<ContextResponseBody, ErrorOption>;
}
#[async_trait]
pub trait LibraryConfigurationProvider: Send + Sync {
async fn fetch_sources(&self, token: &String) -> anyhow::Result<Vec<LibrarySource>>;
}
pub struct DropLibraryProvider {
client: reqwest::Client,
base_url: Url,
}
impl DropLibraryProvider {
pub fn new(url: Url) -> Self {
Self {
client: reqwest::Client::new(),
base_url: url,
}
}
}
#[async_trait]
impl LibraryConfigurationProvider for DropLibraryProvider {
async fn fetch_sources(&self, token: &String) -> anyhow::Result<Vec<LibrarySource>> {
let source_response = self
.client
.get(self.base_url.join("/api/v1/admin/library/sources")?)
.header("Authorization", format!("Bearer {token}"))
.send()
.await?;
if !source_response.status().is_success() {
return Err(anyhow!(
"Fetching library sources failed with non-success code: {}, {}",
source_response.status(),
source_response
.text()
.await
.unwrap_or("(failed to read body)".to_owned())
));
}
let library_sources: Vec<LibrarySource> = source_response.json().await?;
Ok(library_sources)
}
Ok(games)
}

View File

@@ -1,58 +1,72 @@
use std::sync::Arc;
use std::{io::Error, rc::Rc, sync::Arc};
use aes::cipher::{KeyIvInit, StreamCipher};
use axum::{
body::Body,
extract::{Path, State},
http::HeaderMap,
response::{AppendHeaders, IntoResponse},
};
use bytes::Bytes;
use dashmap::{DashMap, mapref::one::RefMut};
use droplet_rs::versions::types::{MinimumFileObject, VersionFile};
use droplet_rs::{
manifest::ChunkData,
versions::types::{MinimumFileObject, VersionFile},
};
use futures_util::{StreamExt, stream};
use log::{error, info};
use reqwest::{StatusCode, header};
use tokio::sync::SemaphorePermit;
use tokio_util::io::ReaderStream;
use futures_util::{StreamExt as _, stream};
use crate::{
DownloadContext, GLOBAL_CONTEXT_SEMAPHORE, download::create_download_context, state::AppState,
};
type Aes128Ctr64LE = ctr::Ctr64LE<aes::Aes128>;
pub async fn serve_file(
State(state): State<Arc<AppState>>,
Path((game_id, version_name, chunk_ids)): Path<(String, String, String)>,
Path((game_id, version_name, chunk_id)): Path<(String, String, String)>,
) -> Result<impl IntoResponse, StatusCode> {
let context_cache = &state.context_cache;
let mut context = get_or_generate_context(&state, context_cache, game_id, version_name).await?;
let mut context = get_or_create_context(&state, context_cache, game_id, version_name).await?;
context.reset_last_access();
let chunk_ids = chunk_ids.split("/").collect::<Vec<&str>>();
let mut streams = Vec::with_capacity(chunk_ids.len());
let mut content_lengths = Vec::with_capacity(chunk_ids.len());
let mut total_size = 0;
for chunk_id in chunk_ids {
let (relative_filename, start, end) = lookup_chunk(chunk_id, &context)?;
let reader = get_file_reader(&mut context, relative_filename, start, end).await?;
let chunk_data = lookup_chunk(&chunk_id, &context)?;
let mut streams = Vec::with_capacity(chunk_data.files.len());
let mut content_length = 0;
for file_entry in &chunk_data.files {
let reader = get_file_reader(
&mut context,
file_entry.filename.clone(),
file_entry.start,
file_entry.start + file_entry.length,
)
.await?;
let stream = ReaderStream::new(reader);
streams.push(stream);
content_lengths.push((end - start).to_string());
total_size += end - start;
content_length += file_entry.length;
}
let stream = stream::iter(streams).flatten();
let body: Body = Body::from_stream(stream);
let mut cipher = Aes128Ctr64LE::new(&context.manifest.key.into(), &chunk_data.iv.into());
let encrypted_stream = stream.chunks(3).map(move |raw| -> Result<Bytes, Error> {
let data: Result<Vec<Bytes>, Error> = raw.into_iter().collect();
let mut data = data?.concat();
cipher.apply_keystream(&mut data);
Ok(data.into())
});
let body: Body = Body::from_stream(encrypted_stream);
let mut headers = HeaderMap::new();
headers.insert("Content-Type", "application/octet-stream".parse().unwrap());
headers.insert("Content-Length", total_size.to_string().parse().unwrap());
headers.insert(
"Content-Lengths",
content_lengths.join(",").parse().unwrap(),
);
headers.insert("Content-Length", content_length.into());
Ok((headers, body))
}
@@ -62,12 +76,16 @@ async fn acquire_permit<'a>() -> SemaphorePermit<'a> {
.await
.expect("failed to acquire semaphore");
}
/**
* Needs to be cloned for reference reasons
*/
fn lookup_chunk(
chunk_id: &str,
context: &RefMut<'_, (String, String), DownloadContext>,
) -> Result<(String, usize, usize), StatusCode> {
) -> Result<ChunkData, StatusCode> {
context
.chunk_lookup_table
.manifest
.chunks
.get(chunk_id)
.cloned()
.ok_or(StatusCode::NOT_FOUND)
@@ -91,11 +109,11 @@ async fn get_file_reader(
)
.await
.map_err(|v| {
error!("reader error: {v:?}");
error!("reader error for '{}': {v:?}", relative_filename);
StatusCode::INTERNAL_SERVER_ERROR
})
}
async fn get_or_generate_context<'a>(
async fn get_or_create_context<'a>(
state: &Arc<AppState>,
context_cache: &'a DashMap<(String, String), DownloadContext>,
game_id: String,
@@ -114,13 +132,8 @@ async fn get_or_generate_context<'a>(
Ok(already_done)
} else {
info!("generating context for {}...", game_id);
let context_result = create_download_context(
&*state.metadata_provider,
&*state.backend_factory,
initialisation_data,
game_id.clone(),
version_name.clone(),
)
let context_result =
create_download_context(initialisation_data, game_id.clone(), version_name.clone())
.await?;
state.context_cache.insert(key.clone(), context_result);

View File

@@ -1,38 +1,18 @@
use std::{collections::HashMap, path::PathBuf, sync::Arc};
use std::{collections::HashMap, path::PathBuf};
use dashmap::DashMap;
use tokio::sync::OnceCell;
use crate::{
BackendFactory, DownloadContext, LibraryConfigurationProvider, ContextProvider,
remote::LibraryBackend,
};
use crate::
DownloadContext
;
pub struct AppState {
pub token: OnceCell<AppInitData>,
pub context_cache: DashMap<(String, String), DownloadContext>,
pub metadata_provider: Arc<dyn ContextProvider>,
pub backend_factory: Arc<dyn BackendFactory>,
pub library_provider: Arc<dyn LibraryConfigurationProvider>,
}
#[derive(Debug)]
pub struct AppInitData {
token: String,
libraries: HashMap<String, (PathBuf, LibraryBackend)>,
}
impl AppInitData {
pub fn new(token: String, libraries: HashMap<String, (PathBuf, LibraryBackend)>) -> Self {
Self { token, libraries }
}
pub fn token(&self) -> String {
self.token.clone()
}
pub fn set_token(&mut self, token: String) {
self.token = token
}
pub fn libraries(&self) -> &HashMap<String, (PathBuf, LibraryBackend)> {
&self.libraries
}
pub key: String,
}

View File

@@ -1,16 +1,15 @@
use std::{collections::HashMap, path::PathBuf, str::FromStr, sync::Arc};
use std::sync::Arc;
use axum::{Json, extract::State};
use log::{error, info};
use reqwest::StatusCode;
use serde::Deserialize;
use crate::remote::{self, LibraryBackend, LibrarySource};
use crate::state::{AppInitData, AppState};
#[derive(Deserialize)]
pub struct TokenPayload {
token: String,
key: String,
}
pub async fn set_token(
@@ -21,20 +20,9 @@ pub async fn set_token(
return Ok(StatusCode::OK);
}
let token = payload.token;
let key = payload.key;
let library_sources = state
.library_provider
.fetch_sources(&token)
.await
.map_err(|v| {
error!("{v:?}");
StatusCode::INTERNAL_SERVER_ERROR
})?;
let valid_library_sources = filter_library_sources(library_sources);
set_generated_token(&state, token, valid_library_sources)?;
set_depot_key(&state, key)?;
info!("connected to drop server successfully");
@@ -44,48 +32,21 @@ pub async fn set_token(
fn check_token_exists(state: &Arc<AppState>, payload: &TokenPayload) -> bool {
if let Some(existing_data) = state.token.get() {
assert!(
*existing_data.token() == payload.token,
*existing_data.key == payload.key,
"already set up but provided with a different token"
);
return true;
}
false
}
fn filter_library_sources(
library_sources: Vec<LibrarySource>,
) -> HashMap<String, (PathBuf, LibraryBackend)> {
library_sources
.into_iter()
.filter(|v| {
matches!(
v.backend,
remote::LibraryBackend::Filesystem | remote::LibraryBackend::FlatFilesystem
)
})
.map(|v| {
let path = PathBuf::from_str(
v.options
.as_object()
.unwrap()
.get("baseDir")
.unwrap()
.as_str()
.unwrap(),
)
.unwrap();
(v.id, (path, v.backend))
})
.collect()
}
fn set_generated_token(
fn set_depot_key(
state: &Arc<AppState>,
token: String,
libraries: HashMap<String, (PathBuf, LibraryBackend)>,
key: String
) -> Result<(), StatusCode> {
state
.token
.set(AppInitData::new(token, libraries))
.set(AppInitData { key })
.map_err(|err| {
error!("failed to set token: {err:?}");
StatusCode::INTERNAL_SERVER_ERROR