mirror of
https://github.com/python/cpython.git
synced 2025-12-31 04:23:37 +00:00
gh-114271: Make _thread.ThreadHandle thread-safe in free-threaded builds (GH-115190)
Make `_thread.ThreadHandle` thread-safe in free-threaded builds We protect the mutable state of `ThreadHandle` using a `_PyOnceFlag`. Concurrent operations (i.e. `join` or `detach`) on `ThreadHandle` block until it is their turn to execute or an earlier operation succeeds. Once an operation has been applied successfully all future operations complete immediately. The `join()` method is now idempotent. It may be called multiple times but the underlying OS thread will only be joined once. After `join()` succeeds, any future calls to `join()` will succeed immediately. The internal thread handle `detach()` method has been removed.
This commit is contained in:
parent
5e0c7bc1d3
commit
9e88173d36
5 changed files with 230 additions and 106 deletions
|
|
@ -189,8 +189,8 @@ def task():
|
|||
with threading_helper.wait_threads_exit():
|
||||
handle = thread.start_joinable_thread(task)
|
||||
handle.join()
|
||||
with self.assertRaisesRegex(ValueError, "not joinable"):
|
||||
handle.join()
|
||||
# Subsequent join() calls should succeed
|
||||
handle.join()
|
||||
|
||||
def test_joinable_not_joined(self):
|
||||
handle_destroyed = thread.allocate_lock()
|
||||
|
|
@ -233,58 +233,61 @@ def task():
|
|||
with self.assertRaisesRegex(RuntimeError, "Cannot join current thread"):
|
||||
raise errors[0]
|
||||
|
||||
def test_detach_from_self(self):
|
||||
errors = []
|
||||
handles = []
|
||||
start_joinable_thread_returned = thread.allocate_lock()
|
||||
start_joinable_thread_returned.acquire()
|
||||
thread_detached = thread.allocate_lock()
|
||||
thread_detached.acquire()
|
||||
|
||||
def task():
|
||||
start_joinable_thread_returned.acquire()
|
||||
try:
|
||||
handles[0].detach()
|
||||
except Exception as e:
|
||||
errors.append(e)
|
||||
finally:
|
||||
thread_detached.release()
|
||||
|
||||
with threading_helper.wait_threads_exit():
|
||||
handle = thread.start_joinable_thread(task)
|
||||
handles.append(handle)
|
||||
start_joinable_thread_returned.release()
|
||||
thread_detached.acquire()
|
||||
with self.assertRaisesRegex(ValueError, "not joinable"):
|
||||
handle.join()
|
||||
|
||||
assert len(errors) == 0
|
||||
|
||||
def test_detach_then_join(self):
|
||||
lock = thread.allocate_lock()
|
||||
lock.acquire()
|
||||
|
||||
def task():
|
||||
def test_join_then_self_join(self):
|
||||
# make sure we can't deadlock in the following scenario with
|
||||
# threads t0 and t1 (see comment in `ThreadHandle_join()` for more
|
||||
# details):
|
||||
#
|
||||
# - t0 joins t1
|
||||
# - t1 self joins
|
||||
def make_lock():
|
||||
lock = thread.allocate_lock()
|
||||
lock.acquire()
|
||||
return lock
|
||||
|
||||
error = None
|
||||
self_joiner_handle = None
|
||||
self_joiner_started = make_lock()
|
||||
self_joiner_barrier = make_lock()
|
||||
def self_joiner():
|
||||
nonlocal error
|
||||
|
||||
self_joiner_started.release()
|
||||
self_joiner_barrier.acquire()
|
||||
|
||||
try:
|
||||
self_joiner_handle.join()
|
||||
except Exception as e:
|
||||
error = e
|
||||
|
||||
joiner_started = make_lock()
|
||||
def joiner():
|
||||
joiner_started.release()
|
||||
self_joiner_handle.join()
|
||||
|
||||
with threading_helper.wait_threads_exit():
|
||||
handle = thread.start_joinable_thread(task)
|
||||
# detach() returns even though the thread is blocked on lock
|
||||
handle.detach()
|
||||
# join() then cannot be called anymore
|
||||
with self.assertRaisesRegex(ValueError, "not joinable"):
|
||||
handle.join()
|
||||
lock.release()
|
||||
self_joiner_handle = thread.start_joinable_thread(self_joiner)
|
||||
# Wait for the self-joining thread to start
|
||||
self_joiner_started.acquire()
|
||||
|
||||
def test_join_then_detach(self):
|
||||
def task():
|
||||
pass
|
||||
# Start the thread that joins the self-joiner
|
||||
joiner_handle = thread.start_joinable_thread(joiner)
|
||||
|
||||
with threading_helper.wait_threads_exit():
|
||||
handle = thread.start_joinable_thread(task)
|
||||
handle.join()
|
||||
with self.assertRaisesRegex(ValueError, "not joinable"):
|
||||
handle.detach()
|
||||
# Wait for the joiner to start
|
||||
joiner_started.acquire()
|
||||
|
||||
# Not great, but I don't think there's a deterministic way to make
|
||||
# sure that the self-joining thread has been joined.
|
||||
time.sleep(0.1)
|
||||
|
||||
# Unblock the self-joiner
|
||||
self_joiner_barrier.release()
|
||||
|
||||
self_joiner_handle.join()
|
||||
joiner_handle.join()
|
||||
|
||||
with self.assertRaisesRegex(RuntimeError, "Cannot join current thread"):
|
||||
raise error
|
||||
|
||||
|
||||
class Barrier:
|
||||
|
|
|
|||
|
|
@ -931,7 +931,6 @@ class is implemented.
|
|||
if _HAVE_THREAD_NATIVE_ID:
|
||||
self._native_id = None
|
||||
self._tstate_lock = None
|
||||
self._join_lock = None
|
||||
self._handle = None
|
||||
self._started = Event()
|
||||
self._is_stopped = False
|
||||
|
|
@ -956,14 +955,11 @@ def _after_fork(self, new_ident=None):
|
|||
if self._tstate_lock is not None:
|
||||
self._tstate_lock._at_fork_reinit()
|
||||
self._tstate_lock.acquire()
|
||||
if self._join_lock is not None:
|
||||
self._join_lock._at_fork_reinit()
|
||||
else:
|
||||
# This thread isn't alive after fork: it doesn't have a tstate
|
||||
# anymore.
|
||||
self._is_stopped = True
|
||||
self._tstate_lock = None
|
||||
self._join_lock = None
|
||||
self._handle = None
|
||||
|
||||
def __repr__(self):
|
||||
|
|
@ -996,8 +992,6 @@ def start(self):
|
|||
if self._started.is_set():
|
||||
raise RuntimeError("threads can only be started once")
|
||||
|
||||
self._join_lock = _allocate_lock()
|
||||
|
||||
with _active_limbo_lock:
|
||||
_limbo[self] = self
|
||||
try:
|
||||
|
|
@ -1167,17 +1161,9 @@ def join(self, timeout=None):
|
|||
self._join_os_thread()
|
||||
|
||||
def _join_os_thread(self):
|
||||
join_lock = self._join_lock
|
||||
if join_lock is None:
|
||||
return
|
||||
with join_lock:
|
||||
# Calling join() multiple times would raise an exception
|
||||
# in one of the callers.
|
||||
if self._handle is not None:
|
||||
self._handle.join()
|
||||
self._handle = None
|
||||
# No need to keep this around
|
||||
self._join_lock = None
|
||||
# self._handle may be cleared post-fork
|
||||
if self._handle is not None:
|
||||
self._handle.join()
|
||||
|
||||
def _wait_for_tstate_lock(self, block=True, timeout=-1):
|
||||
# Issue #18808: wait for the thread state to be gone.
|
||||
|
|
@ -1478,6 +1464,10 @@ def __init__(self):
|
|||
with _active_limbo_lock:
|
||||
_active[self._ident] = self
|
||||
|
||||
def _join_os_thread(self):
|
||||
# No ThreadHandle for main thread
|
||||
pass
|
||||
|
||||
|
||||
# Helper thread-local instance to detect when a _DummyThread
|
||||
# is collected. Not a part of the public API.
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue