!124 http2 headers过大时header frame写入不完整问题

Merge pull request !124 from Tiga Ultraman/hukai_binary
This commit is contained in:
openharmony_ci 2024-07-04 02:25:47 +00:00 committed by Gitee
commit 47b6e132da
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
3 changed files with 141 additions and 78 deletions

View File

@ -17,12 +17,15 @@ use crate::h2::{Frame, Goaway, HpackEncoder, Settings};
// TODO: Classify encoder errors per RFC specifications into categories like
// stream or connection errors. Identify specific error types such as
// Frame_size_error/Protocol Error.
const DEFAULT_MAX_FRAME_SIZE: usize = 16384;
#[derive(Debug)]
pub enum FrameEncoderErr {
EncodingData,
UnexpectedPayloadType,
NoCurrentFrame,
InternalError,
HeadersNotEnd,
}
#[derive(PartialEq, Debug)]
@ -82,11 +85,16 @@ enum FrameEncoderState {
/// also handles the fragmentation of frames.
pub struct FrameEncoder {
current_frame: Option<Frame>,
// `max_frame_size` is actually useless in the Encoder for headers frame and continuation
// frame, because the frame length does not exceed the length of the
// `header_payload_buffer`
max_frame_size: usize,
max_header_list_size: usize,
hpack_encoder: HpackEncoder,
state: FrameEncoderState,
encoded_bytes: usize,
// `remaining_header_payload` will always be smaller than the minimum max_frame_size,
// because the `header_payload_buffer` length is the minimum max_frame_size.
remaining_header_payload: usize,
remaining_payload_bytes: usize,
is_end_stream: bool,
@ -110,15 +118,15 @@ impl FrameEncoder {
remaining_payload_bytes: 0,
is_end_stream: false,
is_end_headers: false,
// Initialized to default max frame size.
header_payload_buffer: vec![0; 16383],
// Initialized to default max frame size(16384).
header_payload_buffer: vec![0; DEFAULT_MAX_FRAME_SIZE],
header_payload_index: 0,
}
}
/// Sets the current frame to be encoded by the `FrameEncoder`. The state of
/// the encoder is updated based on the payload type of the frame.
pub fn set_frame(&mut self, frame: Frame) {
pub fn set_frame(&mut self, frame: Frame) -> Result<(), FrameEncoderErr> {
self.is_end_stream = frame.flags().is_end_stream();
self.is_end_headers = frame.flags().is_end_headers();
self.current_frame = Some(frame);
@ -129,6 +137,9 @@ impl FrameEncoder {
match &self.current_frame {
Some(frame) => match frame.payload() {
Payload::Headers(headers) => {
if !self.is_end_headers {
return Err(FrameEncoderErr::HeadersNotEnd);
}
self.hpack_encoder.set_parts(headers.get_parts());
self.header_payload_index = 0;
// TODO: Handle potential scenario where HPACK encoding may not be able to
@ -150,6 +161,7 @@ impl FrameEncoder {
},
None => self.state = FrameEncoderState::Idle,
}
Ok(())
}
/// Encodes the current frame into the given buffer. The state of the
@ -326,6 +338,21 @@ impl FrameEncoder {
self.hpack_encoder = HpackEncoder::with_max_size(self.max_header_list_size)
}
fn finish_current_frame(&mut self) {
self.header_payload_index = 0;
self.is_end_stream = false;
self.current_frame = None;
self.is_end_headers = false;
self.remaining_header_payload = 0;
self.encoded_bytes = 0;
}
fn read_rest_payload(&mut self) {
self.header_payload_index = 0;
let payload_size = self.hpack_encoder.encode(&mut self.header_payload_buffer);
self.remaining_header_payload = payload_size;
}
fn encode_headers_frame(&mut self, buf: &mut [u8]) -> Result<usize, FrameEncoderErr> {
if let Some(frame) = &self.current_frame {
if let Payload::Headers(_) = frame.payload() {
@ -346,6 +373,7 @@ impl FrameEncoder {
if self.encoded_bytes >= frame_header_size {
payload_bytes_written = self
.write_payload(&mut buf[bytes_written..], self.remaining_header_payload);
self.encoded_bytes += payload_bytes_written;
self.headers_header_status();
}
@ -359,14 +387,15 @@ impl FrameEncoder {
}
fn headers_header_status(&mut self) {
if self.remaining_header_payload <= self.max_frame_size {
self.state = if self.is_end_stream {
FrameEncoderState::HeadersComplete
} else {
FrameEncoderState::EncodingHeadersPayload
};
self.state = if self.header_payload_index < self.remaining_header_payload {
FrameEncoderState::EncodingHeadersPayload
} else if self.hpack_encoder.is_finished() {
// set_frame ensures that headers must be is_end_headers
self.finish_current_frame();
FrameEncoderState::HeadersComplete
} else {
self.state = FrameEncoderState::EncodingContinuationFrames;
self.read_rest_payload();
FrameEncoderState::EncodingContinuationFrames
}
}
@ -390,7 +419,12 @@ impl FrameEncoder {
}
// The 5th byte represents the frame flags in the frame header.
4 => {
*item = frame.flags().bits();
if self.hpack_encoder.is_finished() {
*item = frame.flags().bits();
} else {
// If not finished, it is followed by a Continuation frame.
*item = frame.flags().bits() & 0xFB
}
}
// The last 4 bytes (6th to 9th) represent the stream identifier in the
// frame header.
@ -409,17 +443,15 @@ impl FrameEncoder {
fn encode_headers_payload(&mut self, buf: &mut [u8]) -> Result<usize, FrameEncoderErr> {
if let Some(frame) = &self.current_frame {
if let Payload::Headers(_) = frame.payload() {
let available_space = buf.len();
if available_space == 0 {
if buf.is_empty() {
return Ok(0);
}
let payload_bytes_written = self.write_payload(buf, self.remaining_header_payload);
self.encoded_bytes += payload_bytes_written;
self.remaining_header_payload -= payload_bytes_written;
// Updates the state based on the encoding progress
self.headers_payload_status();
self.headers_header_status();
Ok(payload_bytes_written)
} else {
@ -430,50 +462,26 @@ impl FrameEncoder {
}
}
fn headers_payload_status(&mut self) {
if self.hpack_encoder.is_finished() {
if self.remaining_header_payload <= self.max_frame_size {
self.state = if self.is_end_stream || self.is_end_headers {
FrameEncoderState::HeadersComplete
} else {
FrameEncoderState::EncodingContinuationFrames
};
} else {
self.state = FrameEncoderState::EncodingContinuationFrames;
}
} else {
self.state = FrameEncoderState::EncodingHeadersPayload;
}
}
fn encode_continuation_frames(&mut self, buf: &mut [u8]) -> Result<usize, FrameEncoderErr> {
if let Some(frame) = &self.current_frame {
if let Payload::Headers(_) = frame.payload() {
if self.remaining_header_payload == 0 {
self.state = FrameEncoderState::HeadersComplete;
return Ok(0);
}
let available_space = buf.len();
let frame_header_size = 9;
// TODO allow available_space < 9
if available_space < frame_header_size {
return Ok(0);
}
// Encodes CONTINUATION frame header.
// And this value is always the remaining_header_payload.
let continuation_frame_len = self.remaining_header_payload.min(self.max_frame_size);
for (buf_index, item) in buf.iter_mut().enumerate().take(3) {
*item = ((continuation_frame_len >> (16 - (8 * buf_index))) & 0xFF) as u8;
}
buf[3] = FrameType::Continuation as u8;
let mut new_flags = FrameFlags::empty();
if self.remaining_header_payload <= self.max_frame_size {
if self.is_end_headers {
// Sets the END_HEADER flag on the last CONTINUATION frame.
new_flags.set_end_headers(true);
}
if self.is_end_stream {
// Sets the END_STREAM flag.
new_flags.set_end_stream(true);
}
if self.remaining_header_payload <= self.max_frame_size && self.hpack_encoder.is_finished() && self.is_end_headers {
// Sets the END_HEADER flag on the last CONTINUATION frame.
new_flags.set_end_headers(true);
}
buf[4] = new_flags.bits();
@ -487,10 +495,9 @@ impl FrameEncoder {
let payload_bytes_written =
self.write_payload(&mut buf[frame_header_size..], continuation_frame_len);
self.encoded_bytes += payload_bytes_written;
self.remaining_header_payload -= payload_bytes_written;
// Updates the state based on the encoding progress.
self.update_continuation_state();
self.headers_header_status();
Ok(frame_header_size + payload_bytes_written)
} else {
@ -501,19 +508,6 @@ impl FrameEncoder {
}
}
fn update_continuation_state(&mut self) {
if self.hpack_encoder.is_finished() && self.remaining_header_payload <= self.max_frame_size
{
self.state = if self.is_end_stream || self.is_end_headers {
FrameEncoderState::HeadersComplete
} else {
FrameEncoderState::EncodingContinuationFrames
};
} else {
self.state = FrameEncoderState::EncodingContinuationFrames;
}
}
fn encode_data_header(&mut self, buf: &mut [u8]) -> Result<usize, FrameEncoderErr> {
if let Some(frame) = &self.current_frame {
if let Payload::Data(data_frame) = frame.payload() {
@ -1319,7 +1313,7 @@ mod ut_frame_encoder {
Payload::Data(Data::new(data_payload.clone())),
);
encoder.set_frame(data_frame);
encoder.set_frame(data_frame).unwrap();
let mut first_buf = [0u8; 2];
let mut second_buf = [0u8; 38];
@ -1364,7 +1358,7 @@ mod ut_frame_encoder {
let frame = Frame::new(1, frame_flag, Payload::Headers(Headers::new(new_parts)));
// Set the current frame for the encoder
frame_encoder.set_frame(frame);
frame_encoder.set_frame(frame).unwrap();
let mut buf = vec![0; 50];
let first_encoded = frame_encoder.encode(&mut buf).unwrap();
@ -1405,7 +1399,7 @@ mod ut_frame_encoder {
let mut first_buf = [0u8; 9];
let mut second_buf = [0u8; 30];
let mut third_buf = [0u8; 6];
encoder.set_frame(settings_frame);
encoder.set_frame(settings_frame).unwrap();
let first_encoded = encoder.encode(&mut first_buf).unwrap();
assert_eq!(encoder.state, FrameEncoderState::EncodingSettingsPayload);
@ -1464,7 +1458,7 @@ mod ut_frame_encoder {
Payload::Ping(Ping { data: ping_payload }),
);
encoder.set_frame(ping_frame);
encoder.set_frame(ping_frame).unwrap();
let mut first_buf = [0u8; 9];
let mut second_buf = [0u8; 8];
@ -1538,15 +1532,15 @@ mod ut_frame_encoder {
Payload::Headers(Headers::new(new_parts.clone())),
);
encoder.set_frame(frame_1);
encoder.set_frame(frame_1).unwrap();
let mut first_buf = [0u8; 50];
let first_encoding = encoder.encode(&mut first_buf).unwrap();
encoder.set_frame(data_frame);
encoder.set_frame(data_frame).unwrap();
let mut second_buf = [0u8; 50];
let second_encoding = encoder.encode(&mut second_buf).unwrap();
encoder.set_frame(frame_2);
encoder.set_frame(frame_2).unwrap();
let mut third_buf = [0u8; 50];
let third_encoding = encoder.encode(&mut third_buf).unwrap();
@ -1583,7 +1577,7 @@ mod ut_frame_encoder {
);
// Set the current frame for the encoder
frame_encoder.set_frame(frame);
frame_encoder.set_frame(frame).unwrap();
let mut buf = vec![0; 50];
let first_encoded = frame_encoder.encode(&mut buf).unwrap();
@ -1628,7 +1622,7 @@ mod ut_frame_encoder {
);
// Sets the current frame for the encoder.
frame_encoder.set_frame(frame);
frame_encoder.set_frame(frame).unwrap();
let mut buf = vec![0; 50];
let first_encoded = frame_encoder.encode(&mut buf).unwrap();
@ -1667,7 +1661,7 @@ mod ut_frame_encoder {
let priority_frame =
Frame::new(131, FrameFlags::new(0), Payload::Priority(priority_payload));
encoder.set_frame(priority_frame);
encoder.set_frame(priority_frame).unwrap();
let mut buf = [0u8; 14];
@ -1727,7 +1721,7 @@ mod ut_frame_encoder {
);
// 3. Sets the frame for the encoder.
encoder.set_frame(goaway_frame);
encoder.set_frame(goaway_frame).unwrap();
// 4. Encodes the frame and its payload using buffer segments.
let mut first_buf = [0u8; 9];
@ -1798,7 +1792,7 @@ mod ut_frame_encoder {
let settings = Settings::new(settings_payload);
let settings_frame = Frame::new(0, FrameFlags::new(0), Payload::Settings(settings));
encoder.set_frame(settings_frame);
encoder.set_frame(settings_frame).unwrap();
let new_setting = Setting::MaxFrameSize(8192);
encoder.update_setting(new_setting.clone());
@ -1839,7 +1833,7 @@ mod ut_frame_encoder {
Payload::Headers(Headers::new(new_parts.clone())),
);
frame_encoder.set_frame(frame);
frame_encoder.set_frame(frame).unwrap();
frame_encoder.state = FrameEncoderState::EncodingContinuationFrames;
let mut buf = [0u8; 5000];
@ -1852,14 +1846,14 @@ mod ut_frame_encoder {
Payload::Headers(Headers::new(new_parts.clone())),
);
frame_encoder.set_frame(frame);
frame_encoder.set_frame(frame).unwrap();
frame_encoder.state = FrameEncoderState::EncodingContinuationFrames;
assert!(frame_encoder.encode_continuation_frames(&mut buf).is_ok());
let frame_flag = FrameFlags::empty();
let frame = Frame::new(1, frame_flag, Payload::Ping(Ping::new([0; 8])));
frame_encoder.set_frame(frame);
frame_encoder.set_frame(frame).unwrap();
frame_encoder.state = FrameEncoderState::EncodingContinuationFrames;
assert!(frame_encoder.encode_continuation_frames(&mut buf).is_err());
}
@ -1887,14 +1881,14 @@ mod ut_frame_encoder {
);
// Sets the frame to the frame_encoder and test padding encoding.
frame_encoder.set_frame(data_frame);
frame_encoder.set_frame(data_frame).unwrap();
frame_encoder.state = FrameEncoderState::EncodingDataPadding;
let mut buf = [0u8; 600];
assert!(frame_encoder.encode_padding(&mut buf).is_ok());
let headers_payload = Payload::Headers(Headers::new(Parts::new()));
let headers_frame = Frame::new(1, frame_flags.clone(), headers_payload);
frame_encoder.set_frame(headers_frame);
frame_encoder.set_frame(headers_frame).unwrap();
frame_encoder.state = FrameEncoderState::EncodingDataPadding;
assert!(frame_encoder.encode_padding(&mut buf).is_err());

View File

@ -430,4 +430,69 @@ mod ut_http2 {
panic!("Unexpected frame type")
}
}
/// UT for ensure that the response body(data frame) can read ends normally.
///
/// # Brief
/// 1. Creates three data frames, one greater than buf, one less than buf,
/// and the last one equal to and finished with buf.
/// 2. The response body data is read from TextIo using a buf of 10 bytes.
/// 3. The body is all read, and the size is the same as the default.
/// 5. Checks that result.
#[cfg(feature = "ylong_base")]
#[test]
fn ut_http2_body_poll_read() {
use std::pin::Pin;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use ylong_http::h2::{Data, Frame, FrameFlags};
use ylong_runtime::futures::poll_fn;
use ylong_runtime::io::{AsyncRead, ReadBuf};
use crate::async_impl::conn::http2::TextIo;
use crate::util::dispatcher::http2::Http2Conn;
let (resp_tx, resp_rx) = crate::runtime::unbounded_channel();
let (req_tx, _req_rx) = crate::runtime::unbounded_channel();
let shutdown = Arc::new(AtomicBool::new(false));
let mut conn: Http2Conn<()> = Http2Conn::new(1, shutdown, req_tx);
conn.receiver.set_receiver(resp_rx);
let mut text_io = TextIo::new(conn);
let data_1 = Frame::new(
1,
FrameFlags::new(0),
Payload::Data(Data::new(vec![b'a'; 128])),
);
let data_2 = Frame::new(
1,
FrameFlags::new(0),
Payload::Data(Data::new(vec![b'a'; 2])),
);
let data_3 = Frame::new(
1,
FrameFlags::new(1),
Payload::Data(Data::new(vec![b'a'; 10])),
);
let _ = resp_tx.send(crate::util::dispatcher::http2::RespMessage::Output(data_1));
let _ = resp_tx.send(crate::util::dispatcher::http2::RespMessage::Output(data_2));
let _ = resp_tx.send(crate::util::dispatcher::http2::RespMessage::Output(data_3));
ylong_runtime::block_on(async {
let mut buf = [0_u8; 10];
let mut output_vec = vec![];
let mut size = buf.len();
// `output_vec < 1024` in order to be able to exit normally in case of an
// exception.
while size != 0 && output_vec.len() < 1024 {
let mut buffer = ReadBuf::new(buf.as_mut_slice());
poll_fn(|cx| Pin::new(&mut text_io).poll_read(cx, &mut buffer))
.await
.unwrap();
size = buffer.filled_len();
output_vec.extend_from_slice(&buf[..size]);
}
assert_eq!(output_vec.len(), 140);
})
}
}

View File

@ -95,12 +95,16 @@ impl<S: AsyncWrite + Unpin + Sync + Send + 'static> Future for SendData<S> {
} else {
frame
};
sender.encoder.set_frame(frame);
// This error will never happen.
sender.encoder.set_frame(frame).map_err(|_| {
DispatchErrorKind::H2(H2Error::ConnectionError(ErrorCode::IntervalError))
})?;
sender.state = InputState::WriteFrame;
}
InputState::WriteFrame => {
match sender.poll_writer_frame(cx) {
Poll::Ready(_) => {}
Poll::Ready(Ok(())) => {}
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
Poll::Pending => return Poll::Pending,
};
sender.state = InputState::RecvFrame;