mirror of
https://github.com/python/cpython.git
synced 2026-04-14 07:41:00 +00:00
gh-120321: Make gi_yieldfrom thread-safe in free-threading build (#144292)
Add a FRAME_SUSPENDED_YIELD_FROM_LOCKED state that acts as a brief lock, preventing other threads from transitioning the frame state while gen_getyieldfrom reads the yield-from object off the stack.
This commit is contained in:
parent
a7048327ed
commit
a01694dacd
9 changed files with 103 additions and 19 deletions
|
|
@ -44,15 +44,16 @@ extern PyFrameObject* _PyFrame_New_NoTrack(PyCodeObject *code);
|
|||
/* other API */
|
||||
|
||||
typedef enum _framestate {
|
||||
FRAME_CREATED = -3,
|
||||
FRAME_SUSPENDED = -2,
|
||||
FRAME_SUSPENDED_YIELD_FROM = -1,
|
||||
FRAME_CREATED = -4,
|
||||
FRAME_SUSPENDED = -3,
|
||||
FRAME_SUSPENDED_YIELD_FROM = -2,
|
||||
FRAME_SUSPENDED_YIELD_FROM_LOCKED = -1,
|
||||
FRAME_EXECUTING = 0,
|
||||
FRAME_COMPLETED = 1,
|
||||
FRAME_CLEARED = 4
|
||||
} PyFrameState;
|
||||
|
||||
#define FRAME_STATE_SUSPENDED(S) ((S) == FRAME_SUSPENDED || (S) == FRAME_SUSPENDED_YIELD_FROM)
|
||||
#define FRAME_STATE_SUSPENDED(S) ((S) >= FRAME_SUSPENDED && (S) <= FRAME_SUSPENDED_YIELD_FROM_LOCKED)
|
||||
#define FRAME_STATE_FINISHED(S) ((S) >= FRAME_COMPLETED)
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
|||
|
|
@ -70,6 +70,9 @@ PyMutex_LockFlags(PyMutex *m, _PyLockFlags flags)
|
|||
// error messages) otherwise returns 0.
|
||||
extern int _PyMutex_TryUnlock(PyMutex *m);
|
||||
|
||||
// Yield the processor to other threads (e.g., sched_yield).
|
||||
extern void _Py_yield(void);
|
||||
|
||||
|
||||
// PyEvent is a one-time event notification
|
||||
typedef struct {
|
||||
|
|
|
|||
|
|
@ -250,21 +250,32 @@ def requires_working_threading(*, module=False):
|
|||
return unittest.skipUnless(can_start_thread, msg)
|
||||
|
||||
|
||||
def run_concurrently(worker_func, nthreads, args=(), kwargs={}):
|
||||
def run_concurrently(worker_func, nthreads=None, args=(), kwargs={}):
|
||||
"""
|
||||
Run the worker function concurrently in multiple threads.
|
||||
Run the worker function(s) concurrently in multiple threads.
|
||||
|
||||
If `worker_func` is a single callable, it is used for all threads.
|
||||
If it is a list of callables, each callable is used for one thread.
|
||||
"""
|
||||
from collections.abc import Iterable
|
||||
|
||||
if nthreads is None:
|
||||
nthreads = len(worker_func)
|
||||
if not isinstance(worker_func, Iterable):
|
||||
worker_func = [worker_func] * nthreads
|
||||
assert len(worker_func) == nthreads
|
||||
|
||||
barrier = threading.Barrier(nthreads)
|
||||
|
||||
def wrapper_func(*args, **kwargs):
|
||||
def wrapper_func(func, *args, **kwargs):
|
||||
# Wait for all threads to reach this point before proceeding.
|
||||
barrier.wait()
|
||||
worker_func(*args, **kwargs)
|
||||
func(*args, **kwargs)
|
||||
|
||||
with catch_threading_exception() as cm:
|
||||
workers = [
|
||||
threading.Thread(target=wrapper_func, args=args, kwargs=kwargs)
|
||||
for _ in range(nthreads)
|
||||
threading.Thread(target=wrapper_func, args=(func, *args), kwargs=kwargs)
|
||||
for func in worker_func
|
||||
]
|
||||
with start_threads(workers):
|
||||
pass
|
||||
|
|
|
|||
|
|
@ -1,4 +1,6 @@
|
|||
import concurrent.futures
|
||||
import itertools
|
||||
import threading
|
||||
import unittest
|
||||
from threading import Barrier
|
||||
from unittest import TestCase
|
||||
|
|
@ -120,3 +122,38 @@ def drive_generator(g):
|
|||
|
||||
g = gen()
|
||||
threading_helper.run_concurrently(drive_generator, self.NUM_THREADS, args=(g,))
|
||||
|
||||
def test_concurrent_gi_yieldfrom(self):
    """Read ``gi_yieldfrom`` from several threads at once.

    The generator is advanced a single time before the threads start and
    is never resumed afterwards, so every read is expected to return a
    non-None object.
    """
    def delegating():
        yield from itertools.count()

    suspended = delegating()
    # One step puts the frame in the FRAME_SUSPENDED_YIELD_FROM state.
    next(suspended)

    def poll(gen):
        remaining = 10000
        while remaining:
            self.assertIsNotNone(gen.gi_yieldfrom)
            remaining -= 1

    threading_helper.run_concurrently(
        poll, self.NUM_THREADS, args=(suspended,)
    )
|
||||
|
||||
def test_gi_yieldfrom_close_race(self):
    """Race ``gi_yieldfrom`` reads against ``close()`` on the same generator.

    One thread reads ``g.gi_yieldfrom`` in a loop while a second thread
    closes the generator.  The reads make no assertion about the value
    they see; the test only checks that both threads finish without an
    exception being raised.
    """
    def gen_yield_from():
        yield from itertools.count()

    g = gen_yield_from()
    next(g)  # Suspend the generator inside its ``yield from``.

    done = threading.Event()

    def reader():
        # Keep reading until the closer thread signals that it is finished.
        while not done.is_set():
            g.gi_yieldfrom

    def closer():
        try:
            g.close()
        except ValueError:
            # Tolerated by the test; presumably the "generator already
            # executing" error -- TODO confirm against gen_close().
            pass
        finally:
            # Always release the reader.  Without this, an unexpected
            # exception from close() would leave the reader spinning
            # forever and the thread join would hang instead of the
            # real error being reported.
            done.set()

    threading_helper.run_concurrently([reader, closer])
|
||||
|
|
|
|||
|
|
@ -0,0 +1,2 @@
|
|||
Made ``gi_yieldfrom`` thread-safe in the free-threading build
|
||||
by using a lightweight lock on the frame state.
|
||||
|
|
@ -10,6 +10,7 @@
|
|||
#include "pycore_gc.h" // _PyGC_CLEAR_FINALIZED()
|
||||
#include "pycore_genobject.h" // _PyGen_SetStopIterationValue()
|
||||
#include "pycore_interpframe.h" // _PyFrame_GetCode()
|
||||
#include "pycore_lock.h" // _Py_yield()
|
||||
#include "pycore_modsupport.h" // _PyArg_CheckPositional()
|
||||
#include "pycore_object.h" // _PyObject_GC_UNTRACK()
|
||||
#include "pycore_opcode_utils.h" // RESUME_AFTER_YIELD_FROM
|
||||
|
|
@ -37,8 +38,20 @@ static PyObject* async_gen_athrow_new(PyAsyncGenObject *, PyObject *);
|
|||
_Py_CAST(PyAsyncGenObject*, (op))
|
||||
|
||||
#ifdef Py_GIL_DISABLED
|
||||
static bool
|
||||
gen_try_set_frame_state(PyGenObject *gen, int8_t *expected, int8_t state)
|
||||
{
|
||||
if (*expected == FRAME_SUSPENDED_YIELD_FROM_LOCKED) {
|
||||
// Wait for the in-progress gi_yieldfrom read to complete
|
||||
_Py_yield();
|
||||
*expected = _Py_atomic_load_int8_relaxed(&gen->gi_frame_state);
|
||||
return false;
|
||||
}
|
||||
return _Py_atomic_compare_exchange_int8(&gen->gi_frame_state, expected, state);
|
||||
}
|
||||
|
||||
# define _Py_GEN_TRY_SET_FRAME_STATE(gen, expected, state) \
|
||||
_Py_atomic_compare_exchange_int8(&(gen)->gi_frame_state, &expected, (state))
|
||||
gen_try_set_frame_state((gen), &(expected), (state))
|
||||
#else
|
||||
# define _Py_GEN_TRY_SET_FRAME_STATE(gen, expected, state) \
|
||||
((gen)->gi_frame_state = (state), true)
|
||||
|
|
@ -470,9 +483,7 @@ gen_close(PyObject *self, PyObject *args)
|
|||
return NULL;
|
||||
}
|
||||
|
||||
assert(frame_state == FRAME_SUSPENDED_YIELD_FROM ||
|
||||
frame_state == FRAME_SUSPENDED);
|
||||
|
||||
assert(FRAME_STATE_SUSPENDED(frame_state));
|
||||
} while (!_Py_GEN_TRY_SET_FRAME_STATE(gen, frame_state, FRAME_EXECUTING));
|
||||
|
||||
int err = 0;
|
||||
|
|
@ -876,12 +887,26 @@ static PyObject *
|
|||
gen_getyieldfrom(PyObject *self, void *Py_UNUSED(ignored))
|
||||
{
|
||||
PyGenObject *gen = _PyGen_CAST(self);
|
||||
int8_t frame_state = FT_ATOMIC_LOAD_INT8_RELAXED(gen->gi_frame_state);
|
||||
#ifdef Py_GIL_DISABLED
|
||||
int8_t frame_state = _Py_atomic_load_int8_relaxed(&gen->gi_frame_state);
|
||||
do {
|
||||
if (frame_state != FRAME_SUSPENDED_YIELD_FROM &&
|
||||
frame_state != FRAME_SUSPENDED_YIELD_FROM_LOCKED)
|
||||
{
|
||||
Py_RETURN_NONE;
|
||||
}
|
||||
} while (!_Py_GEN_TRY_SET_FRAME_STATE(gen, frame_state, FRAME_SUSPENDED_YIELD_FROM_LOCKED));
|
||||
|
||||
PyObject *result = PyStackRef_AsPyObjectNew(_PyFrame_StackPeek(&gen->gi_iframe));
|
||||
_Py_atomic_store_int8_release(&gen->gi_frame_state, FRAME_SUSPENDED_YIELD_FROM);
|
||||
return result;
|
||||
#else
|
||||
int8_t frame_state = gen->gi_frame_state;
|
||||
if (frame_state != FRAME_SUSPENDED_YIELD_FROM) {
|
||||
Py_RETURN_NONE;
|
||||
}
|
||||
// TODO: still not thread-safe with free threading
|
||||
return PyStackRef_AsPyObjectNew(_PyFrame_StackPeek(&gen->gi_iframe));
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -3391,7 +3391,9 @@ _PyEval_GetAwaitable(PyObject *iterable, int oparg)
|
|||
else if (PyCoro_CheckExact(iter)) {
|
||||
PyCoroObject *coro = (PyCoroObject *)iter;
|
||||
int8_t frame_state = FT_ATOMIC_LOAD_INT8_RELAXED(coro->cr_frame_state);
|
||||
if (frame_state == FRAME_SUSPENDED_YIELD_FROM) {
|
||||
if (frame_state == FRAME_SUSPENDED_YIELD_FROM ||
|
||||
frame_state == FRAME_SUSPENDED_YIELD_FROM_LOCKED)
|
||||
{
|
||||
/* `iter` is a coroutine object that is being awaited. */
|
||||
Py_CLEAR(iter);
|
||||
_PyErr_SetString(PyThreadState_GET(), PyExc_RuntimeError,
|
||||
|
|
|
|||
|
|
@ -522,19 +522,22 @@ gen_try_set_executing(PyGenObject *gen)
|
|||
#ifdef Py_GIL_DISABLED
|
||||
if (!_PyObject_IsUniquelyReferenced((PyObject *)gen)) {
|
||||
int8_t frame_state = _Py_atomic_load_int8_relaxed(&gen->gi_frame_state);
|
||||
while (frame_state < FRAME_EXECUTING) {
|
||||
while (frame_state < FRAME_SUSPENDED_YIELD_FROM_LOCKED) {
|
||||
if (_Py_atomic_compare_exchange_int8(&gen->gi_frame_state,
|
||||
&frame_state,
|
||||
FRAME_EXECUTING)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
// NB: We return false for FRAME_SUSPENDED_YIELD_FROM_LOCKED as well.
|
||||
// That case is rare enough that we can just handle it in the deopt.
|
||||
return false;
|
||||
}
|
||||
#endif
|
||||
// Use faster non-atomic modifications in the GIL-enabled build and when
|
||||
// the object is uniquely referenced in the free-threaded build.
|
||||
if (gen->gi_frame_state < FRAME_EXECUTING) {
|
||||
assert(gen->gi_frame_state != FRAME_SUSPENDED_YIELD_FROM_LOCKED);
|
||||
gen->gi_frame_state = FRAME_EXECUTING;
|
||||
return true;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -40,7 +40,7 @@ struct mutex_entry {
|
|||
int handed_off;
|
||||
};
|
||||
|
||||
static void
|
||||
void
|
||||
_Py_yield(void)
|
||||
{
|
||||
#ifdef MS_WINDOWS
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue