mirror of
https://github.com/python/cpython.git
synced 2025-11-01 14:11:41 +00:00
This changes the `sym_set_...()` functions to return a `bool` which is `false` when the symbol is `bottom` after the operation. All calls to such functions now check this result and go to `hit_bottom`, a special error label that prints a different message and then reports that it wasn't able to optimize the trace. No executor will be produced in this case.
927 lines
30 KiB
Python
927 lines
30 KiB
Python
import contextlib
|
|
import opcode
|
|
import sys
|
|
import textwrap
|
|
import unittest
|
|
import gc
|
|
import os
|
|
|
|
import _testinternalcapi
|
|
|
|
from test.support import script_helper
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def temporary_optimizer(opt):
|
|
old_opt = _testinternalcapi.get_optimizer()
|
|
_testinternalcapi.set_optimizer(opt)
|
|
try:
|
|
yield
|
|
finally:
|
|
_testinternalcapi.set_optimizer(old_opt)
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def clear_executors(func):
|
|
# Clear executors in func before and after running a block
|
|
func.__code__ = func.__code__.replace()
|
|
try:
|
|
yield
|
|
finally:
|
|
func.__code__ = func.__code__.replace()
|
|
|
|
|
|
class TestOptimizerAPI(unittest.TestCase):
|
|
|
|
def test_new_counter_optimizer_dealloc(self):
|
|
# See gh-108727
|
|
def f():
|
|
_testinternalcapi.new_counter_optimizer()
|
|
|
|
f()
|
|
|
|
def test_get_set_optimizer(self):
|
|
old = _testinternalcapi.get_optimizer()
|
|
opt = _testinternalcapi.new_counter_optimizer()
|
|
try:
|
|
_testinternalcapi.set_optimizer(opt)
|
|
self.assertEqual(_testinternalcapi.get_optimizer(), opt)
|
|
_testinternalcapi.set_optimizer(None)
|
|
self.assertEqual(_testinternalcapi.get_optimizer(), None)
|
|
finally:
|
|
_testinternalcapi.set_optimizer(old)
|
|
|
|
|
|
def test_counter_optimizer(self):
|
|
# Generate a new function at each call
|
|
ns = {}
|
|
exec(textwrap.dedent("""
|
|
def loop():
|
|
for _ in range(1000):
|
|
pass
|
|
"""), ns, ns)
|
|
loop = ns['loop']
|
|
|
|
for repeat in range(5):
|
|
opt = _testinternalcapi.new_counter_optimizer()
|
|
with temporary_optimizer(opt):
|
|
self.assertEqual(opt.get_count(), 0)
|
|
with clear_executors(loop):
|
|
loop()
|
|
self.assertEqual(opt.get_count(), 1000)
|
|
|
|
def test_long_loop(self):
|
|
"Check that we aren't confused by EXTENDED_ARG"
|
|
|
|
# Generate a new function at each call
|
|
ns = {}
|
|
exec(textwrap.dedent("""
|
|
def nop():
|
|
pass
|
|
|
|
def long_loop():
|
|
for _ in range(10):
|
|
nop(); nop(); nop(); nop(); nop(); nop(); nop(); nop();
|
|
nop(); nop(); nop(); nop(); nop(); nop(); nop(); nop();
|
|
nop(); nop(); nop(); nop(); nop(); nop(); nop(); nop();
|
|
nop(); nop(); nop(); nop(); nop(); nop(); nop(); nop();
|
|
nop(); nop(); nop(); nop(); nop(); nop(); nop(); nop();
|
|
nop(); nop(); nop(); nop(); nop(); nop(); nop(); nop();
|
|
nop(); nop(); nop(); nop(); nop(); nop(); nop(); nop();
|
|
"""), ns, ns)
|
|
long_loop = ns['long_loop']
|
|
|
|
opt = _testinternalcapi.new_counter_optimizer()
|
|
with temporary_optimizer(opt):
|
|
self.assertEqual(opt.get_count(), 0)
|
|
long_loop()
|
|
self.assertEqual(opt.get_count(), 10)
|
|
|
|
def test_code_restore_for_ENTER_EXECUTOR(self):
|
|
def testfunc(x):
|
|
i = 0
|
|
while i < x:
|
|
i += 1
|
|
|
|
opt = _testinternalcapi.new_counter_optimizer()
|
|
with temporary_optimizer(opt):
|
|
testfunc(1000)
|
|
code, replace_code = testfunc.__code__, testfunc.__code__.replace()
|
|
self.assertEqual(code, replace_code)
|
|
self.assertEqual(hash(code), hash(replace_code))
|
|
|
|
|
|
def get_first_executor(func):
|
|
code = func.__code__
|
|
co_code = code.co_code
|
|
JUMP_BACKWARD = opcode.opmap["JUMP_BACKWARD"]
|
|
for i in range(0, len(co_code), 2):
|
|
if co_code[i] == JUMP_BACKWARD:
|
|
try:
|
|
return _testinternalcapi.get_executor(code, i)
|
|
except ValueError:
|
|
pass
|
|
return None
|
|
|
|
|
|
def iter_opnames(ex):
|
|
for item in ex:
|
|
yield item[0]
|
|
|
|
|
|
def get_opnames(ex):
|
|
return set(iter_opnames(ex))
|
|
|
|
|
|
class TestExecutorInvalidation(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
self.old = _testinternalcapi.get_optimizer()
|
|
self.opt = _testinternalcapi.new_counter_optimizer()
|
|
_testinternalcapi.set_optimizer(self.opt)
|
|
|
|
def tearDown(self):
|
|
_testinternalcapi.set_optimizer(self.old)
|
|
|
|
def test_invalidate_object(self):
|
|
# Generate a new set of functions at each call
|
|
ns = {}
|
|
func_src = "\n".join(
|
|
f"""
|
|
def f{n}():
|
|
for _ in range(1000):
|
|
pass
|
|
""" for n in range(5)
|
|
)
|
|
exec(textwrap.dedent(func_src), ns, ns)
|
|
funcs = [ ns[f'f{n}'] for n in range(5)]
|
|
objects = [object() for _ in range(5)]
|
|
|
|
for f in funcs:
|
|
f()
|
|
executors = [get_first_executor(f) for f in funcs]
|
|
# Set things up so each executor depends on the objects
|
|
# with an equal or lower index.
|
|
for i, exe in enumerate(executors):
|
|
self.assertTrue(exe.is_valid())
|
|
for obj in objects[:i+1]:
|
|
_testinternalcapi.add_executor_dependency(exe, obj)
|
|
self.assertTrue(exe.is_valid())
|
|
# Assert that the correct executors are invalidated
|
|
# and check that nothing crashes when we invalidate
|
|
# an executor mutliple times.
|
|
for i in (4,3,2,1,0):
|
|
_testinternalcapi.invalidate_executors(objects[i])
|
|
for exe in executors[i:]:
|
|
self.assertFalse(exe.is_valid())
|
|
for exe in executors[:i]:
|
|
self.assertTrue(exe.is_valid())
|
|
|
|
def test_uop_optimizer_invalidation(self):
|
|
# Generate a new function at each call
|
|
ns = {}
|
|
exec(textwrap.dedent("""
|
|
def f():
|
|
for i in range(1000):
|
|
pass
|
|
"""), ns, ns)
|
|
f = ns['f']
|
|
opt = _testinternalcapi.new_uop_optimizer()
|
|
with temporary_optimizer(opt):
|
|
f()
|
|
exe = get_first_executor(f)
|
|
self.assertIsNotNone(exe)
|
|
self.assertTrue(exe.is_valid())
|
|
_testinternalcapi.invalidate_executors(f.__code__)
|
|
self.assertFalse(exe.is_valid())
|
|
|
|
def test_sys__clear_internal_caches(self):
|
|
def f():
|
|
for _ in range(1000):
|
|
pass
|
|
opt = _testinternalcapi.new_uop_optimizer()
|
|
with temporary_optimizer(opt):
|
|
f()
|
|
exe = get_first_executor(f)
|
|
self.assertIsNotNone(exe)
|
|
self.assertTrue(exe.is_valid())
|
|
sys._clear_internal_caches()
|
|
self.assertFalse(exe.is_valid())
|
|
exe = get_first_executor(f)
|
|
self.assertIsNone(exe)
|
|
|
|
|
|
@unittest.skipIf(os.getenv("PYTHON_UOPS_OPTIMIZE") == "0", "Needs uop optimizer to run.")
|
|
class TestUops(unittest.TestCase):
|
|
|
|
def test_basic_loop(self):
|
|
def testfunc(x):
|
|
i = 0
|
|
while i < x:
|
|
i += 1
|
|
|
|
opt = _testinternalcapi.new_uop_optimizer()
|
|
with temporary_optimizer(opt):
|
|
testfunc(1000)
|
|
|
|
ex = get_first_executor(testfunc)
|
|
self.assertIsNotNone(ex)
|
|
uops = get_opnames(ex)
|
|
self.assertIn("_SET_IP", uops)
|
|
self.assertIn("_LOAD_FAST_0", uops)
|
|
|
|
def test_extended_arg(self):
|
|
"Check EXTENDED_ARG handling in superblock creation"
|
|
ns = {}
|
|
exec(textwrap.dedent("""
|
|
def many_vars():
|
|
# 260 vars, so z9 should have index 259
|
|
a0 = a1 = a2 = a3 = a4 = a5 = a6 = a7 = a8 = a9 = 42
|
|
b0 = b1 = b2 = b3 = b4 = b5 = b6 = b7 = b8 = b9 = 42
|
|
c0 = c1 = c2 = c3 = c4 = c5 = c6 = c7 = c8 = c9 = 42
|
|
d0 = d1 = d2 = d3 = d4 = d5 = d6 = d7 = d8 = d9 = 42
|
|
e0 = e1 = e2 = e3 = e4 = e5 = e6 = e7 = e8 = e9 = 42
|
|
f0 = f1 = f2 = f3 = f4 = f5 = f6 = f7 = f8 = f9 = 42
|
|
g0 = g1 = g2 = g3 = g4 = g5 = g6 = g7 = g8 = g9 = 42
|
|
h0 = h1 = h2 = h3 = h4 = h5 = h6 = h7 = h8 = h9 = 42
|
|
i0 = i1 = i2 = i3 = i4 = i5 = i6 = i7 = i8 = i9 = 42
|
|
j0 = j1 = j2 = j3 = j4 = j5 = j6 = j7 = j8 = j9 = 42
|
|
k0 = k1 = k2 = k3 = k4 = k5 = k6 = k7 = k8 = k9 = 42
|
|
l0 = l1 = l2 = l3 = l4 = l5 = l6 = l7 = l8 = l9 = 42
|
|
m0 = m1 = m2 = m3 = m4 = m5 = m6 = m7 = m8 = m9 = 42
|
|
n0 = n1 = n2 = n3 = n4 = n5 = n6 = n7 = n8 = n9 = 42
|
|
o0 = o1 = o2 = o3 = o4 = o5 = o6 = o7 = o8 = o9 = 42
|
|
p0 = p1 = p2 = p3 = p4 = p5 = p6 = p7 = p8 = p9 = 42
|
|
q0 = q1 = q2 = q3 = q4 = q5 = q6 = q7 = q8 = q9 = 42
|
|
r0 = r1 = r2 = r3 = r4 = r5 = r6 = r7 = r8 = r9 = 42
|
|
s0 = s1 = s2 = s3 = s4 = s5 = s6 = s7 = s8 = s9 = 42
|
|
t0 = t1 = t2 = t3 = t4 = t5 = t6 = t7 = t8 = t9 = 42
|
|
u0 = u1 = u2 = u3 = u4 = u5 = u6 = u7 = u8 = u9 = 42
|
|
v0 = v1 = v2 = v3 = v4 = v5 = v6 = v7 = v8 = v9 = 42
|
|
w0 = w1 = w2 = w3 = w4 = w5 = w6 = w7 = w8 = w9 = 42
|
|
x0 = x1 = x2 = x3 = x4 = x5 = x6 = x7 = x8 = x9 = 42
|
|
y0 = y1 = y2 = y3 = y4 = y5 = y6 = y7 = y8 = y9 = 42
|
|
z0 = z1 = z2 = z3 = z4 = z5 = z6 = z7 = z8 = z9 = 42
|
|
while z9 > 0:
|
|
z9 = z9 - 1
|
|
"""), ns, ns)
|
|
many_vars = ns["many_vars"]
|
|
|
|
opt = _testinternalcapi.new_uop_optimizer()
|
|
with temporary_optimizer(opt):
|
|
ex = get_first_executor(many_vars)
|
|
self.assertIsNone(ex)
|
|
many_vars()
|
|
|
|
ex = get_first_executor(many_vars)
|
|
self.assertIsNotNone(ex)
|
|
self.assertTrue(any((opcode, oparg, operand) == ("_LOAD_FAST", 259, 0)
|
|
for opcode, oparg, _, operand in list(ex)))
|
|
|
|
def test_unspecialized_unpack(self):
|
|
# An example of an unspecialized opcode
|
|
def testfunc(x):
|
|
i = 0
|
|
while i < x:
|
|
i += 1
|
|
a, b = {1: 2, 3: 3}
|
|
assert a == 1 and b == 3
|
|
i = 0
|
|
while i < x:
|
|
i += 1
|
|
|
|
opt = _testinternalcapi.new_uop_optimizer()
|
|
|
|
with temporary_optimizer(opt):
|
|
testfunc(20)
|
|
|
|
ex = get_first_executor(testfunc)
|
|
self.assertIsNotNone(ex)
|
|
uops = get_opnames(ex)
|
|
self.assertIn("_UNPACK_SEQUENCE", uops)
|
|
|
|
def test_pop_jump_if_false(self):
|
|
def testfunc(n):
|
|
i = 0
|
|
while i < n:
|
|
i += 1
|
|
|
|
opt = _testinternalcapi.new_uop_optimizer()
|
|
with temporary_optimizer(opt):
|
|
testfunc(20)
|
|
|
|
ex = get_first_executor(testfunc)
|
|
self.assertIsNotNone(ex)
|
|
uops = get_opnames(ex)
|
|
self.assertIn("_GUARD_IS_TRUE_POP", uops)
|
|
|
|
def test_pop_jump_if_none(self):
|
|
def testfunc(a):
|
|
for x in a:
|
|
if x is None:
|
|
x = 0
|
|
|
|
opt = _testinternalcapi.new_uop_optimizer()
|
|
with temporary_optimizer(opt):
|
|
testfunc(range(20))
|
|
|
|
ex = get_first_executor(testfunc)
|
|
self.assertIsNotNone(ex)
|
|
uops = get_opnames(ex)
|
|
self.assertIn("_GUARD_IS_NOT_NONE_POP", uops)
|
|
|
|
def test_pop_jump_if_not_none(self):
|
|
def testfunc(a):
|
|
for x in a:
|
|
x = None
|
|
if x is not None:
|
|
x = 0
|
|
|
|
opt = _testinternalcapi.new_uop_optimizer()
|
|
with temporary_optimizer(opt):
|
|
testfunc(range(20))
|
|
|
|
ex = get_first_executor(testfunc)
|
|
self.assertIsNotNone(ex)
|
|
uops = get_opnames(ex)
|
|
self.assertIn("_GUARD_IS_NONE_POP", uops)
|
|
|
|
def test_pop_jump_if_true(self):
|
|
def testfunc(n):
|
|
i = 0
|
|
while not i >= n:
|
|
i += 1
|
|
|
|
opt = _testinternalcapi.new_uop_optimizer()
|
|
with temporary_optimizer(opt):
|
|
testfunc(20)
|
|
|
|
ex = get_first_executor(testfunc)
|
|
self.assertIsNotNone(ex)
|
|
uops = get_opnames(ex)
|
|
self.assertIn("_GUARD_IS_FALSE_POP", uops)
|
|
|
|
def test_jump_backward(self):
|
|
def testfunc(n):
|
|
i = 0
|
|
while i < n:
|
|
i += 1
|
|
|
|
opt = _testinternalcapi.new_uop_optimizer()
|
|
with temporary_optimizer(opt):
|
|
testfunc(20)
|
|
|
|
ex = get_first_executor(testfunc)
|
|
self.assertIsNotNone(ex)
|
|
uops = get_opnames(ex)
|
|
self.assertIn("_JUMP_TO_TOP", uops)
|
|
|
|
def test_jump_forward(self):
|
|
def testfunc(n):
|
|
a = 0
|
|
while a < n:
|
|
if a < 0:
|
|
a = -a
|
|
else:
|
|
a = +a
|
|
a += 1
|
|
return a
|
|
|
|
opt = _testinternalcapi.new_uop_optimizer()
|
|
with temporary_optimizer(opt):
|
|
testfunc(20)
|
|
|
|
ex = get_first_executor(testfunc)
|
|
self.assertIsNotNone(ex)
|
|
uops = get_opnames(ex)
|
|
# Since there is no JUMP_FORWARD instruction,
|
|
# look for indirect evidence: the += operator
|
|
self.assertIn("_BINARY_OP_ADD_INT", uops)
|
|
|
|
def test_for_iter_range(self):
|
|
def testfunc(n):
|
|
total = 0
|
|
for i in range(n):
|
|
total += i
|
|
return total
|
|
|
|
opt = _testinternalcapi.new_uop_optimizer()
|
|
with temporary_optimizer(opt):
|
|
total = testfunc(20)
|
|
self.assertEqual(total, 190)
|
|
|
|
ex = get_first_executor(testfunc)
|
|
self.assertIsNotNone(ex)
|
|
# for i, (opname, oparg) in enumerate(ex):
|
|
# print(f"{i:4d}: {opname:<20s} {oparg:3d}")
|
|
uops = get_opnames(ex)
|
|
self.assertIn("_GUARD_NOT_EXHAUSTED_RANGE", uops)
|
|
# Verification that the jump goes past END_FOR
|
|
# is done by manual inspection of the output
|
|
|
|
def test_for_iter_list(self):
|
|
def testfunc(a):
|
|
total = 0
|
|
for i in a:
|
|
total += i
|
|
return total
|
|
|
|
opt = _testinternalcapi.new_uop_optimizer()
|
|
with temporary_optimizer(opt):
|
|
a = list(range(20))
|
|
total = testfunc(a)
|
|
self.assertEqual(total, 190)
|
|
|
|
ex = get_first_executor(testfunc)
|
|
self.assertIsNotNone(ex)
|
|
# for i, (opname, oparg) in enumerate(ex):
|
|
# print(f"{i:4d}: {opname:<20s} {oparg:3d}")
|
|
uops = get_opnames(ex)
|
|
self.assertIn("_GUARD_NOT_EXHAUSTED_LIST", uops)
|
|
# Verification that the jump goes past END_FOR
|
|
# is done by manual inspection of the output
|
|
|
|
def test_for_iter_tuple(self):
|
|
def testfunc(a):
|
|
total = 0
|
|
for i in a:
|
|
total += i
|
|
return total
|
|
|
|
opt = _testinternalcapi.new_uop_optimizer()
|
|
with temporary_optimizer(opt):
|
|
a = tuple(range(20))
|
|
total = testfunc(a)
|
|
self.assertEqual(total, 190)
|
|
|
|
ex = get_first_executor(testfunc)
|
|
self.assertIsNotNone(ex)
|
|
# for i, (opname, oparg) in enumerate(ex):
|
|
# print(f"{i:4d}: {opname:<20s} {oparg:3d}")
|
|
uops = get_opnames(ex)
|
|
self.assertIn("_GUARD_NOT_EXHAUSTED_TUPLE", uops)
|
|
# Verification that the jump goes past END_FOR
|
|
# is done by manual inspection of the output
|
|
|
|
def test_list_edge_case(self):
|
|
def testfunc(it):
|
|
for x in it:
|
|
pass
|
|
|
|
opt = _testinternalcapi.new_uop_optimizer()
|
|
with temporary_optimizer(opt):
|
|
a = [1, 2, 3]
|
|
it = iter(a)
|
|
testfunc(it)
|
|
a.append(4)
|
|
with self.assertRaises(StopIteration):
|
|
next(it)
|
|
|
|
def test_call_py_exact_args(self):
|
|
def testfunc(n):
|
|
def dummy(x):
|
|
return x+1
|
|
for i in range(n):
|
|
dummy(i)
|
|
|
|
opt = _testinternalcapi.new_uop_optimizer()
|
|
with temporary_optimizer(opt):
|
|
testfunc(20)
|
|
|
|
ex = get_first_executor(testfunc)
|
|
self.assertIsNotNone(ex)
|
|
uops = get_opnames(ex)
|
|
self.assertIn("_PUSH_FRAME", uops)
|
|
self.assertIn("_BINARY_OP_ADD_INT", uops)
|
|
|
|
def test_branch_taken(self):
|
|
def testfunc(n):
|
|
for i in range(n):
|
|
if i < 0:
|
|
i = 0
|
|
else:
|
|
i = 1
|
|
|
|
opt = _testinternalcapi.new_uop_optimizer()
|
|
with temporary_optimizer(opt):
|
|
testfunc(20)
|
|
|
|
ex = get_first_executor(testfunc)
|
|
self.assertIsNotNone(ex)
|
|
uops = get_opnames(ex)
|
|
self.assertIn("_GUARD_IS_FALSE_POP", uops)
|
|
|
|
def test_for_iter_tier_two(self):
|
|
class MyIter:
|
|
def __init__(self, n):
|
|
self.n = n
|
|
def __iter__(self):
|
|
return self
|
|
def __next__(self):
|
|
self.n -= 1
|
|
if self.n < 0:
|
|
raise StopIteration
|
|
return self.n
|
|
|
|
def testfunc(n, m):
|
|
x = 0
|
|
for i in range(m):
|
|
for j in MyIter(n):
|
|
x += 1000*i + j
|
|
return x
|
|
|
|
opt = _testinternalcapi.new_uop_optimizer()
|
|
with temporary_optimizer(opt):
|
|
x = testfunc(10, 10)
|
|
|
|
self.assertEqual(x, sum(range(10)) * 10010)
|
|
|
|
ex = get_first_executor(testfunc)
|
|
self.assertIsNotNone(ex)
|
|
uops = get_opnames(ex)
|
|
self.assertIn("_FOR_ITER_TIER_TWO", uops)
|
|
|
|
def test_confidence_score(self):
|
|
def testfunc(n):
|
|
bits = 0
|
|
for i in range(n):
|
|
if i & 0x01:
|
|
bits += 1
|
|
if i & 0x02:
|
|
bits += 1
|
|
if i&0x04:
|
|
bits += 1
|
|
if i&0x08:
|
|
bits += 1
|
|
if i&0x10:
|
|
bits += 1
|
|
if i&0x20:
|
|
bits += 1
|
|
return bits
|
|
|
|
opt = _testinternalcapi.new_uop_optimizer()
|
|
with temporary_optimizer(opt):
|
|
x = testfunc(20)
|
|
|
|
self.assertEqual(x, 40)
|
|
ex = get_first_executor(testfunc)
|
|
self.assertIsNotNone(ex)
|
|
ops = list(iter_opnames(ex))
|
|
#Since branch is 50/50 the trace could go either way.
|
|
count = ops.count("_GUARD_IS_TRUE_POP") + ops.count("_GUARD_IS_FALSE_POP")
|
|
self.assertLessEqual(count, 2)
|
|
|
|
|
|
@unittest.skipIf(os.getenv("PYTHON_UOPS_OPTIMIZE") == "0", "Needs uop optimizer to run.")
|
|
class TestUopsOptimization(unittest.TestCase):
|
|
|
|
def _run_with_optimizer(self, testfunc, arg):
|
|
res = None
|
|
opt = _testinternalcapi.new_uop_optimizer()
|
|
with temporary_optimizer(opt):
|
|
res = testfunc(arg)
|
|
|
|
ex = get_first_executor(testfunc)
|
|
return res, ex
|
|
|
|
|
|
def test_int_type_propagation(self):
|
|
def testfunc(loops):
|
|
num = 0
|
|
for i in range(loops):
|
|
x = num + num
|
|
a = x + 1
|
|
num += 1
|
|
return a
|
|
|
|
res, ex = self._run_with_optimizer(testfunc, 32)
|
|
self.assertIsNotNone(ex)
|
|
self.assertEqual(res, 63)
|
|
binop_count = [opname for opname in iter_opnames(ex) if opname == "_BINARY_OP_ADD_INT"]
|
|
guard_both_int_count = [opname for opname in iter_opnames(ex) if opname == "_GUARD_BOTH_INT"]
|
|
self.assertGreaterEqual(len(binop_count), 3)
|
|
self.assertLessEqual(len(guard_both_int_count), 1)
|
|
|
|
def test_int_type_propagation_through_frame(self):
|
|
def double(x):
|
|
return x + x
|
|
def testfunc(loops):
|
|
num = 0
|
|
for i in range(loops):
|
|
x = num + num
|
|
a = double(x)
|
|
num += 1
|
|
return a
|
|
|
|
opt = _testinternalcapi.new_uop_optimizer()
|
|
res = None
|
|
with temporary_optimizer(opt):
|
|
res = testfunc(32)
|
|
|
|
ex = get_first_executor(testfunc)
|
|
self.assertIsNotNone(ex)
|
|
self.assertEqual(res, 124)
|
|
binop_count = [opname for opname in iter_opnames(ex) if opname == "_BINARY_OP_ADD_INT"]
|
|
guard_both_int_count = [opname for opname in iter_opnames(ex) if opname == "_GUARD_BOTH_INT"]
|
|
self.assertGreaterEqual(len(binop_count), 3)
|
|
self.assertLessEqual(len(guard_both_int_count), 1)
|
|
|
|
def test_int_type_propagation_from_frame(self):
|
|
def double(x):
|
|
return x + x
|
|
def testfunc(loops):
|
|
num = 0
|
|
for i in range(loops):
|
|
a = double(num)
|
|
x = a + a
|
|
num += 1
|
|
return x
|
|
|
|
opt = _testinternalcapi.new_uop_optimizer()
|
|
res = None
|
|
with temporary_optimizer(opt):
|
|
res = testfunc(32)
|
|
|
|
ex = get_first_executor(testfunc)
|
|
self.assertIsNotNone(ex)
|
|
self.assertEqual(res, 124)
|
|
binop_count = [opname for opname in iter_opnames(ex) if opname == "_BINARY_OP_ADD_INT"]
|
|
guard_both_int_count = [opname for opname in iter_opnames(ex) if opname == "_GUARD_BOTH_INT"]
|
|
self.assertGreaterEqual(len(binop_count), 3)
|
|
self.assertLessEqual(len(guard_both_int_count), 1)
|
|
|
|
def test_int_impure_region(self):
|
|
def testfunc(loops):
|
|
num = 0
|
|
while num < loops:
|
|
x = num + num
|
|
y = 1
|
|
x // 2
|
|
a = x + y
|
|
num += 1
|
|
return a
|
|
|
|
res, ex = self._run_with_optimizer(testfunc, 64)
|
|
self.assertIsNotNone(ex)
|
|
binop_count = [opname for opname in iter_opnames(ex) if opname == "_BINARY_OP_ADD_INT"]
|
|
self.assertGreaterEqual(len(binop_count), 3)
|
|
|
|
def test_call_py_exact_args(self):
|
|
def testfunc(n):
|
|
def dummy(x):
|
|
return x+1
|
|
for i in range(n):
|
|
dummy(i)
|
|
|
|
res, ex = self._run_with_optimizer(testfunc, 32)
|
|
self.assertIsNotNone(ex)
|
|
uops = get_opnames(ex)
|
|
self.assertIn("_PUSH_FRAME", uops)
|
|
self.assertIn("_BINARY_OP_ADD_INT", uops)
|
|
self.assertNotIn("_CHECK_PEP_523", uops)
|
|
|
|
def test_int_type_propagate_through_range(self):
|
|
def testfunc(n):
|
|
|
|
for i in range(n):
|
|
x = i + i
|
|
return x
|
|
|
|
res, ex = self._run_with_optimizer(testfunc, 32)
|
|
self.assertEqual(res, 62)
|
|
self.assertIsNotNone(ex)
|
|
uops = get_opnames(ex)
|
|
self.assertNotIn("_GUARD_BOTH_INT", uops)
|
|
|
|
def test_int_value_numbering(self):
|
|
def testfunc(n):
|
|
|
|
y = 1
|
|
for i in range(n):
|
|
x = y
|
|
z = x
|
|
a = z
|
|
b = a
|
|
res = x + z + a + b
|
|
return res
|
|
|
|
res, ex = self._run_with_optimizer(testfunc, 32)
|
|
self.assertEqual(res, 4)
|
|
self.assertIsNotNone(ex)
|
|
uops = get_opnames(ex)
|
|
self.assertIn("_GUARD_BOTH_INT", uops)
|
|
guard_count = [opname for opname in iter_opnames(ex) if opname == "_GUARD_BOTH_INT"]
|
|
self.assertEqual(len(guard_count), 1)
|
|
|
|
def test_comprehension(self):
|
|
def testfunc(n):
|
|
for _ in range(n):
|
|
return [i for i in range(n)]
|
|
|
|
res, ex = self._run_with_optimizer(testfunc, 32)
|
|
self.assertEqual(res, list(range(32)))
|
|
self.assertIsNotNone(ex)
|
|
uops = get_opnames(ex)
|
|
self.assertNotIn("_BINARY_OP_ADD_INT", uops)
|
|
|
|
def test_call_py_exact_args_disappearing(self):
|
|
def dummy(x):
|
|
return x+1
|
|
|
|
def testfunc(n):
|
|
for i in range(n):
|
|
dummy(i)
|
|
|
|
opt = _testinternalcapi.new_uop_optimizer()
|
|
# Trigger specialization
|
|
testfunc(8)
|
|
with temporary_optimizer(opt):
|
|
del dummy
|
|
gc.collect()
|
|
|
|
def dummy(x):
|
|
return x + 2
|
|
testfunc(32)
|
|
|
|
ex = get_first_executor(testfunc)
|
|
# Honestly as long as it doesn't crash it's fine.
|
|
# Whether we get an executor or not is non-deterministic,
|
|
# because it's decided by when the function is freed.
|
|
# This test is a little implementation specific.
|
|
|
|
def test_promote_globals_to_constants(self):
|
|
|
|
result = script_helper.run_python_until_end('-c', textwrap.dedent("""
|
|
import _testinternalcapi
|
|
import opcode
|
|
|
|
def get_first_executor(func):
|
|
code = func.__code__
|
|
co_code = code.co_code
|
|
JUMP_BACKWARD = opcode.opmap["JUMP_BACKWARD"]
|
|
for i in range(0, len(co_code), 2):
|
|
if co_code[i] == JUMP_BACKWARD:
|
|
try:
|
|
return _testinternalcapi.get_executor(code, i)
|
|
except ValueError:
|
|
pass
|
|
return None
|
|
|
|
def get_opnames(ex):
|
|
return {item[0] for item in ex}
|
|
|
|
def testfunc(n):
|
|
for i in range(n):
|
|
x = range(i)
|
|
return x
|
|
|
|
opt = _testinternalcapi.new_uop_optimizer()
|
|
_testinternalcapi.set_optimizer(opt)
|
|
testfunc(64)
|
|
|
|
ex = get_first_executor(testfunc)
|
|
assert ex is not None
|
|
uops = get_opnames(ex)
|
|
assert "_LOAD_GLOBAL_BUILTINS" not in uops
|
|
assert "_LOAD_CONST_INLINE_BORROW_WITH_NULL" in uops
|
|
"""))
|
|
self.assertEqual(result[0].rc, 0, result)
|
|
|
|
def test_float_add_constant_propagation(self):
|
|
def testfunc(n):
|
|
a = 1.0
|
|
for _ in range(n):
|
|
a = a + 0.1
|
|
return a
|
|
|
|
res, ex = self._run_with_optimizer(testfunc, 32)
|
|
self.assertAlmostEqual(res, 4.2)
|
|
self.assertIsNotNone(ex)
|
|
uops = get_opnames(ex)
|
|
guard_both_float_count = [opname for opname in iter_opnames(ex) if opname == "_GUARD_BOTH_FLOAT"]
|
|
self.assertLessEqual(len(guard_both_float_count), 1)
|
|
# TODO gh-115506: this assertion may change after propagating constants.
|
|
# We'll also need to verify that propagation actually occurs.
|
|
self.assertIn("_BINARY_OP_ADD_FLOAT", uops)
|
|
|
|
def test_float_subtract_constant_propagation(self):
|
|
def testfunc(n):
|
|
a = 1.0
|
|
for _ in range(n):
|
|
a = a - 0.1
|
|
return a
|
|
|
|
res, ex = self._run_with_optimizer(testfunc, 32)
|
|
self.assertAlmostEqual(res, -2.2)
|
|
self.assertIsNotNone(ex)
|
|
uops = get_opnames(ex)
|
|
guard_both_float_count = [opname for opname in iter_opnames(ex) if opname == "_GUARD_BOTH_FLOAT"]
|
|
self.assertLessEqual(len(guard_both_float_count), 1)
|
|
# TODO gh-115506: this assertion may change after propagating constants.
|
|
# We'll also need to verify that propagation actually occurs.
|
|
self.assertIn("_BINARY_OP_SUBTRACT_FLOAT", uops)
|
|
|
|
def test_float_multiply_constant_propagation(self):
|
|
def testfunc(n):
|
|
a = 1.0
|
|
for _ in range(n):
|
|
a = a * 2.0
|
|
return a
|
|
|
|
res, ex = self._run_with_optimizer(testfunc, 32)
|
|
self.assertAlmostEqual(res, 2 ** 32)
|
|
self.assertIsNotNone(ex)
|
|
uops = get_opnames(ex)
|
|
guard_both_float_count = [opname for opname in iter_opnames(ex) if opname == "_GUARD_BOTH_FLOAT"]
|
|
self.assertLessEqual(len(guard_both_float_count), 1)
|
|
# TODO gh-115506: this assertion may change after propagating constants.
|
|
# We'll also need to verify that propagation actually occurs.
|
|
self.assertIn("_BINARY_OP_MULTIPLY_FLOAT", uops)
|
|
|
|
def test_compare_op_type_propagation_float(self):
|
|
def testfunc(n):
|
|
a = 1.0
|
|
for _ in range(n):
|
|
x = a == a
|
|
x = a == a
|
|
x = a == a
|
|
x = a == a
|
|
return x
|
|
|
|
res, ex = self._run_with_optimizer(testfunc, 32)
|
|
self.assertTrue(res)
|
|
self.assertIsNotNone(ex)
|
|
uops = get_opnames(ex)
|
|
guard_both_float_count = [opname for opname in iter_opnames(ex) if opname == "_GUARD_BOTH_FLOAT"]
|
|
self.assertLessEqual(len(guard_both_float_count), 1)
|
|
self.assertIn("_COMPARE_OP_FLOAT", uops)
|
|
|
|
def test_compare_op_type_propagation_int(self):
|
|
def testfunc(n):
|
|
a = 1
|
|
for _ in range(n):
|
|
x = a == a
|
|
x = a == a
|
|
x = a == a
|
|
x = a == a
|
|
return x
|
|
|
|
res, ex = self._run_with_optimizer(testfunc, 32)
|
|
self.assertTrue(res)
|
|
self.assertIsNotNone(ex)
|
|
uops = get_opnames(ex)
|
|
guard_both_float_count = [opname for opname in iter_opnames(ex) if opname == "_GUARD_BOTH_INT"]
|
|
self.assertLessEqual(len(guard_both_float_count), 1)
|
|
self.assertIn("_COMPARE_OP_INT", uops)
|
|
|
|
def test_compare_op_type_propagation_unicode(self):
|
|
def testfunc(n):
|
|
a = ""
|
|
for _ in range(n):
|
|
x = a == a
|
|
x = a == a
|
|
x = a == a
|
|
x = a == a
|
|
return x
|
|
|
|
res, ex = self._run_with_optimizer(testfunc, 32)
|
|
self.assertTrue(res)
|
|
self.assertIsNotNone(ex)
|
|
uops = get_opnames(ex)
|
|
guard_both_float_count = [opname for opname in iter_opnames(ex) if opname == "_GUARD_BOTH_UNICODE"]
|
|
self.assertLessEqual(len(guard_both_float_count), 1)
|
|
self.assertIn("_COMPARE_OP_STR", uops)
|
|
|
|
def test_type_inconsistency(self):
|
|
ns = {}
|
|
src = textwrap.dedent("""
|
|
def testfunc(n):
|
|
for i in range(n):
|
|
x = _test_global + _test_global
|
|
""")
|
|
exec(src, ns, ns)
|
|
testfunc = ns['testfunc']
|
|
ns['_test_global'] = 0
|
|
_, ex = self._run_with_optimizer(testfunc, 16)
|
|
self.assertIsNone(ex)
|
|
ns['_test_global'] = 1
|
|
_, ex = self._run_with_optimizer(testfunc, 16)
|
|
self.assertIsNotNone(ex)
|
|
uops = get_opnames(ex)
|
|
self.assertNotIn("_GUARD_BOTH_INT", uops)
|
|
self.assertIn("_BINARY_OP_ADD_INT", uops)
|
|
# Try again, but between the runs, set the global to a float.
|
|
# This should result in no executor the second time.
|
|
ns = {}
|
|
exec(src, ns, ns)
|
|
testfunc = ns['testfunc']
|
|
ns['_test_global'] = 0
|
|
_, ex = self._run_with_optimizer(testfunc, 16)
|
|
self.assertIsNone(ex)
|
|
ns['_test_global'] = 3.14
|
|
_, ex = self._run_with_optimizer(testfunc, 16)
|
|
self.assertIsNone(ex)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|