mirror of
				https://github.com/python/cpython.git
				synced 2025-11-04 07:31:38 +00:00 
			
		
		
		
	The PEP 649 implementation will require a way to load NotImplementedError from the bytecode. @markshannon suggested implementing this by converting LOAD_ASSERTION_ERROR into a more general mechanism for loading constants. This PR adds this new opcode. I will work on the rest of the implementation of the PEP separately. Co-authored-by: Irit Katriel <1055913+iritkatriel@users.noreply.github.com>
		
			
				
	
	
		
			915 lines
		
	
	
	
		
			27 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			915 lines
		
	
	
	
		
			27 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
"""This module includes tests of the code object representation.
 | 
						|
 | 
						|
>>> def f(x):
 | 
						|
...     def g(y):
 | 
						|
...         return x + y
 | 
						|
...     return g
 | 
						|
...
 | 
						|
 | 
						|
>>> dump(f.__code__)
 | 
						|
name: f
 | 
						|
argcount: 1
 | 
						|
posonlyargcount: 0
 | 
						|
kwonlyargcount: 0
 | 
						|
names: ()
 | 
						|
varnames: ('x', 'g')
 | 
						|
cellvars: ('x',)
 | 
						|
freevars: ()
 | 
						|
nlocals: 2
 | 
						|
flags: 3
 | 
						|
consts: ('None', '<code object g>')
 | 
						|
 | 
						|
>>> dump(f(4).__code__)
 | 
						|
name: g
 | 
						|
argcount: 1
 | 
						|
posonlyargcount: 0
 | 
						|
kwonlyargcount: 0
 | 
						|
names: ()
 | 
						|
varnames: ('y',)
 | 
						|
cellvars: ()
 | 
						|
freevars: ('x',)
 | 
						|
nlocals: 1
 | 
						|
flags: 19
 | 
						|
consts: ('None',)
 | 
						|
 | 
						|
>>> def h(x, y):
 | 
						|
...     a = x + y
 | 
						|
...     b = x - y
 | 
						|
...     c = a * b
 | 
						|
...     return c
 | 
						|
...
 | 
						|
 | 
						|
>>> dump(h.__code__)
 | 
						|
name: h
 | 
						|
argcount: 2
 | 
						|
posonlyargcount: 0
 | 
						|
kwonlyargcount: 0
 | 
						|
names: ()
 | 
						|
varnames: ('x', 'y', 'a', 'b', 'c')
 | 
						|
cellvars: ()
 | 
						|
freevars: ()
 | 
						|
nlocals: 5
 | 
						|
flags: 3
 | 
						|
consts: ('None',)
 | 
						|
 | 
						|
>>> def attrs(obj):
 | 
						|
...     print(obj.attr1)
 | 
						|
...     print(obj.attr2)
 | 
						|
...     print(obj.attr3)
 | 
						|
 | 
						|
>>> dump(attrs.__code__)
 | 
						|
name: attrs
 | 
						|
argcount: 1
 | 
						|
posonlyargcount: 0
 | 
						|
kwonlyargcount: 0
 | 
						|
names: ('print', 'attr1', 'attr2', 'attr3')
 | 
						|
varnames: ('obj',)
 | 
						|
cellvars: ()
 | 
						|
freevars: ()
 | 
						|
nlocals: 1
 | 
						|
flags: 3
 | 
						|
consts: ('None',)
 | 
						|
 | 
						|
>>> def optimize_away():
 | 
						|
...     'doc string'
 | 
						|
...     'not a docstring'
 | 
						|
...     53
 | 
						|
...     0x53
 | 
						|
 | 
						|
>>> dump(optimize_away.__code__)
 | 
						|
name: optimize_away
 | 
						|
argcount: 0
 | 
						|
posonlyargcount: 0
 | 
						|
kwonlyargcount: 0
 | 
						|
names: ()
 | 
						|
varnames: ()
 | 
						|
cellvars: ()
 | 
						|
freevars: ()
 | 
						|
nlocals: 0
 | 
						|
flags: 3
 | 
						|
consts: ("'doc string'", 'None')
 | 
						|
 | 
						|
>>> def keywordonly_args(a,b,*,k1):
 | 
						|
...     return a,b,k1
 | 
						|
...
 | 
						|
 | 
						|
>>> dump(keywordonly_args.__code__)
 | 
						|
name: keywordonly_args
 | 
						|
argcount: 2
 | 
						|
posonlyargcount: 0
 | 
						|
kwonlyargcount: 1
 | 
						|
names: ()
 | 
						|
varnames: ('a', 'b', 'k1')
 | 
						|
cellvars: ()
 | 
						|
freevars: ()
 | 
						|
nlocals: 3
 | 
						|
flags: 3
 | 
						|
consts: ('None',)
 | 
						|
 | 
						|
>>> def posonly_args(a,b,/,c):
 | 
						|
...     return a,b,c
 | 
						|
...
 | 
						|
 | 
						|
>>> dump(posonly_args.__code__)
 | 
						|
name: posonly_args
 | 
						|
argcount: 3
 | 
						|
posonlyargcount: 2
 | 
						|
kwonlyargcount: 0
 | 
						|
names: ()
 | 
						|
varnames: ('a', 'b', 'c')
 | 
						|
cellvars: ()
 | 
						|
freevars: ()
 | 
						|
nlocals: 3
 | 
						|
flags: 3
 | 
						|
consts: ('None',)
 | 
						|
 | 
						|
"""
 | 
						|
 | 
						|
import copy
 | 
						|
import inspect
 | 
						|
import sys
 | 
						|
import threading
 | 
						|
import doctest
 | 
						|
import unittest
 | 
						|
import textwrap
 | 
						|
import weakref
 | 
						|
import dis
 | 
						|
 | 
						|
try:
 | 
						|
    import ctypes
 | 
						|
except ImportError:
 | 
						|
    ctypes = None
 | 
						|
from test.support import (cpython_only,
 | 
						|
                          check_impl_detail, requires_debug_ranges,
 | 
						|
                          gc_collect, Py_GIL_DISABLED,
 | 
						|
                          suppress_immortalization,
 | 
						|
                          skip_if_suppress_immortalization)
 | 
						|
from test.support.script_helper import assert_python_ok
 | 
						|
from test.support import threading_helper, import_helper
 | 
						|
from test.support.bytecode_helper import instructions_with_positions
 | 
						|
from opcode import opmap, opname
 | 
						|
COPY_FREE_VARS = opmap['COPY_FREE_VARS']
 | 
						|
 | 
						|
 | 
						|
def consts(t):
 | 
						|
    """Yield a doctest-safe sequence of object reprs."""
 | 
						|
    for elt in t:
 | 
						|
        r = repr(elt)
 | 
						|
        if r.startswith("<code object"):
 | 
						|
            yield "<code object %s>" % elt.co_name
 | 
						|
        else:
 | 
						|
            yield r
 | 
						|
 | 
						|
def dump(co):
 | 
						|
    """Print out a text representation of a code object."""
 | 
						|
    for attr in ["name", "argcount", "posonlyargcount",
 | 
						|
                 "kwonlyargcount", "names", "varnames",
 | 
						|
                 "cellvars", "freevars", "nlocals", "flags"]:
 | 
						|
        print("%s: %s" % (attr, getattr(co, "co_" + attr)))
 | 
						|
    print("consts:", tuple(consts(co.co_consts)))
 | 
						|
 | 
						|
# Needed for test_closure_injection below
 | 
						|
# Defined at global scope to avoid implicitly closing over __class__
 | 
						|
def external_getitem(self, i):
 | 
						|
    return f"Foreign getitem: {super().__getitem__(i)}"
 | 
						|
 | 
						|
class CodeTest(unittest.TestCase):
 | 
						|
 | 
						|
    @cpython_only
 | 
						|
    def test_newempty(self):
 | 
						|
        _testcapi = import_helper.import_module("_testcapi")
 | 
						|
        co = _testcapi.code_newempty("filename", "funcname", 15)
 | 
						|
        self.assertEqual(co.co_filename, "filename")
 | 
						|
        self.assertEqual(co.co_name, "funcname")
 | 
						|
        self.assertEqual(co.co_firstlineno, 15)
 | 
						|
        #Empty code object should raise, but not crash the VM
 | 
						|
        with self.assertRaises(Exception):
 | 
						|
            exec(co)
 | 
						|
 | 
						|
    @cpython_only
 | 
						|
    def test_closure_injection(self):
 | 
						|
        # From https://bugs.python.org/issue32176
 | 
						|
        from types import FunctionType
 | 
						|
 | 
						|
        def create_closure(__class__):
 | 
						|
            return (lambda: __class__).__closure__
 | 
						|
 | 
						|
        def new_code(c):
 | 
						|
            '''A new code object with a __class__ cell added to freevars'''
 | 
						|
            return c.replace(co_freevars=c.co_freevars + ('__class__',), co_code=bytes([COPY_FREE_VARS, 1])+c.co_code)
 | 
						|
 | 
						|
        def add_foreign_method(cls, name, f):
 | 
						|
            code = new_code(f.__code__)
 | 
						|
            assert not f.__closure__
 | 
						|
            closure = create_closure(cls)
 | 
						|
            defaults = f.__defaults__
 | 
						|
            setattr(cls, name, FunctionType(code, globals(), name, defaults, closure))
 | 
						|
 | 
						|
        class List(list):
 | 
						|
            pass
 | 
						|
 | 
						|
        add_foreign_method(List, "__getitem__", external_getitem)
 | 
						|
 | 
						|
        # Ensure the closure injection actually worked
 | 
						|
        function = List.__getitem__
 | 
						|
        class_ref = function.__closure__[0].cell_contents
 | 
						|
        self.assertIs(class_ref, List)
 | 
						|
 | 
						|
        # Ensure the zero-arg super() call in the injected method works
 | 
						|
        obj = List([1, 2, 3])
 | 
						|
        self.assertEqual(obj[0], "Foreign getitem: 1")
 | 
						|
 | 
						|
    def test_constructor(self):
 | 
						|
        def func(): pass
 | 
						|
        co = func.__code__
 | 
						|
        CodeType = type(co)
 | 
						|
 | 
						|
        # test code constructor
 | 
						|
        CodeType(co.co_argcount,
 | 
						|
                        co.co_posonlyargcount,
 | 
						|
                        co.co_kwonlyargcount,
 | 
						|
                        co.co_nlocals,
 | 
						|
                        co.co_stacksize,
 | 
						|
                        co.co_flags,
 | 
						|
                        co.co_code,
 | 
						|
                        co.co_consts,
 | 
						|
                        co.co_names,
 | 
						|
                        co.co_varnames,
 | 
						|
                        co.co_filename,
 | 
						|
                        co.co_name,
 | 
						|
                        co.co_qualname,
 | 
						|
                        co.co_firstlineno,
 | 
						|
                        co.co_linetable,
 | 
						|
                        co.co_exceptiontable,
 | 
						|
                        co.co_freevars,
 | 
						|
                        co.co_cellvars)
 | 
						|
 | 
						|
    def test_qualname(self):
 | 
						|
        self.assertEqual(
 | 
						|
            CodeTest.test_qualname.__code__.co_qualname,
 | 
						|
            CodeTest.test_qualname.__qualname__
 | 
						|
        )
 | 
						|
 | 
						|
    def test_replace(self):
 | 
						|
        def func():
 | 
						|
            x = 1
 | 
						|
            return x
 | 
						|
        code = func.__code__
 | 
						|
 | 
						|
        # different co_name, co_varnames, co_consts
 | 
						|
        def func2():
 | 
						|
            y = 2
 | 
						|
            z = 3
 | 
						|
            return y
 | 
						|
        code2 = func2.__code__
 | 
						|
 | 
						|
        for attr, value in (
 | 
						|
            ("co_argcount", 0),
 | 
						|
            ("co_posonlyargcount", 0),
 | 
						|
            ("co_kwonlyargcount", 0),
 | 
						|
            ("co_nlocals", 1),
 | 
						|
            ("co_stacksize", 1),
 | 
						|
            ("co_flags", code.co_flags | inspect.CO_COROUTINE),
 | 
						|
            ("co_firstlineno", 100),
 | 
						|
            ("co_code", code2.co_code),
 | 
						|
            ("co_consts", code2.co_consts),
 | 
						|
            ("co_names", ("myname",)),
 | 
						|
            ("co_varnames", ('spam',)),
 | 
						|
            ("co_freevars", ("freevar",)),
 | 
						|
            ("co_cellvars", ("cellvar",)),
 | 
						|
            ("co_filename", "newfilename"),
 | 
						|
            ("co_name", "newname"),
 | 
						|
            ("co_linetable", code2.co_linetable),
 | 
						|
        ):
 | 
						|
            with self.subTest(attr=attr, value=value):
 | 
						|
                new_code = code.replace(**{attr: value})
 | 
						|
                self.assertEqual(getattr(new_code, attr), value)
 | 
						|
                new_code = copy.replace(code, **{attr: value})
 | 
						|
                self.assertEqual(getattr(new_code, attr), value)
 | 
						|
 | 
						|
        new_code = code.replace(co_varnames=code2.co_varnames,
 | 
						|
                                co_nlocals=code2.co_nlocals)
 | 
						|
        self.assertEqual(new_code.co_varnames, code2.co_varnames)
 | 
						|
        self.assertEqual(new_code.co_nlocals, code2.co_nlocals)
 | 
						|
        new_code = copy.replace(code, co_varnames=code2.co_varnames,
 | 
						|
                                co_nlocals=code2.co_nlocals)
 | 
						|
        self.assertEqual(new_code.co_varnames, code2.co_varnames)
 | 
						|
        self.assertEqual(new_code.co_nlocals, code2.co_nlocals)
 | 
						|
 | 
						|
    def test_nlocals_mismatch(self):
 | 
						|
        def func():
 | 
						|
            x = 1
 | 
						|
            return x
 | 
						|
        co = func.__code__
 | 
						|
        assert co.co_nlocals > 0;
 | 
						|
 | 
						|
        # First we try the constructor.
 | 
						|
        CodeType = type(co)
 | 
						|
        for diff in (-1, 1):
 | 
						|
            with self.assertRaises(ValueError):
 | 
						|
                CodeType(co.co_argcount,
 | 
						|
                         co.co_posonlyargcount,
 | 
						|
                         co.co_kwonlyargcount,
 | 
						|
                         # This is the only change.
 | 
						|
                         co.co_nlocals + diff,
 | 
						|
                         co.co_stacksize,
 | 
						|
                         co.co_flags,
 | 
						|
                         co.co_code,
 | 
						|
                         co.co_consts,
 | 
						|
                         co.co_names,
 | 
						|
                         co.co_varnames,
 | 
						|
                         co.co_filename,
 | 
						|
                         co.co_name,
 | 
						|
                         co.co_qualname,
 | 
						|
                         co.co_firstlineno,
 | 
						|
                         co.co_linetable,
 | 
						|
                         co.co_exceptiontable,
 | 
						|
                         co.co_freevars,
 | 
						|
                         co.co_cellvars,
 | 
						|
                         )
 | 
						|
        # Then we try the replace method.
 | 
						|
        with self.assertRaises(ValueError):
 | 
						|
            co.replace(co_nlocals=co.co_nlocals - 1)
 | 
						|
        with self.assertRaises(ValueError):
 | 
						|
            co.replace(co_nlocals=co.co_nlocals + 1)
 | 
						|
 | 
						|
    def test_shrinking_localsplus(self):
 | 
						|
        # Check that PyCode_NewWithPosOnlyArgs resizes both
 | 
						|
        # localsplusnames and localspluskinds, if an argument is a cell.
 | 
						|
        def func(arg):
 | 
						|
            return lambda: arg
 | 
						|
        code = func.__code__
 | 
						|
        newcode = code.replace(co_name="func")  # Should not raise SystemError
 | 
						|
        self.assertEqual(code, newcode)
 | 
						|
 | 
						|
    def test_empty_linetable(self):
 | 
						|
        def func():
 | 
						|
            pass
 | 
						|
        new_code = code = func.__code__.replace(co_linetable=b'')
 | 
						|
        self.assertEqual(list(new_code.co_lines()), [])
 | 
						|
 | 
						|
    def test_co_lnotab_is_deprecated(self):  # TODO: remove in 3.14
 | 
						|
        def func():
 | 
						|
            pass
 | 
						|
 | 
						|
        with self.assertWarns(DeprecationWarning):
 | 
						|
            func.__code__.co_lnotab
 | 
						|
 | 
						|
    def test_invalid_bytecode(self):
 | 
						|
        def foo():
 | 
						|
            pass
 | 
						|
 | 
						|
        # assert that opcode 229 is invalid
 | 
						|
        self.assertEqual(opname[229], '<229>')
 | 
						|
 | 
						|
        # change first opcode to 0xeb (=229)
 | 
						|
        foo.__code__ = foo.__code__.replace(
 | 
						|
            co_code=b'\xe5' + foo.__code__.co_code[1:])
 | 
						|
 | 
						|
        msg = "unknown opcode 229"
 | 
						|
        with self.assertRaisesRegex(SystemError, msg):
 | 
						|
            foo()
 | 
						|
 | 
						|
    @requires_debug_ranges()
 | 
						|
    def test_co_positions_artificial_instructions(self):
 | 
						|
        import dis
 | 
						|
 | 
						|
        namespace = {}
 | 
						|
        exec(textwrap.dedent("""\
 | 
						|
        try:
 | 
						|
            1/0
 | 
						|
        except Exception as e:
 | 
						|
            exc = e
 | 
						|
        """), namespace)
 | 
						|
 | 
						|
        exc = namespace['exc']
 | 
						|
        traceback = exc.__traceback__
 | 
						|
        code = traceback.tb_frame.f_code
 | 
						|
 | 
						|
        artificial_instructions = []
 | 
						|
        for instr, positions in instructions_with_positions(
 | 
						|
            dis.get_instructions(code), code.co_positions()
 | 
						|
        ):
 | 
						|
            # If any of the positions is None, then all have to
 | 
						|
            # be None as well for the case above. There are still
 | 
						|
            # some places in the compiler, where the artificial instructions
 | 
						|
            # get assigned the first_lineno but they don't have other positions.
 | 
						|
            # There is no easy way of inferring them at that stage, so for now
 | 
						|
            # we don't support it.
 | 
						|
            self.assertIn(positions.count(None), [0, 3, 4])
 | 
						|
 | 
						|
            if not any(positions):
 | 
						|
                artificial_instructions.append(instr)
 | 
						|
 | 
						|
        self.assertEqual(
 | 
						|
            [
 | 
						|
                (instruction.opname, instruction.argval)
 | 
						|
                for instruction in artificial_instructions
 | 
						|
            ],
 | 
						|
            [
 | 
						|
                ("PUSH_EXC_INFO", None),
 | 
						|
                ("LOAD_CONST", None), # artificial 'None'
 | 
						|
                ("STORE_NAME", "e"),  # XX: we know the location for this
 | 
						|
                ("DELETE_NAME", "e"),
 | 
						|
                ("RERAISE", 1),
 | 
						|
                ("COPY", 3),
 | 
						|
                ("POP_EXCEPT", None),
 | 
						|
                ("RERAISE", 1)
 | 
						|
            ]
 | 
						|
        )
 | 
						|
 | 
						|
    def test_endline_and_columntable_none_when_no_debug_ranges(self):
 | 
						|
        # Make sure that if `-X no_debug_ranges` is used, there is
 | 
						|
        # minimal debug info
 | 
						|
        code = textwrap.dedent("""
 | 
						|
            def f():
 | 
						|
                pass
 | 
						|
 | 
						|
            positions = f.__code__.co_positions()
 | 
						|
            for line, end_line, column, end_column in positions:
 | 
						|
                assert line == end_line
 | 
						|
                assert column is None
 | 
						|
                assert end_column is None
 | 
						|
            """)
 | 
						|
        assert_python_ok('-X', 'no_debug_ranges', '-c', code)
 | 
						|
 | 
						|
    def test_endline_and_columntable_none_when_no_debug_ranges_env(self):
 | 
						|
        # Same as above but using the environment variable opt out.
 | 
						|
        code = textwrap.dedent("""
 | 
						|
            def f():
 | 
						|
                pass
 | 
						|
 | 
						|
            positions = f.__code__.co_positions()
 | 
						|
            for line, end_line, column, end_column in positions:
 | 
						|
                assert line == end_line
 | 
						|
                assert column is None
 | 
						|
                assert end_column is None
 | 
						|
            """)
 | 
						|
        assert_python_ok('-c', code, PYTHONNODEBUGRANGES='1')
 | 
						|
 | 
						|
    # co_positions behavior when info is missing.
 | 
						|
 | 
						|
    @requires_debug_ranges()
 | 
						|
    def test_co_positions_empty_linetable(self):
 | 
						|
        def func():
 | 
						|
            x = 1
 | 
						|
        new_code = func.__code__.replace(co_linetable=b'')
 | 
						|
        positions = new_code.co_positions()
 | 
						|
        for line, end_line, column, end_column in positions:
 | 
						|
            self.assertIsNone(line)
 | 
						|
            self.assertEqual(end_line, new_code.co_firstlineno + 1)
 | 
						|
 | 
						|
    def test_code_equality(self):
 | 
						|
        def f():
 | 
						|
            try:
 | 
						|
                a()
 | 
						|
            except:
 | 
						|
                b()
 | 
						|
            else:
 | 
						|
                c()
 | 
						|
            finally:
 | 
						|
                d()
 | 
						|
        code_a = f.__code__
 | 
						|
        code_b = code_a.replace(co_linetable=b"")
 | 
						|
        code_c = code_a.replace(co_exceptiontable=b"")
 | 
						|
        code_d = code_b.replace(co_exceptiontable=b"")
 | 
						|
        self.assertNotEqual(code_a, code_b)
 | 
						|
        self.assertNotEqual(code_a, code_c)
 | 
						|
        self.assertNotEqual(code_a, code_d)
 | 
						|
        self.assertNotEqual(code_b, code_c)
 | 
						|
        self.assertNotEqual(code_b, code_d)
 | 
						|
        self.assertNotEqual(code_c, code_d)
 | 
						|
 | 
						|
    def test_code_hash_uses_firstlineno(self):
 | 
						|
        c1 = (lambda: 1).__code__
 | 
						|
        c2 = (lambda: 1).__code__
 | 
						|
        self.assertNotEqual(c1, c2)
 | 
						|
        self.assertNotEqual(hash(c1), hash(c2))
 | 
						|
        c3 = c1.replace(co_firstlineno=17)
 | 
						|
        self.assertNotEqual(c1, c3)
 | 
						|
        self.assertNotEqual(hash(c1), hash(c3))
 | 
						|
 | 
						|
    def test_code_hash_uses_order(self):
 | 
						|
        # Swapping posonlyargcount and kwonlyargcount should change the hash.
 | 
						|
        c = (lambda x, y, *, z=1, w=1: 1).__code__
 | 
						|
        self.assertEqual(c.co_argcount, 2)
 | 
						|
        self.assertEqual(c.co_posonlyargcount, 0)
 | 
						|
        self.assertEqual(c.co_kwonlyargcount, 2)
 | 
						|
        swapped = c.replace(co_posonlyargcount=2, co_kwonlyargcount=0)
 | 
						|
        self.assertNotEqual(c, swapped)
 | 
						|
        self.assertNotEqual(hash(c), hash(swapped))
 | 
						|
 | 
						|
    def test_code_hash_uses_bytecode(self):
 | 
						|
        c = (lambda x, y: x + y).__code__
 | 
						|
        d = (lambda x, y: x * y).__code__
 | 
						|
        c1 = c.replace(co_code=d.co_code)
 | 
						|
        self.assertNotEqual(c, c1)
 | 
						|
        self.assertNotEqual(hash(c), hash(c1))
 | 
						|
 | 
						|
    @cpython_only
 | 
						|
    def test_code_equal_with_instrumentation(self):
 | 
						|
        """ GH-109052
 | 
						|
 | 
						|
        Make sure the instrumentation doesn't affect the code equality
 | 
						|
        The validity of this test relies on the fact that "x is x" and
 | 
						|
        "x in x" have only one different instruction and the instructions
 | 
						|
        have the same argument.
 | 
						|
 | 
						|
        """
 | 
						|
        code1 = compile("x is x", "example.py", "eval")
 | 
						|
        code2 = compile("x in x", "example.py", "eval")
 | 
						|
        sys._getframe().f_trace_opcodes = True
 | 
						|
        sys.settrace(lambda *args: None)
 | 
						|
        exec(code1, {'x': []})
 | 
						|
        exec(code2, {'x': []})
 | 
						|
        self.assertNotEqual(code1, code2)
 | 
						|
        sys.settrace(None)
 | 
						|
 | 
						|
 | 
						|
def isinterned(s):
 | 
						|
    return s is sys.intern(('_' + s + '_')[1:-1])
 | 
						|
 | 
						|
class CodeConstsTest(unittest.TestCase):
 | 
						|
 | 
						|
    def find_const(self, consts, value):
 | 
						|
        for v in consts:
 | 
						|
            if v == value:
 | 
						|
                return v
 | 
						|
        self.assertIn(value, consts)  # raises an exception
 | 
						|
        self.fail('Should never be reached')
 | 
						|
 | 
						|
    def assertIsInterned(self, s):
 | 
						|
        if not isinterned(s):
 | 
						|
            self.fail('String %r is not interned' % (s,))
 | 
						|
 | 
						|
    def assertIsNotInterned(self, s):
 | 
						|
        if isinterned(s):
 | 
						|
            self.fail('String %r is interned' % (s,))
 | 
						|
 | 
						|
    @cpython_only
 | 
						|
    def test_interned_string(self):
 | 
						|
        co = compile('res = "str_value"', '?', 'exec')
 | 
						|
        v = self.find_const(co.co_consts, 'str_value')
 | 
						|
        self.assertIsInterned(v)
 | 
						|
 | 
						|
    @cpython_only
 | 
						|
    def test_interned_string_in_tuple(self):
 | 
						|
        co = compile('res = ("str_value",)', '?', 'exec')
 | 
						|
        v = self.find_const(co.co_consts, ('str_value',))
 | 
						|
        self.assertIsInterned(v[0])
 | 
						|
 | 
						|
    @cpython_only
 | 
						|
    def test_interned_string_in_frozenset(self):
 | 
						|
        co = compile('res = a in {"str_value"}', '?', 'exec')
 | 
						|
        v = self.find_const(co.co_consts, frozenset(('str_value',)))
 | 
						|
        self.assertIsInterned(tuple(v)[0])
 | 
						|
 | 
						|
    @cpython_only
 | 
						|
    def test_interned_string_default(self):
 | 
						|
        def f(a='str_value'):
 | 
						|
            return a
 | 
						|
        self.assertIsInterned(f())
 | 
						|
 | 
						|
    @cpython_only
 | 
						|
    @unittest.skipIf(Py_GIL_DISABLED, "free-threaded build interns all string constants")
 | 
						|
    def test_interned_string_with_null(self):
 | 
						|
        co = compile(r'res = "str\0value!"', '?', 'exec')
 | 
						|
        v = self.find_const(co.co_consts, 'str\0value!')
 | 
						|
        self.assertIsNotInterned(v)
 | 
						|
 | 
						|
    @cpython_only
 | 
						|
    @unittest.skipUnless(Py_GIL_DISABLED, "does not intern all constants")
 | 
						|
    @skip_if_suppress_immortalization()
 | 
						|
    def test_interned_constants(self):
 | 
						|
        # compile separately to avoid compile time de-duping
 | 
						|
 | 
						|
        globals = {}
 | 
						|
        exec(textwrap.dedent("""
 | 
						|
            def func1():
 | 
						|
                return (0.0, (1, 2, "hello"))
 | 
						|
        """), globals)
 | 
						|
 | 
						|
        exec(textwrap.dedent("""
 | 
						|
            def func2():
 | 
						|
                return (0.0, (1, 2, "hello"))
 | 
						|
        """), globals)
 | 
						|
 | 
						|
        self.assertTrue(globals["func1"]() is globals["func2"]())
 | 
						|
 | 
						|
 | 
						|
class CodeWeakRefTest(unittest.TestCase):
 | 
						|
 | 
						|
    @suppress_immortalization()
 | 
						|
    def test_basic(self):
 | 
						|
        # Create a code object in a clean environment so that we know we have
 | 
						|
        # the only reference to it left.
 | 
						|
        namespace = {}
 | 
						|
        exec("def f(): pass", globals(), namespace)
 | 
						|
        f = namespace["f"]
 | 
						|
        del namespace
 | 
						|
 | 
						|
        self.called = False
 | 
						|
        def callback(code):
 | 
						|
            self.called = True
 | 
						|
 | 
						|
        # f is now the last reference to the function, and through it, the code
 | 
						|
        # object.  While we hold it, check that we can create a weakref and
 | 
						|
        # deref it.  Then delete it, and check that the callback gets called and
 | 
						|
        # the reference dies.
 | 
						|
        coderef = weakref.ref(f.__code__, callback)
 | 
						|
        self.assertTrue(bool(coderef()))
 | 
						|
        del f
 | 
						|
        gc_collect()  # For PyPy or other GCs.
 | 
						|
        self.assertFalse(bool(coderef()))
 | 
						|
        self.assertTrue(self.called)
 | 
						|
 | 
						|
# Python implementation of location table parsing algorithm
 | 
						|
def read(it):
 | 
						|
    return next(it)
 | 
						|
 | 
						|
def read_varint(it):
 | 
						|
    b = read(it)
 | 
						|
    val = b & 63;
 | 
						|
    shift = 0;
 | 
						|
    while b & 64:
 | 
						|
        b = read(it)
 | 
						|
        shift += 6
 | 
						|
        val |= (b&63) << shift
 | 
						|
    return val
 | 
						|
 | 
						|
def read_signed_varint(it):
 | 
						|
    uval = read_varint(it)
 | 
						|
    if uval & 1:
 | 
						|
        return -(uval >> 1)
 | 
						|
    else:
 | 
						|
        return uval >> 1
 | 
						|
 | 
						|
def parse_location_table(code):
 | 
						|
    line = code.co_firstlineno
 | 
						|
    it = iter(code.co_linetable)
 | 
						|
    while True:
 | 
						|
        try:
 | 
						|
            first_byte = read(it)
 | 
						|
        except StopIteration:
 | 
						|
            return
 | 
						|
        code = (first_byte >> 3) & 15
 | 
						|
        length = (first_byte & 7) + 1
 | 
						|
        if code == 15:
 | 
						|
            yield (code, length, None, None, None, None)
 | 
						|
        elif code == 14:
 | 
						|
            line_delta = read_signed_varint(it)
 | 
						|
            line += line_delta
 | 
						|
            end_line = line + read_varint(it)
 | 
						|
            col = read_varint(it)
 | 
						|
            if col == 0:
 | 
						|
                col = None
 | 
						|
            else:
 | 
						|
                col -= 1
 | 
						|
            end_col = read_varint(it)
 | 
						|
            if end_col == 0:
 | 
						|
                end_col = None
 | 
						|
            else:
 | 
						|
                end_col -= 1
 | 
						|
            yield (code, length, line, end_line, col, end_col)
 | 
						|
        elif code == 13: # No column
 | 
						|
            line_delta = read_signed_varint(it)
 | 
						|
            line += line_delta
 | 
						|
            yield (code, length, line, line, None, None)
 | 
						|
        elif code in (10, 11, 12): # new line
 | 
						|
            line_delta = code - 10
 | 
						|
            line += line_delta
 | 
						|
            column = read(it)
 | 
						|
            end_column = read(it)
 | 
						|
            yield (code, length, line, line, column, end_column)
 | 
						|
        else:
 | 
						|
            assert (0 <= code < 10)
 | 
						|
            second_byte = read(it)
 | 
						|
            column = code << 3 | (second_byte >> 4)
 | 
						|
            yield (code, length, line, line, column, column + (second_byte & 15))
 | 
						|
 | 
						|
def positions_from_location_table(code):
 | 
						|
    for _, length, line, end_line, col, end_col in parse_location_table(code):
 | 
						|
        for _ in range(length):
 | 
						|
            yield (line, end_line, col, end_col)
 | 
						|
 | 
						|
def dedup(lst, prev=object()):
 | 
						|
    for item in lst:
 | 
						|
        if item != prev:
 | 
						|
            yield item
 | 
						|
            prev = item
 | 
						|
 | 
						|
def lines_from_postions(positions):
 | 
						|
    return dedup(l for (l, _, _, _) in positions)
 | 
						|
 | 
						|
def misshappen():
 | 
						|
    """
 | 
						|
 | 
						|
 | 
						|
 | 
						|
 | 
						|
 | 
						|
    """
 | 
						|
    x = (
 | 
						|
 | 
						|
 | 
						|
        4
 | 
						|
 | 
						|
        +
 | 
						|
 | 
						|
        y
 | 
						|
 | 
						|
    )
 | 
						|
    y = (
 | 
						|
        a
 | 
						|
        +
 | 
						|
            b
 | 
						|
                +
 | 
						|
 | 
						|
                d
 | 
						|
        )
 | 
						|
    return q if (
 | 
						|
 | 
						|
        x
 | 
						|
 | 
						|
        ) else p
 | 
						|
 | 
						|
def bug93662():
 | 
						|
    example_report_generation_message= (
 | 
						|
            """
 | 
						|
            """
 | 
						|
    ).strip()
 | 
						|
    raise ValueError()
 | 
						|
 | 
						|
 | 
						|
class CodeLocationTest(unittest.TestCase):
 | 
						|
 | 
						|
    def check_positions(self, func):
 | 
						|
        pos1 = list(func.__code__.co_positions())
 | 
						|
        pos2 = list(positions_from_location_table(func.__code__))
 | 
						|
        for l1, l2 in zip(pos1, pos2):
 | 
						|
            self.assertEqual(l1, l2)
 | 
						|
        self.assertEqual(len(pos1), len(pos2))
 | 
						|
 | 
						|
    def test_positions(self):
 | 
						|
        self.check_positions(parse_location_table)
 | 
						|
        self.check_positions(misshappen)
 | 
						|
        self.check_positions(bug93662)
 | 
						|
 | 
						|
    def check_lines(self, func):
 | 
						|
        co = func.__code__
 | 
						|
        lines1 = [line for _, _, line in co.co_lines()]
 | 
						|
        self.assertEqual(lines1, list(dedup(lines1)))
 | 
						|
        lines2 = list(lines_from_postions(positions_from_location_table(co)))
 | 
						|
        for l1, l2 in zip(lines1, lines2):
 | 
						|
            self.assertEqual(l1, l2)
 | 
						|
        self.assertEqual(len(lines1), len(lines2))
 | 
						|
 | 
						|
    def test_lines(self):
 | 
						|
        self.check_lines(parse_location_table)
 | 
						|
        self.check_lines(misshappen)
 | 
						|
        self.check_lines(bug93662)
 | 
						|
 | 
						|
    @cpython_only
 | 
						|
    def test_code_new_empty(self):
 | 
						|
        # If this test fails, it means that the construction of PyCode_NewEmpty
 | 
						|
        # needs to be modified! Please update this test *and* PyCode_NewEmpty,
 | 
						|
        # so that they both stay in sync.
 | 
						|
        def f():
 | 
						|
            pass
 | 
						|
        PY_CODE_LOCATION_INFO_NO_COLUMNS = 13
 | 
						|
        f.__code__ = f.__code__.replace(
 | 
						|
            co_stacksize=1,
 | 
						|
            co_firstlineno=42,
 | 
						|
            co_code=bytes(
 | 
						|
                [
 | 
						|
                    dis.opmap["RESUME"], 0,
 | 
						|
                    dis.opmap["LOAD_COMMON_CONSTANT"], 0,
 | 
						|
                    dis.opmap["RAISE_VARARGS"], 1,
 | 
						|
                ]
 | 
						|
            ),
 | 
						|
            co_linetable=bytes(
 | 
						|
                [
 | 
						|
                    (1 << 7)
 | 
						|
                    | (PY_CODE_LOCATION_INFO_NO_COLUMNS << 3)
 | 
						|
                    | (3 - 1),
 | 
						|
                    0,
 | 
						|
                ]
 | 
						|
            ),
 | 
						|
        )
 | 
						|
        self.assertRaises(AssertionError, f)
 | 
						|
        self.assertEqual(
 | 
						|
            list(f.__code__.co_positions()),
 | 
						|
            3 * [(42, 42, None, None)],
 | 
						|
        )
 | 
						|
 | 
						|
 | 
						|
if check_impl_detail(cpython=True) and ctypes is not None:
 | 
						|
    py = ctypes.pythonapi
 | 
						|
    freefunc = ctypes.CFUNCTYPE(None,ctypes.c_voidp)
 | 
						|
 | 
						|
    RequestCodeExtraIndex = py.PyUnstable_Eval_RequestCodeExtraIndex
 | 
						|
    RequestCodeExtraIndex.argtypes = (freefunc,)
 | 
						|
    RequestCodeExtraIndex.restype = ctypes.c_ssize_t
 | 
						|
 | 
						|
    SetExtra = py.PyUnstable_Code_SetExtra
 | 
						|
    SetExtra.argtypes = (ctypes.py_object, ctypes.c_ssize_t, ctypes.c_voidp)
 | 
						|
    SetExtra.restype = ctypes.c_int
 | 
						|
 | 
						|
    GetExtra = py.PyUnstable_Code_GetExtra
 | 
						|
    GetExtra.argtypes = (ctypes.py_object, ctypes.c_ssize_t,
 | 
						|
                         ctypes.POINTER(ctypes.c_voidp))
 | 
						|
    GetExtra.restype = ctypes.c_int
 | 
						|
 | 
						|
    LAST_FREED = None
 | 
						|
    def myfree(ptr):
 | 
						|
        global LAST_FREED
 | 
						|
        LAST_FREED = ptr
 | 
						|
 | 
						|
    FREE_FUNC = freefunc(myfree)
 | 
						|
    FREE_INDEX = RequestCodeExtraIndex(FREE_FUNC)
 | 
						|
 | 
						|
    class CoExtra(unittest.TestCase):
 | 
						|
        def get_func(self):
 | 
						|
            # Defining a function causes the containing function to have a
 | 
						|
            # reference to the code object.  We need the code objects to go
 | 
						|
            # away, so we eval a lambda.
 | 
						|
            return eval('lambda:42')
 | 
						|
 | 
						|
        def test_get_non_code(self):
 | 
						|
            f = self.get_func()
 | 
						|
 | 
						|
            self.assertRaises(SystemError, SetExtra, 42, FREE_INDEX,
 | 
						|
                              ctypes.c_voidp(100))
 | 
						|
            self.assertRaises(SystemError, GetExtra, 42, FREE_INDEX,
 | 
						|
                              ctypes.c_voidp(100))
 | 
						|
 | 
						|
        def test_bad_index(self):
 | 
						|
            f = self.get_func()
 | 
						|
            self.assertRaises(SystemError, SetExtra, f.__code__,
 | 
						|
                              FREE_INDEX+100, ctypes.c_voidp(100))
 | 
						|
            self.assertEqual(GetExtra(f.__code__, FREE_INDEX+100,
 | 
						|
                              ctypes.c_voidp(100)), 0)
 | 
						|
 | 
						|
        @suppress_immortalization()
 | 
						|
        def test_free_called(self):
 | 
						|
            # Verify that the provided free function gets invoked
 | 
						|
            # when the code object is cleaned up.
 | 
						|
            f = self.get_func()
 | 
						|
 | 
						|
            SetExtra(f.__code__, FREE_INDEX, ctypes.c_voidp(100))
 | 
						|
            del f
 | 
						|
            gc_collect()  # For free-threaded build
 | 
						|
            self.assertEqual(LAST_FREED, 100)
 | 
						|
 | 
						|
        def test_get_set(self):
 | 
						|
            # Test basic get/set round tripping.
 | 
						|
            f = self.get_func()
 | 
						|
 | 
						|
            extra = ctypes.c_voidp()
 | 
						|
 | 
						|
            SetExtra(f.__code__, FREE_INDEX, ctypes.c_voidp(200))
 | 
						|
            # reset should free...
 | 
						|
            SetExtra(f.__code__, FREE_INDEX, ctypes.c_voidp(300))
 | 
						|
            self.assertEqual(LAST_FREED, 200)
 | 
						|
 | 
						|
            extra = ctypes.c_voidp()
 | 
						|
            GetExtra(f.__code__, FREE_INDEX, extra)
 | 
						|
            self.assertEqual(extra.value, 300)
 | 
						|
            del f
 | 
						|
 | 
						|
        @threading_helper.requires_working_threading()
 | 
						|
        @suppress_immortalization()
 | 
						|
        def test_free_different_thread(self):
 | 
						|
            # Freeing a code object on a different thread then
 | 
						|
            # where the co_extra was set should be safe.
 | 
						|
            f = self.get_func()
 | 
						|
            class ThreadTest(threading.Thread):
 | 
						|
                def __init__(self, f, test):
 | 
						|
                    super().__init__()
 | 
						|
                    self.f = f
 | 
						|
                    self.test = test
 | 
						|
                def run(self):
 | 
						|
                    del self.f
 | 
						|
                    gc_collect()
 | 
						|
                    # gh-117683: In the free-threaded build, the code object's
 | 
						|
                    # destructor may still be running concurrently in the main
 | 
						|
                    # thread.
 | 
						|
                    if not Py_GIL_DISABLED:
 | 
						|
                        self.test.assertEqual(LAST_FREED, 500)
 | 
						|
 | 
						|
            SetExtra(f.__code__, FREE_INDEX, ctypes.c_voidp(500))
 | 
						|
            tt = ThreadTest(f, self)
 | 
						|
            del f
 | 
						|
            tt.start()
 | 
						|
            tt.join()
 | 
						|
            gc_collect()  # For free-threaded build
 | 
						|
            self.assertEqual(LAST_FREED, 500)
 | 
						|
 | 
						|
 | 
						|
def load_tests(loader, tests, pattern):
 | 
						|
    tests.addTest(doctest.DocTestSuite())
 | 
						|
    return tests
 | 
						|
 | 
						|
 | 
						|
if __name__ == "__main__":
 | 
						|
    unittest.main()
 |