cpython/Lib/test/test_multiprocessing.py

3445 lines
102 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
Merged revisions 64623,64640,64665,64687,64689-64690,64719,64721,64735,64742,64744-64746,64756-64761,64767-64769,64771-64772,64774-64775,64788,64793,64835-64836 via svnmerge from svn+ssh://pythondev@svn.python.org/python/trunk ........ r64623 | benjamin.peterson | 2008-07-01 21:51:54 +0200 (Tue, 01 Jul 2008) | 1 line write a short little section for multiprocessing; it still needs help ........ r64640 | georg.brandl | 2008-07-01 22:56:03 +0200 (Tue, 01 Jul 2008) | 2 lines Add a comment about incref'ing w. ........ r64665 | jesse.noller | 2008-07-02 18:56:51 +0200 (Wed, 02 Jul 2008) | 1 line Add #!/usr/bin/env python for ben ........ r64687 | andrew.kuchling | 2008-07-03 14:50:03 +0200 (Thu, 03 Jul 2008) | 1 line Tweak wording ........ r64689 | benjamin.peterson | 2008-07-03 14:57:35 +0200 (Thu, 03 Jul 2008) | 1 line lowercase glossary term ........ r64690 | benjamin.peterson | 2008-07-03 15:01:17 +0200 (Thu, 03 Jul 2008) | 1 line let the term be linked ........ r64719 | raymond.hettinger | 2008-07-05 04:11:55 +0200 (Sat, 05 Jul 2008) | 1 line Update comment on prediction macros. ........ r64721 | georg.brandl | 2008-07-05 12:07:18 +0200 (Sat, 05 Jul 2008) | 2 lines Fix tabs. ........ r64735 | mark.dickinson | 2008-07-05 17:25:48 +0200 (Sat, 05 Jul 2008) | 3 lines Minor rewrite of cmath_log to work around a Sun compiler bug. See issue #3168. ........ r64742 | benjamin.peterson | 2008-07-05 18:29:38 +0200 (Sat, 05 Jul 2008) | 1 line make regrtest aware of the lib2to3 resource ........ r64744 | georg.brandl | 2008-07-05 18:43:45 +0200 (Sat, 05 Jul 2008) | 2 lines Keep below 80 chars. ........ r64745 | facundo.batista | 2008-07-05 21:19:50 +0200 (Sat, 05 Jul 2008) | 3 lines Issue 3289. Removed two lines that ended doing nothing. ........ r64746 | facundo.batista | 2008-07-05 22:39:59 +0200 (Sat, 05 Jul 2008) | 4 lines Issue #3239. Differentiate the ascii call from the curses one and the builtin one. ........ r64756 | gregory.p.smith | 2008-07-06 09:16:40 +0200 (Sun, 06 Jul 2008) | 3 lines - Issue #2113: Fix error in subprocess.Popen if the select system call is interrupted by a signal. ........ r64757 | benjamin.peterson | 2008-07-06 14:39:09 +0200 (Sun, 06 Jul 2008) | 1 line remove test_compact_freelists from test_sys ........ r64758 | gregory.p.smith | 2008-07-06 19:06:29 +0200 (Sun, 06 Jul 2008) | 2 lines fix issue3304 - remove an incorrect PyMem_Free in fileio_init ........ r64759 | georg.brandl | 2008-07-06 19:36:20 +0200 (Sun, 06 Jul 2008) | 2 lines Fix opensearch template. ........ r64760 | andrew.kuchling | 2008-07-06 19:43:16 +0200 (Sun, 06 Jul 2008) | 1 line Wording fix ........ r64761 | andrew.kuchling | 2008-07-06 19:44:17 +0200 (Sun, 06 Jul 2008) | 1 line Add two items; rewrap paragraph ........ r64767 | gregory.p.smith | 2008-07-07 06:31:58 +0200 (Mon, 07 Jul 2008) | 4 lines - Issue #3309: Fix bz2.BZFile itererator to release its internal lock properly when raising an exception due to the bz2file being closed. Prevents a deadlock. ........ r64768 | josiah.carlson | 2008-07-07 06:51:46 +0200 (Mon, 07 Jul 2008) | 2 lines Fixed bugs 760475, 953599, and 1519. ........ r64769 | gregory.p.smith | 2008-07-07 06:54:31 +0200 (Mon, 07 Jul 2008) | 2 lines Add commented out #_sha256 and #_sha512 lines per issue 3183. ........ r64771 | gregory.p.smith | 2008-07-07 07:09:12 +0200 (Mon, 07 Jul 2008) | 4 lines - Issue #3094: httplib.HTTPSConnection Host: headers no longer include the redundant ":443" port number designation when the connection is using the default https port (443). ........ r64772 | skip.montanaro | 2008-07-07 13:16:14 +0200 (Mon, 07 Jul 2008) | 2 lines Correct grammar. ........ r64774 | andrew.kuchling | 2008-07-07 18:51:09 +0200 (Mon, 07 Jul 2008) | 1 line Fix example to match text ........ r64775 | facundo.batista | 2008-07-07 19:02:59 +0200 (Mon, 07 Jul 2008) | 3 lines Issue 3306. Better control for a lenght in findmax() function. ........ r64788 | georg.brandl | 2008-07-08 09:05:23 +0200 (Tue, 08 Jul 2008) | 2 lines Add missing ABCs to list. ........ r64793 | nick.coghlan | 2008-07-08 16:21:42 +0200 (Tue, 08 Jul 2008) | 1 line Add missing NEWS and ACK entries for r64791 ........ r64835 | raymond.hettinger | 2008-07-10 11:31:08 +0200 (Thu, 10 Jul 2008) | 1 line Issue 3287: Raise correct exception for float inputs. ........ r64836 | raymond.hettinger | 2008-07-10 12:28:41 +0200 (Thu, 10 Jul 2008) | 1 line Use operator.index() instead of n.__index__(). ........
2008-07-16 03:43:04 +00:00
#
# Unit tests for the multiprocessing package
#
import unittest
import queue as pyqueue
import time
import io
import itertools
import sys
import os
import gc
import errno
import signal
import array
import socket
import random
import logging
import struct
import operator
Merged revisions 70734,70775,70856,70874,70876-70877 via svnmerge ........ r70734 | r.david.murray | 2009-03-30 15:04:00 -0400 (Mon, 30 Mar 2009) | 7 lines Add import_function method to test.test_support, and modify a number of tests that expect to be skipped if imports fail or functions don't exist to use import_function and import_module. The ultimate goal is to change regrtest to not skip automatically on ImportError. Checking in now to make sure the buldbots don't show any errors on platforms I can't direct test on. ........ r70775 | r.david.murray | 2009-03-30 19:05:48 -0400 (Mon, 30 Mar 2009) | 4 lines Change more tests to use import_module for the modules that should cause tests to be skipped. Also rename import_function to the more descriptive get_attribute and add a docstring. ........ r70856 | r.david.murray | 2009-03-31 14:32:17 -0400 (Tue, 31 Mar 2009) | 7 lines A few more test skips via import_module, and change import_module to return the error message produced by importlib, so that if an import in the package whose import is being wrapped is what failed the skip message will contain the name of that module instead of the name of the wrapped module. Also fixed formatting of some previous comments. ........ r70874 | r.david.murray | 2009-03-31 15:33:15 -0400 (Tue, 31 Mar 2009) | 5 lines Improve test_support.import_module docstring, remove deprecated flag from get_attribute since it isn't likely to do anything useful. ........ r70876 | r.david.murray | 2009-03-31 15:49:15 -0400 (Tue, 31 Mar 2009) | 4 lines Remove the regrtest check that turns any ImportError into a skipped test. Hopefully all modules whose imports legitimately result in a skipped test have been properly wrapped by the previous commits. ........ r70877 | r.david.murray | 2009-03-31 15:57:24 -0400 (Tue, 31 Mar 2009) | 2 lines Add NEWS entry for regrtest change. ........
2009-03-31 23:16:50 +00:00
import test.support
import test.script_helper
Merged revisions 70734,70775,70856,70874,70876-70877 via svnmerge ........ r70734 | r.david.murray | 2009-03-30 15:04:00 -0400 (Mon, 30 Mar 2009) | 7 lines Add import_function method to test.test_support, and modify a number of tests that expect to be skipped if imports fail or functions don't exist to use import_function and import_module. The ultimate goal is to change regrtest to not skip automatically on ImportError. Checking in now to make sure the buldbots don't show any errors on platforms I can't direct test on. ........ r70775 | r.david.murray | 2009-03-30 19:05:48 -0400 (Mon, 30 Mar 2009) | 4 lines Change more tests to use import_module for the modules that should cause tests to be skipped. Also rename import_function to the more descriptive get_attribute and add a docstring. ........ r70856 | r.david.murray | 2009-03-31 14:32:17 -0400 (Tue, 31 Mar 2009) | 7 lines A few more test skips via import_module, and change import_module to return the error message produced by importlib, so that if an import in the package whose import is being wrapped is what failed the skip message will contain the name of that module instead of the name of the wrapped module. Also fixed formatting of some previous comments. ........ r70874 | r.david.murray | 2009-03-31 15:33:15 -0400 (Tue, 31 Mar 2009) | 5 lines Improve test_support.import_module docstring, remove deprecated flag from get_attribute since it isn't likely to do anything useful. ........ r70876 | r.david.murray | 2009-03-31 15:49:15 -0400 (Tue, 31 Mar 2009) | 4 lines Remove the regrtest check that turns any ImportError into a skipped test. Hopefully all modules whose imports legitimately result in a skipped test have been properly wrapped by the previous commits. ........ r70877 | r.david.murray | 2009-03-31 15:57:24 -0400 (Tue, 31 Mar 2009) | 2 lines Add NEWS entry for regrtest change. ........
2009-03-31 23:16:50 +00:00
# Skip tests if _multiprocessing wasn't built.
_multiprocessing = test.support.import_module('_multiprocessing')
# Skip tests if sem_open implementation is broken.
test.support.import_module('multiprocessing.synchronize')
Merged revisions 80552-80556,80564-80566,80568-80571 via svnmerge from svn+ssh://pythondev@svn.python.org/python/trunk ........ r80552 | victor.stinner | 2010-04-27 23:46:03 +0200 (mar., 27 avril 2010) | 3 lines Issue #7449, part 1: fix test_support.py for Python compiled without thread ........ r80553 | victor.stinner | 2010-04-27 23:47:01 +0200 (mar., 27 avril 2010) | 1 line Issue #7449, part 2: regrtest.py -j option requires thread support ........ r80554 | victor.stinner | 2010-04-27 23:51:26 +0200 (mar., 27 avril 2010) | 9 lines Issue #7449 part 3, test_doctest: import trace module in test_coverage() Import trace module fail if the threading module is missing. test_coverage() is only used if test_doctest.py is used with the -c option. This commit allows to execute the test suite without thread support. Move "import trace" in test_coverage() and use test_support.import_module('trace'). ........ r80555 | victor.stinner | 2010-04-27 23:56:26 +0200 (mar., 27 avril 2010) | 6 lines Issue #7449, part 4: skip test_multiprocessing if thread support is disabled import threading after _multiprocessing to raise a more revelant error message: "No module named _multiprocessing". _multiprocessing is not compiled without thread support. ........ r80556 | victor.stinner | 2010-04-28 00:01:24 +0200 (mer., 28 avril 2010) | 8 lines Issue #7449, part 5: split Test.test_open() of ctypes/test/test_errno.py * Split Test.test_open() in 2 functions: test_open() and test_thread_open() * Skip test_open() and test_thread_open() if we are unable to find the C library * Skip test_thread_open() if thread support is disabled * Use unittest.skipUnless(os.name == "nt", ...) on test_GetLastError() ........ r80564 | victor.stinner | 2010-04-28 00:59:35 +0200 (mer., 28 avril 2010) | 4 lines Issue #7449, part 6: fix test_hashlib for missing threading module Move @test_support.reap_thread decorator from test_main() to test_threaded_hashing(). ........ r80565 | victor.stinner | 2010-04-28 01:01:29 +0200 (mer., 28 avril 2010) | 6 lines Issue #7449, part 7: simplify threading detection in test_capi * Skip TestPendingCalls if threading module is missing * Test if threading module is present or not, instead of test the presence of _testcapi._test_thread_state ........ r80566 | victor.stinner | 2010-04-28 01:03:16 +0200 (mer., 28 avril 2010) | 4 lines Issue #7449, part 8: don't skip the whole test_asynchat if threading is missing TestFifo can be executed without the threading module ........ r80568 | victor.stinner | 2010-04-28 01:14:58 +0200 (mer., 28 avril 2010) | 6 lines Issue #7449, part 9: fix test_xmlrpclib for missing threading module * Skip testcases using threads if threading module is missing * Use "http://" instead of URL in ServerProxyTestCase if threading is missing because URL is not set in this case ........ r80569 | victor.stinner | 2010-04-28 01:33:58 +0200 (mer., 28 avril 2010) | 6 lines Partial revert of r80556 (Issue #7449, part 5, fix ctypes test) Rewrite r80556: the thread test have to be executed just after the test on libc_open() and so the test cannot be splitted in two functions (without duplicating code, and I don't want to duplicate code). ........ r80570 | victor.stinner | 2010-04-28 01:51:16 +0200 (mer., 28 avril 2010) | 8 lines Issue #7449, part 10: test_cmd imports trace module using test_support.import_module() Use test_support.import_module() instead of import to raise a SkipTest exception if the import fail. Import trace fails if the threading module is missing. See also part 3: test_doctest: import trace module in test_coverage(). ........ r80571 | victor.stinner | 2010-04-28 01:55:59 +0200 (mer., 28 avril 2010) | 6 lines Issue #7449, last part (11): fix many tests if thread support is disabled * Use try/except ImportError or test_support.import_module() to import thread and threading modules * Add @unittest.skipUnless(threading, ...) to testcases using threads ........
2010-04-28 22:31:17 +00:00
# import threading after _multiprocessing to raise a more revelant error
# message: "No module named _multiprocessing". _multiprocessing is not compiled
# without thread support.
import threading
import multiprocessing.dummy
import multiprocessing.connection
import multiprocessing.managers
import multiprocessing.heap
import multiprocessing.pool
from multiprocessing import util
try:
from multiprocessing import reduction
HAS_REDUCTION = True
except ImportError:
HAS_REDUCTION = False
try:
from multiprocessing.sharedctypes import Value, copy
HAS_SHAREDCTYPES = True
except ImportError:
HAS_SHAREDCTYPES = False
try:
import msvcrt
except ImportError:
msvcrt = None
#
#
#
2008-07-13 18:45:30 +00:00
def latin(s):
return s.encode('latin')
#
# Constants
#
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG
DELTA = 0.1
CHECK_TIMINGS = False # making true makes tests take a lot longer
# and can sometimes cause some non-serious
# failures because some calls block a bit
# longer than expected
if CHECK_TIMINGS:
TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
else:
TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
HAVE_GETVALUE = not getattr(_multiprocessing,
'HAVE_BROKEN_SEM_GETVALUE', False)
2009-01-19 16:23:53 +00:00
WIN32 = (sys.platform == "win32")
from multiprocessing.connection import wait
def wait_for_handle(handle, timeout):
if timeout is not None and timeout < 0.0:
timeout = None
return wait([handle], timeout)
2009-01-19 16:23:53 +00:00
try:
MAXFD = os.sysconf("SC_OPEN_MAX")
except:
MAXFD = 256
#
# Some tests require ctypes
#
try:
from ctypes import Structure, c_int, c_double
except ImportError:
Structure = object
c_int = c_double = None
def check_enough_semaphores():
"""Check that the system supports enough semaphores to run the test."""
# minimum number of semaphores available according to POSIX
nsems_min = 256
try:
nsems = os.sysconf("SC_SEM_NSEMS_MAX")
except (AttributeError, ValueError):
# sysconf not available or setting not available
return
if nsems == -1 or nsems >= nsems_min:
return
raise unittest.SkipTest("The OS doesn't support enough semaphores "
"to run the test (required: %d)." % nsems_min)
#
# Creates a wrapper for a function which records the time it takes to finish
#
class TimingWrapper(object):
def __init__(self, func):
self.func = func
self.elapsed = None
def __call__(self, *args, **kwds):
t = time.time()
try:
return self.func(*args, **kwds)
finally:
self.elapsed = time.time() - t
#
# Base class for test cases
#
class BaseTestCase(object):
ALLOWED_TYPES = ('processes', 'manager', 'threads')
def assertTimingAlmostEqual(self, a, b):
if CHECK_TIMINGS:
self.assertAlmostEqual(a, b, 1)
def assertReturnsIfImplemented(self, value, func, *args):
try:
res = func(*args)
except NotImplementedError:
pass
else:
return self.assertEqual(value, res)
# For the sanity of Windows users, rather than crashing or freezing in
# multiple ways.
def __reduce__(self, *args):
raise NotImplementedError("shouldn't try to pickle a test case")
__reduce_ex__ = __reduce__
#
# Return the value of a semaphore
#
def get_value(self):
try:
return self.get_value()
except AttributeError:
try:
return self._Semaphore__value
except AttributeError:
try:
return self._value
except AttributeError:
raise NotImplementedError
#
# Testcases
#
class _TestProcess(BaseTestCase):
ALLOWED_TYPES = ('processes', 'threads')
def test_current(self):
if self.TYPE == 'threads':
return
current = self.current_process()
authkey = current.authkey
self.assertTrue(current.is_alive())
self.assertTrue(not current.daemon)
self.assertIsInstance(authkey, bytes)
self.assertTrue(len(authkey) > 0)
self.assertEqual(current.ident, os.getpid())
self.assertEqual(current.exitcode, None)
def test_daemon_argument(self):
if self.TYPE == "threads":
return
# By default uses the current process's daemon flag.
proc0 = self.Process(target=self._test)
2011-03-02 00:15:44 +00:00
self.assertEqual(proc0.daemon, self.current_process().daemon)
proc1 = self.Process(target=self._test, daemon=True)
self.assertTrue(proc1.daemon)
proc2 = self.Process(target=self._test, daemon=False)
self.assertFalse(proc2.daemon)
@classmethod
def _test(cls, q, *args, **kwds):
current = cls.current_process()
q.put(args)
q.put(kwds)
q.put(current.name)
if cls.TYPE != 'threads':
q.put(bytes(current.authkey))
q.put(current.pid)
def test_process(self):
q = self.Queue(1)
e = self.Event()
args = (q, 1, 2)
kwargs = {'hello':23, 'bye':2.54}
name = 'SomeProcess'
p = self.Process(
target=self._test, args=args, kwargs=kwargs, name=name
)
p.daemon = True
current = self.current_process()
if self.TYPE != 'threads':
self.assertEqual(p.authkey, current.authkey)
self.assertEqual(p.is_alive(), False)
self.assertEqual(p.daemon, True)
self.assertNotIn(p, self.active_children())
self.assertTrue(type(self.active_children()) is list)
self.assertEqual(p.exitcode, None)
p.start()
self.assertEqual(p.exitcode, None)
self.assertEqual(p.is_alive(), True)
self.assertIn(p, self.active_children())
self.assertEqual(q.get(), args[1:])
self.assertEqual(q.get(), kwargs)
self.assertEqual(q.get(), p.name)
if self.TYPE != 'threads':
self.assertEqual(q.get(), current.authkey)
self.assertEqual(q.get(), p.pid)
p.join()
self.assertEqual(p.exitcode, 0)
self.assertEqual(p.is_alive(), False)
self.assertNotIn(p, self.active_children())
@classmethod
def _test_terminate(cls):
time.sleep(1000)
def test_terminate(self):
if self.TYPE == 'threads':
return
p = self.Process(target=self._test_terminate)
p.daemon = True
p.start()
self.assertEqual(p.is_alive(), True)
self.assertIn(p, self.active_children())
self.assertEqual(p.exitcode, None)
join = TimingWrapper(p.join)
self.assertEqual(join(0), None)
self.assertTimingAlmostEqual(join.elapsed, 0.0)
self.assertEqual(p.is_alive(), True)
self.assertEqual(join(-1), None)
self.assertTimingAlmostEqual(join.elapsed, 0.0)
self.assertEqual(p.is_alive(), True)
p.terminate()
self.assertEqual(join(), None)
self.assertTimingAlmostEqual(join.elapsed, 0.0)
self.assertEqual(p.is_alive(), False)
self.assertNotIn(p, self.active_children())
p.join()
# XXX sometimes get p.exitcode == 0 on Windows ...
#self.assertEqual(p.exitcode, -signal.SIGTERM)
def test_cpu_count(self):
try:
cpus = multiprocessing.cpu_count()
except NotImplementedError:
cpus = 1
self.assertTrue(type(cpus) is int)
self.assertTrue(cpus >= 1)
def test_active_children(self):
self.assertEqual(type(self.active_children()), list)
p = self.Process(target=time.sleep, args=(DELTA,))
self.assertNotIn(p, self.active_children())
p.daemon = True
p.start()
self.assertIn(p, self.active_children())
p.join()
self.assertNotIn(p, self.active_children())
@classmethod
def _test_recursion(cls, wconn, id):
from multiprocessing import forking
wconn.send(id)
if len(id) < 2:
for i in range(2):
p = cls.Process(
target=cls._test_recursion, args=(wconn, id+[i])
)
p.start()
p.join()
def test_recursion(self):
rconn, wconn = self.Pipe(duplex=False)
self._test_recursion(wconn, [])
time.sleep(DELTA)
result = []
while rconn.poll():
result.append(rconn.recv())
expected = [
[],
[0],
[0, 0],
[0, 1],
[1],
[1, 0],
[1, 1]
]
self.assertEqual(result, expected)
@classmethod
def _test_sentinel(cls, event):
event.wait(10.0)
def test_sentinel(self):
if self.TYPE == "threads":
return
event = self.Event()
p = self.Process(target=self._test_sentinel, args=(event,))
with self.assertRaises(ValueError):
p.sentinel
p.start()
self.addCleanup(p.join)
sentinel = p.sentinel
self.assertIsInstance(sentinel, int)
self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
event.set()
p.join()
self.assertTrue(wait_for_handle(sentinel, timeout=DELTA))
#
#
#
class _UpperCaser(multiprocessing.Process):
def __init__(self):
multiprocessing.Process.__init__(self)
self.child_conn, self.parent_conn = multiprocessing.Pipe()
def run(self):
self.parent_conn.close()
for s in iter(self.child_conn.recv, None):
self.child_conn.send(s.upper())
self.child_conn.close()
def submit(self, s):
assert type(s) is str
self.parent_conn.send(s)
return self.parent_conn.recv()
def stop(self):
self.parent_conn.send(None)
self.parent_conn.close()
self.child_conn.close()
class _TestSubclassingProcess(BaseTestCase):
ALLOWED_TYPES = ('processes',)
def test_subclassing(self):
uppercaser = _UpperCaser()
uppercaser.daemon = True
uppercaser.start()
self.assertEqual(uppercaser.submit('hello'), 'HELLO')
self.assertEqual(uppercaser.submit('world'), 'WORLD')
uppercaser.stop()
uppercaser.join()
def test_stderr_flush(self):
# sys.stderr is flushed at process shutdown (issue #13812)
if self.TYPE == "threads":
return
testfn = test.support.TESTFN
self.addCleanup(test.support.unlink, testfn)
proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
proc.start()
proc.join()
with open(testfn, 'r') as f:
err = f.read()
# The whole traceback was printed
self.assertIn("ZeroDivisionError", err)
self.assertIn("test_multiprocessing.py", err)
self.assertIn("1/0 # MARKER", err)
@classmethod
def _test_stderr_flush(cls, testfn):
sys.stderr = open(testfn, 'w')
1/0 # MARKER
@classmethod
def _test_sys_exit(cls, reason, testfn):
sys.stderr = open(testfn, 'w')
sys.exit(reason)
def test_sys_exit(self):
# See Issue 13854
if self.TYPE == 'threads':
return
testfn = test.support.TESTFN
self.addCleanup(test.support.unlink, testfn)
for reason, code in (([1, 2, 3], 1), ('ignore this', 0)):
p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
p.daemon = True
p.start()
p.join(5)
self.assertEqual(p.exitcode, code)
with open(testfn, 'r') as f:
self.assertEqual(f.read().rstrip(), str(reason))
for reason in (True, False, 8):
p = self.Process(target=sys.exit, args=(reason,))
p.daemon = True
p.start()
p.join(5)
self.assertEqual(p.exitcode, reason)
#
#
#
def queue_empty(q):
if hasattr(q, 'empty'):
return q.empty()
else:
return q.qsize() == 0
def queue_full(q, maxsize):
if hasattr(q, 'full'):
return q.full()
else:
return q.qsize() == maxsize
class _TestQueue(BaseTestCase):
@classmethod
def _test_put(cls, queue, child_can_start, parent_can_continue):
child_can_start.wait()
for i in range(6):
queue.get()
parent_can_continue.set()
def test_put(self):
MAXSIZE = 6
queue = self.Queue(maxsize=MAXSIZE)
child_can_start = self.Event()
parent_can_continue = self.Event()
proc = self.Process(
target=self._test_put,
args=(queue, child_can_start, parent_can_continue)
)
proc.daemon = True
proc.start()
self.assertEqual(queue_empty(queue), True)
self.assertEqual(queue_full(queue, MAXSIZE), False)
queue.put(1)
queue.put(2, True)
queue.put(3, True, None)
queue.put(4, False)
queue.put(5, False, None)
queue.put_nowait(6)
# the values may be in buffer but not yet in pipe so sleep a bit
time.sleep(DELTA)
self.assertEqual(queue_empty(queue), False)
self.assertEqual(queue_full(queue, MAXSIZE), True)
put = TimingWrapper(queue.put)
put_nowait = TimingWrapper(queue.put_nowait)
self.assertRaises(pyqueue.Full, put, 7, False)
self.assertTimingAlmostEqual(put.elapsed, 0)
self.assertRaises(pyqueue.Full, put, 7, False, None)
self.assertTimingAlmostEqual(put.elapsed, 0)
self.assertRaises(pyqueue.Full, put_nowait, 7)
self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
self.assertTimingAlmostEqual(put.elapsed, 0)
self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
child_can_start.set()
parent_can_continue.wait()
self.assertEqual(queue_empty(queue), True)
self.assertEqual(queue_full(queue, MAXSIZE), False)
proc.join()
@classmethod
def _test_get(cls, queue, child_can_start, parent_can_continue):
child_can_start.wait()
#queue.put(1)
queue.put(2)
queue.put(3)
queue.put(4)
queue.put(5)
parent_can_continue.set()
def test_get(self):
queue = self.Queue()
child_can_start = self.Event()
parent_can_continue = self.Event()
proc = self.Process(
target=self._test_get,
args=(queue, child_can_start, parent_can_continue)
)
proc.daemon = True
proc.start()
self.assertEqual(queue_empty(queue), True)
child_can_start.set()
parent_can_continue.wait()
time.sleep(DELTA)
self.assertEqual(queue_empty(queue), False)
# Hangs unexpectedly, remove for now
#self.assertEqual(queue.get(), 1)
self.assertEqual(queue.get(True, None), 2)
self.assertEqual(queue.get(True), 3)
self.assertEqual(queue.get(timeout=1), 4)
self.assertEqual(queue.get_nowait(), 5)
self.assertEqual(queue_empty(queue), True)
get = TimingWrapper(queue.get)
get_nowait = TimingWrapper(queue.get_nowait)
self.assertRaises(pyqueue.Empty, get, False)
self.assertTimingAlmostEqual(get.elapsed, 0)
self.assertRaises(pyqueue.Empty, get, False, None)
self.assertTimingAlmostEqual(get.elapsed, 0)
self.assertRaises(pyqueue.Empty, get_nowait)
self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
self.assertTimingAlmostEqual(get.elapsed, 0)
self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
proc.join()
@classmethod
def _test_fork(cls, queue):
for i in range(10, 20):
queue.put(i)
# note that at this point the items may only be buffered, so the
# process cannot shutdown until the feeder thread has finished
# pushing items onto the pipe.
def test_fork(self):
# Old versions of Queue would fail to create a new feeder
# thread for a forked process if the original process had its
# own feeder thread. This test checks that this no longer
# happens.
queue = self.Queue()
# put items on queue so that main process starts a feeder thread
for i in range(10):
queue.put(i)
# wait to make sure thread starts before we fork a new process
time.sleep(DELTA)
# fork process
p = self.Process(target=self._test_fork, args=(queue,))
p.daemon = True
p.start()
# check that all expected items are in the queue
for i in range(20):
self.assertEqual(queue.get(), i)
self.assertRaises(pyqueue.Empty, queue.get, False)
p.join()
def test_qsize(self):
q = self.Queue()
try:
self.assertEqual(q.qsize(), 0)
except NotImplementedError:
return
q.put(1)
self.assertEqual(q.qsize(), 1)
q.put(5)
self.assertEqual(q.qsize(), 2)
q.get()
self.assertEqual(q.qsize(), 1)
q.get()
self.assertEqual(q.qsize(), 0)
@classmethod
def _test_task_done(cls, q):
for obj in iter(q.get, None):
time.sleep(DELTA)
q.task_done()
def test_task_done(self):
queue = self.JoinableQueue()
if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
self.skipTest("requires 'queue.task_done()' method")
workers = [self.Process(target=self._test_task_done, args=(queue,))
for i in range(4)]
for p in workers:
p.daemon = True
p.start()
for i in range(10):
queue.put(i)
queue.join()
for p in workers:
queue.put(None)
for p in workers:
p.join()
#
#
#
class _TestLock(BaseTestCase):
def test_lock(self):
lock = self.Lock()
self.assertEqual(lock.acquire(), True)
self.assertEqual(lock.acquire(False), False)
self.assertEqual(lock.release(), None)
self.assertRaises((ValueError, threading.ThreadError), lock.release)
def test_rlock(self):
lock = self.RLock()
self.assertEqual(lock.acquire(), True)
self.assertEqual(lock.acquire(), True)
self.assertEqual(lock.acquire(), True)
self.assertEqual(lock.release(), None)
self.assertEqual(lock.release(), None)
self.assertEqual(lock.release(), None)
self.assertRaises((AssertionError, RuntimeError), lock.release)
2009-03-31 03:25:07 +00:00
def test_lock_context(self):
with self.Lock():
pass
class _TestSemaphore(BaseTestCase):
def _test_semaphore(self, sem):
self.assertReturnsIfImplemented(2, get_value, sem)
self.assertEqual(sem.acquire(), True)
self.assertReturnsIfImplemented(1, get_value, sem)
self.assertEqual(sem.acquire(), True)
self.assertReturnsIfImplemented(0, get_value, sem)
self.assertEqual(sem.acquire(False), False)
self.assertReturnsIfImplemented(0, get_value, sem)
self.assertEqual(sem.release(), None)
self.assertReturnsIfImplemented(1, get_value, sem)
self.assertEqual(sem.release(), None)
self.assertReturnsIfImplemented(2, get_value, sem)
def test_semaphore(self):
sem = self.Semaphore(2)
self._test_semaphore(sem)
self.assertEqual(sem.release(), None)
self.assertReturnsIfImplemented(3, get_value, sem)
self.assertEqual(sem.release(), None)
self.assertReturnsIfImplemented(4, get_value, sem)
def test_bounded_semaphore(self):
sem = self.BoundedSemaphore(2)
self._test_semaphore(sem)
# Currently fails on OS/X
#if HAVE_GETVALUE:
# self.assertRaises(ValueError, sem.release)
# self.assertReturnsIfImplemented(2, get_value, sem)
def test_timeout(self):
if self.TYPE != 'processes':
return
sem = self.Semaphore(0)
acquire = TimingWrapper(sem.acquire)
self.assertEqual(acquire(False), False)
self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
self.assertEqual(acquire(False, None), False)
self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
self.assertEqual(acquire(False, TIMEOUT1), False)
self.assertTimingAlmostEqual(acquire.elapsed, 0)
self.assertEqual(acquire(True, TIMEOUT2), False)
self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
self.assertEqual(acquire(timeout=TIMEOUT3), False)
self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
class _TestCondition(BaseTestCase):
@classmethod
def f(cls, cond, sleeping, woken, timeout=None):
cond.acquire()
sleeping.release()
cond.wait(timeout)
woken.release()
cond.release()
def check_invariant(self, cond):
# this is only supposed to succeed when there are no sleepers
if self.TYPE == 'processes':
try:
sleepers = (cond._sleeping_count.get_value() -
cond._woken_count.get_value())
self.assertEqual(sleepers, 0)
self.assertEqual(cond._wait_semaphore.get_value(), 0)
except NotImplementedError:
pass
def test_notify(self):
cond = self.Condition()
sleeping = self.Semaphore(0)
woken = self.Semaphore(0)
p = self.Process(target=self.f, args=(cond, sleeping, woken))
p.daemon = True
p.start()
p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
p.daemon = True
p.start()
# wait for both children to start sleeping
sleeping.acquire()
sleeping.acquire()
# check no process/thread has woken up
time.sleep(DELTA)
self.assertReturnsIfImplemented(0, get_value, woken)
# wake up one process/thread
cond.acquire()
cond.notify()
cond.release()
# check one process/thread has woken up
time.sleep(DELTA)
self.assertReturnsIfImplemented(1, get_value, woken)
# wake up another
cond.acquire()
cond.notify()
cond.release()
# check other has woken up
time.sleep(DELTA)
self.assertReturnsIfImplemented(2, get_value, woken)
# check state is not mucked up
self.check_invariant(cond)
p.join()
def test_notify_all(self):
cond = self.Condition()
sleeping = self.Semaphore(0)
woken = self.Semaphore(0)
# start some threads/processes which will timeout
for i in range(3):
p = self.Process(target=self.f,
args=(cond, sleeping, woken, TIMEOUT1))
p.daemon = True
p.start()
t = threading.Thread(target=self.f,
args=(cond, sleeping, woken, TIMEOUT1))
t.daemon = True
t.start()
# wait for them all to sleep
for i in range(6):
sleeping.acquire()
# check they have all timed out
for i in range(6):
woken.acquire()
self.assertReturnsIfImplemented(0, get_value, woken)
# check state is not mucked up
self.check_invariant(cond)
# start some more threads/processes
for i in range(3):
p = self.Process(target=self.f, args=(cond, sleeping, woken))
p.daemon = True
p.start()
t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
t.daemon = True
t.start()
# wait for them to all sleep
for i in range(6):
sleeping.acquire()
# check no process/thread has woken up
time.sleep(DELTA)
self.assertReturnsIfImplemented(0, get_value, woken)
# wake them all up
cond.acquire()
cond.notify_all()
cond.release()
# check they have all woken
for i in range(10):
try:
if get_value(woken) == 6:
break
except NotImplementedError:
break
time.sleep(DELTA)
self.assertReturnsIfImplemented(6, get_value, woken)
# check state is not mucked up
self.check_invariant(cond)
def test_timeout(self):
cond = self.Condition()
wait = TimingWrapper(cond.wait)
cond.acquire()
res = wait(TIMEOUT1)
cond.release()
2010-10-28 09:24:56 +00:00
self.assertEqual(res, False)
self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
@classmethod
def _test_waitfor_f(cls, cond, state):
with cond:
state.value = 0
cond.notify()
result = cond.wait_for(lambda : state.value==4)
if not result or state.value != 4:
sys.exit(1)
@unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
def test_waitfor(self):
# based on test in test/lock_tests.py
cond = self.Condition()
state = self.Value('i', -1)
p = self.Process(target=self._test_waitfor_f, args=(cond, state))
p.daemon = True
p.start()
with cond:
result = cond.wait_for(lambda : state.value==0)
self.assertTrue(result)
self.assertEqual(state.value, 0)
for i in range(4):
time.sleep(0.01)
with cond:
state.value += 1
cond.notify()
p.join(5)
self.assertFalse(p.is_alive())
self.assertEqual(p.exitcode, 0)
@classmethod
def _test_waitfor_timeout_f(cls, cond, state, success, sem):
sem.release()
with cond:
expected = 0.1
dt = time.time()
result = cond.wait_for(lambda : state.value==4, timeout=expected)
dt = time.time() - dt
# borrow logic in assertTimeout() from test/lock_tests.py
if not result and expected * 0.6 < dt < expected * 10.0:
success.value = True
@unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
def test_waitfor_timeout(self):
# based on test in test/lock_tests.py
cond = self.Condition()
state = self.Value('i', 0)
success = self.Value('i', False)
sem = self.Semaphore(0)
p = self.Process(target=self._test_waitfor_timeout_f,
args=(cond, state, success, sem))
p.daemon = True
p.start()
self.assertTrue(sem.acquire(timeout=10))
# Only increment 3 times, so state == 4 is never reached.
for i in range(3):
time.sleep(0.01)
with cond:
state.value += 1
cond.notify()
p.join(5)
self.assertTrue(success.value)
@classmethod
def _test_wait_result(cls, c, pid):
with c:
c.notify()
time.sleep(1)
if pid is not None:
os.kill(pid, signal.SIGINT)
def test_wait_result(self):
if isinstance(self, ProcessesMixin) and sys.platform != 'win32':
pid = os.getpid()
else:
pid = None
c = self.Condition()
with c:
self.assertFalse(c.wait(0))
self.assertFalse(c.wait(0.1))
p = self.Process(target=self._test_wait_result, args=(c, pid))
p.start()
self.assertTrue(c.wait(10))
if pid is not None:
self.assertRaises(KeyboardInterrupt, c.wait, 10)
p.join()
class _TestEvent(BaseTestCase):
@classmethod
def _test_event(cls, event):
time.sleep(TIMEOUT2)
event.set()
def test_event(self):
event = self.Event()
wait = TimingWrapper(event.wait)
# Removed temporarily, due to API shear, this does not
# work with threading._Event objects. is_set == isSet
self.assertEqual(event.is_set(), False)
# Removed, threading.Event.wait() will return the value of the __flag
# instead of None. API Shear with the semaphore backed mp.Event
self.assertEqual(wait(0.0), False)
self.assertTimingAlmostEqual(wait.elapsed, 0.0)
self.assertEqual(wait(TIMEOUT1), False)
self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
event.set()
# See note above on the API differences
self.assertEqual(event.is_set(), True)
self.assertEqual(wait(), True)
self.assertTimingAlmostEqual(wait.elapsed, 0.0)
self.assertEqual(wait(TIMEOUT1), True)
self.assertTimingAlmostEqual(wait.elapsed, 0.0)
# self.assertEqual(event.is_set(), True)
event.clear()
#self.assertEqual(event.is_set(), False)
p = self.Process(target=self._test_event, args=(event,))
p.daemon = True
p.start()
self.assertEqual(wait(), True)
#
# Tests for Barrier - adapted from tests in test/lock_tests.py
#
# Many of the tests for threading.Barrier use a list as an atomic
# counter: a value is appended to increment the counter, and the
# length of the list gives the value. We use the class DummyList
# for the same purpose.
class _DummyList(object):
def __init__(self):
wrapper = multiprocessing.heap.BufferWrapper(struct.calcsize('i'))
lock = multiprocessing.Lock()
self.__setstate__((wrapper, lock))
self._lengthbuf[0] = 0
def __setstate__(self, state):
(self._wrapper, self._lock) = state
self._lengthbuf = self._wrapper.create_memoryview().cast('i')
def __getstate__(self):
return (self._wrapper, self._lock)
def append(self, _):
with self._lock:
self._lengthbuf[0] += 1
def __len__(self):
with self._lock:
return self._lengthbuf[0]
def _wait():
# A crude wait/yield function not relying on synchronization primitives.
time.sleep(0.01)
class Bunch(object):
"""
A bunch of threads.
"""
def __init__(self, namespace, f, args, n, wait_before_exit=False):
"""
Construct a bunch of `n` threads running the same function `f`.
If `wait_before_exit` is True, the threads won't terminate until
do_finish() is called.
"""
self.f = f
self.args = args
self.n = n
self.started = namespace.DummyList()
self.finished = namespace.DummyList()
self._can_exit = namespace.Event()
if not wait_before_exit:
self._can_exit.set()
for i in range(n):
p = namespace.Process(target=self.task)
p.daemon = True
p.start()
def task(self):
pid = os.getpid()
self.started.append(pid)
try:
self.f(*self.args)
finally:
self.finished.append(pid)
self._can_exit.wait(30)
assert self._can_exit.is_set()
def wait_for_started(self):
while len(self.started) < self.n:
_wait()
def wait_for_finished(self):
while len(self.finished) < self.n:
_wait()
def do_finish(self):
self._can_exit.set()
class AppendTrue(object):
def __init__(self, obj):
self.obj = obj
def __call__(self):
self.obj.append(True)
class _TestBarrier(BaseTestCase):
"""
Tests for Barrier objects.
"""
N = 5
2012-06-18 14:11:10 +01:00
defaultTimeout = 30.0 # XXX Slow Windows buildbots need generous timeout
def setUp(self):
self.barrier = self.Barrier(self.N, timeout=self.defaultTimeout)
def tearDown(self):
self.barrier.abort()
self.barrier = None
def DummyList(self):
if self.TYPE == 'threads':
return []
elif self.TYPE == 'manager':
return self.manager.list()
else:
return _DummyList()
def run_threads(self, f, args):
b = Bunch(self, f, args, self.N-1)
f(*args)
b.wait_for_finished()
@classmethod
def multipass(cls, barrier, results, n):
m = barrier.parties
assert m == cls.N
for i in range(n):
results[0].append(True)
assert len(results[1]) == i * m
barrier.wait()
results[1].append(True)
assert len(results[0]) == (i + 1) * m
barrier.wait()
try:
assert barrier.n_waiting == 0
except NotImplementedError:
pass
assert not barrier.broken
def test_barrier(self, passes=1):
"""
Test that a barrier is passed in lockstep
"""
results = [self.DummyList(), self.DummyList()]
self.run_threads(self.multipass, (self.barrier, results, passes))
def test_barrier_10(self):
"""
Test that a barrier works for 10 consecutive runs
"""
return self.test_barrier(10)
@classmethod
def _test_wait_return_f(cls, barrier, queue):
res = barrier.wait()
queue.put(res)
def test_wait_return(self):
"""
test the return value from barrier.wait
"""
queue = self.Queue()
self.run_threads(self._test_wait_return_f, (self.barrier, queue))
results = [queue.get() for i in range(self.N)]
self.assertEqual(results.count(0), 1)
@classmethod
def _test_action_f(cls, barrier, results):
barrier.wait()
if len(results) != 1:
raise RuntimeError
def test_action(self):
"""
Test the 'action' callback
"""
results = self.DummyList()
barrier = self.Barrier(self.N, action=AppendTrue(results))
self.run_threads(self._test_action_f, (barrier, results))
self.assertEqual(len(results), 1)
@classmethod
def _test_abort_f(cls, barrier, results1, results2):
try:
i = barrier.wait()
if i == cls.N//2:
raise RuntimeError
barrier.wait()
results1.append(True)
except threading.BrokenBarrierError:
results2.append(True)
except RuntimeError:
barrier.abort()
def test_abort(self):
"""
Test that an abort will put the barrier in a broken state
"""
results1 = self.DummyList()
results2 = self.DummyList()
self.run_threads(self._test_abort_f,
(self.barrier, results1, results2))
self.assertEqual(len(results1), 0)
self.assertEqual(len(results2), self.N-1)
self.assertTrue(self.barrier.broken)
@classmethod
def _test_reset_f(cls, barrier, results1, results2, results3):
i = barrier.wait()
if i == cls.N//2:
# Wait until the other threads are all in the barrier.
while barrier.n_waiting < cls.N-1:
time.sleep(0.001)
barrier.reset()
else:
try:
barrier.wait()
results1.append(True)
except threading.BrokenBarrierError:
results2.append(True)
# Now, pass the barrier again
barrier.wait()
results3.append(True)
def test_reset(self):
"""
Test that a 'reset' on a barrier frees the waiting threads
"""
results1 = self.DummyList()
results2 = self.DummyList()
results3 = self.DummyList()
self.run_threads(self._test_reset_f,
(self.barrier, results1, results2, results3))
self.assertEqual(len(results1), 0)
self.assertEqual(len(results2), self.N-1)
self.assertEqual(len(results3), self.N)
@classmethod
def _test_abort_and_reset_f(cls, barrier, barrier2,
results1, results2, results3):
try:
i = barrier.wait()
if i == cls.N//2:
raise RuntimeError
barrier.wait()
results1.append(True)
except threading.BrokenBarrierError:
results2.append(True)
except RuntimeError:
barrier.abort()
# Synchronize and reset the barrier. Must synchronize first so
# that everyone has left it when we reset, and after so that no
# one enters it before the reset.
if barrier2.wait() == cls.N//2:
barrier.reset()
barrier2.wait()
barrier.wait()
results3.append(True)
def test_abort_and_reset(self):
"""
Test that a barrier can be reset after being broken.
"""
results1 = self.DummyList()
results2 = self.DummyList()
results3 = self.DummyList()
barrier2 = self.Barrier(self.N)
self.run_threads(self._test_abort_and_reset_f,
(self.barrier, barrier2, results1, results2, results3))
self.assertEqual(len(results1), 0)
self.assertEqual(len(results2), self.N-1)
self.assertEqual(len(results3), self.N)
@classmethod
def _test_timeout_f(cls, barrier, results):
2012-06-18 14:11:10 +01:00
i = barrier.wait()
if i == cls.N//2:
# One thread is late!
2012-06-18 14:11:10 +01:00
time.sleep(1.0)
try:
barrier.wait(0.5)
except threading.BrokenBarrierError:
results.append(True)
def test_timeout(self):
"""
Test wait(timeout)
"""
results = self.DummyList()
self.run_threads(self._test_timeout_f, (self.barrier, results))
self.assertEqual(len(results), self.barrier.parties)
@classmethod
def _test_default_timeout_f(cls, barrier, results):
2012-06-18 14:11:10 +01:00
i = barrier.wait(cls.defaultTimeout)
if i == cls.N//2:
# One thread is later than the default timeout
2012-06-18 14:11:10 +01:00
time.sleep(1.0)
try:
barrier.wait()
except threading.BrokenBarrierError:
results.append(True)
def test_default_timeout(self):
"""
Test the barrier's default timeout
"""
2012-06-18 14:11:10 +01:00
barrier = self.Barrier(self.N, timeout=0.5)
results = self.DummyList()
self.run_threads(self._test_default_timeout_f, (barrier, results))
self.assertEqual(len(results), barrier.parties)
def test_single_thread(self):
b = self.Barrier(1)
b.wait()
b.wait()
@classmethod
def _test_thousand_f(cls, barrier, passes, conn, lock):
for i in range(passes):
barrier.wait()
with lock:
conn.send(i)
def test_thousand(self):
if self.TYPE == 'manager':
return
passes = 1000
lock = self.Lock()
conn, child_conn = self.Pipe(False)
for j in range(self.N):
p = self.Process(target=self._test_thousand_f,
args=(self.barrier, passes, child_conn, lock))
p.start()
for i in range(passes):
for j in range(self.N):
self.assertEqual(conn.recv(), i)
#
#
#
class _TestValue(BaseTestCase):
ALLOWED_TYPES = ('processes',)
codes_values = [
('i', 4343, 24234),
('d', 3.625, -4.25),
('h', -232, 234),
('c', latin('x'), latin('y'))
]
def setUp(self):
if not HAS_SHAREDCTYPES:
self.skipTest("requires multiprocessing.sharedctypes")
@classmethod
def _test(cls, values):
for sv, cv in zip(values, cls.codes_values):
sv.value = cv[2]
def test_value(self, raw=False):
if raw:
values = [self.RawValue(code, value)
for code, value, _ in self.codes_values]
else:
values = [self.Value(code, value)
for code, value, _ in self.codes_values]
for sv, cv in zip(values, self.codes_values):
self.assertEqual(sv.value, cv[1])
proc = self.Process(target=self._test, args=(values,))
proc.daemon = True
proc.start()
proc.join()
for sv, cv in zip(values, self.codes_values):
self.assertEqual(sv.value, cv[2])
def test_rawvalue(self):
self.test_value(raw=True)
def test_getobj_getlock(self):
val1 = self.Value('i', 5)
lock1 = val1.get_lock()
obj1 = val1.get_obj()
val2 = self.Value('i', 5, lock=None)
lock2 = val2.get_lock()
obj2 = val2.get_obj()
lock = self.Lock()
val3 = self.Value('i', 5, lock=lock)
lock3 = val3.get_lock()
obj3 = val3.get_obj()
self.assertEqual(lock, lock3)
2009-01-18 03:11:38 +00:00
arr4 = self.Value('i', 5, lock=False)
self.assertFalse(hasattr(arr4, 'get_lock'))
self.assertFalse(hasattr(arr4, 'get_obj'))
2009-01-18 03:11:38 +00:00
self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
arr5 = self.RawValue('i', 5)
self.assertFalse(hasattr(arr5, 'get_lock'))
self.assertFalse(hasattr(arr5, 'get_obj'))
class _TestArray(BaseTestCase):
ALLOWED_TYPES = ('processes',)
@classmethod
def f(cls, seq):
for i in range(1, len(seq)):
seq[i] += seq[i-1]
@unittest.skipIf(c_int is None, "requires _ctypes")
def test_array(self, raw=False):
seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
if raw:
arr = self.RawArray('i', seq)
else:
arr = self.Array('i', seq)
self.assertEqual(len(arr), len(seq))
self.assertEqual(arr[3], seq[3])
self.assertEqual(list(arr[2:7]), list(seq[2:7]))
arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
self.assertEqual(list(arr[:]), seq)
self.f(seq)
p = self.Process(target=self.f, args=(arr,))
p.daemon = True
p.start()
p.join()
self.assertEqual(list(arr[:]), seq)
@unittest.skipIf(c_int is None, "requires _ctypes")
def test_array_from_size(self):
size = 10
# Test for zeroing (see issue #11675).
# The repetition below strengthens the test by increasing the chances
# of previously allocated non-zero memory being used for the new array
# on the 2nd and 3rd loops.
for _ in range(3):
arr = self.Array('i', size)
self.assertEqual(len(arr), size)
self.assertEqual(list(arr), [0] * size)
arr[:] = range(10)
self.assertEqual(list(arr), list(range(10)))
del arr
@unittest.skipIf(c_int is None, "requires _ctypes")
def test_rawarray(self):
self.test_array(raw=True)
@unittest.skipIf(c_int is None, "requires _ctypes")
def test_getobj_getlock_obj(self):
arr1 = self.Array('i', list(range(10)))
lock1 = arr1.get_lock()
obj1 = arr1.get_obj()
arr2 = self.Array('i', list(range(10)), lock=None)
lock2 = arr2.get_lock()
obj2 = arr2.get_obj()
lock = self.Lock()
arr3 = self.Array('i', list(range(10)), lock=lock)
lock3 = arr3.get_lock()
obj3 = arr3.get_obj()
self.assertEqual(lock, lock3)
2009-01-18 03:11:38 +00:00
arr4 = self.Array('i', range(10), lock=False)
self.assertFalse(hasattr(arr4, 'get_lock'))
self.assertFalse(hasattr(arr4, 'get_obj'))
2009-01-18 03:11:38 +00:00
self.assertRaises(AttributeError,
self.Array, 'i', range(10), lock='notalock')
arr5 = self.RawArray('i', range(10))
self.assertFalse(hasattr(arr5, 'get_lock'))
self.assertFalse(hasattr(arr5, 'get_obj'))
#
#
#
class _TestContainers(BaseTestCase):
ALLOWED_TYPES = ('manager',)
def test_list(self):
a = self.list(list(range(10)))
self.assertEqual(a[:], list(range(10)))
b = self.list()
self.assertEqual(b[:], [])
b.extend(list(range(5)))
self.assertEqual(b[:], list(range(5)))
self.assertEqual(b[2], 2)
self.assertEqual(b[2:10], [2,3,4])
b *= 2
self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
self.assertEqual(a[:], list(range(10)))
d = [a, b]
e = self.list(d)
self.assertEqual(
e[:],
[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
)
f = self.list([a])
a.append('hello')
self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
def test_dict(self):
d = self.dict()
indices = list(range(65, 70))
for i in indices:
d[i] = chr(i)
self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
self.assertEqual(sorted(d.keys()), indices)
self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
def test_namespace(self):
n = self.Namespace()
n.name = 'Bob'
n.job = 'Builder'
n._hidden = 'hidden'
self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
del n.job
self.assertEqual(str(n), "Namespace(name='Bob')")
self.assertTrue(hasattr(n, 'name'))
self.assertTrue(not hasattr(n, 'job'))
#
#
#
def sqr(x, wait=0.0):
time.sleep(wait)
return x*x
def mul(x, y):
return x*y
class _TestPool(BaseTestCase):
@classmethod
def setUpClass(cls):
super().setUpClass()
cls.pool = cls.Pool(4)
@classmethod
def tearDownClass(cls):
cls.pool.terminate()
cls.pool.join()
cls.pool = None
super().tearDownClass()
def test_apply(self):
papply = self.pool.apply
self.assertEqual(papply(sqr, (5,)), sqr(5))
self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
def test_map(self):
pmap = self.pool.map
self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
list(map(sqr, list(range(100)))))
def test_starmap(self):
psmap = self.pool.starmap
tuples = list(zip(range(10), range(9,-1, -1)))
self.assertEqual(psmap(mul, tuples),
list(itertools.starmap(mul, tuples)))
tuples = list(zip(range(100), range(99,-1, -1)))
self.assertEqual(psmap(mul, tuples, chunksize=20),
list(itertools.starmap(mul, tuples)))
def test_starmap_async(self):
tuples = list(zip(range(100), range(99,-1, -1)))
self.assertEqual(self.pool.starmap_async(mul, tuples).get(),
list(itertools.starmap(mul, tuples)))
def test_map_async(self):
self.assertEqual(self.pool.map_async(sqr, list(range(10))).get(),
list(map(sqr, list(range(10)))))
def test_map_async_callbacks(self):
call_args = self.manager.list() if self.TYPE == 'manager' else []
self.pool.map_async(int, ['1'],
callback=call_args.append,
error_callback=call_args.append).wait()
self.assertEqual(1, len(call_args))
self.assertEqual([1], call_args[0])
self.pool.map_async(int, ['a'],
callback=call_args.append,
error_callback=call_args.append).wait()
self.assertEqual(2, len(call_args))
self.assertIsInstance(call_args[1], ValueError)
Merged revisions 73995,74002,74005,74007-74008,74011,74019-74023 via svnmerge from svn+ssh://pythondev@svn.python.org/python/trunk ........ r73995 | vinay.sajip | 2009-07-13 07:21:05 -0400 (Mon, 13 Jul 2009) | 1 line Issue #6314: logging: Extra checks on the "level" argument in more places. ........ r74002 | marc-andre.lemburg | 2009-07-13 16:23:49 -0400 (Mon, 13 Jul 2009) | 6 lines Use a new global DEV_NULL instead of hard-coding /dev/null into the system command helper functions. See #6479 for some motivation. ........ r74005 | marc-andre.lemburg | 2009-07-13 17:28:33 -0400 (Mon, 13 Jul 2009) | 6 lines Use a different VER command output parser to address the localization issues mentioned in #3410. Prepare for Windows 7 (still commented out). ........ r74007 | michael.foord | 2009-07-14 13:58:12 -0400 (Tue, 14 Jul 2009) | 1 line Move TestRunner initialisation into unittest.TestProgram.runTests. Fixes issue 6418. ........ r74008 | benjamin.peterson | 2009-07-14 20:46:42 -0400 (Tue, 14 Jul 2009) | 1 line update year ........ r74011 | ezio.melotti | 2009-07-15 13:07:04 -0400 (Wed, 15 Jul 2009) | 1 line methods' names pep8ification ........ r74019 | amaury.forgeotdarc | 2009-07-15 17:29:27 -0400 (Wed, 15 Jul 2009) | 2 lines #6076 Add a title to the IDLE Preferences window. ........ r74020 | georg.brandl | 2009-07-16 03:18:07 -0400 (Thu, 16 Jul 2009) | 1 line #5910: fix kqueue for calls with more than one event. ........ r74021 | georg.brandl | 2009-07-16 03:33:04 -0400 (Thu, 16 Jul 2009) | 1 line #6486: start with built in functions rather than "built in objects". ........ r74022 | georg.brandl | 2009-07-16 03:38:35 -0400 (Thu, 16 Jul 2009) | 1 line #6481: fix typo in os.system() replacement. ........ r74023 | jesse.noller | 2009-07-16 10:23:04 -0400 (Thu, 16 Jul 2009) | 1 line Issue 6433: multiprocessing.pool.map hangs on empty list ........
2009-07-17 09:18:18 +00:00
def test_map_chunksize(self):
try:
self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
except multiprocessing.TimeoutError:
self.fail("pool.map_async with chunksize stalled on null list")
def test_async(self):
res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
get = TimingWrapper(res.get)
self.assertEqual(get(), 49)
self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
def test_async_timeout(self):
res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
get = TimingWrapper(res.get)
self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
def test_imap(self):
it = self.pool.imap(sqr, list(range(10)))
self.assertEqual(list(it), list(map(sqr, list(range(10)))))
it = self.pool.imap(sqr, list(range(10)))
for i in range(10):
self.assertEqual(next(it), i*i)
self.assertRaises(StopIteration, it.__next__)
it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
for i in range(1000):
self.assertEqual(next(it), i*i)
self.assertRaises(StopIteration, it.__next__)
def test_imap_unordered(self):
it = self.pool.imap_unordered(sqr, list(range(1000)))
self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
def test_make_pool(self):
self.assertRaises(ValueError, multiprocessing.Pool, -1)
self.assertRaises(ValueError, multiprocessing.Pool, 0)
p = multiprocessing.Pool(3)
self.assertEqual(3, len(p._pool))
p.close()
p.join()
def test_terminate(self):
result = self.pool.map_async(
time.sleep, [0.1 for i in range(10000)], chunksize=1
)
self.pool.terminate()
join = TimingWrapper(self.pool.join)
join()
2011-03-24 16:39:07 +01:00
self.assertLess(join.elapsed, 0.5)
def test_empty_iterable(self):
# See Issue 12157
p = self.Pool(1)
self.assertEqual(p.map(sqr, []), [])
self.assertEqual(list(p.imap(sqr, [])), [])
self.assertEqual(list(p.imap_unordered(sqr, [])), [])
self.assertEqual(p.map_async(sqr, []).get(), [])
p.close()
p.join()
def test_context(self):
if self.TYPE == 'processes':
L = list(range(10))
expected = [sqr(i) for i in L]
with multiprocessing.Pool(2) as p:
r = p.map_async(sqr, L)
self.assertEqual(r.get(), expected)
print(p._state)
self.assertRaises(ValueError, p.map_async, sqr, L)
def raising():
raise KeyError("key")
def unpickleable_result():
return lambda: 42
class _TestPoolWorkerErrors(BaseTestCase):
ALLOWED_TYPES = ('processes', )
def test_async_error_callback(self):
p = multiprocessing.Pool(2)
scratchpad = [None]
def errback(exc):
scratchpad[0] = exc
res = p.apply_async(raising, error_callback=errback)
self.assertRaises(KeyError, res.get)
self.assertTrue(scratchpad[0])
self.assertIsInstance(scratchpad[0], KeyError)
p.close()
p.join()
def test_unpickleable_result(self):
from multiprocessing.pool import MaybeEncodingError
p = multiprocessing.Pool(2)
# Make sure we don't lose pool processes because of encoding errors.
for iteration in range(20):
scratchpad = [None]
def errback(exc):
scratchpad[0] = exc
res = p.apply_async(unpickleable_result, error_callback=errback)
self.assertRaises(MaybeEncodingError, res.get)
wrapped = scratchpad[0]
self.assertTrue(wrapped)
self.assertIsInstance(scratchpad[0], MaybeEncodingError)
self.assertIsNotNone(wrapped.exc)
self.assertIsNotNone(wrapped.value)
p.close()
p.join()
class _TestPoolWorkerLifetime(BaseTestCase):
ALLOWED_TYPES = ('processes', )
def test_pool_worker_lifetime(self):
p = multiprocessing.Pool(3, maxtasksperchild=10)
self.assertEqual(3, len(p._pool))
origworkerpids = [w.pid for w in p._pool]
# Run many tasks so each worker gets replaced (hopefully)
results = []
for i in range(100):
results.append(p.apply_async(sqr, (i, )))
# Fetch the results and verify we got the right answers,
# also ensuring all the tasks have completed.
for (j, res) in enumerate(results):
self.assertEqual(res.get(), sqr(j))
# Refill the pool
p._repopulate_pool()
# Wait until all workers are alive
# (countdown * DELTA = 5 seconds max startup process time)
countdown = 50
while countdown and not all(w.is_alive() for w in p._pool):
countdown -= 1
time.sleep(DELTA)
finalworkerpids = [w.pid for w in p._pool]
# All pids should be assigned. See issue #7805.
self.assertNotIn(None, origworkerpids)
self.assertNotIn(None, finalworkerpids)
# Finally, check that the worker pids have changed
self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
p.close()
p.join()
def test_pool_worker_lifetime_early_close(self):
# Issue #10332: closing a pool whose workers have limited lifetimes
# before all the tasks completed would make join() hang.
p = multiprocessing.Pool(3, maxtasksperchild=1)
results = []
for i in range(6):
results.append(p.apply_async(sqr, (i, 0.3)))
p.close()
p.join()
# check the results
for (j, res) in enumerate(results):
self.assertEqual(res.get(), sqr(j))
#
# Test of creating a customized manager class
#
from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
class FooBar(object):
def f(self):
return 'f()'
def g(self):
raise ValueError
def _h(self):
return '_h()'
def baz():
for i in range(10):
yield i*i
class IteratorProxy(BaseProxy):
_exposed_ = ('__next__',)
def __iter__(self):
return self
def __next__(self):
return self._callmethod('__next__')
class MyManager(BaseManager):
pass
MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
class _TestMyManager(BaseTestCase):
ALLOWED_TYPES = ('manager',)
def test_mymanager(self):
manager = MyManager()
manager.start()
self.common(manager)
manager.shutdown()
# If the manager process exited cleanly then the exitcode
# will be zero. Otherwise (after a short timeout)
# terminate() is used, resulting in an exitcode of -SIGTERM.
self.assertEqual(manager._process.exitcode, 0)
def test_mymanager_context(self):
with MyManager() as manager:
self.common(manager)
self.assertEqual(manager._process.exitcode, 0)
def test_mymanager_context_prestarted(self):
manager = MyManager()
manager.start()
with manager:
self.common(manager)
self.assertEqual(manager._process.exitcode, 0)
def common(self, manager):
foo = manager.Foo()
bar = manager.Bar()
baz = manager.baz()
foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
self.assertEqual(foo_methods, ['f', 'g'])
self.assertEqual(bar_methods, ['f', '_h'])
self.assertEqual(foo.f(), 'f()')
self.assertRaises(ValueError, foo.g)
self.assertEqual(foo._callmethod('f'), 'f()')
self.assertRaises(RemoteError, foo._callmethod, '_h')
self.assertEqual(bar.f(), 'f()')
self.assertEqual(bar._h(), '_h()')
self.assertEqual(bar._callmethod('f'), 'f()')
self.assertEqual(bar._callmethod('_h'), '_h()')
self.assertEqual(list(baz), [i*i for i in range(10)])
#
# Test of connecting to a remote server and using xmlrpclib for serialization
#
_queue = pyqueue.Queue()
def get_queue():
return _queue
class QueueManager(BaseManager):
'''manager class used by server process'''
QueueManager.register('get_queue', callable=get_queue)
class QueueManager2(BaseManager):
'''manager class which specifies the same interface as QueueManager'''
QueueManager2.register('get_queue')
SERIALIZER = 'xmlrpclib'
class _TestRemoteManager(BaseTestCase):
ALLOWED_TYPES = ('manager',)
@classmethod
def _putter(cls, address, authkey):
manager = QueueManager2(
address=address, authkey=authkey, serializer=SERIALIZER
)
manager.connect()
queue = manager.get_queue()
queue.put(('hello world', None, True, 2.25))
def test_remote(self):
authkey = os.urandom(32)
manager = QueueManager(
address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
)
manager.start()
p = self.Process(target=self._putter, args=(manager.address, authkey))
p.daemon = True
p.start()
manager2 = QueueManager2(
address=manager.address, authkey=authkey, serializer=SERIALIZER
)
manager2.connect()
queue = manager2.get_queue()
# Note that xmlrpclib will deserialize object as a list not a tuple
self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
# Because we are using xmlrpclib for serialization instead of
# pickle this will cause a serialization error.
self.assertRaises(Exception, queue.put, time.sleep)
# Make queue finalizer run before the server is stopped
del queue
manager.shutdown()
2009-03-30 16:37:36 +00:00
class _TestManagerRestart(BaseTestCase):
@classmethod
def _putter(cls, address, authkey):
2009-03-30 16:37:36 +00:00
manager = QueueManager(
address=address, authkey=authkey, serializer=SERIALIZER)
manager.connect()
queue = manager.get_queue()
queue.put('hello world')
def test_rapid_restart(self):
authkey = os.urandom(32)
manager = QueueManager(
address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
srvr = manager.get_server()
addr = srvr.address
# Close the connection.Listener socket which gets opened as a part
# of manager.get_server(). It's not needed for the test.
srvr.listener.close()
2009-03-30 16:37:36 +00:00
manager.start()
p = self.Process(target=self._putter, args=(manager.address, authkey))
p.daemon = True
2009-03-30 16:37:36 +00:00
p.start()
queue = manager.get_queue()
self.assertEqual(queue.get(), 'hello world')
del queue
2009-03-30 16:37:36 +00:00
manager.shutdown()
manager = QueueManager(
address=addr, authkey=authkey, serializer=SERIALIZER)
try:
manager.start()
2012-12-25 16:47:37 +02:00
except OSError as e:
if e.errno != errno.EADDRINUSE:
raise
# Retry after some time, in case the old socket was lingering
# (sporadic failure on buildbots)
time.sleep(1.0)
manager = QueueManager(
address=addr, authkey=authkey, serializer=SERIALIZER)
manager.shutdown()
2009-03-30 16:37:36 +00:00
#
#
#
SENTINEL = latin('')
class _TestConnection(BaseTestCase):
ALLOWED_TYPES = ('processes', 'threads')
@classmethod
def _echo(cls, conn):
for msg in iter(conn.recv_bytes, SENTINEL):
conn.send_bytes(msg)
conn.close()
def test_connection(self):
conn, child_conn = self.Pipe()
p = self.Process(target=self._echo, args=(child_conn,))
p.daemon = True
p.start()
seq = [1, 2.25, None]
msg = latin('hello world')
longmsg = msg * 10
arr = array.array('i', list(range(4)))
if self.TYPE == 'processes':
self.assertEqual(type(conn.fileno()), int)
self.assertEqual(conn.send(seq), None)
self.assertEqual(conn.recv(), seq)
self.assertEqual(conn.send_bytes(msg), None)
self.assertEqual(conn.recv_bytes(), msg)
if self.TYPE == 'processes':
buffer = array.array('i', [0]*10)
expected = list(arr) + [0] * (10 - len(arr))
self.assertEqual(conn.send_bytes(arr), None)
self.assertEqual(conn.recv_bytes_into(buffer),
len(arr) * buffer.itemsize)
self.assertEqual(list(buffer), expected)
buffer = array.array('i', [0]*10)
expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
self.assertEqual(conn.send_bytes(arr), None)
self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
len(arr) * buffer.itemsize)
self.assertEqual(list(buffer), expected)
buffer = bytearray(latin(' ' * 40))
self.assertEqual(conn.send_bytes(longmsg), None)
try:
res = conn.recv_bytes_into(buffer)
except multiprocessing.BufferTooShort as e:
self.assertEqual(e.args, (longmsg,))
else:
self.fail('expected BufferTooShort, got %s' % res)
poll = TimingWrapper(conn.poll)
self.assertEqual(poll(), False)
self.assertTimingAlmostEqual(poll.elapsed, 0)
self.assertEqual(poll(-1), False)
self.assertTimingAlmostEqual(poll.elapsed, 0)
self.assertEqual(poll(TIMEOUT1), False)
self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
conn.send(None)
self.assertEqual(poll(TIMEOUT1), True)
self.assertTimingAlmostEqual(poll.elapsed, 0)
self.assertEqual(conn.recv(), None)
really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
conn.send_bytes(really_big_msg)
self.assertEqual(conn.recv_bytes(), really_big_msg)
conn.send_bytes(SENTINEL) # tell child to quit
child_conn.close()
if self.TYPE == 'processes':
self.assertEqual(conn.readable, True)
self.assertEqual(conn.writable, True)
self.assertRaises(EOFError, conn.recv)
self.assertRaises(EOFError, conn.recv_bytes)
p.join()
def test_duplex_false(self):
reader, writer = self.Pipe(duplex=False)
self.assertEqual(writer.send(1), None)
self.assertEqual(reader.recv(), 1)
if self.TYPE == 'processes':
self.assertEqual(reader.readable, True)
self.assertEqual(reader.writable, False)
self.assertEqual(writer.readable, False)
self.assertEqual(writer.writable, True)
2012-12-25 16:47:37 +02:00
self.assertRaises(OSError, reader.send, 2)
self.assertRaises(OSError, writer.recv)
self.assertRaises(OSError, writer.poll)
def test_spawn_close(self):
# We test that a pipe connection can be closed by parent
# process immediately after child is spawned. On Windows this
# would have sometimes failed on old versions because
# child_conn would be closed before the child got a chance to
# duplicate it.
conn, child_conn = self.Pipe()
p = self.Process(target=self._echo, args=(child_conn,))
p.daemon = True
p.start()
child_conn.close() # this might complete before child initializes
msg = latin('hello')
conn.send_bytes(msg)
self.assertEqual(conn.recv_bytes(), msg)
conn.send_bytes(SENTINEL)
conn.close()
p.join()
def test_sendbytes(self):
if self.TYPE != 'processes':
return
msg = latin('abcdefghijklmnopqrstuvwxyz')
a, b = self.Pipe()
a.send_bytes(msg)
self.assertEqual(b.recv_bytes(), msg)
a.send_bytes(msg, 5)
self.assertEqual(b.recv_bytes(), msg[5:])
a.send_bytes(msg, 7, 8)
self.assertEqual(b.recv_bytes(), msg[7:7+8])
a.send_bytes(msg, 26)
self.assertEqual(b.recv_bytes(), latin(''))
a.send_bytes(msg, 26, 0)
self.assertEqual(b.recv_bytes(), latin(''))
self.assertRaises(ValueError, a.send_bytes, msg, 27)
self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
self.assertRaises(ValueError, a.send_bytes, msg, -1)
self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
@classmethod
def _is_fd_assigned(cls, fd):
try:
os.fstat(fd)
except OSError as e:
if e.errno == errno.EBADF:
return False
raise
else:
return True
@classmethod
def _writefd(cls, conn, data, create_dummy_fds=False):
if create_dummy_fds:
for i in range(0, 256):
if not cls._is_fd_assigned(i):
os.dup2(conn.fileno(), i)
fd = reduction.recv_handle(conn)
if msvcrt:
fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
os.write(fd, data)
os.close(fd)
@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
def test_fd_transfer(self):
if self.TYPE != 'processes':
self.skipTest("only makes sense with processes")
conn, child_conn = self.Pipe(duplex=True)
p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
p.daemon = True
p.start()
self.addCleanup(test.support.unlink, test.support.TESTFN)
with open(test.support.TESTFN, "wb") as f:
fd = f.fileno()
if msvcrt:
fd = msvcrt.get_osfhandle(fd)
reduction.send_handle(conn, fd, p.pid)
p.join()
with open(test.support.TESTFN, "rb") as f:
self.assertEqual(f.read(), b"foo")
@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
@unittest.skipIf(sys.platform == "win32",
"test semantics don't make sense on Windows")
@unittest.skipIf(MAXFD <= 256,
"largest assignable fd number is too small")
@unittest.skipUnless(hasattr(os, "dup2"),
"test needs os.dup2()")
def test_large_fd_transfer(self):
# With fd > 256 (issue #11657)
if self.TYPE != 'processes':
self.skipTest("only makes sense with processes")
conn, child_conn = self.Pipe(duplex=True)
p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
p.daemon = True
p.start()
self.addCleanup(test.support.unlink, test.support.TESTFN)
with open(test.support.TESTFN, "wb") as f:
fd = f.fileno()
for newfd in range(256, MAXFD):
if not self._is_fd_assigned(newfd):
break
else:
self.fail("could not find an unassigned large file descriptor")
os.dup2(fd, newfd)
try:
reduction.send_handle(conn, newfd, p.pid)
finally:
os.close(newfd)
p.join()
with open(test.support.TESTFN, "rb") as f:
self.assertEqual(f.read(), b"bar")
@classmethod
def _send_data_without_fd(self, conn):
os.write(conn.fileno(), b"\0")
@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
@unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
def test_missing_fd_transfer(self):
# Check that exception is raised when received data is not
# accompanied by a file descriptor in ancillary data.
if self.TYPE != 'processes':
self.skipTest("only makes sense with processes")
conn, child_conn = self.Pipe(duplex=True)
p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
p.daemon = True
p.start()
self.assertRaises(RuntimeError, reduction.recv_handle, conn)
p.join()
def test_context(self):
a, b = self.Pipe()
with a, b:
a.send(1729)
self.assertEqual(b.recv(), 1729)
if self.TYPE == 'processes':
self.assertFalse(a.closed)
self.assertFalse(b.closed)
if self.TYPE == 'processes':
self.assertTrue(a.closed)
self.assertTrue(b.closed)
2012-12-25 16:47:37 +02:00
self.assertRaises(OSError, a.recv)
self.assertRaises(OSError, b.recv)
class _TestListener(BaseTestCase):
ALLOWED_TYPES = ('processes',)
def test_multiple_bind(self):
for family in self.connection.families:
l = self.connection.Listener(family=family)
self.addCleanup(l.close)
self.assertRaises(OSError, self.connection.Listener,
l.address, family)
def test_context(self):
with self.connection.Listener() as l:
with self.connection.Client(l.address) as c:
with l.accept() as d:
c.send(1729)
self.assertEqual(d.recv(), 1729)
if self.TYPE == 'processes':
2012-12-25 16:47:37 +02:00
self.assertRaises(OSError, l.accept)
class _TestListenerClient(BaseTestCase):
ALLOWED_TYPES = ('processes', 'threads')
@classmethod
def _test(cls, address):
conn = cls.connection.Client(address)
conn.send('hello')
conn.close()
def test_listener_client(self):
for family in self.connection.families:
l = self.connection.Listener(family=family)
p = self.Process(target=self._test, args=(l.address,))
p.daemon = True
p.start()
conn = l.accept()
self.assertEqual(conn.recv(), 'hello')
p.join()
l.close()
2012-05-05 19:45:37 +01:00
def test_issue14725(self):
l = self.connection.Listener()
p = self.Process(target=self._test, args=(l.address,))
p.daemon = True
p.start()
time.sleep(1)
# On Windows the client process should by now have connected,
# written data and closed the pipe handle by now. This causes
# ConnectNamdedPipe() to fail with ERROR_NO_DATA. See Issue
# 14725.
conn = l.accept()
self.assertEqual(conn.recv(), 'hello')
conn.close()
p.join()
l.close()
class _TestPoll(unittest.TestCase):
ALLOWED_TYPES = ('processes', 'threads')
def test_empty_string(self):
a, b = self.Pipe()
self.assertEqual(a.poll(), False)
b.send_bytes(b'')
self.assertEqual(a.poll(), True)
self.assertEqual(a.poll(), True)
@classmethod
def _child_strings(cls, conn, strings):
for s in strings:
time.sleep(0.1)
conn.send_bytes(s)
conn.close()
def test_strings(self):
strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
a, b = self.Pipe()
p = self.Process(target=self._child_strings, args=(b, strings))
p.start()
for s in strings:
for i in range(200):
if a.poll(0.01):
break
x = a.recv_bytes()
self.assertEqual(s, x)
p.join()
@classmethod
def _child_boundaries(cls, r):
# Polling may "pull" a message in to the child process, but we
# don't want it to pull only part of a message, as that would
# corrupt the pipe for any other processes which might later
# read from it.
r.poll(5)
def test_boundaries(self):
r, w = self.Pipe(False)
p = self.Process(target=self._child_boundaries, args=(r,))
p.start()
time.sleep(2)
L = [b"first", b"second"]
for obj in L:
w.send_bytes(obj)
w.close()
p.join()
self.assertIn(r.recv_bytes(), L)
@classmethod
def _child_dont_merge(cls, b):
b.send_bytes(b'a')
b.send_bytes(b'b')
b.send_bytes(b'cd')
def test_dont_merge(self):
a, b = self.Pipe()
self.assertEqual(a.poll(0.0), False)
self.assertEqual(a.poll(0.1), False)
p = self.Process(target=self._child_dont_merge, args=(b,))
p.start()
self.assertEqual(a.recv_bytes(), b'a')
self.assertEqual(a.poll(1.0), True)
self.assertEqual(a.poll(1.0), True)
self.assertEqual(a.recv_bytes(), b'b')
self.assertEqual(a.poll(1.0), True)
self.assertEqual(a.poll(1.0), True)
self.assertEqual(a.poll(0.0), True)
self.assertEqual(a.recv_bytes(), b'cd')
p.join()
#
# Test of sending connection and socket objects between processes
#
@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
class _TestPicklingConnections(BaseTestCase):
ALLOWED_TYPES = ('processes',)
@classmethod
def tearDownClass(cls):
from multiprocessing.reduction import resource_sharer
resource_sharer.stop(timeout=5)
@classmethod
def _listener(cls, conn, families):
for fam in families:
l = cls.connection.Listener(family=fam)
conn.send(l.address)
new_conn = l.accept()
conn.send(new_conn)
new_conn.close()
l.close()
l = socket.socket()
l.bind(('localhost', 0))
l.listen(1)
conn.send(l.getsockname())
new_conn, addr = l.accept()
conn.send(new_conn)
new_conn.close()
l.close()
conn.recv()
@classmethod
def _remote(cls, conn):
for (address, msg) in iter(conn.recv, None):
client = cls.connection.Client(address)
client.send(msg.upper())
client.close()
address, msg = conn.recv()
client = socket.socket()
client.connect(address)
client.sendall(msg.upper())
client.close()
conn.close()
def test_pickling(self):
families = self.connection.families
lconn, lconn0 = self.Pipe()
lp = self.Process(target=self._listener, args=(lconn0, families))
lp.daemon = True
lp.start()
lconn0.close()
rconn, rconn0 = self.Pipe()
rp = self.Process(target=self._remote, args=(rconn0,))
rp.daemon = True
rp.start()
rconn0.close()
for fam in families:
msg = ('This connection uses family %s' % fam).encode('ascii')
address = lconn.recv()
rconn.send((address, msg))
new_conn = lconn.recv()
self.assertEqual(new_conn.recv(), msg.upper())
rconn.send(None)
msg = latin('This connection uses a normal socket')
address = lconn.recv()
rconn.send((address, msg))
new_conn = lconn.recv()
buf = []
while True:
s = new_conn.recv(100)
if not s:
break
buf.append(s)
buf = b''.join(buf)
self.assertEqual(buf, msg.upper())
new_conn.close()
lconn.send(None)
rconn.close()
lconn.close()
lp.join()
rp.join()
@classmethod
def child_access(cls, conn):
w = conn.recv()
w.send('all is well')
w.close()
r = conn.recv()
msg = r.recv()
conn.send(msg*2)
conn.close()
def test_access(self):
# On Windows, if we do not specify a destination pid when
# using DupHandle then we need to be careful to use the
# correct access flags for DuplicateHandle(), or else
# DupHandle.detach() will raise PermissionError. For example,
# for a read only pipe handle we should use
# access=FILE_GENERIC_READ. (Unfortunately
# DUPLICATE_SAME_ACCESS does not work.)
conn, child_conn = self.Pipe()
p = self.Process(target=self.child_access, args=(child_conn,))
p.daemon = True
p.start()
child_conn.close()
r, w = self.Pipe(duplex=False)
conn.send(w)
w.close()
self.assertEqual(r.recv(), 'all is well')
r.close()
r, w = self.Pipe(duplex=False)
conn.send(r)
r.close()
w.send('foobar')
w.close()
self.assertEqual(conn.recv(), 'foobar'*2)
#
#
#
class _TestHeap(BaseTestCase):
ALLOWED_TYPES = ('processes',)
def test_heap(self):
iterations = 5000
maxblocks = 50
blocks = []
# create and destroy lots of blocks of different sizes
for i in range(iterations):
size = int(random.lognormvariate(0, 1) * 1000)
b = multiprocessing.heap.BufferWrapper(size)
blocks.append(b)
if len(blocks) > maxblocks:
i = random.randrange(maxblocks)
del blocks[i]
# get the heap object
heap = multiprocessing.heap.BufferWrapper._heap
# verify the state of the heap
all = []
occupied = 0
heap._lock.acquire()
self.addCleanup(heap._lock.release)
for L in list(heap._len_to_seq.values()):
for arena, start, stop in L:
all.append((heap._arenas.index(arena), start, stop,
stop-start, 'free'))
for arena, start, stop in heap._allocated_blocks:
all.append((heap._arenas.index(arena), start, stop,
stop-start, 'occupied'))
occupied += (stop-start)
all.sort()
for i in range(len(all)-1):
(arena, start, stop) = all[i][:3]
(narena, nstart, nstop) = all[i+1][:3]
self.assertTrue((arena != narena and nstart == 0) or
(stop == nstart))
def test_free_from_gc(self):
# Check that freeing of blocks by the garbage collector doesn't deadlock
# (issue #12352).
# Make sure the GC is enabled, and set lower collection thresholds to
# make collections more frequent (and increase the probability of
# deadlock).
if not gc.isenabled():
gc.enable()
self.addCleanup(gc.disable)
thresholds = gc.get_threshold()
self.addCleanup(gc.set_threshold, *thresholds)
gc.set_threshold(10)
# perform numerous block allocations, with cyclic references to make
# sure objects are collected asynchronously by the gc
for i in range(5000):
a = multiprocessing.heap.BufferWrapper(1)
b = multiprocessing.heap.BufferWrapper(1)
# circular references
a.buddy = b
b.buddy = a
#
#
#
class _Foo(Structure):
_fields_ = [
('x', c_int),
('y', c_double)
]
class _TestSharedCTypes(BaseTestCase):
ALLOWED_TYPES = ('processes',)
def setUp(self):
if not HAS_SHAREDCTYPES:
self.skipTest("requires multiprocessing.sharedctypes")
@classmethod
def _double(cls, x, y, foo, arr, string):
x.value *= 2
y.value *= 2
foo.x *= 2
foo.y *= 2
string.value *= 2
for i in range(len(arr)):
arr[i] *= 2
def test_sharedctypes(self, lock=False):
x = Value('i', 7, lock=lock)
Merged revisions 78018,78035-78040,78042-78043,78046,78048-78052,78054,78059,78075-78080 via svnmerge from svn+ssh://pythondev@svn.python.org/python/trunk ........ r78018 | georg.brandl | 2010-02-06 11:08:21 +0100 (Sa, 06 Feb 2010) | 1 line #7864: make deprecation notices a bit clearer. ........ r78035 | georg.brandl | 2010-02-06 23:44:17 +0100 (Sa, 06 Feb 2010) | 1 line Fix duplicate import. ........ r78036 | georg.brandl | 2010-02-06 23:49:47 +0100 (Sa, 06 Feb 2010) | 1 line Remove unused import. ........ r78037 | georg.brandl | 2010-02-06 23:59:15 +0100 (Sa, 06 Feb 2010) | 1 line No need to assign the results of expressions used only for side effects. ........ r78038 | georg.brandl | 2010-02-07 00:02:29 +0100 (So, 07 Feb 2010) | 1 line Add a missing import. ........ r78039 | georg.brandl | 2010-02-07 00:06:24 +0100 (So, 07 Feb 2010) | 1 line Add missing imports. ........ r78040 | georg.brandl | 2010-02-07 00:08:00 +0100 (So, 07 Feb 2010) | 1 line Fix a few UnboundLocalErrors in test_long. ........ r78042 | georg.brandl | 2010-02-07 00:12:12 +0100 (So, 07 Feb 2010) | 1 line Add missing import. ........ r78043 | georg.brandl | 2010-02-07 00:12:19 +0100 (So, 07 Feb 2010) | 1 line Remove duplicate test method. ........ r78046 | georg.brandl | 2010-02-07 00:18:00 +0100 (So, 07 Feb 2010) | 1 line Fix various missing import/unbound name errors. ........ r78048 | georg.brandl | 2010-02-07 00:23:45 +0100 (So, 07 Feb 2010) | 1 line We heard you like test failures so we put unbound locals in your test so that you can fail while you fail. ........ r78049 | georg.brandl | 2010-02-07 00:33:33 +0100 (So, 07 Feb 2010) | 1 line Fix import/access for some identifiers. _TestSharedCTypes does not seem to be executed? ........ r78050 | georg.brandl | 2010-02-07 00:34:10 +0100 (So, 07 Feb 2010) | 1 line Fix more unbound locals in code paths that do not seem to be used. ........ r78051 | georg.brandl | 2010-02-07 00:53:52 +0100 (So, 07 Feb 2010) | 1 line Add missing import when running these tests standalone. ........ r78052 | georg.brandl | 2010-02-07 00:54:04 +0100 (So, 07 Feb 2010) | 1 line Add missing import when running these tests standalone. ........ r78054 | georg.brandl | 2010-02-07 00:58:25 +0100 (So, 07 Feb 2010) | 1 line Add missing import. ........ r78059 | georg.brandl | 2010-02-07 12:34:15 +0100 (So, 07 Feb 2010) | 1 line Use "regexp" consistently. ........ r78075 | georg.brandl | 2010-02-07 13:16:12 +0100 (So, 07 Feb 2010) | 1 line Fix another duplicated test method. ........ r78076 | georg.brandl | 2010-02-07 13:19:43 +0100 (So, 07 Feb 2010) | 1 line Fix wrong usage of "except X, Y:". ........ r78077 | georg.brandl | 2010-02-07 13:25:50 +0100 (So, 07 Feb 2010) | 1 line Fix two redefined test methods. ........ r78078 | georg.brandl | 2010-02-07 13:27:06 +0100 (So, 07 Feb 2010) | 1 line Fix a redefined test method. ........ r78079 | georg.brandl | 2010-02-07 13:34:26 +0100 (So, 07 Feb 2010) | 1 line Add a minimal test for fnmatchcase(). ........ r78080 | georg.brandl | 2010-02-07 13:55:12 +0100 (So, 07 Feb 2010) | 1 line Remove duplicate test method. ........
2010-03-14 10:23:39 +00:00
y = Value(c_double, 1.0/3.0, lock=lock)
foo = Value(_Foo, 3, 2, lock=lock)
Merged revisions 78018,78035-78040,78042-78043,78046,78048-78052,78054,78059,78075-78080 via svnmerge from svn+ssh://pythondev@svn.python.org/python/trunk ........ r78018 | georg.brandl | 2010-02-06 11:08:21 +0100 (Sa, 06 Feb 2010) | 1 line #7864: make deprecation notices a bit clearer. ........ r78035 | georg.brandl | 2010-02-06 23:44:17 +0100 (Sa, 06 Feb 2010) | 1 line Fix duplicate import. ........ r78036 | georg.brandl | 2010-02-06 23:49:47 +0100 (Sa, 06 Feb 2010) | 1 line Remove unused import. ........ r78037 | georg.brandl | 2010-02-06 23:59:15 +0100 (Sa, 06 Feb 2010) | 1 line No need to assign the results of expressions used only for side effects. ........ r78038 | georg.brandl | 2010-02-07 00:02:29 +0100 (So, 07 Feb 2010) | 1 line Add a missing import. ........ r78039 | georg.brandl | 2010-02-07 00:06:24 +0100 (So, 07 Feb 2010) | 1 line Add missing imports. ........ r78040 | georg.brandl | 2010-02-07 00:08:00 +0100 (So, 07 Feb 2010) | 1 line Fix a few UnboundLocalErrors in test_long. ........ r78042 | georg.brandl | 2010-02-07 00:12:12 +0100 (So, 07 Feb 2010) | 1 line Add missing import. ........ r78043 | georg.brandl | 2010-02-07 00:12:19 +0100 (So, 07 Feb 2010) | 1 line Remove duplicate test method. ........ r78046 | georg.brandl | 2010-02-07 00:18:00 +0100 (So, 07 Feb 2010) | 1 line Fix various missing import/unbound name errors. ........ r78048 | georg.brandl | 2010-02-07 00:23:45 +0100 (So, 07 Feb 2010) | 1 line We heard you like test failures so we put unbound locals in your test so that you can fail while you fail. ........ r78049 | georg.brandl | 2010-02-07 00:33:33 +0100 (So, 07 Feb 2010) | 1 line Fix import/access for some identifiers. _TestSharedCTypes does not seem to be executed? ........ r78050 | georg.brandl | 2010-02-07 00:34:10 +0100 (So, 07 Feb 2010) | 1 line Fix more unbound locals in code paths that do not seem to be used. ........ r78051 | georg.brandl | 2010-02-07 00:53:52 +0100 (So, 07 Feb 2010) | 1 line Add missing import when running these tests standalone. ........ r78052 | georg.brandl | 2010-02-07 00:54:04 +0100 (So, 07 Feb 2010) | 1 line Add missing import when running these tests standalone. ........ r78054 | georg.brandl | 2010-02-07 00:58:25 +0100 (So, 07 Feb 2010) | 1 line Add missing import. ........ r78059 | georg.brandl | 2010-02-07 12:34:15 +0100 (So, 07 Feb 2010) | 1 line Use "regexp" consistently. ........ r78075 | georg.brandl | 2010-02-07 13:16:12 +0100 (So, 07 Feb 2010) | 1 line Fix another duplicated test method. ........ r78076 | georg.brandl | 2010-02-07 13:19:43 +0100 (So, 07 Feb 2010) | 1 line Fix wrong usage of "except X, Y:". ........ r78077 | georg.brandl | 2010-02-07 13:25:50 +0100 (So, 07 Feb 2010) | 1 line Fix two redefined test methods. ........ r78078 | georg.brandl | 2010-02-07 13:27:06 +0100 (So, 07 Feb 2010) | 1 line Fix a redefined test method. ........ r78079 | georg.brandl | 2010-02-07 13:34:26 +0100 (So, 07 Feb 2010) | 1 line Add a minimal test for fnmatchcase(). ........ r78080 | georg.brandl | 2010-02-07 13:55:12 +0100 (So, 07 Feb 2010) | 1 line Remove duplicate test method. ........
2010-03-14 10:23:39 +00:00
arr = self.Array('d', list(range(10)), lock=lock)
string = self.Array('c', 20, lock=lock)
string.value = latin('hello')
p = self.Process(target=self._double, args=(x, y, foo, arr, string))
p.daemon = True
p.start()
p.join()
self.assertEqual(x.value, 14)
self.assertAlmostEqual(y.value, 2.0/3.0)
self.assertEqual(foo.x, 6)
self.assertAlmostEqual(foo.y, 4.0)
for i in range(10):
self.assertAlmostEqual(arr[i], i*2)
self.assertEqual(string.value, latin('hellohello'))
def test_synchronize(self):
self.test_sharedctypes(lock=True)
def test_copy(self):
foo = _Foo(2, 5.0)
bar = copy(foo)
foo.x = 0
foo.y = 0
self.assertEqual(bar.x, 2)
self.assertAlmostEqual(bar.y, 5.0)
#
#
#
class _TestFinalize(BaseTestCase):
ALLOWED_TYPES = ('processes',)
@classmethod
def _test_finalize(cls, conn):
class Foo(object):
pass
a = Foo()
util.Finalize(a, conn.send, args=('a',))
del a # triggers callback for a
b = Foo()
close_b = util.Finalize(b, conn.send, args=('b',))
close_b() # triggers callback for b
close_b() # does nothing because callback has already been called
del b # does nothing because callback has already been called
c = Foo()
util.Finalize(c, conn.send, args=('c',))
d10 = Foo()
util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
d01 = Foo()
util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
d02 = Foo()
util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
d03 = Foo()
util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
# call multiprocessing's cleanup function then exit process without
# garbage collecting locals
util._exit_function()
conn.close()
os._exit(0)
def test_finalize(self):
conn, child_conn = self.Pipe()
p = self.Process(target=self._test_finalize, args=(child_conn,))
p.daemon = True
p.start()
p.join()
result = [obj for obj in iter(conn.recv, 'STOP')]
self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
#
# Test that from ... import * works for each module
#
class _TestImportStar(BaseTestCase):
ALLOWED_TYPES = ('processes',)
def test_import(self):
modules = [
'multiprocessing', 'multiprocessing.connection',
'multiprocessing.heap', 'multiprocessing.managers',
'multiprocessing.pool', 'multiprocessing.process',
'multiprocessing.synchronize', 'multiprocessing.util'
]
if HAS_REDUCTION:
modules.append('multiprocessing.reduction')
if c_int is not None:
# This module requires _ctypes
modules.append('multiprocessing.sharedctypes')
for name in modules:
__import__(name)
mod = sys.modules[name]
for attr in getattr(mod, '__all__', ()):
self.assertTrue(
hasattr(mod, attr),
'%r does not have attribute %r' % (mod, attr)
)
#
# Quick test that logging works -- does not test logging output
#
class _TestLogging(BaseTestCase):
ALLOWED_TYPES = ('processes',)
def test_enable_logging(self):
logger = multiprocessing.get_logger()
logger.setLevel(util.SUBWARNING)
self.assertTrue(logger is not None)
logger.debug('this will not be printed')
logger.info('nor will this')
logger.setLevel(LOG_LEVEL)
@classmethod
def _test_level(cls, conn):
logger = multiprocessing.get_logger()
conn.send(logger.getEffectiveLevel())
def test_level(self):
LEVEL1 = 32
LEVEL2 = 37
logger = multiprocessing.get_logger()
root_logger = logging.getLogger()
root_level = root_logger.level
reader, writer = multiprocessing.Pipe(duplex=False)
logger.setLevel(LEVEL1)
p = self.Process(target=self._test_level, args=(writer,))
p.daemon = True
p.start()
self.assertEqual(LEVEL1, reader.recv())
logger.setLevel(logging.NOTSET)
root_logger.setLevel(LEVEL2)
p = self.Process(target=self._test_level, args=(writer,))
p.daemon = True
p.start()
self.assertEqual(LEVEL2, reader.recv())
root_logger.setLevel(root_level)
logger.setLevel(level=LOG_LEVEL)
# class _TestLoggingProcessName(BaseTestCase):
#
# def handle(self, record):
# assert record.processName == multiprocessing.current_process().name
# self.__handled = True
#
# def test_logging(self):
# handler = logging.Handler()
# handler.handle = self.handle
# self.__handled = False
# # Bypass getLogger() and side-effects
# logger = logging.getLoggerClass()(
# 'multiprocessing.test.TestLoggingProcessName')
# logger.addHandler(handler)
# logger.propagate = False
#
# logger.warn('foo')
# assert self.__handled
#
2009-01-19 16:23:53 +00:00
# Test to verify handle verification, see issue 3321
#
class TestInvalidHandle(unittest.TestCase):
@unittest.skipIf(WIN32, "skipped on Windows")
2009-01-19 16:23:53 +00:00
def test_invalid_handles(self):
conn = multiprocessing.connection.Connection(44977608)
try:
2012-12-25 16:47:37 +02:00
self.assertRaises((ValueError, OSError), conn.poll)
finally:
# Hack private attribute _handle to avoid printing an error
# in conn.__del__
conn._handle = None
2012-12-25 16:47:37 +02:00
self.assertRaises((ValueError, OSError),
multiprocessing.connection.Connection, -1)
2009-01-19 16:23:53 +00:00
#
# Functions used to create test cases from the base ones in this module
#
def create_test_cases(Mixin, type):
result = {}
glob = globals()
Type = type.capitalize()
ALL_TYPES = {'processes', 'threads', 'manager'}
for name in list(glob.keys()):
if name.startswith('_Test'):
base = glob[name]
assert set(base.ALLOWED_TYPES) <= ALL_TYPES, set(base.ALLOWED_TYPES)
if type in base.ALLOWED_TYPES:
newname = 'With' + Type + name[1:]
class Temp(base, Mixin, unittest.TestCase):
pass
result[newname] = Temp
Temp.__name__ = Temp.__qualname__ = newname
Temp.__module__ = Mixin.__module__
return result
#
# Create test cases
#
class ProcessesMixin(object):
TYPE = 'processes'
Process = multiprocessing.Process
connection = multiprocessing.connection
current_process = staticmethod(multiprocessing.current_process)
active_children = staticmethod(multiprocessing.active_children)
Pool = staticmethod(multiprocessing.Pool)
Pipe = staticmethod(multiprocessing.Pipe)
Queue = staticmethod(multiprocessing.Queue)
JoinableQueue = staticmethod(multiprocessing.JoinableQueue)
Lock = staticmethod(multiprocessing.Lock)
RLock = staticmethod(multiprocessing.RLock)
Semaphore = staticmethod(multiprocessing.Semaphore)
BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore)
Condition = staticmethod(multiprocessing.Condition)
Event = staticmethod(multiprocessing.Event)
Barrier = staticmethod(multiprocessing.Barrier)
Value = staticmethod(multiprocessing.Value)
Array = staticmethod(multiprocessing.Array)
RawValue = staticmethod(multiprocessing.RawValue)
RawArray = staticmethod(multiprocessing.RawArray)
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
class ManagerMixin(object):
TYPE = 'manager'
Process = multiprocessing.Process
Queue = property(operator.attrgetter('manager.Queue'))
JoinableQueue = property(operator.attrgetter('manager.JoinableQueue'))
Lock = property(operator.attrgetter('manager.Lock'))
RLock = property(operator.attrgetter('manager.RLock'))
Semaphore = property(operator.attrgetter('manager.Semaphore'))
BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore'))
Condition = property(operator.attrgetter('manager.Condition'))
Event = property(operator.attrgetter('manager.Event'))
Barrier = property(operator.attrgetter('manager.Barrier'))
Value = property(operator.attrgetter('manager.Value'))
Array = property(operator.attrgetter('manager.Array'))
list = property(operator.attrgetter('manager.list'))
dict = property(operator.attrgetter('manager.dict'))
Namespace = property(operator.attrgetter('manager.Namespace'))
@classmethod
def Pool(cls, *args, **kwds):
return cls.manager.Pool(*args, **kwds)
@classmethod
def setUpClass(cls):
cls.manager = multiprocessing.Manager()
@classmethod
def tearDownClass(cls):
multiprocessing.active_children() # discard dead process objs
gc.collect() # do garbage collection
if cls.manager._number_of_objects() != 0:
# This is not really an error since some tests do not
# ensure that all processes which hold a reference to a
# managed object have been joined.
print('Shared objects which still exist at manager shutdown:')
print(cls.manager._debug_info())
cls.manager.shutdown()
cls.manager.join()
cls.manager = None
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
class ThreadsMixin(object):
TYPE = 'threads'
Process = multiprocessing.dummy.Process
connection = multiprocessing.dummy.connection
current_process = staticmethod(multiprocessing.dummy.current_process)
active_children = staticmethod(multiprocessing.dummy.active_children)
Pool = staticmethod(multiprocessing.Pool)
Pipe = staticmethod(multiprocessing.dummy.Pipe)
Queue = staticmethod(multiprocessing.dummy.Queue)
JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue)
Lock = staticmethod(multiprocessing.dummy.Lock)
RLock = staticmethod(multiprocessing.dummy.RLock)
Semaphore = staticmethod(multiprocessing.dummy.Semaphore)
BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore)
Condition = staticmethod(multiprocessing.dummy.Condition)
Event = staticmethod(multiprocessing.dummy.Event)
Barrier = staticmethod(multiprocessing.dummy.Barrier)
Value = staticmethod(multiprocessing.dummy.Value)
Array = staticmethod(multiprocessing.dummy.Array)
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
class OtherTest(unittest.TestCase):
# TODO: add more tests for deliver/answer challenge.
def test_deliver_challenge_auth_failure(self):
class _FakeConnection(object):
def recv_bytes(self, size):
return b'something bogus'
def send_bytes(self, data):
pass
self.assertRaises(multiprocessing.AuthenticationError,
multiprocessing.connection.deliver_challenge,
_FakeConnection(), b'abc')
def test_answer_challenge_auth_failure(self):
class _FakeConnection(object):
def __init__(self):
self.count = 0
def recv_bytes(self, size):
self.count += 1
if self.count == 1:
return multiprocessing.connection.CHALLENGE
elif self.count == 2:
return b'something bogus'
return b''
def send_bytes(self, data):
pass
self.assertRaises(multiprocessing.AuthenticationError,
multiprocessing.connection.answer_challenge,
_FakeConnection(), b'abc')
Merged revisions 70912,70944,70968,71033,71041,71208,71263,71286,71395-71396,71405-71406,71485,71492,71494 via svnmerge from svn+ssh://pythondev@svn.python.org/python/trunk ........ r70912 | georg.brandl | 2009-03-31 17:35:46 -0500 (Tue, 31 Mar 2009) | 1 line #5617: add a handy function to print a unicode string to gdbinit. ........ r70944 | georg.brandl | 2009-03-31 23:32:39 -0500 (Tue, 31 Mar 2009) | 1 line #5631: add upload to list of possible commands, which is presented in --help-commands. ........ r70968 | michael.foord | 2009-04-01 13:25:38 -0500 (Wed, 01 Apr 2009) | 1 line Adding Wing project file ........ r71033 | brett.cannon | 2009-04-01 22:34:53 -0500 (Wed, 01 Apr 2009) | 3 lines Fix two issues introduced by issue #71031 by changing the signature of PyImport_AppendInittab() to take a const char *. ........ r71041 | jesse.noller | 2009-04-02 00:17:26 -0500 (Thu, 02 Apr 2009) | 1 line Add custom initializer argument to multiprocess.Manager*, courtesy of lekma ........ r71208 | michael.foord | 2009-04-04 20:15:01 -0500 (Sat, 04 Apr 2009) | 4 lines Change the way unittest.TestSuite use their tests to always access them through iteration. Non behavior changing, this allows you to create custom subclasses that override __iter__. Issue #5693 ........ r71263 | michael.foord | 2009-04-05 14:19:28 -0500 (Sun, 05 Apr 2009) | 4 lines Adding assertIs and assertIsNot methods to unittest.TestCase Issue #2578 ........ r71286 | tarek.ziade | 2009-04-05 17:04:38 -0500 (Sun, 05 Apr 2009) | 1 line added a simplest test to distutils.spawn._nt_quote_args ........ r71395 | benjamin.peterson | 2009-04-08 08:27:29 -0500 (Wed, 08 Apr 2009) | 1 line these must be installed to correctly run tests ........ r71396 | benjamin.peterson | 2009-04-08 08:29:41 -0500 (Wed, 08 Apr 2009) | 1 line fix syntax ........ r71405 | andrew.kuchling | 2009-04-09 06:22:47 -0500 (Thu, 09 Apr 2009) | 1 line Add items ........ r71406 | andrew.kuchling | 2009-04-09 06:23:36 -0500 (Thu, 09 Apr 2009) | 1 line Typo fixes ........ r71485 | andrew.kuchling | 2009-04-11 11:12:23 -0500 (Sat, 11 Apr 2009) | 1 line Add various items ........ r71492 | georg.brandl | 2009-04-11 13:19:27 -0500 (Sat, 11 Apr 2009) | 1 line Take credit for a patch of mine. ........ r71494 | benjamin.peterson | 2009-04-11 14:31:00 -0500 (Sat, 11 Apr 2009) | 1 line ignore py3_test_grammar when compiling the library ........
2009-04-11 20:45:40 +00:00
#
# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
#
def initializer(ns):
ns.test += 1
class TestInitializers(unittest.TestCase):
def setUp(self):
self.mgr = multiprocessing.Manager()
self.ns = self.mgr.Namespace()
self.ns.test = 0
def tearDown(self):
self.mgr.shutdown()
self.mgr.join()
Merged revisions 70912,70944,70968,71033,71041,71208,71263,71286,71395-71396,71405-71406,71485,71492,71494 via svnmerge from svn+ssh://pythondev@svn.python.org/python/trunk ........ r70912 | georg.brandl | 2009-03-31 17:35:46 -0500 (Tue, 31 Mar 2009) | 1 line #5617: add a handy function to print a unicode string to gdbinit. ........ r70944 | georg.brandl | 2009-03-31 23:32:39 -0500 (Tue, 31 Mar 2009) | 1 line #5631: add upload to list of possible commands, which is presented in --help-commands. ........ r70968 | michael.foord | 2009-04-01 13:25:38 -0500 (Wed, 01 Apr 2009) | 1 line Adding Wing project file ........ r71033 | brett.cannon | 2009-04-01 22:34:53 -0500 (Wed, 01 Apr 2009) | 3 lines Fix two issues introduced by issue #71031 by changing the signature of PyImport_AppendInittab() to take a const char *. ........ r71041 | jesse.noller | 2009-04-02 00:17:26 -0500 (Thu, 02 Apr 2009) | 1 line Add custom initializer argument to multiprocess.Manager*, courtesy of lekma ........ r71208 | michael.foord | 2009-04-04 20:15:01 -0500 (Sat, 04 Apr 2009) | 4 lines Change the way unittest.TestSuite use their tests to always access them through iteration. Non behavior changing, this allows you to create custom subclasses that override __iter__. Issue #5693 ........ r71263 | michael.foord | 2009-04-05 14:19:28 -0500 (Sun, 05 Apr 2009) | 4 lines Adding assertIs and assertIsNot methods to unittest.TestCase Issue #2578 ........ r71286 | tarek.ziade | 2009-04-05 17:04:38 -0500 (Sun, 05 Apr 2009) | 1 line added a simplest test to distutils.spawn._nt_quote_args ........ r71395 | benjamin.peterson | 2009-04-08 08:27:29 -0500 (Wed, 08 Apr 2009) | 1 line these must be installed to correctly run tests ........ r71396 | benjamin.peterson | 2009-04-08 08:29:41 -0500 (Wed, 08 Apr 2009) | 1 line fix syntax ........ r71405 | andrew.kuchling | 2009-04-09 06:22:47 -0500 (Thu, 09 Apr 2009) | 1 line Add items ........ r71406 | andrew.kuchling | 2009-04-09 06:23:36 -0500 (Thu, 09 Apr 2009) | 1 line Typo fixes ........ r71485 | andrew.kuchling | 2009-04-11 11:12:23 -0500 (Sat, 11 Apr 2009) | 1 line Add various items ........ r71492 | georg.brandl | 2009-04-11 13:19:27 -0500 (Sat, 11 Apr 2009) | 1 line Take credit for a patch of mine. ........ r71494 | benjamin.peterson | 2009-04-11 14:31:00 -0500 (Sat, 11 Apr 2009) | 1 line ignore py3_test_grammar when compiling the library ........
2009-04-11 20:45:40 +00:00
def test_manager_initializer(self):
m = multiprocessing.managers.SyncManager()
self.assertRaises(TypeError, m.start, 1)
m.start(initializer, (self.ns,))
self.assertEqual(self.ns.test, 1)
m.shutdown()
m.join()
Merged revisions 70912,70944,70968,71033,71041,71208,71263,71286,71395-71396,71405-71406,71485,71492,71494 via svnmerge from svn+ssh://pythondev@svn.python.org/python/trunk ........ r70912 | georg.brandl | 2009-03-31 17:35:46 -0500 (Tue, 31 Mar 2009) | 1 line #5617: add a handy function to print a unicode string to gdbinit. ........ r70944 | georg.brandl | 2009-03-31 23:32:39 -0500 (Tue, 31 Mar 2009) | 1 line #5631: add upload to list of possible commands, which is presented in --help-commands. ........ r70968 | michael.foord | 2009-04-01 13:25:38 -0500 (Wed, 01 Apr 2009) | 1 line Adding Wing project file ........ r71033 | brett.cannon | 2009-04-01 22:34:53 -0500 (Wed, 01 Apr 2009) | 3 lines Fix two issues introduced by issue #71031 by changing the signature of PyImport_AppendInittab() to take a const char *. ........ r71041 | jesse.noller | 2009-04-02 00:17:26 -0500 (Thu, 02 Apr 2009) | 1 line Add custom initializer argument to multiprocess.Manager*, courtesy of lekma ........ r71208 | michael.foord | 2009-04-04 20:15:01 -0500 (Sat, 04 Apr 2009) | 4 lines Change the way unittest.TestSuite use their tests to always access them through iteration. Non behavior changing, this allows you to create custom subclasses that override __iter__. Issue #5693 ........ r71263 | michael.foord | 2009-04-05 14:19:28 -0500 (Sun, 05 Apr 2009) | 4 lines Adding assertIs and assertIsNot methods to unittest.TestCase Issue #2578 ........ r71286 | tarek.ziade | 2009-04-05 17:04:38 -0500 (Sun, 05 Apr 2009) | 1 line added a simplest test to distutils.spawn._nt_quote_args ........ r71395 | benjamin.peterson | 2009-04-08 08:27:29 -0500 (Wed, 08 Apr 2009) | 1 line these must be installed to correctly run tests ........ r71396 | benjamin.peterson | 2009-04-08 08:29:41 -0500 (Wed, 08 Apr 2009) | 1 line fix syntax ........ r71405 | andrew.kuchling | 2009-04-09 06:22:47 -0500 (Thu, 09 Apr 2009) | 1 line Add items ........ r71406 | andrew.kuchling | 2009-04-09 06:23:36 -0500 (Thu, 09 Apr 2009) | 1 line Typo fixes ........ r71485 | andrew.kuchling | 2009-04-11 11:12:23 -0500 (Sat, 11 Apr 2009) | 1 line Add various items ........ r71492 | georg.brandl | 2009-04-11 13:19:27 -0500 (Sat, 11 Apr 2009) | 1 line Take credit for a patch of mine. ........ r71494 | benjamin.peterson | 2009-04-11 14:31:00 -0500 (Sat, 11 Apr 2009) | 1 line ignore py3_test_grammar when compiling the library ........
2009-04-11 20:45:40 +00:00
def test_pool_initializer(self):
self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
p = multiprocessing.Pool(1, initializer, (self.ns,))
p.close()
p.join()
self.assertEqual(self.ns.test, 1)
#
# Issue 5155, 5313, 5331: Test process in processes
# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
#
def _ThisSubProcess(q):
try:
item = q.get(block=False)
except pyqueue.Empty:
pass
def _TestProcess(q):
queue = multiprocessing.Queue()
subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
subProc.daemon = True
subProc.start()
subProc.join()
def _afunc(x):
return x*x
def pool_in_process():
pool = multiprocessing.Pool(processes=4)
x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
pool.close()
pool.join()
class _file_like(object):
def __init__(self, delegate):
self._delegate = delegate
self._pid = None
@property
def cache(self):
pid = os.getpid()
# There are no race conditions since fork keeps only the running thread
if pid != self._pid:
self._pid = pid
self._cache = []
return self._cache
def write(self, data):
self.cache.append(data)
def flush(self):
self._delegate.write(''.join(self.cache))
self._cache = []
class TestStdinBadfiledescriptor(unittest.TestCase):
def test_queue_in_process(self):
queue = multiprocessing.Queue()
proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
proc.start()
proc.join()
def test_pool_in_process(self):
p = multiprocessing.Process(target=pool_in_process)
p.start()
p.join()
def test_flushing(self):
sio = io.StringIO()
flike = _file_like(sio)
flike.write('foo')
proc = multiprocessing.Process(target=lambda: flike.flush())
flike.flush()
assert sio.getvalue() == 'foo'
class TestWait(unittest.TestCase):
@classmethod
def _child_test_wait(cls, w, slow):
for i in range(10):
if slow:
time.sleep(random.random()*0.1)
w.send((i, os.getpid()))
w.close()
def test_wait(self, slow=False):
from multiprocessing.connection import wait
readers = []
procs = []
messages = []
for i in range(4):
2012-03-06 13:43:24 +01:00
r, w = multiprocessing.Pipe(duplex=False)
p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow))
p.daemon = True
p.start()
w.close()
readers.append(r)
procs.append(p)
self.addCleanup(p.join)
while readers:
for r in wait(readers):
try:
msg = r.recv()
except EOFError:
readers.remove(r)
r.close()
else:
messages.append(msg)
messages.sort()
expected = sorted((i, p.pid) for i in range(10) for p in procs)
self.assertEqual(messages, expected)
@classmethod
def _child_test_wait_socket(cls, address, slow):
s = socket.socket()
s.connect(address)
for i in range(10):
if slow:
time.sleep(random.random()*0.1)
s.sendall(('%s\n' % i).encode('ascii'))
s.close()
def test_wait_socket(self, slow=False):
from multiprocessing.connection import wait
l = socket.socket()
l.bind(('', 0))
l.listen(4)
addr = ('localhost', l.getsockname()[1])
readers = []
procs = []
dic = {}
for i in range(4):
2012-03-06 13:43:24 +01:00
p = multiprocessing.Process(target=self._child_test_wait_socket,
args=(addr, slow))
p.daemon = True
p.start()
procs.append(p)
self.addCleanup(p.join)
for i in range(4):
r, _ = l.accept()
readers.append(r)
dic[r] = []
l.close()
while readers:
for r in wait(readers):
msg = r.recv(32)
if not msg:
readers.remove(r)
r.close()
else:
dic[r].append(msg)
expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
for v in dic.values():
self.assertEqual(b''.join(v), expected)
def test_wait_slow(self):
self.test_wait(True)
def test_wait_socket_slow(self):
2012-05-08 16:08:07 +01:00
self.test_wait_socket(True)
def test_wait_timeout(self):
from multiprocessing.connection import wait
expected = 5
a, b = multiprocessing.Pipe()
start = time.time()
res = wait([a, b], expected)
delta = time.time() - start
self.assertEqual(res, [])
self.assertLess(delta, expected * 2)
self.assertGreater(delta, expected * 0.5)
b.send(None)
start = time.time()
res = wait([a, b], 20)
delta = time.time() - start
self.assertEqual(res, [a])
self.assertLess(delta, 0.4)
@classmethod
def signal_and_sleep(cls, sem, period):
sem.release()
time.sleep(period)
def test_wait_integer(self):
from multiprocessing.connection import wait
expected = 3
sem = multiprocessing.Semaphore(0)
a, b = multiprocessing.Pipe()
p = multiprocessing.Process(target=self.signal_and_sleep,
args=(sem, expected))
p.start()
self.assertIsInstance(p.sentinel, int)
self.assertTrue(sem.acquire(timeout=20))
start = time.time()
res = wait([a, p.sentinel, b], expected + 20)
delta = time.time() - start
self.assertEqual(res, [p.sentinel])
self.assertLess(delta, expected + 2)
self.assertGreater(delta, expected - 2)
a.send(None)
start = time.time()
res = wait([a, p.sentinel, b], 20)
delta = time.time() - start
self.assertEqual(res, [p.sentinel, b])
self.assertLess(delta, 0.4)
b.send(None)
start = time.time()
res = wait([a, p.sentinel, b], 20)
delta = time.time() - start
self.assertEqual(res, [a, p.sentinel, b])
self.assertLess(delta, 0.4)
p.terminate()
p.join()
def test_neg_timeout(self):
from multiprocessing.connection import wait
a, b = multiprocessing.Pipe()
t = time.time()
res = wait([a], timeout=-1)
t = time.time() - t
self.assertEqual(res, [])
self.assertLess(t, 1)
a.close()
b.close()
#
# Issue 14151: Test invalid family on invalid environment
#
class TestInvalidFamily(unittest.TestCase):
@unittest.skipIf(WIN32, "skipped on Windows")
def test_invalid_family(self):
with self.assertRaises(ValueError):
multiprocessing.connection.Listener(r'\\.\test')
@unittest.skipUnless(WIN32, "skipped on non-Windows platforms")
def test_invalid_family_win32(self):
with self.assertRaises(ValueError):
multiprocessing.connection.Listener('/var/test.pipe')
#
# Issue 12098: check sys.flags of child matches that for parent
#
class TestFlags(unittest.TestCase):
@classmethod
def run_in_grandchild(cls, conn):
conn.send(tuple(sys.flags))
@classmethod
def run_in_child(cls):
import json
r, w = multiprocessing.Pipe(duplex=False)
p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,))
p.start()
grandchild_flags = r.recv()
p.join()
r.close()
w.close()
flags = (tuple(sys.flags), grandchild_flags)
print(json.dumps(flags))
def test_flags(self):
import json, subprocess
# start child process using unusual flags
prog = ('from test.test_multiprocessing import TestFlags; ' +
'TestFlags.run_in_child()')
data = subprocess.check_output(
[sys.executable, '-E', '-S', '-O', '-c', prog])
child_flags, grandchild_flags = json.loads(data.decode('ascii'))
self.assertEqual(child_flags, grandchild_flags)
#
# Test interaction with socket timeouts - see Issue #6056
#
class TestTimeouts(unittest.TestCase):
@classmethod
def _test_timeout(cls, child, address):
time.sleep(1)
child.send(123)
child.close()
conn = multiprocessing.connection.Client(address)
conn.send(456)
conn.close()
def test_timeout(self):
old_timeout = socket.getdefaulttimeout()
try:
socket.setdefaulttimeout(0.1)
parent, child = multiprocessing.Pipe(duplex=True)
l = multiprocessing.connection.Listener(family='AF_INET')
p = multiprocessing.Process(target=self._test_timeout,
args=(child, l.address))
p.start()
child.close()
self.assertEqual(parent.recv(), 123)
parent.close()
conn = l.accept()
self.assertEqual(conn.recv(), 456)
conn.close()
l.close()
p.join(10)
finally:
socket.setdefaulttimeout(old_timeout)
#
# Test what happens with no "if __name__ == '__main__'"
#
class TestNoForkBomb(unittest.TestCase):
def test_noforkbomb(self):
name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
if WIN32:
rc, out, err = test.script_helper.assert_python_failure(name)
self.assertEqual('', out.decode('ascii'))
self.assertIn('RuntimeError', err.decode('ascii'))
else:
rc, out, err = test.script_helper.assert_python_ok(name)
self.assertEqual('123', out.decode('ascii').rstrip())
self.assertEqual('', err.decode('ascii'))
#
#
#
testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
TestStdinBadfiledescriptor, TestWait, TestInvalidFamily,
2012-08-14 12:51:14 +01:00
TestFlags, TestTimeouts, TestNoForkBomb]
#
#
#
def test_main(run=None):
2008-06-18 14:22:48 +00:00
if sys.platform.startswith("linux"):
try:
lock = multiprocessing.RLock()
except OSError:
Merged revisions 70554,70588-70589,70598,70605,70611-70621,70623-70624,70626-70627 via svnmerge from svn+ssh://pythondev@svn.python.org/python/trunk ........ r70554 | benjamin.peterson | 2009-03-23 16:25:15 -0500 (Mon, 23 Mar 2009) | 1 line complain when there's no last exception ........ r70588 | benjamin.peterson | 2009-03-24 17:56:32 -0500 (Tue, 24 Mar 2009) | 1 line fix newline issue in test summary ........ r70589 | benjamin.peterson | 2009-03-24 18:07:07 -0500 (Tue, 24 Mar 2009) | 1 line another style nit ........ r70598 | benjamin.peterson | 2009-03-25 16:24:04 -0500 (Wed, 25 Mar 2009) | 1 line add shorthands for expected failures and unexpected success ........ r70605 | benjamin.peterson | 2009-03-26 11:32:23 -0500 (Thu, 26 Mar 2009) | 1 line remove uneeded function ........ r70611 | benjamin.peterson | 2009-03-26 13:35:37 -0500 (Thu, 26 Mar 2009) | 1 line add much better tests for python version information parsing ........ r70612 | benjamin.peterson | 2009-03-26 13:55:48 -0500 (Thu, 26 Mar 2009) | 1 line more and more implementations now support sys.subversion ........ r70613 | benjamin.peterson | 2009-03-26 13:58:30 -0500 (Thu, 26 Mar 2009) | 1 line roll old test in with new one ........ r70614 | benjamin.peterson | 2009-03-26 14:09:21 -0500 (Thu, 26 Mar 2009) | 1 line add support for PyPy ........ r70615 | benjamin.peterson | 2009-03-26 14:58:18 -0500 (Thu, 26 Mar 2009) | 5 lines add some useful utilities for skipping tests with unittest's new skipping ability most significantly apply a modified portion of the patch from #4242 with patches for skipping implementation details ........ r70616 | benjamin.peterson | 2009-03-26 15:05:50 -0500 (Thu, 26 Mar 2009) | 1 line rename TestCase.skip() to skipTest() because it causes annoying problems with trial #5571 ........ r70617 | benjamin.peterson | 2009-03-26 15:17:27 -0500 (Thu, 26 Mar 2009) | 1 line apply the second part of #4242's patch; classify all the implementation details in test_descr ........ r70618 | benjamin.peterson | 2009-03-26 15:48:25 -0500 (Thu, 26 Mar 2009) | 1 line remove test_support.TestSkipped and just use unittest.SkipTest ........ r70619 | benjamin.peterson | 2009-03-26 15:49:40 -0500 (Thu, 26 Mar 2009) | 1 line fix naming ........ r70620 | benjamin.peterson | 2009-03-26 16:10:30 -0500 (Thu, 26 Mar 2009) | 1 line fix incorrect auto-translation of TestSkipped -> unittest.SkipTest ........ r70621 | benjamin.peterson | 2009-03-26 16:11:16 -0500 (Thu, 26 Mar 2009) | 1 line must pass argument to get expected behavior ;) ........ r70623 | benjamin.peterson | 2009-03-26 16:30:10 -0500 (Thu, 26 Mar 2009) | 1 line add missing import ........ r70624 | benjamin.peterson | 2009-03-26 16:30:54 -0500 (Thu, 26 Mar 2009) | 1 line ** is required here ........ r70626 | benjamin.peterson | 2009-03-26 16:40:29 -0500 (Thu, 26 Mar 2009) | 1 line update email tests to use SkipTest ........ r70627 | benjamin.peterson | 2009-03-26 16:44:43 -0500 (Thu, 26 Mar 2009) | 1 line fix another name ........
2009-03-28 21:42:05 +00:00
raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
check_enough_semaphores()
if run is None:
from test.support import run_unittest as run
util.get_temp_dir() # creates temp directory for use by all processes
multiprocessing.get_logger().setLevel(LOG_LEVEL)
testcases = (
Merged revisions 64475,64544-64545,64550,64557-64558,64565,64570,64577,64582-64583,64585,64590,64592-64593,64625,64630,64638,64647,64655-64656,64663-64664 via svnmerge from svn+ssh://pythondev@svn.python.org/python/trunk ........ r64475 | raymond.hettinger | 2008-06-22 22:29:28 -0500 (Sun, 22 Jun 2008) | 1 line Issue 3161: Missing import and test. ........ r64544 | georg.brandl | 2008-06-26 16:12:55 -0500 (Thu, 26 Jun 2008) | 2 lines Use newer versions of externals. ........ r64545 | benjamin.peterson | 2008-06-26 16:23:30 -0500 (Thu, 26 Jun 2008) | 1 line add a htmlview directive ........ r64550 | brett.cannon | 2008-06-26 19:32:16 -0500 (Thu, 26 Jun 2008) | 2 lines Ignore .pyc and .pyo files. ........ r64557 | mark.dickinson | 2008-06-27 05:11:52 -0500 (Fri, 27 Jun 2008) | 3 lines Remove trailing 'L's from numerator and denominator in the repr() of a Fraction instance. ........ r64558 | mark.dickinson | 2008-06-27 06:03:21 -0500 (Fri, 27 Jun 2008) | 2 lines Add Jean Brouwers for his work on math.sum ........ r64565 | raymond.hettinger | 2008-06-27 16:34:24 -0500 (Fri, 27 Jun 2008) | 1 line Fix whitespace in example code. ........ r64570 | hyeshik.chang | 2008-06-27 20:04:31 -0500 (Fri, 27 Jun 2008) | 8 lines Give information for compililation of _multiprocessing.SemLock on FreeBSD: FreeBSD's P1003.1b semaphore support is highly experimental and it's disabled by default. Even if a user loads the experimental kernel module manually, _multiprocessing doesn't work correctly due to several known incompatibilities around sem_unlink and sem_getvalue, yet. ........ r64577 | raymond.hettinger | 2008-06-28 17:16:53 -0500 (Sat, 28 Jun 2008) | 1 line Issue 3230: Do not the set specific size macro. ........ r64582 | benjamin.peterson | 2008-06-28 18:06:05 -0500 (Sat, 28 Jun 2008) | 2 lines convert test_audioop to unittest. Thanks to Giampaolo Rodola. ........ r64583 | benjamin.peterson | 2008-06-28 18:06:49 -0500 (Sat, 28 Jun 2008) | 1 line rewrap ........ r64585 | benjamin.peterson | 2008-06-28 18:35:31 -0500 (Sat, 28 Jun 2008) | 1 line fix typo ........ r64590 | benjamin.peterson | 2008-06-29 08:43:07 -0500 (Sun, 29 Jun 2008) | 1 line reinstate the ending backtick. thanks Nick :) ........ r64592 | vinay.sajip | 2008-06-29 16:25:28 -0500 (Sun, 29 Jun 2008) | 2 lines Removed out-of-date comment in _install_handlers and used issubclass in place of equality comparison of classes. ........ r64593 | vinay.sajip | 2008-06-29 16:27:15 -0500 (Sun, 29 Jun 2008) | 1 line Updated to reflect change in logging.config to remove out-of-date comment in _install_handlers and the use of issubclass in place of equality comparison of classes. ........ r64625 | georg.brandl | 2008-07-01 14:59:00 -0500 (Tue, 01 Jul 2008) | 2 lines Add a link to PEP 324. ........ r64630 | georg.brandl | 2008-07-01 15:18:10 -0500 (Tue, 01 Jul 2008) | 2 lines #3216: fix Execute's parameter description. ........ r64638 | georg.brandl | 2008-07-01 15:50:02 -0500 (Tue, 01 Jul 2008) | 2 lines #1410739: add a footnote about "is" and "unusual" behavior. ........ r64647 | benjamin.peterson | 2008-07-01 18:33:06 -0500 (Tue, 01 Jul 2008) | 1 line add ABC to the glossary ........ r64655 | mark.dickinson | 2008-07-02 04:37:01 -0500 (Wed, 02 Jul 2008) | 7 lines Replace occurrences of '\d' with '[0-9]' in Decimal regex, to make sure that the behaviour of Decimal doesn't change if/when re.UNICODE becomes assumed in Python 3.0. Also add a check that alternative Unicode digits (e.g. u'\N{FULLWIDTH DIGIT ONE}') are *not* accepted in a numeric string. ........ r64656 | nick.coghlan | 2008-07-02 08:09:19 -0500 (Wed, 02 Jul 2008) | 1 line Issue 3190: pydoc now hides module __package__ attributes ........ r64663 | jesse.noller | 2008-07-02 11:44:09 -0500 (Wed, 02 Jul 2008) | 1 line Reenable the manager tests with Amaury's threading fix ........ r64664 | facundo.batista | 2008-07-02 11:52:55 -0500 (Wed, 02 Jul 2008) | 4 lines Issue #449227: Now with the rlcompleter module, callable objects are added a '(' when completed. ........
2008-07-02 20:22:54 +00:00
sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
testcases_other
)
loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
run(suite)
def main():
test_main(unittest.TextTestRunner(verbosity=2).run)
if __name__ == '__main__':
main()