/* darling-libobjc2/toydispatch.c */

#include <pthread.h>
#include <sched.h>
#include <stdlib.h>
#define __TOY_DISPATCH__
#include "objc/toydispatch.h"
/**
 * Amount of total space in the ring buffer. Must be a power of two.
 */
#define RING_BUFFER_SIZE 32
/**
 * Mask for converting a free-running counter into a ring buffer index.
 */
#define RING_BUFFER_MASK (RING_BUFFER_SIZE - 1)
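/*
 * Illustrative addition (not in the original file): the power-of-two
 * requirement can be checked at compile time with the standard
 * "x & (x - 1)" trick, since only powers of two have a single bit set.
 */
#if (RING_BUFFER_SIZE & (RING_BUFFER_SIZE - 1)) != 0
#error RING_BUFFER_SIZE must be a power of two
#endif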
struct dispatch_queue
{
	/**
	 * Reference count for this queue.
	 */
	int refcount;
	/**
	 * Spin lock value. Set to 1 when the queue is locked. This allows
	 * multiple threads to write to the queue but only one to read from it.
	 * Reading and writing can happen concurrently, but writing requires
	 * acquisition of this lock.
	 */
	volatile int spinlock;
	/**
	 * Producer free-running counter. Incremented every time that a new item
	 * is inserted into the ring buffer.
	 */
	unsigned int producer;
	/**
	 * Consumer free-running counter. Incremented every time that an item is
	 * removed from the buffer.
	 */
	unsigned int consumer;
	/**
	 * Mutex used to protect the condition variable.
	 */
	pthread_mutex_t mutex;
	/**
	 * Condition variable used in blocking mode. The consumer thread will
	 * sleep on this condition variable when the queue has been empty for a
	 * little while. The next producer thread to insert something will poke
	 * the condition variable on any empty->non-empty transition.
	 */
	pthread_cond_t conditionVariable;
	/**
	 * Ring buffer containing functions and data to be executed by the
	 * consumer.
	 */
	struct
	{
		dispatch_function_t function;
		void *data;
	} ring_buffer[RING_BUFFER_SIZE];
};
/**
 * Check how much space is in the queue. The number of used elements in the
 * queue is always equal to producer - consumer. Producer will always
 * overflow before consumer (because you can't remove objects that have not
 * been inserted). When that happens, the subtraction will be something along
 * the lines of 0 - (2^32 - 14). The result, -(2^32 - 14), can't be
 * represented in an unsigned 32-bit integer and so wraps around to 14,
 * giving the correct count of used elements irrespective of overflow.
 */
#define SPACE(q) (RING_BUFFER_SIZE - (q->producer - q->consumer))
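/*
 * Worked example (added for illustration, not part of the original file):
 * with 32-bit unsigned counters, suppose consumer == 0xFFFFFFF2 (2^32 - 14)
 * and producer has just wrapped around to 0x00000000. Then
 * producer - consumer == 14 (mod 2^32), so SPACE(q) == 32 - 14 == 18 free
 * slots, exactly as if no wrap had occurred.
 */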
/**
 * The buffer is full if there is no space in it.
 */
#define ISFULL(q) (SPACE(q) == 0)
/**
 * The buffer is empty if there is no data in it.
 */
#define ISEMPTY(q) ((q->producer - q->consumer) == 0)
/**
 * Converting the free running counters to array indexes is a masking
 * operation. For this to work, the buffer size must be a power of two.
 * RING_BUFFER_MASK = RING_BUFFER_SIZE - 1. If RING_BUFFER_SIZE is 256, we
 * want the lowest 8 bits of the index, which is obtained by ANDing the value
 * with 255. Any power of two may be selected. Non power-of-two values could
 * be used if a more complex mapping operation were chosen, but this one is
 * nice and cheap.
 */
#define MASK(index) ((index) & RING_BUFFER_MASK)
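/*
 * Worked example (added for illustration, not part of the original file):
 * with RING_BUFFER_SIZE == 32, RING_BUFFER_MASK == 31 (0x1F), so
 * MASK(37) == 37 & 31 == 5: the 38th item ever produced lands in slot 5,
 * reusing storage that earlier items have already vacated.
 */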
/**
 * Lock the queue. This uses a very lightweight, nonrecursive, spinlock. It
 * is expected that queue insertions will be relatively uncontended.
 */
inline static void lock_queue(dispatch_queue_t queue)
{
	// Set the spin lock value to 1 if it is 0.
	while (!__sync_bool_compare_and_swap(&queue->spinlock, 0, 1))
	{
		// If it is already 1, let another thread play with the CPU for a bit
		// then try again.
		sched_yield();
	}
}
/**
 * Unlock the queue. This doesn't need to be an atomic operation; that would
 * force a complete pipeline flush on this thread without actually buying us
 * anything, because at this point only one thread (this one) will modify the
 * variable. The other threads are all using atomic compare-and-exchange
 * instructions, which will fail as long as the value is still 1.
 */
inline static void unlock_queue(dispatch_queue_t queue)
{
	queue->spinlock = 0;
}
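/*
 * For illustration only (not part of the original file): with C11
 * <stdatomic.h>, and assuming spinlock were declared as atomic_int, roughly
 * the same lock/unlock pair could be written as:
 *
 *   while (atomic_exchange_explicit(&queue->spinlock, 1, memory_order_acquire))
 *   {
 *       sched_yield();
 *   }
 *   ...
 *   atomic_store_explicit(&queue->spinlock, 0, memory_order_release);
 *
 * The release store makes the "plain assignment is enough" reasoning above
 * explicit instead of relying on a strongly-ordered architecture.
 */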
/**
 * Inserting an element into the queue involves the following steps:
 *
 * 1) Check that there is space in the buffer.
 * Spin if there isn't any.
 * 2) Store the function and its data argument in the next element of the
 * ring buffer.
 * 3) Increment the producer counter.
 * 4) If the queue was previously empty, we need to transition back to
 * lockless mode. This is done by signalling the condition variable that the
 * consumer thread will be waiting on if it is in blocking mode.
 */
inline static void insert_into_queue(dispatch_queue_t queue,
                                     dispatch_function_t function,
                                     void *data)
{
	/* Wait for space in the buffer */
	lock_queue(queue);
	while (ISFULL(queue))
	{
		sched_yield();
	}
	unsigned int idx = MASK(queue->producer);
	queue->ring_buffer[idx].function = function;
	queue->ring_buffer[idx].data = data;
	// NOTE: This doesn't actually need to be atomic on a strongly-ordered
	// architecture like x86.
	__sync_fetch_and_add(&queue->producer, 1);
	unsigned int space = queue->producer - queue->consumer;
	unlock_queue(queue);
	// If we've just transitioned from empty to non-empty, wake up the
	// consumer thread.
	// Note: We do this after unlocking the queue, because it is much more
	// expensive than anything else that we do in this function and we don't
	// want to hold the spinlock for any longer than necessary. We need to
	// calculate the space first, however, because otherwise another thread
	// may increment producer, while consumer stays the same (with the
	// consumer thread sleeping), preventing the wakeup.
	if (space == 1)
	{
		pthread_mutex_lock(&queue->mutex);
		pthread_cond_signal(&queue->conditionVariable);
		pthread_mutex_unlock(&queue->mutex);
	}
}
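/*
 * Worked example (added for illustration, not part of the original file):
 * suppose producer == consumer == 7, i.e. the queue is empty and the
 * consumer may be asleep. A producer stores into slot MASK(7) == 7, bumps
 * producer to 8, and computes space = 8 - 7 = 1, so it takes the mutex and
 * signals. Any producer that instead computes space > 1 knows that another
 * producer performed the empty->non-empty transition and owns the wakeup.
 */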
/**
 * Removing an element from the queue involves the following steps:
 *
 * 1) Wait until the queue has messages waiting. If there are none, enter
 * blocking mode. The additional test inside the mutex ensures that a
 * transition from blocking to non-blocking mode will not be missed, since
 * the condition variable can only be signalled while the producer thread
 * holds the mutex.
 * 2) Read the function and its data argument from the buffer.
 * 3) Increment the consumer counter.
 */
static inline void read_from_queue(dispatch_queue_t queue,
                                   dispatch_function_t *function, void **data)
{
	while (ISEMPTY(queue))
	{
		pthread_mutex_lock(&queue->mutex);
		if (ISEMPTY(queue))
		{
			pthread_cond_wait(&queue->conditionVariable, &queue->mutex);
		}
		pthread_mutex_unlock(&queue->mutex);
	}
	unsigned int idx = MASK(queue->consumer);
	*function = queue->ring_buffer[idx].function;
	*data = queue->ring_buffer[idx].data;
	__sync_fetch_and_add(&queue->consumer, 1);
}
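/*
 * Worked interleaving (added for illustration, not part of the original
 * file), showing why the queue is re-tested under the mutex:
 *
 *   consumer: sees ISEMPTY(queue) true, has not yet locked the mutex
 *   producer: inserts an item, locks the mutex, signals, unlocks
 *   consumer: locks the mutex, re-tests ISEMPTY(queue) -> now false,
 *             skips pthread_cond_wait() and goes on to read the item
 *
 * Without the second test, the consumer would call pthread_cond_wait()
 * after the signal had already been delivered and could sleep indefinitely
 * with work pending.
 */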
static void *runloop(void *q)
{
	dispatch_queue_t queue = q;
	dispatch_function_t function;
	void *data;
	while (queue->refcount > 0)
	{
		read_from_queue(queue, &function, &data);
		function(data);
	}
	pthread_cond_destroy(&queue->conditionVariable);
	pthread_mutex_destroy(&queue->mutex);
	free(queue);
	return NULL;
}
dispatch_queue_t dispatch_queue_create(const char *label,
                                       void *attr)
{
	dispatch_queue_t queue = calloc(1, sizeof(struct dispatch_queue));
	queue->refcount = 1;
	pthread_cond_init(&queue->conditionVariable, NULL);
	pthread_mutex_init(&queue->mutex, NULL);
	pthread_t thread;
	pthread_create(&thread, NULL, runloop, queue);
	pthread_detach(thread);
	return queue;
}
void dispatch_async_f(dispatch_queue_t queue, void *context,
                      dispatch_function_t work)
{
	insert_into_queue(queue, work, context);
}
static void release(void *queue)
{
	((dispatch_queue_t)queue)->refcount--;
}
void dispatch_release(dispatch_queue_t queue)
{
	// Asynchronously release the queue, so that we don't delete it before all
	// of the work is finished.
	insert_into_queue(queue, release, queue);
}
void dispatch_retain(dispatch_queue_t queue)
{
	queue->refcount++;
}
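/*
 * Usage sketch (added for illustration, not part of the original file),
 * assuming dispatch_function_t is the usual void (*)(void *), which is how
 * runloop() invokes it above. Guarded with #if 0 so it does not affect the
 * build.
 */
#if 0
#include <stdio.h>
#include <unistd.h>

static void print_message(void *context)
{
	printf("%s\n", (const char *)context);
}

int main(void)
{
	dispatch_queue_t queue = dispatch_queue_create("example.queue", NULL);
	dispatch_async_f(queue, "hello from the worker thread", print_message);
	dispatch_release(queue);
	// Crude: give the detached worker thread time to drain the queue
	// before the process exits.
	sleep(1);
	return 0;
}
#endif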