Drain as much as possible before waiting.

This commit is contained in:
Unknown W. Brackets 2013-08-18 17:13:05 -07:00
parent e6236a1d31
commit 642e7b255e

View File

@ -170,19 +170,18 @@ struct MsgPipe : public KernelObject
while (!sendWaitingThreads.empty() && nmp.freeSize > 0)
{
MsgPipeWaitingThread *thread = &sendWaitingThreads.front();
u32 bytesToSend = 0;
if (thread->bufSize <= (u32) nmp.freeSize)
bytesToSend = thread->bufSize;
else if (thread->waitMode == SCE_KERNEL_MPW_ASAP && nmp.freeSize != 0)
bytesToSend = nmp.freeSize;
u32 bytesToSend = std::min(thread->freeSize, (u32) nmp.freeSize);
if (bytesToSend != 0)
thread->ReadBuffer(Memory::GetPointer(buffer + GetUsedSize()), bytesToSend);
nmp.freeSize -= bytesToSend;
filledSpace = true;
if (thread->waitMode == SCE_KERNEL_MPW_ASAP || thread->freeSize == 0)
{
thread->ReadBuffer(Memory::GetPointer(buffer + GetUsedSize()), bytesToSend);
thread->Complete(GetUID(), 0);
sendWaitingThreads.erase(sendWaitingThreads.begin());
wokeThreads = true;
filledSpace = true;
thread = NULL;
}
// Unlike receives, we don't do partial sends. Stop at first blocked thread.
else
@ -216,7 +215,7 @@ struct MsgPipe : public KernelObject
if (thread->waitMode == SCE_KERNEL_MPW_ASAP || thread->freeSize == 0)
{
thread->Complete(GetUID(), 0, thread->bufSize);
thread->Complete(GetUID(), 0);
receiveWaitingThreads.erase(receiveWaitingThreads.begin());
wokeThreads = true;
thread = NULL;
@ -665,29 +664,24 @@ int __KernelReceiveMsgPipe(MsgPipe *m, u32 receiveBufAddr, u32 receiveSize, int
return SCE_KERNEL_ERROR_ILLEGAL_SIZE;
}
u32 bytesToReceive = 0;
// If others are already waiting, space or not, we have to get in line.
m->SortReceiveThreads();
if (m->receiveWaitingThreads.empty())
while (m->GetUsedSize() > 0)
{
if (receiveSize <= m->GetUsedSize())
bytesToReceive = receiveSize;
else if (waitMode == SCE_KERNEL_MPW_ASAP)
bytesToReceive = m->GetUsedSize();
u32 bytesToReceive = std::min(receiveSize, m->GetUsedSize());
if (bytesToReceive != 0)
{
Memory::Memcpy(curReceiveAddr, Memory::GetPointer(m->buffer), bytesToReceive);
m->nmp.freeSize += bytesToReceive;
memmove(Memory::GetPointer(m->buffer), Memory::GetPointer(m->buffer) + bytesToReceive, m->GetUsedSize());
curReceiveAddr += bytesToReceive;
receiveSize -= bytesToReceive;
m->CheckSendThreads();
}
else
break;
}
if (bytesToReceive != 0)
{
Memory::Memcpy(curReceiveAddr, Memory::GetPointer(m->buffer), bytesToReceive);
m->nmp.freeSize += bytesToReceive;
memmove(Memory::GetPointer(m->buffer), Memory::GetPointer(m->buffer) + bytesToReceive, m->GetUsedSize());
curReceiveAddr += bytesToReceive;
receiveSize -= bytesToReceive;
if (m->CheckSendThreads())
hleReSchedule(cbEnabled, "msgpipe data received");
}
else if (receiveSize != 0)
if (receiveSize != 0 && (waitMode != SCE_KERNEL_MPW_ASAP || curReceiveAddr == receiveBufAddr))
{
if (poll)
return SCE_KERNEL_ERROR_MPP_EMPTY;