Bug 1512445 - Import latest AudioIPC from upstream, including Windows backend. r=chunmin

Differential Revision: https://phabricator.services.mozilla.com/D22153

--HG--
extra : moz-landing-system : lando
This commit is contained in:
Matthew Gregan 2019-03-07 02:04:36 +00:00
parent 62025568c8
commit 1ecd53f466
19 changed files with 909 additions and 138 deletions

View File

@ -5,4 +5,4 @@ Makefile.in build files for the Mozilla build system.
The audioipc-2 git repository is: https://github.com/djg/audioipc-2.git
The git commit ID used was c2148b95128f5e54ef1d18e5e457a2fa45e7ac43 (2019-02-05 15:18:22 +1300)
The git commit ID used was 137eb45d702f628cae622d374975903c23a7a814 (2019-03-07 10:31:45 +1300)

View File

@ -8,7 +8,7 @@ authors = [
description = "Remote Cubeb IPC"
[dependencies]
cubeb = "0.5.2"
cubeb = "0.5.4"
bincode = "1.0"
bytes = "0.4"
futures = "0.1.18"
@ -22,7 +22,12 @@ serde_derive = "1.*.*"
tokio-core = "0.1"
tokio-io = "0.1"
tokio-uds = "0.1.7"
winapi = "0.3.6"
mio-named-pipes = "=0.1.5"
[dependencies.error-chain]
version = "0.11.0"
default-features = false
[build-dependencies]
cc = "1.0"

View File

@ -6,12 +6,15 @@
//! Various async helpers modelled after futures-rs and tokio-io.
use bytes::{Buf, BufMut};
use futures::{Async, Poll};
#[cfg(unix)]
use futures::Async;
use futures::Poll;
#[cfg(unix)]
use iovec::IoVec;
#[cfg(unix)]
use msg::{RecvMsg, SendMsg};
use std::io;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_uds::UnixStream;
pub trait AsyncRecvMsg: AsyncRead {
/// Pull some bytes from this source into the specified `Buf`, returning
@ -54,12 +57,13 @@ pub trait AsyncSendMsg: AsyncWrite {
////////////////////////////////////////////////////////////////////////////////
impl AsyncRecvMsg for UnixStream {
#[cfg(unix)]
impl AsyncRecvMsg for super::AsyncMessageStream {
fn recv_msg_buf<B>(&mut self, buf: &mut B, cmsg: &mut B) -> Poll<(usize, i32), io::Error>
where
B: BufMut,
{
if let Async::NotReady = <UnixStream>::poll_read(self) {
if let Async::NotReady = <super::AsyncMessageStream>::poll_read(self) {
return Ok(Async::NotReady);
}
let r = unsafe {
@ -123,13 +127,14 @@ impl AsyncRecvMsg for UnixStream {
}
}
impl AsyncSendMsg for UnixStream {
#[cfg(unix)]
impl AsyncSendMsg for super::AsyncMessageStream {
fn send_msg_buf<B, C>(&mut self, buf: &mut B, cmsg: &C) -> Poll<usize, io::Error>
where
B: Buf,
C: Buf,
{
if let Async::NotReady = <UnixStream>::poll_write(self) {
if let Async::NotReady = <super::AsyncMessageStream>::poll_write(self) {
return Ok(Async::NotReady);
}
let r = {

View File

@ -8,8 +8,7 @@ use bytes::{Bytes, BytesMut, IntoBuf};
use cmsg;
use codec::Codec;
use futures::{AsyncSink, Poll, Sink, StartSend, Stream};
use libc;
use messages::AssocRawFd;
use messages::AssocRawPlatformHandle;
use std::collections::VecDeque;
use std::os::unix::io::RawFd;
use std::{fmt, io, mem};
@ -66,7 +65,7 @@ struct Frame {
/// A unified `Stream` and `Sink` interface over an I/O object, using
/// the `Codec` trait to encode and decode the payload.
pub struct FramedWithFds<A, C> {
pub struct FramedWithPlatformHandles<A, C> {
io: A,
codec: C,
// Stream
@ -80,7 +79,7 @@ pub struct FramedWithFds<A, C> {
outgoing_fds: BytesMut,
}
impl<A, C> FramedWithFds<A, C>
impl<A, C> FramedWithPlatformHandles<A, C>
where
A: AsyncSendMsg,
{
@ -162,11 +161,11 @@ where
}
}
impl<A, C> Stream for FramedWithFds<A, C>
impl<A, C> Stream for FramedWithPlatformHandles<A, C>
where
A: AsyncRecvMsg,
C: Codec,
C::Out: AssocRawFd,
C::Out: AssocRawPlatformHandle,
{
type Item = C::Out;
type Error = io::Error;
@ -181,7 +180,7 @@ where
if self.is_readable {
if self.eof {
let mut item = try!(self.codec.decode_eof(&mut self.read_buf));
item.take_fd(|| self.incoming_fds.take_fds());
item.take_platform_handles(|| self.incoming_fds.take_fds());
return Ok(Some(item).into());
}
@ -189,7 +188,7 @@ where
if let Some(mut item) = try!(self.codec.decode(&mut self.read_buf)) {
trace!("frame decoded from buffer");
item.take_fd(|| self.incoming_fds.take_fds());
item.take_platform_handles(|| self.incoming_fds.take_fds());
return Ok(Some(item).into());
}
@ -206,10 +205,6 @@ where
.recv_msg_buf(&mut self.read_buf, self.incoming_fds.cmsg())
);
// if flags != 0 {
// error!("recv_msg_buf: flags = {:x}", flags)
// }
if n == 0 {
self.eof = true;
}
@ -219,11 +214,11 @@ where
}
}
impl<A, C> Sink for FramedWithFds<A, C>
impl<A, C> Sink for FramedWithPlatformHandles<A, C>
where
A: AsyncSendMsg,
C: Codec,
C::In: AssocRawFd + fmt::Debug,
C::In: AssocRawPlatformHandle + fmt::Debug,
{
type SinkItem = C::In;
type SinkError = io::Error;
@ -241,11 +236,11 @@ where
}
}
let fds = item.fd();
let fds = item.platform_handles();
try!(self.codec.encode(item, &mut self.write_buf));
let fds = fds.and_then(|fds| {
cmsg::builder(&mut self.outgoing_fds)
.rights(&fds[..])
.rights(&fds.0[..])
.finish()
.ok()
});
@ -278,8 +273,8 @@ where
}
}
pub fn framed_with_fds<A, C>(io: A, codec: C) -> FramedWithFds<A, C> {
FramedWithFds {
pub fn framed_with_platformhandles<A, C>(io: A, codec: C) -> FramedWithPlatformHandles<A, C> {
FramedWithPlatformHandles {
io: io,
codec: codec,
read_buf: BytesMut::with_capacity(INITIAL_CAPACITY),
@ -307,7 +302,7 @@ where
fn close_fds(fds: &[RawFd]) {
for fd in fds {
unsafe {
libc::close(*fd);
super::close_platformhandle(*fd);
}
}
}
@ -315,15 +310,26 @@ fn close_fds(fds: &[RawFd]) {
#[cfg(test)]
mod tests {
use bytes::BufMut;
use libc;
use std;
const CMSG_BYTES: &[u8] =
b"\x1c\0\0\0\0\0\0\0\x01\0\0\0\x01\0\0\02\0\0\0[\0\0\0\\\0\0\0\xe5\xe5\xe5\xe5";
extern {
fn cmsghdr_bytes(size: *mut libc::size_t) -> *const libc::uint8_t;
}
fn cmsg_bytes() -> &'static [u8] {
let mut size = 0;
unsafe {
let ptr = cmsghdr_bytes(&mut size);
std::slice::from_raw_parts(ptr, size)
}
}
#[test]
fn single_cmsg() {
let mut incoming = super::IncomingFds::new(16);
incoming.cmsg().put_slice(CMSG_BYTES);
incoming.cmsg().put_slice(cmsg_bytes());
assert!(incoming.take_fds().is_some());
assert!(incoming.take_fds().is_none());
}
@ -332,9 +338,9 @@ mod tests {
fn multiple_cmsg_1() {
let mut incoming = super::IncomingFds::new(16);
incoming.cmsg().put_slice(CMSG_BYTES);
incoming.cmsg().put_slice(cmsg_bytes());
assert!(incoming.take_fds().is_some());
incoming.cmsg().put_slice(CMSG_BYTES);
incoming.cmsg().put_slice(cmsg_bytes());
assert!(incoming.take_fds().is_some());
assert!(incoming.take_fds().is_none());
}
@ -342,11 +348,12 @@ mod tests {
#[test]
fn multiple_cmsg_2() {
let mut incoming = super::IncomingFds::new(16);
println!("cmsg_bytes() {}", cmsg_bytes().len());
incoming.cmsg().put_slice(CMSG_BYTES);
incoming.cmsg().put_slice(CMSG_BYTES);
incoming.cmsg().put_slice(cmsg_bytes());
incoming.cmsg().put_slice(cmsg_bytes());
assert!(incoming.take_fds().is_some());
incoming.cmsg().put_slice(CMSG_BYTES);
incoming.cmsg().put_slice(cmsg_bytes());
assert!(incoming.take_fds().is_some());
assert!(incoming.take_fds().is_some());
assert!(incoming.take_fds().is_none());

View File

@ -0,0 +1,256 @@
// Copyright © 2017 Mozilla Foundation
//
// This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details
use tokio_io::{AsyncRead, AsyncWrite};
use bytes::{Bytes, BytesMut, IntoBuf};
use codec::Codec;
use futures::{AsyncSink, Poll, Sink, StartSend, Stream};
use messages::AssocRawPlatformHandle;
use std::collections::VecDeque;
use std::{fmt, io};
const INITIAL_CAPACITY: usize = 1024;
const BACKPRESSURE_THRESHOLD: usize = 4 * INITIAL_CAPACITY;
#[derive(Debug)]
struct Frame {
msgs: Bytes,
}
/// A unified `Stream` and `Sink` interface over an I/O object, using
/// the `Codec` trait to encode and decode the payload.
pub struct FramedWithPlatformHandles<A, C> {
io: A,
codec: C,
// Stream
read_buf: BytesMut,
is_readable: bool,
eof: bool,
// Sink
frames: VecDeque<Frame>,
write_buf: BytesMut,
}
impl<A, C> FramedWithPlatformHandles<A, C>
where
A: AsyncWrite,
{
// If there is a buffered frame, try to write it to `A`
fn do_write(&mut self) -> Poll<(), io::Error> {
trace!("do_write...");
// Create a frame from any pending message in `write_buf`.
if !self.write_buf.is_empty() {
self.set_frame();
}
trace!("pending frames: {:?}", self.frames);
let mut processed = 0;
loop {
let n = match self.frames.front() {
Some(frame) => {
trace!("sending msg {:?}", frame.msgs);
let mut msgs = frame.msgs.clone().into_buf();
try_ready!(self.io.write_buf(&mut msgs))
}
_ => {
// No pending frames.
return Ok(().into());
}
};
match self.frames.pop_front() {
Some(mut frame) => {
processed += 1;
if n != frame.msgs.len() {
// If only part of the message was sent then
// re-queue the remaining message at the head
// of the queue. (Don't need to resend the
// handles since they've been sent with the
// first part.)
drop(frame.msgs.split_to(n));
self.frames.push_front(frame);
break;
}
}
_ => panic!(),
}
}
trace!("process {} frames", processed);
trace!("pending frames: {:?}", self.frames);
Ok(().into())
}
fn set_frame(&mut self) {
if self.write_buf.is_empty() {
trace!("set_frame: No pending messages...");
return;
}
let msgs = self.write_buf.take().freeze();
trace!("set_frame: msgs={:?}", msgs);
self.frames.push_back(Frame { msgs });
}
}
impl<A, C> Stream for FramedWithPlatformHandles<A, C>
where
A: AsyncRead,
C: Codec,
C::Out: AssocRawPlatformHandle,
{
type Item = C::Out;
type Error = io::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
loop {
// Repeatedly call `decode` or `decode_eof` as long as it is
// "readable". Readable is defined as not having returned `None`. If
// the upstream has returned EOF, and the decoder is no longer
// readable, it can be assumed that the decoder will never become
// readable again, at which point the stream is terminated.
if self.is_readable {
if self.eof {
let item = try!(self.codec.decode_eof(&mut self.read_buf));
return Ok(Some(item).into());
}
trace!("attempting to decode a frame");
if let Some(item) = try!(self.codec.decode(&mut self.read_buf)) {
trace!("frame decoded from buffer");
return Ok(Some(item).into());
}
self.is_readable = false;
}
assert!(!self.eof);
// Otherwise, try to read more data and try again. Make sure we've
// got room for at least one byte to read to ensure that we don't
// get a spurious 0 that looks like EOF
let n = try_ready!(
self.io
.read_buf(&mut self.read_buf)
);
if n == 0 {
self.eof = true;
}
self.is_readable = true;
}
}
}
impl<A, C> Sink for FramedWithPlatformHandles<A, C>
where
A: AsyncWrite,
C: Codec,
C::In: AssocRawPlatformHandle + fmt::Debug,
{
type SinkItem = C::In;
type SinkError = io::Error;
fn start_send(&mut self, mut item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
trace!("start_send: item={:?}", item);
// If the buffer is already over BACKPRESSURE_THRESHOLD,
// then attempt to flush it. If after flush it's *still*
// over BACKPRESSURE_THRESHOLD, then reject the send.
if self.write_buf.len() > BACKPRESSURE_THRESHOLD {
try!(self.poll_complete());
if self.write_buf.len() > BACKPRESSURE_THRESHOLD {
return Ok(AsyncSink::NotReady(item));
}
}
let mut got_handles = false;
if let Some((handles, target_pid)) = item.platform_handles() {
got_handles = true;
let remote_handles = unsafe {
[duplicate_platformhandle(handles[0], target_pid)?,
duplicate_platformhandle(handles[1], target_pid)?,
duplicate_platformhandle(handles[2], target_pid)?]
};
trace!("item handles: {:?} remote_handles: {:?}", handles, remote_handles);
item.take_platform_handles(|| Some(remote_handles));
}
try!(self.codec.encode(item, &mut self.write_buf));
if got_handles {
// Enforce splitting sends on messages that contain file
// descriptors.
self.set_frame();
}
Ok(AsyncSink::Ready)
}
fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
trace!("flushing framed transport");
try_ready!(self.do_write());
try_nb!(self.io.flush());
trace!("framed transport flushed");
Ok(().into())
}
fn close(&mut self) -> Poll<(), Self::SinkError> {
try_ready!(self.poll_complete());
self.io.shutdown()
}
}
pub fn framed_with_platformhandles<A, C>(io: A, codec: C) -> FramedWithPlatformHandles<A, C> {
FramedWithPlatformHandles {
io: io,
codec: codec,
read_buf: BytesMut::with_capacity(INITIAL_CAPACITY),
is_readable: false,
eof: false,
frames: VecDeque::new(),
write_buf: BytesMut::with_capacity(INITIAL_CAPACITY),
}
}
use winapi::um::{processthreadsapi, winnt, handleapi};
use winapi::shared::minwindef::{DWORD, FALSE};
use super::PlatformHandleType;
// source_handle is effectively taken ownership of (consumed) and
// closed when duplicate_platformhandle is called.
// TODO: Make this transfer more explicit via the type system.
unsafe fn duplicate_platformhandle(source_handle: PlatformHandleType,
target_pid: DWORD) -> Result<PlatformHandleType, std::io::Error> {
let source = processthreadsapi::GetCurrentProcess();
let target = processthreadsapi::OpenProcess(winnt::PROCESS_DUP_HANDLE,
FALSE,
target_pid);
if !super::valid_handle(target) {
return Err(std::io::Error::new(std::io::ErrorKind::Other, "invalid target process"));
}
let mut target_handle = std::ptr::null_mut();
let ok = handleapi::DuplicateHandle(source,
source_handle,
target,
&mut target_handle,
0,
FALSE,
winnt::DUPLICATE_CLOSE_SOURCE | winnt::DUPLICATE_SAME_ACCESS);
if ok == FALSE {
return Err(std::io::Error::new(std::io::ErrorKind::Other, "DuplicateHandle failed"));
}
Ok(target_handle)
}

View File

@ -28,16 +28,27 @@ extern crate tokio_core;
#[macro_use]
extern crate tokio_io;
extern crate tokio_uds;
#[cfg(windows)]
extern crate winapi;
mod async;
#[cfg(unix)]
mod cmsg;
pub mod codec;
pub mod core;
#[allow(deprecated)]
pub mod errors;
#[cfg(unix)]
pub mod fd_passing;
#[cfg(unix)]
pub use fd_passing as platformhandle_passing;
#[cfg(windows)]
pub mod handle_passing;
#[cfg(windows)]
pub use handle_passing as platformhandle_passing;
pub mod frame;
pub mod messages;
#[cfg(unix)]
mod msg;
pub mod rpc;
pub mod shm;
@ -46,16 +57,141 @@ pub use messages::{ClientMessage, ServerMessage};
use std::env::temp_dir;
use std::path::PathBuf;
#[cfg(windows)]
use std::os::windows::io::{FromRawHandle, IntoRawHandle};
#[cfg(unix)]
use std::os::unix::io::{FromRawFd, IntoRawFd};
// This must match the definition of
// ipc::FileDescriptor::PlatformHandleType in Gecko.
#[cfg(target_os = "windows")]
pub type PlatformHandleType = *mut std::os::raw::c_void;
#[cfg(not(target_os = "windows"))]
#[cfg(windows)]
pub type PlatformHandleType = std::os::windows::raw::HANDLE;
#[cfg(unix)]
pub type PlatformHandleType = libc::c_int;
// This stands in for RawFd/RawHandle.
#[derive(Copy, Clone, Debug)]
pub struct PlatformHandle(PlatformHandleType);
unsafe impl Send for PlatformHandle {}
// Custom serialization to treat HANDLEs as i64. This is not valid in
// general, but after sending the HANDLE value to a remote process we
// use it to create a valid HANDLE via DuplicateHandle.
// To avoid duplicating the serialization code, we're lazy and treat
// file descriptors as i64 rather than i32.
impl serde::Serialize for PlatformHandle {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_i64(self.0 as i64)
}
}
struct PlatformHandleVisitor;
impl<'de> serde::de::Visitor<'de> for PlatformHandleVisitor {
type Value = PlatformHandle;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
formatter.write_str("an integer between -2^63 and 2^63")
}
fn visit_i64<E>(self, value: i64) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok(PlatformHandle::new(value as PlatformHandleType))
}
}
impl<'de> serde::Deserialize<'de> for PlatformHandle {
fn deserialize<D>(deserializer: D) -> Result<PlatformHandle, D::Error>
where
D: serde::Deserializer<'de>,
{
deserializer.deserialize_i64(PlatformHandleVisitor)
}
}
#[cfg(unix)]
fn valid_handle(handle: PlatformHandleType) -> bool {
handle >= 0
}
#[cfg(windows)]
fn valid_handle(handle: PlatformHandleType) -> bool {
const INVALID_HANDLE_VALUE: PlatformHandleType = -1isize as PlatformHandleType;
const NULL_HANDLE_VALUE: PlatformHandleType = 0isize as PlatformHandleType;
handle != INVALID_HANDLE_VALUE && handle != NULL_HANDLE_VALUE
}
impl PlatformHandle {
pub fn new(raw: PlatformHandleType) -> PlatformHandle {
PlatformHandle(raw)
}
pub fn try_new(raw: PlatformHandleType) -> Option<PlatformHandle> {
if !valid_handle(raw) {
return None;
}
Some(PlatformHandle::new(raw))
}
#[cfg(windows)]
pub fn from<T: IntoRawHandle>(from: T) -> PlatformHandle {
PlatformHandle::new(from.into_raw_handle())
}
#[cfg(unix)]
pub fn from<T: IntoRawFd>(from: T) -> PlatformHandle {
PlatformHandle::new(from.into_raw_fd())
}
#[cfg(windows)]
pub unsafe fn into_file(self) -> std::fs::File {
std::fs::File::from_raw_handle(self.0)
}
#[cfg(unix)]
pub unsafe fn into_file(self) -> std::fs::File {
std::fs::File::from_raw_fd(self.0)
}
pub fn as_raw(&self) -> PlatformHandleType {
self.0
}
pub unsafe fn close(self) {
close_platformhandle(self.0);
}
}
#[cfg(unix)]
unsafe fn close_platformhandle(handle: PlatformHandleType) {
libc::close(handle);
}
#[cfg(windows)]
unsafe fn close_platformhandle(handle: PlatformHandleType) {
winapi::um::handleapi::CloseHandle(handle);
}
pub fn get_shm_path(dir: &str) -> PathBuf {
let pid = unsafe { libc::getpid() };
let pid = std::process::id();
let mut temp = temp_dir();
temp.push(&format!("cubeb-shm-{}-{}", pid, dir));
temp
}
#[cfg(unix)]
pub mod messagestream_unix;
#[cfg(unix)]
pub use messagestream_unix::*;
#[cfg(windows)]
pub mod messagestream_win;
#[cfg(windows)]
pub use messagestream_win::*;
#[cfg(windows)]
mod tokio_named_pipes;

View File

@ -3,10 +3,11 @@
// This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details
use PlatformHandle;
use PlatformHandleType;
use cubeb::{self, ffi};
use std::ffi::{CStr, CString};
use std::os::raw::{c_char, c_int, c_uint};
use std::os::unix::io::RawFd;
use std::ptr;
#[derive(Debug, Serialize, Deserialize)]
@ -169,7 +170,8 @@ fn opt_str(v: Option<Vec<u8>>) -> *mut c_char {
#[derive(Debug, Serialize, Deserialize)]
pub struct StreamCreate {
pub token: usize,
pub fds: [RawFd; 3],
pub platform_handles: [PlatformHandle; 3],
pub target_pid: u32,
}
// Client -> Server messages.
@ -177,7 +179,7 @@ pub struct StreamCreate {
// ServerConn::process_msg doesn't have a catch-all case.
#[derive(Debug, Serialize, Deserialize)]
pub enum ServerMessage {
ClientConnect,
ClientConnect(u32),
ClientDisconnect,
ContextGetBackendId,
@ -241,32 +243,40 @@ pub enum CallbackResp {
State,
}
pub trait AssocRawFd {
fn fd(&self) -> Option<[RawFd; 3]> {
pub trait AssocRawPlatformHandle {
fn platform_handles(&self) -> Option<([PlatformHandleType; 3], u32)> {
None
}
fn take_fd<F>(&mut self, _: F)
fn take_platform_handles<F>(&mut self, f: F)
where
F: FnOnce() -> Option<[RawFd; 3]>,
{
F: FnOnce() -> Option<[PlatformHandleType; 3]> {
assert!(f().is_none());
}
}
impl AssocRawFd for ServerMessage {}
impl AssocRawFd for ClientMessage {
fn fd(&self) -> Option<[RawFd; 3]> {
impl AssocRawPlatformHandle for ServerMessage {}
impl AssocRawPlatformHandle for ClientMessage {
fn platform_handles(&self) -> Option<([PlatformHandleType; 3], u32)> {
match *self {
ClientMessage::StreamCreated(ref data) => Some(data.fds),
ClientMessage::StreamCreated(ref data) => Some(([data.platform_handles[0].as_raw(),
data.platform_handles[1].as_raw(),
data.platform_handles[2].as_raw()],
data.target_pid)),
_ => None,
}
}
fn take_fd<F>(&mut self, f: F)
fn take_platform_handles<F>(&mut self, f: F)
where
F: FnOnce() -> Option<[RawFd; 3]>,
F: FnOnce() -> Option<[PlatformHandleType; 3]>,
{
if let ClientMessage::StreamCreated(ref mut data) = *self {
data.fds = f().unwrap();
let handles = f().expect("platform_handles must be available when processing StreamCreated");
data.platform_handles = [PlatformHandle::new(handles[0]),
PlatformHandle::new(handles[1]),
PlatformHandle::new(handles[2])]
}
}
}

View File

@ -0,0 +1,96 @@
// Copyright © 2017 Mozilla Foundation
//
// This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details
use std::os::unix::io::{IntoRawFd, FromRawFd, AsRawFd, RawFd};
use std::os::unix::net;
use tokio_io::{AsyncRead, AsyncWrite};
#[derive(Debug)]
pub struct MessageStream(net::UnixStream);
pub struct AsyncMessageStream(tokio_uds::UnixStream);
impl MessageStream {
fn new(stream: net::UnixStream) -> MessageStream {
MessageStream(stream)
}
pub fn anonymous_ipc_pair() -> std::result::Result<(MessageStream, MessageStream), std::io::Error> {
let pair = net::UnixStream::pair()?;
Ok((MessageStream::new(pair.0), MessageStream::new(pair.1)))
}
pub unsafe fn from_raw_fd(raw: super::PlatformHandleType) -> MessageStream {
MessageStream::new(net::UnixStream::from_raw_fd(raw))
}
pub fn into_tokio_ipc(self, handle: &tokio_core::reactor::Handle) -> std::result::Result<AsyncMessageStream, std::io::Error> {
Ok(AsyncMessageStream::new(tokio_uds::UnixStream::from_stream(self.0, handle)?))
}
}
impl AsyncMessageStream {
fn new(stream: tokio_uds::UnixStream) -> AsyncMessageStream {
AsyncMessageStream(stream)
}
pub fn poll_read(&self) -> futures::Async<()> {
self.0.poll_read()
}
pub fn poll_write(&self) -> futures::Async<()> {
self.0.poll_write()
}
pub fn need_read(&self) {
self.0.need_read()
}
pub fn need_write(&self) {
self.0.need_write()
}
}
impl std::io::Read for AsyncMessageStream {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
self.0.read(buf)
}
}
impl std::io::Write for AsyncMessageStream {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.0.write(buf)
}
fn flush(&mut self) -> std::io::Result<()> {
self.0.flush()
}
}
impl AsyncRead for AsyncMessageStream {
fn read_buf<B: bytes::BufMut>(&mut self, buf: &mut B) -> futures::Poll<usize, std::io::Error> {
<&tokio_uds::UnixStream>::read_buf(&mut &self.0, buf)
}
}
impl AsyncWrite for AsyncMessageStream {
fn shutdown(&mut self) -> futures::Poll<(), std::io::Error> {
<&tokio_uds::UnixStream>::shutdown(&mut &self.0)
}
fn write_buf<B: bytes::Buf>(&mut self, buf: &mut B) -> futures::Poll<usize, std::io::Error> {
<&tokio_uds::UnixStream>::write_buf(&mut &self.0, buf)
}
}
impl AsRawFd for AsyncMessageStream {
fn as_raw_fd(&self) -> RawFd {
self.0.as_raw_fd()
}
}
impl IntoRawFd for MessageStream {
fn into_raw_fd(self) -> RawFd {
self.0.into_raw_fd()
}
}

View File

@ -0,0 +1,109 @@
// Copyright © 2017 Mozilla Foundation
//
// This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details
extern crate mio_named_pipes;
use std::os::windows::io::{IntoRawHandle, FromRawHandle, AsRawHandle, RawHandle};
use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_named_pipes;
#[derive(Debug)]
pub struct MessageStream(mio_named_pipes::NamedPipe);
pub struct AsyncMessageStream(tokio_named_pipes::NamedPipe);
impl MessageStream {
fn new(stream: mio_named_pipes::NamedPipe) -> MessageStream {
MessageStream(stream)
}
pub fn anonymous_ipc_pair() -> std::result::Result<(MessageStream, MessageStream), std::io::Error> {
let pipe1 = mio_named_pipes::NamedPipe::new(get_pipe_name())?;
let pipe2 = unsafe { mio_named_pipes::NamedPipe::from_raw_handle(pipe1.as_raw_handle()) };
Ok((MessageStream::new(pipe1), MessageStream::new(pipe2)))
}
pub unsafe fn from_raw_fd(raw: super::PlatformHandleType) -> MessageStream {
MessageStream::new(mio_named_pipes::NamedPipe::from_raw_handle(raw))
}
pub fn into_tokio_ipc(self, handle: &tokio_core::reactor::Handle) -> std::result::Result<AsyncMessageStream, std::io::Error> {
Ok(AsyncMessageStream::new(tokio_named_pipes::NamedPipe::from_pipe(self.0, handle)?))
}
}
impl AsyncMessageStream {
fn new(stream: tokio_named_pipes::NamedPipe) -> AsyncMessageStream {
AsyncMessageStream(stream)
}
pub fn poll_read(&self) -> futures::Async<()> {
self.0.poll_read()
}
pub fn poll_write(&self) -> futures::Async<()> {
self.0.poll_write()
}
pub fn need_read(&self) {
self.0.need_read()
}
pub fn need_write(&self) {
self.0.need_write()
}
}
impl std::io::Read for AsyncMessageStream {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
self.0.read(buf)
}
}
impl std::io::Write for AsyncMessageStream {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.0.write(buf)
}
fn flush(&mut self) -> std::io::Result<()> {
self.0.flush()
}
}
impl AsyncRead for AsyncMessageStream {
fn read_buf<B: bytes::BufMut>(&mut self, buf: &mut B) -> futures::Poll<usize, std::io::Error> {
<tokio_named_pipes::NamedPipe>::read_buf(&mut self.0, buf)
}
}
impl AsyncWrite for AsyncMessageStream {
fn shutdown(&mut self) -> futures::Poll<(), std::io::Error> {
<tokio_named_pipes::NamedPipe>::shutdown(&mut self.0)
}
fn write_buf<B: bytes::Buf>(&mut self, buf: &mut B) -> futures::Poll<usize, std::io::Error> {
<tokio_named_pipes::NamedPipe>::write_buf(&mut self.0, buf)
}
}
impl AsRawHandle for AsyncMessageStream {
fn as_raw_handle(&self) -> RawHandle {
self.0.as_raw_handle()
}
}
impl IntoRawHandle for MessageStream {
fn into_raw_handle(self) -> RawHandle {
// XXX: Ideally this would call into_raw_handle.
self.0.as_raw_handle()
}
}
static PIPE_ID: AtomicUsize = ATOMIC_USIZE_INIT;
fn get_pipe_name() -> String {
let pid = std::process::id();
let pipe_id = PIPE_ID.fetch_add(1, Ordering::SeqCst);
format!("\\\\.\\pipe\\cubeb-pipe-{}-{}", pid, pipe_id)
}

View File

@ -0,0 +1,175 @@
// From https://github.com/alexcrichton/tokio-named-pipes/commit/3a22f8fc9a441b548aec25bd5df3b1e0ab99fabe
// License MIT/Apache-2.0
// Sloppily updated to be compatible with tokio_io
// To be replaced with tokio_named_pipes crate after tokio 0.1 update.
#![cfg(windows)]
extern crate bytes;
extern crate tokio_core;
extern crate mio_named_pipes;
extern crate futures;
use std::ffi::OsStr;
use std::fmt;
use std::io::{self, Read, Write};
use std::os::windows::io::*;
use futures::{Async, Poll};
use bytes::{BufMut, Buf};
#[allow(deprecated)]
use tokio_core::io::Io;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_core::reactor::{PollEvented, Handle};
pub struct NamedPipe {
io: PollEvented<mio_named_pipes::NamedPipe>,
}
impl NamedPipe {
pub fn new<P: AsRef<OsStr>>(p: P, handle: &Handle) -> io::Result<NamedPipe> {
NamedPipe::_new(p.as_ref(), handle)
}
fn _new(p: &OsStr, handle: &Handle) -> io::Result<NamedPipe> {
let inner = try!(mio_named_pipes::NamedPipe::new(p));
NamedPipe::from_pipe(inner, handle)
}
pub fn from_pipe(pipe: mio_named_pipes::NamedPipe,
handle: &Handle)
-> io::Result<NamedPipe> {
Ok(NamedPipe {
io: try!(PollEvented::new(pipe, handle)),
})
}
pub fn connect(&self) -> io::Result<()> {
self.io.get_ref().connect()
}
pub fn disconnect(&self) -> io::Result<()> {
self.io.get_ref().disconnect()
}
pub fn need_read(&self) {
self.io.need_read()
}
pub fn need_write(&self) {
self.io.need_write()
}
pub fn poll_read(&self) -> Async<()> {
self.io.poll_read()
}
pub fn poll_write(&self) -> Async<()> {
self.io.poll_write()
}
}
impl Read for NamedPipe {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.io.read(buf)
}
}
impl Write for NamedPipe {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.io.write(buf)
}
fn flush(&mut self) -> io::Result<()> {
self.io.flush()
}
}
#[allow(deprecated)]
impl Io for NamedPipe {
fn poll_read(&mut self) -> Async<()> {
<NamedPipe>::poll_read(self)
}
fn poll_write(&mut self) -> Async<()> {
<NamedPipe>::poll_write(self)
}
}
impl AsyncRead for NamedPipe {
unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
false
}
fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
if NamedPipe::poll_read(self).is_not_ready() {
return Ok(Async::NotReady)
}
let mut stack_buf = [0u8; 1024];
let bytes_read = self.io.read(&mut stack_buf);
match bytes_read {
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
self.io.need_read();
return Ok(Async::NotReady);
},
Err(e) => Err(e),
Ok(bytes_read) => {
buf.put_slice(&stack_buf[0..bytes_read]);
Ok(Async::Ready(bytes_read))
}
}
}
}
impl AsyncWrite for NamedPipe {
fn shutdown(&mut self) -> Poll<(), io::Error> {
Ok(().into())
}
fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
if NamedPipe::poll_write(self).is_not_ready() {
return Ok(Async::NotReady)
}
let bytes_wrt = self.io.write(buf.bytes());
match bytes_wrt {
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
self.io.need_write();
return Ok(Async::NotReady);
},
Err(e) => Err(e),
Ok(bytes_wrt) => {
buf.advance(bytes_wrt);
Ok(Async::Ready(bytes_wrt))
}
}
}
}
impl<'a> Read for &'a NamedPipe {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
(&self.io).read(buf)
}
}
impl<'a> Write for &'a NamedPipe {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
(&self.io).write(buf)
}
fn flush(&mut self) -> io::Result<()> {
(&self.io).flush()
}
}
impl fmt::Debug for NamedPipe {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.io.get_ref().fmt(f)
}
}
impl AsRawHandle for NamedPipe {
fn as_raw_handle(&self) -> RawHandle {
self.io.get_ref().as_raw_handle()
}
}

View File

@ -9,7 +9,7 @@ description = "Cubeb Backend for talking to remote cubeb server."
[dependencies]
audioipc = { path="../audioipc" }
cubeb-backend = "0.5"
cubeb-backend = "0.5.4"
foreign-types = "0.3"
futures = { version="0.1.18", default-features=false, features=["use_std"] }
futures-cpupool = { version="0.1.8", default-features=false }

View File

@ -5,7 +5,7 @@
use assert_not_in_callback;
use audioipc::codec::LengthDelimitedCodec;
use audioipc::fd_passing::{framed_with_fds, FramedWithFds};
use audioipc::platformhandle_passing::{framed_with_platformhandles, FramedWithPlatformHandles};
use audioipc::{core, rpc};
use audioipc::{messages, ClientMessage, ServerMessage};
use cubeb_backend::{
@ -14,17 +14,13 @@ use cubeb_backend::{
};
use futures::Future;
use futures_cpupool::{self, CpuPool};
use libc;
use std::ffi::{CStr, CString};
use std::os::raw::c_void;
use std::os::unix::io::FromRawFd;
use std::os::unix::net;
use std::sync::mpsc;
use std::thread;
use std::{fmt, io, mem, ptr};
use stream;
use tokio_core::reactor::{Handle, Remote};
use tokio_uds::UnixStream;
use {ClientStream, CPUPOOL_INIT_PARAMS, G_SERVER_FD};
struct CubebClient;
@ -32,7 +28,7 @@ struct CubebClient;
impl rpc::Client for CubebClient {
type Request = ServerMessage;
type Response = ClientMessage;
type Transport = FramedWithFds<UnixStream, LengthDelimitedCodec<Self::Request, Self::Response>>;
type Transport = FramedWithPlatformHandles<audioipc::AsyncMessageStream, LengthDelimitedCodec<Self::Request, Self::Response>>;
}
macro_rules! t(
@ -73,10 +69,10 @@ impl ClientContext {
}
// TODO: encapsulate connect, etc inside audioipc.
fn open_server_stream() -> Result<net::UnixStream> {
fn open_server_stream() -> Result<audioipc::MessageStream> {
unsafe {
if let Some(fd) = G_SERVER_FD {
return Ok(net::UnixStream::from_raw_fd(fd));
return Ok(audioipc::MessageStream::from_raw_fd(fd.as_raw()));
}
Err(Error::default())
@ -86,11 +82,11 @@ fn open_server_stream() -> Result<net::UnixStream> {
impl ContextOps for ClientContext {
fn init(_context_name: Option<&CStr>) -> Result<Context> {
fn bind_and_send_client(
stream: UnixStream,
stream: audioipc::AsyncMessageStream,
handle: &Handle,
tx_rpc: &mpsc::Sender<rpc::ClientProxy<ServerMessage, ClientMessage>>,
) -> Option<()> {
let transport = framed_with_fds(stream, Default::default());
let transport = framed_with_platformhandles(stream, Default::default());
let rpc = rpc::bind_client::<CubebClient>(transport, handle);
// If send fails then the rx end has closed
// which is unlikely here.
@ -121,7 +117,7 @@ impl ContextOps for ClientContext {
open_server_stream()
.ok()
.and_then(|stream| UnixStream::from_stream(stream, &handle).ok())
.and_then(|stream| stream.into_tokio_ipc(&handle).ok())
.and_then(|stream| bind_and_send_client(stream, &handle, &tx_rpc))
.ok_or_else(|| {
io::Error::new(
@ -140,6 +136,8 @@ impl ContextOps for ClientContext {
.stack_size(params.stack_size)
.create();
send_recv!(rpc, ClientConnect(std::process::id()) => ClientConnected)?;
let ctx = Box::new(ClientContext {
_ops: &CLIENT_OPS as *const _,
rpc: rpc,
@ -280,7 +278,7 @@ impl Drop for ClientContext {
let _ = send_recv!(self.rpc(), ClientDisconnect => ClientDisconnected);
unsafe {
if G_SERVER_FD.is_some() {
libc::close(super::G_SERVER_FD.take().unwrap());
G_SERVER_FD.take().unwrap().close();
}
}
}

View File

@ -20,11 +20,10 @@ mod send_recv;
mod context;
mod stream;
use audioipc::PlatformHandleType;
use audioipc::{PlatformHandleType, PlatformHandle};
use context::ClientContext;
use cubeb_backend::{capi, ffi};
use std::os::raw::{c_char, c_int};
use std::os::unix::io::RawFd;
use stream::ClientStream;
type InitParamsTls = std::cell::RefCell<Option<CpuPoolInitParams>>;
@ -83,7 +82,7 @@ where
});
}
static mut G_SERVER_FD: Option<RawFd> = None;
static mut G_SERVER_FD: Option<PlatformHandle> = None;
#[no_mangle]
/// Entry point from C code.
@ -98,13 +97,13 @@ pub unsafe extern "C" fn audioipc_client_init(
let init_params = &*init_params;
// TODO: Windows portability (for fd).
// TODO: Better way to pass extra parameters to Context impl.
if G_SERVER_FD.is_some() {
panic!("audioipc client's server connection already initialized.");
return cubeb_backend::ffi::CUBEB_ERROR;
}
if init_params.server_connection >= 0 {
G_SERVER_FD = Some(init_params.server_connection);
G_SERVER_FD = PlatformHandle::try_new(init_params.server_connection);
if G_SERVER_FD.is_none() {
return cubeb_backend::ffi::CUBEB_ERROR;
}
let cpupool_init_params = CpuPoolInitParams::init_with(&init_params);

View File

@ -12,13 +12,9 @@ use cubeb_backend::{ffi, DeviceRef, Error, Result, Stream, StreamOps};
use futures::Future;
use futures_cpupool::{CpuFuture, CpuPool};
use std::ffi::CString;
use std::fs::File;
use std::os::raw::c_void;
use std::os::unix::io::FromRawFd;
use std::os::unix::net;
use std::ptr;
use std::sync::mpsc;
use tokio_uds::UnixStream;
use ClientContext;
use {assert_not_in_callback, set_in_callback};
@ -65,7 +61,7 @@ impl rpc::Server for CallbackServer {
type Request = CallbackReq;
type Response = CallbackResp;
type Future = CpuFuture<Self::Response, ()>;
type Transport = Framed<UnixStream, LengthDelimitedCodec<Self::Response, Self::Request>>;
type Transport = Framed<audioipc::AsyncMessageStream, LengthDelimitedCodec<Self::Response, Self::Request>>;
fn process(&mut self, req: Self::Request) -> Self::Future {
match req {
@ -155,21 +151,21 @@ impl<'ctx> ClientStream<'ctx> {
let rpc = ctx.rpc();
let data = try!(send_recv!(rpc, StreamInit(init_params) => StreamCreated()));
debug!("token = {}, fds = {:?}", data.token, data.fds);
debug!("token = {}, handles = {:?}", data.token, data.platform_handles);
let stm = data.fds[0];
let stream = unsafe { net::UnixStream::from_raw_fd(stm) };
let stm = data.platform_handles[0];
let stream = unsafe { audioipc::MessageStream::from_raw_fd(stm.as_raw()) };
let input = data.fds[1];
let input_file = unsafe { File::from_raw_fd(input) };
let input = data.platform_handles[1];
let input_file = unsafe { input.into_file() };
let input_shm = if has_input {
Some(SharedMemSlice::from(&input_file, SHM_AREA_SIZE).unwrap())
} else {
None
};
let output = data.fds[2];
let output_file = unsafe { File::from_raw_fd(output) };
let output = data.platform_handles[2];
let output_file = unsafe { output.into_file() };
let output_shm = if has_output {
Some(SharedMemMutSlice::from(&output_file, SHM_AREA_SIZE).unwrap())
} else {
@ -191,7 +187,7 @@ impl<'ctx> ClientStream<'ctx> {
let (wait_tx, wait_rx) = mpsc::channel();
ctx.remote().spawn(move |handle| {
let stream = UnixStream::from_stream(stream, handle).unwrap();
let stream = stream.into_tokio_ipc(handle).unwrap();
let transport = framed(stream, Default::default());
rpc::bind_server(transport, server, handle);
wait_tx.send(()).unwrap();

View File

@ -1,22 +0,0 @@
diff --git a/media/audioipc/client/src/context.rs b/media/audioipc/client/src/context.rs
--- a/media/audioipc/client/src/context.rs
+++ b/media/audioipc/client/src/context.rs
@@ -265,17 +265,17 @@ impl ContextOps for ClientContext {
fn register_device_collection_changed(
&mut self,
_dev_type: DeviceType,
_collection_changed_callback: ffi::cubeb_device_collection_changed_callback,
_user_ptr: *mut c_void,
) -> Result<()> {
assert_not_in_callback();
- Ok(())
+ Err(Error::not_supported())
}
}
impl Drop for ClientContext {
fn drop(&mut self) {
debug!("ClientContext dropped...");
let _ = send_recv!(self.rpc(), ClientDisconnect => ClientDisconnected);
unsafe {

View File

@ -9,7 +9,7 @@ description = "Remote cubeb server"
[dependencies]
audioipc = { path = "../audioipc" }
cubeb-core = "0.5.1"
cubeb-core = "0.5.4"
bytes = "0.4"
lazycell = "^0.4"
libc = "0.2"

View File

@ -20,17 +20,14 @@ extern crate tokio_core;
extern crate tokio_uds;
use audioipc::core;
use audioipc::fd_passing::framed_with_fds;
use audioipc::platformhandle_passing::framed_with_platformhandles;
use audioipc::rpc;
use audioipc::PlatformHandleType;
use audioipc::{MessageStream, PlatformHandle, PlatformHandleType};
use futures::sync::oneshot;
use futures::Future;
use std::error::Error;
use std::os::raw::c_void;
use std::os::unix::io::IntoRawFd;
use std::os::unix::net;
use std::ptr;
use tokio_uds::UnixStream;
mod server;
@ -102,18 +99,18 @@ pub extern "C" fn audioipc_server_new_client(p: *mut c_void) -> PlatformHandleTy
let cb_remote = wrapper.callback_thread.remote();
// We create a pair of connected unix domain sockets. One socket is
// registered with the reactor core, the other is returned to the
// caller.
net::UnixStream::pair()
// We create a connected pair of anonymous IPC endpoints. One side
// is registered with the reactor core, the other side is returned
// to the caller.
MessageStream::anonymous_ipc_pair()
.and_then(|(sock1, sock2)| {
// Spawn closure to run on same thread as reactor::Core
// via remote handle.
wrapper.core_thread.remote().spawn(|handle| {
trace!("Incoming connection");
UnixStream::from_stream(sock2, handle)
sock2.into_tokio_ipc(handle)
.and_then(|sock| {
let transport = framed_with_fds(sock, Default::default());
let transport = framed_with_platformhandles(sock, Default::default());
rpc::bind_server(transport, server::CubebServer::new(cb_remote), handle);
Ok(())
}).map_err(|_| ())
@ -123,8 +120,8 @@ pub extern "C" fn audioipc_server_new_client(p: *mut c_void) -> PlatformHandleTy
// Wait for notification that sock2 has been registered
// with reactor::Core.
let _ = wait_rx.wait();
Ok(sock1.into_raw_fd())
}).unwrap_or(-1)
Ok(PlatformHandle::from(sock1).as_raw())
}).unwrap_or(-1isize as PlatformHandleType)
}
#[no_mangle]

View File

@ -4,9 +4,10 @@
// accompanying file LICENSE for details
use audioipc;
use audioipc::{MessageStream, PlatformHandle};
use audioipc::codec::LengthDelimitedCodec;
use audioipc::core;
use audioipc::fd_passing::FramedWithFds;
use audioipc::platformhandle_passing::FramedWithPlatformHandles;
use audioipc::frame::{framed, Framed};
use audioipc::messages::{
CallbackReq, CallbackResp, ClientMessage, Device, DeviceInfo, ServerMessage, StreamCreate,
@ -25,11 +26,8 @@ use std::convert::From;
use std::ffi::{CStr, CString};
use std::mem::{size_of, ManuallyDrop};
use std::os::raw::{c_long, c_void};
use std::os::unix::io::IntoRawFd;
use std::os::unix::net;
use std::{panic, slice};
use tokio_core::reactor::Remote;
use tokio_uds::UnixStream;
use errors::*;
@ -65,7 +63,7 @@ struct CallbackClient;
impl rpc::Client for CallbackClient {
type Request = CallbackReq;
type Response = CallbackResp;
type Transport = Framed<UnixStream, LengthDelimitedCodec<Self::Request, Self::Response>>;
type Transport = Framed<audioipc::AsyncMessageStream, LengthDelimitedCodec<Self::Request, Self::Response>>;
}
struct ServerStreamCallbacks {
@ -142,13 +140,14 @@ type StreamSlab = slab::Slab<ServerStream, usize>;
pub struct CubebServer {
cb_remote: Remote,
streams: StreamSlab,
remote_pid: Option<u32>,
}
impl rpc::Server for CubebServer {
type Request = ServerMessage;
type Response = ClientMessage;
type Future = FutureResult<Self::Response, ()>;
type Transport = FramedWithFds<UnixStream, LengthDelimitedCodec<Self::Response, Self::Request>>;
type Transport = FramedWithPlatformHandles<audioipc::AsyncMessageStream, LengthDelimitedCodec<Self::Response, Self::Request>>;
fn process(&mut self, req: Self::Request) -> Self::Future {
let resp = with_local_context(|context| match *context {
@ -164,13 +163,17 @@ impl CubebServer {
CubebServer {
cb_remote: cb_remote,
streams: StreamSlab::with_capacity(STREAM_CONN_CHUNK_SIZE),
remote_pid: None,
}
}
// Process a request coming from the client.
fn process_msg(&mut self, context: &cubeb::Context, msg: &ServerMessage) -> ClientMessage {
let resp: ClientMessage = match *msg {
ServerMessage::ClientConnect => panic!("already connected"),
ServerMessage::ClientConnect(pid) => {
self.remote_pid = Some(pid);
ClientMessage::ClientConnected
}
ServerMessage::ClientDisconnect => {
// TODO:
@ -304,7 +307,7 @@ impl CubebServer {
let input_frame_size = frame_size_in_bytes(params.input_stream_params.as_ref());
let output_frame_size = frame_size_in_bytes(params.output_stream_params.as_ref());
let (stm1, stm2) = net::UnixStream::pair()?;
let (stm1, stm2) = MessageStream::anonymous_ipc_pair()?;
debug!("Created callback pair: {:?}-{:?}", stm1, stm2);
let (input_shm, input_file) =
SharedMemWriter::new(&audioipc::get_shm_path("input"), SHM_AREA_SIZE)?;
@ -323,7 +326,7 @@ impl CubebServer {
// Ensure we're running on a loop different to the one
// invoking spawn_fn.
assert_ne!(id, handle.id());
let stream = UnixStream::from_stream(stm2, handle).unwrap();
let stream = stm2.into_tokio_ipc(handle).unwrap();
let transport = framed(stream, Default::default());
let rpc = rpc::bind_client::<CallbackClient>(transport, handle);
drop(tx.send(rpc));
@ -402,11 +405,12 @@ impl CubebServer {
Ok(ClientMessage::StreamCreated(StreamCreate {
token: stm_tok,
fds: [
stm1.into_raw_fd(),
input_file.into_raw_fd(),
output_file.into_raw_fd(),
platform_handles: [
PlatformHandle::from(stm1),
PlatformHandle::from(input_file),
PlatformHandle::from(output_file),
],
target_pid: self.remote_pid.unwrap()
}))
}).map_err(|e| e.into())
}

View File

@ -11,6 +11,8 @@ for crate in audioipc client server; do
cp -pr $1/$crate/src/ $crate/src/
done
rm audioipc/src/cmsghdr.c
if [ -d $1/.git ]; then
rev=$(cd $1 && git rev-parse --verify HEAD)
date=$(cd $1 && git show -s --format=%ci HEAD)
@ -32,5 +34,3 @@ fi
echo "Applying gecko.patch on top of $rev"
patch -p3 < gecko.patch
echo "Applying register-collection-not-supported.patch on top of $rev"
patch -p3 < register-collection-not-supported.patch