Bug 1648405 - Use crossbeam channels instead of std::mpsc ones in webrender. r=gw

Differential Revision: https://phabricator.services.mozilla.com/D81040
This commit is contained in:
Nicolas Silva 2020-07-28 09:43:58 +00:00
parent cb36b1b7de
commit 255da40c5d
11 changed files with 72 additions and 35 deletions

11
gfx/wr/Cargo.lock generated
View File

@ -368,6 +368,15 @@ name = "crossbeam"
version = "0.2.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "crossbeam-channel"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "crossbeam-deque"
version = "0.7.3"
@ -1850,6 +1859,7 @@ dependencies = [
"byteorder 1.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
"core-foundation 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
"core-graphics 0.22.0 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-channel 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)",
"derive_more 0.99.5 (registry+https://github.com/rust-lang/crates.io-index)",
"euclid 0.20.10 (registry+https://github.com/rust-lang/crates.io-index)",
"malloc_size_of_derive 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
@ -2087,6 +2097,7 @@ dependencies = [
"checksum core-text 19.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "04dfae50af11e72657fe7174cddb1ecddc5398037f7f6f39533ad69207c9a4e2"
"checksum crc32fast 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ba125de2af0df55319f41944744ad91c71113bf74a4646efff39afe1f6842db1"
"checksum crossbeam 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)" = "bd66663db5a988098a89599d4857919b3acf7f61402e61365acfd3919857b9be"
"checksum crossbeam-channel 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "09ee0cc8804d5393478d743b035099520087a5186f3b93fa58cec08fa62407b6"
"checksum crossbeam-deque 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)" = "9f02af974daeee82218205558e51ec8768b48cf524bd01d550abe5573a608285"
"checksum crossbeam-epoch 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)" = "058ed274caafc1f60c4997b5fc07bf7dc7cca454af7c6e81edffe5f33f70dace"
"checksum crossbeam-queue 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "c695eeca1e7173472a32221542ae469b3e9aac3a4fc81f7696bcad82029493db"

View File

@ -4,10 +4,9 @@
use api::{ApiMsg, DebugCommand, DebugFlags};
use api::units::DeviceIntSize;
use api::crossbeam_channel::{bounded, Sender, Receiver};
use crate::print_tree::PrintTreePrinter;
use crate::renderer;
use std::sync::mpsc::{channel, Receiver};
use std::sync::mpsc::Sender;
use std::thread;
use ws;
use base64::encode;
@ -111,7 +110,7 @@ pub struct DebugServerImpl {
impl DebugServerImpl {
pub fn new(api_tx: Sender<ApiMsg>) -> DebugServerImpl {
let (debug_tx, debug_rx) = channel();
let (debug_tx, debug_rx) = bounded(64);
let socket = ws::Builder::new()
.build(move |out| {

View File

@ -5,6 +5,7 @@
use api::{FontInstanceFlags, FontSize, BaseFontInstance};
use api::{FontKey, FontRenderMode, FontTemplate};
use api::{ColorU, GlyphIndex, GlyphDimensions, SyntheticItalics};
use api::channel::{unbounded_channel, Receiver, Sender};
use api::units::*;
use api::{ImageDescriptor, ImageDescriptorFlags, ImageFormat, DirtyRect};
use crate::internal_types::ResourceCacheError;
@ -29,7 +30,6 @@ use std::hash::{Hash, Hasher};
use std::mem;
use std::ops::Deref;
use std::sync::{Arc, Condvar, Mutex, MutexGuard};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::atomic::{AtomicBool, Ordering};
pub static GLYPH_FLASHING: AtomicBool = AtomicBool::new(false);
@ -909,7 +909,7 @@ pub struct GlyphRasterizer {
impl GlyphRasterizer {
pub fn new(workers: Arc<ThreadPool>) -> Result<Self, ResourceCacheError> {
let (glyph_tx, glyph_rx) = channel();
let (glyph_tx, glyph_rx) = unbounded_channel();
let num_workers = workers.current_num_threads();
let mut contexts = Vec::with_capacity(num_workers);

View File

@ -19,6 +19,7 @@ use api::units::*;
use api::CaptureBits;
#[cfg(feature = "replay")]
use api::CapturedDocument;
use api::channel::{single_msg_channel, Sender, Receiver};
use crate::spatial_tree::SpatialNodeIndex;
#[cfg(any(feature = "capture", feature = "replay"))]
use crate::capture::CaptureConfig;
@ -56,7 +57,6 @@ use serde_json;
use std::collections::hash_map::Entry::{Occupied, Vacant};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Sender, Receiver};
use std::time::{UNIX_EPOCH, SystemTime};
use std::{mem, u32};
#[cfg(feature = "capture")]
@ -1112,7 +1112,7 @@ impl RenderBackend {
}
if let Some(ref tx) = result_tx {
let (resume_tx, resume_rx) = channel();
let (resume_tx, resume_rx) = single_msg_channel();
tx.send(SceneSwapResult::Complete(resume_tx)).unwrap();
// Block until the post-swap hook has completed on
// the scene builder thread. We need to do this before

View File

@ -43,6 +43,7 @@ use api::{RenderApiSender, RenderNotifier, TextureTarget, SharedFontInstanceMap}
#[cfg(feature = "replay")]
use api::ExternalImage;
use api::units::*;
use api::channel::{unbounded_channel, Sender, Receiver};
pub use api::DebugFlags;
use core::time::Duration;
use crate::batch::{AlphaBatchContainer, BatchKind, BatchFeatures, BatchTextures, BrushBatchKind, ClipBatchList};
@ -109,7 +110,6 @@ use std::path::PathBuf;
use std::rc::Rc;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Sender, Receiver};
use std::thread;
use std::cell::RefCell;
use tracy_rs::register_thread_with_profiler;
@ -2229,8 +2229,8 @@ impl Renderer {
HAS_BEEN_INITIALIZED.store(true, Ordering::SeqCst);
let (api_tx, api_rx) = channel();
let (result_tx, result_rx) = channel();
let (api_tx, api_rx) = unbounded_channel();
let (result_tx, result_rx) = unbounded_channel();
let gl_type = gl.get_type();
let debug_server = new_debug_server(options.start_debug_server, api_tx.clone());
@ -2550,7 +2550,7 @@ impl Renderer {
})?;
let low_priority_scene_tx = if options.support_low_priority_transactions {
let (low_priority_scene_tx, low_priority_scene_rx) = channel();
let (low_priority_scene_tx, low_priority_scene_rx) = unbounded_channel();
let lp_builder = LowPrioritySceneBuilderThread {
rx: low_priority_scene_rx,
tx: scene_tx.clone(),

View File

@ -7,6 +7,7 @@ use api::{DocumentId, PipelineId, ApiMsg, FrameMsg, SceneMsg, ResourceUpdate, Ex
use api::{NotificationRequest, Checkpoint, IdNamespace, QualitySettings, TransactionMsg};
use api::{ClipIntern, FilterDataIntern, MemoryReport, PrimitiveKeyKind, SharedFontInstanceMap};
use api::{DocumentLayer, GlyphDimensionRequest, GlyphIndexRequest};
use api::channel::{unbounded_channel, single_msg_channel, Receiver, Sender};
use api::units::*;
#[cfg(feature = "capture")]
use crate::capture::CaptureConfig;
@ -26,7 +27,6 @@ use crate::render_backend::SceneView;
use crate::renderer::{PipelineInfo, SceneBuilderHooks};
use crate::scene::{Scene, BuiltScene, SceneStats};
use std::iter;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::mem::replace;
use time::precise_time_ns;
use crate::util::drain_filter;
@ -269,9 +269,9 @@ impl SceneBuilderThreadChannels {
pub fn new(
api_tx: Sender<ApiMsg>
) -> (Self, Sender<SceneBuilderRequest>, Sender<BackendSceneBuilderRequest>, Receiver<SceneBuilderResult>) {
let (in_tx, in_rx) = channel();
let (out_tx, out_rx) = channel();
let (backend_tx, backend_rx) = channel();
let (in_tx, in_rx) = unbounded_channel();
let (out_tx, out_rx) = unbounded_channel();
let (backend_tx, backend_rx) = unbounded_channel();
(
Self {
rx: in_rx,
@ -787,7 +787,7 @@ impl SceneBuilderThread {
.flatten().collect(),
};
let (tx, rx) = channel();
let (tx, rx) = single_msg_channel();
let txn = txns.iter().find(|txn| txn.built_scene.is_some()).unwrap();
hooks.pre_scene_swap(txn.scene_build_end_time - txn.scene_build_start_time);

View File

@ -26,6 +26,7 @@ serde_bytes = "0.11"
time = "0.1"
malloc_size_of = { version = "0.0.1", path = "../wr_malloc_size_of", package = "wr_malloc_size_of" }
peek-poke = { version = "0.2", path = "../peek-poke", features = ["extras"] }
crossbeam-channel = "0.4.3"
[target.'cfg(target_os = "macos")'.dependencies]
core-foundation = "0.9"

View File

@ -14,10 +14,10 @@ use std::os::raw::c_void;
use std::path::PathBuf;
use std::sync::Arc;
use std::u32;
use std::sync::mpsc::{Sender, Receiver, channel};
use time::precise_time_ns;
// local imports
use crate::{display_item as di, font};
use crate::channel::{Sender, Receiver, single_msg_channel, unbounded_channel};
use crate::color::{ColorU, ColorF};
use crate::display_list::BuiltDisplayList;
use crate::font::SharedFontInstanceMap;
@ -1290,7 +1290,7 @@ impl RenderApiSender {
/// Creates a new resource API object with a dedicated namespace.
pub fn create_api(&self) -> RenderApi {
let (sync_tx, sync_rx) = channel();
let (sync_tx, sync_rx) = single_msg_channel();
let msg = ApiMsg::CloneApi(sync_tx);
self.api_sender.send(msg).expect("Failed to send CloneApi message");
let namespace_id = match sync_rx.recv() {
@ -1494,7 +1494,7 @@ impl RenderApi {
key: font::FontInstanceKey,
glyph_indices: Vec<font::GlyphIndex>,
) -> Vec<Option<font::GlyphDimensions>> {
let (sender, rx) = channel();
let (sender, rx) = single_msg_channel();
let msg = ApiMsg::GetGlyphDimensions(font::GlyphDimensionRequest {
key,
glyph_indices,
@ -1507,7 +1507,7 @@ impl RenderApi {
/// Gets the glyph indices for the supplied string. These
/// can be used to construct GlyphKeys.
pub fn get_glyph_indices(&self, key: font::FontKey, text: &str) -> Vec<Option<u32>> {
let (sender, rx) = channel();
let (sender, rx) = single_msg_channel();
let msg = ApiMsg::GetGlyphIndices(font::GlyphIndexRequest {
key,
text: text.to_string(),
@ -1544,7 +1544,7 @@ impl RenderApi {
/// Synchronously requests memory report.
pub fn report_memory(&self, _ops: malloc_size_of::MallocSizeOfOps) -> MemoryReport {
let (tx, rx) = channel();
let (tx, rx) = single_msg_channel();
self.api_sender.send(ApiMsg::ReportMemory(tx)).unwrap();
*rx.recv().unwrap()
}
@ -1558,7 +1558,7 @@ impl RenderApi {
/// Shut the WebRender instance down.
pub fn shut_down(&self, synchronously: bool) {
if synchronously {
let (tx, rx) = channel();
let (tx, rx) = single_msg_channel();
self.api_sender.send(ApiMsg::ShutDown(Some(tx))).unwrap();
rx.recv().unwrap();
} else {
@ -1684,7 +1684,7 @@ impl RenderApi {
point: WorldPoint,
flags: HitTestFlags)
-> HitTestResult {
let (tx, rx) = channel();
let (tx, rx) = single_msg_channel();
self.send_frame_msg(
document_id,
@ -1695,7 +1695,7 @@ impl RenderApi {
/// Synchronously request an object that can perform fast hit testing queries.
pub fn request_hit_tester(&self, document_id: DocumentId) -> HitTesterRequest {
let (tx, rx) = channel();
let (tx, rx) = single_msg_channel();
self.send_frame_msg(
document_id,
FrameMsg::RequestHitTester(tx)
@ -1736,7 +1736,7 @@ impl RenderApi {
///
pub fn get_scroll_node_state(&self, document_id: DocumentId) -> Vec<ScrollNodeState> {
let (tx, rx) = channel();
let (tx, rx) = single_msg_channel();
self.send_frame_msg(document_id, FrameMsg::GetScrollNodeState(tx));
rx.recv().unwrap()
}
@ -1752,7 +1752,7 @@ impl RenderApi {
/// ensures that any transactions (including ones deferred to the scene
/// builder thread) have been processed.
pub fn flush_scene_builder(&self) {
let (tx, rx) = channel();
let (tx, rx) = single_msg_channel();
self.send_message(ApiMsg::FlushSceneBuilder(tx));
rx.recv().unwrap(); // block until done
}
@ -1769,7 +1769,7 @@ impl RenderApi {
// the capture we are about to load.
self.flush_scene_builder();
let (tx, rx) = channel();
let (tx, rx) = unbounded_channel();
let msg = ApiMsg::DebugCommand(DebugCommand::LoadCapture(path, ids, tx));
self.send_message(msg);

View File

@ -7,7 +7,7 @@ use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::io::{self, Cursor, Error, ErrorKind, Read};
use std::mem;
use std::sync::mpsc;
pub use crossbeam_channel::{Sender, Receiver};
#[derive(Clone)]
pub struct Payload {
@ -74,7 +74,7 @@ pub type PayloadSender = MsgSender<Payload>;
pub type PayloadReceiver = MsgReceiver<Payload>;
pub struct MsgReceiver<T> {
rx: mpsc::Receiver<T>,
rx: Receiver<T>,
}
impl<T> MsgReceiver<T> {
@ -82,14 +82,14 @@ impl<T> MsgReceiver<T> {
self.rx.recv().map_err(|e| io::Error::new(ErrorKind::Other, e.to_string()))
}
pub fn to_mpsc_receiver(self) -> mpsc::Receiver<T> {
pub fn to_crossbeam_receiver(self) -> Receiver<T> {
self.rx
}
}
#[derive(Clone)]
pub struct MsgSender<T> {
tx: mpsc::Sender<T>,
tx: Sender<T>,
}
impl<T> MsgSender<T> {
@ -99,12 +99,12 @@ impl<T> MsgSender<T> {
}
pub fn payload_channel() -> Result<(PayloadSender, PayloadReceiver), Error> {
let (tx, rx) = mpsc::channel();
let (tx, rx) = unbounded_channel();
Ok((PayloadSender { tx }, PayloadReceiver { rx }))
}
pub fn msg_channel<T>() -> Result<(MsgSender<T>, MsgReceiver<T>), Error> {
let (tx, rx) = mpsc::channel();
let (tx, rx) = unbounded_channel();
Ok((MsgSender { tx }, MsgReceiver { rx }))
}
@ -129,3 +129,26 @@ impl<'de, T> Deserialize<'de> for MsgSender<T> {
unreachable!();
}
}
/// A create a channel intended for one-shot uses, for example the channels
/// created to block on a synchronous query and then discarded,
pub fn single_msg_channel<T>() -> (Sender<T>, Receiver<T>) {
crossbeam_channel::bounded(1)
}
/// A fast MPMC message channel that can hold a fixed number of messages.
///
/// If the channel is full, the sender will block upon sending extra messages
/// until the receiver has consumed some messages.
/// The capacity parameter should be chosen either:
/// - high enough to avoid blocking on the common cases,
/// - or, on the contrary, using the blocking behavior as a means to prevent
/// fast producers from building up work faster than it is consumed.
pub fn fast_channel<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
crossbeam_channel::bounded(capacity)
}
/// Creates an MPMC channel that is a bit slower than the fast_channel but doesn't
/// have a limit on the number of messages held at a given time and therefore
/// doesn't block when sending.
pub use crossbeam_channel::unbounded as unbounded_channel;

View File

@ -15,10 +15,11 @@ use std::cmp::Ordering;
use std::hash::{Hash, Hasher};
#[cfg(not(target_os = "macos"))]
use std::path::PathBuf;
use std::sync::{Arc, RwLock, RwLockReadGuard, mpsc::Sender};
use std::sync::{Arc, RwLock, RwLockReadGuard};
use std::collections::HashMap;
// local imports
use crate::api::IdNamespace;
use crate::channel::Sender;
use crate::color::ColorU;
use crate::units::LayoutPoint;

View File

@ -15,6 +15,9 @@
#![cfg_attr(feature = "cargo-clippy", allow(clippy::float_cmp, clippy::too_many_arguments))]
#![cfg_attr(feature = "cargo-clippy", allow(clippy::unreadable_literal, clippy::new_without_default))]
pub extern crate crossbeam_channel;
pub extern crate euclid;
extern crate app_units;
#[macro_use]
extern crate bitflags;
@ -27,7 +30,6 @@ extern crate core_foundation;
extern crate core_graphics;
#[macro_use]
extern crate derive_more;
pub extern crate euclid;
#[macro_use]
extern crate malloc_size_of_derive;
extern crate serde;