mirror of
https://github.com/tauri-apps/tauri-plugin-http.git
synced 2026-01-31 00:45:17 +01:00
fix(http): return response early in JS before waiting for chunks on the rust side (#2522)
Committed via a GitHub action: https://github.com/tauri-apps/plugins-workspace/actions/runs/13867339185 Co-authored-by: amrbashir <amrbashir@users.noreply.github.com>
This commit is contained in:
committed by
tauri-bot
parent
2d7873f257
commit
5bb0ef3112
@@ -1 +1 @@
|
||||
if("__TAURI__"in window){var __TAURI_PLUGIN_HTTP__=function(e){"use strict";function t(e,t,r,n){if("function"==typeof t||!t.has(e))throw new TypeError("Cannot read private member from an object whose class did not declare it");return"m"===r?n:"a"===r?n.call(e):n?n.value:t.get(e)}function r(e,t,r,n,s){if("function"==typeof t||!t.has(e))throw new TypeError("Cannot write private member to an object whose class did not declare it");return t.set(e,r),r}var n,s,a;"function"==typeof SuppressedError&&SuppressedError;const i="__TAURI_TO_IPC_KEY__";class o{constructor(){this.__TAURI_CHANNEL_MARKER__=!0,n.set(this,(()=>{})),s.set(this,0),a.set(this,[]),this.id=function(e,t=!1){return window.__TAURI_INTERNALS__.transformCallback(e,t)}((({message:e,id:i})=>{if(i==t(this,s,"f"))for(t(this,n,"f").call(this,e),r(this,s,t(this,s,"f")+1);t(this,s,"f")in t(this,a,"f");){const e=t(this,a,"f")[t(this,s,"f")];t(this,n,"f").call(this,e),delete t(this,a,"f")[t(this,s,"f")],r(this,s,t(this,s,"f")+1)}else t(this,a,"f")[i]=e}))}set onmessage(e){r(this,n,e)}get onmessage(){return t(this,n,"f")}[(n=new WeakMap,s=new WeakMap,a=new WeakMap,i)](){return`__CHANNEL__:${this.id}`}toJSON(){return this[i]()}}async function c(e,t={},r){return window.__TAURI_INTERNALS__.invoke(e,t,r)}const d="Request canceled";return e.fetch=async function(e,t){const r=t?.signal;if(r?.aborted)throw new Error(d);const n=t?.maxRedirections,s=t?.connectTimeout,a=t?.proxy,i=t?.danger;t&&(delete t.maxRedirections,delete t.connectTimeout,delete t.proxy,delete t.danger);const f=t?.headers?t.headers instanceof Headers?t.headers:new Headers(t.headers):new Headers,h=new Request(e,t),_=await h.arrayBuffer(),u=0!==_.byteLength?Array.from(new Uint8Array(_)):null;for(const[e,t]of h.headers)f.get(e)||f.set(e,t);const l=(f instanceof Headers?Array.from(f.entries()):Array.isArray(f)?f:Object.entries(f)).map((([e,t])=>[e,"string"==typeof t?t:t.toString()]));if(r?.aborted)throw new Error(d);const w=new o,p=new ReadableStream({start:e=>{w.onmessage=t=>{if(r?.aborted)return e.error(d),void e.close();(t instanceof ArrayBuffer?0!=t.byteLength:0!=t.length)?e.enqueue(new Uint8Array(t)):e.close()}}}),m=await c("plugin:http|fetch",{clientConfig:{method:h.method,url:h.url,headers:l,data:u,maxRedirections:n,connectTimeout:s,proxy:a,danger:i},streamChannel:w}),y=()=>c("plugin:http|fetch_cancel",{rid:m});if(r?.aborted)throw y(),new Error(d);r?.addEventListener("abort",(()=>{y()}));const{status:T,statusText:g,url:A,headers:R}=await c("plugin:http|fetch_send",{rid:m}),b=new Response(p,{status:T,statusText:g});return Object.defineProperty(b,"url",{value:A}),Object.defineProperty(b,"headers",{value:new Headers(R)}),b},e}({});Object.defineProperty(window.__TAURI__,"http",{value:__TAURI_PLUGIN_HTTP__})}
|
||||
if("__TAURI__"in window){var __TAURI_PLUGIN_HTTP__=function(e){"use strict";function t(e,t,r,n){if("function"==typeof t||!t.has(e))throw new TypeError("Cannot read private member from an object whose class did not declare it");return"m"===r?n:"a"===r?n.call(e):n?n.value:t.get(e)}function r(e,t,r,n,s){if("function"==typeof t||!t.has(e))throw new TypeError("Cannot write private member to an object whose class did not declare it");return t.set(e,r),r}var n,s,a;"function"==typeof SuppressedError&&SuppressedError;const i="__TAURI_TO_IPC_KEY__";class o{constructor(){this.__TAURI_CHANNEL_MARKER__=!0,n.set(this,(()=>{})),s.set(this,0),a.set(this,[]),this.id=function(e,t=!1){return window.__TAURI_INTERNALS__.transformCallback(e,t)}((({message:e,id:i})=>{if(i==t(this,s,"f"))for(t(this,n,"f").call(this,e),r(this,s,t(this,s,"f")+1);t(this,s,"f")in t(this,a,"f");){const e=t(this,a,"f")[t(this,s,"f")];t(this,n,"f").call(this,e),delete t(this,a,"f")[t(this,s,"f")],r(this,s,t(this,s,"f")+1)}else t(this,a,"f")[i]=e}))}set onmessage(e){r(this,n,e)}get onmessage(){return t(this,n,"f")}[(n=new WeakMap,s=new WeakMap,a=new WeakMap,i)](){return`__CHANNEL__:${this.id}`}toJSON(){return this[i]()}}async function c(e,t={},r){return window.__TAURI_INTERNALS__.invoke(e,t,r)}const d="Request cancelled";return e.fetch=async function(e,t){const r=t?.signal;if(r?.aborted)throw new Error(d);const n=t?.maxRedirections,s=t?.connectTimeout,a=t?.proxy,i=t?.danger;t&&(delete t.maxRedirections,delete t.connectTimeout,delete t.proxy,delete t.danger);const h=t?.headers?t.headers instanceof Headers?t.headers:new Headers(t.headers):new Headers,f=new Request(e,t),_=await f.arrayBuffer(),u=0!==_.byteLength?Array.from(new Uint8Array(_)):null;for(const[e,t]of f.headers)h.get(e)||h.set(e,t);const l=(h instanceof Headers?Array.from(h.entries()):Array.isArray(h)?h:Object.entries(h)).map((([e,t])=>[e,"string"==typeof t?t:t.toString()]));if(r?.aborted)throw new Error(d);const w=await c("plugin:http|fetch",{clientConfig:{method:f.method,url:f.url,headers:l,data:u,maxRedirections:n,connectTimeout:s,proxy:a,danger:i}}),p=()=>c("plugin:http|fetch_cancel",{rid:w});if(r?.aborted)throw p(),new Error(d);r?.addEventListener("abort",(()=>{p()}));const{status:y,statusText:m,url:T,headers:g,rid:A}=await c("plugin:http|fetch_send",{rid:w}),R=new ReadableStream({start:e=>{const t=new o;t.onmessage=t=>{r?.aborted?e.error(d):(t instanceof ArrayBuffer?0!=t.byteLength:0!=t.length)?e.enqueue(new Uint8Array(t)):e.close()},c("plugin:http|fetch_read_body",{rid:A,streamChannel:t}).catch((t=>{e.error(t)}))}}),b=new Response(R,{status:y,statusText:m});return Object.defineProperty(b,"url",{value:T}),Object.defineProperty(b,"headers",{value:new Headers(g)}),b},e}({});Object.defineProperty(window.__TAURI__,"http",{value:__TAURI_PLUGIN_HTTP__})}
|
||||
|
||||
@@ -28,7 +28,7 @@ var core = require('@tauri-apps/api/core');
|
||||
*
|
||||
* @module
|
||||
*/
|
||||
const ERROR_REQUEST_CANCELLED = 'Request canceled';
|
||||
const ERROR_REQUEST_CANCELLED = 'Request cancelled';
|
||||
/**
|
||||
* Fetch a resource from the network. It returns a `Promise` that resolves to the
|
||||
* `Response` to that `Request`, whether it is successful or not.
|
||||
@@ -91,14 +91,37 @@ async function fetch(input, init) {
|
||||
if (signal?.aborted) {
|
||||
throw new Error(ERROR_REQUEST_CANCELLED);
|
||||
}
|
||||
const streamChannel = new core.Channel();
|
||||
const rid = await core.invoke('plugin:http|fetch', {
|
||||
clientConfig: {
|
||||
method: req.method,
|
||||
url: req.url,
|
||||
headers: mappedHeaders,
|
||||
data,
|
||||
maxRedirections,
|
||||
connectTimeout,
|
||||
proxy,
|
||||
danger
|
||||
}
|
||||
});
|
||||
const abort = () => core.invoke('plugin:http|fetch_cancel', { rid });
|
||||
// abort early here if needed
|
||||
if (signal?.aborted) {
|
||||
// we don't care about the result of this proimse
|
||||
// eslint-disable-next-line @typescript-eslint/no-floating-promises
|
||||
abort();
|
||||
throw new Error(ERROR_REQUEST_CANCELLED);
|
||||
}
|
||||
signal?.addEventListener('abort', () => void abort());
|
||||
const { status, statusText, url, headers: responseHeaders, rid: responseRid } = await core.invoke('plugin:http|fetch_send', {
|
||||
rid
|
||||
});
|
||||
const readableStreamBody = new ReadableStream({
|
||||
start: (controller) => {
|
||||
const streamChannel = new core.Channel();
|
||||
streamChannel.onmessage = (res) => {
|
||||
// close early if aborted
|
||||
if (signal?.aborted) {
|
||||
controller.error(ERROR_REQUEST_CANCELLED);
|
||||
controller.close();
|
||||
return;
|
||||
}
|
||||
// close when the signal to close (an empty chunk)
|
||||
@@ -112,33 +135,15 @@ async function fetch(input, init) {
|
||||
// have untraceable error that's hard to debug.
|
||||
controller.enqueue(new Uint8Array(res));
|
||||
};
|
||||
// run a non-blocking body stream fetch
|
||||
core.invoke('plugin:http|fetch_read_body', {
|
||||
rid: responseRid,
|
||||
streamChannel
|
||||
}).catch((e) => {
|
||||
controller.error(e);
|
||||
});
|
||||
}
|
||||
});
|
||||
const rid = await core.invoke('plugin:http|fetch', {
|
||||
clientConfig: {
|
||||
method: req.method,
|
||||
url: req.url,
|
||||
headers: mappedHeaders,
|
||||
data,
|
||||
maxRedirections,
|
||||
connectTimeout,
|
||||
proxy,
|
||||
danger
|
||||
},
|
||||
streamChannel
|
||||
});
|
||||
const abort = () => core.invoke('plugin:http|fetch_cancel', { rid });
|
||||
// abort early here if needed
|
||||
if (signal?.aborted) {
|
||||
// we don't care about the result of this proimse
|
||||
// eslint-disable-next-line @typescript-eslint/no-floating-promises
|
||||
abort();
|
||||
throw new Error(ERROR_REQUEST_CANCELLED);
|
||||
}
|
||||
signal?.addEventListener('abort', () => void abort());
|
||||
const { status, statusText, url, headers: responseHeaders } = await core.invoke('plugin:http|fetch_send', {
|
||||
rid
|
||||
});
|
||||
const res = new Response(readableStreamBody, {
|
||||
status,
|
||||
statusText
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { Channel, invoke } from '@tauri-apps/api/core';
|
||||
import { invoke, Channel } from '@tauri-apps/api/core';
|
||||
|
||||
// Copyright 2019-2023 Tauri Programme within The Commons Conservancy
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
@@ -26,7 +26,7 @@ import { Channel, invoke } from '@tauri-apps/api/core';
|
||||
*
|
||||
* @module
|
||||
*/
|
||||
const ERROR_REQUEST_CANCELLED = 'Request canceled';
|
||||
const ERROR_REQUEST_CANCELLED = 'Request cancelled';
|
||||
/**
|
||||
* Fetch a resource from the network. It returns a `Promise` that resolves to the
|
||||
* `Response` to that `Request`, whether it is successful or not.
|
||||
@@ -89,14 +89,37 @@ async function fetch(input, init) {
|
||||
if (signal?.aborted) {
|
||||
throw new Error(ERROR_REQUEST_CANCELLED);
|
||||
}
|
||||
const streamChannel = new Channel();
|
||||
const rid = await invoke('plugin:http|fetch', {
|
||||
clientConfig: {
|
||||
method: req.method,
|
||||
url: req.url,
|
||||
headers: mappedHeaders,
|
||||
data,
|
||||
maxRedirections,
|
||||
connectTimeout,
|
||||
proxy,
|
||||
danger
|
||||
}
|
||||
});
|
||||
const abort = () => invoke('plugin:http|fetch_cancel', { rid });
|
||||
// abort early here if needed
|
||||
if (signal?.aborted) {
|
||||
// we don't care about the result of this proimse
|
||||
// eslint-disable-next-line @typescript-eslint/no-floating-promises
|
||||
abort();
|
||||
throw new Error(ERROR_REQUEST_CANCELLED);
|
||||
}
|
||||
signal?.addEventListener('abort', () => void abort());
|
||||
const { status, statusText, url, headers: responseHeaders, rid: responseRid } = await invoke('plugin:http|fetch_send', {
|
||||
rid
|
||||
});
|
||||
const readableStreamBody = new ReadableStream({
|
||||
start: (controller) => {
|
||||
const streamChannel = new Channel();
|
||||
streamChannel.onmessage = (res) => {
|
||||
// close early if aborted
|
||||
if (signal?.aborted) {
|
||||
controller.error(ERROR_REQUEST_CANCELLED);
|
||||
controller.close();
|
||||
return;
|
||||
}
|
||||
// close when the signal to close (an empty chunk)
|
||||
@@ -110,33 +133,15 @@ async function fetch(input, init) {
|
||||
// have untraceable error that's hard to debug.
|
||||
controller.enqueue(new Uint8Array(res));
|
||||
};
|
||||
// run a non-blocking body stream fetch
|
||||
invoke('plugin:http|fetch_read_body', {
|
||||
rid: responseRid,
|
||||
streamChannel
|
||||
}).catch((e) => {
|
||||
controller.error(e);
|
||||
});
|
||||
}
|
||||
});
|
||||
const rid = await invoke('plugin:http|fetch', {
|
||||
clientConfig: {
|
||||
method: req.method,
|
||||
url: req.url,
|
||||
headers: mappedHeaders,
|
||||
data,
|
||||
maxRedirections,
|
||||
connectTimeout,
|
||||
proxy,
|
||||
danger
|
||||
},
|
||||
streamChannel
|
||||
});
|
||||
const abort = () => invoke('plugin:http|fetch_cancel', { rid });
|
||||
// abort early here if needed
|
||||
if (signal?.aborted) {
|
||||
// we don't care about the result of this proimse
|
||||
// eslint-disable-next-line @typescript-eslint/no-floating-promises
|
||||
abort();
|
||||
throw new Error(ERROR_REQUEST_CANCELLED);
|
||||
}
|
||||
signal?.addEventListener('abort', () => void abort());
|
||||
const { status, statusText, url, headers: responseHeaders } = await invoke('plugin:http|fetch_send', {
|
||||
rid
|
||||
});
|
||||
const res = new Response(readableStreamBody, {
|
||||
status,
|
||||
statusText
|
||||
|
||||
@@ -106,7 +106,7 @@ export interface DangerousSettings {
|
||||
acceptInvalidHostnames?: boolean
|
||||
}
|
||||
|
||||
const ERROR_REQUEST_CANCELLED = 'Request canceled'
|
||||
const ERROR_REQUEST_CANCELLED = 'Request cancelled'
|
||||
|
||||
/**
|
||||
* Fetch a resource from the network. It returns a `Promise` that resolves to the
|
||||
@@ -186,35 +186,6 @@ export async function fetch(
|
||||
throw new Error(ERROR_REQUEST_CANCELLED)
|
||||
}
|
||||
|
||||
const streamChannel = new Channel<ArrayBuffer | number[]>()
|
||||
|
||||
const readableStreamBody = new ReadableStream({
|
||||
start: (controller) => {
|
||||
streamChannel.onmessage = (res: ArrayBuffer | number[]) => {
|
||||
// close early if aborted
|
||||
if (signal?.aborted) {
|
||||
controller.error(ERROR_REQUEST_CANCELLED)
|
||||
controller.close()
|
||||
return
|
||||
}
|
||||
|
||||
// close when the signal to close (an empty chunk)
|
||||
// is sent from the IPC.
|
||||
if (
|
||||
res instanceof ArrayBuffer ? res.byteLength == 0 : res.length == 0
|
||||
) {
|
||||
controller.close()
|
||||
return
|
||||
}
|
||||
|
||||
// the content conversion (like .text(), .json(), etc.) in Response
|
||||
// must have Uint8Array as its content, else it will
|
||||
// have untraceable error that's hard to debug.
|
||||
controller.enqueue(new Uint8Array(res))
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
const rid = await invoke<number>('plugin:http|fetch', {
|
||||
clientConfig: {
|
||||
method: req.method,
|
||||
@@ -225,8 +196,7 @@ export async function fetch(
|
||||
connectTimeout,
|
||||
proxy,
|
||||
danger
|
||||
},
|
||||
streamChannel
|
||||
}
|
||||
})
|
||||
|
||||
const abort = () => invoke('plugin:http|fetch_cancel', { rid })
|
||||
@@ -253,11 +223,47 @@ export async function fetch(
|
||||
status,
|
||||
statusText,
|
||||
url,
|
||||
headers: responseHeaders
|
||||
headers: responseHeaders,
|
||||
rid: responseRid
|
||||
} = await invoke<FetchSendResponse>('plugin:http|fetch_send', {
|
||||
rid
|
||||
})
|
||||
|
||||
const readableStreamBody = new ReadableStream({
|
||||
start: (controller) => {
|
||||
const streamChannel = new Channel<ArrayBuffer | number[]>()
|
||||
streamChannel.onmessage = (res: ArrayBuffer | number[]) => {
|
||||
// close early if aborted
|
||||
if (signal?.aborted) {
|
||||
controller.error(ERROR_REQUEST_CANCELLED)
|
||||
return
|
||||
}
|
||||
|
||||
// close when the signal to close (an empty chunk)
|
||||
// is sent from the IPC.
|
||||
if (
|
||||
res instanceof ArrayBuffer ? res.byteLength == 0 : res.length == 0
|
||||
) {
|
||||
controller.close()
|
||||
return
|
||||
}
|
||||
|
||||
// the content conversion (like .text(), .json(), etc.) in Response
|
||||
// must have Uint8Array as its content, else it will
|
||||
// have untraceable error that's hard to debug.
|
||||
controller.enqueue(new Uint8Array(res))
|
||||
}
|
||||
|
||||
// run a non-blocking body stream fetch
|
||||
invoke('plugin:http|fetch_read_body', {
|
||||
rid: responseRid,
|
||||
streamChannel
|
||||
}).catch((e) => {
|
||||
controller.error(e)
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
const res = new Response(readableStreamBody, {
|
||||
status,
|
||||
statusText
|
||||
|
||||
@@ -22,6 +22,9 @@ use crate::{
|
||||
|
||||
const HTTP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),);
|
||||
|
||||
struct ReqwestResponse(reqwest::Response);
|
||||
impl tauri::Resource for ReqwestResponse {}
|
||||
|
||||
type CancelableResponseResult = Result<reqwest::Response>;
|
||||
type CancelableResponseFuture =
|
||||
Pin<Box<dyn Future<Output = CancelableResponseResult> + Send + Sync>>;
|
||||
@@ -178,7 +181,6 @@ pub async fn fetch<R: Runtime>(
|
||||
client_config: ClientConfig,
|
||||
command_scope: CommandScope<Entry>,
|
||||
global_scope: GlobalScope<Entry>,
|
||||
stream_channel: Channel<tauri::ipc::InvokeResponseBody>,
|
||||
) -> crate::Result<ResourceId> {
|
||||
let ClientConfig {
|
||||
method,
|
||||
@@ -312,20 +314,7 @@ pub async fn fetch<R: Runtime>(
|
||||
#[cfg(feature = "tracing")]
|
||||
tracing::trace!("{:?}", request);
|
||||
|
||||
let fut = async move {
|
||||
let mut res = request.send().await?;
|
||||
|
||||
// send response through IPC channel
|
||||
while let Some(chunk) = res.chunk().await? {
|
||||
stream_channel.send(tauri::ipc::InvokeResponseBody::Raw(chunk.to_vec()))?;
|
||||
}
|
||||
|
||||
// send empty vector when done
|
||||
stream_channel.send(tauri::ipc::InvokeResponseBody::Raw(Vec::new()))?;
|
||||
|
||||
// return that response
|
||||
Ok(res)
|
||||
};
|
||||
let fut = async move { request.send().await.map_err(Into::into) };
|
||||
|
||||
let mut resources_table = webview.resources_table();
|
||||
let rid = resources_table.add_request(Box::pin(fut));
|
||||
@@ -370,7 +359,7 @@ pub fn fetch_cancel<R: Runtime>(webview: Webview<R>, rid: ResourceId) -> crate::
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tauri::command]
|
||||
#[command]
|
||||
pub async fn fetch_send<R: Runtime>(
|
||||
webview: Webview<R>,
|
||||
rid: ResourceId,
|
||||
@@ -410,6 +399,9 @@ pub async fn fetch_send<R: Runtime>(
|
||||
));
|
||||
}
|
||||
|
||||
let mut resources_table = webview.resources_table();
|
||||
let rid = resources_table.add(ReqwestResponse(res));
|
||||
|
||||
Ok(FetchResponse {
|
||||
status: status.as_u16(),
|
||||
status_text: status.canonical_reason().unwrap_or_default().to_string(),
|
||||
@@ -419,6 +411,30 @@ pub async fn fetch_send<R: Runtime>(
|
||||
})
|
||||
}
|
||||
|
||||
#[command]
|
||||
pub async fn fetch_read_body<R: Runtime>(
|
||||
webview: Webview<R>,
|
||||
rid: ResourceId,
|
||||
stream_channel: Channel<tauri::ipc::InvokeResponseBody>,
|
||||
) -> crate::Result<()> {
|
||||
let res = {
|
||||
let mut resources_table = webview.resources_table();
|
||||
resources_table.take::<ReqwestResponse>(rid)?
|
||||
};
|
||||
|
||||
let mut res = Arc::into_inner(res).unwrap().0;
|
||||
|
||||
// send response through IPC channel
|
||||
while let Some(chunk) = res.chunk().await? {
|
||||
stream_channel.send(tauri::ipc::InvokeResponseBody::Raw(chunk.to_vec()))?;
|
||||
}
|
||||
|
||||
// send empty vector when done
|
||||
stream_channel.send(tauri::ipc::InvokeResponseBody::Raw(Vec::new()))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// forbidden headers per fetch spec https://fetch.spec.whatwg.org/#terminology-headers
|
||||
#[cfg(not(feature = "unsafe-headers"))]
|
||||
fn is_unsafe_header(header: &HeaderName) -> bool {
|
||||
|
||||
@@ -36,7 +36,8 @@ pub fn init<R: Runtime>() -> TauriPlugin<R> {
|
||||
.invoke_handler(tauri::generate_handler![
|
||||
commands::fetch,
|
||||
commands::fetch_cancel,
|
||||
commands::fetch_send
|
||||
commands::fetch_send,
|
||||
commands::fetch_read_body
|
||||
])
|
||||
.build()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user