Bug 1501148 - Refactor AudioIPC to make way for multiple OS backends. r=chunmin

--HG--
extra : rebase_source : c189d532815de05bc24ee93913d470d8f6a422ef
This commit is contained in:
Matthew Gregan 2018-10-23 16:46:52 +13:00
parent bd2ce060a1
commit af490a6f95
20 changed files with 580 additions and 580 deletions

View File

@ -5,4 +5,4 @@ Makefile.in build files for the Mozilla build system.
The audioipc-2 git repository is: https://github.com/djg/audioipc-2.git
The git commit ID used was 709eeb98ce93d949f05f7ecd8a7f15162b44dfad (2018-10-23 16:40:20 +1300)
The git commit ID used was 572d6a6a16501cde726dcc09604a0cbc895d93e3 (2018-10-23 16:43:12 +1300)

View File

@ -5,10 +5,10 @@
//! Various async helpers modelled after futures-rs and tokio-io.
use {RecvMsg, SendMsg};
use bytes::{Buf, BufMut};
use futures::{Async, Poll};
use iovec::IoVec;
use msg::{RecvMsg, SendMsg};
use std::io;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_uds::UnixStream;
@ -139,7 +139,7 @@ impl AsyncSendMsg for UnixStream {
static DUMMY: &[u8] = &[0];
let nom = <&IoVec>::from(DUMMY);
let mut bufs = [
nom, nom, nom, nom, nom, nom, nom, nom, nom, nom, nom, nom, nom, nom, nom, nom
nom, nom, nom, nom, nom, nom, nom, nom, nom, nom, nom, nom, nom, nom, nom, nom,
];
let n = buf.bytes_vec(&mut bufs);
self.send_msg(&bufs[..n], cmsg.bytes())

View File

@ -5,8 +5,8 @@
use bytes::{BufMut, Bytes, BytesMut};
use libc::{self, cmsghdr};
use std::{convert, mem, ops, slice};
use std::os::unix::io::RawFd;
use std::{convert, mem, ops, slice};
#[derive(Clone, Debug)]
pub struct Fds {

View File

@ -1,9 +1,14 @@
// Copyright © 2017 Mozilla Foundation
//
// This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details.
// Ease accessing reactor::Core handles.
use futures::{Future, IntoFuture};
use futures::sync::oneshot;
use std::{fmt, io, thread};
use futures::{Future, IntoFuture};
use std::sync::mpsc;
use std::{fmt, io, thread};
use tokio_core::reactor::{Core, Handle, Remote};
scoped_thread_local! {

View File

@ -1,3 +1,8 @@
// Copyright © 2017 Mozilla Foundation
//
// This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details.
use bincode;
use cubeb;
use std;

View File

@ -10,9 +10,9 @@ use codec::Codec;
use futures::{AsyncSink, Poll, Sink, StartSend, Stream};
use libc;
use messages::AssocRawFd;
use std::{fmt, io, mem};
use std::collections::VecDeque;
use std::os::unix::io::RawFd;
use std::{fmt, io, mem};
const INITIAL_CAPACITY: usize = 1024;
const BACKPRESSURE_THRESHOLD: usize = 4 * INITIAL_CAPACITY;
@ -34,7 +34,8 @@ impl IncomingFds {
pub fn take_fds(&mut self) -> Option<[RawFd; 3]> {
loop {
let fds = self.recv_fds
let fds = self
.recv_fds
.as_mut()
.and_then(|recv_fds| recv_fds.next())
.and_then(|fds| Some(clone_into_array(&fds)));
@ -293,10 +294,6 @@ pub fn framed_with_fds<A, C>(io: A, codec: C) -> FramedWithFds<A, C> {
}
}
fn write_zero() -> io::Error {
io::Error::new(io::ErrorKind::WriteZero, "failed to write frame to io")
}
fn clone_into_array<A, T>(slice: &[T]) -> A
where
A: Sized + Default + AsMut<[T]>,

View File

@ -147,10 +147,6 @@ where
}
}
fn write_zero() -> io::Error {
io::Error::new(io::ErrorKind::WriteZero, "failed to write frame to io")
}
pub fn framed<A, C>(io: A, codec: C) -> Framed<A, C> {
Framed {
io: io,

View File

@ -2,7 +2,7 @@
//
// This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details
#![allow(dead_code)] // TODO: Remove.
#![recursion_limit = "1024"]
#[macro_use]
extern crate error_chain;
@ -29,28 +29,21 @@ extern crate tokio_core;
extern crate tokio_io;
extern crate tokio_uds;
pub mod async;
pub mod cmsg;
mod async;
mod cmsg;
pub mod codec;
pub mod core;
pub mod errors;
pub mod fd_passing;
pub mod frame;
pub mod rpc;
pub mod core;
pub mod messages;
mod msg;
pub mod rpc;
pub mod shm;
use iovec::IoVec;
#[cfg(target_os = "linux")]
use libc::MSG_CMSG_CLOEXEC;
pub use messages::{ClientMessage, ServerMessage};
use std::env::temp_dir;
use std::io;
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use std::path::PathBuf;
#[cfg(not(target_os = "linux"))]
const MSG_CMSG_CLOEXEC: libc::c_int = 0;
// This must match the definition of
// ipc::FileDescriptor::PlatformHandleType in Gecko.
@ -59,78 +52,6 @@ pub type PlatformHandleType = *mut std::os::raw::c_void;
#[cfg(not(target_os = "windows"))]
pub type PlatformHandleType = libc::c_int;
// Extend sys::os::unix::net::UnixStream to support sending and receiving a single file desc.
// We can extend UnixStream by using traits, eliminating the need to introduce a new wrapped
// UnixStream type.
pub trait RecvMsg {
fn recv_msg(
&mut self,
iov: &mut [&mut IoVec],
cmsg: &mut [u8],
) -> io::Result<(usize, usize, i32)>;
}
pub trait SendMsg {
fn send_msg(&mut self, iov: &[&IoVec], cmsg: &[u8]) -> io::Result<usize>;
}
impl<T: AsRawFd> RecvMsg for T {
fn recv_msg(
&mut self,
iov: &mut [&mut IoVec],
cmsg: &mut [u8],
) -> io::Result<(usize, usize, i32)> {
msg::recv_msg_with_flags(self.as_raw_fd(), iov, cmsg, MSG_CMSG_CLOEXEC)
}
}
impl<T: AsRawFd> SendMsg for T {
fn send_msg(&mut self, iov: &[&IoVec], cmsg: &[u8]) -> io::Result<usize> {
msg::send_msg_with_flags(self.as_raw_fd(), iov, cmsg, 0)
}
}
////////////////////////////////////////////////////////////////////////////////
#[derive(Debug)]
pub struct AutoCloseFd(RawFd);
impl Drop for AutoCloseFd {
fn drop(&mut self) {
unsafe {
libc::close(self.0);
}
}
}
impl FromRawFd for AutoCloseFd {
unsafe fn from_raw_fd(fd: RawFd) -> Self {
AutoCloseFd(fd)
}
}
impl IntoRawFd for AutoCloseFd {
fn into_raw_fd(self) -> RawFd {
let fd = self.0;
::std::mem::forget(self);
fd
}
}
impl AsRawFd for AutoCloseFd {
fn as_raw_fd(&self) -> RawFd {
self.0
}
}
impl<'a> AsRawFd for &'a AutoCloseFd {
fn as_raw_fd(&self) -> RawFd {
self.0
}
}
////////////////////////////////////////////////////////////////////////////////
pub fn get_shm_path(dir: &str) -> PathBuf {
let pid = unsafe { libc::getpid() };
let mut temp = temp_dir();

View File

@ -1,8 +1,48 @@
use iovec::IoVec;
// Copyright © 2017 Mozilla Foundation
//
// This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details.
use iovec::unix as iovec;
use iovec::IoVec;
use libc;
use std::os::unix::io::{AsRawFd, RawFd};
use std::{cmp, io, mem, ptr};
use std::os::unix::io::RawFd;
// Extend sys::os::unix::net::UnixStream to support sending and receiving a single file desc.
// We can extend UnixStream by using traits, eliminating the need to introduce a new wrapped
// UnixStream type.
pub trait RecvMsg {
fn recv_msg(
&mut self,
iov: &mut [&mut IoVec],
cmsg: &mut [u8],
) -> io::Result<(usize, usize, i32)>;
}
pub trait SendMsg {
fn send_msg(&mut self, iov: &[&IoVec], cmsg: &[u8]) -> io::Result<usize>;
}
impl<T: AsRawFd> RecvMsg for T {
fn recv_msg(
&mut self,
iov: &mut [&mut IoVec],
cmsg: &mut [u8],
) -> io::Result<(usize, usize, i32)> {
#[cfg(target_os = "linux")]
let flags = libc::MSG_CMSG_CLOEXEC;
#[cfg(not(target_os = "linux"))]
let flags = 0;
recv_msg_with_flags(self.as_raw_fd(), iov, cmsg, flags)
}
}
impl<T: AsRawFd> SendMsg for T {
fn send_msg(&mut self, iov: &[&IoVec], cmsg: &[u8]) -> io::Result<usize> {
send_msg_with_flags(self.as_raw_fd(), iov, cmsg, 0)
}
}
fn cvt(r: libc::ssize_t) -> io::Result<usize> {
if r == -1 {

View File

@ -39,10 +39,10 @@
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use futures::{Async, Future, Poll, Sink, Stream};
use futures::sync::oneshot;
use rpc::Handler;
use futures::{Async, Future, Poll, Sink, Stream};
use rpc::driver::Driver;
use rpc::Handler;
use std::collections::VecDeque;
use std::io;
use tokio_core::reactor::Handle;

View File

@ -43,8 +43,8 @@
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use futures::{Async, Future, Poll};
use futures::sync::{mpsc, oneshot};
use futures::{Async, Future, Poll};
use std::fmt;
use std::io;

View File

@ -6,8 +6,8 @@
use futures::{Poll, Sink, Stream};
use std::io;
mod driver;
mod client;
mod driver;
mod server;
pub use self::client::{bind_client, Client, ClientProxy, Response};

View File

@ -40,8 +40,8 @@
// DEALINGS IN THE SOFTWARE.
use futures::{Async, Future, Poll, Sink, Stream};
use rpc::Handler;
use rpc::driver::Driver;
use rpc::Handler;
use std::collections::VecDeque;
use std::io;
use tokio_core::reactor::Handle;

View File

@ -1,3 +1,8 @@
// Copyright © 2017 Mozilla Foundation
//
// This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details.
use errors::*;
use memmap::{Mmap, MmapViewSync, Protection};
use std::fs::{remove_file, File, OpenOptions};

View File

@ -3,27 +3,29 @@
// This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details
use {ClientStream, CPUPOOL_INIT_PARAMS, G_SERVER_FD};
use assert_not_in_callback;
use audioipc::{messages, ClientMessage, ServerMessage};
use audioipc::{core, rpc};
use audioipc::codec::LengthDelimitedCodec;
use audioipc::fd_passing::{framed_with_fds, FramedWithFds};
use cubeb_backend::{ffi, Context, ContextOps, DeviceCollectionRef, DeviceId, DeviceType, Error,
Ops, Result, Stream, StreamParams, StreamParamsRef};
use audioipc::{core, rpc};
use audioipc::{messages, ClientMessage, ServerMessage};
use cubeb_backend::{
ffi, Context, ContextOps, DeviceCollectionRef, DeviceId, DeviceType, Error, Ops, Result,
Stream, StreamParams, StreamParamsRef,
};
use futures::Future;
use futures_cpupool::{self, CpuPool};
use libc;
use std::{fmt, io, mem, ptr};
use std::ffi::{CStr, CString};
use std::os::raw::c_void;
use std::os::unix::io::FromRawFd;
use std::os::unix::net;
use std::sync::mpsc;
use std::thread;
use std::{fmt, io, mem, ptr};
use stream;
use tokio_core::reactor::{Handle, Remote};
use tokio_uds::UnixStream;
use {ClientStream, CPUPOOL_INIT_PARAMS, G_SERVER_FD};
struct CubebClient;
@ -100,9 +102,7 @@ impl ContextOps for ClientContext {
let (tx_rpc, rx_rpc) = mpsc::channel();
let params = CPUPOOL_INIT_PARAMS.with(|p| {
p.replace(None).unwrap()
});
let params = CPUPOOL_INIT_PARAMS.with(|p| p.replace(None).unwrap());
let thread_create_callback = params.thread_create_callback;
@ -134,11 +134,11 @@ impl ContextOps for ClientContext {
let rpc = t!(rx_rpc.recv());
let cpupool = futures_cpupool::Builder::new()
.name_prefix("AudioIPC")
.after_start(register_thread)
.pool_size(params.pool_size)
.stack_size(params.stack_size)
.create();
.name_prefix("AudioIPC")
.after_start(register_thread)
.pool_size(params.pool_size)
.stack_size(params.stack_size)
.create();
let ctx = Box::new(ClientContext {
_ops: &CLIENT_OPS as *const _,

View File

@ -32,9 +32,12 @@ type InitParamsTls = std::cell::RefCell<Option<CpuPoolInitParams>>;
thread_local!(static IN_CALLBACK: std::cell::RefCell<bool> = std::cell::RefCell::new(false));
thread_local!(static CPUPOOL_INIT_PARAMS: InitParamsTls = std::cell::RefCell::new(None));
// This must match the definition of AudioIpcInitParams in
// dom/media/CubebUtils.cpp in Gecko.
#[repr(C)]
#[derive(Clone, Copy, Debug)]
pub struct AudioIpcInitParams {
// Fields only need to be public for ipctest.
pub server_connection: PlatformHandleType,
pub pool_size: usize,
pub stack_size: usize,
@ -43,13 +46,13 @@ pub struct AudioIpcInitParams {
#[derive(Clone, Copy, Debug)]
struct CpuPoolInitParams {
pub pool_size: usize,
pub stack_size: usize,
pub thread_create_callback: Option<extern "C" fn(*const ::std::os::raw::c_char)>,
pool_size: usize,
stack_size: usize,
thread_create_callback: Option<extern "C" fn(*const ::std::os::raw::c_char)>,
}
impl CpuPoolInitParams {
pub fn init_with(params: &AudioIpcInitParams) -> Self {
fn init_with(params: &AudioIpcInitParams) -> Self {
CpuPoolInitParams {
pool_size: params.pool_size,
stack_size: params.stack_size,

View File

@ -1,3 +1,8 @@
// Copyright © 2017 Mozilla Foundation
//
// This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details.
use cubeb_backend::Error;
use std::os::raw::c_int;

View File

@ -3,8 +3,6 @@
// This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details
use {assert_not_in_callback, set_in_callback};
use ClientContext;
use audioipc::codec::LengthDelimitedCodec;
use audioipc::frame::{framed, Framed};
use audioipc::messages::{self, CallbackReq, CallbackResp, ClientMessage, ServerMessage};
@ -21,6 +19,8 @@ use std::os::unix::net;
use std::ptr;
use std::sync::mpsc;
use tokio_uds::UnixStream;
use ClientContext;
use {assert_not_in_callback, set_in_callback};
// TODO: Remove and let caller allocate based on cubeb backend requirements.
const SHM_AREA_SIZE: usize = 2 * 1024 * 1024;
@ -95,14 +95,14 @@ impl rpc::Server for CallbackServer {
.get_slice(nframes as usize * frame_size)
.unwrap()
.as_ptr(),
None => ptr::null()
None => ptr::null(),
};
let output_ptr: *mut u8 = match output_shm {
Some(ref mut shm) => shm
.get_mut_slice(nframes as usize * frame_size)
.unwrap()
.as_mut_ptr(),
None => ptr::null_mut()
None => ptr::null_mut(),
};
set_in_callback(true);

View File

@ -1,3 +1,8 @@
// Copyright © 2017 Mozilla Foundation
//
// This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details
#[macro_use]
extern crate error_chain;
@ -14,31 +19,21 @@ extern crate slab;
extern crate tokio_core;
extern crate tokio_uds;
use audioipc::codec::LengthDelimitedCodec;
use audioipc::core;
use audioipc::fd_passing::{framed_with_fds, FramedWithFds};
use audioipc::frame::{framed, Framed};
use audioipc::messages::{CallbackReq, CallbackResp, ClientMessage, Device, DeviceInfo,
ServerMessage, StreamCreate, StreamInitParams, StreamParams};
use audioipc::fd_passing::framed_with_fds;
use audioipc::rpc;
use audioipc::shm::{SharedMemReader, SharedMemWriter};
use audioipc::PlatformHandleType;
use cubeb::ffi;
use futures::future::{self, FutureResult};
use futures::sync::oneshot;
use futures::Future;
use std::cell::RefCell;
use std::convert::From;
use std::error::Error;
use std::ffi::{CStr, CString};
use std::mem::{size_of, ManuallyDrop};
use std::os::raw::{c_long, c_void};
use std::os::raw::c_void;
use std::os::unix::io::IntoRawFd;
use std::os::unix::net;
use std::os::unix::prelude::*;
use std::{panic, ptr, slice};
use tokio_core::reactor::Remote;
use std::ptr;
use tokio_uds::UnixStream;
mod server;
pub mod errors {
error_chain! {
links {
@ -54,394 +49,6 @@ pub mod errors {
use errors::*;
type ContextKey = RefCell<Option<cubeb::Result<cubeb::Context>>>;
thread_local!(static CONTEXT_KEY:ContextKey = RefCell::new(None));
fn with_local_context<T, F>(f: F) -> T
where
F: FnOnce(&cubeb::Result<cubeb::Context>) -> T,
{
CONTEXT_KEY.with(|k| {
let mut context = k.borrow_mut();
if context.is_none() {
let name = CString::new("AudioIPC Server").unwrap();
*context = Some(cubeb::Context::init(Some(name.as_c_str()), None));
}
f(context.as_ref().unwrap())
})
}
// TODO: Remove and let caller allocate based on cubeb backend requirements.
const SHM_AREA_SIZE: usize = 2 * 1024 * 1024;
// The size in which the stream slab is grown.
const STREAM_CONN_CHUNK_SIZE: usize = 64;
struct CallbackClient;
impl rpc::Client for CallbackClient {
type Request = CallbackReq;
type Response = CallbackResp;
type Transport = Framed<UnixStream, LengthDelimitedCodec<Self::Request, Self::Response>>;
}
struct ServerStreamCallbacks {
/// Size of input frame in bytes
input_frame_size: u16,
/// Size of output frame in bytes
output_frame_size: u16,
/// Shared memory buffer for sending input data to client
input_shm: SharedMemWriter,
/// Shared memory buffer for receiving output data from client
output_shm: SharedMemReader,
/// RPC interface to callback server running in client
rpc: rpc::ClientProxy<CallbackReq, CallbackResp>,
}
impl ServerStreamCallbacks {
fn data_callback(&mut self, input: &[u8], output: &mut [u8]) -> isize {
trace!("Stream data callback: {} {}", input.len(), output.len());
// len is of input and output is frame len. Turn these into the real lengths.
let real_input = unsafe {
let nbytes = input.len() * self.input_frame_size as usize;
slice::from_raw_parts(input.as_ptr(), nbytes)
};
self.input_shm.write(real_input).unwrap();
let r = self.rpc
.call(CallbackReq::Data(
output.len() as isize,
self.output_frame_size as usize,
))
.wait();
match r {
Ok(CallbackResp::Data(frames)) => {
if frames >= 0 {
let nbytes = frames as usize * self.output_frame_size as usize;
let real_output = unsafe {
trace!("Resize output to {}", nbytes);
slice::from_raw_parts_mut(output.as_mut_ptr(), nbytes)
};
self.output_shm.read(&mut real_output[..nbytes]).unwrap();
}
frames
}
_ => {
debug!("Unexpected message {:?} during data_callback", r);
-1
}
}
}
fn state_callback(&mut self, state: cubeb::State) {
trace!("Stream state callback: {:?}", state);
let r = self.rpc.call(CallbackReq::State(state.into())).wait();
match r {
Ok(CallbackResp::State) => {}
_ => {
debug!("Unexpected message {:?} during callback", r);
}
}
}
}
struct ServerStream {
stream: ManuallyDrop<cubeb::Stream>,
cbs: ManuallyDrop<Box<ServerStreamCallbacks>>,
}
impl Drop for ServerStream {
fn drop(&mut self) {
unsafe {
ManuallyDrop::drop(&mut self.stream);
ManuallyDrop::drop(&mut self.cbs);
}
}
}
type StreamSlab = slab::Slab<ServerStream, usize>;
pub struct CubebServer {
cb_remote: Remote,
streams: StreamSlab,
}
impl rpc::Server for CubebServer {
type Request = ServerMessage;
type Response = ClientMessage;
type Future = FutureResult<Self::Response, ()>;
type Transport = FramedWithFds<UnixStream, LengthDelimitedCodec<Self::Response, Self::Request>>;
fn process(&mut self, req: Self::Request) -> Self::Future {
let resp = with_local_context(|context| match *context {
Err(_) => error(cubeb::Error::error()),
Ok(ref context) => self.process_msg(context, &req),
});
future::ok(resp)
}
}
impl CubebServer {
pub fn new(cb_remote: Remote) -> Self {
CubebServer {
cb_remote: cb_remote,
streams: StreamSlab::with_capacity(STREAM_CONN_CHUNK_SIZE),
}
}
// Process a request coming from the client.
fn process_msg(&mut self, context: &cubeb::Context, msg: &ServerMessage) -> ClientMessage {
let resp: ClientMessage = match *msg {
ServerMessage::ClientConnect => panic!("already connected"),
ServerMessage::ClientDisconnect => {
// TODO:
//self.connection.client_disconnect();
ClientMessage::ClientDisconnected
}
ServerMessage::ContextGetBackendId => ClientMessage::ContextBackendId(),
ServerMessage::ContextGetMaxChannelCount => context
.max_channel_count()
.map(ClientMessage::ContextMaxChannelCount)
.unwrap_or_else(error),
ServerMessage::ContextGetMinLatency(ref params) => {
let format = cubeb::SampleFormat::from(params.format);
let layout = cubeb::ChannelLayout::from(params.layout);
let params = cubeb::StreamParamsBuilder::new()
.format(format)
.rate(u32::from(params.rate))
.channels(u32::from(params.channels))
.layout(layout)
.take();
context
.min_latency(&params)
.map(ClientMessage::ContextMinLatency)
.unwrap_or_else(error)
}
ServerMessage::ContextGetPreferredSampleRate => context
.preferred_sample_rate()
.map(ClientMessage::ContextPreferredSampleRate)
.unwrap_or_else(error),
ServerMessage::ContextGetDeviceEnumeration(device_type) => context
.enumerate_devices(cubeb::DeviceType::from_bits_truncate(device_type))
.map(|devices| {
let v: Vec<DeviceInfo> = devices.iter().map(|i| i.as_ref().into()).collect();
ClientMessage::ContextEnumeratedDevices(v)
})
.unwrap_or_else(error),
ServerMessage::StreamInit(ref params) => self.process_stream_init(context, params)
.unwrap_or_else(|_| error(cubeb::Error::error())),
ServerMessage::StreamDestroy(stm_tok) => {
self.streams.remove(stm_tok);
ClientMessage::StreamDestroyed
}
ServerMessage::StreamStart(stm_tok) => self.streams[stm_tok]
.stream
.start()
.map(|_| ClientMessage::StreamStarted)
.unwrap_or_else(error),
ServerMessage::StreamStop(stm_tok) => self.streams[stm_tok]
.stream
.stop()
.map(|_| ClientMessage::StreamStopped)
.unwrap_or_else(error),
ServerMessage::StreamResetDefaultDevice(stm_tok) => self.streams[stm_tok]
.stream
.reset_default_device()
.map(|_| ClientMessage::StreamDefaultDeviceReset)
.unwrap_or_else(error),
ServerMessage::StreamGetPosition(stm_tok) => self.streams[stm_tok]
.stream
.position()
.map(ClientMessage::StreamPosition)
.unwrap_or_else(error),
ServerMessage::StreamGetLatency(stm_tok) => self.streams[stm_tok]
.stream
.latency()
.map(ClientMessage::StreamLatency)
.unwrap_or_else(error),
ServerMessage::StreamSetVolume(stm_tok, volume) => self.streams[stm_tok]
.stream
.set_volume(volume)
.map(|_| ClientMessage::StreamVolumeSet)
.unwrap_or_else(error),
ServerMessage::StreamSetPanning(stm_tok, panning) => self.streams[stm_tok]
.stream
.set_panning(panning)
.map(|_| ClientMessage::StreamPanningSet)
.unwrap_or_else(error),
ServerMessage::StreamGetCurrentDevice(stm_tok) => self.streams[stm_tok]
.stream
.current_device()
.map(|device| ClientMessage::StreamCurrentDevice(Device::from(device)))
.unwrap_or_else(error),
};
trace!("process_msg: req={:?}, resp={:?}", msg, resp);
resp
}
// Stream init is special, so it's been separated from process_msg.
fn process_stream_init(
&mut self,
context: &cubeb::Context,
params: &StreamInitParams,
) -> Result<ClientMessage> {
fn frame_size_in_bytes(params: Option<&StreamParams>) -> u16 {
params
.map(|p| {
let format = p.format.into();
let sample_size = match format {
cubeb::SampleFormat::S16LE
| cubeb::SampleFormat::S16BE
| cubeb::SampleFormat::S16NE => 2,
cubeb::SampleFormat::Float32LE
| cubeb::SampleFormat::Float32BE
| cubeb::SampleFormat::Float32NE => 4,
};
let channel_count = p.channels as u16;
sample_size * channel_count
})
.unwrap_or(0u16)
}
// Create the callback handling struct which is attached the cubeb stream.
let input_frame_size = frame_size_in_bytes(params.input_stream_params.as_ref());
let output_frame_size = frame_size_in_bytes(params.output_stream_params.as_ref());
let (stm1, stm2) = net::UnixStream::pair()?;
debug!("Created callback pair: {:?}-{:?}", stm1, stm2);
let (input_shm, input_file) =
SharedMemWriter::new(&audioipc::get_shm_path("input"), SHM_AREA_SIZE)?;
let (output_shm, output_file) =
SharedMemReader::new(&audioipc::get_shm_path("output"), SHM_AREA_SIZE)?;
// This code is currently running on the Client/Server RPC
// handling thread. We need to move the registration of the
// bind_client to the callback RPC handling thread. This is
// done by spawning a future on cb_remote.
let id = core::handle().id();
let (tx, rx) = oneshot::channel();
self.cb_remote.spawn(move |handle| {
// Ensure we're running on a loop different to the one
// invoking spawn_fn.
assert_ne!(id, handle.id());
let stream = UnixStream::from_stream(stm2, handle).unwrap();
let transport = framed(stream, Default::default());
let rpc = rpc::bind_client::<CallbackClient>(transport, handle);
drop(tx.send(rpc));
Ok(())
});
let rpc: rpc::ClientProxy<CallbackReq, CallbackResp> = match rx.wait() {
Ok(rpc) => rpc,
Err(_) => bail!("Failed to create callback rpc."),
};
let cbs = Box::new(ServerStreamCallbacks {
input_frame_size,
output_frame_size,
input_shm,
output_shm,
rpc,
});
// Create cubeb stream from params
let stream_name = params
.stream_name
.as_ref()
.and_then(|name| CStr::from_bytes_with_nul(name).ok());
let input_device = params.input_device as *const _;
let input_stream_params = params.input_stream_params.as_ref().map(|isp| unsafe {
cubeb::StreamParamsRef::from_ptr(isp as *const StreamParams as *mut _)
});
let output_device = params.output_device as *const _;
let output_stream_params = params.output_stream_params.as_ref().map(|osp| unsafe {
cubeb::StreamParamsRef::from_ptr(osp as *const StreamParams as *mut _)
});
let latency = params.latency_frames;
assert!(size_of::<Box<ServerStreamCallbacks>>() == size_of::<usize>());
let user_ptr = cbs.as_ref() as *const ServerStreamCallbacks as *mut c_void;
unsafe {
context
.stream_init(
stream_name,
input_device,
input_stream_params,
output_device,
output_stream_params,
latency,
Some(data_cb_c),
Some(state_cb_c),
user_ptr,
)
.and_then(|stream| {
if !self.streams.has_available() {
trace!(
"server connection ran out of stream slots. reserving {} more.",
STREAM_CONN_CHUNK_SIZE
);
self.streams.reserve_exact(STREAM_CONN_CHUNK_SIZE);
}
let stm_tok = match self.streams.vacant_entry() {
Some(entry) => {
debug!("Registering stream {:?}", entry.index(),);
entry
.insert(ServerStream {
stream: ManuallyDrop::new(stream),
cbs: ManuallyDrop::new(cbs),
})
.index()
}
None => {
// TODO: Turn into error
panic!("Failed to insert stream into slab. No entries")
}
};
Ok(ClientMessage::StreamCreated(StreamCreate {
token: stm_tok,
fds: [
stm1.into_raw_fd(),
input_file.into_raw_fd(),
output_file.into_raw_fd(),
],
}))
})
.map_err(|e| e.into())
}
}
}
struct ServerWrapper {
core_thread: core::CoreThread,
callback_thread: core::CoreThread,
@ -506,10 +113,9 @@ pub extern "C" fn audioipc_server_new_client(p: *mut c_void) -> PlatformHandleTy
UnixStream::from_stream(sock2, handle)
.and_then(|sock| {
let transport = framed_with_fds(sock, Default::default());
rpc::bind_server(transport, CubebServer::new(cb_remote), handle);
rpc::bind_server(transport, server::CubebServer::new(cb_remote), handle);
Ok(())
})
.map_err(|_| ())
}).map_err(|_| ())
// Notify waiting thread that sock2 has been registered.
.and_then(|_| wait_tx.send(()))
});
@ -517,8 +123,7 @@ pub extern "C" fn audioipc_server_new_client(p: *mut c_void) -> PlatformHandleTy
// with reactor::Core.
let _ = wait_rx.wait();
Ok(sock1.into_raw_fd())
})
.unwrap_or(-1)
}).unwrap_or(-1)
}
#[no_mangle]
@ -526,45 +131,3 @@ pub extern "C" fn audioipc_server_stop(p: *mut c_void) {
let wrapper = unsafe { Box::<ServerWrapper>::from_raw(p as *mut _) };
drop(wrapper);
}
fn error(error: cubeb::Error) -> ClientMessage {
ClientMessage::Error(error.raw_code())
}
// C callable callbacks
unsafe extern "C" fn data_cb_c(
_: *mut ffi::cubeb_stream,
user_ptr: *mut c_void,
input_buffer: *const c_void,
output_buffer: *mut c_void,
nframes: c_long,
) -> c_long {
let ok = panic::catch_unwind(|| {
let cbs = &mut *(user_ptr as *mut ServerStreamCallbacks);
let input = if input_buffer.is_null() {
&[]
} else {
slice::from_raw_parts(input_buffer as *const u8, nframes as usize)
};
let output: &mut [u8] = if output_buffer.is_null() {
&mut []
} else {
slice::from_raw_parts_mut(output_buffer as *mut u8, nframes as usize)
};
cbs.data_callback(input, output) as c_long
});
ok.unwrap_or(0)
}
unsafe extern "C" fn state_cb_c(
_: *mut ffi::cubeb_stream,
user_ptr: *mut c_void,
state: ffi::cubeb_state,
) {
let ok = panic::catch_unwind(|| {
let state = cubeb::State::from(state);
let cbs = &mut *(user_ptr as *mut ServerStreamCallbacks);
cbs.state_callback(state);
});
ok.expect("State callback panicked");
}

View File

@ -0,0 +1,460 @@
// Copyright © 2017 Mozilla Foundation
//
// This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details
use audioipc;
use audioipc::codec::LengthDelimitedCodec;
use audioipc::core;
use audioipc::fd_passing::FramedWithFds;
use audioipc::frame::{framed, Framed};
use audioipc::messages::{
CallbackReq, CallbackResp, ClientMessage, Device, DeviceInfo, ServerMessage, StreamCreate,
StreamInitParams, StreamParams,
};
use audioipc::rpc;
use audioipc::shm::{SharedMemReader, SharedMemWriter};
use cubeb;
use cubeb::ffi;
use futures::future::{self, FutureResult};
use futures::sync::oneshot;
use futures::Future;
use slab;
use std::cell::RefCell;
use std::convert::From;
use std::ffi::{CStr, CString};
use std::mem::{size_of, ManuallyDrop};
use std::os::raw::{c_long, c_void};
use std::os::unix::io::IntoRawFd;
use std::os::unix::net;
use std::{panic, slice};
use tokio_core::reactor::Remote;
use tokio_uds::UnixStream;
use errors::*;
fn error(error: cubeb::Error) -> ClientMessage {
ClientMessage::Error(error.raw_code())
}
type ContextKey = RefCell<Option<cubeb::Result<cubeb::Context>>>;
thread_local!(static CONTEXT_KEY:ContextKey = RefCell::new(None));
fn with_local_context<T, F>(f: F) -> T
where
F: FnOnce(&cubeb::Result<cubeb::Context>) -> T,
{
CONTEXT_KEY.with(|k| {
let mut context = k.borrow_mut();
if context.is_none() {
let name = CString::new("AudioIPC Server").unwrap();
*context = Some(cubeb::Context::init(Some(name.as_c_str()), None));
}
f(context.as_ref().unwrap())
})
}
// TODO: Remove and let caller allocate based on cubeb backend requirements.
const SHM_AREA_SIZE: usize = 2 * 1024 * 1024;
// The size in which the stream slab is grown.
const STREAM_CONN_CHUNK_SIZE: usize = 64;
struct CallbackClient;
impl rpc::Client for CallbackClient {
type Request = CallbackReq;
type Response = CallbackResp;
type Transport = Framed<UnixStream, LengthDelimitedCodec<Self::Request, Self::Response>>;
}
struct ServerStreamCallbacks {
/// Size of input frame in bytes
input_frame_size: u16,
/// Size of output frame in bytes
output_frame_size: u16,
/// Shared memory buffer for sending input data to client
input_shm: SharedMemWriter,
/// Shared memory buffer for receiving output data from client
output_shm: SharedMemReader,
/// RPC interface to callback server running in client
rpc: rpc::ClientProxy<CallbackReq, CallbackResp>,
}
impl ServerStreamCallbacks {
fn data_callback(&mut self, input: &[u8], output: &mut [u8]) -> isize {
trace!("Stream data callback: {} {}", input.len(), output.len());
// len is of input and output is frame len. Turn these into the real lengths.
let real_input = unsafe {
let nbytes = input.len() * self.input_frame_size as usize;
slice::from_raw_parts(input.as_ptr(), nbytes)
};
self.input_shm.write(real_input).unwrap();
let r = self
.rpc
.call(CallbackReq::Data(
output.len() as isize,
self.output_frame_size as usize,
)).wait();
match r {
Ok(CallbackResp::Data(frames)) => {
if frames >= 0 {
let nbytes = frames as usize * self.output_frame_size as usize;
let real_output = unsafe {
trace!("Resize output to {}", nbytes);
slice::from_raw_parts_mut(output.as_mut_ptr(), nbytes)
};
self.output_shm.read(&mut real_output[..nbytes]).unwrap();
}
frames
}
_ => {
debug!("Unexpected message {:?} during data_callback", r);
-1
}
}
}
fn state_callback(&mut self, state: cubeb::State) {
trace!("Stream state callback: {:?}", state);
let r = self.rpc.call(CallbackReq::State(state.into())).wait();
match r {
Ok(CallbackResp::State) => {}
_ => {
debug!("Unexpected message {:?} during callback", r);
}
}
}
}
struct ServerStream {
stream: ManuallyDrop<cubeb::Stream>,
cbs: ManuallyDrop<Box<ServerStreamCallbacks>>,
}
impl Drop for ServerStream {
fn drop(&mut self) {
unsafe {
ManuallyDrop::drop(&mut self.stream);
ManuallyDrop::drop(&mut self.cbs);
}
}
}
type StreamSlab = slab::Slab<ServerStream, usize>;
pub struct CubebServer {
cb_remote: Remote,
streams: StreamSlab,
}
impl rpc::Server for CubebServer {
type Request = ServerMessage;
type Response = ClientMessage;
type Future = FutureResult<Self::Response, ()>;
type Transport = FramedWithFds<UnixStream, LengthDelimitedCodec<Self::Response, Self::Request>>;
fn process(&mut self, req: Self::Request) -> Self::Future {
let resp = with_local_context(|context| match *context {
Err(_) => error(cubeb::Error::error()),
Ok(ref context) => self.process_msg(context, &req),
});
future::ok(resp)
}
}
impl CubebServer {
pub fn new(cb_remote: Remote) -> Self {
CubebServer {
cb_remote: cb_remote,
streams: StreamSlab::with_capacity(STREAM_CONN_CHUNK_SIZE),
}
}
// Process a request coming from the client.
fn process_msg(&mut self, context: &cubeb::Context, msg: &ServerMessage) -> ClientMessage {
let resp: ClientMessage = match *msg {
ServerMessage::ClientConnect => panic!("already connected"),
ServerMessage::ClientDisconnect => {
// TODO:
//self.connection.client_disconnect();
ClientMessage::ClientDisconnected
}
ServerMessage::ContextGetBackendId => ClientMessage::ContextBackendId(),
ServerMessage::ContextGetMaxChannelCount => context
.max_channel_count()
.map(ClientMessage::ContextMaxChannelCount)
.unwrap_or_else(error),
ServerMessage::ContextGetMinLatency(ref params) => {
let format = cubeb::SampleFormat::from(params.format);
let layout = cubeb::ChannelLayout::from(params.layout);
let params = cubeb::StreamParamsBuilder::new()
.format(format)
.rate(u32::from(params.rate))
.channels(u32::from(params.channels))
.layout(layout)
.take();
context
.min_latency(&params)
.map(ClientMessage::ContextMinLatency)
.unwrap_or_else(error)
}
ServerMessage::ContextGetPreferredSampleRate => context
.preferred_sample_rate()
.map(ClientMessage::ContextPreferredSampleRate)
.unwrap_or_else(error),
ServerMessage::ContextGetDeviceEnumeration(device_type) => context
.enumerate_devices(cubeb::DeviceType::from_bits_truncate(device_type))
.map(|devices| {
let v: Vec<DeviceInfo> = devices.iter().map(|i| i.as_ref().into()).collect();
ClientMessage::ContextEnumeratedDevices(v)
}).unwrap_or_else(error),
ServerMessage::StreamInit(ref params) => self
.process_stream_init(context, params)
.unwrap_or_else(|_| error(cubeb::Error::error())),
ServerMessage::StreamDestroy(stm_tok) => {
self.streams.remove(stm_tok);
ClientMessage::StreamDestroyed
}
ServerMessage::StreamStart(stm_tok) => self.streams[stm_tok]
.stream
.start()
.map(|_| ClientMessage::StreamStarted)
.unwrap_or_else(error),
ServerMessage::StreamStop(stm_tok) => self.streams[stm_tok]
.stream
.stop()
.map(|_| ClientMessage::StreamStopped)
.unwrap_or_else(error),
ServerMessage::StreamResetDefaultDevice(stm_tok) => self.streams[stm_tok]
.stream
.reset_default_device()
.map(|_| ClientMessage::StreamDefaultDeviceReset)
.unwrap_or_else(error),
ServerMessage::StreamGetPosition(stm_tok) => self.streams[stm_tok]
.stream
.position()
.map(ClientMessage::StreamPosition)
.unwrap_or_else(error),
ServerMessage::StreamGetLatency(stm_tok) => self.streams[stm_tok]
.stream
.latency()
.map(ClientMessage::StreamLatency)
.unwrap_or_else(error),
ServerMessage::StreamSetVolume(stm_tok, volume) => self.streams[stm_tok]
.stream
.set_volume(volume)
.map(|_| ClientMessage::StreamVolumeSet)
.unwrap_or_else(error),
ServerMessage::StreamSetPanning(stm_tok, panning) => self.streams[stm_tok]
.stream
.set_panning(panning)
.map(|_| ClientMessage::StreamPanningSet)
.unwrap_or_else(error),
ServerMessage::StreamGetCurrentDevice(stm_tok) => self.streams[stm_tok]
.stream
.current_device()
.map(|device| ClientMessage::StreamCurrentDevice(Device::from(device)))
.unwrap_or_else(error),
};
trace!("process_msg: req={:?}, resp={:?}", msg, resp);
resp
}
// Stream init is special, so it's been separated from process_msg.
fn process_stream_init(
&mut self,
context: &cubeb::Context,
params: &StreamInitParams,
) -> Result<ClientMessage> {
fn frame_size_in_bytes(params: Option<&StreamParams>) -> u16 {
params
.map(|p| {
let format = p.format.into();
let sample_size = match format {
cubeb::SampleFormat::S16LE
| cubeb::SampleFormat::S16BE
| cubeb::SampleFormat::S16NE => 2,
cubeb::SampleFormat::Float32LE
| cubeb::SampleFormat::Float32BE
| cubeb::SampleFormat::Float32NE => 4,
};
let channel_count = p.channels as u16;
sample_size * channel_count
}).unwrap_or(0u16)
}
// Create the callback handling struct which is attached the cubeb stream.
let input_frame_size = frame_size_in_bytes(params.input_stream_params.as_ref());
let output_frame_size = frame_size_in_bytes(params.output_stream_params.as_ref());
let (stm1, stm2) = net::UnixStream::pair()?;
debug!("Created callback pair: {:?}-{:?}", stm1, stm2);
let (input_shm, input_file) =
SharedMemWriter::new(&audioipc::get_shm_path("input"), SHM_AREA_SIZE)?;
let (output_shm, output_file) =
SharedMemReader::new(&audioipc::get_shm_path("output"), SHM_AREA_SIZE)?;
// This code is currently running on the Client/Server RPC
// handling thread. We need to move the registration of the
// bind_client to the callback RPC handling thread. This is
// done by spawning a future on cb_remote.
let id = core::handle().id();
let (tx, rx) = oneshot::channel();
self.cb_remote.spawn(move |handle| {
// Ensure we're running on a loop different to the one
// invoking spawn_fn.
assert_ne!(id, handle.id());
let stream = UnixStream::from_stream(stm2, handle).unwrap();
let transport = framed(stream, Default::default());
let rpc = rpc::bind_client::<CallbackClient>(transport, handle);
drop(tx.send(rpc));
Ok(())
});
let rpc: rpc::ClientProxy<CallbackReq, CallbackResp> = match rx.wait() {
Ok(rpc) => rpc,
Err(_) => bail!("Failed to create callback rpc."),
};
let cbs = Box::new(ServerStreamCallbacks {
input_frame_size,
output_frame_size,
input_shm,
output_shm,
rpc,
});
// Create cubeb stream from params
let stream_name = params
.stream_name
.as_ref()
.and_then(|name| CStr::from_bytes_with_nul(name).ok());
let input_device = params.input_device as *const _;
let input_stream_params = params.input_stream_params.as_ref().map(|isp| unsafe {
cubeb::StreamParamsRef::from_ptr(isp as *const StreamParams as *mut _)
});
let output_device = params.output_device as *const _;
let output_stream_params = params.output_stream_params.as_ref().map(|osp| unsafe {
cubeb::StreamParamsRef::from_ptr(osp as *const StreamParams as *mut _)
});
let latency = params.latency_frames;
assert!(size_of::<Box<ServerStreamCallbacks>>() == size_of::<usize>());
let user_ptr = cbs.as_ref() as *const ServerStreamCallbacks as *mut c_void;
unsafe {
context
.stream_init(
stream_name,
input_device,
input_stream_params,
output_device,
output_stream_params,
latency,
Some(data_cb_c),
Some(state_cb_c),
user_ptr,
).and_then(|stream| {
if !self.streams.has_available() {
trace!(
"server connection ran out of stream slots. reserving {} more.",
STREAM_CONN_CHUNK_SIZE
);
self.streams.reserve_exact(STREAM_CONN_CHUNK_SIZE);
}
let stm_tok = match self.streams.vacant_entry() {
Some(entry) => {
debug!("Registering stream {:?}", entry.index(),);
entry
.insert(ServerStream {
stream: ManuallyDrop::new(stream),
cbs: ManuallyDrop::new(cbs),
}).index()
}
None => {
// TODO: Turn into error
panic!("Failed to insert stream into slab. No entries")
}
};
Ok(ClientMessage::StreamCreated(StreamCreate {
token: stm_tok,
fds: [
stm1.into_raw_fd(),
input_file.into_raw_fd(),
output_file.into_raw_fd(),
],
}))
}).map_err(|e| e.into())
}
}
}
// C callable callbacks
unsafe extern "C" fn data_cb_c(
_: *mut ffi::cubeb_stream,
user_ptr: *mut c_void,
input_buffer: *const c_void,
output_buffer: *mut c_void,
nframes: c_long,
) -> c_long {
let ok = panic::catch_unwind(|| {
let cbs = &mut *(user_ptr as *mut ServerStreamCallbacks);
let input = if input_buffer.is_null() {
&[]
} else {
slice::from_raw_parts(input_buffer as *const u8, nframes as usize)
};
let output: &mut [u8] = if output_buffer.is_null() {
&mut []
} else {
slice::from_raw_parts_mut(output_buffer as *mut u8, nframes as usize)
};
cbs.data_callback(input, output) as c_long
});
ok.unwrap_or(0)
}
unsafe extern "C" fn state_cb_c(
_: *mut ffi::cubeb_stream,
user_ptr: *mut c_void,
state: ffi::cubeb_state,
) {
let ok = panic::catch_unwind(|| {
let state = cubeb::State::from(state);
let cbs = &mut *(user_ptr as *mut ServerStreamCallbacks);
cbs.state_callback(state);
});
ok.expect("State callback panicked");
}