mirror of
				https://github.com/python/cpython.git
				synced 2025-11-03 15:11:34 +00:00 
			
		
		
		
	PyMutex is a one-byte lock with fast, inlineable lock and unlock functions for the common uncontended case. The design is based on WebKit's WTF::Lock. PyMutex is built using the _PyParkingLot APIs, which provide a cross-platform futex-like API (based on WebKit's WTF::ParkingLot). This internal API will be used for building other synchronization primitives used to implement PEP 703, such as one-time initialization and events. This also includes tests and a mini benchmark in Tools/lockbench/lockbench.py to compare with the existing PyThread_type_lock.
Uncontended acquisition + release:
* Linux (x86-64): PyMutex: 11 ns, PyThread_type_lock: 44 ns
* macOS (arm64): PyMutex: 13 ns, PyThread_type_lock: 18 ns
* Windows (x86-64): PyMutex: 13 ns, PyThread_type_lock: 38 ns
PR Overview: The primary purpose of this PR is to implement PyMutex, but there are a number of support pieces (described below).
* PyMutex: A 1-byte lock that doesn't require memory allocation to initialize and is generally faster than the existing PyThread_type_lock. The API is internal only for now.
* _PyParkingLot: A futex-like API based on the API of the same name in WebKit. Used to implement PyMutex.
* _PyRawMutex: A word-sized lock used to implement _PyParkingLot.
* PyEvent: A one-time event. This was used frequently in the "nogil" fork and is useful for testing the PyMutex implementation, so I've included it as part of the PR.
* pycore_llist.h: Defines common operations on doubly-linked lists. Not strictly necessary (the list operations could be done manually), but they come up frequently in the "nogil" fork (similar to https://man.freebsd.org/cgi/man.cgi?queue).
---------
Co-authored-by: Eric Snow <ericsnowcurrently@gmail.com>
		
			
				
	
	
		
			370 lines
		
	
	
	
		
			10 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
			
		
		
	
	
			370 lines
		
	
	
	
		
			10 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
#include "Python.h"
 | 
						|
 | 
						|
#include "pycore_llist.h"
 | 
						|
#include "pycore_lock.h"        // _PyRawMutex
 | 
						|
#include "pycore_parking_lot.h"
 | 
						|
#include "pycore_pyerrors.h"    // _Py_FatalErrorFormat
 | 
						|
#include "pycore_pystate.h"     // _PyThreadState_GET
 | 
						|
#include "pycore_semaphore.h"   // _PySemaphore
 | 
						|
 | 
						|
#include <stdbool.h>
 | 
						|
 | 
						|
 | 
						|
// One slot of the parking-lot hash table. Every thread parked on an address
// that hashes to this slot is queued in this bucket, so a bucket can hold
// waiters for several different addresses at once.
typedef struct {
    // The mutex protects the waiter queue and the num_waiters counter.
    _PyRawMutex mutex;

    // Linked list of `struct wait_entry` waiters in this bucket.
    struct llist_node root;

    // Number of entries currently linked into `root`, counted across all
    // addresses that share this bucket.
    size_t num_waiters;
} Bucket;
 | 
						|
 | 
						|
// Queue entry for one parked thread. _PyParkingLot_Park() allocates it on
// the parking thread's stack, so it is only valid while that call is active.
struct wait_entry {
    // Caller-supplied argument, handed to the unpark callback of
    // _PyParkingLot_Unpark() when this entry is the one woken.
    void *park_arg;
    // Address the thread is parked on; used to pick out matching waiters
    // among all the entries of a bucket.
    uintptr_t addr;
    // Semaphore the parked thread blocks on; posting it wakes the thread.
    _PySemaphore sema;
    // Link in the owning bucket's `root` list (or in an unparker's private
    // list after it has been dequeued).
    struct llist_node node;
    // Set under the bucket lock by an unparker that has dequeued this entry
    // and will post `sema`. A waiter whose wait timed out or was interrupted
    // checks it to decide whether it must still wait for that post.
    bool is_unparking;
};
 | 
						|
 | 
						|
// Prime number to avoid correlations with memory addresses.
// We want this to be roughly proportional to the number of CPU cores
// to minimize contention on the bucket locks, but not too big to avoid
// wasting memory. The exact choice does not matter much.
#define NUM_BUCKETS 257

// Helpers for statically initializing the bucket table. BUCKET_INIT_<n>(b, i)
// expands to designated initializers for entries b[i] .. b[i + n - 1]. Each
// entry sets only its list head (via LLIST_INIT). The mutex and waiter count
// are not mentioned, so they are zero-initialized. The doubling scheme stands
// in for a loop, which a static initializer cannot contain.
#define BUCKET_INIT(b, i) [i] = { .root = LLIST_INIT(b[i].root) }
#define BUCKET_INIT_2(b, i)   BUCKET_INIT(b, i),     BUCKET_INIT(b, i+1)
#define BUCKET_INIT_4(b, i)   BUCKET_INIT_2(b, i),   BUCKET_INIT_2(b, i+2)
#define BUCKET_INIT_8(b, i)   BUCKET_INIT_4(b, i),   BUCKET_INIT_4(b, i+4)
#define BUCKET_INIT_16(b, i)  BUCKET_INIT_8(b, i),   BUCKET_INIT_8(b, i+8)
#define BUCKET_INIT_32(b, i)  BUCKET_INIT_16(b, i),  BUCKET_INIT_16(b, i+16)
#define BUCKET_INIT_64(b, i)  BUCKET_INIT_32(b, i),  BUCKET_INIT_32(b, i+32)
#define BUCKET_INIT_128(b, i) BUCKET_INIT_64(b, i),  BUCKET_INIT_64(b, i+64)
#define BUCKET_INIT_256(b, i) BUCKET_INIT_128(b, i), BUCKET_INIT_128(b, i+128)

// Table of waiters (hashed by address). The initializer covers indices
// 0..255 plus 256, i.e. exactly NUM_BUCKETS (257) entries. Keep it in sync
// if NUM_BUCKETS changes.
static Bucket buckets[NUM_BUCKETS] = {
    BUCKET_INIT_256(buckets, 0),
    BUCKET_INIT(buckets, 256),
};
 | 
						|
 | 
						|
void
 | 
						|
_PySemaphore_Init(_PySemaphore *sema)
 | 
						|
{
 | 
						|
#if defined(MS_WINDOWS)
 | 
						|
    sema->platform_sem = CreateSemaphore(
 | 
						|
        NULL,   //  attributes
 | 
						|
        0,      //  initial count
 | 
						|
        10,     //  maximum count
 | 
						|
        NULL    //  unnamed
 | 
						|
    );
 | 
						|
    if (!sema->platform_sem) {
 | 
						|
        Py_FatalError("parking_lot: CreateSemaphore failed");
 | 
						|
    }
 | 
						|
#elif defined(_Py_USE_SEMAPHORES)
 | 
						|
    if (sem_init(&sema->platform_sem, /*pshared=*/0, /*value=*/0) < 0) {
 | 
						|
        Py_FatalError("parking_lot: sem_init failed");
 | 
						|
    }
 | 
						|
#else
 | 
						|
    if (pthread_mutex_init(&sema->mutex, NULL) != 0) {
 | 
						|
        Py_FatalError("parking_lot: pthread_mutex_init failed");
 | 
						|
    }
 | 
						|
    if (pthread_cond_init(&sema->cond, NULL)) {
 | 
						|
        Py_FatalError("parking_lot: pthread_cond_init failed");
 | 
						|
    }
 | 
						|
    sema->counter = 0;
 | 
						|
#endif
 | 
						|
}
 | 
						|
 | 
						|
void
 | 
						|
_PySemaphore_Destroy(_PySemaphore *sema)
 | 
						|
{
 | 
						|
#if defined(MS_WINDOWS)
 | 
						|
    CloseHandle(sema->platform_sem);
 | 
						|
#elif defined(_Py_USE_SEMAPHORES)
 | 
						|
    sem_destroy(&sema->platform_sem);
 | 
						|
#else
 | 
						|
    pthread_mutex_destroy(&sema->mutex);
 | 
						|
    pthread_cond_destroy(&sema->cond);
 | 
						|
#endif
 | 
						|
}
 | 
						|
 | 
						|
static int
 | 
						|
_PySemaphore_PlatformWait(_PySemaphore *sema, _PyTime_t timeout)
 | 
						|
{
 | 
						|
    int res;
 | 
						|
#if defined(MS_WINDOWS)
 | 
						|
    DWORD wait;
 | 
						|
    DWORD millis = 0;
 | 
						|
    if (timeout < 0) {
 | 
						|
        millis = INFINITE;
 | 
						|
    }
 | 
						|
    else {
 | 
						|
        millis = (DWORD) (timeout / 1000000);
 | 
						|
    }
 | 
						|
    wait = WaitForSingleObjectEx(sema->platform_sem, millis, FALSE);
 | 
						|
    if (wait == WAIT_OBJECT_0) {
 | 
						|
        res = Py_PARK_OK;
 | 
						|
    }
 | 
						|
    else if (wait == WAIT_TIMEOUT) {
 | 
						|
        res = Py_PARK_TIMEOUT;
 | 
						|
    }
 | 
						|
    else {
 | 
						|
        res = Py_PARK_INTR;
 | 
						|
    }
 | 
						|
#elif defined(_Py_USE_SEMAPHORES)
 | 
						|
    int err;
 | 
						|
    if (timeout >= 0) {
 | 
						|
        struct timespec ts;
 | 
						|
 | 
						|
        _PyTime_t deadline = _PyTime_Add(_PyTime_GetSystemClock(), timeout);
 | 
						|
        _PyTime_AsTimespec(deadline, &ts);
 | 
						|
 | 
						|
        err = sem_timedwait(&sema->platform_sem, &ts);
 | 
						|
    }
 | 
						|
    else {
 | 
						|
        err = sem_wait(&sema->platform_sem);
 | 
						|
    }
 | 
						|
    if (err == -1) {
 | 
						|
        err = errno;
 | 
						|
        if (err == EINTR) {
 | 
						|
            res = Py_PARK_INTR;
 | 
						|
        }
 | 
						|
        else if (err == ETIMEDOUT) {
 | 
						|
            res = Py_PARK_TIMEOUT;
 | 
						|
        }
 | 
						|
        else {
 | 
						|
            _Py_FatalErrorFormat(__func__,
 | 
						|
                "unexpected error from semaphore: %d",
 | 
						|
                err);
 | 
						|
        }
 | 
						|
    }
 | 
						|
    else {
 | 
						|
        res = Py_PARK_OK;
 | 
						|
    }
 | 
						|
#else
 | 
						|
    pthread_mutex_lock(&sema->mutex);
 | 
						|
    int err = 0;
 | 
						|
    if (sema->counter == 0) {
 | 
						|
        if (timeout >= 0) {
 | 
						|
            struct timespec ts;
 | 
						|
 | 
						|
            _PyTime_t deadline = _PyTime_Add(_PyTime_GetSystemClock(), timeout);
 | 
						|
            _PyTime_AsTimespec(deadline, &ts);
 | 
						|
 | 
						|
            err = pthread_cond_timedwait(&sema->cond, &sema->mutex, &ts);
 | 
						|
        }
 | 
						|
        else {
 | 
						|
            err = pthread_cond_wait(&sema->cond, &sema->mutex);
 | 
						|
        }
 | 
						|
    }
 | 
						|
    if (sema->counter > 0) {
 | 
						|
        sema->counter--;
 | 
						|
        res = Py_PARK_OK;
 | 
						|
    }
 | 
						|
    else if (err) {
 | 
						|
        res = Py_PARK_TIMEOUT;
 | 
						|
    }
 | 
						|
    else {
 | 
						|
        res = Py_PARK_INTR;
 | 
						|
    }
 | 
						|
    pthread_mutex_unlock(&sema->mutex);
 | 
						|
#endif
 | 
						|
    return res;
 | 
						|
}
 | 
						|
 | 
						|
int
 | 
						|
_PySemaphore_Wait(_PySemaphore *sema, _PyTime_t timeout, int detach)
 | 
						|
{
 | 
						|
    PyThreadState *tstate = NULL;
 | 
						|
    if (detach) {
 | 
						|
        tstate = _PyThreadState_GET();
 | 
						|
        if (tstate) {
 | 
						|
            PyEval_ReleaseThread(tstate);
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
    int res = _PySemaphore_PlatformWait(sema, timeout);
 | 
						|
 | 
						|
    if (detach && tstate) {
 | 
						|
        PyEval_AcquireThread(tstate);
 | 
						|
    }
 | 
						|
    return res;
 | 
						|
}
 | 
						|
 | 
						|
void
 | 
						|
_PySemaphore_Wakeup(_PySemaphore *sema)
 | 
						|
{
 | 
						|
#if defined(MS_WINDOWS)
 | 
						|
    if (!ReleaseSemaphore(sema->platform_sem, 1, NULL)) {
 | 
						|
        Py_FatalError("parking_lot: ReleaseSemaphore failed");
 | 
						|
    }
 | 
						|
#elif defined(_Py_USE_SEMAPHORES)
 | 
						|
    int err = sem_post(&sema->platform_sem);
 | 
						|
    if (err != 0) {
 | 
						|
        Py_FatalError("parking_lot: sem_post failed");
 | 
						|
    }
 | 
						|
#else
 | 
						|
    pthread_mutex_lock(&sema->mutex);
 | 
						|
    sema->counter++;
 | 
						|
    pthread_cond_signal(&sema->cond);
 | 
						|
    pthread_mutex_unlock(&sema->mutex);
 | 
						|
#endif
 | 
						|
}
 | 
						|
 | 
						|
static void
 | 
						|
enqueue(Bucket *bucket, const void *address, struct wait_entry *wait)
 | 
						|
{
 | 
						|
    llist_insert_tail(&bucket->root, &wait->node);
 | 
						|
    ++bucket->num_waiters;
 | 
						|
}
 | 
						|
 | 
						|
static struct wait_entry *
 | 
						|
dequeue(Bucket *bucket, const void *address)
 | 
						|
{
 | 
						|
    // find the first waiter that is waiting on `address`
 | 
						|
    struct llist_node *root = &bucket->root;
 | 
						|
    struct llist_node *node;
 | 
						|
    llist_for_each(node, root) {
 | 
						|
        struct wait_entry *wait = llist_data(node, struct wait_entry, node);
 | 
						|
        if (wait->addr == (uintptr_t)address) {
 | 
						|
            llist_remove(node);
 | 
						|
            --bucket->num_waiters;
 | 
						|
            return wait;
 | 
						|
        }
 | 
						|
    }
 | 
						|
    return NULL;
 | 
						|
}
 | 
						|
 | 
						|
static void
 | 
						|
dequeue_all(Bucket *bucket, const void *address, struct llist_node *dst)
 | 
						|
{
 | 
						|
    // remove and append all matching waiters to dst
 | 
						|
    struct llist_node *root = &bucket->root;
 | 
						|
    struct llist_node *node;
 | 
						|
    llist_for_each_safe(node, root) {
 | 
						|
        struct wait_entry *wait = llist_data(node, struct wait_entry, node);
 | 
						|
        if (wait->addr == (uintptr_t)address) {
 | 
						|
            llist_remove(node);
 | 
						|
            llist_insert_tail(dst, node);
 | 
						|
            --bucket->num_waiters;
 | 
						|
        }
 | 
						|
    }
 | 
						|
}
 | 
						|
 | 
						|
// Atomically load the `addr_size`-byte integer at `addr` and compare it with
// the same-sized integer at `expected`; returns nonzero when they are equal.
// Only sizes 1, 2, 4 and 8 are supported; any other size hits
// Py_UNREACHABLE().
static int
atomic_memcmp(const void *addr, const void *expected, size_t addr_size)
{
    if (addr_size == 1) {
        const uint8_t want = *(const uint8_t *)expected;
        return _Py_atomic_load_uint8(addr) == want;
    }
    if (addr_size == 2) {
        const uint16_t want = *(const uint16_t *)expected;
        return _Py_atomic_load_uint16(addr) == want;
    }
    if (addr_size == 4) {
        const uint32_t want = *(const uint32_t *)expected;
        return _Py_atomic_load_uint32(addr) == want;
    }
    if (addr_size == 8) {
        const uint64_t want = *(const uint64_t *)expected;
        return _Py_atomic_load_uint64(addr) == want;
    }
    Py_UNREACHABLE();
}
 | 
						|
 | 
						|
// Park the calling thread on `addr` until it is unparked, the timeout
// expires, or the wait is interrupted.
//
// The thread parks only if the `size`-byte value at `addr` still equals
// `*expected`, checked with an atomic load while holding the bucket lock.
// Otherwise it returns Py_PARK_AGAIN immediately.
//
// Parameters:
// - `timeout_ns` is passed to _PySemaphore_Wait; a negative value waits
//   with no timeout.
// - `park_arg` is stored in the queue entry and handed to the unpark
//   callback of _PyParkingLot_Unpark when this thread is the one woken.
// - `detach` is forwarded to _PySemaphore_Wait.
//
// Returns Py_PARK_OK, Py_PARK_AGAIN, Py_PARK_TIMEOUT or Py_PARK_INTR.
int
_PyParkingLot_Park(const void *addr, const void *expected, size_t size,
                   _PyTime_t timeout_ns, void *park_arg, int detach)
{
    // The queue entry lives on this thread's stack. It must stay valid until
    // no unparker can reach it any more (see the handling after the wait).
    struct wait_entry wait = {
        .park_arg = park_arg,
        .addr = (uintptr_t)addr,
        .is_unparking = false,
    };

    // Waiters are hashed by address into a fixed table; unrelated addresses
    // may share a bucket.
    Bucket *bucket = &buckets[((uintptr_t)addr) % NUM_BUCKETS];

    // Compare the value and enqueue under the same bucket lock that
    // unparkers take before dequeuing.
    _PyRawMutex_Lock(&bucket->mutex);
    if (!atomic_memcmp(addr, expected, size)) {
        _PyRawMutex_Unlock(&bucket->mutex);
        return Py_PARK_AGAIN;
    }
    _PySemaphore_Init(&wait.sema);
    enqueue(bucket, addr, &wait);
    _PyRawMutex_Unlock(&bucket->mutex);

    int res = _PySemaphore_Wait(&wait.sema, timeout_ns, detach);
    if (res == Py_PARK_OK) {
        goto done;
    }

    // The wait ended without a post (timeout or interrupt). Under the bucket
    // lock, one of two things is true:
    // - an unparker has already dequeued this entry (is_unparking) and will
    //   post the semaphore, or
    // - the entry is still queued and this thread must unlink it itself.
    _PyRawMutex_Lock(&bucket->mutex);
    if (wait.is_unparking) {
        _PyRawMutex_Unlock(&bucket->mutex);
        // Another thread has started to unpark us. Wait until we process the
        // wakeup signal, so the entry and its semaphore are not destroyed
        // before the unparker has posted it.
        do {
            res = _PySemaphore_Wait(&wait.sema, -1, detach);
        } while (res != Py_PARK_OK);
        goto done;
    }
    else {
        llist_remove(&wait.node);
        --bucket->num_waiters;
    }
    _PyRawMutex_Unlock(&bucket->mutex);

done:
    _PySemaphore_Destroy(&wait.sema);
    return res;

}
 | 
						|
 | 
						|
void
 | 
						|
_PyParkingLot_Unpark(const void *addr, _Py_unpark_fn_t *fn, void *arg)
 | 
						|
{
 | 
						|
    Bucket *bucket = &buckets[((uintptr_t)addr) % NUM_BUCKETS];
 | 
						|
 | 
						|
    // Find the first waiter that is waiting on `addr`
 | 
						|
    _PyRawMutex_Lock(&bucket->mutex);
 | 
						|
    struct wait_entry *waiter = dequeue(bucket, addr);
 | 
						|
    if (waiter) {
 | 
						|
        waiter->is_unparking = true;
 | 
						|
 | 
						|
        int has_more_waiters = (bucket->num_waiters > 0);
 | 
						|
        fn(arg, waiter->park_arg, has_more_waiters);
 | 
						|
    }
 | 
						|
    else {
 | 
						|
        fn(arg, NULL, 0);
 | 
						|
    }
 | 
						|
    _PyRawMutex_Unlock(&bucket->mutex);
 | 
						|
 | 
						|
    if (waiter) {
 | 
						|
        // Wakeup the waiter outside of the bucket lock
 | 
						|
        _PySemaphore_Wakeup(&waiter->sema);
 | 
						|
    }
 | 
						|
}
 | 
						|
 | 
						|
void
 | 
						|
_PyParkingLot_UnparkAll(const void *addr)
 | 
						|
{
 | 
						|
    struct llist_node head = LLIST_INIT(head);
 | 
						|
    Bucket *bucket = &buckets[((uintptr_t)addr) % NUM_BUCKETS];
 | 
						|
 | 
						|
    _PyRawMutex_Lock(&bucket->mutex);
 | 
						|
    dequeue_all(bucket, addr, &head);
 | 
						|
    _PyRawMutex_Unlock(&bucket->mutex);
 | 
						|
 | 
						|
    struct llist_node *node;
 | 
						|
    llist_for_each_safe(node, &head) {
 | 
						|
        struct wait_entry *waiter = llist_data(node, struct wait_entry, node);
 | 
						|
        llist_remove(node);
 | 
						|
        _PySemaphore_Wakeup(&waiter->sema);
 | 
						|
    }
 | 
						|
}
 | 
						|
 | 
						|
void
 | 
						|
_PyParkingLot_AfterFork(void)
 | 
						|
{
 | 
						|
    // After a fork only one thread remains. That thread cannot be blocked
 | 
						|
    // so all entries in the parking lot are for dead threads.
 | 
						|
    memset(buckets, 0, sizeof(buckets));
 | 
						|
    for (Py_ssize_t i = 0; i < NUM_BUCKETS; i++) {
 | 
						|
        llist_init(&buckets[i].root);
 | 
						|
    }
 | 
						|
}
 |