mirror of
https://github.com/darlinghq/darlingserver.git
synced 2024-11-26 22:00:26 +00:00
Allow threads to perform S2C calls at any time
This is possible now because we can signal threads with a real-time signal that libsystem_kernel handles.
This commit is contained in:
parent
b4fccbd6dd
commit
be203fd569
@ -115,6 +115,7 @@ add_executable(darlingserver
|
|||||||
|
|
||||||
add_dependencies(darlingserver
|
add_dependencies(darlingserver
|
||||||
generate_dserver_rpc_wrappers
|
generate_dserver_rpc_wrappers
|
||||||
|
rtsig_h
|
||||||
)
|
)
|
||||||
|
|
||||||
target_link_libraries(darlingserver PRIVATE
|
target_link_libraries(darlingserver PRIVATE
|
||||||
|
@ -370,6 +370,7 @@ target_include_directories(darlingserver_duct_tape PUBLIC
|
|||||||
add_dependencies(darlingserver_duct_tape
|
add_dependencies(darlingserver_duct_tape
|
||||||
kernel_mig_generate
|
kernel_mig_generate
|
||||||
generate_dserver_rpc_wrappers
|
generate_dserver_rpc_wrappers
|
||||||
|
rtsig_h
|
||||||
)
|
)
|
||||||
|
|
||||||
target_link_libraries(darlingserver_duct_tape PUBLIC
|
target_link_libraries(darlingserver_duct_tape PUBLIC
|
||||||
|
@ -115,6 +115,8 @@ namespace DarlingServer {
|
|||||||
void _clearPortSet(dtape_port_set_id_t portSetID);
|
void _clearPortSet(dtape_port_set_id_t portSetID);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
std::shared_ptr<Thread> _pickS2CThread(void) const;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
using ID = pid_t;
|
using ID = pid_t;
|
||||||
using NSID = ID;
|
using NSID = ID;
|
||||||
@ -171,6 +173,13 @@ namespace DarlingServer {
|
|||||||
std::shared_ptr<Process> tracerProcess() const;
|
std::shared_ptr<Process> tracerProcess() const;
|
||||||
bool setTracerProcess(std::shared_ptr<Process> tracerProcess);
|
bool setTracerProcess(std::shared_ptr<Process> tracerProcess);
|
||||||
|
|
||||||
|
uintptr_t allocatePages(size_t pageCount, int protection, uintptr_t addressHint, bool fixed, bool overwrite);
|
||||||
|
void freePages(uintptr_t address, size_t pageCount);
|
||||||
|
uintptr_t mapFile(int fd, size_t pageCount, int protection, uintptr_t addressHint, size_t pageOffset, bool fixed, bool overwrite);
|
||||||
|
void changeProtection(uintptr_t address, size_t pageCount, int protection);
|
||||||
|
|
||||||
|
uintptr_t getNextRegion(uintptr_t address) const;
|
||||||
|
|
||||||
static std::shared_ptr<Process> currentProcess();
|
static std::shared_ptr<Process> currentProcess();
|
||||||
static std::shared_ptr<Process> kernelProcess();
|
static std::shared_ptr<Process> kernelProcess();
|
||||||
|
|
||||||
|
@ -97,6 +97,10 @@ namespace DarlingServer {
|
|||||||
ucontext_t _syscallReturnHereDuringInterrupt;
|
ucontext_t _syscallReturnHereDuringInterrupt;
|
||||||
bool _didSyscallReturnDuringInterrupt = false;
|
bool _didSyscallReturnDuringInterrupt = false;
|
||||||
bool _handlingInterruptedCall = false;
|
bool _handlingInterruptedCall = false;
|
||||||
|
dtape_semaphore_t* _s2cInterruptEnterSemaphore = nullptr;
|
||||||
|
dtape_semaphore_t* _s2cInterruptExitSemaphore = nullptr;
|
||||||
|
bool _deferReplyForS2C = false;
|
||||||
|
std::optional<Message> _deferredReply = std::nullopt;
|
||||||
|
|
||||||
struct InterruptContext {
|
struct InterruptContext {
|
||||||
std::optional<Message> savedReply = std::nullopt;
|
std::optional<Message> savedReply = std::nullopt;
|
||||||
|
@ -222,6 +222,8 @@ calls = [
|
|||||||
('float_state', 'uint64_t'),
|
('float_state', 'uint64_t'),
|
||||||
], []),
|
], []),
|
||||||
|
|
||||||
|
('s2c_perform', [], []),
|
||||||
|
|
||||||
#
|
#
|
||||||
# kqueue channels
|
# kqueue channels
|
||||||
#
|
#
|
||||||
@ -1241,10 +1243,10 @@ for call in calls:
|
|||||||
|
|
||||||
library_source.write("\t} reply_msg;\n")
|
library_source.write("\t} reply_msg;\n")
|
||||||
|
|
||||||
if max(fd_count_in_call, fd_count_in_reply, max_reply_fd_count if (flags & PUSH_UNKNOWN_REPLIES) != 0 else 0) > 0:
|
# we always allocate space for at least one FD, since S2C calls might send a descriptor
|
||||||
library_source.write("\tint fds[" + str(max(fd_count_in_call, fd_count_in_reply, max_reply_fd_count if (flags & PUSH_UNKNOWN_REPLIES) != 0 else 0)) + "];\n")
|
library_source.write("\tint fds[" + str(max(1, fd_count_in_call, fd_count_in_reply, max_reply_fd_count if (flags & PUSH_UNKNOWN_REPLIES) != 0 else 0)) + "];\n")
|
||||||
library_source.write("\tint valid_fd_count;\n")
|
library_source.write("\tint valid_fd_count;\n")
|
||||||
library_source.write("\tchar controlbuf[DSERVER_RPC_HOOKS_CMSG_SPACE(sizeof(fds))];\n")
|
library_source.write("\tchar controlbuf[DSERVER_RPC_HOOKS_CMSG_SPACE(sizeof(fds))];\n")
|
||||||
|
|
||||||
if fd_count_in_call > 0:
|
if fd_count_in_call > 0:
|
||||||
library_source.write("\tvalid_fd_count = 0;\n")
|
library_source.write("\tvalid_fd_count = 0;\n")
|
||||||
@ -1298,27 +1300,27 @@ for call in calls:
|
|||||||
.msg_namelen = 0,
|
.msg_namelen = 0,
|
||||||
.msg_iov = &reply_data,
|
.msg_iov = &reply_data,
|
||||||
.msg_iovlen = 1,
|
.msg_iovlen = 1,
|
||||||
|
.msg_control = controlbuf,
|
||||||
|
.msg_controllen = sizeof(controlbuf),
|
||||||
"""), '\t'))
|
"""), '\t'))
|
||||||
|
|
||||||
if max(fd_count_in_reply, max_reply_fd_count if (flags & PUSH_UNKNOWN_REPLIES) != 0 else 0) == 0:
|
|
||||||
library_source.write("\t\t.msg_control = NULL,\n")
|
|
||||||
library_source.write("\t\t.msg_controllen = 0,\n")
|
|
||||||
else:
|
|
||||||
library_source.write("\t\t.msg_control = controlbuf,\n")
|
|
||||||
library_source.write("\t\t.msg_controllen = sizeof(controlbuf),\n")
|
|
||||||
|
|
||||||
library_source.write("\t};\n\n")
|
library_source.write("\t};\n\n")
|
||||||
|
|
||||||
if (flags & ALLOW_INTERRUPTIONS) == 0:
|
if (flags & ALLOW_INTERRUPTIONS) == 0:
|
||||||
library_source.write("\tdserver_rpc_hooks_atomic_save_t atomic_save;\n")
|
library_source.write("\tdserver_rpc_hooks_atomic_save_t atomic_save;\n")
|
||||||
library_source.write("\tdserver_rpc_hooks_atomic_begin(&atomic_save);\n\n")
|
library_source.write("\tdserver_rpc_hooks_atomic_begin(&atomic_save);\n\n")
|
||||||
|
|
||||||
library_source.write("\tlong int long_status = dserver_rpc_hooks_send_message(server_socket, &callmsg);\n\n")
|
library_source.write("\tlong int long_status;\n\n")
|
||||||
|
|
||||||
if (flags & ALLOW_INTERRUPTIONS) != 0:
|
library_source.write("retry_send:\n")
|
||||||
library_source.write("\tif (long_status == dserver_rpc_hooks_get_interrupt_status()) {\n")
|
library_source.write("\tlong_status = dserver_rpc_hooks_send_message(server_socket, &callmsg);\n\n")
|
||||||
|
|
||||||
|
library_source.write("\tif (long_status == dserver_rpc_hooks_get_interrupt_status()) {\n")
|
||||||
|
if (flags & ALLOW_INTERRUPTIONS) == 0:
|
||||||
|
library_source.write("\t\tgoto retry_send;\n")
|
||||||
|
else:
|
||||||
library_source.write("\t\treturn (int)long_status;\n")
|
library_source.write("\t\treturn (int)long_status;\n")
|
||||||
library_source.write("\t}\n\n")
|
library_source.write("\t}\n\n")
|
||||||
|
|
||||||
library_source.write("\tif (long_status < 0) {\n")
|
library_source.write("\tif (long_status < 0) {\n")
|
||||||
if (flags & ALLOW_INTERRUPTIONS) == 0:
|
if (flags & ALLOW_INTERRUPTIONS) == 0:
|
||||||
@ -1334,14 +1336,12 @@ for call in calls:
|
|||||||
library_source.write("\t\treturn dserver_rpc_hooks_get_communication_error_status();\n")
|
library_source.write("\t\treturn dserver_rpc_hooks_get_communication_error_status();\n")
|
||||||
library_source.write("\t}\n\n")
|
library_source.write("\t}\n\n")
|
||||||
|
|
||||||
if (flags & (ALLOW_INTERRUPTIONS | PUSH_UNKNOWN_REPLIES)) != 0:
|
library_source.write("retry_receive:\n")
|
||||||
library_source.write("retry_receive:\n")
|
|
||||||
library_source.write("\tlong_status = dserver_rpc_hooks_receive_message(server_socket, &replymsg);\n\n")
|
library_source.write("\tlong_status = dserver_rpc_hooks_receive_message(server_socket, &replymsg);\n\n")
|
||||||
|
|
||||||
if (flags & ALLOW_INTERRUPTIONS) != 0:
|
library_source.write("\tif (long_status == dserver_rpc_hooks_get_interrupt_status()) {\n")
|
||||||
library_source.write("\tif (long_status == dserver_rpc_hooks_get_interrupt_status()) {\n")
|
library_source.write("\t\tgoto retry_receive;\n")
|
||||||
library_source.write("\t\tgoto retry_receive;\n")
|
library_source.write("\t}\n\n")
|
||||||
library_source.write("\t}\n\n")
|
|
||||||
|
|
||||||
library_source.write("\tif (long_status < 0) {\n")
|
library_source.write("\tif (long_status < 0) {\n")
|
||||||
if (flags & ALLOW_INTERRUPTIONS) == 0:
|
if (flags & ALLOW_INTERRUPTIONS) == 0:
|
||||||
|
13
src/call.cpp
13
src/call.cpp
@ -972,4 +972,17 @@ void DarlingServer::Call::ThreadSuspended::processCall() {
|
|||||||
_sendReply(code);
|
_sendReply(code);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
void DarlingServer::Call::S2CPerform::processCall() {
|
||||||
|
int code = 0;
|
||||||
|
|
||||||
|
if (auto thread = _thread.lock()) {
|
||||||
|
dtape_semaphore_up(thread->_s2cInterruptEnterSemaphore);
|
||||||
|
dtape_semaphore_down_simple(thread->_s2cInterruptExitSemaphore);
|
||||||
|
} else {
|
||||||
|
code = -ESRCH;
|
||||||
|
}
|
||||||
|
|
||||||
|
_sendReply(code);
|
||||||
|
};
|
||||||
|
|
||||||
DSERVER_CLASS_SOURCE_DEFS;
|
DSERVER_CLASS_SOURCE_DEFS;
|
||||||
|
108
src/process.cpp
108
src/process.cpp
@ -235,8 +235,8 @@ bool DarlingServer::Process::_readOrWriteMemory(bool isWrite, uintptr_t remoteAd
|
|||||||
<< "Failed to "
|
<< "Failed to "
|
||||||
<< (isWrite ? "write " : "read ")
|
<< (isWrite ? "write " : "read ")
|
||||||
<< length
|
<< length
|
||||||
<< " byte(s) at "
|
<< " byte(s) at 0x"
|
||||||
<< remoteAddress
|
<< std::hex << remoteAddress << std::dec
|
||||||
<< " in process "
|
<< " in process "
|
||||||
<< id()
|
<< id()
|
||||||
<< " ("
|
<< " ("
|
||||||
@ -256,8 +256,8 @@ bool DarlingServer::Process::_readOrWriteMemory(bool isWrite, uintptr_t remoteAd
|
|||||||
<< "Successfully "
|
<< "Successfully "
|
||||||
<< (isWrite ? "wrote " : "read ")
|
<< (isWrite ? "wrote " : "read ")
|
||||||
<< length
|
<< length
|
||||||
<< " byte(s) at "
|
<< " byte(s) at 0x"
|
||||||
<< remoteAddress
|
<< std::hex << remoteAddress << std::dec
|
||||||
<< " in process "
|
<< " in process "
|
||||||
<< id()
|
<< id()
|
||||||
<< " ("
|
<< " ("
|
||||||
@ -326,6 +326,10 @@ void DarlingServer::Process::notifyCheckin(Architecture architecture) {
|
|||||||
mainThread->_s2cPerformSempahore = nullptr;
|
mainThread->_s2cPerformSempahore = nullptr;
|
||||||
dtape_semaphore_destroy(mainThread->_s2cReplySempahore);
|
dtape_semaphore_destroy(mainThread->_s2cReplySempahore);
|
||||||
mainThread->_s2cReplySempahore = nullptr;
|
mainThread->_s2cReplySempahore = nullptr;
|
||||||
|
dtape_semaphore_destroy(mainThread->_s2cInterruptEnterSemaphore);
|
||||||
|
mainThread->_s2cInterruptEnterSemaphore = nullptr;
|
||||||
|
dtape_semaphore_destroy(mainThread->_s2cInterruptExitSemaphore);
|
||||||
|
mainThread->_s2cInterruptExitSemaphore = nullptr;
|
||||||
|
|
||||||
// destroy the fork-wait semaphore
|
// destroy the fork-wait semaphore
|
||||||
dtape_semaphore_destroy(_dtapeForkWaitSemaphore);
|
dtape_semaphore_destroy(_dtapeForkWaitSemaphore);
|
||||||
@ -343,6 +347,8 @@ void DarlingServer::Process::notifyCheckin(Architecture architecture) {
|
|||||||
// create new S2C semaphores for the main thread
|
// create new S2C semaphores for the main thread
|
||||||
mainThread->_s2cPerformSempahore = dtape_semaphore_create(_dtapeTask, 1);
|
mainThread->_s2cPerformSempahore = dtape_semaphore_create(_dtapeTask, 1);
|
||||||
mainThread->_s2cReplySempahore = dtape_semaphore_create(_dtapeTask, 0);
|
mainThread->_s2cReplySempahore = dtape_semaphore_create(_dtapeTask, 0);
|
||||||
|
mainThread->_s2cInterruptEnterSemaphore = dtape_semaphore_create(_dtapeTask, 0);
|
||||||
|
mainThread->_s2cInterruptExitSemaphore = dtape_semaphore_create(_dtapeTask, 0);
|
||||||
|
|
||||||
// notify listeners that we have exec'd (i.e. been replaced)
|
// notify listeners that we have exec'd (i.e. been replaced)
|
||||||
_notifyListeningKqchannelsLocked(NOTE_EXEC, 0);
|
_notifyListeningKqchannelsLocked(NOTE_EXEC, 0);
|
||||||
@ -500,7 +506,7 @@ DarlingServer::Process::MemoryRegionInfo DarlingServer::Process::memoryRegionInf
|
|||||||
return info;
|
return info;
|
||||||
}
|
}
|
||||||
|
|
||||||
processLog.warning() << *this << ": Address " << std::hex << address << " not found in \"/proc/" << std::dec << _pid << "/maps\"" << processLog.endLog;
|
processLog.warning() << *this << ": Address 0x" << std::hex << address << " not found in \"/proc/" << std::dec << _pid << "/maps\"" << processLog.endLog;
|
||||||
throw std::system_error(EFAULT, std::generic_category());
|
throw std::system_error(EFAULT, std::generic_category());
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -553,3 +559,95 @@ bool DarlingServer::Process::setTracerProcess(std::shared_ptr<Process> tracerPro
|
|||||||
_tracerProcess = tracerProcess;
|
_tracerProcess = tracerProcess;
|
||||||
return true;
|
return true;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
std::shared_ptr<DarlingServer::Thread> DarlingServer::Process::_pickS2CThread(void) const {
|
||||||
|
// if we're the process for the current thread (i.e. we're the current process), use the current thread
|
||||||
|
if (currentProcess().get() == this) {
|
||||||
|
return Thread::currentThread();
|
||||||
|
}
|
||||||
|
|
||||||
|
// otherwise, pick any thread to perform the call
|
||||||
|
|
||||||
|
std::shared_ptr<Thread> thread = nullptr;
|
||||||
|
|
||||||
|
{
|
||||||
|
std::shared_lock lock(_rwlock);
|
||||||
|
for (auto& [id, weakThread]: _threads) {
|
||||||
|
thread = weakThread.lock();
|
||||||
|
if (thread) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return thread;
|
||||||
|
};
|
||||||
|
|
||||||
|
uintptr_t DarlingServer::Process::allocatePages(size_t pageCount, int protection, uintptr_t addressHint, bool fixed, bool overwrite) {
|
||||||
|
auto thread = _pickS2CThread();
|
||||||
|
|
||||||
|
if (!thread) {
|
||||||
|
throw std::system_error(ESRCH, std::generic_category());
|
||||||
|
}
|
||||||
|
|
||||||
|
return thread->allocatePages(pageCount, protection, addressHint, fixed, overwrite);
|
||||||
|
};
|
||||||
|
|
||||||
|
void DarlingServer::Process::freePages(uintptr_t address, size_t pageCount) {
|
||||||
|
auto thread = _pickS2CThread();
|
||||||
|
|
||||||
|
if (!thread) {
|
||||||
|
throw std::system_error(ESRCH, std::generic_category());
|
||||||
|
}
|
||||||
|
|
||||||
|
return thread->freePages(address, pageCount);
|
||||||
|
};
|
||||||
|
|
||||||
|
uintptr_t DarlingServer::Process::mapFile(int fd, size_t pageCount, int protection, uintptr_t addressHint, size_t pageOffset, bool fixed, bool overwrite) {
|
||||||
|
auto thread = _pickS2CThread();
|
||||||
|
|
||||||
|
if (!thread) {
|
||||||
|
throw std::system_error(ESRCH, std::generic_category());
|
||||||
|
}
|
||||||
|
|
||||||
|
return thread->mapFile(fd, pageCount, protection, addressHint, pageOffset, fixed, overwrite);
|
||||||
|
};
|
||||||
|
|
||||||
|
void DarlingServer::Process::changeProtection(uintptr_t address, size_t pageCount, int protection) {
|
||||||
|
auto thread = _pickS2CThread();
|
||||||
|
|
||||||
|
if (!thread) {
|
||||||
|
throw std::system_error(ESRCH, std::generic_category());
|
||||||
|
}
|
||||||
|
|
||||||
|
return thread->changeProtection(address, pageCount, protection);
|
||||||
|
};
|
||||||
|
|
||||||
|
static const std::regex memoryRegionEntryAddressRegex("([0-9a-fA-F]+)\\-([0-9a-fA-F]+)");
|
||||||
|
|
||||||
|
uintptr_t DarlingServer::Process::getNextRegion(uintptr_t address) const {
|
||||||
|
std::ifstream file("/proc/" + std::to_string(_pid) + "/maps");
|
||||||
|
std::string line;
|
||||||
|
|
||||||
|
while (std::getline(file, line)) {
|
||||||
|
std::smatch match;
|
||||||
|
|
||||||
|
if (!std::regex_search(line, match, memoryRegionEntryAddressRegex)) {
|
||||||
|
processLog.warning() << "Encountered malformed `/proc/<pid>/maps` entry? Definitely a bug (on our part)." << processLog.endLog;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto startAddress = std::stoul(match[1].str(), nullptr, 16);
|
||||||
|
auto endAddress = std::stoul(match[2].str(), nullptr, 16);
|
||||||
|
|
||||||
|
if (startAddress <= address) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// /proc/<pid>/maps is sorted in ascending order, so as soon as we find a line with a starting address greater than `address`, that's the next region
|
||||||
|
|
||||||
|
return startAddress;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
};
|
||||||
|
@ -37,6 +37,8 @@
|
|||||||
#include <sanitizer/asan_interface.h>
|
#include <sanitizer/asan_interface.h>
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#include <rtsig.h>
|
||||||
|
|
||||||
// 64KiB should be enough for us
|
// 64KiB should be enough for us
|
||||||
#define THREAD_STACK_SIZE (64 * 1024ULL)
|
#define THREAD_STACK_SIZE (64 * 1024ULL)
|
||||||
#define USE_THREAD_GUARD_PAGES 1
|
#define USE_THREAD_GUARD_PAGES 1
|
||||||
@ -146,6 +148,8 @@ DarlingServer::Thread::Thread(std::shared_ptr<Process> process, NSID nsid):
|
|||||||
_dtapeThread = dtape_thread_create(process->_dtapeTask, _nstid, this);
|
_dtapeThread = dtape_thread_create(process->_dtapeTask, _nstid, this);
|
||||||
_s2cPerformSempahore = dtape_semaphore_create(process->_dtapeTask, 1);
|
_s2cPerformSempahore = dtape_semaphore_create(process->_dtapeTask, 1);
|
||||||
_s2cReplySempahore = dtape_semaphore_create(process->_dtapeTask, 0);
|
_s2cReplySempahore = dtape_semaphore_create(process->_dtapeTask, 0);
|
||||||
|
_s2cInterruptEnterSemaphore = dtape_semaphore_create(process->_dtapeTask, 0);
|
||||||
|
_s2cInterruptExitSemaphore = dtape_semaphore_create(process->_dtapeTask, 0);
|
||||||
|
|
||||||
threadLog.info() << "New thread created with ID " << _tid << " and NSID " << _nstid << " for process with ID " << (process ? process->id() : -1) << " and NSID " << (process ? process->nsid() : -1);
|
threadLog.info() << "New thread created with ID " << _tid << " and NSID " << _nstid << " for process with ID " << (process ? process->id() : -1) << " and NSID " << (process ? process->nsid() : -1);
|
||||||
};
|
};
|
||||||
@ -189,13 +193,19 @@ DarlingServer::Thread::~Thread() noexcept(false) {
|
|||||||
|
|
||||||
// schedule the duct-taped thread to be destroyed
|
// schedule the duct-taped thread to be destroyed
|
||||||
// dtape_thread_destroy needs a microthread context, so we call it within a kernel microthread
|
// dtape_thread_destroy needs a microthread context, so we call it within a kernel microthread
|
||||||
kernelAsync([dtapeThread = _dtapeThread, s2cPerformSemaphore = _s2cPerformSempahore, s2cReplySemaphore = _s2cReplySempahore]() {
|
kernelAsync([dtapeThread = _dtapeThread, s2cPerformSemaphore = _s2cPerformSempahore, s2cReplySemaphore = _s2cReplySempahore, s2cInterruptEnterSemaphore = _s2cInterruptEnterSemaphore, s2cInterruptExitSemaphore = _s2cInterruptExitSemaphore]() {
|
||||||
if (s2cPerformSemaphore) {
|
if (s2cPerformSemaphore) {
|
||||||
dtape_semaphore_destroy(s2cPerformSemaphore);
|
dtape_semaphore_destroy(s2cPerformSemaphore);
|
||||||
}
|
}
|
||||||
if (s2cReplySemaphore) {
|
if (s2cReplySemaphore) {
|
||||||
dtape_semaphore_destroy(s2cReplySemaphore);
|
dtape_semaphore_destroy(s2cReplySemaphore);
|
||||||
}
|
}
|
||||||
|
if (s2cInterruptEnterSemaphore) {
|
||||||
|
dtape_semaphore_destroy(s2cInterruptEnterSemaphore);
|
||||||
|
}
|
||||||
|
if (s2cInterruptExitSemaphore) {
|
||||||
|
dtape_semaphore_destroy(s2cInterruptExitSemaphore);
|
||||||
|
}
|
||||||
dtape_thread_destroy(dtapeThread);
|
dtape_thread_destroy(dtapeThread);
|
||||||
});
|
});
|
||||||
|
|
||||||
@ -865,35 +875,41 @@ void DarlingServer::Thread::setPendingCallOverride(bool pendingCallOverride) {
|
|||||||
* additionally, if someone else is already ptracing that process, we lose the ability to execute code with this approach.
|
* additionally, if someone else is already ptracing that process, we lose the ability to execute code with this approach.
|
||||||
* therefore, we have this RPC-based system instead.
|
* therefore, we have this RPC-based system instead.
|
||||||
*
|
*
|
||||||
* there is one major drawback to this approach, however: the process we want to execute an S2C call in
|
* in order to perform an S2C call, however, the target thread MUST be waiting for a message from the server.
|
||||||
* MUST have at least one thread waiting for a message from the server. two possible solutions:
|
* thus, when we want to perform an S2C call on a thread that isn't waiting for a message, we send it a real-time signal
|
||||||
* 1. we use a real-time signal to ask the process to execute the S2C call.
|
* to ask it to execute the S2C call. the signal is handled with the normal wrappers (interrupt_enter and interrupt_exit)
|
||||||
* note that with this approach we'd have to block the RT signal (or ignore it) while we're waiting for an RPC call
|
* to properly handle the case when we may be accidentally interrupting an ongoing call in the thread (since we may have raced
|
||||||
* so that we don't accidentally receive a normal RPC reply in the signal handler.
|
* with thread trying to perform a server call).
|
||||||
* this would probably be a bit tricky to implement correctly (without races).
|
|
||||||
* 2. we have each process create a dedicated thread for executing S2C calls.
|
|
||||||
* i'm currently leaning towards solution #1 because it avoids wasting extra resources unnecessarily.
|
|
||||||
*/
|
*/
|
||||||
|
|
||||||
static DarlingServer::Log s2cLog("s2c");
|
static DarlingServer::Log s2cLog("s2c");
|
||||||
|
|
||||||
DarlingServer::Message DarlingServer::Thread::_s2cPerform(Message&& call, dserver_s2c_msgnum_t expectedReplyNumber, size_t expectedReplySize) {
|
DarlingServer::Message DarlingServer::Thread::_s2cPerform(Message&& call, dserver_s2c_msgnum_t expectedReplyNumber, size_t expectedReplySize) {
|
||||||
std::optional<Message> reply = std::nullopt;
|
std::optional<Message> reply = std::nullopt;
|
||||||
|
bool usingInterrupt = false;
|
||||||
|
|
||||||
// make sure we're the only one performing an S2C call on this thread
|
// make sure we're the only one performing an S2C call on this thread
|
||||||
dtape_semaphore_down_simple(_s2cPerformSempahore);
|
dtape_semaphore_down_simple(_s2cPerformSempahore);
|
||||||
|
|
||||||
s2cLog.debug() << _tid << "(" << _nstid << "): Going to perform S2C call" << s2cLog.endLog;
|
s2cLog.debug() << *this << ": Going to perform S2C call" << s2cLog.endLog;
|
||||||
|
|
||||||
// at least for now, S2C calls require the target thread to be waiting for an RPC reply
|
|
||||||
//
|
|
||||||
// TODO: allow threads to perform S2C calls at any time
|
|
||||||
{
|
{
|
||||||
std::shared_lock lock(_rwlock);
|
std::unique_lock lock(_rwlock);
|
||||||
|
|
||||||
if (!_activeCall) {
|
if (!_activeCall) {
|
||||||
dtape_semaphore_up(_s2cPerformSempahore);
|
// signal the thread that we want to perform an S2C call and wait for it to give us the green light
|
||||||
throw std::runtime_error("Cannot perform S2C call if thread is not waiting for reply");
|
lock.unlock();
|
||||||
|
s2cLog.debug() << *this << ": Sending S2C signal" << s2cLog.endLog;
|
||||||
|
usingInterrupt = true;
|
||||||
|
sendSignal(LINUX_SIGRTMIN + 1);
|
||||||
|
dtape_semaphore_down_simple(_s2cInterruptEnterSemaphore);
|
||||||
|
s2cLog.debug() << *this << ": Got green light to perform S2C call" << s2cLog.endLog;
|
||||||
|
lock.lock();
|
||||||
|
} else if (currentThread().get() != this) {
|
||||||
|
// we have an active call, so the client is waiting for a reply and is able to perform an S2C call,
|
||||||
|
// but we're not the active thread. thus, in order to guarantee the client doesn't receive a reply
|
||||||
|
// and stop waiting before we get a chance to perform our S2C call, let's make sure replies are deferred.
|
||||||
|
_deferReplyForS2C = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
call.setAddress(_address);
|
call.setAddress(_address);
|
||||||
@ -906,7 +922,7 @@ DarlingServer::Message DarlingServer::Thread::_s2cPerform(Message&& call, dserve
|
|||||||
throw std::runtime_error("Must be in a microthread (any microthread) to wait for S2C reply");
|
throw std::runtime_error("Must be in a microthread (any microthread) to wait for S2C reply");
|
||||||
}
|
}
|
||||||
|
|
||||||
s2cLog.debug() << _tid << "(" << _nstid << "): Going to send S2C message" << s2cLog.endLog;
|
s2cLog.debug() << *this << ": Going to send S2C message" << s2cLog.endLog;
|
||||||
|
|
||||||
// send the call
|
// send the call
|
||||||
Server::sharedInstance().sendMessage(std::move(call));
|
Server::sharedInstance().sendMessage(std::move(call));
|
||||||
@ -914,7 +930,7 @@ DarlingServer::Message DarlingServer::Thread::_s2cPerform(Message&& call, dserve
|
|||||||
// now let's wait for the reply
|
// now let's wait for the reply
|
||||||
dtape_semaphore_down_simple(_s2cReplySempahore);
|
dtape_semaphore_down_simple(_s2cReplySempahore);
|
||||||
|
|
||||||
s2cLog.debug() << _tid << "(" << _nstid << "): Received S2C reply" << s2cLog.endLog;
|
s2cLog.debug() << *this << ": Received S2C reply" << s2cLog.endLog;
|
||||||
|
|
||||||
// extract the reply
|
// extract the reply
|
||||||
{
|
{
|
||||||
@ -928,13 +944,29 @@ DarlingServer::Message DarlingServer::Thread::_s2cPerform(Message&& call, dserve
|
|||||||
|
|
||||||
reply = std::move(_s2cReply);
|
reply = std::move(_s2cReply);
|
||||||
_s2cReply = std::nullopt;
|
_s2cReply = std::nullopt;
|
||||||
|
|
||||||
|
// if we had replies deferred, now's the time to send them
|
||||||
|
if (_deferReplyForS2C) {
|
||||||
|
_deferReplyForS2C = false;
|
||||||
|
if (_deferredReply) {
|
||||||
|
Server::sharedInstance().sendMessage(std::move(*_deferredReply));
|
||||||
|
_deferredReply = std::nullopt;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
s2cLog.debug() << _tid << "(" << _nstid << "): Done performing S2C call" << s2cLog.endLog;
|
s2cLog.debug() << *this << ": Done performing S2C call" << s2cLog.endLog;
|
||||||
|
|
||||||
// we're done performing the call; allow others to have a chance at performing an S2C call on this thread
|
// we're done performing the call; allow others to have a chance at performing an S2C call on this thread
|
||||||
dtape_semaphore_up(_s2cPerformSempahore);
|
dtape_semaphore_up(_s2cPerformSempahore);
|
||||||
|
|
||||||
|
if (usingInterrupt) {
|
||||||
|
// if we used the S2C signal to perform the call, then the s2c_perform call is currently waiting for us to finish;
|
||||||
|
// let it know that we're done
|
||||||
|
s2cLog.debug() << *this << ": Allowing thread to resume from S2C interrupt" << s2cLog.endLog;
|
||||||
|
dtape_semaphore_up(_s2cInterruptExitSemaphore);
|
||||||
|
}
|
||||||
|
|
||||||
// partially validate the reply
|
// partially validate the reply
|
||||||
|
|
||||||
if (reply->data().size() != expectedReplySize) {
|
if (reply->data().size() != expectedReplySize) {
|
||||||
@ -1145,6 +1177,8 @@ void DarlingServer::Thread::pushCallReply(std::shared_ptr<Call> expectedCall, Me
|
|||||||
|
|
||||||
if (_interruptedForSignal) {
|
if (_interruptedForSignal) {
|
||||||
_interrupts.top().savedReply = std::move(reply);
|
_interrupts.top().savedReply = std::move(reply);
|
||||||
|
} else if (_deferReplyForS2C) {
|
||||||
|
_deferredReply = std::move(reply);
|
||||||
} else {
|
} else {
|
||||||
Server::sharedInstance().sendMessage(std::move(reply));
|
Server::sharedInstance().sendMessage(std::move(reply));
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user