mirror of
				https://github.com/python/cpython.git
				synced 2025-10-31 21:51:50 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			1160 lines
		
	
	
	
		
			29 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			1160 lines
		
	
	
	
		
			29 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import copy
 | |
| import pickle
 | |
| import dis
 | |
| import threading
 | |
| import types
 | |
| import unittest
 | |
| from test.support import threading_helper, check_impl_detail, requires_specialization
 | |
| from test.support.import_helper import import_module
 | |
| 
 | |
| # Skip this module on other interpreters, it is cpython specific:
 | |
| if check_impl_detail(cpython=False):
 | |
|     raise unittest.SkipTest('implementation detail specific to cpython')
 | |
| 
 | |
| _testinternalcapi = import_module("_testinternalcapi")
 | |
| 
 | |
| 
 | |
| def disabling_optimizer(func):
 | |
|     def wrapper(*args, **kwargs):
 | |
|         if not hasattr(_testinternalcapi, "get_optimizer"):
 | |
|             return func(*args, **kwargs)
 | |
|         old_opt = _testinternalcapi.get_optimizer()
 | |
|         _testinternalcapi.set_optimizer(None)
 | |
|         try:
 | |
|             return func(*args, **kwargs)
 | |
|         finally:
 | |
|             _testinternalcapi.set_optimizer(old_opt)
 | |
| 
 | |
|     return wrapper
 | |
| 
 | |
| 
 | |
| class TestBase(unittest.TestCase):
 | |
|     def assert_specialized(self, f, opname):
 | |
|         instructions = dis.get_instructions(f, adaptive=True)
 | |
|         opnames = {instruction.opname for instruction in instructions}
 | |
|         self.assertIn(opname, opnames)
 | |
| 
 | |
| 
 | |
| class TestLoadSuperAttrCache(unittest.TestCase):
 | |
|     def test_descriptor_not_double_executed_on_spec_fail(self):
 | |
|         calls = []
 | |
|         class Descriptor:
 | |
|             def __get__(self, instance, owner):
 | |
|                 calls.append((instance, owner))
 | |
|                 return lambda: 1
 | |
| 
 | |
|         class C:
 | |
|             d = Descriptor()
 | |
| 
 | |
|         class D(C):
 | |
|             def f(self):
 | |
|                 return super().d()
 | |
| 
 | |
|         d = D()
 | |
| 
 | |
|         self.assertEqual(d.f(), 1)  # warmup
 | |
|         calls.clear()
 | |
|         self.assertEqual(d.f(), 1)  # try to specialize
 | |
|         self.assertEqual(calls, [(d, D)])
 | |
| 
 | |
| 
 | |
| class TestLoadAttrCache(unittest.TestCase):
 | |
|     def test_descriptor_added_after_optimization(self):
 | |
|         class Descriptor:
 | |
|             pass
 | |
| 
 | |
|         class C:
 | |
|             def __init__(self):
 | |
|                 self.x = 1
 | |
|             x = Descriptor()
 | |
| 
 | |
|         def f(o):
 | |
|             return o.x
 | |
| 
 | |
|         o = C()
 | |
|         for i in range(1025):
 | |
|             assert f(o) == 1
 | |
| 
 | |
|         Descriptor.__get__ = lambda self, instance, value: 2
 | |
|         Descriptor.__set__ = lambda *args: None
 | |
| 
 | |
|         self.assertEqual(f(o), 2)
 | |
| 
 | |
|     def test_metaclass_descriptor_added_after_optimization(self):
 | |
|         class Descriptor:
 | |
|             pass
 | |
| 
 | |
|         class Metaclass(type):
 | |
|             attribute = Descriptor()
 | |
| 
 | |
|         class Class(metaclass=Metaclass):
 | |
|             attribute = True
 | |
| 
 | |
|         def __get__(self, instance, owner):
 | |
|             return False
 | |
| 
 | |
|         def __set__(self, instance, value):
 | |
|             return None
 | |
| 
 | |
|         def f():
 | |
|             return Class.attribute
 | |
| 
 | |
|         for _ in range(1025):
 | |
|             self.assertTrue(f())
 | |
| 
 | |
|         Descriptor.__get__ = __get__
 | |
|         Descriptor.__set__ = __set__
 | |
| 
 | |
|         for _ in range(1025):
 | |
|             self.assertFalse(f())
 | |
| 
 | |
|     def test_metaclass_descriptor_shadows_class_attribute(self):
 | |
|         class Metaclass(type):
 | |
|             @property
 | |
|             def attribute(self):
 | |
|                 return True
 | |
| 
 | |
|         class Class(metaclass=Metaclass):
 | |
|             attribute = False
 | |
| 
 | |
|         def f():
 | |
|             return Class.attribute
 | |
| 
 | |
|         for _ in range(1025):
 | |
|             self.assertTrue(f())
 | |
| 
 | |
|     def test_metaclass_set_descriptor_after_optimization(self):
 | |
|         class Metaclass(type):
 | |
|             pass
 | |
| 
 | |
|         class Class(metaclass=Metaclass):
 | |
|             attribute = True
 | |
| 
 | |
|         @property
 | |
|         def attribute(self):
 | |
|             return False
 | |
| 
 | |
|         def f():
 | |
|             return Class.attribute
 | |
| 
 | |
|         for _ in range(1025):
 | |
|             self.assertTrue(f())
 | |
| 
 | |
|         Metaclass.attribute = attribute
 | |
| 
 | |
|         for _ in range(1025):
 | |
|             self.assertFalse(f())
 | |
| 
 | |
|     def test_metaclass_del_descriptor_after_optimization(self):
 | |
|         class Metaclass(type):
 | |
|             @property
 | |
|             def attribute(self):
 | |
|                 return True
 | |
| 
 | |
|         class Class(metaclass=Metaclass):
 | |
|             attribute = False
 | |
| 
 | |
|         def f():
 | |
|             return Class.attribute
 | |
| 
 | |
|         for _ in range(1025):
 | |
|             self.assertTrue(f())
 | |
| 
 | |
|         del Metaclass.attribute
 | |
| 
 | |
|         for _ in range(1025):
 | |
|             self.assertFalse(f())
 | |
| 
 | |
|     def test_type_descriptor_shadows_attribute_method(self):
 | |
|         class Class:
 | |
|             mro = None
 | |
| 
 | |
|         def f():
 | |
|             return Class.mro
 | |
| 
 | |
|         for _ in range(1025):
 | |
|             self.assertIsNone(f())
 | |
| 
 | |
|     def test_type_descriptor_shadows_attribute_member(self):
 | |
|         class Class:
 | |
|             __base__ = None
 | |
| 
 | |
|         def f():
 | |
|             return Class.__base__
 | |
| 
 | |
|         for _ in range(1025):
 | |
|             self.assertIs(f(), object)
 | |
| 
 | |
|     def test_type_descriptor_shadows_attribute_getset(self):
 | |
|         class Class:
 | |
|             __name__ = "Spam"
 | |
| 
 | |
|         def f():
 | |
|             return Class.__name__
 | |
| 
 | |
|         for _ in range(1025):
 | |
|             self.assertEqual(f(), "Class")
 | |
| 
 | |
|     def test_metaclass_getattribute(self):
 | |
|         class Metaclass(type):
 | |
|             def __getattribute__(self, name):
 | |
|                 return True
 | |
| 
 | |
|         class Class(metaclass=Metaclass):
 | |
|             attribute = False
 | |
| 
 | |
|         def f():
 | |
|             return Class.attribute
 | |
| 
 | |
|         for _ in range(1025):
 | |
|             self.assertTrue(f())
 | |
| 
 | |
|     def test_metaclass_swap(self):
 | |
|         class OldMetaclass(type):
 | |
|             @property
 | |
|             def attribute(self):
 | |
|                 return True
 | |
| 
 | |
|         class NewMetaclass(type):
 | |
|             @property
 | |
|             def attribute(self):
 | |
|                 return False
 | |
| 
 | |
|         class Class(metaclass=OldMetaclass):
 | |
|             pass
 | |
| 
 | |
|         def f():
 | |
|             return Class.attribute
 | |
| 
 | |
|         for _ in range(1025):
 | |
|             self.assertTrue(f())
 | |
| 
 | |
|         Class.__class__ = NewMetaclass
 | |
| 
 | |
|         for _ in range(1025):
 | |
|             self.assertFalse(f())
 | |
| 
 | |
|     def test_load_shadowing_slot_should_raise_type_error(self):
 | |
|         class Class:
 | |
|             __slots__ = ("slot",)
 | |
| 
 | |
|         class Sneaky:
 | |
|             __slots__ = ("shadowed",)
 | |
|             shadowing = Class.slot
 | |
| 
 | |
|         def f(o):
 | |
|             o.shadowing
 | |
| 
 | |
|         o = Sneaky()
 | |
|         o.shadowed = 42
 | |
| 
 | |
|         for _ in range(1025):
 | |
|             with self.assertRaises(TypeError):
 | |
|                 f(o)
 | |
| 
 | |
|     def test_store_shadowing_slot_should_raise_type_error(self):
 | |
|         class Class:
 | |
|             __slots__ = ("slot",)
 | |
| 
 | |
|         class Sneaky:
 | |
|             __slots__ = ("shadowed",)
 | |
|             shadowing = Class.slot
 | |
| 
 | |
|         def f(o):
 | |
|             o.shadowing = 42
 | |
| 
 | |
|         o = Sneaky()
 | |
| 
 | |
|         for _ in range(1025):
 | |
|             with self.assertRaises(TypeError):
 | |
|                 f(o)
 | |
| 
 | |
|     def test_load_borrowed_slot_should_not_crash(self):
 | |
|         class Class:
 | |
|             __slots__ = ("slot",)
 | |
| 
 | |
|         class Sneaky:
 | |
|             borrowed = Class.slot
 | |
| 
 | |
|         def f(o):
 | |
|             o.borrowed
 | |
| 
 | |
|         o = Sneaky()
 | |
| 
 | |
|         for _ in range(1025):
 | |
|             with self.assertRaises(TypeError):
 | |
|                 f(o)
 | |
| 
 | |
|     def test_store_borrowed_slot_should_not_crash(self):
 | |
|         class Class:
 | |
|             __slots__ = ("slot",)
 | |
| 
 | |
|         class Sneaky:
 | |
|             borrowed = Class.slot
 | |
| 
 | |
|         def f(o):
 | |
|             o.borrowed = 42
 | |
| 
 | |
|         o = Sneaky()
 | |
| 
 | |
|         for _ in range(1025):
 | |
|             with self.assertRaises(TypeError):
 | |
|                 f(o)
 | |
| 
 | |
| 
 | |
| class TestLoadMethodCache(unittest.TestCase):
 | |
|     def test_descriptor_added_after_optimization(self):
 | |
|         class Descriptor:
 | |
|             pass
 | |
| 
 | |
|         class Class:
 | |
|             attribute = Descriptor()
 | |
| 
 | |
|         def __get__(self, instance, owner):
 | |
|             return lambda: False
 | |
| 
 | |
|         def __set__(self, instance, value):
 | |
|             return None
 | |
| 
 | |
|         def attribute():
 | |
|             return True
 | |
| 
 | |
|         instance = Class()
 | |
|         instance.attribute = attribute
 | |
| 
 | |
|         def f():
 | |
|             return instance.attribute()
 | |
| 
 | |
|         for _ in range(1025):
 | |
|             self.assertTrue(f())
 | |
| 
 | |
|         Descriptor.__get__ = __get__
 | |
|         Descriptor.__set__ = __set__
 | |
| 
 | |
|         for _ in range(1025):
 | |
|             self.assertFalse(f())
 | |
| 
 | |
|     def test_metaclass_descriptor_added_after_optimization(self):
 | |
|         class Descriptor:
 | |
|             pass
 | |
| 
 | |
|         class Metaclass(type):
 | |
|             attribute = Descriptor()
 | |
| 
 | |
|         class Class(metaclass=Metaclass):
 | |
|             def attribute():
 | |
|                 return True
 | |
| 
 | |
|         def __get__(self, instance, owner):
 | |
|             return lambda: False
 | |
| 
 | |
|         def __set__(self, instance, value):
 | |
|             return None
 | |
| 
 | |
|         def f():
 | |
|             return Class.attribute()
 | |
| 
 | |
|         for _ in range(1025):
 | |
|             self.assertTrue(f())
 | |
| 
 | |
|         Descriptor.__get__ = __get__
 | |
|         Descriptor.__set__ = __set__
 | |
| 
 | |
|         for _ in range(1025):
 | |
|             self.assertFalse(f())
 | |
| 
 | |
|     def test_metaclass_descriptor_shadows_class_attribute(self):
 | |
|         class Metaclass(type):
 | |
|             @property
 | |
|             def attribute(self):
 | |
|                 return lambda: True
 | |
| 
 | |
|         class Class(metaclass=Metaclass):
 | |
|             def attribute():
 | |
|                 return False
 | |
| 
 | |
|         def f():
 | |
|             return Class.attribute()
 | |
| 
 | |
|         for _ in range(1025):
 | |
|             self.assertTrue(f())
 | |
| 
 | |
|     def test_metaclass_set_descriptor_after_optimization(self):
 | |
|         class Metaclass(type):
 | |
|             pass
 | |
| 
 | |
|         class Class(metaclass=Metaclass):
 | |
|             def attribute():
 | |
|                 return True
 | |
| 
 | |
|         @property
 | |
|         def attribute(self):
 | |
|             return lambda: False
 | |
| 
 | |
|         def f():
 | |
|             return Class.attribute()
 | |
| 
 | |
|         for _ in range(1025):
 | |
|             self.assertTrue(f())
 | |
| 
 | |
|         Metaclass.attribute = attribute
 | |
| 
 | |
|         for _ in range(1025):
 | |
|             self.assertFalse(f())
 | |
| 
 | |
|     def test_metaclass_del_descriptor_after_optimization(self):
 | |
|         class Metaclass(type):
 | |
|             @property
 | |
|             def attribute(self):
 | |
|                 return lambda: True
 | |
| 
 | |
|         class Class(metaclass=Metaclass):
 | |
|             def attribute():
 | |
|                 return False
 | |
| 
 | |
|         def f():
 | |
|             return Class.attribute()
 | |
| 
 | |
|         for _ in range(1025):
 | |
|             self.assertTrue(f())
 | |
| 
 | |
|         del Metaclass.attribute
 | |
| 
 | |
|         for _ in range(1025):
 | |
|             self.assertFalse(f())
 | |
| 
 | |
|     def test_type_descriptor_shadows_attribute_method(self):
 | |
|         class Class:
 | |
|             def mro():
 | |
|                 return ["Spam", "eggs"]
 | |
| 
 | |
|         def f():
 | |
|             return Class.mro()
 | |
| 
 | |
|         for _ in range(1025):
 | |
|             self.assertEqual(f(), ["Spam", "eggs"])
 | |
| 
 | |
|     def test_type_descriptor_shadows_attribute_member(self):
 | |
|         class Class:
 | |
|             def __base__():
 | |
|                 return "Spam"
 | |
| 
 | |
|         def f():
 | |
|             return Class.__base__()
 | |
| 
 | |
|         for _ in range(1025):
 | |
|             self.assertNotEqual(f(), "Spam")
 | |
| 
 | |
|     def test_metaclass_getattribute(self):
 | |
|         class Metaclass(type):
 | |
|             def __getattribute__(self, name):
 | |
|                 return lambda: True
 | |
| 
 | |
|         class Class(metaclass=Metaclass):
 | |
|             def attribute():
 | |
|                 return False
 | |
| 
 | |
|         def f():
 | |
|             return Class.attribute()
 | |
| 
 | |
|         for _ in range(1025):
 | |
|             self.assertTrue(f())
 | |
| 
 | |
|     def test_metaclass_swap(self):
 | |
|         class OldMetaclass(type):
 | |
|             @property
 | |
|             def attribute(self):
 | |
|                 return lambda: True
 | |
| 
 | |
|         class NewMetaclass(type):
 | |
|             @property
 | |
|             def attribute(self):
 | |
|                 return lambda: False
 | |
| 
 | |
|         class Class(metaclass=OldMetaclass):
 | |
|             pass
 | |
| 
 | |
|         def f():
 | |
|             return Class.attribute()
 | |
| 
 | |
|         for _ in range(1025):
 | |
|             self.assertTrue(f())
 | |
| 
 | |
|         Class.__class__ = NewMetaclass
 | |
| 
 | |
|         for _ in range(1025):
 | |
|             self.assertFalse(f())
 | |
| 
 | |
| 
 | |
| class TestCallCache(TestBase):
 | |
|     def test_too_many_defaults_0(self):
 | |
|         def f():
 | |
|             pass
 | |
| 
 | |
|         f.__defaults__ = (None,)
 | |
|         for _ in range(1025):
 | |
|             f()
 | |
| 
 | |
|     def test_too_many_defaults_1(self):
 | |
|         def f(x):
 | |
|             pass
 | |
| 
 | |
|         f.__defaults__ = (None, None)
 | |
|         for _ in range(1025):
 | |
|             f(None)
 | |
|             f()
 | |
| 
 | |
|     def test_too_many_defaults_2(self):
 | |
|         def f(x, y):
 | |
|             pass
 | |
| 
 | |
|         f.__defaults__ = (None, None, None)
 | |
|         for _ in range(1025):
 | |
|             f(None, None)
 | |
|             f(None)
 | |
|             f()
 | |
| 
 | |
|     @disabling_optimizer
 | |
|     @requires_specialization
 | |
|     def test_assign_init_code(self):
 | |
|         class MyClass:
 | |
|             def __init__(self):
 | |
|                 pass
 | |
| 
 | |
|         def instantiate():
 | |
|             return MyClass()
 | |
| 
 | |
|         # Trigger specialization
 | |
|         for _ in range(1025):
 | |
|             instantiate()
 | |
|         self.assert_specialized(instantiate, "CALL_ALLOC_AND_ENTER_INIT")
 | |
| 
 | |
|         def count_args(self, *args):
 | |
|             self.num_args = len(args)
 | |
| 
 | |
|         # Set MyClass.__init__.__code__ to a code object that uses different
 | |
|         # args
 | |
|         MyClass.__init__.__code__ = count_args.__code__
 | |
|         instantiate()
 | |
| 
 | |
| 
 | |
| @threading_helper.requires_working_threading()
 | |
| @requires_specialization
 | |
| class TestRacesDoNotCrash(TestBase):
 | |
|     # Careful with these. Bigger numbers have a higher chance of catching bugs,
 | |
|     # but you can also burn through a *ton* of type/dict/function versions:
 | |
|     ITEMS = 1000
 | |
|     LOOPS = 4
 | |
|     WARMUPS = 2
 | |
|     WRITERS = 2
 | |
| 
 | |
|     @disabling_optimizer
 | |
|     def assert_races_do_not_crash(
 | |
|         self, opname, get_items, read, write, *, check_items=False
 | |
|     ):
 | |
|         # This might need a few dozen loops in some cases:
 | |
|         for _ in range(self.LOOPS):
 | |
|             items = get_items()
 | |
|             # Reset:
 | |
|             if check_items:
 | |
|                 for item in items:
 | |
|                     item.__code__ = item.__code__.replace()
 | |
|             else:
 | |
|                 read.__code__ = read.__code__.replace()
 | |
|             # Specialize:
 | |
|             for _ in range(self.WARMUPS):
 | |
|                 read(items)
 | |
|             if check_items:
 | |
|                 for item in items:
 | |
|                     self.assert_specialized(item, opname)
 | |
|             else:
 | |
|                 self.assert_specialized(read, opname)
 | |
|             # Create writers:
 | |
|             writers = []
 | |
|             for _ in range(self.WRITERS):
 | |
|                 writer = threading.Thread(target=write, args=[items])
 | |
|                 writers.append(writer)
 | |
|             # Run:
 | |
|             for writer in writers:
 | |
|                 writer.start()
 | |
|             read(items)  # BOOM!
 | |
|             for writer in writers:
 | |
|                 writer.join()
 | |
| 
 | |
|     def test_binary_subscr_getitem(self):
 | |
|         def get_items():
 | |
|             class C:
 | |
|                 __getitem__ = lambda self, item: None
 | |
| 
 | |
|             items = []
 | |
|             for _ in range(self.ITEMS):
 | |
|                 item = C()
 | |
|                 items.append(item)
 | |
|             return items
 | |
| 
 | |
|         def read(items):
 | |
|             for item in items:
 | |
|                 try:
 | |
|                     item[None]
 | |
|                 except TypeError:
 | |
|                     pass
 | |
| 
 | |
|         def write(items):
 | |
|             for item in items:
 | |
|                 try:
 | |
|                     del item.__getitem__
 | |
|                 except AttributeError:
 | |
|                     pass
 | |
|                 type(item).__getitem__ = lambda self, item: None
 | |
| 
 | |
|         opname = "BINARY_SUBSCR_GETITEM"
 | |
|         self.assert_races_do_not_crash(opname, get_items, read, write)
 | |
| 
 | |
|     def test_binary_subscr_list_int(self):
 | |
|         def get_items():
 | |
|             items = []
 | |
|             for _ in range(self.ITEMS):
 | |
|                 item = [None]
 | |
|                 items.append(item)
 | |
|             return items
 | |
| 
 | |
|         def read(items):
 | |
|             for item in items:
 | |
|                 try:
 | |
|                     item[0]
 | |
|                 except IndexError:
 | |
|                     pass
 | |
| 
 | |
|         def write(items):
 | |
|             for item in items:
 | |
|                 item.clear()
 | |
|                 item.append(None)
 | |
| 
 | |
|         opname = "BINARY_SUBSCR_LIST_INT"
 | |
|         self.assert_races_do_not_crash(opname, get_items, read, write)
 | |
| 
 | |
|     def test_for_iter_gen(self):
 | |
|         def get_items():
 | |
|             def g():
 | |
|                 yield
 | |
|                 yield
 | |
| 
 | |
|             items = []
 | |
|             for _ in range(self.ITEMS):
 | |
|                 item = g()
 | |
|                 items.append(item)
 | |
|             return items
 | |
| 
 | |
|         def read(items):
 | |
|             for item in items:
 | |
|                 try:
 | |
|                     for _ in item:
 | |
|                         break
 | |
|                 except ValueError:
 | |
|                     pass
 | |
| 
 | |
|         def write(items):
 | |
|             for item in items:
 | |
|                 try:
 | |
|                     for _ in item:
 | |
|                         break
 | |
|                 except ValueError:
 | |
|                     pass
 | |
| 
 | |
|         opname = "FOR_ITER_GEN"
 | |
|         self.assert_races_do_not_crash(opname, get_items, read, write)
 | |
| 
 | |
|     def test_for_iter_list(self):
 | |
|         def get_items():
 | |
|             items = []
 | |
|             for _ in range(self.ITEMS):
 | |
|                 item = [None]
 | |
|                 items.append(item)
 | |
|             return items
 | |
| 
 | |
|         def read(items):
 | |
|             for item in items:
 | |
|                 for item in item:
 | |
|                     break
 | |
| 
 | |
|         def write(items):
 | |
|             for item in items:
 | |
|                 item.clear()
 | |
|                 item.append(None)
 | |
| 
 | |
|         opname = "FOR_ITER_LIST"
 | |
|         self.assert_races_do_not_crash(opname, get_items, read, write)
 | |
| 
 | |
|     def test_load_attr_class(self):
 | |
|         def get_items():
 | |
|             class C:
 | |
|                 a = object()
 | |
| 
 | |
|             items = []
 | |
|             for _ in range(self.ITEMS):
 | |
|                 item = C
 | |
|                 items.append(item)
 | |
|             return items
 | |
| 
 | |
|         def read(items):
 | |
|             for item in items:
 | |
|                 try:
 | |
|                     item.a
 | |
|                 except AttributeError:
 | |
|                     pass
 | |
| 
 | |
|         def write(items):
 | |
|             for item in items:
 | |
|                 try:
 | |
|                     del item.a
 | |
|                 except AttributeError:
 | |
|                     pass
 | |
|                 item.a = object()
 | |
| 
 | |
|         opname = "LOAD_ATTR_CLASS"
 | |
|         self.assert_races_do_not_crash(opname, get_items, read, write)
 | |
| 
 | |
|     def test_load_attr_getattribute_overridden(self):
 | |
|         def get_items():
 | |
|             class C:
 | |
|                 __getattribute__ = lambda self, name: None
 | |
| 
 | |
|             items = []
 | |
|             for _ in range(self.ITEMS):
 | |
|                 item = C()
 | |
|                 items.append(item)
 | |
|             return items
 | |
| 
 | |
|         def read(items):
 | |
|             for item in items:
 | |
|                 try:
 | |
|                     item.a
 | |
|                 except AttributeError:
 | |
|                     pass
 | |
| 
 | |
|         def write(items):
 | |
|             for item in items:
 | |
|                 try:
 | |
|                     del item.__getattribute__
 | |
|                 except AttributeError:
 | |
|                     pass
 | |
|                 type(item).__getattribute__ = lambda self, name: None
 | |
| 
 | |
|         opname = "LOAD_ATTR_GETATTRIBUTE_OVERRIDDEN"
 | |
|         self.assert_races_do_not_crash(opname, get_items, read, write)
 | |
| 
 | |
|     def test_load_attr_instance_value(self):
 | |
|         def get_items():
 | |
|             class C:
 | |
|                 pass
 | |
| 
 | |
|             items = []
 | |
|             for _ in range(self.ITEMS):
 | |
|                 item = C()
 | |
|                 item.a = None
 | |
|                 items.append(item)
 | |
|             return items
 | |
| 
 | |
|         def read(items):
 | |
|             for item in items:
 | |
|                 item.a
 | |
| 
 | |
|         def write(items):
 | |
|             for item in items:
 | |
|                 item.__dict__[None] = None
 | |
| 
 | |
|         opname = "LOAD_ATTR_INSTANCE_VALUE"
 | |
|         self.assert_races_do_not_crash(opname, get_items, read, write)
 | |
| 
 | |
|     def test_load_attr_method_lazy_dict(self):
 | |
|         def get_items():
 | |
|             class C(Exception):
 | |
|                 m = lambda self: None
 | |
| 
 | |
|             items = []
 | |
|             for _ in range(self.ITEMS):
 | |
|                 item = C()
 | |
|                 items.append(item)
 | |
|             return items
 | |
| 
 | |
|         def read(items):
 | |
|             for item in items:
 | |
|                 try:
 | |
|                     item.m()
 | |
|                 except AttributeError:
 | |
|                     pass
 | |
| 
 | |
|         def write(items):
 | |
|             for item in items:
 | |
|                 try:
 | |
|                     del item.m
 | |
|                 except AttributeError:
 | |
|                     pass
 | |
|                 type(item).m = lambda self: None
 | |
| 
 | |
|         opname = "LOAD_ATTR_METHOD_LAZY_DICT"
 | |
|         self.assert_races_do_not_crash(opname, get_items, read, write)
 | |
| 
 | |
|     def test_load_attr_method_no_dict(self):
 | |
|         def get_items():
 | |
|             class C:
 | |
|                 __slots__ = ()
 | |
|                 m = lambda self: None
 | |
| 
 | |
|             items = []
 | |
|             for _ in range(self.ITEMS):
 | |
|                 item = C()
 | |
|                 items.append(item)
 | |
|             return items
 | |
| 
 | |
|         def read(items):
 | |
|             for item in items:
 | |
|                 try:
 | |
|                     item.m()
 | |
|                 except AttributeError:
 | |
|                     pass
 | |
| 
 | |
|         def write(items):
 | |
|             for item in items:
 | |
|                 try:
 | |
|                     del item.m
 | |
|                 except AttributeError:
 | |
|                     pass
 | |
|                 type(item).m = lambda self: None
 | |
| 
 | |
|         opname = "LOAD_ATTR_METHOD_NO_DICT"
 | |
|         self.assert_races_do_not_crash(opname, get_items, read, write)
 | |
| 
 | |
|     def test_load_attr_method_with_values(self):
 | |
|         def get_items():
 | |
|             class C:
 | |
|                 m = lambda self: None
 | |
| 
 | |
|             items = []
 | |
|             for _ in range(self.ITEMS):
 | |
|                 item = C()
 | |
|                 items.append(item)
 | |
|             return items
 | |
| 
 | |
|         def read(items):
 | |
|             for item in items:
 | |
|                 try:
 | |
|                     item.m()
 | |
|                 except AttributeError:
 | |
|                     pass
 | |
| 
 | |
|         def write(items):
 | |
|             for item in items:
 | |
|                 try:
 | |
|                     del item.m
 | |
|                 except AttributeError:
 | |
|                     pass
 | |
|                 type(item).m = lambda self: None
 | |
| 
 | |
|         opname = "LOAD_ATTR_METHOD_WITH_VALUES"
 | |
|         self.assert_races_do_not_crash(opname, get_items, read, write)
 | |
| 
 | |
|     def test_load_attr_module(self):
 | |
|         def get_items():
 | |
|             items = []
 | |
|             for _ in range(self.ITEMS):
 | |
|                 item = types.ModuleType("<item>")
 | |
|                 items.append(item)
 | |
|             return items
 | |
| 
 | |
|         def read(items):
 | |
|             for item in items:
 | |
|                 try:
 | |
|                     item.__name__
 | |
|                 except AttributeError:
 | |
|                     pass
 | |
| 
 | |
|         def write(items):
 | |
|             for item in items:
 | |
|                 d = item.__dict__.copy()
 | |
|                 item.__dict__.clear()
 | |
|                 item.__dict__.update(d)
 | |
| 
 | |
|         opname = "LOAD_ATTR_MODULE"
 | |
|         self.assert_races_do_not_crash(opname, get_items, read, write)
 | |
| 
 | |
|     def test_load_attr_property(self):
 | |
|         def get_items():
 | |
|             class C:
 | |
|                 a = property(lambda self: None)
 | |
| 
 | |
|             items = []
 | |
|             for _ in range(self.ITEMS):
 | |
|                 item = C()
 | |
|                 items.append(item)
 | |
|             return items
 | |
| 
 | |
|         def read(items):
 | |
|             for item in items:
 | |
|                 try:
 | |
|                     item.a
 | |
|                 except AttributeError:
 | |
|                     pass
 | |
| 
 | |
|         def write(items):
 | |
|             for item in items:
 | |
|                 try:
 | |
|                     del type(item).a
 | |
|                 except AttributeError:
 | |
|                     pass
 | |
|                 type(item).a = property(lambda self: None)
 | |
| 
 | |
|         opname = "LOAD_ATTR_PROPERTY"
 | |
|         self.assert_races_do_not_crash(opname, get_items, read, write)
 | |
| 
 | |
|     def test_load_attr_with_hint(self):
 | |
|         def get_items():
 | |
|             class C:
 | |
|                 pass
 | |
| 
 | |
|             items = []
 | |
|             for _ in range(self.ITEMS):
 | |
|                 item = C()
 | |
|                 item.a = None
 | |
|                 # Resize into a combined unicode dict:
 | |
|                 for i in range(29):
 | |
|                     setattr(item, f"_{i}", None)
 | |
|                 items.append(item)
 | |
|             return items
 | |
| 
 | |
|         def read(items):
 | |
|             for item in items:
 | |
|                 item.a
 | |
| 
 | |
|         def write(items):
 | |
|             for item in items:
 | |
|                 item.__dict__[None] = None
 | |
| 
 | |
|         opname = "LOAD_ATTR_WITH_HINT"
 | |
|         self.assert_races_do_not_crash(opname, get_items, read, write)
 | |
| 
 | |
|     def test_load_global_module(self):
 | |
|         def get_items():
 | |
|             items = []
 | |
|             for _ in range(self.ITEMS):
 | |
|                 item = eval("lambda: x", {"x": None})
 | |
|                 items.append(item)
 | |
|             return items
 | |
| 
 | |
|         def read(items):
 | |
|             for item in items:
 | |
|                 item()
 | |
| 
 | |
|         def write(items):
 | |
|             for item in items:
 | |
|                 item.__globals__[None] = None
 | |
| 
 | |
|         opname = "LOAD_GLOBAL_MODULE"
 | |
|         self.assert_races_do_not_crash(
 | |
|             opname, get_items, read, write, check_items=True
 | |
|         )
 | |
| 
 | |
|     def test_store_attr_instance_value(self):
 | |
|         def get_items():
 | |
|             class C:
 | |
|                 pass
 | |
| 
 | |
|             items = []
 | |
|             for _ in range(self.ITEMS):
 | |
|                 item = C()
 | |
|                 items.append(item)
 | |
|             return items
 | |
| 
 | |
|         def read(items):
 | |
|             for item in items:
 | |
|                 item.a = None
 | |
| 
 | |
|         def write(items):
 | |
|             for item in items:
 | |
|                 item.__dict__[None] = None
 | |
| 
 | |
|         opname = "STORE_ATTR_INSTANCE_VALUE"
 | |
|         self.assert_races_do_not_crash(opname, get_items, read, write)
 | |
| 
 | |
|     def test_store_attr_with_hint(self):
 | |
|         def get_items():
 | |
|             class C:
 | |
|                 pass
 | |
| 
 | |
|             items = []
 | |
|             for _ in range(self.ITEMS):
 | |
|                 item = C()
 | |
|                 # Resize into a combined unicode dict:
 | |
|                 for i in range(29):
 | |
|                     setattr(item, f"_{i}", None)
 | |
|                 items.append(item)
 | |
|             return items
 | |
| 
 | |
|         def read(items):
 | |
|             for item in items:
 | |
|                 item.a = None
 | |
| 
 | |
|         def write(items):
 | |
|             for item in items:
 | |
|                 item.__dict__[None] = None
 | |
| 
 | |
|         opname = "STORE_ATTR_WITH_HINT"
 | |
|         self.assert_races_do_not_crash(opname, get_items, read, write)
 | |
| 
 | |
|     def test_store_subscr_list_int(self):
 | |
|         def get_items():
 | |
|             items = []
 | |
|             for _ in range(self.ITEMS):
 | |
|                 item = [None]
 | |
|                 items.append(item)
 | |
|             return items
 | |
| 
 | |
|         def read(items):
 | |
|             for item in items:
 | |
|                 try:
 | |
|                     item[0] = None
 | |
|                 except IndexError:
 | |
|                     pass
 | |
| 
 | |
|         def write(items):
 | |
|             for item in items:
 | |
|                 item.clear()
 | |
|                 item.append(None)
 | |
| 
 | |
|         opname = "STORE_SUBSCR_LIST_INT"
 | |
|         self.assert_races_do_not_crash(opname, get_items, read, write)
 | |
| 
 | |
|     def test_unpack_sequence_list(self):
 | |
|         def get_items():
 | |
|             items = []
 | |
|             for _ in range(self.ITEMS):
 | |
|                 item = [None]
 | |
|                 items.append(item)
 | |
|             return items
 | |
| 
 | |
|         def read(items):
 | |
|             for item in items:
 | |
|                 try:
 | |
|                     [_] = item
 | |
|                 except ValueError:
 | |
|                     pass
 | |
| 
 | |
|         def write(items):
 | |
|             for item in items:
 | |
|                 item.clear()
 | |
|                 item.append(None)
 | |
| 
 | |
|         opname = "UNPACK_SEQUENCE_LIST"
 | |
|         self.assert_races_do_not_crash(opname, get_items, read, write)
 | |
| 
 | |
| class C:
 | |
|     pass
 | |
| 
 | |
| @requires_specialization
 | |
| class TestInstanceDict(unittest.TestCase):
 | |
| 
 | |
|     def setUp(self):
 | |
|         c = C()
 | |
|         c.a, c.b, c.c = 0,0,0
 | |
| 
 | |
|     def test_values_on_instance(self):
 | |
|         c = C()
 | |
|         c.a = 1
 | |
|         C().b = 2
 | |
|         c.c = 3
 | |
|         self.assertEqual(
 | |
|             _testinternalcapi.get_object_dict_values(c),
 | |
|             (1, '<NULL>', 3)
 | |
|         )
 | |
| 
 | |
|     def test_dict_materialization(self):
 | |
|         c = C()
 | |
|         c.a = 1
 | |
|         c.b = 2
 | |
|         c.__dict__
 | |
|         self.assertEqual(c.__dict__, {"a":1, "b": 2})
 | |
| 
 | |
|     def test_dict_dematerialization(self):
 | |
|         c = C()
 | |
|         c.a = 1
 | |
|         c.b = 2
 | |
|         c.__dict__
 | |
|         for _ in range(100):
 | |
|             c.a
 | |
|         self.assertEqual(
 | |
|             _testinternalcapi.get_object_dict_values(c),
 | |
|             (1, 2, '<NULL>')
 | |
|         )
 | |
| 
 | |
|     def test_dict_dematerialization_multiple_refs(self):
 | |
|         c = C()
 | |
|         c.a = 1
 | |
|         c.b = 2
 | |
|         d = c.__dict__
 | |
|         for _ in range(100):
 | |
|             c.a
 | |
|         self.assertIs(c.__dict__, d)
 | |
| 
 | |
|     def test_dict_dematerialization_copy(self):
 | |
|         c = C()
 | |
|         c.a = 1
 | |
|         c.b = 2
 | |
|         c2 = copy.copy(c)
 | |
|         for _ in range(100):
 | |
|             c.a
 | |
|             c2.a
 | |
|         self.assertEqual(
 | |
|             _testinternalcapi.get_object_dict_values(c),
 | |
|             (1, 2, '<NULL>')
 | |
|         )
 | |
|         self.assertEqual(
 | |
|             _testinternalcapi.get_object_dict_values(c2),
 | |
|             (1, 2, '<NULL>')
 | |
|         )
 | |
|         c3 = copy.deepcopy(c)
 | |
|         for _ in range(100):
 | |
|             c.a
 | |
|             c3.a
 | |
|         self.assertEqual(
 | |
|             _testinternalcapi.get_object_dict_values(c),
 | |
|             (1, 2, '<NULL>')
 | |
|         )
 | |
|         #NOTE -- c3.__dict__ does not de-materialize
 | |
| 
 | |
|     def test_dict_dematerialization_pickle(self):
 | |
|         c = C()
 | |
|         c.a = 1
 | |
|         c.b = 2
 | |
|         c2 = pickle.loads(pickle.dumps(c))
 | |
|         for _ in range(100):
 | |
|             c.a
 | |
|             c2.a
 | |
|         self.assertEqual(
 | |
|             _testinternalcapi.get_object_dict_values(c),
 | |
|             (1, 2, '<NULL>')
 | |
|         )
 | |
|         self.assertEqual(
 | |
|             _testinternalcapi.get_object_dict_values(c2),
 | |
|             (1, 2, '<NULL>')
 | |
|         )
 | |
| 
 | |
|     def test_dict_dematerialization_subclass(self):
 | |
|         class D(dict): pass
 | |
|         c = C()
 | |
|         c.a = 1
 | |
|         c.b = 2
 | |
|         c.__dict__ = D(c.__dict__)
 | |
|         for _ in range(100):
 | |
|             c.a
 | |
|         self.assertIs(
 | |
|             _testinternalcapi.get_object_dict_values(c),
 | |
|             None
 | |
|         )
 | |
|         self.assertEqual(
 | |
|             c.__dict__,
 | |
|             {'a':1, 'b':2}
 | |
|         )
 | |
| 
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|     unittest.main()
 | 
