mirror of
				https://github.com/python/cpython.git
				synced 2025-10-26 03:04:41 +00:00 
			
		
		
		
	 80ba17a3dd
			
		
	
	
		80ba17a3dd
		
			
		
	
	
	
	
		
			
			gh-112804: Clamping timeout value for _PySemaphore_PlatformWait (gh-124914)
* gh-112804: Clamping timeout value for _PySemaphore_PlatformWait
* Address code review
* nit
(cherry picked from commit a5fc50994a)
Co-authored-by: Donghee Na <donghee.na@python.org>
		
	
			
		
			
				
	
	
		
			401 lines
		
	
	
	
		
			11 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
			
		
		
	
	
			401 lines
		
	
	
	
		
			11 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
| #include "Python.h"
 | |
| 
 | |
| #include "pycore_llist.h"
 | |
| #include "pycore_lock.h"          // _PyRawMutex
 | |
| #include "pycore_parking_lot.h"
 | |
| #include "pycore_pyerrors.h"      // _Py_FatalErrorFormat
 | |
| #include "pycore_pystate.h"       // _PyThreadState_GET
 | |
| #include "pycore_semaphore.h"     // _PySemaphore
 | |
| #include "pycore_time.h"          // _PyTime_Add()
 | |
| 
 | |
| #include <stdbool.h>
 | |
| 
 | |
| 
 | |
| typedef struct {
 | |
|     // The mutex protects the waiter queue and the num_waiters counter.
 | |
|     _PyRawMutex mutex;
 | |
| 
 | |
|     // Linked list of `struct wait_entry` waiters in this bucket.
 | |
|     struct llist_node root;
 | |
|     size_t num_waiters;
 | |
| } Bucket;
 | |
| 
 | |
| struct wait_entry {
 | |
|     void *park_arg;
 | |
|     uintptr_t addr;
 | |
|     _PySemaphore sema;
 | |
|     struct llist_node node;
 | |
|     bool is_unparking;
 | |
| };
 | |
| 
 | |
| // Prime number to avoid correlations with memory addresses.
 | |
| // We want this to be roughly proportional to the number of CPU cores
 | |
| // to minimize contention on the bucket locks, but not too big to avoid
 | |
| // wasting memory. The exact choice does not matter much.
 | |
| #define NUM_BUCKETS 257
 | |
| 
 | |
| #define BUCKET_INIT(b, i) [i] = { .root = LLIST_INIT(b[i].root) }
 | |
| #define BUCKET_INIT_2(b, i)   BUCKET_INIT(b, i),     BUCKET_INIT(b, i+1)
 | |
| #define BUCKET_INIT_4(b, i)   BUCKET_INIT_2(b, i),   BUCKET_INIT_2(b, i+2)
 | |
| #define BUCKET_INIT_8(b, i)   BUCKET_INIT_4(b, i),   BUCKET_INIT_4(b, i+4)
 | |
| #define BUCKET_INIT_16(b, i)  BUCKET_INIT_8(b, i),   BUCKET_INIT_8(b, i+8)
 | |
| #define BUCKET_INIT_32(b, i)  BUCKET_INIT_16(b, i),  BUCKET_INIT_16(b, i+16)
 | |
| #define BUCKET_INIT_64(b, i)  BUCKET_INIT_32(b, i),  BUCKET_INIT_32(b, i+32)
 | |
| #define BUCKET_INIT_128(b, i) BUCKET_INIT_64(b, i),  BUCKET_INIT_64(b, i+64)
 | |
| #define BUCKET_INIT_256(b, i) BUCKET_INIT_128(b, i), BUCKET_INIT_128(b, i+128)
 | |
| 
 | |
| // Table of waiters (hashed by address)
 | |
| static Bucket buckets[NUM_BUCKETS] = {
 | |
|     BUCKET_INIT_256(buckets, 0),
 | |
|     BUCKET_INIT(buckets, 256),
 | |
| };
 | |
| 
 | |
| void
 | |
| _PySemaphore_Init(_PySemaphore *sema)
 | |
| {
 | |
| #if defined(MS_WINDOWS)
 | |
|     sema->platform_sem = CreateSemaphore(
 | |
|         NULL,   //  attributes
 | |
|         0,      //  initial count
 | |
|         10,     //  maximum count
 | |
|         NULL    //  unnamed
 | |
|     );
 | |
|     if (!sema->platform_sem) {
 | |
|         Py_FatalError("parking_lot: CreateSemaphore failed");
 | |
|     }
 | |
| #elif defined(_Py_USE_SEMAPHORES)
 | |
|     if (sem_init(&sema->platform_sem, /*pshared=*/0, /*value=*/0) < 0) {
 | |
|         Py_FatalError("parking_lot: sem_init failed");
 | |
|     }
 | |
| #else
 | |
|     if (pthread_mutex_init(&sema->mutex, NULL) != 0) {
 | |
|         Py_FatalError("parking_lot: pthread_mutex_init failed");
 | |
|     }
 | |
|     if (pthread_cond_init(&sema->cond, NULL)) {
 | |
|         Py_FatalError("parking_lot: pthread_cond_init failed");
 | |
|     }
 | |
|     sema->counter = 0;
 | |
| #endif
 | |
| }
 | |
| 
 | |
| void
 | |
| _PySemaphore_Destroy(_PySemaphore *sema)
 | |
| {
 | |
| #if defined(MS_WINDOWS)
 | |
|     CloseHandle(sema->platform_sem);
 | |
| #elif defined(_Py_USE_SEMAPHORES)
 | |
|     sem_destroy(&sema->platform_sem);
 | |
| #else
 | |
|     pthread_mutex_destroy(&sema->mutex);
 | |
|     pthread_cond_destroy(&sema->cond);
 | |
| #endif
 | |
| }
 | |
| 
 | |
| static int
 | |
| _PySemaphore_PlatformWait(_PySemaphore *sema, PyTime_t timeout)
 | |
| {
 | |
|     int res;
 | |
| #if defined(MS_WINDOWS)
 | |
|     DWORD wait;
 | |
|     DWORD millis = 0;
 | |
|     if (timeout < 0) {
 | |
|         millis = INFINITE;
 | |
|     }
 | |
|     else {
 | |
|         PyTime_t div = _PyTime_AsMilliseconds(timeout, _PyTime_ROUND_TIMEOUT);
 | |
|         // Prevent overflow with clamping the result
 | |
|         if ((PyTime_t)PY_DWORD_MAX < div) {
 | |
|             millis = PY_DWORD_MAX;
 | |
|         }
 | |
|         else {
 | |
|             millis = (DWORD) div;
 | |
|         }
 | |
|     }
 | |
|     wait = WaitForSingleObjectEx(sema->platform_sem, millis, FALSE);
 | |
|     if (wait == WAIT_OBJECT_0) {
 | |
|         res = Py_PARK_OK;
 | |
|     }
 | |
|     else if (wait == WAIT_TIMEOUT) {
 | |
|         res = Py_PARK_TIMEOUT;
 | |
|     }
 | |
|     else {
 | |
|         res = Py_PARK_INTR;
 | |
|     }
 | |
| #elif defined(_Py_USE_SEMAPHORES)
 | |
|     int err;
 | |
|     if (timeout >= 0) {
 | |
|         struct timespec ts;
 | |
| 
 | |
| #if defined(CLOCK_MONOTONIC) && defined(HAVE_SEM_CLOCKWAIT) && !defined(_Py_THREAD_SANITIZER)
 | |
|         PyTime_t now;
 | |
|         // silently ignore error: cannot report error to the caller
 | |
|         (void)PyTime_MonotonicRaw(&now);
 | |
|         PyTime_t deadline = _PyTime_Add(now, timeout);
 | |
|         _PyTime_AsTimespec_clamp(deadline, &ts);
 | |
| 
 | |
|         err = sem_clockwait(&sema->platform_sem, CLOCK_MONOTONIC, &ts);
 | |
| #else
 | |
|         PyTime_t now;
 | |
|         // silently ignore error: cannot report error to the caller
 | |
|         (void)PyTime_TimeRaw(&now);
 | |
|         PyTime_t deadline = _PyTime_Add(now, timeout);
 | |
| 
 | |
|         _PyTime_AsTimespec_clamp(deadline, &ts);
 | |
| 
 | |
|         err = sem_timedwait(&sema->platform_sem, &ts);
 | |
| #endif
 | |
|     }
 | |
|     else {
 | |
|         err = sem_wait(&sema->platform_sem);
 | |
|     }
 | |
|     if (err == -1) {
 | |
|         err = errno;
 | |
|         if (err == EINTR) {
 | |
|             res = Py_PARK_INTR;
 | |
|         }
 | |
|         else if (err == ETIMEDOUT) {
 | |
|             res = Py_PARK_TIMEOUT;
 | |
|         }
 | |
|         else {
 | |
|             _Py_FatalErrorFormat(__func__,
 | |
|                 "unexpected error from semaphore: %d",
 | |
|                 err);
 | |
|         }
 | |
|     }
 | |
|     else {
 | |
|         res = Py_PARK_OK;
 | |
|     }
 | |
| #else
 | |
|     pthread_mutex_lock(&sema->mutex);
 | |
|     int err = 0;
 | |
|     if (sema->counter == 0) {
 | |
|         if (timeout >= 0) {
 | |
|             struct timespec ts;
 | |
| #if defined(HAVE_PTHREAD_COND_TIMEDWAIT_RELATIVE_NP)
 | |
|             _PyTime_AsTimespec_clamp(timeout, &ts);
 | |
|             err = pthread_cond_timedwait_relative_np(&sema->cond, &sema->mutex, &ts);
 | |
| #else
 | |
|             PyTime_t now;
 | |
|             (void)PyTime_TimeRaw(&now);
 | |
|             PyTime_t deadline = _PyTime_Add(now, timeout);
 | |
|             _PyTime_AsTimespec_clamp(deadline, &ts);
 | |
| 
 | |
|             err = pthread_cond_timedwait(&sema->cond, &sema->mutex, &ts);
 | |
| #endif // HAVE_PTHREAD_COND_TIMEDWAIT_RELATIVE_NP
 | |
|         }
 | |
|         else {
 | |
|             err = pthread_cond_wait(&sema->cond, &sema->mutex);
 | |
|         }
 | |
|     }
 | |
|     if (sema->counter > 0) {
 | |
|         sema->counter--;
 | |
|         res = Py_PARK_OK;
 | |
|     }
 | |
|     else if (err) {
 | |
|         res = Py_PARK_TIMEOUT;
 | |
|     }
 | |
|     else {
 | |
|         res = Py_PARK_INTR;
 | |
|     }
 | |
|     pthread_mutex_unlock(&sema->mutex);
 | |
| #endif
 | |
|     return res;
 | |
| }
 | |
| 
 | |
| int
 | |
| _PySemaphore_Wait(_PySemaphore *sema, PyTime_t timeout, int detach)
 | |
| {
 | |
|     PyThreadState *tstate = NULL;
 | |
|     if (detach) {
 | |
|         tstate = _PyThreadState_GET();
 | |
|         if (tstate && _Py_atomic_load_int_relaxed(&tstate->state) ==
 | |
|                           _Py_THREAD_ATTACHED) {
 | |
|             // Only detach if we are attached
 | |
|             PyEval_ReleaseThread(tstate);
 | |
|         }
 | |
|         else {
 | |
|             tstate = NULL;
 | |
|         }
 | |
|     }
 | |
|     int res = _PySemaphore_PlatformWait(sema, timeout);
 | |
|     if (tstate) {
 | |
|         PyEval_AcquireThread(tstate);
 | |
|     }
 | |
|     return res;
 | |
| }
 | |
| 
 | |
| void
 | |
| _PySemaphore_Wakeup(_PySemaphore *sema)
 | |
| {
 | |
| #if defined(MS_WINDOWS)
 | |
|     if (!ReleaseSemaphore(sema->platform_sem, 1, NULL)) {
 | |
|         Py_FatalError("parking_lot: ReleaseSemaphore failed");
 | |
|     }
 | |
| #elif defined(_Py_USE_SEMAPHORES)
 | |
|     int err = sem_post(&sema->platform_sem);
 | |
|     if (err != 0) {
 | |
|         Py_FatalError("parking_lot: sem_post failed");
 | |
|     }
 | |
| #else
 | |
|     pthread_mutex_lock(&sema->mutex);
 | |
|     sema->counter++;
 | |
|     pthread_cond_signal(&sema->cond);
 | |
|     pthread_mutex_unlock(&sema->mutex);
 | |
| #endif
 | |
| }
 | |
| 
 | |
| static void
 | |
| enqueue(Bucket *bucket, const void *address, struct wait_entry *wait)
 | |
| {
 | |
|     llist_insert_tail(&bucket->root, &wait->node);
 | |
|     ++bucket->num_waiters;
 | |
| }
 | |
| 
 | |
| static struct wait_entry *
 | |
| dequeue(Bucket *bucket, const void *address)
 | |
| {
 | |
|     // find the first waiter that is waiting on `address`
 | |
|     struct llist_node *root = &bucket->root;
 | |
|     struct llist_node *node;
 | |
|     llist_for_each(node, root) {
 | |
|         struct wait_entry *wait = llist_data(node, struct wait_entry, node);
 | |
|         if (wait->addr == (uintptr_t)address) {
 | |
|             llist_remove(node);
 | |
|             --bucket->num_waiters;
 | |
|             wait->is_unparking = true;
 | |
|             return wait;
 | |
|         }
 | |
|     }
 | |
|     return NULL;
 | |
| }
 | |
| 
 | |
| static void
 | |
| dequeue_all(Bucket *bucket, const void *address, struct llist_node *dst)
 | |
| {
 | |
|     // remove and append all matching waiters to dst
 | |
|     struct llist_node *root = &bucket->root;
 | |
|     struct llist_node *node;
 | |
|     llist_for_each_safe(node, root) {
 | |
|         struct wait_entry *wait = llist_data(node, struct wait_entry, node);
 | |
|         if (wait->addr == (uintptr_t)address) {
 | |
|             llist_remove(node);
 | |
|             llist_insert_tail(dst, node);
 | |
|             --bucket->num_waiters;
 | |
|             wait->is_unparking = true;
 | |
|         }
 | |
|     }
 | |
| }
 | |
| 
 | |
// Atomically load `addr_size` bytes from `addr` and compare them with the
// value stored at `expected`. Returns non-zero when they are equal.
// Only sizes of 1, 2, 4, or 8 bytes are supported; any other size hits
// Py_UNREACHABLE().
static int
atomic_memcmp(const void *addr, const void *expected, size_t addr_size)
{
    int equal;
    switch (addr_size) {
    case 1:
        equal = (_Py_atomic_load_uint8(addr) == *(const uint8_t *)expected);
        break;
    case 2:
        equal = (_Py_atomic_load_uint16(addr) == *(const uint16_t *)expected);
        break;
    case 4:
        equal = (_Py_atomic_load_uint32(addr) == *(const uint32_t *)expected);
        break;
    case 8:
        equal = (_Py_atomic_load_uint64(addr) == *(const uint64_t *)expected);
        break;
    default:
        Py_UNREACHABLE();
    }
    return equal;
}
 | |
| 
 | |
| int
 | |
| _PyParkingLot_Park(const void *addr, const void *expected, size_t size,
 | |
|                    PyTime_t timeout_ns, void *park_arg, int detach)
 | |
| {
 | |
|     struct wait_entry wait = {
 | |
|         .park_arg = park_arg,
 | |
|         .addr = (uintptr_t)addr,
 | |
|         .is_unparking = false,
 | |
|     };
 | |
| 
 | |
|     Bucket *bucket = &buckets[((uintptr_t)addr) % NUM_BUCKETS];
 | |
| 
 | |
|     _PyRawMutex_Lock(&bucket->mutex);
 | |
|     if (!atomic_memcmp(addr, expected, size)) {
 | |
|         _PyRawMutex_Unlock(&bucket->mutex);
 | |
|         return Py_PARK_AGAIN;
 | |
|     }
 | |
|     _PySemaphore_Init(&wait.sema);
 | |
|     enqueue(bucket, addr, &wait);
 | |
|     _PyRawMutex_Unlock(&bucket->mutex);
 | |
| 
 | |
|     int res = _PySemaphore_Wait(&wait.sema, timeout_ns, detach);
 | |
|     if (res == Py_PARK_OK) {
 | |
|         goto done;
 | |
|     }
 | |
| 
 | |
|     // timeout or interrupt
 | |
|     _PyRawMutex_Lock(&bucket->mutex);
 | |
|     if (wait.is_unparking) {
 | |
|         _PyRawMutex_Unlock(&bucket->mutex);
 | |
|         // Another thread has started to unpark us. Wait until we process the
 | |
|         // wakeup signal.
 | |
|         do {
 | |
|             res = _PySemaphore_Wait(&wait.sema, -1, detach);
 | |
|         } while (res != Py_PARK_OK);
 | |
|         goto done;
 | |
|     }
 | |
|     else {
 | |
|         llist_remove(&wait.node);
 | |
|         --bucket->num_waiters;
 | |
|     }
 | |
|     _PyRawMutex_Unlock(&bucket->mutex);
 | |
| 
 | |
| done:
 | |
|     _PySemaphore_Destroy(&wait.sema);
 | |
|     return res;
 | |
| 
 | |
| }
 | |
| 
 | |
| void
 | |
| _PyParkingLot_Unpark(const void *addr, _Py_unpark_fn_t *fn, void *arg)
 | |
| {
 | |
|     Bucket *bucket = &buckets[((uintptr_t)addr) % NUM_BUCKETS];
 | |
| 
 | |
|     // Find the first waiter that is waiting on `addr`
 | |
|     _PyRawMutex_Lock(&bucket->mutex);
 | |
|     struct wait_entry *waiter = dequeue(bucket, addr);
 | |
|     if (waiter) {
 | |
|         int has_more_waiters = (bucket->num_waiters > 0);
 | |
|         fn(arg, waiter->park_arg, has_more_waiters);
 | |
|     }
 | |
|     else {
 | |
|         fn(arg, NULL, 0);
 | |
|     }
 | |
|     _PyRawMutex_Unlock(&bucket->mutex);
 | |
| 
 | |
|     if (waiter) {
 | |
|         // Wakeup the waiter outside of the bucket lock
 | |
|         _PySemaphore_Wakeup(&waiter->sema);
 | |
|     }
 | |
| }
 | |
| 
 | |
| void
 | |
| _PyParkingLot_UnparkAll(const void *addr)
 | |
| {
 | |
|     struct llist_node head = LLIST_INIT(head);
 | |
|     Bucket *bucket = &buckets[((uintptr_t)addr) % NUM_BUCKETS];
 | |
| 
 | |
|     _PyRawMutex_Lock(&bucket->mutex);
 | |
|     dequeue_all(bucket, addr, &head);
 | |
|     _PyRawMutex_Unlock(&bucket->mutex);
 | |
| 
 | |
|     struct llist_node *node;
 | |
|     llist_for_each_safe(node, &head) {
 | |
|         struct wait_entry *waiter = llist_data(node, struct wait_entry, node);
 | |
|         llist_remove(node);
 | |
|         _PySemaphore_Wakeup(&waiter->sema);
 | |
|     }
 | |
| }
 | |
| 
 | |
| void
 | |
| _PyParkingLot_AfterFork(void)
 | |
| {
 | |
|     // After a fork only one thread remains. That thread cannot be blocked
 | |
|     // so all entries in the parking lot are for dead threads.
 | |
|     memset(buckets, 0, sizeof(buckets));
 | |
|     for (Py_ssize_t i = 0; i < NUM_BUCKETS; i++) {
 | |
|         llist_init(&buckets[i].root);
 | |
|     }
 | |
| }
 |