gecko-dev/servo/components/net/websocket_loader.rs

660 lines
22 KiB
Rust
Raw Normal View History

/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
use cookie::Cookie;
use fetch::methods::{should_be_blocked_due_to_bad_port, should_be_blocked_due_to_nosniff};
use hosts::replace_host;
use http_loader::{HttpState, is_redirect_status, set_default_accept};
use http_loader::{set_default_accept_language, set_request_cookies};
use hyper::buffer::BufReader;
use hyper::header::{CacheControl, CacheDirective, Connection, ConnectionOption};
use hyper::header::{Headers, Host, SetCookie, Pragma, Protocol, ProtocolName, Upgrade};
use hyper::http::h1::{LINE_ENDING, parse_response};
use hyper::method::Method;
use hyper::net::HttpStream;
use hyper::status::StatusCode;
use hyper::version::HttpVersion;
use ipc_channel::ipc::{IpcReceiver, IpcSender};
use net_traits::{CookieSource, MessageData, NetworkError};
use net_traits::{WebSocketDomAction, WebSocketNetworkEvent};
use net_traits::request::{Destination, RequestInit, RequestMode};
use servo_url::ServoUrl;
use std::io::{self, Write};
use std::net::TcpStream;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use url::Position;
use websocket::Message;
use websocket::header::{Origin, WebSocketAccept, WebSocketKey, WebSocketProtocol, WebSocketVersion};
use websocket::message::OwnedMessage;
use websocket::receiver::{Reader as WsReader, Receiver as WsReceiver};
use websocket::sender::{Sender as WsSender, Writer as WsWriter};
use websocket::ws::dataframe::DataFrame;
pub fn init(
req_init: RequestInit,
resource_event_sender: IpcSender<WebSocketNetworkEvent>,
dom_action_receiver: IpcReceiver<WebSocketDomAction>,
http_state: Arc<HttpState>
) {
thread::Builder::new().name(format!("WebSocket connection to {}", req_init.url)).spawn(move || {
let channel = establish_a_websocket_connection(req_init, &http_state);
let (ws_sender, mut receiver) = match channel {
Ok((protocol_in_use, sender, receiver)) => {
let _ = resource_event_sender.send(WebSocketNetworkEvent::ConnectionEstablished { protocol_in_use });
(sender, receiver)
},
Err(e) => {
debug!("Failed to establish a WebSocket connection: {:?}", e);
let _ = resource_event_sender.send(WebSocketNetworkEvent::Fail);
return;
}
};
let initiated_close = Arc::new(AtomicBool::new(false));
let ws_sender = Arc::new(Mutex::new(ws_sender));
let initiated_close_incoming = initiated_close.clone();
let ws_sender_incoming = ws_sender.clone();
thread::spawn(move || {
for message in receiver.incoming_messages() {
let message = match message {
Ok(m) => m,
Err(e) => {
debug!("Error receiving incoming WebSocket message: {:?}", e);
let _ = resource_event_sender.send(WebSocketNetworkEvent::Fail);
break;
}
};
let message = match message {
OwnedMessage::Text(_) => {
MessageData::Text(String::from_utf8_lossy(&message.take_payload()).into_owned())
},
OwnedMessage::Binary(_) => MessageData::Binary(message.take_payload()),
OwnedMessage::Ping(_) => {
let pong = Message::pong(message.take_payload());
ws_sender_incoming.lock().unwrap().send_message(&pong).unwrap();
continue;
},
OwnedMessage::Pong(_) => continue,
OwnedMessage::Close(ref msg) => {
if !initiated_close_incoming.fetch_or(true, Ordering::SeqCst) {
ws_sender_incoming.lock().unwrap().send_message(&message).unwrap();
}
let (code, reason) = match *msg {
None => (None, "".into()),
Some(ref data) => (Some(data.status_code), data.reason.clone())
};
let _ = resource_event_sender.send(WebSocketNetworkEvent::Close(code, reason));
break;
},
};
let _ = resource_event_sender.send(WebSocketNetworkEvent::MessageReceived(message));
}
});
while let Ok(dom_action) = dom_action_receiver.recv() {
match dom_action {
WebSocketDomAction::SendMessage(MessageData::Text(data)) => {
ws_sender.lock().unwrap().send_message(&Message::text(data)).unwrap();
},
WebSocketDomAction::SendMessage(MessageData::Binary(data)) => {
ws_sender.lock().unwrap().send_message(&Message::binary(data)).unwrap();
},
WebSocketDomAction::Close(code, reason) => {
if !initiated_close.fetch_or(true, Ordering::SeqCst) {
let message = match code {
Some(code) => Message::close_because(code, reason.unwrap_or("".to_owned())),
None => Message::close()
};
ws_sender.lock().unwrap().send_message(&message).unwrap();
}
},
}
}
servo: Merge #14592 - Remove the util crate (from asajeffrey:util-goodbye); r=mbrubeck <!-- Please describe your changes on the following line: --> This PR removes the `util` crate. * Replaced the `spawn_named` and `clamp` functions by appropriate uses of `std::thread::Builder::spawn`, `std::cmp::min` and `std::cmp::max`. * Moved `opts`, `prefs` and `resource_files` into a new `config` crate. * Moved `remutex` and `geometry` into their own crates. --- <!-- Thank you for contributing to Servo! Please replace each `[ ]` by `[X]` when the step is complete, and replace `__` with appropriate data: --> - [X] `./mach build -d` does not report any errors - [X] `./mach test-tidy` does not report any errors - [X] These changes do not require tests because they are refactorings <!-- Pull requests that do not address these steps are welcome, but they will require additional verification as part of the review process. --> Source-Repo: https://github.com/servo/servo Source-Revision: 4eb653817f87e5fb47de34356f558eb76ecbca9f --HG-- rename : servo/components/util/Cargo.toml => servo/components/config/Cargo.toml rename : servo/components/util/basedir.rs => servo/components/config/basedir.rs rename : servo/components/util/lib.rs => servo/components/config/lib.rs rename : servo/components/util/opts.rs => servo/components/config/opts.rs rename : servo/components/util/prefs.rs => servo/components/config/prefs.rs rename : servo/components/util/resource_files.rs => servo/components/config/resource_files.rs rename : servo/components/util/geometry.rs => servo/components/geometry/lib.rs rename : servo/components/util/remutex.rs => servo/components/remutex/lib.rs rename : servo/tests/unit/util/lib.rs => servo/tests/unit/servo_config/lib.rs rename : servo/tests/unit/util/opts.rs => servo/tests/unit/servo_config/opts.rs rename : servo/tests/unit/util/prefs.rs => servo/tests/unit/servo_config/prefs.rs rename : servo/tests/unit/util/remutex.rs => servo/tests/unit/servo_remutex/lib.rs
2016-12-15 00:48:42 +00:00
}).expect("Thread spawning failed");
}
type Stream = HttpStream;
// https://fetch.spec.whatwg.org/#concept-websocket-connection-obtain
fn obtain_a_websocket_connection(url: &ServoUrl) -> Result<Stream, NetworkError> {
// Step 1.
let host = url.host_str().unwrap();
// Step 2.
let port = url.port_or_known_default().unwrap();
// Step 3.
// We did not replace the scheme by "http" or "https" in step 1 of
// establish_a_websocket_connection.
let secure = match url.scheme() {
"ws" => false,
"wss" => true,
_ => panic!("URL's scheme should be ws or wss"),
};
if secure {
return Err(NetworkError::Internal("WSS is disabled for now.".into()));
}
// Steps 4-5.
let host = replace_host(host);
let tcp_stream = TcpStream::connect((&*host, port)).map_err(|e| {
NetworkError::Internal(format!("Could not connect to host: {}", e))
})?;
Ok(HttpStream(tcp_stream))
}
// https://fetch.spec.whatwg.org/#concept-websocket-establish
fn establish_a_websocket_connection(
req_init: RequestInit,
http_state: &HttpState
) -> Result<(Option<String>, WsWriter<HttpStream>, WsReader<HttpStream>), NetworkError>
{
let protocols = match req_init.mode {
RequestMode::WebSocket { protocols } => protocols.clone(),
_ => panic!("Received a RequestInit with a non-websocket mode in websocket_loader"),
};
// Steps 1 is not really applicable here, given we don't exactly go
// through the same infrastructure as the Fetch spec.
// Step 2, slimmed down because we don't go through the whole Fetch infra.
let mut headers = Headers::new();
// Step 3.
headers.set(Upgrade(vec![Protocol::new(ProtocolName::WebSocket, None)]));
// Step 4.
headers.set(Connection(vec![ConnectionOption::ConnectionHeader("upgrade".into())]));
// Step 5.
let key_value = WebSocketKey::new();
// Step 6.
headers.set(key_value);
// Step 7.
headers.set(WebSocketVersion::WebSocket13);
// Step 8.
if !protocols.is_empty() {
headers.set(WebSocketProtocol(protocols.clone()));
}
// Steps 9-10.
// TODO: handle permessage-deflate extension.
// Step 11 and network error check from step 12.
let response = fetch(req_init.url, req_init.origin.ascii_serialization(), headers, http_state)?;
// Step 12, the status code check.
if response.status != StatusCode::SwitchingProtocols {
return Err(NetworkError::Internal("Response's status should be 101.".into()));
}
// Step 13.
if !protocols.is_empty() {
if response.headers.get::<WebSocketProtocol>().map_or(true, |protocols| protocols.is_empty()) {
return Err(NetworkError::Internal(
"Response's Sec-WebSocket-Protocol header is missing, malformed or empty.".into()));
}
}
// Step 14.2.
let upgrade_header = response.headers.get::<Upgrade>().ok_or_else(|| {
NetworkError::Internal("Response should have an Upgrade header.".into())
})?;
if upgrade_header.len() != 1 {
return Err(NetworkError::Internal("Response's Upgrade header should have only one value.".into()));
}
if upgrade_header[0].name != ProtocolName::WebSocket {
return Err(NetworkError::Internal("Response's Upgrade header value should be \"websocket\".".into()));
}
// Step 14.3.
let connection_header = response.headers.get::<Connection>().ok_or_else(|| {
NetworkError::Internal("Response should have a Connection header.".into())
})?;
let connection_includes_upgrade = connection_header.iter().any(|option| {
match *option {
ConnectionOption::ConnectionHeader(ref option) => *option == "upgrade",
_ => false,
}
});
if !connection_includes_upgrade {
return Err(NetworkError::Internal("Response's Connection header value should include \"upgrade\".".into()));
}
// Step 14.4.
let accept_header = response.headers.get::<WebSocketAccept>().ok_or_else(|| {
NetworkError::Internal("Response should have a Sec-Websocket-Accept header.".into())
})?;
if *accept_header != WebSocketAccept::new(&key_value) {
return Err(NetworkError::Internal(
"Response's Sec-WebSocket-Accept header value did not match the sent key.".into()));
}
// Step 14.5.
// TODO: handle permessage-deflate extension.
// We don't support any extension, so we fail at the mere presence of
// a Sec-WebSocket-Extensions header.
if response.headers.get_raw("Sec-WebSocket-Extensions").is_some() {
return Err(NetworkError::Internal(
"Response's Sec-WebSocket-Extensions header value included unsupported extensions.".into()));
}
// Step 14.6.
let protocol_in_use = if let Some(response_protocols) = response.headers.get::<WebSocketProtocol>() {
for replied in &**response_protocols {
if !protocols.iter().any(|requested| requested.eq_ignore_ascii_case(replied)) {
return Err(NetworkError::Internal(
"Response's Sec-WebSocket-Protocols contain values that were not requested.".into()));
}
}
response_protocols.first().cloned()
} else {
None
};
let sender = WsSender::new(true);
let writer = WsWriter {
stream: response.writer,
sender
};
let receiver = WsReceiver::new(false);
let reader = WsReader {
stream: response.reader,
receiver,
};
Ok((protocol_in_use, writer, reader))
}
struct Response {
status: StatusCode,
headers: Headers,
reader: BufReader<Stream>,
writer: Stream,
}
// https://fetch.spec.whatwg.org/#concept-fetch
fn fetch(url: ServoUrl,
origin: String,
mut headers: Headers,
http_state: &HttpState)
-> Result<Response, NetworkError> {
// Step 1.
// TODO: handle request's window.
// Step 2.
// TODO: handle request's origin.
// Step 3.
set_default_accept(Destination::None, &mut headers);
// Step 4.
set_default_accept_language(&mut headers);
// Step 5.
// TODO: handle request's priority.
// Step 6.
// Not applicable: not a navigation request.
// Step 7.
// We know this is a subresource request.
{
// Step 7.1.
// Not applicable: client hints list is empty.
// Steps 7.2-3.
// TODO: handle fetch groups.
}
// Step 8.
main_fetch(url, origin, headers, http_state)
}
// https://fetch.spec.whatwg.org/#concept-main-fetch
fn main_fetch(url: ServoUrl,
origin: String,
mut headers: Headers,
http_state: &HttpState)
-> Result<Response, NetworkError> {
// Step 1.
let mut response = None;
// Step 2.
// Not applicable: requests local-URLs-only flag is unset.
// Step 3.
// TODO: handle content security policy violations.
// Step 4.
// TODO: handle upgrade to a potentially secure URL.
// Step 5.
if should_be_blocked_due_to_bad_port(&url) {
response = Some(Err(NetworkError::Internal("Request should be blocked due to bad port.".into())));
}
// TODO: handle blocking as mixed content.
// TODO: handle blocking by content security policy.
// Steps 6-8.
// TODO: handle request's referrer policy.
// Step 9.
// Not applicable: request's current URL's scheme is not "ftp".
// Step 10.
// TODO: handle known HSTS host domain.
// Step 11.
// Not applicable: request's synchronous flag is set.
// Step 12.
let mut response = response.unwrap_or_else(|| {
// We must run the first sequence of substeps, given request's mode
// is "websocket".
// Step 12.1.
// Not applicable: the response is never exposed to the Web so it
// doesn't need to be filtered at all.
// Step 12.2.
scheme_fetch(&url, origin, &mut headers, http_state)
});
// Step 13.
// Not applicable: recursive flag is unset.
// Step 14.
// Not applicable: the response is never exposed to the Web so it doesn't
// need to be filtered at all.
// Steps 15-16.
// Not applicable: no need to maintain an internal response.
// Step 17.
if response.is_ok() {
// TODO: handle blocking as mixed content.
// TODO: handle blocking by content security policy.
// Not applicable: blocking due to MIME type matters only for scripts.
if should_be_blocked_due_to_nosniff(Destination::None, &headers) {
response = Err(NetworkError::Internal("Request should be blocked due to nosniff.".into()));
}
}
// Step 18.
// Not applicable: we don't care about the body at all.
// Step 19.
// Not applicable: request's integrity metadata is the empty string.
// Step 20.
// TODO: wait for response's body here, maybe?
response
}
// https://fetch.spec.whatwg.org/#concept-scheme-fetch
fn scheme_fetch(url: &ServoUrl,
origin: String,
headers: &mut Headers,
http_state: &HttpState)
-> Result<Response, NetworkError> {
// In the case of a WebSocket request, HTTP fetch is always used.
http_fetch(url, origin, headers, http_state)
}
// https://fetch.spec.whatwg.org/#concept-http-fetch
fn http_fetch(url: &ServoUrl,
origin: String,
headers: &mut Headers,
http_state: &HttpState)
-> Result<Response, NetworkError> {
// Step 1.
// Not applicable: with step 3 being useless here, this one is too.
// Step 2.
// Not applicable: we don't need to maintain an internal response.
// Step 3.
// Not applicable: request's service-workers mode is "none".
// Step 4.
// There cannot be a response yet at this point.
let mut response = {
// Step 4.1.
// Not applicable: CORS-preflight flag is unset.
// Step 4.2.
// Not applicable: request's redirect mode is "error".
// Step 4.3.
let response = http_network_or_cache_fetch(url, origin, headers, http_state);
// Step 4.4.
// Not applicable: CORS flag is unset.
response
};
// Step 5.
if response.as_ref().ok().map_or(false, |response| is_redirect_status(response.status)) {
// Step 5.1.
// Not applicable: the connection does not use HTTP/2.
// Steps 5.2-4.
// Not applicable: matters only if request's redirect mode is not "error".
// Step 5.5.
// Request's redirect mode is "error".
response = Err(NetworkError::Internal("Response should not be a redirection.".into()));
}
// Step 6.
response
}
// https://fetch.spec.whatwg.org/#concept-http-network-or-cache-fetch
fn http_network_or_cache_fetch(url: &ServoUrl,
origin: String,
headers: &mut Headers,
http_state: &HttpState)
-> Result<Response, NetworkError> {
// Steps 1-3.
// Not applicable: we don't even have a request yet, and there is no body
// in a WebSocket request.
// Step 4.
// Not applicable: credentials flag is always set
// because credentials mode is "include."
// Steps 5-9.
// Not applicable: there is no body in a WebSocket request.
// Step 10.
// TODO: handle header Referer.
// Step 11.
// Request's mode is "websocket".
headers.set(Origin(origin));
// Step 12.
// TODO: handle header User-Agent.
// Steps 13-14.
// Not applicable: request's cache mode is "no-store".
// Step 15.
{
// Step 15.1.
// We know there is no Pragma header yet.
headers.set(Pragma::NoCache);
// Step 15.2.
// We know there is no Cache-Control header yet.
headers.set(CacheControl(vec![CacheDirective::NoCache]));
}
// Step 16.
// TODO: handle Accept-Encoding.
// Not applicable: Connection header is already present.
// TODO: handle DNT.
headers.set(Host {
hostname: url.host_str().unwrap().to_owned(),
port: url.port(),
});
// Step 17.
// Credentials flag is set.
{
// Step 17.1.
// TODO: handle user agent configured to block cookies.
set_request_cookies(&url, headers, &http_state.cookie_jar);
// Steps 17.2-6.
// Not applicable: request has no Authorization header.
}
// Step 18.
// TODO: proxy-authentication entry.
// Step 19.
// Not applicable: with step 21 being useless, this one is too.
// Step 20.
// Not applicable: revalidatingFlag is only useful if step 21 is.
// Step 21.
// Not applicable: cache mode is "no-store".
// Step 22.
// There is no response yet.
let response = {
// Step 22.1.
// Not applicable: cache mode is "no-store".
// Step 22.2.
let forward_response = http_network_fetch(url, headers, http_state);
// Step 22.3.
// Not applicable: request's method is not unsafe.
// Step 22.4.
// Not applicable: revalidatingFlag is unset.
// Step 22.5.
// There is no response yet and the response should not be cached.
forward_response
};
// Step 23.
// TODO: handle 401 status when request's window is not "no-window".
// Step 24.
// TODO: handle 407 status when request's window is not "no-window".
// Step 25.
// Not applicable: authentication-fetch flag is unset.
// Step 26.
response
}
// https://fetch.spec.whatwg.org/#concept-http-network-fetch
fn http_network_fetch(url: &ServoUrl,
headers: &Headers,
http_state: &HttpState)
-> Result<Response, NetworkError> {
// Step 1.
// Not applicable: credentials flag is set.
// Steps 2-3.
// Request's mode is "websocket".
let connection = obtain_a_websocket_connection(url)?;
// Step 4.
// Not applicable: requests body is null.
// Step 5.
let response = make_request(connection, url, headers)?;
// Steps 6-12.
// Not applicable: correct WebSocket responses don't have a body.
// Step 13.
// TODO: handle response's CSP list.
// Step 14.
// Not applicable: request's cache mode is "no-store".
// Step 15.
if let Some(cookies) = response.headers.get::<SetCookie>() {
let mut jar = http_state.cookie_jar.write().unwrap();
for cookie in &**cookies {
if let Some(cookie) = Cookie::from_cookie_string(cookie.clone(), url, CookieSource::HTTP) {
jar.push(cookie, url, CookieSource::HTTP);
}
}
}
// Step 16.
// Not applicable: correct WebSocket responses don't have a body.
// Step 17.
Ok(response)
}
fn make_request(mut stream: Stream,
url: &ServoUrl,
headers: &Headers)
-> Result<Response, NetworkError> {
write_request(&mut stream, url, headers).map_err(|e| {
NetworkError::Internal(format!("Request could not be sent: {}", e))
})?;
// FIXME: Stream isn't supposed to be cloned.
let writer = stream.clone();
// FIXME: BufReader from hyper isn't supposed to be used.
let mut reader = BufReader::new(stream);
let head = parse_response(&mut reader).map_err(|e| {
NetworkError::Internal(format!("Response could not be read: {}", e))
})?;
// This isn't in the spec, but this is the correct thing to do for WebSocket requests.
if head.version != HttpVersion::Http11 {
return Err(NetworkError::Internal("Response's HTTP version should be HTTP/1.1.".into()));
}
// FIXME: StatusCode::from_u16 isn't supposed to be used.
let status = StatusCode::from_u16(head.subject.0);
Ok(Response {
status: status,
headers: head.headers,
reader: reader,
writer: writer,
})
}
fn write_request(stream: &mut Stream,
url: &ServoUrl,
headers: &Headers)
-> io::Result<()> {
// Write "GET /foo/bar HTTP/1.1\r\n".
let method = Method::Get;
let request_uri = &url.as_url()[Position::BeforePath..Position::AfterQuery];
let version = HttpVersion::Http11;
write!(stream, "{} {} {}{}", method, request_uri, version, LINE_ENDING)?;
// Write the headers.
write!(stream, "{}{}", headers, LINE_ENDING)
}