mirror of
https://github.com/python/cpython.git
synced 2026-01-06 15:32:22 +00:00
gh-136003: Execute pre-finalization callbacks in a loop (GH-136004)
This commit is contained in:
parent
d6a6fe2a5b
commit
2191497933
9 changed files with 281 additions and 43 deletions
|
|
@ -107,6 +107,7 @@ struct _ts {
|
|||
# define _PyThreadState_WHENCE_THREADING 3
|
||||
# define _PyThreadState_WHENCE_GILSTATE 4
|
||||
# define _PyThreadState_WHENCE_EXEC 5
|
||||
# define _PyThreadState_WHENCE_THREADING_DAEMON 6
|
||||
#endif
|
||||
|
||||
/* Currently holds the GIL. Must be its own field to avoid data races */
|
||||
|
|
|
|||
|
|
@ -79,6 +79,62 @@ def thready():
|
|||
# want them to affect the rest of the tests.
|
||||
script_helper.assert_python_ok("-c", textwrap.dedent(source))
|
||||
|
||||
@threading_helper.requires_working_threading()
|
||||
def test_thread_created_in_atexit(self):
|
||||
source = """if True:
|
||||
import atexit
|
||||
import threading
|
||||
import time
|
||||
|
||||
|
||||
def run():
|
||||
print(24)
|
||||
time.sleep(1)
|
||||
print(42)
|
||||
|
||||
@atexit.register
|
||||
def start_thread():
|
||||
threading.Thread(target=run).start()
|
||||
"""
|
||||
return_code, stdout, stderr = script_helper.assert_python_ok("-c", source)
|
||||
self.assertEqual(return_code, 0)
|
||||
self.assertEqual(stdout, f"24{os.linesep}42{os.linesep}".encode("utf-8"))
|
||||
self.assertEqual(stderr, b"")
|
||||
|
||||
@threading_helper.requires_working_threading()
|
||||
@unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
|
||||
def test_thread_created_in_atexit_subinterpreter(self):
|
||||
try:
|
||||
from concurrent import interpreters
|
||||
except ImportError:
|
||||
self.skipTest("subinterpreters are not available")
|
||||
|
||||
read, write = os.pipe()
|
||||
source = f"""if True:
|
||||
import atexit
|
||||
import threading
|
||||
import time
|
||||
import os
|
||||
|
||||
def run():
|
||||
os.write({write}, b'spanish')
|
||||
time.sleep(1)
|
||||
os.write({write}, b'inquisition')
|
||||
|
||||
@atexit.register
|
||||
def start_thread():
|
||||
threading.Thread(target=run).start()
|
||||
"""
|
||||
interp = interpreters.create()
|
||||
try:
|
||||
interp.exec(source)
|
||||
|
||||
# Close the interpreter to invoke atexit callbacks
|
||||
interp.close()
|
||||
self.assertEqual(os.read(read, 100), b"spanishinquisition")
|
||||
finally:
|
||||
os.close(read)
|
||||
os.close(write)
|
||||
|
||||
@support.cpython_only
|
||||
class SubinterpreterTest(unittest.TestCase):
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@
|
|||
from test import support
|
||||
from test.support import MISSING_C_DOCSTRINGS
|
||||
from test.support import import_helper
|
||||
from test.support import script_helper
|
||||
from test.support import threading_helper
|
||||
from test.support import warnings_helper
|
||||
from test.support import requires_limited_api
|
||||
|
|
@ -1641,6 +1642,36 @@ def subthread():
|
|||
|
||||
self.assertEqual(actual, int(interpid))
|
||||
|
||||
@threading_helper.requires_working_threading()
|
||||
def test_pending_call_creates_thread(self):
|
||||
source = """
|
||||
import _testinternalcapi
|
||||
import threading
|
||||
import time
|
||||
|
||||
|
||||
def output():
|
||||
print(24)
|
||||
time.sleep(1)
|
||||
print(42)
|
||||
|
||||
|
||||
def callback():
|
||||
threading.Thread(target=output).start()
|
||||
|
||||
|
||||
def create_pending_call():
|
||||
time.sleep(1)
|
||||
_testinternalcapi.simple_pending_call(callback)
|
||||
|
||||
|
||||
threading.Thread(target=create_pending_call).start()
|
||||
"""
|
||||
return_code, stdout, stderr = script_helper.assert_python_ok('-c', textwrap.dedent(source))
|
||||
self.assertEqual(return_code, 0)
|
||||
self.assertEqual(stdout, f"24{os.linesep}42{os.linesep}".encode("utf-8"))
|
||||
self.assertEqual(stderr, b"")
|
||||
|
||||
|
||||
class SubinterpreterTest(unittest.TestCase):
|
||||
|
||||
|
|
@ -1949,6 +1980,41 @@ def test_module_state_shared_in_global(self):
|
|||
subinterp_attr_id = os.read(r, 100)
|
||||
self.assertEqual(main_attr_id, subinterp_attr_id)
|
||||
|
||||
@threading_helper.requires_working_threading()
|
||||
@unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
|
||||
@requires_subinterpreters
|
||||
def test_pending_call_creates_thread_subinterpreter(self):
|
||||
interpreters = import_helper.import_module("concurrent.interpreters")
|
||||
r, w = os.pipe()
|
||||
source = f"""if True:
|
||||
import _testinternalcapi
|
||||
import threading
|
||||
import time
|
||||
import os
|
||||
|
||||
|
||||
def output():
|
||||
time.sleep(1)
|
||||
os.write({w}, b"x")
|
||||
|
||||
|
||||
def callback():
|
||||
threading.Thread(target=output).start()
|
||||
|
||||
|
||||
def create_pending_call():
|
||||
time.sleep(1)
|
||||
_testinternalcapi.simple_pending_call(callback)
|
||||
|
||||
|
||||
threading.Thread(target=create_pending_call).start()
|
||||
"""
|
||||
interp = interpreters.create()
|
||||
interp.exec(source)
|
||||
interp.close()
|
||||
data = os.read(r, 1)
|
||||
self.assertEqual(data, b"x")
|
||||
|
||||
|
||||
@requires_subinterpreters
|
||||
class InterpreterConfigTests(unittest.TestCase):
|
||||
|
|
|
|||
|
|
@ -1557,8 +1557,9 @@ def _shutdown():
|
|||
# normally - that won't happen until the interpreter is nearly dead. So
|
||||
# mark it done here.
|
||||
if _main_thread._os_thread_handle.is_done() and _is_main_interpreter():
|
||||
# _shutdown() was already called
|
||||
return
|
||||
# _shutdown() was already called, but threads might have started
|
||||
# in the meantime.
|
||||
return _thread_shutdown()
|
||||
|
||||
global _SHUTTING_DOWN
|
||||
_SHUTTING_DOWN = True
|
||||
|
|
|
|||
|
|
@ -0,0 +1,3 @@
|
|||
Fix :class:`threading.Thread` objects becoming incorrectly daemon when
|
||||
created from an :mod:`atexit` callback or a pending call
|
||||
(:c:func:`Py_AddPendingCall`).
|
||||
|
|
@ -2376,6 +2376,16 @@ emscripten_set_up_async_input_device(PyObject *self, PyObject *Py_UNUSED(ignored
|
|||
}
|
||||
#endif
|
||||
|
||||
static PyObject *
|
||||
simple_pending_call(PyObject *self, PyObject *callable)
|
||||
{
|
||||
if (_PyEval_AddPendingCall(_PyInterpreterState_GET(), _pending_callback, Py_NewRef(callable), 0) < 0) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
Py_RETURN_NONE;
|
||||
}
|
||||
|
||||
static PyMethodDef module_functions[] = {
|
||||
{"get_configs", get_configs, METH_NOARGS},
|
||||
{"get_recursion_depth", get_recursion_depth, METH_NOARGS},
|
||||
|
|
@ -2481,6 +2491,7 @@ static PyMethodDef module_functions[] = {
|
|||
#ifdef __EMSCRIPTEN__
|
||||
{"emscripten_set_up_async_input_device", emscripten_set_up_async_input_device, METH_NOARGS},
|
||||
#endif
|
||||
{"simple_pending_call", simple_pending_call, METH_O},
|
||||
{NULL, NULL} /* sentinel */
|
||||
};
|
||||
|
||||
|
|
|
|||
|
|
@ -429,7 +429,7 @@ force_done(void *arg)
|
|||
|
||||
static int
|
||||
ThreadHandle_start(ThreadHandle *self, PyObject *func, PyObject *args,
|
||||
PyObject *kwargs)
|
||||
PyObject *kwargs, int daemon)
|
||||
{
|
||||
// Mark the handle as starting to prevent any other threads from doing so
|
||||
PyMutex_Lock(&self->mutex);
|
||||
|
|
@ -453,7 +453,8 @@ ThreadHandle_start(ThreadHandle *self, PyObject *func, PyObject *args,
|
|||
goto start_failed;
|
||||
}
|
||||
PyInterpreterState *interp = _PyInterpreterState_GET();
|
||||
boot->tstate = _PyThreadState_New(interp, _PyThreadState_WHENCE_THREADING);
|
||||
uint8_t whence = daemon ? _PyThreadState_WHENCE_THREADING_DAEMON : _PyThreadState_WHENCE_THREADING;
|
||||
boot->tstate = _PyThreadState_New(interp, whence);
|
||||
if (boot->tstate == NULL) {
|
||||
PyMem_RawFree(boot);
|
||||
if (!PyErr_Occurred()) {
|
||||
|
|
@ -1916,7 +1917,7 @@ do_start_new_thread(thread_module_state *state, PyObject *func, PyObject *args,
|
|||
add_to_shutdown_handles(state, handle);
|
||||
}
|
||||
|
||||
if (ThreadHandle_start(handle, func, args, kwargs) < 0) {
|
||||
if (ThreadHandle_start(handle, func, args, kwargs, daemon) < 0) {
|
||||
if (!daemon) {
|
||||
remove_from_shutdown_handles(handle);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2013,6 +2013,133 @@ resolve_final_tstate(_PyRuntimeState *runtime)
|
|||
return main_tstate;
|
||||
}
|
||||
|
||||
#ifdef Py_GIL_DISABLED
|
||||
#define ASSERT_WORLD_STOPPED(interp) assert(interp->runtime->stoptheworld.world_stopped)
|
||||
#else
|
||||
#define ASSERT_WORLD_STOPPED(interp)
|
||||
#endif
|
||||
|
||||
static int
|
||||
interp_has_threads(PyInterpreterState *interp)
|
||||
{
|
||||
/* This needs to check for non-daemon threads only, otherwise we get stuck
|
||||
* in an infinite loop. */
|
||||
assert(interp != NULL);
|
||||
ASSERT_WORLD_STOPPED(interp);
|
||||
assert(interp->threads.head != NULL);
|
||||
if (interp->threads.head->next == NULL) {
|
||||
// No other threads active, easy way out.
|
||||
return 0;
|
||||
}
|
||||
|
||||
// We don't have to worry about locking this because the
|
||||
// world is stopped.
|
||||
_Py_FOR_EACH_TSTATE_UNLOCKED(interp, tstate) {
|
||||
if (tstate->_whence == _PyThreadState_WHENCE_THREADING) {
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int
|
||||
interp_has_pending_calls(PyInterpreterState *interp)
|
||||
{
|
||||
assert(interp != NULL);
|
||||
ASSERT_WORLD_STOPPED(interp);
|
||||
return interp->ceval.pending.npending != 0;
|
||||
}
|
||||
|
||||
static int
|
||||
interp_has_atexit_callbacks(PyInterpreterState *interp)
|
||||
{
|
||||
assert(interp != NULL);
|
||||
assert(interp->atexit.callbacks != NULL);
|
||||
ASSERT_WORLD_STOPPED(interp);
|
||||
assert(PyList_CheckExact(interp->atexit.callbacks));
|
||||
return PyList_GET_SIZE(interp->atexit.callbacks) != 0;
|
||||
}
|
||||
|
||||
static int
|
||||
runtime_has_subinterpreters(_PyRuntimeState *runtime)
|
||||
{
|
||||
assert(runtime != NULL);
|
||||
HEAD_LOCK(runtime);
|
||||
PyInterpreterState *interp = runtime->interpreters.head;
|
||||
HEAD_UNLOCK(runtime);
|
||||
return interp->next != NULL;
|
||||
}
|
||||
|
||||
static void
|
||||
make_pre_finalization_calls(PyThreadState *tstate, int subinterpreters)
|
||||
{
|
||||
assert(tstate != NULL);
|
||||
PyInterpreterState *interp = tstate->interp;
|
||||
/* Each of these functions can start one another, e.g. a pending call
|
||||
* could start a thread or vice versa. To ensure that we properly clean
|
||||
* call everything, we run these in a loop until none of them run anything. */
|
||||
for (;;) {
|
||||
assert(!interp->runtime->stoptheworld.world_stopped);
|
||||
|
||||
// Wrap up existing "threading"-module-created, non-daemon threads.
|
||||
wait_for_thread_shutdown(tstate);
|
||||
|
||||
// Make any remaining pending calls.
|
||||
_Py_FinishPendingCalls(tstate);
|
||||
|
||||
/* The interpreter is still entirely intact at this point, and the
|
||||
* exit funcs may be relying on that. In particular, if some thread
|
||||
* or exit func is still waiting to do an import, the import machinery
|
||||
* expects Py_IsInitialized() to return true. So don't say the
|
||||
* runtime is uninitialized until after the exit funcs have run.
|
||||
* Note that Threading.py uses an exit func to do a join on all the
|
||||
* threads created thru it, so this also protects pending imports in
|
||||
* the threads created via Threading.
|
||||
*/
|
||||
|
||||
_PyAtExit_Call(tstate->interp);
|
||||
|
||||
if (subinterpreters) {
|
||||
/* Clean up any lingering subinterpreters.
|
||||
|
||||
Two preconditions need to be met here:
|
||||
|
||||
- This has to happen before _PyRuntimeState_SetFinalizing is
|
||||
called, or else threads might get prematurely blocked.
|
||||
- The world must not be stopped, as finalizers can run.
|
||||
*/
|
||||
finalize_subinterpreters();
|
||||
}
|
||||
|
||||
|
||||
/* Stop the world to prevent other threads from creating threads or
|
||||
* atexit callbacks. On the default build, this is simply locked by
|
||||
* the GIL. For pending calls, we acquire the dedicated mutex, because
|
||||
* Py_AddPendingCall() can be called without an attached thread state.
|
||||
*/
|
||||
|
||||
PyMutex_Lock(&interp->ceval.pending.mutex);
|
||||
// XXX Why does _PyThreadState_DeleteList() rely on all interpreters
|
||||
// being stopped?
|
||||
_PyEval_StopTheWorldAll(interp->runtime);
|
||||
int has_subinterpreters = subinterpreters
|
||||
? runtime_has_subinterpreters(interp->runtime)
|
||||
: 0;
|
||||
int should_continue = (interp_has_threads(interp)
|
||||
|| interp_has_atexit_callbacks(interp)
|
||||
|| interp_has_pending_calls(interp)
|
||||
|| has_subinterpreters);
|
||||
if (!should_continue) {
|
||||
break;
|
||||
}
|
||||
_PyEval_StartTheWorldAll(interp->runtime);
|
||||
PyMutex_Unlock(&interp->ceval.pending.mutex);
|
||||
}
|
||||
assert(PyMutex_IsLocked(&interp->ceval.pending.mutex));
|
||||
ASSERT_WORLD_STOPPED(interp);
|
||||
}
|
||||
|
||||
static int
|
||||
_Py_Finalize(_PyRuntimeState *runtime)
|
||||
{
|
||||
|
|
@ -2029,33 +2156,8 @@ _Py_Finalize(_PyRuntimeState *runtime)
|
|||
// Block some operations.
|
||||
tstate->interp->finalizing = 1;
|
||||
|
||||
// Wrap up existing "threading"-module-created, non-daemon threads.
|
||||
wait_for_thread_shutdown(tstate);
|
||||
|
||||
// Make any remaining pending calls.
|
||||
_Py_FinishPendingCalls(tstate);
|
||||
|
||||
/* The interpreter is still entirely intact at this point, and the
|
||||
* exit funcs may be relying on that. In particular, if some thread
|
||||
* or exit func is still waiting to do an import, the import machinery
|
||||
* expects Py_IsInitialized() to return true. So don't say the
|
||||
* runtime is uninitialized until after the exit funcs have run.
|
||||
* Note that Threading.py uses an exit func to do a join on all the
|
||||
* threads created thru it, so this also protects pending imports in
|
||||
* the threads created via Threading.
|
||||
*/
|
||||
|
||||
_PyAtExit_Call(tstate->interp);
|
||||
|
||||
/* Clean up any lingering subinterpreters.
|
||||
|
||||
Two preconditions need to be met here:
|
||||
|
||||
- This has to happen before _PyRuntimeState_SetFinalizing is
|
||||
called, or else threads might get prematurely blocked.
|
||||
- The world must not be stopped, as finalizers can run.
|
||||
*/
|
||||
finalize_subinterpreters();
|
||||
// This call stops the world and takes the pending calls lock.
|
||||
make_pre_finalization_calls(tstate, /*subinterpreters=*/1);
|
||||
|
||||
assert(_PyThreadState_GET() == tstate);
|
||||
|
||||
|
|
@ -2073,7 +2175,7 @@ _Py_Finalize(_PyRuntimeState *runtime)
|
|||
#endif
|
||||
|
||||
/* Ensure that remaining threads are detached */
|
||||
_PyEval_StopTheWorldAll(runtime);
|
||||
ASSERT_WORLD_STOPPED(tstate->interp);
|
||||
|
||||
/* Remaining daemon threads will be trapped in PyThread_hang_thread
|
||||
when they attempt to take the GIL (ex: PyEval_RestoreThread()). */
|
||||
|
|
@ -2094,6 +2196,7 @@ _Py_Finalize(_PyRuntimeState *runtime)
|
|||
_PyThreadState_SetShuttingDown(p);
|
||||
}
|
||||
_PyEval_StartTheWorldAll(runtime);
|
||||
PyMutex_Unlock(&tstate->interp->ceval.pending.mutex);
|
||||
|
||||
/* Clear frames of other threads to call objects destructors. Destructors
|
||||
will be called in the current Python thread. Since
|
||||
|
|
@ -2449,15 +2552,10 @@ Py_EndInterpreter(PyThreadState *tstate)
|
|||
}
|
||||
interp->finalizing = 1;
|
||||
|
||||
// Wrap up existing "threading"-module-created, non-daemon threads.
|
||||
wait_for_thread_shutdown(tstate);
|
||||
// This call stops the world and takes the pending calls lock.
|
||||
make_pre_finalization_calls(tstate, /*subinterpreters=*/0);
|
||||
|
||||
// Make any remaining pending calls.
|
||||
_Py_FinishPendingCalls(tstate);
|
||||
|
||||
_PyAtExit_Call(tstate->interp);
|
||||
_PyRuntimeState *runtime = interp->runtime;
|
||||
_PyEval_StopTheWorldAll(runtime);
|
||||
ASSERT_WORLD_STOPPED(interp);
|
||||
/* Remaining daemon threads will automatically exit
|
||||
when they attempt to take the GIL (ex: PyEval_RestoreThread()). */
|
||||
_PyInterpreterState_SetFinalizing(interp, tstate);
|
||||
|
|
@ -2467,7 +2565,8 @@ Py_EndInterpreter(PyThreadState *tstate)
|
|||
_PyThreadState_SetShuttingDown(p);
|
||||
}
|
||||
|
||||
_PyEval_StartTheWorldAll(runtime);
|
||||
_PyEval_StartTheWorldAll(interp->runtime);
|
||||
PyMutex_Unlock(&interp->ceval.pending.mutex);
|
||||
_PyThreadState_DeleteList(list, /*is_after_fork=*/0);
|
||||
|
||||
// XXX Call something like _PyImport_Disable() here?
|
||||
|
|
|
|||
|
|
@ -1461,7 +1461,7 @@ init_threadstate(_PyThreadStateImpl *_tstate,
|
|||
assert(tstate->prev == NULL);
|
||||
|
||||
assert(tstate->_whence == _PyThreadState_WHENCE_NOTSET);
|
||||
assert(whence >= 0 && whence <= _PyThreadState_WHENCE_EXEC);
|
||||
assert(whence >= 0 && whence <= _PyThreadState_WHENCE_THREADING_DAEMON);
|
||||
tstate->_whence = whence;
|
||||
|
||||
assert(id > 0);
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue