Simplify and fix msgpipe send queuing logic.

Should reschedule, should wait in line, couple error messages.
This commit is contained in:
Unknown W. Brackets 2013-08-18 13:27:28 -07:00
parent 27268967f9
commit 23625eec30

View File

@ -133,10 +133,16 @@ struct MsgPipe : public KernelObject
userMemory.Free(buffer);
}
u32 GetUsedSize()
{
return (u32)(nmp.bufSize - nmp.freeSize);
}
void AddWaitingThread(std::vector<MsgPipeWaitingThread> &list, SceUID id, u32 addr, u32 size, int waitMode, u32 transferredBytesAddr)
{
MsgPipeWaitingThread thread = { id, addr, size, size, waitMode, transferredBytesAddr };
// Start out with 0 transferred bytes while waiting.
// TODO: for receive, it might be a different (partial) number.
if (thread.transferredBytes.IsValid())
*thread.transferredBytes = 0;
@ -153,61 +159,75 @@ struct MsgPipe : public KernelObject
AddWaitingThread(receiveWaitingThreads, id, addr, size, waitMode, transferredBytesAddr);
}
void CheckSendThreads()
bool CheckSendThreads()
{
SortSendThreads();
if (sendWaitingThreads.empty())
return;
MsgPipeWaitingThread *thread = &sendWaitingThreads.front();
if ((u32) nmp.freeSize >= thread->bufSize)
bool wokeThreads = false;
bool filledSpace = false;
while (!sendWaitingThreads.empty() && nmp.freeSize > 0)
{
// Put all the data to the buffer
Memory::Memcpy(buffer + (nmp.bufSize - nmp.freeSize), Memory::GetPointer(thread->bufAddr), thread->bufSize);
nmp.freeSize -= thread->bufSize;
thread->Complete(GetUID(), 0, thread->bufSize);
sendWaitingThreads.erase(sendWaitingThreads.begin());
CheckReceiveThreads();
}
else if (thread->waitMode == SCE_KERNEL_MPW_ASAP && nmp.freeSize != 0)
{
// Put as much data as possible into the buffer
Memory::Memcpy(buffer + (nmp.bufSize - nmp.freeSize), Memory::GetPointer(thread->bufAddr), nmp.freeSize);
nmp.freeSize = 0;
thread->Complete(GetUID(), 0, nmp.freeSize);
receiveWaitingThreads.erase(receiveWaitingThreads.begin());
CheckReceiveThreads();
MsgPipeWaitingThread *thread = &sendWaitingThreads.front();
u32 bytesToSend = 0;
if (thread->bufSize <= (u32) nmp.freeSize)
bytesToSend = thread->bufSize;
else if (thread->waitMode == SCE_KERNEL_MPW_ASAP && nmp.freeSize != 0)
bytesToSend = nmp.freeSize;
if (bytesToSend != 0)
{
thread->ReadBuffer(Memory::GetPointer(buffer + GetUsedSize()), bytesToSend);
thread->Complete(GetUID(), 0);
sendWaitingThreads.erase(sendWaitingThreads.begin());
wokeThreads = true;
filledSpace = true;
}
// Unlike receives, we don't do partial sends. Stop at first blocked thread.
else
break;
}
if (filledSpace)
wokeThreads |= CheckReceiveThreads();
return wokeThreads;
}
// This function should be only ran when the temporary buffer size is not 0 (otherwise, data is copied directly to the threads)
void CheckReceiveThreads()
bool CheckReceiveThreads()
{
SortReceiveThreads();
if (receiveWaitingThreads.empty())
return;
MsgPipeWaitingThread *thread = &receiveWaitingThreads.front();
if ((u32) nmp.bufSize - (u32) nmp.freeSize >= thread->bufSize)
bool wokeThreads = false;
bool freedSpace = false;
while (!receiveWaitingThreads.empty() && GetUsedSize() > 0)
{
// Get the needed data from the buffer
Memory::Memcpy(thread->bufAddr, Memory::GetPointer(buffer), thread->bufSize);
// Put the unused data at the start of the buffer
memmove(Memory::GetPointer(buffer), Memory::GetPointer(buffer) + thread->bufSize, nmp.bufSize - nmp.freeSize);
nmp.freeSize += thread->bufSize;
thread->Complete(GetUID(), 0, thread->bufSize);
receiveWaitingThreads.erase(receiveWaitingThreads.begin());
CheckSendThreads();
}
else if (thread->waitMode == SCE_KERNEL_MPW_ASAP && nmp.freeSize != nmp.bufSize)
{
// Get all the data from the buffer
Memory::Memcpy(thread->bufAddr, Memory::GetPointer(buffer), nmp.bufSize - nmp.freeSize);
nmp.freeSize = nmp.bufSize;
thread->Complete(GetUID(), 0, nmp.bufSize - nmp.freeSize);
receiveWaitingThreads.erase(receiveWaitingThreads.begin());
CheckSendThreads();
MsgPipeWaitingThread *thread = &receiveWaitingThreads.front();
// Receive as much as possible, even if it's not enough to wake up.
u32 bytesToSend = std::min(thread->freeSize, GetUsedSize());
thread->WriteBuffer(Memory::GetPointer(buffer), bytesToSend);
// Put the unused data at the start of the buffer.
nmp.freeSize += bytesToSend;
memmove(Memory::GetPointer(buffer), Memory::GetPointer(buffer) + bytesToSend, GetUsedSize());
freedSpace = true;
if (thread->waitMode == SCE_KERNEL_MPW_ASAP || thread->freeSize == 0)
{
thread->Complete(GetUID(), 0, thread->bufSize);
receiveWaitingThreads.erase(receiveWaitingThreads.begin());
wokeThreads = true;
thread = NULL;
}
// Stop at the first that can't wake up.
else
break;
}
if (freedSpace)
wokeThreads |= CheckSendThreads();
return wokeThreads;
}
void SortReceiveThreads()
@ -381,42 +401,43 @@ int __KernelSendMsgPipe(MsgPipe *m, u32 sendBufAddr, u32 sendSize, int waitMode,
u32 curSendAddr = sendBufAddr;
SceUID uid = m->GetUID();
if (sendSize & 0x80000000)
{
ERROR_LOG(HLE, "__KernelSendMsgPipe(%d): illegal size %d", uid, sendSize);
return SCE_KERNEL_ERROR_ILLEGAL_ADDR;
}
// If the buffer size is 0, nothing is buffered and all operations wait.
if (m->nmp.bufSize == 0)
{
while (!m->receiveWaitingThreads.empty())
m->SortReceiveThreads();
while (!m->receiveWaitingThreads.empty() && sendSize != 0)
{
MsgPipeWaitingThread *thread = &m->receiveWaitingThreads.front();
if (thread->freeSize > sendSize)
if (!thread->IsStillWaiting(uid))
{
Memory::Memcpy(thread->bufAddr + (thread->bufSize - thread->freeSize), Memory::GetPointer(curSendAddr), sendSize);
thread->freeSize -= sendSize;
curSendAddr += sendSize;
sendSize = 0;
if (thread->waitMode == SCE_KERNEL_MPW_ASAP)
m->receiveWaitingThreads.erase(m->receiveWaitingThreads.begin());
continue;
}
u32 bytesToSend = std::min(thread->freeSize, sendSize);
if (bytesToSend > 0)
{
thread->WriteBuffer(Memory::GetPointer(curSendAddr), bytesToSend);
sendSize -= bytesToSend;
curSendAddr += bytesToSend;
if (thread->freeSize == 0 || thread->waitMode == SCE_KERNEL_MPW_ASAP)
{
thread->Complete(m->GetUID(), 0, thread->bufSize - thread->freeSize);
thread->Complete(uid, 0);
m->receiveWaitingThreads.erase(m->receiveWaitingThreads.begin());
hleReSchedule(cbEnabled, "msgpipe data sent");
thread = NULL;
}
break;
}
else if (thread->freeSize == sendSize)
{
Memory::Memcpy(thread->bufAddr + (thread->bufSize - thread->freeSize), Memory::GetPointer(curSendAddr), sendSize);
thread->Complete(m->GetUID(), 0, thread->bufSize);
m->receiveWaitingThreads.erase(m->receiveWaitingThreads.begin());
curSendAddr += sendSize;
sendSize = 0;
break;
}
else
{
Memory::Memcpy(thread->bufAddr + (thread->bufSize - thread->freeSize), Memory::GetPointer(curSendAddr), thread->freeSize);
sendSize -= thread->freeSize;
curSendAddr += thread->freeSize;
thread->Complete(m->GetUID(), 0, thread->bufSize);
m->receiveWaitingThreads.erase(m->receiveWaitingThreads.begin());
}
}
// If there is still data to send and (we want to send all of it or we didn't send anything)
if (sendSize != 0 && (waitMode != SCE_KERNEL_MPW_ASAP || curSendAddr == sendBufAddr))
{
@ -435,19 +456,31 @@ int __KernelSendMsgPipe(MsgPipe *m, u32 sendBufAddr, u32 sendSize, int waitMode,
}
else
{
if (sendSize <= (u32) m->nmp.freeSize)
if (sendSize > (u32) m->nmp.bufSize)
{
Memory::Memcpy(m->buffer + (m->nmp.bufSize - m->nmp.freeSize), Memory::GetPointer(sendBufAddr), sendSize);
m->nmp.freeSize -= sendSize;
curSendAddr = sendBufAddr + sendSize;
sendSize = 0;
ERROR_LOG(HLE, "__KernelSendMsgPipe(%d): size %d too large for buffer", uid, sendSize);
return SCE_KERNEL_ERROR_ILLEGAL_SIZE;
}
else if (waitMode == SCE_KERNEL_MPW_ASAP && m->nmp.freeSize != 0)
u32 bytesToSend = 0;
// If others are already waiting, space or not, we have to get in line.
if (m->sendWaitingThreads.empty())
{
Memory::Memcpy(m->buffer + (m->nmp.bufSize - m->nmp.freeSize), Memory::GetPointer(sendBufAddr), m->nmp.freeSize);
curSendAddr = sendBufAddr + m->nmp.freeSize;
sendSize -= m->nmp.freeSize;
m->nmp.freeSize = 0;
if (sendSize <= (u32) m->nmp.freeSize)
bytesToSend = sendSize;
else if (waitMode == SCE_KERNEL_MPW_ASAP && m->nmp.freeSize != 0)
bytesToSend = m->nmp.freeSize;
}
if (bytesToSend != 0)
{
Memory::Memcpy(m->buffer + (m->nmp.bufSize - m->nmp.freeSize), Memory::GetPointer(sendBufAddr), bytesToSend);
m->nmp.freeSize -= bytesToSend;
curSendAddr += bytesToSend;
sendSize -= bytesToSend;
if (m->CheckReceiveThreads())
hleReSchedule(cbEnabled, "msgpipe data sent");
}
else
{
@ -463,13 +496,11 @@ int __KernelSendMsgPipe(MsgPipe *m, u32 sendBufAddr, u32 sendSize, int waitMode,
return 0;
}
}
if (curSendAddr != sendBufAddr)
{
m->CheckReceiveThreads();
}
}
Memory::Write_U32(curSendAddr - sendBufAddr, resultAddr);
// We didn't wait, so update the number of bytes transferred now.
if (Memory::IsValidAddress(resultAddr))
Memory::Write_U32(curSendAddr - sendBufAddr, resultAddr);
return 0;
}