ntdll: Implement threadpool wait queues.

To implement waiting for an arbitrary number of handles, we group them
in buckets up to (MAXIMUM_WAIT_OBJECTS - 1) objects, and then assign a
dedicated wait queue thread. The last handle is used to notify about
changes.
This commit is contained in:
Sebastian Lackner 2015-07-05 02:22:52 +02:00 committed by Alexandre Julliard
parent 4523a54c62
commit f1be5dcac0

View File

@ -137,6 +137,7 @@ struct timer_queue
*/
#define THREADPOOL_WORKER_TIMEOUT 5000
#define MAXIMUM_WAITQUEUE_OBJECTS (MAXIMUM_WAIT_OBJECTS - 1)
/* internal threadpool representation */
struct threadpool
@ -213,6 +214,13 @@ struct threadpool_object
struct
{
PTP_WAIT_CALLBACK callback;
LONG signaled;
/* information about the wait object, locked via waitqueue.cs */
struct waitqueue_bucket *bucket;
BOOL wait_pending;
struct list wait_entry;
ULONGLONG timeout;
HANDLE handle;
} wait;
} u;
};
@ -272,6 +280,38 @@ static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug =
0, 0, { (DWORD_PTR)(__FILE__ ": timerqueue.cs") }
};
/* global waitqueue object */
static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug;
static struct
{
CRITICAL_SECTION cs;
LONG num_buckets;
struct list buckets;
}
waitqueue =
{
{ &waitqueue_debug, -1, 0, 0, 0, 0 }, /* cs */
0, /* num_buckets */
LIST_INIT( waitqueue.buckets ) /* buckets */
};
static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug =
{
0, 0, &waitqueue.cs,
{ &waitqueue_debug.ProcessLocksList, &waitqueue_debug.ProcessLocksList },
0, 0, { (DWORD_PTR)(__FILE__ ": waitqueue.cs") }
};
struct waitqueue_bucket
{
struct list bucket_entry;
LONG objcount;
struct list reserved;
struct list waiting;
HANDLE update_event;
};
static inline struct threadpool *impl_from_TP_POOL( TP_POOL *pool )
{
return (struct threadpool *)pool;
@ -309,7 +349,7 @@ static inline struct threadpool_instance *impl_from_TP_CALLBACK_INSTANCE( TP_CAL
}
static void CALLBACK threadpool_worker_proc( void *param );
static void tp_object_submit( struct threadpool_object *object );
static void tp_object_submit( struct threadpool_object *object, BOOL signaled );
static void tp_object_shutdown( struct threadpool_object *object );
static BOOL tp_object_release( struct threadpool_object *object );
static struct threadpool *default_threadpool = NULL;
@ -1261,7 +1301,7 @@ static void CALLBACK timerqueue_thread_proc( void *param )
/* Queue a new callback in one of the worker threads. */
list_remove( &timer->u.timer.timer_entry );
timer->u.timer.timer_pending = FALSE;
tp_object_submit( timer );
tp_object_submit( timer, FALSE );
/* Insert the timer back into the queue, except its marked for shutdown. */
if (timer->u.timer.period && !timer->shutdown)
@ -1397,6 +1437,214 @@ static void tp_timerqueue_unlock( struct threadpool_object *timer )
RtlLeaveCriticalSection( &timerqueue.cs );
}
/***********************************************************************
* waitqueue_thread_proc (internal)
*/
static void CALLBACK waitqueue_thread_proc( void *param )
{
struct threadpool_object *objects[MAXIMUM_WAITQUEUE_OBJECTS];
HANDLE handles[MAXIMUM_WAITQUEUE_OBJECTS + 1];
struct waitqueue_bucket *bucket = param;
struct threadpool_object *wait, *next;
LARGE_INTEGER now, timeout;
DWORD num_handles;
NTSTATUS status;
TRACE( "starting wait queue thread\n" );
RtlEnterCriticalSection( &waitqueue.cs );
for (;;)
{
NtQuerySystemTime( &now );
timeout.QuadPart = TIMEOUT_INFINITE;
num_handles = 0;
LIST_FOR_EACH_ENTRY_SAFE( wait, next, &bucket->waiting, struct threadpool_object,
u.wait.wait_entry )
{
assert( wait->type == TP_OBJECT_TYPE_WAIT );
if (wait->u.wait.timeout <= now.QuadPart)
{
/* Wait object timed out. */
list_remove( &wait->u.wait.wait_entry );
list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
tp_object_submit( wait, FALSE );
}
else
{
if (wait->u.wait.timeout < timeout.QuadPart)
timeout.QuadPart = wait->u.wait.timeout;
assert( num_handles < MAXIMUM_WAITQUEUE_OBJECTS );
interlocked_inc( &wait->refcount );
objects[num_handles] = wait;
handles[num_handles] = wait->u.wait.handle;
num_handles++;
}
}
if (!bucket->objcount)
{
/* All wait objects have been destroyed, if no new wait objects are created
* within some amount of time, then we can shutdown this thread. */
assert( num_handles == 0 );
RtlLeaveCriticalSection( &waitqueue.cs );
timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
status = NtWaitForMultipleObjects( 1, &bucket->update_event, TRUE, FALSE, &timeout );
RtlEnterCriticalSection( &waitqueue.cs );
if (status == STATUS_TIMEOUT && !bucket->objcount)
break;
}
else
{
handles[num_handles] = bucket->update_event;
RtlLeaveCriticalSection( &waitqueue.cs );
status = NtWaitForMultipleObjects( num_handles + 1, handles, TRUE, FALSE, &timeout );
RtlEnterCriticalSection( &waitqueue.cs );
if (status >= STATUS_WAIT_0 && status < STATUS_WAIT_0 + num_handles)
{
wait = objects[status - STATUS_WAIT_0];
assert( wait->type == TP_OBJECT_TYPE_WAIT );
if (wait->u.wait.bucket)
{
/* Wait object signaled. */
assert( wait->u.wait.bucket == bucket );
list_remove( &wait->u.wait.wait_entry );
list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
tp_object_submit( wait, TRUE );
}
else
ERR("wait object %p triggered while object was destroyed\n", wait);
}
/* Release temporary references to wait objects. */
while (num_handles)
{
wait = objects[--num_handles];
assert( wait->type == TP_OBJECT_TYPE_WAIT );
tp_object_release( wait );
}
}
}
/* Remove this bucket from the list. */
list_remove( &bucket->bucket_entry );
if (!--waitqueue.num_buckets)
assert( list_empty( &waitqueue.buckets ) );
RtlLeaveCriticalSection( &waitqueue.cs );
TRACE( "terminating wait queue thread\n" );
assert( bucket->objcount == 0 );
assert( list_empty( &bucket->reserved ) );
assert( list_empty( &bucket->waiting ) );
NtClose( bucket->update_event );
RtlFreeHeap( GetProcessHeap(), 0, bucket );
}
/***********************************************************************
* tp_waitqueue_lock (internal)
*/
static NTSTATUS tp_waitqueue_lock( struct threadpool_object *wait )
{
struct waitqueue_bucket *bucket;
NTSTATUS status;
HANDLE thread;
assert( wait->type = TP_OBJECT_TYPE_WAIT );
wait->u.wait.signaled = 0;
wait->u.wait.bucket = NULL;
wait->u.wait.wait_pending = FALSE;
wait->u.wait.timeout = 0;
wait->u.wait.handle = INVALID_HANDLE_VALUE;
RtlEnterCriticalSection( &waitqueue.cs );
/* Try to assign to existing bucket if possible. */
LIST_FOR_EACH_ENTRY( bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
{
if (bucket->objcount < MAXIMUM_WAITQUEUE_OBJECTS)
{
list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
wait->u.wait.bucket = bucket;
bucket->objcount++;
status = STATUS_SUCCESS;
goto out;
}
}
/* Create a new bucket and corresponding worker thread. */
bucket = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*bucket) );
if (!bucket)
{
status = STATUS_NO_MEMORY;
goto out;
}
bucket->objcount = 0;
list_init( &bucket->reserved );
list_init( &bucket->waiting );
status = NtCreateEvent( &bucket->update_event, EVENT_ALL_ACCESS,
NULL, SynchronizationEvent, FALSE );
if (status)
{
RtlFreeHeap( GetProcessHeap(), 0, bucket );
goto out;
}
status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
waitqueue_thread_proc, bucket, &thread, NULL );
if (status == STATUS_SUCCESS)
{
list_add_tail( &waitqueue.buckets, &bucket->bucket_entry );
waitqueue.num_buckets++;
list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
wait->u.wait.bucket = bucket;
bucket->objcount++;
NtClose( thread );
}
else
{
NtClose( bucket->update_event );
RtlFreeHeap( GetProcessHeap(), 0, bucket );
}
out:
RtlLeaveCriticalSection( &waitqueue.cs );
return status;
}
/***********************************************************************
* tp_waitqueue_unlock (internal)
*/
static void tp_waitqueue_unlock( struct threadpool_object *wait )
{
assert( wait->type == TP_OBJECT_TYPE_WAIT );
RtlEnterCriticalSection( &waitqueue.cs );
if (wait->u.wait.bucket)
{
struct waitqueue_bucket *bucket = wait->u.wait.bucket;
assert( bucket->objcount > 0 );
list_remove( &wait->u.wait.wait_entry );
wait->u.wait.bucket = NULL;
bucket->objcount--;
NtSetEvent( bucket->update_event, NULL );
}
RtlLeaveCriticalSection( &waitqueue.cs );
}
/***********************************************************************
* tp_threadpool_alloc (internal)
*
@ -1666,7 +1914,7 @@ static void tp_object_initialize( struct threadpool_object *object, struct threa
* will be set, and tp_object_submit would fail with an assertion. */
if (is_simple_callback)
tp_object_submit( object );
tp_object_submit( object, FALSE );
if (object->group)
{
@ -1692,7 +1940,7 @@ static void tp_object_initialize( struct threadpool_object *object, struct threa
* Submits a threadpool object to the associcated threadpool. This
* function has to be VOID because TpPostWork can never fail on Windows.
*/
static void tp_object_submit( struct threadpool_object *object )
static void tp_object_submit( struct threadpool_object *object, BOOL signaled )
{
struct threadpool *pool = object->pool;
NTSTATUS status = STATUS_UNSUCCESSFUL;
@ -1722,6 +1970,10 @@ static void tp_object_submit( struct threadpool_object *object )
if (!object->num_pending_callbacks++)
list_add_tail( &pool->pool, &object->pool_entry );
/* Count how often the object was signaled. */
if (object->type == TP_OBJECT_TYPE_WAIT && signaled)
object->u.wait.signaled++;
/* No new thread started - wake up one existing thread. */
if (status != STATUS_SUCCESS)
{
@ -1748,6 +2000,9 @@ static void tp_object_cancel( struct threadpool_object *object, BOOL group_cance
pending_callbacks = object->num_pending_callbacks;
object->num_pending_callbacks = 0;
list_remove( &object->pool_entry );
if (object->type == TP_OBJECT_TYPE_WAIT)
object->u.wait.signaled = 0;
}
RtlLeaveCriticalSection( &pool->cs );
@ -1797,6 +2052,8 @@ static void tp_object_shutdown( struct threadpool_object *object )
{
if (object->type == TP_OBJECT_TYPE_TIMER)
tp_timerqueue_unlock( object );
else if (object->type == TP_OBJECT_TYPE_WAIT)
tp_waitqueue_unlock( object );
object->shutdown = TRUE;
}
@ -1851,6 +2108,7 @@ static void CALLBACK threadpool_worker_proc( void *param )
TP_CALLBACK_INSTANCE *callback_instance;
struct threadpool_instance instance;
struct threadpool *pool = param;
TP_WAIT_RESULT wait_result = 0;
LARGE_INTEGER timeout;
struct list *ptr;
NTSTATUS status;
@ -1871,6 +2129,13 @@ static void CALLBACK threadpool_worker_proc( void *param )
if (--object->num_pending_callbacks)
list_add_tail( &pool->pool, &object->pool_entry );
/* For wait objects check if they were signaled or have timed out. */
if (object->type == TP_OBJECT_TYPE_WAIT)
{
wait_result = object->u.wait.signaled ? WAIT_OBJECT_0 : WAIT_TIMEOUT;
if (wait_result == WAIT_OBJECT_0) object->u.wait.signaled--;
}
/* Leave critical section and do the actual callback. */
object->num_associated_callbacks++;
object->num_running_callbacks++;
@ -1922,8 +2187,8 @@ static void CALLBACK threadpool_worker_proc( void *param )
case TP_OBJECT_TYPE_WAIT:
{
TRACE( "executing wait callback %p(%p, %p, %p, %u)\n",
object->u.wait.callback, callback_instance, object->userdata, object, WAIT_OBJECT_0 );
object->u.wait.callback( callback_instance, object->userdata, (TP_WAIT *)object, WAIT_OBJECT_0 );
object->u.wait.callback, callback_instance, object->userdata, object, wait_result );
object->u.wait.callback( callback_instance, object->userdata, (TP_WAIT *)object, wait_result );
TRACE( "callback %p returned\n", object->u.wait.callback );
break;
}
@ -2097,6 +2362,15 @@ NTSTATUS WINAPI TpAllocWait( TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID us
object->type = TP_OBJECT_TYPE_WAIT;
object->u.wait.callback = callback;
status = tp_waitqueue_lock( object );
if (status)
{
tp_threadpool_unlock( pool );
RtlFreeHeap( GetProcessHeap(), 0, object );
return status;
}
tp_object_initialize( object, pool, userdata, environment );
*out = (TP_WAIT *)object;
@ -2304,7 +2578,7 @@ VOID WINAPI TpPostWork( TP_WORK *work )
TRACE( "%p\n", work );
tp_object_submit( this );
tp_object_submit( this, FALSE );
}
/***********************************************************************
@ -2558,7 +2832,7 @@ VOID WINAPI TpSetTimer( TP_TIMER *timer, LARGE_INTEGER *timeout, LONG period, LO
RtlLeaveCriticalSection( &timerqueue.cs );
if (submit_timer)
tp_object_submit( this );
tp_object_submit( this, FALSE );
}
/***********************************************************************