tokio: Add initial io driver metrics (#4507)

This commit is contained in:
Lucio Franco
2022-02-24 13:39:37 -05:00
committed by GitHub
parent a913033c18
commit 0d398765a2
11 changed files with 229 additions and 3 deletions
+2
View File
@@ -1,2 +1,4 @@
target
Cargo.lock
.cargo/config.toml
+1 -1
View File
@@ -20,7 +20,7 @@ required-features = ["rt-process-signal"]
# For mem check
rt-net = ["tokio/rt", "tokio/rt-multi-thread", "tokio/net"]
# For test-process-signal
rt-process-signal = ["rt", "tokio/process", "tokio/signal"]
rt-process-signal = ["rt-net", "tokio/process", "tokio/signal"]
full = [
"macros",
+22
View File
@@ -0,0 +1,22 @@
//! This file contains mocks of the metrics types used in the I/O driver.
//!
//! The reason these mocks don't live in `src/runtime/mock.rs` is because
//! these need to be available in the case when `net` is enabled but
//! `rt` is not.
cfg_not_rt_and_metrics! {
#[derive(Default)]
pub(crate) struct IoDriverMetrics {}
impl IoDriverMetrics {
pub(crate) fn incr_fd_count(&self) {}
pub(crate) fn dec_fd_count(&self) {}
pub(crate) fn incr_ready_count_by(&self, _amt: u64) {}
}
}
cfg_rt! {
cfg_metrics! {
pub(crate) use crate::runtime::IoDriverMetrics;
}
}
+37 -1
View File
@@ -14,10 +14,14 @@ pub(crate) use registration::Registration;
mod scheduled_io;
use scheduled_io::ScheduledIo;
mod metrics;
use crate::park::{Park, Unpark};
use crate::util::slab::{self, Slab};
use crate::{loom::sync::Mutex, util::bit};
use metrics::IoDriverMetrics;
use std::fmt;
use std::io;
use std::sync::{Arc, Weak};
@@ -74,6 +78,8 @@ pub(super) struct Inner {
/// Used to wake up the reactor from a call to `turn`.
waker: mio::Waker,
metrics: IoDriverMetrics,
}
#[derive(Debug, Eq, PartialEq, Clone, Copy)]
@@ -130,6 +136,7 @@ impl Driver {
registry,
io_dispatch: allocator,
waker,
metrics: IoDriverMetrics::default(),
}),
})
}
@@ -167,14 +174,18 @@ impl Driver {
}
// Process all the events that came in, dispatching appropriately
let mut ready_count = 0;
for event in events.iter() {
let token = event.token();
if token != TOKEN_WAKEUP {
self.dispatch(token, Ready::from_mio(event));
ready_count += 1;
}
}
self.inner.metrics.incr_ready_count_by(ready_count);
self.events = Some(events);
Ok(())
@@ -279,6 +290,25 @@ cfg_not_rt! {
}
}
cfg_metrics! {
impl Handle {
// TODO: Remove this when handle contains `Arc<Inner>` so that we can return
// &IoDriverMetrics instead of using a closure.
//
// Related issue: https://github.com/tokio-rs/tokio/issues/4509
pub(crate) fn with_io_driver_metrics<F, R>(&self, f: F) -> Option<R>
where
F: Fn(&IoDriverMetrics) -> R,
{
if let Some(inner) = self.inner() {
Some(f(&inner.metrics))
} else {
None
}
}
}
}
impl Handle {
/// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise
/// makes the next call to `turn` return immediately.
@@ -335,12 +365,18 @@ impl Inner {
self.registry
.register(source, mio::Token(token), interest.to_mio())?;
self.metrics.incr_fd_count();
Ok(shared)
}
/// Deregisters an I/O resource from the reactor.
pub(super) fn deregister_source(&self, source: &mut impl mio::event::Source) -> io::Result<()> {
self.registry.deregister(source)
self.registry.deregister(source)?;
self.metrics.dec_fd_count();
Ok(())
}
}
+1 -1
View File
@@ -483,7 +483,7 @@ pub(crate) use self::doc::winapi;
#[cfg(all(not(docsrs), windows, feature = "net"))]
#[allow(unused)]
pub(crate) use ::winapi;
pub(crate) use winapi;
cfg_macros! {
/// Implementation detail of the `select!` macro. This macro is **not**
+6
View File
@@ -195,6 +195,12 @@ macro_rules! cfg_not_metrics {
}
}
macro_rules! cfg_not_rt_and_metrics {
($($item:item)*) => {
$( #[cfg(not(all(feature = "rt", all(tokio_unstable, not(loom)))))] $item )*
}
}
macro_rules! cfg_net {
($($item:item)*) => {
$(
+30
View File
@@ -0,0 +1,30 @@
#![cfg_attr(not(feature = "net"), allow(dead_code))]
use std::sync::atomic::{AtomicU64, Ordering::Relaxed};
#[derive(Default)]
pub(crate) struct IoDriverMetrics {
pub(super) fd_registered_count: AtomicU64,
pub(super) fd_deregistered_count: AtomicU64,
pub(super) ready_count: AtomicU64,
}
impl IoDriverMetrics {
pub(crate) fn incr_fd_count(&self) {
let prev = self.fd_registered_count.load(Relaxed);
let new = prev.wrapping_add(1);
self.fd_registered_count.store(new, Relaxed);
}
pub(crate) fn dec_fd_count(&self) {
let prev = self.fd_deregistered_count.load(Relaxed);
let new = prev.wrapping_add(1);
self.fd_deregistered_count.store(new, Relaxed);
}
pub(crate) fn incr_ready_count_by(&self, amt: u64) {
let prev = self.ready_count.load(Relaxed);
let new = prev.wrapping_add(amt);
self.ready_count.store(new, Relaxed);
}
}
+5
View File
@@ -21,6 +21,11 @@ cfg_metrics! {
mod worker;
pub(crate) use worker::WorkerMetrics;
cfg_net! {
mod io;
pub(crate) use io::IoDriverMetrics;
}
}
cfg_not_metrics! {
+87
View File
@@ -447,3 +447,90 @@ impl RuntimeMetrics {
self.handle.spawner.worker_local_queue_depth(worker)
}
}
cfg_net! {
impl RuntimeMetrics {
/// Returns the number of file descriptors that have been registered with the
/// runtime's I/O driver.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
/// let metrics = Handle::current().metrics();
///
/// let registered_fds = metrics.io_driver_fd_registered_count();
/// println!("{} fds have been registered with the runtime's I/O driver.", registered_fds);
///
/// let deregistered_fds = metrics.io_fd_deregistered_count();
///
/// let current_fd_count = registered_fds - deregistered_fds;
/// println!("{} fds are currently registered by the runtime's I/O driver.", current_fd_count);
/// }
/// ```
pub fn io_driver_fd_registered_count(&self) -> u64 {
self.with_io_driver_metrics(|m| {
m.fd_registered_count.load(Relaxed)
})
}
/// Returns the number of file descriptors that have been deregistered by the
/// runtime's I/O driver.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.io_driver_deregisteredd_fd_count();
/// println!("{} fds have been deregistered by the runtime's I/O driver.", n);
/// }
/// ```
pub fn io_driver_fd_deregistered_count(&self) -> u64 {
self.with_io_driver_metrics(|m| {
m.fd_deregistered_count.load(Relaxed)
})
}
/// Returns the number of ready events processed by the runtime's
/// I/O driver.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.io_driver_ready_count();
/// println!("{} ready events procssed by the runtime's I/O driver.", n);
/// }
/// ```
pub fn io_driver_ready_count(&self) -> u64 {
self.with_io_driver_metrics(|m| m.ready_count.load(Relaxed))
}
fn with_io_driver_metrics<F>(&self, f: F) -> u64
where
F: Fn(&super::IoDriverMetrics) -> u64,
{
// TODO: Investigate if this should return 0, most of our metrics always increase
// thus this breaks that guarantee.
self.handle
.io_handle
.as_ref()
.map(|h| h.with_io_driver_metrics(f))
.flatten()
.unwrap_or(0)
}
}
}
+4
View File
@@ -187,6 +187,10 @@ cfg_metrics! {
pub use metrics::RuntimeMetrics;
pub(crate) use metrics::{MetricsBatch, SchedulerMetrics, WorkerMetrics};
cfg_net! {
pub(crate) use metrics::IoDriverMetrics;
}
}
cfg_not_metrics! {
+34
View File
@@ -369,6 +369,40 @@ fn worker_local_queue_depth() {
});
}
#[cfg(any(target_os = "linux", target_os = "macos"))]
#[test]
fn io_driver_fd_count() {
let rt = basic();
let metrics = rt.metrics();
// Since this is enabled w/ the process driver we always
// have 1 fd registered.
assert_eq!(metrics.io_driver_fd_registered_count(), 1);
let stream = tokio::net::TcpStream::connect("google.com:80");
let stream = rt.block_on(async move { stream.await.unwrap() });
assert_eq!(metrics.io_driver_fd_registered_count(), 2);
assert_eq!(metrics.io_driver_fd_deregistered_count(), 0);
drop(stream);
assert_eq!(metrics.io_driver_fd_deregistered_count(), 1);
assert_eq!(metrics.io_driver_fd_registered_count(), 2);
}
#[cfg(any(target_os = "linux", target_os = "macos"))]
#[test]
fn io_driver_ready_count() {
let rt = basic();
let metrics = rt.metrics();
let stream = tokio::net::TcpStream::connect("google.com:80");
let _stream = rt.block_on(async move { stream.await.unwrap() });
assert_eq!(metrics.io_driver_ready_count(), 2);
}
fn basic() -> Runtime {
tokio::runtime::Builder::new_current_thread()
.enable_all()