mirror of
https://github.com/python/cpython.git
synced 2025-10-29 20:51:26 +00:00
[3.14] gh-132775: Expand the Capability of Interpreter.call() (gh-134933)
It now supports most callables, full args, and return values.
(cherry picked from commit 52deabe, AKA gh-133484)
Co-authored-by: Eric Snow ericsnowcurrently@gmail.com
This commit is contained in:
parent
69536093de
commit
d45d053267
11 changed files with 32773 additions and 31776 deletions
|
|
@ -1,17 +1,22 @@
|
|||
import contextlib
|
||||
import os
|
||||
import pickle
|
||||
import sys
|
||||
from textwrap import dedent
|
||||
import threading
|
||||
import types
|
||||
import unittest
|
||||
|
||||
from test import support
|
||||
from test.support import os_helper
|
||||
from test.support import script_helper
|
||||
from test.support import import_helper
|
||||
# Raise SkipTest if subinterpreters not supported.
|
||||
_interpreters = import_helper.import_module('_interpreters')
|
||||
from test.support import Py_GIL_DISABLED
|
||||
from test.support import interpreters
|
||||
from test.support import force_not_colorized
|
||||
import test._crossinterp_definitions as defs
|
||||
from test.support.interpreters import (
|
||||
InterpreterError, InterpreterNotFoundError, ExecutionFailed,
|
||||
)
|
||||
|
|
@ -29,6 +34,59 @@
|
|||
WHENCE_STR_STDLIB = '_interpreters module'
|
||||
|
||||
|
||||
def is_pickleable(obj):
|
||||
try:
|
||||
pickle.dumps(obj)
|
||||
except Exception:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def defined_in___main__(name, script, *, remove=False):
|
||||
import __main__ as mainmod
|
||||
mainns = vars(mainmod)
|
||||
assert name not in mainns
|
||||
exec(script, mainns, mainns)
|
||||
if remove:
|
||||
yield mainns.pop(name)
|
||||
else:
|
||||
try:
|
||||
yield mainns[name]
|
||||
finally:
|
||||
mainns.pop(name, None)
|
||||
|
||||
|
||||
def build_excinfo(exctype, msg=None, formatted=None, errdisplay=None):
|
||||
if isinstance(exctype, type):
|
||||
assert issubclass(exctype, BaseException), exctype
|
||||
exctype = types.SimpleNamespace(
|
||||
__name__=exctype.__name__,
|
||||
__qualname__=exctype.__qualname__,
|
||||
__module__=exctype.__module__,
|
||||
)
|
||||
elif isinstance(exctype, str):
|
||||
module, _, name = exctype.rpartition(exctype)
|
||||
if not module and name in __builtins__:
|
||||
module = 'builtins'
|
||||
exctype = types.SimpleNamespace(
|
||||
__name__=name,
|
||||
__qualname__=exctype,
|
||||
__module__=module or None,
|
||||
)
|
||||
else:
|
||||
assert isinstance(exctype, types.SimpleNamespace)
|
||||
assert msg is None or isinstance(msg, str), msg
|
||||
assert formatted is None or isinstance(formatted, str), formatted
|
||||
assert errdisplay is None or isinstance(errdisplay, str), errdisplay
|
||||
return types.SimpleNamespace(
|
||||
type=exctype,
|
||||
msg=msg,
|
||||
formatted=formatted,
|
||||
errdisplay=errdisplay,
|
||||
)
|
||||
|
||||
|
||||
class ModuleTests(TestBase):
|
||||
|
||||
def test_queue_aliases(self):
|
||||
|
|
@ -890,24 +948,26 @@ def test_created_with_capi(self):
|
|||
# Interpreter.exec() behavior.
|
||||
|
||||
|
||||
def call_func_noop():
|
||||
pass
|
||||
call_func_noop = defs.spam_minimal
|
||||
call_func_ident = defs.spam_returns_arg
|
||||
call_func_failure = defs.spam_raises
|
||||
|
||||
|
||||
def call_func_return_shareable():
|
||||
return (1, None)
|
||||
|
||||
|
||||
def call_func_return_not_shareable():
|
||||
def call_func_return_stateless_func():
|
||||
return (lambda x: x)
|
||||
|
||||
|
||||
def call_func_return_pickleable():
|
||||
return [1, 2, 3]
|
||||
|
||||
|
||||
def call_func_failure():
|
||||
raise Exception('spam!')
|
||||
|
||||
|
||||
def call_func_ident(value):
|
||||
return value
|
||||
def call_func_return_unpickleable():
|
||||
x = 42
|
||||
return (lambda: x)
|
||||
|
||||
|
||||
def get_call_func_closure(value):
|
||||
|
|
@ -916,6 +976,11 @@ def call_func_closure():
|
|||
return call_func_closure
|
||||
|
||||
|
||||
def call_func_exec_wrapper(script, ns):
|
||||
res = exec(script, ns, ns)
|
||||
return res, ns, id(ns)
|
||||
|
||||
|
||||
class Spam:
|
||||
|
||||
@staticmethod
|
||||
|
|
@ -1012,86 +1077,375 @@ class TestInterpreterCall(TestBase):
|
|||
# - preserves info (e.g. SyntaxError)
|
||||
# - matching error display
|
||||
|
||||
def test_call(self):
|
||||
@contextlib.contextmanager
|
||||
def assert_fails(self, expected):
|
||||
with self.assertRaises(ExecutionFailed) as cm:
|
||||
yield cm
|
||||
uncaught = cm.exception.excinfo
|
||||
self.assertEqual(uncaught.type.__name__, expected.__name__)
|
||||
|
||||
def assert_fails_not_shareable(self):
|
||||
return self.assert_fails(interpreters.NotShareableError)
|
||||
|
||||
def assert_code_equal(self, code1, code2):
|
||||
if code1 == code2:
|
||||
return
|
||||
self.assertEqual(code1.co_name, code2.co_name)
|
||||
self.assertEqual(code1.co_flags, code2.co_flags)
|
||||
self.assertEqual(code1.co_consts, code2.co_consts)
|
||||
self.assertEqual(code1.co_varnames, code2.co_varnames)
|
||||
self.assertEqual(code1.co_cellvars, code2.co_cellvars)
|
||||
self.assertEqual(code1.co_freevars, code2.co_freevars)
|
||||
self.assertEqual(code1.co_names, code2.co_names)
|
||||
self.assertEqual(
|
||||
_testinternalcapi.get_code_var_counts(code1),
|
||||
_testinternalcapi.get_code_var_counts(code2),
|
||||
)
|
||||
self.assertEqual(code1.co_code, code2.co_code)
|
||||
|
||||
def assert_funcs_equal(self, func1, func2):
|
||||
if func1 == func2:
|
||||
return
|
||||
self.assertIs(type(func1), type(func2))
|
||||
self.assertEqual(func1.__name__, func2.__name__)
|
||||
self.assertEqual(func1.__defaults__, func2.__defaults__)
|
||||
self.assertEqual(func1.__kwdefaults__, func2.__kwdefaults__)
|
||||
self.assertEqual(func1.__closure__, func2.__closure__)
|
||||
self.assert_code_equal(func1.__code__, func2.__code__)
|
||||
self.assertEqual(
|
||||
_testinternalcapi.get_code_var_counts(func1),
|
||||
_testinternalcapi.get_code_var_counts(func2),
|
||||
)
|
||||
|
||||
def assert_exceptions_equal(self, exc1, exc2):
|
||||
assert isinstance(exc1, Exception)
|
||||
assert isinstance(exc2, Exception)
|
||||
if exc1 == exc2:
|
||||
return
|
||||
self.assertIs(type(exc1), type(exc2))
|
||||
self.assertEqual(exc1.args, exc2.args)
|
||||
|
||||
def test_stateless_funcs(self):
|
||||
interp = interpreters.create()
|
||||
|
||||
for i, (callable, args, kwargs) in enumerate([
|
||||
(call_func_noop, (), {}),
|
||||
(Spam.noop, (), {}),
|
||||
]):
|
||||
with self.subTest(f'success case #{i+1}'):
|
||||
res = interp.call(callable)
|
||||
self.assertIs(res, None)
|
||||
func = call_func_noop
|
||||
with self.subTest('no args, no return'):
|
||||
res = interp.call(func)
|
||||
self.assertIsNone(res)
|
||||
|
||||
for i, (callable, args, kwargs) in enumerate([
|
||||
(call_func_ident, ('spamspamspam',), {}),
|
||||
(get_call_func_closure, (42,), {}),
|
||||
(get_call_func_closure(42), (), {}),
|
||||
(Spam.from_values, (), {}),
|
||||
(Spam.from_values, (1, 2, 3), {}),
|
||||
(Spam, ('???'), {}),
|
||||
(Spam(101), (), {}),
|
||||
(Spam(10101).run, (), {}),
|
||||
(call_func_complex, ('ident', 'spam'), {}),
|
||||
(call_func_complex, ('full-ident', 'spam'), {}),
|
||||
(call_func_complex, ('full-ident', 'spam', 'ham'), {'eggs': '!!!'}),
|
||||
(call_func_complex, ('globals',), {}),
|
||||
(call_func_complex, ('interpid',), {}),
|
||||
(call_func_complex, ('closure',), {'value': '~~~'}),
|
||||
(call_func_complex, ('custom', 'spam!'), {}),
|
||||
(call_func_complex, ('custom-inner', 'eggs!'), {}),
|
||||
(call_func_complex, ('???',), {'exc': ValueError('spam')}),
|
||||
(call_func_return_shareable, (), {}),
|
||||
(call_func_return_not_shareable, (), {}),
|
||||
]):
|
||||
with self.subTest(f'invalid case #{i+1}'):
|
||||
with self.assertRaises(Exception):
|
||||
if args or kwargs:
|
||||
raise Exception((args, kwargs))
|
||||
interp.call(callable)
|
||||
func = call_func_return_shareable
|
||||
with self.subTest('no args, returns shareable'):
|
||||
res = interp.call(func)
|
||||
self.assertEqual(res, (1, None))
|
||||
|
||||
func = call_func_return_stateless_func
|
||||
expected = (lambda x: x)
|
||||
with self.subTest('no args, returns stateless func'):
|
||||
res = interp.call(func)
|
||||
self.assert_funcs_equal(res, expected)
|
||||
|
||||
func = call_func_return_pickleable
|
||||
with self.subTest('no args, returns pickleable'):
|
||||
res = interp.call(func)
|
||||
self.assertEqual(res, [1, 2, 3])
|
||||
|
||||
func = call_func_return_unpickleable
|
||||
with self.subTest('no args, returns unpickleable'):
|
||||
with self.assertRaises(interpreters.NotShareableError):
|
||||
interp.call(func)
|
||||
|
||||
def test_stateless_func_returns_arg(self):
|
||||
interp = interpreters.create()
|
||||
|
||||
for arg in [
|
||||
None,
|
||||
10,
|
||||
'spam!',
|
||||
b'spam!',
|
||||
(1, 2, 'spam!'),
|
||||
memoryview(b'spam!'),
|
||||
]:
|
||||
with self.subTest(f'shareable {arg!r}'):
|
||||
assert _interpreters.is_shareable(arg)
|
||||
res = interp.call(defs.spam_returns_arg, arg)
|
||||
self.assertEqual(res, arg)
|
||||
|
||||
for arg in defs.STATELESS_FUNCTIONS:
|
||||
with self.subTest(f'stateless func {arg!r}'):
|
||||
res = interp.call(defs.spam_returns_arg, arg)
|
||||
self.assert_funcs_equal(res, arg)
|
||||
|
||||
for arg in defs.TOP_FUNCTIONS:
|
||||
if arg in defs.STATELESS_FUNCTIONS:
|
||||
continue
|
||||
with self.subTest(f'stateful func {arg!r}'):
|
||||
res = interp.call(defs.spam_returns_arg, arg)
|
||||
self.assert_funcs_equal(res, arg)
|
||||
assert is_pickleable(arg)
|
||||
|
||||
for arg in [
|
||||
Ellipsis,
|
||||
NotImplemented,
|
||||
object(),
|
||||
2**1000,
|
||||
[1, 2, 3],
|
||||
{'a': 1, 'b': 2},
|
||||
types.SimpleNamespace(x=42),
|
||||
# builtin types
|
||||
object,
|
||||
type,
|
||||
Exception,
|
||||
ModuleNotFoundError,
|
||||
# builtin exceptions
|
||||
Exception('uh-oh!'),
|
||||
ModuleNotFoundError('mymodule'),
|
||||
# builtin fnctions
|
||||
len,
|
||||
sys.exit,
|
||||
# user classes
|
||||
*defs.TOP_CLASSES,
|
||||
*(c(*a) for c, a in defs.TOP_CLASSES.items()
|
||||
if c not in defs.CLASSES_WITHOUT_EQUALITY),
|
||||
]:
|
||||
with self.subTest(f'pickleable {arg!r}'):
|
||||
res = interp.call(defs.spam_returns_arg, arg)
|
||||
if type(arg) is object:
|
||||
self.assertIs(type(res), object)
|
||||
elif isinstance(arg, BaseException):
|
||||
self.assert_exceptions_equal(res, arg)
|
||||
else:
|
||||
self.assertEqual(res, arg)
|
||||
assert is_pickleable(arg)
|
||||
|
||||
for arg in [
|
||||
types.MappingProxyType({}),
|
||||
*(f for f in defs.NESTED_FUNCTIONS
|
||||
if f not in defs.STATELESS_FUNCTIONS),
|
||||
]:
|
||||
with self.subTest(f'unpickleable {arg!r}'):
|
||||
assert not _interpreters.is_shareable(arg)
|
||||
assert not is_pickleable(arg)
|
||||
with self.assertRaises(interpreters.NotShareableError):
|
||||
interp.call(defs.spam_returns_arg, arg)
|
||||
|
||||
def test_full_args(self):
|
||||
interp = interpreters.create()
|
||||
expected = (1, 2, 3, 4, 5, 6, ('?',), {'g': 7, 'h': 8})
|
||||
func = defs.spam_full_args
|
||||
res = interp.call(func, 1, 2, 3, 4, '?', e=5, f=6, g=7, h=8)
|
||||
self.assertEqual(res, expected)
|
||||
|
||||
def test_full_defaults(self):
|
||||
# pickleable, but not stateless
|
||||
interp = interpreters.create()
|
||||
expected = (-1, -2, -3, -4, -5, -6, (), {'g': 8, 'h': 9})
|
||||
res = interp.call(defs.spam_full_args_with_defaults, g=8, h=9)
|
||||
self.assertEqual(res, expected)
|
||||
|
||||
def test_modified_arg(self):
|
||||
interp = interpreters.create()
|
||||
script = dedent("""
|
||||
a = 7
|
||||
b = 2
|
||||
c = a ** b
|
||||
""")
|
||||
ns = {}
|
||||
expected = {'a': 7, 'b': 2, 'c': 49}
|
||||
res = interp.call(call_func_exec_wrapper, script, ns)
|
||||
obj, resns, resid = res
|
||||
del resns['__builtins__']
|
||||
self.assertIsNone(obj)
|
||||
self.assertEqual(ns, {})
|
||||
self.assertEqual(resns, expected)
|
||||
self.assertNotEqual(resid, id(ns))
|
||||
self.assertNotEqual(resid, id(resns))
|
||||
|
||||
def test_func_in___main___valid(self):
|
||||
# pickleable, already there'
|
||||
|
||||
with os_helper.temp_dir() as tempdir:
|
||||
def new_mod(name, text):
|
||||
script_helper.make_script(tempdir, name, dedent(text))
|
||||
|
||||
def run(text):
|
||||
name = 'myscript'
|
||||
text = dedent(f"""
|
||||
import sys
|
||||
sys.path.insert(0, {tempdir!r})
|
||||
|
||||
""") + dedent(text)
|
||||
filename = script_helper.make_script(tempdir, name, text)
|
||||
res = script_helper.assert_python_ok(filename)
|
||||
return res.out.decode('utf-8').strip()
|
||||
|
||||
# no module indirection
|
||||
with self.subTest('no indirection'):
|
||||
text = run(f"""
|
||||
from test.support import interpreters
|
||||
|
||||
def spam():
|
||||
# This a global var...
|
||||
return __name__
|
||||
|
||||
if __name__ == '__main__':
|
||||
interp = interpreters.create()
|
||||
res = interp.call(spam)
|
||||
print(res)
|
||||
""")
|
||||
self.assertEqual(text, '<fake __main__>')
|
||||
|
||||
# indirect as func, direct interp
|
||||
new_mod('mymod', f"""
|
||||
def run(interp, func):
|
||||
return interp.call(func)
|
||||
""")
|
||||
with self.subTest('indirect as func, direct interp'):
|
||||
text = run(f"""
|
||||
from test.support import interpreters
|
||||
import mymod
|
||||
|
||||
def spam():
|
||||
# This a global var...
|
||||
return __name__
|
||||
|
||||
if __name__ == '__main__':
|
||||
interp = interpreters.create()
|
||||
res = mymod.run(interp, spam)
|
||||
print(res)
|
||||
""")
|
||||
self.assertEqual(text, '<fake __main__>')
|
||||
|
||||
# indirect as func, indirect interp
|
||||
new_mod('mymod', f"""
|
||||
from test.support import interpreters
|
||||
def run(func):
|
||||
interp = interpreters.create()
|
||||
return interp.call(func)
|
||||
""")
|
||||
with self.subTest('indirect as func, indirect interp'):
|
||||
text = run(f"""
|
||||
import mymod
|
||||
|
||||
def spam():
|
||||
# This a global var...
|
||||
return __name__
|
||||
|
||||
if __name__ == '__main__':
|
||||
res = mymod.run(spam)
|
||||
print(res)
|
||||
""")
|
||||
self.assertEqual(text, '<fake __main__>')
|
||||
|
||||
def test_func_in___main___invalid(self):
|
||||
interp = interpreters.create()
|
||||
|
||||
funcname = f'{__name__.replace(".", "_")}_spam_okay'
|
||||
script = dedent(f"""
|
||||
def {funcname}():
|
||||
# This a global var...
|
||||
return __name__
|
||||
""")
|
||||
|
||||
with self.subTest('pickleable, added dynamically'):
|
||||
with defined_in___main__(funcname, script) as arg:
|
||||
with self.assertRaises(interpreters.NotShareableError):
|
||||
interp.call(defs.spam_returns_arg, arg)
|
||||
|
||||
with self.subTest('lying about __main__'):
|
||||
with defined_in___main__(funcname, script, remove=True) as arg:
|
||||
with self.assertRaises(interpreters.NotShareableError):
|
||||
interp.call(defs.spam_returns_arg, arg)
|
||||
|
||||
def test_raises(self):
|
||||
interp = interpreters.create()
|
||||
with self.assertRaises(ExecutionFailed):
|
||||
interp.call(call_func_failure)
|
||||
|
||||
with self.assert_fails(ValueError):
|
||||
interp.call(call_func_complex, '???', exc=ValueError('spam'))
|
||||
|
||||
def test_call_valid(self):
|
||||
interp = interpreters.create()
|
||||
|
||||
for i, (callable, args, kwargs, expected) in enumerate([
|
||||
(call_func_noop, (), {}, None),
|
||||
(call_func_ident, ('spamspamspam',), {}, 'spamspamspam'),
|
||||
(call_func_return_shareable, (), {}, (1, None)),
|
||||
(call_func_return_pickleable, (), {}, [1, 2, 3]),
|
||||
(Spam.noop, (), {}, None),
|
||||
(Spam.from_values, (), {}, Spam(())),
|
||||
(Spam.from_values, (1, 2, 3), {}, Spam((1, 2, 3))),
|
||||
(Spam, ('???',), {}, Spam('???')),
|
||||
(Spam(101), (), {}, (101, (), {})),
|
||||
(Spam(10101).run, (), {}, (10101, (), {})),
|
||||
(call_func_complex, ('ident', 'spam'), {}, 'spam'),
|
||||
(call_func_complex, ('full-ident', 'spam'), {}, ('spam', (), {})),
|
||||
(call_func_complex, ('full-ident', 'spam', 'ham'), {'eggs': '!!!'},
|
||||
('spam', ('ham',), {'eggs': '!!!'})),
|
||||
(call_func_complex, ('globals',), {}, __name__),
|
||||
(call_func_complex, ('interpid',), {}, interp.id),
|
||||
(call_func_complex, ('custom', 'spam!'), {}, Spam('spam!')),
|
||||
]):
|
||||
with self.subTest(f'success case #{i+1}'):
|
||||
res = interp.call(callable, *args, **kwargs)
|
||||
self.assertEqual(res, expected)
|
||||
|
||||
def test_call_invalid(self):
|
||||
interp = interpreters.create()
|
||||
|
||||
func = get_call_func_closure
|
||||
with self.subTest(func):
|
||||
with self.assertRaises(interpreters.NotShareableError):
|
||||
interp.call(func, 42)
|
||||
|
||||
func = get_call_func_closure(42)
|
||||
with self.subTest(func):
|
||||
with self.assertRaises(interpreters.NotShareableError):
|
||||
interp.call(func)
|
||||
|
||||
func = call_func_complex
|
||||
op = 'closure'
|
||||
with self.subTest(f'{func} ({op})'):
|
||||
with self.assertRaises(interpreters.NotShareableError):
|
||||
interp.call(func, op, value='~~~')
|
||||
|
||||
op = 'custom-inner'
|
||||
with self.subTest(f'{func} ({op})'):
|
||||
with self.assertRaises(interpreters.NotShareableError):
|
||||
interp.call(func, op, 'eggs!')
|
||||
|
||||
def test_call_in_thread(self):
|
||||
interp = interpreters.create()
|
||||
|
||||
for i, (callable, args, kwargs) in enumerate([
|
||||
(call_func_noop, (), {}),
|
||||
(Spam.noop, (), {}),
|
||||
]):
|
||||
with self.subTest(f'success case #{i+1}'):
|
||||
with self.captured_thread_exception() as ctx:
|
||||
t = interp.call_in_thread(callable)
|
||||
t.join()
|
||||
self.assertIsNone(ctx.caught)
|
||||
|
||||
for i, (callable, args, kwargs) in enumerate([
|
||||
(call_func_ident, ('spamspamspam',), {}),
|
||||
(get_call_func_closure, (42,), {}),
|
||||
(get_call_func_closure(42), (), {}),
|
||||
(call_func_return_shareable, (), {}),
|
||||
(call_func_return_pickleable, (), {}),
|
||||
(Spam.from_values, (), {}),
|
||||
(Spam.from_values, (1, 2, 3), {}),
|
||||
(Spam, ('???'), {}),
|
||||
(Spam(101), (), {}),
|
||||
(Spam(10101).run, (), {}),
|
||||
(Spam.noop, (), {}),
|
||||
(call_func_complex, ('ident', 'spam'), {}),
|
||||
(call_func_complex, ('full-ident', 'spam'), {}),
|
||||
(call_func_complex, ('full-ident', 'spam', 'ham'), {'eggs': '!!!'}),
|
||||
(call_func_complex, ('globals',), {}),
|
||||
(call_func_complex, ('interpid',), {}),
|
||||
(call_func_complex, ('closure',), {'value': '~~~'}),
|
||||
(call_func_complex, ('custom', 'spam!'), {}),
|
||||
(call_func_complex, ('custom-inner', 'eggs!'), {}),
|
||||
(call_func_complex, ('???',), {'exc': ValueError('spam')}),
|
||||
(call_func_return_shareable, (), {}),
|
||||
(call_func_return_not_shareable, (), {}),
|
||||
]):
|
||||
with self.subTest(f'success case #{i+1}'):
|
||||
with self.captured_thread_exception() as ctx:
|
||||
t = interp.call_in_thread(callable, *args, **kwargs)
|
||||
t.join()
|
||||
self.assertIsNone(ctx.caught)
|
||||
|
||||
for i, (callable, args, kwargs) in enumerate([
|
||||
(get_call_func_closure, (42,), {}),
|
||||
(get_call_func_closure(42), (), {}),
|
||||
]):
|
||||
with self.subTest(f'invalid case #{i+1}'):
|
||||
if args or kwargs:
|
||||
continue
|
||||
with self.captured_thread_exception() as ctx:
|
||||
t = interp.call_in_thread(callable)
|
||||
t = interp.call_in_thread(callable, *args, **kwargs)
|
||||
t.join()
|
||||
self.assertIsNotNone(ctx.caught)
|
||||
|
||||
|
|
@ -1600,18 +1954,14 @@ def test_exec(self):
|
|||
with results:
|
||||
exc = _interpreters.exec(interpid, script)
|
||||
out = results.stdout()
|
||||
self.assertEqual(out, '')
|
||||
self.assert_ns_equal(exc, types.SimpleNamespace(
|
||||
type=types.SimpleNamespace(
|
||||
__name__='Exception',
|
||||
__qualname__='Exception',
|
||||
__module__='builtins',
|
||||
),
|
||||
msg='uh-oh!',
|
||||
expected = build_excinfo(
|
||||
Exception, 'uh-oh!',
|
||||
# We check these in other tests.
|
||||
formatted=exc.formatted,
|
||||
errdisplay=exc.errdisplay,
|
||||
))
|
||||
)
|
||||
self.assertEqual(out, '')
|
||||
self.assert_ns_equal(exc, expected)
|
||||
|
||||
with self.subTest('from C-API'):
|
||||
with self.interpreter_from_capi() as interpid:
|
||||
|
|
@ -1623,25 +1973,50 @@ def test_exec(self):
|
|||
self.assertEqual(exc.msg, 'it worked!')
|
||||
|
||||
def test_call(self):
|
||||
with self.subTest('no args'):
|
||||
interpid = _interpreters.create()
|
||||
with self.assertRaises(ValueError):
|
||||
_interpreters.call(interpid, call_func_return_shareable)
|
||||
interpid = _interpreters.create()
|
||||
|
||||
# Here we focus on basic args and return values.
|
||||
# See TestInterpreterCall for full operational coverage,
|
||||
# including supported callables.
|
||||
|
||||
with self.subTest('no args, return None'):
|
||||
func = defs.spam_minimal
|
||||
res, exc = _interpreters.call(interpid, func)
|
||||
self.assertIsNone(exc)
|
||||
self.assertIsNone(res)
|
||||
|
||||
with self.subTest('empty args, return None'):
|
||||
func = defs.spam_minimal
|
||||
res, exc = _interpreters.call(interpid, func, (), {})
|
||||
self.assertIsNone(exc)
|
||||
self.assertIsNone(res)
|
||||
|
||||
with self.subTest('no args, return non-None'):
|
||||
func = defs.script_with_return
|
||||
res, exc = _interpreters.call(interpid, func)
|
||||
self.assertIsNone(exc)
|
||||
self.assertIs(res, True)
|
||||
|
||||
with self.subTest('full args, return non-None'):
|
||||
expected = (1, 2, 3, 4, 5, 6, (7, 8), {'g': 9, 'h': 0})
|
||||
func = defs.spam_full_args
|
||||
args = (1, 2, 3, 4, 7, 8)
|
||||
kwargs = dict(e=5, f=6, g=9, h=0)
|
||||
res, exc = _interpreters.call(interpid, func, args, kwargs)
|
||||
self.assertIsNone(exc)
|
||||
self.assertEqual(res, expected)
|
||||
|
||||
with self.subTest('uncaught exception'):
|
||||
interpid = _interpreters.create()
|
||||
exc = _interpreters.call(interpid, call_func_failure)
|
||||
self.assertEqual(exc, types.SimpleNamespace(
|
||||
type=types.SimpleNamespace(
|
||||
__name__='Exception',
|
||||
__qualname__='Exception',
|
||||
__module__='builtins',
|
||||
),
|
||||
msg='spam!',
|
||||
func = defs.spam_raises
|
||||
res, exc = _interpreters.call(interpid, func)
|
||||
expected = build_excinfo(
|
||||
Exception, 'spam!',
|
||||
# We check these in other tests.
|
||||
formatted=exc.formatted,
|
||||
errdisplay=exc.errdisplay,
|
||||
))
|
||||
)
|
||||
self.assertIsNone(res)
|
||||
self.assertEqual(exc, expected)
|
||||
|
||||
@requires_test_modules
|
||||
def test_set___main___attrs(self):
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue