Bug 830290: Send blocking DBus messages from within DBus thread r=bent,qdot

The this commit moves the sending of blocking DBus messages to the
DBus thread. This allows us to avoid concurrency problems within the
DBus library, which is not explicitly thread-safe.

As a side note, I'd like to mention that blocking in distributed
systems simply doesn't work. The dbus library is especially broken
in this regard as it delays all unrelated messages until the reply
for the blocking request has been received. A future commit should
implement this functionality with an asyncronous call and make the
related thread wait for the reply.
This commit is contained in:
Thomas Zimmermann 2013-03-01 15:05:50 +01:00
parent 5bdf29ab6d
commit e747be0ff7
2 changed files with 111 additions and 30 deletions

View File

@ -359,7 +359,7 @@ static dbus_bool_t dbus_func_args_async_valist(DBusConnection *conn,
const char *func,
int first_arg_type,
va_list args) {
DBusMessage *msg = NULL;
DBusMessage *msg = NULL;
/* Compose the command */
msg = dbus_message_new_method_call(BLUEZ_DBUS_BASE_IFC, path, ifc, func);
@ -402,33 +402,111 @@ dbus_bool_t dbus_func_args_async(DBusConnection *conn,
return ret;
}
//
// Sends a message and allows the dispatching thread to wait for the
// reply. Only run it in DBus thread.
//
class DBusConnectionSendAndBlockRunnable : public DBusConnectionSendSyncRunnable
{
public:
DBusConnectionSendAndBlockRunnable(DBusConnection* aConnection,
DBusMessage* aMessage,
int aTimeout,
DBusError* aError)
: DBusConnectionSendSyncRunnable(aConnection, aMessage),
mTimeout(aTimeout),
mError(aError),
mReply(nullptr)
{ }
NS_IMETHOD Run()
{
MOZ_ASSERT(!NS_IsMainThread());
DBusError error;
if (!mError) {
mError = &error;
}
dbus_error_init(mError);
mReply = dbus_connection_send_with_reply_and_block(mConnection, mMessage, mTimeout, mError);
bool success = mReply || dbus_error_has_name(mError, DBUS_ERROR_NO_REPLY);
Completed(success);
NS_ENSURE_TRUE(success, NS_ERROR_FAILURE);
return NS_OK;
}
DBusMessage* GetReply()
{
return mReply;
}
protected:
~DBusConnectionSendAndBlockRunnable()
{ }
private:
int mTimeout;
DBusError* mError;
DBusMessage* mReply;
};
dbus_bool_t
dbus_func_send_and_block(DBusConnection* aConnection,
int aTimeout,
DBusMessage** aReply,
DBusError* aError,
DBusMessage* aMessage)
{
nsRefPtr<DBusConnectionSendAndBlockRunnable> t(
new DBusConnectionSendAndBlockRunnable(aConnection, aMessage,
aTimeout, aError));
MOZ_ASSERT(t);
nsresult rv = DispatchToDBusThread(t);
if (NS_FAILED(rv)) {
if (aMessage) {
dbus_message_unref(aMessage);
}
return FALSE;
}
if (!t->WaitForCompletion()) {
return FALSE;
}
if (aReply) {
*aReply = t->GetReply();
}
return TRUE;
}
// If err is NULL, then any errors will be LOG'd, and free'd and the reply
// will be NULL.
// If err is not NULL, then it is assumed that dbus_error_init was already
// called, and error's will be returned to the caller without logging. The
// return value is NULL iff an error was set. The client must free the error if
// set.
DBusMessage * dbus_func_args_timeout_valist(DBusConnection *conn,
int timeout_ms,
DBusError *err,
const char *path,
const char *ifc,
const char *func,
int first_arg_type,
va_list args) {
DBusMessage *msg = NULL, *reply = NULL;
bool return_error = (err != NULL);
if (!return_error) {
err = (DBusError*)malloc(sizeof(DBusError));
dbus_error_init(err);
}
DBusMessage* dbus_func_args_timeout_valist(DBusConnection* conn,
int timeout_ms,
DBusError* err,
const char* path,
const char* ifc,
const char* func,
int first_arg_type,
va_list args)
{
/* Compose the command */
msg = dbus_message_new_method_call(BLUEZ_DBUS_BASE_IFC, path, ifc, func);
DBusMessage* msg = dbus_message_new_method_call(BLUEZ_DBUS_BASE_IFC, path, ifc, func);
if (msg == NULL) {
if (!msg) {
LOG("Could not allocate D-Bus message object!");
goto done;
}
@ -439,18 +517,15 @@ DBusMessage * dbus_func_args_timeout_valist(DBusConnection *conn,
goto done;
}
/* Make the call. */
reply = dbus_connection_send_with_reply_and_block(conn, msg, timeout_ms, err);
if (!return_error && dbus_error_is_set(err)) {
LOG_AND_FREE_DBUS_ERROR_WITH_MSG(err, msg);
}
DBusMessage* reply;
return dbus_func_send_and_block(conn, timeout_ms, &reply, err, msg) ? reply : nullptr;
done:
if (!return_error) {
free(err);
if (msg) {
dbus_message_unref(msg);
}
if (msg) dbus_message_unref(msg);
return reply;
return nullptr;
}
DBusMessage * dbus_func_args_timeout(DBusConnection *conn,
@ -504,7 +579,7 @@ DBusMessage * dbus_func_args_error(DBusConnection *conn,
return ret;
}
int dbus_returns_int32(DBusMessage *reply)
int dbus_returns_int32(DBusMessage *reply)
{
DBusError err;
int32_t ret = -1;

View File

@ -84,6 +84,12 @@ dbus_bool_t dbus_func_args_async(DBusConnection* conn,
int first_arg_type,
...);
dbus_bool_t dbus_func_send_and_block(DBusConnection* aConnection,
int aTimeout,
DBusMessage** aReply,
DBusError* aError,
DBusMessage* aMessage);
DBusMessage* dbus_func_args(DBusConnection* conn,
const char* path,
const char* ifc,