Many fixes (mainly in connections)

This is probably the last update to the current libxpc codebase I'll be doing. The entire codebase is going to be rewritten soon.
This commit is contained in:
Ariel Abreu 2021-03-17 23:14:47 -04:00
parent b7cbe133c8
commit 3382d940fd
No known key found for this signature in database
GPG Key ID: BB20848279B910AC
6 changed files with 111 additions and 37 deletions

View File

@ -84,7 +84,6 @@ struct xpc_connection {
int xc_suspend_count;
int xc_transaction_count;
int xc_flags;
volatile uint64_t xc_last_id;
void * xc_context;
struct xpc_connection * xc_parent;
uid_t xc_remote_euid;

View File

@ -68,7 +68,7 @@ xpc_array_set_value(xpc_object_t xarray, size_t index, xpc_object_t value)
xotmp, xo_link);
TAILQ_REMOVE(arr, xotmp, xo_link);
xpc_retain(value);
free(xotmp);
xpc_release(xotmp);
break;
}
}

View File

@ -34,7 +34,11 @@
#include <libkern/OSAtomic.h>
#include <xpc/internal.h>
#define XPC_CONNECTION_NEXT_ID(conn) (OSAtomicIncrement64(&conn->xc_last_id))
static uint64_t generate_message_id(xpc_connection_t conn) {
uint64_t num = 0;
arc4random_buf(&num, sizeof(num));
return num;
};
static void xpc_connection_recv_message();
static void xpc_send(xpc_connection_t xconn, xpc_object_t message, uint64_t id);
@ -63,7 +67,6 @@ xpc_connection_create(const char *name, dispatch_queue_t targetq)
conn = conn_extract(rv);
memset(conn, 0, sizeof(struct xpc_connection));
conn->xc_last_id = 1;
TAILQ_INIT(&conn->xc_peers);
TAILQ_INIT(&conn->xc_pending);
@ -232,13 +235,6 @@ xpc_connection_resume(xpc_connection_t xconn)
}
dispatch_resume(conn->xc_recv_queue);
dispatch_async(conn->xc_recv_queue, ^{
debugf("Recv queue %p works! #1", conn->xc_recv_queue);
});
dispatch_async(conn->xc_recv_queue, ^{
debugf("Recv queue works! #2");
});
}
void
@ -249,10 +245,13 @@ xpc_connection_send_message(xpc_connection_t xconn,
uint64_t id;
conn = conn_extract(xconn);
id = xpc_dictionary_get_uint64(message, XPC_SEQID);
if (id == 0)
id = XPC_CONNECTION_NEXT_ID(conn);
xpc_object_t seq_id = xpc_dictionary_get_value(message, XPC_SEQID);
if (seq_id) {
id = xpc_uint64_get_value(seq_id);
} else {
id = generate_message_id(xconn);
}
xpc_retain(message);
dispatch_async(conn->xc_send_queue, ^{
@ -267,11 +266,19 @@ xpc_connection_send_message_with_reply(xpc_connection_t xconn,
{
struct xpc_connection *conn;
struct xpc_pending_call *call;
uint64_t id;
xpc_object_t seq_id = xpc_dictionary_get_value(message, XPC_SEQID);
if (seq_id) {
id = xpc_uint64_get_value(seq_id);
} else {
id = generate_message_id(xconn);
}
conn = conn_extract(xconn);
call = malloc(sizeof(struct xpc_pending_call));
call->xp_id = XPC_CONNECTION_NEXT_ID(conn);
call->xp_handler = handler;
call->xp_id = id;
call->xp_handler = Block_copy(handler);
call->xp_queue = targetq;
TAILQ_INSERT_TAIL(&conn->xc_pending, call, xp_link);
@ -410,20 +417,51 @@ xpc_transaction_end(void)
vproc_transaction_end(NULL, NULL);
}
static void xpc_connection_invalidate(xpc_connection_t xconn) {
struct xpc_pending_call* call = NULL;
struct xpc_connection* conn = conn_extract(xconn);
struct xpc_connection* peer = NULL;
// tell all outstanding waiters that the connection is now invalid
while ((call = TAILQ_FIRST(&conn->xc_pending))) {
TAILQ_REMOVE(&conn->xc_pending, call, xp_link);
if (call->xp_handler) {
xpc_handler_t handler = call->xp_handler;
dispatch_async(conn->xc_target_queue, ^{
handler((xpc_object_t)XPC_ERROR_CONNECTION_INVALID);
Block_release(handler);
});
}
free(call);
}
// notify the main event handler
if (conn->xc_handler) {
dispatch_async(conn->xc_target_queue, ^{
conn->xc_handler((xpc_object_t)XPC_ERROR_CONNECTION_INVALID);
});
}
};
static void
xpc_send(xpc_connection_t xconn, xpc_object_t message, uint64_t id)
{
struct xpc_connection *conn;
kern_return_t kr;
int status;
debugf("xpc_send(): connection=%p, message=%p (type=%d), id=%d", xconn, message, ((struct xpc_object*) message)->xo_xpc_type, id);
conn = conn_extract(xconn);
kr = xpc_pipe_send(message, conn->xc_remote_port,
status = xpc_pipe_send(message, conn->xc_remote_port,
conn->xc_local_port, id);
if (kr != KERN_SUCCESS)
debugf("send failed, kr=%d", kr);
if (status != 0) {
debugf("send failed, kr=%d", status);
if (status == -EPIPE) {
xpc_connection_invalidate(xconn);
}
}
}
static void
@ -453,21 +491,43 @@ xpc_connection_recv_message(void *context)
struct xpc_connection *conn, *peer;
xpc_object_t result;
mach_port_t remote;
kern_return_t kr;
int status;
uint64_t id;
debugf("xpc_recv: connection=%p", context);
conn = context;
kr = xpc_pipe_receive(conn->xc_local_port, &remote, &result, &id);
if (kr != KERN_SUCCESS)
status = xpc_pipe_receive(conn->xc_local_port, &remote, &result, &id);
if (status != 0) {
if (status == -EPIPE) {
xpc_connection_invalidate((xpc_connection_t)conn);
}
return;
}
debugf("xpc_recv: message=%p, id=%d, remote=<%d>", result, id, remote);
if (conn->xc_flags & XPC_CONNECTION_MACH_SERVICE_LISTENER) {
TAILQ_FOREACH(peer, &conn->xc_peers, xc_link) {
if (remote == peer->xc_remote_port) {
// first, check if it's a reply to any message with pending replies
TAILQ_FOREACH(call, &peer->xc_pending, xp_link) {
if (call->xp_id == id) {
debugf("Found matching ID, async run handler on queue %p", peer->xc_target_queue);
if (call->xp_handler) {
dispatch_async(peer->xc_target_queue, ^{
debugf("pass to handler");
call->xp_handler(result);
Block_release(call->xp_handler);
TAILQ_REMOVE(&peer->xc_pending, call, xp_link);
free(call);
});
}
return;
}
}
// otherwise, pass it to the main handler
if (peer->xc_handler) {
dispatch_async(peer->xc_target_queue, ^{
peer->xc_handler(result);
@ -517,6 +577,7 @@ xpc_connection_recv_message(void *context)
dispatch_async(conn->xc_target_queue, ^{
debugf("pass to handler");
call->xp_handler(result);
Block_release(call->xp_handler);
TAILQ_REMOVE(&conn->xc_pending, call,
xp_link);
free(call);

View File

@ -65,8 +65,14 @@ xpc_dictionary_create_reply(xpc_object_t original)
}
xo_orig->xo_flags &= ~_XPC_FROM_WIRE;
// TODO: Apple puts a reference to original into retval
return xpc_dictionary_create(NULL, NULL, 0);
// possible BUG: i'm not sure if this is how Apple associates the 2 dictionaries or not
// if we want to replicate their format exactly, we need to double-check this
xpc_object_t new_dict = xpc_dictionary_create(NULL, NULL, 0);
xpc_object_t seq_id = xpc_dictionary_get_value(original, XPC_SEQID);
if (seq_id != NULL) {
xpc_dictionary_set_uint64(new_dict, XPC_SEQID, xpc_uint64_get_value(seq_id));
}
return new_dict;
}
void
@ -133,6 +139,8 @@ xpc_dictionary_set_value(xpc_object_t xdict, const char *key, xpc_object_t value
TAILQ_FOREACH(pair, head, xo_link) {
if (!strcmp(pair->key, key)) {
xpc_retain(value);
xpc_release(pair->value);
pair->value = value;
return;
}

View File

@ -422,14 +422,14 @@ xpc_pipe_send(xpc_object_t xobj, mach_port_t dst, mach_port_t local,
debugf("packing message");
if ((errno = xpc_serialize(xo, NULL, 0, &size)) != 0)
return errno;
return -errno;
msg_size = size + sizeof(struct xpc_message);
if ((message = malloc(msg_size)) == NULL)
return ENOMEM;
return -ENOMEM;
if ((errno = xpc_serialize(xo, message->data, size, NULL)) != 0)
return errno;
return -errno;
debugf("sending message");
msg_size = ALIGN(size + sizeof(mach_msg_header_t) + sizeof(size_t) + sizeof(uint64_t));
@ -442,7 +442,7 @@ xpc_pipe_send(xpc_object_t xobj, mach_port_t dst, mach_port_t local,
message->size = size;
kr = mach_msg_send(&message->header);
if (kr != KERN_SUCCESS)
err = (kr == KERN_INVALID_TASK) ? EPIPE : EINVAL;
err = (kr == KERN_INVALID_TASK || kr == MACH_SEND_INVALID_DEST) ? -EPIPE : -EINVAL;
else
err = 0;
free(message);
@ -476,8 +476,11 @@ xpc_pipe_receive(mach_port_t local, mach_port_t *remote, xpc_object_t *result,
0, request->msgh_size, request->msgh_local_port,
MACH_MSG_TIMEOUT_NONE, MACH_PORT_NULL);
if (kr != 0)
if (kr != KERN_SUCCESS) {
LOG("mach_msg_receive returned %d\n", kr);
return (kr == KERN_INVALID_TASK || kr == MACH_RCV_PORT_DIED || kr == MACH_RCV_PORT_CHANGED) ? -EPIPE : -EINVAL;
}
*remote = request->msgh_remote_port;
*id = message.id;
data_size = (int)message.size;

View File

@ -421,20 +421,20 @@ xpc_uuid_get_bytes(xpc_object_t xuuid)
xpc_type_t
xpc_get_type(xpc_object_t obj)
{
struct xpc_object *xo;
struct xpc_object* xo = obj;
// temporary hack
if (xo->xo_xpc_type == _XPC_TYPE_DICTIONARY && xpc_dictionary_get_value(obj, XPC_ERROR_KEY_DESCRIPTION) != NULL) {
return XPC_TYPE_ERROR;
}
xo = obj;
return (xpc_typemap[xo->xo_xpc_type]);
}
bool
xpc_equal(xpc_object_t x1, xpc_object_t x2)
{
struct xpc_object *xo1, *xo2;
xo1 = x1;
xo2 = x2;
return xo1 == xo2;
return xpc_hash(x1) == xpc_hash(x2);
}
static size_t
@ -489,6 +489,9 @@ xpc_hash(xpc_object_t obj)
return (hash);
case _XPC_TYPE_POINTER:
return ((size_t)xo->xo_u.ptr);
case _XPC_TYPE_CONNECTION:
// two connections can only be equal if they are the same object (i.e. with the same pointers)
return (size_t)obj;
default:
return 0;
}