Bug 869869 - Part 3, add an IPC implementation of NrSocket, r=ekr.

This commit is contained in:
Shih-Chiang Chien 2013-06-19 19:08:06 +08:00
parent 4d0b41056b
commit 32ecc31c85
2 changed files with 662 additions and 73 deletions

View File

@ -102,7 +102,9 @@ nrappkit copyright:
#include "nsNetCID.h"
#include "nsISupportsImpl.h"
#include "nsServiceManagerUtils.h"
#include "nsComponentManagerUtils.h"
#include "nsXPCOM.h"
#include "nsXULAppAPI.h"
#include "runnable_utils.h"
extern "C" {
@ -116,6 +118,63 @@ extern "C" {
// Implement the nsISupports ref counting
namespace mozilla {
// NrSocketBase implementation
// async_event APIs
int NrSocketBase::async_wait(int how, NR_async_cb cb, void *cb_arg,
char *function, int line) {
uint16_t flag;
switch (how) {
case NR_ASYNC_WAIT_READ:
flag = PR_POLL_READ;
break;
case NR_ASYNC_WAIT_WRITE:
flag = PR_POLL_WRITE;
break;
default:
return R_BAD_ARGS;
}
cbs_[how] = cb;
cb_args_[how] = cb_arg;
poll_flags_ |= flag;
return 0;
}
int NrSocketBase::cancel(int how) {
uint16_t flag;
switch (how) {
case NR_ASYNC_WAIT_READ:
flag = PR_POLL_READ;
break;
case NR_ASYNC_WAIT_WRITE:
flag = PR_POLL_WRITE;
break;
default:
return R_BAD_ARGS;
}
poll_flags_ &= ~flag;
return 0;
}
void NrSocketBase::fire_callback(int how) {
// This can't happen unless we are armed because we only set
// the flags if we are armed
MOZ_ASSERT(cbs_[how]);
// Now cancel so that we need to be re-armed. Note that
// the re-arming probably happens in the callback we are
// about to fire.
cancel(how);
cbs_[how](this, how, cb_args_[how]);
}
// NrSocket implementation
NS_IMPL_ISUPPORTS0(NrSocket)
@ -139,56 +198,23 @@ void NrSocket::IsLocal(bool *aIsLocal) {
// async_event APIs
int NrSocket::async_wait(int how, NR_async_cb cb, void *cb_arg,
char *function, int line) {
uint16_t flag;
int r = NrSocketBase::async_wait(how, cb, cb_arg, function, line);
switch (how) {
case NR_ASYNC_WAIT_READ:
flag = PR_POLL_READ;
break;
case NR_ASYNC_WAIT_WRITE:
flag = PR_POLL_WRITE;
break;
default:
return R_BAD_ARGS;
if (!r) {
mPollFlags = poll_flags();
}
cbs_[how] = cb;
cb_args_[how] = cb_arg;
mPollFlags |= flag;
return 0;
return r;
}
int NrSocket::cancel(int how) {
uint16_t flag;
int r = NrSocketBase::cancel(how);
switch (how) {
case NR_ASYNC_WAIT_READ:
flag = PR_POLL_READ;
break;
case NR_ASYNC_WAIT_WRITE:
flag = PR_POLL_WRITE;
break;
default:
return R_BAD_ARGS;
if (!r) {
mPollFlags = poll_flags();
}
mPollFlags &= ~flag;
return 0;
}
void NrSocket::fire_callback(int how) {
// This can't happen unless we are armed because we only set
// the flags if we are armed
MOZ_ASSERT(cbs_[how]);
// Now cancel so that we need to be re-armed. Note that
// the re-arming probably happens in the callback we are
// about to fire.
cancel(how);
cbs_[how](this, how, cb_args_[how]);
return r;
}
// Helper functions for addresses
@ -240,6 +266,54 @@ static int nr_transport_addr_to_praddr(nr_transport_addr *addr,
return(_status);
}
//XXX schien@mozilla.com: copy from PRNetAddrToNetAddr,
// should be removed after fix the link error in signaling_unittests
static int praddr_to_netaddr(const PRNetAddr *prAddr, net::NetAddr *addr)
{
int _status;
switch (prAddr->raw.family) {
case PR_AF_INET:
addr->inet.family = AF_INET;
addr->inet.port = prAddr->inet.port;
addr->inet.ip = prAddr->inet.ip;
break;
case PR_AF_INET6:
addr->inet6.family = AF_INET6;
addr->inet6.port = prAddr->ipv6.port;
addr->inet6.flowinfo = prAddr->ipv6.flowinfo;
memcpy(&addr->inet6.ip, &prAddr->ipv6.ip, sizeof(addr->inet6.ip.u8));
addr->inet6.scope_id = prAddr->ipv6.scope_id;
break;
default:
MOZ_ASSERT(false);
ABORT(R_BAD_ARGS);
}
_status = 0;
abort:
return(_status);
}
static int nr_transport_addr_to_netaddr(nr_transport_addr *addr,
net::NetAddr *naddr)
{
int r, _status;
PRNetAddr praddr;
if((r = nr_transport_addr_to_praddr(addr, &praddr))) {
ABORT(r);
}
if((r = praddr_to_netaddr(&praddr, naddr))) {
ABORT(r);
}
_status = 0;
abort:
return(_status);
}
int nr_netaddr_to_transport_addr(const net::NetAddr *netaddr,
nr_transport_addr *addr)
{
@ -299,6 +373,32 @@ int nr_praddr_to_transport_addr(const PRNetAddr *praddr,
return(_status);
}
/*
* nr_transport_addr_get_addrstring_and_port
* convert nr_transport_addr to IP address string and port number
*/
int nr_transport_addr_get_addrstring_and_port(nr_transport_addr *addr,
nsACString *host, int32_t *port) {
int r, _status;
char addr_string[64];
// We cannot directly use |nr_transport_addr.as_string| because it contains
// more than ip address, therefore, we need to explicity convert it
// from |nr_transport_addr_get_addrstring|.
if ((r=nr_transport_addr_get_addrstring(addr, addr_string, sizeof(addr_string)))) {
ABORT(r);
}
if ((r=nr_transport_addr_get_port(addr, port))) {
ABORT(r);
}
*host = addr_string;
_status=0;
abort:
return(_status);
}
// nr_socket APIs (as member functions)
int NrSocket::create(nr_transport_addr *addr) {
@ -394,7 +494,7 @@ int NrSocket::sendto(const void *msg, size_t len,
if (PR_GetError() == PR_WOULD_BLOCK_ERROR)
ABORT(R_WOULDBLOCK);
r_log_e(LOG_GENERIC, LOG_INFO, "Error in sendto %s", to->as_string);
r_log(LOG_GENERIC, LOG_INFO, "Error in sendto %s", to->as_string);
ABORT(R_IO_ERROR);
}
@ -413,7 +513,7 @@ int NrSocket::recvfrom(void * buf, size_t maxlen,
status = PR_RecvFrom(fd_, buf, maxlen, flags, &nfrom, PR_INTERVAL_NO_WAIT);
if (status <= 0) {
r_log_e(LOG_GENERIC,LOG_ERR,"Error in recvfrom");
r_log(LOG_GENERIC,LOG_ERR,"Error in recvfrom");
ABORT(R_IO_ERROR);
}
*len=status;
@ -438,6 +538,383 @@ void NrSocket::close() {
ASSERT_ON_THREAD(ststhread_);
mCondition = NS_BASE_STREAM_CLOSED;
}
// NrSocketIpc Implementation
NS_IMPL_ISUPPORTS1(NrSocketIpc, nsIUDPSocketInternal)
NrSocketIpc::NrSocketIpc(const nsCOMPtr<nsIEventTarget> &main_thread)
: err_(false),
state_(NR_INIT),
main_thread_(main_thread),
monitor_("NrSocketIpc") {
}
// IUDPSocketInternal interfaces
// callback while error happened in UDP socket operation
NS_IMETHODIMP NrSocketIpc::CallListenerError(const nsACString &type,
const nsACString &message,
const nsACString &filename,
uint32_t line_number,
uint32_t column_number) {
ASSERT_ON_THREAD(main_thread_);
MOZ_ASSERT(type.EqualsLiteral("onerror"));
r_log(LOG_GENERIC, LOG_ERR, "UDP socket error:%s at %s:%d:%d",
message.BeginReading(), filename.BeginReading(),
line_number, column_number);
ReentrantMonitorAutoEnter mon(monitor_);
err_ = true;
monitor_.NotifyAll();
return NS_OK;
}
// callback while receiving UDP packet
NS_IMETHODIMP NrSocketIpc::CallListenerReceivedData(const nsACString &type,
const nsACString &host,
uint16_t port, uint8_t *data,
uint32_t data_length) {
ASSERT_ON_THREAD(main_thread_);
MOZ_ASSERT(type.EqualsLiteral("ondata"));
PRNetAddr addr;
memset(&addr, 0, sizeof(addr));
{
ReentrantMonitorAutoEnter mon(monitor_);
if (PR_SUCCESS != PR_StringToNetAddr(host.BeginReading(), &addr)) {
err_ = true;
MOZ_ASSERT(false, "Failed to convert remote host to PRNetAddr");
return NS_OK;
}
// Use PR_IpAddrNull to avoid address being reset to 0.
if (PR_SUCCESS != PR_SetNetAddr(PR_IpAddrNull, addr.raw.family, port, &addr)) {
err_ = true;
MOZ_ASSERT(false, "Failed to set port in PRNetAddr");
return NS_OK;
}
}
nsAutoPtr<DataBuffer> buf(new DataBuffer(data, data_length));
RefPtr<nr_udp_message> msg(new nr_udp_message(addr, buf));
RUN_ON_THREAD(sts_thread_,
mozilla::WrapRunnable(nsRefPtr<NrSocketIpc>(this),
&NrSocketIpc::recv_callback_s,
msg),
NS_DISPATCH_NORMAL);
return NS_OK;
}
// callback while UDP socket is opened or closed
NS_IMETHODIMP NrSocketIpc::CallListenerVoid(const nsACString &type) {
ASSERT_ON_THREAD(main_thread_);
if (type.EqualsLiteral("onopen")) {
ReentrantMonitorAutoEnter mon(monitor_);
uint16_t port;
if (NS_FAILED(socket_child_->GetLocalPort(&port))) {
err_ = true;
MOZ_ASSERT(false, "Failed to get local port");
return NS_OK;
}
nsAutoCString address;
if(NS_FAILED(socket_child_->GetLocalAddress(address))) {
err_ = true;
MOZ_ASSERT(false, "Failed to get local address");
return NS_OK;
}
PRNetAddr praddr;
if (PR_SUCCESS != PR_InitializeNetAddr(PR_IpAddrAny, port, &praddr)) {
err_ = true;
MOZ_ASSERT(false, "Failed to set port in PRNetAddr");
return NS_OK;
}
if (PR_SUCCESS != PR_StringToNetAddr(address.BeginReading(), &praddr)) {
err_ = true;
MOZ_ASSERT(false, "Failed to convert local host to PRNetAddr");
return NS_OK;
}
nr_transport_addr expected_addr;
if(nr_transport_addr_copy(&expected_addr, &my_addr_)) {
err_ = true;
MOZ_ASSERT(false, "Failed to copy my_addr_");
}
if (nr_praddr_to_transport_addr(&praddr, &my_addr_, 1)) {
err_ = true;
MOZ_ASSERT(false, "Failed to copy local host to my_addr_");
}
if (nr_transport_addr_cmp(&expected_addr, &my_addr_,
NR_TRANSPORT_ADDR_CMP_MODE_ADDR)) {
err_ = true;
MOZ_ASSERT(false, "Address of opened socket is not expected");
}
mon.NotifyAll();
} else if (type.EqualsLiteral("onclose")) {
// Already handled in UpdateReadyState, nothing to do here
} else {
MOZ_ASSERT(false, "Received unexpected event");
}
return NS_OK;
}
// callback while UDP packet is sent
NS_IMETHODIMP NrSocketIpc::CallListenerSent(const nsACString &type,
nsresult result) {
ASSERT_ON_THREAD(main_thread_);
MOZ_ASSERT(type.EqualsLiteral("onsent"));
if (NS_FAILED(result)) {
ReentrantMonitorAutoEnter mon(monitor_);
err_ = true;
}
return NS_OK;
}
// callback for state update after every socket operation
NS_IMETHODIMP NrSocketIpc::UpdateReadyState(const nsACString &readyState) {
ASSERT_ON_THREAD(main_thread_);
ReentrantMonitorAutoEnter mon(monitor_);
if (readyState.EqualsLiteral("closed")) {
MOZ_ASSERT(state_ == NR_CONNECTED || state_ == NR_CLOSING);
state_ = NR_CLOSED;
}
return NS_OK;
}
// nr_socket public APIs
int NrSocketIpc::create(nr_transport_addr *addr) {
ASSERT_ON_THREAD(sts_thread_);
int r, _status;
nsresult rv;
int32_t port;
nsCString host;
ReentrantMonitorAutoEnter mon(monitor_);
if (state_ != NR_INIT) {
ABORT(R_INTERNAL);
}
sts_thread_ = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv);
if (NS_FAILED(rv)) {
MOZ_ASSERT(false, "Failed to get STS thread");
ABORT(R_INTERNAL);
}
if ((r=nr_transport_addr_get_addrstring_and_port(addr, &host, &port))) {
ABORT(r);
}
// wildcard address will be resolved at NrSocketIpc::CallListenerVoid
if ((r=nr_transport_addr_copy(&my_addr_, addr))) {
ABORT(r);
}
state_ = NR_CONNECTING;
RUN_ON_THREAD(main_thread_,
mozilla::WrapRunnable(nsRefPtr<NrSocketIpc>(this),
&NrSocketIpc::create_m,
host, static_cast<uint16_t>(port)),
NS_DISPATCH_NORMAL);
// Wait until socket creation complete.
mon.Wait();
if (err_) {
ABORT(R_INTERNAL);
}
state_ = NR_CONNECTED;
_status = 0;
abort:
return(_status);
}
int NrSocketIpc::sendto(const void *msg, size_t len, int flags,
nr_transport_addr *to) {
ASSERT_ON_THREAD(sts_thread_);
ReentrantMonitorAutoEnter mon(monitor_);
//If send err happened before, simply return the error.
if (err_) {
return R_IO_ERROR;
}
if (!socket_child_) {
return R_EOD;
}
if (state_ != NR_CONNECTED) {
return R_INTERNAL;
}
int r;
net::NetAddr addr;
if ((r=nr_transport_addr_to_netaddr(to, &addr))) {
return r;
}
nsAutoPtr<DataBuffer> buf(new DataBuffer(static_cast<const uint8_t*>(msg), len));
RUN_ON_THREAD(main_thread_,
mozilla::WrapRunnable(nsRefPtr<NrSocketIpc>(this),
&NrSocketIpc::sendto_m,
addr, buf),
NS_DISPATCH_NORMAL);
return 0;
}
void NrSocketIpc::close() {
ASSERT_ON_THREAD(sts_thread_);
ReentrantMonitorAutoEnter mon(monitor_);
state_ = NR_CLOSING;
RUN_ON_THREAD(main_thread_,
mozilla::WrapRunnable(nsRefPtr<NrSocketIpc>(this),
&NrSocketIpc::close_m),
NS_DISPATCH_NORMAL);
//remove all enqueued messages
std::queue<RefPtr<nr_udp_message> > empty;
std::swap(received_msgs_, empty);
}
int NrSocketIpc::recvfrom(void *buf, size_t maxlen, size_t *len, int flags,
nr_transport_addr *from) {
ASSERT_ON_THREAD(sts_thread_);
ReentrantMonitorAutoEnter mon(monitor_);
int r, _status;
uint32_t consumed_len;
*len = 0;
if (state_ != NR_CONNECTED) {
ABORT(R_INTERNAL);
}
if (received_msgs_.empty()) {
ABORT(R_WOULDBLOCK);
}
{
RefPtr<nr_udp_message> msg(received_msgs_.front());
received_msgs_.pop();
if ((r=nr_praddr_to_transport_addr(&msg->from, from, 0))) {
err_ = true;
MOZ_ASSERT(false, "Get bogus address for received UDP packet");
ABORT(r);
}
consumed_len = std::min(maxlen, msg->data->len());
if (consumed_len < msg->data->len()) {
r_log(LOG_GENERIC, LOG_DEBUG, "Partial received UDP packet will be discard");
}
memcpy(buf, msg->data->data(), consumed_len);
*len = consumed_len;
}
_status = 0;
abort:
return(_status);
}
int NrSocketIpc::getaddr(nr_transport_addr *addrp) {
ASSERT_ON_THREAD(sts_thread_);
ReentrantMonitorAutoEnter mon(monitor_);
if (state_ != NR_CONNECTED) {
return R_INTERNAL;
}
return nr_transport_addr_copy(addrp, &my_addr_);
}
// Main thread executors
void NrSocketIpc::create_m(const nsACString &host, const uint16_t port) {
ASSERT_ON_THREAD(main_thread_);
ReentrantMonitorAutoEnter mon(monitor_);
nsresult rv;
socket_child_ = do_CreateInstance("@mozilla.org/udp-socket-child;1", &rv);
if (NS_FAILED(rv)) {
err_ = true;
MOZ_ASSERT(false, "Failed to create UDPSocketChild");
}
if (NS_FAILED(socket_child_->Bind(this, host, port))) {
err_ = true;
MOZ_ASSERT(false, "Failed to create UDP socket");
}
}
void NrSocketIpc::sendto_m(const net::NetAddr &addr, nsAutoPtr<DataBuffer> buf) {
ASSERT_ON_THREAD(main_thread_);
MOZ_ASSERT(socket_child_);
ReentrantMonitorAutoEnter mon(monitor_);
if (NS_FAILED(socket_child_->SendWithAddress(&addr,
buf->data(),
buf->len()))) {
err_ = true;
}
}
void NrSocketIpc::close_m() {
ASSERT_ON_THREAD(main_thread_);
if (socket_child_) {
socket_child_->Close();
socket_child_ = nullptr;
}
}
void NrSocketIpc::recv_callback_s(RefPtr<nr_udp_message> msg) {
ASSERT_ON_THREAD(sts_thread_);
{
ReentrantMonitorAutoEnter mon(monitor_);
if (state_ != NR_CONNECTED) {
return;
}
}
//enqueue received message
received_msgs_.push(msg);
if ((poll_flags() & PR_POLL_READ)) {
fire_callback(NR_ASYNC_WAIT_READ);
}
}
} // close namespace
@ -462,9 +939,18 @@ static nr_socket_vtbl nr_socket_local_vtbl={
nr_socket_local_close
};
int nr_socket_local_create(nr_transport_addr *addr, nr_socket **sockp) {
NrSocket * sock = new NrSocket();
NrSocketBase *sock = nullptr;
// create IPC bridge for content process
if (XRE_GetProcessType() == GeckoProcessType_Default) {
sock = new NrSocket();
} else {
nsCOMPtr<nsIThread> main_thread;
NS_GetMainThread(getter_AddRefs(main_thread));
sock = new NrSocketIpc(main_thread.get());
}
int r, _status;
r = sock->create(addr);
@ -492,7 +978,7 @@ static int nr_socket_local_destroy(void **objp) {
if(!objp || !*objp)
return 0;
NrSocket *sock = static_cast<NrSocket *>(*objp);
NrSocketBase *sock = static_cast<NrSocketBase *>(*objp);
*objp=0;
sock->close(); // Signal STS that we want not to listen
@ -503,7 +989,7 @@ static int nr_socket_local_destroy(void **objp) {
static int nr_socket_local_sendto(void *obj,const void *msg, size_t len,
int flags, nr_transport_addr *addr) {
NrSocket *sock = static_cast<NrSocket *>(obj);
NrSocketBase *sock = static_cast<NrSocketBase *>(obj);
return sock->sendto(msg, len, flags, addr);
}
@ -511,13 +997,13 @@ static int nr_socket_local_sendto(void *obj,const void *msg, size_t len,
static int nr_socket_local_recvfrom(void *obj,void * restrict buf,
size_t maxlen, size_t *len, int flags,
nr_transport_addr *addr) {
NrSocket *sock = static_cast<NrSocket *>(obj);
NrSocketBase *sock = static_cast<NrSocketBase *>(obj);
return sock->recvfrom(buf, maxlen, len, flags, addr);
}
static int nr_socket_local_getfd(void *obj, NR_SOCKET *fd) {
NrSocket *sock = static_cast<NrSocket *>(obj);
NrSocketBase *sock = static_cast<NrSocketBase *>(obj);
*fd = sock;
@ -525,14 +1011,14 @@ static int nr_socket_local_getfd(void *obj, NR_SOCKET *fd) {
}
static int nr_socket_local_getaddr(void *obj, nr_transport_addr *addrp) {
NrSocket *sock = static_cast<NrSocket *>(obj);
NrSocketBase *sock = static_cast<NrSocketBase *>(obj);
return sock->getaddr(addrp);
}
static int nr_socket_local_close(void *obj) {
NrSocket *sock = static_cast<NrSocket *>(obj);
NrSocketBase *sock = static_cast<NrSocketBase *>(obj);
sock->close();
@ -542,13 +1028,13 @@ static int nr_socket_local_close(void *obj) {
// Implement async api
int NR_async_wait(NR_SOCKET sock, int how, NR_async_cb cb,void *cb_arg,
char *function,int line) {
NrSocket *s = static_cast<NrSocket *>(sock);
NrSocketBase *s = static_cast<NrSocketBase *>(sock);
return s->async_wait(how, cb, cb_arg, function, line);
}
int NR_async_cancel(NR_SOCKET sock,int how) {
NrSocket *s = static_cast<NrSocket *>(sock);
NrSocketBase *s = static_cast<NrSocketBase *>(sock);
return s->cancel(how);
}

View File

@ -46,18 +46,23 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#ifndef nr_socket_prsock__
#define nr_socket_prsock__
#include <vector>
#include <queue>
#include "nspr.h"
#include "prio.h"
#include "nsAutoPtr.h"
#include "nsCOMPtr.h"
#include "nsASocketHandler.h"
#include "nsISocketTransportService.h"
#include "nsXPCOM.h"
#include "nsIEventTarget.h"
#include "nsIUDPSocketChild.h"
#include "databuffer.h"
#include "m_cpp_utils.h"
#include "mozilla/ReentrantMonitor.h"
#include "mozilla/RefPtr.h"
namespace mozilla {
@ -65,13 +70,52 @@ namespace net {
union NetAddr;
}
class NrSocket : public nsASocketHandler {
class NrSocketBase {
public:
NrSocket() : fd_(nullptr) {
memset(&my_addr_, 0, sizeof(my_addr_));
NrSocketBase() : poll_flags_(0) {
memset(cbs_, 0, sizeof(cbs_));
memset(cb_args_, 0, sizeof(cb_args_));
memset(&my_addr_, 0, sizeof(my_addr_));
}
virtual ~NrSocketBase() {}
// the nr_socket APIs
virtual int create(nr_transport_addr *addr) = 0;
virtual int sendto(const void *msg, size_t len,
int flags, nr_transport_addr *to) = 0;
virtual int recvfrom(void * buf, size_t maxlen,
size_t *len, int flags,
nr_transport_addr *from) = 0;
virtual int getaddr(nr_transport_addr *addrp) = 0;
virtual void close() = 0;
// Implementations of the async_event APIs
virtual int async_wait(int how, NR_async_cb cb, void *cb_arg,
char *function, int line);
virtual int cancel(int how);
// nsISupport reference counted interface
NS_IMETHOD_(nsrefcnt) AddRef(void) = 0;
NS_IMETHOD_(nsrefcnt) Release(void) = 0;
uint32_t poll_flags() {
return poll_flags_;
}
protected:
void fire_callback(int how);
nr_transport_addr my_addr_;
private:
NR_async_cb cbs_[NR_ASYNC_WAIT_WRITE + 1];
void *cb_args_[NR_ASYNC_WAIT_WRITE + 1];
uint32_t poll_flags_;
};
class NrSocket : public NrSocketBase,
public nsASocketHandler {
public:
NrSocket() : fd_(nullptr) {}
virtual ~NrSocket() {
PR_Close(fd_);
}
@ -87,36 +131,95 @@ public:
NS_DECL_THREADSAFE_ISUPPORTS
// Implementations of the async_event APIs
int async_wait(int how, NR_async_cb cb, void *cb_arg,
char *function, int line);
int cancel(int how);
virtual int async_wait(int how, NR_async_cb cb, void *cb_arg,
char *function, int line);
virtual int cancel(int how);
// Implementations of the nr_socket APIs
int create(nr_transport_addr *addr); // (really init, but it's called create)
int sendto(const void *msg, size_t len,
int flags, nr_transport_addr *to);
int recvfrom(void * buf, size_t maxlen,
size_t *len, int flags,
nr_transport_addr *from);
int getaddr(nr_transport_addr *addrp);
void close();
virtual int create(nr_transport_addr *addr); // (really init, but it's called create)
virtual int sendto(const void *msg, size_t len,
int flags, nr_transport_addr *to);
virtual int recvfrom(void * buf, size_t maxlen,
size_t *len, int flags,
nr_transport_addr *from);
virtual int getaddr(nr_transport_addr *addrp);
virtual void close();
private:
DISALLOW_COPY_ASSIGN(NrSocket);
void fire_callback(int how);
PRFileDesc *fd_;
nr_transport_addr my_addr_;
NR_async_cb cbs_[NR_ASYNC_WAIT_WRITE + 1];
void *cb_args_[NR_ASYNC_WAIT_WRITE + 1];
nsCOMPtr<nsIEventTarget> ststhread_;
};
struct nr_udp_message {
nr_udp_message(const PRNetAddr &from, nsAutoPtr<DataBuffer> &data)
: from(from), data(data) {
}
NS_INLINE_DECL_THREADSAFE_REFCOUNTING(nr_udp_message);
PRNetAddr from;
nsAutoPtr<DataBuffer> data;
private:
DISALLOW_COPY_ASSIGN(nr_udp_message);
};
class NrSocketIpc : public NrSocketBase,
public nsIUDPSocketInternal {
public:
enum NrSocketIpcState {
NR_INIT,
NR_CONNECTING,
NR_CONNECTED,
NR_CLOSING,
NR_CLOSED,
};
NS_DECL_THREADSAFE_ISUPPORTS
NS_DECL_NSIUDPSOCKETINTERNAL
NrSocketIpc(const nsCOMPtr<nsIEventTarget> &main_thread);
virtual ~NrSocketIpc() {};
// Implementations of the NrSocketBase APIs
virtual int create(nr_transport_addr *addr);
virtual int sendto(const void *msg, size_t len,
int flags, nr_transport_addr *to);
virtual int recvfrom(void * buf, size_t maxlen,
size_t *len, int flags,
nr_transport_addr *from);
virtual int getaddr(nr_transport_addr *addrp);
virtual void close();
private:
DISALLOW_COPY_ASSIGN(NrSocketIpc);
// Main thread executors of the NrSocketBase APIs
void create_m(const nsACString &host, const uint16_t port);
void sendto_m(const net::NetAddr &addr, nsAutoPtr<DataBuffer> buf);
void close_m();
// STS thread executor
void recv_callback_s(RefPtr<nr_udp_message> msg);
bool err_;
NrSocketIpcState state_;
std::queue<RefPtr<nr_udp_message> > received_msgs_;
nsCOMPtr<nsIUDPSocketChild> socket_child_;
nsCOMPtr<nsIEventTarget> sts_thread_;
const nsCOMPtr<nsIEventTarget> main_thread_;
ReentrantMonitor monitor_;
};
int nr_netaddr_to_transport_addr(const net::NetAddr *netaddr,
nr_transport_addr *addr);
int nr_praddr_to_transport_addr(const PRNetAddr *praddr,
nr_transport_addr *addr, int keep);
int nr_transport_addr_get_addrstring_and_port(nr_transport_addr *addr,
nsACString *host, int32_t *port);
} // close namespace
#endif