| 
									
										
										
										
											2024-03-04 13:59:30 -07:00
										 |  |  | import importlib | 
					
						
							| 
									
										
										
										
											2023-12-12 08:24:31 -07:00
										 |  |  | import threading | 
					
						
							|  |  |  | from textwrap import dedent | 
					
						
							|  |  |  | import unittest | 
					
						
							|  |  |  | import time | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | from test.support import import_helper | 
					
						
							|  |  |  | # Raise SkipTest if subinterpreters not supported. | 
					
						
							|  |  |  | _channels = import_helper.import_module('_xxinterpchannels') | 
					
						
							|  |  |  | from test.support import interpreters | 
					
						
							|  |  |  | from test.support.interpreters import channels | 
					
						
							|  |  |  | from .utils import _run_output, TestBase | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-03-04 13:59:30 -07:00
										 |  |  | class LowLevelTests(TestBase): | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # The behaviors in the low-level module is important in as much | 
					
						
							|  |  |  |     # as it is exercised by the high-level module.  Therefore the | 
					
						
							|  |  |  |     # most # important testing happens in the high-level tests. | 
					
						
							|  |  |  |     # These low-level tests cover corner cases that are not | 
					
						
							|  |  |  |     # encountered by the high-level module, thus they | 
					
						
							|  |  |  |     # mostly shouldn't matter as much. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # Additional tests are found in Lib/test/test__xxinterpchannels.py. | 
					
						
							|  |  |  |     # XXX Those should be either moved to LowLevelTests or eliminated | 
					
						
							|  |  |  |     # in favor of high-level tests in this file. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_highlevel_reloaded(self): | 
					
						
							|  |  |  |         # See gh-115490 (https://github.com/python/cpython/issues/115490). | 
					
						
							|  |  |  |         importlib.reload(channels) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-12-12 08:24:31 -07:00
										 |  |  | class TestChannels(TestBase): | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_create(self): | 
					
						
							|  |  |  |         r, s = channels.create() | 
					
						
							|  |  |  |         self.assertIsInstance(r, channels.RecvChannel) | 
					
						
							|  |  |  |         self.assertIsInstance(s, channels.SendChannel) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_list_all(self): | 
					
						
							|  |  |  |         self.assertEqual(channels.list_all(), []) | 
					
						
							|  |  |  |         created = set() | 
					
						
							|  |  |  |         for _ in range(3): | 
					
						
							|  |  |  |             ch = channels.create() | 
					
						
							|  |  |  |             created.add(ch) | 
					
						
							|  |  |  |         after = set(channels.list_all()) | 
					
						
							|  |  |  |         self.assertEqual(after, created) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_shareable(self): | 
					
						
							|  |  |  |         rch, sch = channels.create() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertTrue( | 
					
						
							|  |  |  |             interpreters.is_shareable(rch)) | 
					
						
							|  |  |  |         self.assertTrue( | 
					
						
							|  |  |  |             interpreters.is_shareable(sch)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         sch.send_nowait(rch) | 
					
						
							|  |  |  |         sch.send_nowait(sch) | 
					
						
							|  |  |  |         rch2 = rch.recv() | 
					
						
							|  |  |  |         sch2 = rch.recv() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertEqual(rch2, rch) | 
					
						
							|  |  |  |         self.assertEqual(sch2, sch) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_is_closed(self): | 
					
						
							|  |  |  |         rch, sch = channels.create() | 
					
						
							|  |  |  |         rbefore = rch.is_closed | 
					
						
							|  |  |  |         sbefore = sch.is_closed | 
					
						
							|  |  |  |         rch.close() | 
					
						
							|  |  |  |         rafter = rch.is_closed | 
					
						
							|  |  |  |         safter = sch.is_closed | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertFalse(rbefore) | 
					
						
							|  |  |  |         self.assertFalse(sbefore) | 
					
						
							|  |  |  |         self.assertTrue(rafter) | 
					
						
							|  |  |  |         self.assertTrue(safter) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class TestRecvChannelAttrs(TestBase): | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_id_type(self): | 
					
						
							|  |  |  |         rch, _ = channels.create() | 
					
						
							|  |  |  |         self.assertIsInstance(rch.id, _channels.ChannelID) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_custom_id(self): | 
					
						
							|  |  |  |         rch = channels.RecvChannel(1) | 
					
						
							|  |  |  |         self.assertEqual(rch.id, 1) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         with self.assertRaises(TypeError): | 
					
						
							|  |  |  |             channels.RecvChannel('1') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_id_readonly(self): | 
					
						
							|  |  |  |         rch = channels.RecvChannel(1) | 
					
						
							|  |  |  |         with self.assertRaises(AttributeError): | 
					
						
							|  |  |  |             rch.id = 2 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_equality(self): | 
					
						
							|  |  |  |         ch1, _ = channels.create() | 
					
						
							|  |  |  |         ch2, _ = channels.create() | 
					
						
							|  |  |  |         self.assertEqual(ch1, ch1) | 
					
						
							|  |  |  |         self.assertNotEqual(ch1, ch2) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class TestSendChannelAttrs(TestBase): | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_id_type(self): | 
					
						
							|  |  |  |         _, sch = channels.create() | 
					
						
							|  |  |  |         self.assertIsInstance(sch.id, _channels.ChannelID) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_custom_id(self): | 
					
						
							|  |  |  |         sch = channels.SendChannel(1) | 
					
						
							|  |  |  |         self.assertEqual(sch.id, 1) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         with self.assertRaises(TypeError): | 
					
						
							|  |  |  |             channels.SendChannel('1') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_id_readonly(self): | 
					
						
							|  |  |  |         sch = channels.SendChannel(1) | 
					
						
							|  |  |  |         with self.assertRaises(AttributeError): | 
					
						
							|  |  |  |             sch.id = 2 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_equality(self): | 
					
						
							|  |  |  |         _, ch1 = channels.create() | 
					
						
							|  |  |  |         _, ch2 = channels.create() | 
					
						
							|  |  |  |         self.assertEqual(ch1, ch1) | 
					
						
							|  |  |  |         self.assertNotEqual(ch1, ch2) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class TestSendRecv(TestBase): | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_send_recv_main(self): | 
					
						
							|  |  |  |         r, s = channels.create() | 
					
						
							|  |  |  |         orig = b'spam' | 
					
						
							|  |  |  |         s.send_nowait(orig) | 
					
						
							|  |  |  |         obj = r.recv() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertEqual(obj, orig) | 
					
						
							|  |  |  |         self.assertIsNot(obj, orig) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_send_recv_same_interpreter(self): | 
					
						
							|  |  |  |         interp = interpreters.create() | 
					
						
							| 
									
										
										
										
											2024-02-28 16:08:08 -07:00
										 |  |  |         interp.exec(dedent("""
 | 
					
						
							| 
									
										
										
										
											2023-12-12 08:24:31 -07:00
										 |  |  |             from test.support.interpreters import channels | 
					
						
							|  |  |  |             r, s = channels.create() | 
					
						
							|  |  |  |             orig = b'spam' | 
					
						
							|  |  |  |             s.send_nowait(orig) | 
					
						
							|  |  |  |             obj = r.recv() | 
					
						
							|  |  |  |             assert obj == orig, 'expected: obj == orig' | 
					
						
							|  |  |  |             assert obj is not orig, 'expected: obj is not orig' | 
					
						
							|  |  |  |             """))
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @unittest.skip('broken (see BPO-...)') | 
					
						
							|  |  |  |     def test_send_recv_different_interpreters(self): | 
					
						
							|  |  |  |         r1, s1 = channels.create() | 
					
						
							|  |  |  |         r2, s2 = channels.create() | 
					
						
							|  |  |  |         orig1 = b'spam' | 
					
						
							|  |  |  |         s1.send_nowait(orig1) | 
					
						
							|  |  |  |         out = _run_output( | 
					
						
							|  |  |  |             interpreters.create(), | 
					
						
							|  |  |  |             dedent(f"""
 | 
					
						
							|  |  |  |                 obj1 = r.recv() | 
					
						
							|  |  |  |                 assert obj1 == b'spam', 'expected: obj1 == orig1' | 
					
						
							|  |  |  |                 # When going to another interpreter we get a copy. | 
					
						
							|  |  |  |                 assert id(obj1) != {id(orig1)}, 'expected: obj1 is not orig1' | 
					
						
							|  |  |  |                 orig2 = b'eggs' | 
					
						
							|  |  |  |                 print(id(orig2)) | 
					
						
							|  |  |  |                 s.send_nowait(orig2) | 
					
						
							|  |  |  |                 """),
 | 
					
						
							|  |  |  |             channels=dict(r=r1, s=s2), | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  |         obj2 = r2.recv() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertEqual(obj2, b'eggs') | 
					
						
							|  |  |  |         self.assertNotEqual(id(obj2), int(out)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_send_recv_different_threads(self): | 
					
						
							|  |  |  |         r, s = channels.create() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         def f(): | 
					
						
							|  |  |  |             while True: | 
					
						
							|  |  |  |                 try: | 
					
						
							|  |  |  |                     obj = r.recv() | 
					
						
							|  |  |  |                     break | 
					
						
							|  |  |  |                 except channels.ChannelEmptyError: | 
					
						
							|  |  |  |                     time.sleep(0.1) | 
					
						
							|  |  |  |             s.send(obj) | 
					
						
							|  |  |  |         t = threading.Thread(target=f) | 
					
						
							|  |  |  |         t.start() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         orig = b'spam' | 
					
						
							|  |  |  |         s.send(orig) | 
					
						
							|  |  |  |         obj = r.recv() | 
					
						
							|  |  |  |         t.join() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertEqual(obj, orig) | 
					
						
							|  |  |  |         self.assertIsNot(obj, orig) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_send_recv_nowait_main(self): | 
					
						
							|  |  |  |         r, s = channels.create() | 
					
						
							|  |  |  |         orig = b'spam' | 
					
						
							|  |  |  |         s.send_nowait(orig) | 
					
						
							|  |  |  |         obj = r.recv_nowait() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertEqual(obj, orig) | 
					
						
							|  |  |  |         self.assertIsNot(obj, orig) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_send_recv_nowait_main_with_default(self): | 
					
						
							|  |  |  |         r, _ = channels.create() | 
					
						
							|  |  |  |         obj = r.recv_nowait(None) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertIsNone(obj) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_send_recv_nowait_same_interpreter(self): | 
					
						
							|  |  |  |         interp = interpreters.create() | 
					
						
							| 
									
										
										
										
											2024-02-28 16:08:08 -07:00
										 |  |  |         interp.exec(dedent("""
 | 
					
						
							| 
									
										
										
										
											2023-12-12 08:24:31 -07:00
										 |  |  |             from test.support.interpreters import channels | 
					
						
							|  |  |  |             r, s = channels.create() | 
					
						
							|  |  |  |             orig = b'spam' | 
					
						
							|  |  |  |             s.send_nowait(orig) | 
					
						
							|  |  |  |             obj = r.recv_nowait() | 
					
						
							|  |  |  |             assert obj == orig, 'expected: obj == orig' | 
					
						
							|  |  |  |             # When going back to the same interpreter we get the same object. | 
					
						
							|  |  |  |             assert obj is not orig, 'expected: obj is not orig' | 
					
						
							|  |  |  |             """))
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @unittest.skip('broken (see BPO-...)') | 
					
						
							|  |  |  |     def test_send_recv_nowait_different_interpreters(self): | 
					
						
							|  |  |  |         r1, s1 = channels.create() | 
					
						
							|  |  |  |         r2, s2 = channels.create() | 
					
						
							|  |  |  |         orig1 = b'spam' | 
					
						
							|  |  |  |         s1.send_nowait(orig1) | 
					
						
							|  |  |  |         out = _run_output( | 
					
						
							|  |  |  |             interpreters.create(), | 
					
						
							|  |  |  |             dedent(f"""
 | 
					
						
							|  |  |  |                 obj1 = r.recv_nowait() | 
					
						
							|  |  |  |                 assert obj1 == b'spam', 'expected: obj1 == orig1' | 
					
						
							|  |  |  |                 # When going to another interpreter we get a copy. | 
					
						
							|  |  |  |                 assert id(obj1) != {id(orig1)}, 'expected: obj1 is not orig1' | 
					
						
							|  |  |  |                 orig2 = b'eggs' | 
					
						
							|  |  |  |                 print(id(orig2)) | 
					
						
							|  |  |  |                 s.send_nowait(orig2) | 
					
						
							|  |  |  |                 """),
 | 
					
						
							|  |  |  |             channels=dict(r=r1, s=s2), | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  |         obj2 = r2.recv_nowait() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertEqual(obj2, b'eggs') | 
					
						
							|  |  |  |         self.assertNotEqual(id(obj2), int(out)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_recv_timeout(self): | 
					
						
							|  |  |  |         r, _ = channels.create() | 
					
						
							|  |  |  |         with self.assertRaises(TimeoutError): | 
					
						
							|  |  |  |             r.recv(timeout=1) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_recv_channel_does_not_exist(self): | 
					
						
							|  |  |  |         ch = channels.RecvChannel(1_000_000) | 
					
						
							|  |  |  |         with self.assertRaises(channels.ChannelNotFoundError): | 
					
						
							|  |  |  |             ch.recv() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_send_channel_does_not_exist(self): | 
					
						
							|  |  |  |         ch = channels.SendChannel(1_000_000) | 
					
						
							|  |  |  |         with self.assertRaises(channels.ChannelNotFoundError): | 
					
						
							|  |  |  |             ch.send(b'spam') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_recv_nowait_channel_does_not_exist(self): | 
					
						
							|  |  |  |         ch = channels.RecvChannel(1_000_000) | 
					
						
							|  |  |  |         with self.assertRaises(channels.ChannelNotFoundError): | 
					
						
							|  |  |  |             ch.recv_nowait() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_send_nowait_channel_does_not_exist(self): | 
					
						
							|  |  |  |         ch = channels.SendChannel(1_000_000) | 
					
						
							|  |  |  |         with self.assertRaises(channels.ChannelNotFoundError): | 
					
						
							|  |  |  |             ch.send_nowait(b'spam') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_recv_nowait_empty(self): | 
					
						
							|  |  |  |         ch, _ = channels.create() | 
					
						
							|  |  |  |         with self.assertRaises(channels.ChannelEmptyError): | 
					
						
							|  |  |  |             ch.recv_nowait() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_recv_nowait_default(self): | 
					
						
							|  |  |  |         default = object() | 
					
						
							|  |  |  |         rch, sch = channels.create() | 
					
						
							|  |  |  |         obj1 = rch.recv_nowait(default) | 
					
						
							|  |  |  |         sch.send_nowait(None) | 
					
						
							|  |  |  |         sch.send_nowait(1) | 
					
						
							|  |  |  |         sch.send_nowait(b'spam') | 
					
						
							|  |  |  |         sch.send_nowait(b'eggs') | 
					
						
							|  |  |  |         obj2 = rch.recv_nowait(default) | 
					
						
							|  |  |  |         obj3 = rch.recv_nowait(default) | 
					
						
							|  |  |  |         obj4 = rch.recv_nowait() | 
					
						
							|  |  |  |         obj5 = rch.recv_nowait(default) | 
					
						
							|  |  |  |         obj6 = rch.recv_nowait(default) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertIs(obj1, default) | 
					
						
							|  |  |  |         self.assertIs(obj2, None) | 
					
						
							|  |  |  |         self.assertEqual(obj3, 1) | 
					
						
							|  |  |  |         self.assertEqual(obj4, b'spam') | 
					
						
							|  |  |  |         self.assertEqual(obj5, b'eggs') | 
					
						
							|  |  |  |         self.assertIs(obj6, default) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_send_buffer(self): | 
					
						
							|  |  |  |         buf = bytearray(b'spamspamspam') | 
					
						
							|  |  |  |         obj = None | 
					
						
							|  |  |  |         rch, sch = channels.create() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         def f(): | 
					
						
							|  |  |  |             nonlocal obj | 
					
						
							|  |  |  |             while True: | 
					
						
							|  |  |  |                 try: | 
					
						
							|  |  |  |                     obj = rch.recv() | 
					
						
							|  |  |  |                     break | 
					
						
							|  |  |  |                 except channels.ChannelEmptyError: | 
					
						
							|  |  |  |                     time.sleep(0.1) | 
					
						
							|  |  |  |         t = threading.Thread(target=f) | 
					
						
							|  |  |  |         t.start() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         sch.send_buffer(buf) | 
					
						
							|  |  |  |         t.join() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertIsNot(obj, buf) | 
					
						
							|  |  |  |         self.assertIsInstance(obj, memoryview) | 
					
						
							|  |  |  |         self.assertEqual(obj, buf) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         buf[4:8] = b'eggs' | 
					
						
							|  |  |  |         self.assertEqual(obj, buf) | 
					
						
							|  |  |  |         obj[4:8] = b'ham.' | 
					
						
							|  |  |  |         self.assertEqual(obj, buf) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_send_buffer_nowait(self): | 
					
						
							|  |  |  |         buf = bytearray(b'spamspamspam') | 
					
						
							|  |  |  |         rch, sch = channels.create() | 
					
						
							|  |  |  |         sch.send_buffer_nowait(buf) | 
					
						
							|  |  |  |         obj = rch.recv() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertIsNot(obj, buf) | 
					
						
							|  |  |  |         self.assertIsInstance(obj, memoryview) | 
					
						
							|  |  |  |         self.assertEqual(obj, buf) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         buf[4:8] = b'eggs' | 
					
						
							|  |  |  |         self.assertEqual(obj, buf) | 
					
						
							|  |  |  |         obj[4:8] = b'ham.' | 
					
						
							|  |  |  |         self.assertEqual(obj, buf) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | if __name__ == '__main__': | 
					
						
							|  |  |  |     # Test needs to be a package, so we can do relative imports. | 
					
						
							|  |  |  |     unittest.main() |