!128 修复http2内存和cpu超限

Merge pull request !128 from Tiga Ultraman/binary
This commit is contained in:
openharmony_ci 2024-08-06 09:14:27 +00:00 committed by Gitee
commit 5a6a13997b
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
14 changed files with 756 additions and 306 deletions

View File

@ -21,6 +21,8 @@ use crate::h2::{Frame, Goaway, HpackEncoder, Settings};
// Frame_size_error/Protocol Error.
const DEFAULT_MAX_FRAME_SIZE: usize = 16384;
const DEFAULT_HEADER_TABLE_SIZE: usize = 4096;
#[derive(Debug)]
pub enum FrameEncoderErr {
EncodingData,
@ -42,6 +44,7 @@ enum FrameEncoderState {
EncodingHeadersPayload,
// The state for encoding the padding octets for the HEADERS frame, if the PADDED flag is set.
EncodingHeadersPadding,
// TODO compare to max_header_list_size
// The state for encoding CONTINUATION frames if the header block exceeds the max_frame_size.
EncodingContinuationFrames,
// The final state, indicating that the HEADERS frame and any necessary CONTINUATION frames
@ -109,12 +112,12 @@ pub struct FrameEncoder {
impl FrameEncoder {
/// Constructs a new `FrameEncoder` with specified maximum frame size and
/// maximum header list size.
pub fn new(max_frame_size: usize, max_header_list_size: usize) -> Self {
pub fn new(max_frame_size: usize, use_huffman: bool) -> Self {
FrameEncoder {
current_frame: None,
max_frame_size,
max_header_list_size,
hpack_encoder: HpackEncoder::with_max_size(max_header_list_size),
max_header_list_size: usize::MAX,
hpack_encoder: HpackEncoder::new(DEFAULT_HEADER_TABLE_SIZE, use_huffman),
state: FrameEncoderState::Idle,
encoded_bytes: 0,
data_offset: 0,
@ -343,9 +346,14 @@ impl FrameEncoder {
}
/// Sets the maximum header table size for the current encoder instance.
// TODO enable update header table size.
pub fn update_header_table_size(&mut self, size: usize) {
self.hpack_encoder.update_max_dynamic_table_size(size)
}
// TODO enable update max header list size.
pub(crate) fn update_max_header_list_size(&mut self, size: usize) {
self.max_header_list_size = size;
self.hpack_encoder = HpackEncoder::with_max_size(self.max_header_list_size)
}
fn finish_current_frame(&mut self) {
@ -1328,7 +1336,7 @@ mod ut_frame_encoder {
/// 5. Checks whether the result is correct.
#[test]
fn ut_data_frame_encoding() {
let mut encoder = FrameEncoder::new(4096, 4096);
let mut encoder = FrameEncoder::new(4096, false);
let data_payload = b"hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh".to_vec();
let data_frame = Frame::new(
@ -1367,7 +1375,7 @@ mod ut_frame_encoder {
/// 5. Checks whether the result is correct.
#[test]
fn ut_headers_frame_encoding() {
let mut frame_encoder = FrameEncoder::new(4096, 8190);
let mut frame_encoder = FrameEncoder::new(4096, false);
let mut new_parts = Parts::new();
new_parts.pseudo.set_method(Some("GET".to_string()));
@ -1406,7 +1414,7 @@ mod ut_frame_encoder {
/// 5. Checks whether the result is correct.
#[test]
fn ut_settings_frame_encoding() {
let mut encoder = FrameEncoder::new(4096, 4096);
let mut encoder = FrameEncoder::new(4096, false);
let settings_payload = vec![
Setting::HeaderTableSize(4096),
Setting::EnablePush(true),
@ -1473,7 +1481,7 @@ mod ut_frame_encoder {
/// 5. Checks whether the result is correct.
#[test]
fn ut_ping_frame_encoding() {
let mut encoder = FrameEncoder::new(4096, 4096);
let mut encoder = FrameEncoder::new(4096, false);
let ping_payload = [1, 2, 3, 4, 5, 6, 7, 8];
let ping_frame = Frame::new(
@ -1525,7 +1533,7 @@ mod ut_frame_encoder {
/// 4. Checks whether the encoding results are correct.
#[test]
fn ut_continue_frame_encoding() {
let mut encoder = FrameEncoder::new(4096, 8190);
let mut encoder = FrameEncoder::new(4096, false);
let mut new_parts = Parts::new();
new_parts.pseudo.set_method(Some("GET".to_string()));
@ -1587,7 +1595,7 @@ mod ut_frame_encoder {
/// 5. Checks whether the result is correct.
#[test]
fn ut_rst_stream_frame_encoding() {
let mut frame_encoder = FrameEncoder::new(4096, 8190);
let mut frame_encoder = FrameEncoder::new(4096, false);
let error_code = 12345678;
let rst_stream_payload = Payload::RstStream(RstStream::new(error_code));
@ -1632,7 +1640,7 @@ mod ut_frame_encoder {
/// 5. Checks whether the result is correct.
#[test]
fn ut_window_update_frame_encoding() {
let mut frame_encoder = FrameEncoder::new(4096, 8190);
let mut frame_encoder = FrameEncoder::new(4096, false);
let window_size_increment = 12345678;
let window_update_payload = Payload::WindowUpdate(WindowUpdate::new(window_size_increment));
@ -1677,7 +1685,7 @@ mod ut_frame_encoder {
/// 5. Checks whether the result is correct.
#[test]
fn ut_priority_frame_encoding() {
let mut encoder = FrameEncoder::new(4096, 4096);
let mut encoder = FrameEncoder::new(4096, false);
// Maximum value for a 31-bit integer
let stream_dependency = 0x7FFFFFFF;
let priority_payload = Priority::new(true, stream_dependency, 15);
@ -1732,7 +1740,7 @@ mod ut_frame_encoder {
#[test]
fn ut_goaway_frame_encoding() {
// 1. Creates a `FrameEncoder`.
let mut encoder = FrameEncoder::new(4096, 4096);
let mut encoder = FrameEncoder::new(4096, false);
// 2. Creates a `Frame` with `Payload::Goaway`.
let last_stream_id = 1;
@ -1782,24 +1790,11 @@ mod ut_frame_encoder {
/// 3. Checks whether the maximum frame size was updated correctly.
#[test]
fn ut_update_max_frame_size() {
let mut encoder = FrameEncoder::new(4096, 4096);
let mut encoder = FrameEncoder::new(4096, false);
encoder.update_max_frame_size(8192);
assert_eq!(encoder.max_frame_size, 8192);
}
/// UT test cases for `FrameEncoder::update_header_table_size`.
///
/// # Brief
/// 1. Creates a `FrameEncoder`.
/// 2. Updates the maximum header table size.
/// 3. Checks whether the maximum header table size was updated correctly.
#[test]
fn ut_update_header_table_size() {
let mut encoder = FrameEncoder::new(4096, 4096);
encoder.update_header_table_size(8192);
assert_eq!(encoder.max_header_list_size, 8192);
}
/// UT test cases for `FrameEncoder::update_setting`.
///
/// # Brief
@ -1811,7 +1806,7 @@ mod ut_frame_encoder {
/// 6. Checks whether the setting was updated correctly.
#[test]
fn ut_update_setting() {
let mut encoder = FrameEncoder::new(4096, 4096);
let mut encoder = FrameEncoder::new(4096, false);
let settings_payload = vec![Setting::MaxFrameSize(4096)];
let settings = Settings::new(settings_payload);
let settings_frame = Frame::new(0, FrameFlags::new(0), Payload::Settings(settings));
@ -1838,7 +1833,7 @@ mod ut_frame_encoder {
/// 5. Checks whether the result is correct.
#[test]
fn ut_encode_continuation_frames() {
let mut frame_encoder = FrameEncoder::new(4096, 8190);
let mut frame_encoder = FrameEncoder::new(4096, false);
let mut new_parts = Parts::new();
assert!(new_parts.is_empty());
new_parts.pseudo.set_method(Some("GET".to_string()));
@ -1849,7 +1844,7 @@ mod ut_frame_encoder {
.set_authority(Some("example.com".to_string()));
let mut frame_flag = FrameFlags::empty();
frame_flag.set_end_headers(false);
frame_flag.set_end_headers(true);
frame_flag.set_end_stream(false);
let frame = Frame::new(
1,
@ -1863,7 +1858,8 @@ mod ut_frame_encoder {
assert!(frame_encoder.encode_continuation_frames(&mut buf).is_ok());
let frame_flag = FrameFlags::empty();
let mut frame_flag = FrameFlags::empty();
frame_flag.set_end_headers(true);
let frame = Frame::new(
1,
frame_flag,
@ -1874,7 +1870,8 @@ mod ut_frame_encoder {
frame_encoder.state = FrameEncoderState::EncodingContinuationFrames;
assert!(frame_encoder.encode_continuation_frames(&mut buf).is_ok());
let frame_flag = FrameFlags::empty();
let mut frame_flag = FrameFlags::empty();
frame_flag.set_end_headers(true);
let frame = Frame::new(1, frame_flag, Payload::Ping(Ping::new([0; 8])));
frame_encoder.set_frame(frame).unwrap();
@ -1892,10 +1889,11 @@ mod ut_frame_encoder {
/// 5. Checks whether the result is correct.
#[test]
fn ut_encode_padding() {
let mut frame_encoder = FrameEncoder::new(4096, 8190);
let mut frame_encoder = FrameEncoder::new(4096, false);
// Creates a padded data frame.
let mut frame_flags = FrameFlags::empty();
frame_flags.set_end_headers(true);
frame_flags.set_padded(true);
let data_payload = vec![0u8; 500];
let data_frame = Frame::new(
@ -1931,7 +1929,7 @@ mod ut_frame_encoder {
/// 5. Checks whether the result is correct.
#[test]
fn ut_encode_small_data_frame() {
let mut encoder = FrameEncoder::new(100, 4096);
let mut encoder = FrameEncoder::new(100, false);
let data_payload = vec![b'a'; 10];
let mut buf = [0u8; 10];
encode_small_frame(&mut encoder, &mut buf, data_payload.clone());
@ -1971,7 +1969,7 @@ mod ut_frame_encoder {
/// 5. Checks whether the result is correct.
#[test]
fn ut_encode_large_data_frame() {
let mut encoder = FrameEncoder::new(100, 4096);
let mut encoder = FrameEncoder::new(100, false);
let data_payload = vec![b'a'; 1024];
let mut buf = [0u8; 10];

View File

@ -29,7 +29,7 @@ use std::convert::{Infallible, TryFrom};
use crate::error::{ErrorKind, HttpError};
/// The http2 error handle implementation
#[derive(Debug, Eq, PartialEq, Clone)]
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
pub enum H2Error {
/// [`Stream Error`] Handling.
///
@ -45,7 +45,7 @@ pub enum H2Error {
/// [`Error Codes`] implementation.
///
/// [`Error Codes`]: https://httpwg.org/specs/rfc9113.html#ErrorCodes
#[derive(Debug, Eq, PartialEq, Clone)]
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
pub enum ErrorCode {
/// The associated condition is not a result of an error. For example,
/// a `GOAWAY` might include this code to indicate graceful shutdown of a

View File

@ -22,17 +22,23 @@ use crate::h2::{Parts, PseudoHeaders};
pub(crate) struct HpackEncoder {
table: DynamicTable,
holder: ReprEncStateHolder,
use_huffman: bool,
}
impl HpackEncoder {
/// Create a `HpackEncoder` with the given max size.
pub(crate) fn with_max_size(max_size: usize) -> Self {
/// Create a `HpackEncoder` with the given max dynamic table size and
/// huffman usage.
pub(crate) fn new(max_size: usize, use_huffman: bool) -> Self {
Self {
table: DynamicTable::with_max_size(max_size),
holder: ReprEncStateHolder::new(),
use_huffman,
}
}
// TODO enable update header_table_size
pub(crate) fn update_max_dynamic_table_size(&self, _max_size: usize) {}
/// Set the `Parts` to be encoded.
pub(crate) fn set_parts(&mut self, parts: Parts) {
self.holder.set_parts(parts)
@ -43,7 +49,7 @@ impl HpackEncoder {
pub(crate) fn encode(&mut self, dst: &mut [u8]) -> usize {
let mut encoder = ReprEncoder::new(&mut self.table);
encoder.load(&mut self.holder);
let size = encoder.encode(dst);
let size = encoder.encode(dst, self.use_huffman);
if size == dst.len() {
encoder.save(&mut self.holder);
}
@ -91,7 +97,7 @@ mod ut_hpack_encoder {
fn rfc7541_test_cases() {
// C.2.1. Literal Header Field with Indexing
hpack_test_cases!(
HpackEncoder::with_max_size(4096),
HpackEncoder::new(4096, false),
26, "400a637573746f6d2d6b65790d637573746f6d2d686561646572", 55,
{
Header::Other(String::from("custom-key")),
@ -104,7 +110,7 @@ mod ut_hpack_encoder {
// C.2.4. Indexed Header Field
hpack_test_cases!(
HpackEncoder::with_max_size(4096),
HpackEncoder::new(4096, false),
1, "82", 0,
{
Header::Method,
@ -114,7 +120,7 @@ mod ut_hpack_encoder {
// C.3. Request Examples without Huffman Coding
{
let mut encoder = HpackEncoder::with_max_size(4096);
let mut encoder = HpackEncoder::new(4096, false);
// C.3.1. First Request
hpack_test_cases!(
&mut encoder,
@ -172,7 +178,7 @@ mod ut_hpack_encoder {
// C.5. Response Examples without Huffman Coding
{
let mut encoder = HpackEncoder::with_max_size(256);
let mut encoder = HpackEncoder::new(256, false);
// C.5.1. First Response
hpack_test_cases!(
&mut encoder,

View File

@ -19,6 +19,7 @@ use crate::h2::hpack::representation::PrefixIndexMask;
use crate::h2::hpack::table::{DynamicTable, Header, TableIndex, TableSearcher};
use crate::h2::{Parts, PseudoHeaders};
use crate::headers::HeadersIntoIter;
use crate::huffman::huffman_encode;
/// Encoder implementation for decoding representation. The encode interface
/// supports segmented writing.
@ -61,7 +62,7 @@ impl<'a> ReprEncoder<'a> {
/// Decoding is complete only when `self.iter` and `self.state` are both
/// `None`. It is recommended that users save the result to a
/// `RecEncStateHolder` immediately after using the method.
pub(crate) fn encode(&mut self, dst: &mut [u8]) -> usize {
pub(crate) fn encode(&mut self, dst: &mut [u8], use_huffman: bool) -> usize {
// If `dst` is empty, leave the state unchanged.
if dst.is_empty() {
return 0;
@ -92,13 +93,17 @@ impl<'a> ReprEncoder<'a> {
Some(TableIndex::HeaderName(index)) => {
// Update it to the dynamic table first, then decode it.
self.table.update(h.clone(), v.clone());
Indexing::new(index, v.into_bytes(), false).encode(&mut dst[cur..])
Indexing::new(index, v.into_bytes(), use_huffman).encode(&mut dst[cur..])
}
None => {
// Update it to the dynamic table first, then decode it.
self.table.update(h.clone(), v.clone());
IndexingWithName::new(h.into_string().into_bytes(), v.into_bytes(), false)
.encode(&mut dst[cur..])
IndexingWithName::new(
h.into_string().into_bytes(),
v.into_bytes(),
use_huffman,
)
.encode(&mut dst[cur..])
}
};
match result {
@ -426,8 +431,9 @@ impl IndexAndValue {
}
fn set_value(mut self, value: Vec<u8>, is_huffman: bool) -> Self {
self.value_length = Some(Integer::length(value.len(), is_huffman));
self.value_octets = Some(Octets::new(value));
let octets = Octets::new(value, is_huffman);
self.value_length = Some(Integer::length(octets.len(), is_huffman));
self.value_octets = Some(octets);
self
}
@ -465,10 +471,12 @@ impl NameAndValue {
}
fn set_name_and_value(mut self, name: Vec<u8>, value: Vec<u8>, is_huffman: bool) -> Self {
self.name_length = Some(Integer::length(name.len(), is_huffman));
self.name_octets = Some(Octets::new(name));
self.value_length = Some(Integer::length(value.len(), is_huffman));
self.value_octets = Some(Octets::new(value));
let name_octets = Octets::new(name, is_huffman);
self.name_length = Some(Integer::length(name_octets.len(), is_huffman));
self.name_octets = Some(name_octets);
let value_octets = Octets::new(value, is_huffman);
self.value_length = Some(Integer::length(value_octets.len(), is_huffman));
self.value_octets = Some(value_octets);
self
}
@ -496,7 +504,7 @@ impl Integer {
fn length(length: usize, is_huffman: bool) -> Self {
Self {
int: IntegerEncoder::new(length, 0x7f, u8::from(is_huffman)),
int: IntegerEncoder::new(length, 0x7f, pre_mask(is_huffman)),
}
}
@ -520,8 +528,18 @@ pub(crate) struct Octets {
}
impl Octets {
fn new(src: Vec<u8>) -> Self {
Self { src, idx: 0 }
fn new(src: Vec<u8>, is_huffman: bool) -> Self {
if is_huffman {
let mut dst = Vec::with_capacity(src.len());
huffman_encode(src.as_slice(), dst.as_mut());
Self { src: dst, idx: 0 }
} else {
Self { src, idx: 0 }
}
}
fn len(&self) -> usize {
self.src.len()
}
fn encode(mut self, dst: &mut [u8]) -> Result<usize, Self> {
@ -549,6 +567,14 @@ impl Octets {
}
}
fn pre_mask(is_huffman: bool) -> u8 {
if is_huffman {
0x80
} else {
0
}
}
#[cfg(test)]
mod ut_repre_encoder {
use super::*;

View File

@ -14,43 +14,31 @@
//! This is a simple asynchronous HTTPS client example.
use ylong_http_client::async_impl::{Body, Client, Downloader, Request};
use ylong_http_client::{Certificate, HttpClientError, Redirect, TlsVersion};
use ylong_http_client::{HttpClientError, Redirect, TlsVersion};
fn main() {
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.expect("Tokio runtime build err.");
let mut v = vec![];
for _i in 0..3 {
let handle = rt.spawn(req());
v.push(handle);
}
let handle = rt.spawn(req());
rt.block_on(async move {
for h in v {
let _ = h.await;
}
let _ = handle.await.unwrap().unwrap();
});
}
async fn req() -> Result<(), HttpClientError> {
let v = "some certs".as_bytes();
let cert = Certificate::from_pem(v)?;
// Creates a `async_impl::Client`
let client = Client::builder()
.redirect(Redirect::default())
.tls_built_in_root_certs(false) // not use root certs
.danger_accept_invalid_certs(true) // not verify certs
.max_tls_version(TlsVersion::TLS_1_2)
.min_tls_version(TlsVersion::TLS_1_2)
.add_root_certificate(cert)
.build()?;
// Creates a `Request`.
let request = Request::builder()
.url("https://www.example.com")
.header("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36 Edg/126.0.0.0")
.url("http://vipspeedtest8.wuhan.net.cn:8080/download?size=1073741824")
.body(Body::empty())?;
// Sends request and receives a `Response`.

View File

@ -498,6 +498,35 @@ impl ClientBuilder {
self
}
/// Sets allowed max size of local cached frame, By default, 5 frames are
/// allowed per stream.
///
/// # Examples
///
/// ```
/// use ylong_http_client::async_impl::ClientBuilder;
///
/// let config = ClientBuilder::new().allowed_cache_frame_size(10);
/// ```
pub fn allowed_cache_frame_size(mut self, size: usize) -> Self {
self.http.http2_config.set_allowed_cache_frame_size(size);
self
}
/// Sets whether to use huffman coding in hpack. The default is true.
///
/// # Examples
///
/// ```
/// use ylong_http_client::async_impl::ClientBuilder;
///
/// let config = ClientBuilder::new().use_huffman_coding(true);
/// ```
pub fn use_huffman_coding(mut self, use_huffman: bool) -> Self {
self.http.http2_config.set_use_huffman_coding(use_huffman);
self
}
/// Sets the `SETTINGS_MAX_FRAME_SIZE`.
///
/// # Examples
@ -575,7 +604,7 @@ impl ClientBuilder {
impl ClientBuilder {
/// Sets the maximum allowed TLS version for connections.
///
/// By default there's no maximum.
/// By default, there's no maximum.
///
/// # Examples
///
@ -592,7 +621,7 @@ impl ClientBuilder {
/// Sets the minimum required TLS version for connections.
///
/// By default the TLS backend's own default is used.
/// By default, the TLS backend's own default is used.
///
/// # Examples
///

View File

@ -453,10 +453,10 @@ mod ut_http2 {
use crate::async_impl::conn::http2::TextIo;
use crate::util::dispatcher::http2::Http2Conn;
let (resp_tx, resp_rx) = crate::runtime::unbounded_channel();
let (resp_tx, resp_rx) = ylong_runtime::sync::mpsc::bounded_channel(20);
let (req_tx, _req_rx) = crate::runtime::unbounded_channel();
let shutdown = Arc::new(AtomicBool::new(false));
let mut conn: Http2Conn<()> = Http2Conn::new(1, shutdown, req_tx);
let mut conn: Http2Conn<()> = Http2Conn::new(1, 20, shutdown, req_tx);
conn.receiver.set_receiver(resp_rx);
let mut text_io = TextIo::new(conn);
let data_1 = Frame::new(
@ -474,9 +474,19 @@ mod ut_http2 {
FrameFlags::new(1),
Payload::Data(Data::new(vec![b'a'; 10])),
);
let _ = resp_tx.send(crate::util::dispatcher::http2::RespMessage::Output(data_1));
let _ = resp_tx.send(crate::util::dispatcher::http2::RespMessage::Output(data_2));
let _ = resp_tx.send(crate::util::dispatcher::http2::RespMessage::Output(data_3));
ylong_runtime::block_on(async {
let _ = resp_tx
.send(crate::util::dispatcher::http2::RespMessage::Output(data_1))
.await;
let _ = resp_tx
.send(crate::util::dispatcher::http2::RespMessage::Output(data_2))
.await;
let _ = resp_tx
.send(crate::util::dispatcher::http2::RespMessage::Output(data_3))
.await;
});
ylong_runtime::block_on(async {
let mut buf = [0_u8; 10];
let mut output_vec = vec![];

View File

@ -72,7 +72,11 @@ pub(crate) mod runtime {
io::{split, ReadHalf, WriteHalf},
spawn,
sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
mpsc::{
channel as bounded_channel, error::SendError, unbounded_channel,
Receiver as BoundedReceiver, Sender as BoundedSender, UnboundedReceiver,
UnboundedSender,
},
Mutex as AsyncMutex, MutexGuard,
},
task::JoinHandle,
@ -94,7 +98,11 @@ pub(crate) mod runtime {
pub(crate) use ylong_runtime::{
spawn,
sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
error::SendError,
mpsc::{
bounded_channel, unbounded_channel, BoundedReceiver, BoundedSender,
UnboundedReceiver, UnboundedSender,
},
Mutex as AsyncMutex, MutexGuard,
},
task::JoinHandle,

View File

@ -75,6 +75,8 @@ pub(crate) mod http2 {
init_conn_window_size: u32,
init_stream_window_size: u32,
enable_push: bool,
allowed_cache_frame_size: usize,
use_huffman: bool,
}
impl H2Config {
@ -106,6 +108,14 @@ pub(crate) mod http2 {
self.init_stream_window_size = size;
}
pub(crate) fn set_allowed_cache_frame_size(&mut self, size: usize) {
self.allowed_cache_frame_size = size;
}
pub(crate) fn set_use_huffman_coding(&mut self, use_huffman: bool) {
self.use_huffman = use_huffman;
}
/// Gets the SETTINGS_MAX_FRAME_SIZE.
pub(crate) fn max_frame_size(&self) -> u32 {
self.max_frame_size
@ -132,6 +142,14 @@ pub(crate) mod http2 {
pub(crate) fn stream_window_size(&self) -> u32 {
self.init_stream_window_size
}
pub(crate) fn allowed_cache_frame_size(&self) -> usize {
self.allowed_cache_frame_size
}
pub(crate) fn use_huffman_coding(&self) -> bool {
self.use_huffman
}
}
impl Default for H2Config {
@ -143,6 +161,8 @@ pub(crate) mod http2 {
init_conn_window_size: DEFAULT_CONN_WINDOW_SIZE,
init_stream_window_size: DEFAULT_STREAM_WINDOW_SIZE,
enable_push: false,
allowed_cache_frame_size: 5,
use_huffman: true,
}
}
}

View File

@ -148,6 +148,7 @@ pub(crate) mod http1 {
#[cfg(feature = "http2")]
pub(crate) mod http2 {
use std::collections::HashMap;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
@ -157,24 +158,29 @@ pub(crate) mod http2 {
use ylong_http::error::HttpError;
use ylong_http::h2::{
ErrorCode, Frame, FrameDecoder, FrameEncoder, FrameFlags, Goaway, H2Error, Payload,
Settings, SettingsBuilder,
RstStream, Settings, SettingsBuilder,
};
use crate::runtime::{
unbounded_channel, AsyncRead, AsyncWrite, AsyncWriteExt, UnboundedReceiver,
UnboundedSender, WriteHalf,
bounded_channel, unbounded_channel, AsyncRead, AsyncWrite, AsyncWriteExt, BoundedReceiver,
BoundedSender, SendError, UnboundedReceiver, UnboundedSender, WriteHalf,
};
use crate::util::config::H2Config;
use crate::util::dispatcher::{ConnDispatcher, Dispatcher};
use crate::util::h2::{ConnManager, FlowControl, RecvData, RequestWrapper, SendData, Streams};
use crate::util::h2::{
ConnManager, FlowControl, H2StreamState, RecvData, RequestWrapper, SendData,
StreamEndState, Streams,
};
use crate::ErrorKind::Request;
use crate::{ErrorKind, HttpClientError};
const DEFAULT_MAX_STREAM_ID: u32 = u32::MAX >> 1;
const DEFAULT_MAX_FRAME_SIZE: usize = 2 << 13;
const DEFAULT_MAX_HEADER_LIST_SIZE: usize = 16 << 20;
const DEFAULT_WINDOW_SIZE: u32 = 65535;
pub(crate) type ManagerSendFut =
Pin<Box<dyn Future<Output = Result<(), SendError<RespMessage>>> + Send + Sync>>;
pub(crate) enum RespMessage {
Output(Frame),
OutputExit(DispatchErrorKind),
@ -187,11 +193,11 @@ pub(crate) mod http2 {
pub(crate) struct ReqMessage {
pub(crate) id: u32,
pub(crate) sender: UnboundedSender<RespMessage>,
pub(crate) sender: BoundedSender<RespMessage>,
pub(crate) request: RequestWrapper,
}
#[derive(Debug, Eq, PartialEq, Clone)]
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
pub(crate) enum DispatchErrorKind {
H2(H2Error),
Io(std::io::ErrorKind),
@ -203,6 +209,7 @@ pub(crate) mod http2 {
// threads according to HTTP2 syntax.
pub(crate) struct Http2Dispatcher<S> {
pub(crate) next_stream_id: StreamId,
pub(crate) allowed_cache: usize,
pub(crate) sender: UnboundedSender<ReqMessage>,
pub(crate) io_shutdown: Arc<AtomicBool>,
pub(crate) handles: Vec<crate::runtime::JoinHandle<()>>,
@ -212,6 +219,7 @@ pub(crate) mod http2 {
pub(crate) struct Http2Conn<S> {
// Handle id
pub(crate) id: u32,
pub(crate) allow_cached_frames: usize,
// Sends frame to StreamController
pub(crate) sender: UnboundedSender<ReqMessage>,
pub(crate) receiver: RespReceiver,
@ -224,11 +232,12 @@ pub(crate) mod http2 {
// closed.
pub(crate) io_shutdown: Arc<AtomicBool>,
// The senders of all connected stream channels of response.
pub(crate) senders: HashMap<u32, UnboundedSender<RespMessage>>,
pub(crate) senders: HashMap<u32, BoundedSender<RespMessage>>,
pub(crate) curr_message: HashMap<u32, ManagerSendFut>,
// Stream information on the connection.
pub(crate) streams: Streams,
// Received GO_AWAY frame.
pub(crate) go_away: Option<u32>,
pub(crate) recved_go_away: Option<u32>,
// The last GO_AWAY frame sent by the client.
pub(crate) go_away_sync: GoAwaySync,
}
@ -257,7 +266,7 @@ pub(crate) mod http2 {
#[derive(Default)]
pub(crate) struct RespReceiver {
receiver: Option<UnboundedReceiver<RespMessage>>,
receiver: Option<BoundedReceiver<RespMessage>>,
}
impl<S> ConnDispatcher<S>
@ -294,10 +303,19 @@ pub(crate) mod http2 {
// being.
let mut handles = Vec::with_capacity(3);
if input_tx.send(settings).is_ok() {
Self::launch(controller, req_rx, input_tx, input_rx, &mut handles, io);
Self::launch(
config.allowed_cache_frame_size(),
config.use_huffman_coding(),
controller,
(input_tx, input_rx),
req_rx,
&mut handles,
io,
);
}
Self {
next_stream_id,
allowed_cache: config.allowed_cache_frame_size(),
sender: req_tx,
io_shutdown: shutdown_flag,
handles,
@ -306,23 +324,24 @@ pub(crate) mod http2 {
}
fn launch(
allow_num: usize,
use_huffman: bool,
controller: StreamController,
input_channel: (UnboundedSender<Frame>, UnboundedReceiver<Frame>),
req_rx: UnboundedReceiver<ReqMessage>,
input_tx: UnboundedSender<Frame>,
input_rx: UnboundedReceiver<Frame>,
handles: &mut Vec<crate::runtime::JoinHandle<()>>,
io: S,
) {
let (resp_tx, resp_rx) = unbounded_channel();
let (resp_tx, resp_rx) = bounded_channel(allow_num);
let (read, write) = crate::runtime::split(io);
let settings_sync = Arc::new(Mutex::new(SettingsSync::default()));
let send_settings_sync = settings_sync.clone();
let send = crate::runtime::spawn(async move {
let mut writer = write;
if async_send_preface(&mut writer).await.is_ok() {
let encoder =
FrameEncoder::new(DEFAULT_MAX_FRAME_SIZE, DEFAULT_MAX_HEADER_LIST_SIZE);
let mut send = SendData::new(encoder, send_settings_sync, writer, input_rx);
let encoder = FrameEncoder::new(DEFAULT_MAX_FRAME_SIZE, use_huffman);
let mut send =
SendData::new(encoder, send_settings_sync, writer, input_channel.1);
let _ = Pin::new(&mut send).await;
}
});
@ -338,10 +357,8 @@ pub(crate) mod http2 {
let manager = crate::runtime::spawn(async move {
let mut conn_manager =
ConnManager::new(settings_sync, input_tx, resp_rx, req_rx, controller);
if let Err(e) = Pin::new(&mut conn_manager).await {
conn_manager.exit_with_error(e);
}
ConnManager::new(settings_sync, input_channel.0, resp_rx, req_rx, controller);
let _ = Pin::new(&mut conn_manager).await;
});
handles.push(manager);
}
@ -356,7 +373,7 @@ pub(crate) mod http2 {
return None;
}
let sender = self.sender.clone();
let handle = Http2Conn::new(id, self.io_shutdown.clone(), sender);
let handle = Http2Conn::new(id, self.allowed_cache, self.io_shutdown.clone(), sender);
Some(handle)
}
@ -379,11 +396,13 @@ pub(crate) mod http2 {
impl<S> Http2Conn<S> {
pub(crate) fn new(
id: u32,
allow_cached_num: usize,
io_shutdown: Arc<AtomicBool>,
sender: UnboundedSender<ReqMessage>,
) -> Self {
Self {
id,
allow_cached_frames: allow_cached_num,
sender,
receiver: RespReceiver::default(),
io_shutdown,
@ -395,7 +414,7 @@ pub(crate) mod http2 {
&mut self,
request: RequestWrapper,
) -> Result<(), HttpClientError> {
let (tx, rx) = unbounded_channel::<RespMessage>();
let (tx, rx) = bounded_channel::<RespMessage>(self.allow_cached_frames);
self.receiver.set_receiver(rx);
self.sender
.send(ReqMessage {
@ -420,8 +439,9 @@ pub(crate) mod http2 {
Self {
io_shutdown: shutdown,
senders: HashMap::new(),
curr_message: HashMap::new(),
streams,
go_away: None,
recved_go_away: None,
go_away_sync: GoAwaySync::default(),
}
}
@ -430,7 +450,7 @@ pub(crate) mod http2 {
self.io_shutdown.store(true, Ordering::Release);
}
pub(crate) fn go_away_unsent_stream(
pub(crate) fn get_unsent_streams(
&mut self,
last_stream_id: u32,
) -> Result<Vec<u32>, H2Error> {
@ -443,21 +463,86 @@ pub(crate) mod http2 {
Ok(self.streams.get_go_away_streams(last_stream_id))
}
pub(crate) fn send_message_to_stream(&mut self, stream_id: u32, message: RespMessage) {
pub(crate) fn send_message_to_stream(
&mut self,
cx: &mut Context<'_>,
stream_id: u32,
message: RespMessage,
) -> Poll<Result<(), H2Error>> {
if let Some(sender) = self.senders.get(&stream_id) {
// If the client coroutine has exited, this frame is skipped.
match sender.send(message) {
Ok(_) => {}
Err(_e) => {
let mut tx = {
let sender = sender.clone();
let ft = async move { sender.send(message).await };
Box::pin(ft)
};
match tx.as_mut().poll(cx) {
Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
// The current coroutine sending the request exited prematurely.
Poll::Ready(Err(_)) => {
self.senders.remove(&stream_id);
Poll::Ready(Err(H2Error::StreamError(stream_id, ErrorCode::NoError)))
}
Poll::Pending => {
self.curr_message.insert(stream_id, tx);
Poll::Pending
}
}
} else {
Poll::Ready(Err(H2Error::StreamError(stream_id, ErrorCode::NoError)))
}
}
pub(crate) fn poll_blocked_message(
&mut self,
cx: &mut Context<'_>,
input_tx: &UnboundedSender<Frame>,
) -> Poll<()> {
let keys: Vec<u32> = self.curr_message.keys().cloned().collect();
let mut blocked = false;
for key in keys {
if let Some(mut task) = self.curr_message.remove(&key) {
match task.as_mut().poll(cx) {
Poll::Ready(Ok(_)) => {}
// The current coroutine sending the request exited prematurely.
Poll::Ready(Err(_)) => {
self.senders.remove(&key);
if let Some(state) = self.streams.stream_state(key) {
if !matches!(state, H2StreamState::Closed(_)) {
if let StreamEndState::OK = self.streams.send_local_reset(key) {
let rest_payload =
RstStream::new(ErrorCode::NoError.into_code());
let frame = Frame::new(
key as usize,
FrameFlags::empty(),
Payload::RstStream(rest_payload),
);
// ignore the send error occurs here in order to finish all
// tasks.
let _ = input_tx.send(frame);
}
}
}
}
Poll::Pending => {
self.curr_message.insert(key, task);
blocked = true;
}
}
}
}
if blocked {
Poll::Pending
} else {
Poll::Ready(())
}
}
}
impl RespReceiver {
pub(crate) fn set_receiver(&mut self, receiver: UnboundedReceiver<RespMessage>) {
pub(crate) fn set_receiver(&mut self, receiver: BoundedReceiver<RespMessage>) {
self.receiver = Some(receiver);
}

View File

@ -22,23 +22,38 @@ use ylong_http::h2::{
ErrorCode, Frame, FrameFlags, Goaway, H2Error, Payload, Ping, RstStream, Setting,
};
use crate::runtime::{UnboundedReceiver, UnboundedSender};
use crate::runtime::{BoundedReceiver, UnboundedReceiver, UnboundedSender};
use crate::util::dispatcher::http2::{
DispatchErrorKind, OutputMessage, ReqMessage, RespMessage, SettingsState, SettingsSync,
StreamController,
};
use crate::util::h2::streams::{DataReadState, FrameRecvState, StreamEndState};
#[derive(Copy, Clone)]
enum ManagerState {
Send,
Receive,
Exit(DispatchErrorKind),
}
pub(crate) struct ConnManager {
state: ManagerState,
next_state: ManagerState,
// Synchronize SETTINGS frames sent by the client.
settings: Arc<Mutex<SettingsSync>>,
// channel transmitter between manager and io input.
input_tx: UnboundedSender<Frame>,
// channel receiver between manager and io output.
resp_rx: UnboundedReceiver<OutputMessage>,
resp_rx: BoundedReceiver<OutputMessage>,
// channel receiver between manager and stream coroutine.
req_rx: UnboundedReceiver<ReqMessage>,
controller: StreamController,
handshakes: HandShakes,
}
struct HandShakes {
local: bool,
peer: bool,
}
impl Future for ConnManager {
@ -47,42 +62,60 @@ impl Future for ConnManager {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let manager = self.get_mut();
loop {
// Receives a response frame from io output.
match manager.resp_rx.poll_recv(cx) {
#[cfg(feature = "tokio_base")]
Poll::Ready(Some(message)) => match message {
OutputMessage::Output(frame) => {
manager.poll_recv_message(frame)?;
match manager.state {
ManagerState::Send => {
if manager.poll_blocked_frames(cx).is_pending() {
return Poll::Pending;
}
// io output occurs error.
OutputMessage::OutputExit(e) => {
manager.manage_resp_error(e)?;
}
},
#[cfg(feature = "ylong_base")]
Poll::Ready(Ok(message)) => match message {
OutputMessage::Output(frame) => {
manager.poll_recv_message(frame)?;
}
// io output occurs error.
OutputMessage::OutputExit(e) => {
manager.manage_resp_error(e)?;
}
},
#[cfg(feature = "tokio_base")]
Poll::Ready(None) => {
manager.exit_with_error(DispatchErrorKind::ChannelClosed);
return Poll::Ready(Ok(()));
}
#[cfg(feature = "ylong_base")]
Poll::Ready(Err(_e)) => {
manager.exit_with_error(DispatchErrorKind::ChannelClosed);
return Poll::Ready(Ok(()));
}
ManagerState::Receive => {
// Receives a response frame from io output.
match manager.resp_rx.poll_recv(cx) {
#[cfg(feature = "tokio_base")]
Poll::Ready(Some(message)) => match message {
OutputMessage::Output(frame) => {
if manager.poll_recv_message(cx, frame)?.is_pending() {
return Poll::Pending;
}
}
// io output occurs error.
OutputMessage::OutputExit(e) => {
// Note error returned immediately.
if manager.manage_resp_error(cx, e)?.is_pending() {
return Poll::Pending;
}
}
},
#[cfg(feature = "ylong_base")]
Poll::Ready(Ok(message)) => match message {
OutputMessage::Output(frame) => {
if manager.poll_recv_message(cx, frame)?.is_pending() {
return Poll::Pending;
}
}
// io output occurs error.
OutputMessage::OutputExit(e) => {
if manager.manage_resp_error(cx, e)?.is_pending() {
return Poll::Pending;
}
}
},
#[cfg(feature = "tokio_base")]
Poll::Ready(None) => {
return manager.poll_channel_closed_exit(cx);
}
#[cfg(feature = "ylong_base")]
Poll::Ready(Err(_e)) => {
return manager.poll_channel_closed_exit(cx);
}
Poll::Pending => {
return manager.manage_pending_state(cx);
Poll::Pending => {
// TODO manage error state.
return manager.manage_pending_state(cx);
}
}
}
ManagerState::Exit(e) => return Poll::Ready(Err(e)),
}
}
}
@ -92,16 +125,22 @@ impl ConnManager {
pub(crate) fn new(
settings: Arc<Mutex<SettingsSync>>,
input_tx: UnboundedSender<Frame>,
resp_rx: UnboundedReceiver<OutputMessage>,
resp_rx: BoundedReceiver<OutputMessage>,
req_rx: UnboundedReceiver<ReqMessage>,
controller: StreamController,
) -> Self {
Self {
state: ManagerState::Receive,
next_state: ManagerState::Receive,
settings,
input_tx,
resp_rx,
req_rx,
controller,
handshakes: HandShakes {
local: false,
peer: false,
},
}
}
@ -110,7 +149,7 @@ impl ConnManager {
cx: &mut Context<'_>,
) -> Poll<Result<(), DispatchErrorKind>> {
// The manager previously accepted a GOAWAY Frame.
if let Some(code) = self.controller.go_away {
if let Some(code) = self.controller.recved_go_away {
self.poll_deal_with_go_away(code)?;
}
self.controller.streams.window_update_conn(&self.input_tx)?;
@ -118,7 +157,9 @@ impl ConnManager {
.streams
.window_update_streams(&self.input_tx)?;
self.poll_recv_request(cx)?;
self.poll_input_request(cx)?;
if self.handshakes.local && self.handshakes.peer {
self.poll_input_request(cx)?;
}
Poll::Pending
}
@ -167,9 +208,11 @@ impl ConnManager {
}
fn poll_input_request(&mut self, cx: &mut Context<'_>) -> Result<(), DispatchErrorKind> {
loop {
self.controller.streams.try_consume_pending_concurrency();
match self.controller.streams.next_stream() {
self.controller.streams.try_consume_pending_concurrency();
let size = self.controller.streams.pending_stream_num();
let mut index = 0;
while index < size {
match self.controller.streams.next_pending_stream() {
None => {
break;
}
@ -177,6 +220,7 @@ impl ConnManager {
self.input_stream_frame(cx, id)?;
}
}
index += 1;
}
Ok(())
}
@ -243,7 +287,11 @@ impl ConnManager {
.map_err(|_e| DispatchErrorKind::ChannelClosed)
}
fn poll_recv_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind> {
fn poll_recv_frame(
&mut self,
cx: &mut Context<'_>,
frame: Frame,
) -> Poll<Result<(), DispatchErrorKind>> {
match frame.payload() {
Payload::Settings(_settings) => {
self.recv_settings_frame(frame)?;
@ -253,19 +301,21 @@ impl ConnManager {
}
Payload::PushPromise(_) => {
// TODO The current settings_enable_push setting is fixed to false.
return Err(H2Error::ConnectionError(ErrorCode::ProtocolError).into());
return Poll::Ready(Err(
H2Error::ConnectionError(ErrorCode::ProtocolError).into()
));
}
Payload::Goaway(_go_away) => {
self.recv_go_away_frame(frame)?;
return self.recv_go_away_frame(cx, frame).map_err(Into::into);
}
Payload::RstStream(_reset) => {
self.recv_reset_frame(frame)?;
return self.recv_reset_frame(cx, frame).map_err(Into::into);
}
Payload::Headers(_headers) => {
self.recv_header_frame(frame)?;
return self.recv_header_frame(cx, frame).map_err(Into::into);
}
Payload::Data(_data) => {
self.recv_data_frame(frame)?;
return self.recv_data_frame(cx, frame).map_err(Into::into);
}
Payload::WindowUpdate(_windows) => {
self.recv_window_frame(frame)?;
@ -273,7 +323,7 @@ impl ConnManager {
// Priority is no longer recommended, so keep it compatible but not processed.
Payload::Priority(_priority) => {}
}
Ok(())
Poll::Ready(Ok(()))
}
fn recv_settings_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind> {
@ -297,6 +347,7 @@ impl ConnManager {
}
}
connection.settings = SettingsState::Synced;
self.handshakes.local = true;
Ok(())
} else {
for setting in settings.get_settings() {
@ -320,7 +371,9 @@ impl ConnManager {
);
self.input_tx
.send(new_settings)
.map_err(|_e| DispatchErrorKind::ChannelClosed)
.map_err(|_e| DispatchErrorKind::ChannelClosed)?;
self.handshakes.peer = true;
Ok(())
}
}
@ -342,93 +395,116 @@ impl ConnManager {
}
}
fn recv_go_away_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind> {
fn recv_go_away_frame(
&mut self,
cx: &mut Context<'_>,
frame: Frame,
) -> Poll<Result<(), H2Error>> {
let go_away = if let Payload::Goaway(goaway) = frame.payload() {
goaway
} else {
// this will not happen forever.
return Ok(());
return Poll::Ready(Ok(()));
};
// Prevents the current connection from generating a new stream.
self.controller.shutdown();
self.req_rx.close();
let last_stream_id = go_away.get_last_stream_id();
let streams = self
.controller
.go_away_unsent_stream(last_stream_id as u32)?;
let streams = self.controller.get_unsent_streams(last_stream_id as u32)?;
let error = H2Error::ConnectionError(ErrorCode::try_from(go_away.get_error_code())?);
let mut blocked = false;
for stream_id in streams {
self.controller
.send_message_to_stream(stream_id, RespMessage::OutputExit(error.clone().into()));
match self.controller.send_message_to_stream(
cx,
stream_id,
RespMessage::OutputExit(error.into()),
) {
// ignore error when going away.
Poll::Ready(_) => {}
Poll::Pending => {
blocked = true;
}
}
}
// Exit after the allowed stream is complete.
self.controller.go_away = Some(go_away.get_error_code());
Ok(())
self.controller.recved_go_away = Some(go_away.get_error_code());
if blocked {
Poll::Pending
} else {
Poll::Ready(Ok(()))
}
}
fn recv_reset_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind> {
fn recv_reset_frame(
&mut self,
cx: &mut Context<'_>,
frame: Frame,
) -> Poll<Result<(), H2Error>> {
match self
.controller
.streams
.recv_remote_reset(frame.stream_id() as u32)
{
StreamEndState::OK => {
self.controller
.send_message_to_stream(frame.stream_id() as u32, RespMessage::Output(frame));
}
StreamEndState::Err(e) => {
return Err(e.into());
}
_ => {}
StreamEndState::OK => self.controller.send_message_to_stream(
cx,
frame.stream_id() as u32,
RespMessage::Output(frame),
),
StreamEndState::Err(e) => Poll::Ready(Err(e)),
StreamEndState::Ignore => Poll::Ready(Ok(())),
}
Ok(())
}
fn recv_header_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind> {
fn recv_header_frame(
&mut self,
cx: &mut Context<'_>,
frame: Frame,
) -> Poll<Result<(), H2Error>> {
match self
.controller
.streams
.recv_headers(frame.stream_id() as u32, frame.flags().is_end_stream())
{
FrameRecvState::OK => {
self.controller
.send_message_to_stream(frame.stream_id() as u32, RespMessage::Output(frame));
}
FrameRecvState::Err(e) => {
return Err(e.into());
}
_ => {}
FrameRecvState::OK => self.controller.send_message_to_stream(
cx,
frame.stream_id() as u32,
RespMessage::Output(frame),
),
FrameRecvState::Err(e) => Poll::Ready(Err(e)),
FrameRecvState::Ignore => Poll::Ready(Ok(())),
}
Ok(())
}
fn recv_data_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind> {
fn recv_data_frame(&mut self, cx: &mut Context<'_>, frame: Frame) -> Poll<Result<(), H2Error>> {
let data = if let Payload::Data(data) = frame.payload() {
data
} else {
// this will not happen forever.
return Ok(());
return Poll::Ready(Ok(()));
};
let id = frame.stream_id() as u32;
let len = data.size() as u32;
self.controller.streams.release_conn_recv_window(len)?;
self.controller
.streams
.release_stream_recv_window(id, len)?;
match self
.controller
.streams
.recv_data(id, frame.flags().is_end_stream())
{
FrameRecvState::OK => {
self.controller
.send_message_to_stream(frame.stream_id() as u32, RespMessage::Output(frame));
}
FrameRecvState::Ignore => {}
FrameRecvState::Err(e) => return Err(e.into()),
FrameRecvState::OK => self.controller.send_message_to_stream(
cx,
frame.stream_id() as u32,
RespMessage::Output(frame),
),
FrameRecvState::Ignore => Poll::Ready(Ok(())),
FrameRecvState::Err(e) => Poll::Ready(Err(e)),
}
self.controller.streams.release_conn_recv_window(len)?;
self.controller
.streams
.release_stream_recv_window(id, len)?;
Ok(())
}
fn recv_window_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind> {
@ -453,27 +529,35 @@ impl ConnManager {
Ok(())
}
fn manage_resp_error(&mut self, kind: DispatchErrorKind) -> Result<(), DispatchErrorKind> {
fn manage_resp_error(
&mut self,
cx: &mut Context<'_>,
kind: DispatchErrorKind,
) -> Poll<Result<(), DispatchErrorKind>> {
match kind {
DispatchErrorKind::H2(h2) => {
match h2 {
H2Error::StreamError(id, code) => {
self.manage_stream_error(id, code)?;
}
H2Error::ConnectionError(code) => {
self.manage_conn_error(code)?;
}
}
Ok(())
}
DispatchErrorKind::H2(h2) => match h2 {
H2Error::StreamError(id, code) => self.manage_stream_error(cx, id, code),
H2Error::ConnectionError(code) => self.manage_conn_error(cx, code),
},
other => {
self.exit_with_error(other.clone());
Err(other)
let blocked = self.exit_with_error(cx, other);
if blocked {
self.state = ManagerState::Send;
self.next_state = ManagerState::Exit(other);
Poll::Pending
} else {
Poll::Ready(Err(other))
}
}
}
}
fn manage_stream_error(&mut self, id: u32, code: ErrorCode) -> Result<(), DispatchErrorKind> {
fn manage_stream_error(
&mut self,
cx: &mut Context<'_>,
id: u32,
code: ErrorCode,
) -> Poll<Result<(), DispatchErrorKind>> {
let rest_payload = RstStream::new(code.into_code());
let frame = Frame::new(
id as usize,
@ -486,29 +570,42 @@ impl ConnManager {
.send(frame)
.map_err(|_e| DispatchErrorKind::ChannelClosed)?;
self.controller.send_message_to_stream(
match self.controller.send_message_to_stream(
cx,
id,
RespMessage::OutputExit(DispatchErrorKind::ChannelClosed),
);
) {
Poll::Ready(_) => {
// error at the stream level due to early exit of the coroutine in which the
// request is located, ignored to avoid manager coroutine exit.
Poll::Ready(Ok(()))
}
Poll::Pending => {
self.state = ManagerState::Send;
// stream error will not cause manager exit with error(exit state). Takes
// effect only if blocked.
self.next_state = ManagerState::Receive;
Poll::Pending
}
}
}
StreamEndState::Ignore => {}
StreamEndState::Ignore => Poll::Ready(Ok(())),
StreamEndState::Err(e) => {
// This error will never happen.
return Err(e.into());
Poll::Ready(Err(e.into()))
}
}
Ok(())
}
fn manage_conn_error(&mut self, code: ErrorCode) -> Result<(), DispatchErrorKind> {
self.exit_with_error(DispatchErrorKind::H2(H2Error::ConnectionError(
code.clone(),
)));
// last_stream_id is set to 0 to ensure that all streams are
fn manage_conn_error(
&mut self,
cx: &mut Context<'_>,
code: ErrorCode,
) -> Poll<Result<(), DispatchErrorKind>> {
// last_stream_id is set to 0 to ensure that all pushed streams are
// shutdown.
let go_away_payload = Goaway::new(
code.clone().into_code(),
code.into_code(),
self.controller.streams.latest_remote_id as usize,
vec![],
);
@ -517,22 +614,32 @@ impl ConnManager {
FrameFlags::empty(),
Payload::Goaway(go_away_payload.clone()),
);
// Avoid sending the same GO_AWAY frame multiple times.
if let Some(ref go_away) = self.controller.go_away_sync.going_away {
if go_away.get_error_code() == go_away_payload.get_error_code()
&& go_away.get_last_stream_id() == go_away_payload.get_last_stream_id()
{
return Ok(());
return Poll::Ready(Ok(()));
}
}
// Avoid sending the same GO_AWAY frame multiple times.
self.controller.go_away_sync.going_away = Some(go_away_payload);
self.input_tx
.send(frame)
.map_err(|_e| DispatchErrorKind::ChannelClosed)?;
// TODO When the current client has an error,
// it always sends the GO_AWAY frame at the first time and exits directly.
// Should we consider letting part of the unfinished stream complete?
Err(H2Error::ConnectionError(code).into())
let blocked =
self.exit_with_error(cx, DispatchErrorKind::H2(H2Error::ConnectionError(code)));
if blocked {
self.state = ManagerState::Send;
self.next_state = ManagerState::Exit(H2Error::ConnectionError(code).into());
Poll::Pending
} else {
// TODO When current client has an error,
// it always sends the GO_AWAY frame at the first time and exits directly.
// Should we consider letting part of the unfinished stream complete?
Poll::Ready(Err(H2Error::ConnectionError(code).into()))
}
}
fn poll_deal_with_go_away(&mut self, error_code: u32) -> Result<(), DispatchErrorKind> {
@ -583,18 +690,71 @@ impl ConnManager {
Ok(())
}
fn poll_recv_message(&mut self, frame: Frame) -> Result<(), DispatchErrorKind> {
if let Err(kind) = self.poll_recv_frame(frame) {
self.manage_resp_error(kind)?;
fn poll_recv_message(
&mut self,
cx: &mut Context<'_>,
frame: Frame,
) -> Poll<Result<(), DispatchErrorKind>> {
match self.poll_recv_frame(cx, frame) {
Poll::Ready(Err(kind)) => self.manage_resp_error(cx, kind),
Poll::Pending => {
self.state = ManagerState::Send;
self.next_state = ManagerState::Receive;
Poll::Pending
}
Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
}
Ok(())
}
pub(crate) fn exit_with_error(&mut self, error: DispatchErrorKind) {
fn poll_channel_closed_exit(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<(), DispatchErrorKind>> {
if self.exit_with_error(cx, DispatchErrorKind::ChannelClosed) {
self.state = ManagerState::Send;
self.next_state = ManagerState::Exit(DispatchErrorKind::ChannelClosed);
Poll::Pending
} else {
Poll::Ready(Err(DispatchErrorKind::ChannelClosed))
}
}
fn poll_blocked_frames(&mut self, cx: &mut Context<'_>) -> Poll<()> {
match self.controller.poll_blocked_message(cx, &self.input_tx) {
Poll::Ready(_) => {
self.state = self.next_state;
// Reset state.
self.next_state = ManagerState::Receive;
Poll::Ready(())
}
Poll::Pending => Poll::Pending,
}
}
pub(crate) fn exit_with_error(
&mut self,
cx: &mut Context<'_>,
error: DispatchErrorKind,
) -> bool {
self.controller.shutdown();
self.req_rx.close();
self.controller
.streams
.go_away_all_streams(&mut self.controller.senders, error);
self.controller.streams.clear_streams_states();
let ids = self.controller.streams.get_all_unclosed_streams();
let mut blocked = false;
for stream_id in ids {
match self.controller.send_message_to_stream(
cx,
stream_id,
RespMessage::OutputExit(error),
) {
// ignore error when going away.
Poll::Ready(_) => {}
Poll::Pending => {
blocked = true;
}
}
}
blocked
}
}

View File

@ -38,6 +38,6 @@ pub(crate) use input::SendData;
pub(crate) use io::{split, Reader, Writer};
pub(crate) use manager::ConnManager;
pub(crate) use output::RecvData;
pub(crate) use streams::{RequestWrapper, Streams};
pub(crate) use streams::{H2StreamState, RequestWrapper, StreamEndState, Streams};
pub const MAX_FLOW_CONTROL_WINDOW: u32 = (1 << 31) - 1;

View File

@ -19,19 +19,33 @@ use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use ylong_http::h2::{
ErrorCode, Frame, FrameDecoder, FrameKind, Frames, H2Error, Payload, Setting,
ErrorCode, Frame, FrameDecoder, FrameKind, FramesIntoIter, H2Error, Payload, Setting,
};
use crate::runtime::{AsyncRead, ReadBuf, ReadHalf, UnboundedSender};
use crate::runtime::{AsyncRead, BoundedSender, ReadBuf, ReadHalf, SendError};
use crate::util::dispatcher::http2::{
DispatchErrorKind, OutputMessage, SettingsState, SettingsSync,
};
pub(crate) type OutputSendFut =
Pin<Box<dyn Future<Output = Result<(), SendError<OutputMessage>>> + Send + Sync>>;
#[derive(Copy, Clone)]
enum DecodeState {
Read,
Send,
Exit(DispatchErrorKind),
}
pub(crate) struct RecvData<S> {
decoder: FrameDecoder,
settings: Arc<Mutex<SettingsSync>>,
reader: ReadHalf<S>,
resp_tx: UnboundedSender<OutputMessage>,
state: DecodeState,
next_state: DecodeState,
resp_tx: BoundedSender<OutputMessage>,
curr_message: Option<OutputSendFut>,
pending_iter: Option<FramesIntoIter>,
}
impl<S: AsyncRead + Unpin + Sync + Send + 'static> Future for RecvData<S> {
@ -48,72 +62,170 @@ impl<S: AsyncRead + Unpin + Sync + Send + 'static> RecvData<S> {
decoder: FrameDecoder,
settings: Arc<Mutex<SettingsSync>>,
reader: ReadHalf<S>,
resp_tx: UnboundedSender<OutputMessage>,
resp_tx: BoundedSender<OutputMessage>,
) -> Self {
Self {
decoder,
settings,
reader,
state: DecodeState::Read,
next_state: DecodeState::Read,
resp_tx,
curr_message: None,
pending_iter: None,
}
}
fn poll_read_frame(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), DispatchErrorKind>> {
let mut buf = [0u8; 1024];
loop {
let mut read_buf = ReadBuf::new(&mut buf);
match Pin::new(&mut self.reader).poll_read(cx, &mut read_buf) {
Poll::Ready(Err(e)) => {
self.transmit_error(DispatchErrorKind::Disconnect)?;
return Poll::Ready(Err(e.into()));
}
Poll::Ready(Ok(())) => {}
Poll::Pending => {
return Poll::Pending;
}
}
let read = read_buf.filled().len();
if read == 0 {
self.transmit_error(DispatchErrorKind::Disconnect)?;
return Poll::Ready(Err(DispatchErrorKind::Disconnect));
}
match self.state {
DecodeState::Read => {
let mut read_buf = ReadBuf::new(&mut buf);
match Pin::new(&mut self.reader).poll_read(cx, &mut read_buf) {
Poll::Ready(Err(e)) => {
return self.transmit_error(cx, e.into());
}
Poll::Ready(Ok(())) => {}
Poll::Pending => {
return Poll::Pending;
}
}
let read = read_buf.filled().len();
if read == 0 {
return self.transmit_error(cx, DispatchErrorKind::Disconnect);
}
match self.decoder.decode(&buf[..read]) {
Ok(frames) => match self.transmit_frame(frames) {
Ok(_) => {}
Err(DispatchErrorKind::H2(e)) => {
self.transmit_error(e.into())?;
match self.decoder.decode(&buf[..read]) {
Ok(frames) => match self.poll_iterator_frames(cx, frames.into_iter()) {
Poll::Ready(Ok(_)) => {}
Poll::Ready(Err(e)) => {
return Poll::Ready(Err(e));
}
Poll::Pending => {
self.next_state = DecodeState::Read;
}
},
Err(e) => {
match self.transmit_message(cx, OutputMessage::OutputExit(e.into())) {
Poll::Ready(Err(_)) => {
return Poll::Ready(Err(DispatchErrorKind::ChannelClosed))
}
Poll::Ready(Ok(_)) => {}
Poll::Pending => {
self.next_state = DecodeState::Read;
return Poll::Pending;
}
}
}
}
Err(e) => {
return Poll::Ready(Err(e));
}
DecodeState::Send => {
match self.poll_blocked_task(cx) {
Poll::Ready(Ok(_)) => {
self.state = self.next_state;
// Reset next state.
self.next_state = DecodeState::Read;
}
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
Poll::Pending => return Poll::Pending,
}
},
Err(e) => {
self.transmit_error(e.into())?;
}
DecodeState::Exit(e) => {
return Poll::Ready(Err(e));
}
}
}
}
fn transmit_frame(&mut self, frames: Frames) -> Result<(), DispatchErrorKind> {
for kind in frames.into_iter() {
fn poll_blocked_task(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), DispatchErrorKind>> {
if let Some(mut task) = self.curr_message.take() {
match task.as_mut().poll(cx) {
Poll::Ready(Ok(_)) => {}
Poll::Ready(Err(_)) => {
return Poll::Ready(Err(DispatchErrorKind::ChannelClosed));
}
Poll::Pending => {
self.curr_message = Some(task);
return Poll::Pending;
}
}
}
if let Some(iter) = self.pending_iter.take() {
return self.poll_iterator_frames(cx, iter);
}
Poll::Ready(Ok(()))
}
fn poll_iterator_frames(
&mut self,
cx: &mut Context<'_>,
mut iter: FramesIntoIter,
) -> Poll<Result<(), DispatchErrorKind>> {
while let Some(kind) = iter.next() {
match kind {
FrameKind::Complete(frame) => {
self.update_settings(&frame)?;
self.resp_tx
.send(OutputMessage::Output(frame))
.map_err(|_e| DispatchErrorKind::ChannelClosed)?;
// TODO Whether to continue processing the remaining frames after connection
// error occurs in the Settings frame.
let message = if let Err(e) = self.update_settings(&frame) {
OutputMessage::OutputExit(DispatchErrorKind::H2(e))
} else {
OutputMessage::Output(frame)
};
match self.transmit_message(cx, message) {
Poll::Ready(Ok(_)) => {}
Poll::Ready(Err(e)) => {
return Poll::Ready(Err(e));
}
Poll::Pending => {
self.pending_iter = Some(iter);
return Poll::Pending;
}
}
}
FrameKind::Partial => {}
}
}
Ok(())
Poll::Ready(Ok(()))
}
fn transmit_error(&self, err: DispatchErrorKind) -> Result<(), DispatchErrorKind> {
self.resp_tx
.send(OutputMessage::OutputExit(err))
.map_err(|_e| DispatchErrorKind::ChannelClosed)
fn transmit_error(
&mut self,
cx: &mut Context<'_>,
exit_err: DispatchErrorKind,
) -> Poll<Result<(), DispatchErrorKind>> {
match self.transmit_message(cx, OutputMessage::OutputExit(exit_err)) {
Poll::Ready(_) => Poll::Ready(Err(exit_err)),
Poll::Pending => {
self.next_state = DecodeState::Exit(exit_err);
Poll::Pending
}
}
}
fn transmit_message(
&mut self,
cx: &mut Context<'_>,
message: OutputMessage,
) -> Poll<Result<(), DispatchErrorKind>> {
let mut task = {
let sender = self.resp_tx.clone();
let ft = async move { sender.send(message).await };
Box::pin(ft)
};
match task.as_mut().poll(cx) {
Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
// The current coroutine sending the request exited prematurely.
Poll::Ready(Err(_)) => Poll::Ready(Err(DispatchErrorKind::ChannelClosed)),
Poll::Pending => {
self.state = DecodeState::Send;
self.curr_message = Some(task);
Poll::Pending
}
}
}
fn update_settings(&mut self, frame: &Frame) -> Result<(), H2Error> {

View File

@ -20,12 +20,12 @@ use std::task::{Context, Poll};
use ylong_http::h2::{Data, ErrorCode, Frame, FrameFlags, H2Error, Payload};
use crate::runtime::UnboundedSender;
use crate::util::dispatcher::http2::{DispatchErrorKind, RespMessage};
use crate::util::dispatcher::http2::DispatchErrorKind;
use crate::util::h2::buffer::{FlowControl, RecvWindow, SendWindow};
use crate::util::h2::data_ref::BodyDataRef;
const INITIAL_MAX_SEND_STREAM_ID: u32 = u32::MAX >> 1;
const INITIAL_MAX_RECV_STREAM_ID: u32 = u32::MAX >> 1;
pub(crate) const INITIAL_MAX_SEND_STREAM_ID: u32 = u32::MAX >> 1;
pub(crate) const INITIAL_MAX_RECV_STREAM_ID: u32 = u32::MAX >> 1;
const INITIAL_LATEST_REMOTE_ID: u32 = 0;
const DEFAULT_MAX_CONCURRENT_STREAMS: u32 = 100;
@ -77,7 +77,7 @@ pub(crate) enum StreamEndState {
// | recv R | closed | recv R |
// `----------------------->| |<----------------------'
// +--------+
#[derive(Clone, Debug)]
#[derive(Copy, Clone, Debug)]
pub(crate) enum H2StreamState {
Idle,
// When response does not depend on request,
@ -97,7 +97,7 @@ pub(crate) enum H2StreamState {
Closed(CloseReason),
}
#[derive(Clone, Debug)]
#[derive(Copy, Clone, Debug)]
pub(crate) enum CloseReason {
LocalRst,
RemoteRst,
@ -106,7 +106,7 @@ pub(crate) enum CloseReason {
EndStream,
}
#[derive(Clone, Debug)]
#[derive(Copy, Clone, Debug)]
pub(crate) enum ActiveState {
WaitHeaders,
WaitData,
@ -128,7 +128,7 @@ pub(crate) struct RequestWrapper {
pub(crate) struct Streams {
// Records the received goaway last_stream_id.
pub(crate) max_send_id: u32,
// Records the sent goaway last_stream_id.
// Records the send goaway last_stream_id.
pub(crate) max_recv_id: u32,
// Currently the client doesn't support push promise, so this value is always 0.
pub(crate) latest_remote_id: u32,
@ -294,6 +294,10 @@ impl Streams {
true
}
pub(crate) fn stream_state(&self, id: u32) -> Option<H2StreamState> {
self.stream_map.get(&id).map(|stream| stream.state)
}
pub(crate) fn insert(&mut self, id: u32, request: RequestWrapper) {
let send_window = SendWindow::new(self.stream_send_window_size as i32);
let recv_window = RecvWindow::new(self.stream_recv_window_size as i32);
@ -310,10 +314,14 @@ impl Streams {
self.pending_concurrency.push_back(id);
}
pub(crate) fn next_stream(&mut self) -> Option<u32> {
pub(crate) fn next_pending_stream(&mut self) -> Option<u32> {
self.pending_send.pop_front()
}
pub(crate) fn pending_stream_num(&self) -> usize {
self.pending_send.len()
}
pub(crate) fn try_consume_pending_concurrency(&mut self) {
while !self.reach_max_concurrency() {
match self.pending_concurrency.pop_front() {
@ -489,6 +497,7 @@ impl Streams {
for (id, unsent_stream) in self.stream_map.iter_mut() {
if *id >= last_stream_id {
match unsent_stream.state {
// TODO Whether the close state needs to be selected.
H2StreamState::Closed(_) => {}
H2StreamState::Idle => {
unsent_stream.state = H2StreamState::Closed(CloseReason::RemoteGoAway);
@ -508,11 +517,8 @@ impl Streams {
ids
}
pub(crate) fn go_away_all_streams(
&mut self,
senders: &mut HashMap<u32, UnboundedSender<RespMessage>>,
error: DispatchErrorKind,
) {
pub(crate) fn get_all_unclosed_streams(&mut self) -> Vec<u32> {
let mut ids = vec![];
for (id, stream) in self.stream_map.iter_mut() {
match stream.state {
H2StreamState::Closed(_) => {}
@ -520,12 +526,14 @@ impl Streams {
stream.header = None;
stream.data.clear();
stream.state = H2StreamState::Closed(CloseReason::LocalGoAway);
if let Some(sender) = senders.get_mut(id) {
sender.send(RespMessage::OutputExit(error.clone())).ok();
}
ids.push(*id);
}
}
}
ids
}
pub(crate) fn clear_streams_states(&mut self) {
self.window_updating_streams.clear();
self.pending_stream_window.clear();
self.pending_send.clear();
@ -577,11 +585,11 @@ impl Streams {
recv,
} => {
stream.state = if eos {
H2StreamState::LocalHalfClosed(recv.clone())
H2StreamState::LocalHalfClosed(*recv)
} else {
H2StreamState::Open {
send: ActiveState::WaitData,
recv: recv.clone(),
recv: *recv,
}
};
}
@ -610,7 +618,7 @@ impl Streams {
recv,
} => {
if eos {
stream.state = H2StreamState::LocalHalfClosed(recv.clone());
stream.state = H2StreamState::LocalHalfClosed(*recv);
}
}
H2StreamState::RemoteHalfClosed(ActiveState::WaitData) => {
@ -698,7 +706,7 @@ impl Streams {
recv: ActiveState::WaitData,
} => {
if eos {
stream.state = H2StreamState::RemoteHalfClosed(send.clone());
stream.state = H2StreamState::RemoteHalfClosed(*send);
}
}
H2StreamState::LocalHalfClosed(ActiveState::WaitData) => {