mirror of
https://github.com/darlinghq/darlingserver.git
synced 2024-11-23 04:19:44 +00:00
Defer kqchannel notifications until after read replies are sent
See DarlingServer::Kqchan::MachPort::_read() for why this is necessary. This fixes crashes in libkqueue due to out-of-order kqchannel messages, mainly visible in aslmanager.
This commit is contained in:
parent
a4afa0cd29
commit
4f05ef945b
@ -40,6 +40,8 @@ namespace DarlingServer {
|
||||
bool _canSend = false;
|
||||
std::mutex _notificationMutex;
|
||||
bool _canSendNotification = true;
|
||||
bool _deferNotification = false;
|
||||
std::optional<Message> _deferredNotification = std::nullopt;
|
||||
std::mutex _sendingMutex;
|
||||
uint64_t _notificationCount = 0;
|
||||
|
||||
@ -52,6 +54,7 @@ namespace DarlingServer {
|
||||
virtual std::shared_ptr<Kqchan> sharedFromRoot();
|
||||
|
||||
void _sendNotification();
|
||||
void _sendDeferredNotification();
|
||||
|
||||
public:
|
||||
virtual ~Kqchan();
|
||||
|
@ -168,8 +168,26 @@ void DarlingServer::Kqchan::_sendNotification() {
|
||||
notification->header.pid = 0;
|
||||
notification->header.tid = 0;
|
||||
|
||||
lock.unlock(); // the outbox has its own lock
|
||||
_outbox.push(std::move(msg));
|
||||
if (_deferNotification) {
|
||||
_deferredNotification = std::move(msg);
|
||||
} else {
|
||||
lock.unlock(); // the outbox has its own lock
|
||||
_outbox.push(std::move(msg));
|
||||
}
|
||||
};
|
||||
|
||||
void DarlingServer::Kqchan::_sendDeferredNotification() {
|
||||
std::unique_lock lock(_notificationMutex);
|
||||
|
||||
_deferNotification = false;
|
||||
|
||||
if (_deferredNotification) {
|
||||
Message notification(std::move(*_deferredNotification));
|
||||
_deferredNotification = std::nullopt;
|
||||
|
||||
lock.unlock(); // the outbox has its own lock
|
||||
_outbox.push(std::move(notification));
|
||||
}
|
||||
};
|
||||
|
||||
void DarlingServer::Kqchan::logToStream(Log::Stream& stream) const {
|
||||
@ -333,6 +351,23 @@ void DarlingServer::Kqchan::MachPort::_read(uint64_t defaultBuffer, uint64_t def
|
||||
std::unique_lock lock(_notificationMutex);
|
||||
kqchanMachPortLog.debug() << *this << ": received acknowledgement (implicitly via read) for notification " << _notificationCount++ << "; notifications may now be sent" << kqchanMachPortLog.endLog;
|
||||
_canSendNotification = true;
|
||||
|
||||
// defer future notifications until we send our reply.
|
||||
// we do it this way because:
|
||||
// 1) if we don't defer them and just send them right now,
|
||||
// a notification may be sent before we send our reply,
|
||||
// leading to out-of-order messages in the channel (which
|
||||
// causes an abort on the client side).
|
||||
// 2) if we instead move the `_canSendNotification` update to after
|
||||
// we send the reply, we may miss a notification for an event
|
||||
// that occurred right after we generated the reply but before
|
||||
// we updated `_canSendNotification`.
|
||||
// with this approach (notification deferral), channel messages are kept in-order
|
||||
// and we don't miss any notifications. worst case scenario, we might send
|
||||
// a duplicate notification for an event that occurred right after this update
|
||||
// but before we generate the reply; in that case, the client will simply try to read
|
||||
// the duplicate event but we won't have anything and we'll tell it to drop the event.
|
||||
_deferNotification = true;
|
||||
}
|
||||
|
||||
auto maybeThread = threadRegistry().lookupEntryByNSID(nstid);
|
||||
@ -362,6 +397,9 @@ void DarlingServer::Kqchan::MachPort::_read(uint64_t defaultBuffer, uint64_t def
|
||||
Thread::currentThread()->impersonate(nullptr);
|
||||
|
||||
self->_outbox.push(std::move(msg));
|
||||
|
||||
// now let's send any deferred notifications we might have
|
||||
self->_sendDeferredNotification();
|
||||
});
|
||||
};
|
||||
|
||||
@ -521,6 +559,9 @@ void DarlingServer::Kqchan::Process::_read() {
|
||||
std::unique_lock lock(_notificationMutex);
|
||||
kqchanProcLog.debug() << *this << ": received acknowledgement (implicitly via read) for notification " << _notificationCount++ << "; notifications may now be sent" << kqchanProcLog.endLog;
|
||||
_canSendNotification = true;
|
||||
|
||||
// see MachPort::_read() for why we defer notifications
|
||||
_deferNotification = true;
|
||||
}
|
||||
|
||||
std::unique_lock lock(_mutex, std::defer_lock);
|
||||
@ -597,6 +638,9 @@ void DarlingServer::Kqchan::Process::_read() {
|
||||
}
|
||||
|
||||
_outbox.push(std::move(msg));
|
||||
|
||||
// now let's send any deferred notifications we might have
|
||||
_sendDeferredNotification();
|
||||
};
|
||||
|
||||
void DarlingServer::Kqchan::Process::_notify(uint32_t event, int64_t data) {
|
||||
|
Loading…
Reference in New Issue
Block a user