Fix all clippy warnings (#652)

This commit is contained in:
gtsiam
2022-12-12 22:13:48 +02:00
committed by GitHub
parent 45604060cc
commit c574b6bf3e
39 changed files with 278 additions and 295 deletions
+1 -4
View File
@@ -50,10 +50,7 @@ pub async fn main() -> Result<(), Box<dyn Error>> {
{
let (_, session) = tls.get_ref();
let negotiated_protocol = session.alpn_protocol();
assert_eq!(
Some(ALPN_H2.as_bytes()),
negotiated_protocol.as_ref().map(|x| &**x)
);
assert_eq!(Some(ALPN_H2.as_bytes()), negotiated_protocol);
}
println!("Starting client handshake");
+5 -5
View File
@@ -109,7 +109,7 @@ fn decode_frame(
if partial_inout.is_some() && head.kind() != Kind::Continuation {
proto_err!(conn: "expected CONTINUATION, got {:?}", head.kind());
return Err(Error::library_go_away(Reason::PROTOCOL_ERROR).into());
return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
}
let kind = head.kind();
@@ -231,7 +231,7 @@ fn decode_frame(
if head.stream_id() == 0 {
// Invalid stream identifier
proto_err!(conn: "invalid stream ID 0");
return Err(Error::library_go_away(Reason::PROTOCOL_ERROR).into());
return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
}
match frame::Priority::load(head, &bytes[frame::HEADER_LEN..]) {
@@ -257,14 +257,14 @@ fn decode_frame(
Some(partial) => partial,
None => {
proto_err!(conn: "received unexpected CONTINUATION frame");
return Err(Error::library_go_away(Reason::PROTOCOL_ERROR).into());
return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
}
};
// The stream identifiers must match
if partial.frame.stream_id() != head.stream_id() {
proto_err!(conn: "CONTINUATION frame stream ID does not match previous frame stream ID");
return Err(Error::library_go_away(Reason::PROTOCOL_ERROR).into());
return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
}
// Extend the buf
@@ -287,7 +287,7 @@ fn decode_frame(
// the attacker to go away.
if partial.buf.len() + bytes.len() > max_header_list_size {
proto_err!(conn: "CONTINUATION frame header block size over ignorable limit");
return Err(Error::library_go_away(Reason::COMPRESSION_ERROR).into());
return Err(Error::library_go_away(Reason::COMPRESSION_ERROR));
}
}
partial.buf.extend_from_slice(&bytes[frame::HEADER_LEN..]);
+1 -7
View File
@@ -16,7 +16,7 @@ pub struct Data<T = Bytes> {
pad_len: Option<u8>,
}
#[derive(Copy, Clone, Eq, PartialEq)]
#[derive(Copy, Clone, Default, Eq, PartialEq)]
struct DataFlags(u8);
const END_STREAM: u8 = 0x1;
@@ -211,12 +211,6 @@ impl DataFlags {
}
}
impl Default for DataFlags {
fn default() -> Self {
DataFlags(0)
}
}
impl From<DataFlags> for u8 {
fn from(src: DataFlags) -> u8 {
src.0
+8 -5
View File
@@ -309,17 +309,20 @@ impl fmt::Debug for Headers {
// ===== util =====
pub fn parse_u64(src: &[u8]) -> Result<u64, ()> {
#[derive(Debug, PartialEq, Eq)]
pub struct ParseU64Error;
pub fn parse_u64(src: &[u8]) -> Result<u64, ParseU64Error> {
if src.len() > 19 {
// At danger for overflow...
return Err(());
return Err(ParseU64Error);
}
let mut ret = 0;
for &d in src {
if d < b'0' || d > b'9' {
return Err(());
return Err(ParseU64Error);
}
ret *= 10;
@@ -333,7 +336,7 @@ pub fn parse_u64(src: &[u8]) -> Result<u64, ()> {
#[derive(Debug)]
pub enum PushPromiseHeaderError {
InvalidContentLength(Result<u64, ()>),
InvalidContentLength(Result<u64, ParseU64Error>),
NotSafeAndCacheable,
}
@@ -381,7 +384,7 @@ impl PushPromise {
fn safe_and_cacheable(method: &Method) -> bool {
// Cacheable: https://httpwg.org/specs/rfc7231.html#cacheable.methods
// Safe: https://httpwg.org/specs/rfc7231.html#safe.methods
return method == Method::GET || method == Method::HEAD;
method == Method::GET || method == Method::HEAD
}
pub fn fields(&self) -> &HeaderMap {
+3 -3
View File
@@ -182,10 +182,10 @@ impl Settings {
}
}
Some(MaxFrameSize(val)) => {
if val < DEFAULT_MAX_FRAME_SIZE || val > MAX_MAX_FRAME_SIZE {
return Err(Error::InvalidSettingValue);
} else {
if DEFAULT_MAX_FRAME_SIZE <= val && val <= MAX_MAX_FRAME_SIZE {
settings.max_frame_size = Some(val);
} else {
return Err(Error::InvalidSettingValue);
}
}
Some(MaxHeaderListSize(val)) => {
+13 -16
View File
@@ -852,8 +852,7 @@ mod test {
fn test_decode_empty() {
let mut de = Decoder::new(0);
let mut buf = BytesMut::new();
let empty = de.decode(&mut Cursor::new(&mut buf), |_| {}).unwrap();
assert_eq!(empty, ());
let _: () = de.decode(&mut Cursor::new(&mut buf), |_| {}).unwrap();
}
#[test]
@@ -861,17 +860,16 @@ mod test {
let mut de = Decoder::new(0);
let mut buf = BytesMut::new();
buf.extend(&[0b01000000, 0x80 | 2]);
buf.extend([0b01000000, 0x80 | 2]);
buf.extend(huff_encode(b"foo"));
buf.extend(&[0x80 | 3]);
buf.extend([0x80 | 3]);
buf.extend(huff_encode(b"bar"));
let mut res = vec![];
let _ = de
.decode(&mut Cursor::new(&mut buf), |h| {
res.push(h);
})
.unwrap();
de.decode(&mut Cursor::new(&mut buf), |h| {
res.push(h);
})
.unwrap();
assert_eq!(res.len(), 1);
assert_eq!(de.table.size(), 0);
@@ -900,10 +898,10 @@ mod test {
let value = huff_encode(b"bar");
let mut buf = BytesMut::new();
// header name is non_huff encoded
buf.extend(&[0b01000000, 0x00 | 3]);
buf.extend([0b01000000, 3]);
buf.extend(b"foo");
// header value is partial
buf.extend(&[0x80 | 3]);
buf.extend([0x80 | 3]);
buf.extend(&value[0..1]);
let mut res = vec![];
@@ -917,11 +915,10 @@ mod test {
// extend buf with the remaining header value
buf.extend(&value[1..]);
let _ = de
.decode(&mut Cursor::new(&mut buf), |h| {
res.push(h);
})
.unwrap();
de.decode(&mut Cursor::new(&mut buf), |h| {
res.push(h);
})
.unwrap();
assert_eq!(res.len(), 1);
assert_eq!(de.table.size(), 0);
+4 -4
View File
@@ -118,12 +118,12 @@ impl Encoder {
encode_int(idx, 7, 0x80, dst);
}
Index::Name(idx, _) => {
let header = self.table.resolve(&index);
let header = self.table.resolve(index);
encode_not_indexed(idx, header.value_slice(), header.is_sensitive(), dst);
}
Index::Inserted(_) => {
let header = self.table.resolve(&index);
let header = self.table.resolve(index);
assert!(!header.is_sensitive());
@@ -133,7 +133,7 @@ impl Encoder {
encode_str(header.value_slice(), dst);
}
Index::InsertedValue(idx, _) => {
let header = self.table.resolve(&index);
let header = self.table.resolve(index);
assert!(!header.is_sensitive());
@@ -141,7 +141,7 @@ impl Encoder {
encode_str(header.value_slice(), dst);
}
Index::NotIndexed(_) => {
let header = self.table.resolve(&index);
let header = self.table.resolve(index);
encode_not_indexed2(
header.name().as_slice(),
+13 -13
View File
@@ -190,18 +190,18 @@ impl Header {
use http::header;
match *self {
Header::Field { ref name, .. } => match *name {
Header::Field { ref name, .. } => matches!(
*name,
header::AGE
| header::AUTHORIZATION
| header::CONTENT_LENGTH
| header::ETAG
| header::IF_MODIFIED_SINCE
| header::IF_NONE_MATCH
| header::LOCATION
| header::COOKIE
| header::SET_COOKIE => true,
_ => false,
},
| header::AUTHORIZATION
| header::CONTENT_LENGTH
| header::ETAG
| header::IF_MODIFIED_SINCE
| header::IF_NONE_MATCH
| header::LOCATION
| header::COOKIE
| header::SET_COOKIE
),
Header::Path(..) => true,
_ => false,
}
@@ -231,10 +231,10 @@ impl<'a> Name<'a> {
match self {
Name::Field(name) => Ok(Header::Field {
name: name.clone(),
value: HeaderValue::from_bytes(&*value)?,
value: HeaderValue::from_bytes(&value)?,
}),
Name::Authority => Ok(Header::Authority(BytesStr::try_from(value)?)),
Name::Method => Ok(Header::Method(Method::from_bytes(&*value)?)),
Name::Method => Ok(Header::Method(Method::from_bytes(&value)?)),
Name::Scheme => Ok(Header::Scheme(BytesStr::try_from(value)?)),
Name::Path => Ok(Header::Path(BytesStr::try_from(value)?)),
Name::Protocol => Ok(Header::Protocol(Protocol::try_from(value)?)),
+4 -5
View File
@@ -112,7 +112,7 @@ mod test {
#[test]
fn decode_single_byte() {
assert_eq!("o", decode(&[0b00111111]).unwrap());
assert_eq!("0", decode(&[0x0 + 7]).unwrap());
assert_eq!("0", decode(&[7]).unwrap());
assert_eq!("A", decode(&[(0x21 << 2) + 3]).unwrap());
}
@@ -138,7 +138,7 @@ mod test {
dst.clear();
encode(b"0", &mut dst);
assert_eq!(&dst[..], &[0x0 + 7]);
assert_eq!(&dst[..], &[7]);
dst.clear();
encode(b"A", &mut dst);
@@ -147,7 +147,7 @@ mod test {
#[test]
fn encode_decode_str() {
const DATA: &'static [&'static str] = &[
const DATA: &[&str] = &[
"hello world",
":method",
":scheme",
@@ -184,8 +184,7 @@ mod test {
#[test]
fn encode_decode_u8() {
const DATA: &'static [&'static [u8]] =
&[b"\0", b"\0\0\0", b"\0\x01\x02\x03\x04\x05", b"\xFF\xF8"];
const DATA: &[&[u8]] = &[b"\0", b"\0\0\0", b"\0\x01\x02\x03\x04\x05", b"\xFF\xF8"];
for s in DATA {
let mut dst = BytesMut::with_capacity(s.len());
+1 -1
View File
@@ -404,7 +404,7 @@ impl Table {
// Find the associated position
probe_loop!(probe < self.indices.len(), {
debug_assert!(!self.indices[probe].is_none());
debug_assert!(self.indices[probe].is_some());
let mut pos = self.indices[probe].unwrap();
+5 -5
View File
@@ -52,8 +52,8 @@ fn test_story(story: Value) {
Case {
seqno: case.get("seqno").unwrap().as_u64().unwrap(),
wire: wire,
expect: expect,
wire,
expect,
header_table_size: size,
}
})
@@ -142,10 +142,10 @@ fn key_str(e: &Header) -> &str {
fn value_str(e: &Header) -> &str {
match *e {
Header::Field { ref value, .. } => value.to_str().unwrap(),
Header::Authority(ref v) => &**v,
Header::Authority(ref v) => v,
Header::Method(ref m) => m.as_str(),
Header::Scheme(ref v) => &**v,
Header::Path(ref v) => &**v,
Header::Scheme(ref v) => v,
Header::Path(ref v) => v,
Header::Protocol(ref v) => v.as_str(),
Header::Status(ref v) => v.as_str(),
}
+1 -1
View File
@@ -80,7 +80,7 @@ impl FuzzHpack {
let high = rng.gen_range(128..MAX_CHUNK * 2);
let low = rng.gen_range(0..high);
frame.resizes.extend(&[low, high]);
frame.resizes.extend([low, high]);
}
1..=3 => {
frame.resizes.push(rng.gen_range(128..MAX_CHUNK * 2));
+1
View File
@@ -81,6 +81,7 @@
#![doc(html_root_url = "https://docs.rs/h2/0.3.15")]
#![deny(missing_debug_implementations, missing_docs)]
#![cfg_attr(test, deny(warnings))]
#![allow(clippy::type_complexity, clippy::manual_range_contains)]
macro_rules! proto_err {
(conn: $($msg:tt)+) => {
+1 -1
View File
@@ -215,7 +215,7 @@ where
});
match (ours, theirs) {
(Reason::NO_ERROR, Reason::NO_ERROR) => return Ok(()),
(Reason::NO_ERROR, Reason::NO_ERROR) => Ok(()),
(ours, Reason::NO_ERROR) => Err(Error::GoAway(Bytes::new(), ours, initiator)),
// If both sides reported an error, give their
// error back to th user. We assume our error
+2 -2
View File
@@ -13,7 +13,7 @@ pub enum Error {
Io(io::ErrorKind, Option<String>),
}
#[derive(Clone, Copy, Debug, PartialEq)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Initiator {
User,
Library,
@@ -70,7 +70,7 @@ impl fmt::Display for Error {
impl From<io::ErrorKind> for Error {
fn from(src: io::ErrorKind) -> Self {
Error::Io(src.into(), None)
Error::Io(src, None)
}
}
+1 -4
View File
@@ -200,10 +200,7 @@ impl PingPong {
impl ReceivedPing {
pub(crate) fn is_shutdown(&self) -> bool {
match *self {
ReceivedPing::Shutdown => true,
_ => false,
}
matches!(*self, Self::Shutdown)
}
}
+2 -1
View File
@@ -19,6 +19,7 @@ const UNCLAIMED_NUMERATOR: i32 = 1;
const UNCLAIMED_DENOMINATOR: i32 = 2;
#[test]
#[allow(clippy::assertions_on_constants)]
fn sanity_unclaimed_ratio() {
assert!(UNCLAIMED_NUMERATOR < UNCLAIMED_DENOMINATOR);
assert!(UNCLAIMED_NUMERATOR >= 0);
@@ -188,7 +189,7 @@ impl FlowControl {
///
/// This type tries to centralize the knowledge of addition and subtraction
/// to this capacity, instead of having integer casts throughout the source.
#[derive(Clone, Copy, Debug, PartialEq, PartialOrd)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd)]
pub struct Window(i32);
impl Window {
+1
View File
@@ -7,6 +7,7 @@ mod send;
mod state;
mod store;
mod stream;
#[allow(clippy::module_inception)]
mod streams;
pub(crate) use self::prioritize::Prioritized;
+40 -34
View File
@@ -7,9 +7,11 @@ use crate::codec::UserError;
use crate::codec::UserError::*;
use bytes::buf::{Buf, Take};
use std::io;
use std::task::{Context, Poll, Waker};
use std::{cmp, fmt, mem};
use std::{
cmp::{self, Ordering},
fmt, io, mem,
task::{Context, Poll, Waker},
};
/// # Warning
///
@@ -235,39 +237,43 @@ impl Prioritize {
// If it were less, then we could never send out the buffered data.
let capacity = (capacity as usize) + stream.buffered_send_data;
if capacity == stream.requested_send_capacity as usize {
// Nothing to do
} else if capacity < stream.requested_send_capacity as usize {
// Update the target requested capacity
stream.requested_send_capacity = capacity as WindowSize;
// Currently available capacity assigned to the stream
let available = stream.send_flow.available().as_size();
// If the stream has more assigned capacity than requested, reclaim
// some for the connection
if available as usize > capacity {
let diff = available - capacity as WindowSize;
stream.send_flow.claim_capacity(diff);
self.assign_connection_capacity(diff, stream, counts);
match capacity.cmp(&(stream.requested_send_capacity as usize)) {
Ordering::Equal => {
// Nothing to do
}
} else {
// If trying to *add* capacity, but the stream send side is closed,
// there's nothing to be done.
if stream.state.is_send_closed() {
return;
Ordering::Less => {
// Update the target requested capacity
stream.requested_send_capacity = capacity as WindowSize;
// Currently available capacity assigned to the stream
let available = stream.send_flow.available().as_size();
// If the stream has more assigned capacity than requested, reclaim
// some for the connection
if available as usize > capacity {
let diff = available - capacity as WindowSize;
stream.send_flow.claim_capacity(diff);
self.assign_connection_capacity(diff, stream, counts);
}
}
Ordering::Greater => {
// If trying to *add* capacity, but the stream send side is closed,
// there's nothing to be done.
if stream.state.is_send_closed() {
return;
}
// Update the target requested capacity
stream.requested_send_capacity =
cmp::min(capacity, WindowSize::MAX as usize) as WindowSize;
// Update the target requested capacity
stream.requested_send_capacity =
cmp::min(capacity, WindowSize::MAX as usize) as WindowSize;
// Try to assign additional capacity to the stream. If none is
// currently available, the stream will be queued to receive some
// when more becomes available.
self.try_assign_capacity(stream);
// Try to assign additional capacity to the stream. If none is
// currently available, the stream will be queued to receive some
// when more becomes available.
self.try_assign_capacity(stream);
}
}
}
@@ -372,11 +378,11 @@ impl Prioritize {
continue;
}
counts.transition(stream, |_, mut stream| {
counts.transition(stream, |_, stream| {
// Try to assign capacity to the stream. This will also re-queue the
// stream if there isn't enough connection level capacity to fulfill
// the capacity request.
self.try_assign_capacity(&mut stream);
self.try_assign_capacity(stream);
})
}
}
+38 -38
View File
@@ -2,12 +2,12 @@ use super::*;
use crate::codec::UserError;
use crate::frame::{self, PushPromiseHeaderError, Reason, DEFAULT_INITIAL_WINDOW_SIZE};
use crate::proto::{self, Error};
use std::task::Context;
use http::{HeaderMap, Request, Response};
use std::cmp::Ordering;
use std::io;
use std::task::{Poll, Waker};
use std::task::{Context, Poll, Waker};
use std::time::{Duration, Instant};
#[derive(Debug)]
@@ -178,7 +178,7 @@ impl Recv {
if let Some(content_length) = frame.fields().get(header::CONTENT_LENGTH) {
let content_length = match frame::parse_u64(content_length.as_bytes()) {
Ok(v) => v,
Err(()) => {
Err(_) => {
proto_err!(stream: "could not parse content-length; stream={:?}", stream.id);
return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
}
@@ -221,11 +221,12 @@ impl Recv {
let stream_id = frame.stream_id();
let (pseudo, fields) = frame.into_parts();
if pseudo.protocol.is_some() {
if counts.peer().is_server() && !self.is_extended_connect_protocol_enabled {
proto_err!(stream: "cannot use :protocol if extended connect protocol is disabled; stream={:?}", stream.id);
return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
}
if pseudo.protocol.is_some()
&& counts.peer().is_server()
&& !self.is_extended_connect_protocol_enabled
{
proto_err!(stream: "cannot use :protocol if extended connect protocol is disabled; stream={:?}", stream.id);
return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
}
if !pseudo.is_informational() {
@@ -487,28 +488,32 @@ impl Recv {
// flow-controlled frames until it receives WINDOW_UPDATE frames that
// cause the flow-control window to become positive.
if target < old_sz {
// We must decrease the (local) window on every open stream.
let dec = old_sz - target;
tracing::trace!("decrementing all windows; dec={}", dec);
match target.cmp(&old_sz) {
Ordering::Less => {
// We must decrease the (local) window on every open stream.
let dec = old_sz - target;
tracing::trace!("decrementing all windows; dec={}", dec);
store.for_each(|mut stream| {
stream.recv_flow.dec_recv_window(dec);
})
} else if target > old_sz {
// We must increase the (local) window on every open stream.
let inc = target - old_sz;
tracing::trace!("incrementing all windows; inc={}", inc);
store.try_for_each(|mut stream| {
// XXX: Shouldn't the peer have already noticed our
// overflow and sent us a GOAWAY?
stream
.recv_flow
.inc_window(inc)
.map_err(proto::Error::library_go_away)?;
stream.recv_flow.assign_capacity(inc);
Ok::<_, proto::Error>(())
})?;
store.for_each(|mut stream| {
stream.recv_flow.dec_recv_window(dec);
})
}
Ordering::Greater => {
// We must increase the (local) window on every open stream.
let inc = target - old_sz;
tracing::trace!("incrementing all windows; inc={}", inc);
store.try_for_each(|mut stream| {
// XXX: Shouldn't the peer have already noticed our
// overflow and sent us a GOAWAY?
stream
.recv_flow
.inc_window(inc)
.map_err(proto::Error::library_go_away)?;
stream.recv_flow.assign_capacity(inc);
Ok::<_, proto::Error>(())
})?;
}
Ordering::Equal => (),
}
}
@@ -556,7 +561,7 @@ impl Recv {
"recv_data; frame ignored on locally reset {:?} for some time",
stream.id,
);
return Ok(self.ignore_data(sz)?);
return self.ignore_data(sz);
}
// Ensure that there is enough capacity on the connection before acting
@@ -596,7 +601,7 @@ impl Recv {
if stream.state.recv_close().is_err() {
proto_err!(conn: "recv_data: failed to transition to closed state; stream={:?}", stream.id);
return Err(Error::library_go_away(Reason::PROTOCOL_ERROR).into());
return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
}
}
@@ -766,7 +771,7 @@ impl Recv {
}
pub(super) fn clear_recv_buffer(&mut self, stream: &mut Stream) {
while let Some(_) = stream.pending_recv.pop_front(&mut self.buffer) {
while stream.pending_recv.pop_front(&mut self.buffer).is_some() {
// drop it
}
}
@@ -1089,12 +1094,7 @@ impl Recv {
impl Open {
pub fn is_push_promise(&self) -> bool {
use self::Open::*;
match *self {
PushPromise => true,
_ => false,
}
matches!(*self, Self::PushPromise)
}
}
+50 -46
View File
@@ -7,11 +7,11 @@ use crate::frame::{self, Reason};
use crate::proto::{Error, Initiator};
use bytes::Buf;
use http;
use std::task::{Context, Poll, Waker};
use tokio::io::AsyncWrite;
use std::cmp::Ordering;
use std::io;
use std::task::{Context, Poll, Waker};
/// Manages state transitions related to outbound frames.
#[derive(Debug)]
@@ -456,57 +456,61 @@ impl Send {
let old_val = self.init_window_sz;
self.init_window_sz = val;
if val < old_val {
// We must decrease the (remote) window on every open stream.
let dec = old_val - val;
tracing::trace!("decrementing all windows; dec={}", dec);
match val.cmp(&old_val) {
Ordering::Less => {
// We must decrease the (remote) window on every open stream.
let dec = old_val - val;
tracing::trace!("decrementing all windows; dec={}", dec);
let mut total_reclaimed = 0;
store.for_each(|mut stream| {
let stream = &mut *stream;
let mut total_reclaimed = 0;
store.for_each(|mut stream| {
let stream = &mut *stream;
stream.send_flow.dec_send_window(dec);
stream.send_flow.dec_send_window(dec);
// It's possible that decreasing the window causes
// `window_size` (the stream-specific window) to fall below
// `available` (the portion of the connection-level window
// that we have allocated to the stream).
// In this case, we should take that excess allocation away
// and reassign it to other streams.
let window_size = stream.send_flow.window_size();
let available = stream.send_flow.available().as_size();
let reclaimed = if available > window_size {
// Drop down to `window_size`.
let reclaim = available - window_size;
stream.send_flow.claim_capacity(reclaim);
total_reclaimed += reclaim;
reclaim
} else {
0
};
// It's possible that decreasing the window causes
// `window_size` (the stream-specific window) to fall below
// `available` (the portion of the connection-level window
// that we have allocated to the stream).
// In this case, we should take that excess allocation away
// and reassign it to other streams.
let window_size = stream.send_flow.window_size();
let available = stream.send_flow.available().as_size();
let reclaimed = if available > window_size {
// Drop down to `window_size`.
let reclaim = available - window_size;
stream.send_flow.claim_capacity(reclaim);
total_reclaimed += reclaim;
reclaim
} else {
0
};
tracing::trace!(
"decremented stream window; id={:?}; decr={}; reclaimed={}; flow={:?}",
stream.id,
dec,
reclaimed,
stream.send_flow
);
tracing::trace!(
"decremented stream window; id={:?}; decr={}; reclaimed={}; flow={:?}",
stream.id,
dec,
reclaimed,
stream.send_flow
);
// TODO: Should this notify the producer when the capacity
// of a stream is reduced? Maybe it should if the capacity
// is reduced to zero, allowing the producer to stop work.
});
// TODO: Should this notify the producer when the capacity
// of a stream is reduced? Maybe it should if the capacity
// is reduced to zero, allowing the producer to stop work.
});
self.prioritize
.assign_connection_capacity(total_reclaimed, store, counts);
} else if val > old_val {
let inc = val - old_val;
self.prioritize
.assign_connection_capacity(total_reclaimed, store, counts);
}
Ordering::Greater => {
let inc = val - old_val;
store.try_for_each(|mut stream| {
self.recv_stream_window_update(inc, buffer, &mut stream, counts, task)
.map_err(Error::library_go_away)
})?;
store.try_for_each(|mut stream| {
self.recv_stream_window_update(inc, buffer, &mut stream, counts, task)
.map_err(Error::library_go_away)
})?;
}
Ordering::Equal => (),
}
}
+29 -40
View File
@@ -343,10 +343,7 @@ impl State {
}
pub fn is_scheduled_reset(&self) -> bool {
match self.inner {
Closed(Cause::ScheduledLibraryReset(..)) => true,
_ => false,
}
matches!(self.inner, Closed(Cause::ScheduledLibraryReset(..)))
}
pub fn is_local_reset(&self) -> bool {
@@ -367,65 +364,57 @@ impl State {
}
pub fn is_send_streaming(&self) -> bool {
match self.inner {
matches!(
self.inner,
Open {
local: Streaming, ..
} => true,
HalfClosedRemote(Streaming) => true,
_ => false,
}
local: Streaming,
..
} | HalfClosedRemote(Streaming)
)
}
/// Returns true when the stream is in a state to receive headers
pub fn is_recv_headers(&self) -> bool {
match self.inner {
Idle => true,
Open {
matches!(
self.inner,
Idle | Open {
remote: AwaitingHeaders,
..
} => true,
HalfClosedLocal(AwaitingHeaders) => true,
ReservedRemote => true,
_ => false,
}
} | HalfClosedLocal(AwaitingHeaders)
| ReservedRemote
)
}
pub fn is_recv_streaming(&self) -> bool {
match self.inner {
matches!(
self.inner,
Open {
remote: Streaming, ..
} => true,
HalfClosedLocal(Streaming) => true,
_ => false,
}
remote: Streaming,
..
} | HalfClosedLocal(Streaming)
)
}
pub fn is_closed(&self) -> bool {
match self.inner {
Closed(_) => true,
_ => false,
}
matches!(self.inner, Closed(_))
}
pub fn is_recv_closed(&self) -> bool {
match self.inner {
Closed(..) | HalfClosedRemote(..) | ReservedLocal => true,
_ => false,
}
matches!(
self.inner,
Closed(..) | HalfClosedRemote(..) | ReservedLocal
)
}
pub fn is_send_closed(&self) -> bool {
match self.inner {
Closed(..) | HalfClosedLocal(..) | ReservedRemote => true,
_ => false,
}
matches!(
self.inner,
Closed(..) | HalfClosedLocal(..) | ReservedRemote
)
}
pub fn is_idle(&self) -> bool {
match self.inner {
Idle => true,
_ => false,
}
matches!(self.inner, Idle)
}
pub fn ensure_recv_open(&self) -> Result<bool, proto::Error> {
+5 -7
View File
@@ -1,7 +1,5 @@
use super::*;
use slab;
use indexmap::{self, IndexMap};
use std::convert::Infallible;
@@ -302,15 +300,15 @@ where
let mut stream = store.resolve(idxs.head);
if idxs.head == idxs.tail {
assert!(N::next(&*stream).is_none());
assert!(N::next(&stream).is_none());
self.indices = None;
} else {
idxs.head = N::take_next(&mut *stream).unwrap();
idxs.head = N::take_next(&mut stream).unwrap();
self.indices = Some(idxs);
}
debug_assert!(N::is_queued(&*stream));
N::set_queued(&mut *stream, false);
debug_assert!(N::is_queued(&stream));
N::set_queued(&mut stream, false);
return Some(stream);
}
@@ -347,7 +345,7 @@ impl<'a> Ptr<'a> {
}
pub fn store_mut(&mut self) -> &mut Store {
&mut self.store
self.store
}
/// Remove the stream from the store
+4 -7
View File
@@ -252,7 +252,7 @@ impl Stream {
// The stream is not in any queue
!self.is_pending_send && !self.is_pending_send_capacity &&
!self.is_pending_accept && !self.is_pending_window_update &&
!self.is_pending_open && !self.reset_at.is_some()
!self.is_pending_open && self.reset_at.is_none()
}
/// Returns true when the consumer of the stream has dropped all handles
@@ -379,7 +379,7 @@ impl store::Next for NextSend {
if val {
// ensure that stream is not queued for being opened
// if it's being put into queue for sending data
debug_assert_eq!(stream.is_pending_open, false);
debug_assert!(!stream.is_pending_open);
}
stream.is_pending_send = val;
}
@@ -450,7 +450,7 @@ impl store::Next for NextOpen {
if val {
// ensure that stream is not queued for being sent
// if it's being put into queue for opening the stream
debug_assert_eq!(stream.is_pending_send, false);
debug_assert!(!stream.is_pending_send);
}
stream.is_pending_open = val;
}
@@ -486,9 +486,6 @@ impl store::Next for NextResetExpire {
impl ContentLength {
pub fn is_head(&self) -> bool {
match *self {
ContentLength::Head => true,
_ => false,
}
matches!(*self, Self::Head)
}
}
+12 -12
View File
@@ -312,29 +312,29 @@ impl<B> DynStreams<'_, B> {
pub fn recv_headers(&mut self, frame: frame::Headers) -> Result<(), Error> {
let mut me = self.inner.lock().unwrap();
me.recv_headers(self.peer, &self.send_buffer, frame)
me.recv_headers(self.peer, self.send_buffer, frame)
}
pub fn recv_data(&mut self, frame: frame::Data) -> Result<(), Error> {
let mut me = self.inner.lock().unwrap();
me.recv_data(self.peer, &self.send_buffer, frame)
me.recv_data(self.peer, self.send_buffer, frame)
}
pub fn recv_reset(&mut self, frame: frame::Reset) -> Result<(), Error> {
let mut me = self.inner.lock().unwrap();
me.recv_reset(&self.send_buffer, frame)
me.recv_reset(self.send_buffer, frame)
}
/// Notify all streams that a connection-level error happened.
pub fn handle_error(&mut self, err: proto::Error) -> StreamId {
let mut me = self.inner.lock().unwrap();
me.handle_error(&self.send_buffer, err)
me.handle_error(self.send_buffer, err)
}
pub fn recv_go_away(&mut self, frame: &frame::GoAway) -> Result<(), Error> {
let mut me = self.inner.lock().unwrap();
me.recv_go_away(&self.send_buffer, frame)
me.recv_go_away(self.send_buffer, frame)
}
pub fn last_processed_id(&self) -> StreamId {
@@ -343,22 +343,22 @@ impl<B> DynStreams<'_, B> {
pub fn recv_window_update(&mut self, frame: frame::WindowUpdate) -> Result<(), Error> {
let mut me = self.inner.lock().unwrap();
me.recv_window_update(&self.send_buffer, frame)
me.recv_window_update(self.send_buffer, frame)
}
pub fn recv_push_promise(&mut self, frame: frame::PushPromise) -> Result<(), Error> {
let mut me = self.inner.lock().unwrap();
me.recv_push_promise(&self.send_buffer, frame)
me.recv_push_promise(self.send_buffer, frame)
}
pub fn recv_eof(&mut self, clear_pending_accept: bool) -> Result<(), ()> {
let mut me = self.inner.lock().map_err(|_| ())?;
me.recv_eof(&self.send_buffer, clear_pending_accept)
me.recv_eof(self.send_buffer, clear_pending_accept)
}
pub fn send_reset(&mut self, id: StreamId, reason: Reason) {
let mut me = self.inner.lock().unwrap();
me.send_reset(&self.send_buffer, id, reason)
me.send_reset(self.send_buffer, id, reason)
}
pub fn send_go_away(&mut self, last_processed_id: StreamId) {
@@ -725,7 +725,7 @@ impl Inner {
}
None => {
proto_err!(conn: "recv_push_promise: initiating stream is in an invalid state");
return Err(Error::library_go_away(Reason::PROTOCOL_ERROR).into());
return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
}
};
@@ -1146,7 +1146,7 @@ impl<B> StreamRef<B> {
let mut child_stream = me.store.resolve(child_key);
child_stream.unlink();
child_stream.remove();
return Err(err.into());
return Err(err);
}
me.refs += 1;
@@ -1390,7 +1390,7 @@ impl Clone for OpaqueStreamRef {
OpaqueStreamRef {
inner: self.inner.clone(),
key: self.key.clone(),
key: self.key,
}
}
}
+1 -1
View File
@@ -413,7 +413,7 @@ where
) -> Poll<Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>>> {
// Always try to advance the internal state. Getting Pending also is
// needed to allow this function to return Pending.
if let Poll::Ready(_) = self.poll_closed(cx)? {
if self.poll_closed(cx)?.is_ready() {
// If the socket is closed, don't return anything
// TODO: drop any pending streams
return Poll::Ready(None);
+2 -2
View File
@@ -556,8 +556,8 @@ impl PingPong {
pub fn send_ping(&mut self, ping: Ping) -> Result<(), crate::Error> {
// Passing a `Ping` here is just to be forwards-compatible with
// eventually allowing choosing a ping payload. For now, we can
// just drop it.
drop(ping);
// just ignore it.
let _ = ping;
self.inner.send_ping().map_err(|err| match err {
Some(err) => err.into(),
+2 -2
View File
@@ -9,8 +9,8 @@ use h2::{
frame::{self, Frame, StreamId},
};
pub const SETTINGS: &'static [u8] = &[0, 0, 0, 4, 0, 0, 0, 0, 0];
pub const SETTINGS_ACK: &'static [u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0];
pub const SETTINGS: &[u8] = &[0, 0, 0, 4, 0, 0, 0, 0, 0];
pub const SETTINGS_ACK: &[u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0];
// ==== helper functions to easily construct h2 Frames ====
+2 -2
View File
@@ -56,7 +56,7 @@ struct Inner {
closed: bool,
}
const PREFACE: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
const PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
/// Create a new mock and handle
pub fn new() -> (Mock, Handle) {
@@ -148,7 +148,7 @@ impl Handle {
poll_fn(move |cx| {
while buf.has_remaining() {
let res = Pin::new(self.codec.get_mut())
.poll_write(cx, &mut buf.chunk())
.poll_write(cx, buf.chunk())
.map_err(|e| panic!("write err={:?}", e));
let n = ready!(res).unwrap();
+1 -1
View File
@@ -103,7 +103,7 @@ where
// Connection is done...
b.await
}
Right((v, _)) => return v,
Right((v, _)) => v,
Left((Err(e), _)) => panic!("err: {:?}", e),
}
})
+2 -2
View File
@@ -36,7 +36,7 @@ pub async fn yield_once() {
pub fn wait_for_capacity(stream: h2::SendStream<Bytes>, target: usize) -> WaitForCapacity {
WaitForCapacity {
stream: Some(stream),
target: target,
target,
}
}
@@ -66,7 +66,7 @@ impl Future for WaitForCapacity {
assert_ne!(act, 0);
if act >= self.target {
return Poll::Ready(self.stream.take().unwrap().into());
return Poll::Ready(self.stream.take().unwrap());
}
}
}
+4 -5
View File
@@ -371,7 +371,7 @@ async fn send_request_poll_ready_when_connection_error() {
resp2.await.expect_err("req2");
}));
while let Some(_) = unordered.next().await {}
while unordered.next().await.is_some() {}
};
join(srv, h2).await;
@@ -489,9 +489,8 @@ async fn http_2_request_without_scheme_or_authority() {
client
.send_request(request, true)
.expect_err("should be UserError");
let ret = h2.await.expect("h2");
let _: () = h2.await.expect("h2");
drop(client);
ret
};
join(srv, h2).await;
@@ -1452,8 +1451,8 @@ async fn extended_connect_request() {
join(srv, h2).await;
}
const SETTINGS: &'static [u8] = &[0, 0, 0, 4, 0, 0, 0, 0, 0];
const SETTINGS_ACK: &'static [u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0];
const SETTINGS: &[u8] = &[0, 0, 0, 4, 0, 0, 0, 0, 0];
const SETTINGS_ACK: &[u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0];
trait MockH2 {
fn handshake(&mut self) -> &mut Self;
+1 -1
View File
@@ -58,7 +58,7 @@ impl Server {
}
fn addr(&self) -> SocketAddr {
self.addr.clone()
self.addr
}
fn request_count(&self) -> usize {
+3 -3
View File
@@ -11,9 +11,8 @@ async fn recv_single_ping() {
// Create the handshake
let h2 = async move {
let (client, conn) = client::handshake(m).await.unwrap();
let c = conn.await.unwrap();
(client, c)
let (_client, conn) = client::handshake(m).await.unwrap();
let _: () = conn.await.unwrap();
};
let mock = async move {
@@ -146,6 +145,7 @@ async fn user_notifies_when_connection_closes() {
srv
};
#[allow(clippy::async_yields_async)]
let client = async move {
let (_client, mut conn) = client::handshake(io).await.expect("client handshake");
// yield once so we can ack server settings
+1 -1
View File
@@ -223,7 +223,7 @@ async fn pending_push_promises_reset_when_dropped() {
assert_eq!(resp.status(), StatusCode::OK);
};
let _ = conn.drive(req).await;
conn.drive(req).await;
conn.await.expect("client");
drop(client);
};
+2 -2
View File
@@ -5,8 +5,8 @@ use futures::StreamExt;
use h2_support::prelude::*;
use tokio::io::AsyncWriteExt;
const SETTINGS: &'static [u8] = &[0, 0, 0, 4, 0, 0, 0, 0, 0];
const SETTINGS_ACK: &'static [u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0];
const SETTINGS: &[u8] = &[0, 0, 0, 4, 0, 0, 0, 0, 0];
const SETTINGS_ACK: &[u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0];
#[tokio::test]
async fn read_preface_in_multiple_frames() {
+1 -1
View File
@@ -786,7 +786,7 @@ async fn rst_while_closing() {
// Enqueue trailers frame.
let _ = stream.send_trailers(HeaderMap::new());
// Signal the server mock to send RST_FRAME
let _ = tx.send(()).unwrap();
let _: () = tx.send(()).unwrap();
drop(stream);
yield_once().await;
// yield once to allow the server mock to be polled
+6 -6
View File
@@ -10,7 +10,7 @@ fn main() {
let path = args.get(1).expect("usage: genfixture [PATH]");
let path = Path::new(path);
let mut tests = HashMap::new();
let mut tests: HashMap<String, Vec<String>> = HashMap::new();
for entry in WalkDir::new(path) {
let entry = entry.unwrap();
@@ -28,21 +28,21 @@ fn main() {
let fixture_path = path.split("fixtures/hpack/").last().unwrap();
// Now, split that into the group and the name
let module = fixture_path.split("/").next().unwrap();
let module = fixture_path.split('/').next().unwrap();
tests
.entry(module.to_string())
.or_insert(vec![])
.or_default()
.push(fixture_path.to_string());
}
let mut one = false;
for (module, tests) in tests {
let module = module.replace("-", "_");
let module = module.replace('-', "_");
if one {
println!("");
println!();
}
one = true;
@@ -51,7 +51,7 @@ fn main() {
println!(" {} => {{", module);
for test in tests {
let ident = test.split("/").nth(1).unwrap().split(".").next().unwrap();
let ident = test.split('/').nth(1).unwrap().split('.').next().unwrap();
println!(" ({}, {:?});", ident, test);
}
+5 -5
View File
@@ -112,8 +112,8 @@ impl Node {
};
start.transitions.borrow_mut().push(Transition {
target: target,
byte: byte,
target,
byte,
maybe_eos: self.maybe_eos,
});
@@ -238,7 +238,7 @@ pub fn main() {
let (encode, decode) = load_table();
println!("// !!! DO NOT EDIT !!! Generated by util/genhuff/src/main.rs");
println!("");
println!();
println!("// (num-bits, bits)");
println!("pub const ENCODE_TABLE: [(usize, u64); 257] = [");
@@ -247,7 +247,7 @@ pub fn main() {
}
println!("];");
println!("");
println!();
println!("// (next-state, byte, flags)");
println!("pub const DECODE_TABLE: [[(usize, u8, u8); 16]; 256] = [");
@@ -256,7 +256,7 @@ pub fn main() {
println!("];");
}
const TABLE: &'static str = r##"
const TABLE: &str = r##"
( 0) |11111111|11000 1ff8 [13]
( 1) |11111111|11111111|1011000 7fffd8 [23]
( 2) |11111111|11111111|11111110|0010 fffffe2 [28]