Bug 1619780 - Update audioipc to 9e4f94a5. r=chunmin

Differential Revision: https://phabricator.services.mozilla.com/D65214

--HG--
extra : moz-landing-system : lando
This commit is contained in:
Matthew Gregan 2020-03-10 17:02:27 +00:00
parent 2a88bd2f12
commit a61c982ec9
11 changed files with 270 additions and 98 deletions

View File

@ -1 +1 @@
# Cubeb Audio Remoting Prototype
# Cubeb Audio Remoting Prototype

View File

@ -5,4 +5,4 @@ Makefile.in build files for the Mozilla build system.
The audioipc-2 git repository is: https://github.com/djg/audioipc-2.git
The git commit ID used was 4ecd137ab09c78e4ffecfd6775ba9ea952aed5a9 (2020-02-13 11:07:59 +1300)
The git commit ID used was 9e4f94a50e7b94d13b820f36b73ee9d07b4afdc1 (2020-03-10 12:15:37 +1300)

View File

@ -120,7 +120,11 @@ impl ControlMsgBuilder {
unsafe {
let cmsghdr_ptr = cmsg.bytes_mut().as_mut_ptr();
std::ptr::copy_nonoverlapping(&cmsghdr as *const _ as *const _, cmsghdr_ptr, mem::size_of::<cmsghdr>());
std::ptr::copy_nonoverlapping(
&cmsghdr as *const _ as *const _,
cmsghdr_ptr,
mem::size_of::<cmsghdr>(),
);
let cmsg_data_ptr = libc::CMSG_DATA(cmsghdr_ptr as _);
std::ptr::copy_nonoverlapping(msg.as_ptr(), cmsg_data_ptr, msg.len());
cmsg.advance_mut(cmsg_space);
@ -166,13 +170,9 @@ fn aligned(buf: &BytesMut) -> BytesMut {
}
fn len(len: usize) -> usize {
unsafe {
libc::CMSG_LEN(len.try_into().unwrap()) as usize
}
unsafe { libc::CMSG_LEN(len.try_into().unwrap()) as usize }
}
pub fn space(len: usize) -> usize {
unsafe {
libc::CMSG_SPACE(len.try_into().unwrap()) as usize
}
unsafe { libc::CMSG_SPACE(len.try_into().unwrap()) as usize }
}

View File

@ -45,8 +45,6 @@ mod tokio_uds_stream;
mod tokio_named_pipes;
pub use crate::messages::{ClientMessage, ServerMessage};
use std::env::temp_dir;
use std::path::PathBuf;
// TODO: Remove hardcoded size and allow allocation based on cubeb backend requirements.
pub const SHM_AREA_SIZE: usize = 2 * 1024 * 1024;
@ -188,22 +186,6 @@ unsafe fn close_platformhandle(handle: PlatformHandleType) {
winapi::um::handleapi::CloseHandle(handle);
}
use std::sync::atomic::{AtomicUsize, Ordering};
static SHM_ID: AtomicUsize = AtomicUsize::new(0);
// Generate a temporary shm_path that is unique to the process. This
// path is used temporarily to create a shm segment, which is then
// immediately deleted from the filesystem while retaining handles to
// the shm to be shared between the server and client.
pub fn get_shm_path() -> PathBuf {
let pid = std::process::id();
let shm_id = SHM_ID.fetch_add(1, Ordering::SeqCst);
let mut temp = temp_dir();
temp.push(&format!("cubeb-shm-{}-{}", pid, shm_id));
temp
}
#[cfg(unix)]
pub mod messagestream_unix;
#[cfg(unix)]

View File

@ -3,12 +3,12 @@
// This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details
use super::tokio_named_pipes;
use mio_named_pipes;
use std::os::windows::fs::*;
use std::os::windows::io::{AsRawHandle, FromRawHandle, IntoRawHandle, RawHandle};
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio_io::{AsyncRead, AsyncWrite};
use super::tokio_named_pipes;
use winapi::um::winbase::FILE_FLAG_OVERLAPPED;
#[derive(Debug)]
@ -32,7 +32,10 @@ impl MessageStream {
let file = opts.open(&pipe_name)?;
unsafe { miow::pipe::NamedPipe::from_raw_handle(file.into_raw_handle()) }
};
Ok((MessageStream::new(pipe_server), MessageStream::new(pipe_client)))
Ok((
MessageStream::new(pipe_server),
MessageStream::new(pipe_client),
))
}
pub unsafe fn from_raw_fd(raw: super::PlatformHandleType) -> MessageStream {

View File

@ -7,22 +7,129 @@ use crate::errors::*;
use memmap::{Mmap, MmapMut, MmapOptions};
use std::cell::UnsafeCell;
use std::fs::{remove_file, File, OpenOptions};
use std::path::Path;
use std::sync::{atomic, Arc};
use std::convert::TryInto;
use std::env::temp_dir;
fn open_shm_file(id: &str) -> Result<File> {
#[cfg(target_os = "linux")]
{
unsafe {
let r = libc::syscall(libc::SYS_memfd_create,
std::ffi::CString::new(id).unwrap().as_ptr(),
0);
if r >= 0 {
use std::os::unix::io::FromRawFd as _;
return Ok(File::from_raw_fd(r.try_into().unwrap()));
}
}
let mut path = std::path::PathBuf::from("/dev/shm");
path.push(id);
if let Ok(file) = OpenOptions::new()
.read(true)
.write(true)
.create_new(true)
.open(&path) {
let _ = remove_file(&path);
return Ok(file);
}
}
let mut path = temp_dir();
path.push(id);
let file = OpenOptions::new()
.read(true)
.write(true)
.create_new(true)
.open(&path)?;
let _ = remove_file(&path);
Ok(file)
}
#[cfg(unix)]
fn handle_enospc(s: &str) -> Result<()> {
let err = std::io::Error::last_os_error();
let errno = err.raw_os_error().unwrap_or(0);
assert_ne!(errno, 0);
debug!("allocate_file: {} failed errno={}", s, errno);
if errno == libc::ENOSPC {
return Err(err.into());
}
Ok(())
}
#[cfg(unix)]
fn allocate_file(file: &File, size: usize) -> Result<()> {
use std::os::unix::io::AsRawFd;
// First, set the file size. This may create a sparse file on
// many systems, which can fail with SIGBUS when accessed via a
// mapping and the lazy backing allocation fails due to low disk
// space. To avoid this, try to force the entire file to be
// preallocated before mapping using OS-specific approaches below.
file.set_len(size.try_into().unwrap())?;
let fd = file.as_raw_fd();
let size: libc::off_t = size.try_into().unwrap();
// Try Linux-specific fallocate.
#[cfg(target_os = "linux")]
{
if unsafe { libc::fallocate(fd, 0, 0, size) } == 0 {
return Ok(());
}
handle_enospc("fallocate()")?;
}
// Try macOS-specific fcntl.
#[cfg(target_os = "macos")]
{
let params = libc::fstore_t {
fst_flags: libc::F_ALLOCATEALL,
fst_posmode: libc::F_PEOFPOSMODE,
fst_offset: 0,
fst_length: size,
fst_bytesalloc: 0,
};
if unsafe { libc::fcntl(fd, libc::F_PREALLOCATE, &params) } == 0 {
return Ok(());
}
handle_enospc("fcntl(F_PREALLOCATE)")?;
}
// Fall back to portable version, where available.
#[cfg(any(target_os = "linux", target_os = "freebsd", target_os = "dragonfly"))]
{
if unsafe { libc::posix_fallocate(fd, 0, size) } == 0 {
return Ok(());
}
handle_enospc("posix_fallocate()")?;
}
Ok(())
}
#[cfg(windows)]
fn allocate_file(file: &File, size: usize) -> Result<()> {
// CreateFileMapping will ensure the entire file is allocated
// before it's mapped in, so we simply set the size here.
file.set_len(size.try_into().unwrap())?;
Ok(())
}
pub struct SharedMemReader {
mmap: Mmap,
}
impl SharedMemReader {
pub fn new(path: &Path, size: usize) -> Result<(SharedMemReader, File)> {
let file = OpenOptions::new()
.read(true)
.write(true)
.create_new(true)
.open(path)?;
let _ = remove_file(path);
file.set_len(size as u64)?;
pub fn new(id: &str, size: usize) -> Result<(SharedMemReader, File)> {
let file = open_shm_file(id)?;
allocate_file(&file, size)?;
let mmap = unsafe { MmapOptions::new().map(&file)? };
assert_eq!(mmap.len(), size);
Ok((SharedMemReader { mmap }, file))
@ -88,15 +195,11 @@ pub struct SharedMemWriter {
}
impl SharedMemWriter {
pub fn new(path: &Path, size: usize) -> Result<(SharedMemWriter, File)> {
let file = OpenOptions::new()
.read(true)
.write(true)
.create_new(true)
.open(path)?;
let _ = remove_file(path);
file.set_len(size as u64)?;
pub fn new(id: &str, size: usize) -> Result<(SharedMemWriter, File)> {
let file = open_shm_file(id)?;
allocate_file(&file, size)?;
let mmap = unsafe { MmapOptions::new().map_mut(&file)? };
assert_eq!(mmap.len(), size);
Ok((SharedMemWriter { mmap }, file))
}

View File

@ -9,11 +9,11 @@ use std::fmt;
use std::io::{Read, Write};
use std::os::windows::io::*;
use bytes::{Buf, BufMut};
use futures::{Async, Poll};
use bytes::{BufMut, Buf};
use mio::Ready;
use tokio::reactor::{Handle, PollEvented2};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::reactor::{Handle, PollEvented2};
pub struct NamedPipe {
io: PollEvented2<mio_named_pipes::NamedPipe>,
@ -29,8 +29,10 @@ impl NamedPipe {
NamedPipe::from_pipe(inner, handle)
}
pub fn from_pipe(pipe: mio_named_pipes::NamedPipe, handle: &Handle)
-> std::io::Result<NamedPipe> {
pub fn from_pipe(
pipe: mio_named_pipes::NamedPipe,
handle: &Handle,
) -> std::io::Result<NamedPipe> {
Ok(NamedPipe {
io: PollEvented2::new_with_handle(pipe, handle)?,
})
@ -95,7 +97,7 @@ impl AsyncRead for NamedPipe {
fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, std::io::Error> {
if let Async::NotReady = self.io.poll_read_ready(Ready::readable())? {
return Ok(Async::NotReady)
return Ok(Async::NotReady);
}
let mut stack_buf = [0u8; 1024];
@ -104,7 +106,7 @@ impl AsyncRead for NamedPipe {
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
self.io_mut().clear_read_ready(Ready::readable())?;
return Ok(Async::NotReady);
},
}
Err(e) => Err(e),
Ok(bytes_read) => {
buf.put_slice(&stack_buf[0..bytes_read]);
@ -122,7 +124,7 @@ impl AsyncWrite for NamedPipe {
fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, std::io::Error> {
if let Async::NotReady = self.io.poll_write_ready()? {
return Ok(Async::NotReady)
return Ok(Async::NotReady);
}
let bytes_wrt = self.io_mut().write(buf.bytes());
@ -130,7 +132,7 @@ impl AsyncWrite for NamedPipe {
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
self.io_mut().clear_write_ready()?;
return Ok(Async::NotReady);
},
}
Err(e) => Err(e),
Ok(bytes_wrt) => {
buf.advance(bytes_wrt);

View File

@ -209,16 +209,19 @@ impl ContextOps for ClientContext {
let server_stream =
unsafe { audioipc::MessageStream::from_raw_fd(params.server_connection) };
let core = core::spawn_thread("AudioIPC Client RPC", move || {
let handle = reactor::Handle::default();
let core = core::spawn_thread(
"AudioIPC Client RPC",
move || {
let handle = reactor::Handle::default();
register_thread(thread_create_callback);
register_thread(thread_create_callback);
server_stream
.into_tokio_ipc(&handle)
.and_then(|stream| bind_and_send_client(stream, &tx_rpc))
},
move || unregister_thread(thread_destroy_callback))
server_stream
.into_tokio_ipc(&handle)
.and_then(|stream| bind_and_send_client(stream, &tx_rpc))
},
move || unregister_thread(thread_destroy_callback),
)
.map_err(|_| Error::default())?;
let rpc = rx_rpc.recv().map_err(|_| Error::default())?;

View File

@ -178,7 +178,8 @@ impl<'ctx> ClientStream<'ctx> {
data.token, data.platform_handles
);
let stream = unsafe { audioipc::MessageStream::from_raw_fd(data.platform_handles[0].into_raw()) };
let stream =
unsafe { audioipc::MessageStream::from_raw_fd(data.platform_handles[0].into_raw()) };
let input_file = unsafe { data.platform_handles[1].into_file() };
let input_shm = if has_input {

View File

@ -62,16 +62,20 @@ struct ServerWrapper {
fn run() -> Result<ServerWrapper> {
trace!("Starting up cubeb audio server event loop thread...");
let callback_thread = core::spawn_thread("AudioIPC Callback RPC", || {
match promote_current_thread_to_real_time(0, 48000) {
Ok(_) => {}
Err(_) => {
debug!("Failed to promote audio callback thread to real-time.");
let callback_thread = core::spawn_thread(
"AudioIPC Callback RPC",
|| {
match promote_current_thread_to_real_time(0, 48000) {
Ok(_) => {}
Err(_) => {
debug!("Failed to promote audio callback thread to real-time.");
}
}
}
trace!("Starting up cubeb audio callback event loop thread...");
Ok(())
}, || {})
trace!("Starting up cubeb audio callback event loop thread...");
Ok(())
},
|| {},
)
.or_else(|e| {
debug!(
"Failed to start cubeb audio callback event loop thread: {:?}",
@ -80,13 +84,14 @@ fn run() -> Result<ServerWrapper> {
Err(e)
})?;
let core_thread = core::spawn_thread("AudioIPC Server RPC", move || Ok(()), || {}).or_else(|e| {
debug!(
"Failed to cubeb audio core event loop thread: {:?}",
e.description()
);
Err(e)
})?;
let core_thread =
core::spawn_thread("AudioIPC Server RPC", move || Ok(()), || {}).or_else(|e| {
debug!(
"Failed to cubeb audio core event loop thread: {:?}",
e.description()
);
Err(e)
})?;
Ok(ServerWrapper {
core_thread,

View File

@ -29,6 +29,7 @@ use std::ffi::CStr;
use std::mem::{size_of, ManuallyDrop};
use std::os::raw::{c_long, c_void};
use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{panic, slice};
use tokio::reactor;
use tokio::runtime::current_thread;
@ -147,6 +148,43 @@ impl CubebDeviceCollectionManager {
}
}
struct DevIdMap {
devices: Vec<usize>,
}
// A cubeb_devid is an opaque type which may be implemented with a stable
// pointer in a cubeb backend. cubeb_devids received remotely must be
// validated before use, so DevIdMap provides a simple 1:1 mapping between a
// cubeb_devid and an IPC-transportable value suitable for use as a unique
// handle.
impl DevIdMap {
fn new() -> DevIdMap {
let mut d = DevIdMap {
devices: Vec::with_capacity(32),
};
// A null cubeb_devid is used for selecting the default device.
// Pre-populate the mapping with 0 -> 0 to handle nulls.
d.devices.push(0);
d
}
// Given a cubeb_devid, return a unique stable value suitable for use
// over IPC.
fn to_handle(&mut self, devid: usize) -> usize {
if let Some(i) = self.devices.iter().position(|&d| d == devid) {
return i;
}
self.devices.push(devid);
self.devices.len() - 1
}
// Given a handle produced by `to_handle`, return the associated
// cubeb_devid. Invalid handles result in a panic.
fn from_handle(&self, handle: usize) -> usize {
self.devices[handle]
}
}
struct CubebContextState {
context: cubeb::Result<cubeb::Context>,
manager: CubebDeviceCollectionManager,
@ -274,6 +312,18 @@ impl ServerStreamCallbacks {
}
}
static SHM_ID: AtomicUsize = AtomicUsize::new(0);
// Generate a temporary shm_id fragment that is unique to the process. This
// path is used temporarily to create a shm segment, which is then
// immediately deleted from the filesystem while retaining handles to the
// shm to be shared between the server and client.
fn get_shm_id() -> String {
format!("cubeb-shm-{}-{}",
std::process::id(),
SHM_ID.fetch_add(1, Ordering::SeqCst))
}
struct ServerStream {
stream: ManuallyDrop<cubeb::Stream>,
cbs: ManuallyDrop<Box<ServerStreamCallbacks>>,
@ -314,6 +364,7 @@ pub struct CubebServer {
streams: StreamSlab,
remote_pid: Option<u32>,
cbs: Option<Rc<RefCell<CubebServerCallbacks>>>,
devidmap: DevIdMap,
}
impl rpc::Server for CubebServer {
@ -340,7 +391,13 @@ macro_rules! try_stream {
if $self.streams.contains($stm_tok) {
&mut $self.streams[$stm_tok]
} else {
error!("{}:{}:{} - Stream({}): invalid token", file!(), line!(), column!(), $stm_tok);
error!(
"{}:{}:{} - Stream({}): invalid token",
file!(),
line!(),
column!(),
$stm_tok
);
return error(cubeb::Error::invalid_parameter());
}
};
@ -353,6 +410,7 @@ impl CubebServer {
streams: StreamSlab::with_capacity(STREAM_CONN_CHUNK_SIZE),
remote_pid: None,
cbs: None,
devidmap: DevIdMap::new(),
}
}
@ -409,7 +467,15 @@ impl CubebServer {
ServerMessage::ContextGetDeviceEnumeration(device_type) => context
.enumerate_devices(cubeb::DeviceType::from_bits_truncate(device_type))
.map(|devices| {
let v: Vec<DeviceInfo> = devices.iter().map(|i| i.as_ref().into()).collect();
let v: Vec<DeviceInfo> = devices
.iter()
.map(|i| {
let mut tmp: DeviceInfo = i.as_ref().into();
// Replace each cubeb_devid with a unique handle suitable for IPC.
tmp.devid = self.devidmap.to_handle(tmp.devid);
tmp
})
.collect();
ClientMessage::ContextEnumeratedDevices(v)
})
.unwrap_or_else(error),
@ -472,19 +538,24 @@ impl CubebServer {
.map(|device| ClientMessage::StreamCurrentDevice(Device::from(device)))
.unwrap_or_else(error),
ServerMessage::StreamRegisterDeviceChangeCallback(stm_tok, enable) => try_stream!(self, stm_tok)
.stream
.register_device_changed_callback(if enable {
Some(device_change_cb_c)
} else {
None
})
.map(|_| ClientMessage::StreamRegisterDeviceChangeCallback)
.unwrap_or_else(error),
ServerMessage::StreamRegisterDeviceChangeCallback(stm_tok, enable) => {
try_stream!(self, stm_tok)
.stream
.register_device_changed_callback(if enable {
Some(device_change_cb_c)
} else {
None
})
.map(|_| ClientMessage::StreamRegisterDeviceChangeCallback)
.unwrap_or_else(error)
}
ServerMessage::ContextSetupDeviceCollectionCallback => {
if let Ok((ipc_server, ipc_client)) = MessageStream::anonymous_ipc_pair() {
debug!("Created device collection RPC pair: {:?}-{:?}", ipc_server, ipc_client);
debug!(
"Created device collection RPC pair: {:?}-{:?}",
ipc_server, ipc_client
);
// This code is currently running on the Client/Server RPC
// handling thread. We need to move the registration of the
@ -615,11 +686,11 @@ impl CubebServer {
let (ipc_server, ipc_client) = MessageStream::anonymous_ipc_pair()?;
debug!("Created callback pair: {:?}-{:?}", ipc_server, ipc_client);
let mut shm_path = audioipc::get_shm_path();
shm_path.set_extension("input");
let (input_shm, input_file) = SharedMemWriter::new(&shm_path, audioipc::SHM_AREA_SIZE)?;
shm_path.set_extension("output");
let (output_shm, output_file) = SharedMemReader::new(&shm_path, audioipc::SHM_AREA_SIZE)?;
let shm_id = get_shm_id();
let (input_shm, input_file) = SharedMemWriter::new(&format!("{}-input", shm_id),
audioipc::SHM_AREA_SIZE)?;
let (output_shm, output_file) = SharedMemReader::new(&format!("{}-output", shm_id),
audioipc::SHM_AREA_SIZE)?;
// This code is currently running on the Client/Server RPC
// handling thread. We need to move the registration of the
@ -657,12 +728,14 @@ impl CubebServer {
.as_ref()
.and_then(|name| CStr::from_bytes_with_nul(name).ok());
let input_device = params.input_device as *const _;
// Map IPC handle back to cubeb_devid.
let input_device = self.devidmap.from_handle(params.input_device) as *const _;
let input_stream_params = params.input_stream_params.as_ref().map(|isp| unsafe {
cubeb::StreamParamsRef::from_ptr(isp as *const StreamParams as *mut _)
});
let output_device = params.output_device as *const _;
// Map IPC handle back to cubeb_devid.
let output_device = self.devidmap.from_handle(params.output_device) as *const _;
let output_stream_params = params.output_stream_params.as_ref().map(|osp| unsafe {
cubeb::StreamParamsRef::from_ptr(osp as *const StreamParams as *mut _)
});