diff --git a/.gitignore b/.gitignore index a9d37c56..1e656726 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ target Cargo.lock + +.cargo/config.toml diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml index 5ab8c15d..a45c4dea 100644 --- a/tests-integration/Cargo.toml +++ b/tests-integration/Cargo.toml @@ -20,7 +20,7 @@ required-features = ["rt-process-signal"] # For mem check rt-net = ["tokio/rt", "tokio/rt-multi-thread", "tokio/net"] # For test-process-signal -rt-process-signal = ["rt", "tokio/process", "tokio/signal"] +rt-process-signal = ["rt-net", "tokio/process", "tokio/signal"] full = [ "macros", diff --git a/tokio/src/io/driver/metrics.rs b/tokio/src/io/driver/metrics.rs new file mode 100644 index 00000000..410732ce --- /dev/null +++ b/tokio/src/io/driver/metrics.rs @@ -0,0 +1,22 @@ +//! This file contains mocks of the metrics types used in the I/O driver. +//! +//! The reason these mocks don't live in `src/runtime/mock.rs` is because +//! these need to be available in the case when `net` is enabled but +//! `rt` is not. + +cfg_not_rt_and_metrics! { + #[derive(Default)] + pub(crate) struct IoDriverMetrics {} + + impl IoDriverMetrics { + pub(crate) fn incr_fd_count(&self) {} + pub(crate) fn dec_fd_count(&self) {} + pub(crate) fn incr_ready_count_by(&self, _amt: u64) {} + } +} + +cfg_rt! { + cfg_metrics! { + pub(crate) use crate::runtime::IoDriverMetrics; + } +} diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs index 19f67a24..47d0d5e3 100644 --- a/tokio/src/io/driver/mod.rs +++ b/tokio/src/io/driver/mod.rs @@ -14,10 +14,14 @@ pub(crate) use registration::Registration; mod scheduled_io; use scheduled_io::ScheduledIo; +mod metrics; + use crate::park::{Park, Unpark}; use crate::util::slab::{self, Slab}; use crate::{loom::sync::Mutex, util::bit}; +use metrics::IoDriverMetrics; + use std::fmt; use std::io; use std::sync::{Arc, Weak}; @@ -74,6 +78,8 @@ pub(super) struct Inner { /// Used to wake up the reactor from a call to `turn`. waker: mio::Waker, + + metrics: IoDriverMetrics, } #[derive(Debug, Eq, PartialEq, Clone, Copy)] @@ -130,6 +136,7 @@ impl Driver { registry, io_dispatch: allocator, waker, + metrics: IoDriverMetrics::default(), }), }) } @@ -167,14 +174,18 @@ impl Driver { } // Process all the events that came in, dispatching appropriately + let mut ready_count = 0; for event in events.iter() { let token = event.token(); if token != TOKEN_WAKEUP { self.dispatch(token, Ready::from_mio(event)); + ready_count += 1; } } + self.inner.metrics.incr_ready_count_by(ready_count); + self.events = Some(events); Ok(()) @@ -279,6 +290,25 @@ cfg_not_rt! { } } +cfg_metrics! { + impl Handle { + // TODO: Remove this when handle contains `Arc` so that we can return + // &IoDriverMetrics instead of using a closure. + // + // Related issue: https://github.com/tokio-rs/tokio/issues/4509 + pub(crate) fn with_io_driver_metrics(&self, f: F) -> Option + where + F: Fn(&IoDriverMetrics) -> R, + { + if let Some(inner) = self.inner() { + Some(f(&inner.metrics)) + } else { + None + } + } + } +} + impl Handle { /// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise /// makes the next call to `turn` return immediately. @@ -335,12 +365,18 @@ impl Inner { self.registry .register(source, mio::Token(token), interest.to_mio())?; + self.metrics.incr_fd_count(); + Ok(shared) } /// Deregisters an I/O resource from the reactor. pub(super) fn deregister_source(&self, source: &mut impl mio::event::Source) -> io::Result<()> { - self.registry.deregister(source) + self.registry.deregister(source)?; + + self.metrics.dec_fd_count(); + + Ok(()) } } diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index 35295d83..2a7887ce 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -483,7 +483,7 @@ pub(crate) use self::doc::winapi; #[cfg(all(not(docsrs), windows, feature = "net"))] #[allow(unused)] -pub(crate) use ::winapi; +pub(crate) use winapi; cfg_macros! { /// Implementation detail of the `select!` macro. This macro is **not** diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs index b6beb3d6..608eef08 100644 --- a/tokio/src/macros/cfg.rs +++ b/tokio/src/macros/cfg.rs @@ -195,6 +195,12 @@ macro_rules! cfg_not_metrics { } } +macro_rules! cfg_not_rt_and_metrics { + ($($item:item)*) => { + $( #[cfg(not(all(feature = "rt", all(tokio_unstable, not(loom)))))] $item )* + } +} + macro_rules! cfg_net { ($($item:item)*) => { $( diff --git a/tokio/src/runtime/metrics/io.rs b/tokio/src/runtime/metrics/io.rs new file mode 100644 index 00000000..4928c48e --- /dev/null +++ b/tokio/src/runtime/metrics/io.rs @@ -0,0 +1,30 @@ +#![cfg_attr(not(feature = "net"), allow(dead_code))] + +use std::sync::atomic::{AtomicU64, Ordering::Relaxed}; + +#[derive(Default)] +pub(crate) struct IoDriverMetrics { + pub(super) fd_registered_count: AtomicU64, + pub(super) fd_deregistered_count: AtomicU64, + pub(super) ready_count: AtomicU64, +} + +impl IoDriverMetrics { + pub(crate) fn incr_fd_count(&self) { + let prev = self.fd_registered_count.load(Relaxed); + let new = prev.wrapping_add(1); + self.fd_registered_count.store(new, Relaxed); + } + + pub(crate) fn dec_fd_count(&self) { + let prev = self.fd_deregistered_count.load(Relaxed); + let new = prev.wrapping_add(1); + self.fd_deregistered_count.store(new, Relaxed); + } + + pub(crate) fn incr_ready_count_by(&self, amt: u64) { + let prev = self.ready_count.load(Relaxed); + let new = prev.wrapping_add(amt); + self.ready_count.store(new, Relaxed); + } +} diff --git a/tokio/src/runtime/metrics/mod.rs b/tokio/src/runtime/metrics/mod.rs index ca643a59..4b96f1b7 100644 --- a/tokio/src/runtime/metrics/mod.rs +++ b/tokio/src/runtime/metrics/mod.rs @@ -21,6 +21,11 @@ cfg_metrics! { mod worker; pub(crate) use worker::WorkerMetrics; + + cfg_net! { + mod io; + pub(crate) use io::IoDriverMetrics; + } } cfg_not_metrics! { diff --git a/tokio/src/runtime/metrics/runtime.rs b/tokio/src/runtime/metrics/runtime.rs index 0f805590..397271ee 100644 --- a/tokio/src/runtime/metrics/runtime.rs +++ b/tokio/src/runtime/metrics/runtime.rs @@ -447,3 +447,90 @@ impl RuntimeMetrics { self.handle.spawner.worker_local_queue_depth(worker) } } + +cfg_net! { + impl RuntimeMetrics { + /// Returns the number of file descriptors that have been registered with the + /// runtime's I/O driver. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let registered_fds = metrics.io_driver_fd_registered_count(); + /// println!("{} fds have been registered with the runtime's I/O driver.", registered_fds); + /// + /// let deregistered_fds = metrics.io_fd_deregistered_count(); + /// + /// let current_fd_count = registered_fds - deregistered_fds; + /// println!("{} fds are currently registered by the runtime's I/O driver.", current_fd_count); + /// } + /// ``` + pub fn io_driver_fd_registered_count(&self) -> u64 { + self.with_io_driver_metrics(|m| { + m.fd_registered_count.load(Relaxed) + }) + } + + /// Returns the number of file descriptors that have been deregistered by the + /// runtime's I/O driver. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.io_driver_deregisteredd_fd_count(); + /// println!("{} fds have been deregistered by the runtime's I/O driver.", n); + /// } + /// ``` + pub fn io_driver_fd_deregistered_count(&self) -> u64 { + self.with_io_driver_metrics(|m| { + m.fd_deregistered_count.load(Relaxed) + }) + } + + /// Returns the number of ready events processed by the runtime's + /// I/O driver. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.io_driver_ready_count(); + /// println!("{} ready events procssed by the runtime's I/O driver.", n); + /// } + /// ``` + pub fn io_driver_ready_count(&self) -> u64 { + self.with_io_driver_metrics(|m| m.ready_count.load(Relaxed)) + } + + fn with_io_driver_metrics(&self, f: F) -> u64 + where + F: Fn(&super::IoDriverMetrics) -> u64, + { + // TODO: Investigate if this should return 0, most of our metrics always increase + // thus this breaks that guarantee. + self.handle + .io_handle + .as_ref() + .map(|h| h.with_io_driver_metrics(f)) + .flatten() + .unwrap_or(0) + } + } +} diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index 7c381b0b..66856df6 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -187,6 +187,10 @@ cfg_metrics! { pub use metrics::RuntimeMetrics; pub(crate) use metrics::{MetricsBatch, SchedulerMetrics, WorkerMetrics}; + + cfg_net! { + pub(crate) use metrics::IoDriverMetrics; + } } cfg_not_metrics! { diff --git a/tokio/tests/rt_metrics.rs b/tokio/tests/rt_metrics.rs index 0a26b802..1521cd26 100644 --- a/tokio/tests/rt_metrics.rs +++ b/tokio/tests/rt_metrics.rs @@ -369,6 +369,40 @@ fn worker_local_queue_depth() { }); } +#[cfg(any(target_os = "linux", target_os = "macos"))] +#[test] +fn io_driver_fd_count() { + let rt = basic(); + let metrics = rt.metrics(); + + // Since this is enabled w/ the process driver we always + // have 1 fd registered. + assert_eq!(metrics.io_driver_fd_registered_count(), 1); + + let stream = tokio::net::TcpStream::connect("google.com:80"); + let stream = rt.block_on(async move { stream.await.unwrap() }); + + assert_eq!(metrics.io_driver_fd_registered_count(), 2); + assert_eq!(metrics.io_driver_fd_deregistered_count(), 0); + + drop(stream); + + assert_eq!(metrics.io_driver_fd_deregistered_count(), 1); + assert_eq!(metrics.io_driver_fd_registered_count(), 2); +} + +#[cfg(any(target_os = "linux", target_os = "macos"))] +#[test] +fn io_driver_ready_count() { + let rt = basic(); + let metrics = rt.metrics(); + + let stream = tokio::net::TcpStream::connect("google.com:80"); + let _stream = rt.block_on(async move { stream.await.unwrap() }); + + assert_eq!(metrics.io_driver_ready_count(), 2); +} + fn basic() -> Runtime { tokio::runtime::Builder::new_current_thread() .enable_all()