mirror of
https://github.com/python/cpython.git
synced 2026-01-20 22:30:28 +00:00
bpo-33532: Fix test_multiprocessing_forkserver.test_ignore() (GH-7322)
Use also support.SOCK_MAX_SIZE, not only support.PIPE_MAX_SIZE, to get the size for a blocking send into a multiprocessing pipe. Replace also test.support with support.
This commit is contained in:
parent
137e80346f
commit
64e538bc70
1 changed files with 33 additions and 31 deletions
|
|
@ -20,14 +20,14 @@
|
|||
import struct
|
||||
import operator
|
||||
import weakref
|
||||
import test.support
|
||||
from test import support
|
||||
import test.support.script_helper
|
||||
|
||||
|
||||
# Skip tests if _multiprocessing wasn't built.
|
||||
_multiprocessing = test.support.import_module('_multiprocessing')
|
||||
_multiprocessing = support.import_module('_multiprocessing')
|
||||
# Skip tests if sem_open implementation is broken.
|
||||
test.support.import_module('multiprocessing.synchronize')
|
||||
support.import_module('multiprocessing.synchronize')
|
||||
# import threading after _multiprocessing to raise a more relevant error
|
||||
# message: "No module named _multiprocessing". _multiprocessing is not compiled
|
||||
# without thread support.
|
||||
|
|
@ -567,8 +567,8 @@ def test_stderr_flush(self):
|
|||
if self.TYPE == "threads":
|
||||
self.skipTest('test not appropriate for {}'.format(self.TYPE))
|
||||
|
||||
testfn = test.support.TESTFN
|
||||
self.addCleanup(test.support.unlink, testfn)
|
||||
testfn = support.TESTFN
|
||||
self.addCleanup(support.unlink, testfn)
|
||||
proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
|
||||
proc.start()
|
||||
proc.join()
|
||||
|
|
@ -597,8 +597,8 @@ def test_sys_exit(self):
|
|||
if self.TYPE == 'threads':
|
||||
self.skipTest('test not appropriate for {}'.format(self.TYPE))
|
||||
|
||||
testfn = test.support.TESTFN
|
||||
self.addCleanup(test.support.unlink, testfn)
|
||||
testfn = support.TESTFN
|
||||
self.addCleanup(support.unlink, testfn)
|
||||
|
||||
for reason in (
|
||||
[1, 2, 3],
|
||||
|
|
@ -853,7 +853,7 @@ def test_task_done(self):
|
|||
close_queue(queue)
|
||||
|
||||
def test_no_import_lock_contention(self):
|
||||
with test.support.temp_cwd():
|
||||
with support.temp_cwd():
|
||||
module_name = 'imported_by_an_imported_module'
|
||||
with open(module_name + '.py', 'w') as f:
|
||||
f.write("""if 1:
|
||||
|
|
@ -866,7 +866,7 @@ def test_no_import_lock_contention(self):
|
|||
del q
|
||||
""")
|
||||
|
||||
with test.support.DirsOnSysPath(os.getcwd()):
|
||||
with support.DirsOnSysPath(os.getcwd()):
|
||||
try:
|
||||
__import__(module_name)
|
||||
except pyqueue.Empty:
|
||||
|
|
@ -891,7 +891,7 @@ def test_queue_feeder_donot_stop_onexc(self):
|
|||
class NotSerializable(object):
|
||||
def __reduce__(self):
|
||||
raise AttributeError
|
||||
with test.support.captured_stderr():
|
||||
with support.captured_stderr():
|
||||
q = self.Queue()
|
||||
q.put(NotSerializable())
|
||||
q.put(True)
|
||||
|
|
@ -2194,7 +2194,7 @@ def test_traceback(self):
|
|||
self.assertIs(type(cause), multiprocessing.pool.RemoteTraceback)
|
||||
self.assertIn('raise RuntimeError(123) # some comment', cause.tb)
|
||||
|
||||
with test.support.captured_stderr() as f1:
|
||||
with support.captured_stderr() as f1:
|
||||
try:
|
||||
raise exc
|
||||
except RuntimeError:
|
||||
|
|
@ -2476,7 +2476,7 @@ def test_remote(self):
|
|||
authkey = os.urandom(32)
|
||||
|
||||
manager = QueueManager(
|
||||
address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER
|
||||
address=(support.HOST, 0), authkey=authkey, serializer=SERIALIZER
|
||||
)
|
||||
manager.start()
|
||||
|
||||
|
|
@ -2513,7 +2513,7 @@ def _putter(cls, address, authkey):
|
|||
def test_rapid_restart(self):
|
||||
authkey = os.urandom(32)
|
||||
manager = QueueManager(
|
||||
address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER)
|
||||
address=(support.HOST, 0), authkey=authkey, serializer=SERIALIZER)
|
||||
srvr = manager.get_server()
|
||||
addr = srvr.address
|
||||
# Close the connection.Listener socket which gets opened as a part
|
||||
|
|
@ -2736,14 +2736,14 @@ def test_fd_transfer(self):
|
|||
p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
|
||||
p.daemon = True
|
||||
p.start()
|
||||
self.addCleanup(test.support.unlink, test.support.TESTFN)
|
||||
with open(test.support.TESTFN, "wb") as f:
|
||||
self.addCleanup(support.unlink, support.TESTFN)
|
||||
with open(support.TESTFN, "wb") as f:
|
||||
fd = f.fileno()
|
||||
if msvcrt:
|
||||
fd = msvcrt.get_osfhandle(fd)
|
||||
reduction.send_handle(conn, fd, p.pid)
|
||||
p.join()
|
||||
with open(test.support.TESTFN, "rb") as f:
|
||||
with open(support.TESTFN, "rb") as f:
|
||||
self.assertEqual(f.read(), b"foo")
|
||||
|
||||
@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
|
||||
|
|
@ -2762,8 +2762,8 @@ def test_large_fd_transfer(self):
|
|||
p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
|
||||
p.daemon = True
|
||||
p.start()
|
||||
self.addCleanup(test.support.unlink, test.support.TESTFN)
|
||||
with open(test.support.TESTFN, "wb") as f:
|
||||
self.addCleanup(support.unlink, support.TESTFN)
|
||||
with open(support.TESTFN, "wb") as f:
|
||||
fd = f.fileno()
|
||||
for newfd in range(256, MAXFD):
|
||||
if not self._is_fd_assigned(newfd):
|
||||
|
|
@ -2776,7 +2776,7 @@ def test_large_fd_transfer(self):
|
|||
finally:
|
||||
os.close(newfd)
|
||||
p.join()
|
||||
with open(test.support.TESTFN, "rb") as f:
|
||||
with open(support.TESTFN, "rb") as f:
|
||||
self.assertEqual(f.read(), b"bar")
|
||||
|
||||
@classmethod
|
||||
|
|
@ -2986,7 +2986,7 @@ def _listener(cls, conn, families):
|
|||
l.close()
|
||||
|
||||
l = socket.socket()
|
||||
l.bind((test.support.HOST, 0))
|
||||
l.bind((support.HOST, 0))
|
||||
l.listen()
|
||||
conn.send(l.getsockname())
|
||||
new_conn, addr = l.accept()
|
||||
|
|
@ -3336,7 +3336,7 @@ def make_finalizers():
|
|||
gc.set_threshold(5, 5, 5)
|
||||
threads = [threading.Thread(target=run_finalizers),
|
||||
threading.Thread(target=make_finalizers)]
|
||||
with test.support.start_threads(threads):
|
||||
with support.start_threads(threads):
|
||||
time.sleep(4.0) # Wait a bit to trigger race condition
|
||||
finish = True
|
||||
if exc is not None:
|
||||
|
|
@ -3697,7 +3697,7 @@ def _child_test_wait_socket(cls, address, slow):
|
|||
def test_wait_socket(self, slow=False):
|
||||
from multiprocessing.connection import wait
|
||||
l = socket.socket()
|
||||
l.bind((test.support.HOST, 0))
|
||||
l.bind((support.HOST, 0))
|
||||
l.listen()
|
||||
addr = l.getsockname()
|
||||
readers = []
|
||||
|
|
@ -3910,11 +3910,11 @@ def test_noforkbomb(self):
|
|||
sm = multiprocessing.get_start_method()
|
||||
name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
|
||||
if sm != 'fork':
|
||||
rc, out, err = test.support.script_helper.assert_python_failure(name, sm)
|
||||
rc, out, err = support.script_helper.assert_python_failure(name, sm)
|
||||
self.assertEqual(out, b'')
|
||||
self.assertIn(b'RuntimeError', err)
|
||||
else:
|
||||
rc, out, err = test.support.script_helper.assert_python_ok(name, sm)
|
||||
rc, out, err = support.script_helper.assert_python_ok(name, sm)
|
||||
self.assertEqual(out.rstrip(), b'123')
|
||||
self.assertEqual(err, b'')
|
||||
|
||||
|
|
@ -4021,6 +4021,9 @@ def test_closefd(self):
|
|||
|
||||
class TestIgnoreEINTR(unittest.TestCase):
|
||||
|
||||
# Sending CONN_MAX_SIZE bytes into a multiprocessing pipe must block
|
||||
CONN_MAX_SIZE = max(support.PIPE_MAX_SIZE, support.SOCK_MAX_SIZE)
|
||||
|
||||
@classmethod
|
||||
def _test_ignore(cls, conn):
|
||||
def handler(signum, frame):
|
||||
|
|
@ -4029,7 +4032,7 @@ def handler(signum, frame):
|
|||
conn.send('ready')
|
||||
x = conn.recv()
|
||||
conn.send(x)
|
||||
conn.send_bytes(b'x' * test.support.PIPE_MAX_SIZE)
|
||||
conn.send_bytes(b'x' * cls.CONN_MAX_SIZE)
|
||||
|
||||
@unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
|
||||
def test_ignore(self):
|
||||
|
|
@ -4048,8 +4051,7 @@ def test_ignore(self):
|
|||
self.assertEqual(conn.recv(), 1234)
|
||||
time.sleep(0.1)
|
||||
os.kill(p.pid, signal.SIGUSR1)
|
||||
self.assertEqual(conn.recv_bytes(),
|
||||
b'x' * test.support.PIPE_MAX_SIZE)
|
||||
self.assertEqual(conn.recv_bytes(), b'x' * self.CONN_MAX_SIZE)
|
||||
time.sleep(0.1)
|
||||
p.join()
|
||||
finally:
|
||||
|
|
@ -4145,7 +4147,7 @@ def test_preload_resources(self):
|
|||
if multiprocessing.get_start_method() != 'forkserver':
|
||||
self.skipTest("test only relevant for 'forkserver' method")
|
||||
name = os.path.join(os.path.dirname(__file__), 'mp_preload.py')
|
||||
rc, out, err = test.support.script_helper.assert_python_ok(name)
|
||||
rc, out, err = support.script_helper.assert_python_ok(name)
|
||||
out = out.decode()
|
||||
err = err.decode()
|
||||
if out.rstrip() != 'ok' or err != '':
|
||||
|
|
@ -4279,7 +4281,7 @@ def setUpClass(cls):
|
|||
def tearDownClass(cls):
|
||||
# bpo-26762: Some multiprocessing objects like Pool create reference
|
||||
# cycles. Trigger a garbage collection to break these cycles.
|
||||
test.support.gc_collect()
|
||||
support.gc_collect()
|
||||
|
||||
processes = set(multiprocessing.process._dangling) - set(cls.dangling[0])
|
||||
if processes:
|
||||
|
|
@ -4458,7 +4460,7 @@ def tearDownModule():
|
|||
|
||||
# bpo-26762: Some multiprocessing objects like Pool create reference
|
||||
# cycles. Trigger a garbage collection to break these cycles.
|
||||
test.support.gc_collect()
|
||||
support.gc_collect()
|
||||
|
||||
multiprocessing.set_start_method(old_start_method[0], force=True)
|
||||
# pause a bit so we don't get warning about dangling threads/processes
|
||||
|
|
@ -4480,7 +4482,7 @@ def tearDownModule():
|
|||
if need_sleep:
|
||||
time.sleep(0.5)
|
||||
multiprocessing.process._cleanup()
|
||||
test.support.gc_collect()
|
||||
support.gc_collect()
|
||||
|
||||
remote_globs['setUpModule'] = setUpModule
|
||||
remote_globs['tearDownModule'] = tearDownModule
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue