!109 gn编译新增http2

Merge pull request !109 from Tiga Ultraman/hukai_binary
This commit is contained in:
openharmony_ci 2024-05-25 03:48:15 +00:00 committed by Gitee
commit 9829cd655d
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
6 changed files with 46 additions and 19 deletions

View File

@ -23,6 +23,8 @@ ohos_rust_static_library("ylong_http") {
features = [
"http1_1",
"huffman",
"http2",
"ylong_base",
]
@ -35,6 +37,8 @@ ohos_rust_unittest("rust_ylong_http_test_ut") {
rustflags = [
"--cfg=feature=\"http1_1\"",
"--cfg=feature=\"huffman\"",
"--cfg=feature=\"http2\"",
"--cfg=feature=\"ylong_base\"",
]

View File

@ -25,6 +25,7 @@ ohos_rust_shared_library("ylong_http_client_inner") {
"async",
"c_openssl_3_0",
"http1_1",
"http2",
"ylong_base",
"__c_openssl",
"__tls",
@ -48,6 +49,7 @@ ohos_rust_unittest("rust_ylong_http_client_test_ut") {
rustflags = [
"--cfg=feature=\"async\"",
"--cfg=feature=\"http1_1\"",
"--cfg=feature=\"http2\"",
"--cfg=feature=\"c_openssl_3_0\"",
"--cfg=feature=\"__tls\"",
"--cfg=feature=\"__c_openssl\"",

View File

@ -260,7 +260,7 @@ where
Some(Poll::Ready(Ok(())))
} else {
buf.append_slice(&data[..fill_len]);
Self::end_read(text_io, frame.flags().is_end_stream())
Self::end_read(text_io, frame.flags().is_end_stream(), data_len)
}
}
Payload::RstStream(reset) => {
@ -283,12 +283,19 @@ where
}
}
fn end_read(text_io: &mut TextIo<S>, end_stream: bool) -> Option<Poll<std::io::Result<()>>> {
fn end_read(
text_io: &mut TextIo<S>,
end_stream: bool,
data_len: usize,
) -> Option<Poll<std::io::Result<()>>> {
if end_stream {
text_io.is_closed = true;
Some(Poll::Ready(Ok(())))
} else {
} else if data_len == 0 {
// no data read and is not end stream.
None
} else {
Some(Poll::Ready(Ok(())))
}
}
@ -311,12 +318,7 @@ where
} else {
buf.append_slice(&data[text_io.offset..text_io.offset + fill_len]);
text_io.offset = 0;
if frame.flags().is_end_stream() {
text_io.is_closed = true;
Some(Poll::Ready(Ok(())))
} else {
None
}
Self::end_read(text_io, frame.flags().is_end_stream(), data_len)
}
}
_ => Some(Poll::Ready(Err(std::io::Error::new(

View File

@ -75,6 +75,7 @@ pub(crate) mod runtime {
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
Mutex as AsyncMutex, MutexGuard,
},
task::JoinHandle,
};
#[cfg(all(feature = "tokio_base", feature = "async"))]
pub(crate) use tokio::{
@ -96,6 +97,7 @@ pub(crate) mod runtime {
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
Mutex as AsyncMutex, MutexGuard,
},
task::JoinHandle,
};
#[cfg(all(feature = "ylong_base", feature = "http2"))]

View File

@ -205,6 +205,7 @@ pub(crate) mod http2 {
pub(crate) next_stream_id: StreamId,
pub(crate) sender: UnboundedSender<ReqMessage>,
pub(crate) io_shutdown: Arc<AtomicBool>,
pub(crate) handles: Vec<crate::runtime::JoinHandle<()>>,
pub(crate) _mark: PhantomData<S>,
}
@ -291,13 +292,15 @@ pub(crate) mod http2 {
// Error is not possible, so it is not handled for the time
// being.
let mut handles = Vec::with_capacity(3);
if input_tx.send(settings).is_ok() {
Self::launch(controller, req_rx, input_tx, input_rx, io);
Self::launch(controller, req_rx, input_tx, input_rx, &mut handles, io);
}
Self {
next_stream_id,
sender: req_tx,
io_shutdown: shutdown_flag,
handles,
_mark: PhantomData,
}
}
@ -307,13 +310,14 @@ pub(crate) mod http2 {
req_rx: UnboundedReceiver<ReqMessage>,
input_tx: UnboundedSender<Frame>,
input_rx: UnboundedReceiver<Frame>,
handles: &mut Vec<crate::runtime::JoinHandle<()>>,
io: S,
) {
let (resp_tx, resp_rx) = unbounded_channel();
let (read, write) = crate::runtime::split(io);
let settings_sync = Arc::new(Mutex::new(SettingsSync::default()));
let send_settings_sync = settings_sync.clone();
let _send = crate::runtime::spawn(async move {
let send = crate::runtime::spawn(async move {
let mut writer = write;
if async_send_preface(&mut writer).await.is_ok() {
let encoder =
@ -322,21 +326,24 @@ pub(crate) mod http2 {
let _ = Pin::new(&mut send).await;
}
});
handles.push(send);
let recv_settings_sync = settings_sync.clone();
let _recv = crate::runtime::spawn(async move {
let recv = crate::runtime::spawn(async move {
let decoder = FrameDecoder::new();
let mut recv = RecvData::new(decoder, recv_settings_sync, read, resp_tx);
let _ = Pin::new(&mut recv).await;
});
handles.push(recv);
let _manager = crate::runtime::spawn(async move {
let manager = crate::runtime::spawn(async move {
let mut conn_manager =
ConnManager::new(settings_sync, input_tx, resp_rx, req_rx, controller);
if let Err(e) = Pin::new(&mut conn_manager).await {
conn_manager.exit_with_error(e);
}
});
handles.push(manager);
}
}
@ -358,6 +365,17 @@ pub(crate) mod http2 {
}
}
impl<S> Drop for Http2Dispatcher<S> {
fn drop(&mut self) {
for handle in &self.handles {
#[cfg(feature = "ylong_base")]
handle.cancel();
#[cfg(feature = "tokio_base")]
handle.abort();
}
}
}
impl<S> Http2Conn<S> {
pub(crate) fn new(
id: u32,

View File

@ -117,12 +117,12 @@ impl ConnManager {
self.controller
.streams
.window_update_streams(&self.input_tx)?;
self.poll_recv_request(cx);
self.poll_recv_request(cx)?;
self.poll_input_request(cx)?;
Poll::Pending
}
fn poll_recv_request(&mut self, cx: &mut Context<'_>) {
fn poll_recv_request(&mut self, cx: &mut Context<'_>) -> Result<(), DispatchErrorKind> {
loop {
#[cfg(feature = "tokio_base")]
match self.req_rx.poll_recv(cx) {
@ -137,8 +137,7 @@ impl ConnManager {
self.controller.streams.insert(message.id, message.request);
}
Poll::Ready(None) => {
// TODO May need to close the connection after
// the channel is closed?
return Err(DispatchErrorKind::ChannelClosed);
}
Poll::Pending => {
break;
@ -157,14 +156,14 @@ impl ConnManager {
self.controller.streams.insert(message.id, message.request);
}
Poll::Ready(Err(_e)) => {
// TODO May need to close the connection after
// the channel is closed?
return Err(DispatchErrorKind::ChannelClosed);
}
Poll::Pending => {
break;
}
}
}
Ok(())
}
fn poll_input_request(&mut self, cx: &mut Context<'_>) -> Result<(), DispatchErrorKind> {