Bug 830100: Refactor transport flow to allow destruction on any thread r=derf

This commit is contained in:
EKR 2013-02-19 03:43:34 -08:00
parent 2cb5ff02cf
commit f7d3f3002b
7 changed files with 306 additions and 44 deletions

View File

@ -124,13 +124,19 @@ class TransportTestPeer : public sigslot::has_slots<> {
usrsctp_deregister_address(static_cast<void *>(this));
test_utils->sts_target()->Dispatch(WrapRunnable(this,
&TransportTestPeer::DisconnectInt),
&TransportTestPeer::Disconnect_s),
NS_DISPATCH_SYNC);
std::cerr << "~TransportTestPeer() completed" << std::endl;
}
void ConnectSocket(TransportTestPeer *peer) {
test_utils->sts_target()->Dispatch(WrapRunnable(
this, &TransportTestPeer::ConnectSocket_s, peer),
NS_DISPATCH_SYNC);
}
void ConnectSocket_s(TransportTestPeer *peer) {
loopback_->Connect(peer->loopback_);
ASSERT_EQ((nsresult)NS_OK, flow_->PushLayer(loopback_));
@ -150,7 +156,7 @@ class TransportTestPeer : public sigslot::has_slots<> {
ASSERT_GE(0, r);
}
void DisconnectInt() {
void Disconnect_s() {
if (flow_) {
flow_ = nullptr;
}
@ -191,10 +197,20 @@ class TransportTestPeer : public sigslot::has_slots<> {
int received() const { return received_; }
bool connected() const { return connected_; }
TransportResult SendPacket(const unsigned char* data, size_t len) {
TransportResult SendPacket_s(const unsigned char* data, size_t len) {
return flow_->SendPacket(data, len);
}
TransportResult SendPacket(const unsigned char* data, size_t len) {
TransportResult res;
test_utils->sts_target()->Dispatch(WrapRunnableRet(
this, &TransportTestPeer::SendPacket_s, data, len, &res),
NS_DISPATCH_SYNC);
return res;
}
void PacketReceived(TransportFlow * flow, const unsigned char* data,
size_t len) {
std::cerr << "Received " << len << " bytes" << std::endl;

View File

@ -42,10 +42,40 @@ MOZ_MTLOG_MODULE("mtransport")
MtransportTestUtils *test_utils;
// Layer class which can't be initialized.
class TransportLayerDummy : public TransportLayer {
public:
TransportLayerDummy(bool allow_init, bool *destroyed)
: allow_init_(allow_init),
destroyed_(destroyed) {
*destroyed_ = false;
}
virtual ~TransportLayerDummy() {
*destroyed_ = true;
}
virtual nsresult InitInternal() {
return allow_init_ ? NS_OK : NS_ERROR_FAILURE;
}
virtual TransportResult SendPacket(const unsigned char *data, size_t len) {
MOZ_CRASH(); // Should never be called.
return 0;
}
TRANSPORT_LAYER_ID("lossy")
private:
bool allow_init_;
bool *destroyed_;
};
// Class to simulate various kinds of network lossage
class TransportLayerLossy : public TransportLayer {
public:
TransportLayerLossy() : loss_mask_(0), packet_(0) {}
~TransportLayerLossy () {}
virtual TransportResult SendPacket(const unsigned char *data, size_t len) {
MOZ_MTLOG(PR_LOG_NOTICE, LAYER_INFO << "SendPacket(" << len << ")");
@ -137,16 +167,26 @@ class TransportTestPeer : public sigslot::has_slots<> {
NS_DISPATCH_SYNC);
}
void DestroyFlow() {
loopback_->Disconnect();
flow_ = nullptr;
if (flow_) {
loopback_->Disconnect();
flow_ = nullptr;
}
ice_ctx_ = nullptr;
}
void DisconnectDestroyFlow() {
loopback_->Disconnect();
disconnect_all(); // Disconnect from the signals;
flow_ = nullptr;
}
void SetDtlsAllowAll() {
nsresult res = dtls_->SetVerificationAllowAll();
ASSERT_TRUE(NS_SUCCEEDED(res));
}
void SetDtlsPeer(TransportTestPeer *peer, int digests, unsigned int damage) {
unsigned int mask = 1;
@ -171,7 +211,7 @@ class TransportTestPeer : public sigslot::has_slots<> {
}
void ConnectSocket(TransportTestPeer *peer) {
void ConnectSocket_s(TransportTestPeer *peer) {
nsresult res;
res = loopback_->Init();
ASSERT_EQ((nsresult)NS_OK, res);
@ -186,6 +226,13 @@ class TransportTestPeer : public sigslot::has_slots<> {
flow_->SignalPacketReceived.connect(this, &TransportTestPeer::PacketReceived);
}
void ConnectSocket(TransportTestPeer *peer) {
RUN_ON_THREAD(test_utils->sts_target(),
WrapRunnable(this, & TransportTestPeer::ConnectSocket_s,
peer),
NS_DISPATCH_SYNC);
}
void InitIce() {
nsresult res;
@ -283,7 +330,6 @@ class TransportTestPeer : public sigslot::has_slots<> {
TransportResult SendPacket(const unsigned char* data, size_t len) {
TransportResult ret;
test_utils->sts_target()->Dispatch(
WrapRunnableRet(flow_, &TransportFlow::SendPacket, data, len, &ret),
NS_DISPATCH_SYNC);
@ -308,12 +354,22 @@ class TransportTestPeer : public sigslot::has_slots<> {
lossy_->SetLoss(loss);
}
TransportLayer::State state() {
TransportLayer::State tstate;
RUN_ON_THREAD(test_utils->sts_target(),
WrapRunnableRet(flow_, &TransportFlow::state, &tstate),
NS_DISPATCH_SYNC);
return tstate;
}
bool connected() {
return flow_->state() == TransportLayer::TS_OPEN;
return state() == TransportLayer::TS_OPEN;
}
bool failed() {
return flow_->state() == TransportLayer::TS_ERROR;
return state() == TransportLayer::TS_ERROR;
}
size_t received() { return received_; }
@ -355,6 +411,11 @@ class TransportTest : public ::testing::Test {
// PR_Close(fds_[1]);
}
void DestroyPeerFlows() {
p1_->DisconnectDestroyFlow();
p2_->DisconnectDestroyFlow();
}
void SetUp() {
nsresult rv;
target_ = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv);
@ -441,6 +502,12 @@ TEST_F(TransportTest, TestConnect) {
ConnectSocket();
}
TEST_F(TransportTest, TestConnectDestroyFlowsMainThread) {
SetDtlsPeer();
ConnectSocket();
DestroyPeerFlows();
}
TEST_F(TransportTest, TestConnectAllowAll) {
SetDtlsAllowAll();
ConnectSocket();
@ -500,6 +567,59 @@ TEST_F(TransportTest, TestTransferIce) {
TransferTest(1);
}
TEST(PushTests, LayerFail) {
TransportFlow flow;
nsresult rv;
bool destroyed1, destroyed2;
rv = flow.PushLayer(new TransportLayerDummy(true, &destroyed1));
ASSERT_TRUE(NS_SUCCEEDED(rv));
rv = flow.PushLayer(new TransportLayerDummy(false, &destroyed2));
ASSERT_TRUE(NS_FAILED(rv));
ASSERT_EQ(TransportLayer::TS_ERROR, flow.state());
ASSERT_EQ(true, destroyed1);
ASSERT_EQ(true, destroyed2);
rv = flow.PushLayer(new TransportLayerDummy(true, &destroyed1));
ASSERT_TRUE(NS_FAILED(rv));
ASSERT_EQ(true, destroyed1);
}
TEST(PushTests, LayersFail) {
TransportFlow flow;
nsresult rv;
bool destroyed1, destroyed2, destroyed3;
rv = flow.PushLayer(new TransportLayerDummy(true, &destroyed1));
ASSERT_TRUE(NS_SUCCEEDED(rv));
nsAutoPtr<std::queue<TransportLayer *> > layers(
new std::queue<TransportLayer *>());
layers->push(new TransportLayerDummy(true, &destroyed2));
layers->push(new TransportLayerDummy(false, &destroyed3));
rv = flow.PushLayers(layers);
ASSERT_TRUE(NS_FAILED(rv));
ASSERT_EQ(TransportLayer::TS_ERROR, flow.state());
ASSERT_EQ(true, destroyed1);
ASSERT_EQ(true, destroyed2);
ASSERT_EQ(true, destroyed3);
layers = new std::queue<TransportLayer *>();
layers->push(new TransportLayerDummy(true, &destroyed2));
layers->push(new TransportLayerDummy(true, &destroyed3));
rv = flow.PushLayers(layers);
ASSERT_TRUE(NS_FAILED(rv));
ASSERT_EQ(true, destroyed2);
ASSERT_EQ(true, destroyed3);
}
} // end namespace
int main(int argc, char **argv)

View File

@ -10,6 +10,7 @@
#include <prlog.h>
#include "logging.h"
#include "runnable_utils.h"
#include "transportflow.h"
#include "transportlayer.h"
@ -17,14 +18,51 @@ namespace mozilla {
MOZ_MTLOG_MODULE("mtransport")
// There are some hacks here to allow destruction off of
// the main thread.
TransportFlow::~TransportFlow() {
for (std::deque<TransportLayer *>::iterator it = layers_.begin();
it != layers_.end(); ++it) {
delete *it;
// Make sure that if we are off the right thread, we have
// no more attached signals.
if (!CheckThreadInt()) {
MOZ_ASSERT(SignalStateChange.is_empty());
MOZ_ASSERT(SignalPacketReceived.is_empty());
}
// Push the destruction onto the STS thread. Note that there
// is still some possibility that someone is accessing this
// object simultaneously, but as long as smart pointer discipline
// is maintained, it shouldn't be possible to access and
// destroy it simultaneously. The conversion to an nsAutoPtr
// ensures automatic destruction of the queue at exit of
// DestroyFinal.
nsAutoPtr<std::deque<TransportLayer*> > layers_tmp(layers_.forget());
RUN_ON_THREAD(target_,
WrapRunnableNM(&TransportFlow::DestroyFinal, layers_tmp),
NS_DISPATCH_NORMAL);
}
void TransportFlow::DestroyFinal(nsAutoPtr<std::deque<TransportLayer *> > layers) {
ClearLayers(layers);
}
void TransportFlow::ClearLayers(std::queue<TransportLayer *>* layers) {
while (!layers->empty()) {
delete layers->front();
layers->pop();
}
}
void TransportFlow::ClearLayers(std::deque<TransportLayer *>* layers) {
while (!layers->empty()) {
delete layers->front();
layers->pop_front();
}
}
nsresult TransportFlow::PushLayer(TransportLayer *layer) {
CheckThread();
ScopedDeletePtr<TransportLayer> layer_tmp(layer); // Destroy on failure.
// Don't allow pushes once we are in error state.
if (state_ == TransportLayer::TS_ERROR) {
MOZ_MTLOG(PR_LOG_ERROR, id_ + ": Can't call PushLayer in error state for flow ");
@ -33,20 +71,26 @@ nsresult TransportFlow::PushLayer(TransportLayer *layer) {
nsresult rv = layer->Init();
if (!NS_SUCCEEDED(rv)) {
// Destroy the rest of the flow, because it's no longer in an acceptable
// state.
ClearLayers(layers_.get());
// Set ourselves to have failed.
MOZ_MTLOG(PR_LOG_ERROR, id_ << ": Layer initialization failed; invalidating");
StateChangeInt(TransportLayer::TS_ERROR);
return rv;
}
EnsureSameThread(layer);
TransportLayer *old_layer = layers_.empty() ? nullptr : layers_.front();
TransportLayer *old_layer = layers_->empty() ? nullptr : layers_->front();
// Re-target my signals to the new layer
if (old_layer) {
old_layer->SignalStateChange.disconnect(this);
old_layer->SignalPacketReceived.disconnect(this);
}
layers_.push_front(layer);
layers_->push_front(layer_tmp.forget());
layer->Inserted(this, old_layer);
layer->SignalStateChange.connect(this, &TransportFlow::StateChange);
@ -58,6 +102,8 @@ nsresult TransportFlow::PushLayer(TransportLayer *layer) {
// This is all-or-nothing.
nsresult TransportFlow::PushLayers(nsAutoPtr<std::queue<TransportLayer *> > layers) {
CheckThread();
MOZ_ASSERT(!layers->empty());
if (layers->empty()) {
MOZ_MTLOG(PR_LOG_ERROR, id_ << ": Can't call PushLayers with empty layers");
@ -67,6 +113,7 @@ nsresult TransportFlow::PushLayers(nsAutoPtr<std::queue<TransportLayer *> > laye
// Don't allow pushes once we are in error state.
if (state_ == TransportLayer::TS_ERROR) {
MOZ_MTLOG(PR_LOG_ERROR, id_ << ": Can't call PushLayers in error state for flow ");
ClearLayers(layers.get());
return NS_ERROR_FAILURE;
}
@ -78,7 +125,7 @@ nsresult TransportFlow::PushLayers(nsAutoPtr<std::queue<TransportLayer *> > laye
TransportLayer *layer;
while (!layers->empty()) {
TransportLayer *old_layer = layers_.empty() ? nullptr : layers_.front();
TransportLayer *old_layer = layers_->empty() ? nullptr : layers_->front();
layer = layers->front();
rv = layer->Init();
@ -87,25 +134,21 @@ nsresult TransportFlow::PushLayers(nsAutoPtr<std::queue<TransportLayer *> > laye
break;
}
EnsureSameThread(layer);
// Push the layer onto the queue.
layers_.push_front(layer);
layers_->push_front(layer);
layers->pop();
layer->Inserted(this, old_layer);
}
if (NS_FAILED(rv)) {
// Destroy any layers we could not push.
while (!layers->empty()) {
delete layers->front();
layers->pop();
}
ClearLayers(layers);
// Now destroy the rest of the flow, because it's no longer
// in an acceptable state.
while (!layers_.empty()) {
delete layers_.front();
layers_.pop_front();
}
ClearLayers(layers_);
// Set ourselves to have failed.
StateChangeInt(TransportLayer::TS_ERROR);
@ -123,12 +166,16 @@ nsresult TransportFlow::PushLayers(nsAutoPtr<std::queue<TransportLayer *> > laye
}
TransportLayer *TransportFlow::top() const {
return layers_.empty() ? nullptr : layers_.front();
CheckThread();
return layers_->empty() ? nullptr : layers_->front();
}
TransportLayer *TransportFlow::GetLayer(const std::string& id) const {
for (std::deque<TransportLayer *>::const_iterator it = layers_.begin();
it != layers_.end(); ++it) {
CheckThread();
for (std::deque<TransportLayer *>::const_iterator it = layers_->begin();
it != layers_->end(); ++it) {
if ((*it)->id() == id)
return *it;
}
@ -137,18 +184,38 @@ TransportLayer *TransportFlow::GetLayer(const std::string& id) const {
}
TransportLayer::State TransportFlow::state() {
CheckThread();
return state_;
}
TransportResult TransportFlow::SendPacket(const unsigned char *data,
size_t len) {
CheckThread();
if (state_ != TransportLayer::TS_OPEN) {
return TE_ERROR;
}
return top() ? top()->SendPacket(data, len) : TE_ERROR;
}
void TransportFlow::EnsureSameThread(TransportLayer *layer) {
// Enforce that if any of the layers have a thread binding,
// they all have the same binding.
if (target_) {
const nsCOMPtr<nsIEventTarget>& lthread = layer->GetThread();
if (lthread && (lthread != target_))
MOZ_CRASH();
}
else {
target_ = layer->GetThread();
}
}
void TransportFlow::StateChangeInt(TransportLayer::State state) {
CheckThread();
if (state == state_) {
return;
}
@ -159,12 +226,16 @@ void TransportFlow::StateChangeInt(TransportLayer::State state) {
void TransportFlow::StateChange(TransportLayer *layer,
TransportLayer::State state) {
CheckThread();
StateChangeInt(state);
}
void TransportFlow::PacketReceived(TransportLayer* layer,
const unsigned char *data,
size_t len) {
CheckThread();
SignalPacketReceived(this, data, len);
}

View File

@ -15,21 +15,49 @@
#include "nscore.h"
#include "nsISupportsImpl.h"
#include "mozilla/Scoped.h"
#include "transportlayer.h"
#include "m_cpp_utils.h"
// A stack of transport layers acts as a flow.
// Generally, one reads and writes to the top layer.
// This code has a confusing hybrid threading model which
// probably needs some eventual refactoring.
// TODO(ekr@rtfm.com): Bug 844891
//
// TransportFlows are not inherently bound to a thread *but*
// TransportLayers can be. If any layer in a flow is bound
// to a given thread, then all layers in the flow MUST be
// bound to that thread and you can only manipulate the
// flow (push layers, write, etc.) on that thread.
//
// The sole official exception to this is that you are
// allowed to *destroy* a flow off the bound thread provided
// that there are no listeners on its signals. This exception
// is designed to allow idioms where you create the flow
// and then something goes wrong and you destroy it and
// you don't want to bother with a thread dispatch.
//
// Eventually we hope to relax the "no listeners"
// restriction by thread-locking the signals, but previous
// attempts have caused deadlocks.
//
// Most of these invariants are enforced by hard asserts
// (i.e., those which fire even in production builds).
namespace mozilla {
class TransportFlow : public sigslot::has_slots<> {
public:
TransportFlow()
: id_("(anonymous)"),
state_(TransportLayer::TS_NONE) {}
state_(TransportLayer::TS_NONE),
layers_(new std::deque<TransportLayer *>) {}
TransportFlow(const std::string id)
: id_(id),
state_(TransportLayer::TS_NONE) {}
state_(TransportLayer::TS_NONE),
layers_(new std::deque<TransportLayer *>) {}
~TransportFlow();
@ -72,14 +100,39 @@ class TransportFlow : public sigslot::has_slots<> {
private:
DISALLOW_COPY_ASSIGN(TransportFlow);
// Check if we are on the right thread
void CheckThread() const {
if (!CheckThreadInt())
MOZ_CRASH();
}
bool CheckThreadInt() const {
bool on;
if (!target_) // OK if no thread set.
return true;
if (NS_FAILED(target_->IsOnCurrentThread(&on)))
return false;
return on;
}
void EnsureSameThread(TransportLayer *layer);
void StateChange(TransportLayer *layer, TransportLayer::State state);
void StateChangeInt(TransportLayer::State state);
void PacketReceived(TransportLayer* layer, const unsigned char *data,
size_t len);
static void DestroyFinal(nsAutoPtr<std::deque<TransportLayer *> > layers);
// Overload needed because we use deque internally and queue externally.
static void ClearLayers(std::deque<TransportLayer *>* layers);
static void ClearLayers(std::queue<TransportLayer *>* layers);
std::string id_;
std::deque<TransportLayer *> layers_;
TransportLayer::State state_;
ScopedDeletePtr<std::deque<TransportLayer *> > layers_;
nsCOMPtr<nsIEventTarget> target_;
};
} // close namespace

View File

@ -32,9 +32,8 @@ nsresult TransportLayer::Init() {
}
void TransportLayer::Inserted(TransportFlow *flow, TransportLayer *downward) {
flow_ = flow;
downward_ = downward;
flow_id_ = flow->id();
MOZ_MTLOG(PR_LOG_DEBUG, LAYER_INFO << "Inserted: downward='" <<
(downward ? downward->id(): "none") << "'");
@ -49,9 +48,4 @@ void TransportLayer::SetState(State state) {
}
}
const std::string& TransportLayer::flow_id() {
static const std::string empty;
return flow_ ? flow_->id() : empty;
}
} // close namespace

View File

@ -45,7 +45,7 @@ class TransportLayer : public sigslot::has_slots<> {
TransportLayer(Mode mode = STREAM) :
mode_(mode),
state_(TS_NONE),
flow_(nullptr),
flow_id_(),
downward_(nullptr) {}
virtual ~TransportLayer() {}
@ -82,6 +82,11 @@ class TransportLayer : public sigslot::has_slots<> {
// Must be implemented by derived classes
virtual TransportResult SendPacket(const unsigned char *data, size_t len) = 0;
// Get the thread.
const nsCOMPtr<nsIEventTarget> GetThread() const {
return target_;
}
// Event definitions that one can register for
// State has changed
sigslot::signal2<TransportLayer*, State> SignalStateChange;
@ -93,19 +98,21 @@ class TransportLayer : public sigslot::has_slots<> {
virtual const std::string id() = 0;
// The id of the flow
virtual const std::string& flow_id();
const std::string& flow_id() {
return flow_id_;
}
protected:
virtual void WasInserted() {}
virtual void SetState(State state);
// Check if we are on the right thread
void CheckThread() {
NS_ABORT_IF_FALSE(CheckThreadInt(), "Wrong thread");
}
Mode mode_;
State state_;
TransportFlow *flow_; // The flow this is part of
std::string flow_id_;
TransportLayer *downward_; // The next layer in the stack
nsCOMPtr<nsIEventTarget> target_;
@ -114,7 +121,10 @@ class TransportLayer : public sigslot::has_slots<> {
bool CheckThreadInt() {
bool on;
NS_ENSURE_TRUE(target_, false);
if (!target_) // OK if no thread set.
return true;
NS_ENSURE_SUCCESS(target_->IsOnCurrentThread(&on), false);
NS_ENSURE_TRUE(on, false);

View File

@ -34,7 +34,6 @@ class TransportLayerLoopback : public TransportLayer {
TransportLayerLoopback() :
peer_(nullptr),
timer_(nullptr),
target_(nullptr),
packets_(),
packets_lock_(nullptr),
deliverer_(nullptr) {}
@ -130,7 +129,6 @@ class TransportLayerLoopback : public TransportLayer {
TransportLayerLoopback* peer_;
nsCOMPtr<nsITimer> timer_;
nsCOMPtr<nsIEventTarget> target_;
std::queue<QueuedPacket *> packets_;
PRLock *packets_lock_;
nsRefPtr<Deliverer> deliverer_;