servo: Merge #20391 - Measure cache memory usage (from modal17:iss19251); r=jdm

<!-- Please describe your changes on the following line: -->
- [X] make the memory cache data structure derive MallocSizeOf
- [X] add a new IpcReceiver argument to ResourceChannelManager::start that is used for listening to messages from the memory profiler (it must correspond with this sender)
- [X] use run_with_memory_reporting in the network code to register a memory reporter for that thread
- [X] when a message from the memory profiler arrives, create a report that includes that size of the public and private http caches

---
<!-- Thank you for contributing to Servo! Please replace each `[ ]` by `[X]` when the step is complete, and replace `__` with appropriate data: -->
- [X] `./mach build -d` does not report any errors
- [X] `./mach test-tidy` does not report any errors
- [x] These changes fix #19251 (github issue number if applicable).

<!-- Either: -->
- [ ] There are tests for these changes OR
- [ ] These changes do not require tests because _____

<!-- Also, please make sure that "Allow edits from maintainers" checkbox is checked, so that we can help you if you get stuck somewhere along the way.-->

<!-- Pull requests that do not address these steps are welcome, but they will require additional verification as part of the review process. -->

Source-Repo: https://github.com/servo/servo
Source-Revision: 8f226f841bd5e1ab412ff2cd8417919b222f7555

--HG--
extra : subtree_source : https%3A//hg.mozilla.org/projects/converted-servo-linear
extra : subtree_revision : 024331f1072f2e02ebfd25a3ce8e95ec39cbf85f
This commit is contained in:
modal-d17 2018-04-02 02:01:50 -04:00
parent de485cb424
commit e41a335935
13 changed files with 312 additions and 76 deletions

9
servo/Cargo.lock generated
View File

@ -1646,13 +1646,17 @@ dependencies = [
"cssparser 0.23.2 (registry+https://github.com/rust-lang/crates.io-index)",
"euclid 0.17.2 (registry+https://github.com/rust-lang/crates.io-index)",
"hashglobe 0.1.0",
"hyper 0.10.13 (registry+https://github.com/rust-lang/crates.io-index)",
"hyper_serde 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
"mozjs 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
"selectors 0.19.0",
"serde 1.0.27 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_bytes 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)",
"servo_arc 0.1.1",
"smallbitvec 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)",
"smallvec 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
"string_cache 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
"time 0.1.37 (registry+https://github.com/rust-lang/crates.io-index)",
"url 1.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
"void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
"webrender_api 0.57.0 (git+https://github.com/servo/webrender)",
@ -1888,6 +1892,8 @@ dependencies = [
"ipc-channel 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
"malloc_size_of 0.0.1",
"malloc_size_of_derive 0.0.1",
"matches 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"mime 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)",
"mime_guess 1.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1898,6 +1904,8 @@ dependencies = [
"serde 1.0.27 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
"servo-websocket 0.21.0 (registry+https://github.com/rust-lang/crates.io-index)",
"servo_allocator 0.0.1",
"servo_arc 0.1.1",
"servo_config 0.0.1",
"servo_url 0.0.1",
"threadpool 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1937,6 +1945,7 @@ dependencies = [
"msg 0.0.1",
"num-traits 0.1.37 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.27 (registry+https://github.com/rust-lang/crates.io-index)",
"servo_arc 0.1.1",
"servo_config 0.0.1",
"servo_url 0.0.1",
"url 1.6.0 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@ -23,13 +23,17 @@ app_units = "0.6"
cssparser = "0.23.0"
euclid = "0.17"
hashglobe = { path = "../hashglobe" }
hyper = "0.10"
hyper_serde = "0.8"
mozjs = { version = "0.6", features = ["promises"], optional = true }
selectors = { path = "../selectors" }
serde = "1.0.27"
serde_bytes = { version = "0.10", optional = true }
servo_arc = { path = "../servo_arc" }
smallbitvec = "1.0.3"
smallvec = "0.6"
string_cache = { version = "0.7", optional = true }
time = "0.1.17"
url = { version = "1.2", optional = true }
webrender_api = { git = "https://github.com/servo/webrender", features = ["ipc"], optional = true }
xml5ever = { version = "0.12", optional = true }

View File

@ -47,9 +47,12 @@ extern crate app_units;
extern crate cssparser;
extern crate euclid;
extern crate hashglobe;
extern crate hyper;
extern crate hyper_serde;
#[cfg(feature = "servo")]
extern crate mozjs as js;
extern crate selectors;
extern crate serde;
#[cfg(feature = "servo")]
extern crate serde_bytes;
extern crate servo_arc;
@ -57,6 +60,7 @@ extern crate smallbitvec;
extern crate smallvec;
#[cfg(feature = "servo")]
extern crate string_cache;
extern crate time;
#[cfg(feature = "url")]
extern crate url;
extern crate void;
@ -69,6 +73,7 @@ extern crate xml5ever;
use serde_bytes::ByteBuf;
use std::hash::{BuildHasher, Hash};
use std::mem::size_of;
use std::ops::{Deref, DerefMut};
use std::ops::Range;
use std::os::raw::c_void;
use void::Void;
@ -591,6 +596,18 @@ impl<T: MallocSizeOf> MallocConditionalSizeOf for servo_arc::Arc<T> {
}
}
/// If a mutex is stored directly as a member of a data type that is being measured,
/// it is the unique owner of its contents and deserves to be measured.
///
/// If a mutex is stored inside of an Arc value as a member of a data type that is being measured,
/// the Arc will not be automatically measured so there is no risk of overcounting the mutex's
/// contents.
impl<T: MallocSizeOf> MallocSizeOf for std::sync::Mutex<T> {
fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
(*self.lock().unwrap()).size_of(ops)
}
}
impl MallocSizeOf for smallbitvec::SmallBitVec {
fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
if let Some(ptr) = self.heap_ptr() {
@ -802,3 +819,90 @@ impl MallocSizeOf for xml5ever::QualName {
self.local.size_of(ops)
}
}
impl MallocSizeOf for hyper::header::Headers {
fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
self.iter().fold(0, |acc, x| {
let name = x.name();
let raw = self.get_raw(name);
acc + raw.size_of(ops)
})
}
}
impl MallocSizeOf for hyper::header::ContentType {
fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
self.0.size_of(ops)
}
}
impl MallocSizeOf for hyper::mime::Mime {
fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
self.0.size_of(ops) +
self.1.size_of(ops) +
self.2.size_of(ops)
}
}
impl MallocSizeOf for hyper::mime::Attr {
fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
match *self {
hyper::mime::Attr::Ext(ref s) => s.size_of(ops),
_ => 0,
}
}
}
impl MallocSizeOf for hyper::mime::Value {
fn size_of(&self, _ops: &mut MallocSizeOfOps) -> usize {
self.len() // Length of string value in bytes (not the char length of a string)!
}
}
malloc_size_of_is_0!(time::Duration);
malloc_size_of_is_0!(time::Tm);
impl<T> MallocSizeOf for hyper_serde::Serde<T> where
for <'de> hyper_serde::De<T>: serde::Deserialize<'de>,
for <'a> hyper_serde::Ser<'a, T>: serde::Serialize,
T: MallocSizeOf {
fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
self.0.size_of(ops)
}
}
// Placeholder for unique case where internals of Sender cannot be measured.
// malloc size of is 0 macro complains about type supplied!
impl<T> MallocSizeOf for std::sync::mpsc::Sender<T> {
fn size_of(&self, _ops: &mut MallocSizeOfOps) -> usize {
0
}
}
impl MallocSizeOf for hyper::status::StatusCode {
fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
match *self {
hyper::status::StatusCode::Unregistered(u) => u.size_of(ops),
_ => 0,
}
}
}
/// Measurable that defers to inner value and used to verify MallocSizeOf implementation in a
/// struct.
#[derive(Clone)]
pub struct Measurable<T: MallocSizeOf> (pub T);
impl<T: MallocSizeOf> Deref for Measurable<T> {
type Target = T;
fn deref(&self) -> &T {
&self.0
}
}
impl<T: MallocSizeOf> DerefMut for Measurable<T> {
fn deref_mut(&mut self) -> &mut T {
&mut self.0
}
}

View File

@ -24,6 +24,8 @@ immeta = "0.3.6"
ipc-channel = "0.10"
lazy_static = "1"
log = "0.4"
malloc_size_of = { path = "../malloc_size_of" }
malloc_size_of_derive = { path = "../malloc_size_of_derive" }
matches = "0.1"
mime = "0.2.1"
mime_guess = "1.8.0"
@ -33,6 +35,8 @@ openssl = "0.9"
profile_traits = {path = "../profile_traits"}
serde = "1.0"
serde_json = "1.0"
servo_allocator = {path = "../allocator"}
servo_arc = {path = "../servo_arc"}
servo_config = {path = "../config"}
servo_url = {path = "../url"}
servo-websocket = { version = "0.21", default-features = false, features = ["sync"] }

View File

@ -14,14 +14,17 @@ use hyper::header::Headers;
use hyper::method::Method;
use hyper::status::StatusCode;
use hyper_serde::Serde;
use malloc_size_of::{MallocSizeOf, MallocSizeOfOps, MallocUnconditionalSizeOf, MallocUnconditionalShallowSizeOf};
use malloc_size_of::Measurable;
use net_traits::{Metadata, FetchMetadata};
use net_traits::request::Request;
use net_traits::response::{HttpsState, Response, ResponseBody};
use servo_arc::Arc;
use servo_config::prefs::PREFS;
use servo_url::ServoUrl;
use std::collections::HashMap;
use std::str;
use std::sync::{Arc, Mutex};
use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Sender};
use time;
@ -29,7 +32,7 @@ use time::{Duration, Tm};
/// The key used to differentiate requests in the cache.
#[derive(Clone, Eq, Hash, PartialEq)]
#[derive(Clone, Eq, Hash, MallocSizeOf, PartialEq )]
pub struct CacheKey {
url: ServoUrl
}
@ -56,9 +59,16 @@ impl CacheKey {
/// A complete cached resource.
#[derive(Clone)]
struct CachedResource {
metadata: CachedMetadata,
request_headers: Arc<Mutex<Headers>>,
body: Arc<Mutex<ResponseBody>>,
aborted: Arc<AtomicBool>,
awaiting_body: Arc<Mutex<Vec<Sender<Data>>>>,
data: Measurable<MeasurableCachedResource>
}
#[derive(Clone, MallocSizeOf)]
struct MeasurableCachedResource {
metadata: CachedMetadata,
location_url: Option<Result<ServoUrl, String>>,
https_state: HttpsState,
status: Option<StatusCode>,
@ -66,25 +76,47 @@ struct CachedResource {
url_list: Vec<ServoUrl>,
expires: Duration,
last_validated: Tm,
aborted: Arc<AtomicBool>,
awaiting_body: Arc<Mutex<Vec<Sender<Data>>>>
}
impl MallocSizeOf for CachedResource {
fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
self.request_headers.unconditional_size_of(ops) +
self.body.unconditional_size_of(ops) +
self.aborted.unconditional_size_of(ops) +
self.awaiting_body.unconditional_size_of(ops) +
self.data.size_of(ops)
}
}
/// Metadata about a loaded resource, such as is obtained from HTTP headers.
#[derive(Clone)]
struct CachedMetadata {
/// Headers
pub headers: Arc<Mutex<Headers>>,
/// Fields that implement MallocSizeOf
pub data: Measurable<MeasurableCachedMetadata>
}
#[derive(Clone, MallocSizeOf)]
struct MeasurableCachedMetadata {
/// Final URL after redirects.
pub final_url: ServoUrl,
/// MIME type / subtype.
pub content_type: Option<Serde<ContentType>>,
/// Character set.
pub charset: Option<String>,
/// Headers
pub headers: Arc<Mutex<Headers>>,
/// HTTP Status
pub status: Option<(u16, Vec<u8>)>
}
impl MallocSizeOf for CachedMetadata {
fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
self.headers.unconditional_shallow_size_of(ops) +
self.headers.size_of(ops) +
self.data.size_of(ops)
}
}
/// Wrapper around a cached response, including information on re-validation needs
pub struct CachedResponse {
/// The response constructed from the cached resource
@ -94,6 +126,7 @@ pub struct CachedResponse {
}
/// A memory cache.
#[derive(MallocSizeOf)]
pub struct HttpCache {
/// cached responses.
entries: HashMap<CacheKey, Vec<CachedResource>>,
@ -278,7 +311,7 @@ fn create_cached_response(request: &Request,
cached_headers: &Headers,
done_chan: &mut DoneChannel)
-> CachedResponse {
let mut response = Response::new(cached_resource.metadata.final_url.clone());
let mut response = Response::new(cached_resource.data.metadata.data.final_url.clone());
response.headers = cached_headers.clone();
response.body = cached_resource.body.clone();
if let ResponseBody::Receiving(_) = *cached_resource.body.lock().unwrap() {
@ -286,18 +319,18 @@ fn create_cached_response(request: &Request,
*done_chan = Some((done_sender.clone(), done_receiver));
cached_resource.awaiting_body.lock().unwrap().push(done_sender);
}
response.location_url = cached_resource.location_url.clone();
response.status = cached_resource.status.clone();
response.raw_status = cached_resource.raw_status.clone();
response.url_list = cached_resource.url_list.clone();
response.https_state = cached_resource.https_state.clone();
response.location_url = cached_resource.data.location_url.clone();
response.status = cached_resource.data.status.clone();
response.raw_status = cached_resource.data.raw_status.clone();
response.url_list = cached_resource.data.url_list.clone();
response.https_state = cached_resource.data.https_state.clone();
response.referrer = request.referrer.to_url().cloned();
response.referrer_policy = request.referrer_policy.clone();
response.aborted = cached_resource.aborted.clone();
let expires = cached_resource.expires;
let expires = cached_resource.data.expires;
let adjusted_expires = get_expiry_adjustment_from_request_headers(request, expires);
let now = Duration::seconds(time::now().to_timespec().sec);
let last_validated = Duration::seconds(cached_resource.last_validated.to_timespec().sec);
let last_validated = Duration::seconds(cached_resource.data.last_validated.to_timespec().sec);
let time_since_validated = now - last_validated;
// TODO: take must-revalidate into account <https://tools.ietf.org/html/rfc7234#section-5.2.2.1>
// TODO: if this cache is to be considered shared, take proxy-revalidate into account
@ -312,18 +345,20 @@ fn create_cached_response(request: &Request,
fn create_resource_with_bytes_from_resource(bytes: &[u8], resource: &CachedResource)
-> CachedResource {
CachedResource {
metadata: resource.metadata.clone(),
request_headers: resource.request_headers.clone(),
body: Arc::new(Mutex::new(ResponseBody::Done(bytes.to_owned()))),
location_url: resource.location_url.clone(),
https_state: resource.https_state.clone(),
status: Some(StatusCode::PartialContent),
raw_status: Some((206, b"Partial Content".to_vec())),
url_list: resource.url_list.clone(),
expires: resource.expires.clone(),
last_validated: resource.last_validated.clone(),
aborted: Arc::new(AtomicBool::new(false)),
awaiting_body: Arc::new(Mutex::new(vec![]))
awaiting_body: Arc::new(Mutex::new(vec![])),
data: Measurable(MeasurableCachedResource {
metadata: resource.data.metadata.clone(),
location_url: resource.data.location_url.clone(),
https_state: resource.data.https_state.clone(),
status: Some(StatusCode::PartialContent),
raw_status: Some((206, b"Partial Content".to_vec())),
url_list: resource.data.url_list.clone(),
expires: resource.data.expires.clone(),
last_validated: resource.data.last_validated.clone(),
})
}
}
@ -334,13 +369,13 @@ fn handle_range_request(request: &Request,
done_chan: &mut DoneChannel)
-> Option<CachedResponse> {
let mut complete_cached_resources = candidates.iter().filter(|resource| {
match resource.raw_status {
match resource.data.raw_status {
Some((ref code, _)) => *code == 200,
None => false
}
});
let partial_cached_resources = candidates.iter().filter(|resource| {
match resource.raw_status {
match resource.data.raw_status {
Some((ref code, _)) => *code == 206,
None => false
}
@ -361,7 +396,7 @@ fn handle_range_request(request: &Request,
let requested = body.get(b..e);
if let Some(bytes) = requested {
let new_resource = create_resource_with_bytes_from_resource(bytes, complete_resource);
let cached_headers = new_resource.metadata.headers.lock().unwrap();
let cached_headers = new_resource.data.metadata.headers.lock().unwrap();
let cached_response = create_cached_response(request, &new_resource, &*cached_headers, done_chan);
return Some(cached_response);
}
@ -369,7 +404,7 @@ fn handle_range_request(request: &Request,
},
(&header::ByteRangeSpec::FromTo(beginning, end), None) => {
for partial_resource in partial_cached_resources {
let headers = partial_resource.metadata.headers.lock().unwrap();
let headers = partial_resource.data.metadata.headers.lock().unwrap();
let content_range = headers.get::<header::ContentRange>();
let (res_beginning, res_end) = match content_range {
Some(&header::ContentRange(
@ -401,7 +436,7 @@ fn handle_range_request(request: &Request,
let requested = body.get(b..);
if let Some(bytes) = requested {
let new_resource = create_resource_with_bytes_from_resource(bytes, complete_resource);
let cached_headers = new_resource.metadata.headers.lock().unwrap();
let cached_headers = new_resource.data.metadata.headers.lock().unwrap();
let cached_response = create_cached_response(request, &new_resource, &*cached_headers, done_chan);
return Some(cached_response);
}
@ -409,7 +444,7 @@ fn handle_range_request(request: &Request,
},
(&header::ByteRangeSpec::AllFrom(beginning), None) => {
for partial_resource in partial_cached_resources {
let headers = partial_resource.metadata.headers.lock().unwrap();
let headers = partial_resource.data.metadata.headers.lock().unwrap();
let content_range = headers.get::<header::ContentRange>();
let (res_beginning, res_end, total) = match content_range {
Some(&header::ContentRange(
@ -441,7 +476,7 @@ fn handle_range_request(request: &Request,
let requested = body.get(from_byte..);
if let Some(bytes) = requested {
let new_resource = create_resource_with_bytes_from_resource(bytes, complete_resource);
let cached_headers = new_resource.metadata.headers.lock().unwrap();
let cached_headers = new_resource.data.metadata.headers.lock().unwrap();
let cached_response = create_cached_response(request, &new_resource, &*cached_headers, done_chan);
return Some(cached_response);
}
@ -449,7 +484,7 @@ fn handle_range_request(request: &Request,
},
(&header::ByteRangeSpec::Last(offset), None) => {
for partial_resource in partial_cached_resources {
let headers = partial_resource.metadata.headers.lock().unwrap();
let headers = partial_resource.data.metadata.headers.lock().unwrap();
let content_range = headers.get::<header::ContentRange>();
let (res_beginning, res_end, total) = match content_range {
Some(&header::ContentRange(
@ -501,7 +536,7 @@ impl HttpCache {
let mut candidates = vec![];
for cached_resource in resources {
let mut can_be_constructed = true;
let cached_headers = cached_resource.metadata.headers.lock().unwrap();
let cached_headers = cached_resource.data.metadata.headers.lock().unwrap();
let original_request_headers = cached_resource.request_headers.lock().unwrap();
if let Some(vary_data) = cached_headers.get_raw("Vary") {
// Calculating Secondary Keys with Vary <https://tools.ietf.org/html/rfc7234#section-4.1>
@ -554,7 +589,7 @@ impl HttpCache {
// Returning the first response that can be constructed
// TODO: select the most appropriate one, using a known mechanism from a selecting header field,
// or using the Date header to return the most recent one.
let cached_headers = cached_resource.metadata.headers.lock().unwrap();
let cached_headers = cached_resource.data.metadata.headers.lock().unwrap();
let cached_response = create_cached_response(request, cached_resource, &*cached_headers, done_chan);
return Some(cached_response);
}
@ -589,24 +624,24 @@ impl HttpCache {
let entry_key = CacheKey::new(request.clone());
if let Some(cached_resources) = self.entries.get_mut(&entry_key) {
for cached_resource in cached_resources.iter_mut() {
let mut stored_headers = cached_resource.metadata.headers.lock().unwrap();
// Received a response with 304 status code, in response to a request that matches a cached resource.
// 1. update the headers of the cached resource.
// 2. return a response, constructed from the cached resource.
stored_headers.extend(response.headers.iter());
let mut constructed_response = Response::new(cached_resource.metadata.final_url.clone());
constructed_response.headers = stored_headers.clone();
let mut constructed_response = Response::new(cached_resource.data.metadata.data.final_url.clone());
constructed_response.body = cached_resource.body.clone();
constructed_response.status = cached_resource.status.clone();
constructed_response.https_state = cached_resource.https_state.clone();
constructed_response.status = cached_resource.data.status.clone();
constructed_response.https_state = cached_resource.data.https_state.clone();
constructed_response.referrer = request.referrer.to_url().cloned();
constructed_response.referrer_policy = request.referrer_policy.clone();
constructed_response.raw_status = cached_resource.raw_status.clone();
constructed_response.url_list = cached_resource.url_list.clone();
constructed_response.raw_status = cached_resource.data.raw_status.clone();
constructed_response.url_list = cached_resource.data.url_list.clone();
// done_chan will have been set to Some by http_network_fetch,
// set it back to None since the response returned here replaces the 304 one from the network.
*done_chan = None;
cached_resource.expires = get_response_expiry(&constructed_response);
cached_resource.data.expires = get_response_expiry(&constructed_response);
let mut stored_headers = cached_resource.data.metadata.headers.lock().unwrap();
stored_headers.extend(response.headers.iter());
constructed_response.headers = stored_headers.clone();
return Some(constructed_response);
}
}
@ -617,7 +652,7 @@ impl HttpCache {
let entry_key = CacheKey::from_servo_url(url);
if let Some(cached_resources) = self.entries.get_mut(&entry_key) {
for cached_resource in cached_resources.iter_mut() {
cached_resource.expires = Duration::seconds(0i64);
cached_resource.data.expires = Duration::seconds(0i64);
}
}
}
@ -664,25 +699,29 @@ impl HttpCache {
}
let expiry = get_response_expiry(&response);
let cacheable_metadata = CachedMetadata {
final_url: metadata.final_url,
content_type: metadata.content_type,
charset: metadata.charset,
status: metadata.status,
headers: Arc::new(Mutex::new(response.headers.clone()))
headers: Arc::new(Mutex::new(response.headers.clone())),
data: Measurable(MeasurableCachedMetadata {
final_url: metadata.final_url,
content_type: metadata.content_type,
charset: metadata.charset,
status: metadata.status
})
};
let entry_resource = CachedResource {
metadata: cacheable_metadata,
request_headers: Arc::new(Mutex::new(request.headers.clone())),
body: response.body.clone(),
location_url: response.location_url.clone(),
https_state: response.https_state.clone(),
status: response.status.clone(),
raw_status: response.raw_status.clone(),
url_list: response.url_list.clone(),
expires: expiry,
last_validated: time::now(),
aborted: response.aborted.clone(),
awaiting_body: Arc::new(Mutex::new(vec![]))
awaiting_body: Arc::new(Mutex::new(vec![])),
data: Measurable(MeasurableCachedResource {
metadata: cacheable_metadata,
location_url: response.location_url.clone(),
https_state: response.https_state.clone(),
status: response.status.clone(),
raw_status: response.raw_status.clone(),
url_list: response.url_list.clone(),
expires: expiry,
last_validated: time::now()
})
};
let entry = self.entries.entry(entry_key).or_insert(vec![]);
entry.push(entry_resource);

View File

@ -17,6 +17,8 @@ extern crate ipc_channel;
#[macro_use]
extern crate lazy_static;
#[macro_use] extern crate log;
extern crate malloc_size_of;
#[macro_use] extern crate malloc_size_of_derive;
#[macro_use] #[no_link] extern crate matches;
#[macro_use]
extern crate mime;
@ -24,9 +26,12 @@ extern crate mime_guess;
extern crate msg;
extern crate net_traits;
extern crate openssl;
#[macro_use]
extern crate profile_traits;
#[macro_use] extern crate serde;
extern crate serde_json;
extern crate servo_allocator;
extern crate servo_arc;
extern crate servo_config;
extern crate servo_url;
extern crate time;

View File

@ -16,6 +16,7 @@ use http_cache::HttpCache;
use http_loader::{HttpState, http_redirect_fetch};
use hyper_serde::Serde;
use ipc_channel::ipc::{self, IpcReceiver, IpcReceiverSet, IpcSender};
use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
use net_traits::{CookieSource, CoreResourceThread};
use net_traits::{CoreResourceMsg, CustomResponseMediator, FetchChannels};
use net_traits::{FetchResponseMsg, ResourceThreads, WebSocketDomAction};
@ -23,9 +24,12 @@ use net_traits::WebSocketNetworkEvent;
use net_traits::request::{Request, RequestInit};
use net_traits::response::{Response, ResponseInit};
use net_traits::storage_thread::StorageThreadMsg;
use profile_traits::mem::{Report, ReportsChan, ReportKind};
use profile_traits::mem::ProfilerChan as MemProfilerChan;
use profile_traits::time::ProfilerChan;
use serde::{Deserialize, Serialize};
use serde_json;
use servo_allocator;
use servo_config::opts;
use servo_config::resource_files::resources_dir_path;
use servo_url::ServoUrl;
@ -47,13 +51,15 @@ const TFD_PROVIDER: &'static TFDProvider = &TFDProvider;
/// Returns a tuple of (public, private) senders to the new threads.
pub fn new_resource_threads(user_agent: Cow<'static, str>,
devtools_chan: Option<Sender<DevtoolsControlMsg>>,
profiler_chan: ProfilerChan,
time_profiler_chan: ProfilerChan,
mem_profiler_chan: MemProfilerChan,
config_dir: Option<PathBuf>)
-> (ResourceThreads, ResourceThreads) {
let (public_core, private_core) = new_core_resource_thread(
user_agent,
devtools_chan,
profiler_chan,
time_profiler_chan,
mem_profiler_chan,
config_dir.clone());
let storage: IpcSender<StorageThreadMsg> = StorageThreadFactory::new(config_dir);
(ResourceThreads::new(public_core, storage.clone()),
@ -64,22 +70,34 @@ pub fn new_resource_threads(user_agent: Cow<'static, str>,
/// Create a CoreResourceThread
pub fn new_core_resource_thread(user_agent: Cow<'static, str>,
devtools_chan: Option<Sender<DevtoolsControlMsg>>,
profiler_chan: ProfilerChan,
time_profiler_chan: ProfilerChan,
mem_profiler_chan: MemProfilerChan,
config_dir: Option<PathBuf>)
-> (CoreResourceThread, CoreResourceThread) {
let (public_setup_chan, public_setup_port) = ipc::channel().unwrap();
let (private_setup_chan, private_setup_port) = ipc::channel().unwrap();
let (report_chan, report_port) = ipc::channel().unwrap();
thread::Builder::new().name("ResourceManager".to_owned()).spawn(move || {
let resource_manager = CoreResourceManager::new(
user_agent, devtools_chan, profiler_chan
user_agent, devtools_chan, time_profiler_chan
);
let mut channel_manager = ResourceChannelManager {
resource_manager: resource_manager,
config_dir: config_dir,
};
channel_manager.start(public_setup_port,
private_setup_port);
mem_profiler_chan.run_with_memory_reporting(|| (
channel_manager.start(
public_setup_port,
private_setup_port,
report_port)
),
String::from("network-cache-reporter"),
report_chan,
|report_chan| report_chan);
}).expect("Thread spawning failed");
(public_setup_chan, private_setup_chan)
}
@ -127,31 +145,69 @@ impl ResourceChannelManager {
#[allow(unsafe_code)]
fn start(&mut self,
public_receiver: IpcReceiver<CoreResourceMsg>,
private_receiver: IpcReceiver<CoreResourceMsg>) {
private_receiver: IpcReceiver<CoreResourceMsg>,
memory_reporter: IpcReceiver<ReportsChan>) {
let (public_http_state, private_http_state) =
create_http_states(self.config_dir.as_ref().map(Deref::deref));
let mut rx_set = IpcReceiverSet::new().unwrap();
let private_id = rx_set.add(private_receiver).unwrap();
let public_id = rx_set.add(public_receiver).unwrap();
let reporter_id = rx_set.add(memory_reporter).unwrap();
loop {
for (id, data) in rx_set.select().unwrap().into_iter().map(|m| m.unwrap()) {
let group = if id == private_id {
&private_http_state
for receiver in rx_set.select().unwrap().into_iter() {
// Handles case where profiler thread shuts down before resource thread.
match receiver {
ipc::IpcSelectionResult::ChannelClosed(..) => continue,
_ => {}
}
let (id, data) = receiver.unwrap();
// If message is memory report, get the size_of of public and private http caches
if id == reporter_id {
if let Ok(msg) = data.to() {
self.process_report(msg, &private_http_state, &public_http_state);
continue;
}
} else {
assert_eq!(id, public_id);
&public_http_state
};
if let Ok(msg) = data.to() {
if !self.process_msg(msg, group) {
return;
let group = if id == private_id {
&private_http_state
} else {
assert_eq!(id, public_id);
&public_http_state
};
if let Ok(msg) = data.to() {
if !self.process_msg(msg, group) {
return;
}
}
}
}
}
}
fn process_report(&mut self,
msg: ReportsChan,
public_http_state: &Arc<HttpState>,
private_http_state: &Arc<HttpState>) {
let mut ops = MallocSizeOfOps::new(servo_allocator::usable_size, None, None);
let public_cache = public_http_state.http_cache.read().unwrap();
let private_cache = private_http_state.http_cache.read().unwrap();
let public_report = Report {
path: path!["memory-cache", "public"],
kind: ReportKind::ExplicitJemallocHeapSize,
size: public_cache.size_of(&mut ops)
};
let private_report = Report {
path: path!["memory-cache", "private"],
kind: ReportKind::ExplicitJemallocHeapSize,
size: private_cache.size_of(&mut ops)
};
msg.send(vec!(public_report, private_report));
}
/// Returns false if the thread should exit.
fn process_msg(&mut self,

View File

@ -6,6 +6,7 @@ use ipc_channel::ipc;
use net::resource_thread::new_core_resource_thread;
use net::test::parse_hostsfile;
use net_traits::CoreResourceMsg;
use profile_traits::mem::ProfilerChan as MemProfilerChan;
use profile_traits::time::ProfilerChan;
use std::net::IpAddr;
@ -16,9 +17,10 @@ fn ip(s: &str) -> IpAddr {
#[test]
fn test_exit() {
let (tx, _rx) = ipc::channel().unwrap();
let (mtx, _mrx) = ipc::channel().unwrap();
let (sender, receiver) = ipc::channel().unwrap();
let (resource_thread, _private_resource_thread) = new_core_resource_thread(
"".into(), None, ProfilerChan(tx), None);
"".into(), None, ProfilerChan(tx), MemProfilerChan(mtx), None);
resource_thread.send(CoreResourceMsg::Exit(sender)).unwrap();
receiver.recv().unwrap();
}

View File

@ -24,6 +24,7 @@ malloc_size_of_derive = { path = "../malloc_size_of_derive" }
msg = {path = "../msg"}
num-traits = "0.1.32"
serde = "1.0"
servo_arc = {path = "../servo_arc"}
servo_config = {path = "../config"}
servo_url = {path = "../url"}
url = "1.2"

View File

@ -17,6 +17,7 @@ extern crate ipc_channel;
extern crate msg;
extern crate num_traits;
#[macro_use] extern crate serde;
extern crate servo_arc;
extern crate servo_config;
extern crate servo_url;
extern crate url;

View File

@ -8,8 +8,9 @@ use {FetchMetadata, FilteredMetadata, Metadata, NetworkError, ReferrerPolicy};
use hyper::header::{AccessControlExposeHeaders, ContentType, Headers};
use hyper::status::StatusCode;
use hyper_serde::Serde;
use servo_arc::Arc;
use servo_url::ServoUrl;
use std::sync::{Arc, Mutex};
use std::sync::Mutex;
use std::sync::atomic::AtomicBool;
/// [Response type](https://fetch.spec.whatwg.org/#concept-response-type)

View File

@ -8,6 +8,7 @@
use ipc_channel::ipc::{self, IpcSender};
use ipc_channel::router::ROUTER;
use serde;
use std::marker::Send;
use std::sync::mpsc::Sender;
@ -25,6 +26,14 @@ impl<T> OpaqueSender<T> for Sender<T> {
}
}
impl<T> OpaqueSender<T> for IpcSender<T> where T: serde::Serialize {
fn send(&self, message: T) {
if let Err(e) = IpcSender::send(self, message) {
warn!("Error communicating with the target thread from the profiler: {}", e);
}
}
}
/// Front-end representation of the profiler used to communicate with the
/// profiler.
#[derive(Clone, Deserialize, Serialize)]

View File

@ -462,6 +462,7 @@ fn create_constellation(user_agent: Cow<'static, str>,
new_resource_threads(user_agent,
devtools_chan.clone(),
time_profiler_chan.clone(),
mem_profiler_chan.clone(),
config_dir);
let font_cache_thread = FontCacheThread::new(public_resource_threads.sender(),
webrender_api_sender.create_api());