servo: Merge #11283 - Scheduler with fewer threads (from asajeffrey:scheduler-with-fewer-threads); r=emilio

Thank you for contributing to Servo! Please replace each `[ ]` by `[X]` when the step is complete, and replace `__` with appropriate data:
- [X] `./mach build -d` does not report any errors
- [X] `./mach test-tidy --faster` does not report any errors
- [X] These changes fix #11268.

Either:
- [ ] There are tests for these changes OR
- [X] These changes do not require tests because it's improving perf, not adding functionality.

Pull requests that do not address these steps are welcome, but they will require additional verification as part of the review process.

Source-Repo: https://github.com/servo/servo
Source-Revision: 5b9375ada11fa53a7fd4a23afd05750b852831f0
This commit is contained in:
Alan Jeffrey 2016-05-20 12:36:48 -07:00
parent b69f75b8a7
commit 0def26a11e

View File

@ -2,86 +2,20 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
use euclid::length::Length;
use ipc_channel::ipc::{self, IpcSender};
use ipc_channel::router::ROUTER;
use script_traits::{MsDuration, NsDuration, precise_time_ms, precise_time_ns};
use script_traits::{TimerEvent, TimerEventRequest};
use std::cell::RefCell;
use std::cmp::{self, Ord};
use std::collections::BinaryHeap;
use std::sync::Arc;
use std::sync::atomic::{self, AtomicBool};
use std::sync::mpsc::{channel, Receiver, Select};
use std::thread::{self, spawn, Thread};
use std::time::Duration;
use util::thread::spawn_named;
use std::sync::mpsc;
use std::sync::mpsc::TryRecvError::{Disconnected, Empty};
use std::thread;
use std::time::{Duration, Instant};
/// A quick hack to work around the removal of [`std::old_io::timer::Timer`](
/// http://doc.rust-lang.org/1.0.0-beta/std/old_io/timer/struct.Timer.html )
struct CancelableOneshotTimer {
thread: Thread,
canceled: Arc<AtomicBool>,
port: Receiver<()>,
}
impl CancelableOneshotTimer {
fn new(duration: MsDuration) -> CancelableOneshotTimer {
let (tx, rx) = channel();
let canceled = Arc::new(AtomicBool::new(false));
let canceled_clone = canceled.clone();
let thread = spawn(move || {
let due_time = precise_time_ms() + duration;
let mut park_time = duration;
loop {
thread::park_timeout(Duration::from_millis(park_time.get()));
if canceled_clone.load(atomic::Ordering::Relaxed) {
return;
}
// park_timeout_ms does not guarantee parking for the
// given amout. We might have woken up early.
let current_time = precise_time_ms();
if current_time >= due_time {
let _ = tx.send(());
return;
}
park_time = due_time - current_time;
}
}).thread().clone();
CancelableOneshotTimer {
thread: thread,
canceled: canceled,
port: rx,
}
}
fn port(&self) -> &Receiver<()> {
&self.port
}
fn cancel(&self) {
self.canceled.store(true, atomic::Ordering::Relaxed);
self.thread.unpark();
}
}
pub struct TimerScheduler {
port: Receiver<TimerEventRequest>,
scheduled_events: RefCell<BinaryHeap<ScheduledEvent>>,
timer: RefCell<Option<CancelableOneshotTimer>>,
}
pub struct TimerScheduler;
struct ScheduledEvent {
request: TimerEventRequest,
for_time: NsDuration,
for_time: Instant,
}
impl Ord for ScheduledEvent {
@ -103,119 +37,83 @@ impl PartialEq for ScheduledEvent {
}
}
enum Task {
HandleRequest(TimerEventRequest),
DispatchDueEvents,
}
impl TimerScheduler {
pub fn start() -> IpcSender<TimerEventRequest> {
let (chan, port) = ipc::channel().unwrap();
let (req_ipc_sender, req_ipc_receiver) = ipc::channel().unwrap();
let (req_sender, req_receiver) = mpsc::sync_channel(1);
let timer_scheduler = TimerScheduler {
port: ROUTER.route_ipc_receiver_to_new_mpsc_receiver(port),
// We could do this much more directly with recv_timeout
// (https://github.com/rust-lang/rfcs/issues/962).
scheduled_events: RefCell::new(BinaryHeap::new()),
// util::thread doesn't give us access to the JoinHandle, which we need for park/unpark,
// so we use the builder directly.
let timeout_thread = thread::Builder::new()
.name(String::from("TimerScheduler"))
.spawn(move || {
// We maintain a priority queue of future events, sorted by due time.
let mut scheduled_events = BinaryHeap::<ScheduledEvent>::new();
loop {
let now = Instant::now();
// Dispatch any events whose due time is past
loop {
match scheduled_events.peek() {
// Dispatch the event if its due time is past
Some(event) if event.for_time <= now => {
let TimerEventRequest(ref sender, source, id, _) = event.request;
let _ = sender.send(TimerEvent(source, id));
},
// Otherwise, we're done dispatching events
_ => break,
}
// Remove the event from the priority queue
// (Note this only executes when the first event has been dispatched
scheduled_events.pop();
}
// Look to see if there are any incoming events
match req_receiver.try_recv() {
// If there is an event, add it to the priority queue
Ok(req) => {
let TimerEventRequest(_, _, _, delay) = req;
let schedule = Instant::now() + Duration::from_millis(delay.get());
let event = ScheduledEvent { request: req, for_time: schedule };
scheduled_events.push(event);
},
// If there is no incoming event, park the thread,
// it will either be unparked when a new event arrives,
// or by a timeout.
Err(Empty) => match scheduled_events.peek() {
None => thread::park(),
Some(event) => thread::park_timeout(event.for_time - now),
},
// If the channel is closed, we are done.
Err(Disconnected) => break,
}
}
// This thread can terminate if the req_ipc_sender is dropped.
warn!("TimerScheduler thread terminated.");
})
.unwrap()
.thread()
.clone();
timer: RefCell::new(None),
};
// A proxy that just routes incoming IPC requests over the MPSC channel to the timeout thread,
// and unparks the timeout thread each time. Note that if unpark is called while the timeout
// thread isn't parked, this causes the next call to thread::park by the timeout thread
// not to block. This means that the timeout thread won't park when there is a request
// waiting in the MPSC channel buffer.
thread::Builder::new()
.name(String::from("TimerProxy"))
.spawn(move || {
while let Ok(req) = req_ipc_receiver.recv() {
req_sender.send(req).unwrap();
timeout_thread.unpark();
}
// This thread can terminate if the req_ipc_sender is dropped.
warn!("TimerProxy thread terminated.");
})
.unwrap();
spawn_named("TimerScheduler".to_owned(), move || {
timer_scheduler.run_event_loop();
});
chan
}
fn run_event_loop(&self) {
while let Some(thread) = self.receive_next_task() {
match thread {
Task::HandleRequest(request) => self.handle_request(request),
Task::DispatchDueEvents => self.dispatch_due_events(),
}
}
}
#[allow(unsafe_code)]
fn receive_next_task(&self) -> Option<Task> {
let port = &self.port;
let timer = self.timer.borrow();
let timer_port = timer.as_ref().map(|timer| timer.port());
if let Some(ref timer_port) = timer_port {
let sel = Select::new();
let mut scheduler_handle = sel.handle(port);
let mut timer_handle = sel.handle(timer_port);
unsafe {
scheduler_handle.add();
timer_handle.add();
}
let ret = sel.wait();
if ret == scheduler_handle.id() {
port.recv().ok().map(Task::HandleRequest)
} else if ret == timer_handle.id() {
timer_port.recv().ok().map(|_| Task::DispatchDueEvents)
} else {
panic!("unexpected select result!")
}
} else {
port.recv().ok().map(Task::HandleRequest)
}
}
fn handle_request(&self, request: TimerEventRequest) {
let TimerEventRequest(_, _, _, duration_ms) = request;
let duration_ns = Length::new(duration_ms.get() * 1000 * 1000);
let schedule_for = precise_time_ns() + duration_ns;
let previously_earliest = self.scheduled_events.borrow().peek()
.map_or(Length::new(u64::max_value()), |scheduled| scheduled.for_time);
self.scheduled_events.borrow_mut().push(ScheduledEvent {
request: request,
for_time: schedule_for,
});
if schedule_for < previously_earliest {
self.start_timer_for_next_event();
}
}
fn dispatch_due_events(&self) {
let now = precise_time_ns();
{
let mut events = self.scheduled_events.borrow_mut();
while !events.is_empty() && events.peek().as_ref().unwrap().for_time <= now {
let event = events.pop().unwrap();
let TimerEventRequest(chan, source, id, _) = event.request;
let _ = chan.send(TimerEvent(source, id));
}
}
self.start_timer_for_next_event();
}
fn start_timer_for_next_event(&self) {
let events = self.scheduled_events.borrow();
let next_event = events.peek();
let mut timer = self.timer.borrow_mut();
if let Some(ref mut timer) = *timer {
timer.cancel();
}
*timer = next_event.map(|next_event| {
let delay_ns = next_event.for_time.get().saturating_sub(precise_time_ns().get());
// Round up, we'd rather be late than early…
let delay_ms = Length::new(delay_ns.saturating_add(999999) / (1000 * 1000));
CancelableOneshotTimer::new(delay_ms)
});
// Return the IPC sender
req_ipc_sender
}
}