mirror of
				https://github.com/python/cpython.git
				synced 2025-11-03 15:11:34 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			2121 lines
		
	
	
	
		
			68 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			2121 lines
		
	
	
	
		
			68 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from collections import namedtuple
 | 
						|
import contextlib
 | 
						|
import itertools
 | 
						|
import os
 | 
						|
import pickle
 | 
						|
import sys
 | 
						|
from textwrap import dedent, indent
 | 
						|
import threading
 | 
						|
import time
 | 
						|
import unittest
 | 
						|
 | 
						|
from test import support
 | 
						|
from test.support import script_helper
 | 
						|
 | 
						|
 | 
						|
interpreters = support.import_module('_xxsubinterpreters')
 | 
						|
 | 
						|
 | 
						|
##################################
 | 
						|
# helpers
 | 
						|
 | 
						|
def powerset(*sets):
 | 
						|
    return itertools.chain.from_iterable(
 | 
						|
        combinations(sets, r)
 | 
						|
        for r in range(len(sets)+1))
 | 
						|
 | 
						|
 | 
						|
def _captured_script(script):
 | 
						|
    r, w = os.pipe()
 | 
						|
    indented = script.replace('\n', '\n                ')
 | 
						|
    wrapped = dedent(f"""
 | 
						|
        import contextlib
 | 
						|
        with open({w}, 'w') as spipe:
 | 
						|
            with contextlib.redirect_stdout(spipe):
 | 
						|
                {indented}
 | 
						|
        """)
 | 
						|
    return wrapped, open(r)
 | 
						|
 | 
						|
 | 
						|
def _run_output(interp, request, shared=None):
 | 
						|
    script, rpipe = _captured_script(request)
 | 
						|
    with rpipe:
 | 
						|
        interpreters.run_string(interp, script, shared)
 | 
						|
        return rpipe.read()
 | 
						|
 | 
						|
 | 
						|
@contextlib.contextmanager
 | 
						|
def _running(interp):
 | 
						|
    r, w = os.pipe()
 | 
						|
    def run():
 | 
						|
        interpreters.run_string(interp, dedent(f"""
 | 
						|
            # wait for "signal"
 | 
						|
            with open({r}) as rpipe:
 | 
						|
                rpipe.read()
 | 
						|
            """))
 | 
						|
 | 
						|
    t = threading.Thread(target=run)
 | 
						|
    t.start()
 | 
						|
 | 
						|
    yield
 | 
						|
 | 
						|
    with open(w, 'w') as spipe:
 | 
						|
        spipe.write('done')
 | 
						|
    t.join()
 | 
						|
 | 
						|
 | 
						|
#@contextmanager
 | 
						|
#def run_threaded(id, source, **shared):
 | 
						|
#    def run():
 | 
						|
#        run_interp(id, source, **shared)
 | 
						|
#    t = threading.Thread(target=run)
 | 
						|
#    t.start()
 | 
						|
#    yield
 | 
						|
#    t.join()
 | 
						|
 | 
						|
 | 
						|
def run_interp(id, source, **shared):
 | 
						|
    _run_interp(id, source, shared)
 | 
						|
 | 
						|
 | 
						|
def _run_interp(id, source, shared, _mainns={}):
 | 
						|
    source = dedent(source)
 | 
						|
    main = interpreters.get_main()
 | 
						|
    if main == id:
 | 
						|
        if interpreters.get_current() != main:
 | 
						|
            raise RuntimeError
 | 
						|
        # XXX Run a func?
 | 
						|
        exec(source, _mainns)
 | 
						|
    else:
 | 
						|
        interpreters.run_string(id, source, shared)
 | 
						|
 | 
						|
 | 
						|
def run_interp_threaded(id, source, **shared):
 | 
						|
    def run():
 | 
						|
        _run(id, source, shared)
 | 
						|
    t = threading.Thread(target=run)
 | 
						|
    t.start()
 | 
						|
    t.join()
 | 
						|
 | 
						|
 | 
						|
class Interpreter(namedtuple('Interpreter', 'name id')):
 | 
						|
 | 
						|
    @classmethod
 | 
						|
    def from_raw(cls, raw):
 | 
						|
        if isinstance(raw, cls):
 | 
						|
            return raw
 | 
						|
        elif isinstance(raw, str):
 | 
						|
            return cls(raw)
 | 
						|
        else:
 | 
						|
            raise NotImplementedError
 | 
						|
 | 
						|
    def __new__(cls, name=None, id=None):
 | 
						|
        main = interpreters.get_main()
 | 
						|
        if id == main:
 | 
						|
            if not name:
 | 
						|
                name = 'main'
 | 
						|
            elif name != 'main':
 | 
						|
                raise ValueError(
 | 
						|
                    'name mismatch (expected "main", got "{}")'.format(name))
 | 
						|
            id = main
 | 
						|
        elif id is not None:
 | 
						|
            if not name:
 | 
						|
                name = 'interp'
 | 
						|
            elif name == 'main':
 | 
						|
                raise ValueError('name mismatch (unexpected "main")')
 | 
						|
            if not isinstance(id, interpreters.InterpreterID):
 | 
						|
                id = interpreters.InterpreterID(id)
 | 
						|
        elif not name or name == 'main':
 | 
						|
            name = 'main'
 | 
						|
            id = main
 | 
						|
        else:
 | 
						|
            id = interpreters.create()
 | 
						|
        self = super().__new__(cls, name, id)
 | 
						|
        return self
 | 
						|
 | 
						|
 | 
						|
# XXX expect_channel_closed() is unnecessary once we improve exc propagation.
 | 
						|
 | 
						|
@contextlib.contextmanager
 | 
						|
def expect_channel_closed():
 | 
						|
    try:
 | 
						|
        yield
 | 
						|
    except interpreters.ChannelClosedError:
 | 
						|
        pass
 | 
						|
    else:
 | 
						|
        assert False, 'channel not closed'
 | 
						|
 | 
						|
 | 
						|
class ChannelAction(namedtuple('ChannelAction', 'action end interp')):
 | 
						|
 | 
						|
    def __new__(cls, action, end=None, interp=None):
 | 
						|
        if not end:
 | 
						|
            end = 'both'
 | 
						|
        if not interp:
 | 
						|
            interp = 'main'
 | 
						|
        self = super().__new__(cls, action, end, interp)
 | 
						|
        return self
 | 
						|
 | 
						|
    def __init__(self, *args, **kwargs):
 | 
						|
        if self.action == 'use':
 | 
						|
            if self.end not in ('same', 'opposite', 'send', 'recv'):
 | 
						|
                raise ValueError(self.end)
 | 
						|
        elif self.action in ('close', 'force-close'):
 | 
						|
            if self.end not in ('both', 'same', 'opposite', 'send', 'recv'):
 | 
						|
                raise ValueError(self.end)
 | 
						|
        else:
 | 
						|
            raise ValueError(self.action)
 | 
						|
        if self.interp not in ('main', 'same', 'other', 'extra'):
 | 
						|
            raise ValueError(self.interp)
 | 
						|
 | 
						|
    def resolve_end(self, end):
 | 
						|
        if self.end == 'same':
 | 
						|
            return end
 | 
						|
        elif self.end == 'opposite':
 | 
						|
            return 'recv' if end == 'send' else 'send'
 | 
						|
        else:
 | 
						|
            return self.end
 | 
						|
 | 
						|
    def resolve_interp(self, interp, other, extra):
 | 
						|
        if self.interp == 'same':
 | 
						|
            return interp
 | 
						|
        elif self.interp == 'other':
 | 
						|
            if other is None:
 | 
						|
                raise RuntimeError
 | 
						|
            return other
 | 
						|
        elif self.interp == 'extra':
 | 
						|
            if extra is None:
 | 
						|
                raise RuntimeError
 | 
						|
            return extra
 | 
						|
        elif self.interp == 'main':
 | 
						|
            if interp.name == 'main':
 | 
						|
                return interp
 | 
						|
            elif other and other.name == 'main':
 | 
						|
                return other
 | 
						|
            else:
 | 
						|
                raise RuntimeError
 | 
						|
        # Per __init__(), there aren't any others.
 | 
						|
 | 
						|
 | 
						|
class ChannelState(namedtuple('ChannelState', 'pending closed')):
 | 
						|
 | 
						|
    def __new__(cls, pending=0, *, closed=False):
 | 
						|
        self = super().__new__(cls, pending, closed)
 | 
						|
        return self
 | 
						|
 | 
						|
    def incr(self):
 | 
						|
        return type(self)(self.pending + 1, closed=self.closed)
 | 
						|
 | 
						|
    def decr(self):
 | 
						|
        return type(self)(self.pending - 1, closed=self.closed)
 | 
						|
 | 
						|
    def close(self, *, force=True):
 | 
						|
        if self.closed:
 | 
						|
            if not force or self.pending == 0:
 | 
						|
                return self
 | 
						|
        return type(self)(0 if force else self.pending, closed=True)
 | 
						|
 | 
						|
 | 
						|
def run_action(cid, action, end, state, *, hideclosed=True):
 | 
						|
    if state.closed:
 | 
						|
        if action == 'use' and end == 'recv' and state.pending:
 | 
						|
            expectfail = False
 | 
						|
        else:
 | 
						|
            expectfail = True
 | 
						|
    else:
 | 
						|
        expectfail = False
 | 
						|
 | 
						|
    try:
 | 
						|
        result = _run_action(cid, action, end, state)
 | 
						|
    except interpreters.ChannelClosedError:
 | 
						|
        if not hideclosed and not expectfail:
 | 
						|
            raise
 | 
						|
        result = state.close()
 | 
						|
    else:
 | 
						|
        if expectfail:
 | 
						|
            raise ...  # XXX
 | 
						|
    return result
 | 
						|
 | 
						|
 | 
						|
def _run_action(cid, action, end, state):
 | 
						|
    if action == 'use':
 | 
						|
        if end == 'send':
 | 
						|
            interpreters.channel_send(cid, b'spam')
 | 
						|
            return state.incr()
 | 
						|
        elif end == 'recv':
 | 
						|
            if not state.pending:
 | 
						|
                try:
 | 
						|
                    interpreters.channel_recv(cid)
 | 
						|
                except interpreters.ChannelEmptyError:
 | 
						|
                    return state
 | 
						|
                else:
 | 
						|
                    raise Exception('expected ChannelEmptyError')
 | 
						|
            else:
 | 
						|
                interpreters.channel_recv(cid)
 | 
						|
                return state.decr()
 | 
						|
        else:
 | 
						|
            raise ValueError(end)
 | 
						|
    elif action == 'close':
 | 
						|
        kwargs = {}
 | 
						|
        if end in ('recv', 'send'):
 | 
						|
            kwargs[end] = True
 | 
						|
        interpreters.channel_close(cid, **kwargs)
 | 
						|
        return state.close()
 | 
						|
    elif action == 'force-close':
 | 
						|
        kwargs = {
 | 
						|
            'force': True,
 | 
						|
            }
 | 
						|
        if end in ('recv', 'send'):
 | 
						|
            kwargs[end] = True
 | 
						|
        interpreters.channel_close(cid, **kwargs)
 | 
						|
        return state.close(force=True)
 | 
						|
    else:
 | 
						|
        raise ValueError(action)
 | 
						|
 | 
						|
 | 
						|
def clean_up_interpreters():
 | 
						|
    for id in interpreters.list_all():
 | 
						|
        if id == 0:  # main
 | 
						|
            continue
 | 
						|
        try:
 | 
						|
            interpreters.destroy(id)
 | 
						|
        except RuntimeError:
 | 
						|
            pass  # already destroyed
 | 
						|
 | 
						|
 | 
						|
def clean_up_channels():
 | 
						|
    for cid in interpreters.channel_list_all():
 | 
						|
        try:
 | 
						|
            interpreters.channel_destroy(cid)
 | 
						|
        except interpreters.ChannelNotFoundError:
 | 
						|
            pass  # already destroyed
 | 
						|
 | 
						|
 | 
						|
class TestBase(unittest.TestCase):
 | 
						|
 | 
						|
    def tearDown(self):
 | 
						|
        clean_up_interpreters()
 | 
						|
        clean_up_channels()
 | 
						|
 | 
						|
 | 
						|
##################################
 | 
						|
# misc. tests
 | 
						|
 | 
						|
class IsShareableTests(unittest.TestCase):
 | 
						|
 | 
						|
    def test_default_shareables(self):
 | 
						|
        shareables = [
 | 
						|
                # singletons
 | 
						|
                None,
 | 
						|
                # builtin objects
 | 
						|
                b'spam',
 | 
						|
                'spam',
 | 
						|
                10,
 | 
						|
                -10,
 | 
						|
                ]
 | 
						|
        for obj in shareables:
 | 
						|
            with self.subTest(obj):
 | 
						|
                self.assertTrue(
 | 
						|
                    interpreters.is_shareable(obj))
 | 
						|
 | 
						|
    def test_not_shareable(self):
 | 
						|
        class Cheese:
 | 
						|
            def __init__(self, name):
 | 
						|
                self.name = name
 | 
						|
            def __str__(self):
 | 
						|
                return self.name
 | 
						|
 | 
						|
        class SubBytes(bytes):
 | 
						|
            """A subclass of a shareable type."""
 | 
						|
 | 
						|
        not_shareables = [
 | 
						|
                # singletons
 | 
						|
                True,
 | 
						|
                False,
 | 
						|
                NotImplemented,
 | 
						|
                ...,
 | 
						|
                # builtin types and objects
 | 
						|
                type,
 | 
						|
                object,
 | 
						|
                object(),
 | 
						|
                Exception(),
 | 
						|
                100.0,
 | 
						|
                # user-defined types and objects
 | 
						|
                Cheese,
 | 
						|
                Cheese('Wensleydale'),
 | 
						|
                SubBytes(b'spam'),
 | 
						|
                ]
 | 
						|
        for obj in not_shareables:
 | 
						|
            with self.subTest(repr(obj)):
 | 
						|
                self.assertFalse(
 | 
						|
                    interpreters.is_shareable(obj))
 | 
						|
 | 
						|
 | 
						|
class ShareableTypeTests(unittest.TestCase):
 | 
						|
 | 
						|
    def setUp(self):
 | 
						|
        super().setUp()
 | 
						|
        self.cid = interpreters.channel_create()
 | 
						|
 | 
						|
    def tearDown(self):
 | 
						|
        interpreters.channel_destroy(self.cid)
 | 
						|
        super().tearDown()
 | 
						|
 | 
						|
    def _assert_values(self, values):
 | 
						|
        for obj in values:
 | 
						|
            with self.subTest(obj):
 | 
						|
                interpreters.channel_send(self.cid, obj)
 | 
						|
                got = interpreters.channel_recv(self.cid)
 | 
						|
 | 
						|
                self.assertEqual(got, obj)
 | 
						|
                self.assertIs(type(got), type(obj))
 | 
						|
                # XXX Check the following in the channel tests?
 | 
						|
                #self.assertIsNot(got, obj)
 | 
						|
 | 
						|
    def test_singletons(self):
 | 
						|
        for obj in [None]:
 | 
						|
            with self.subTest(obj):
 | 
						|
                interpreters.channel_send(self.cid, obj)
 | 
						|
                got = interpreters.channel_recv(self.cid)
 | 
						|
 | 
						|
                # XXX What about between interpreters?
 | 
						|
                self.assertIs(got, obj)
 | 
						|
 | 
						|
    def test_types(self):
 | 
						|
        self._assert_values([
 | 
						|
            b'spam',
 | 
						|
            9999,
 | 
						|
            self.cid,
 | 
						|
            ])
 | 
						|
 | 
						|
    def test_bytes(self):
 | 
						|
        self._assert_values(i.to_bytes(2, 'little', signed=True)
 | 
						|
                            for i in range(-1, 258))
 | 
						|
 | 
						|
    def test_int(self):
 | 
						|
        self._assert_values(itertools.chain(range(-1, 258),
 | 
						|
                                            [sys.maxsize, -sys.maxsize - 1]))
 | 
						|
 | 
						|
    def test_non_shareable_int(self):
 | 
						|
        ints = [
 | 
						|
            sys.maxsize + 1,
 | 
						|
            -sys.maxsize - 2,
 | 
						|
            2**1000,
 | 
						|
        ]
 | 
						|
        for i in ints:
 | 
						|
            with self.subTest(i):
 | 
						|
                with self.assertRaises(OverflowError):
 | 
						|
                    interpreters.channel_send(self.cid, i)
 | 
						|
 | 
						|
 | 
						|
##################################
 | 
						|
# interpreter tests
 | 
						|
 | 
						|
class ListAllTests(TestBase):
 | 
						|
 | 
						|
    def test_initial(self):
 | 
						|
        main = interpreters.get_main()
 | 
						|
        ids = interpreters.list_all()
 | 
						|
        self.assertEqual(ids, [main])
 | 
						|
 | 
						|
    def test_after_creating(self):
 | 
						|
        main = interpreters.get_main()
 | 
						|
        first = interpreters.create()
 | 
						|
        second = interpreters.create()
 | 
						|
        ids = interpreters.list_all()
 | 
						|
        self.assertEqual(ids, [main, first, second])
 | 
						|
 | 
						|
    def test_after_destroying(self):
 | 
						|
        main = interpreters.get_main()
 | 
						|
        first = interpreters.create()
 | 
						|
        second = interpreters.create()
 | 
						|
        interpreters.destroy(first)
 | 
						|
        ids = interpreters.list_all()
 | 
						|
        self.assertEqual(ids, [main, second])
 | 
						|
 | 
						|
 | 
						|
class GetCurrentTests(TestBase):
 | 
						|
 | 
						|
    def test_main(self):
 | 
						|
        main = interpreters.get_main()
 | 
						|
        cur = interpreters.get_current()
 | 
						|
        self.assertEqual(cur, main)
 | 
						|
        self.assertIsInstance(cur, interpreters.InterpreterID)
 | 
						|
 | 
						|
    def test_subinterpreter(self):
 | 
						|
        main = interpreters.get_main()
 | 
						|
        interp = interpreters.create()
 | 
						|
        out = _run_output(interp, dedent("""
 | 
						|
            import _xxsubinterpreters as _interpreters
 | 
						|
            cur = _interpreters.get_current()
 | 
						|
            print(cur)
 | 
						|
            assert isinstance(cur, _interpreters.InterpreterID)
 | 
						|
            """))
 | 
						|
        cur = int(out.strip())
 | 
						|
        _, expected = interpreters.list_all()
 | 
						|
        self.assertEqual(cur, expected)
 | 
						|
        self.assertNotEqual(cur, main)
 | 
						|
 | 
						|
 | 
						|
class GetMainTests(TestBase):
 | 
						|
 | 
						|
    def test_from_main(self):
 | 
						|
        [expected] = interpreters.list_all()
 | 
						|
        main = interpreters.get_main()
 | 
						|
        self.assertEqual(main, expected)
 | 
						|
        self.assertIsInstance(main, interpreters.InterpreterID)
 | 
						|
 | 
						|
    def test_from_subinterpreter(self):
 | 
						|
        [expected] = interpreters.list_all()
 | 
						|
        interp = interpreters.create()
 | 
						|
        out = _run_output(interp, dedent("""
 | 
						|
            import _xxsubinterpreters as _interpreters
 | 
						|
            main = _interpreters.get_main()
 | 
						|
            print(main)
 | 
						|
            assert isinstance(main, _interpreters.InterpreterID)
 | 
						|
            """))
 | 
						|
        main = int(out.strip())
 | 
						|
        self.assertEqual(main, expected)
 | 
						|
 | 
						|
 | 
						|
class IsRunningTests(TestBase):
 | 
						|
 | 
						|
    def test_main(self):
 | 
						|
        main = interpreters.get_main()
 | 
						|
        self.assertTrue(interpreters.is_running(main))
 | 
						|
 | 
						|
    def test_subinterpreter(self):
 | 
						|
        interp = interpreters.create()
 | 
						|
        self.assertFalse(interpreters.is_running(interp))
 | 
						|
 | 
						|
        with _running(interp):
 | 
						|
            self.assertTrue(interpreters.is_running(interp))
 | 
						|
        self.assertFalse(interpreters.is_running(interp))
 | 
						|
 | 
						|
    def test_from_subinterpreter(self):
 | 
						|
        interp = interpreters.create()
 | 
						|
        out = _run_output(interp, dedent(f"""
 | 
						|
            import _xxsubinterpreters as _interpreters
 | 
						|
            if _interpreters.is_running({interp}):
 | 
						|
                print(True)
 | 
						|
            else:
 | 
						|
                print(False)
 | 
						|
            """))
 | 
						|
        self.assertEqual(out.strip(), 'True')
 | 
						|
 | 
						|
    def test_already_destroyed(self):
 | 
						|
        interp = interpreters.create()
 | 
						|
        interpreters.destroy(interp)
 | 
						|
        with self.assertRaises(RuntimeError):
 | 
						|
            interpreters.is_running(interp)
 | 
						|
 | 
						|
    def test_does_not_exist(self):
 | 
						|
        with self.assertRaises(RuntimeError):
 | 
						|
            interpreters.is_running(1_000_000)
 | 
						|
 | 
						|
    def test_bad_id(self):
 | 
						|
        with self.assertRaises(RuntimeError):
 | 
						|
            interpreters.is_running(-1)
 | 
						|
 | 
						|
 | 
						|
class InterpreterIDTests(TestBase):
 | 
						|
 | 
						|
    def test_with_int(self):
 | 
						|
        id = interpreters.InterpreterID(10, force=True)
 | 
						|
 | 
						|
        self.assertEqual(int(id), 10)
 | 
						|
 | 
						|
    def test_coerce_id(self):
 | 
						|
        id = interpreters.InterpreterID('10', force=True)
 | 
						|
        self.assertEqual(int(id), 10)
 | 
						|
 | 
						|
        id = interpreters.InterpreterID(10.0, force=True)
 | 
						|
        self.assertEqual(int(id), 10)
 | 
						|
 | 
						|
        class Int(str):
 | 
						|
            def __init__(self, value):
 | 
						|
                self._value = value
 | 
						|
            def __int__(self):
 | 
						|
                return self._value
 | 
						|
 | 
						|
        id = interpreters.InterpreterID(Int(10), force=True)
 | 
						|
        self.assertEqual(int(id), 10)
 | 
						|
 | 
						|
    def test_bad_id(self):
 | 
						|
        for id in [-1, 'spam']:
 | 
						|
            with self.subTest(id):
 | 
						|
                with self.assertRaises(ValueError):
 | 
						|
                    interpreters.InterpreterID(id)
 | 
						|
        with self.assertRaises(OverflowError):
 | 
						|
            interpreters.InterpreterID(2**64)
 | 
						|
        with self.assertRaises(TypeError):
 | 
						|
            interpreters.InterpreterID(object())
 | 
						|
 | 
						|
    def test_does_not_exist(self):
 | 
						|
        id = interpreters.channel_create()
 | 
						|
        with self.assertRaises(RuntimeError):
 | 
						|
            interpreters.InterpreterID(int(id) + 1)  # unforced
 | 
						|
 | 
						|
    def test_str(self):
 | 
						|
        id = interpreters.InterpreterID(10, force=True)
 | 
						|
        self.assertEqual(str(id), '10')
 | 
						|
 | 
						|
    def test_repr(self):
 | 
						|
        id = interpreters.InterpreterID(10, force=True)
 | 
						|
        self.assertEqual(repr(id), 'InterpreterID(10)')
 | 
						|
 | 
						|
    def test_equality(self):
 | 
						|
        id1 = interpreters.create()
 | 
						|
        id2 = interpreters.InterpreterID(int(id1))
 | 
						|
        id3 = interpreters.create()
 | 
						|
 | 
						|
        self.assertTrue(id1 == id1)
 | 
						|
        self.assertTrue(id1 == id2)
 | 
						|
        self.assertTrue(id1 == int(id1))
 | 
						|
        self.assertFalse(id1 == id3)
 | 
						|
 | 
						|
        self.assertFalse(id1 != id1)
 | 
						|
        self.assertFalse(id1 != id2)
 | 
						|
        self.assertTrue(id1 != id3)
 | 
						|
 | 
						|
 | 
						|
class CreateTests(TestBase):
 | 
						|
 | 
						|
    def test_in_main(self):
 | 
						|
        id = interpreters.create()
 | 
						|
        self.assertIsInstance(id, interpreters.InterpreterID)
 | 
						|
 | 
						|
        self.assertIn(id, interpreters.list_all())
 | 
						|
 | 
						|
    @unittest.skip('enable this test when working on pystate.c')
 | 
						|
    def test_unique_id(self):
 | 
						|
        seen = set()
 | 
						|
        for _ in range(100):
 | 
						|
            id = interpreters.create()
 | 
						|
            interpreters.destroy(id)
 | 
						|
            seen.add(id)
 | 
						|
 | 
						|
        self.assertEqual(len(seen), 100)
 | 
						|
 | 
						|
    def test_in_thread(self):
 | 
						|
        lock = threading.Lock()
 | 
						|
        id = None
 | 
						|
        def f():
 | 
						|
            nonlocal id
 | 
						|
            id = interpreters.create()
 | 
						|
            lock.acquire()
 | 
						|
            lock.release()
 | 
						|
 | 
						|
        t = threading.Thread(target=f)
 | 
						|
        with lock:
 | 
						|
            t.start()
 | 
						|
        t.join()
 | 
						|
        self.assertIn(id, interpreters.list_all())
 | 
						|
 | 
						|
    def test_in_subinterpreter(self):
 | 
						|
        main, = interpreters.list_all()
 | 
						|
        id1 = interpreters.create()
 | 
						|
        out = _run_output(id1, dedent("""
 | 
						|
            import _xxsubinterpreters as _interpreters
 | 
						|
            id = _interpreters.create()
 | 
						|
            print(id)
 | 
						|
            assert isinstance(id, _interpreters.InterpreterID)
 | 
						|
            """))
 | 
						|
        id2 = int(out.strip())
 | 
						|
 | 
						|
        self.assertEqual(set(interpreters.list_all()), {main, id1, id2})
 | 
						|
 | 
						|
    def test_in_threaded_subinterpreter(self):
 | 
						|
        main, = interpreters.list_all()
 | 
						|
        id1 = interpreters.create()
 | 
						|
        id2 = None
 | 
						|
        def f():
 | 
						|
            nonlocal id2
 | 
						|
            out = _run_output(id1, dedent("""
 | 
						|
                import _xxsubinterpreters as _interpreters
 | 
						|
                id = _interpreters.create()
 | 
						|
                print(id)
 | 
						|
                """))
 | 
						|
            id2 = int(out.strip())
 | 
						|
 | 
						|
        t = threading.Thread(target=f)
 | 
						|
        t.start()
 | 
						|
        t.join()
 | 
						|
 | 
						|
        self.assertEqual(set(interpreters.list_all()), {main, id1, id2})
 | 
						|
 | 
						|
    def test_after_destroy_all(self):
 | 
						|
        before = set(interpreters.list_all())
 | 
						|
        # Create 3 subinterpreters.
 | 
						|
        ids = []
 | 
						|
        for _ in range(3):
 | 
						|
            id = interpreters.create()
 | 
						|
            ids.append(id)
 | 
						|
        # Now destroy them.
 | 
						|
        for id in ids:
 | 
						|
            interpreters.destroy(id)
 | 
						|
        # Finally, create another.
 | 
						|
        id = interpreters.create()
 | 
						|
        self.assertEqual(set(interpreters.list_all()), before | {id})
 | 
						|
 | 
						|
    def test_after_destroy_some(self):
 | 
						|
        before = set(interpreters.list_all())
 | 
						|
        # Create 3 subinterpreters.
 | 
						|
        id1 = interpreters.create()
 | 
						|
        id2 = interpreters.create()
 | 
						|
        id3 = interpreters.create()
 | 
						|
        # Now destroy 2 of them.
 | 
						|
        interpreters.destroy(id1)
 | 
						|
        interpreters.destroy(id3)
 | 
						|
        # Finally, create another.
 | 
						|
        id = interpreters.create()
 | 
						|
        self.assertEqual(set(interpreters.list_all()), before | {id, id2})
 | 
						|
 | 
						|
 | 
						|
class DestroyTests(TestBase):
 | 
						|
 | 
						|
    def test_one(self):
 | 
						|
        id1 = interpreters.create()
 | 
						|
        id2 = interpreters.create()
 | 
						|
        id3 = interpreters.create()
 | 
						|
        self.assertIn(id2, interpreters.list_all())
 | 
						|
        interpreters.destroy(id2)
 | 
						|
        self.assertNotIn(id2, interpreters.list_all())
 | 
						|
        self.assertIn(id1, interpreters.list_all())
 | 
						|
        self.assertIn(id3, interpreters.list_all())
 | 
						|
 | 
						|
    def test_all(self):
 | 
						|
        before = set(interpreters.list_all())
 | 
						|
        ids = set()
 | 
						|
        for _ in range(3):
 | 
						|
            id = interpreters.create()
 | 
						|
            ids.add(id)
 | 
						|
        self.assertEqual(set(interpreters.list_all()), before | ids)
 | 
						|
        for id in ids:
 | 
						|
            interpreters.destroy(id)
 | 
						|
        self.assertEqual(set(interpreters.list_all()), before)
 | 
						|
 | 
						|
    def test_main(self):
 | 
						|
        main, = interpreters.list_all()
 | 
						|
        with self.assertRaises(RuntimeError):
 | 
						|
            interpreters.destroy(main)
 | 
						|
 | 
						|
        def f():
 | 
						|
            with self.assertRaises(RuntimeError):
 | 
						|
                interpreters.destroy(main)
 | 
						|
 | 
						|
        t = threading.Thread(target=f)
 | 
						|
        t.start()
 | 
						|
        t.join()
 | 
						|
 | 
						|
    def test_already_destroyed(self):
 | 
						|
        id = interpreters.create()
 | 
						|
        interpreters.destroy(id)
 | 
						|
        with self.assertRaises(RuntimeError):
 | 
						|
            interpreters.destroy(id)
 | 
						|
 | 
						|
    def test_does_not_exist(self):
 | 
						|
        with self.assertRaises(RuntimeError):
 | 
						|
            interpreters.destroy(1_000_000)
 | 
						|
 | 
						|
    def test_bad_id(self):
 | 
						|
        with self.assertRaises(RuntimeError):
 | 
						|
            interpreters.destroy(-1)
 | 
						|
 | 
						|
    def test_from_current(self):
 | 
						|
        main, = interpreters.list_all()
 | 
						|
        id = interpreters.create()
 | 
						|
        script = dedent(f"""
 | 
						|
            import _xxsubinterpreters as _interpreters
 | 
						|
            try:
 | 
						|
                _interpreters.destroy({id})
 | 
						|
            except RuntimeError:
 | 
						|
                pass
 | 
						|
            """)
 | 
						|
 | 
						|
        interpreters.run_string(id, script)
 | 
						|
        self.assertEqual(set(interpreters.list_all()), {main, id})
 | 
						|
 | 
						|
    def test_from_sibling(self):
 | 
						|
        main, = interpreters.list_all()
 | 
						|
        id1 = interpreters.create()
 | 
						|
        id2 = interpreters.create()
 | 
						|
        script = dedent(f"""
 | 
						|
            import _xxsubinterpreters as _interpreters
 | 
						|
            _interpreters.destroy({id2})
 | 
						|
            """)
 | 
						|
        interpreters.run_string(id1, script)
 | 
						|
 | 
						|
        self.assertEqual(set(interpreters.list_all()), {main, id1})
 | 
						|
 | 
						|
    def test_from_other_thread(self):
 | 
						|
        id = interpreters.create()
 | 
						|
        def f():
 | 
						|
            interpreters.destroy(id)
 | 
						|
 | 
						|
        t = threading.Thread(target=f)
 | 
						|
        t.start()
 | 
						|
        t.join()
 | 
						|
 | 
						|
    def test_still_running(self):
 | 
						|
        main, = interpreters.list_all()
 | 
						|
        interp = interpreters.create()
 | 
						|
        with _running(interp):
 | 
						|
            with self.assertRaises(RuntimeError):
 | 
						|
                interpreters.destroy(interp)
 | 
						|
            self.assertTrue(interpreters.is_running(interp))
 | 
						|
 | 
						|
 | 
						|
class RunStringTests(TestBase):
 | 
						|
 | 
						|
    SCRIPT = dedent("""
 | 
						|
        with open('{}', 'w') as out:
 | 
						|
            out.write('{}')
 | 
						|
        """)
 | 
						|
    FILENAME = 'spam'
 | 
						|
 | 
						|
    def setUp(self):
 | 
						|
        super().setUp()
 | 
						|
        self.id = interpreters.create()
 | 
						|
        self._fs = None
 | 
						|
 | 
						|
    def tearDown(self):
 | 
						|
        if self._fs is not None:
 | 
						|
            self._fs.close()
 | 
						|
        super().tearDown()
 | 
						|
 | 
						|
    @property
 | 
						|
    def fs(self):
 | 
						|
        if self._fs is None:
 | 
						|
            self._fs = FSFixture(self)
 | 
						|
        return self._fs
 | 
						|
 | 
						|
    def test_success(self):
 | 
						|
        script, file = _captured_script('print("it worked!", end="")')
 | 
						|
        with file:
 | 
						|
            interpreters.run_string(self.id, script)
 | 
						|
            out = file.read()
 | 
						|
 | 
						|
        self.assertEqual(out, 'it worked!')
 | 
						|
 | 
						|
    def test_in_thread(self):
 | 
						|
        script, file = _captured_script('print("it worked!", end="")')
 | 
						|
        with file:
 | 
						|
            def f():
 | 
						|
                interpreters.run_string(self.id, script)
 | 
						|
 | 
						|
            t = threading.Thread(target=f)
 | 
						|
            t.start()
 | 
						|
            t.join()
 | 
						|
            out = file.read()
 | 
						|
 | 
						|
        self.assertEqual(out, 'it worked!')
 | 
						|
 | 
						|
    def test_create_thread(self):
 | 
						|
        script, file = _captured_script("""
 | 
						|
            import threading
 | 
						|
            def f():
 | 
						|
                print('it worked!', end='')
 | 
						|
 | 
						|
            t = threading.Thread(target=f)
 | 
						|
            t.start()
 | 
						|
            t.join()
 | 
						|
            """)
 | 
						|
        with file:
 | 
						|
            interpreters.run_string(self.id, script)
 | 
						|
            out = file.read()
 | 
						|
 | 
						|
        self.assertEqual(out, 'it worked!')
 | 
						|
 | 
						|
    @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
 | 
						|
    def test_fork(self):
 | 
						|
        import tempfile
 | 
						|
        with tempfile.NamedTemporaryFile('w+') as file:
 | 
						|
            file.write('')
 | 
						|
            file.flush()
 | 
						|
 | 
						|
            expected = 'spam spam spam spam spam'
 | 
						|
            script = dedent(f"""
 | 
						|
                import os
 | 
						|
                try:
 | 
						|
                    os.fork()
 | 
						|
                except RuntimeError:
 | 
						|
                    with open('{file.name}', 'w') as out:
 | 
						|
                        out.write('{expected}')
 | 
						|
                """)
 | 
						|
            interpreters.run_string(self.id, script)
 | 
						|
 | 
						|
            file.seek(0)
 | 
						|
            content = file.read()
 | 
						|
            self.assertEqual(content, expected)
 | 
						|
 | 
						|
    def test_already_running(self):
 | 
						|
        with _running(self.id):
 | 
						|
            with self.assertRaises(RuntimeError):
 | 
						|
                interpreters.run_string(self.id, 'print("spam")')
 | 
						|
 | 
						|
    def test_does_not_exist(self):
 | 
						|
        id = 0
 | 
						|
        while id in interpreters.list_all():
 | 
						|
            id += 1
 | 
						|
        with self.assertRaises(RuntimeError):
 | 
						|
            interpreters.run_string(id, 'print("spam")')
 | 
						|
 | 
						|
    def test_error_id(self):
 | 
						|
        with self.assertRaises(RuntimeError):
 | 
						|
            interpreters.run_string(-1, 'print("spam")')
 | 
						|
 | 
						|
    def test_bad_id(self):
 | 
						|
        with self.assertRaises(TypeError):
 | 
						|
            interpreters.run_string('spam', 'print("spam")')
 | 
						|
 | 
						|
    def test_bad_script(self):
 | 
						|
        with self.assertRaises(TypeError):
 | 
						|
            interpreters.run_string(self.id, 10)
 | 
						|
 | 
						|
    def test_bytes_for_script(self):
 | 
						|
        with self.assertRaises(TypeError):
 | 
						|
            interpreters.run_string(self.id, b'print("spam")')
 | 
						|
 | 
						|
    @contextlib.contextmanager
 | 
						|
    def assert_run_failed(self, exctype, msg=None):
 | 
						|
        with self.assertRaises(interpreters.RunFailedError) as caught:
 | 
						|
            yield
 | 
						|
        if msg is None:
 | 
						|
            self.assertEqual(str(caught.exception).split(':')[0],
 | 
						|
                             str(exctype))
 | 
						|
        else:
 | 
						|
            self.assertEqual(str(caught.exception),
 | 
						|
                             "{}: {}".format(exctype, msg))
 | 
						|
 | 
						|
    def test_invalid_syntax(self):
 | 
						|
        with self.assert_run_failed(SyntaxError):
 | 
						|
            # missing close paren
 | 
						|
            interpreters.run_string(self.id, 'print("spam"')
 | 
						|
 | 
						|
    def test_failure(self):
 | 
						|
        with self.assert_run_failed(Exception, 'spam'):
 | 
						|
            interpreters.run_string(self.id, 'raise Exception("spam")')
 | 
						|
 | 
						|
    def test_SystemExit(self):
 | 
						|
        with self.assert_run_failed(SystemExit, '42'):
 | 
						|
            interpreters.run_string(self.id, 'raise SystemExit(42)')
 | 
						|
 | 
						|
    def test_sys_exit(self):
 | 
						|
        with self.assert_run_failed(SystemExit):
 | 
						|
            interpreters.run_string(self.id, dedent("""
 | 
						|
                import sys
 | 
						|
                sys.exit()
 | 
						|
                """))
 | 
						|
 | 
						|
        with self.assert_run_failed(SystemExit, '42'):
 | 
						|
            interpreters.run_string(self.id, dedent("""
 | 
						|
                import sys
 | 
						|
                sys.exit(42)
 | 
						|
                """))
 | 
						|
 | 
						|
    def test_with_shared(self):
 | 
						|
        r, w = os.pipe()
 | 
						|
 | 
						|
        shared = {
 | 
						|
                'spam': b'ham',
 | 
						|
                'eggs': b'-1',
 | 
						|
                'cheddar': None,
 | 
						|
                }
 | 
						|
        script = dedent(f"""
 | 
						|
            eggs = int(eggs)
 | 
						|
            spam = 42
 | 
						|
            result = spam + eggs
 | 
						|
 | 
						|
            ns = dict(vars())
 | 
						|
            del ns['__builtins__']
 | 
						|
            import pickle
 | 
						|
            with open({w}, 'wb') as chan:
 | 
						|
                pickle.dump(ns, chan)
 | 
						|
            """)
 | 
						|
        interpreters.run_string(self.id, script, shared)
 | 
						|
        with open(r, 'rb') as chan:
 | 
						|
            ns = pickle.load(chan)
 | 
						|
 | 
						|
        self.assertEqual(ns['spam'], 42)
 | 
						|
        self.assertEqual(ns['eggs'], -1)
 | 
						|
        self.assertEqual(ns['result'], 41)
 | 
						|
        self.assertIsNone(ns['cheddar'])
 | 
						|
 | 
						|
    def test_shared_overwrites(self):
 | 
						|
        interpreters.run_string(self.id, dedent("""
 | 
						|
            spam = 'eggs'
 | 
						|
            ns1 = dict(vars())
 | 
						|
            del ns1['__builtins__']
 | 
						|
            """))
 | 
						|
 | 
						|
        shared = {'spam': b'ham'}
 | 
						|
        script = dedent(f"""
 | 
						|
            ns2 = dict(vars())
 | 
						|
            del ns2['__builtins__']
 | 
						|
        """)
 | 
						|
        interpreters.run_string(self.id, script, shared)
 | 
						|
 | 
						|
        r, w = os.pipe()
 | 
						|
        script = dedent(f"""
 | 
						|
            ns = dict(vars())
 | 
						|
            del ns['__builtins__']
 | 
						|
            import pickle
 | 
						|
            with open({w}, 'wb') as chan:
 | 
						|
                pickle.dump(ns, chan)
 | 
						|
            """)
 | 
						|
        interpreters.run_string(self.id, script)
 | 
						|
        with open(r, 'rb') as chan:
 | 
						|
            ns = pickle.load(chan)
 | 
						|
 | 
						|
        self.assertEqual(ns['ns1']['spam'], 'eggs')
 | 
						|
        self.assertEqual(ns['ns2']['spam'], b'ham')
 | 
						|
        self.assertEqual(ns['spam'], b'ham')
 | 
						|
 | 
						|
    def test_shared_overwrites_default_vars(self):
 | 
						|
        r, w = os.pipe()
 | 
						|
 | 
						|
        shared = {'__name__': b'not __main__'}
 | 
						|
        script = dedent(f"""
 | 
						|
            spam = 42
 | 
						|
 | 
						|
            ns = dict(vars())
 | 
						|
            del ns['__builtins__']
 | 
						|
            import pickle
 | 
						|
            with open({w}, 'wb') as chan:
 | 
						|
                pickle.dump(ns, chan)
 | 
						|
            """)
 | 
						|
        interpreters.run_string(self.id, script, shared)
 | 
						|
        with open(r, 'rb') as chan:
 | 
						|
            ns = pickle.load(chan)
 | 
						|
 | 
						|
        self.assertEqual(ns['__name__'], b'not __main__')
 | 
						|
 | 
						|
    def test_main_reused(self):
 | 
						|
        r, w = os.pipe()
 | 
						|
        interpreters.run_string(self.id, dedent(f"""
 | 
						|
            spam = True
 | 
						|
 | 
						|
            ns = dict(vars())
 | 
						|
            del ns['__builtins__']
 | 
						|
            import pickle
 | 
						|
            with open({w}, 'wb') as chan:
 | 
						|
                pickle.dump(ns, chan)
 | 
						|
            del ns, pickle, chan
 | 
						|
            """))
 | 
						|
        with open(r, 'rb') as chan:
 | 
						|
            ns1 = pickle.load(chan)
 | 
						|
 | 
						|
        r, w = os.pipe()
 | 
						|
        interpreters.run_string(self.id, dedent(f"""
 | 
						|
            eggs = False
 | 
						|
 | 
						|
            ns = dict(vars())
 | 
						|
            del ns['__builtins__']
 | 
						|
            import pickle
 | 
						|
            with open({w}, 'wb') as chan:
 | 
						|
                pickle.dump(ns, chan)
 | 
						|
            """))
 | 
						|
        with open(r, 'rb') as chan:
 | 
						|
            ns2 = pickle.load(chan)
 | 
						|
 | 
						|
        self.assertIn('spam', ns1)
 | 
						|
        self.assertNotIn('eggs', ns1)
 | 
						|
        self.assertIn('eggs', ns2)
 | 
						|
        self.assertIn('spam', ns2)
 | 
						|
 | 
						|
    def test_execution_namespace_is_main(self):
 | 
						|
        r, w = os.pipe()
 | 
						|
 | 
						|
        script = dedent(f"""
 | 
						|
            spam = 42
 | 
						|
 | 
						|
            ns = dict(vars())
 | 
						|
            ns['__builtins__'] = str(ns['__builtins__'])
 | 
						|
            import pickle
 | 
						|
            with open({w}, 'wb') as chan:
 | 
						|
                pickle.dump(ns, chan)
 | 
						|
            """)
 | 
						|
        interpreters.run_string(self.id, script)
 | 
						|
        with open(r, 'rb') as chan:
 | 
						|
            ns = pickle.load(chan)
 | 
						|
 | 
						|
        ns.pop('__builtins__')
 | 
						|
        ns.pop('__loader__')
 | 
						|
        self.assertEqual(ns, {
 | 
						|
            '__name__': '__main__',
 | 
						|
            '__annotations__': {},
 | 
						|
            '__doc__': None,
 | 
						|
            '__package__': None,
 | 
						|
            '__spec__': None,
 | 
						|
            'spam': 42,
 | 
						|
            })
 | 
						|
 | 
						|
    # XXX Fix this test!
 | 
						|
    @unittest.skip('blocking forever')
 | 
						|
    def test_still_running_at_exit(self):
 | 
						|
        script = dedent(f"""
 | 
						|
        from textwrap import dedent
 | 
						|
        import threading
 | 
						|
        import _xxsubinterpreters as _interpreters
 | 
						|
        id = _interpreters.create()
 | 
						|
        def f():
 | 
						|
            _interpreters.run_string(id, dedent('''
 | 
						|
                import time
 | 
						|
                # Give plenty of time for the main interpreter to finish.
 | 
						|
                time.sleep(1_000_000)
 | 
						|
                '''))
 | 
						|
 | 
						|
        t = threading.Thread(target=f)
 | 
						|
        t.start()
 | 
						|
        """)
 | 
						|
        with support.temp_dir() as dirname:
 | 
						|
            filename = script_helper.make_script(dirname, 'interp', script)
 | 
						|
            with script_helper.spawn_python(filename) as proc:
 | 
						|
                retcode = proc.wait()
 | 
						|
 | 
						|
        self.assertEqual(retcode, 0)
 | 
						|
 | 
						|
 | 
						|
##################################
 | 
						|
# channel tests
 | 
						|
 | 
						|
class ChannelIDTests(TestBase):
 | 
						|
 | 
						|
    def test_default_kwargs(self):
 | 
						|
        cid = interpreters._channel_id(10, force=True)
 | 
						|
 | 
						|
        self.assertEqual(int(cid), 10)
 | 
						|
        self.assertEqual(cid.end, 'both')
 | 
						|
 | 
						|
    def test_with_kwargs(self):
 | 
						|
        cid = interpreters._channel_id(10, send=True, force=True)
 | 
						|
        self.assertEqual(cid.end, 'send')
 | 
						|
 | 
						|
        cid = interpreters._channel_id(10, send=True, recv=False, force=True)
 | 
						|
        self.assertEqual(cid.end, 'send')
 | 
						|
 | 
						|
        cid = interpreters._channel_id(10, recv=True, force=True)
 | 
						|
        self.assertEqual(cid.end, 'recv')
 | 
						|
 | 
						|
        cid = interpreters._channel_id(10, recv=True, send=False, force=True)
 | 
						|
        self.assertEqual(cid.end, 'recv')
 | 
						|
 | 
						|
        cid = interpreters._channel_id(10, send=True, recv=True, force=True)
 | 
						|
        self.assertEqual(cid.end, 'both')
 | 
						|
 | 
						|
    def test_coerce_id(self):
 | 
						|
        cid = interpreters._channel_id('10', force=True)
 | 
						|
        self.assertEqual(int(cid), 10)
 | 
						|
 | 
						|
        cid = interpreters._channel_id(10.0, force=True)
 | 
						|
        self.assertEqual(int(cid), 10)
 | 
						|
 | 
						|
        class Int(str):
 | 
						|
            def __init__(self, value):
 | 
						|
                self._value = value
 | 
						|
            def __int__(self):
 | 
						|
                return self._value
 | 
						|
 | 
						|
        cid = interpreters._channel_id(Int(10), force=True)
 | 
						|
        self.assertEqual(int(cid), 10)
 | 
						|
 | 
						|
    def test_bad_id(self):
 | 
						|
        for cid in [-1, 'spam']:
 | 
						|
            with self.subTest(cid):
 | 
						|
                with self.assertRaises(ValueError):
 | 
						|
                    interpreters._channel_id(cid)
 | 
						|
        with self.assertRaises(OverflowError):
 | 
						|
            interpreters._channel_id(2**64)
 | 
						|
        with self.assertRaises(TypeError):
 | 
						|
            interpreters._channel_id(object())
 | 
						|
 | 
						|
    def test_bad_kwargs(self):
 | 
						|
        with self.assertRaises(ValueError):
 | 
						|
            interpreters._channel_id(10, send=False, recv=False)
 | 
						|
 | 
						|
    def test_does_not_exist(self):
 | 
						|
        cid = interpreters.channel_create()
 | 
						|
        with self.assertRaises(interpreters.ChannelNotFoundError):
 | 
						|
            interpreters._channel_id(int(cid) + 1)  # unforced
 | 
						|
 | 
						|
    def test_str(self):
 | 
						|
        cid = interpreters._channel_id(10, force=True)
 | 
						|
        self.assertEqual(str(cid), '10')
 | 
						|
 | 
						|
    def test_repr(self):
 | 
						|
        cid = interpreters._channel_id(10, force=True)
 | 
						|
        self.assertEqual(repr(cid), 'ChannelID(10)')
 | 
						|
 | 
						|
        cid = interpreters._channel_id(10, send=True, force=True)
 | 
						|
        self.assertEqual(repr(cid), 'ChannelID(10, send=True)')
 | 
						|
 | 
						|
        cid = interpreters._channel_id(10, recv=True, force=True)
 | 
						|
        self.assertEqual(repr(cid), 'ChannelID(10, recv=True)')
 | 
						|
 | 
						|
        cid = interpreters._channel_id(10, send=True, recv=True, force=True)
 | 
						|
        self.assertEqual(repr(cid), 'ChannelID(10)')
 | 
						|
 | 
						|
    def test_equality(self):
 | 
						|
        cid1 = interpreters.channel_create()
 | 
						|
        cid2 = interpreters._channel_id(int(cid1))
 | 
						|
        cid3 = interpreters.channel_create()
 | 
						|
 | 
						|
        self.assertTrue(cid1 == cid1)
 | 
						|
        self.assertTrue(cid1 == cid2)
 | 
						|
        self.assertTrue(cid1 == int(cid1))
 | 
						|
        self.assertFalse(cid1 == cid3)
 | 
						|
 | 
						|
        self.assertFalse(cid1 != cid1)
 | 
						|
        self.assertFalse(cid1 != cid2)
 | 
						|
        self.assertTrue(cid1 != cid3)
 | 
						|
 | 
						|
 | 
						|
class ChannelTests(TestBase):
 | 
						|
 | 
						|
    def test_create_cid(self):
 | 
						|
        cid = interpreters.channel_create()
 | 
						|
        self.assertIsInstance(cid, interpreters.ChannelID)
 | 
						|
 | 
						|
    def test_sequential_ids(self):
 | 
						|
        before = interpreters.channel_list_all()
 | 
						|
        id1 = interpreters.channel_create()
 | 
						|
        id2 = interpreters.channel_create()
 | 
						|
        id3 = interpreters.channel_create()
 | 
						|
        after = interpreters.channel_list_all()
 | 
						|
 | 
						|
        self.assertEqual(id2, int(id1) + 1)
 | 
						|
        self.assertEqual(id3, int(id2) + 1)
 | 
						|
        self.assertEqual(set(after) - set(before), {id1, id2, id3})
 | 
						|
 | 
						|
    def test_ids_global(self):
 | 
						|
        id1 = interpreters.create()
 | 
						|
        out = _run_output(id1, dedent("""
 | 
						|
            import _xxsubinterpreters as _interpreters
 | 
						|
            cid = _interpreters.channel_create()
 | 
						|
            print(cid)
 | 
						|
            """))
 | 
						|
        cid1 = int(out.strip())
 | 
						|
 | 
						|
        id2 = interpreters.create()
 | 
						|
        out = _run_output(id2, dedent("""
 | 
						|
            import _xxsubinterpreters as _interpreters
 | 
						|
            cid = _interpreters.channel_create()
 | 
						|
            print(cid)
 | 
						|
            """))
 | 
						|
        cid2 = int(out.strip())
 | 
						|
 | 
						|
        self.assertEqual(cid2, int(cid1) + 1)
 | 
						|
 | 
						|
    ####################
 | 
						|
 | 
						|
    def test_send_recv_main(self):
 | 
						|
        cid = interpreters.channel_create()
 | 
						|
        orig = b'spam'
 | 
						|
        interpreters.channel_send(cid, orig)
 | 
						|
        obj = interpreters.channel_recv(cid)
 | 
						|
 | 
						|
        self.assertEqual(obj, orig)
 | 
						|
        self.assertIsNot(obj, orig)
 | 
						|
 | 
						|
    def test_send_recv_same_interpreter(self):
 | 
						|
        id1 = interpreters.create()
 | 
						|
        out = _run_output(id1, dedent("""
 | 
						|
            import _xxsubinterpreters as _interpreters
 | 
						|
            cid = _interpreters.channel_create()
 | 
						|
            orig = b'spam'
 | 
						|
            _interpreters.channel_send(cid, orig)
 | 
						|
            obj = _interpreters.channel_recv(cid)
 | 
						|
            assert obj is not orig
 | 
						|
            assert obj == orig
 | 
						|
            """))
 | 
						|
 | 
						|
    def test_send_recv_different_interpreters(self):
 | 
						|
        cid = interpreters.channel_create()
 | 
						|
        id1 = interpreters.create()
 | 
						|
        out = _run_output(id1, dedent(f"""
 | 
						|
            import _xxsubinterpreters as _interpreters
 | 
						|
            _interpreters.channel_send({cid}, b'spam')
 | 
						|
            """))
 | 
						|
        obj = interpreters.channel_recv(cid)
 | 
						|
 | 
						|
        self.assertEqual(obj, b'spam')
 | 
						|
 | 
						|
    def test_send_recv_different_threads(self):
 | 
						|
        cid = interpreters.channel_create()
 | 
						|
 | 
						|
        def f():
 | 
						|
            while True:
 | 
						|
                try:
 | 
						|
                    obj = interpreters.channel_recv(cid)
 | 
						|
                    break
 | 
						|
                except interpreters.ChannelEmptyError:
 | 
						|
                    time.sleep(0.1)
 | 
						|
            interpreters.channel_send(cid, obj)
 | 
						|
        t = threading.Thread(target=f)
 | 
						|
        t.start()
 | 
						|
 | 
						|
        interpreters.channel_send(cid, b'spam')
 | 
						|
        t.join()
 | 
						|
        obj = interpreters.channel_recv(cid)
 | 
						|
 | 
						|
        self.assertEqual(obj, b'spam')
 | 
						|
 | 
						|
    def test_send_recv_different_interpreters_and_threads(self):
 | 
						|
        cid = interpreters.channel_create()
 | 
						|
        id1 = interpreters.create()
 | 
						|
        out = None
 | 
						|
 | 
						|
        def f():
 | 
						|
            nonlocal out
 | 
						|
            out = _run_output(id1, dedent(f"""
 | 
						|
                import time
 | 
						|
                import _xxsubinterpreters as _interpreters
 | 
						|
                while True:
 | 
						|
                    try:
 | 
						|
                        obj = _interpreters.channel_recv({cid})
 | 
						|
                        break
 | 
						|
                    except _interpreters.ChannelEmptyError:
 | 
						|
                        time.sleep(0.1)
 | 
						|
                assert(obj == b'spam')
 | 
						|
                _interpreters.channel_send({cid}, b'eggs')
 | 
						|
                """))
 | 
						|
        t = threading.Thread(target=f)
 | 
						|
        t.start()
 | 
						|
 | 
						|
        interpreters.channel_send(cid, b'spam')
 | 
						|
        t.join()
 | 
						|
        obj = interpreters.channel_recv(cid)
 | 
						|
 | 
						|
        self.assertEqual(obj, b'eggs')
 | 
						|
 | 
						|
    def test_send_not_found(self):
 | 
						|
        with self.assertRaises(interpreters.ChannelNotFoundError):
 | 
						|
            interpreters.channel_send(10, b'spam')
 | 
						|
 | 
						|
    def test_recv_not_found(self):
 | 
						|
        with self.assertRaises(interpreters.ChannelNotFoundError):
 | 
						|
            interpreters.channel_recv(10)
 | 
						|
 | 
						|
    def test_recv_empty(self):
 | 
						|
        cid = interpreters.channel_create()
 | 
						|
        with self.assertRaises(interpreters.ChannelEmptyError):
 | 
						|
            interpreters.channel_recv(cid)
 | 
						|
 | 
						|
    def test_run_string_arg_unresolved(self):
 | 
						|
        cid = interpreters.channel_create()
 | 
						|
        interp = interpreters.create()
 | 
						|
 | 
						|
        out = _run_output(interp, dedent("""
 | 
						|
            import _xxsubinterpreters as _interpreters
 | 
						|
            print(cid.end)
 | 
						|
            _interpreters.channel_send(cid, b'spam')
 | 
						|
            """),
 | 
						|
            dict(cid=cid.send))
 | 
						|
        obj = interpreters.channel_recv(cid)
 | 
						|
 | 
						|
        self.assertEqual(obj, b'spam')
 | 
						|
        self.assertEqual(out.strip(), 'send')
 | 
						|
 | 
						|
    # XXX For now there is no high-level channel into which the
 | 
						|
    # sent channel ID can be converted...
 | 
						|
    # Note: this test caused crashes on some buildbots (bpo-33615).
 | 
						|
    @unittest.skip('disabled until high-level channels exist')
 | 
						|
    def test_run_string_arg_resolved(self):
 | 
						|
        cid = interpreters.channel_create()
 | 
						|
        cid = interpreters._channel_id(cid, _resolve=True)
 | 
						|
        interp = interpreters.create()
 | 
						|
 | 
						|
        out = _run_output(interp, dedent("""
 | 
						|
            import _xxsubinterpreters as _interpreters
 | 
						|
            print(chan.id.end)
 | 
						|
            _interpreters.channel_send(chan.id, b'spam')
 | 
						|
            """),
 | 
						|
            dict(chan=cid.send))
 | 
						|
        obj = interpreters.channel_recv(cid)
 | 
						|
 | 
						|
        self.assertEqual(obj, b'spam')
 | 
						|
        self.assertEqual(out.strip(), 'send')
 | 
						|
 | 
						|
    # close
 | 
						|
 | 
						|
    def test_close_single_user(self):
 | 
						|
        cid = interpreters.channel_create()
 | 
						|
        interpreters.channel_send(cid, b'spam')
 | 
						|
        interpreters.channel_recv(cid)
 | 
						|
        interpreters.channel_close(cid)
 | 
						|
 | 
						|
        with self.assertRaises(interpreters.ChannelClosedError):
 | 
						|
            interpreters.channel_send(cid, b'eggs')
 | 
						|
        with self.assertRaises(interpreters.ChannelClosedError):
 | 
						|
            interpreters.channel_recv(cid)
 | 
						|
 | 
						|
    def test_close_multiple_users(self):
 | 
						|
        cid = interpreters.channel_create()
 | 
						|
        id1 = interpreters.create()
 | 
						|
        id2 = interpreters.create()
 | 
						|
        interpreters.run_string(id1, dedent(f"""
 | 
						|
            import _xxsubinterpreters as _interpreters
 | 
						|
            _interpreters.channel_send({cid}, b'spam')
 | 
						|
            """))
 | 
						|
        interpreters.run_string(id2, dedent(f"""
 | 
						|
            import _xxsubinterpreters as _interpreters
 | 
						|
            _interpreters.channel_recv({cid})
 | 
						|
            """))
 | 
						|
        interpreters.channel_close(cid)
 | 
						|
        with self.assertRaises(interpreters.RunFailedError) as cm:
 | 
						|
            interpreters.run_string(id1, dedent(f"""
 | 
						|
                _interpreters.channel_send({cid}, b'spam')
 | 
						|
                """))
 | 
						|
        self.assertIn('ChannelClosedError', str(cm.exception))
 | 
						|
        with self.assertRaises(interpreters.RunFailedError) as cm:
 | 
						|
            interpreters.run_string(id2, dedent(f"""
 | 
						|
                _interpreters.channel_send({cid}, b'spam')
 | 
						|
                """))
 | 
						|
        self.assertIn('ChannelClosedError', str(cm.exception))
 | 
						|
 | 
						|
    def test_close_multiple_times(self):
 | 
						|
        cid = interpreters.channel_create()
 | 
						|
        interpreters.channel_send(cid, b'spam')
 | 
						|
        interpreters.channel_recv(cid)
 | 
						|
        interpreters.channel_close(cid)
 | 
						|
 | 
						|
        with self.assertRaises(interpreters.ChannelClosedError):
 | 
						|
            interpreters.channel_close(cid)
 | 
						|
 | 
						|
    def test_close_empty(self):
 | 
						|
        tests = [
 | 
						|
            (False, False),
 | 
						|
            (True, False),
 | 
						|
            (False, True),
 | 
						|
            (True, True),
 | 
						|
            ]
 | 
						|
        for send, recv in tests:
 | 
						|
            with self.subTest((send, recv)):
 | 
						|
                cid = interpreters.channel_create()
 | 
						|
                interpreters.channel_send(cid, b'spam')
 | 
						|
                interpreters.channel_recv(cid)
 | 
						|
                interpreters.channel_close(cid, send=send, recv=recv)
 | 
						|
 | 
						|
                with self.assertRaises(interpreters.ChannelClosedError):
 | 
						|
                    interpreters.channel_send(cid, b'eggs')
 | 
						|
                with self.assertRaises(interpreters.ChannelClosedError):
 | 
						|
                    interpreters.channel_recv(cid)
 | 
						|
 | 
						|
    def test_close_defaults_with_unused_items(self):
 | 
						|
        cid = interpreters.channel_create()
 | 
						|
        interpreters.channel_send(cid, b'spam')
 | 
						|
        interpreters.channel_send(cid, b'ham')
 | 
						|
 | 
						|
        with self.assertRaises(interpreters.ChannelNotEmptyError):
 | 
						|
            interpreters.channel_close(cid)
 | 
						|
        interpreters.channel_recv(cid)
 | 
						|
        interpreters.channel_send(cid, b'eggs')
 | 
						|
 | 
						|
    def test_close_recv_with_unused_items_unforced(self):
 | 
						|
        cid = interpreters.channel_create()
 | 
						|
        interpreters.channel_send(cid, b'spam')
 | 
						|
        interpreters.channel_send(cid, b'ham')
 | 
						|
 | 
						|
        with self.assertRaises(interpreters.ChannelNotEmptyError):
 | 
						|
            interpreters.channel_close(cid, recv=True)
 | 
						|
        interpreters.channel_recv(cid)
 | 
						|
        interpreters.channel_send(cid, b'eggs')
 | 
						|
        interpreters.channel_recv(cid)
 | 
						|
        interpreters.channel_recv(cid)
 | 
						|
        interpreters.channel_close(cid, recv=True)
 | 
						|
 | 
						|
    def test_close_send_with_unused_items_unforced(self):
 | 
						|
        cid = interpreters.channel_create()
 | 
						|
        interpreters.channel_send(cid, b'spam')
 | 
						|
        interpreters.channel_send(cid, b'ham')
 | 
						|
        interpreters.channel_close(cid, send=True)
 | 
						|
 | 
						|
        with self.assertRaises(interpreters.ChannelClosedError):
 | 
						|
            interpreters.channel_send(cid, b'eggs')
 | 
						|
        interpreters.channel_recv(cid)
 | 
						|
        interpreters.channel_recv(cid)
 | 
						|
        with self.assertRaises(interpreters.ChannelClosedError):
 | 
						|
            interpreters.channel_recv(cid)
 | 
						|
 | 
						|
    def test_close_both_with_unused_items_unforced(self):
 | 
						|
        cid = interpreters.channel_create()
 | 
						|
        interpreters.channel_send(cid, b'spam')
 | 
						|
        interpreters.channel_send(cid, b'ham')
 | 
						|
 | 
						|
        with self.assertRaises(interpreters.ChannelNotEmptyError):
 | 
						|
            interpreters.channel_close(cid, recv=True, send=True)
 | 
						|
        interpreters.channel_recv(cid)
 | 
						|
        interpreters.channel_send(cid, b'eggs')
 | 
						|
        interpreters.channel_recv(cid)
 | 
						|
        interpreters.channel_recv(cid)
 | 
						|
        interpreters.channel_close(cid, recv=True)
 | 
						|
 | 
						|
    def test_close_recv_with_unused_items_forced(self):
 | 
						|
        cid = interpreters.channel_create()
 | 
						|
        interpreters.channel_send(cid, b'spam')
 | 
						|
        interpreters.channel_send(cid, b'ham')
 | 
						|
        interpreters.channel_close(cid, recv=True, force=True)
 | 
						|
 | 
						|
        with self.assertRaises(interpreters.ChannelClosedError):
 | 
						|
            interpreters.channel_send(cid, b'eggs')
 | 
						|
        with self.assertRaises(interpreters.ChannelClosedError):
 | 
						|
            interpreters.channel_recv(cid)
 | 
						|
 | 
						|
    def test_close_send_with_unused_items_forced(self):
 | 
						|
        cid = interpreters.channel_create()
 | 
						|
        interpreters.channel_send(cid, b'spam')
 | 
						|
        interpreters.channel_send(cid, b'ham')
 | 
						|
        interpreters.channel_close(cid, send=True, force=True)
 | 
						|
 | 
						|
        with self.assertRaises(interpreters.ChannelClosedError):
 | 
						|
            interpreters.channel_send(cid, b'eggs')
 | 
						|
        with self.assertRaises(interpreters.ChannelClosedError):
 | 
						|
            interpreters.channel_recv(cid)
 | 
						|
 | 
						|
    def test_close_both_with_unused_items_forced(self):
 | 
						|
        cid = interpreters.channel_create()
 | 
						|
        interpreters.channel_send(cid, b'spam')
 | 
						|
        interpreters.channel_send(cid, b'ham')
 | 
						|
        interpreters.channel_close(cid, send=True, recv=True, force=True)
 | 
						|
 | 
						|
        with self.assertRaises(interpreters.ChannelClosedError):
 | 
						|
            interpreters.channel_send(cid, b'eggs')
 | 
						|
        with self.assertRaises(interpreters.ChannelClosedError):
 | 
						|
            interpreters.channel_recv(cid)
 | 
						|
 | 
						|
    def test_close_never_used(self):
 | 
						|
        cid = interpreters.channel_create()
 | 
						|
        interpreters.channel_close(cid)
 | 
						|
 | 
						|
        with self.assertRaises(interpreters.ChannelClosedError):
 | 
						|
            interpreters.channel_send(cid, b'spam')
 | 
						|
        with self.assertRaises(interpreters.ChannelClosedError):
 | 
						|
            interpreters.channel_recv(cid)
 | 
						|
 | 
						|
    def test_close_by_unassociated_interp(self):
 | 
						|
        cid = interpreters.channel_create()
 | 
						|
        interpreters.channel_send(cid, b'spam')
 | 
						|
        interp = interpreters.create()
 | 
						|
        interpreters.run_string(interp, dedent(f"""
 | 
						|
            import _xxsubinterpreters as _interpreters
 | 
						|
            _interpreters.channel_close({cid}, force=True)
 | 
						|
            """))
 | 
						|
        with self.assertRaises(interpreters.ChannelClosedError):
 | 
						|
            interpreters.channel_recv(cid)
 | 
						|
        with self.assertRaises(interpreters.ChannelClosedError):
 | 
						|
            interpreters.channel_close(cid)
 | 
						|
 | 
						|
    def test_close_used_multiple_times_by_single_user(self):
 | 
						|
        cid = interpreters.channel_create()
 | 
						|
        interpreters.channel_send(cid, b'spam')
 | 
						|
        interpreters.channel_send(cid, b'spam')
 | 
						|
        interpreters.channel_send(cid, b'spam')
 | 
						|
        interpreters.channel_recv(cid)
 | 
						|
        interpreters.channel_close(cid, force=True)
 | 
						|
 | 
						|
        with self.assertRaises(interpreters.ChannelClosedError):
 | 
						|
            interpreters.channel_send(cid, b'eggs')
 | 
						|
        with self.assertRaises(interpreters.ChannelClosedError):
 | 
						|
            interpreters.channel_recv(cid)
 | 
						|
 | 
						|
 | 
						|
class ChannelReleaseTests(TestBase):
 | 
						|
 | 
						|
    # XXX Add more test coverage a la the tests for close().
 | 
						|
 | 
						|
    """
 | 
						|
    - main / interp / other
 | 
						|
    - run in: current thread / new thread / other thread / different threads
 | 
						|
    - end / opposite
 | 
						|
    - force / no force
 | 
						|
    - used / not used  (associated / not associated)
 | 
						|
    - empty / emptied / never emptied / partly emptied
 | 
						|
    - closed / not closed
 | 
						|
    - released / not released
 | 
						|
    - creator (interp) / other
 | 
						|
    - associated interpreter not running
 | 
						|
    - associated interpreter destroyed
 | 
						|
    """
 | 
						|
 | 
						|
    """
 | 
						|
    use
 | 
						|
    pre-release
 | 
						|
    release
 | 
						|
    after
 | 
						|
    check
 | 
						|
    """
 | 
						|
 | 
						|
    """
 | 
						|
    release in:         main, interp1
 | 
						|
    creator:            same, other (incl. interp2)
 | 
						|
 | 
						|
    use:                None,send,recv,send/recv in None,same,other(incl. interp2),same+other(incl. interp2),all
 | 
						|
    pre-release:        None,send,recv,both in None,same,other(incl. interp2),same+other(incl. interp2),all
 | 
						|
    pre-release forced: None,send,recv,both in None,same,other(incl. interp2),same+other(incl. interp2),all
 | 
						|
 | 
						|
    release:            same
 | 
						|
    release forced:     same
 | 
						|
 | 
						|
    use after:          None,send,recv,send/recv in None,same,other(incl. interp2),same+other(incl. interp2),all
 | 
						|
    release after:      None,send,recv,send/recv in None,same,other(incl. interp2),same+other(incl. interp2),all
 | 
						|
    check released:     send/recv for same/other(incl. interp2)
 | 
						|
    check closed:       send/recv for same/other(incl. interp2)
 | 
						|
    """
 | 
						|
 | 
						|
    def test_single_user(self):
 | 
						|
        cid = interpreters.channel_create()
 | 
						|
        interpreters.channel_send(cid, b'spam')
 | 
						|
        interpreters.channel_recv(cid)
 | 
						|
        interpreters.channel_release(cid, send=True, recv=True)
 | 
						|
 | 
						|
        with self.assertRaises(interpreters.ChannelClosedError):
 | 
						|
            interpreters.channel_send(cid, b'eggs')
 | 
						|
        with self.assertRaises(interpreters.ChannelClosedError):
 | 
						|
            interpreters.channel_recv(cid)
 | 
						|
 | 
						|
    def test_multiple_users(self):
 | 
						|
        cid = interpreters.channel_create()
 | 
						|
        id1 = interpreters.create()
 | 
						|
        id2 = interpreters.create()
 | 
						|
        interpreters.run_string(id1, dedent(f"""
 | 
						|
            import _xxsubinterpreters as _interpreters
 | 
						|
            _interpreters.channel_send({cid}, b'spam')
 | 
						|
            """))
 | 
						|
        out = _run_output(id2, dedent(f"""
 | 
						|
            import _xxsubinterpreters as _interpreters
 | 
						|
            obj = _interpreters.channel_recv({cid})
 | 
						|
            _interpreters.channel_release({cid})
 | 
						|
            print(repr(obj))
 | 
						|
            """))
 | 
						|
        interpreters.run_string(id1, dedent(f"""
 | 
						|
            _interpreters.channel_release({cid})
 | 
						|
            """))
 | 
						|
 | 
						|
        self.assertEqual(out.strip(), "b'spam'")
 | 
						|
 | 
						|
    def test_no_kwargs(self):
 | 
						|
        cid = interpreters.channel_create()
 | 
						|
        interpreters.channel_send(cid, b'spam')
 | 
						|
        interpreters.channel_recv(cid)
 | 
						|
        interpreters.channel_release(cid)
 | 
						|
 | 
						|
        with self.assertRaises(interpreters.ChannelClosedError):
 | 
						|
            interpreters.channel_send(cid, b'eggs')
 | 
						|
        with self.assertRaises(interpreters.ChannelClosedError):
 | 
						|
            interpreters.channel_recv(cid)
 | 
						|
 | 
						|
    def test_multiple_times(self):
 | 
						|
        cid = interpreters.channel_create()
 | 
						|
        interpreters.channel_send(cid, b'spam')
 | 
						|
        interpreters.channel_recv(cid)
 | 
						|
        interpreters.channel_release(cid, send=True, recv=True)
 | 
						|
 | 
						|
        with self.assertRaises(interpreters.ChannelClosedError):
 | 
						|
            interpreters.channel_release(cid, send=True, recv=True)
 | 
						|
 | 
						|
    def test_with_unused_items(self):
 | 
						|
        cid = interpreters.channel_create()
 | 
						|
        interpreters.channel_send(cid, b'spam')
 | 
						|
        interpreters.channel_send(cid, b'ham')
 | 
						|
        interpreters.channel_release(cid, send=True, recv=True)
 | 
						|
 | 
						|
        with self.assertRaises(interpreters.ChannelClosedError):
 | 
						|
            interpreters.channel_recv(cid)
 | 
						|
 | 
						|
    def test_never_used(self):
 | 
						|
        cid = interpreters.channel_create()
 | 
						|
        interpreters.channel_release(cid)
 | 
						|
 | 
						|
        with self.assertRaises(interpreters.ChannelClosedError):
 | 
						|
            interpreters.channel_send(cid, b'spam')
 | 
						|
        with self.assertRaises(interpreters.ChannelClosedError):
 | 
						|
            interpreters.channel_recv(cid)
 | 
						|
 | 
						|
    def test_by_unassociated_interp(self):
 | 
						|
        cid = interpreters.channel_create()
 | 
						|
        interpreters.channel_send(cid, b'spam')
 | 
						|
        interp = interpreters.create()
 | 
						|
        interpreters.run_string(interp, dedent(f"""
 | 
						|
            import _xxsubinterpreters as _interpreters
 | 
						|
            _interpreters.channel_release({cid})
 | 
						|
            """))
 | 
						|
        obj = interpreters.channel_recv(cid)
 | 
						|
        interpreters.channel_release(cid)
 | 
						|
 | 
						|
        with self.assertRaises(interpreters.ChannelClosedError):
 | 
						|
            interpreters.channel_send(cid, b'eggs')
 | 
						|
        self.assertEqual(obj, b'spam')
 | 
						|
 | 
						|
    def test_close_if_unassociated(self):
 | 
						|
        # XXX Something's not right with this test...
 | 
						|
        cid = interpreters.channel_create()
 | 
						|
        interp = interpreters.create()
 | 
						|
        interpreters.run_string(interp, dedent(f"""
 | 
						|
            import _xxsubinterpreters as _interpreters
 | 
						|
            obj = _interpreters.channel_send({cid}, b'spam')
 | 
						|
            _interpreters.channel_release({cid})
 | 
						|
            """))
 | 
						|
 | 
						|
        with self.assertRaises(interpreters.ChannelClosedError):
 | 
						|
            interpreters.channel_recv(cid)
 | 
						|
 | 
						|
    def test_partially(self):
 | 
						|
        # XXX Is partial close too weird/confusing?
 | 
						|
        cid = interpreters.channel_create()
 | 
						|
        interpreters.channel_send(cid, None)
 | 
						|
        interpreters.channel_recv(cid)
 | 
						|
        interpreters.channel_send(cid, b'spam')
 | 
						|
        interpreters.channel_release(cid, send=True)
 | 
						|
        obj = interpreters.channel_recv(cid)
 | 
						|
 | 
						|
        self.assertEqual(obj, b'spam')
 | 
						|
 | 
						|
    def test_used_multiple_times_by_single_user(self):
 | 
						|
        cid = interpreters.channel_create()
 | 
						|
        interpreters.channel_send(cid, b'spam')
 | 
						|
        interpreters.channel_send(cid, b'spam')
 | 
						|
        interpreters.channel_send(cid, b'spam')
 | 
						|
        interpreters.channel_recv(cid)
 | 
						|
        interpreters.channel_release(cid, send=True, recv=True)
 | 
						|
 | 
						|
        with self.assertRaises(interpreters.ChannelClosedError):
 | 
						|
            interpreters.channel_send(cid, b'eggs')
 | 
						|
        with self.assertRaises(interpreters.ChannelClosedError):
 | 
						|
            interpreters.channel_recv(cid)
 | 
						|
 | 
						|
 | 
						|
class ChannelCloseFixture(namedtuple('ChannelCloseFixture',
 | 
						|
                                     'end interp other extra creator')):
 | 
						|
 | 
						|
    # Set this to True to avoid creating interpreters, e.g. when
 | 
						|
    # scanning through test permutations without running them.
 | 
						|
    QUICK = False
 | 
						|
 | 
						|
    def __new__(cls, end, interp, other, extra, creator):
 | 
						|
        assert end in ('send', 'recv')
 | 
						|
        if cls.QUICK:
 | 
						|
            known = {}
 | 
						|
        else:
 | 
						|
            interp = Interpreter.from_raw(interp)
 | 
						|
            other = Interpreter.from_raw(other)
 | 
						|
            extra = Interpreter.from_raw(extra)
 | 
						|
            known = {
 | 
						|
                interp.name: interp,
 | 
						|
                other.name: other,
 | 
						|
                extra.name: extra,
 | 
						|
                }
 | 
						|
        if not creator:
 | 
						|
            creator = 'same'
 | 
						|
        self = super().__new__(cls, end, interp, other, extra, creator)
 | 
						|
        self._prepped = set()
 | 
						|
        self._state = ChannelState()
 | 
						|
        self._known = known
 | 
						|
        return self
 | 
						|
 | 
						|
    @property
 | 
						|
    def state(self):
 | 
						|
        return self._state
 | 
						|
 | 
						|
    @property
 | 
						|
    def cid(self):
 | 
						|
        try:
 | 
						|
            return self._cid
 | 
						|
        except AttributeError:
 | 
						|
            creator = self._get_interpreter(self.creator)
 | 
						|
            self._cid = self._new_channel(creator)
 | 
						|
            return self._cid
 | 
						|
 | 
						|
    def get_interpreter(self, interp):
 | 
						|
        interp = self._get_interpreter(interp)
 | 
						|
        self._prep_interpreter(interp)
 | 
						|
        return interp
 | 
						|
 | 
						|
    def expect_closed_error(self, end=None):
 | 
						|
        if end is None:
 | 
						|
            end = self.end
 | 
						|
        if end == 'recv' and self.state.closed == 'send':
 | 
						|
            return False
 | 
						|
        return bool(self.state.closed)
 | 
						|
 | 
						|
    def prep_interpreter(self, interp):
 | 
						|
        self._prep_interpreter(interp)
 | 
						|
 | 
						|
    def record_action(self, action, result):
 | 
						|
        self._state = result
 | 
						|
 | 
						|
    def clean_up(self):
 | 
						|
        clean_up_interpreters()
 | 
						|
        clean_up_channels()
 | 
						|
 | 
						|
    # internal methods
 | 
						|
 | 
						|
    def _new_channel(self, creator):
 | 
						|
        if creator.name == 'main':
 | 
						|
            return interpreters.channel_create()
 | 
						|
        else:
 | 
						|
            ch = interpreters.channel_create()
 | 
						|
            run_interp(creator.id, f"""
 | 
						|
                import _xxsubinterpreters
 | 
						|
                cid = _xxsubinterpreters.channel_create()
 | 
						|
                # We purposefully send back an int to avoid tying the
 | 
						|
                # channel to the other interpreter.
 | 
						|
                _xxsubinterpreters.channel_send({ch}, int(cid))
 | 
						|
                del _xxsubinterpreters
 | 
						|
                """)
 | 
						|
            self._cid = interpreters.channel_recv(ch)
 | 
						|
        return self._cid
 | 
						|
 | 
						|
    def _get_interpreter(self, interp):
 | 
						|
        if interp in ('same', 'interp'):
 | 
						|
            return self.interp
 | 
						|
        elif interp == 'other':
 | 
						|
            return self.other
 | 
						|
        elif interp == 'extra':
 | 
						|
            return self.extra
 | 
						|
        else:
 | 
						|
            name = interp
 | 
						|
            try:
 | 
						|
                interp = self._known[name]
 | 
						|
            except KeyError:
 | 
						|
                interp = self._known[name] = Interpreter(name)
 | 
						|
            return interp
 | 
						|
 | 
						|
    def _prep_interpreter(self, interp):
 | 
						|
        if interp.id in self._prepped:
 | 
						|
            return
 | 
						|
        self._prepped.add(interp.id)
 | 
						|
        if interp.name == 'main':
 | 
						|
            return
 | 
						|
        run_interp(interp.id, f"""
 | 
						|
            import _xxsubinterpreters as interpreters
 | 
						|
            import test.test__xxsubinterpreters as helpers
 | 
						|
            ChannelState = helpers.ChannelState
 | 
						|
            try:
 | 
						|
                cid
 | 
						|
            except NameError:
 | 
						|
                cid = interpreters._channel_id({self.cid})
 | 
						|
            """)
 | 
						|
 | 
						|
 | 
						|
@unittest.skip('these tests take several hours to run')
 | 
						|
class ExhaustiveChannelTests(TestBase):
 | 
						|
 | 
						|
    """
 | 
						|
    - main / interp / other
 | 
						|
    - run in: current thread / new thread / other thread / different threads
 | 
						|
    - end / opposite
 | 
						|
    - force / no force
 | 
						|
    - used / not used  (associated / not associated)
 | 
						|
    - empty / emptied / never emptied / partly emptied
 | 
						|
    - closed / not closed
 | 
						|
    - released / not released
 | 
						|
    - creator (interp) / other
 | 
						|
    - associated interpreter not running
 | 
						|
    - associated interpreter destroyed
 | 
						|
 | 
						|
    - close after unbound
 | 
						|
    """
 | 
						|
 | 
						|
    """
 | 
						|
    use
 | 
						|
    pre-close
 | 
						|
    close
 | 
						|
    after
 | 
						|
    check
 | 
						|
    """
 | 
						|
 | 
						|
    """
 | 
						|
    close in:         main, interp1
 | 
						|
    creator:          same, other, extra
 | 
						|
 | 
						|
    use:              None,send,recv,send/recv in None,same,other,same+other,all
 | 
						|
    pre-close:        None,send,recv in None,same,other,same+other,all
 | 
						|
    pre-close forced: None,send,recv in None,same,other,same+other,all
 | 
						|
 | 
						|
    close:            same
 | 
						|
    close forced:     same
 | 
						|
 | 
						|
    use after:        None,send,recv,send/recv in None,same,other,extra,same+other,all
 | 
						|
    close after:      None,send,recv,send/recv in None,same,other,extra,same+other,all
 | 
						|
    check closed:     send/recv for same/other(incl. interp2)
 | 
						|
    """
 | 
						|
 | 
						|
    def iter_action_sets(self):
 | 
						|
        # - used / not used  (associated / not associated)
 | 
						|
        # - empty / emptied / never emptied / partly emptied
 | 
						|
        # - closed / not closed
 | 
						|
        # - released / not released
 | 
						|
 | 
						|
        # never used
 | 
						|
        yield []
 | 
						|
 | 
						|
        # only pre-closed (and possible used after)
 | 
						|
        for closeactions in self._iter_close_action_sets('same', 'other'):
 | 
						|
            yield closeactions
 | 
						|
            for postactions in self._iter_post_close_action_sets():
 | 
						|
                yield closeactions + postactions
 | 
						|
        for closeactions in self._iter_close_action_sets('other', 'extra'):
 | 
						|
            yield closeactions
 | 
						|
            for postactions in self._iter_post_close_action_sets():
 | 
						|
                yield closeactions + postactions
 | 
						|
 | 
						|
        # used
 | 
						|
        for useactions in self._iter_use_action_sets('same', 'other'):
 | 
						|
            yield useactions
 | 
						|
            for closeactions in self._iter_close_action_sets('same', 'other'):
 | 
						|
                actions = useactions + closeactions
 | 
						|
                yield actions
 | 
						|
                for postactions in self._iter_post_close_action_sets():
 | 
						|
                    yield actions + postactions
 | 
						|
            for closeactions in self._iter_close_action_sets('other', 'extra'):
 | 
						|
                actions = useactions + closeactions
 | 
						|
                yield actions
 | 
						|
                for postactions in self._iter_post_close_action_sets():
 | 
						|
                    yield actions + postactions
 | 
						|
        for useactions in self._iter_use_action_sets('other', 'extra'):
 | 
						|
            yield useactions
 | 
						|
            for closeactions in self._iter_close_action_sets('same', 'other'):
 | 
						|
                actions = useactions + closeactions
 | 
						|
                yield actions
 | 
						|
                for postactions in self._iter_post_close_action_sets():
 | 
						|
                    yield actions + postactions
 | 
						|
            for closeactions in self._iter_close_action_sets('other', 'extra'):
 | 
						|
                actions = useactions + closeactions
 | 
						|
                yield actions
 | 
						|
                for postactions in self._iter_post_close_action_sets():
 | 
						|
                    yield actions + postactions
 | 
						|
 | 
						|
    def _iter_use_action_sets(self, interp1, interp2):
 | 
						|
        interps = (interp1, interp2)
 | 
						|
 | 
						|
        # only recv end used
 | 
						|
        yield [
 | 
						|
            ChannelAction('use', 'recv', interp1),
 | 
						|
            ]
 | 
						|
        yield [
 | 
						|
            ChannelAction('use', 'recv', interp2),
 | 
						|
            ]
 | 
						|
        yield [
 | 
						|
            ChannelAction('use', 'recv', interp1),
 | 
						|
            ChannelAction('use', 'recv', interp2),
 | 
						|
            ]
 | 
						|
 | 
						|
        # never emptied
 | 
						|
        yield [
 | 
						|
            ChannelAction('use', 'send', interp1),
 | 
						|
            ]
 | 
						|
        yield [
 | 
						|
            ChannelAction('use', 'send', interp2),
 | 
						|
            ]
 | 
						|
        yield [
 | 
						|
            ChannelAction('use', 'send', interp1),
 | 
						|
            ChannelAction('use', 'send', interp2),
 | 
						|
            ]
 | 
						|
 | 
						|
        # partially emptied
 | 
						|
        for interp1 in interps:
 | 
						|
            for interp2 in interps:
 | 
						|
                for interp3 in interps:
 | 
						|
                    yield [
 | 
						|
                        ChannelAction('use', 'send', interp1),
 | 
						|
                        ChannelAction('use', 'send', interp2),
 | 
						|
                        ChannelAction('use', 'recv', interp3),
 | 
						|
                        ]
 | 
						|
 | 
						|
        # fully emptied
 | 
						|
        for interp1 in interps:
 | 
						|
            for interp2 in interps:
 | 
						|
                for interp3 in interps:
 | 
						|
                    for interp4 in interps:
 | 
						|
                        yield [
 | 
						|
                            ChannelAction('use', 'send', interp1),
 | 
						|
                            ChannelAction('use', 'send', interp2),
 | 
						|
                            ChannelAction('use', 'recv', interp3),
 | 
						|
                            ChannelAction('use', 'recv', interp4),
 | 
						|
                            ]
 | 
						|
 | 
						|
    def _iter_close_action_sets(self, interp1, interp2):
 | 
						|
        ends = ('recv', 'send')
 | 
						|
        interps = (interp1, interp2)
 | 
						|
        for force in (True, False):
 | 
						|
            op = 'force-close' if force else 'close'
 | 
						|
            for interp in interps:
 | 
						|
                for end in ends:
 | 
						|
                    yield [
 | 
						|
                        ChannelAction(op, end, interp),
 | 
						|
                        ]
 | 
						|
        for recvop in ('close', 'force-close'):
 | 
						|
            for sendop in ('close', 'force-close'):
 | 
						|
                for recv in interps:
 | 
						|
                    for send in interps:
 | 
						|
                        yield [
 | 
						|
                            ChannelAction(recvop, 'recv', recv),
 | 
						|
                            ChannelAction(sendop, 'send', send),
 | 
						|
                            ]
 | 
						|
 | 
						|
    def _iter_post_close_action_sets(self):
 | 
						|
        for interp in ('same', 'extra', 'other'):
 | 
						|
            yield [
 | 
						|
                ChannelAction('use', 'recv', interp),
 | 
						|
                ]
 | 
						|
            yield [
 | 
						|
                ChannelAction('use', 'send', interp),
 | 
						|
                ]
 | 
						|
 | 
						|
    def run_actions(self, fix, actions):
 | 
						|
        for action in actions:
 | 
						|
            self.run_action(fix, action)
 | 
						|
 | 
						|
    def run_action(self, fix, action, *, hideclosed=True):
 | 
						|
        end = action.resolve_end(fix.end)
 | 
						|
        interp = action.resolve_interp(fix.interp, fix.other, fix.extra)
 | 
						|
        fix.prep_interpreter(interp)
 | 
						|
        if interp.name == 'main':
 | 
						|
            result = run_action(
 | 
						|
                fix.cid,
 | 
						|
                action.action,
 | 
						|
                end,
 | 
						|
                fix.state,
 | 
						|
                hideclosed=hideclosed,
 | 
						|
                )
 | 
						|
            fix.record_action(action, result)
 | 
						|
        else:
 | 
						|
            _cid = interpreters.channel_create()
 | 
						|
            run_interp(interp.id, f"""
 | 
						|
                result = helpers.run_action(
 | 
						|
                    {fix.cid},
 | 
						|
                    {repr(action.action)},
 | 
						|
                    {repr(end)},
 | 
						|
                    {repr(fix.state)},
 | 
						|
                    hideclosed={hideclosed},
 | 
						|
                    )
 | 
						|
                interpreters.channel_send({_cid}, result.pending.to_bytes(1, 'little'))
 | 
						|
                interpreters.channel_send({_cid}, b'X' if result.closed else b'')
 | 
						|
                """)
 | 
						|
            result = ChannelState(
 | 
						|
                pending=int.from_bytes(interpreters.channel_recv(_cid), 'little'),
 | 
						|
                closed=bool(interpreters.channel_recv(_cid)),
 | 
						|
                )
 | 
						|
            fix.record_action(action, result)
 | 
						|
 | 
						|
    def iter_fixtures(self):
 | 
						|
        # XXX threads?
 | 
						|
        interpreters = [
 | 
						|
            ('main', 'interp', 'extra'),
 | 
						|
            ('interp', 'main', 'extra'),
 | 
						|
            ('interp1', 'interp2', 'extra'),
 | 
						|
            ('interp1', 'interp2', 'main'),
 | 
						|
        ]
 | 
						|
        for interp, other, extra in interpreters:
 | 
						|
            for creator in ('same', 'other', 'creator'):
 | 
						|
                for end in ('send', 'recv'):
 | 
						|
                    yield ChannelCloseFixture(end, interp, other, extra, creator)
 | 
						|
 | 
						|
    def _close(self, fix, *, force):
 | 
						|
        op = 'force-close' if force else 'close'
 | 
						|
        close = ChannelAction(op, fix.end, 'same')
 | 
						|
        if not fix.expect_closed_error():
 | 
						|
            self.run_action(fix, close, hideclosed=False)
 | 
						|
        else:
 | 
						|
            with self.assertRaises(interpreters.ChannelClosedError):
 | 
						|
                self.run_action(fix, close, hideclosed=False)
 | 
						|
 | 
						|
    def _assert_closed_in_interp(self, fix, interp=None):
 | 
						|
        if interp is None or interp.name == 'main':
 | 
						|
            with self.assertRaises(interpreters.ChannelClosedError):
 | 
						|
                interpreters.channel_recv(fix.cid)
 | 
						|
            with self.assertRaises(interpreters.ChannelClosedError):
 | 
						|
                interpreters.channel_send(fix.cid, b'spam')
 | 
						|
            with self.assertRaises(interpreters.ChannelClosedError):
 | 
						|
                interpreters.channel_close(fix.cid)
 | 
						|
            with self.assertRaises(interpreters.ChannelClosedError):
 | 
						|
                interpreters.channel_close(fix.cid, force=True)
 | 
						|
        else:
 | 
						|
            run_interp(interp.id, f"""
 | 
						|
                with helpers.expect_channel_closed():
 | 
						|
                    interpreters.channel_recv(cid)
 | 
						|
                """)
 | 
						|
            run_interp(interp.id, f"""
 | 
						|
                with helpers.expect_channel_closed():
 | 
						|
                    interpreters.channel_send(cid, b'spam')
 | 
						|
                """)
 | 
						|
            run_interp(interp.id, f"""
 | 
						|
                with helpers.expect_channel_closed():
 | 
						|
                    interpreters.channel_close(cid)
 | 
						|
                """)
 | 
						|
            run_interp(interp.id, f"""
 | 
						|
                with helpers.expect_channel_closed():
 | 
						|
                    interpreters.channel_close(cid, force=True)
 | 
						|
                """)
 | 
						|
 | 
						|
    def _assert_closed(self, fix):
 | 
						|
        self.assertTrue(fix.state.closed)
 | 
						|
 | 
						|
        for _ in range(fix.state.pending):
 | 
						|
            interpreters.channel_recv(fix.cid)
 | 
						|
        self._assert_closed_in_interp(fix)
 | 
						|
 | 
						|
        for interp in ('same', 'other'):
 | 
						|
            interp = fix.get_interpreter(interp)
 | 
						|
            if interp.name == 'main':
 | 
						|
                continue
 | 
						|
            self._assert_closed_in_interp(fix, interp)
 | 
						|
 | 
						|
        interp = fix.get_interpreter('fresh')
 | 
						|
        self._assert_closed_in_interp(fix, interp)
 | 
						|
 | 
						|
    def _iter_close_tests(self, verbose=False):
 | 
						|
        i = 0
 | 
						|
        for actions in self.iter_action_sets():
 | 
						|
            print()
 | 
						|
            for fix in self.iter_fixtures():
 | 
						|
                i += 1
 | 
						|
                if i > 1000:
 | 
						|
                    return
 | 
						|
                if verbose:
 | 
						|
                    if (i - 1) % 6 == 0:
 | 
						|
                        print()
 | 
						|
                    print(i, fix, '({} actions)'.format(len(actions)))
 | 
						|
                else:
 | 
						|
                    if (i - 1) % 6 == 0:
 | 
						|
                        print(' ', end='')
 | 
						|
                    print('.', end=''); sys.stdout.flush()
 | 
						|
                yield i, fix, actions
 | 
						|
            if verbose:
 | 
						|
                print('---')
 | 
						|
        print()
 | 
						|
 | 
						|
    # This is useful for scanning through the possible tests.
 | 
						|
    def _skim_close_tests(self):
 | 
						|
        ChannelCloseFixture.QUICK = True
 | 
						|
        for i, fix, actions in self._iter_close_tests():
 | 
						|
            pass
 | 
						|
 | 
						|
    def test_close(self):
 | 
						|
        for i, fix, actions in self._iter_close_tests():
 | 
						|
            with self.subTest('{} {}  {}'.format(i, fix, actions)):
 | 
						|
                fix.prep_interpreter(fix.interp)
 | 
						|
                self.run_actions(fix, actions)
 | 
						|
 | 
						|
                self._close(fix, force=False)
 | 
						|
 | 
						|
                self._assert_closed(fix)
 | 
						|
            # XXX Things slow down if we have too many interpreters.
 | 
						|
            fix.clean_up()
 | 
						|
 | 
						|
    def test_force_close(self):
 | 
						|
        for i, fix, actions in self._iter_close_tests():
 | 
						|
            with self.subTest('{} {}  {}'.format(i, fix, actions)):
 | 
						|
                fix.prep_interpreter(fix.interp)
 | 
						|
                self.run_actions(fix, actions)
 | 
						|
 | 
						|
                self._close(fix, force=True)
 | 
						|
 | 
						|
                self._assert_closed(fix)
 | 
						|
            # XXX Things slow down if we have too many interpreters.
 | 
						|
            fix.clean_up()
 | 
						|
 | 
						|
 | 
						|
if __name__ == '__main__':
 | 
						|
    unittest.main()
 |