From 3382d940fd4ec554b9bf08cef0b2be60b2e8b483 Mon Sep 17 00:00:00 2001 From: Ariel Abreu Date: Wed, 17 Mar 2021 23:14:47 -0400 Subject: [PATCH] Many fixes (mainly in connections) This is probably the last update to the current libxpc codebase I'll be doing. The entire codebase is going to be rewritten soon. --- internal-include/xpc/internal.h | 1 - src/array.c | 2 +- src/connection.c | 103 +++++++++++++++++++++++++------- src/dictionary.c | 12 +++- src/misc.c | 13 ++-- src/type.c | 17 +++--- 6 files changed, 111 insertions(+), 37 deletions(-) diff --git a/internal-include/xpc/internal.h b/internal-include/xpc/internal.h index 0e95d0a..5a47935 100644 --- a/internal-include/xpc/internal.h +++ b/internal-include/xpc/internal.h @@ -84,7 +84,6 @@ struct xpc_connection { int xc_suspend_count; int xc_transaction_count; int xc_flags; - volatile uint64_t xc_last_id; void * xc_context; struct xpc_connection * xc_parent; uid_t xc_remote_euid; diff --git a/src/array.c b/src/array.c index 5c85b93..6955d49 100644 --- a/src/array.c +++ b/src/array.c @@ -68,7 +68,7 @@ xpc_array_set_value(xpc_object_t xarray, size_t index, xpc_object_t value) xotmp, xo_link); TAILQ_REMOVE(arr, xotmp, xo_link); xpc_retain(value); - free(xotmp); + xpc_release(xotmp); break; } } diff --git a/src/connection.c b/src/connection.c index 572c6d5..fec87a2 100644 --- a/src/connection.c +++ b/src/connection.c @@ -34,7 +34,11 @@ #include #include -#define XPC_CONNECTION_NEXT_ID(conn) (OSAtomicIncrement64(&conn->xc_last_id)) +static uint64_t generate_message_id(xpc_connection_t conn) { + uint64_t num = 0; + arc4random_buf(&num, sizeof(num)); + return num; +}; static void xpc_connection_recv_message(); static void xpc_send(xpc_connection_t xconn, xpc_object_t message, uint64_t id); @@ -63,7 +67,6 @@ xpc_connection_create(const char *name, dispatch_queue_t targetq) conn = conn_extract(rv); memset(conn, 0, sizeof(struct xpc_connection)); - conn->xc_last_id = 1; TAILQ_INIT(&conn->xc_peers); TAILQ_INIT(&conn->xc_pending); @@ -232,13 +235,6 @@ xpc_connection_resume(xpc_connection_t xconn) } dispatch_resume(conn->xc_recv_queue); - dispatch_async(conn->xc_recv_queue, ^{ - debugf("Recv queue %p works! #1", conn->xc_recv_queue); - }); - dispatch_async(conn->xc_recv_queue, ^{ - debugf("Recv queue works! #2"); - }); - } void @@ -249,10 +245,13 @@ xpc_connection_send_message(xpc_connection_t xconn, uint64_t id; conn = conn_extract(xconn); - id = xpc_dictionary_get_uint64(message, XPC_SEQID); - if (id == 0) - id = XPC_CONNECTION_NEXT_ID(conn); + xpc_object_t seq_id = xpc_dictionary_get_value(message, XPC_SEQID); + if (seq_id) { + id = xpc_uint64_get_value(seq_id); + } else { + id = generate_message_id(xconn); + } xpc_retain(message); dispatch_async(conn->xc_send_queue, ^{ @@ -267,11 +266,19 @@ xpc_connection_send_message_with_reply(xpc_connection_t xconn, { struct xpc_connection *conn; struct xpc_pending_call *call; + uint64_t id; + + xpc_object_t seq_id = xpc_dictionary_get_value(message, XPC_SEQID); + if (seq_id) { + id = xpc_uint64_get_value(seq_id); + } else { + id = generate_message_id(xconn); + } conn = conn_extract(xconn); call = malloc(sizeof(struct xpc_pending_call)); - call->xp_id = XPC_CONNECTION_NEXT_ID(conn); - call->xp_handler = handler; + call->xp_id = id; + call->xp_handler = Block_copy(handler); call->xp_queue = targetq; TAILQ_INSERT_TAIL(&conn->xc_pending, call, xp_link); @@ -410,20 +417,51 @@ xpc_transaction_end(void) vproc_transaction_end(NULL, NULL); } +static void xpc_connection_invalidate(xpc_connection_t xconn) { + struct xpc_pending_call* call = NULL; + struct xpc_connection* conn = conn_extract(xconn); + struct xpc_connection* peer = NULL; + + // tell all outstanding waiters that the connection is now invalid + while ((call = TAILQ_FIRST(&conn->xc_pending))) { + TAILQ_REMOVE(&conn->xc_pending, call, xp_link); + if (call->xp_handler) { + xpc_handler_t handler = call->xp_handler; + dispatch_async(conn->xc_target_queue, ^{ + handler((xpc_object_t)XPC_ERROR_CONNECTION_INVALID); + Block_release(handler); + }); + } + free(call); + } + + // notify the main event handler + if (conn->xc_handler) { + dispatch_async(conn->xc_target_queue, ^{ + conn->xc_handler((xpc_object_t)XPC_ERROR_CONNECTION_INVALID); + }); + } +}; + static void xpc_send(xpc_connection_t xconn, xpc_object_t message, uint64_t id) { struct xpc_connection *conn; - kern_return_t kr; + int status; debugf("xpc_send(): connection=%p, message=%p (type=%d), id=%d", xconn, message, ((struct xpc_object*) message)->xo_xpc_type, id); conn = conn_extract(xconn); - kr = xpc_pipe_send(message, conn->xc_remote_port, + status = xpc_pipe_send(message, conn->xc_remote_port, conn->xc_local_port, id); - if (kr != KERN_SUCCESS) - debugf("send failed, kr=%d", kr); + if (status != 0) { + debugf("send failed, kr=%d", status); + + if (status == -EPIPE) { + xpc_connection_invalidate(xconn); + } + } } static void @@ -453,21 +491,43 @@ xpc_connection_recv_message(void *context) struct xpc_connection *conn, *peer; xpc_object_t result; mach_port_t remote; - kern_return_t kr; + int status; uint64_t id; debugf("xpc_recv: connection=%p", context); conn = context; - kr = xpc_pipe_receive(conn->xc_local_port, &remote, &result, &id); - if (kr != KERN_SUCCESS) + status = xpc_pipe_receive(conn->xc_local_port, &remote, &result, &id); + if (status != 0) { + if (status == -EPIPE) { + xpc_connection_invalidate((xpc_connection_t)conn); + } return; + } debugf("xpc_recv: message=%p, id=%d, remote=<%d>", result, id, remote); if (conn->xc_flags & XPC_CONNECTION_MACH_SERVICE_LISTENER) { TAILQ_FOREACH(peer, &conn->xc_peers, xc_link) { if (remote == peer->xc_remote_port) { + // first, check if it's a reply to any message with pending replies + TAILQ_FOREACH(call, &peer->xc_pending, xp_link) { + if (call->xp_id == id) { + debugf("Found matching ID, async run handler on queue %p", peer->xc_target_queue); + if (call->xp_handler) { + dispatch_async(peer->xc_target_queue, ^{ + debugf("pass to handler"); + call->xp_handler(result); + Block_release(call->xp_handler); + TAILQ_REMOVE(&peer->xc_pending, call, xp_link); + free(call); + }); + } + return; + } + } + + // otherwise, pass it to the main handler if (peer->xc_handler) { dispatch_async(peer->xc_target_queue, ^{ peer->xc_handler(result); @@ -517,6 +577,7 @@ xpc_connection_recv_message(void *context) dispatch_async(conn->xc_target_queue, ^{ debugf("pass to handler"); call->xp_handler(result); + Block_release(call->xp_handler); TAILQ_REMOVE(&conn->xc_pending, call, xp_link); free(call); diff --git a/src/dictionary.c b/src/dictionary.c index 3cfc58a..b20dc94 100644 --- a/src/dictionary.c +++ b/src/dictionary.c @@ -65,8 +65,14 @@ xpc_dictionary_create_reply(xpc_object_t original) } xo_orig->xo_flags &= ~_XPC_FROM_WIRE; - // TODO: Apple puts a reference to original into retval - return xpc_dictionary_create(NULL, NULL, 0); + // possible BUG: i'm not sure if this is how Apple associates the 2 dictionaries or not + // if we want to replicate their format exactly, we need to double-check this + xpc_object_t new_dict = xpc_dictionary_create(NULL, NULL, 0); + xpc_object_t seq_id = xpc_dictionary_get_value(original, XPC_SEQID); + if (seq_id != NULL) { + xpc_dictionary_set_uint64(new_dict, XPC_SEQID, xpc_uint64_get_value(seq_id)); + } + return new_dict; } void @@ -133,6 +139,8 @@ xpc_dictionary_set_value(xpc_object_t xdict, const char *key, xpc_object_t value TAILQ_FOREACH(pair, head, xo_link) { if (!strcmp(pair->key, key)) { + xpc_retain(value); + xpc_release(pair->value); pair->value = value; return; } diff --git a/src/misc.c b/src/misc.c index 1f4004f..fd9d027 100644 --- a/src/misc.c +++ b/src/misc.c @@ -422,14 +422,14 @@ xpc_pipe_send(xpc_object_t xobj, mach_port_t dst, mach_port_t local, debugf("packing message"); if ((errno = xpc_serialize(xo, NULL, 0, &size)) != 0) - return errno; + return -errno; msg_size = size + sizeof(struct xpc_message); if ((message = malloc(msg_size)) == NULL) - return ENOMEM; + return -ENOMEM; if ((errno = xpc_serialize(xo, message->data, size, NULL)) != 0) - return errno; + return -errno; debugf("sending message"); msg_size = ALIGN(size + sizeof(mach_msg_header_t) + sizeof(size_t) + sizeof(uint64_t)); @@ -442,7 +442,7 @@ xpc_pipe_send(xpc_object_t xobj, mach_port_t dst, mach_port_t local, message->size = size; kr = mach_msg_send(&message->header); if (kr != KERN_SUCCESS) - err = (kr == KERN_INVALID_TASK) ? EPIPE : EINVAL; + err = (kr == KERN_INVALID_TASK || kr == MACH_SEND_INVALID_DEST) ? -EPIPE : -EINVAL; else err = 0; free(message); @@ -476,8 +476,11 @@ xpc_pipe_receive(mach_port_t local, mach_port_t *remote, xpc_object_t *result, 0, request->msgh_size, request->msgh_local_port, MACH_MSG_TIMEOUT_NONE, MACH_PORT_NULL); - if (kr != 0) + if (kr != KERN_SUCCESS) { LOG("mach_msg_receive returned %d\n", kr); + return (kr == KERN_INVALID_TASK || kr == MACH_RCV_PORT_DIED || kr == MACH_RCV_PORT_CHANGED) ? -EPIPE : -EINVAL; + } + *remote = request->msgh_remote_port; *id = message.id; data_size = (int)message.size; diff --git a/src/type.c b/src/type.c index 0440dd7..b89ce08 100644 --- a/src/type.c +++ b/src/type.c @@ -421,20 +421,20 @@ xpc_uuid_get_bytes(xpc_object_t xuuid) xpc_type_t xpc_get_type(xpc_object_t obj) { - struct xpc_object *xo; + struct xpc_object* xo = obj; + + // temporary hack + if (xo->xo_xpc_type == _XPC_TYPE_DICTIONARY && xpc_dictionary_get_value(obj, XPC_ERROR_KEY_DESCRIPTION) != NULL) { + return XPC_TYPE_ERROR; + } - xo = obj; return (xpc_typemap[xo->xo_xpc_type]); } bool xpc_equal(xpc_object_t x1, xpc_object_t x2) { - struct xpc_object *xo1, *xo2; - - xo1 = x1; - xo2 = x2; - return xo1 == xo2; + return xpc_hash(x1) == xpc_hash(x2); } static size_t @@ -489,6 +489,9 @@ xpc_hash(xpc_object_t obj) return (hash); case _XPC_TYPE_POINTER: return ((size_t)xo->xo_u.ptr); + case _XPC_TYPE_CONNECTION: + // two connections can only be equal if they are the same object (i.e. with the same pointers) + return (size_t)obj; default: return 0; }