fix(http): fix aborting a streaming response (#2562)

Committed via a GitHub action: https://github.com/tauri-apps/plugins-workspace/actions/runs/19635045772

Co-authored-by: FabianLars <FabianLars@users.noreply.github.com>
This commit is contained in:
Amr Bashir
2025-11-24 12:56:14 +00:00
committed by tauri-bot
parent 4e10deeb46
commit 5d2ac0fee2
11 changed files with 218 additions and 110 deletions

View File

@@ -1 +1 @@
if("__TAURI__"in window){var __TAURI_PLUGIN_HTTP__=function(e){"use strict";function t(e,t,r,n){if("function"==typeof t?e!==t||!n:!t.has(e))throw new TypeError("Cannot read private member from an object whose class did not declare it");return"m"===r?n:"a"===r?n.call(e):n?n.value:t.get(e)}function r(e,t,r,n,s){if("function"==typeof t||!t.has(e))throw new TypeError("Cannot write private member to an object whose class did not declare it");return t.set(e,r),r}var n,s,i,a;"function"==typeof SuppressedError&&SuppressedError;const o="__TAURI_TO_IPC_KEY__";class c{constructor(e){n.set(this,void 0),s.set(this,0),i.set(this,[]),a.set(this,void 0),r(this,n,e||(()=>{})),this.id=function(e,t=!1){return window.__TAURI_INTERNALS__.transformCallback(e,t)}((e=>{const o=e.index;if("end"in e)return void(o==t(this,s,"f")?this.cleanupCallback():r(this,a,o));const c=e.message;if(o==t(this,s,"f")){for(t(this,n,"f").call(this,c),r(this,s,t(this,s,"f")+1);t(this,s,"f")in t(this,i,"f");){const e=t(this,i,"f")[t(this,s,"f")];t(this,n,"f").call(this,e),delete t(this,i,"f")[t(this,s,"f")],r(this,s,t(this,s,"f")+1)}t(this,s,"f")===t(this,a,"f")&&this.cleanupCallback()}else t(this,i,"f")[o]=c}))}cleanupCallback(){window.__TAURI_INTERNALS__.unregisterCallback(this.id)}set onmessage(e){r(this,n,e)}get onmessage(){return t(this,n,"f")}[(n=new WeakMap,s=new WeakMap,i=new WeakMap,a=new WeakMap,o)](){return`__CHANNEL__:${this.id}`}toJSON(){return this[o]()}}async function d(e,t={},r){return window.__TAURI_INTERNALS__.invoke(e,t,r)}const h="Request cancelled";return e.fetch=async function(e,t){const r=t?.signal;if(r?.aborted)throw new Error(h);const n=t?.maxRedirections,s=t?.connectTimeout,i=t?.proxy,a=t?.danger;t&&(delete t.maxRedirections,delete t.connectTimeout,delete t.proxy,delete t.danger);const o=t?.headers?t.headers instanceof Headers?t.headers:new Headers(t.headers):new Headers,f=new Request(e,t),l=await f.arrayBuffer(),u=0!==l.byteLength?Array.from(new Uint8Array(l)):null;for(const[e,t]of f.headers)o.get(e)||o.set(e,t);const _=(o instanceof Headers?Array.from(o.entries()):Array.isArray(o)?o:Object.entries(o)).map((([e,t])=>[e,"string"==typeof t?t:t.toString()]));if(r?.aborted)throw new Error(h);const w=await d("plugin:http|fetch",{clientConfig:{method:f.method,url:f.url,headers:_,data:u,maxRedirections:n,connectTimeout:s,proxy:i,danger:a}}),p=()=>d("plugin:http|fetch_cancel",{rid:w});if(r?.aborted)throw p(),new Error(h);r?.addEventListener("abort",(()=>{p()}));const{status:y,statusText:m,url:b,headers:T,rid:g}=await d("plugin:http|fetch_send",{rid:w}),A=[101,103,204,205,304].includes(y)?null:new ReadableStream({start:e=>{const t=new c;t.onmessage=t=>{if(r?.aborted)return void e.error(h);const n=new Uint8Array(t),s=n[n.byteLength-1],i=n.slice(0,n.byteLength-1);1!=s?e.enqueue(i):e.close()},d("plugin:http|fetch_read_body",{rid:g,streamChannel:t}).catch((t=>{e.error(t)}))}}),R=new Response(A,{status:y,statusText:m});return Object.defineProperty(R,"url",{value:b}),Object.defineProperty(R,"headers",{value:new Headers(T)}),R},e}({});Object.defineProperty(window.__TAURI__,"http",{value:__TAURI_PLUGIN_HTTP__})}
if("__TAURI__"in window){var __TAURI_PLUGIN_HTTP__=function(e){"use strict";async function t(e,t={},r){return window.__TAURI_INTERNALS__.invoke(e,t,r)}"function"==typeof SuppressedError&&SuppressedError;const r="Request cancelled";return e.fetch=async function(e,n){const a=n?.signal;if(a?.aborted)throw new Error(r);const o=n?.maxRedirections,s=n?.connectTimeout,i=n?.proxy,d=n?.danger;n&&(delete n.maxRedirections,delete n.connectTimeout,delete n.proxy,delete n.danger);const c=n?.headers?n.headers instanceof Headers?n.headers:new Headers(n.headers):new Headers,u=new Request(e,n),l=await u.arrayBuffer(),_=0!==l.byteLength?Array.from(new Uint8Array(l)):null;for(const[e,t]of u.headers)c.get(e)||c.set(e,t);const h=(c instanceof Headers?Array.from(c.entries()):Array.isArray(c)?c:Object.entries(c)).map((([e,t])=>[e,"string"==typeof t?t:t.toString()]));if(a?.aborted)throw new Error(r);const f=await t("plugin:http|fetch",{clientConfig:{method:u.method,url:u.url,headers:h,data:_,maxRedirections:o,connectTimeout:s,proxy:i,danger:d}}),p=()=>t("plugin:http|fetch_cancel",{rid:f});if(a?.aborted)throw p(),new Error(r);a?.addEventListener("abort",(()=>{p()}));const{status:w,statusText:y,url:g,headers:b,rid:T}=await t("plugin:http|fetch_send",{rid:f}),R=()=>t("plugin:http|fetch_cancel_body",{rid:T}),m=[101,103,204,205,304].includes(w)?null:new ReadableStream({start:e=>{a?.addEventListener("abort",(()=>{e.error(r),R()}))},pull:e=>(async e=>{let r;try{r=await t("plugin:http|fetch_read_body",{rid:T})}catch(t){return e.error(t),void R()}const n=new Uint8Array(r),a=n[n.byteLength-1],o=n.slice(0,n.byteLength-1);1!==a?e.enqueue(o):e.close()})(e)}),A=new Response(m,{status:w,statusText:y});return Object.defineProperty(A,"url",{value:g}),Object.defineProperty(A,"headers",{value:new Headers(b)}),A},e}({});Object.defineProperty(window.__TAURI__,"http",{value:__TAURI_PLUGIN_HTTP__})}

View File

@@ -6,7 +6,13 @@
#[allow(dead_code)]
mod scope;
const COMMANDS: &[&str] = &["fetch", "fetch_cancel", "fetch_send", "fetch_read_body"];
const COMMANDS: &[&str] = &[
"fetch",
"fetch_cancel",
"fetch_send",
"fetch_read_body",
"fetch_cancel_body",
];
/// HTTP scope entry.
#[derive(schemars::JsonSchema)]

View File

@@ -44,7 +44,7 @@ const ERROR_REQUEST_CANCELLED = 'Request cancelled';
* @since 2.0.0
*/
async function fetch(input, init) {
// abort early here if needed
// Optimistically check for abort signal and avoid doing any work
const signal = init?.signal;
if (signal?.aborted) {
throw new Error(ERROR_REQUEST_CANCELLED);
@@ -87,7 +87,7 @@ async function fetch(input, init) {
// eslint-disable-next-line
typeof val === 'string' ? val : val.toString()
]);
// abort early here if needed
// Optimistically check for abort signal and avoid doing any work on the Rust side
if (signal?.aborted) {
throw new Error(ERROR_REQUEST_CANCELLED);
}
@@ -104,7 +104,8 @@ async function fetch(input, init) {
}
});
const abort = () => core.invoke('plugin:http|fetch_cancel', { rid });
// abort early here if needed
// Optimistically check for abort signal
// and avoid doing any work after doing intial work on the Rust side
if (signal?.aborted) {
// we don't care about the result of this proimse
// eslint-disable-next-line @typescript-eslint/no-floating-promises
@@ -115,37 +116,46 @@ async function fetch(input, init) {
const { status, statusText, url, headers: responseHeaders, rid: responseRid } = await core.invoke('plugin:http|fetch_send', {
rid
});
const dropBody = () => {
return core.invoke('plugin:http|fetch_cancel_body', { rid: responseRid });
};
const readChunk = async (controller) => {
let data;
try {
data = await core.invoke('plugin:http|fetch_read_body', {
rid: responseRid
});
}
catch (e) {
// close the stream if an error occurs
// and drop the body on Rust side
controller.error(e);
void dropBody();
return;
}
const dataUint8 = new Uint8Array(data);
const lastByte = dataUint8[dataUint8.byteLength - 1];
const actualData = dataUint8.slice(0, dataUint8.byteLength - 1);
// close when the signal to close (last byte is 1) is sent from the IPC.
if (lastByte === 1) {
controller.close();
return;
}
controller.enqueue(actualData);
};
// no body for 101, 103, 204, 205 and 304
// see https://fetch.spec.whatwg.org/#null-body-status
const body = [101, 103, 204, 205, 304].includes(status)
? null
: new ReadableStream({
start: (controller) => {
const streamChannel = new core.Channel();
streamChannel.onmessage = (res) => {
// close early if aborted
if (signal?.aborted) {
controller.error(ERROR_REQUEST_CANCELLED);
return;
}
const resUint8 = new Uint8Array(res);
const lastByte = resUint8[resUint8.byteLength - 1];
const actualRes = resUint8.slice(0, resUint8.byteLength - 1);
// close when the signal to close (last byte is 1) is sent from the IPC.
if (lastByte == 1) {
controller.close();
return;
}
controller.enqueue(actualRes);
};
// run a non-blocking body stream fetch
core.invoke('plugin:http|fetch_read_body', {
rid: responseRid,
streamChannel
}).catch((e) => {
controller.error(e);
// listen for abort events to cancel reading
signal?.addEventListener('abort', () => {
controller.error(ERROR_REQUEST_CANCELLED);
void dropBody();
});
}
},
pull: (controller) => readChunk(controller)
});
const res = new Response(body, {
status,

View File

@@ -1,4 +1,4 @@
import { invoke, Channel } from '@tauri-apps/api/core';
import { invoke } from '@tauri-apps/api/core';
// Copyright 2019-2023 Tauri Programme within The Commons Conservancy
// SPDX-License-Identifier: Apache-2.0
@@ -42,7 +42,7 @@ const ERROR_REQUEST_CANCELLED = 'Request cancelled';
* @since 2.0.0
*/
async function fetch(input, init) {
// abort early here if needed
// Optimistically check for abort signal and avoid doing any work
const signal = init?.signal;
if (signal?.aborted) {
throw new Error(ERROR_REQUEST_CANCELLED);
@@ -85,7 +85,7 @@ async function fetch(input, init) {
// eslint-disable-next-line
typeof val === 'string' ? val : val.toString()
]);
// abort early here if needed
// Optimistically check for abort signal and avoid doing any work on the Rust side
if (signal?.aborted) {
throw new Error(ERROR_REQUEST_CANCELLED);
}
@@ -102,7 +102,8 @@ async function fetch(input, init) {
}
});
const abort = () => invoke('plugin:http|fetch_cancel', { rid });
// abort early here if needed
// Optimistically check for abort signal
// and avoid doing any work after doing intial work on the Rust side
if (signal?.aborted) {
// we don't care about the result of this proimse
// eslint-disable-next-line @typescript-eslint/no-floating-promises
@@ -113,37 +114,46 @@ async function fetch(input, init) {
const { status, statusText, url, headers: responseHeaders, rid: responseRid } = await invoke('plugin:http|fetch_send', {
rid
});
const dropBody = () => {
return invoke('plugin:http|fetch_cancel_body', { rid: responseRid });
};
const readChunk = async (controller) => {
let data;
try {
data = await invoke('plugin:http|fetch_read_body', {
rid: responseRid
});
}
catch (e) {
// close the stream if an error occurs
// and drop the body on Rust side
controller.error(e);
void dropBody();
return;
}
const dataUint8 = new Uint8Array(data);
const lastByte = dataUint8[dataUint8.byteLength - 1];
const actualData = dataUint8.slice(0, dataUint8.byteLength - 1);
// close when the signal to close (last byte is 1) is sent from the IPC.
if (lastByte === 1) {
controller.close();
return;
}
controller.enqueue(actualData);
};
// no body for 101, 103, 204, 205 and 304
// see https://fetch.spec.whatwg.org/#null-body-status
const body = [101, 103, 204, 205, 304].includes(status)
? null
: new ReadableStream({
start: (controller) => {
const streamChannel = new Channel();
streamChannel.onmessage = (res) => {
// close early if aborted
if (signal?.aborted) {
controller.error(ERROR_REQUEST_CANCELLED);
return;
}
const resUint8 = new Uint8Array(res);
const lastByte = resUint8[resUint8.byteLength - 1];
const actualRes = resUint8.slice(0, resUint8.byteLength - 1);
// close when the signal to close (last byte is 1) is sent from the IPC.
if (lastByte == 1) {
controller.close();
return;
}
controller.enqueue(actualRes);
};
// run a non-blocking body stream fetch
invoke('plugin:http|fetch_read_body', {
rid: responseRid,
streamChannel
}).catch((e) => {
controller.error(e);
// listen for abort events to cancel reading
signal?.addEventListener('abort', () => {
controller.error(ERROR_REQUEST_CANCELLED);
void dropBody();
});
}
},
pull: (controller) => readChunk(controller)
});
const res = new Response(body, {
status,

View File

@@ -26,7 +26,7 @@
* @module
*/
import { Channel, invoke } from '@tauri-apps/api/core'
import { invoke } from '@tauri-apps/api/core'
/**
* Configuration of a proxy that a Client should pass requests to.
@@ -126,7 +126,7 @@ export async function fetch(
input: URL | Request | string,
init?: RequestInit & ClientOptions
): Promise<Response> {
// abort early here if needed
// Optimistically check for abort signal and avoid doing any work
const signal = init?.signal
if (signal?.aborted) {
throw new Error(ERROR_REQUEST_CANCELLED)
@@ -181,7 +181,7 @@ export async function fetch(
]
)
// abort early here if needed
// Optimistically check for abort signal and avoid doing any work on the Rust side
if (signal?.aborted) {
throw new Error(ERROR_REQUEST_CANCELLED)
}
@@ -201,7 +201,8 @@ export async function fetch(
const abort = () => invoke('plugin:http|fetch_cancel', { rid })
// abort early here if needed
// Optimistically check for abort signal
// and avoid doing any work after doing intial work on the Rust side
if (signal?.aborted) {
// we don't care about the result of this proimse
// eslint-disable-next-line @typescript-eslint/no-floating-promises
@@ -229,41 +230,52 @@ export async function fetch(
rid
})
const dropBody = () => {
return invoke('plugin:http|fetch_cancel_body', { rid: responseRid })
}
const readChunk = async (
controller: ReadableStreamDefaultController<Uint8Array>
) => {
let data: ArrayBuffer
try {
data = await invoke('plugin:http|fetch_read_body', {
rid: responseRid
})
} catch (e) {
// close the stream if an error occurs
// and drop the body on Rust side
controller.error(e)
void dropBody()
return
}
const dataUint8 = new Uint8Array(data)
const lastByte = dataUint8[dataUint8.byteLength - 1]
const actualData = dataUint8.slice(0, dataUint8.byteLength - 1)
// close when the signal to close (last byte is 1) is sent from the IPC.
if (lastByte === 1) {
controller.close()
return
}
controller.enqueue(actualData)
}
// no body for 101, 103, 204, 205 and 304
// see https://fetch.spec.whatwg.org/#null-body-status
const body = [101, 103, 204, 205, 304].includes(status)
? null
: new ReadableStream({
: new ReadableStream<Uint8Array>({
start: (controller) => {
const streamChannel = new Channel<ArrayBuffer | number[]>()
streamChannel.onmessage = (res: ArrayBuffer | number[]) => {
// close early if aborted
if (signal?.aborted) {
controller.error(ERROR_REQUEST_CANCELLED)
return
}
const resUint8 = new Uint8Array(res)
const lastByte = resUint8[resUint8.byteLength - 1]
const actualRes = resUint8.slice(0, resUint8.byteLength - 1)
// close when the signal to close (last byte is 1) is sent from the IPC.
if (lastByte == 1) {
controller.close()
return
}
controller.enqueue(actualRes)
}
// run a non-blocking body stream fetch
invoke('plugin:http|fetch_read_body', {
rid: responseRid,
streamChannel
}).catch((e) => {
controller.error(e)
// listen for abort events to cancel reading
signal?.addEventListener('abort', () => {
controller.error(ERROR_REQUEST_CANCELLED)
void dropBody()
})
}
},
pull: (controller) => readChunk(controller)
})
const res = new Response(body, {

View File

@@ -0,0 +1,13 @@
# Automatically generated - DO NOT EDIT!
"$schema" = "../../schemas/schema.json"
[[permission]]
identifier = "allow-fetch-cancel-body"
description = "Enables the fetch_cancel_body command without any pre-configured scope."
commands.allow = ["fetch_cancel_body"]
[[permission]]
identifier = "deny-fetch-cancel-body"
description = "Denies the fetch_cancel_body command without any pre-configured scope."
commands.deny = ["fetch_cancel_body"]

View File

@@ -15,8 +15,9 @@ All fetch operations are enabled.
- `allow-fetch`
- `allow-fetch-cancel`
- `allow-fetch-read-body`
- `allow-fetch-send`
- `allow-fetch-read-body`
- `allow-fetch-cancel-body`
## Permission Table
@@ -82,6 +83,32 @@ Denies the fetch_cancel command without any pre-configured scope.
<tr>
<td>
`http:allow-fetch-cancel-body`
</td>
<td>
Enables the fetch_cancel_body command without any pre-configured scope.
</td>
</tr>
<tr>
<td>
`http:deny-fetch-cancel-body`
</td>
<td>
Denies the fetch_cancel_body command without any pre-configured scope.
</td>
</tr>
<tr>
<td>
`http:allow-fetch-read-body`
</td>

View File

@@ -17,6 +17,7 @@ All fetch operations are enabled.
permissions = [
"allow-fetch",
"allow-fetch-cancel",
"allow-fetch-read-body",
"allow-fetch-send",
"allow-fetch-read-body",
"allow-fetch-cancel-body",
]

View File

@@ -318,6 +318,18 @@
"const": "deny-fetch-cancel",
"markdownDescription": "Denies the fetch_cancel command without any pre-configured scope."
},
{
"description": "Enables the fetch_cancel_body command without any pre-configured scope.",
"type": "string",
"const": "allow-fetch-cancel-body",
"markdownDescription": "Enables the fetch_cancel_body command without any pre-configured scope."
},
{
"description": "Denies the fetch_cancel_body command without any pre-configured scope.",
"type": "string",
"const": "deny-fetch-cancel-body",
"markdownDescription": "Denies the fetch_cancel_body command without any pre-configured scope."
},
{
"description": "Enables the fetch_read_body command without any pre-configured scope.",
"type": "string",
@@ -343,10 +355,10 @@
"markdownDescription": "Denies the fetch_send command without any pre-configured scope."
},
{
"description": "This permission set configures what kind of\nfetch operations are available from the http plugin.\n\nThis enables all fetch operations but does not\nallow explicitly any origins to be fetched. This needs to\nbe manually configured before usage.\n\n#### Granted Permissions\n\nAll fetch operations are enabled.\n\n\n#### This default permission set includes:\n\n- `allow-fetch`\n- `allow-fetch-cancel`\n- `allow-fetch-read-body`\n- `allow-fetch-send`",
"description": "This permission set configures what kind of\nfetch operations are available from the http plugin.\n\nThis enables all fetch operations but does not\nallow explicitly any origins to be fetched. This needs to\nbe manually configured before usage.\n\n#### Granted Permissions\n\nAll fetch operations are enabled.\n\n\n#### This default permission set includes:\n\n- `allow-fetch`\n- `allow-fetch-cancel`\n- `allow-fetch-send`\n- `allow-fetch-read-body`\n- `allow-fetch-cancel-body`",
"type": "string",
"const": "default",
"markdownDescription": "This permission set configures what kind of\nfetch operations are available from the http plugin.\n\nThis enables all fetch operations but does not\nallow explicitly any origins to be fetched. This needs to\nbe manually configured before usage.\n\n#### Granted Permissions\n\nAll fetch operations are enabled.\n\n\n#### This default permission set includes:\n\n- `allow-fetch`\n- `allow-fetch-cancel`\n- `allow-fetch-read-body`\n- `allow-fetch-send`"
"markdownDescription": "This permission set configures what kind of\nfetch operations are available from the http plugin.\n\nThis enables all fetch operations but does not\nallow explicitly any origins to be fetched. This needs to\nbe manually configured before usage.\n\n#### Granted Permissions\n\nAll fetch operations are enabled.\n\n\n#### This default permission set includes:\n\n- `allow-fetch`\n- `allow-fetch-cancel`\n- `allow-fetch-send`\n- `allow-fetch-read-body`\n- `allow-fetch-cancel-body`"
}
]
}

View File

@@ -10,7 +10,7 @@ use serde::{Deserialize, Serialize};
use tauri::{
async_runtime::Mutex,
command,
ipc::{Channel, CommandScope, GlobalScope},
ipc::{CommandScope, GlobalScope},
Manager, ResourceId, ResourceTable, Runtime, State, Webview,
};
use tokio::sync::oneshot::{channel, Receiver, Sender};
@@ -415,26 +415,42 @@ pub async fn fetch_send<R: Runtime>(
pub async fn fetch_read_body<R: Runtime>(
webview: Webview<R>,
rid: ResourceId,
stream_channel: Channel<tauri::ipc::InvokeResponseBody>,
) -> crate::Result<()> {
) -> crate::Result<tauri::ipc::Response> {
let res = {
let mut resources_table = webview.resources_table();
resources_table.take::<ReqwestResponse>(rid)?
let resources_table = webview.resources_table();
resources_table.get::<ReqwestResponse>(rid)?
};
let mut res = Arc::into_inner(res).unwrap().0;
// SAFETY: we can access the inner value mutably
// because we are the only ones with a reference to it
// and we don't want to use `Arc::into_inner` because we want to keep the value in the table
// for potential future calls to `fetch_cancel_body`
let res_ptr = Arc::as_ptr(&res) as *mut ReqwestResponse;
let res = unsafe { &mut *res_ptr };
let res = &mut res.0;
// send response through IPC channel
while let Some(chunk) = res.chunk().await? {
let mut chunk = chunk.to_vec();
// append 0 to indicate we are not done yet
chunk.push(0);
stream_channel.send(tauri::ipc::InvokeResponseBody::Raw(chunk))?;
}
let Some(chunk) = res.chunk().await? else {
let mut resources_table = webview.resources_table();
resources_table.close(rid)?;
// send 1 to indicate we are done
stream_channel.send(tauri::ipc::InvokeResponseBody::Raw(vec![1]))?;
// return a response with a single byte to indicate that the body is empty
return Ok(tauri::ipc::Response::new(vec![1]));
};
let mut chunk = chunk.to_vec();
// append a 0 byte to indicate that the body is not empty
chunk.push(0);
Ok(tauri::ipc::Response::new(chunk))
}
#[command]
pub async fn fetch_cancel_body<R: Runtime>(
webview: Webview<R>,
rid: ResourceId,
) -> crate::Result<()> {
let mut resources_table = webview.resources_table();
resources_table.close(rid)?;
Ok(())
}

View File

@@ -84,7 +84,8 @@ pub fn init<R: Runtime>() -> TauriPlugin<R> {
commands::fetch,
commands::fetch_cancel,
commands::fetch_send,
commands::fetch_read_body
commands::fetch_read_body,
commands::fetch_cancel_body,
])
.build()
}