codecheck 2nd

Signed-off-by: Tiga Ultraman <liaoxude@huawei.com>
This commit is contained in:
Tiga Ultraman 2024-05-23 12:15:25 +08:00
parent ab944d6159
commit 2b635b0411
16 changed files with 1124 additions and 988 deletions

View File

@ -167,6 +167,10 @@ impl<T: Read> ChunkBody<FromReader<T>> {
}
impl<T: AsyncRead + Unpin + Send + Sync> ChunkBody<FromAsyncReader<T>> {
fn chunk_encode(&mut self, dst: &mut [u8]) -> usize {
self.chunk_encode_reader(dst)
}
/// Creates a new `ChunkBody` by `async reader`.
///
/// # Examples
@ -187,8 +191,30 @@ impl<T: AsyncRead + Unpin + Send + Sync> ChunkBody<FromAsyncReader<T>> {
}
}
fn chunk_encode(&mut self, dst: &mut [u8]) -> usize {
self.chunk_encode_reader(dst)
fn poll_partial(
&mut self,
_cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<Result<usize, Error>> {
if !self.encode_status.get_flag() {
let mut read_buf = ReadBuf::new(&mut self.chunk_data.chunk_buf);
match Pin::new(&mut *self.from).poll_read(_cx, &mut read_buf) {
Poll::Ready(Ok(())) => {
let size = read_buf.filled().len();
self.encode_status.set_flag(true);
// chunk idx reset zero
self.encode_status.set_chunk_idx(0);
self.chunk_data.chunk_last = size;
let data_size = self.chunk_encode(buf);
Poll::Ready(Ok(data_size))
}
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
Poll::Pending => Poll::Pending,
}
} else {
Poll::Ready(Ok(self.chunk_encode(buf)))
}
}
}
@ -274,27 +300,7 @@ impl<T: AsyncRead + Unpin + Send + Sync> async_impl::Body for ChunkBody<FromAsyn
let mut count = 0;
while count != buf.len() {
let encode_size = match chunk_body.data_status {
DataState::Partial => {
if !chunk_body.encode_status.get_flag() {
let mut read_buf = ReadBuf::new(&mut chunk_body.chunk_data.chunk_buf);
match Pin::new(&mut *chunk_body.from).poll_read(_cx, &mut read_buf) {
Poll::Ready(Ok(())) => {
let size = read_buf.filled().len();
chunk_body.encode_status.set_flag(true);
// chunk idx reset zero
chunk_body.encode_status.set_chunk_idx(0);
chunk_body.chunk_data.chunk_last = size;
let data_size = chunk_body.chunk_encode(&mut buf[count..]);
Poll::Ready(data_size)
}
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
Poll::Pending => Poll::Pending,
}
} else {
Poll::Ready(chunk_body.chunk_encode(&mut buf[count..]))
}
}
DataState::Partial => chunk_body.poll_partial(_cx, &mut buf[count..])?,
DataState::Complete => Poll::Ready(chunk_body.trailer_encode(&mut buf[count..])),
DataState::Finish => {
return Poll::Ready(Ok(count));
@ -972,6 +978,22 @@ impl ChunkBodyDecoder {
}
remains = rest;
if self
.match_decode_result(chunk, &mut results, remains)
.is_some()
{
break;
}
}
Ok((results, remains))
}
fn match_decode_result<'b, 'a: 'b>(
&mut self,
chunk: Chunk<'a>,
results: &mut Chunks<'b>,
remains: &[u8],
) -> Option<()> {
match (chunk.is_complete(), self.is_last_chunk) {
(false, _) => {
if self.is_chunk_trailer
@ -980,29 +1002,28 @@ impl ChunkBodyDecoder {
results.push(chunk);
self.chunk_num += 1;
if remains.is_empty() {
break;
return Some(());
}
} else {
results.push(chunk);
break;
return Some(());
}
}
(true, true) => {
results.push(chunk);
self.is_last_chunk = false;
self.chunk_num = 0;
break;
return Some(());
}
(true, false) => {
results.push(chunk);
self.chunk_num += 1;
if remains.is_empty() {
break;
return Some(());
}
}
}
}
Ok((results, remains))
None
}
/// Get trailer headers.
@ -1071,7 +1092,10 @@ impl ChunkBodyDecoder {
fn decode_size<'a>(&mut self, buf: &'a [u8]) -> Result<(Chunk<'a>, &'a [u8]), HttpError> {
self.stage = Stage::Size;
if buf.is_empty() {
return Ok((self.sized_chunk(&buf[..0]), buf));
return Ok((
Self::sized_chunk(&buf[..0], None, self.total_size, ChunkState::MetaSize),
buf,
));
}
self.chunk_flag = false;
for (i, &b) in buf.iter().enumerate() {
@ -1114,17 +1138,25 @@ impl ChunkBodyDecoder {
_ => return Err(ErrorKind::InvalidInput.into()),
}
}
Ok((self.sized_chunk(&buf[..0]), &buf[buf.len()..]))
Ok((
Self::sized_chunk(&buf[..0], None, self.total_size, ChunkState::MetaSize),
&buf[buf.len()..],
))
}
fn sized_chunk<'a>(&self, buf: &'a [u8]) -> Chunk<'a> {
fn sized_chunk<'a>(
data: &'a [u8],
trailer: Option<&'a [u8]>,
size: usize,
state: ChunkState,
) -> Chunk<'a> {
Chunk {
id: 0,
state: ChunkState::MetaSize,
size: self.total_size,
state,
size,
extension: ChunkExt::new(),
data: &buf[..0],
trailer: None,
data,
trailer,
}
}
@ -1153,102 +1185,73 @@ impl ChunkBodyDecoder {
fn skip_extension<'a>(&mut self, buf: &'a [u8]) -> Result<(Chunk<'a>, &'a [u8]), HttpError> {
self.stage = Stage::Extension;
if self.is_chunk_trailer {
self.skip_trailer_ext(buf)
} else {
self.skip_chunk_ext(buf)
}
}
fn skip_trailer_ext<'a>(&mut self, buf: &'a [u8]) -> Result<(Chunk<'a>, &'a [u8]), HttpError> {
for (i, &b) in buf.iter().enumerate() {
match b {
b'\r' => {
if self.cr_meet {
return Err(ErrorKind::InvalidInput.into());
}
self.cr_meet = true;
self.decode_cr()?;
return self.skip_trailer_crlf(&buf[i + 1..]);
}
b'\n' => {
if !self.cr_meet {
return Err(ErrorKind::InvalidInput.into());
}
self.cr_meet = false;
self.decode_lf()?;
return self.skip_trailer_crlf(&buf[i..]);
}
_ => {}
}
}
Ok((
Chunk {
id: 0,
state: ChunkState::MetaExt,
size: self.total_size,
extension: ChunkExt::new(),
data: &buf[..0],
trailer: Some(&buf[..0]),
},
Self::sized_chunk(
&buf[..0],
Some(&buf[..0]),
self.total_size,
ChunkState::MetaExt,
),
&buf[buf.len()..],
))
} else {
}
fn skip_chunk_ext<'a>(&mut self, buf: &'a [u8]) -> Result<(Chunk<'a>, &'a [u8]), HttpError> {
for (i, &b) in buf.iter().enumerate() {
match b {
b'\r' => {
if self.cr_meet {
return Err(ErrorKind::InvalidInput.into());
}
self.cr_meet = true;
self.decode_cr()?;
return self.skip_crlf(&buf[i + 1..]);
}
b'\n' => {
if !self.cr_meet {
return Err(ErrorKind::InvalidInput.into());
}
self.cr_meet = false;
self.decode_lf()?;
return self.skip_crlf(&buf[i..]);
}
_ => {}
}
}
Ok((
Chunk {
id: 0,
state: ChunkState::MetaExt,
size: self.total_size,
extension: ChunkExt::new(),
data: &buf[..0],
trailer: None,
},
Self::sized_chunk(&buf[..0], None, self.total_size, ChunkState::MetaExt),
&buf[buf.len()..],
))
}
}
fn skip_crlf<'a>(&mut self, buf: &'a [u8]) -> Result<(Chunk<'a>, &'a [u8]), HttpError> {
self.stage = Stage::SizeEnd;
for (i, &b) in buf.iter().enumerate() {
match b {
b'\r' => {
if self.cr_meet {
// TODO Check whether the state machine needs to be reused after the parsing
// fails and whether the state machine status needs to be adjusted.
return Err(ErrorKind::InvalidInput.into());
}
self.cr_meet = true;
self.decode_cr()?;
}
b'\n' => {
if !self.cr_meet {
return Err(ErrorKind::InvalidInput.into());
}
self.cr_meet = false;
self.decode_lf()?;
return self.decode_data(&buf[i + 1..]);
}
_ => return Err(ErrorKind::InvalidInput.into()),
}
}
Ok((
Chunk {
id: 0,
state: ChunkState::MetaCrlf,
size: self.total_size,
extension: ChunkExt::new(),
data: &buf[..0],
trailer: None,
},
Self::sized_chunk(&buf[..0], None, self.total_size, ChunkState::MetaCrlf),
&buf[buf.len()..],
))
}
@ -1258,16 +1261,10 @@ impl ChunkBodyDecoder {
for (i, &b) in buf.iter().enumerate() {
match b {
b'\r' => {
if self.cr_meet {
return Err(ErrorKind::InvalidInput.into());
}
self.cr_meet = true;
self.decode_cr()?;
}
b'\n' => {
if !self.cr_meet {
return Err(ErrorKind::InvalidInput.into());
}
self.cr_meet = false;
self.decode_lf()?;
self.is_trailer_crlf = true;
return self.decode_trailer_data(&buf[i + 1..]);
}
@ -1275,18 +1272,32 @@ impl ChunkBodyDecoder {
}
}
Ok((
Chunk {
id: 0,
state: ChunkState::MetaCrlf,
size: self.total_size,
extension: ChunkExt::new(),
data: &buf[..0],
trailer: Some(&buf[..0]),
},
Self::sized_chunk(
&buf[..0],
Some(&buf[..0]),
self.total_size,
ChunkState::MetaCrlf,
),
&buf[buf.len()..],
))
}
fn decode_cr(&mut self) -> Result<(), HttpError> {
if self.cr_meet {
return Err(ErrorKind::InvalidInput.into());
}
self.cr_meet = true;
Ok(())
}
fn decode_lf(&mut self) -> Result<(), HttpError> {
if !self.cr_meet {
return Err(ErrorKind::InvalidInput.into());
}
self.cr_meet = false;
Ok(())
}
fn decode_trailer_data<'a>(
&mut self,
buf: &'a [u8],
@ -1294,14 +1305,7 @@ impl ChunkBodyDecoder {
self.stage = Stage::TrailerData;
if buf.is_empty() {
return Ok((
Chunk {
id: 0,
state: ChunkState::Data,
size: 0,
extension: ChunkExt::new(),
data: &buf[..0],
trailer: Some(&buf[..0]),
},
Self::sized_chunk(&buf[..0], Some(&buf[..0]), 0, ChunkState::Data),
&buf[buf.len()..],
));
}
@ -1313,32 +1317,20 @@ impl ChunkBodyDecoder {
for (i, &b) in buf.iter().enumerate() {
match b {
b'\r' => {
if self.cr_meet {
return Err(ErrorKind::InvalidInput.into());
}
self.cr_meet = true;
self.decode_cr()?;
return self.skip_trailer_last_crlf(&buf[..i], &buf[i + 1..]);
}
b'\n' => {
if !self.cr_meet {
return Err(ErrorKind::InvalidInput.into());
}
self.cr_meet = false;
self.decode_lf()?;
return self.skip_trailer_last_crlf(&buf[..i], &buf[i..]);
}
_ => {}
}
}
self.is_trailer_crlf = false;
Ok((
Chunk {
id: 0,
state: ChunkState::Data,
size: 0,
extension: ChunkExt::new(),
data: &buf[..0],
trailer: Some(buf),
},
Self::sized_chunk(&buf[..0], Some(buf), 0, ChunkState::Data),
&buf[buf.len()..],
))
}
@ -1347,14 +1339,7 @@ impl ChunkBodyDecoder {
self.stage = Stage::Data;
if buf.is_empty() {
return Ok((
Chunk {
id: 0,
state: ChunkState::Data,
size: self.total_size,
extension: ChunkExt::new(),
data: &buf[..0],
trailer: None,
},
Self::sized_chunk(&buf[..0], None, self.total_size, ChunkState::Data),
&buf[buf.len()..],
));
}
@ -1367,14 +1352,7 @@ impl ChunkBodyDecoder {
} else {
self.rest_size -= buf.len();
Ok((
Chunk {
id: 0,
state: ChunkState::Data,
size: self.total_size,
extension: ChunkExt::new(),
data: buf,
trailer: None,
},
Self::sized_chunk(buf, None, self.total_size, ChunkState::Data),
&buf[buf.len()..],
))
}
@ -1389,57 +1367,30 @@ impl ChunkBodyDecoder {
for (i, &b) in buf.iter().enumerate() {
match b {
b'\r' => {
if self.cr_meet {
return Err(ErrorKind::InvalidInput.into());
}
self.cr_meet = true;
self.decode_cr()?;
}
b'\n' => {
if !self.cr_meet {
return Err(ErrorKind::InvalidInput.into());
}
self.cr_meet = false;
self.decode_lf()?;
return if self.is_last_chunk {
self.stage = Stage::TrailerEndCrlf;
Ok((
Chunk {
id: 0,
state: ChunkState::Finish,
size: 0,
extension: ChunkExt::new(),
data: &buf[..0],
trailer: Some(&buf[..0]),
},
Self::sized_chunk(&buf[..0], Some(&buf[..0]), 0, ChunkState::Finish),
&buf[i + 1..],
))
} else {
self.cr_meet = false;
self.is_trailer_crlf = true;
self.stage = Stage::TrailerData;
let complete_chunk = Chunk {
id: 0,
state: ChunkState::DataCrlf,
size: 0,
extension: ChunkExt::new(),
data: &data[..0],
trailer: Some(data),
};
let complete_chunk =
Self::sized_chunk(&data[..0], Some(data), 0, ChunkState::DataCrlf);
return Ok((complete_chunk, &buf[i + 1..]));
};
}
_ => return Err(ErrorKind::InvalidInput.into()),
}
}
Ok((
Chunk {
id: 0,
state: ChunkState::DataCrlf,
size: 0,
extension: ChunkExt::new(),
data: &data[..0],
trailer: Some(data),
},
Self::sized_chunk(&data[..0], Some(data), 0, ChunkState::DataCrlf),
&buf[buf.len()..],
))
}
@ -1453,25 +1404,13 @@ impl ChunkBodyDecoder {
for (i, &b) in buf.iter().enumerate() {
match b {
b'\r' => {
if self.cr_meet {
return Err(ErrorKind::InvalidInput.into());
}
self.cr_meet = true;
self.decode_cr()?;
}
b'\n' => {
if !self.cr_meet {
return Err(ErrorKind::InvalidInput.into());
}
self.cr_meet = false;
self.decode_lf()?;
self.stage = Stage::Size;
let complete_chunk = Chunk {
id: 0,
state: ChunkState::Finish,
size: self.total_size,
extension: ChunkExt::new(),
data,
trailer: None,
};
let complete_chunk =
Self::sized_chunk(data, None, self.total_size, ChunkState::Finish);
self.total_size = 0;
return Ok((complete_chunk, &buf[i + 1..]));
}
@ -1479,14 +1418,7 @@ impl ChunkBodyDecoder {
}
}
Ok((
Chunk {
id: 0,
state: ChunkState::DataCrlf,
size: self.total_size,
extension: ChunkExt::new(),
data,
trailer: None,
},
Self::sized_chunk(data, None, self.total_size, ChunkState::DataCrlf),
&buf[buf.len()..],
))
}

View File

@ -16,6 +16,7 @@ use core::pin::Pin;
use core::task::{Context, Poll};
use std::io::Read;
use crate::body::async_impl::Body;
use crate::body::mime::common::{data_copy, SizeResult, TokenStatus};
use crate::body::mime::{EncodeHeaders, MixFrom, PartStatus};
use crate::body::{async_impl, sync_impl, MimePart};
@ -199,20 +200,7 @@ impl async_impl::Body for MimePartEncoder<'_> {
PartStatus::Start => Poll::Ready(self.start_encode()),
PartStatus::Headers => Poll::Ready(self.headers_encode(&mut buf[count..])),
PartStatus::Crlf => Poll::Ready(self.crlf_encode(&mut buf[count..])),
PartStatus::Body => match &mut self.body {
Some(body) => {
let poll_result = Pin::new(body).poll_data(cx, &mut buf[count..]);
if let Poll::Ready(Ok(0)) = poll_result {
// complete async read body
self.check_next();
};
poll_result
}
_ => {
self.check_next();
Poll::Ready(Ok(0))
}
},
PartStatus::Body => self.poll_mime_body(cx, &mut buf[count..]),
PartStatus::End => return Poll::Ready(Ok(count)),
};
@ -226,6 +214,29 @@ impl async_impl::Body for MimePartEncoder<'_> {
}
}
impl MimePartEncoder<'_> {
fn poll_mime_body(
&mut self,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<Result<usize, std::io::Error>> {
match self.body {
Some(ref mut body) => {
let poll_result = Pin::new(body).poll_data(cx, buf);
if let Poll::Ready(Ok(0)) = poll_result {
// complete async read body
self.check_next();
};
poll_result
}
_ => {
self.check_next();
Poll::Ready(Ok(0))
}
}
}
}
#[cfg(test)]
mod ut_mime_part_encoder {
use crate::body::{async_impl, sync_impl, MimePart, MimePartEncoder};

View File

@ -581,13 +581,7 @@ impl MimeTypeTag {
}
}
b'a' => {
// application
if b[1..].eq_ignore_ascii_case(b"pplication") {
return Ok(Self::Application);
// audio
} else if b[1..].eq_ignore_ascii_case(b"udio") {
return Ok(Self::Audio);
}
return Self::mime_byte_a(b);
}
b'f' => {
// font
@ -602,16 +596,7 @@ impl MimeTypeTag {
}
}
b'm' => {
// message
if b[1..].eq_ignore_ascii_case(b"essage") {
return Ok(Self::Message);
// model
} else if b[1..].eq_ignore_ascii_case(b"odel") {
return Ok(Self::Model);
// multipart
} else if b[1..].eq_ignore_ascii_case(b"ultipart") {
return Ok(Self::Multipart);
}
return Self::mime_byte_m(b);
}
b't' => {
// text
@ -630,6 +615,32 @@ impl MimeTypeTag {
Err(ErrorKind::InvalidInput.into())
}
fn mime_byte_a(b: &[u8]) -> Result<MimeTypeTag, HttpError> {
// application
if b[1..].eq_ignore_ascii_case(b"pplication") {
Ok(Self::Application)
// audio
} else if b[1..].eq_ignore_ascii_case(b"udio") {
Ok(Self::Audio)
} else {
Err(ErrorKind::InvalidInput.into())
}
}
fn mime_byte_m(b: &[u8]) -> Result<MimeTypeTag, HttpError> {
// message
if b[1..].eq_ignore_ascii_case(b"essage") {
Ok(Self::Message)
// model
} else if b[1..].eq_ignore_ascii_case(b"odel") {
Ok(Self::Model)
// multipart
} else if b[1..].eq_ignore_ascii_case(b"ultipart") {
Ok(Self::Multipart)
} else {
Err(ErrorKind::InvalidInput.into())
}
}
}
// From [RFC6838](http://tools.ietf.org/html/rfc6838#section-4.2):

View File

@ -532,7 +532,15 @@ impl EncodeHeader {
fn encode(&mut self, buf: &mut [u8]) -> TokenResult<usize> {
match self.status.take().unwrap() {
HeaderStatus::Name => {
HeaderStatus::Name => self.encode_name(buf),
HeaderStatus::Colon => self.encode_colon(buf),
HeaderStatus::Value => self.encode_value(buf),
HeaderStatus::Crlf(crlf) => self.encode_crlf(buf, crlf),
HeaderStatus::EmptyHeader => Ok(TokenStatus::Complete(0)),
}
}
fn encode_name(&mut self, buf: &mut [u8]) -> TokenResult<usize> {
let name = self.name.as_bytes();
let mut task = WriteData::new(name, &mut self.name_idx, buf);
match task.write()? {
@ -546,7 +554,8 @@ impl EncodeHeader {
}
}
}
HeaderStatus::Colon => {
fn encode_colon(&mut self, buf: &mut [u8]) -> TokenResult<usize> {
let colon = ":".as_bytes();
let mut task = WriteData::new(colon, &mut self.colon_idx, buf);
match task.write()? {
@ -560,7 +569,8 @@ impl EncodeHeader {
}
}
}
HeaderStatus::Value => {
fn encode_value(&mut self, buf: &mut [u8]) -> TokenResult<usize> {
let value = self.value.as_slice();
let mut task = WriteData::new(value, &mut self.value_idx, buf);
match task.write()? {
@ -575,7 +585,9 @@ impl EncodeHeader {
}
}
}
HeaderStatus::Crlf(mut crlf) => match crlf.encode(buf)? {
fn encode_crlf(&mut self, buf: &mut [u8], mut crlf: EncodeCrlf) -> TokenResult<usize> {
match crlf.encode(buf)? {
TokenStatus::Complete(size) => {
if let Some(iter) = self.inner.next() {
let (header_name, header_value) = iter;
@ -594,8 +606,6 @@ impl EncodeHeader {
self.status = Some(HeaderStatus::Crlf(crlf));
Ok(TokenStatus::Partial(size))
}
},
HeaderStatus::EmptyHeader => Ok(TokenStatus::Complete(0)),
}
}
}

View File

@ -755,6 +755,7 @@ impl FrameDecoder {
if fragment_start_index > fragment_end_index {
return Err(H2Error::ConnectionError(ErrorCode::ProtocolError));
}
self.hpack.hpack_decode(buf)?;
let promised_stream_id = if is_padded {
get_stream_id(&buf[1..5])
} else {
@ -763,9 +764,15 @@ impl FrameDecoder {
if is_connection_frame(promised_stream_id as usize) {
return Err(H2Error::ConnectionError(ErrorCode::ProtocolError));
}
self.push_promise_framing(end_headers, promised_stream_id)
}
fn push_promise_framing(
&mut self,
end_headers: bool,
promised_stream_id: u32,
) -> Result<FrameKind, H2Error> {
if end_headers {
self.hpack
.hpack_decode(&buf[fragment_start_index..fragment_end_index])?;
let headers = self.hpack.hpack_finish()?;
let frame = Frame::new(
self.header.stream_id,
@ -781,8 +788,6 @@ impl FrameDecoder {
self.continuations.is_end_headers = false;
self.continuations.stream_id = self.header.stream_id;
self.continuations.promised_stream_id = promised_stream_id;
self.hpack
.hpack_decode(&buf[fragment_start_index..fragment_end_index])?;
Ok(FrameKind::Partial)
}
}

View File

@ -11,8 +11,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::h2::frame::{FrameFlags, FrameType, Payload, Setting};
use crate::h2::{Frame, HpackEncoder};
use crate::h2::frame::{FrameFlags, FrameType, Payload, Priority, Setting};
use crate::h2::{Frame, Goaway, HpackEncoder, Settings};
// TODO: Classify encoder errors per RFC specifications into categories like
// stream or connection errors. Identify specific error types such as
@ -338,7 +338,45 @@ impl FrameEncoder {
};
let bytes_to_write = remaining_header_bytes.min(buf.len());
for (buf_index, item) in buf.iter_mut().enumerate().take(bytes_to_write) {
self.iterate_headers_header(frame, buf, bytes_to_write)?;
self.encoded_bytes += bytes_to_write;
let bytes_written = bytes_to_write;
let mut payload_bytes_written = 0;
if self.encoded_bytes >= frame_header_size {
payload_bytes_written = self
.write_payload(&mut buf[bytes_written..], self.remaining_header_payload);
self.headers_header_status();
}
Ok(bytes_written + payload_bytes_written)
} else {
Err(FrameEncoderErr::UnexpectedPayloadType)
}
} else {
Err(FrameEncoderErr::NoCurrentFrame)
}
}
fn headers_header_status(&mut self) {
if self.remaining_header_payload <= self.max_frame_size {
self.state = if self.is_end_stream {
FrameEncoderState::HeadersComplete
} else {
FrameEncoderState::EncodingHeadersPayload
};
} else {
self.state = FrameEncoderState::EncodingContinuationFrames;
}
}
fn iterate_headers_header(
&self,
frame: &Frame,
buf: &mut [u8],
len: usize,
) -> Result<(), FrameEncoderErr> {
for (buf_index, item) in buf.iter_mut().enumerate().take(len) {
let header_byte_index = self.encoded_bytes + buf_index;
match header_byte_index {
// The first 3 bytes represent the payload length in the frame header.
@ -365,33 +403,7 @@ impl FrameEncoder {
}
}
}
self.encoded_bytes += bytes_to_write;
let bytes_written = bytes_to_write;
let mut payload_bytes_written = 0;
if self.encoded_bytes >= frame_header_size {
payload_bytes_written = self
.write_payload(&mut buf[bytes_written..], self.remaining_header_payload);
if self.remaining_header_payload <= self.max_frame_size {
self.state = if self.is_end_stream {
FrameEncoderState::HeadersComplete
} else {
FrameEncoderState::EncodingHeadersPayload
};
} else {
self.state = FrameEncoderState::EncodingContinuationFrames;
}
}
Ok(bytes_written + payload_bytes_written)
} else {
Err(FrameEncoderErr::UnexpectedPayloadType)
}
} else {
Err(FrameEncoderErr::NoCurrentFrame)
}
Ok(())
}
fn encode_headers_payload(&mut self, buf: &mut [u8]) -> Result<usize, FrameEncoderErr> {
@ -407,6 +419,18 @@ impl FrameEncoder {
self.remaining_header_payload -= payload_bytes_written;
// Updates the state based on the encoding progress
self.headers_payload_status();
Ok(payload_bytes_written)
} else {
Err(FrameEncoderErr::UnexpectedPayloadType)
}
} else {
Err(FrameEncoderErr::NoCurrentFrame)
}
}
fn headers_payload_status(&mut self) {
if self.hpack_encoder.is_finished() {
if self.remaining_header_payload <= self.max_frame_size {
self.state = if self.is_end_stream || self.is_end_headers {
@ -420,14 +444,6 @@ impl FrameEncoder {
} else {
self.state = FrameEncoderState::EncodingHeadersPayload;
}
Ok(payload_bytes_written)
} else {
Err(FrameEncoderErr::UnexpectedPayloadType)
}
} else {
Err(FrameEncoderErr::NoCurrentFrame)
}
}
fn encode_continuation_frames(&mut self, buf: &mut [u8]) -> Result<usize, FrameEncoderErr> {
@ -437,13 +453,11 @@ impl FrameEncoder {
self.state = FrameEncoderState::HeadersComplete;
return Ok(0);
}
let available_space = buf.len();
let frame_header_size = 9;
if available_space < frame_header_size {
return Ok(0);
}
// Encodes CONTINUATION frame header.
let continuation_frame_len = self.remaining_header_payload.min(self.max_frame_size);
for (buf_index, item) in buf.iter_mut().enumerate().take(3) {
@ -476,8 +490,19 @@ impl FrameEncoder {
self.remaining_header_payload -= payload_bytes_written;
// Updates the state based on the encoding progress.
if self.hpack_encoder.is_finished()
&& self.remaining_header_payload <= self.max_frame_size
self.update_continuation_state();
Ok(frame_header_size + payload_bytes_written)
} else {
Err(FrameEncoderErr::UnexpectedPayloadType)
}
} else {
Err(FrameEncoderErr::NoCurrentFrame)
}
}
fn update_continuation_state(&mut self) {
if self.hpack_encoder.is_finished() && self.remaining_header_payload <= self.max_frame_size
{
self.state = if self.is_end_stream || self.is_end_headers {
FrameEncoderState::HeadersComplete
@ -487,14 +512,6 @@ impl FrameEncoder {
} else {
self.state = FrameEncoderState::EncodingContinuationFrames;
}
Ok(frame_header_size + payload_bytes_written)
} else {
Err(FrameEncoderErr::UnexpectedPayloadType)
}
} else {
Err(FrameEncoderErr::NoCurrentFrame)
}
}
fn encode_data_header(&mut self, buf: &mut [u8]) -> Result<usize, FrameEncoderErr> {
@ -509,12 +526,34 @@ impl FrameEncoder {
};
let bytes_to_write = remaining_header_bytes.min(buf.len());
for (buf_index, item) in buf.iter_mut().enumerate().take(bytes_to_write) {
self.iterate_data_header(frame, buf, data_frame.data().len(), bytes_to_write)?;
self.encoded_bytes += bytes_to_write;
if self.encoded_bytes == frame_header_size {
self.state = FrameEncoderState::EncodingDataPayload;
self.remaining_payload_bytes = data_frame.data().len();
}
Ok(bytes_to_write)
} else {
Err(FrameEncoderErr::UnexpectedPayloadType)
}
} else {
Err(FrameEncoderErr::NoCurrentFrame)
}
}
fn iterate_data_header(
&self,
frame: &Frame,
buf: &mut [u8],
payload_len: usize,
len: usize,
) -> Result<(), FrameEncoderErr> {
for (buf_index, item) in buf.iter_mut().enumerate().take(len) {
let header_byte_index = self.encoded_bytes + buf_index;
match header_byte_index {
// The first 3 bytes represent the payload length in the frame header.
0..=2 => {
let payload_len = data_frame.data().len();
*item = ((payload_len >> (16 - (8 * header_byte_index))) & 0xFF) as u8;
}
// The 4th byte represents the frame type in the frame header.
@ -536,19 +575,7 @@ impl FrameEncoder {
}
}
}
self.encoded_bytes += bytes_to_write;
if self.encoded_bytes == frame_header_size {
self.state = FrameEncoderState::EncodingDataPayload;
self.remaining_payload_bytes = data_frame.data().len();
}
Ok(bytes_to_write)
} else {
Err(FrameEncoderErr::UnexpectedPayloadType)
}
} else {
Err(FrameEncoderErr::NoCurrentFrame)
}
Ok(())
}
fn encode_data_payload(&mut self, buf: &mut [u8]) -> Result<usize, FrameEncoderErr> {
@ -622,14 +649,33 @@ impl FrameEncoder {
frame_header_size - self.encoded_bytes
};
let bytes_to_write = remaining_header_bytes.min(buf.len());
for (buf_index, item) in buf.iter_mut().enumerate().take(bytes_to_write) {
self.iterate_goaway_header(frame, buf, bytes_to_write)?;
self.encoded_bytes += bytes_to_write;
if self.encoded_bytes == frame_header_size {
self.state = FrameEncoderState::EncodingGoawayPayload;
}
Ok(bytes_to_write)
} else {
Err(FrameEncoderErr::UnexpectedPayloadType)
}
} else {
Err(FrameEncoderErr::NoCurrentFrame)
}
}
fn iterate_goaway_header(
&self,
frame: &Frame,
buf: &mut [u8],
len: usize,
) -> Result<(), FrameEncoderErr> {
for (buf_index, item) in buf.iter_mut().enumerate().take(len) {
let header_byte_index = self.encoded_bytes + buf_index;
match header_byte_index {
0..=2 => {
if let Payload::Goaway(goaway_payload) = frame.payload() {
let payload_size = goaway_payload.encoded_len();
*item =
((payload_size >> (8 * (2 - header_byte_index))) & 0xFF) as u8;
*item = ((payload_size >> (8 * (2 - header_byte_index))) & 0xFF) as u8;
} else {
return Err(FrameEncoderErr::UnexpectedPayloadType);
}
@ -649,17 +695,7 @@ impl FrameEncoder {
}
}
}
self.encoded_bytes += bytes_to_write;
if self.encoded_bytes == frame_header_size {
self.state = FrameEncoderState::EncodingGoawayPayload;
}
Ok(bytes_to_write)
} else {
Err(FrameEncoderErr::UnexpectedPayloadType)
}
} else {
Err(FrameEncoderErr::NoCurrentFrame)
}
Ok(())
}
fn encode_goaway_payload(&mut self, buf: &mut [u8]) -> Result<usize, FrameEncoderErr> {
@ -669,31 +705,8 @@ impl FrameEncoder {
let remaining_payload_bytes =
payload_size.saturating_sub(self.encoded_bytes.saturating_sub(9));
let bytes_to_write = remaining_payload_bytes.min(buf.len());
for (buf_index, buf_item) in buf.iter_mut().enumerate().take(bytes_to_write) {
let payload_byte_index = self.encoded_bytes - 9 + buf_index;
match payload_byte_index {
0..=3 => {
let last_stream_id_byte_index = payload_byte_index;
*buf_item = (goaway.get_last_stream_id()
>> (24 - (8 * last_stream_id_byte_index)))
as u8;
}
4..=7 => {
let error_code_byte_index = payload_byte_index - 4;
*buf_item = (goaway.get_error_code()
>> (24 - (8 * error_code_byte_index)))
as u8;
}
_ => {
let debug_data_index = payload_byte_index - 8;
if debug_data_index < goaway.get_debug_data().len() {
*buf_item = goaway.get_debug_data()[debug_data_index];
} else {
return Err(FrameEncoderErr::InternalError);
}
}
}
}
self.iterate_goaway_payload(goaway, buf, bytes_to_write)?;
self.encoded_bytes += bytes_to_write;
if self.encoded_bytes == 9 + payload_size {
self.state = FrameEncoderState::DataComplete;
@ -708,6 +721,39 @@ impl FrameEncoder {
}
}
fn iterate_goaway_payload(
&self,
goaway: &Goaway,
buf: &mut [u8],
len: usize,
) -> Result<(), FrameEncoderErr> {
for (buf_index, buf_item) in buf.iter_mut().enumerate().take(len) {
let payload_byte_index = self.encoded_bytes - 9 + buf_index;
match payload_byte_index {
0..=3 => {
let last_stream_id_byte_index = payload_byte_index;
*buf_item = (goaway.get_last_stream_id()
>> (24 - (8 * last_stream_id_byte_index)))
as u8;
}
4..=7 => {
let error_code_byte_index = payload_byte_index - 4;
*buf_item =
(goaway.get_error_code() >> (24 - (8 * error_code_byte_index))) as u8;
}
_ => {
let debug_data_index = payload_byte_index - 8;
if debug_data_index < goaway.get_debug_data().len() {
*buf_item = goaway.get_debug_data()[debug_data_index];
} else {
return Err(FrameEncoderErr::InternalError);
}
}
}
}
Ok(())
}
fn encode_window_update_frame(&mut self, buf: &mut [u8]) -> Result<usize, FrameEncoderErr> {
if let Some(frame) = &self.current_frame {
if let Payload::WindowUpdate(_) = frame.payload() {
@ -719,7 +765,29 @@ impl FrameEncoder {
frame_header_size - self.encoded_bytes
};
let bytes_to_write = remaining_header_bytes.min(buf.len());
for (buf_index, item) in buf.iter_mut().enumerate().take(bytes_to_write) {
self.iterate_window_update_header(frame, buf, bytes_to_write)?;
self.encoded_bytes += bytes_to_write;
if self.encoded_bytes == frame_header_size {
self.state = FrameEncoderState::EncodingWindowUpdatePayload;
// Resets the encoded_bytes counter here.
self.encoded_bytes = 0;
}
Ok(bytes_to_write)
} else {
Err(FrameEncoderErr::UnexpectedPayloadType)
}
} else {
Err(FrameEncoderErr::NoCurrentFrame)
}
}
fn iterate_window_update_header(
&self,
frame: &Frame,
buf: &mut [u8],
len: usize,
) -> Result<(), FrameEncoderErr> {
for (buf_index, item) in buf.iter_mut().enumerate().take(len) {
let header_byte_index = self.encoded_bytes + buf_index;
match header_byte_index {
// The first 3 bytes represent the payload length in the frame header. For
@ -750,19 +818,7 @@ impl FrameEncoder {
}
}
}
self.encoded_bytes += bytes_to_write;
if self.encoded_bytes == frame_header_size {
self.state = FrameEncoderState::EncodingWindowUpdatePayload;
// Resets the encoded_bytes counter here.
self.encoded_bytes = 0;
}
Ok(bytes_to_write)
} else {
Err(FrameEncoderErr::UnexpectedPayloadType)
}
} else {
Err(FrameEncoderErr::NoCurrentFrame)
}
Ok(())
}
fn encode_window_update_payload(&mut self, buf: &mut [u8]) -> Result<usize, FrameEncoderErr> {
@ -805,12 +861,37 @@ impl FrameEncoder {
frame_header_size - self.encoded_bytes
};
let bytes_to_write = remaining_header_bytes.min(buf.len());
for buf_index in 0..bytes_to_write {
self.iterate_settings_header(
frame,
buf,
settings.get_settings().len() * 6,
bytes_to_write,
)?;
self.encoded_bytes += bytes_to_write;
if self.encoded_bytes == frame_header_size {
self.state = FrameEncoderState::EncodingSettingsPayload;
}
Ok(bytes_to_write)
} else {
Err(FrameEncoderErr::UnexpectedPayloadType)
}
} else {
Err(FrameEncoderErr::NoCurrentFrame)
}
}
fn iterate_settings_header(
&self,
frame: &Frame,
buf: &mut [u8],
payload_len: usize,
len: usize,
) -> Result<(), FrameEncoderErr> {
for buf_index in 0..len {
let header_byte_index = self.encoded_bytes + buf_index;
match header_byte_index {
// The first 3 bytes represent the payload length in the frame header.
0..=2 => {
let payload_len = settings.get_settings().len() * 6;
buf[buf_index] = ((payload_len >> (16 - (8 * buf_index))) & 0xFF) as u8;
}
// The 4th byte represents the frame type in the frame header.
@ -833,17 +914,7 @@ impl FrameEncoder {
}
}
}
self.encoded_bytes += bytes_to_write;
if self.encoded_bytes == frame_header_size {
self.state = FrameEncoderState::EncodingSettingsPayload;
}
Ok(bytes_to_write)
} else {
Err(FrameEncoderErr::UnexpectedPayloadType)
}
} else {
Err(FrameEncoderErr::NoCurrentFrame)
}
Ok(())
}
fn encode_settings_payload(&mut self, buf: &mut [u8]) -> Result<usize, FrameEncoderErr> {
@ -853,7 +924,28 @@ impl FrameEncoder {
let remaining_payload_bytes =
settings_len.saturating_sub(self.encoded_bytes.saturating_sub(9));
let bytes_to_write = remaining_payload_bytes.min(buf.len());
for (buf_index, buf_item) in buf.iter_mut().enumerate().take(bytes_to_write) {
self.iterate_settings_payload(settings, buf, bytes_to_write)?;
self.encoded_bytes += bytes_to_write;
if self.encoded_bytes == 9 + settings_len {
self.state = FrameEncoderState::DataComplete;
}
Ok(bytes_to_write)
} else {
Err(FrameEncoderErr::UnexpectedPayloadType)
}
} else {
Err(FrameEncoderErr::NoCurrentFrame)
}
}
fn iterate_settings_payload(
&self,
settings: &Settings,
buf: &mut [u8],
len: usize,
) -> Result<(), FrameEncoderErr> {
for (buf_index, buf_item) in buf.iter_mut().enumerate().take(len) {
let payload_byte_index = self.encoded_bytes - 9 + buf_index;
let setting_index = payload_byte_index / 6;
let setting_byte_index = payload_byte_index % 6;
@ -883,18 +975,7 @@ impl FrameEncoder {
return Err(FrameEncoderErr::InternalError);
}
}
self.encoded_bytes += bytes_to_write;
if self.encoded_bytes == 9 + settings_len {
self.state = FrameEncoderState::DataComplete;
}
Ok(bytes_to_write)
} else {
Err(FrameEncoderErr::UnexpectedPayloadType)
}
} else {
Err(FrameEncoderErr::NoCurrentFrame)
}
Ok(())
}
fn encode_priority_frame(&mut self, buf: &mut [u8]) -> Result<usize, FrameEncoderErr> {
@ -909,7 +990,27 @@ impl FrameEncoder {
};
let bytes_to_write = remaining_header_bytes.min(buf.len());
for (buf_index, item) in buf.iter_mut().enumerate().take(bytes_to_write) {
self.iterate_priority_header(frame, buf, bytes_to_write)?;
self.encoded_bytes += bytes_to_write;
if self.encoded_bytes == frame_header_size {
self.state = FrameEncoderState::EncodingPriorityPayload;
}
Ok(bytes_to_write)
} else {
Err(FrameEncoderErr::UnexpectedPayloadType)
}
} else {
Err(FrameEncoderErr::NoCurrentFrame)
}
}
fn iterate_priority_header(
&self,
frame: &Frame,
buf: &mut [u8],
len: usize,
) -> Result<(), FrameEncoderErr> {
for (buf_index, item) in buf.iter_mut().enumerate().take(len) {
let header_byte_index = self.encoded_bytes + buf_index;
match header_byte_index {
// The first 3 bytes represent the payload length in the frame header.
@ -936,17 +1037,7 @@ impl FrameEncoder {
}
}
}
self.encoded_bytes += bytes_to_write;
if self.encoded_bytes == frame_header_size {
self.state = FrameEncoderState::EncodingPriorityPayload;
}
Ok(bytes_to_write)
} else {
Err(FrameEncoderErr::UnexpectedPayloadType)
}
} else {
Err(FrameEncoderErr::NoCurrentFrame)
}
Ok(())
}
fn encode_priority_payload(&mut self, buf: &mut [u8]) -> Result<usize, FrameEncoderErr> {
@ -957,7 +1048,29 @@ impl FrameEncoder {
let remaining_payload_bytes = 5 - (self.encoded_bytes - frame_header_size);
let bytes_to_write = remaining_payload_bytes.min(buf.len());
for (buf_index, buf_item) in buf.iter_mut().enumerate().take(bytes_to_write) {
self.iterate_priority_payload(priority, buf, frame_header_size, bytes_to_write)?;
self.encoded_bytes += bytes_to_write;
if self.encoded_bytes == frame_header_size + 5 {
self.state = FrameEncoderState::DataComplete
}
Ok(bytes_to_write)
} else {
Err(FrameEncoderErr::UnexpectedPayloadType)
}
} else {
Err(FrameEncoderErr::NoCurrentFrame)
}
}
fn iterate_priority_payload(
&self,
priority: &Priority,
buf: &mut [u8],
frame_header_size: usize,
len: usize,
) -> Result<(), FrameEncoderErr> {
for (buf_index, buf_item) in buf.iter_mut().enumerate().take(len) {
let payload_byte_index = self
.encoded_bytes
.saturating_sub(frame_header_size)
@ -982,18 +1095,7 @@ impl FrameEncoder {
}
}
}
self.encoded_bytes += bytes_to_write;
if self.encoded_bytes == frame_header_size + 5 {
self.state = FrameEncoderState::DataComplete
}
Ok(bytes_to_write)
} else {
Err(FrameEncoderErr::UnexpectedPayloadType)
}
} else {
Err(FrameEncoderErr::NoCurrentFrame)
}
Ok(())
}
fn encode_rst_stream_frame(&mut self, buf: &mut [u8]) -> Result<usize, FrameEncoderErr> {
@ -1087,7 +1189,27 @@ impl FrameEncoder {
frame_header_size - self.encoded_bytes
};
let bytes_to_write = remaining_header_bytes.min(buf.len());
for buf_index in 0..bytes_to_write {
self.iterate_ping_header(frame, buf, bytes_to_write)?;
self.encoded_bytes += bytes_to_write;
if self.encoded_bytes == frame_header_size {
self.state = FrameEncoderState::EncodingPingPayload;
}
Ok(bytes_to_write)
} else {
Err(FrameEncoderErr::UnexpectedPayloadType)
}
} else {
Err(FrameEncoderErr::NoCurrentFrame)
}
}
fn iterate_ping_header(
&self,
frame: &Frame,
buf: &mut [u8],
len: usize,
) -> Result<(), FrameEncoderErr> {
for buf_index in 0..len {
let header_byte_index = self.encoded_bytes + buf_index;
match header_byte_index {
// The first 3 bytes represent the payload length in the frame header.
@ -1116,17 +1238,7 @@ impl FrameEncoder {
}
}
}
self.encoded_bytes += bytes_to_write;
if self.encoded_bytes == frame_header_size {
self.state = FrameEncoderState::EncodingPingPayload;
}
Ok(bytes_to_write)
} else {
Err(FrameEncoderErr::UnexpectedPayloadType)
}
} else {
Err(FrameEncoderErr::NoCurrentFrame)
}
Ok(())
}
fn encode_ping_payload(&mut self, buf: &mut [u8]) -> Result<usize, FrameEncoderErr> {

View File

@ -342,7 +342,12 @@ impl StaticTable {
(Header::Status, "404") => Some(TableIndex::Header(13)),
(Header::Status, "500") => Some(TableIndex::Header(14)),
(Header::Status, _) => Some(TableIndex::HeaderName(8)),
(Header::Other(s), v) => match (s.as_str(), v) {
(Header::Other(s), v) => Self::index_headers(s.as_str(), v),
}
}
fn index_headers(key: &str, value: &str) -> Option<TableIndex> {
match (key, value) {
("accept-charset", _) => Some(TableIndex::HeaderName(15)),
("accept-encoding", "gzip, deflate") => Some(TableIndex::Header(16)),
("accept-encoding", _) => Some(TableIndex::HeaderName(16)),
@ -392,7 +397,6 @@ impl StaticTable {
("via", _) => Some(TableIndex::HeaderName(60)),
("www-authenticate", _) => Some(TableIndex::HeaderName(61)),
_ => None,
},
}
}
}

View File

@ -645,13 +645,15 @@ impl Authority {
return Err(InvalidUri::UriMissAuthority);
}
let (authority, rest) = authority_token(bytes)?;
if !rest.is_empty() || authority.is_none() {
return Err(InvalidUri::InvalidAuthority);
if rest.is_empty() {
if let Some(auth) = authority {
return Ok(auth);
}
Ok(authority.unwrap())
}
Err(InvalidUri::InvalidAuthority)
}
/// Gets a immutable reference to `Host`.
/// Gets an immutable reference to `Host`.
///
/// # Examples
///
@ -1046,6 +1048,16 @@ fn authority_token(bytes: &[u8]) -> Result<(Option<Authority>, &[u8]), InvalidUr
}
}
}
authority_parse(bytes, end, colon_num, left_bracket, right_bracket)
}
fn authority_parse(
bytes: &[u8],
end: usize,
colon_num: i32,
left_bracket: bool,
right_bracket: bool,
) -> Result<(Option<Authority>, &[u8]), InvalidUri> {
// The authority does not exist.
if end == 0 {
return Ok((None, &bytes[end..]));

View File

@ -33,7 +33,7 @@ async fn client_send() -> Result<(), HttpClientError> {
// Creates a `Request`.
let request = Request::builder()
.url("127.0.0.1:3000")
.url("https://sf3-cn.feishucdn.com/obj/ee-appcenter/47273f95/Feishu-win32_ia32-7.9.7-signed.exe")
.body(Body::empty())?;
// Sends request and receives a `Response`.

View File

@ -12,18 +12,21 @@
// limitations under the License.
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use ylong_http::body::async_impl::Body;
use ylong_http::body::{ChunkBody, TextBody};
use ylong_http::h1::{RequestEncoder, ResponseDecoder};
use ylong_http::request::uri::Scheme;
use ylong_http::response::ResponsePart;
use ylong_http::version::Version;
use super::StreamData;
use crate::async_impl::connector::ConnInfo;
use crate::async_impl::interceptor::Interceptors;
use crate::async_impl::request::Message;
use crate::async_impl::{HttpBody, Response};
use crate::async_impl::{HttpBody, Request, Response};
use crate::error::HttpClientError;
use crate::runtime::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
use crate::util::dispatcher::http1::Http1Conn;
@ -46,66 +49,14 @@ where
.intercept_request(message.request.ref_mut())?;
let mut buf = vec![0u8; TEMP_BUF_SIZE];
// Encodes and sends Request-line and Headers(non-body fields).
let mut part_encoder = RequestEncoder::new(message.request.ref_mut().part().clone());
if conn.raw_mut().is_proxy() && message.request.ref_mut().uri().scheme() == Some(&Scheme::HTTP)
{
part_encoder.absolute_uri(true);
}
loop {
match part_encoder.encode(&mut buf[..]) {
Ok(0) => break,
Ok(written) => {
message.interceptor.intercept_input(&buf[..written])?;
// RequestEncoder writes `buf` as much as possible.
if let Err(e) = conn.raw_mut().write_all(&buf[..written]).await {
conn.shutdown();
return err_from_io!(Request, e);
}
}
Err(e) => {
conn.shutdown();
return err_from_other!(Request, e);
}
}
}
let content_length = message
.request
.ref_mut()
.part()
.headers
.get("Content-Length")
.and_then(|v| v.to_string().ok())
.and_then(|v| v.parse::<u64>().ok())
.is_some();
let transfer_encoding = message
.request
.ref_mut()
.part()
.headers
.get("Transfer-Encoding")
.and_then(|v| v.to_string().ok())
.map(|v| v.contains("chunked"))
.unwrap_or(false);
let body = message.request.ref_mut().body_mut();
match (content_length, transfer_encoding) {
(_, true) => {
let body = ChunkBody::from_async_reader(body);
encode_body(&mut conn, body, &mut buf).await?;
}
(true, false) => {
let body = TextBody::from_async_reader(body);
encode_body(&mut conn, body, &mut buf).await?;
}
(false, false) => {
let body = TextBody::from_async_reader(body);
encode_body(&mut conn, body, &mut buf).await?;
}
};
encode_request_part(
message.request.ref_mut(),
&message.interceptor,
&mut conn,
&mut buf,
)
.await?;
encode_various_body(message.request.ref_mut(), &mut conn, &mut buf).await?;
// Decodes response part.
let (part, pre) = {
@ -135,6 +86,95 @@ where
}
};
decode_response(message, part, conn, pre)
}
async fn encode_various_body<S>(
request: &mut Request,
conn: &mut Http1Conn<S>,
buf: &mut [u8],
) -> Result<(), HttpClientError>
where
S: AsyncRead + AsyncWrite + Sync + Send + Unpin + 'static,
{
let content_length = request
.part()
.headers
.get("Content-Length")
.and_then(|v| v.to_string().ok())
.and_then(|v| v.parse::<u64>().ok())
.is_some();
let transfer_encoding = request
.part()
.headers
.get("Transfer-Encoding")
.and_then(|v| v.to_string().ok())
.map(|v| v.contains("chunked"))
.unwrap_or(false);
let body = request.body_mut();
match (content_length, transfer_encoding) {
(_, true) => {
let body = ChunkBody::from_async_reader(body);
encode_body(conn, body, buf).await?;
}
(true, false) => {
let body = TextBody::from_async_reader(body);
encode_body(conn, body, buf).await?;
}
(false, false) => {
let body = TextBody::from_async_reader(body);
encode_body(conn, body, buf).await?;
}
};
Ok(())
}
async fn encode_request_part<S>(
request: &Request,
interceptor: &Arc<Interceptors>,
conn: &mut Http1Conn<S>,
buf: &mut [u8],
) -> Result<(), HttpClientError>
where
S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static,
{
// Encodes and sends Request-line and Headers(non-body fields).
let mut part_encoder = RequestEncoder::new(request.part().clone());
if conn.raw_mut().is_proxy() && request.uri().scheme() == Some(&Scheme::HTTP) {
part_encoder.absolute_uri(true);
}
loop {
match part_encoder.encode(&mut buf[..]) {
Ok(0) => break,
Ok(written) => {
interceptor.intercept_input(&buf[..written])?;
// RequestEncoder writes `buf` as much as possible.
if let Err(e) = conn.raw_mut().write_all(&buf[..written]).await {
conn.shutdown();
return err_from_io!(Request, e);
}
}
Err(e) => {
conn.shutdown();
return err_from_other!(Request, e);
}
}
}
Ok(())
}
fn decode_response<S>(
mut message: Message,
part: ResponsePart,
conn: Http1Conn<S>,
pre: &[u8],
) -> Result<Response, HttpClientError>
where
S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static,
{
// The shutdown function only sets the current connection to the closed state
// and does not release the connection immediately.
// Instead, the connection will be completely closed

View File

@ -260,12 +260,7 @@ where
Some(Poll::Ready(Ok(())))
} else {
buf.append_slice(&data[..fill_len]);
if frame.flags().is_end_stream() {
text_io.is_closed = true;
Some(Poll::Ready(Ok(())))
} else {
None
}
Self::end_read(text_io, frame.flags().is_end_stream())
}
}
Payload::RstStream(reset) => {
@ -288,6 +283,15 @@ where
}
}
fn end_read(text_io: &mut TextIo<S>, end_stream: bool) -> Option<Poll<std::io::Result<()>>> {
if end_stream {
text_io.is_closed = true;
Some(Poll::Ready(Ok(())))
} else {
None
}
}
fn read_remaining_data(
text_io: &mut TextIo<S>,
buf: &mut HttpReadBuf,

View File

@ -208,9 +208,7 @@ impl UntilClose {
if buf.is_empty() {
return Poll::Ready(Ok(0));
}
let mut read = 0;
if let Some(pre) = self.pre.as_mut() {
// Here cursor read never failed.
let this_read = Read::read(pre, buf).unwrap();
@ -222,9 +220,23 @@ impl UntilClose {
}
if !buf[read..].is_empty() {
if let Some(mut io) = self.io.take() {
if let Some(io) = self.io.take() {
return self.poll_read_io(cx, io, read, buf);
}
}
Poll::Ready(Ok(read))
}
fn poll_read_io(
&mut self,
cx: &mut Context<'_>,
mut io: BoxStreamData,
read: usize,
buf: &mut [u8],
) -> Poll<Result<usize, HttpClientError>> {
let mut read = read;
let mut read_buf = ReadBuf::new(&mut buf[read..]);
return match Pin::new(&mut io).poll_read(cx, &mut read_buf) {
match Pin::new(&mut io).poll_read(cx, &mut read_buf) {
// Disconnected.
Poll::Ready(Ok(())) => {
let filled = read_buf.filled().len();
@ -248,12 +260,9 @@ impl UntilClose {
Poll::Ready(Err(e)) => {
// If IO error occurs, shutdowns `io` before return.
io.shutdown();
return Poll::Ready(err_from_io!(BodyTransfer, e));
}
};
Poll::Ready(err_from_io!(BodyTransfer, e))
}
}
Poll::Ready(Ok(read))
}
}
@ -299,6 +308,25 @@ impl Text {
self.pre = None;
} else {
read += this_read;
if let Some(result) = self.read_remaining(buf, read) {
return result;
}
}
}
if !buf[read..].is_empty() {
if let Some(io) = self.io.take() {
return self.poll_read_io(cx, buf, io, read);
}
}
Poll::Ready(Ok(read))
}
fn read_remaining(
&mut self,
buf: &mut [u8],
read: usize,
) -> Option<Poll<Result<usize, HttpClientError>>> {
let (text, rem) = self.decoder.decode(&buf[..read]);
// Contains redundant `rem`, return error.
@ -307,20 +335,25 @@ impl Text {
if let Some(io) = self.io.take() {
io.shutdown();
};
return Poll::Ready(err_from_msg!(BodyDecode, "Not eof"));
Some(Poll::Ready(err_from_msg!(BodyDecode, "Not eof")))
}
(true, true) => {
self.io = None;
return Poll::Ready(Ok(read));
Some(Poll::Ready(Ok(read)))
}
// TextBodyDecoder decodes as much as possible here.
_ => {}
}
_ => None,
}
}
if !buf[read..].is_empty() {
if let Some(mut io) = self.io.take() {
fn poll_read_io(
&mut self,
cx: &mut Context<'_>,
buf: &mut [u8],
mut io: BoxStreamData,
read: usize,
) -> Poll<Result<usize, HttpClientError>> {
let mut read = read;
let mut read_buf = ReadBuf::new(&mut buf[read..]);
match Pin::new(&mut io).poll_read(cx, &mut read_buf) {
// Disconnected.
@ -328,10 +361,7 @@ impl Text {
let filled = read_buf.filled().len();
if filled == 0 {
io.shutdown();
return Poll::Ready(err_from_msg!(
BodyDecode,
"Response body incomplete"
));
return Poll::Ready(err_from_msg!(BodyDecode, "Response body incomplete"));
}
let (text, rem) = self.decoder.decode(read_buf.filled());
self.interceptors.intercept_output(read_buf.filled())?;
@ -340,30 +370,29 @@ impl Text {
match (text.is_complete(), rem.is_empty()) {
(true, false) => {
io.shutdown();
return Poll::Ready(err_from_msg!(BodyDecode, "Not eof"));
}
(true, true) => return Poll::Ready(Ok(read)),
_ => {}
Poll::Ready(err_from_msg!(BodyDecode, "Not eof"))
}
(true, true) => Poll::Ready(Ok(read)),
_ => {
self.io = Some(io);
Poll::Ready(Ok(read))
}
}
}
Poll::Pending => {
self.io = Some(io);
if read != 0 {
return Poll::Ready(Ok(read));
}
return Poll::Pending;
Poll::Pending
}
Poll::Ready(Err(e)) => {
// If IO error occurs, shutdowns `io` before return.
io.shutdown();
return Poll::Ready(err_from_io!(BodyDecode, e));
Poll::Ready(err_from_io!(BodyDecode, e))
}
}
}
}
Poll::Ready(Ok(read))
}
}
#[cfg(feature = "http1_1")]

View File

@ -229,6 +229,25 @@ unsafe extern "C" fn destroy<S>(bio: *mut BIO) -> c_int {
1
}
macro_rules! catch_unwind_bio {
($io: expr, $flag: expr, $bio: expr, $state: expr) => {
match catch_unwind(AssertUnwindSafe(|| $io)) {
Ok(Err(err)) => {
if retry_error(&err) {
BIO_set_flags($bio, BIO_FLAGS_SHOULD_RETRY | $flag)
}
$state.error = Some(err);
-1
}
Ok(Ok(len)) => len as c_int,
Err(err) => {
$state.panic = Some(err);
-1
}
}
};
}
unsafe extern "C" fn bwrite<S: Write>(bio: *mut BIO, buf: *const c_char, len: c_int) -> c_int {
BIO_clear_flags(bio, BIO_FLAGS_SHOULD_RETRY | BIO_FLAGS_RWS);
@ -239,21 +258,7 @@ unsafe extern "C" fn bwrite<S: Write>(bio: *mut BIO, buf: *const c_char, len: c_
}
let buf = slice::from_raw_parts(buf as *const _, len as usize);
match catch_unwind(AssertUnwindSafe(|| state.stream.write(buf))) {
Ok(Err(err)) => {
if retry_error(&err) {
BIO_set_flags(bio, BIO_FLAGS_SHOULD_RETRY | BIO_FLAGS_WRITE)
}
state.error = Some(err);
-1
}
Ok(Ok(len)) => len as c_int,
Err(err) => {
state.panic = Some(err);
-1
}
}
catch_unwind_bio!(state.stream.write(buf), BIO_FLAGS_WRITE, bio, state)
}
unsafe extern "C" fn bread<S: Read>(bio: *mut BIO, buf: *mut c_char, len: c_int) -> c_int {
@ -262,20 +267,7 @@ unsafe extern "C" fn bread<S: Read>(bio: *mut BIO, buf: *mut c_char, len: c_int)
let state = get_state::<S>(bio);
let buf = slice::from_raw_parts_mut(buf as *mut _, len as usize);
match catch_unwind(AssertUnwindSafe(|| state.stream.read(buf))) {
Ok(Err(err)) => {
if retry_error(&err) {
BIO_set_flags(bio, BIO_FLAGS_SHOULD_RETRY | BIO_FLAGS_READ)
}
state.error = Some(err);
-1
}
Ok(Ok(len)) => len as c_int,
Err(err) => {
state.panic = Some(err);
-1
}
}
catch_unwind_bio!(state.stream.read(buf), BIO_FLAGS_READ, bio, state)
}
unsafe extern "C" fn bputs<S: Write>(bio: *mut BIO, buf: *const c_char) -> c_int {

View File

@ -148,16 +148,8 @@ impl SslContextBuilder {
where
P: AsRef<Path>,
{
let path = match file.as_ref().as_os_str().to_str() {
Some(path) => path,
None => return Err(ErrorStack::get()),
};
let file = match CString::new(path) {
Ok(path) => path,
Err(_) => return Err(ErrorStack::get()),
};
let file = Self::get_c_file(file)?;
let ptr = self.as_ptr_mut();
check_ret(unsafe {
SSL_CTX_load_verify_locations(ptr, file.as_ptr() as *const _, ptr::null())
})
@ -200,16 +192,8 @@ impl SslContextBuilder {
where
P: AsRef<Path>,
{
let path = match file.as_ref().as_os_str().to_str() {
Some(path) => path,
None => return Err(ErrorStack::get()),
};
let file = match CString::new(path) {
Ok(path) => path,
Err(_) => return Err(ErrorStack::get()),
};
let file = Self::get_c_file(file)?;
let ptr = self.as_ptr_mut();
check_ret(unsafe {
SSL_CTX_use_certificate_file(ptr, file.as_ptr() as *const _, file_type.as_raw())
})
@ -222,6 +206,16 @@ impl SslContextBuilder {
/// followed by intermediate CA certificates if applicable, and ending
/// at the highest level (root) CA.
pub(crate) fn set_certificate_chain_file<P>(&mut self, file: P) -> Result<(), ErrorStack>
where
P: AsRef<Path>,
{
let file = Self::get_c_file(file)?;
let ptr = self.as_ptr_mut();
check_ret(unsafe { SSL_CTX_use_certificate_chain_file(ptr, file.as_ptr() as *const _) })
.map(|_| ())
}
pub(crate) fn get_c_file<P>(file: P) -> Result<CString, ErrorStack>
where
P: AsRef<Path>,
{
@ -229,14 +223,10 @@ impl SslContextBuilder {
Some(path) => path,
None => return Err(ErrorStack::get()),
};
let file = match CString::new(path) {
Ok(path) => path,
Err(_) => return Err(ErrorStack::get()),
};
let ptr = self.as_ptr_mut();
check_ret(unsafe { SSL_CTX_use_certificate_chain_file(ptr, file.as_ptr() as *const _) })
.map(|_| ())
match CString::new(path) {
Ok(path) => Ok(path),
Err(_) => Err(ErrorStack::get()),
}
}
/// Sets the protocols to sent to the server for Application Layer Protocol

View File

@ -279,70 +279,21 @@ pub(crate) mod http2 {
flow.setup_recv_window(config.conn_window_size());
let streams = Streams::new(config.stream_window_size(), DEFAULT_WINDOW_SIZE, flow);
let encoder = FrameEncoder::new(DEFAULT_MAX_FRAME_SIZE, DEFAULT_MAX_HEADER_LIST_SIZE);
let decoder = FrameDecoder::new();
let (read, write) = crate::runtime::split(io);
let shutdown_flag = Arc::new(AtomicBool::new(false));
let settings_sync = Arc::new(Mutex::new(SettingsSync::default()));
let controller = StreamController::new(streams, shutdown_flag.clone());
// The id of the client stream, starting from 1
let next_stream_id = StreamId {
id: AtomicU32::new(1),
};
let (input_tx, input_rx) = unbounded_channel();
let (resp_tx, resp_rx) = unbounded_channel();
let (req_tx, req_rx) = unbounded_channel();
match input_tx.send(settings) {
Ok(_) => {
let send_settings_sync = settings_sync.clone();
let _send = crate::runtime::spawn(async move {
let mut writer = write;
match async_send_preface(&mut writer).await {
Ok(_) => {
let mut send =
SendData::new(encoder, send_settings_sync, writer, input_rx);
match Pin::new(&mut send).await {
Ok(_) => {}
Err(_e) => {}
}
}
Err(_e) => {}
}
});
let recv_settings_sync = settings_sync.clone();
let _recv = crate::runtime::spawn(async move {
let mut recv = RecvData::new(decoder, recv_settings_sync, read, resp_tx);
match Pin::new(&mut recv).await {
Ok(_) => {}
Err(_e) => {}
}
});
let _manager = crate::runtime::spawn(async move {
let mut conn_manager =
ConnManager::new(settings_sync, input_tx, resp_rx, req_rx, controller);
match Pin::new(&mut conn_manager).await {
Ok(_) => {}
Err(e) => {
conn_manager.exit_with_error(e);
}
}
});
}
Err(_e) => {
// Error is not possible, so it is not handled for the time
// being.
if input_tx.send(settings).is_ok() {
Self::launch(controller, req_rx, input_tx, input_rx, io);
}
}
Self {
next_stream_id,
sender: req_tx,
@ -350,6 +301,43 @@ pub(crate) mod http2 {
_mark: PhantomData,
}
}
fn launch(
controller: StreamController,
req_rx: UnboundedReceiver<ReqMessage>,
input_tx: UnboundedSender<Frame>,
input_rx: UnboundedReceiver<Frame>,
io: S,
) {
let (resp_tx, resp_rx) = unbounded_channel();
let (read, write) = crate::runtime::split(io);
let settings_sync = Arc::new(Mutex::new(SettingsSync::default()));
let send_settings_sync = settings_sync.clone();
let _send = crate::runtime::spawn(async move {
let mut writer = write;
if async_send_preface(&mut writer).await.is_ok() {
let encoder =
FrameEncoder::new(DEFAULT_MAX_FRAME_SIZE, DEFAULT_MAX_HEADER_LIST_SIZE);
let mut send = SendData::new(encoder, send_settings_sync, writer, input_rx);
let _ = Pin::new(&mut send).await;
}
});
let recv_settings_sync = settings_sync.clone();
let _recv = crate::runtime::spawn(async move {
let decoder = FrameDecoder::new();
let mut recv = RecvData::new(decoder, recv_settings_sync, read, resp_tx);
let _ = Pin::new(&mut recv).await;
});
let _manager = crate::runtime::spawn(async move {
let mut conn_manager =
ConnManager::new(settings_sync, input_tx, resp_rx, req_rx, controller);
if let Err(e) = Pin::new(&mut conn_manager).await {
conn_manager.exit_with_error(e);
}
});
}
}
impl<S> Dispatcher for Http2Dispatcher<S> {

View File

@ -227,15 +227,11 @@ impl NoProxy {
}
fn contains_ip(&self, ip: IpAddr) -> bool {
for block_ip in self.ips.iter() {
match block_ip {
Ip::Address(i) => {
for Ip::Address(i) in self.ips.iter() {
if &ip == i {
return true;
}
}
}
}
false
}