183: Make locking buckets a safe operation r=Amanieu a=faern

There should not be anything `unsafe` about locking `HashTable` buckets. There is no input you can give it, nor state you can cause by calling other public functions in this module that will make the lock bucket functions behave unsafely. Unless I missed some aspect of course.

I first made `get_hashtable` and `create_hashtable` return static safe references instead of raw pointers, so working with the returned HashTable became a bit easier. The returned pointers are going to always point to valid HashTables anyway, so I did not see anything unsafe about it. The tables might become inactive, but they will still be there, and never unsafe to access.

Co-authored-by: Linus Färnstrand <faern@faern.net>
This commit is contained in:
bors[bot]
2019-10-04 03:00:14 +00:00
committed by GitHub
+87 -40
View File
@@ -17,6 +17,13 @@ use smallvec::SmallVec;
use std::time::{Duration, Instant};
static NUM_THREADS: AtomicUsize = AtomicUsize::new(0);
/// Holds the pointer to the currently active `HashTable`.
///
/// # Safety
///
/// Except for the initial value of null, it must always point to a valid `HashTable` instance.
/// Any `HashTable` this global static has ever pointed to must never be freed.
static HASHTABLE: AtomicPtr<HashTable> = AtomicPtr::new(ptr::null_mut());
// Even with 3x more buckets than threads, the memory overhead per thread is
@@ -184,27 +191,31 @@ impl Drop for ThreadData {
}
}
// Get a pointer to the latest hash table, creating one if it doesn't exist yet.
/// Returns a reference to the latest hash table, creating one if it doesn't exist yet.
/// The reference is valid forever. However, the `HashTable` it references might become stale
/// at any point. Meaning it still exists, but it is not the instance in active use.
#[inline]
fn get_hashtable() -> *mut HashTable {
fn get_hashtable() -> &'static HashTable {
let table = HASHTABLE.load(Ordering::Acquire);
// If there is no table, create one
if table.is_null() {
create_hashtable()
} else {
table
// SAFETY: when not null, `HASHTABLE` always points to a `HashTable` that is never freed.
unsafe { &*table }
}
}
// Get a pointer to the latest hash table, creating one if it doesn't exist yet.
/// Returns a reference to the latest hash table, creating one if it doesn't exist yet.
/// The reference is valid forever. However, the `HashTable` it references might become stale
/// at any point. Meaning it still exists, but it is not the instance in active use.
#[cold]
fn create_hashtable() -> *mut HashTable {
fn create_hashtable() -> &'static HashTable {
let new_table = Box::into_raw(HashTable::new(LOAD_FACTOR, ptr::null()));
// If this fails then it means some other thread created the hash
// table first.
match HASHTABLE.compare_exchange(
// If this fails then it means some other thread created the hash table first.
let table = match HASHTABLE.compare_exchange(
ptr::null_mut(),
new_table,
Ordering::Release,
@@ -213,12 +224,16 @@ fn create_hashtable() -> *mut HashTable {
Ok(_) => new_table,
Err(old_table) => {
// Free the table we created
// SAFETY: `new_table` is created from `Box::into_raw` above and only freed here.
unsafe {
Box::from_raw(new_table);
}
old_table
}
}
};
// SAFETY: The `HashTable` behind `table` is never freed. It is either the table pointer we
// created here, or it is one loaded from `HASHTABLE`.
unsafe { &*table }
}
// Grow the hash table so that it is big enough for the given number of threads.
@@ -240,16 +255,19 @@ unsafe fn grow_hashtable(num_threads: usize) {
// Now check if our table is still the latest one. Another thread could
// have grown the hash table between us reading HASHTABLE and locking
// the buckets.
if HASHTABLE.load(Ordering::Relaxed) == old_table {
if HASHTABLE.load(Ordering::Relaxed) == old_table as *const _ as *mut _ {
break;
}
// Unlock buckets and try again
for b in &(*old_table).entries[..] {
// SAFETY: We hold the lock here, as required
b.mutex.unlock();
}
old_table = HASHTABLE.load(Ordering::Acquire);
// SAFETY: when not null, `HASHTABLE` always points to a `HashTable` that is never freed.
// Here we know `get_hashtable` above must have initialized `HASHTABLE`
old_table = &*HASHTABLE.load(Ordering::Acquire);
}
// Create the new table
@@ -281,6 +299,7 @@ unsafe fn grow_hashtable(num_threads: usize) {
// Unlock all buckets in the old table
for b in &(*old_table).entries[..] {
// SAFETY: We hold the lock here, as required
b.mutex.unlock();
}
}
@@ -297,41 +316,44 @@ fn hash(key: usize, bits: u32) -> usize {
key.wrapping_mul(0x9E3779B97F4A7C15) >> (64 - bits)
}
// Lock the bucket for the given key
/// Locks the bucket for the given key and returns a reference to it.
/// The returned bucket must be unlocked again in order to not cause deadlocks.
#[inline]
unsafe fn lock_bucket<'a>(key: usize) -> &'a Bucket {
fn lock_bucket(key: usize) -> &'static Bucket {
let mut bucket;
loop {
let hashtable = get_hashtable();
let hash = hash(key, (*hashtable).hash_bits);
bucket = &(*hashtable).entries[hash];
let hash = hash(key, hashtable.hash_bits);
bucket = &hashtable.entries[hash];
// Lock the bucket
bucket.mutex.lock();
// If no other thread has rehashed the table before we grabbed the lock
// then we are good to go! The lock we grabbed prevents any rehashes.
if HASHTABLE.load(Ordering::Relaxed) == hashtable {
if HASHTABLE.load(Ordering::Relaxed) == hashtable as *const _ as *mut _ {
return bucket;
}
// Unlock the bucket and try again
bucket.mutex.unlock();
// SAFETY: We hold the lock here, as required
unsafe { bucket.mutex.unlock() };
}
}
// Lock the bucket for the given key, but check that the key hasn't been changed
// in the meantime due to a requeue.
/// Locks the bucket for the given key and returns a reference to it. But checks that the key
/// hasn't been changed in the meantime due to a requeue.
/// The returned bucket must be unlocked again in order to not cause deadlocks.
#[inline]
unsafe fn lock_bucket_checked<'a>(key: &AtomicUsize) -> (usize, &'a Bucket) {
fn lock_bucket_checked(key: &AtomicUsize) -> (usize, &'static Bucket) {
let mut bucket;
loop {
let hashtable = get_hashtable();
let current_key = key.load(Ordering::Relaxed);
let hash = hash(current_key, (*hashtable).hash_bits);
bucket = &(*hashtable).entries[hash];
let hash = hash(current_key, hashtable.hash_bits);
bucket = &hashtable.entries[hash];
// Lock the bucket
bucket.mutex.lock();
@@ -339,31 +361,36 @@ unsafe fn lock_bucket_checked<'a>(key: &AtomicUsize) -> (usize, &'a Bucket) {
// Check that both the hash table and key are correct while the bucket
// is locked. Note that the key can't change once we locked the proper
// bucket for it, so we just keep trying until we have the correct key.
if HASHTABLE.load(Ordering::Relaxed) == hashtable
if HASHTABLE.load(Ordering::Relaxed) == hashtable as *const _ as *mut _
&& key.load(Ordering::Relaxed) == current_key
{
return (current_key, bucket);
}
// Unlock the bucket and try again
bucket.mutex.unlock();
// SAFETY: We hold the lock here, as required
unsafe { bucket.mutex.unlock() };
}
}
// Lock the two buckets for the given pair of keys
/// Locks the two buckets for the given pair of keys and returns references to them.
/// The returned buckets must be unlocked again in order to not cause deadlocks.
///
/// If both keys hash to the same value, both returned references will be to the same bucket. Be
/// careful to only unlock it once in this case, always use `unlock_bucket_pair`.
#[inline]
unsafe fn lock_bucket_pair<'a>(key1: usize, key2: usize) -> (&'a Bucket, &'a Bucket) {
fn lock_bucket_pair(key1: usize, key2: usize) -> (&'static Bucket, &'static Bucket) {
let mut bucket1;
loop {
let hashtable = get_hashtable();
// Get the lowest bucket first
let hash1 = hash(key1, (*hashtable).hash_bits);
let hash2 = hash(key2, (*hashtable).hash_bits);
let hash1 = hash(key1, hashtable.hash_bits);
let hash2 = hash(key2, hashtable.hash_bits);
if hash1 <= hash2 {
bucket1 = &(*hashtable).entries[hash1];
bucket1 = &hashtable.entries[hash1];
} else {
bucket1 = &(*hashtable).entries[hash2];
bucket1 = &hashtable.entries[hash2];
}
// Lock the first bucket
@@ -371,27 +398,32 @@ unsafe fn lock_bucket_pair<'a>(key1: usize, key2: usize) -> (&'a Bucket, &'a Buc
// If no other thread has rehashed the table before we grabbed the lock
// then we are good to go! The lock we grabbed prevents any rehashes.
if HASHTABLE.load(Ordering::Relaxed) == hashtable {
if HASHTABLE.load(Ordering::Relaxed) == hashtable as *const _ as *mut _ {
// Now lock the second bucket and return the two buckets
if hash1 == hash2 {
return (bucket1, bucket1);
} else if hash1 < hash2 {
let bucket2 = &(*hashtable).entries[hash2];
let bucket2 = &hashtable.entries[hash2];
bucket2.mutex.lock();
return (bucket1, bucket2);
} else {
let bucket2 = &(*hashtable).entries[hash1];
let bucket2 = &hashtable.entries[hash1];
bucket2.mutex.lock();
return (bucket2, bucket1);
}
}
// Unlock the bucket and try again
bucket1.mutex.unlock();
// SAFETY: We hold the lock here, as required
unsafe { bucket1.mutex.unlock() };
}
}
// Unlock a pair of buckets
/// Unlock a pair of buckets
///
/// # Safety
///
/// Both buckets must be locked
#[inline]
unsafe fn unlock_bucket_pair(bucket1: &Bucket, bucket2: &Bucket) {
bucket1.mutex.unlock();
@@ -537,6 +569,7 @@ pub unsafe fn park(
// If the validation function fails, just return
if !validate() {
// SAFETY: We hold the lock here, as required
bucket.mutex.unlock();
return ParkResult::Invalid;
}
@@ -553,6 +586,7 @@ pub unsafe fn park(
bucket.queue_head.set(thread_data);
}
bucket.queue_tail.set(thread_data);
// SAFETY: We hold the lock here, as required
bucket.mutex.unlock();
// Invoke the pre-sleep callback
@@ -583,6 +617,7 @@ pub unsafe fn park(
// Now we need to check again if we were unparked or timed out. Unlike the
// last check this is precise because we hold the bucket lock.
if !thread_data.parker.timed_out() {
// SAFETY: We hold the lock here, as required
bucket.mutex.unlock();
return ParkResult::Unparked(thread_data.unpark_token.get());
}
@@ -630,6 +665,7 @@ pub unsafe fn park(
debug_assert!(!current.is_null());
// Unlock the bucket, we are done
// SAFETY: We hold the lock here, as required
bucket.mutex.unlock();
ParkResult::TimedOut
})
@@ -701,6 +737,7 @@ pub unsafe fn unpark_one(
// the queue locked while we perform a system call. Finally we wake
// up the parked thread.
let handle = (*current).parker.unpark_lock();
// SAFETY: We hold the lock here, as required
bucket.mutex.unlock();
handle.unpark();
@@ -714,6 +751,7 @@ pub unsafe fn unpark_one(
// No threads with a matching key were found in the bucket
callback(result);
// SAFETY: We hold the lock here, as required
bucket.mutex.unlock();
result
}
@@ -764,6 +802,7 @@ pub unsafe fn unpark_all(key: usize, unpark_token: UnparkToken) -> usize {
}
// Unlock the bucket
// SAFETY: We hold the lock here, as required
bucket.mutex.unlock();
// Now that we are outside the lock, wake up all the threads that we removed
@@ -817,6 +856,7 @@ pub unsafe fn unpark_requeue(
let mut result = UnparkResult::default();
let op = validate();
if op == RequeueOp::Abort {
// SAFETY: Both buckets are locked, as required.
unlock_bucket_pair(bucket_from, bucket_to);
return result;
}
@@ -897,9 +937,11 @@ pub unsafe fn unpark_requeue(
if let Some(wakeup_thread) = wakeup_thread {
(*wakeup_thread).unpark_token.set(token);
let handle = (*wakeup_thread).parker.unpark_lock();
// SAFETY: Both buckets are locked, as required.
unlock_bucket_pair(bucket_from, bucket_to);
handle.unpark();
} else {
// SAFETY: Both buckets are locked, as required.
unlock_bucket_pair(bucket_from, bucket_to);
}
@@ -996,6 +1038,7 @@ pub unsafe fn unpark_filter(
t.1 = Some((*t.0).parker.unpark_lock());
}
// SAFETY: We hold the lock here, as required
bucket.mutex.unlock();
// Now that we are outside the lock, wake up all the threads that we removed
@@ -1188,6 +1231,7 @@ mod deadlock_impl {
}
current = (*current).next_in_queue.get();
}
// SAFETY: We hold the lock here, as required
b.mutex.unlock();
}
@@ -1212,19 +1256,20 @@ mod deadlock_impl {
let mut table = get_hashtable();
loop {
// Lock all buckets in the old table
for b in &(*table).entries[..] {
for b in &table.entries[..] {
b.mutex.lock();
}
// Now check if our table is still the latest one. Another thread could
// have grown the hash table between us getting and locking the hash table.
let new_table = get_hashtable();
if new_table == table {
if new_table as *const _ == table as *const _ {
break;
}
// Unlock buckets and try again
for b in &(*table).entries[..] {
for b in &table.entries[..] {
// SAFETY: We hold the lock here, as required
b.mutex.unlock();
}
@@ -1235,7 +1280,7 @@ mod deadlock_impl {
let mut graph =
DiGraphMap::<WaitGraphNode, ()>::with_capacity(thread_count * 2, thread_count * 2);
for b in &(*table).entries[..] {
for b in &table.entries[..] {
let mut current = b.queue_head.get();
while !current.is_null() {
if !(*current).parked_with_timeout.get()
@@ -1256,7 +1301,8 @@ mod deadlock_impl {
}
}
for b in &(*table).entries[..] {
for b in &table.entries[..] {
// SAFETY: We hold the lock here, as required
b.mutex.unlock();
}
@@ -1272,6 +1318,7 @@ mod deadlock_impl {
(*td).deadlock_data.deadlocked.set(true);
*(*td).deadlock_data.backtrace_sender.get() = Some(sender.clone());
let handle = (*td).parker.unpark_lock();
// SAFETY: We hold the lock here, as required
bucket.mutex.unlock();
// unpark the deadlocked thread!
// on unpark it'll notice the deadlocked flag and report back