Update to std-future (#39)

This commit is contained in:
Gurwinder Singh
2019-07-23 22:56:42 +05:30
committed by Sean McArthur
parent 9a7bb13490
commit c37ae75c5c
6 changed files with 108 additions and 246 deletions
+2 -2
View File
@@ -1,7 +1,7 @@
language: rust
rust:
- stable
- beta
# - stable
# - beta
- nightly
sudo: false
+5 -5
View File
@@ -8,16 +8,16 @@ keywords = ["hyper", "tls", "http", "https", "ssl"]
homepage = "https://hyper.rs"
repository = "https://github.com/hyperium/hyper-tls"
documentation = "https://docs.rs/hyper-tls"
edition = "2018"
[features]
vendored = ["native-tls/vendored"]
[dependencies]
bytes = "0.4"
futures = "0.1.21"
native-tls = "0.2"
hyper = "0.12"
tokio-io = "0.1"
hyper = { git = "https://github.com/hyperium/hyper" }
tokio-io = { git = "https://github.com/tokio-rs/tokio" }
tokio-tls = { git = "https://github.com/tokio-rs/tokio" }
[dev-dependencies]
tokio = "0.1.5"
tokio = { git = "https://github.com/tokio-rs/tokio" }
+18 -23
View File
@@ -1,28 +1,23 @@
extern crate futures;
extern crate hyper;
extern crate hyper_tls;
extern crate tokio;
#![feature(async_await)]
use futures::{future, Future, Stream};
use std::io::Write;
fn main() {
tokio::run(future::lazy(|| {
let https = hyper_tls::HttpsConnector::new(4).unwrap();
let client = hyper::Client::builder()
.build::<_, hyper::Body>(https);
#[tokio::main]
async fn main() -> Result<(), hyper::Error> {
let https = hyper_tls::HttpsConnector::new(4).unwrap();
let client = hyper::Client::builder().build::<_, hyper::Body>(https);
client
.get("https://hyper.rs".parse().unwrap())
.and_then(|res| {
println!("Status: {}", res.status());
println!("Headers:\n{:#?}", res.headers());
res.into_body().for_each(|chunk| {
::std::io::stdout()
.write_all(&chunk)
.map_err(|e| panic!("example expects stdout to work: {}", e))
})
})
.map_err(|e| println!("request error: {}", e))
}));
let res = client.get("https://hyper.rs".parse().unwrap()).await?;
println!("Status: {}", res.status());
println!("Headers:\n{:#?}", res.headers());
let mut body = res.into_body();
while let Some(chunk) = body.next().await {
let chunk = chunk?;
std::io::stdout()
.write_all(&chunk)
.expect("example expects stdout to work");
}
Ok(())
}
+35 -59
View File
@@ -1,12 +1,15 @@
use std::fmt;
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::{Async, Future, future, Poll};
use hyper::client::connect::{Connect, Connected, Destination, HttpConnector};
pub use native_tls::Error;
use native_tls::{self, HandshakeError, TlsConnector};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_tls::TlsConnector;
use stream::{MaybeHttpsStream, TlsStream};
use crate::stream::MaybeHttpsStream;
/// A Connector for the `https` scheme.
#[derive(Clone)]
@@ -32,9 +35,7 @@ impl HttpsConnector<HttpConnector> {
/// If you would like to force the use of HTTPS then call https_only(true)
/// on the returned connector.
pub fn new(threads: usize) -> Result<Self, Error> {
TlsConnector::builder()
.build()
.map(|tls| HttpsConnector::new_(threads, tls))
native_tls::TlsConnector::new().map(|tls| HttpsConnector::new_(threads, tls.into()))
}
fn new_(threads: usize, tls: TlsConnector) -> Self {
@@ -80,8 +81,7 @@ impl<T: fmt::Debug> fmt::Debug for HttpsConnector<T> {
impl<T> Connect for HttpsConnector<T>
where
T: Connect<Error=io::Error>,
T::Transport: 'static,
T: Connect<Error = io::Error>,
T::Future: 'static,
{
type Transport = MaybeHttpsStream<T::Transport>;
@@ -92,45 +92,47 @@ where
let is_https = dst.scheme() == "https";
// Early abort if HTTPS is forced but can't be used
if !is_https && self.force_https {
let err = io::Error::new(io::ErrorKind::Other, "HTTPS scheme forced but can't be used");
return HttpsConnecting(Box::new(future::err(err)));
let err = io::Error::new(
io::ErrorKind::Other,
"HTTPS scheme forced but can't be used",
);
return HttpsConnecting(Box::pin(async { Err(err) }));
}
let host = dst.host().to_owned();
let connecting = self.http.connect(dst);
let tls = self.tls.clone();
let fut: BoxedFut<T::Transport> = if is_https {
let fut = connecting.and_then(move |(tcp, connected)| {
let handshake = Handshaking {
inner: Some(tls.connect(&host, tcp)),
};
handshake
.map(|conn| (MaybeHttpsStream::Https(TlsStream::new(conn)), connected))
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
});
Box::new(fut)
} else {
Box::new(connecting.map(|(tcp, connected)| {
(MaybeHttpsStream::Http(tcp), connected)
}))
let fut = async move {
let (tcp, connected) = match connecting.await {
Ok(v) => v,
Err(e) => return Err(e),
};
let maybe = if is_https {
let tls = tls
.connect(&host, tcp)
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
MaybeHttpsStream::Https(tls)
} else {
MaybeHttpsStream::Http(tcp)
};
Ok((maybe, connected))
};
HttpsConnecting(fut)
HttpsConnecting(Box::pin(fut))
}
}
type BoxedFut<T> = Box<Future<Item=(MaybeHttpsStream<T>, Connected), Error=io::Error> + Send>;
type BoxedFut<T> =
Pin<Box<dyn Future<Output = io::Result<(MaybeHttpsStream<T>, Connected)>> + Send>>;
/// A Future representing work to connect to a URL, and a TLS handshake.
pub struct HttpsConnecting<T>(BoxedFut<T>);
impl<T: AsyncRead + AsyncWrite + Unpin> Future for HttpsConnecting<T> {
type Output = Result<(MaybeHttpsStream<T>, Connected), io::Error>;
impl<T> Future for HttpsConnecting<T> {
type Item = (MaybeHttpsStream<T>, Connected);
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.0.poll()
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.0).poll(cx)
}
}
@@ -139,29 +141,3 @@ impl<T> fmt::Debug for HttpsConnecting<T> {
f.pad("HttpsConnecting")
}
}
struct Handshaking<T> {
inner: Option<Result<native_tls::TlsStream<T>, HandshakeError<T>>>,
}
impl<T: io::Read + io::Write> Future for Handshaking<T> {
type Item = native_tls::TlsStream<T>;
type Error = native_tls::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.inner.take().expect("polled after ready") {
Ok(stream) => Ok(stream.into()),
Err(HandshakeError::WouldBlock(mid)) => {
match mid.handshake() {
Ok(stream) => Ok(stream.into()),
Err(HandshakeError::Failure(err)) => Err(err),
Err(HandshakeError::WouldBlock(mid)) => {
self.inner = Some(Err(HandshakeError::WouldBlock(mid)));
Ok(Async::NotReady)
}
}
},
Err(HandshakeError::Failure(err)) => Err(err),
}
}
}
+11 -26
View File
@@ -7,44 +7,29 @@
//! ## Example
//!
//! ```no_run
//! extern crate futures;
//! extern crate hyper;
//! extern crate hyper_tls;
//! extern crate tokio;
//! #![feature(async_await)]
//!
//! use futures::{future, Future};
//! use hyper_tls::HttpsConnector;
//! use hyper::Client;
//!
//! fn main() {
//! tokio::run(future::lazy(|| {
//! // 4 is number of blocking DNS threads
//! let https = HttpsConnector::new(4).unwrap();
//! let client = Client::builder()
//! .build::<_, hyper::Body>(https);
//! #[tokio::main]
//! async fn main() -> Result<(), hyper::Error>{
//! // 4 is number of blocking DNS threads
//! let https = HttpsConnector::new(4).unwrap();
//! let client = Client::builder().build::<_, hyper::Body>(https);
//!
//! client
//! .get("https://hyper.rs".parse().unwrap())
//! .map(|res| {
//! assert_eq!(res.status(), 200);
//! })
//! .map_err(|e| println!("request error: {}", e))
//! }));
//! let res = client.get("https://hyper.rs".parse().unwrap()).await?;
//! assert_eq!(res.status(), 200);
//! Ok(())
//! }
//! ```
#![doc(html_root_url = "https://docs.rs/hyper-tls/0.3.2")]
#![cfg_attr(test, deny(warnings))]
#![deny(missing_docs)]
#![deny(missing_debug_implementations)]
#![feature(async_await)]
extern crate bytes;
extern crate futures;
extern crate hyper;
extern crate native_tls;
#[macro_use]
extern crate tokio_io;
pub use client::{HttpsConnector, HttpsConnecting, Error};
pub use client::{Error, HttpsConnecting, HttpsConnector};
pub use stream::{MaybeHttpsStream, TlsStream};
mod client;
+37 -131
View File
@@ -1,10 +1,10 @@
use std::fmt;
use std::io::{self, Read, Write};
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use bytes::{Buf, BufMut};
use futures::Poll;
use native_tls;
use tokio_io::{AsyncRead, AsyncWrite};
pub use tokio_tls::TlsStream;
/// A stream that might be protected with TLS.
pub enum MaybeHttpsStream<T> {
@@ -14,36 +14,17 @@ pub enum MaybeHttpsStream<T> {
Https(TlsStream<T>),
}
/// A stream protected with TLS.
pub struct TlsStream<T> {
inner: native_tls::TlsStream<T>,
}
// ===== impl MaybeHttpsStream =====
impl<T: fmt::Debug> fmt::Debug for MaybeHttpsStream<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
MaybeHttpsStream::Http(ref s) => {
f.debug_tuple("Http")
.field(s)
.finish()
},
MaybeHttpsStream::Https(ref s) => {
f.debug_tuple("Https")
.field(s)
.finish()
},
match self {
MaybeHttpsStream::Http(s) => f.debug_tuple("Http").field(s).finish(),
MaybeHttpsStream::Https(s) => f.debug_tuple("Https").field(s).finish(),
}
}
}
impl<T> From<native_tls::TlsStream<T>> for MaybeHttpsStream<T> {
fn from(inner: native_tls::TlsStream<T>) -> Self {
MaybeHttpsStream::Https(TlsStream::from(inner))
}
}
impl<T> From<T> for MaybeHttpsStream<T> {
fn from(inner: T) -> Self {
MaybeHttpsStream::Http(inner)
@@ -56,129 +37,54 @@ impl<T> From<TlsStream<T>> for MaybeHttpsStream<T> {
}
}
impl<T: Read + Write> Read for MaybeHttpsStream<T> {
#[inline]
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match *self {
MaybeHttpsStream::Http(ref mut s) => s.read(buf),
MaybeHttpsStream::Https(ref mut s) => s.read(buf),
}
}
}
impl<T: Read + Write> Write for MaybeHttpsStream<T> {
#[inline]
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
match *self {
MaybeHttpsStream::Http(ref mut s) => s.write(buf),
MaybeHttpsStream::Https(ref mut s) => s.write(buf),
}
}
#[inline]
fn flush(&mut self) -> io::Result<()> {
match *self {
MaybeHttpsStream::Http(ref mut s) => s.flush(),
MaybeHttpsStream::Https(ref mut s) => s.flush(),
}
}
}
impl<T: AsyncRead + AsyncWrite> AsyncRead for MaybeHttpsStream<T> {
impl<T: AsyncRead + AsyncWrite + Unpin> AsyncRead for MaybeHttpsStream<T> {
#[inline]
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
match *self {
MaybeHttpsStream::Http(ref s) => s.prepare_uninitialized_buffer(buf),
MaybeHttpsStream::Https(ref s) => s.prepare_uninitialized_buffer(buf),
match self {
MaybeHttpsStream::Http(s) => s.prepare_uninitialized_buffer(buf),
MaybeHttpsStream::Https(s) => s.prepare_uninitialized_buffer(buf),
}
}
#[inline]
fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
match *self {
MaybeHttpsStream::Http(ref mut s) => s.read_buf(buf),
MaybeHttpsStream::Https(ref mut s) => s.read_buf(buf),
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context,
buf: &mut [u8],
) -> Poll<Result<usize, io::Error>> {
match Pin::get_mut(self) {
MaybeHttpsStream::Http(s) => Pin::new(s).poll_read(cx, buf),
MaybeHttpsStream::Https(s) => Pin::new(s).poll_read(cx, buf),
}
}
}
impl<T: AsyncWrite + AsyncRead> AsyncWrite for MaybeHttpsStream<T> {
impl<T: AsyncWrite + AsyncRead + Unpin> AsyncWrite for MaybeHttpsStream<T> {
#[inline]
fn shutdown(&mut self) -> Poll<(), io::Error> {
match *self {
MaybeHttpsStream::Http(ref mut s) => s.shutdown(),
MaybeHttpsStream::Https(ref mut s) => s.shutdown(),
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
match Pin::get_mut(self) {
MaybeHttpsStream::Http(s) => Pin::new(s).poll_write(cx, buf),
MaybeHttpsStream::Https(s) => Pin::new(s).poll_write(cx, buf),
}
}
#[inline]
fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
match *self {
MaybeHttpsStream::Http(ref mut s) => s.write_buf(buf),
MaybeHttpsStream::Https(ref mut s) => s.write_buf(buf),
}
}
}
// ===== impl TlsStream =====
impl<T> TlsStream<T> {
pub(crate) fn new(inner: native_tls::TlsStream<T>) -> Self {
TlsStream {
inner,
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
match Pin::get_mut(self) {
MaybeHttpsStream::Http(s) => Pin::new(s).poll_flush(cx),
MaybeHttpsStream::Https(s) => Pin::new(s).poll_flush(cx),
}
}
/// Get access to the internal `native_tls::TlsStream` stream which also
/// transitively allows access to `T`.
pub fn get_ref(&self) -> &native_tls::TlsStream<T> {
&self.inner
}
/// Get mutable access to the internal `native_tls::TlsStream` stream which
/// also transitively allows mutable access to `T`.
pub fn get_mut(&mut self) -> &mut native_tls::TlsStream<T> {
&mut self.inner
}
}
impl<T: fmt::Debug> fmt::Debug for TlsStream<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Debug::fmt(&self.inner, f)
}
}
impl<T> From<native_tls::TlsStream<T>> for TlsStream<T> {
fn from(stream: native_tls::TlsStream<T>) -> Self {
TlsStream { inner: stream }
}
}
impl<T: Read + Write> Read for TlsStream<T> {
#[inline]
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.inner.read(buf)
}
}
impl<T: Read + Write> Write for TlsStream<T> {
#[inline]
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.inner.write(buf)
}
#[inline]
fn flush(&mut self) -> io::Result<()> {
self.inner.flush()
}
}
impl<T: AsyncRead + AsyncWrite> AsyncRead for TlsStream<T> {}
impl<T: AsyncWrite + AsyncRead> AsyncWrite for TlsStream<T> {
fn shutdown(&mut self) -> Poll<(), io::Error> {
try_nb!(self.inner.shutdown());
self.inner.get_mut().shutdown()
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
match Pin::get_mut(self) {
MaybeHttpsStream::Http(s) => Pin::new(s).poll_shutdown(cx),
MaybeHttpsStream::Https(s) => Pin::new(s).poll_shutdown(cx),
}
}
}