Bug 1672275 - Update audioipc to db32323. r=chunmin

Differential Revision: https://phabricator.services.mozilla.com/D94203
This commit is contained in:
Matthew Gregan 2020-10-21 15:41:53 +00:00
parent 99b8c43ddb
commit 8edf83adea
7 changed files with 120 additions and 137 deletions

View File

@ -5,4 +5,4 @@ Makefile.in build files for the Mozilla build system.
The audioipc-2 git repository is: https://github.com/djg/audioipc-2.git
The git commit ID used was c474b0b78f49b86531bf09f4c0e4af2e16cf15a9 (2020-10-08 15:13:49 +1300)
The git commit ID used was db32323c92b2b4f5ca2e702dcf987fedc1e71768 (2020-10-21 09:02:32 +1300)

View File

@ -8,7 +8,7 @@ use crate::cmsg;
use crate::codec::Codec;
use crate::messages::AssocRawPlatformHandle;
use bytes::{Bytes, BytesMut, IntoBuf};
use futures::{AsyncSink, Poll, Sink, StartSend, Stream, task};
use futures::{task, AsyncSink, Poll, Sink, StartSend, Stream};
use std::collections::VecDeque;
use std::os::unix::io::RawFd;
use std::{fmt, io, mem};

View File

@ -5,7 +5,7 @@
use crate::codec::Codec;
use bytes::{Buf, Bytes, BytesMut, IntoBuf};
use futures::{AsyncSink, Poll, Sink, StartSend, Stream, task};
use futures::{task, AsyncSink, Poll, Sink, StartSend, Stream};
use std::io;
use tokio_io::{AsyncRead, AsyncWrite};

View File

@ -6,7 +6,7 @@
use crate::codec::Codec;
use crate::messages::AssocRawPlatformHandle;
use bytes::{Bytes, BytesMut, IntoBuf};
use futures::{AsyncSink, Poll, Sink, StartSend, Stream, task};
use futures::{task, AsyncSink, Poll, Sink, StartSend, Stream};
use std::collections::VecDeque;
use std::{fmt, io};
use tokio_io::{AsyncRead, AsyncWrite};

View File

@ -137,6 +137,12 @@ impl<'a> From<&'a cubeb::StreamParamsRef> for StreamParams {
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct StreamCreateParams {
pub input_stream_params: Option<StreamParams>,
pub output_stream_params: Option<StreamParams>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct StreamInitParams {
pub stream_name: Option<Vec<u8>>,
@ -198,7 +204,8 @@ pub enum ServerMessage {
ContextSetupDeviceCollectionCallback,
ContextRegisterDeviceCollectionChanged(ffi::cubeb_device_type, bool),
StreamInit(StreamInitParams),
StreamCreate(StreamCreateParams),
StreamInit(usize, StreamInitParams),
StreamDestroy(usize),
StreamStart(usize),
@ -232,6 +239,7 @@ pub enum ClientMessage {
ContextRegisteredDeviceCollectionChanged,
StreamCreated(StreamCreate),
StreamInitialized,
StreamDestroyed,
StreamStarted,

View File

@ -5,11 +5,11 @@
use crate::ClientContext;
use crate::{assert_not_in_callback, run_in_callback};
use audioipc::codec::LengthDelimitedCodec;
use audioipc::frame::{framed, Framed};
use audioipc::messages::{self, CallbackReq, CallbackResp, ClientMessage, ServerMessage};
use audioipc::rpc;
use audioipc::shm::{SharedMemMutSlice, SharedMemSlice};
use audioipc::{codec::LengthDelimitedCodec, messages::StreamCreateParams};
use cubeb_backend::{ffi, DeviceRef, Error, Result, Stream, StreamOps};
use futures::Future;
use futures_cpupool::{CpuFuture, CpuPool};
@ -167,17 +167,21 @@ impl<'ctx> ClientStream<'ctx> {
) -> Result<Stream> {
assert_not_in_callback();
let has_input = init_params.input_stream_params.is_some();
let has_output = init_params.output_stream_params.is_some();
let rpc = ctx.rpc();
let data = send_recv!(rpc, StreamInit(init_params) => StreamCreated())?;
let create_params = StreamCreateParams {
input_stream_params: init_params.input_stream_params,
output_stream_params: init_params.output_stream_params,
};
let data = send_recv!(rpc, StreamCreate(create_params) => StreamCreated())?;
debug!(
"token = {}, handles = {:?}",
data.token, data.platform_handles
);
let has_input = init_params.input_stream_params.is_some();
let has_output = init_params.output_stream_params.is_some();
let stream =
unsafe { audioipc::MessageStream::from_raw_fd(data.platform_handles[0].into_raw()) };
@ -237,6 +241,8 @@ impl<'ctx> ClientStream<'ctx> {
.expect("Failed to spawn CallbackServer");
wait_rx.recv().unwrap();
send_recv!(rpc, StreamInit(data.token, init_params) => StreamInitialized)?;
let stream = Box::into_raw(Box::new(ClientStream {
context: ctx,
user_ptr,

View File

@ -10,8 +10,8 @@ use audioipc::codec::LengthDelimitedCodec;
use audioipc::frame::{framed, Framed};
use audioipc::messages::{
CallbackReq, CallbackResp, ClientMessage, Device, DeviceCollectionReq, DeviceCollectionResp,
DeviceInfo, RegisterDeviceCollectionChanged, ServerMessage, StreamCreate, StreamInitParams,
StreamParams,
DeviceInfo, RegisterDeviceCollectionChanged, ServerMessage, StreamCreate, StreamCreateParams,
StreamInitParams, StreamParams,
};
use audioipc::platformhandle_passing::FramedWithPlatformHandles;
use audioipc::rpc;
@ -23,13 +23,13 @@ use futures::future::{self, FutureResult};
use futures::sync::oneshot;
use futures::Future;
use slab;
use std::cell::RefCell;
use std::convert::From;
use std::ffi::CStr;
use std::mem::{size_of, ManuallyDrop};
use std::mem::size_of;
use std::os::raw::{c_long, c_void};
use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{cell::RefCell, sync::Mutex};
use std::{panic, slice};
use tokio::reactor;
use tokio::runtime::current_thread;
@ -41,15 +41,13 @@ fn error(error: cubeb::Error) -> ClientMessage {
}
struct CubebDeviceCollectionManager {
servers: Vec<Rc<RefCell<CubebServerCallbacks>>>,
devtype: cubeb::DeviceType,
servers: Mutex<Vec<Rc<RefCell<CubebServerCallbacks>>>>,
}
impl CubebDeviceCollectionManager {
fn new() -> CubebDeviceCollectionManager {
CubebDeviceCollectionManager {
servers: Vec::new(),
devtype: cubeb::DeviceType::empty(),
servers: Mutex::new(Vec::new()),
}
}
@ -57,47 +55,37 @@ impl CubebDeviceCollectionManager {
&mut self,
context: &cubeb::Context,
server: &Rc<RefCell<CubebServerCallbacks>>,
devtype: cubeb::DeviceType,
) -> cubeb::Result<()> {
if self
.servers
.iter()
.find(|s| Rc::ptr_eq(s, server))
.is_none()
{
self.servers.push(server.clone());
let mut servers = self.servers.lock().unwrap();
if servers.is_empty() {
self.internal_register(context, true)?;
}
self.update(context)
server.borrow_mut().devtype.insert(devtype);
if servers.iter().find(|s| Rc::ptr_eq(s, server)).is_none() {
servers.push(server.clone());
}
Ok(())
}
fn unregister(
&mut self,
context: &cubeb::Context,
server: &Rc<RefCell<CubebServerCallbacks>>,
devtype: cubeb::DeviceType,
) -> cubeb::Result<()> {
self.servers
.retain(|s| !(Rc::ptr_eq(&s, server) && s.borrow().devtype.is_empty()));
self.update(context)
}
fn update(&mut self, context: &cubeb::Context) -> cubeb::Result<()> {
let mut devtype = cubeb::DeviceType::empty();
for s in &self.servers {
devtype |= s.borrow().devtype;
let mut servers = self.servers.lock().unwrap();
server.borrow_mut().devtype.remove(devtype);
if server.borrow().devtype.is_empty() {
servers.retain(|s| !Rc::ptr_eq(&s, server));
}
for &dir in &[cubeb::DeviceType::INPUT, cubeb::DeviceType::OUTPUT] {
if devtype.contains(dir) != self.devtype.contains(dir) {
self.internal_register(context, dir, devtype.contains(dir))?;
}
if servers.is_empty() {
self.internal_register(context, false)?;
}
Ok(())
}
fn internal_register(
&mut self,
context: &cubeb::Context,
devtype: cubeb::DeviceType,
enable: bool,
) -> cubeb::Result<()> {
fn internal_register(&self, context: &cubeb::Context, enable: bool) -> cubeb::Result<()> {
let user_ptr = if enable {
self as *const CubebDeviceCollectionManager as *mut c_void
} else {
@ -113,20 +101,12 @@ impl CubebDeviceCollectionManager {
device_collection_changed_output_cb_c as _,
),
] {
if devtype.contains(dir) {
assert_eq!(self.devtype.contains(dir), !enable);
unsafe {
context.register_device_collection_changed(
dir,
if enable { Some(cb) } else { None },
user_ptr,
)?;
}
if enable {
self.devtype.insert(dir);
} else {
self.devtype.remove(dir);
}
unsafe {
context.register_device_collection_changed(
dir,
if enable { Some(cb) } else { None },
user_ptr,
)?;
}
}
Ok(())
@ -134,7 +114,8 @@ impl CubebDeviceCollectionManager {
// Warning: this is called from an internal cubeb thread, so we must not mutate unprotected shared state.
unsafe fn device_collection_changed_callback(&self, device_type: ffi::cubeb_device_type) {
self.servers.iter().for_each(|server| {
let servers = self.servers.lock().unwrap();
servers.iter().for_each(|server| {
if server
.borrow()
.devtype
@ -228,9 +209,6 @@ where
})
}
// The size in which the stream slab is grown.
const STREAM_CONN_CHUNK_SIZE: usize = 64;
struct DeviceCollectionClient;
impl rpc::Client for DeviceCollectionClient {
@ -339,16 +317,14 @@ fn get_shm_id() -> String {
}
struct ServerStream {
stream: ManuallyDrop<cubeb::Stream>,
cbs: ManuallyDrop<Box<ServerStreamCallbacks>>,
stream: Option<cubeb::Stream>,
cbs: Box<ServerStreamCallbacks>,
}
impl Drop for ServerStream {
fn drop(&mut self) {
unsafe {
ManuallyDrop::drop(&mut self.stream);
ManuallyDrop::drop(&mut self.cbs);
}
// `stream` *must* be dropped before `cbs`.
drop(self.stream.take());
}
}
@ -406,7 +382,10 @@ impl rpc::Server for CubebServer {
macro_rules! try_stream {
($self:expr, $stm_tok:expr) => {
if $self.streams.contains($stm_tok) {
&mut $self.streams[$stm_tok]
$self.streams[$stm_tok]
.stream
.as_mut()
.expect("uninitialized stream")
} else {
error!(
"{}:{}:{} - Stream({}): invalid token",
@ -424,7 +403,7 @@ impl CubebServer {
pub fn new(handle: current_thread::Handle) -> Self {
CubebServer {
handle,
streams: StreamSlab::with_capacity(STREAM_CONN_CHUNK_SIZE),
streams: StreamSlab::new(),
remote_pid: None,
cbs: None,
devidmap: DevIdMap::new(),
@ -498,8 +477,12 @@ impl CubebServer {
})
.unwrap_or_else(error),
ServerMessage::StreamInit(ref params) => self
.process_stream_init(context, params)
ServerMessage::StreamCreate(ref params) => self
.process_stream_create(params)
.unwrap_or_else(|_| error(cubeb::Error::error())),
ServerMessage::StreamInit(stm_tok, ref params) => self
.process_stream_init(context, stm_tok, params)
.unwrap_or_else(|_| error(cubeb::Error::error())),
ServerMessage::StreamDestroy(stm_tok) => {
@ -515,62 +498,52 @@ impl CubebServer {
}
ServerMessage::StreamStart(stm_tok) => try_stream!(self, stm_tok)
.stream
.start()
.map(|_| ClientMessage::StreamStarted)
.unwrap_or_else(error),
ServerMessage::StreamStop(stm_tok) => try_stream!(self, stm_tok)
.stream
.stop()
.map(|_| ClientMessage::StreamStopped)
.unwrap_or_else(error),
ServerMessage::StreamResetDefaultDevice(stm_tok) => try_stream!(self, stm_tok)
.stream
.reset_default_device()
.map(|_| ClientMessage::StreamDefaultDeviceReset)
.unwrap_or_else(error),
ServerMessage::StreamGetPosition(stm_tok) => try_stream!(self, stm_tok)
.stream
.position()
.map(ClientMessage::StreamPosition)
.unwrap_or_else(error),
ServerMessage::StreamGetLatency(stm_tok) => try_stream!(self, stm_tok)
.stream
.latency()
.map(ClientMessage::StreamLatency)
.unwrap_or_else(error),
ServerMessage::StreamGetInputLatency(stm_tok) => try_stream!(self, stm_tok)
.stream
.input_latency()
.map(ClientMessage::StreamInputLatency)
.unwrap_or_else(error),
ServerMessage::StreamSetVolume(stm_tok, volume) => try_stream!(self, stm_tok)
.stream
.set_volume(volume)
.map(|_| ClientMessage::StreamVolumeSet)
.unwrap_or_else(error),
ServerMessage::StreamSetName(stm_tok, ref name) => try_stream!(self, stm_tok)
.stream
.set_name(name)
.map(|_| ClientMessage::StreamNameSet)
.unwrap_or_else(error),
ServerMessage::StreamGetCurrentDevice(stm_tok) => try_stream!(self, stm_tok)
.stream
.current_device()
.map(|device| ClientMessage::StreamCurrentDevice(Device::from(device)))
.unwrap_or_else(error),
ServerMessage::StreamRegisterDeviceChangeCallback(stm_tok, enable) => {
try_stream!(self, stm_tok)
.stream
.register_device_changed_callback(if enable {
Some(device_change_cb_c)
} else {
@ -591,7 +564,6 @@ impl CubebServer {
// handling thread. We need to move the registration of the
// bind_client to the callback RPC handling thread. This is
// done by spawning a future on `handle`.
let (tx, rx) = oneshot::channel();
self.handle
.spawn(futures::future::lazy(move || {
@ -677,21 +649,15 @@ impl CubebServer {
let cbs = self.cbs.as_ref().unwrap();
if enable {
cbs.borrow_mut().devtype.insert(devtype);
manager.register(context, cbs)
manager.register(context, cbs, devtype)
} else {
cbs.borrow_mut().devtype.remove(devtype);
manager.unregister(context, cbs)
manager.unregister(context, cbs, devtype)
}
.map(|_| ClientMessage::ContextRegisteredDeviceCollectionChanged)
}
// Stream init is special, so it's been separated from process_msg.
fn process_stream_init(
&mut self,
context: &cubeb::Context,
params: &StreamInitParams,
) -> Result<ClientMessage> {
// Stream create is special, so it's been separated from process_msg.
fn process_stream_create(&mut self, params: &StreamCreateParams) -> Result<ClientMessage> {
fn frame_size_in_bytes(params: Option<&StreamParams>) -> u16 {
params
.map(|p| {
@ -726,7 +692,6 @@ impl CubebServer {
// handling thread. We need to move the registration of the
// bind_client to the callback RPC handling thread. This is
// done by spawning a future on `handle`.
let (tx, rx) = oneshot::channel();
self.handle
.spawn(futures::future::lazy(move || {
@ -752,6 +717,30 @@ impl CubebServer {
rpc,
});
let entry = self.streams.vacant_entry();
let key = entry.key();
debug!("Registering stream {:?}", key);
entry.insert(ServerStream { stream: None, cbs });
Ok(ClientMessage::StreamCreated(StreamCreate {
token: key,
platform_handles: [
PlatformHandle::from(ipc_client),
PlatformHandle::from(input_file),
PlatformHandle::from(output_file),
],
target_pid: self.remote_pid.unwrap(),
}))
}
// Stream init is special, so it's been separated from process_msg.
fn process_stream_init(
&mut self,
context: &cubeb::Context,
stm_tok: usize,
params: &StreamInitParams,
) -> Result<ClientMessage> {
// Create cubeb stream from params
let stream_name = params
.stream_name
@ -771,52 +760,32 @@ impl CubebServer {
});
let latency = params.latency_frames;
let server_stream = &mut self.streams[stm_tok];
assert!(size_of::<Box<ServerStreamCallbacks>>() == size_of::<usize>());
let user_ptr = cbs.as_ref() as *const ServerStreamCallbacks as *mut c_void;
let user_ptr = server_stream.cbs.as_ref() as *const ServerStreamCallbacks as *mut c_void;
unsafe {
context
.stream_init(
stream_name,
input_device,
input_stream_params,
output_device,
output_stream_params,
latency,
Some(data_cb_c),
Some(state_cb_c),
user_ptr,
)
.and_then(|stream| {
if self.streams.len() == self.streams.capacity() {
trace!(
"server connection ran out of stream slots. reserving {} more.",
STREAM_CONN_CHUNK_SIZE
);
self.streams.reserve_exact(STREAM_CONN_CHUNK_SIZE);
}
let stream = unsafe {
let stream = context.stream_init(
stream_name,
input_device,
input_stream_params,
output_device,
output_stream_params,
latency,
Some(data_cb_c),
Some(state_cb_c),
user_ptr,
);
match stream {
Ok(stream) => stream,
Err(e) => return Err(e.into()), // XXX full teardown of ServerStream?
}
};
let entry = self.streams.vacant_entry();
let key = entry.key();
debug!("Registering stream {:?}", key);
server_stream.stream = Some(stream);
entry.insert(ServerStream {
stream: ManuallyDrop::new(stream),
cbs: ManuallyDrop::new(cbs),
});
Ok(ClientMessage::StreamCreated(StreamCreate {
token: key,
platform_handles: [
PlatformHandle::from(ipc_client),
PlatformHandle::from(input_file),
PlatformHandle::from(output_file),
],
target_pid: self.remote_pid.unwrap(),
}))
})
.map_err(|e| e.into())
}
Ok(ClientMessage::StreamInitialized)
}
}