gh-135427: Fix DeprecationWarning for os.fork when run in threads with -Werror (GH-136796)

Don't ignore errors raised by `PyErr_WarnFormat` in `warn_about_fork_with_threads`
Instead, ignore the warnings in all test code that forks. (That's a lot of functions.)

In `test_support`, make `ignore_warnings` a context manager (as well as decorator),
and add a `message` argument to it.
Also add a `ignore_fork_in_thread_deprecation_warnings` helper for the deadlock-in-fork
warning.
This commit is contained in:
Rani Pinchuk 2025-08-26 15:33:21 +02:00 committed by GitHub
parent f60f8225ed
commit fd8f42d3d1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
27 changed files with 390 additions and 69 deletions

File diff suppressed because it is too large Load diff

View file

@ -1,11 +1,11 @@
import contextlib
import functools
import importlib
import re
import sys
import warnings
def import_deprecated(name):
"""Import *name* while suppressing DeprecationWarning."""
with warnings.catch_warnings():
@ -42,20 +42,32 @@ def check_syntax_warning(testcase, statement, errtext='',
testcase.assertEqual(warns, [])
def ignore_warnings(*, category):
@contextlib.contextmanager
def ignore_warnings(*, category, message=''):
"""Decorator to suppress warnings.
Use of context managers to hide warnings make diffs
more noisy and tools like 'git blame' less useful.
Can also be used as a context manager. This is not preferred,
because it makes diffs more noisy and tools like 'git blame' less useful.
But, it's useful for async functions.
"""
def decorator(test):
@functools.wraps(test)
def wrapper(self, *args, **kwargs):
with warnings.catch_warnings():
warnings.simplefilter('ignore', category=category)
return test(self, *args, **kwargs)
return wrapper
return decorator
with warnings.catch_warnings():
warnings.filterwarnings('ignore', category=category, message=message)
yield
@contextlib.contextmanager
def ignore_fork_in_thread_deprecation_warnings():
"""Suppress deprecation warnings related to forking in multi-threaded code.
See gh-135427
Can be used as decorator (preferred) or context manager.
"""
with ignore_warnings(
message=".*fork.*may lead to deadlocks in the child.*",
category=DeprecationWarning,
):
yield
class WarningsRecorder(object):

View file

@ -15,7 +15,7 @@
from unittest import mock
from test import support
from test.support import os_helper
from test.support import os_helper, warnings_helper
from test.support import socket_helper
from test.support import wait_process
from test.support import hashlib_helper
@ -1183,29 +1183,31 @@ async def runner():
class TestFork(unittest.IsolatedAsyncioTestCase):
async def test_fork_not_share_event_loop(self):
# The forked process should not share the event loop with the parent
loop = asyncio.get_running_loop()
r, w = os.pipe()
self.addCleanup(os.close, r)
self.addCleanup(os.close, w)
pid = os.fork()
if pid == 0:
# child
try:
loop = asyncio.get_event_loop()
os.write(w, b'LOOP:' + str(id(loop)).encode())
except RuntimeError:
os.write(w, b'NO LOOP')
except BaseException as e:
os.write(w, b'ERROR:' + ascii(e).encode())
finally:
os._exit(0)
else:
# parent
result = os.read(r, 100)
self.assertEqual(result, b'NO LOOP')
wait_process(pid, exitcode=0)
with warnings_helper.ignore_fork_in_thread_deprecation_warnings():
# The forked process should not share the event loop with the parent
loop = asyncio.get_running_loop()
r, w = os.pipe()
self.addCleanup(os.close, r)
self.addCleanup(os.close, w)
pid = os.fork()
if pid == 0:
# child
try:
loop = asyncio.get_event_loop()
os.write(w, b'LOOP:' + str(id(loop)).encode())
except RuntimeError:
os.write(w, b'NO LOOP')
except BaseException as e:
os.write(w, b'ERROR:' + ascii(e).encode())
finally:
os._exit(0)
else:
# parent
result = os.read(r, 100)
self.assertEqual(result, b'NO LOOP')
wait_process(pid, exitcode=0)
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@hashlib_helper.requires_hashdigest('md5')
@support.skip_if_sanitizer("TSAN doesn't support threads after fork", thread=True)
def test_fork_signal_handling(self):
@ -1253,6 +1255,7 @@ async def func():
self.assertFalse(parent_handled.is_set())
self.assertTrue(child_handled.is_set())
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@hashlib_helper.requires_hashdigest('md5')
@support.skip_if_sanitizer("TSAN doesn't support threads after fork", thread=True)
def test_fork_asyncio_run(self):
@ -1273,6 +1276,7 @@ async def child_main():
self.assertEqual(result.value, 42)
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@hashlib_helper.requires_hashdigest('md5')
@support.skip_if_sanitizer("TSAN doesn't support threads after fork", thread=True)
def test_fork_asyncio_subprocess(self):

View file

@ -31,6 +31,7 @@
from test import support
from test.support import cpython_only, swap_attr
from test.support import async_yield, run_yielding_async_fn
from test.support import warnings_helper
from test.support.import_helper import import_module
from test.support.os_helper import (EnvironmentVarGuard, TESTFN, unlink)
from test.support.script_helper import assert_python_ok
@ -2545,6 +2546,7 @@ def run_child(self, child, terminal_input):
finally:
signal.signal(signal.SIGHUP, old_sighup)
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def _run_child(self, child, terminal_input):
r, w = os.pipe() # Pipe test results from child back to parent
try:

View file

@ -5,7 +5,7 @@
from concurrent import futures
from operator import add
from test import support
from test.support import Py_GIL_DISABLED
from test.support import Py_GIL_DISABLED, warnings_helper
def mul(x, y):
@ -43,10 +43,12 @@ class ExecutorTest:
# Executor.shutdown() and context manager usage is tested by
# ExecutorShutdownTest.
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_submit(self):
future = self.executor.submit(pow, 2, 8)
self.assertEqual(256, future.result())
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_submit_keyword(self):
future = self.executor.submit(mul, 2, y=8)
self.assertEqual(16, future.result())
@ -57,6 +59,7 @@ def test_submit_keyword(self):
with self.assertRaises(TypeError):
self.executor.submit(arg=1)
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_map(self):
self.assertEqual(
list(self.executor.map(pow, range(10), range(10))),
@ -66,6 +69,7 @@ def test_map(self):
list(self.executor.map(pow, range(10), range(10), chunksize=3)),
list(map(pow, range(10), range(10))))
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_map_exception(self):
i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
self.assertEqual(i.__next__(), (0, 1))
@ -73,6 +77,7 @@ def test_map_exception(self):
with self.assertRaises(ZeroDivisionError):
i.__next__()
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@support.requires_resource('walltime')
def test_map_timeout(self):
results = []
@ -108,6 +113,7 @@ def test_map_buffersize_value_validation(self):
):
self.executor.map(str, range(4), buffersize=buffersize)
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_map_buffersize(self):
ints = range(4)
for buffersize in (1, 2, len(ints), len(ints) * 2):
@ -115,6 +121,7 @@ def test_map_buffersize(self):
res = self.executor.map(str, ints, buffersize=buffersize)
self.assertListEqual(list(res), ["0", "1", "2", "3"])
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_map_buffersize_on_multiple_iterables(self):
ints = range(4)
for buffersize in (1, 2, len(ints), len(ints) * 2):
@ -122,12 +129,14 @@ def test_map_buffersize_on_multiple_iterables(self):
res = self.executor.map(add, ints, ints, buffersize=buffersize)
self.assertListEqual(list(res), [0, 2, 4, 6])
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_map_buffersize_on_infinite_iterable(self):
res = self.executor.map(str, itertools.count(), buffersize=2)
self.assertEqual(next(res, None), "0")
self.assertEqual(next(res, None), "1")
self.assertEqual(next(res, None), "2")
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_map_buffersize_on_multiple_infinite_iterables(self):
res = self.executor.map(
add,
@ -147,6 +156,7 @@ def test_map_buffersize_without_iterable(self):
res = self.executor.map(str, buffersize=2)
self.assertIsNone(next(res, None))
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_map_buffersize_when_buffer_is_full(self):
ints = iter(range(4))
buffersize = 2
@ -158,6 +168,7 @@ def test_map_buffersize_when_buffer_is_full(self):
msg="should have fetched only `buffersize` elements from `ints`.",
)
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_shutdown_race_issue12456(self):
# Issue #12456: race condition at shutdown where trying to post a
# sentinel in the call queue blocks (the queue is full while processes
@ -165,6 +176,7 @@ def test_shutdown_race_issue12456(self):
self.executor.map(str, [2] * (self.worker_count + 1))
self.executor.shutdown()
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@support.cpython_only
def test_no_stale_references(self):
# Issue #16284: check that the executors don't unnecessarily hang onto
@ -209,6 +221,7 @@ def test_max_workers_negative(self):
"than 0"):
self.executor_type(max_workers=number)
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_free_reference(self):
# Issue #14406: Result iterator should not keep an internal
# reference to result objects.
@ -221,6 +234,7 @@ def test_free_reference(self):
if wr() is None:
break
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_swallows_falsey_exceptions(self):
# see gh-132063: Prevent exceptions that evaluate as falsey
# from being ignored.

View file

@ -7,6 +7,7 @@
CANCELLED_AND_NOTIFIED, FINISHED, Future)
from test import support
from test.support import warnings_helper
from .util import (
PENDING_FUTURE, RUNNING_FUTURE,
@ -19,6 +20,7 @@ def mul(x, y):
class AsCompletedTests:
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_no_timeout(self):
future1 = self.executor.submit(mul, 2, 21)
future2 = self.executor.submit(mul, 7, 6)
@ -35,6 +37,7 @@ def test_no_timeout(self):
future1, future2]),
completed)
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_future_times_out(self):
"""Test ``futures.as_completed`` timing out before
completing it's final future."""
@ -62,6 +65,7 @@ def test_future_times_out(self):
# Check that ``future`` wasn't completed.
self.assertEqual(completed_futures, already_completed)
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_duplicate_futures(self):
# Issue 20367. Duplicate futures should not raise exceptions or give
# duplicate responses.

View file

@ -10,6 +10,7 @@
from concurrent.futures.process import BrokenProcessPool, _ThreadWakeup
from test import support
from test.support import warnings_helper
from .util import (
create_executor_tests, setup_module,
@ -111,6 +112,7 @@ def _fail_on_deadlock(self, executor):
print(f"\nTraceback:\n {tb}", file=sys.__stderr__)
self.fail(f"Executor deadlock:\n\n{tb}")
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def _check_error(self, error, func, *args, ignore_stderr=False):
# test for deadlock caused by crashes or exiting in a pool
self.executor.shutdown(wait=True)
@ -199,6 +201,7 @@ def test_exit_during_result_unpickle_in_result_handler(self):
# the result_handler thread
self._check_error(BrokenProcessPool, _return_instance, ExitAtUnpickle)
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@support.skip_if_sanitizer("UBSan: explicit SIGSEV not allowed", ub=True)
def test_shutdown_deadlock(self):
# Test that the pool calling shutdown do not cause deadlock
@ -212,6 +215,7 @@ def test_shutdown_deadlock(self):
with self.assertRaises(BrokenProcessPool):
f.result()
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_shutdown_deadlock_pickle(self):
# Test that the pool calling shutdown with wait=False does not cause
# a deadlock if a task fails at pickle after the shutdown call.
@ -238,6 +242,7 @@ def test_shutdown_deadlock_pickle(self):
# dangling threads
executor_manager.join()
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@support.skip_if_sanitizer("UBSan: explicit SIGSEV not allowed", ub=True)
def test_crash_big_data(self):
# Test that there is a clean exception instead of a deadlock when a
@ -254,6 +259,7 @@ def test_crash_big_data(self):
executor.shutdown(wait=True)
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_gh105829_should_not_deadlock_if_wakeup_pipe_full(self):
# Issue #105829: The _ExecutorManagerThread wakeup pipe could
# fill up and block. See: https://github.com/python/cpython/issues/105829

View file

@ -11,6 +11,7 @@
from logging.handlers import QueueHandler
from test import support
from test.support import warnings_helper
from .util import ExecutorMixin, create_executor_tests, setup_module
@ -48,6 +49,7 @@ def setUp(self):
initargs=('initialized',))
super().setUp()
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_initializer(self):
futures = [self.executor.submit(get_init_status)
for _ in range(self.worker_count)]
@ -74,6 +76,7 @@ def setUp(self):
self.executor_kwargs = dict(initializer=init_fail)
super().setUp()
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_initializer(self):
with self._assert_logged('ValueError: error in initializer'):
try:

View file

@ -9,7 +9,7 @@
from concurrent.futures.process import BrokenProcessPool
from test import support
from test.support import hashlib_helper
from test.support import hashlib_helper, warnings_helper
from test.test_importlib.metadata.fixtures import parameterize
from .executor import ExecutorTest, mul
@ -49,6 +49,7 @@ def test_max_workers_too_large(self):
"max_workers must be <= 61"):
futures.ProcessPoolExecutor(max_workers=62)
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_killed_child(self):
# When a child process is abruptly terminated, the whole pool gets
# "broken".
@ -61,6 +62,7 @@ def test_killed_child(self):
# Submitting other jobs fails as well.
self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_map_chunksize(self):
def bad_map():
list(self.executor.map(pow, range(40), range(40), chunksize=-1))
@ -81,6 +83,7 @@ def bad_map():
def _test_traceback(cls):
raise RuntimeError(123) # some comment
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_traceback(self):
# We want ensure that the traceback from the child process is
# contained in the traceback raised in the main process.
@ -103,6 +106,7 @@ def test_traceback(self):
self.assertIn('raise RuntimeError(123) # some comment',
f1.getvalue())
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@hashlib_helper.requires_hashdigest('md5')
def test_ressources_gced_in_workers(self):
# Ensure that argument for a job are correctly gc-ed after the job
@ -123,6 +127,7 @@ def test_ressources_gced_in_workers(self):
mgr.shutdown()
mgr.join()
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_saturation(self):
executor = self.executor
mp_context = self.get_context()
@ -208,6 +213,7 @@ def test_max_tasks_early_shutdown(self):
for i, future in enumerate(futures):
self.assertEqual(future.result(), mul(i, i))
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_python_finalization_error(self):
# gh-109047: Catch RuntimeError on thread creation
# during Python finalization.
@ -258,6 +264,7 @@ def test_force_shutdown_workers_invalid_op(self):
executor._force_shutdown,
operation='invalid operation'),
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@parameterize(*FORCE_SHUTDOWN_PARAMS)
def test_force_shutdown_workers(self, function_name):
manager = self.get_context().Manager()

View file

@ -6,6 +6,7 @@
from concurrent import futures
from test import support
from test.support import warnings_helper
from test.support.script_helper import assert_python_ok
from .util import (
@ -78,12 +79,14 @@ def run_last():
self.assertIn("RuntimeError: cannot schedule new futures", err.decode())
self.assertEqual(out.strip(), b"runtime-error")
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_hang_issue12364(self):
fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
self.executor.shutdown()
for f in fs:
f.result()
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_cancel_futures(self):
assert self.worker_count <= 5, "test needs few workers"
fs = [self.executor.submit(time.sleep, .1) for _ in range(50)]
@ -129,6 +132,7 @@ def test_hang_gh83386(self):
self.assertFalse(err)
self.assertEqual(out.strip(), b"apple")
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_hang_gh94440(self):
"""shutdown(wait=True) doesn't hang when a future was submitted and
quickly canceled right before shutdown.
@ -172,6 +176,7 @@ def acquire_lock(lock):
for t in self.executor._threads:
t.join()
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_context_manager_shutdown(self):
with futures.ThreadPoolExecutor(max_workers=5) as e:
executor = e
@ -181,6 +186,7 @@ def test_context_manager_shutdown(self):
for t in executor._threads:
t.join()
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_del_shutdown(self):
executor = futures.ThreadPoolExecutor(max_workers=5)
res = executor.map(abs, range(-5, 5))
@ -194,6 +200,7 @@ def test_del_shutdown(self):
# executor got shutdown.
assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_shutdown_no_wait(self):
# Ensure that the executor cleans up the threads when calling
# shutdown with wait=False
@ -208,7 +215,7 @@ def test_shutdown_no_wait(self):
# executor got shutdown.
assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_thread_names_assigned(self):
executor = futures.ThreadPoolExecutor(
max_workers=5, thread_name_prefix='SpecialPool')
@ -221,6 +228,7 @@ def test_thread_names_assigned(self):
self.assertRegex(t.name, r'^SpecialPool_[0-4]$')
t.join()
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_thread_names_default(self):
executor = futures.ThreadPoolExecutor(max_workers=5)
executor.map(abs, range(-5, 5))
@ -254,6 +262,7 @@ def test_cancel_futures_wait_false(self):
class ProcessPoolShutdownTest(ExecutorShutdownTest):
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_processes_terminate(self):
def acquire_lock(lock):
lock.acquire()
@ -277,6 +286,7 @@ def acquire_lock(lock):
for p in processes.values():
p.join()
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_context_manager_shutdown(self):
with futures.ProcessPoolExecutor(
max_workers=5, mp_context=self.get_context()) as e:
@ -287,6 +297,7 @@ def test_context_manager_shutdown(self):
for p in processes.values():
p.join()
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_del_shutdown(self):
executor = futures.ProcessPoolExecutor(
max_workers=5, mp_context=self.get_context())
@ -309,6 +320,7 @@ def test_del_shutdown(self):
# executor got shutdown.
assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_shutdown_no_wait(self):
# Ensure that the executor cleans up the processes when calling
# shutdown with wait=False

View file

@ -7,6 +7,7 @@
import unittest
from concurrent import futures
from test import support
from test.support import warnings_helper
from .executor import ExecutorTest, mul
from .util import BaseTestCase, ThreadPoolMixin, setup_module
@ -53,6 +54,7 @@ def test_idle_thread_reuse(self):
@support.requires_fork()
@unittest.skipUnless(hasattr(os, 'register_at_fork'), 'need os.register_at_fork')
@support.requires_resource('cpu')
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_hang_global_shutdown_lock(self):
# bpo-45021: _global_shutdown_lock should be reinitialized in the child
# process, otherwise it will never exit
@ -68,6 +70,7 @@ def submit(pool):
@support.requires_fork()
@unittest.skipUnless(hasattr(os, 'register_at_fork'), 'need os.register_at_fork')
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_process_fork_from_a_threadpool(self):
# bpo-43944: clear concurrent.futures.thread._threads_queues after fork,
# otherwise child process will try to join parent thread

View file

@ -3,7 +3,7 @@
import unittest
from concurrent import futures
from test import support
from test.support import threading_helper
from test.support import threading_helper, warnings_helper
from .util import (
CANCELLED_FUTURE, CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE,
@ -22,6 +22,7 @@ def wait_and_raise(e):
class WaitTests:
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_20369(self):
# See https://bugs.python.org/issue20369
future = self.executor.submit(mul, 1, 2)
@ -30,7 +31,7 @@ def test_20369(self):
self.assertEqual({future}, done)
self.assertEqual(set(), not_done)
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_first_completed(self):
event = self.create_event()
future1 = self.executor.submit(mul, 21, 2)
@ -47,6 +48,7 @@ def test_first_completed(self):
event.set()
future2.result() # wait for job to finish
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_first_completed_some_already_completed(self):
event = self.create_event()
future1 = self.executor.submit(event.wait)
@ -64,6 +66,7 @@ def test_first_completed_some_already_completed(self):
event.set()
future1.result() # wait for job to finish
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_first_exception(self):
event1 = self.create_event()
event2 = self.create_event()
@ -93,6 +96,7 @@ def wait_for_future1():
event2.set()
future3.result() # wait for job to finish
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_first_exception_some_already_complete(self):
event = self.create_event()
future1 = self.executor.submit(divmod, 21, 0)
@ -114,6 +118,7 @@ def test_first_exception_some_already_complete(self):
event.set()
future2.result() # wait for job to finish
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_first_exception_one_already_failed(self):
event = self.create_event()
future1 = self.executor.submit(event.wait)
@ -129,6 +134,7 @@ def test_first_exception_one_already_failed(self):
event.set()
future1.result() # wait for job to finish
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_all_completed(self):
future1 = self.executor.submit(divmod, 2, 0)
future2 = self.executor.submit(mul, 2, 21)
@ -148,6 +154,7 @@ def test_all_completed(self):
future2]), finished)
self.assertEqual(set(), pending)
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_timeout(self):
short_timeout = 0.050

View file

@ -10,7 +10,7 @@
from concurrent.futures.process import _check_system_limits
from test import support
from test.support import threading_helper
from test.support import threading_helper, warnings_helper
def create_future(state=PENDING, exception=None, result=None):
@ -51,7 +51,8 @@ def setUp(self):
max_workers=self.worker_count,
mp_context=self.get_context(),
**self.executor_kwargs)
self.manager = self.get_context().Manager()
with warnings_helper.ignore_fork_in_thread_deprecation_warnings():
self.manager = self.get_context().Manager()
else:
self.executor = self.executor_type(
max_workers=self.worker_count,

View file

@ -11,6 +11,7 @@
from test.fork_wait import ForkWait
from test import support
from test.support import warnings_helper
# Skip test if fork does not exist.
@ -19,6 +20,7 @@
class ForkTest(ForkWait):
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_threaded_import_lock_fork(self):
"""Check fork() in main thread works while a subthread is doing an import"""
import_started = threading.Event()
@ -61,7 +63,7 @@ def importer():
except OSError:
pass
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_nested_import_lock_fork(self):
"""Check fork() in main thread works while the main thread is doing an import"""
exitcode = 42

View file

@ -9,6 +9,8 @@
import time
import unittest
from test.support import warnings_helper
if not hasattr(select, "kqueue"):
raise unittest.SkipTest("test works only on BSD")
@ -257,6 +259,7 @@ def test_fd_non_inheritable(self):
self.addCleanup(kqueue.close)
self.assertEqual(os.get_inheritable(kqueue.fileno()), False)
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@support.requires_fork()
def test_fork(self):
# gh-110395: kqueue objects must be closed after fork

View file

@ -730,6 +730,7 @@ def remove_loop(fname, tries):
# based on os.fork existing because that is what users and this test use.
# This helps ensure that when fork exists (the important concept) that the
# register_at_fork mechanism is also present and used.
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@support.requires_fork()
@threading_helper.requires_working_threading()
@skip_if_asan_fork
@ -4045,6 +4046,7 @@ def test_config_queue_handler_invalid_config_does_not_create_multiprocessing_man
self._apply_simple_queue_listener_configuration(qspec)
manager.assert_not_called()
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@skip_if_tsan_fork
@support.requires_subprocess()
@unittest.skipUnless(support.Py_DEBUG, "requires a debug build for testing"
@ -4067,6 +4069,7 @@ def test_config_reject_simple_queue_handler_multiprocessing_context(self):
with self.assertRaises(ValueError):
self._apply_simple_queue_listener_configuration(qspec)
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@skip_if_tsan_fork
@support.requires_subprocess()
@unittest.skipUnless(support.Py_DEBUG, "requires a debug build for testing"
@ -4107,6 +4110,7 @@ def _mpinit_issue121723(qspec, message_to_log):
# log a message (this creates a record put in the queue)
logging.getLogger().info(message_to_log)
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@skip_if_tsan_fork
@support.requires_subprocess()
def test_multiprocessing_queues(self):
@ -5337,6 +5341,7 @@ def _extract_logrecord_process_name(key, logMultiprocessing, conn=None):
else:
return results
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@skip_if_tsan_fork
def test_multiprocessing(self):
support.skip_if_broken_multiprocessing_synchronize()

View file

@ -8,7 +8,7 @@
import io
import tempfile
from test import support
from test.support import import_helper
from test.support import import_helper, warnings_helper
from test.support import os_helper
from test.support import refleak_helper
from test.support import socket_helper
@ -1212,6 +1212,7 @@ def test_add_and_close(self):
self.assertEqual(contents, f.read())
self._box = self._factory(self._path)
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@support.requires_fork()
@unittest.skipUnless(hasattr(socket, 'socketpair'), "Test needs socketpair().")
def test_lock_conflict(self):

View file

@ -3518,6 +3518,7 @@ def test_getppid(self):
self.assertEqual(error, b'')
self.assertEqual(int(stdout), os.getpid())
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def check_waitpid(self, code, exitcode, callback=None):
if sys.platform == 'win32':
# On Windows, os.spawnv() simply joins arguments with spaces:
@ -3620,30 +3621,35 @@ def create_args(self, *, with_env=False, use_bytes=False):
return program, args
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@requires_os_func('spawnl')
def test_spawnl(self):
program, args = self.create_args()
exitcode = os.spawnl(os.P_WAIT, program, *args)
self.assertEqual(exitcode, self.exitcode)
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@requires_os_func('spawnle')
def test_spawnle(self):
program, args = self.create_args(with_env=True)
exitcode = os.spawnle(os.P_WAIT, program, *args, self.env)
self.assertEqual(exitcode, self.exitcode)
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@requires_os_func('spawnlp')
def test_spawnlp(self):
program, args = self.create_args()
exitcode = os.spawnlp(os.P_WAIT, program, *args)
self.assertEqual(exitcode, self.exitcode)
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@requires_os_func('spawnlpe')
def test_spawnlpe(self):
program, args = self.create_args(with_env=True)
exitcode = os.spawnlpe(os.P_WAIT, program, *args, self.env)
self.assertEqual(exitcode, self.exitcode)
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@requires_os_func('spawnv')
def test_spawnv(self):
program, args = self.create_args()
@ -3654,30 +3660,35 @@ def test_spawnv(self):
exitcode = os.spawnv(os.P_WAIT, FakePath(program), args)
self.assertEqual(exitcode, self.exitcode)
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@requires_os_func('spawnve')
def test_spawnve(self):
program, args = self.create_args(with_env=True)
exitcode = os.spawnve(os.P_WAIT, program, args, self.env)
self.assertEqual(exitcode, self.exitcode)
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@requires_os_func('spawnvp')
def test_spawnvp(self):
program, args = self.create_args()
exitcode = os.spawnvp(os.P_WAIT, program, args)
self.assertEqual(exitcode, self.exitcode)
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@requires_os_func('spawnvpe')
def test_spawnvpe(self):
program, args = self.create_args(with_env=True)
exitcode = os.spawnvpe(os.P_WAIT, program, args, self.env)
self.assertEqual(exitcode, self.exitcode)
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@requires_os_func('spawnv')
def test_nowait(self):
program, args = self.create_args()
pid = os.spawnv(os.P_NOWAIT, program, args)
support.wait_process(pid, exitcode=self.exitcode)
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@requires_os_func('spawnve')
def test_spawnve_bytes(self):
# Test bytes handling in parse_arglist and parse_envlist (#28114)
@ -3685,18 +3696,21 @@ def test_spawnve_bytes(self):
exitcode = os.spawnve(os.P_WAIT, program, args, self.env)
self.assertEqual(exitcode, self.exitcode)
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@requires_os_func('spawnl')
def test_spawnl_noargs(self):
program, __ = self.create_args()
self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, program)
self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, program, '')
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@requires_os_func('spawnle')
def test_spawnle_noargs(self):
program, __ = self.create_args()
self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, program, {})
self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, program, '', {})
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@requires_os_func('spawnv')
def test_spawnv_noargs(self):
program, __ = self.create_args()
@ -3705,6 +3719,7 @@ def test_spawnv_noargs(self):
self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, program, ('',))
self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, program, [''])
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@requires_os_func('spawnve')
def test_spawnve_noargs(self):
program, __ = self.create_args()
@ -3761,10 +3776,12 @@ def _test_invalid_env(self, spawn):
exitcode = spawn(os.P_WAIT, program, args, newenv)
self.assertEqual(exitcode, 0)
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@requires_os_func('spawnve')
def test_spawnve_invalid_env(self):
self._test_invalid_env(os.spawnve)
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@requires_os_func('spawnvpe')
def test_spawnvpe_invalid_env(self):
self._test_invalid_env(os.spawnvpe)
@ -4881,6 +4898,7 @@ def test_posix_pty_functions(self):
self.addCleanup(os.close, son_fd)
self.assertEqual(os.ptsname(mother_fd), os.ttyname(son_fd))
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@unittest.skipUnless(hasattr(os, 'spawnl'), "need os.spawnl()")
@support.requires_subprocess()
def test_pipe_spawnl(self):

View file

@ -11,7 +11,7 @@
from unittest import mock
from test import support
from test.support import os_helper
from test.support import os_helper, warnings_helper
try:
# Some of the iOS tests need ctypes to operate.
@ -465,7 +465,7 @@ def test_mac_ver(self):
else:
self.assertEqual(res[2], 'PowerPC')
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@unittest.skipUnless(sys.platform == 'darwin', "OSX only test")
def test_mac_ver_with_fork(self):
# Issue7895: platform.mac_ver() crashes when using fork without exec

View file

@ -1,6 +1,6 @@
import unittest
from test.support import (
is_android, is_apple_mobile, is_wasm32, reap_children, verbose
is_android, is_apple_mobile, is_wasm32, reap_children, verbose, warnings_helper
)
from test.support.import_helper import import_module
from test.support.os_helper import TESTFN, unlink
@ -194,6 +194,7 @@ def test_openpty(self):
s2 = _readline(master_fd)
self.assertEqual(b'For my pet fish, Eric.\n', normalize_output(s2))
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_fork(self):
debug("calling pty.fork()")
pid, master_fd = pty.fork()
@ -295,6 +296,7 @@ def test_master_read(self):
self.assertEqual(data, b"")
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_spawn_doesnt_hang(self):
self.addCleanup(unlink, TESTFN)
with open(TESTFN, 'wb') as f:

View file

@ -14,6 +14,8 @@
from fractions import Fraction
from collections import abc, Counter
from test.support import warnings_helper
class MyIndex:
def __init__(self, value):
@ -1399,6 +1401,7 @@ def test__all__(self):
# tests validity but not completeness of the __all__ list
self.assertTrue(set(random.__all__) <= set(dir(random)))
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@test.support.requires_fork()
def test_after_fork(self):
# Test the global Random instance gets reseeded in child

View file

@ -17,6 +17,7 @@
from test.support import os_helper
from test.support import socket_helper
from test.support import threading_helper
from test.support import warnings_helper
test.support.requires("network")
@ -43,6 +44,7 @@ def receive(sock, n, timeout=test.support.SHORT_TIMEOUT):
raise RuntimeError("timed out on %r" % (sock,))
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@test.support.requires_fork()
@contextlib.contextmanager
def simple_subprocess(testcase):
@ -173,6 +175,7 @@ def test_ThreadingTCPServer(self):
socketserver.StreamRequestHandler,
self.stream_examine)
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@requires_forking
def test_ForkingTCPServer(self):
with simple_subprocess(self):
@ -192,6 +195,7 @@ def test_ThreadingUnixStreamServer(self):
socketserver.StreamRequestHandler,
self.stream_examine)
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@requires_unix_sockets
@requires_forking
def test_ForkingUnixStreamServer(self):
@ -210,6 +214,7 @@ def test_ThreadingUDPServer(self):
socketserver.DatagramRequestHandler,
self.dgram_examine)
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@requires_forking
def test_ForkingUDPServer(self):
with simple_subprocess(self):
@ -229,6 +234,7 @@ def test_ThreadingUnixDatagramServer(self):
socketserver.DatagramRequestHandler,
self.dgram_examine)
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@requires_unix_sockets
@requires_forking
def test_ForkingUnixDatagramServer(self):
@ -314,11 +320,13 @@ def test_threading_not_handled(self):
self.assertIs(cm.exc_type, SystemExit)
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@requires_forking
def test_forking_handled(self):
ForkingErrorTestServer(ValueError)
self.check_result(handled=True)
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@requires_forking
def test_forking_not_handled(self):
ForkingErrorTestServer(SystemExit)

View file

@ -485,6 +485,7 @@ def test_check__all__(self):
self.assertRaises(AssertionError, support.check__all__, self, unittest)
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@unittest.skipUnless(hasattr(os, 'waitpid') and hasattr(os, 'WNOHANG'),
'need os.waitpid() and os.WNOHANG')
@support.requires_fork()

View file

@ -8,7 +8,7 @@
from test.support.script_helper import (assert_python_ok, assert_python_failure,
interpreter_requires_environment)
from test import support
from test.support import force_not_colorized
from test.support import force_not_colorized, warnings_helper
from test.support import os_helper
from test.support import threading_helper
@ -354,6 +354,7 @@ def fork_child(self):
# everything is fine
return 0
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@support.requires_fork()
def test_fork(self):
# check that tracemalloc is still working after fork

View file

@ -13,7 +13,7 @@
from unittest import mock
from test import support
from test.support import import_helper
from test.support import import_helper, warnings_helper
from test.support.script_helper import assert_python_ok
py_uuid = import_helper.import_fresh_module('uuid', blocked=['_uuid'])
@ -1112,6 +1112,7 @@ def test_uuid8_uniqueness(self):
versions = {u.version for u in uuids}
self.assertSetEqual(versions, {8})
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@support.requires_fork()
def testIssue8621(self):
# On at least some versions of OSX self.uuid.uuid4 generates

View file

@ -0,0 +1,4 @@
With :option:`-Werror <-W>`, the DeprecationWarning emitted by :py:func:`os.fork`
and :py:func:`os.forkpty` in mutli-threaded processes is now raised as an exception.
Previously it was silently ignored.
Patch by Rani Pinchuk.

View file

@ -8000,7 +8000,7 @@ os_register_at_fork_impl(PyObject *module, PyObject *before,
//
// This should only be called from the parent process after
// PyOS_AfterFork_Parent().
static void
static int
warn_about_fork_with_threads(const char* name)
{
// It's not safe to issue the warning while the world is stopped, because
@ -8051,14 +8051,14 @@ warn_about_fork_with_threads(const char* name)
PyObject *threading = PyImport_GetModule(&_Py_ID(threading));
if (!threading) {
PyErr_Clear();
return;
return 0;
}
PyObject *threading_active =
PyObject_GetAttr(threading, &_Py_ID(_active));
if (!threading_active) {
PyErr_Clear();
Py_DECREF(threading);
return;
return 0;
}
PyObject *threading_limbo =
PyObject_GetAttr(threading, &_Py_ID(_limbo));
@ -8066,7 +8066,7 @@ warn_about_fork_with_threads(const char* name)
PyErr_Clear();
Py_DECREF(threading);
Py_DECREF(threading_active);
return;
return 0;
}
Py_DECREF(threading);
// Duplicating what threading.active_count() does but without holding
@ -8082,7 +8082,7 @@ warn_about_fork_with_threads(const char* name)
Py_DECREF(threading_limbo);
}
if (num_python_threads > 1) {
PyErr_WarnFormat(
return PyErr_WarnFormat(
PyExc_DeprecationWarning, 1,
#ifdef HAVE_GETPID
"This process (pid=%d) is multi-threaded, "
@ -8094,8 +8094,8 @@ warn_about_fork_with_threads(const char* name)
getpid(),
#endif
name);
PyErr_Clear();
}
return 0;
}
#endif // HAVE_FORK1 || HAVE_FORKPTY || HAVE_FORK
@ -8134,7 +8134,9 @@ os_fork1_impl(PyObject *module)
/* parent: release the import lock. */
PyOS_AfterFork_Parent();
// After PyOS_AfterFork_Parent() starts the world to avoid deadlock.
warn_about_fork_with_threads("fork1");
if (warn_about_fork_with_threads("fork1") < 0) {
return NULL;
}
}
if (pid == -1) {
errno = saved_errno;
@ -8183,7 +8185,8 @@ os_fork_impl(PyObject *module)
/* parent: release the import lock. */
PyOS_AfterFork_Parent();
// After PyOS_AfterFork_Parent() starts the world to avoid deadlock.
warn_about_fork_with_threads("fork");
if (warn_about_fork_with_threads("fork") < 0)
return NULL;
}
if (pid == -1) {
errno = saved_errno;
@ -9040,7 +9043,8 @@ os_forkpty_impl(PyObject *module)
/* parent: release the import lock. */
PyOS_AfterFork_Parent();
// After PyOS_AfterFork_Parent() starts the world to avoid deadlock.
warn_about_fork_with_threads("forkpty");
if (warn_about_fork_with_threads("forkpty") < 0)
return NULL;
}
if (pid == -1) {
return posix_error();