mirror of
https://github.com/darlinghq/darlingserver.git
synced 2024-11-23 04:19:44 +00:00
Implement kernel runner queue scaling
This allows kernel runner threads to be created as necessary to process the work that comes in through `kernelAsync` and `kernelSync`. There's currently a hardcoded max of 10 permanent kernel runners. However, if the workload is too much, temporary runners can be spawned; each temporary worker processes a single work item and then exits. There is no limit on the number of temporary workers that can be spawned.
This commit is contained in:
parent
e17bdbb1e7
commit
80ab579ca2
@ -714,38 +714,87 @@ static std::queue<std::function<void()>> kernelAsyncRunnerQueue;
|
||||
// we use libsimple_lock_t so we can pass it to `suspend` to unlock it after suspending.
|
||||
// XXX: we could use a std::mutex if we add an overload to `suspend` for it.
|
||||
static libsimple_lock_t kernelAsyncRunnerQueueLock;
|
||||
static dtape_semaphore_t* kernelAsyncRunnerQueueSempahore = nullptr;
|
||||
static uint64_t kernelAsyncRunnersAvailable = 0;
|
||||
static std::vector<std::shared_ptr<DarlingServer::Thread>> permanentKernelAsyncRunners;
|
||||
|
||||
#define MAX_PERMANENT_KERNEL_RUNNERS 10
|
||||
|
||||
static void kernelAsyncRunnerThreadWorker(bool permanent, std::shared_ptr<DarlingServer::Thread> self) {
|
||||
do {
|
||||
// we're going to wait for work; we're available now.
|
||||
libsimple_lock_lock(&kernelAsyncRunnerQueueLock);
|
||||
++kernelAsyncRunnersAvailable;
|
||||
libsimple_lock_unlock(&kernelAsyncRunnerQueueLock);
|
||||
|
||||
if (!dtape_semaphore_down_simple(kernelAsyncRunnerQueueSempahore)) {
|
||||
// we were interrupted. go again if we're permanent; otherwise, die.
|
||||
libsimple_lock_lock(&kernelAsyncRunnerQueueLock);
|
||||
--kernelAsyncRunnersAvailable;
|
||||
libsimple_lock_unlock(&kernelAsyncRunnerQueueLock);
|
||||
|
||||
if (permanent) {
|
||||
continue;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
static void kernelAsyncRunnerThreadWorker() {
|
||||
while (true) {
|
||||
libsimple_lock_lock(&kernelAsyncRunnerQueueLock);
|
||||
|
||||
if (kernelAsyncRunnerQueue.empty()) {
|
||||
// unlocks the lock
|
||||
DarlingServer::Thread::currentThread()->suspend(nullptr, &kernelAsyncRunnerQueueLock);
|
||||
// we didn't find any work (we were probably awoken spuriously).
|
||||
// go again if we're permanent; otherwise, die.
|
||||
--kernelAsyncRunnersAvailable;
|
||||
libsimple_lock_unlock(&kernelAsyncRunnerQueueLock);
|
||||
|
||||
if (permanent) {
|
||||
continue;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// we're going to perform some work; we're no longer available
|
||||
--kernelAsyncRunnersAvailable;
|
||||
|
||||
auto func = kernelAsyncRunnerQueue.front();
|
||||
kernelAsyncRunnerQueue.pop();
|
||||
|
||||
libsimple_lock_unlock(&kernelAsyncRunnerQueueLock);
|
||||
|
||||
// perform the work
|
||||
func();
|
||||
}
|
||||
} while (permanent);
|
||||
|
||||
self = nullptr;
|
||||
DarlingServer::Thread::currentThread()->terminate();
|
||||
__builtin_unreachable();
|
||||
};
|
||||
|
||||
void DarlingServer::Thread::kernelAsync(std::function<void()> fn) {
|
||||
// TODO: this could scale up depending on the size of the queue (like XNU's thread calls)
|
||||
static auto runnerThread = []() {
|
||||
auto thread = std::make_shared<Thread>(KernelThreadConstructorTag());
|
||||
thread->startKernelThread(kernelAsyncRunnerThreadWorker);
|
||||
return thread;
|
||||
static bool inited = []() {
|
||||
kernelAsyncRunnerQueueSempahore = dtape_semaphore_create(Process::kernelProcess()->_dtapeTask, 0);
|
||||
return true;
|
||||
}();
|
||||
|
||||
libsimple_lock_lock(&kernelAsyncRunnerQueueLock);
|
||||
kernelAsyncRunnerQueue.push(fn);
|
||||
runnerThread->resume(); // resume the runner (it's most likely suspended waiting for work)
|
||||
if (kernelAsyncRunnersAvailable == 0) {
|
||||
// we need to get some work done, but there are no workers available.
|
||||
// if we have less workers than the max permanent number of workers,
|
||||
// let's spawn a permanent worker. otherwise, just spawn a temporary worker.
|
||||
auto thread = std::make_shared<Thread>(KernelThreadConstructorTag());
|
||||
auto permanent = permanentKernelAsyncRunners.size() < MAX_PERMANENT_KERNEL_RUNNERS;
|
||||
thread->startKernelThread(std::bind(kernelAsyncRunnerThreadWorker, permanent, thread));
|
||||
if (permanent) {
|
||||
permanentKernelAsyncRunners.push_back(std::move(thread));
|
||||
}
|
||||
}
|
||||
libsimple_lock_unlock(&kernelAsyncRunnerQueueLock);
|
||||
|
||||
// increment the semaphore to let workers know there's work available.
|
||||
dtape_semaphore_up(kernelAsyncRunnerQueueSempahore);
|
||||
};
|
||||
|
||||
void DarlingServer::Thread::kernelSync(std::function<void()> fn) {
|
||||
|
Loading…
Reference in New Issue
Block a user