!121 重定向丢失body数据修复

Merge pull request !121 from Tiga Ultraman/redirect_bug
This commit is contained in:
openharmony_ci 2024-09-02 07:37:14 +00:00 committed by Gitee
commit 55220b9f3e
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
8 changed files with 168 additions and 74 deletions

View File

@ -25,8 +25,8 @@ tokio_base = ["tokio"] # Uses asynchronous components of `tokio`
ylong_base = ["ylong_runtime"] # Uses asynchronous components of `ylong`
[dependencies]
tokio = { version = "1.20.1", features = ["io-util"], optional = true }
ylong_runtime = { git = "https://gitee.com/openharmony/commonlibrary_rust_ylong_runtime.git", optional = true }
tokio = { version = "1.20.1", features = ["io-util", "fs"], optional = true }
ylong_runtime = { git = "https://gitee.com/openharmony/commonlibrary_rust_ylong_runtime.git", features = ["fs", "sync"], optional = true }
[dev-dependencies]
tokio = { version = "1.20.1", features = ["io-util", "rt-multi-thread", "macros"] }

View File

@ -13,12 +13,13 @@
// TODO: reuse mime later.
use std::future::Future;
use std::io::Cursor;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::vec::IntoIter;
use crate::body::async_impl::Body;
use crate::body::async_impl::{Body, ReusableReader};
use crate::{AsyncRead, ReadBuf};
/// A structure that helps you build a `multipart/form-data` message.
@ -174,10 +175,23 @@ impl MultiPart {
states.push(MultiPartState::bytes(
format!("--{}--\r\n", self.boundary).into_bytes(),
));
self.status = ReadStatus::Reading(MultiPartStates {
states: states.into_iter(),
curr: None,
})
self.status = ReadStatus::Reading(MultiPartStates { states, index: 0 })
}
pub(crate) async fn reuse_inner(&mut self) -> std::io::Result<()> {
match std::mem::replace(&mut self.status, ReadStatus::Never) {
ReadStatus::Never => Ok(()),
ReadStatus::Reading(mut states) => {
let res = states.reuse().await;
self.status = ReadStatus::Reading(states);
res
}
ReadStatus::Finish(mut states) => {
states.reuse().await?;
self.status = ReadStatus::Reading(states);
Ok(())
}
}
}
}
@ -196,7 +210,7 @@ impl AsyncRead for MultiPart {
match self.status {
ReadStatus::Never => self.build_status(),
ReadStatus::Reading(_) => {}
ReadStatus::Finish => return Poll::Ready(Ok(())),
ReadStatus::Finish(_) => return Poll::Ready(Ok(())),
}
let status = if let ReadStatus::Reading(ref mut status) = self.status {
@ -213,7 +227,10 @@ impl AsyncRead for MultiPart {
Poll::Ready(Ok(())) => {
let new_filled = buf.filled().len();
if filled == new_filled {
self.status = ReadStatus::Finish;
match std::mem::replace(&mut self.status, ReadStatus::Never) {
ReadStatus::Reading(states) => self.status = ReadStatus::Finish(states),
_ => unreachable!(),
};
}
Poll::Ready(Ok(()))
}
@ -230,6 +247,23 @@ impl AsyncRead for MultiPart {
}
}
impl ReusableReader for MultiPart {
fn reuse<'a>(
&'a mut self,
) -> Pin<Box<dyn Future<Output = std::io::Result<()>> + Send + Sync + 'a>>
where
Self: 'a,
{
Box::pin(async {
match self.status {
ReadStatus::Never => Ok(()),
ReadStatus::Reading(_) => self.reuse_inner().await,
ReadStatus::Finish(_) => self.reuse_inner().await,
}
})
}
}
/// A structure that represents a part of `multipart/form-data` message.
///
/// # Examples
@ -352,8 +386,8 @@ impl Part {
/// Sets a stream body of this `Part`.
///
/// The body message will be set to the body part.
pub fn stream<T: AsyncRead + Send + Sync + 'static>(mut self, body: T) -> Self {
self.body = Some(MultiPartState::stream(Box::pin(body)));
pub fn stream<T: ReusableReader + Send + Sync + 'static + Unpin>(mut self, body: T) -> Self {
self.body = Some(MultiPartState::stream(Box::new(body)));
self
}
}
@ -365,7 +399,7 @@ impl Default for Part {
}
/// A basic trait for MultiPart.
pub trait MultiPartBase: AsyncRead {
pub trait MultiPartBase: ReusableReader {
/// Get reference of MultiPart.
fn multipart(&self) -> &MultiPart;
}
@ -379,12 +413,27 @@ impl MultiPartBase for MultiPart {
enum ReadStatus {
Never,
Reading(MultiPartStates),
Finish,
Finish(MultiPartStates),
}
struct MultiPartStates {
states: IntoIter<MultiPartState>,
curr: Option<MultiPartState>,
states: Vec<MultiPartState>,
index: usize,
}
impl MultiPartStates {
async fn reuse(&mut self) -> std::io::Result<()> {
self.index = 0;
for state in self.states.iter_mut() {
match state {
MultiPartState::Bytes(bytes) => bytes.set_position(0),
MultiPartState::Stream(stream) => {
stream.reuse().await?;
}
}
}
Ok(())
}
}
impl MultiPartStates {
@ -393,11 +442,11 @@ impl MultiPartStates {
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
let mut state = if let Some(state) = self.curr.take() {
state
} else {
return Poll::Ready(Ok(()));
let state = match self.states.get_mut(self.index) {
Some(state) => state,
None => return Poll::Ready(Ok(())),
};
match state {
MultiPartState::Bytes(ref mut bytes) => {
let filled_len = buf.filled().len();
@ -406,39 +455,26 @@ impl MultiPartStates {
let new = std::io::Read::read(bytes, unfilled).unwrap();
buf.set_filled(filled_len + new);
if new >= unfilled_len {
self.curr = Some(state);
if new < unfilled_len {
self.index += 1;
}
Poll::Ready(Ok(()))
}
MultiPartState::Stream(ref mut stream) => {
MultiPartState::Stream(stream) => {
let old_len = buf.filled().len();
let result = stream.as_mut().poll_read(cx, buf);
let result = unsafe { Pin::new_unchecked(stream).poll_read(cx, buf) };
let new_len = buf.filled().len();
self.poll_result(result, old_len, new_len, state)
}
}
}
fn poll_result(
&mut self,
result: Poll<std::io::Result<()>>,
old_len: usize,
new_len: usize,
state: MultiPartState,
) -> Poll<std::io::Result<()>> {
match result {
Poll::Ready(Ok(())) => {
if old_len != new_len {
self.curr = Some(state);
match result {
Poll::Ready(Ok(())) => {
if old_len == new_len {
self.index += 1;
}
Poll::Ready(Ok(()))
}
Poll::Pending => Poll::Pending,
x => x,
}
Poll::Ready(Ok(()))
}
Poll::Pending => {
self.curr = Some(state);
Poll::Pending
}
x => x,
}
}
}
@ -451,13 +487,9 @@ impl AsyncRead for MultiPartStates {
) -> Poll<std::io::Result<()>> {
let this = self.get_mut();
while !buf.initialize_unfilled().is_empty() {
if this.curr.is_none() {
this.curr = match this.states.next() {
None => break,
x => x,
}
if this.states.get(this.index).is_none() {
break;
}
match this.poll_read_curr(cx, buf) {
Poll::Ready(Ok(())) => {}
x => return x,
@ -469,7 +501,7 @@ impl AsyncRead for MultiPartStates {
enum MultiPartState {
Bytes(Cursor<Vec<u8>>),
Stream(Pin<Box<dyn AsyncRead + Send + Sync>>),
Stream(Box<dyn ReusableReader + Send + Sync + Unpin>),
}
impl MultiPartState {
@ -477,7 +509,7 @@ impl MultiPartState {
Self::Bytes(Cursor::new(bytes))
}
fn stream(reader: Pin<Box<dyn AsyncRead + Send + Sync>>) -> Self {
fn stream(reader: Box<dyn ReusableReader + Send + Sync + Unpin>) -> Self {
Self::Stream(reader)
}
}

View File

@ -46,6 +46,7 @@ mod empty;
mod mime;
mod text;
pub use async_impl::ReusableReader;
pub use chunk::{Chunk, ChunkBody, ChunkBodyDecoder, ChunkExt, ChunkState, Chunks};
pub use empty::EmptyBody;
pub use mime::{
@ -399,6 +400,40 @@ pub mod async_impl {
Pin::new(&mut *fut.body).poll_data(cx, fut.buf)
}
}
/// The reuse trait of request body.
pub trait ReusableReader: AsyncRead + Sync {
/// Reset body state, Ensure that the body can be re-read.
fn reuse<'a>(
&'a mut self,
) -> Pin<Box<dyn Future<Output = std::io::Result<()>> + Send + Sync + 'a>>
where
Self: 'a;
}
impl ReusableReader for crate::File {
fn reuse<'a>(
&'a mut self,
) -> Pin<Box<dyn Future<Output = std::io::Result<()>> + Send + Sync + 'a>>
where
Self: 'a,
{
use crate::AsyncSeekExt;
Box::pin(async { self.rewind().await.map(|_| ()) })
}
}
impl ReusableReader for &[u8] {
fn reuse<'a>(
&'a mut self,
) -> Pin<Box<dyn Future<Output = std::io::Result<()>> + Send + Sync + 'a>>
where
Self: 'a,
{
Box::pin(async { Ok(()) })
}
}
}
// Type definitions of the origin of the body data.

View File

@ -45,6 +45,12 @@ pub mod version;
pub(crate) mod util;
#[cfg(feature = "tokio_base")]
pub(crate) use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf};
pub(crate) use tokio::{
fs::File,
io::{AsyncRead, AsyncReadExt, AsyncSeekExt, ReadBuf},
};
#[cfg(feature = "ylong_base")]
pub(crate) use ylong_runtime::io::{AsyncRead, AsyncReadExt, ReadBuf};
pub(crate) use ylong_runtime::{
fs::File,
io::{AsyncRead, AsyncReadExt, AsyncSeekExt, ReadBuf},
};

View File

@ -17,7 +17,7 @@ use ylong_http::request::uri::Uri;
use super::pool::ConnPool;
use super::timeout::TimeoutFuture;
use super::{conn, Body, Connector, HttpConnector, Request, Response};
use super::{conn, Connector, HttpConnector, Request, Response};
use crate::async_impl::interceptor::{IdleInterceptor, Interceptor, Interceptors};
use crate::async_impl::request::Message;
use crate::error::HttpClientError;
@ -36,7 +36,7 @@ use crate::util::redirect::{RedirectInfo, Trigger};
use crate::util::request::RequestArc;
#[cfg(feature = "__c_openssl")]
use crate::CertVerifier;
use crate::Retry;
use crate::{ErrorKind, Retry};
/// HTTP asynchronous client implementation. Users can use `async_impl::Client`
/// to send `Request` asynchronously.
@ -147,7 +147,7 @@ impl<C: Connector> Client<C> {
loop {
let response = self.send_request(request.clone()).await;
if let Err(ref err) = response {
if retries > 0 && request.ref_mut().body_mut().reuse() {
if retries > 0 && request.ref_mut().body_mut().reuse().await.is_ok() {
self.interceptors.intercept_retry(err)?;
retries -= 1;
continue;
@ -217,9 +217,12 @@ impl<C: Connector> Client<C> {
{
Trigger::NextLink => {
// Here the body should be reused.
if !request.ref_mut().body_mut().reuse() {
*request.ref_mut().body_mut() = Body::empty();
}
request
.ref_mut()
.body_mut()
.reuse()
.await
.map_err(|e| HttpClientError::from_io_error(ErrorKind::Redirect, e))?;
self.interceptors
.intercept_redirect_request(request.ref_mut())?;
response = self.send_unformatted_request(request.clone()).await?;

View File

@ -19,6 +19,7 @@ use core::task::{Context, Poll};
use std::io::Cursor;
use std::sync::Arc;
use ylong_http::body::async_impl::ReusableReader;
use ylong_http::body::MultiPartBase;
use ylong_http::request::uri::PercentEncoder as PerEncoder;
use ylong_http::request::{Request as Req, RequestBuilder as ReqBuilder};
@ -255,7 +256,7 @@ pub struct Body {
pub(crate) enum BodyKind {
Empty,
Slice(Cursor<Vec<u8>>),
Stream(Box<dyn AsyncRead + Send + Sync + Unpin>),
Stream(Box<dyn ReusableReader + Send + Sync + Unpin>),
Multipart(Box<dyn MultiPartBase + Send + Sync + Unpin>),
}
@ -304,10 +305,10 @@ impl Body {
/// ```
pub fn stream<T>(stream: T) -> Self
where
T: AsyncRead + Send + Sync + Unpin + 'static,
T: ReusableReader + Send + Sync + Unpin + 'static,
{
Body::new(BodyKind::Stream(
Box::new(stream) as Box<dyn AsyncRead + Send + Sync + Unpin>
Box::new(stream) as Box<dyn ReusableReader + Send + Sync + Unpin>
))
}
@ -340,15 +341,15 @@ impl Body {
Self { inner }
}
// TODO: Considers reusing unread stream ?
pub(crate) fn reuse(&mut self) -> bool {
pub(crate) async fn reuse(&mut self) -> std::io::Result<()> {
match self.inner {
BodyKind::Empty => true,
BodyKind::Empty => Ok(()),
BodyKind::Slice(ref mut slice) => {
slice.set_position(0);
true
Ok(())
}
_ => false,
BodyKind::Stream(ref mut stream) => stream.reuse().await,
BodyKind::Multipart(ref mut multipart) => multipart.reuse().await,
}
}
}
@ -470,7 +471,6 @@ mod ut_client_request {
.length(Some(4)),
);
let mut request = RequestBuilder::default().body(Body::multipart(mp)).unwrap();
assert!(!request.body_mut().reuse());
let handle = ylong_runtime::spawn(async move {
let mut buf = vec![0u8; 50];
let mut v_size = vec![];

View File

@ -14,11 +14,13 @@
mod builder;
mod operator;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
pub use builder::{UploaderBuilder, WantsReader};
pub use operator::{Console, UploadOperator};
use ylong_http::body::async_impl::ReusableReader;
use ylong_http::body::{MultiPart, MultiPartBase};
use crate::runtime::{AsyncRead, ReadBuf};
@ -90,7 +92,7 @@ pub struct Uploader<R, T> {
info: Option<UploadInfo>,
}
impl<R: AsyncRead + Unpin> Uploader<R, Console> {
impl<R: ReusableReader + Unpin> Uploader<R, Console> {
/// Creates an `Uploader` with a `Console` operator which displays process
/// on console.
///
@ -123,7 +125,7 @@ impl Uploader<(), ()> {
impl<R, T> AsyncRead for Uploader<R, T>
where
R: AsyncRead + Unpin,
R: ReusableReader + Unpin,
T: UploadOperator + Unpin,
{
fn poll_read(
@ -159,7 +161,23 @@ where
}
}
impl<T: UploadOperator + Unpin> MultiPartBase for Uploader<MultiPart, T> {
impl<R, T> ReusableReader for Uploader<R, T>
where
R: ReusableReader + Unpin,
T: UploadOperator + Unpin + Sync,
{
fn reuse<'a>(
&'a mut self,
) -> Pin<Box<dyn Future<Output = std::io::Result<()>> + Send + Sync + 'a>>
where
Self: 'a,
{
self.info = None;
self.reader.reuse()
}
}
impl<T: UploadOperator + Unpin + Sync> MultiPartBase for Uploader<MultiPart, T> {
fn multipart(&self) -> &MultiPart {
&self.reader
}

View File

@ -22,7 +22,7 @@
// ylong_http crate re-export.
#[cfg(any(feature = "ylong_base", feature = "tokio_base"))]
pub use ylong_http::body::{EmptyBody, TextBody};
pub use ylong_http::body::{EmptyBody, ReusableReader, TextBody};
pub use ylong_http::headers::{
Header, HeaderName, HeaderValue, HeaderValueIter, HeaderValueIterMut, Headers, HeadersIntoIter,
HeadersIter, HeadersIterMut,