Bug 801221: Move mtransport operations onto STS thread; r=jesup

This commit is contained in:
EKR 2012-10-18 13:01:52 -07:00
parent 5686d907c0
commit 6ee946c418
11 changed files with 192 additions and 40 deletions

View File

@ -205,8 +205,15 @@ class TransportTestPeer : public sigslot::has_slots<> {
ice_ = new TransportLayerIce(name, ice_ctx_, stream, 1);
// Assemble the stack
ASSERT_EQ((nsresult)NS_OK, flow_->PushLayer(ice_));
ASSERT_EQ((nsresult)NS_OK, flow_->PushLayer(dtls_));
std::queue<mozilla::TransportLayer *> layers;
layers.push(ice_);
layers.push(dtls_);
test_utils.sts_target()->Dispatch(
WrapRunnableRet(flow_, &TransportFlow::PushLayers, layers, &res),
NS_DISPATCH_SYNC);
ASSERT_EQ((nsresult)NS_OK, res);
// Listen for media events
flow_->SignalPacketReceived.connect(this, &TransportTestPeer::PacketReceived);
@ -268,7 +275,13 @@ class TransportTestPeer : public sigslot::has_slots<> {
}
TransportResult SendPacket(const unsigned char* data, size_t len) {
return flow_->SendPacket(data, len);
TransportResult ret;
test_utils.sts_target()->Dispatch(
WrapRunnableRet(flow_, &TransportFlow::SendPacket, data, len, &ret),
NS_DISPATCH_SYNC);
return ret;
}
@ -355,15 +368,24 @@ class TransportTest : public ::testing::Test {
}
void ConnectSocket() {
p1_->ConnectSocket(p2_);
p2_->ConnectSocket(p1_);
test_utils.sts_target()->Dispatch(
WrapRunnable(p1_, &TransportTestPeer::ConnectSocket, p2_),
NS_DISPATCH_SYNC);
test_utils.sts_target()->Dispatch(
WrapRunnable(p2_, &TransportTestPeer::ConnectSocket, p1_),
NS_DISPATCH_SYNC);
ASSERT_TRUE_WAIT(p1_->connected(), 10000);
ASSERT_TRUE_WAIT(p2_->connected(), 10000);
}
void ConnectSocketExpectFail() {
p1_->ConnectSocket(p2_);
p2_->ConnectSocket(p1_);
test_utils.sts_target()->Dispatch(
WrapRunnable(p1_, &TransportTestPeer::ConnectSocket, p2_),
NS_DISPATCH_SYNC);
test_utils.sts_target()->Dispatch(
WrapRunnable(p2_, &TransportTestPeer::ConnectSocket, p1_),
NS_DISPATCH_SYNC);
ASSERT_TRUE_WAIT(p1_->failed(), 10000);
ASSERT_TRUE_WAIT(p2_->failed(), 10000);
}

View File

@ -24,7 +24,6 @@ nsresult TransportFlow::PushLayer(TransportLayer *layer) {
if (!NS_SUCCEEDED(rv))
return rv;
TransportLayer *old_layer = layers_.empty() ? nullptr : layers_.front();
// Re-target my signals to the new layer
@ -40,6 +39,27 @@ nsresult TransportFlow::PushLayer(TransportLayer *layer) {
return NS_OK;
}
nsresult TransportFlow::PushLayers(std::queue<TransportLayer *> layers) {
nsresult rv;
while (!layers.empty()) {
rv = PushLayer(layers.front());
layers.pop();
if (NS_FAILED(rv)) {
// Destroy any layers we could not push.
while (!layers.empty()) {
delete layers.front();
layers.pop();
}
return rv;
}
}
return NS_OK;
}
TransportLayer *TransportFlow::top() const {
return layers_.empty() ? nullptr : layers_.front();
}

View File

@ -10,6 +10,7 @@
#define transportflow_h__
#include <deque>
#include <queue>
#include <string>
#include "nscore.h"
@ -33,7 +34,18 @@ class TransportFlow : public sigslot::has_slots<> {
// either:
// (a) Do it in the thread handling the I/O
// (b) Do it before you activate the I/O system
//
// The flow takes ownership of the layers after a successful
// push.
nsresult PushLayer(TransportLayer *layer);
// Convenience function to push multiple layers on. Layers
// are pushed on in the order that they are in the queue.
// Any layers which cannot be pushed on are just deleted
// and an error is returned.
// TODO(ekr@rtfm.com): Change layers to be ref-counted.
nsresult PushLayers(std::queue<TransportLayer *> layers);
TransportLayer *top() const;
TransportLayer *GetLayer(const std::string& id) const;

View File

@ -12,6 +12,9 @@
#include "sigslot.h"
#include "mozilla/RefPtr.h"
#include "nsCOMPtr.h"
#include "nsIEventTarget.h"
#include "nsThreadUtils.h"
#include "m_cpp_utils.h"
@ -56,6 +59,23 @@ class TransportLayer : public sigslot::has_slots<> {
// Downward interface
TransportLayer *downward() { return downward_; }
// Dispatch a call onto our thread (or run on the same thread if
// thread is not set). This is always synchronous.
nsresult RunOnThread(nsIRunnable *event) {
if (target_) {
nsIThread *thr;
nsresult rv = NS_GetCurrentThread(&thr);
MOZ_ASSERT(NS_SUCCEEDED(rv));
if (target_ != thr) {
return target_->Dispatch(event, NS_DISPATCH_SYNC);
}
}
return event->Run();
}
// Get the state
State state() const { return state_; }
// Must be implemented by derived classes
@ -78,13 +98,27 @@ class TransportLayer : public sigslot::has_slots<> {
virtual void WasInserted() {}
virtual void SetState(State state);
void CheckThread() {
NS_ABORT_IF_FALSE(CheckThreadInt(), "Wrong thread");
}
Mode mode_;
State state_;
TransportFlow *flow_; // The flow this is part of
TransportLayer *downward_; // The next layer in the stack
nsCOMPtr<nsIEventTarget> target_;
private:
DISALLOW_COPY_ASSIGN(TransportLayer);
bool CheckThreadInt() {
bool on;
NS_ENSURE_TRUE(target_, false);
NS_ENSURE_SUCCESS(target_->IsOnCurrentThread(&on), false);
NS_ENSURE_TRUE(on, false);
return true;
}
};
#define LAYER_INFO "Flow[" << flow_id() << "(none)" << "]; Layer[" << id() << "]: "

View File

@ -420,6 +420,7 @@ TransportLayerDtls::SetVerificationDigest(const std::string digest_algorithm,
// TODO: make sure this is called from STS. Otherwise
// we have thread safety issues
bool TransportLayerDtls::Setup() {
CheckThread();
SECStatus rv;
if (!downward_) {
@ -676,6 +677,7 @@ void TransportLayerDtls::Handshake() {
void TransportLayerDtls::PacketReceived(TransportLayer* layer,
const unsigned char *data,
size_t len) {
CheckThread();
MOZ_MTLOG(PR_LOG_DEBUG, LAYER_INFO << "PacketReceived(" << len << ")");
if (state_ != TS_CONNECTING && state_ != TS_OPEN) {
@ -717,6 +719,7 @@ void TransportLayerDtls::PacketReceived(TransportLayer* layer,
TransportResult TransportLayerDtls::SendPacket(const unsigned char *data,
size_t len) {
CheckThread();
if (state_ != TS_OPEN) {
MOZ_MTLOG(PR_LOG_ERROR, LAYER_INFO << "Can't call SendPacket() in state "
<< state_);
@ -756,6 +759,7 @@ SECStatus TransportLayerDtls::GetClientAuthDataHook(void *arg, PRFileDesc *fd,
MOZ_MTLOG(PR_LOG_DEBUG, "Server requested client auth");
TransportLayerDtls *stream = reinterpret_cast<TransportLayerDtls *>(arg);
stream->CheckThread();
if (!stream->identity_) {
MOZ_MTLOG(PR_LOG_ERROR, "No identity available");
@ -788,6 +792,7 @@ nsresult TransportLayerDtls::SetSrtpCiphers(std::vector<uint16_t> ciphers) {
}
nsresult TransportLayerDtls::GetSrtpCipher(uint16_t *cipher) {
CheckThread();
SECStatus rv = SSL_GetSRTPCipher(ssl_fd_, cipher);
if (rv != SECSuccess) {
MOZ_MTLOG(PR_LOG_DEBUG, "No SRTP cipher negotiated");
@ -802,6 +807,7 @@ nsresult TransportLayerDtls::ExportKeyingMaterial(const std::string& label,
const std::string& context,
unsigned char *out,
unsigned int outlen) {
CheckThread();
SECStatus rv = SSL_ExportKeyingMaterial(ssl_fd_,
label.c_str(),
label.size(),
@ -824,7 +830,7 @@ SECStatus TransportLayerDtls::AuthCertificateHook(void *arg,
PRBool checksig,
PRBool isServer) {
TransportLayerDtls *stream = reinterpret_cast<TransportLayerDtls *>(arg);
stream->CheckThread();
return stream->AuthCertificateHook(fd, checksig, isServer);
}
@ -871,6 +877,7 @@ TransportLayerDtls::CheckDigest(const RefPtr<VerificationDigest>&
SECStatus TransportLayerDtls::AuthCertificateHook(PRFileDesc *fd,
PRBool checksig,
PRBool isServer) {
CheckThread();
ScopedCERTCertificate peer_cert;
peer_cert = SSL_PeerCertificate(fd);

View File

@ -157,7 +157,6 @@ class TransportLayerDtls : public TransportLayer {
ScopedPRFileDesc ssl_fd_;
ScopedCERTCertificate peer_cert_;
nsCOMPtr<nsIEventTarget> target_;
nsCOMPtr<nsITimer> timer_;
bool auth_hook_called_;
bool cert_ok_;

View File

@ -88,6 +88,8 @@ TransportLayerIce::TransportLayerIce(const std::string& name,
RefPtr<NrIceCtx> ctx, RefPtr<NrIceMediaStream> stream,
int component)
: name_(name), ctx_(ctx), stream_(stream), component_(component) {
target_ = ctx->thread();
stream_->SignalReady.connect(this, &TransportLayerIce::IceReady);
stream_->SignalFailed.connect(this, &TransportLayerIce::IceFailed);
stream_->SignalPacketReceived.connect(this,
@ -103,6 +105,7 @@ TransportLayerIce::~TransportLayerIce() {
TransportResult TransportLayerIce::SendPacket(const unsigned char *data,
size_t len) {
CheckThread();
nsresult res = stream_->SendPacket(component_, data, len);
if (!NS_SUCCEEDED(res)) {
@ -122,15 +125,18 @@ void TransportLayerIce::IceCandidate(NrIceMediaStream *stream,
}
void TransportLayerIce::IceReady(NrIceMediaStream *stream) {
CheckThread();
SetState(TS_OPEN);
}
void TransportLayerIce::IceFailed(NrIceMediaStream *stream) {
CheckThread();
SetState(TS_ERROR);
}
void TransportLayerIce::IcePacketReceived(NrIceMediaStream *stream, int component,
const unsigned char *data, int len) {
CheckThread();
// We get packets for both components, so ignore the ones that aren't
// for us.
if (component_ != component)

View File

@ -726,10 +726,15 @@ short vcmSetIceCandidate(const char *peerconnection, const char *icecandidate, u
return VCM_ERROR;
nsresult res;
pc->impl()->ice_ctx()->thread()->Dispatch(
nsresult rv = pc->impl()->ice_ctx()->thread()->Dispatch(
WrapRunnableRet(stream, &NrIceMediaStream::ParseTrickleCandidate, icecandidate, &res),
NS_DISPATCH_SYNC);
if (!NS_SUCCEEDED(rv)) {
CSFLogError( logTag, "%s(): Could not dispatch to ICE thread", __FUNCTION__, level);
return VCM_ERROR;
}
if (!NS_SUCCEEDED(res)) {
CSFLogError( logTag, "%s(): Could not parse trickle candidate for stream %d", __FUNCTION__, level);
return VCM_ERROR;
@ -755,10 +760,15 @@ short vcmStartIceChecks(const char *peerconnection)
}
nsresult res;
pc->impl()->ice_ctx()->thread()->Dispatch(
nsresult rv = pc->impl()->ice_ctx()->thread()->Dispatch(
WrapRunnableRet(pc->impl()->ice_ctx(), &NrIceCtx::StartChecks, &res),
NS_DISPATCH_SYNC);
if (!NS_SUCCEEDED(rv)) {
CSFLogError( logTag, "%s(): Could not dispatch to ICE thread", __FUNCTION__);
return VCM_ERROR;
}
if (!NS_SUCCEEDED(res)) {
CSFLogError( logTag, "%s: couldn't start ICE checks", __FUNCTION__ );
return VCM_ERROR;
@ -2432,10 +2442,13 @@ vcmCreateTransportFlow(sipcc::PeerConnectionImpl *pc, int level, bool rtcp,
pc->GetHandle().c_str(), level, rtcp ? "rtcp" : "rtp");
flow = new TransportFlow(id);
flow->PushLayer(new TransportLayerIce("flow", pc->ice_ctx(),
pc->ice_media_stream(level-1),
rtcp ? 2 : 1));
TransportLayerDtls *dtls = new TransportLayerDtls();
ScopedDeletePtr<TransportLayerIce> ice(new
TransportLayerIce("flow", pc->ice_ctx(),
pc->ice_media_stream(level-1),
rtcp ? 2 : 1));
ScopedDeletePtr<TransportLayerDtls> dtls(new TransportLayerDtls());
dtls->SetRole(pc->GetRole() == sipcc::PeerConnectionImpl::kRoleOfferer ?
TransportLayerDtls::CLIENT : TransportLayerDtls::SERVER);
dtls->SetIdentity(pc->GetIdentity());
@ -2467,7 +2480,19 @@ vcmCreateTransportFlow(sipcc::PeerConnectionImpl *pc, int level, bool rtcp,
return NULL;
}
flow->PushLayer(dtls);
std::queue<TransportLayer *> layers;
layers.push(ice.forget());
layers.push(dtls.forget());
// Layers are now owned by the flow.
nsresult rv = pc->ice_ctx()->thread()->Dispatch(
WrapRunnableRet(flow, &TransportFlow::PushLayers, layers, &res),
NS_DISPATCH_SYNC);
if (NS_FAILED(rv) || NS_FAILED(res)) {
return NULL;
}
pc->AddTransportFlow(level, rtcp, flow);
}

View File

@ -91,6 +91,21 @@ void MediaPipeline::StateChange(TransportFlow *flow, TransportLayer::State state
}
nsresult MediaPipeline::TransportReady(TransportFlow *flow) {
nsresult rv;
nsresult res;
rv = RUN_ON_THREAD(sts_thread_,
WrapRunnableRet(this, &MediaPipeline::TransportReadyInt, flow, &res),
NS_DISPATCH_SYNC);
// res is invalid unless the dispatch succeeded
if (NS_FAILED(rv))
return rv;
return res;
}
nsresult MediaPipeline::TransportReadyInt(TransportFlow *flow) {
bool rtcp = !(flow == rtp_transport_);
State *state = rtcp ? &rtcp_state_ : &rtp_state_;
@ -236,8 +251,26 @@ nsresult MediaPipeline::TransportFailed(TransportFlow *flow) {
return NS_OK;
}
// Wrapper to send a packet on the STS thread.
nsresult MediaPipeline::SendPacket(TransportFlow *flow, const void *data,
int len) {
nsresult rv;
nsresult res;
rv = RUN_ON_THREAD(sts_thread_,
WrapRunnableRet(this, &MediaPipeline::SendPacketInt, flow, data, len, &res),
NS_DISPATCH_SYNC);
// res is invalid unless the dispatch succeeded
if (NS_FAILED(rv))
return rv;
return res;
}
nsresult MediaPipeline::SendPacketInt(TransportFlow *flow, const void *data,
int len) {
// Note that we bypass the DTLS layer here
TransportLayerDtls *dtls = static_cast<TransportLayerDtls *>(
flow->GetLayer(TransportLayerDtls::ID()));
@ -407,16 +440,10 @@ nsresult MediaPipelineTransmit::Init() {
"audio" : "video") <<
" hints=" << stream_->GetHintContents());
if (main_thread_) {
main_thread_->Dispatch(WrapRunnable(
stream_->GetStream(), &MediaStream::AddListener, listener_),
NS_DISPATCH_SYNC);
}
else {
stream_->GetStream()->AddListener(listener_);
}
return NS_OK;
return RUN_ON_THREAD(main_thread_, WrapRunnable(stream_->GetStream(),
&MediaStream::AddListener,
listener_),
NS_DISPATCH_SYNC);
}
nsresult MediaPipeline::PipelineTransport::SendRtpPacket(
@ -636,16 +663,10 @@ void MediaPipelineTransmit::ProcessVideoChunk(VideoSessionConduit *conduit,
nsresult MediaPipelineReceiveAudio::Init() {
MOZ_MTLOG(PR_LOG_DEBUG, __FUNCTION__);
if (main_thread_) {
main_thread_->Dispatch(WrapRunnable(
stream_->GetStream(), &MediaStream::AddListener, listener_),
NS_DISPATCH_SYNC);
}
else {
stream_->GetStream()->AddListener(listener_);
}
return NS_OK;
return RUN_ON_THREAD(main_thread_, WrapRunnable(stream_->GetStream(),
&MediaStream::AddListener,
listener_),
NS_DISPATCH_SYNC);
}
void MediaPipelineReceiveAudio::PipelineListener::

View File

@ -105,7 +105,7 @@ class MediaPipeline : public sigslot::has_slots<> {
};
friend class PipelineTransport;
virtual nsresult TransportReady(TransportFlow *flow); // The transport is ready
virtual nsresult TransportReady(TransportFlow *flow); // The transport is ready
virtual nsresult TransportFailed(TransportFlow *flow); // The transport is down
void increment_rtp_packets_sent();
@ -148,6 +148,8 @@ class MediaPipeline : public sigslot::has_slots<> {
private:
virtual void DetachTransportInt();
nsresult SendPacketInt(TransportFlow *flow, const void* data, int len);
nsresult TransportReadyInt(TransportFlow *flow);
bool IsRtp(const unsigned char *data, size_t len);
};

View File

@ -191,8 +191,12 @@ class MediaPipelineTest : public ::testing::Test {
PRStatus status = PR_NewTCPSocketPair(fds_);
ASSERT_EQ(status, PR_SUCCESS);
p1_.ConnectSocket(fds_[0], false);
p2_.ConnectSocket(fds_[1], true);
test_utils.sts_target()->Dispatch(
WrapRunnable(&p1_, &TestAgent::ConnectSocket, fds_[0], false),
NS_DISPATCH_SYNC);
test_utils.sts_target()->Dispatch(
WrapRunnable(&p2_, &TestAgent::ConnectSocket, fds_[1], false),
NS_DISPATCH_SYNC);
}
protected: