mirror of
https://github.com/darlinghq/darlingserver.git
synced 2024-11-26 22:00:26 +00:00
RPC changes for better interrupt/signal handling
This commit is contained in:
parent
aee29954e2
commit
a82cff6069
@ -14,6 +14,8 @@ XNU_TRAP_BSD = 1 << 4
|
||||
XNU_TRAP_NO_DTAPE_DEF = 1 << 5
|
||||
XNU_BSD_TRAP_CALL = XNU_TRAP_CALL | XNU_TRAP_NOPREFIX | XNU_TRAP_NOSUFFIX | XNU_TRAP_NOSUFFIX_ARGS | XNU_TRAP_BSD
|
||||
UNMANAGED_CALL = 1 << 6
|
||||
ALLOW_INTERRUPTIONS = 1 << 7
|
||||
PUSH_UNKNOWN_REPLIES = 1 << 8
|
||||
|
||||
# NOTE: in Python 3.7+, we can rely on dictionaries having their items in insertion order.
|
||||
# unfortunately, we can't expect everyone building Darling to have Python 3.7+ installed.
|
||||
@ -50,8 +52,53 @@ calls = [
|
||||
# the resulting descriptor received on the other end (in either client or server) will behave like a `dup()`ed descriptor.
|
||||
#
|
||||
# FLAGS:
|
||||
# currently, the only flag that can be passed is XNU_TRAP_CALL. this indicates that the given call is actually an XNU trap.
|
||||
# this enables more advanced wrappers to be generated for that call and avoid unnecessary boilerplate code on the server side.
|
||||
# XNU_TRAP_CALL
|
||||
# this indicates that the given call is actually an XNU trap. this enables more advanced wrappers to be generated for that call
|
||||
# and avoid unnecessary boilerplate code on the server side.
|
||||
# XNU_TRAP_NOPREFIX
|
||||
# this indicates that the XNU trap does not use the `_kernelrpc_` prefix on the server side.
|
||||
# this affects both the function and argument structure names.
|
||||
# XNU_TRAP_NOSUFFIX
|
||||
# this indicates that the XNU trap does not use the `_trap` suffix on the server side.
|
||||
# this affects both the function and argument structure names.
|
||||
# XNU_TRAP_NOSUFFIX_ARGS
|
||||
# this indicates the arguments structure for this call on the server side does not use the `_trap` suffix.
|
||||
# this does not affect the name of the function on the server side.
|
||||
# XNU_TRAP_BSD
|
||||
# this indicates the XNU trap is for a BSD syscall. BSD syscalls have 2 return codes: one for failure and one for success.
|
||||
# this flag informs the RPC wrapper generator about this so it can handle it appropriately.
|
||||
# XNU_TRAP_NO_DTAPE_DEF
|
||||
# by default, the RPC wrapper generator code will generate duct-tape wrappers for XNU traps that automatically call the
|
||||
# XNU handler function for the trap. this flag tells it not to do that; this means you must define the duct-tape handler yourself.
|
||||
# UNMANAGED_CALL
|
||||
# this indicates that the given call may be called from an unmanaged process; that is, a process that the server does not manage or
|
||||
# have direct access to (e.g. it cannot access its memory). this is mainly useful for calls that inspect the state of the container.
|
||||
# ALLOW_INTERRUPTIONS
|
||||
# by default, calls are performed atomically with signals disabled on the calling thread; this way, the call is either fully performed
|
||||
# or fully unperformed. this flag indicates that the call should be allowed to be interrupted by signals. most calls should be performed
|
||||
# without interruptions, but calls that may wait (usually for long periods of time) should be performed with interruptions allowed.
|
||||
#
|
||||
# do note that this means callers may see the value of `dserver_rpc_hooks_get_interrupt_status()` on return and must handle it appropriately.
|
||||
# this status is only returned when the RPC send operation is interrupted; when the RPC receive operation is interrupted, it is simply retried.
|
||||
# thus, even when interruptions are allowed, callers should still see a consistent RPC state.
|
||||
# PUSH_UNKNOWN_REPLIES
|
||||
# the vast majority of calls should fail (spectacularly) when they receive an unexpected/unknown reply from the server.
|
||||
# 99% of the time, this is indicative of a critical RPC error. some calls (currently only one: interrupt_enter), however, need to gracefully handle
|
||||
# mixed up replies because of race conditions.
|
||||
#
|
||||
# for example, when a signal is received, interrupt_enter is called. sometimes, signals arrive while we're waiting for a reply from the server
|
||||
# for another call. when the server receives interrupt_enter, it interrupts the current call and any reply it generates is deferred to be delivered
|
||||
# once the server receives interrupt_exit. however, there is still a race condition here: if the server already had the reply to the interrupted
|
||||
# call queued for delivery when the signal was received, the client will send interrupt_enter and immediately receive the reply to the interrupted
|
||||
# call. without handling this gracefully (by saving the reply for later), RPC communication becomes desynchronized and the program crashes.
|
||||
#
|
||||
# note that this flag should only be used in very special circumstances (interrupt_enter currently being the only such one).
|
||||
# not only can it mask legitimate RPC communication errors, but it also requires significantly more stack space to handle such calls,
|
||||
# as the wrapper must create a buffer large enough to store any possible reply (including any potential descriptors).
|
||||
#
|
||||
# the way this works is that calls with this flag allocate enough space in the reply buffer to hold all possible replies;
|
||||
# if they receive an unexpected reply, they push it back to the server. the server then holds on to the reply
|
||||
# and re-sends it when appropriate (e.g. for interrupt_enter, that's after interrupt_exit is called).
|
||||
#
|
||||
# TODO: we should probably add a class for these calls (so it's more readable).
|
||||
# we could even create a DSL (à-la-MIG), but that's probably overkill since
|
||||
@ -111,7 +158,7 @@ calls = [
|
||||
('length', 'uint64_t'),
|
||||
]),
|
||||
|
||||
('fork_wait_for_child', [], []),
|
||||
('fork_wait_for_child', [], [], ALLOW_INTERRUPTIONS),
|
||||
|
||||
('sigprocess', [
|
||||
('bsd_signal_number', 'int32_t'),
|
||||
@ -133,9 +180,9 @@ calls = [
|
||||
('is_64_bit', 'bool'),
|
||||
]),
|
||||
|
||||
('sigexc_enter', [], []),
|
||||
('interrupt_enter', [], [], PUSH_UNKNOWN_REPLIES),
|
||||
|
||||
('sigexc_exit', [], []),
|
||||
('interrupt_exit', [], []),
|
||||
|
||||
('console_open', [], [
|
||||
('console', '@fd'),
|
||||
@ -211,7 +258,7 @@ calls = [
|
||||
('timeout', 'uint32_t'),
|
||||
('priority', 'uint32_t'),
|
||||
('rcv_msg', 'void*', 'uint64_t'),
|
||||
], [], XNU_TRAP_CALL | XNU_TRAP_NOPREFIX),
|
||||
], [], XNU_TRAP_CALL | XNU_TRAP_NOPREFIX | ALLOW_INTERRUPTIONS),
|
||||
|
||||
('mach_port_deallocate', [
|
||||
('target', 'uint32_t'),
|
||||
@ -359,25 +406,25 @@ calls = [
|
||||
|
||||
('semaphore_wait', [
|
||||
('wait_name', 'uint32_t'),
|
||||
], [], XNU_TRAP_CALL | XNU_TRAP_NOPREFIX),
|
||||
], [], XNU_TRAP_CALL | XNU_TRAP_NOPREFIX | ALLOW_INTERRUPTIONS),
|
||||
|
||||
('semaphore_wait_signal', [
|
||||
('wait_name', 'uint32_t'),
|
||||
('signal_name', 'uint32_t'),
|
||||
], [], XNU_TRAP_CALL | XNU_TRAP_NOPREFIX),
|
||||
], [], XNU_TRAP_CALL | XNU_TRAP_NOPREFIX | ALLOW_INTERRUPTIONS),
|
||||
|
||||
('semaphore_timedwait', [
|
||||
('wait_name', 'uint32_t'),
|
||||
('sec', 'uint32_t'),
|
||||
('nsec', 'uint32_t'),
|
||||
], [], XNU_TRAP_CALL | XNU_TRAP_NOPREFIX),
|
||||
], [], XNU_TRAP_CALL | XNU_TRAP_NOPREFIX | ALLOW_INTERRUPTIONS),
|
||||
|
||||
('semaphore_timedwait_signal', [
|
||||
('wait_name', 'uint32_t'),
|
||||
('signal_name', 'uint32_t'),
|
||||
('sec', 'uint32_t'),
|
||||
('nsec', 'uint32_t'),
|
||||
], [], XNU_TRAP_CALL | XNU_TRAP_NOPREFIX),
|
||||
], [], XNU_TRAP_CALL | XNU_TRAP_NOPREFIX | ALLOW_INTERRUPTIONS),
|
||||
|
||||
#
|
||||
# mk_timer traps
|
||||
@ -453,7 +500,7 @@ calls = [
|
||||
('nsec', 'uint32_t'),
|
||||
], [
|
||||
('retval', 'uint32_t'),
|
||||
], XNU_BSD_TRAP_CALL | XNU_TRAP_NO_DTAPE_DEF),
|
||||
], XNU_BSD_TRAP_CALL | XNU_TRAP_NO_DTAPE_DEF | ALLOW_INTERRUPTIONS),
|
||||
|
||||
('psynch_mutexdrop', [
|
||||
('mutex', 'uint64_t'),
|
||||
@ -473,7 +520,7 @@ calls = [
|
||||
('flags', 'uint32_t'),
|
||||
], [
|
||||
('retval', 'uint32_t'),
|
||||
], XNU_BSD_TRAP_CALL | XNU_TRAP_NO_DTAPE_DEF),
|
||||
], XNU_BSD_TRAP_CALL | XNU_TRAP_NO_DTAPE_DEF | ALLOW_INTERRUPTIONS),
|
||||
|
||||
('psynch_rw_rdlock', [
|
||||
('rwlock', 'uint64_t'),
|
||||
@ -483,7 +530,7 @@ calls = [
|
||||
('flags', 'int32_t'),
|
||||
], [
|
||||
('retval', 'uint32_t'),
|
||||
], XNU_BSD_TRAP_CALL | XNU_TRAP_NO_DTAPE_DEF),
|
||||
], XNU_BSD_TRAP_CALL | XNU_TRAP_NO_DTAPE_DEF | ALLOW_INTERRUPTIONS),
|
||||
|
||||
('psynch_rw_unlock', [
|
||||
('rwlock', 'uint64_t'),
|
||||
@ -503,7 +550,7 @@ calls = [
|
||||
('flags', 'int32_t'),
|
||||
], [
|
||||
('retval', 'uint32_t'),
|
||||
], XNU_BSD_TRAP_CALL | XNU_TRAP_NO_DTAPE_DEF),
|
||||
], XNU_BSD_TRAP_CALL | XNU_TRAP_NO_DTAPE_DEF | ALLOW_INTERRUPTIONS),
|
||||
]
|
||||
|
||||
def parse_type(param_tuple, is_public):
|
||||
@ -581,6 +628,7 @@ public_header.write("#define DSERVER_CALL_UNMANAGED_FLAG 0x80000000U\n\n")
|
||||
public_header.write("enum dserver_callnum {\n")
|
||||
# "52ccall" -> "s2c call"
|
||||
public_header.write("\tdserver_callnum_s2c = 0x52cca11,\n")
|
||||
public_header.write("\tdserver_callnum_push_reply = 0xbadca11,\n")
|
||||
public_header.write("\tdserver_callnum_invalid = 0,\n")
|
||||
idx = 1
|
||||
for call in calls:
|
||||
@ -628,13 +676,19 @@ typedef struct dserver_rpc_replyhdr {
|
||||
int code;
|
||||
} dserver_rpc_replyhdr_t;
|
||||
|
||||
typedef struct dserver_rpc_call_push_reply {
|
||||
dserver_rpc_callhdr_t header;
|
||||
uint64_t reply;
|
||||
uint64_t reply_size;
|
||||
} dserver_rpc_call_push_reply_t;
|
||||
|
||||
""")
|
||||
|
||||
library_source.write("""\
|
||||
#include {}
|
||||
#include <darlingserver/rpc-supplement.h>
|
||||
|
||||
#if !defined(dserver_rpc_hooks_msghdr_t) || !defined(dserver_rpc_hooks_iovec_t) || !defined(dserver_rpc_hooks_cmsghdr_t) || !defined(DSERVER_RPC_HOOKS_CMSG_SPACE) || !defined(DSERVER_RPC_HOOKS_CMSG_FIRSTHDR) || !defined(DSERVER_RPC_HOOKS_SOL_SOCKET) || !defined(DSERVER_RPC_HOOKS_SCM_RIGHTS) || !defined(DSERVER_RPC_HOOKS_CMSG_LEN) || !defined(DSERVER_RPC_HOOKS_CMSG_DATA) || !defined(DSERVER_RPC_HOOKS_ATTRIBUTE)
|
||||
#if !defined(dserver_rpc_hooks_msghdr_t) || !defined(dserver_rpc_hooks_iovec_t) || !defined(dserver_rpc_hooks_cmsghdr_t) || !defined(DSERVER_RPC_HOOKS_CMSG_SPACE) || !defined(DSERVER_RPC_HOOKS_CMSG_FIRSTHDR) || !defined(DSERVER_RPC_HOOKS_SOL_SOCKET) || !defined(DSERVER_RPC_HOOKS_SCM_RIGHTS) || !defined(DSERVER_RPC_HOOKS_CMSG_LEN) || !defined(DSERVER_RPC_HOOKS_CMSG_DATA) || !defined(DSERVER_RPC_HOOKS_ATTRIBUTE) || !defined(dserver_rpc_hooks_atomic_save_t)
|
||||
#error Missing definitions
|
||||
#endif
|
||||
|
||||
@ -690,6 +744,26 @@ DSERVER_RPC_HOOKS_ATTRIBUTE void dserver_rpc_hooks_close_fd(int fd);
|
||||
DSERVER_RPC_HOOKS_ATTRIBUTE int dserver_rpc_hooks_get_socket(void);
|
||||
#endif
|
||||
|
||||
#ifndef dserver_rpc_hooks_printf
|
||||
DSERVER_RPC_HOOKS_ATTRIBUTE void dserver_rpc_hooks_printf(const char* format, ...);
|
||||
#endif
|
||||
|
||||
#ifndef dserver_rpc_hooks_atomic_begin
|
||||
DSERVER_RPC_HOOKS_ATTRIBUTE void dserver_rpc_hooks_atomic_begin(dserver_rpc_hooks_atomic_save_t* atomic_save);
|
||||
#endif
|
||||
|
||||
#ifndef dserver_rpc_hooks_atomic_end
|
||||
DSERVER_RPC_HOOKS_ATTRIBUTE void dserver_rpc_hooks_atomic_end(dserver_rpc_hooks_atomic_save_t* atomic_save);
|
||||
#endif
|
||||
|
||||
#ifndef dserver_rpc_hooks_get_interrupt_status
|
||||
DSERVER_RPC_HOOKS_ATTRIBUTE int dserver_rpc_hooks_get_interrupt_status(void);
|
||||
#endif
|
||||
|
||||
#ifndef dserver_rpc_hooks_push_reply
|
||||
DSERVER_RPC_HOOKS_ATTRIBUTE void dserver_rpc_hooks_push_reply(int socket, const dserver_rpc_hooks_msghdr_t* message, size_t size);
|
||||
#endif
|
||||
|
||||
""".format(library_import))
|
||||
|
||||
internal_header.write("#define DSERVER_VALID_CALLNUM_CASES \\\n")
|
||||
@ -988,6 +1062,7 @@ internal_header.write("\n")
|
||||
public_header.write("__attribute__((always_inline)) static const char* dserver_callnum_to_string(dserver_callnum_t callnum) {\n")
|
||||
public_header.write("\tswitch (callnum) {\n")
|
||||
public_header.write("\t\tcase dserver_callnum_s2c: return \"dserver_callnum_s2c\";\n")
|
||||
public_header.write("\t\tcase dserver_callnum_push_reply: return \"dserver_callnum_push_reply\";\n")
|
||||
for call in calls:
|
||||
call_name = call[0]
|
||||
public_header.write("\t\tcase dserver_callnum_" + call_name + ": return \"dserver_callnum_" + call_name + "\";\n")
|
||||
@ -995,12 +1070,33 @@ public_header.write("\t\tdefault: return (const char*)0;\n")
|
||||
public_header.write("\t}\n")
|
||||
public_header.write("};\n\n")
|
||||
|
||||
max_call_fd_count = 0
|
||||
max_reply_fd_count = 0
|
||||
for call in calls:
|
||||
call_name = call[0]
|
||||
call_parameters = call[1]
|
||||
reply_parameters = call[2]
|
||||
fd_count_in_call = 0
|
||||
fd_count_in_reply = 0
|
||||
flags = call[3] if len(call) >= 4 else 0
|
||||
for param in call_parameters:
|
||||
if is_fd(param):
|
||||
fd_count_in_call += 1
|
||||
for param in reply_parameters:
|
||||
if is_fd(param):
|
||||
fd_count_in_reply += 1
|
||||
if fd_count_in_call > max_call_fd_count:
|
||||
max_call_fd_count = fd_count_in_call
|
||||
if fd_count_in_reply > max_reply_fd_count:
|
||||
max_reply_fd_count = fd_count_in_reply
|
||||
|
||||
for call in calls:
|
||||
call_name = call[0]
|
||||
call_parameters = call[1]
|
||||
reply_parameters = call[2]
|
||||
fd_count_in_call = 0
|
||||
fd_count_in_reply = 0
|
||||
flags = call[3] if len(call) >= 4 else 0
|
||||
|
||||
# define the RPC call body structure
|
||||
if len(call_parameters) > 0:
|
||||
@ -1098,10 +1194,21 @@ for call in calls:
|
||||
library_source.write("\tunion {\n")
|
||||
library_source.write("\t\tdserver_rpc_reply_" + call_name + "_t reply;\n")
|
||||
library_source.write("\t\tdserver_s2c_call_t s2c;\n")
|
||||
|
||||
# make room for any potetial replies if we need to handle unexpected replies
|
||||
if (flags & PUSH_UNKNOWN_REPLIES) != 0:
|
||||
for other_call in calls:
|
||||
other_call_name = other_call[0]
|
||||
other_call_parameters = other_call[1]
|
||||
other_reply_parameters = other_call[2]
|
||||
other_fd_count_in_reply = 0
|
||||
other_flags = other_call[3] if len(other_call) >= 4 else 0
|
||||
library_source.write("\t\tdserver_rpc_reply_" + other_call_name + "_t potential_reply_" + other_call_name + ";\n")
|
||||
|
||||
library_source.write("\t} reply_msg;\n")
|
||||
|
||||
if fd_count_in_call > 0 or fd_count_in_reply > 0:
|
||||
library_source.write("\tint fds[" + str(max(fd_count_in_call, fd_count_in_reply)) + "];\n")
|
||||
if max(fd_count_in_call, fd_count_in_reply, max_reply_fd_count if (flags & PUSH_UNKNOWN_REPLIES) != 0 else 0) > 0:
|
||||
library_source.write("\tint fds[" + str(max(fd_count_in_call, fd_count_in_reply, max_reply_fd_count if (flags & PUSH_UNKNOWN_REPLIES) != 0 else 0)) + "];\n")
|
||||
library_source.write("\tint valid_fd_count;\n")
|
||||
library_source.write("\tchar controlbuf[DSERVER_RPC_HOOKS_CMSG_SPACE(sizeof(fds))];\n")
|
||||
|
||||
@ -1159,7 +1266,7 @@ for call in calls:
|
||||
.msg_iovlen = 1,
|
||||
"""), '\t'))
|
||||
|
||||
if fd_count_in_reply == 0:
|
||||
if max(fd_count_in_reply, max_reply_fd_count if (flags & PUSH_UNKNOWN_REPLIES) != 0 else 0) == 0:
|
||||
library_source.write("\t\t.msg_control = NULL,\n")
|
||||
library_source.write("\t\t.msg_controllen = 0,\n")
|
||||
else:
|
||||
@ -1168,22 +1275,75 @@ for call in calls:
|
||||
|
||||
library_source.write("\t};\n\n")
|
||||
|
||||
library_source.write("\tlong int long_status = dserver_rpc_hooks_send_message(server_socket, &callmsg);\n")
|
||||
if (flags & ALLOW_INTERRUPTIONS) == 0:
|
||||
library_source.write("\tdserver_rpc_hooks_atomic_save_t atomic_save;\n")
|
||||
library_source.write("\tdserver_rpc_hooks_atomic_begin(&atomic_save);\n\n")
|
||||
|
||||
library_source.write("\tlong int long_status = dserver_rpc_hooks_send_message(server_socket, &callmsg);\n\n")
|
||||
|
||||
if (flags & ALLOW_INTERRUPTIONS) != 0:
|
||||
library_source.write("\tif (long_status == dserver_rpc_hooks_get_interrupt_status()) {\n")
|
||||
library_source.write("\t\treturn (int)long_status;\n")
|
||||
library_source.write("\t}\n\n")
|
||||
|
||||
library_source.write("\tif (long_status < 0) {\n")
|
||||
if (flags & ALLOW_INTERRUPTIONS) == 0:
|
||||
library_source.write("\t\tdserver_rpc_hooks_atomic_end(&atomic_save);\n")
|
||||
library_source.write("\t\tdserver_rpc_hooks_printf(\"*** %d:%d: %s: BAD SEND STATUS: %ld ***\\n\", dserver_rpc_hooks_get_pid(), dserver_rpc_hooks_get_tid(), __func__, long_status);\n")
|
||||
library_source.write("\t\treturn (int)long_status;\n")
|
||||
library_source.write("\t}\n\n")
|
||||
|
||||
library_source.write("\tif (long_status != sizeof(call)) {\n")
|
||||
if (flags & ALLOW_INTERRUPTIONS) == 0:
|
||||
library_source.write("\t\tdserver_rpc_hooks_atomic_end(&atomic_save);\n")
|
||||
library_source.write("\t\tdserver_rpc_hooks_printf(\"*** %d:%d: %s: BAD SEND LENGTH: %ld (expected %zu) ***\\n\", dserver_rpc_hooks_get_pid(), dserver_rpc_hooks_get_tid(), __func__, long_status, sizeof(call));\n")
|
||||
library_source.write("\t\treturn dserver_rpc_hooks_get_communication_error_status();\n")
|
||||
library_source.write("\t}\n\n")
|
||||
|
||||
library_source.write("\tlong_status = dserver_rpc_hooks_receive_message(server_socket, &replymsg);\n")
|
||||
if (flags & (ALLOW_INTERRUPTIONS | PUSH_UNKNOWN_REPLIES)) != 0:
|
||||
library_source.write("retry_receive:\n")
|
||||
library_source.write("\tlong_status = dserver_rpc_hooks_receive_message(server_socket, &replymsg);\n\n")
|
||||
|
||||
if (flags & ALLOW_INTERRUPTIONS) != 0:
|
||||
library_source.write("\tif (long_status == dserver_rpc_hooks_get_interrupt_status()) {\n")
|
||||
library_source.write("\t\tgoto retry_receive;\n")
|
||||
library_source.write("\t}\n\n")
|
||||
|
||||
library_source.write("\tif (long_status < 0) {\n")
|
||||
if (flags & ALLOW_INTERRUPTIONS) == 0:
|
||||
library_source.write("\t\tdserver_rpc_hooks_atomic_end(&atomic_save);\n")
|
||||
library_source.write("\t\tdserver_rpc_hooks_printf(\"*** %d:%d: %s: BAD RECEIVE STATUS: %ld ***\\n\", dserver_rpc_hooks_get_pid(), dserver_rpc_hooks_get_tid(), __func__, long_status);\n")
|
||||
library_source.write("\t\treturn (int)long_status;\n")
|
||||
library_source.write("\t}\n\n")
|
||||
library_source.write("\tif (long_status != sizeof(reply_msg.reply)) {\n")
|
||||
|
||||
library_source.write("\tif (long_status < sizeof(dserver_rpc_replyhdr_t)) {\n")
|
||||
if (flags & ALLOW_INTERRUPTIONS) == 0:
|
||||
library_source.write("\t\tdserver_rpc_hooks_atomic_end(&atomic_save);\n")
|
||||
library_source.write("\t\tdserver_rpc_hooks_printf(\"*** %d:%d: %s: BAD RECEIVE MESSAGE: length=%ld (expected %zu) ***\\n\", dserver_rpc_hooks_get_pid(), dserver_rpc_hooks_get_tid(), __func__, long_status, sizeof(reply_msg.reply));\n")
|
||||
library_source.write("\t\treturn dserver_rpc_hooks_get_communication_error_status();\n")
|
||||
library_source.write("\t}\n\n")
|
||||
|
||||
library_source.write("\tif (reply_msg.reply.header.number != dserver_callnum_" + call_name + ") {\n")
|
||||
if (flags & PUSH_UNKNOWN_REPLIES) != 0:
|
||||
library_source.write("\t\tdserver_rpc_hooks_push_reply(server_socket, &replymsg, long_status);\n")
|
||||
library_source.write("\t\tgoto retry_receive;\n")
|
||||
else:
|
||||
if (flags & ALLOW_INTERRUPTIONS) == 0:
|
||||
library_source.write("\t\tdserver_rpc_hooks_atomic_end(&atomic_save);\n")
|
||||
library_source.write("\t\tdserver_rpc_hooks_printf(\"*** %d:%d: %s: BAD RECEIVE MESSAGE: number=%d (expected %d), code=%d, length=%ld (expected %zu) ***\\n\", dserver_rpc_hooks_get_pid(), dserver_rpc_hooks_get_tid(), __func__, reply_msg.reply.header.number, dserver_callnum_" + call_name + ", reply_msg.reply.header.code, long_status, sizeof(reply_msg.reply));\n")
|
||||
library_source.write("\t\treturn dserver_rpc_hooks_get_communication_error_status();\n")
|
||||
library_source.write("\t}\n\n")
|
||||
|
||||
library_source.write("\tif (long_status != sizeof(reply_msg.reply)) {\n")
|
||||
if (flags & ALLOW_INTERRUPTIONS) == 0:
|
||||
library_source.write("\t\tdserver_rpc_hooks_atomic_end(&atomic_save);\n")
|
||||
library_source.write("\t\tdserver_rpc_hooks_printf(\"*** %d:%d: %s: BAD RECEIVE MESSAGE: number=%d (expected %d), code=%d, length=%ld (expected %zu) ***\\n\", dserver_rpc_hooks_get_pid(), dserver_rpc_hooks_get_tid(), __func__, reply_msg.reply.header.number, dserver_callnum_" + call_name + ", reply_msg.reply.header.code, long_status, sizeof(reply_msg.reply));\n")
|
||||
library_source.write("\t\treturn dserver_rpc_hooks_get_communication_error_status();\n")
|
||||
library_source.write("\t}\n\n")
|
||||
|
||||
if (flags & ALLOW_INTERRUPTIONS) == 0:
|
||||
library_source.write("\tdserver_rpc_hooks_atomic_end(&atomic_save);\n\n")
|
||||
|
||||
if fd_count_in_reply != 0:
|
||||
library_source.write("\tvalid_fd_count = 0;\n")
|
||||
for param in reply_parameters:
|
||||
@ -1281,6 +1441,24 @@ for call in calls:
|
||||
|
||||
library_source.write("};\n\n")
|
||||
|
||||
public_header.write("// we don't care about multiple evaluation here\n")
|
||||
public_header.write("#define dserver_rpc_helper_max(a, b) (((b) > (a)) ? (b) : (a))\n\n")
|
||||
|
||||
curr_call_len_str = "0"
|
||||
curr_reply_len_str = "0"
|
||||
for call in calls:
|
||||
call_name = call[0]
|
||||
call_parameters = call[1]
|
||||
reply_parameters = call[2]
|
||||
flags = call[3] if len(call) >= 4 else 0
|
||||
curr_call_len_str = "(dserver_rpc_helper_max(sizeof(dserver_rpc_call_" + call_name + "_t), " + curr_call_len_str + "))"
|
||||
curr_reply_len_str = "(dserver_rpc_helper_max(sizeof(dserver_rpc_reply_" + call_name + "_t), " + curr_reply_len_str + "))"
|
||||
|
||||
public_header.write("#define DSERVER_RPC_CALL_MAX_LENGTH " + curr_call_len_str + "\n")
|
||||
public_header.write("#define DSERVER_RPC_REPLY_MAX_LENGTH " + curr_reply_len_str + "\n")
|
||||
public_header.write("#define DSERVER_RPC_CALL_MAX_FD_COUNT " + str(max_call_fd_count) + "\n")
|
||||
public_header.write("#define DSERVER_RPC_REPLY_MAX_FD_COUNT " + str(max_reply_fd_count) + "\n\n")
|
||||
|
||||
public_header.write("""\
|
||||
#ifdef __cplusplus
|
||||
};
|
||||
|
32
src/call.cpp
32
src/call.cpp
@ -46,6 +46,7 @@ std::shared_ptr<DarlingServer::Call> DarlingServer::Call::callFromMessage(Messag
|
||||
// first, make sure we know this call number
|
||||
switch (header->number) {
|
||||
case dserver_callnum_s2c:
|
||||
case dserver_callnum_push_reply:
|
||||
DSERVER_VALID_CALLNUM_CASES
|
||||
break;
|
||||
|
||||
@ -93,6 +94,33 @@ std::shared_ptr<DarlingServer::Call> DarlingServer::Call::callFromMessage(Messag
|
||||
|
||||
dtape_semaphore_up(thread->_s2cReplySempahore);
|
||||
|
||||
return nullptr;
|
||||
} else if (header->number == dserver_callnum_push_reply) {
|
||||
// this is a reply push
|
||||
// (used to send interrupted replies back to the server)
|
||||
|
||||
auto pushReplyCall = reinterpret_cast<const dserver_rpc_call_push_reply_t*>(requestMessage.data().data());
|
||||
Message replyToSave(pushReplyCall->reply_size, 0);
|
||||
|
||||
if (!process->readMemory(pushReplyCall->reply, replyToSave.data().data(), pushReplyCall->reply_size)) {
|
||||
throw std::runtime_error("Failed to read client-pushed reply body");
|
||||
}
|
||||
|
||||
replyToSave.replaceDescriptors(requestMessage.descriptors());
|
||||
requestMessage.replaceDescriptors({});
|
||||
|
||||
replyToSave.setAddress(requestMessage.address());
|
||||
|
||||
{
|
||||
std::unique_lock lock(thread->_rwlock);
|
||||
if (thread->_savedReply) {
|
||||
throw std::runtime_error("Client-pushed reply overwriting existing saved reply");
|
||||
}
|
||||
thread->_savedReply = std::move(replyToSave);
|
||||
}
|
||||
|
||||
callLog.debug() << *thread << ": Saved client-pushed reply" << callLog.endLog;
|
||||
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
@ -629,7 +657,7 @@ void DarlingServer::Call::TaskIs64Bit::processCall() {
|
||||
_sendReply(code, is64Bit);
|
||||
};
|
||||
|
||||
void DarlingServer::Call::SigexcEnter::processCall() {
|
||||
void DarlingServer::Call::InterruptEnter::processCall() {
|
||||
// FIXME: we should not be accessing private Thread members
|
||||
|
||||
auto thread = _thread.lock();
|
||||
@ -655,7 +683,7 @@ void DarlingServer::Call::SigexcEnter::processCall() {
|
||||
_sendReply(0);
|
||||
};
|
||||
|
||||
void DarlingServer::Call::SigexcExit::processCall() {
|
||||
void DarlingServer::Call::InterruptExit::processCall() {
|
||||
auto thread = _thread.lock();
|
||||
|
||||
dtape_thread_sigexc_exit(thread->_dtapeThread);
|
||||
|
@ -398,7 +398,7 @@ void DarlingServer::Thread::doWork() {
|
||||
|
||||
_rwlock.lock();
|
||||
|
||||
if (_pendingCall && _pendingCall->number() == Call::Number::SigexcEnter) {
|
||||
if (_pendingCall && _pendingCall->number() == Call::Number::InterruptEnter) {
|
||||
_interruptedContinuation = _continuationCallback;
|
||||
_continuationCallback = nullptr;
|
||||
_interruptedCall = _activeCall;
|
||||
|
Loading…
Reference in New Issue
Block a user