Mostly works now, except for nsIThreadPool::Join

This commit is contained in:
warren%netscape.com 1999-04-05 21:02:24 +00:00
parent 4cc6308a9e
commit 338082238a
6 changed files with 462 additions and 74 deletions

View File

@ -59,6 +59,8 @@ class nsIThread : public nsISupports
public:
NS_DEFINE_STATIC_IID_ACCESSOR(NS_ITHREAD_IID);
static NS_BASE nsresult GetCurrent(nsIThread* *result);
NS_IMETHOD Join() = 0;
NS_IMETHOD GetPriority(PRThreadPriority *result) = 0;
@ -70,6 +72,7 @@ public:
NS_IMETHOD GetType(PRThreadType *result) = 0;
NS_IMETHOD GetState(PRThreadState *result) = 0;
NS_IMETHOD GetPRThread(PRThread* *result) = 0;
};
extern NS_BASE nsresult
@ -103,6 +106,10 @@ public:
NS_DEFINE_STATIC_IID_ACCESSOR(NS_ITHREADPOOL_IID);
NS_IMETHOD DispatchRequest(nsIRunnable* runnable) = 0;
NS_IMETHOD Join() = 0;
NS_IMETHOD Interrupt() = 0;
};
extern NS_BASE nsresult

View File

@ -19,6 +19,8 @@
#include "nsThread.h"
#include "prmem.h"
PRUintn nsThread::kIThreadSelf = 0;
////////////////////////////////////////////////////////////////////////////////
nsThread::nsThread()
@ -47,24 +49,34 @@ nsThread::Init(nsIRunnable* runnable,
nsThread::~nsThread()
{
if (mThread) {
PRStatus status = PR_SUCCESS;
status = PR_Interrupt(mThread);
NS_ASSERTION(status == PR_SUCCESS, "failed to interrupt worker");
status = PR_JoinThread(mThread);
NS_ASSERTION(status == PR_SUCCESS, "failed to join with worker");
PR_Free(mThread); // XXX right?
mThread = nsnull;
}
}
void
nsThread::Main(void* arg)
{
nsThread* self = (nsThread*)arg;
nsresult rv = NS_OK;
rv = self->RegisterThreadSelf();
NS_ASSERTION(rv == NS_OK, "failed to set thread self");
rv = self->mRunnable->Run();
NS_ASSERTION(NS_SUCCEEDED(rv), "runnable failed");
PRThreadState state;
rv = self->GetState(&state);
if (NS_SUCCEEDED(rv) && state == PR_UNJOINABLE_THREAD) {
Exit(arg);
}
}
void
nsThread::Exit(void* arg)
{
nsThread* self = (nsThread*)arg;
nsresult rv = NS_OK;
self->mThread = nsnull;
NS_RELEASE(self);
}
NS_IMPL_ISUPPORTS(nsThread, nsIThread::GetIID());
@ -72,6 +84,8 @@ NS_IMPL_ISUPPORTS(nsThread, nsIThread::GetIID());
NS_IMETHODIMP
nsThread::Join()
{
if (mThread == nsnull)
return NS_ERROR_FAILURE;
PRStatus status = PR_JoinThread(mThread);
return status == PR_SUCCESS ? NS_OK : NS_ERROR_FAILURE;
}
@ -79,6 +93,8 @@ nsThread::Join()
NS_IMETHODIMP
nsThread::GetPriority(PRThreadPriority *result)
{
if (mThread == nsnull)
return NS_ERROR_FAILURE;
*result = PR_GetThreadPriority(mThread);
return NS_OK;
}
@ -86,6 +102,8 @@ nsThread::GetPriority(PRThreadPriority *result)
NS_IMETHODIMP
nsThread::SetPriority(PRThreadPriority value)
{
if (mThread == nsnull)
return NS_ERROR_FAILURE;
PR_SetThreadPriority(mThread, value);
return NS_OK;
}
@ -93,6 +111,8 @@ nsThread::SetPriority(PRThreadPriority value)
NS_IMETHODIMP
nsThread::Interrupt()
{
if (mThread == nsnull)
return NS_ERROR_FAILURE;
PRStatus status = PR_Interrupt(mThread);
return status == PR_SUCCESS ? NS_OK : NS_ERROR_FAILURE;
}
@ -100,6 +120,8 @@ nsThread::Interrupt()
NS_IMETHODIMP
nsThread::GetScope(PRThreadScope *result)
{
if (mThread == nsnull)
return NS_ERROR_FAILURE;
*result = PR_GetThreadScope(mThread);
return NS_OK;
}
@ -107,6 +129,8 @@ nsThread::GetScope(PRThreadScope *result)
NS_IMETHODIMP
nsThread::GetType(PRThreadType *result)
{
if (mThread == nsnull)
return NS_ERROR_FAILURE;
*result = PR_GetThreadType(mThread);
return NS_OK;
}
@ -114,10 +138,21 @@ nsThread::GetType(PRThreadType *result)
NS_IMETHODIMP
nsThread::GetState(PRThreadState *result)
{
if (mThread == nsnull)
return NS_ERROR_FAILURE;
*result = PR_GetThreadState(mThread);
return NS_OK;
}
NS_IMETHODIMP
nsThread::GetPRThread(PRThread* *result)
{
if (mThread == nsnull)
return NS_ERROR_FAILURE;
*result = mThread;
return NS_OK;
}
NS_BASE nsresult
NS_NewThread(nsIThread* *result,
nsIRunnable* runnable,
@ -145,6 +180,55 @@ NS_NewThread(nsIThread* *result,
////////////////////////////////////////////////////////////////////////////////
nsresult
nsThread::RegisterThreadSelf()
{
PRStatus status;
if (kIThreadSelf == 0) {
status = PR_NewThreadPrivateIndex(&kIThreadSelf, Exit);
if (status != PR_SUCCESS) return NS_ERROR_FAILURE;
NS_ASSERTION(kIThreadSelf != 0, "couldn't get thread private index");
}
status = PR_SetThreadPrivate(kIThreadSelf, this);
if (status != PR_SUCCESS) return NS_ERROR_FAILURE;
NS_ADDREF(this); // released in nsThread::Exit
return NS_OK;
}
NS_BASE nsresult
nsIThread::GetCurrent(nsIThread* *result)
{
PRStatus status;
nsThread* thread;
if (nsThread::kIThreadSelf == 0) {
status = PR_NewThreadPrivateIndex(&nsThread::kIThreadSelf, nsThread::Exit);
if (status != PR_SUCCESS) return NS_ERROR_FAILURE;
NS_ASSERTION(nsThread::kIThreadSelf != 0, "couldn't get thread private index");
}
thread = (nsThread*)PR_GetThreadPrivate(nsThread::kIThreadSelf);
if (thread == nsnull) {
// if the current thread doesn't have an nsIThread associated
// with it, make one
thread = new nsThread();
if (thread == nsnull)
return NS_ERROR_OUT_OF_MEMORY;
thread->SetPRThread(PR_CurrentThread());
nsresult rv = thread->RegisterThreadSelf();
if (NS_FAILED(rv)) return rv;
}
NS_ADDREF(thread);
*result = thread;
return NS_OK;
}
////////////////////////////////////////////////////////////////////////////////
nsThreadPool::nsThreadPool(PRUint32 minThreads, PRUint32 maxThreads)
: mThreads(nsnull), mRequests(nsnull),
mMinThreads(minThreads), mMaxThreads(maxThreads)
@ -166,7 +250,13 @@ nsThreadPool::Init(PRUint32 stackSize,
rv = NS_NewISupportsArray(&mRequests);
if (NS_FAILED(rv)) return rv;
mRequestMonitor = PR_NewMonitor();
if (mRequestMonitor == nsnull)
return NS_ERROR_OUT_OF_MEMORY;
PR_CEnterMonitor(this);
for (PRUint32 i = 0; i < mMinThreads; i++) {
nsThreadPoolRunnable* runnable =
new nsThreadPoolRunnable(this);
@ -175,22 +265,42 @@ nsThreadPool::Init(PRUint32 stackSize,
NS_ADDREF(runnable);
nsIThread* thread;
rv = NS_NewThread(&thread, runnable, stackSize, type,
rv = NS_NewThread(&thread, runnable, stackSize, PR_SYSTEM_THREAD,
priority, scope, state);
NS_RELEASE(runnable);
if (NS_FAILED(rv)) return rv;
if (NS_FAILED(rv)) goto exit;
rv = mThreads->AppendElement(thread);
NS_RELEASE(thread);
if (NS_FAILED(rv)) return rv;
if (NS_FAILED(rv)) goto exit;
}
return NS_OK;
// wait for some worker thread to be ready
PR_CWait(this, PR_INTERVAL_NO_TIMEOUT);
exit:
PR_CExitMonitor(this);
return rv;
}
nsThreadPool::~nsThreadPool()
{
NS_RELEASE(mThreads);
NS_RELEASE(mRequests);
if (mThreads) {
// clean up the worker threads
PRUint32 count = mThreads->Count();
for (PRUint32 i = 0; i < count; i++) {
nsIThread* thread = (nsIThread*)((*mThreads)[i]);
thread->Interrupt();
thread->Join(); // XXX race?
}
NS_RELEASE(mThreads);
}
NS_IF_RELEASE(mRequests);
if (mRequestMonitor) {
PR_DestroyMonitor(mRequestMonitor);
}
}
NS_IMPL_ISUPPORTS(nsThreadPool, nsIThreadPool::GetIID());
@ -199,30 +309,91 @@ NS_IMETHODIMP
nsThreadPool::DispatchRequest(nsIRunnable* runnable)
{
nsresult rv;
PR_CEnterMonitor(this);
PR_EnterMonitor(mRequestMonitor);
rv = mRequests->AppendElement(runnable);
if (NS_SUCCEEDED(rv))
PR_CNotify(this);
PR_Notify(mRequestMonitor);
PR_CExitMonitor(this);
PR_ExitMonitor(mRequestMonitor);
return rv;
}
#include <stdio.h>
nsIRunnable*
nsThreadPool::Dequeue()
nsThreadPool::GetRequest()
{
PR_CEnterMonitor(this);
nsresult rv = NS_OK;
nsIRunnable* request = nsnull;
PR_EnterMonitor(mRequestMonitor);
NS_ASSERTION(mRequests->Count() > 0, "request queue out of sync");
nsIRunnable* request = (nsIRunnable*)(*mRequests)[0];
PRBool removed = mRequests->RemoveElementAt(0);
NS_ASSERTION(removed, "nsISupportsArray broken");
while (mRequests->Count() == 0) {
// printf("thread %x waiting\n", PR_CurrentThread());
PRStatus status = PR_Wait(mRequestMonitor, PR_INTERVAL_NO_TIMEOUT);
if (status != PR_SUCCESS) {
rv = NS_ERROR_FAILURE;
break; // interrupted -- quit
}
}
PR_CExitMonitor(this);
if (NS_SUCCEEDED(rv)) {
NS_ASSERTION(mRequests->Count() > 0, "request queue out of sync");
request = (nsIRunnable*)(*mRequests)[0];
NS_ASSERTION(request != nsnull, "null runnable");
PRBool removed = mRequests->RemoveElementAt(0);
NS_ASSERTION(removed, "nsISupportsArray broken");
}
PR_ExitMonitor(mRequestMonitor);
return request;
}
NS_IMETHODIMP
nsThreadPool::Join()
{
nsresult rv = NS_OK;
PRUint32 count;
PRUint32 i;
// first wait for any outstanding requests to be processed
PR_CEnterMonitor(this);
while (mRequests->Count() > 0) {
PRStatus status = PR_CWait(this, PR_INTERVAL_NO_TIMEOUT);
if (status != PR_SUCCESS) {
rv = NS_ERROR_FAILURE; // our thread was interrupted!
break;
}
}
PR_CExitMonitor(this);
if (NS_FAILED(rv)) return rv;
// then interrupt the threads and join them
Interrupt();
count = mThreads->Count();
for (i = 0; i < count; i++) {
nsIThread* thread = (nsIThread*)((*mThreads)[i]);
rv = thread->Join();
if (NS_FAILED(rv)) return rv;
}
return rv;
}
NS_IMETHODIMP
nsThreadPool::Interrupt()
{
nsresult rv = NS_OK;
PRUint32 count = mThreads->Count();
for (PRUint32 i = 0; i < count; i++) {
nsIThread* thread = (nsIThread*)((*mThreads)[i]);
rv = thread->Interrupt();
if (NS_FAILED(rv)) return rv;
}
return rv;
}
NS_BASE nsresult
NS_NewThreadPool(nsIThreadPool* *result,
PRUint32 minThreads, PRUint32 maxThreads,
@ -267,19 +438,25 @@ NS_IMPL_ISUPPORTS(nsThreadPoolRunnable, nsIRunnable::GetIID());
NS_IMETHODIMP
nsThreadPoolRunnable::Run()
{
nsresult rv = NS_OK;
nsIRunnable* request;
// let the thread pool know we're ready
PR_CEnterMonitor(mPool);
while (PR_TRUE) {
PRStatus status = PR_CWait(mPool, PR_INTERVAL_NO_TIMEOUT);
if (status != PR_SUCCESS) break; // interrupted -- quit
nsIRunnable* runnable = mPool->Dequeue();
nsresult rv = NS_OK;
rv = runnable->Run();
NS_ASSERTION(NS_SUCCEEDED(rv), "runnable failed");
}
PR_CNotify(mPool);
PR_CExitMonitor(mPool);
return NS_OK;
while ((request = mPool->GetRequest()) != nsnull) {
// printf("running %x, thread %x\n", this, PR_CurrentThread());
rv = request->Run();
NS_ASSERTION(NS_SUCCEEDED(rv), "runnable failed");
// let the thread pool know we're finished a run
PR_CEnterMonitor(mPool);
PR_CNotify(mPool);
PR_CExitMonitor(mPool);
}
return rv;
}
////////////////////////////////////////////////////////////////////////////////

View File

@ -35,6 +35,7 @@ public:
NS_IMETHOD GetScope(PRThreadScope *result);
NS_IMETHOD GetType(PRThreadType *result);
NS_IMETHOD GetState(PRThreadState *result);
NS_IMETHOD GetPRThread(PRThread* *result);
// nsThread methods:
nsThread();
@ -46,7 +47,13 @@ public:
PRThreadPriority priority,
PRThreadScope scope,
PRThreadState state);
nsresult RegisterThreadSelf();
void SetPRThread(PRThread* thread) { mThread = thread; }
static void Main(void* arg);
static void Exit(void* arg);
static PRUintn kIThreadSelf;
protected:
PRThread* mThread;
@ -62,6 +69,8 @@ public:
// nsIThreadPool methods:
NS_IMETHOD DispatchRequest(nsIRunnable* runnable);
NS_IMETHOD Join();
NS_IMETHOD Interrupt();
// nsThreadPool methods:
nsThreadPool(PRUint32 minThreads, PRUint32 maxThreads);
@ -72,11 +81,12 @@ public:
PRThreadPriority priority,
PRThreadScope scope,
PRThreadState state);
nsIRunnable* Dequeue();
nsIRunnable* GetRequest();
protected:
nsISupportsArray* mThreads;
nsISupportsArray* mRequests;
PRMonitor* mRequestMonitor;
PRUint32 mMinThreads;
PRUint32 mMaxThreads;
};

View File

@ -59,6 +59,8 @@ class nsIThread : public nsISupports
public:
NS_DEFINE_STATIC_IID_ACCESSOR(NS_ITHREAD_IID);
static NS_BASE nsresult GetCurrent(nsIThread* *result);
NS_IMETHOD Join() = 0;
NS_IMETHOD GetPriority(PRThreadPriority *result) = 0;
@ -70,6 +72,7 @@ public:
NS_IMETHOD GetType(PRThreadType *result) = 0;
NS_IMETHOD GetState(PRThreadState *result) = 0;
NS_IMETHOD GetPRThread(PRThread* *result) = 0;
};
extern NS_BASE nsresult
@ -103,6 +106,10 @@ public:
NS_DEFINE_STATIC_IID_ACCESSOR(NS_ITHREADPOOL_IID);
NS_IMETHOD DispatchRequest(nsIRunnable* runnable) = 0;
NS_IMETHOD Join() = 0;
NS_IMETHOD Interrupt() = 0;
};
extern NS_BASE nsresult

View File

@ -19,6 +19,8 @@
#include "nsThread.h"
#include "prmem.h"
PRUintn nsThread::kIThreadSelf = 0;
////////////////////////////////////////////////////////////////////////////////
nsThread::nsThread()
@ -47,24 +49,34 @@ nsThread::Init(nsIRunnable* runnable,
nsThread::~nsThread()
{
if (mThread) {
PRStatus status = PR_SUCCESS;
status = PR_Interrupt(mThread);
NS_ASSERTION(status == PR_SUCCESS, "failed to interrupt worker");
status = PR_JoinThread(mThread);
NS_ASSERTION(status == PR_SUCCESS, "failed to join with worker");
PR_Free(mThread); // XXX right?
mThread = nsnull;
}
}
void
nsThread::Main(void* arg)
{
nsThread* self = (nsThread*)arg;
nsresult rv = NS_OK;
rv = self->RegisterThreadSelf();
NS_ASSERTION(rv == NS_OK, "failed to set thread self");
rv = self->mRunnable->Run();
NS_ASSERTION(NS_SUCCEEDED(rv), "runnable failed");
PRThreadState state;
rv = self->GetState(&state);
if (NS_SUCCEEDED(rv) && state == PR_UNJOINABLE_THREAD) {
Exit(arg);
}
}
void
nsThread::Exit(void* arg)
{
nsThread* self = (nsThread*)arg;
nsresult rv = NS_OK;
self->mThread = nsnull;
NS_RELEASE(self);
}
NS_IMPL_ISUPPORTS(nsThread, nsIThread::GetIID());
@ -72,6 +84,8 @@ NS_IMPL_ISUPPORTS(nsThread, nsIThread::GetIID());
NS_IMETHODIMP
nsThread::Join()
{
if (mThread == nsnull)
return NS_ERROR_FAILURE;
PRStatus status = PR_JoinThread(mThread);
return status == PR_SUCCESS ? NS_OK : NS_ERROR_FAILURE;
}
@ -79,6 +93,8 @@ nsThread::Join()
NS_IMETHODIMP
nsThread::GetPriority(PRThreadPriority *result)
{
if (mThread == nsnull)
return NS_ERROR_FAILURE;
*result = PR_GetThreadPriority(mThread);
return NS_OK;
}
@ -86,6 +102,8 @@ nsThread::GetPriority(PRThreadPriority *result)
NS_IMETHODIMP
nsThread::SetPriority(PRThreadPriority value)
{
if (mThread == nsnull)
return NS_ERROR_FAILURE;
PR_SetThreadPriority(mThread, value);
return NS_OK;
}
@ -93,6 +111,8 @@ nsThread::SetPriority(PRThreadPriority value)
NS_IMETHODIMP
nsThread::Interrupt()
{
if (mThread == nsnull)
return NS_ERROR_FAILURE;
PRStatus status = PR_Interrupt(mThread);
return status == PR_SUCCESS ? NS_OK : NS_ERROR_FAILURE;
}
@ -100,6 +120,8 @@ nsThread::Interrupt()
NS_IMETHODIMP
nsThread::GetScope(PRThreadScope *result)
{
if (mThread == nsnull)
return NS_ERROR_FAILURE;
*result = PR_GetThreadScope(mThread);
return NS_OK;
}
@ -107,6 +129,8 @@ nsThread::GetScope(PRThreadScope *result)
NS_IMETHODIMP
nsThread::GetType(PRThreadType *result)
{
if (mThread == nsnull)
return NS_ERROR_FAILURE;
*result = PR_GetThreadType(mThread);
return NS_OK;
}
@ -114,10 +138,21 @@ nsThread::GetType(PRThreadType *result)
NS_IMETHODIMP
nsThread::GetState(PRThreadState *result)
{
if (mThread == nsnull)
return NS_ERROR_FAILURE;
*result = PR_GetThreadState(mThread);
return NS_OK;
}
NS_IMETHODIMP
nsThread::GetPRThread(PRThread* *result)
{
if (mThread == nsnull)
return NS_ERROR_FAILURE;
*result = mThread;
return NS_OK;
}
NS_BASE nsresult
NS_NewThread(nsIThread* *result,
nsIRunnable* runnable,
@ -145,6 +180,55 @@ NS_NewThread(nsIThread* *result,
////////////////////////////////////////////////////////////////////////////////
nsresult
nsThread::RegisterThreadSelf()
{
PRStatus status;
if (kIThreadSelf == 0) {
status = PR_NewThreadPrivateIndex(&kIThreadSelf, Exit);
if (status != PR_SUCCESS) return NS_ERROR_FAILURE;
NS_ASSERTION(kIThreadSelf != 0, "couldn't get thread private index");
}
status = PR_SetThreadPrivate(kIThreadSelf, this);
if (status != PR_SUCCESS) return NS_ERROR_FAILURE;
NS_ADDREF(this); // released in nsThread::Exit
return NS_OK;
}
NS_BASE nsresult
nsIThread::GetCurrent(nsIThread* *result)
{
PRStatus status;
nsThread* thread;
if (nsThread::kIThreadSelf == 0) {
status = PR_NewThreadPrivateIndex(&nsThread::kIThreadSelf, nsThread::Exit);
if (status != PR_SUCCESS) return NS_ERROR_FAILURE;
NS_ASSERTION(nsThread::kIThreadSelf != 0, "couldn't get thread private index");
}
thread = (nsThread*)PR_GetThreadPrivate(nsThread::kIThreadSelf);
if (thread == nsnull) {
// if the current thread doesn't have an nsIThread associated
// with it, make one
thread = new nsThread();
if (thread == nsnull)
return NS_ERROR_OUT_OF_MEMORY;
thread->SetPRThread(PR_CurrentThread());
nsresult rv = thread->RegisterThreadSelf();
if (NS_FAILED(rv)) return rv;
}
NS_ADDREF(thread);
*result = thread;
return NS_OK;
}
////////////////////////////////////////////////////////////////////////////////
nsThreadPool::nsThreadPool(PRUint32 minThreads, PRUint32 maxThreads)
: mThreads(nsnull), mRequests(nsnull),
mMinThreads(minThreads), mMaxThreads(maxThreads)
@ -166,7 +250,13 @@ nsThreadPool::Init(PRUint32 stackSize,
rv = NS_NewISupportsArray(&mRequests);
if (NS_FAILED(rv)) return rv;
mRequestMonitor = PR_NewMonitor();
if (mRequestMonitor == nsnull)
return NS_ERROR_OUT_OF_MEMORY;
PR_CEnterMonitor(this);
for (PRUint32 i = 0; i < mMinThreads; i++) {
nsThreadPoolRunnable* runnable =
new nsThreadPoolRunnable(this);
@ -175,22 +265,42 @@ nsThreadPool::Init(PRUint32 stackSize,
NS_ADDREF(runnable);
nsIThread* thread;
rv = NS_NewThread(&thread, runnable, stackSize, type,
rv = NS_NewThread(&thread, runnable, stackSize, PR_SYSTEM_THREAD,
priority, scope, state);
NS_RELEASE(runnable);
if (NS_FAILED(rv)) return rv;
if (NS_FAILED(rv)) goto exit;
rv = mThreads->AppendElement(thread);
NS_RELEASE(thread);
if (NS_FAILED(rv)) return rv;
if (NS_FAILED(rv)) goto exit;
}
return NS_OK;
// wait for some worker thread to be ready
PR_CWait(this, PR_INTERVAL_NO_TIMEOUT);
exit:
PR_CExitMonitor(this);
return rv;
}
nsThreadPool::~nsThreadPool()
{
NS_RELEASE(mThreads);
NS_RELEASE(mRequests);
if (mThreads) {
// clean up the worker threads
PRUint32 count = mThreads->Count();
for (PRUint32 i = 0; i < count; i++) {
nsIThread* thread = (nsIThread*)((*mThreads)[i]);
thread->Interrupt();
thread->Join(); // XXX race?
}
NS_RELEASE(mThreads);
}
NS_IF_RELEASE(mRequests);
if (mRequestMonitor) {
PR_DestroyMonitor(mRequestMonitor);
}
}
NS_IMPL_ISUPPORTS(nsThreadPool, nsIThreadPool::GetIID());
@ -199,30 +309,91 @@ NS_IMETHODIMP
nsThreadPool::DispatchRequest(nsIRunnable* runnable)
{
nsresult rv;
PR_CEnterMonitor(this);
PR_EnterMonitor(mRequestMonitor);
rv = mRequests->AppendElement(runnable);
if (NS_SUCCEEDED(rv))
PR_CNotify(this);
PR_Notify(mRequestMonitor);
PR_CExitMonitor(this);
PR_ExitMonitor(mRequestMonitor);
return rv;
}
#include <stdio.h>
nsIRunnable*
nsThreadPool::Dequeue()
nsThreadPool::GetRequest()
{
PR_CEnterMonitor(this);
nsresult rv = NS_OK;
nsIRunnable* request = nsnull;
PR_EnterMonitor(mRequestMonitor);
NS_ASSERTION(mRequests->Count() > 0, "request queue out of sync");
nsIRunnable* request = (nsIRunnable*)(*mRequests)[0];
PRBool removed = mRequests->RemoveElementAt(0);
NS_ASSERTION(removed, "nsISupportsArray broken");
while (mRequests->Count() == 0) {
// printf("thread %x waiting\n", PR_CurrentThread());
PRStatus status = PR_Wait(mRequestMonitor, PR_INTERVAL_NO_TIMEOUT);
if (status != PR_SUCCESS) {
rv = NS_ERROR_FAILURE;
break; // interrupted -- quit
}
}
PR_CExitMonitor(this);
if (NS_SUCCEEDED(rv)) {
NS_ASSERTION(mRequests->Count() > 0, "request queue out of sync");
request = (nsIRunnable*)(*mRequests)[0];
NS_ASSERTION(request != nsnull, "null runnable");
PRBool removed = mRequests->RemoveElementAt(0);
NS_ASSERTION(removed, "nsISupportsArray broken");
}
PR_ExitMonitor(mRequestMonitor);
return request;
}
NS_IMETHODIMP
nsThreadPool::Join()
{
nsresult rv = NS_OK;
PRUint32 count;
PRUint32 i;
// first wait for any outstanding requests to be processed
PR_CEnterMonitor(this);
while (mRequests->Count() > 0) {
PRStatus status = PR_CWait(this, PR_INTERVAL_NO_TIMEOUT);
if (status != PR_SUCCESS) {
rv = NS_ERROR_FAILURE; // our thread was interrupted!
break;
}
}
PR_CExitMonitor(this);
if (NS_FAILED(rv)) return rv;
// then interrupt the threads and join them
Interrupt();
count = mThreads->Count();
for (i = 0; i < count; i++) {
nsIThread* thread = (nsIThread*)((*mThreads)[i]);
rv = thread->Join();
if (NS_FAILED(rv)) return rv;
}
return rv;
}
NS_IMETHODIMP
nsThreadPool::Interrupt()
{
nsresult rv = NS_OK;
PRUint32 count = mThreads->Count();
for (PRUint32 i = 0; i < count; i++) {
nsIThread* thread = (nsIThread*)((*mThreads)[i]);
rv = thread->Interrupt();
if (NS_FAILED(rv)) return rv;
}
return rv;
}
NS_BASE nsresult
NS_NewThreadPool(nsIThreadPool* *result,
PRUint32 minThreads, PRUint32 maxThreads,
@ -267,19 +438,25 @@ NS_IMPL_ISUPPORTS(nsThreadPoolRunnable, nsIRunnable::GetIID());
NS_IMETHODIMP
nsThreadPoolRunnable::Run()
{
nsresult rv = NS_OK;
nsIRunnable* request;
// let the thread pool know we're ready
PR_CEnterMonitor(mPool);
while (PR_TRUE) {
PRStatus status = PR_CWait(mPool, PR_INTERVAL_NO_TIMEOUT);
if (status != PR_SUCCESS) break; // interrupted -- quit
nsIRunnable* runnable = mPool->Dequeue();
nsresult rv = NS_OK;
rv = runnable->Run();
NS_ASSERTION(NS_SUCCEEDED(rv), "runnable failed");
}
PR_CNotify(mPool);
PR_CExitMonitor(mPool);
return NS_OK;
while ((request = mPool->GetRequest()) != nsnull) {
// printf("running %x, thread %x\n", this, PR_CurrentThread());
rv = request->Run();
NS_ASSERTION(NS_SUCCEEDED(rv), "runnable failed");
// let the thread pool know we're finished a run
PR_CEnterMonitor(mPool);
PR_CNotify(mPool);
PR_CExitMonitor(mPool);
}
return rv;
}
////////////////////////////////////////////////////////////////////////////////

View File

@ -35,6 +35,7 @@ public:
NS_IMETHOD GetScope(PRThreadScope *result);
NS_IMETHOD GetType(PRThreadType *result);
NS_IMETHOD GetState(PRThreadState *result);
NS_IMETHOD GetPRThread(PRThread* *result);
// nsThread methods:
nsThread();
@ -46,7 +47,13 @@ public:
PRThreadPriority priority,
PRThreadScope scope,
PRThreadState state);
nsresult RegisterThreadSelf();
void SetPRThread(PRThread* thread) { mThread = thread; }
static void Main(void* arg);
static void Exit(void* arg);
static PRUintn kIThreadSelf;
protected:
PRThread* mThread;
@ -62,6 +69,8 @@ public:
// nsIThreadPool methods:
NS_IMETHOD DispatchRequest(nsIRunnable* runnable);
NS_IMETHOD Join();
NS_IMETHOD Interrupt();
// nsThreadPool methods:
nsThreadPool(PRUint32 minThreads, PRUint32 maxThreads);
@ -72,11 +81,12 @@ public:
PRThreadPriority priority,
PRThreadScope scope,
PRThreadState state);
nsIRunnable* Dequeue();
nsIRunnable* GetRequest();
protected:
nsISupportsArray* mThreads;
nsISupportsArray* mRequests;
PRMonitor* mRequestMonitor;
PRUint32 mMinThreads;
PRUint32 mMaxThreads;
};