mirror of
				https://github.com/python/cpython.git
				synced 2025-10-31 05:31:20 +00:00 
			
		
		
		
	 0db85eeba7
			
		
	
	
		0db85eeba7
		
			
		
	
	
	
	
		
			
			The Tier 2 opcode _IS_ITER_EXHAUSTED_LIST (and _TUPLE) didn't set it->it_seq to NULL, causing a subtle bug that made test_exhausted_iterator in list_tests.py fail when running all tests with -Xuops. The bug was introduced in gh-106696. Added this as an explicit test. Also fixed the dependencies for ceval.o -- it depends on executor_cases.c.h.
		
			
				
	
	
		
			2667 lines
		
	
	
	
		
			99 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			2667 lines
		
	
	
	
		
			99 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # Run the _testcapi module tests (tests for the Python/C API):  by defn,
 | |
| # these are all functions _testcapi exports whose name begins with 'test_'.
 | |
| 
 | |
| import _thread
 | |
| from collections import OrderedDict, deque
 | |
| import contextlib
 | |
| import importlib.machinery
 | |
| import importlib.util
 | |
| import json
 | |
| import opcode
 | |
| import os
 | |
| import pickle
 | |
| import queue
 | |
| import random
 | |
| import sys
 | |
| import textwrap
 | |
| import threading
 | |
| import time
 | |
| import types
 | |
| import unittest
 | |
| import warnings
 | |
| import weakref
 | |
| import operator
 | |
| from test import support
 | |
| from test.support import MISSING_C_DOCSTRINGS
 | |
| from test.support import import_helper
 | |
| from test.support import threading_helper
 | |
| from test.support import warnings_helper
 | |
| from test.support import requires_limited_api
 | |
| from test.support.script_helper import assert_python_failure, assert_python_ok, run_python_until_end
 | |
| try:
 | |
|     import _posixsubprocess
 | |
| except ImportError:
 | |
|     _posixsubprocess = None
 | |
| try:
 | |
|     import _testmultiphase
 | |
| except ImportError:
 | |
|     _testmultiphase = None
 | |
| try:
 | |
|     import _testsinglephase
 | |
| except ImportError:
 | |
|     _testsinglephase = None
 | |
| try:
 | |
|     import _xxsubinterpreters as _interpreters
 | |
| except ModuleNotFoundError:
 | |
|     _interpreters = None
 | |
| 
 | |
| # Skip this test if the _testcapi module isn't available.
 | |
| _testcapi = import_helper.import_module('_testcapi')
 | |
| 
 | |
| import _testinternalcapi
 | |
| 
 | |
| 
 | |
def decode_stderr(err):
    """Return captured stderr bytes as text without carriage returns.

    The bytes are decoded as UTF-8 with the 'replace' error handler, so
    undecodable input never raises; every '\\r' is then removed.
    """
    text = err.decode('utf-8', 'replace')
    return text.replace('\r', '')
 | |
| 
 | |
| 
 | |
def requires_subinterpreters(meth):
    """Skip the decorated test when the subinterpreters module is missing."""
    # _interpreters is None when the optional import at the top of this
    # module failed.
    skip_without_interpreters = unittest.skipIf(
        _interpreters is None, 'subinterpreters required')
    return skip_without_interpreters(meth)
 | |
| 
 | |
| 
 | |
def testfunction(self):
    """some doc"""
    # Identity function: hands back its single argument unchanged.  It is
    # wrapped by InstanceMethod, and the tests read this docstring through
    # the wrapper, so the docstring text is part of the fixture.
    return self
 | |
| 
 | |
| 
 | |
class InstanceMethod:
    # Fixture for CAPITest.test_instancemethod: both attributes are plain
    # callables wrapped with _testcapi.instancemethod().
    # The right-hand `id` is the builtin: the class namespace has no `id`
    # yet when this line runs.
    id = _testcapi.instancemethod(id)
    # Likewise, the right-hand `testfunction` is the module-level function.
    testfunction = _testcapi.instancemethod(testfunction)
 | |
| 
 | |
| class CAPITest(unittest.TestCase):
 | |
| 
 | |
|     def test_instancemethod(self):
 | |
|         inst = InstanceMethod()
 | |
|         self.assertEqual(id(inst), inst.id())
 | |
|         self.assertTrue(inst.testfunction() is inst)
 | |
|         self.assertEqual(inst.testfunction.__doc__, testfunction.__doc__)
 | |
|         self.assertEqual(InstanceMethod.testfunction.__doc__, testfunction.__doc__)
 | |
| 
 | |
|         InstanceMethod.testfunction.attribute = "test"
 | |
|         self.assertEqual(testfunction.attribute, "test")
 | |
|         self.assertRaises(AttributeError, setattr, inst.testfunction, "attribute", "test")
 | |
| 
 | |
|     @support.requires_subprocess()
 | |
|     def test_no_FatalError_infinite_loop(self):
 | |
|         run_result, _cmd_line = run_python_until_end(
 | |
|             '-c', 'import _testcapi; _testcapi.crash_no_current_thread()',
 | |
|         )
 | |
|         _rc, out, err = run_result
 | |
|         self.assertEqual(out, b'')
 | |
|         # This used to cause an infinite loop.
 | |
|         msg = ("Fatal Python error: PyThreadState_Get: "
 | |
|                "the function must be called with the GIL held, "
 | |
|                "after Python initialization and before Python finalization, "
 | |
|                "but the GIL is released "
 | |
|                "(the current Python thread state is NULL)").encode()
 | |
|         self.assertTrue(err.rstrip().startswith(msg),
 | |
|                         err)
 | |
| 
 | |
|     def test_memoryview_from_NULL_pointer(self):
 | |
|         self.assertRaises(ValueError, _testcapi.make_memoryview_from_NULL_pointer)
 | |
| 
 | |
|     @unittest.skipUnless(_posixsubprocess, '_posixsubprocess required for this test.')
 | |
|     def test_seq_bytes_to_charp_array(self):
 | |
|         # Issue #15732: crash in _PySequence_BytesToCharpArray()
 | |
|         class Z(object):
 | |
|             def __len__(self):
 | |
|                 return 1
 | |
|         with self.assertRaisesRegex(TypeError, 'indexing'):
 | |
|             _posixsubprocess.fork_exec(
 | |
|                           1,Z(),True,(1, 2),5,6,7,8,9,10,11,12,13,14,True,True,17,False,19,20,21,22,False)
 | |
|         # Issue #15736: overflow in _PySequence_BytesToCharpArray()
 | |
|         class Z(object):
 | |
|             def __len__(self):
 | |
|                 return sys.maxsize
 | |
|             def __getitem__(self, i):
 | |
|                 return b'x'
 | |
|         self.assertRaises(MemoryError, _posixsubprocess.fork_exec,
 | |
|                           1,Z(),True,(1, 2),5,6,7,8,9,10,11,12,13,14,True,True,17,False,19,20,21,22,False)
 | |
| 
 | |
|     @unittest.skipUnless(_posixsubprocess, '_posixsubprocess required for this test.')
 | |
|     def test_subprocess_fork_exec(self):
 | |
|         class Z(object):
 | |
|             def __len__(self):
 | |
|                 return 1
 | |
| 
 | |
|         # Issue #15738: crash in subprocess_fork_exec()
 | |
|         self.assertRaises(TypeError, _posixsubprocess.fork_exec,
 | |
|                           Z(),[b'1'],True,(1, 2),5,6,7,8,9,10,11,12,13,14,True,True,17,False,19,20,21,22,False)
 | |
| 
 | |
|     @unittest.skipIf(MISSING_C_DOCSTRINGS,
 | |
|                      "Signature information for builtins requires docstrings")
 | |
|     def test_docstring_signature_parsing(self):
 | |
| 
 | |
|         self.assertEqual(_testcapi.no_docstring.__doc__, None)
 | |
|         self.assertEqual(_testcapi.no_docstring.__text_signature__, None)
 | |
| 
 | |
|         self.assertEqual(_testcapi.docstring_empty.__doc__, None)
 | |
|         self.assertEqual(_testcapi.docstring_empty.__text_signature__, None)
 | |
| 
 | |
|         self.assertEqual(_testcapi.docstring_no_signature.__doc__,
 | |
|             "This docstring has no signature.")
 | |
|         self.assertEqual(_testcapi.docstring_no_signature.__text_signature__, None)
 | |
| 
 | |
|         self.assertEqual(_testcapi.docstring_with_invalid_signature.__doc__,
 | |
|             "docstring_with_invalid_signature($module, /, boo)\n"
 | |
|             "\n"
 | |
|             "This docstring has an invalid signature."
 | |
|             )
 | |
|         self.assertEqual(_testcapi.docstring_with_invalid_signature.__text_signature__, None)
 | |
| 
 | |
|         self.assertEqual(_testcapi.docstring_with_invalid_signature2.__doc__,
 | |
|             "docstring_with_invalid_signature2($module, /, boo)\n"
 | |
|             "\n"
 | |
|             "--\n"
 | |
|             "\n"
 | |
|             "This docstring also has an invalid signature."
 | |
|             )
 | |
|         self.assertEqual(_testcapi.docstring_with_invalid_signature2.__text_signature__, None)
 | |
| 
 | |
|         self.assertEqual(_testcapi.docstring_with_signature.__doc__,
 | |
|             "This docstring has a valid signature.")
 | |
|         self.assertEqual(_testcapi.docstring_with_signature.__text_signature__, "($module, /, sig)")
 | |
| 
 | |
|         self.assertEqual(_testcapi.docstring_with_signature_but_no_doc.__doc__, None)
 | |
|         self.assertEqual(_testcapi.docstring_with_signature_but_no_doc.__text_signature__,
 | |
|             "($module, /, sig)")
 | |
| 
 | |
|         self.assertEqual(_testcapi.docstring_with_signature_and_extra_newlines.__doc__,
 | |
|             "\nThis docstring has a valid signature and some extra newlines.")
 | |
|         self.assertEqual(_testcapi.docstring_with_signature_and_extra_newlines.__text_signature__,
 | |
|             "($module, /, parameter)")
 | |
| 
 | |
|     def test_c_type_with_matrix_multiplication(self):
 | |
|         M = _testcapi.matmulType
 | |
|         m1 = M()
 | |
|         m2 = M()
 | |
|         self.assertEqual(m1 @ m2, ("matmul", m1, m2))
 | |
|         self.assertEqual(m1 @ 42, ("matmul", m1, 42))
 | |
|         self.assertEqual(42 @ m1, ("matmul", 42, m1))
 | |
|         o = m1
 | |
|         o @= m2
 | |
|         self.assertEqual(o, ("imatmul", m1, m2))
 | |
|         o = m1
 | |
|         o @= 42
 | |
|         self.assertEqual(o, ("imatmul", m1, 42))
 | |
|         o = 42
 | |
|         o @= m1
 | |
|         self.assertEqual(o, ("matmul", 42, m1))
 | |
| 
 | |
|     def test_c_type_with_ipow(self):
 | |
|         # When the __ipow__ method of a type was implemented in C, using the
 | |
|         # modulo param would cause segfaults.
 | |
|         o = _testcapi.ipowType()
 | |
|         self.assertEqual(o.__ipow__(1), (1, None))
 | |
|         self.assertEqual(o.__ipow__(2, 2), (2, 2))
 | |
| 
 | |
|     def test_return_null_without_error(self):
 | |
|         # Issue #23571: A function must not return NULL without setting an
 | |
|         # error
 | |
|         if support.Py_DEBUG:
 | |
|             code = textwrap.dedent("""
 | |
|                 import _testcapi
 | |
|                 from test import support
 | |
| 
 | |
|                 with support.SuppressCrashReport():
 | |
|                     _testcapi.return_null_without_error()
 | |
|             """)
 | |
|             rc, out, err = assert_python_failure('-c', code)
 | |
|             err = decode_stderr(err)
 | |
|             self.assertRegex(err,
 | |
|                 r'Fatal Python error: _Py_CheckFunctionResult: '
 | |
|                     r'a function returned NULL without setting an exception\n'
 | |
|                 r'Python runtime state: initialized\n'
 | |
|                 r'SystemError: <built-in function return_null_without_error> '
 | |
|                     r'returned NULL without setting an exception\n'
 | |
|                 r'\n'
 | |
|                 r'Current thread.*:\n'
 | |
|                 r'  File .*", line 6 in <module>\n')
 | |
|         else:
 | |
|             with self.assertRaises(SystemError) as cm:
 | |
|                 _testcapi.return_null_without_error()
 | |
|             self.assertRegex(str(cm.exception),
 | |
|                              'return_null_without_error.* '
 | |
|                              'returned NULL without setting an exception')
 | |
| 
 | |
|     def test_return_result_with_error(self):
 | |
|         # Issue #23571: A function must not return a result with an error set
 | |
|         if support.Py_DEBUG:
 | |
|             code = textwrap.dedent("""
 | |
|                 import _testcapi
 | |
|                 from test import support
 | |
| 
 | |
|                 with support.SuppressCrashReport():
 | |
|                     _testcapi.return_result_with_error()
 | |
|             """)
 | |
|             rc, out, err = assert_python_failure('-c', code)
 | |
|             err = decode_stderr(err)
 | |
|             self.assertRegex(err,
 | |
|                     r'Fatal Python error: _Py_CheckFunctionResult: '
 | |
|                         r'a function returned a result with an exception set\n'
 | |
|                     r'Python runtime state: initialized\n'
 | |
|                     r'ValueError\n'
 | |
|                     r'\n'
 | |
|                     r'The above exception was the direct cause '
 | |
|                         r'of the following exception:\n'
 | |
|                     r'\n'
 | |
|                     r'SystemError: <built-in '
 | |
|                         r'function return_result_with_error> '
 | |
|                         r'returned a result with an exception set\n'
 | |
|                     r'\n'
 | |
|                     r'Current thread.*:\n'
 | |
|                     r'  File .*, line 6 in <module>\n')
 | |
|         else:
 | |
|             with self.assertRaises(SystemError) as cm:
 | |
|                 _testcapi.return_result_with_error()
 | |
|             self.assertRegex(str(cm.exception),
 | |
|                              'return_result_with_error.* '
 | |
|                              'returned a result with an exception set')
 | |
| 
 | |
|     def test_getitem_with_error(self):
 | |
|         # Test _Py_CheckSlotResult(). Raise an exception and then calls
 | |
|         # PyObject_GetItem(): check that the assertion catches the bug.
 | |
|         # PyObject_GetItem() must not be called with an exception set.
 | |
|         code = textwrap.dedent("""
 | |
|             import _testcapi
 | |
|             from test import support
 | |
| 
 | |
|             with support.SuppressCrashReport():
 | |
|                 _testcapi.getitem_with_error({1: 2}, 1)
 | |
|         """)
 | |
|         rc, out, err = assert_python_failure('-c', code)
 | |
|         err = decode_stderr(err)
 | |
|         if 'SystemError: ' not in err:
 | |
|             self.assertRegex(err,
 | |
|                     r'Fatal Python error: _Py_CheckSlotResult: '
 | |
|                         r'Slot __getitem__ of type dict succeeded '
 | |
|                         r'with an exception set\n'
 | |
|                     r'Python runtime state: initialized\n'
 | |
|                     r'ValueError: bug\n'
 | |
|                     r'\n'
 | |
|                     r'Current thread .* \(most recent call first\):\n'
 | |
|                     r'  File .*, line 6 in <module>\n'
 | |
|                     r'\n'
 | |
|                     r'Extension modules: _testcapi \(total: 1\)\n')
 | |
|         else:
 | |
|             # Python built with NDEBUG macro defined:
 | |
|             # test _Py_CheckFunctionResult() instead.
 | |
|             self.assertIn('returned a result with an exception set', err)
 | |
| 
 | |
|     def test_buildvalue_N(self):
 | |
|         _testcapi.test_buildvalue_N()
 | |
| 
 | |
|     def test_mapping_keys_values_items(self):
 | |
|         class Mapping1(dict):
 | |
|             def keys(self):
 | |
|                 return list(super().keys())
 | |
|             def values(self):
 | |
|                 return list(super().values())
 | |
|             def items(self):
 | |
|                 return list(super().items())
 | |
|         class Mapping2(dict):
 | |
|             def keys(self):
 | |
|                 return tuple(super().keys())
 | |
|             def values(self):
 | |
|                 return tuple(super().values())
 | |
|             def items(self):
 | |
|                 return tuple(super().items())
 | |
|         dict_obj = {'foo': 1, 'bar': 2, 'spam': 3}
 | |
| 
 | |
|         for mapping in [{}, OrderedDict(), Mapping1(), Mapping2(),
 | |
|                         dict_obj, OrderedDict(dict_obj),
 | |
|                         Mapping1(dict_obj), Mapping2(dict_obj)]:
 | |
|             self.assertListEqual(_testcapi.get_mapping_keys(mapping),
 | |
|                                  list(mapping.keys()))
 | |
|             self.assertListEqual(_testcapi.get_mapping_values(mapping),
 | |
|                                  list(mapping.values()))
 | |
|             self.assertListEqual(_testcapi.get_mapping_items(mapping),
 | |
|                                  list(mapping.items()))
 | |
| 
 | |
|     def test_mapping_keys_values_items_bad_arg(self):
 | |
|         self.assertRaises(AttributeError, _testcapi.get_mapping_keys, None)
 | |
|         self.assertRaises(AttributeError, _testcapi.get_mapping_values, None)
 | |
|         self.assertRaises(AttributeError, _testcapi.get_mapping_items, None)
 | |
| 
 | |
|         class BadMapping:
 | |
|             def keys(self):
 | |
|                 return None
 | |
|             def values(self):
 | |
|                 return None
 | |
|             def items(self):
 | |
|                 return None
 | |
|         bad_mapping = BadMapping()
 | |
|         self.assertRaises(TypeError, _testcapi.get_mapping_keys, bad_mapping)
 | |
|         self.assertRaises(TypeError, _testcapi.get_mapping_values, bad_mapping)
 | |
|         self.assertRaises(TypeError, _testcapi.get_mapping_items, bad_mapping)
 | |
| 
 | |
|     def test_mapping_has_key(self):
 | |
|         dct = {'a': 1}
 | |
|         self.assertTrue(_testcapi.mapping_has_key(dct, 'a'))
 | |
|         self.assertFalse(_testcapi.mapping_has_key(dct, 'b'))
 | |
| 
 | |
|         class SubDict(dict):
 | |
|             pass
 | |
| 
 | |
|         dct2 = SubDict({'a': 1})
 | |
|         self.assertTrue(_testcapi.mapping_has_key(dct2, 'a'))
 | |
|         self.assertFalse(_testcapi.mapping_has_key(dct2, 'b'))
 | |
| 
 | |
|     def test_sequence_set_slice(self):
 | |
|         # Correct case:
 | |
|         data = [1, 2, 3, 4, 5]
 | |
|         data_copy = data.copy()
 | |
| 
 | |
|         _testcapi.sequence_set_slice(data, 1, 3, [8, 9])
 | |
|         data_copy[1:3] = [8, 9]
 | |
|         self.assertEqual(data, data_copy)
 | |
|         self.assertEqual(data, [1, 8, 9, 4, 5])
 | |
| 
 | |
|         # Custom class:
 | |
|         class Custom:
 | |
|             def __setitem__(self, index, value):
 | |
|                 self.index = index
 | |
|                 self.value = value
 | |
| 
 | |
|         c = Custom()
 | |
|         _testcapi.sequence_set_slice(c, 0, 5, 'abc')
 | |
|         self.assertEqual(c.index, slice(0, 5))
 | |
|         self.assertEqual(c.value, 'abc')
 | |
| 
 | |
|         # Immutable sequences must raise:
 | |
|         bad_seq1 = (1, 2, 3, 4)
 | |
|         with self.assertRaises(TypeError):
 | |
|             _testcapi.sequence_set_slice(bad_seq1, 1, 3, (8, 9))
 | |
|         self.assertEqual(bad_seq1, (1, 2, 3, 4))
 | |
| 
 | |
|         bad_seq2 = 'abcd'
 | |
|         with self.assertRaises(TypeError):
 | |
|             _testcapi.sequence_set_slice(bad_seq2, 1, 3, 'xy')
 | |
|         self.assertEqual(bad_seq2, 'abcd')
 | |
| 
 | |
|         # Not a sequence:
 | |
|         with self.assertRaises(TypeError):
 | |
|             _testcapi.sequence_set_slice(None, 1, 3, 'xy')
 | |
| 
 | |
|     def test_sequence_del_slice(self):
 | |
|         # Correct case:
 | |
|         data = [1, 2, 3, 4, 5]
 | |
|         data_copy = data.copy()
 | |
| 
 | |
|         _testcapi.sequence_del_slice(data, 1, 3)
 | |
|         del data_copy[1:3]
 | |
|         self.assertEqual(data, data_copy)
 | |
|         self.assertEqual(data, [1, 4, 5])
 | |
| 
 | |
|         # Custom class:
 | |
|         class Custom:
 | |
|             def __delitem__(self, index):
 | |
|                 self.index = index
 | |
| 
 | |
|         c = Custom()
 | |
|         _testcapi.sequence_del_slice(c, 0, 5)
 | |
|         self.assertEqual(c.index, slice(0, 5))
 | |
| 
 | |
|         # Immutable sequences must raise:
 | |
|         bad_seq1 = (1, 2, 3, 4)
 | |
|         with self.assertRaises(TypeError):
 | |
|             _testcapi.sequence_del_slice(bad_seq1, 1, 3)
 | |
|         self.assertEqual(bad_seq1, (1, 2, 3, 4))
 | |
| 
 | |
|         bad_seq2 = 'abcd'
 | |
|         with self.assertRaises(TypeError):
 | |
|             _testcapi.sequence_del_slice(bad_seq2, 1, 3)
 | |
|         self.assertEqual(bad_seq2, 'abcd')
 | |
| 
 | |
|         # Not a sequence:
 | |
|         with self.assertRaises(TypeError):
 | |
|             _testcapi.sequence_del_slice(None, 1, 3)
 | |
| 
 | |
|         mapping = {1: 'a', 2: 'b', 3: 'c'}
 | |
|         with self.assertRaises(KeyError):
 | |
|             _testcapi.sequence_del_slice(mapping, 1, 3)
 | |
|         self.assertEqual(mapping, {1: 'a', 2: 'b', 3: 'c'})
 | |
| 
 | |
|     @unittest.skipUnless(hasattr(_testcapi, 'negative_refcount'),
 | |
|                          'need _testcapi.negative_refcount')
 | |
|     def test_negative_refcount(self):
 | |
|         # bpo-35059: Check that Py_DECREF() reports the correct filename
 | |
|         # when calling _Py_NegativeRefcount() to abort Python.
 | |
|         code = textwrap.dedent("""
 | |
|             import _testcapi
 | |
|             from test import support
 | |
| 
 | |
|             with support.SuppressCrashReport():
 | |
|                 _testcapi.negative_refcount()
 | |
|         """)
 | |
|         rc, out, err = assert_python_failure('-c', code)
 | |
|         self.assertRegex(err,
 | |
|                          br'_testcapimodule\.c:[0-9]+: '
 | |
|                          br'_Py_NegativeRefcount: Assertion failed: '
 | |
|                          br'object has negative ref count')
 | |
| 
 | |
|     def test_trashcan_subclass(self):
 | |
|         # bpo-35983: Check that the trashcan mechanism for "list" is NOT
 | |
|         # activated when its tp_dealloc is being called by a subclass
 | |
|         from _testcapi import MyList
 | |
|         L = None
 | |
|         for i in range(1000):
 | |
|             L = MyList((L,))
 | |
| 
 | |
|     @support.requires_resource('cpu')
 | |
|     def test_trashcan_python_class1(self):
 | |
|         self.do_test_trashcan_python_class(list)
 | |
| 
 | |
|     @support.requires_resource('cpu')
 | |
|     def test_trashcan_python_class2(self):
 | |
|         from _testcapi import MyList
 | |
|         self.do_test_trashcan_python_class(MyList)
 | |
| 
 | |
|     def do_test_trashcan_python_class(self, base):
 | |
|         # Check that the trashcan mechanism works properly for a Python
 | |
|         # subclass of a class using the trashcan (this specific test assumes
 | |
|         # that the base class "base" behaves like list)
 | |
|         class PyList(base):
 | |
|             # Count the number of PyList instances to verify that there is
 | |
|             # no memory leak
 | |
|             num = 0
 | |
|             def __init__(self, *args):
 | |
|                 __class__.num += 1
 | |
|                 super().__init__(*args)
 | |
|             def __del__(self):
 | |
|                 __class__.num -= 1
 | |
| 
 | |
|         for parity in (0, 1):
 | |
|             L = None
 | |
|             # We need in the order of 2**20 iterations here such that a
 | |
|             # typical 8MB stack would overflow without the trashcan.
 | |
|             for i in range(2**20):
 | |
|                 L = PyList((L,))
 | |
|                 L.attr = i
 | |
|             if parity:
 | |
|                 # Add one additional nesting layer
 | |
|                 L = (L,)
 | |
|             self.assertGreater(PyList.num, 0)
 | |
|             del L
 | |
|             self.assertEqual(PyList.num, 0)
 | |
| 
 | |
|     def test_heap_ctype_doc_and_text_signature(self):
 | |
|         self.assertEqual(_testcapi.HeapDocCType.__doc__, "somedoc")
 | |
|         self.assertEqual(_testcapi.HeapDocCType.__text_signature__, "(arg1, arg2)")
 | |
| 
 | |
|     def test_null_type_doc(self):
 | |
|         self.assertEqual(_testcapi.NullTpDocType.__doc__, None)
 | |
| 
 | |
|     def test_subclass_of_heap_gc_ctype_with_tpdealloc_decrefs_once(self):
 | |
|         class HeapGcCTypeSubclass(_testcapi.HeapGcCType):
 | |
|             def __init__(self):
 | |
|                 self.value2 = 20
 | |
|                 super().__init__()
 | |
| 
 | |
|         subclass_instance = HeapGcCTypeSubclass()
 | |
|         type_refcnt = sys.getrefcount(HeapGcCTypeSubclass)
 | |
| 
 | |
|         # Test that subclass instance was fully created
 | |
|         self.assertEqual(subclass_instance.value, 10)
 | |
|         self.assertEqual(subclass_instance.value2, 20)
 | |
| 
 | |
|         # Test that the type reference count is only decremented once
 | |
|         del subclass_instance
 | |
|         self.assertEqual(type_refcnt - 1, sys.getrefcount(HeapGcCTypeSubclass))
 | |
| 
 | |
|     def test_subclass_of_heap_gc_ctype_with_del_modifying_dunder_class_only_decrefs_once(self):
 | |
|         class A(_testcapi.HeapGcCType):
 | |
|             def __init__(self):
 | |
|                 self.value2 = 20
 | |
|                 super().__init__()
 | |
| 
 | |
|         class B(A):
 | |
|             def __init__(self):
 | |
|                 super().__init__()
 | |
| 
 | |
|             def __del__(self):
 | |
|                 self.__class__ = A
 | |
|                 A.refcnt_in_del = sys.getrefcount(A)
 | |
|                 B.refcnt_in_del = sys.getrefcount(B)
 | |
| 
 | |
|         subclass_instance = B()
 | |
|         type_refcnt = sys.getrefcount(B)
 | |
|         new_type_refcnt = sys.getrefcount(A)
 | |
| 
 | |
|         # Test that subclass instance was fully created
 | |
|         self.assertEqual(subclass_instance.value, 10)
 | |
|         self.assertEqual(subclass_instance.value2, 20)
 | |
| 
 | |
|         del subclass_instance
 | |
| 
 | |
|         # Test that setting __class__ modified the reference counts of the types
 | |
|         if support.Py_DEBUG:
 | |
|             # gh-89373: In debug mode, _Py_Dealloc() keeps a strong reference
 | |
|             # to the type while calling tp_dealloc()
 | |
|             self.assertEqual(type_refcnt, B.refcnt_in_del)
 | |
|         else:
 | |
|             self.assertEqual(type_refcnt - 1, B.refcnt_in_del)
 | |
|         self.assertEqual(new_type_refcnt + 1, A.refcnt_in_del)
 | |
| 
 | |
|         # Test that the original type already has decreased its refcnt
 | |
|         self.assertEqual(type_refcnt - 1, sys.getrefcount(B))
 | |
| 
 | |
|         # Test that subtype_dealloc decref the newly assigned __class__ only once
 | |
|         self.assertEqual(new_type_refcnt, sys.getrefcount(A))
 | |
| 
 | |
|     def test_heaptype_with_dict(self):
 | |
|         inst = _testcapi.HeapCTypeWithDict()
 | |
|         inst.foo = 42
 | |
|         self.assertEqual(inst.foo, 42)
 | |
|         self.assertEqual(inst.dictobj, inst.__dict__)
 | |
|         self.assertEqual(inst.dictobj, {"foo": 42})
 | |
| 
 | |
|         inst = _testcapi.HeapCTypeWithDict()
 | |
|         self.assertEqual({}, inst.__dict__)
 | |
| 
 | |
|     def test_heaptype_with_managed_dict(self):
 | |
|         inst = _testcapi.HeapCTypeWithManagedDict()
 | |
|         inst.foo = 42
 | |
|         self.assertEqual(inst.foo, 42)
 | |
|         self.assertEqual(inst.__dict__, {"foo": 42})
 | |
| 
 | |
|         inst = _testcapi.HeapCTypeWithManagedDict()
 | |
|         self.assertEqual({}, inst.__dict__)
 | |
| 
 | |
|         a = _testcapi.HeapCTypeWithManagedDict()
 | |
|         b = _testcapi.HeapCTypeWithManagedDict()
 | |
|         a.b = b
 | |
|         b.a = a
 | |
|         del a, b
 | |
| 
 | |
|     def test_sublclassing_managed_dict(self):
 | |
| 
 | |
|         class C(_testcapi.HeapCTypeWithManagedDict):
 | |
|             pass
 | |
| 
 | |
|         i = C()
 | |
|         i.spam = i
 | |
|         del i
 | |
| 
 | |
|     def test_heaptype_with_negative_dict(self):
 | |
|         inst = _testcapi.HeapCTypeWithNegativeDict()
 | |
|         inst.foo = 42
 | |
|         self.assertEqual(inst.foo, 42)
 | |
|         self.assertEqual(inst.dictobj, inst.__dict__)
 | |
|         self.assertEqual(inst.dictobj, {"foo": 42})
 | |
| 
 | |
|         inst = _testcapi.HeapCTypeWithNegativeDict()
 | |
|         self.assertEqual({}, inst.__dict__)
 | |
| 
 | |
|     def test_heaptype_with_weakref(self):
 | |
|         inst = _testcapi.HeapCTypeWithWeakref()
 | |
|         ref = weakref.ref(inst)
 | |
|         self.assertEqual(ref(), inst)
 | |
|         self.assertEqual(inst.weakreflist, ref)
 | |
| 
 | |
|     def test_heaptype_with_managed_weakref(self):
 | |
|         inst = _testcapi.HeapCTypeWithManagedWeakref()
 | |
|         ref = weakref.ref(inst)
 | |
|         self.assertEqual(ref(), inst)
 | |
| 
 | |
|     def test_sublclassing_managed_weakref(self):
 | |
| 
 | |
|         class C(_testcapi.HeapCTypeWithManagedWeakref):
 | |
|             pass
 | |
| 
 | |
|         inst = C()
 | |
|         ref = weakref.ref(inst)
 | |
|         self.assertEqual(ref(), inst)
 | |
| 
 | |
|     def test_sublclassing_managed_both(self):
 | |
| 
 | |
|         class C1(_testcapi.HeapCTypeWithManagedWeakref, _testcapi.HeapCTypeWithManagedDict):
 | |
|             pass
 | |
| 
 | |
|         class C2(_testcapi.HeapCTypeWithManagedDict, _testcapi.HeapCTypeWithManagedWeakref):
 | |
|             pass
 | |
| 
 | |
|         for cls in (C1, C2):
 | |
|             inst = cls()
 | |
|             ref = weakref.ref(inst)
 | |
|             self.assertEqual(ref(), inst)
 | |
|             inst.spam = inst
 | |
|             del inst
 | |
|             ref = weakref.ref(cls())
 | |
|             self.assertIs(ref(), None)
 | |
| 
 | |
|     def test_heaptype_with_buffer(self):
 | |
|         inst = _testcapi.HeapCTypeWithBuffer()
 | |
|         b = bytes(inst)
 | |
|         self.assertEqual(b, b"1234")
 | |
| 
 | |
|     def test_c_subclass_of_heap_ctype_with_tpdealloc_decrefs_once(self):
 | |
|         subclass_instance = _testcapi.HeapCTypeSubclass()
 | |
|         type_refcnt = sys.getrefcount(_testcapi.HeapCTypeSubclass)
 | |
| 
 | |
|         # Test that subclass instance was fully created
 | |
|         self.assertEqual(subclass_instance.value, 10)
 | |
|         self.assertEqual(subclass_instance.value2, 20)
 | |
| 
 | |
|         # Test that the type reference count is only decremented once
 | |
|         del subclass_instance
 | |
|         self.assertEqual(type_refcnt - 1, sys.getrefcount(_testcapi.HeapCTypeSubclass))
 | |
| 
 | |
|     def test_c_subclass_of_heap_ctype_with_del_modifying_dunder_class_only_decrefs_once(self):
 | |
|         subclass_instance = _testcapi.HeapCTypeSubclassWithFinalizer()
 | |
|         type_refcnt = sys.getrefcount(_testcapi.HeapCTypeSubclassWithFinalizer)
 | |
|         new_type_refcnt = sys.getrefcount(_testcapi.HeapCTypeSubclass)
 | |
| 
 | |
|         # Test that subclass instance was fully created
 | |
|         self.assertEqual(subclass_instance.value, 10)
 | |
|         self.assertEqual(subclass_instance.value2, 20)
 | |
| 
 | |
|         # The tp_finalize slot will set __class__ to HeapCTypeSubclass
 | |
|         del subclass_instance
 | |
| 
 | |
|         # Test that setting __class__ modified the reference counts of the types
 | |
|         if support.Py_DEBUG:
 | |
|             # gh-89373: In debug mode, _Py_Dealloc() keeps a strong reference
 | |
|             # to the type while calling tp_dealloc()
 | |
|             self.assertEqual(type_refcnt, _testcapi.HeapCTypeSubclassWithFinalizer.refcnt_in_del)
 | |
|         else:
 | |
|             self.assertEqual(type_refcnt - 1, _testcapi.HeapCTypeSubclassWithFinalizer.refcnt_in_del)
 | |
|         self.assertEqual(new_type_refcnt + 1, _testcapi.HeapCTypeSubclass.refcnt_in_del)
 | |
| 
 | |
|         # Test that the original type already has decreased its refcnt
 | |
|         self.assertEqual(type_refcnt - 1, sys.getrefcount(_testcapi.HeapCTypeSubclassWithFinalizer))
 | |
| 
 | |
|         # Test that subtype_dealloc decref the newly assigned __class__ only once
 | |
|         self.assertEqual(new_type_refcnt, sys.getrefcount(_testcapi.HeapCTypeSubclass))
 | |
| 
 | |
|     def test_heaptype_with_setattro(self):
 | |
|         obj = _testcapi.HeapCTypeSetattr()
 | |
|         self.assertEqual(obj.pvalue, 10)
 | |
|         obj.value = 12
 | |
|         self.assertEqual(obj.pvalue, 12)
 | |
|         del obj.value
 | |
|         self.assertEqual(obj.pvalue, 0)
 | |
| 
 | |
|     def test_heaptype_with_custom_metaclass(self):
 | |
|         metaclass = _testcapi.HeapCTypeMetaclass
 | |
|         self.assertTrue(issubclass(metaclass, type))
 | |
| 
 | |
|         # Class creation from C
 | |
|         t = _testcapi.pytype_fromspec_meta(metaclass)
 | |
|         self.assertIsInstance(t, type)
 | |
|         self.assertEqual(t.__name__, "HeapCTypeViaMetaclass")
 | |
|         self.assertIs(type(t), metaclass)
 | |
| 
 | |
|         # Class creation from Python
 | |
|         t = metaclass("PyClassViaMetaclass", (), {})
 | |
|         self.assertIsInstance(t, type)
 | |
|         self.assertEqual(t.__name__, "PyClassViaMetaclass")
 | |
| 
 | |
|     def test_heaptype_with_custom_metaclass_null_new(self):
 | |
|         metaclass = _testcapi.HeapCTypeMetaclassNullNew
 | |
| 
 | |
|         self.assertTrue(issubclass(metaclass, type))
 | |
| 
 | |
|         # Class creation from C
 | |
|         t = _testcapi.pytype_fromspec_meta(metaclass)
 | |
|         self.assertIsInstance(t, type)
 | |
|         self.assertEqual(t.__name__, "HeapCTypeViaMetaclass")
 | |
|         self.assertIs(type(t), metaclass)
 | |
| 
 | |
|         # Class creation from Python
 | |
|         with self.assertRaisesRegex(TypeError, "cannot create .* instances"):
 | |
|             metaclass("PyClassViaMetaclass", (), {})
 | |
| 
 | |
|     def test_heaptype_with_custom_metaclass_custom_new(self):
 | |
|         metaclass = _testcapi.HeapCTypeMetaclassCustomNew
 | |
| 
 | |
|         self.assertTrue(issubclass(_testcapi.HeapCTypeMetaclassCustomNew, type))
 | |
| 
 | |
|         msg = "Metaclasses with custom tp_new are not supported."
 | |
|         with self.assertRaisesRegex(TypeError, msg):
 | |
|             t = _testcapi.pytype_fromspec_meta(metaclass)
 | |
| 
 | |
|     def test_heaptype_with_custom_metaclass_deprecation(self):
 | |
|         metaclass = _testcapi.HeapCTypeMetaclassCustomNew
 | |
| 
 | |
|         # gh-103968: a metaclass with custom tp_new is deprecated, but still
 | |
|         # allowed for functions that existed in 3.11
 | |
|         # (PyType_FromSpecWithBases is used here).
 | |
|         class Base(metaclass=metaclass):
 | |
|             pass
 | |
| 
 | |
|         # Class creation from C
 | |
|         with warnings_helper.check_warnings(
 | |
|                 ('.*custom tp_new.*in Python 3.14.*', DeprecationWarning),
 | |
|                 ):
 | |
|             sub = _testcapi.make_type_with_base(Base)
 | |
|         self.assertTrue(issubclass(sub, Base))
 | |
|         self.assertIsInstance(sub, metaclass)
 | |
| 
 | |
|     def test_multiple_inheritance_ctypes_with_weakref_or_dict(self):
 | |
| 
 | |
|         with self.assertRaises(TypeError):
 | |
|             class Both1(_testcapi.HeapCTypeWithWeakref, _testcapi.HeapCTypeWithDict):
 | |
|                 pass
 | |
|         with self.assertRaises(TypeError):
 | |
|             class Both2(_testcapi.HeapCTypeWithDict, _testcapi.HeapCTypeWithWeakref):
 | |
|                 pass
 | |
| 
 | |
|     def test_multiple_inheritance_ctypes_with_weakref_or_dict_and_other_builtin(self):
 | |
| 
 | |
|         with self.assertRaises(TypeError):
 | |
|             class C1(_testcapi.HeapCTypeWithDict, list):
 | |
|                 pass
 | |
| 
 | |
|         with self.assertRaises(TypeError):
 | |
|             class C2(_testcapi.HeapCTypeWithWeakref, list):
 | |
|                 pass
 | |
| 
 | |
|         class C3(_testcapi.HeapCTypeWithManagedDict, list):
 | |
|             pass
 | |
|         class C4(_testcapi.HeapCTypeWithManagedWeakref, list):
 | |
|             pass
 | |
| 
 | |
|         inst = C3()
 | |
|         inst.append(0)
 | |
|         str(inst.__dict__)
 | |
| 
 | |
|         inst = C4()
 | |
|         inst.append(0)
 | |
|         str(inst.__weakref__)
 | |
| 
 | |
|         for cls in (_testcapi.HeapCTypeWithManagedDict, _testcapi.HeapCTypeWithManagedWeakref):
 | |
|             for cls2 in (_testcapi.HeapCTypeWithDict, _testcapi.HeapCTypeWithWeakref):
 | |
|                 class S(cls, cls2):
 | |
|                     pass
 | |
|             class B1(C3, cls):
 | |
|                 pass
 | |
|             class B2(C4, cls):
 | |
|                 pass
 | |
| 
 | |
|     def test_pytype_fromspec_with_repeated_slots(self):
 | |
|         for variant in range(2):
 | |
|             with self.subTest(variant=variant):
 | |
|                 with self.assertRaises(SystemError):
 | |
|                     _testcapi.create_type_from_repeated_slots(variant)
 | |
| 
 | |
|     @warnings_helper.ignore_warnings(category=DeprecationWarning)
 | |
|     def test_immutable_type_with_mutable_base(self):
 | |
|         # Add deprecation warning here so it's removed in 3.14
 | |
|         warnings._deprecated(
 | |
|             'creating immutable classes with mutable bases', remove=(3, 14))
 | |
| 
 | |
|         class MutableBase:
 | |
|             def meth(self):
 | |
|                 return 'original'
 | |
| 
 | |
|         with self.assertWarns(DeprecationWarning):
 | |
|             ImmutableSubclass = _testcapi.make_immutable_type_with_base(
 | |
|                 MutableBase)
 | |
|         instance = ImmutableSubclass()
 | |
| 
 | |
|         self.assertEqual(instance.meth(), 'original')
 | |
| 
 | |
|         # Cannot override the static type's method
 | |
|         with self.assertRaisesRegex(
 | |
|                 TypeError,
 | |
|                 "cannot set 'meth' attribute of immutable type"):
 | |
|             ImmutableSubclass.meth = lambda self: 'overridden'
 | |
|         self.assertEqual(instance.meth(), 'original')
 | |
| 
 | |
|         # Can change the method on the mutable base
 | |
|         MutableBase.meth = lambda self: 'changed'
 | |
|         self.assertEqual(instance.meth(), 'changed')
 | |
| 
 | |
|     def test_pynumber_tobase(self):
 | |
|         from _testcapi import pynumber_tobase
 | |
|         small_number = 123
 | |
|         large_number = 2**64
 | |
|         class IDX:
 | |
|             def __init__(self, val):
 | |
|                 self.val = val
 | |
|             def __index__(self):
 | |
|                 return self.val
 | |
| 
 | |
|         test_cases = ((2, '0b1111011', '0b10000000000000000000000000000000000000000000000000000000000000000'),
 | |
|                       (8, '0o173', '0o2000000000000000000000'),
 | |
|                       (10, '123', '18446744073709551616'),
 | |
|                       (16, '0x7b', '0x10000000000000000'))
 | |
|         for base, small_target, large_target in test_cases:
 | |
|             with self.subTest(base=base, st=small_target, lt=large_target):
 | |
|                 # Test for small number
 | |
|                 self.assertEqual(pynumber_tobase(small_number, base), small_target)
 | |
|                 self.assertEqual(pynumber_tobase(-small_number, base), '-' + small_target)
 | |
|                 self.assertEqual(pynumber_tobase(IDX(small_number), base), small_target)
 | |
|                 # Test for large number(out of range of a longlong,i.e.[-2**63, 2**63-1])
 | |
|                 self.assertEqual(pynumber_tobase(large_number, base), large_target)
 | |
|                 self.assertEqual(pynumber_tobase(-large_number, base), '-' + large_target)
 | |
|                 self.assertEqual(pynumber_tobase(IDX(large_number), base), large_target)
 | |
|         self.assertRaises(TypeError, pynumber_tobase, IDX(123.0), 10)
 | |
|         self.assertRaises(TypeError, pynumber_tobase, IDX('123'), 10)
 | |
|         self.assertRaises(TypeError, pynumber_tobase, 123.0, 10)
 | |
|         self.assertRaises(TypeError, pynumber_tobase, '123', 10)
 | |
|         self.assertRaises(SystemError, pynumber_tobase, 123, 0)
 | |
| 
 | |
|     def test_pyobject_repr_from_null(self):
 | |
|         s = _testcapi.pyobject_repr_from_null()
 | |
|         self.assertEqual(s, '<NULL>')
 | |
| 
 | |
|     def test_pyobject_str_from_null(self):
 | |
|         s = _testcapi.pyobject_str_from_null()
 | |
|         self.assertEqual(s, '<NULL>')
 | |
| 
 | |
|     def test_pyobject_bytes_from_null(self):
 | |
|         s = _testcapi.pyobject_bytes_from_null()
 | |
|         self.assertEqual(s, b'<NULL>')
 | |
| 
 | |
|     def test_Py_CompileString(self):
 | |
|         # Check that Py_CompileString respects the coding cookie
 | |
|         _compile = _testcapi.Py_CompileString
 | |
|         code = b"# -*- coding: latin1 -*-\nprint('\xc2\xa4')\n"
 | |
|         result = _compile(code)
 | |
|         expected = compile(code, "<string>", "exec")
 | |
|         self.assertEqual(result.co_consts, expected.co_consts)
 | |
| 
 | |
|     def test_export_symbols(self):
 | |
|         # bpo-44133: Ensure that the "Py_FrozenMain" and
 | |
|         # "PyThread_get_thread_native_id" symbols are exported by the Python
 | |
|         # (directly by the binary, or via by the Python dynamic library).
 | |
|         ctypes = import_helper.import_module('ctypes')
 | |
|         names = []
 | |
| 
 | |
|         # Test if the PY_HAVE_THREAD_NATIVE_ID macro is defined
 | |
|         if hasattr(_thread, 'get_native_id'):
 | |
|             names.append('PyThread_get_thread_native_id')
 | |
| 
 | |
|         # Python/frozenmain.c fails to build on Windows when the symbols are
 | |
|         # missing:
 | |
|         # - PyWinFreeze_ExeInit
 | |
|         # - PyWinFreeze_ExeTerm
 | |
|         # - PyInitFrozenExtensions
 | |
|         if os.name != 'nt':
 | |
|             names.append('Py_FrozenMain')
 | |
| 
 | |
|         for name in names:
 | |
|             with self.subTest(name=name):
 | |
|                 self.assertTrue(hasattr(ctypes.pythonapi, name))
 | |
| 
 | |
|     def test_clear_managed_dict(self):
 | |
| 
 | |
|         class C:
 | |
|             def __init__(self):
 | |
|                 self.a = 1
 | |
| 
 | |
|         c = C()
 | |
|         _testcapi.clear_managed_dict(c)
 | |
|         self.assertEqual(c.__dict__, {})
 | |
|         c = C()
 | |
|         self.assertEqual(c.__dict__, {'a':1})
 | |
|         _testcapi.clear_managed_dict(c)
 | |
|         self.assertEqual(c.__dict__, {})
 | |
| 
 | |
|     def test_eval_get_func_name(self):
 | |
|         def function_example(): ...
 | |
| 
 | |
|         class A:
 | |
|             def method_example(self): ...
 | |
| 
 | |
|         self.assertEqual(_testcapi.eval_get_func_name(function_example),
 | |
|                          "function_example")
 | |
|         self.assertEqual(_testcapi.eval_get_func_name(A.method_example),
 | |
|                          "method_example")
 | |
|         self.assertEqual(_testcapi.eval_get_func_name(A().method_example),
 | |
|                          "method_example")
 | |
|         self.assertEqual(_testcapi.eval_get_func_name(sum), "sum")  # c function
 | |
|         self.assertEqual(_testcapi.eval_get_func_name(A), "type")
 | |
| 
 | |
|     def test_eval_get_func_desc(self):
 | |
|         def function_example(): ...
 | |
| 
 | |
|         class A:
 | |
|             def method_example(self): ...
 | |
| 
 | |
|         self.assertEqual(_testcapi.eval_get_func_desc(function_example),
 | |
|                          "()")
 | |
|         self.assertEqual(_testcapi.eval_get_func_desc(A.method_example),
 | |
|                          "()")
 | |
|         self.assertEqual(_testcapi.eval_get_func_desc(A().method_example),
 | |
|                          "()")
 | |
|         self.assertEqual(_testcapi.eval_get_func_desc(sum), "()")  # c function
 | |
|         self.assertEqual(_testcapi.eval_get_func_desc(A), " object")
 | |
| 
 | |
|     def test_function_get_code(self):
 | |
|         import types
 | |
| 
 | |
|         def some():
 | |
|             pass
 | |
| 
 | |
|         code = _testcapi.function_get_code(some)
 | |
|         self.assertIsInstance(code, types.CodeType)
 | |
|         self.assertEqual(code, some.__code__)
 | |
| 
 | |
|         with self.assertRaises(SystemError):
 | |
|             _testcapi.function_get_code(None)  # not a function
 | |
| 
 | |
|     def test_function_get_globals(self):
 | |
|         def some():
 | |
|             pass
 | |
| 
 | |
|         globals_ = _testcapi.function_get_globals(some)
 | |
|         self.assertIsInstance(globals_, dict)
 | |
|         self.assertEqual(globals_, some.__globals__)
 | |
| 
 | |
|         with self.assertRaises(SystemError):
 | |
|             _testcapi.function_get_globals(None)  # not a function
 | |
| 
 | |
|     def test_function_get_module(self):
 | |
|         def some():
 | |
|             pass
 | |
| 
 | |
|         module = _testcapi.function_get_module(some)
 | |
|         self.assertIsInstance(module, str)
 | |
|         self.assertEqual(module, some.__module__)
 | |
| 
 | |
|         with self.assertRaises(SystemError):
 | |
|             _testcapi.function_get_module(None)  # not a function
 | |
| 
 | |
|     def test_function_get_defaults(self):
 | |
|         def some(
 | |
|             pos_only1, pos_only2='p',
 | |
|             /,
 | |
|             zero=0, optional=None,
 | |
|             *,
 | |
|             kw1,
 | |
|             kw2=True,
 | |
|         ):
 | |
|             pass
 | |
| 
 | |
|         defaults = _testcapi.function_get_defaults(some)
 | |
|         self.assertEqual(defaults, ('p', 0, None))
 | |
|         self.assertEqual(defaults, some.__defaults__)
 | |
| 
 | |
|         with self.assertRaises(SystemError):
 | |
|             _testcapi.function_get_defaults(None)  # not a function
 | |
| 
 | |
|     def test_function_set_defaults(self):
 | |
|         def some(
 | |
|             pos_only1, pos_only2='p',
 | |
|             /,
 | |
|             zero=0, optional=None,
 | |
|             *,
 | |
|             kw1,
 | |
|             kw2=True,
 | |
|         ):
 | |
|             pass
 | |
| 
 | |
|         old_defaults = ('p', 0, None)
 | |
|         self.assertEqual(_testcapi.function_get_defaults(some), old_defaults)
 | |
|         self.assertEqual(some.__defaults__, old_defaults)
 | |
| 
 | |
|         with self.assertRaises(SystemError):
 | |
|             _testcapi.function_set_defaults(some, 1)  # not tuple or None
 | |
|         self.assertEqual(_testcapi.function_get_defaults(some), old_defaults)
 | |
|         self.assertEqual(some.__defaults__, old_defaults)
 | |
| 
 | |
|         with self.assertRaises(SystemError):
 | |
|             _testcapi.function_set_defaults(1, ())    # not a function
 | |
|         self.assertEqual(_testcapi.function_get_defaults(some), old_defaults)
 | |
|         self.assertEqual(some.__defaults__, old_defaults)
 | |
| 
 | |
|         new_defaults = ('q', 1, None)
 | |
|         _testcapi.function_set_defaults(some, new_defaults)
 | |
|         self.assertEqual(_testcapi.function_get_defaults(some), new_defaults)
 | |
|         self.assertEqual(some.__defaults__, new_defaults)
 | |
| 
 | |
|         # Empty tuple is fine:
 | |
|         new_defaults = ()
 | |
|         _testcapi.function_set_defaults(some, new_defaults)
 | |
|         self.assertEqual(_testcapi.function_get_defaults(some), new_defaults)
 | |
|         self.assertEqual(some.__defaults__, new_defaults)
 | |
| 
 | |
|         class tuplesub(tuple): ...  # tuple subclasses must work
 | |
| 
 | |
|         new_defaults = tuplesub(((1, 2), ['a', 'b'], None))
 | |
|         _testcapi.function_set_defaults(some, new_defaults)
 | |
|         self.assertEqual(_testcapi.function_get_defaults(some), new_defaults)
 | |
|         self.assertEqual(some.__defaults__, new_defaults)
 | |
| 
 | |
|         # `None` is special, it sets `defaults` to `NULL`,
 | |
|         # it needs special handling in `_testcapi`:
 | |
|         _testcapi.function_set_defaults(some, None)
 | |
|         self.assertEqual(_testcapi.function_get_defaults(some), None)
 | |
|         self.assertEqual(some.__defaults__, None)
 | |
| 
 | |
|     def test_function_get_kw_defaults(self):
 | |
|         def some(
 | |
|             pos_only1, pos_only2='p',
 | |
|             /,
 | |
|             zero=0, optional=None,
 | |
|             *,
 | |
|             kw1,
 | |
|             kw2=True,
 | |
|         ):
 | |
|             pass
 | |
| 
 | |
|         defaults = _testcapi.function_get_kw_defaults(some)
 | |
|         self.assertEqual(defaults, {'kw2': True})
 | |
|         self.assertEqual(defaults, some.__kwdefaults__)
 | |
| 
 | |
|         with self.assertRaises(SystemError):
 | |
|             _testcapi.function_get_kw_defaults(None)  # not a function
 | |
| 
 | |
|     def test_function_set_kw_defaults(self):
 | |
|         def some(
 | |
|             pos_only1, pos_only2='p',
 | |
|             /,
 | |
|             zero=0, optional=None,
 | |
|             *,
 | |
|             kw1,
 | |
|             kw2=True,
 | |
|         ):
 | |
|             pass
 | |
| 
 | |
|         old_defaults = {'kw2': True}
 | |
|         self.assertEqual(_testcapi.function_get_kw_defaults(some), old_defaults)
 | |
|         self.assertEqual(some.__kwdefaults__, old_defaults)
 | |
| 
 | |
|         with self.assertRaises(SystemError):
 | |
|             _testcapi.function_set_kw_defaults(some, 1)  # not dict or None
 | |
|         self.assertEqual(_testcapi.function_get_kw_defaults(some), old_defaults)
 | |
|         self.assertEqual(some.__kwdefaults__, old_defaults)
 | |
| 
 | |
|         with self.assertRaises(SystemError):
 | |
|             _testcapi.function_set_kw_defaults(1, {})    # not a function
 | |
|         self.assertEqual(_testcapi.function_get_kw_defaults(some), old_defaults)
 | |
|         self.assertEqual(some.__kwdefaults__, old_defaults)
 | |
| 
 | |
|         new_defaults = {'kw2': (1, 2, 3)}
 | |
|         _testcapi.function_set_kw_defaults(some, new_defaults)
 | |
|         self.assertEqual(_testcapi.function_get_kw_defaults(some), new_defaults)
 | |
|         self.assertEqual(some.__kwdefaults__, new_defaults)
 | |
| 
 | |
|         # Empty dict is fine:
 | |
|         new_defaults = {}
 | |
|         _testcapi.function_set_kw_defaults(some, new_defaults)
 | |
|         self.assertEqual(_testcapi.function_get_kw_defaults(some), new_defaults)
 | |
|         self.assertEqual(some.__kwdefaults__, new_defaults)
 | |
| 
 | |
|         class dictsub(dict): ...  # dict subclasses must work
 | |
| 
 | |
|         new_defaults = dictsub({'kw2': None})
 | |
|         _testcapi.function_set_kw_defaults(some, new_defaults)
 | |
|         self.assertEqual(_testcapi.function_get_kw_defaults(some), new_defaults)
 | |
|         self.assertEqual(some.__kwdefaults__, new_defaults)
 | |
| 
 | |
|         # `None` is special, it sets `kwdefaults` to `NULL`,
 | |
|         # it needs special handling in `_testcapi`:
 | |
|         _testcapi.function_set_kw_defaults(some, None)
 | |
|         self.assertEqual(_testcapi.function_get_kw_defaults(some), None)
 | |
|         self.assertEqual(some.__kwdefaults__, None)
 | |
| 
 | |
|     def test_unstable_gc_new_with_extra_data(self):
 | |
|         class Data(_testcapi.ObjExtraData):
 | |
|             __slots__ = ('x', 'y')
 | |
| 
 | |
|         d = Data()
 | |
|         d.x = 10
 | |
|         d.y = 20
 | |
|         d.extra = 30
 | |
|         self.assertEqual(d.x, 10)
 | |
|         self.assertEqual(d.y, 20)
 | |
|         self.assertEqual(d.extra, 30)
 | |
|         del d.extra
 | |
|         self.assertIsNone(d.extra)
 | |
| 
 | |
| 
 | |
@requires_limited_api
class TestHeapTypeRelative(unittest.TestCase):
    """Test API for extending opaque types (PEP 697)"""

    # Sizes/offsets exercised by the tests below: a few small values plus
    # values around object.__basicsize__.
    _SIZES = sorted({0, 1, 2, 3, 4, 7, 8, 123,
                     object.__basicsize__,
                     object.__basicsize__-1,
                     object.__basicsize__+1})

    # The class-level @requires_limited_api decorator covers every test
    # method here, so the methods are not decorated individually.
    def test_heaptype_relative_sizes(self):
        # Test subclassing using "relative" basicsize, see PEP 697
        def check(extra_base_size, extra_size):
            Base, Sub, instance, data_ptr, data_offset, data_size = (
                _testcapi.make_sized_heaptypes(
                    extra_base_size, -extra_size))

            # no alignment shenanigans when inheriting directly
            if extra_size == 0:
                self.assertEqual(Base.__basicsize__, Sub.__basicsize__)
                self.assertEqual(data_size, 0)

            else:
                # The following offsets should be in increasing order:
                offsets = [
                    (0, 'start of object'),
                    (Base.__basicsize__, 'end of base data'),
                    (data_offset, 'subclass data'),
                    (data_offset + extra_size, 'end of requested subcls data'),
                    (data_offset + data_size, 'end of reserved subcls data'),
                    (Sub.__basicsize__, 'end of object'),
                ]
                ordered_offsets = sorted(offsets, key=operator.itemgetter(0))
                self.assertEqual(
                    offsets, ordered_offsets,
                    msg=f'Offsets not in expected order, got: {ordered_offsets}')

                # end of reserved subcls data == end of object
                self.assertEqual(Sub.__basicsize__, data_offset + data_size)

                # we don't reserve (requested + alignment) or more data
                self.assertLess(data_size - extra_size,
                                _testcapi.ALIGNOF_MAX_ALIGN_T)

            # The offsets/sizes we calculated should be aligned.
            self.assertEqual(data_offset % _testcapi.ALIGNOF_MAX_ALIGN_T, 0)
            self.assertEqual(data_size % _testcapi.ALIGNOF_MAX_ALIGN_T, 0)

        for extra_base_size in self._SIZES:
            for extra_size in self._SIZES:
                args = dict(extra_base_size=extra_base_size,
                            extra_size=extra_size)
                with self.subTest(**args):
                    check(**args)

    def test_HeapCCollection(self):
        """Make sure HeapCCollection works properly by itself"""
        collection = _testcapi.HeapCCollection(1, 2, 3)
        self.assertEqual(list(collection), [1, 2, 3])

    def test_heaptype_inherit_itemsize(self):
        """Test HeapCCollection subclasses work properly"""
        for extra_size in self._SIZES:
            with self.subTest(extra_size=extra_size):
                Sub = _testcapi.subclass_var_heaptype(
                    _testcapi.HeapCCollection, -extra_size, 0, 0)
                collection = Sub(1, 2, 3)
                collection.set_data_to_3s()

                # The items must survive writing to the subclass data, and
                # the subclass data must contain only the written value.
                self.assertEqual(list(collection), [1, 2, 3])
                mem = collection.get_data()
                self.assertGreaterEqual(len(mem), extra_size)
                self.assertTrue(set(mem) <= {3}, f'got {mem!r}')

    def test_heaptype_invalid_inheritance(self):
        # NOTE(review): ``msg`` only customizes the failure message shown
        # when no SystemError is raised; it does not check the text of the
        # raised exception.  Use assertRaisesRegex if that is intended.
        with self.assertRaises(SystemError,
                               msg="Cannot extend variable-size class without "
                               + "Py_TPFLAGS_ITEMS_AT_END"):
            _testcapi.subclass_heaptype(int, -8, 0)

    def test_heaptype_relative_members(self):
        """Test members of heap types with relative basicsize and offset"""
        make = _testcapi.make_heaptype_with_member
        for extra_base_size in self._SIZES:
            for extra_size in self._SIZES:
                for offset in self._SIZES:
                    with self.subTest(extra_base_size=extra_base_size, extra_size=extra_size, offset=offset):
                        if offset < extra_size:
                            # The member fits in the subclass data: it must
                            # be readable and writable both as an attribute
                            # and through the get_memb/set_memb methods.
                            Sub = make(extra_base_size, -extra_size, offset, True)
                            Base = Sub.mro()[1]
                            instance = Sub()
                            self.assertEqual(instance.memb, instance.get_memb())
                            instance.set_memb(13)
                            self.assertEqual(instance.memb, instance.get_memb())
                            self.assertEqual(instance.get_memb(), 13)
                            instance.memb = 14
                            self.assertEqual(instance.memb, instance.get_memb())
                            self.assertEqual(instance.get_memb(), 14)
                            # The member must live between the end of the
                            # base data and the end of the object.
                            self.assertGreaterEqual(instance.get_memb_offset(), Base.__basicsize__)
                            self.assertLess(instance.get_memb_offset(), Sub.__basicsize__)
                            with self.assertRaises(SystemError):
                                instance.get_memb_relative()
                            with self.assertRaises(SystemError):
                                instance.set_memb_relative(0)
                        else:
                            # Offset outside the requested subclass data.
                            with self.assertRaises(SystemError):
                                make(extra_base_size, -extra_size, offset, True)
                        # A non-negative basicsize is always rejected here.
                        with self.assertRaises(SystemError):
                            make(extra_base_size, extra_size, offset, True)
                with self.subTest(extra_base_size=extra_base_size, extra_size=extra_size):
                    # A negative offset is always rejected.
                    with self.assertRaises(SystemError):
                        make(extra_base_size, -extra_size, -1, True)

    def test_heaptype_relative_members_errors(self):
        # Check the error messages for invalid basicsize/offset values.
        with self.assertRaisesRegex(
                SystemError,
                r"With Py_RELATIVE_OFFSET, basicsize must be negative"):
            _testcapi.make_heaptype_with_member(0, 1234, 0, True)
        with self.assertRaisesRegex(
                SystemError, r"Member offset out of range \(0\.\.-basicsize\)"):
            _testcapi.make_heaptype_with_member(0, -8, 1234, True)
        with self.assertRaisesRegex(
                SystemError, r"Member offset out of range \(0\.\.-basicsize\)"):
            _testcapi.make_heaptype_with_member(0, -8, -1, True)

        # Check the error messages of the get/set_memb_relative methods.
        Sub = _testcapi.make_heaptype_with_member(0, -8, 0, True)
        instance = Sub()
        with self.assertRaisesRegex(
                SystemError, r"PyMember_GetOne used with Py_RELATIVE_OFFSET"):
            instance.get_memb_relative()
        with self.assertRaisesRegex(
                SystemError, r"PyMember_SetOne used with Py_RELATIVE_OFFSET"):
            instance.set_memb_relative(0)

    def test_pyobject_getitemdata_error(self):
        """Test PyObject_GetItemData fails on unsupported types"""
        with self.assertRaises(TypeError):
            # None is not variable-length
            _testcapi.pyobject_getitemdata(None)
        with self.assertRaises(TypeError):
            # int is variable-length, but doesn't have the
            # Py_TPFLAGS_ITEMS_AT_END layout (and flag)
            _testcapi.pyobject_getitemdata(0)
 | |
| 
 | |
| 
 | |
| class TestPendingCalls(unittest.TestCase):
 | |
| 
 | |
|     # See the comment in ceval.c (at the "handle_eval_breaker" label)
 | |
|     # about when pending calls get run.  This is especially relevant
 | |
|     # here for creating deterministic tests.
 | |
| 
 | |
|     def pendingcalls_submit(self, l, n):
 | |
|         def callback():
 | |
|             #this function can be interrupted by thread switching so let's
 | |
|             #use an atomic operation
 | |
|             l.append(None)
 | |
| 
 | |
|         for i in range(n):
 | |
|             time.sleep(random.random()*0.02) #0.01 secs on average
 | |
|             #try submitting callback until successful.
 | |
|             #rely on regular interrupt to flush queue if we are
 | |
|             #unsuccessful.
 | |
|             while True:
 | |
|                 if _testcapi._pending_threadfunc(callback):
 | |
|                     break
 | |
| 
 | |
|     def pendingcalls_wait(self, l, n, context = None):
 | |
|         #now, stick around until l[0] has grown to 10
 | |
|         count = 0
 | |
|         while len(l) != n:
 | |
|             #this busy loop is where we expect to be interrupted to
 | |
|             #run our callbacks.  Note that callbacks are only run on the
 | |
|             #main thread
 | |
|             if False and support.verbose:
 | |
|                 print("(%i)"%(len(l),),)
 | |
|             for i in range(1000):
 | |
|                 a = i*i
 | |
|             if context and not context.event.is_set():
 | |
|                 continue
 | |
|             count += 1
 | |
|             self.assertTrue(count < 10000,
 | |
|                 "timeout waiting for %i callbacks, got %i"%(n, len(l)))
 | |
|         if False and support.verbose:
 | |
|             print("(%i)"%(len(l),))
 | |
| 
 | |
|     @threading_helper.requires_working_threading()
 | |
|     def test_pendingcalls_threaded(self):
 | |
| 
 | |
|         #do every callback on a separate thread
 | |
|         n = 32 #total callbacks
 | |
|         threads = []
 | |
|         class foo(object):pass
 | |
|         context = foo()
 | |
|         context.l = []
 | |
|         context.n = 2 #submits per thread
 | |
|         context.nThreads = n // context.n
 | |
|         context.nFinished = 0
 | |
|         context.lock = threading.Lock()
 | |
|         context.event = threading.Event()
 | |
| 
 | |
|         threads = [threading.Thread(target=self.pendingcalls_thread,
 | |
|                                     args=(context,))
 | |
|                    for i in range(context.nThreads)]
 | |
|         with threading_helper.start_threads(threads):
 | |
|             self.pendingcalls_wait(context.l, n, context)
 | |
| 
 | |
|     def pendingcalls_thread(self, context):
 | |
|         try:
 | |
|             self.pendingcalls_submit(context.l, context.n)
 | |
|         finally:
 | |
|             with context.lock:
 | |
|                 context.nFinished += 1
 | |
|                 nFinished = context.nFinished
 | |
|                 if False and support.verbose:
 | |
|                     print("finished threads: ", nFinished)
 | |
|             if nFinished == context.nThreads:
 | |
|                 context.event.set()
 | |
| 
 | |
|     def test_pendingcalls_non_threaded(self):
 | |
|         #again, just using the main thread, likely they will all be dispatched at
 | |
|         #once.  It is ok to ask for too many, because we loop until we find a slot.
 | |
|         #the loop can be interrupted to dispatch.
 | |
|         #there are only 32 dispatch slots, so we go for twice that!
 | |
|         l = []
 | |
|         n = 64
 | |
|         self.pendingcalls_submit(l, n)
 | |
|         self.pendingcalls_wait(l, n)
 | |
| 
 | |
|     def test_gen_get_code(self):
 | |
|         def genf(): yield
 | |
|         gen = genf()
 | |
|         self.assertEqual(_testcapi.gen_get_code(gen), gen.gi_code)
 | |
| 
 | |
|     class PendingTask(types.SimpleNamespace):
 | |
| 
 | |
|         _add_pending = _testinternalcapi.pending_threadfunc
 | |
| 
 | |
|         def __init__(self, req, taskid=None, notify_done=None):
 | |
|             self.id = taskid
 | |
|             self.req = req
 | |
|             self.notify_done = notify_done
 | |
| 
 | |
|             self.creator_tid = threading.get_ident()
 | |
|             self.requester_tid = None
 | |
|             self.runner_tid = None
 | |
|             self.result = None
 | |
| 
 | |
|         def run(self):
 | |
|             assert self.result is None
 | |
|             self.runner_tid = threading.get_ident()
 | |
|             self._run()
 | |
|             if self.notify_done is not None:
 | |
|                 self.notify_done()
 | |
| 
 | |
|         def _run(self):
 | |
|             self.result = self.req
 | |
| 
 | |
|         def run_in_pending_call(self, worker_tids):
 | |
|             assert self._add_pending is _testinternalcapi.pending_threadfunc
 | |
|             self.requester_tid = threading.get_ident()
 | |
|             def callback():
 | |
|                 assert self.result is None
 | |
|                 # It can be tricky to control which thread handles
 | |
|                 # the eval breaker, so we take a naive approach to
 | |
|                 # make sure.
 | |
|                 if threading.get_ident() not in worker_tids:
 | |
|                     self._add_pending(callback, ensure_added=True)
 | |
|                     return
 | |
|                 self.run()
 | |
|             self._add_pending(callback, ensure_added=True)
 | |
| 
 | |
|         def create_thread(self, worker_tids):
 | |
|             return threading.Thread(
 | |
|                 target=self.run_in_pending_call,
 | |
|                 args=(worker_tids,),
 | |
|             )
 | |
| 
 | |
|         def wait_for_result(self):
 | |
|             while self.result is None:
 | |
|                 time.sleep(0.01)
 | |
| 
 | |
|     @threading_helper.requires_working_threading()
 | |
|     def test_subthreads_can_handle_pending_calls(self):
 | |
|         payload = 'Spam spam spam spam. Lovely spam! Wonderful spam!'
 | |
| 
 | |
|         task = self.PendingTask(payload)
 | |
|         def do_the_work():
 | |
|             tid = threading.get_ident()
 | |
|             t = task.create_thread({tid})
 | |
|             with threading_helper.start_threads([t]):
 | |
|                 task.wait_for_result()
 | |
|         t = threading.Thread(target=do_the_work)
 | |
|         with threading_helper.start_threads([t]):
 | |
|             pass
 | |
| 
 | |
|         self.assertEqual(task.result, payload)
 | |
| 
 | |
    @threading_helper.requires_working_threading()
    def test_many_subthreads_can_handle_pending_calls(self):
        # 20 PendingTask objects are queued, 3 "adder" threads register a
        # pending call for each of them, and 3 "worker" threads spin so
        # that the pending calls can run there.  Afterwards the recorded
        # thread idents are checked: nothing ran in the main thread and no
        # task ran in a thread that requested tasks.
        main_tid = threading.get_ident()
        self.assertEqual(threading.main_thread().ident, main_tid)

        # We can't use queue.Queue since it isn't reentrant relative
        # to pending calls.
        # _queue holds the tasks not yet handed out; _active holds one
        # marker per task that has not finished yet.
        _queue = deque()
        _active = deque()
        _done_lock = threading.Lock()
        def queue_put(task):
            _queue.append(task)
            _active.append(True)
        def queue_get():
            # Mimic queue.Queue.get_nowait(): raise queue.Empty when drained.
            try:
                task = _queue.popleft()
            except IndexError:
                raise queue.Empty
            return task
        def queue_task_done():
            # Used as each task's notify_done hook.  When the last marker
            # is removed, release _done_lock so queue_join() can proceed.
            _active.pop()
            if not _active:
                try:
                    _done_lock.release()
                except RuntimeError:
                    # Releasing an unlocked lock is tolerated, provided the
                    # lock really is unlocked.
                    assert not _done_lock.locked()
        def queue_empty():
            return not _queue
        def queue_join():
            # Block until queue_task_done() has released the lock, then
            # leave it unlocked again.
            _done_lock.acquire()
            _done_lock.release()

        tasks = []
        for i in range(20):
            task = self.PendingTask(
                req=f'request {i}',
                taskid=i,
                notify_done=queue_task_done,
            )
            tasks.append(task)
            queue_put(task)
        # This will be released once all the tasks have finished.
        _done_lock.acquire()

        def add_tasks(worker_tids):
            # Body of the "adder" threads: drain the queue, registering a
            # pending call for every task, unless told to stop via `done`.
            while True:
                if done:
                    return
                try:
                    task = queue_get()
                except queue.Empty:
                    break
                task.run_in_pending_call(worker_tids)

        # Stop flag; read by the add_tasks()/run_tasks() closures and set
        # from the body of this test method.
        done = False
        def run_tasks():
            # Body of the "worker" threads: keep looping (with short
            # sleeps) until the queue is empty and `done` is set.
            while not queue_empty():
                if done:
                    return
                time.sleep(0.01)
            # Give the worker a chance to handle any remaining pending calls.
            while not done:
                time.sleep(0.01)

        # Start the workers and wait for them to finish.
        worker_threads = [threading.Thread(target=run_tasks)
                          for _ in range(3)]
        with threading_helper.start_threads(worker_threads):
            try:
                # Add a pending call for each task.
                worker_tids = [t.ident for t in worker_threads]
                threads = [threading.Thread(target=add_tasks, args=(worker_tids,))
                           for _ in range(3)]
                with threading_helper.start_threads(threads):
                    try:
                        pass
                    except BaseException:
                        done = True
                        raise  # re-raise
                # Wait for the pending calls to finish.
                queue_join()
                # Notify the workers that they can stop.
                done = True
            except BaseException:
                # Make sure the workers stop spinning on any failure.
                done = True
                raise  # re-raise
        runner_tids = [t.runner_tid for t in tasks]

        self.assertNotIn(main_tid, runner_tids)
        for task in tasks:
            with self.subTest(f'task {task.id}'):
                self.assertNotEqual(task.requester_tid, main_tid)
                self.assertNotEqual(task.requester_tid, task.runner_tid)
                self.assertNotIn(task.requester_tid, runner_tids)
 | |
| 
 | |
    @requires_subinterpreters
    def test_isolated_subinterpreter(self):
        # We exercise the most important permutations.

        # This test relies on pending calls getting called
        # (eval breaker tripped) at each loop iteration
        # and at each call.

        # Each subTest below uses pipes for the handshake between threads
        # and interpreters: a "ready" pipe (the side that will handle the
        # pending call announces it is looping), a "done" pipe (the side
        # that added the pending call announces it has returned) and, where
        # the result is produced in the subinterpreter, a "data" pipe that
        # carries the result back as a single byte.

        # NOTE(review): maxtext is not referenced anywhere else in this
        # method.
        maxtext = 250
        # presumably the ID of the main interpreter -- TODO confirm
        main_interpid = 0
        interpid = _interpreters.create()
        # Pre-import, inside the subinterpreter, everything the scripts
        # below rely on.
        _interpreters.run_string(interpid, f"""if True:
            import json
            import os
            import threading
            import time
            import _testinternalcapi
            from test.support import threading_helper
            """)

        def create_pipe():
            # Both ends are closed via addCleanup() when the test ends.
            r, w = os.pipe()
            self.addCleanup(lambda: os.close(r))
            self.addCleanup(lambda: os.close(w))
            return r, w

        with self.subTest('add in main, run in subinterpreter'):
            r_ready, w_ready = create_pipe()
            r_done, w_done= create_pipe()
            # Absolute deadline, interpolated into the script below.
            timeout = time.time() + 30  # seconds

            def do_work():
                _interpreters.run_string(interpid, f"""if True:
                    # Wait until this interp has handled the pending call.
                    waiting = False
                    done = False
                    def wait(os_read=os.read):
                        global done, waiting
                        waiting = True
                        os_read({r_done}, 1)
                        done = True
                    t = threading.Thread(target=wait)
                    with threading_helper.start_threads([t]):
                        while not waiting:
                            pass
                        os.write({w_ready}, b'\\0')
                        # Loop to trigger the eval breaker.
                        while not done:
                            time.sleep(0.01)
                            if time.time() > {timeout}:
                                raise Exception('timed out!')
                    """)
            t = threading.Thread(target=do_work)
            with threading_helper.start_threads([t]):
                # Block until the subinterpreter reports it is looping.
                os.read(r_ready, 1)
                # Add the pending call and wait for it to finish.
                actual = _testinternalcapi.pending_identify(interpid)
                # Signal the subinterpreter to stop.
                os.write(w_done, b'\0')

            self.assertEqual(actual, int(interpid))

        with self.subTest('add in main, run in subinterpreter sub-thread'):
            r_ready, w_ready = create_pipe()
            r_done, w_done= create_pipe()
            timeout = time.time() + 30  # seconds

            def do_work():
                # Here the looping is done by a sub-thread of the
                # subinterpreter, while its first thread blocks on the
                # "done" pipe.
                _interpreters.run_string(interpid, f"""if True:
                    waiting = False
                    done = False
                    def subthread():
                        while not waiting:
                            pass
                        os.write({w_ready}, b'\\0')
                        # Loop to trigger the eval breaker.
                        while not done:
                            time.sleep(0.01)
                            if time.time() > {timeout}:
                                raise Exception('timed out!')
                    t = threading.Thread(target=subthread)
                    with threading_helper.start_threads([t]):
                        # Wait until this interp has handled the pending call.
                        waiting = True
                        os.read({r_done}, 1)
                        done = True
                    """)
            t = threading.Thread(target=do_work)
            with threading_helper.start_threads([t]):
                os.read(r_ready, 1)
                # Add the pending call and wait for it to finish.
                actual = _testinternalcapi.pending_identify(interpid)
                # Signal the subinterpreter to stop.
                os.write(w_done, b'\0')

            self.assertEqual(actual, int(interpid))

        with self.subTest('add in subinterpreter, run in main'):
            r_ready, w_ready = create_pipe()
            r_done, w_done= create_pipe()
            r_data, w_data= create_pipe()
            timeout = time.time() + 30  # seconds

            def add_job():
                # Runs in a helper thread: once the main thread is ready,
                # have the subinterpreter add the pending call and write
                # the result to the "data" pipe.
                os.read(r_ready, 1)
                _interpreters.run_string(interpid, f"""if True:
                    # Add the pending call and wait for it to finish.
                    actual = _testinternalcapi.pending_identify({main_interpid})
                    # Signal the subinterpreter to stop.
                    os.write({w_done}, b'\\0')
                    os.write({w_data}, actual.to_bytes(1, 'little'))
                    """)
            # Wait until this interp has handled the pending call.
            waiting = False
            done = False
            def wait(os_read=os.read):
                nonlocal done, waiting
                waiting = True
                os_read(r_done, 1)
                done = True
            t1 = threading.Thread(target=add_job)
            t2 = threading.Thread(target=wait)
            with threading_helper.start_threads([t1, t2]):
                while not waiting:
                    pass
                os.write(w_ready, b'\0')
                # Loop to trigger the eval breaker.
                while not done:
                    time.sleep(0.01)
                    if time.time() > timeout:
                        raise Exception('timed out!')
                text = os.read(r_data, 1)
            actual = int.from_bytes(text, 'little')

            self.assertEqual(actual, int(main_interpid))

        with self.subTest('add in subinterpreter, run in sub-thread'):
            r_ready, w_ready = create_pipe()
            r_done, w_done= create_pipe()
            r_data, w_data= create_pipe()
            timeout = time.time() + 30  # seconds

            def add_job():
                os.read(r_ready, 1)
                _interpreters.run_string(interpid, f"""if True:
                    # Add the pending call and wait for it to finish.
                    actual = _testinternalcapi.pending_identify({main_interpid})
                    # Signal the subinterpreter to stop.
                    os.write({w_done}, b'\\0')
                    os.write({w_data}, actual.to_bytes(1, 'little'))
                    """)
            # Wait until this interp has handled the pending call.
            waiting = False
            done = False
            def wait(os_read=os.read):
                nonlocal done, waiting
                waiting = True
                os_read(r_done, 1)
                done = True
            def subthread():
                # Same loop as the main thread ran in the previous subTest,
                # but executed in a dedicated thread of this interpreter.
                while not waiting:
                    pass
                os.write(w_ready, b'\0')
                # Loop to trigger the eval breaker.
                while not done:
                    time.sleep(0.01)
                    if time.time() > timeout:
                        raise Exception('timed out!')
            t1 = threading.Thread(target=add_job)
            t2 = threading.Thread(target=wait)
            t3 = threading.Thread(target=subthread)
            with threading_helper.start_threads([t1, t2, t3]):
                pass
            text = os.read(r_data, 1)
            actual = int.from_bytes(text, 'little')

            self.assertEqual(actual, int(main_interpid))

        # XXX We can't use the rest until gh-105716 is fixed.
        # Everything below this return is intentionally unreachable for now.
        return

        with self.subTest('add in subinterpreter, run in subinterpreter sub-thread'):
            r_ready, w_ready = create_pipe()
            r_done, w_done= create_pipe()
            r_data, w_data= create_pipe()
            timeout = time.time() + 30  # seconds

            def do_work():
                _interpreters.run_string(interpid, f"""if True:
                    waiting = False
                    done = False
                    def subthread():
                        while not waiting:
                            pass
                        os.write({w_ready}, b'\\0')
                        # Loop to trigger the eval breaker.
                        while not done:
                            time.sleep(0.01)
                            if time.time() > {timeout}:
                                raise Exception('timed out!')
                    t = threading.Thread(target=subthread)
                    with threading_helper.start_threads([t]):
                        # Wait until this interp has handled the pending call.
                        waiting = True
                        os.read({r_done}, 1)
                        done = True
                    """)
            t = threading.Thread(target=do_work)
            #with threading_helper.start_threads([t]):
            t.start()
            if True:
                os.read(r_ready, 1)
                _interpreters.run_string(interpid, f"""if True:
                    # Add the pending call and wait for it to finish.
                    actual = _testinternalcapi.pending_identify({interpid})
                    # Signal the subinterpreter to stop.
                    os.write({w_done}, b'\\0')
                    os.write({w_data}, actual.to_bytes(1, 'little'))
                    """)
            t.join()
            text = os.read(r_data, 1)
            actual = int.from_bytes(text, 'little')

            self.assertEqual(actual, int(interpid))
 | |
| 
 | |
| 
 | |
class SubinterpreterTest(unittest.TestCase):
    # Tests that run code in a subinterpreter (via support.run_in_subinterp()
    # or support.run_in_subinterp_with_config()) and check what is, and is
    # not, shared with the interpreter running the test.

    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
    def test_subinterps(self):
        # sys.modules and the builtins module must be distinct objects in
        # the subinterpreter; their id()s are pickled back through a pipe.
        import builtins
        r, w = os.pipe()
        code = """if 1:
            import sys, builtins, pickle
            with open({:d}, "wb") as f:
                pickle.dump(id(sys.modules), f)
                pickle.dump(id(builtins), f)
            """.format(w)
        with open(r, "rb") as f:
            ret = support.run_in_subinterp(code)
            self.assertEqual(ret, 0)
            self.assertNotEqual(pickle.load(f), id(sys.modules))
            self.assertNotEqual(pickle.load(f), id(builtins))

    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
    def test_subinterps_recent_language_features(self):
        # The script uses a handful of newer syntax features; its results
        # are pickled back through a pipe and compared here.
        r, w = os.pipe()
        code = """if 1:
            import pickle
            with open({:d}, "wb") as f:

                @(lambda x:x)  # Py 3.9
                def noop(x): return x

                a = (b := f'1{{2}}3') + noop('x')  # Py 3.8 (:=) / 3.6 (f'')

                async def foo(arg): return await arg  # Py 3.5

                pickle.dump(dict(a=a, b=b), f)
            """.format(w)

        with open(r, "rb") as f:
            ret = support.run_in_subinterp(code)
            self.assertEqual(ret, 0)
            self.assertEqual(pickle.load(f), {'a': '123x', 'b': '123'})

    def test_py_config_isoloated_per_interpreter(self):
        # A config change in one interpreter must not leak out to others.
        #
        # This test could verify ANY config value, it just happens to have been
        # written around the time of int_max_str_digits. Refactoring is okay.
        code = """if 1:
        import sys, _testinternalcapi

        # Any config value would do, this happens to be the one being
        # double checked at the time this test was written.
        config = _testinternalcapi.get_config()
        config['int_max_str_digits'] = 55555
        _testinternalcapi.set_config(config)
        sub_value = _testinternalcapi.get_config()['int_max_str_digits']
        assert sub_value == 55555, sub_value
        """
        before_config = _testinternalcapi.get_config()
        # Precondition: the sentinel value must not already be configured.
        # A unittest assertion is used (rather than a bare ``assert``) so
        # the check is not stripped when the tests run under ``-O``.
        self.assertNotEqual(before_config['int_max_str_digits'], 55555)
        self.assertEqual(support.run_in_subinterp(code), 0,
                         'subinterp code failure, check stderr.')
        after_config = _testinternalcapi.get_config()
        self.assertIsNot(
                before_config, after_config,
                "Expected get_config() to return a new dict on each call")
        self.assertEqual(before_config, after_config,
                         "CAUTION: Tests executed after this may be "
                         "running under an altered config.")
        # try:...finally: calling set_config(before_config) not done
        # as that results in sys.argv, sys.path, and sys.warnoptions
        # "being modified by test_capi" per test.regrtest.  So if this
        # test fails, assume that the environment in this process may
        # be altered and suspect.

    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
    def test_configured_settings(self):
        """
        The config with which an interpreter is created corresponds
        1-to-1 with the new interpreter's settings.  This test verifies
        that they match.
        """

        OBMALLOC = 1<<5
        EXTENSIONS = 1<<8
        THREADS = 1<<10
        DAEMON_THREADS = 1<<11
        FORK = 1<<15
        EXEC = 1<<16
        ALL_FLAGS = (OBMALLOC | FORK | EXEC | THREADS | DAEMON_THREADS
                     | EXTENSIONS)

        features = [
            'obmalloc',
            'fork',
            'exec',
            'threads',
            'daemon_threads',
            'extensions',
            'own_gil',
        ]
        # Keyword names, in the same order as the config tuples below.
        # Three of them do not follow the "allow_<feature>" pattern.
        kwlist = [f'allow_{n}' for n in features]
        kwlist[0] = 'use_main_obmalloc'
        kwlist[-2] = 'check_multi_interp_extensions'
        kwlist[-1] = 'own_gil'

        # expected to work
        for config, expected in {
            (True, True, True, True, True, True, True):
                (ALL_FLAGS, True),
            (True, False, False, False, False, False, False):
                (OBMALLOC, False),
            (False, False, False, True, False, True, False):
                (THREADS | EXTENSIONS, False),
        }.items():
            kwargs = dict(zip(kwlist, config))
            exp_flags, exp_gil = expected
            expected = {
                'feature_flags': exp_flags,
                'own_gil': exp_gil,
            }
            with self.subTest(config):
                r, w = os.pipe()
                # The subinterpreter reports its settings as JSON through
                # the write end of the pipe.
                script = textwrap.dedent(f'''
                    import _testinternalcapi, json, os
                    settings = _testinternalcapi.get_interp_settings()
                    with os.fdopen({w}, "w") as stdin:
                        json.dump(settings, stdin)
                    ''')
                with os.fdopen(r) as stdout:
                    ret = support.run_in_subinterp_with_config(script, **kwargs)
                    self.assertEqual(ret, 0)
                    out = stdout.read()
                settings = json.loads(out)

                self.assertEqual(settings, expected)

        # expected to fail
        for config in [
            (False, False, False, False, False, False, False),
        ]:
            kwargs = dict(zip(kwlist, config))
            with self.subTest(config):
                script = textwrap.dedent('''
                    import _testinternalcapi
                    _testinternalcapi.get_interp_settings()
                    raise NotImplementedError('unreachable')
                    ''')
                with self.assertRaises(RuntimeError):
                    support.run_in_subinterp_with_config(script, **kwargs)

    @unittest.skipIf(_testsinglephase is None, "test requires _testsinglephase module")
    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
    def test_overridden_setting_extensions_subinterp_check(self):
        """
        PyInterpreterConfig.check_multi_interp_extensions can be overridden
        with PyInterpreterState.override_multi_interp_extensions_check.
        This verifies that the override works but does not modify
        the underlying setting.
        """

        OBMALLOC = 1<<5
        EXTENSIONS = 1<<8
        THREADS = 1<<10
        DAEMON_THREADS = 1<<11
        FORK = 1<<15
        EXEC = 1<<16
        BASE_FLAGS = OBMALLOC | FORK | EXEC | THREADS | DAEMON_THREADS
        base_kwargs = {
            'use_main_obmalloc': True,
            'allow_fork': True,
            'allow_exec': True,
            'allow_threads': True,
            'allow_daemon_threads': True,
            'own_gil': False,
        }

        def check(enabled, override):
            # Run the single-phase-init check in a subinterpreter created
            # with check_multi_interp_extensions=*enabled* and compare the
            # JSON it reports with what *override* should produce.
            kwargs = dict(
                base_kwargs,
                check_multi_interp_extensions=enabled,
            )
            flags = BASE_FLAGS | EXTENSIONS if enabled else BASE_FLAGS
            settings = {
                'feature_flags': flags,
                'own_gil': False,
            }

            expected = {
                'requested': override,
                'override__initial': 0,
                'override_after': override,
                'override_restored': 0,
                # The override should not affect the config or settings.
                'settings__initial': settings,
                'settings_after': settings,
                'settings_restored': settings,
                # These are the most likely values to be wrong.
                'allowed__initial': not enabled,
                'allowed_after': not ((override > 0) if override else enabled),
                'allowed_restored': not enabled,
            }

            r, w = os.pipe()
            script = textwrap.dedent(f'''
                from test.test_capi.check_config import run_singlephase_check
                run_singlephase_check({override}, {w})
                ''')
            with os.fdopen(r) as stdout:
                ret = support.run_in_subinterp_with_config(script, **kwargs)
                self.assertEqual(ret, 0)
                out = stdout.read()
            results = json.loads(out)

            self.assertEqual(results, expected)

        # Show full diffs of the nested result dicts on failure.
        self.maxDiff = None

        # setting: check disabled
        with self.subTest('config: check disabled; override: disabled'):
            check(False, -1)
        with self.subTest('config: check disabled; override: use config'):
            check(False, 0)
        with self.subTest('config: check disabled; override: enabled'):
            check(False, 1)

        # setting: check enabled
        with self.subTest('config: check enabled; override: disabled'):
            check(True, -1)
        with self.subTest('config: check enabled; override: use config'):
            check(True, 0)
        with self.subTest('config: check enabled; override: enabled'):
            check(True, 1)

    def test_mutate_exception(self):
        """
        Exceptions saved in global module state get shared between
        individual module instances. This test checks whether or not
        a change in one interpreter's module gets reflected into the
        other ones.
        """
        import binascii

        support.run_in_subinterp("import binascii; binascii.Error.foobar = 'foobar'")

        # The attribute set in the subinterpreter must not be visible here.
        self.assertFalse(hasattr(binascii.Error, "foobar"))

    @unittest.skipIf(_testmultiphase is None, "test requires _testmultiphase module")
    def test_module_state_shared_in_global(self):
        """
        bpo-44050: Extension module state should be shared between interpreters
        when it doesn't support sub-interpreters.
        """
        r, w = os.pipe()
        self.addCleanup(os.close, r)
        self.addCleanup(os.close, w)

        # The same script is run twice (here and in a subinterpreter); each
        # run writes id(module.Error) to the pipe so the two can be compared.
        script = textwrap.dedent(f"""
            import importlib.machinery
            import importlib.util
            import os

            fullname = '_test_module_state_shared'
            origin = importlib.util.find_spec('_testmultiphase').origin
            loader = importlib.machinery.ExtensionFileLoader(fullname, origin)
            spec = importlib.util.spec_from_loader(fullname, loader)
            module = importlib.util.module_from_spec(spec)
            attr_id = str(id(module.Error)).encode()

            os.write({w}, attr_id)
            """)
        exec(script)
        main_attr_id = os.read(r, 100)

        ret = support.run_in_subinterp(script)
        self.assertEqual(ret, 0)
        subinterp_attr_id = os.read(r, 100)
        self.assertEqual(main_attr_id, subinterp_attr_id)
 | |
| 
 | |
| 
 | |
class BuiltinStaticTypesTests(unittest.TestCase):
    # Checks that the tp_bases / tp_mro slots of a sample of builtin
    # static types are populated, as seen through _testcapi.

    TYPES = [
        object, type, int, str, dict,
        type(None),
        bool,
        BaseException, Exception, Warning,
        DeprecationWarning,  # Warning subclass
    ]

    def test_tp_bases_is_set(self):
        # PyTypeObject.tp_bases is documented as public API.
        # See https://github.com/python/cpython/issues/105020.
        for typeobj in self.TYPES:
            with self.subTest(typeobj):
                self.assertIsNot(_testcapi.type_get_tp_bases(typeobj), None)

    def test_tp_mro_is_set(self):
        # Companion check to test_tp_bases_is_set, for the tp_mro slot.
        # See https://github.com/python/cpython/issues/105020.
        for typeobj in self.TYPES:
            with self.subTest(typeobj):
                self.assertIsNot(_testcapi.type_get_tp_mro(typeobj), None)
 | |
| 
 | |
| 
 | |
class TestStaticTypes(unittest.TestCase):
    # Tests for static types obtained from
    # _testcapi.get_basic_static_type().

    # Flipped to True by the first setUpClass() call in this process.
    _has_run = False

    @classmethod
    def setUpClass(cls):
        # The tests here don't play nice with our approach to refleak
        # detection, so we bail out when asked to run a second time.
        if not cls._has_run:
            cls._has_run = True
            return
        raise unittest.SkipTest('these tests do not support re-running')

    @contextlib.contextmanager
    def basic_static_type(self, *args):
        """Yield the type returned by _testcapi.get_basic_static_type(*args)."""
        static_type = _testcapi.get_basic_static_type(*args)
        yield static_type

    def _check_object_is_sole_base(self, cls):
        # Shared assertions: object is the only base and the metatype is
        # the same as object's.
        self.assertIs(cls.__base__, object)
        self.assertEqual(cls.__bases__, (object,))
        self.assertIs(type(cls), type(object))

    def test_pytype_ready_always_sets_tp_type(self):
        # The point of this test is to prevent something like
        # https://github.com/python/cpython/issues/104614
        # from happening again.

        # First check when tp_base/tp_bases is *not* set before PyType_Ready().
        with self.basic_static_type() as cls:
            self._check_object_is_sole_base(cls)

        # Then check when we *do* set tp_base/tp_bases first.
        with self.basic_static_type(object) as cls:
            self._check_object_is_sole_base(cls)
 | |
| 
 | |
| 
 | |
class TestThreadState(unittest.TestCase):
    """Thread-state tests driven through helpers exported by _testcapi."""

    @threading_helper.reap_threads
    @threading_helper.requires_working_threading()
    def test_thread_state(self):
        # some extra thread-state tests driven via _testcapi
        def target():
            # One thread ident is appended per callback() invocation.
            idents = []

            def callback():
                idents.append(threading.get_ident())

            _testcapi._test_thread_state(callback)
            # NOTE(review): a and b are never read; presumably they only
            # hold extra references to callback -- confirm before removing.
            a = b = callback
            time.sleep(1)
            # Check our main thread is in the list exactly 3 times.
            self.assertEqual(idents.count(threading.get_ident()), 3,
                             "Couldn't find main thread correctly in the list")

        # Run the scenario in the current thread first, then in a new thread.
        target()
        t = threading.Thread(target=target)
        t.start()
        t.join()

    @threading_helper.reap_threads
    @threading_helper.requires_working_threading()
    def test_gilstate_ensure_no_deadlock(self):
        # See https://github.com/python/cpython/issues/96071
        code = textwrap.dedent("""
            import _testcapi

            def callback():
                print('callback called')

            _testcapi._test_thread_state(callback)
            """)
        # Run the script in a child interpreter with tracemalloc enabled and
        # check that the callback's message reached stdout.
        ret = assert_python_ok('-X', 'tracemalloc', '-c', code)
        self.assertIn(b'callback called', ret.out)

    def test_gilstate_matches_current(self):
        # The whole check is implemented by the _testcapi helper.
        _testcapi.test_current_tstate_matches()
 | |
| 
 | |
| 
 | |
class Test_testcapi(unittest.TestCase):
    """Expose the 'test_*' callables of _testcapi as unittest test methods."""

    # Copy every _testcapi attribute whose name starts with 'test_' (and
    # does not end with '_code') into the class namespace under its own name.
    locals().update((name, getattr(_testcapi, name))
                    for name in dir(_testcapi)
                    if name.startswith('test_') and not name.endswith('_code'))

    # Defined after the bulk update above, so this wrapper replaces the
    # entry of the same name that was copied from _testcapi.
    # Suppress warning from PyUnicode_FromUnicode().
    @warnings_helper.ignore_warnings(category=DeprecationWarning)
    def test_widechar(self):
        _testcapi.test_widechar()

    def test_version_api_data(self):
        # _testcapi.Py_Version must equal sys.hexversion.
        self.assertEqual(_testcapi.Py_Version, sys.hexversion)
 | |
| 
 | |
| 
 | |
class Test_testinternalcapi(unittest.TestCase):
    """Expose the 'test_*' callables of _testinternalcapi as test methods."""

    # Copy every _testinternalcapi attribute whose name starts with 'test_'
    # into the class namespace under its own name.
    locals().update((name, getattr(_testinternalcapi, name))
                    for name in dir(_testinternalcapi)
                    if name.startswith('test_'))
 | |
| 
 | |
| 
 | |
@unittest.skipIf(_testmultiphase is None, "test requires _testmultiphase module")
class Test_ModuleStateAccess(unittest.TestCase):
    """Test access to module state (PEP 573)"""

    # The C part of the tests lives in _testmultiphase, in a module called
    # _testmultiphase_meth_state_access.
    # This module has multi-phase initialization, unlike _testcapi.

    def setUp(self):
        # Load _testmultiphase_meth_state_access out of the file that
        # provides _testmultiphase and keep it as self.module.
        modname = '_testmultiphase_meth_state_access'  # XXX
        shared_lib = importlib.util.find_spec('_testmultiphase').origin
        ext_loader = importlib.machinery.ExtensionFileLoader(modname, shared_lib)
        mod_spec = importlib.util.spec_from_loader(modname, ext_loader)
        loaded = importlib.util.module_from_spec(mod_spec)
        ext_loader.exec_module(loaded)
        self.module = loaded

    def test_subclass_get_module(self):
        """PyType_GetModule for defining_class"""
        class StateAccessType_Subclass(self.module.StateAccessType):
            pass

        obj = StateAccessType_Subclass()
        self.assertIs(obj.get_defining_module(), self.module)

    def test_subclass_get_module_with_super(self):
        # Same as above, but the method is reached through super().
        class StateAccessType_Subclass(self.module.StateAccessType):
            def get_defining_module(self):
                return super().get_defining_module()

        obj = StateAccessType_Subclass()
        self.assertIs(obj.get_defining_module(), self.module)

    def test_state_access(self):
        """Checks methods defined with and without argument clinic

        This tests a no-arg method (get_count) and a method with
        both a positional and keyword argument.
        """

        a = self.module.StateAccessType()
        b = self.module.StateAccessType()

        def check_count(expected):
            # Both instances must report the same count, equal to `expected`.
            self.assertEqual(a.get_count(), b.get_count())
            self.assertEqual(a.get_count(), expected)

        variants = (
            ('clinic', a.increment_count_clinic),
            ('noclinic', a.increment_count_noclinic),
        )

        for label, bump in variants:
            with self.subTest(label):
                check_count(0)

                bump()
                check_count(1)

                bump(3)
                check_count(4)

                bump(-2, twice=True)
                check_count(0)

                # Unknown keyword argument.
                with self.assertRaises(TypeError):
                    bump(thrice=3)

                # Too many positional arguments.
                with self.assertRaises(TypeError):
                    bump(1, 2, 3)

    def test_get_module_bad_def(self):
        # PyType_GetModuleByDef fails gracefully if it doesn't
        # find what it's looking for.
        # see bpo-46433
        obj = self.module.StateAccessType()
        with self.assertRaises(TypeError):
            obj.getmodulebydef_bad_def()

    def test_get_module_static_in_mro(self):
        # Here, the class PyType_GetModuleByDef is looking for
        # appears in the MRO after a static type (BaseException).
        # see bpo-46433
        class Subclass(BaseException, self.module.StateAccessType):
            pass
        self.assertIs(Subclass().get_defining_module(), self.module)
 | |
| 
 | |
| 
 | |
class TestInternalFrameApi(unittest.TestCase):
    """Tests for the _testinternalcapi.iframe_get*() accessors."""

    # Returns its own frame.  test_line depends on the exact line layout
    # of this definition, so do not add lines between the decorator and
    # the return statement.
    @staticmethod
    def func():
        return sys._getframe()

    def test_code(self):
        # The code object reported for the frame must be func's own.
        frame = self.func()
        code = _testinternalcapi.iframe_getcode(frame)
        self.assertIs(code, self.func.__code__)

    def test_lasti(self):
        # The reported instruction offset must fall strictly inside
        # func's bytecode.
        frame = self.func()
        lasti = _testinternalcapi.iframe_getlasti(frame)
        self.assertGreater(lasti, 0)
        self.assertLess(lasti, len(self.func.__code__.co_code))

    def test_line(self):
        frame = self.func()
        line = _testinternalcapi.iframe_getline(frame)
        firstline = self.func.__code__.co_firstlineno
        # The expected line is two past co_firstlineno: with the current
        # layout (decorator, def, return) that is the return statement.
        self.assertEqual(line, firstline + 2)
 | |
| 
 | |
| 
 | |
# Number of times Test_Pep523API.do_test() calls the function under test
# before installing the recording frame evaluator, and again afterwards.
# NOTE(review): judging by the name, this is meant to be enough calls for
# the interpreter to specialize and de-optimize the code -- confirm.
SUFFICIENT_TO_DEOPT_AND_SPECIALIZE = 100
 | |
| 
 | |
class Test_Pep523API(unittest.TestCase):
    """Check which calls a recording frame evaluator gets to see.

    The nested functions in these tests must keep their names and bodies:
    the expected call lists are written in terms of them.
    """

    def do_test(self, func, names):
        # Call func() SUFFICIENT_TO_DEOPT_AND_SPECIALIZE times with the
        # current evaluator, then install the recording evaluator and call
        # it the same number of times again.  `names` is the list of entries
        # expected to be recorded for one call of func().
        actual_calls = []
        start = SUFFICIENT_TO_DEOPT_AND_SPECIALIZE
        count = start + SUFFICIENT_TO_DEOPT_AND_SPECIALIZE
        try:
            for i in range(count):
                if i == start:
                    # From here on, calls are recorded into actual_calls.
                    _testinternalcapi.set_eval_frame_record(actual_calls)
                func()
        finally:
            # Always restore the default evaluator, even if func() raised.
            _testinternalcapi.set_eval_frame_default()
        expected_calls = names * SUFFICIENT_TO_DEOPT_AND_SPECIALIZE
        self.assertEqual(len(expected_calls), len(actual_calls))
        for expected, actual in zip(expected_calls, actual_calls, strict=True):
            self.assertEqual(expected, actual)

    def test_inlined_binary_subscr(self):
        # Subscripting an instance whose class defines __getitem__ in Python.
        class C:
            def __getitem__(self, other):
                return None
        def func():
            C()[42]
        names = ["func", "__getitem__"]
        self.do_test(func, names)

    def test_inlined_call(self):
        # Two plain calls of a Python function, without and with an argument.
        def inner(x=42):
            pass
        def func():
            inner()
            inner(42)
        names = ["func", "inner", "inner"]
        self.do_test(func, names)

    def test_inlined_call_function_ex(self):
        # A call that unpacks its arguments with *.
        def inner(x):
            pass
        def func():
            inner(*[42])
        names = ["func", "inner"]
        self.do_test(func, names)

    def test_inlined_for_iter(self):
        # A for loop over a generator; "gen" is expected three times per
        # call of func().
        def gen():
            yield 42
        def func():
            for _ in gen():
                pass
        names = ["func", "gen", "gen", "gen"]
        self.do_test(func, names)

    def test_inlined_load_attr(self):
        # Attribute access through a property and through a Python-level
        # __getattribute__.
        class C:
            @property
            def a(self):
                return 42
        class D:
            def __getattribute__(self, name):
                return 42
        def func():
            C().a
            D().a
        names = ["func", "a", "__getattribute__"]
        self.do_test(func, names)

    def test_inlined_send(self):
        # A generator delegating to another one with `yield from`.
        def inner():
            yield 42
        def outer():
            yield from inner()
        def func():
            list(outer())
        names = ["func", "outer", "outer", "inner", "inner", "outer", "inner"]
        self.do_test(func, names)
 | |
| 
 | |
| 
 | |
@contextlib.contextmanager
def temporary_optimizer(opt):
    """Install *opt* as the optimizer for the duration of a ``with`` block.

    The optimizer that was active on entry is put back on exit, whether
    the block finishes normally or raises.
    """
    previous = _testinternalcapi.get_optimizer()
    _testinternalcapi.set_optimizer(opt)
    try:
        yield
    finally:
        _testinternalcapi.set_optimizer(previous)
 | |
| 
 | |
| 
 | |
@contextlib.contextmanager
def clear_executors(func):
    """Clear executors in *func* before and after running a block.

    Clearing is done by swapping ``func.__code__`` for a copy of itself
    made with ``replace()``.
    """
    def swap_in_code_copy():
        func.__code__ = func.__code__.replace()

    swap_in_code_copy()
    try:
        yield
    finally:
        # Runs on normal exit and when the block raises.
        swap_in_code_copy()
 | |
| 
 | |
| 
 | |
class TestOptimizerAPI(unittest.TestCase):
    """Tests for the optimizer hooks exposed by _testinternalcapi."""

    def test_get_set_optimizer(self):
        # Starts with no optimizer, round-trips a counter optimizer through
        # set_optimizer()/get_optimizer(), then resets to None.
        self.assertEqual(_testinternalcapi.get_optimizer(), None)
        opt = _testinternalcapi.get_counter_optimizer()
        _testinternalcapi.set_optimizer(opt)
        self.assertEqual(_testinternalcapi.get_optimizer(), opt)
        _testinternalcapi.set_optimizer(None)
        self.assertEqual(_testinternalcapi.get_optimizer(), None)

    def test_counter_optimizer(self):
        # Generate a new function at each call
        ns = {}
        exec(textwrap.dedent("""
            def loop():
                for _ in range(1000):
                    pass
        """), ns, ns)
        loop = ns['loop']

        # Each repeat uses a fresh counter optimizer and expects it to go
        # from 0 to 1000 over a single call of the 1000-iteration loop.
        for repeat in range(5):
            opt = _testinternalcapi.get_counter_optimizer()
            with temporary_optimizer(opt):
                self.assertEqual(opt.get_count(), 0)
                # clear_executors() swaps loop.__code__ for a copy before
                # and after the call.
                with clear_executors(loop):
                    loop()
                self.assertEqual(opt.get_count(), 1000)

    def test_long_loop(self):
        "Check that we aren't confused by EXTENDED_ARG"

        # Generate a new function at each call
        ns = {}
        exec(textwrap.dedent("""
            def nop():
                pass

            def long_loop():
                for _ in range(10):
                    nop(); nop(); nop(); nop(); nop(); nop(); nop(); nop();
                    nop(); nop(); nop(); nop(); nop(); nop(); nop(); nop();
                    nop(); nop(); nop(); nop(); nop(); nop(); nop(); nop();
                    nop(); nop(); nop(); nop(); nop(); nop(); nop(); nop();
                    nop(); nop(); nop(); nop(); nop(); nop(); nop(); nop();
                    nop(); nop(); nop(); nop(); nop(); nop(); nop(); nop();
                    nop(); nop(); nop(); nop(); nop(); nop(); nop(); nop();
        """), ns, ns)
        long_loop = ns['long_loop']

        # The loop body holds 56 calls and runs 10 times; the counter is
        # expected to read 10 afterwards.
        opt = _testinternalcapi.get_counter_optimizer()
        with temporary_optimizer(opt):
            self.assertEqual(opt.get_count(), 0)
            long_loop()
            self.assertEqual(opt.get_count(), 10)
 | |
| 
 | |
| 
 | |
def get_first_executor(func):
    """Return the first executor found in *func*'s code, or None.

    Every even byte offset of ``func.__code__.co_code`` is probed with
    ``_testinternalcapi.get_executor()``, in increasing order.  All offsets
    are tried, not only those of JUMP_BACKWARD instructions.  A ValueError
    raised by a probe is ignored and the scan moves on to the next offset.
    """
    code = func.__code__
    for offset in range(0, len(code.co_code), 2):
        try:
            return _testinternalcapi.get_executor(code, offset)
        except ValueError:
            # Nothing usable at this offset; keep scanning.
            pass
    return None
 | |
| 
 | |
| 
 | |
class TestUops(unittest.TestCase):
    """Tests that inspect the uops of executors created by the uop optimizer.

    Most tests follow the same recipe: run a small function while the uop
    optimizer is installed, fetch the first executor attached to it with
    get_first_executor(), and check which uop names it contains.
    """

    def test_basic_loop(self):
        # A plain while loop must yield an executor containing SAVE_IP and
        # LOAD_FAST.
        def testfunc(x):
            i = 0
            while i < x:
                i += 1

        opt = _testinternalcapi.get_uop_optimizer()
        with temporary_optimizer(opt):
            testfunc(1000)

        ex = get_first_executor(testfunc)
        self.assertIsNotNone(ex)
        uops = {opname for opname, _ in ex}
        self.assertIn("SAVE_IP", uops)
        self.assertIn("LOAD_FAST", uops)

    def test_extended_arg(self):
        "Check EXTENDED_ARG handling in superblock creation"
        def many_vars():
            # 260 vars, so z9 should have index 259
            a0 = a1 = a2 = a3 = a4 = a5 = a6 = a7 = a8 = a9 = 42
            b0 = b1 = b2 = b3 = b4 = b5 = b6 = b7 = b8 = b9 = 42
            c0 = c1 = c2 = c3 = c4 = c5 = c6 = c7 = c8 = c9 = 42
            d0 = d1 = d2 = d3 = d4 = d5 = d6 = d7 = d8 = d9 = 42
            e0 = e1 = e2 = e3 = e4 = e5 = e6 = e7 = e8 = e9 = 42
            f0 = f1 = f2 = f3 = f4 = f5 = f6 = f7 = f8 = f9 = 42
            g0 = g1 = g2 = g3 = g4 = g5 = g6 = g7 = g8 = g9 = 42
            h0 = h1 = h2 = h3 = h4 = h5 = h6 = h7 = h8 = h9 = 42
            i0 = i1 = i2 = i3 = i4 = i5 = i6 = i7 = i8 = i9 = 42
            j0 = j1 = j2 = j3 = j4 = j5 = j6 = j7 = j8 = j9 = 42
            k0 = k1 = k2 = k3 = k4 = k5 = k6 = k7 = k8 = k9 = 42
            l0 = l1 = l2 = l3 = l4 = l5 = l6 = l7 = l8 = l9 = 42
            m0 = m1 = m2 = m3 = m4 = m5 = m6 = m7 = m8 = m9 = 42
            n0 = n1 = n2 = n3 = n4 = n5 = n6 = n7 = n8 = n9 = 42
            o0 = o1 = o2 = o3 = o4 = o5 = o6 = o7 = o8 = o9 = 42
            p0 = p1 = p2 = p3 = p4 = p5 = p6 = p7 = p8 = p9 = 42
            q0 = q1 = q2 = q3 = q4 = q5 = q6 = q7 = q8 = q9 = 42
            r0 = r1 = r2 = r3 = r4 = r5 = r6 = r7 = r8 = r9 = 42
            s0 = s1 = s2 = s3 = s4 = s5 = s6 = s7 = s8 = s9 = 42
            t0 = t1 = t2 = t3 = t4 = t5 = t6 = t7 = t8 = t9 = 42
            u0 = u1 = u2 = u3 = u4 = u5 = u6 = u7 = u8 = u9 = 42
            v0 = v1 = v2 = v3 = v4 = v5 = v6 = v7 = v8 = v9 = 42
            w0 = w1 = w2 = w3 = w4 = w5 = w6 = w7 = w8 = w9 = 42
            x0 = x1 = x2 = x3 = x4 = x5 = x6 = x7 = x8 = x9 = 42
            y0 = y1 = y2 = y3 = y4 = y5 = y6 = y7 = y8 = y9 = 42
            z0 = z1 = z2 = z3 = z4 = z5 = z6 = z7 = z8 = z9 = 42
            while z9 > 0:
                z9 = z9 - 1

        opt = _testinternalcapi.get_uop_optimizer()
        with temporary_optimizer(opt):
            # No executor may exist before the function has run.
            ex = get_first_executor(many_vars)
            self.assertIsNone(ex)
            many_vars()

        # After the run, the executor must load local number 259 (z9).
        ex = get_first_executor(many_vars)
        self.assertIsNotNone(ex)
        self.assertIn(("LOAD_FAST", 259), list(ex))

    def test_unspecialized_unpack(self):
        # An example of an unspecialized opcode
        # (the unpacking target is a dict, whose keys are 1 and 3).
        def testfunc(x):
            i = 0
            while i < x:
                i += 1
                a, b = {1: 2, 3: 3}
            assert a == 1 and b == 3
            i = 0
            while i < x:
                i += 1

        opt = _testinternalcapi.get_uop_optimizer()

        with temporary_optimizer(opt):
            testfunc(10)

        ex = get_first_executor(testfunc)
        self.assertIsNotNone(ex)
        uops = {opname for opname, _ in ex}
        self.assertIn("UNPACK_SEQUENCE", uops)

    def test_pop_jump_if_false(self):
        # `while i < n` is expected to produce a _POP_JUMP_IF_FALSE uop.
        def testfunc(n):
            i = 0
            while i < n:
                i += 1

        opt = _testinternalcapi.get_uop_optimizer()
        with temporary_optimizer(opt):
            testfunc(10)

        ex = get_first_executor(testfunc)
        self.assertIsNotNone(ex)
        uops = {opname for opname, _ in ex}
        self.assertIn("_POP_JUMP_IF_FALSE", uops)

    def test_pop_jump_if_true(self):
        # `while not i >= n` is expected to produce a _POP_JUMP_IF_TRUE uop.
        def testfunc(n):
            i = 0
            while not i >= n:
                i += 1

        opt = _testinternalcapi.get_uop_optimizer()
        with temporary_optimizer(opt):
            testfunc(10)

        ex = get_first_executor(testfunc)
        self.assertIsNotNone(ex)
        uops = {opname for opname, _ in ex}
        self.assertIn("_POP_JUMP_IF_TRUE", uops)

    def test_jump_backward(self):
        # The loop's backward jump is expected to show up as JUMP_TO_TOP.
        def testfunc(n):
            i = 0
            while i < n:
                i += 1

        opt = _testinternalcapi.get_uop_optimizer()
        with temporary_optimizer(opt):
            testfunc(10)

        ex = get_first_executor(testfunc)
        self.assertIsNotNone(ex)
        uops = {opname for opname, _ in ex}
        self.assertIn("JUMP_TO_TOP", uops)

    def test_jump_forward(self):
        # The if/else inside the loop puts a forward jump ahead of `a += 1`.
        def testfunc(n):
            a = 0
            while a < n:
                if a < 0:
                    a = -a
                else:
                    a = +a
                a += 1
            return a

        opt = _testinternalcapi.get_uop_optimizer()
        with temporary_optimizer(opt):
            testfunc(10)

        ex = get_first_executor(testfunc)
        self.assertIsNotNone(ex)
        uops = {opname for opname, _ in ex}
        # Since there is no JUMP_FORWARD instruction,
        # look for indirect evidence: the += operator
        self.assertIn("_BINARY_OP_ADD_INT", uops)

    def test_for_iter_range(self):
        # Summing range(10) must give 45 and leave an executor that
        # contains _IS_ITER_EXHAUSTED_RANGE.
        def testfunc(n):
            total = 0
            for i in range(n):
                total += i
            return total

        opt = _testinternalcapi.get_uop_optimizer()
        with temporary_optimizer(opt):
            total = testfunc(10)
            self.assertEqual(total, 45)

        ex = get_first_executor(testfunc)
        self.assertIsNotNone(ex)
        # for i, (opname, oparg) in enumerate(ex):
        #     print(f"{i:4d}: {opname:<20s} {oparg:3d}")
        uops = {opname for opname, _ in ex}
        self.assertIn("_IS_ITER_EXHAUSTED_RANGE", uops)
        # Verification that the jump goes past END_FOR
        # is done by manual inspection of the output

    def test_for_iter_list(self):
        # Same as test_for_iter_range, iterating over a list instead.
        def testfunc(a):
            total = 0
            for i in a:
                total += i
            return total

        opt = _testinternalcapi.get_uop_optimizer()
        with temporary_optimizer(opt):
            a = list(range(10))
            total = testfunc(a)
            self.assertEqual(total, 45)

        ex = get_first_executor(testfunc)
        self.assertIsNotNone(ex)
        # for i, (opname, oparg) in enumerate(ex):
        #     print(f"{i:4d}: {opname:<20s} {oparg:3d}")
        uops = {opname for opname, _ in ex}
        self.assertIn("_IS_ITER_EXHAUSTED_LIST", uops)
        # Verification that the jump goes past END_FOR
        # is done by manual inspection of the output

    def test_for_iter_tuple(self):
        # Same as test_for_iter_range, iterating over a tuple instead.
        def testfunc(a):
            total = 0
            for i in a:
                total += i
            return total

        opt = _testinternalcapi.get_uop_optimizer()
        with temporary_optimizer(opt):
            a = tuple(range(10))
            total = testfunc(a)
            self.assertEqual(total, 45)

        ex = get_first_executor(testfunc)
        self.assertIsNotNone(ex)
        # for i, (opname, oparg) in enumerate(ex):
        #     print(f"{i:4d}: {opname:<20s} {oparg:3d}")
        uops = {opname for opname, _ in ex}
        self.assertIn("_IS_ITER_EXHAUSTED_TUPLE", uops)
        # Verification that the jump goes past END_FOR
        # is done by manual inspection of the output

    def test_list_edge_case(self):
        # Once a list iterator has been run to exhaustion by the for loop,
        # appending to the underlying list must not revive it: next() has
        # to keep raising StopIteration.
        def testfunc(it):
            for x in it:
                pass

        opt = _testinternalcapi.get_uop_optimizer()
        with temporary_optimizer(opt):
            a = [1, 2, 3]
            it = iter(a)
            testfunc(it)
            a.append(4)
            with self.assertRaises(StopIteration):
                next(it)
 | |
| 
 | |
# Run the test suite when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()
 |