| 
									
										
										
										
											2024-03-04 13:59:30 -07:00
										 |  |  | import importlib | 
					
						
							| 
									
										
										
										
											2024-03-05 08:54:46 -07:00
										 |  |  | import pickle | 
					
						
							| 
									
										
										
										
											2023-12-12 08:24:31 -07:00
										 |  |  | import threading | 
					
						
							|  |  |  | from textwrap import dedent | 
					
						
							|  |  |  | import unittest | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-03-05 08:54:46 -07:00
										 |  |  | from test.support import import_helper, Py_DEBUG | 
					
						
							| 
									
										
										
										
											2023-12-12 08:24:31 -07:00
										 |  |  | # Raise SkipTest if subinterpreters not supported. | 
					
						
							| 
									
										
										
										
											2024-04-24 10:18:24 -06:00
										 |  |  | _queues = import_helper.import_module('_interpqueues') | 
					
						
							| 
									
										
										
										
											2023-12-12 08:24:31 -07:00
										 |  |  | from test.support import interpreters | 
					
						
							| 
									
										
										
										
											2024-07-15 13:43:59 -06:00
										 |  |  | from test.support.interpreters import queues, _crossinterp | 
					
						
							| 
									
										
										
										
											2024-03-05 08:54:46 -07:00
										 |  |  | from .utils import _run_output, TestBase as _TestBase | 
					
						
							| 
									
										
										
										
											2023-12-12 08:24:31 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-15 13:43:59 -06:00
										 |  |  | REPLACE = _crossinterp._UNBOUND_CONSTANT_TO_FLAG[_crossinterp.UNBOUND] | 
					
						
							| 
									
										
											  
											
												gh-76785: Expand How Interpreter Queues Handle Interpreter Finalization (gh-116431)
Any cross-interpreter mechanism for passing objects between interpreters must be very careful to respect isolation, even when the object is effectively immutable (e.g. int, str).  Here this especially relates to when an interpreter sends one of its objects, and then is destroyed while the inter-interpreter machinery (e.g. queue) still holds a reference to the object.
When I added interpreters.Queue, I dealt with that case (using an atexit hook) by silently removing all items from the queue that were added by the finalizing interpreter.
Later, while working on concurrent.futures.InterpreterPoolExecutor (gh-116430), I noticed it was somewhat surprising when items were silently removed from the queue when the originating interpreter was destroyed.  (See my comment on that PR.) 
 It took me a little while to realize what was going on.  I expect that users, which much less context than I have, would experience the same pain.
My approach, here, to improving the situation is to give users three options:
1. return a singleton (interpreters.queues.UNBOUND) from Queue.get() in place of each removed item
2. raise an exception (interpreters.queues.ItemInterpreterDestroyed) from Queue.get() in place of each removed item
3. existing behavior: silently remove each item (i.e. Queue.get() skips each one)
The default will now be (1), but users can still explicitly opt in any of them, including to the silent removal behavior.
The behavior for each item may be set with the corresponding Queue.put() call. and a queue-wide default may be set when the queue is created.  (This is the same as I did for "synconly".)
											
										 
											2024-07-15 12:49:23 -06:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-03-05 08:54:46 -07:00
										 |  |  | def get_num_queues(): | 
					
						
							|  |  |  |     return len(_queues.list_all()) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class TestBase(_TestBase): | 
					
						
							| 
									
										
										
										
											2023-12-12 10:43:30 -07:00
										 |  |  |     def tearDown(self): | 
					
						
							| 
									
										
											  
											
												gh-76785: Expand How Interpreter Queues Handle Interpreter Finalization (gh-116431)
Any cross-interpreter mechanism for passing objects between interpreters must be very careful to respect isolation, even when the object is effectively immutable (e.g. int, str).  Here this especially relates to when an interpreter sends one of its objects, and then is destroyed while the inter-interpreter machinery (e.g. queue) still holds a reference to the object.
When I added interpreters.Queue, I dealt with that case (using an atexit hook) by silently removing all items from the queue that were added by the finalizing interpreter.
Later, while working on concurrent.futures.InterpreterPoolExecutor (gh-116430), I noticed it was somewhat surprising when items were silently removed from the queue when the originating interpreter was destroyed.  (See my comment on that PR.) 
 It took me a little while to realize what was going on.  I expect that users, which much less context than I have, would experience the same pain.
My approach, here, to improving the situation is to give users three options:
1. return a singleton (interpreters.queues.UNBOUND) from Queue.get() in place of each removed item
2. raise an exception (interpreters.queues.ItemInterpreterDestroyed) from Queue.get() in place of each removed item
3. existing behavior: silently remove each item (i.e. Queue.get() skips each one)
The default will now be (1), but users can still explicitly opt in any of them, including to the silent removal behavior.
The behavior for each item may be set with the corresponding Queue.put() call. and a queue-wide default may be set when the queue is created.  (This is the same as I did for "synconly".)
											
										 
											2024-07-15 12:49:23 -06:00
										 |  |  |         for qid, _, _ in _queues.list_all(): | 
					
						
							| 
									
										
										
										
											2023-12-12 10:43:30 -07:00
										 |  |  |             try: | 
					
						
							|  |  |  |                 _queues.destroy(qid) | 
					
						
							|  |  |  |             except Exception: | 
					
						
							|  |  |  |                 pass | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-03-04 13:59:30 -07:00
										 |  |  | class LowLevelTests(TestBase): | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-04-02 17:16:50 -06:00
										 |  |  |     # The behaviors in the low-level module are important in as much | 
					
						
							|  |  |  |     # as they are exercised by the high-level module.  Therefore the | 
					
						
							|  |  |  |     # most important testing happens in the high-level tests. | 
					
						
							| 
									
										
										
										
											2024-03-04 13:59:30 -07:00
										 |  |  |     # These low-level tests cover corner cases that are not | 
					
						
							|  |  |  |     # encountered by the high-level module, thus they | 
					
						
							|  |  |  |     # mostly shouldn't matter as much. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_highlevel_reloaded(self): | 
					
						
							|  |  |  |         # See gh-115490 (https://github.com/python/cpython/issues/115490). | 
					
						
							|  |  |  |         importlib.reload(queues) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-03-05 08:54:46 -07:00
										 |  |  |     def test_create_destroy(self): | 
					
						
							| 
									
										
											  
											
												gh-76785: Expand How Interpreter Queues Handle Interpreter Finalization (gh-116431)
Any cross-interpreter mechanism for passing objects between interpreters must be very careful to respect isolation, even when the object is effectively immutable (e.g. int, str).  Here this especially relates to when an interpreter sends one of its objects, and then is destroyed while the inter-interpreter machinery (e.g. queue) still holds a reference to the object.
When I added interpreters.Queue, I dealt with that case (using an atexit hook) by silently removing all items from the queue that were added by the finalizing interpreter.
Later, while working on concurrent.futures.InterpreterPoolExecutor (gh-116430), I noticed it was somewhat surprising when items were silently removed from the queue when the originating interpreter was destroyed.  (See my comment on that PR.) 
 It took me a little while to realize what was going on.  I expect that users, which much less context than I have, would experience the same pain.
My approach, here, to improving the situation is to give users three options:
1. return a singleton (interpreters.queues.UNBOUND) from Queue.get() in place of each removed item
2. raise an exception (interpreters.queues.ItemInterpreterDestroyed) from Queue.get() in place of each removed item
3. existing behavior: silently remove each item (i.e. Queue.get() skips each one)
The default will now be (1), but users can still explicitly opt in any of them, including to the silent removal behavior.
The behavior for each item may be set with the corresponding Queue.put() call. and a queue-wide default may be set when the queue is created.  (This is the same as I did for "synconly".)
											
										 
											2024-07-15 12:49:23 -06:00
										 |  |  |         qid = _queues.create(2, 0, REPLACE) | 
					
						
							| 
									
										
										
										
											2024-03-05 08:54:46 -07:00
										 |  |  |         _queues.destroy(qid) | 
					
						
							|  |  |  |         self.assertEqual(get_num_queues(), 0) | 
					
						
							|  |  |  |         with self.assertRaises(queues.QueueNotFoundError): | 
					
						
							|  |  |  |             _queues.get(qid) | 
					
						
							|  |  |  |         with self.assertRaises(queues.QueueNotFoundError): | 
					
						
							|  |  |  |             _queues.destroy(qid) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_not_destroyed(self): | 
					
						
							|  |  |  |         # It should have cleaned up any remaining queues. | 
					
						
							|  |  |  |         stdout, stderr = self.assert_python_ok( | 
					
						
							|  |  |  |             '-c', | 
					
						
							|  |  |  |             dedent(f"""
 | 
					
						
							|  |  |  |                 import {_queues.__name__} as _queues | 
					
						
							| 
									
										
											  
											
												gh-76785: Expand How Interpreter Queues Handle Interpreter Finalization (gh-116431)
Any cross-interpreter mechanism for passing objects between interpreters must be very careful to respect isolation, even when the object is effectively immutable (e.g. int, str).  Here this especially relates to when an interpreter sends one of its objects, and then is destroyed while the inter-interpreter machinery (e.g. queue) still holds a reference to the object.
When I added interpreters.Queue, I dealt with that case (using an atexit hook) by silently removing all items from the queue that were added by the finalizing interpreter.
Later, while working on concurrent.futures.InterpreterPoolExecutor (gh-116430), I noticed it was somewhat surprising when items were silently removed from the queue when the originating interpreter was destroyed.  (See my comment on that PR.) 
 It took me a little while to realize what was going on.  I expect that users, which much less context than I have, would experience the same pain.
My approach, here, to improving the situation is to give users three options:
1. return a singleton (interpreters.queues.UNBOUND) from Queue.get() in place of each removed item
2. raise an exception (interpreters.queues.ItemInterpreterDestroyed) from Queue.get() in place of each removed item
3. existing behavior: silently remove each item (i.e. Queue.get() skips each one)
The default will now be (1), but users can still explicitly opt in any of them, including to the silent removal behavior.
The behavior for each item may be set with the corresponding Queue.put() call. and a queue-wide default may be set when the queue is created.  (This is the same as I did for "synconly".)
											
										 
											2024-07-15 12:49:23 -06:00
										 |  |  |                 _queues.create(2, 0, {REPLACE}) | 
					
						
							| 
									
										
										
										
											2024-03-05 08:54:46 -07:00
										 |  |  |                 """),
 | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         self.assertEqual(stdout, '') | 
					
						
							|  |  |  |         if Py_DEBUG: | 
					
						
							|  |  |  |             self.assertNotEqual(stderr, '') | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             self.assertEqual(stderr, '') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_bind_release(self): | 
					
						
							|  |  |  |         with self.subTest('typical'): | 
					
						
							| 
									
										
											  
											
												gh-76785: Expand How Interpreter Queues Handle Interpreter Finalization (gh-116431)
Any cross-interpreter mechanism for passing objects between interpreters must be very careful to respect isolation, even when the object is effectively immutable (e.g. int, str).  Here this especially relates to when an interpreter sends one of its objects, and then is destroyed while the inter-interpreter machinery (e.g. queue) still holds a reference to the object.
When I added interpreters.Queue, I dealt with that case (using an atexit hook) by silently removing all items from the queue that were added by the finalizing interpreter.
Later, while working on concurrent.futures.InterpreterPoolExecutor (gh-116430), I noticed it was somewhat surprising when items were silently removed from the queue when the originating interpreter was destroyed.  (See my comment on that PR.) 
 It took me a little while to realize what was going on.  I expect that users, which much less context than I have, would experience the same pain.
My approach, here, to improving the situation is to give users three options:
1. return a singleton (interpreters.queues.UNBOUND) from Queue.get() in place of each removed item
2. raise an exception (interpreters.queues.ItemInterpreterDestroyed) from Queue.get() in place of each removed item
3. existing behavior: silently remove each item (i.e. Queue.get() skips each one)
The default will now be (1), but users can still explicitly opt in any of them, including to the silent removal behavior.
The behavior for each item may be set with the corresponding Queue.put() call. and a queue-wide default may be set when the queue is created.  (This is the same as I did for "synconly".)
											
										 
											2024-07-15 12:49:23 -06:00
										 |  |  |             qid = _queues.create(2, 0, REPLACE) | 
					
						
							| 
									
										
										
										
											2024-03-05 08:54:46 -07:00
										 |  |  |             _queues.bind(qid) | 
					
						
							|  |  |  |             _queues.release(qid) | 
					
						
							|  |  |  |             self.assertEqual(get_num_queues(), 0) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         with self.subTest('bind too much'): | 
					
						
							| 
									
										
											  
											
												gh-76785: Expand How Interpreter Queues Handle Interpreter Finalization (gh-116431)
Any cross-interpreter mechanism for passing objects between interpreters must be very careful to respect isolation, even when the object is effectively immutable (e.g. int, str).  Here this especially relates to when an interpreter sends one of its objects, and then is destroyed while the inter-interpreter machinery (e.g. queue) still holds a reference to the object.
When I added interpreters.Queue, I dealt with that case (using an atexit hook) by silently removing all items from the queue that were added by the finalizing interpreter.
Later, while working on concurrent.futures.InterpreterPoolExecutor (gh-116430), I noticed it was somewhat surprising when items were silently removed from the queue when the originating interpreter was destroyed.  (See my comment on that PR.) 
 It took me a little while to realize what was going on.  I expect that users, which much less context than I have, would experience the same pain.
My approach, here, to improving the situation is to give users three options:
1. return a singleton (interpreters.queues.UNBOUND) from Queue.get() in place of each removed item
2. raise an exception (interpreters.queues.ItemInterpreterDestroyed) from Queue.get() in place of each removed item
3. existing behavior: silently remove each item (i.e. Queue.get() skips each one)
The default will now be (1), but users can still explicitly opt in any of them, including to the silent removal behavior.
The behavior for each item may be set with the corresponding Queue.put() call. and a queue-wide default may be set when the queue is created.  (This is the same as I did for "synconly".)
											
										 
											2024-07-15 12:49:23 -06:00
										 |  |  |             qid = _queues.create(2, 0, REPLACE) | 
					
						
							| 
									
										
										
										
											2024-03-05 08:54:46 -07:00
										 |  |  |             _queues.bind(qid) | 
					
						
							|  |  |  |             _queues.bind(qid) | 
					
						
							|  |  |  |             _queues.release(qid) | 
					
						
							|  |  |  |             _queues.destroy(qid) | 
					
						
							|  |  |  |             self.assertEqual(get_num_queues(), 0) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         with self.subTest('nested'): | 
					
						
							| 
									
										
											  
											
												gh-76785: Expand How Interpreter Queues Handle Interpreter Finalization (gh-116431)
Any cross-interpreter mechanism for passing objects between interpreters must be very careful to respect isolation, even when the object is effectively immutable (e.g. int, str).  Here this especially relates to when an interpreter sends one of its objects, and then is destroyed while the inter-interpreter machinery (e.g. queue) still holds a reference to the object.
When I added interpreters.Queue, I dealt with that case (using an atexit hook) by silently removing all items from the queue that were added by the finalizing interpreter.
Later, while working on concurrent.futures.InterpreterPoolExecutor (gh-116430), I noticed it was somewhat surprising when items were silently removed from the queue when the originating interpreter was destroyed.  (See my comment on that PR.) 
 It took me a little while to realize what was going on.  I expect that users, which much less context than I have, would experience the same pain.
My approach, here, to improving the situation is to give users three options:
1. return a singleton (interpreters.queues.UNBOUND) from Queue.get() in place of each removed item
2. raise an exception (interpreters.queues.ItemInterpreterDestroyed) from Queue.get() in place of each removed item
3. existing behavior: silently remove each item (i.e. Queue.get() skips each one)
The default will now be (1), but users can still explicitly opt in any of them, including to the silent removal behavior.
The behavior for each item may be set with the corresponding Queue.put() call. and a queue-wide default may be set when the queue is created.  (This is the same as I did for "synconly".)
											
										 
											2024-07-15 12:49:23 -06:00
										 |  |  |             qid = _queues.create(2, 0, REPLACE) | 
					
						
							| 
									
										
										
										
											2024-03-05 08:54:46 -07:00
										 |  |  |             _queues.bind(qid) | 
					
						
							|  |  |  |             _queues.bind(qid) | 
					
						
							|  |  |  |             _queues.release(qid) | 
					
						
							|  |  |  |             _queues.release(qid) | 
					
						
							|  |  |  |             self.assertEqual(get_num_queues(), 0) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         with self.subTest('release without binding'): | 
					
						
							| 
									
										
											  
											
												gh-76785: Expand How Interpreter Queues Handle Interpreter Finalization (gh-116431)
Any cross-interpreter mechanism for passing objects between interpreters must be very careful to respect isolation, even when the object is effectively immutable (e.g. int, str).  Here this especially relates to when an interpreter sends one of its objects, and then is destroyed while the inter-interpreter machinery (e.g. queue) still holds a reference to the object.
When I added interpreters.Queue, I dealt with that case (using an atexit hook) by silently removing all items from the queue that were added by the finalizing interpreter.
Later, while working on concurrent.futures.InterpreterPoolExecutor (gh-116430), I noticed it was somewhat surprising when items were silently removed from the queue when the originating interpreter was destroyed.  (See my comment on that PR.) 
 It took me a little while to realize what was going on.  I expect that users, which much less context than I have, would experience the same pain.
My approach, here, to improving the situation is to give users three options:
1. return a singleton (interpreters.queues.UNBOUND) from Queue.get() in place of each removed item
2. raise an exception (interpreters.queues.ItemInterpreterDestroyed) from Queue.get() in place of each removed item
3. existing behavior: silently remove each item (i.e. Queue.get() skips each one)
The default will now be (1), but users can still explicitly opt in any of them, including to the silent removal behavior.
The behavior for each item may be set with the corresponding Queue.put() call. and a queue-wide default may be set when the queue is created.  (This is the same as I did for "synconly".)
											
										 
											2024-07-15 12:49:23 -06:00
										 |  |  |             qid = _queues.create(2, 0, REPLACE) | 
					
						
							| 
									
										
										
										
											2024-03-05 08:54:46 -07:00
										 |  |  |             with self.assertRaises(queues.QueueError): | 
					
						
							|  |  |  |                 _queues.release(qid) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-03-04 13:59:30 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-12-12 08:24:31 -07:00
										 |  |  | class QueueTests(TestBase): | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_create(self): | 
					
						
							|  |  |  |         with self.subTest('vanilla'): | 
					
						
							|  |  |  |             queue = queues.create() | 
					
						
							|  |  |  |             self.assertEqual(queue.maxsize, 0) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         with self.subTest('small maxsize'): | 
					
						
							|  |  |  |             queue = queues.create(3) | 
					
						
							|  |  |  |             self.assertEqual(queue.maxsize, 3) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         with self.subTest('big maxsize'): | 
					
						
							|  |  |  |             queue = queues.create(100) | 
					
						
							|  |  |  |             self.assertEqual(queue.maxsize, 100) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         with self.subTest('no maxsize'): | 
					
						
							|  |  |  |             queue = queues.create(0) | 
					
						
							|  |  |  |             self.assertEqual(queue.maxsize, 0) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         with self.subTest('negative maxsize'): | 
					
						
							| 
									
										
										
										
											2023-12-12 10:43:30 -07:00
										 |  |  |             queue = queues.create(-10) | 
					
						
							|  |  |  |             self.assertEqual(queue.maxsize, -10) | 
					
						
							| 
									
										
										
										
											2023-12-12 08:24:31 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |         with self.subTest('bad maxsize'): | 
					
						
							|  |  |  |             with self.assertRaises(TypeError): | 
					
						
							|  |  |  |                 queues.create('1') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_shareable(self): | 
					
						
							|  |  |  |         queue1 = queues.create() | 
					
						
							| 
									
										
										
										
											2023-12-12 10:43:30 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |         interp = interpreters.create() | 
					
						
							| 
									
										
										
										
											2024-02-28 16:08:08 -07:00
										 |  |  |         interp.exec(dedent(f"""
 | 
					
						
							| 
									
										
										
										
											2023-12-12 10:43:30 -07:00
										 |  |  |             from test.support.interpreters import queues | 
					
						
							|  |  |  |             queue1 = queues.Queue({queue1.id}) | 
					
						
							|  |  |  |             """));
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         with self.subTest('same interpreter'): | 
					
						
							|  |  |  |             queue2 = queues.create() | 
					
						
							| 
									
										
										
										
											2024-02-28 16:08:08 -07:00
										 |  |  |             queue1.put(queue2, syncobj=True) | 
					
						
							| 
									
										
										
										
											2023-12-12 10:43:30 -07:00
										 |  |  |             queue3 = queue1.get() | 
					
						
							|  |  |  |             self.assertIs(queue3, queue2) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         with self.subTest('from current interpreter'): | 
					
						
							|  |  |  |             queue4 = queues.create() | 
					
						
							| 
									
										
										
										
											2024-02-28 16:08:08 -07:00
										 |  |  |             queue1.put(queue4, syncobj=True) | 
					
						
							| 
									
										
										
										
											2023-12-12 10:43:30 -07:00
										 |  |  |             out = _run_output(interp, dedent("""
 | 
					
						
							|  |  |  |                 queue4 = queue1.get() | 
					
						
							|  |  |  |                 print(queue4.id) | 
					
						
							|  |  |  |                 """))
 | 
					
						
							|  |  |  |             qid = int(out) | 
					
						
							|  |  |  |             self.assertEqual(qid, queue4.id) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         with self.subTest('from subinterpreter'): | 
					
						
							|  |  |  |             out = _run_output(interp, dedent("""
 | 
					
						
							|  |  |  |                 queue5 = queues.create() | 
					
						
							| 
									
										
										
										
											2024-02-28 16:08:08 -07:00
										 |  |  |                 queue1.put(queue5, syncobj=True) | 
					
						
							| 
									
										
										
										
											2023-12-12 10:43:30 -07:00
										 |  |  |                 print(queue5.id) | 
					
						
							|  |  |  |                 """))
 | 
					
						
							|  |  |  |             qid = int(out) | 
					
						
							|  |  |  |             queue5 = queue1.get() | 
					
						
							|  |  |  |             self.assertEqual(queue5.id, qid) | 
					
						
							| 
									
										
										
										
											2023-12-12 08:24:31 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def test_id_type(self): | 
					
						
							|  |  |  |         queue = queues.create() | 
					
						
							|  |  |  |         self.assertIsInstance(queue.id, int) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_custom_id(self): | 
					
						
							|  |  |  |         with self.assertRaises(queues.QueueNotFoundError): | 
					
						
							|  |  |  |             queues.Queue(1_000_000) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_id_readonly(self): | 
					
						
							|  |  |  |         queue = queues.create() | 
					
						
							|  |  |  |         with self.assertRaises(AttributeError): | 
					
						
							|  |  |  |             queue.id = 1_000_000 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_maxsize_readonly(self): | 
					
						
							|  |  |  |         queue = queues.create(10) | 
					
						
							|  |  |  |         with self.assertRaises(AttributeError): | 
					
						
							|  |  |  |             queue.maxsize = 1_000_000 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_hashable(self): | 
					
						
							|  |  |  |         queue = queues.create() | 
					
						
							|  |  |  |         expected = hash(queue.id) | 
					
						
							|  |  |  |         actual = hash(queue) | 
					
						
							|  |  |  |         self.assertEqual(actual, expected) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_equality(self): | 
					
						
							|  |  |  |         queue1 = queues.create() | 
					
						
							|  |  |  |         queue2 = queues.create() | 
					
						
							|  |  |  |         self.assertEqual(queue1, queue1) | 
					
						
							|  |  |  |         self.assertNotEqual(queue1, queue2) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-03-05 08:54:46 -07:00
										 |  |  |     def test_pickle(self): | 
					
						
							|  |  |  |         queue = queues.create() | 
					
						
							|  |  |  |         data = pickle.dumps(queue) | 
					
						
							|  |  |  |         unpickled = pickle.loads(data) | 
					
						
							|  |  |  |         self.assertEqual(unpickled, queue) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-12-12 08:24:31 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  | class TestQueueOps(TestBase): | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_empty(self): | 
					
						
							|  |  |  |         queue = queues.create() | 
					
						
							|  |  |  |         before = queue.empty() | 
					
						
							| 
									
										
										
										
											2024-02-28 16:08:08 -07:00
										 |  |  |         queue.put(None, syncobj=True) | 
					
						
							| 
									
										
										
										
											2023-12-12 08:24:31 -07:00
										 |  |  |         during = queue.empty() | 
					
						
							|  |  |  |         queue.get() | 
					
						
							|  |  |  |         after = queue.empty() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertIs(before, True) | 
					
						
							|  |  |  |         self.assertIs(during, False) | 
					
						
							|  |  |  |         self.assertIs(after, True) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_full(self): | 
					
						
							|  |  |  |         expected = [False, False, False, True, False, False, False] | 
					
						
							|  |  |  |         actual = [] | 
					
						
							|  |  |  |         queue = queues.create(3) | 
					
						
							|  |  |  |         for _ in range(3): | 
					
						
							|  |  |  |             actual.append(queue.full()) | 
					
						
							| 
									
										
										
										
											2024-02-28 16:08:08 -07:00
										 |  |  |             queue.put(None, syncobj=True) | 
					
						
							| 
									
										
										
										
											2023-12-12 08:24:31 -07:00
										 |  |  |         actual.append(queue.full()) | 
					
						
							|  |  |  |         for _ in range(3): | 
					
						
							|  |  |  |             queue.get() | 
					
						
							|  |  |  |             actual.append(queue.full()) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertEqual(actual, expected) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_qsize(self): | 
					
						
							|  |  |  |         expected = [0, 1, 2, 3, 2, 3, 2, 1, 0, 1, 0] | 
					
						
							|  |  |  |         actual = [] | 
					
						
							|  |  |  |         queue = queues.create() | 
					
						
							|  |  |  |         for _ in range(3): | 
					
						
							|  |  |  |             actual.append(queue.qsize()) | 
					
						
							| 
									
										
										
										
											2024-02-28 16:08:08 -07:00
										 |  |  |             queue.put(None, syncobj=True) | 
					
						
							| 
									
										
										
										
											2023-12-12 08:24:31 -07:00
										 |  |  |         actual.append(queue.qsize()) | 
					
						
							|  |  |  |         queue.get() | 
					
						
							|  |  |  |         actual.append(queue.qsize()) | 
					
						
							| 
									
										
										
										
											2024-02-28 16:08:08 -07:00
										 |  |  |         queue.put(None, syncobj=True) | 
					
						
							| 
									
										
										
										
											2023-12-12 08:24:31 -07:00
										 |  |  |         actual.append(queue.qsize()) | 
					
						
							|  |  |  |         for _ in range(3): | 
					
						
							|  |  |  |             queue.get() | 
					
						
							|  |  |  |             actual.append(queue.qsize()) | 
					
						
							| 
									
										
										
										
											2024-02-28 16:08:08 -07:00
										 |  |  |         queue.put(None, syncobj=True) | 
					
						
							| 
									
										
										
										
											2023-12-12 08:24:31 -07:00
										 |  |  |         actual.append(queue.qsize()) | 
					
						
							|  |  |  |         queue.get() | 
					
						
							|  |  |  |         actual.append(queue.qsize()) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertEqual(actual, expected) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_put_get_main(self): | 
					
						
							|  |  |  |         expected = list(range(20)) | 
					
						
							| 
									
										
										
										
											2024-02-28 16:08:08 -07:00
										 |  |  |         for syncobj in (True, False): | 
					
						
							|  |  |  |             kwds = dict(syncobj=syncobj) | 
					
						
							|  |  |  |             with self.subTest(f'syncobj={syncobj}'): | 
					
						
							|  |  |  |                 queue = queues.create() | 
					
						
							|  |  |  |                 for i in range(20): | 
					
						
							|  |  |  |                     queue.put(i, **kwds) | 
					
						
							|  |  |  |                 actual = [queue.get() for _ in range(20)] | 
					
						
							| 
									
										
										
										
											2023-12-12 08:24:31 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-02-28 16:08:08 -07:00
										 |  |  |                 self.assertEqual(actual, expected) | 
					
						
							| 
									
										
										
										
											2023-12-12 08:24:31 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def test_put_timeout(self): | 
					
						
							| 
									
										
										
										
											2024-02-28 16:08:08 -07:00
										 |  |  |         for syncobj in (True, False): | 
					
						
							|  |  |  |             kwds = dict(syncobj=syncobj) | 
					
						
							|  |  |  |             with self.subTest(f'syncobj={syncobj}'): | 
					
						
							|  |  |  |                 queue = queues.create(2) | 
					
						
							|  |  |  |                 queue.put(None, **kwds) | 
					
						
							|  |  |  |                 queue.put(None, **kwds) | 
					
						
							|  |  |  |                 with self.assertRaises(queues.QueueFull): | 
					
						
							|  |  |  |                     queue.put(None, timeout=0.1, **kwds) | 
					
						
							|  |  |  |                 queue.get() | 
					
						
							|  |  |  |                 queue.put(None, **kwds) | 
					
						
							| 
									
										
										
										
											2023-12-12 08:24:31 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def test_put_nowait(self): | 
					
						
							| 
									
										
										
										
											2024-02-28 16:08:08 -07:00
										 |  |  |         for syncobj in (True, False): | 
					
						
							|  |  |  |             kwds = dict(syncobj=syncobj) | 
					
						
							|  |  |  |             with self.subTest(f'syncobj={syncobj}'): | 
					
						
							|  |  |  |                 queue = queues.create(2) | 
					
						
							|  |  |  |                 queue.put_nowait(None, **kwds) | 
					
						
							|  |  |  |                 queue.put_nowait(None, **kwds) | 
					
						
							|  |  |  |                 with self.assertRaises(queues.QueueFull): | 
					
						
							|  |  |  |                     queue.put_nowait(None, **kwds) | 
					
						
							|  |  |  |                 queue.get() | 
					
						
							|  |  |  |                 queue.put_nowait(None, **kwds) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_put_syncobj(self): | 
					
						
							|  |  |  |         for obj in [ | 
					
						
							|  |  |  |             None, | 
					
						
							|  |  |  |             True, | 
					
						
							|  |  |  |             10, | 
					
						
							|  |  |  |             'spam', | 
					
						
							|  |  |  |             b'spam', | 
					
						
							|  |  |  |             (0, 'a'), | 
					
						
							|  |  |  |         ]: | 
					
						
							|  |  |  |             with self.subTest(repr(obj)): | 
					
						
							|  |  |  |                 queue = queues.create() | 
					
						
							| 
									
										
										
										
											2024-03-01 09:36:35 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-02-28 16:08:08 -07:00
										 |  |  |                 queue.put(obj, syncobj=True) | 
					
						
							|  |  |  |                 obj2 = queue.get() | 
					
						
							|  |  |  |                 self.assertEqual(obj2, obj) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-03-01 09:36:35 -07:00
										 |  |  |                 queue.put(obj, syncobj=True) | 
					
						
							|  |  |  |                 obj2 = queue.get_nowait() | 
					
						
							|  |  |  |                 self.assertEqual(obj2, obj) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-02-28 16:08:08 -07:00
										 |  |  |         for obj in [ | 
					
						
							|  |  |  |             [1, 2, 3], | 
					
						
							|  |  |  |             {'a': 13, 'b': 17}, | 
					
						
							|  |  |  |         ]: | 
					
						
							|  |  |  |             with self.subTest(repr(obj)): | 
					
						
							|  |  |  |                 queue = queues.create() | 
					
						
							|  |  |  |                 with self.assertRaises(interpreters.NotShareableError): | 
					
						
							|  |  |  |                     queue.put(obj, syncobj=True) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_put_not_syncobj(self): | 
					
						
							|  |  |  |         for obj in [ | 
					
						
							|  |  |  |             None, | 
					
						
							|  |  |  |             True, | 
					
						
							|  |  |  |             10, | 
					
						
							|  |  |  |             'spam', | 
					
						
							|  |  |  |             b'spam', | 
					
						
							|  |  |  |             (0, 'a'), | 
					
						
							|  |  |  |             # not shareable | 
					
						
							|  |  |  |             [1, 2, 3], | 
					
						
							|  |  |  |             {'a': 13, 'b': 17}, | 
					
						
							|  |  |  |         ]: | 
					
						
							|  |  |  |             with self.subTest(repr(obj)): | 
					
						
							|  |  |  |                 queue = queues.create() | 
					
						
							| 
									
										
										
										
											2024-03-01 09:36:35 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-02-28 16:08:08 -07:00
										 |  |  |                 queue.put(obj, syncobj=False) | 
					
						
							|  |  |  |                 obj2 = queue.get() | 
					
						
							|  |  |  |                 self.assertEqual(obj2, obj) | 
					
						
							| 
									
										
										
										
											2023-12-12 08:24:31 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-03-01 09:36:35 -07:00
										 |  |  |                 queue.put(obj, syncobj=False) | 
					
						
							|  |  |  |                 obj2 = queue.get_nowait() | 
					
						
							|  |  |  |                 self.assertEqual(obj2, obj) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-12-12 08:24:31 -07:00
										 |  |  |     def test_get_timeout(self): | 
					
						
							|  |  |  |         queue = queues.create() | 
					
						
							|  |  |  |         with self.assertRaises(queues.QueueEmpty): | 
					
						
							|  |  |  |             queue.get(timeout=0.1) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_get_nowait(self): | 
					
						
							|  |  |  |         queue = queues.create() | 
					
						
							|  |  |  |         with self.assertRaises(queues.QueueEmpty): | 
					
						
							|  |  |  |             queue.get_nowait() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-02-28 16:08:08 -07:00
										 |  |  |     def test_put_get_default_syncobj(self): | 
					
						
							|  |  |  |         expected = list(range(20)) | 
					
						
							|  |  |  |         queue = queues.create(syncobj=True) | 
					
						
							| 
									
										
										
										
											2024-03-01 09:36:35 -07:00
										 |  |  |         for methname in ('get', 'get_nowait'): | 
					
						
							|  |  |  |             with self.subTest(f'{methname}()'): | 
					
						
							|  |  |  |                 get = getattr(queue, methname) | 
					
						
							|  |  |  |                 for i in range(20): | 
					
						
							|  |  |  |                     queue.put(i) | 
					
						
							|  |  |  |                 actual = [get() for _ in range(20)] | 
					
						
							|  |  |  |                 self.assertEqual(actual, expected) | 
					
						
							| 
									
										
										
										
											2024-02-28 16:08:08 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |         obj = [1, 2, 3]  # lists are not shareable | 
					
						
							|  |  |  |         with self.assertRaises(interpreters.NotShareableError): | 
					
						
							|  |  |  |             queue.put(obj) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_put_get_default_not_syncobj(self): | 
					
						
							|  |  |  |         expected = list(range(20)) | 
					
						
							|  |  |  |         queue = queues.create(syncobj=False) | 
					
						
							| 
									
										
										
										
											2024-03-01 09:36:35 -07:00
										 |  |  |         for methname in ('get', 'get_nowait'): | 
					
						
							|  |  |  |             with self.subTest(f'{methname}()'): | 
					
						
							|  |  |  |                 get = getattr(queue, methname) | 
					
						
							| 
									
										
										
										
											2024-02-28 16:08:08 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-03-01 09:36:35 -07:00
										 |  |  |                 for i in range(20): | 
					
						
							|  |  |  |                     queue.put(i) | 
					
						
							|  |  |  |                 actual = [get() for _ in range(20)] | 
					
						
							|  |  |  |                 self.assertEqual(actual, expected) | 
					
						
							| 
									
										
										
										
											2024-02-28 16:08:08 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-03-01 09:36:35 -07:00
										 |  |  |                 obj = [1, 2, 3]  # lists are not shareable | 
					
						
							|  |  |  |                 queue.put(obj) | 
					
						
							|  |  |  |                 obj2 = get() | 
					
						
							|  |  |  |                 self.assertEqual(obj, obj2) | 
					
						
							|  |  |  |                 self.assertIsNot(obj, obj2) | 
					
						
							| 
									
										
										
										
											2024-02-28 16:08:08 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-12-12 08:24:31 -07:00
										 |  |  |     def test_put_get_same_interpreter(self): | 
					
						
							|  |  |  |         interp = interpreters.create() | 
					
						
							| 
									
										
										
										
											2024-02-28 16:08:08 -07:00
										 |  |  |         interp.exec(dedent("""
 | 
					
						
							| 
									
										
										
										
											2023-12-12 08:24:31 -07:00
										 |  |  |             from test.support.interpreters import queues | 
					
						
							|  |  |  |             queue = queues.create() | 
					
						
							|  |  |  |             """))
 | 
					
						
							| 
									
										
										
										
											2024-03-01 09:36:35 -07:00
										 |  |  |         for methname in ('get', 'get_nowait'): | 
					
						
							|  |  |  |             with self.subTest(f'{methname}()'): | 
					
						
							|  |  |  |                 interp.exec(dedent(f"""
 | 
					
						
							|  |  |  |                     orig = b'spam' | 
					
						
							|  |  |  |                     queue.put(orig, syncobj=True) | 
					
						
							|  |  |  |                     obj = queue.{methname}() | 
					
						
							|  |  |  |                     assert obj == orig, 'expected: obj == orig' | 
					
						
							|  |  |  |                     assert obj is not orig, 'expected: obj is not orig' | 
					
						
							|  |  |  |                     """))
 | 
					
						
							| 
									
										
										
										
											2023-12-12 08:24:31 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def test_put_get_different_interpreters(self): | 
					
						
							| 
									
										
										
										
											2023-12-12 10:43:30 -07:00
										 |  |  |         interp = interpreters.create() | 
					
						
							| 
									
										
										
										
											2023-12-12 08:24:31 -07:00
										 |  |  |         queue1 = queues.create() | 
					
						
							|  |  |  |         queue2 = queues.create() | 
					
						
							| 
									
										
										
										
											2023-12-12 10:43:30 -07:00
										 |  |  |         self.assertEqual(len(queues.list_all()), 2) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-03-01 09:36:35 -07:00
										 |  |  |         for methname in ('get', 'get_nowait'): | 
					
						
							|  |  |  |             with self.subTest(f'{methname}()'): | 
					
						
							|  |  |  |                 obj1 = b'spam' | 
					
						
							|  |  |  |                 queue1.put(obj1, syncobj=True) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 out = _run_output( | 
					
						
							|  |  |  |                     interp, | 
					
						
							|  |  |  |                     dedent(f"""
 | 
					
						
							|  |  |  |                         from test.support.interpreters import queues | 
					
						
							|  |  |  |                         queue1 = queues.Queue({queue1.id}) | 
					
						
							|  |  |  |                         queue2 = queues.Queue({queue2.id}) | 
					
						
							|  |  |  |                         assert queue1.qsize() == 1, 'expected: queue1.qsize() == 1' | 
					
						
							|  |  |  |                         obj = queue1.{methname}() | 
					
						
							|  |  |  |                         assert queue1.qsize() == 0, 'expected: queue1.qsize() == 0' | 
					
						
							|  |  |  |                         assert obj == b'spam', 'expected: obj == obj1' | 
					
						
							|  |  |  |                         # When going to another interpreter we get a copy. | 
					
						
							|  |  |  |                         assert id(obj) != {id(obj1)}, 'expected: obj is not obj1' | 
					
						
							|  |  |  |                         obj2 = b'eggs' | 
					
						
							|  |  |  |                         print(id(obj2)) | 
					
						
							|  |  |  |                         assert queue2.qsize() == 0, 'expected: queue2.qsize() == 0' | 
					
						
							|  |  |  |                         queue2.put(obj2, syncobj=True) | 
					
						
							|  |  |  |                         assert queue2.qsize() == 1, 'expected: queue2.qsize() == 1' | 
					
						
							|  |  |  |                         """))
 | 
					
						
							|  |  |  |                 self.assertEqual(len(queues.list_all()), 2) | 
					
						
							|  |  |  |                 self.assertEqual(queue1.qsize(), 0) | 
					
						
							|  |  |  |                 self.assertEqual(queue2.qsize(), 1) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 get = getattr(queue2, methname) | 
					
						
							|  |  |  |                 obj2 = get() | 
					
						
							|  |  |  |                 self.assertEqual(obj2, b'eggs') | 
					
						
							|  |  |  |                 self.assertNotEqual(id(obj2), int(out)) | 
					
						
							| 
									
										
										
										
											2023-12-12 08:24:31 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-12-12 10:43:30 -07:00
										 |  |  |     def test_put_cleared_with_subinterpreter(self): | 
					
						
							| 
									
										
											  
											
												gh-76785: Expand How Interpreter Queues Handle Interpreter Finalization (gh-116431)
Any cross-interpreter mechanism for passing objects between interpreters must be very careful to respect isolation, even when the object is effectively immutable (e.g. int, str).  Here this especially relates to when an interpreter sends one of its objects, and then is destroyed while the inter-interpreter machinery (e.g. queue) still holds a reference to the object.
When I added interpreters.Queue, I dealt with that case (using an atexit hook) by silently removing all items from the queue that were added by the finalizing interpreter.
Later, while working on concurrent.futures.InterpreterPoolExecutor (gh-116430), I noticed it was somewhat surprising when items were silently removed from the queue when the originating interpreter was destroyed.  (See my comment on that PR.) 
 It took me a little while to realize what was going on.  I expect that users, which much less context than I have, would experience the same pain.
My approach, here, to improving the situation is to give users three options:
1. return a singleton (interpreters.queues.UNBOUND) from Queue.get() in place of each removed item
2. raise an exception (interpreters.queues.ItemInterpreterDestroyed) from Queue.get() in place of each removed item
3. existing behavior: silently remove each item (i.e. Queue.get() skips each one)
The default will now be (1), but users can still explicitly opt in any of them, including to the silent removal behavior.
The behavior for each item may be set with the corresponding Queue.put() call. and a queue-wide default may be set when the queue is created.  (This is the same as I did for "synconly".)
											
										 
											2024-07-15 12:49:23 -06:00
										 |  |  |         def common(queue, unbound=None, presize=0): | 
					
						
							|  |  |  |             if not unbound: | 
					
						
							|  |  |  |                 extraargs = '' | 
					
						
							|  |  |  |             elif unbound is queues.UNBOUND: | 
					
						
							|  |  |  |                 extraargs = ', unbound=queues.UNBOUND' | 
					
						
							|  |  |  |             elif unbound is queues.UNBOUND_ERROR: | 
					
						
							|  |  |  |                 extraargs = ', unbound=queues.UNBOUND_ERROR' | 
					
						
							|  |  |  |             elif unbound is queues.UNBOUND_REMOVE: | 
					
						
							|  |  |  |                 extraargs = ', unbound=queues.UNBOUND_REMOVE' | 
					
						
							|  |  |  |             else: | 
					
						
							|  |  |  |                 raise NotImplementedError(repr(unbound)) | 
					
						
							|  |  |  |             interp = interpreters.create() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             _run_output(interp, dedent(f"""
 | 
					
						
							| 
									
										
										
										
											2023-12-12 10:43:30 -07:00
										 |  |  |                 from test.support.interpreters import queues | 
					
						
							|  |  |  |                 queue = queues.Queue({queue.id}) | 
					
						
							|  |  |  |                 obj1 = b'spam' | 
					
						
							|  |  |  |                 obj2 = b'eggs' | 
					
						
							| 
									
										
											  
											
												gh-76785: Expand How Interpreter Queues Handle Interpreter Finalization (gh-116431)
Any cross-interpreter mechanism for passing objects between interpreters must be very careful to respect isolation, even when the object is effectively immutable (e.g. int, str).  Here this especially relates to when an interpreter sends one of its objects, and then is destroyed while the inter-interpreter machinery (e.g. queue) still holds a reference to the object.
When I added interpreters.Queue, I dealt with that case (using an atexit hook) by silently removing all items from the queue that were added by the finalizing interpreter.
Later, while working on concurrent.futures.InterpreterPoolExecutor (gh-116430), I noticed it was somewhat surprising when items were silently removed from the queue when the originating interpreter was destroyed.  (See my comment on that PR.) 
 It took me a little while to realize what was going on.  I expect that users, which much less context than I have, would experience the same pain.
My approach, here, to improving the situation is to give users three options:
1. return a singleton (interpreters.queues.UNBOUND) from Queue.get() in place of each removed item
2. raise an exception (interpreters.queues.ItemInterpreterDestroyed) from Queue.get() in place of each removed item
3. existing behavior: silently remove each item (i.e. Queue.get() skips each one)
The default will now be (1), but users can still explicitly opt in any of them, including to the silent removal behavior.
The behavior for each item may be set with the corresponding Queue.put() call. and a queue-wide default may be set when the queue is created.  (This is the same as I did for "synconly".)
											
										 
											2024-07-15 12:49:23 -06:00
										 |  |  |                 queue.put(obj1, syncobj=True{extraargs}) | 
					
						
							|  |  |  |                 queue.put(obj2, syncobj=True{extraargs}) | 
					
						
							| 
									
										
										
										
											2023-12-12 10:43:30 -07:00
										 |  |  |                 """))
 | 
					
						
							| 
									
										
											  
											
												gh-76785: Expand How Interpreter Queues Handle Interpreter Finalization (gh-116431)
Any cross-interpreter mechanism for passing objects between interpreters must be very careful to respect isolation, even when the object is effectively immutable (e.g. int, str).  Here this especially relates to when an interpreter sends one of its objects, and then is destroyed while the inter-interpreter machinery (e.g. queue) still holds a reference to the object.
When I added interpreters.Queue, I dealt with that case (using an atexit hook) by silently removing all items from the queue that were added by the finalizing interpreter.
Later, while working on concurrent.futures.InterpreterPoolExecutor (gh-116430), I noticed it was somewhat surprising when items were silently removed from the queue when the originating interpreter was destroyed.  (See my comment on that PR.) 
 It took me a little while to realize what was going on.  I expect that users, which much less context than I have, would experience the same pain.
My approach, here, to improving the situation is to give users three options:
1. return a singleton (interpreters.queues.UNBOUND) from Queue.get() in place of each removed item
2. raise an exception (interpreters.queues.ItemInterpreterDestroyed) from Queue.get() in place of each removed item
3. existing behavior: silently remove each item (i.e. Queue.get() skips each one)
The default will now be (1), but users can still explicitly opt in any of them, including to the silent removal behavior.
The behavior for each item may be set with the corresponding Queue.put() call. and a queue-wide default may be set when the queue is created.  (This is the same as I did for "synconly".)
											
										 
											2024-07-15 12:49:23 -06:00
										 |  |  |             self.assertEqual(queue.qsize(), presize + 2) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             if presize == 0: | 
					
						
							|  |  |  |                 obj1 = queue.get() | 
					
						
							|  |  |  |                 self.assertEqual(obj1, b'spam') | 
					
						
							|  |  |  |                 self.assertEqual(queue.qsize(), presize + 1) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             return interp | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         with self.subTest('default'):  # UNBOUND | 
					
						
							|  |  |  |             queue = queues.create() | 
					
						
							|  |  |  |             interp = common(queue) | 
					
						
							|  |  |  |             del interp | 
					
						
							|  |  |  |             obj1 = queue.get() | 
					
						
							|  |  |  |             self.assertIs(obj1, queues.UNBOUND) | 
					
						
							|  |  |  |             self.assertEqual(queue.qsize(), 0) | 
					
						
							|  |  |  |             with self.assertRaises(queues.QueueEmpty): | 
					
						
							|  |  |  |                 queue.get_nowait() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         with self.subTest('UNBOUND'): | 
					
						
							|  |  |  |             queue = queues.create() | 
					
						
							|  |  |  |             interp = common(queue, queues.UNBOUND) | 
					
						
							|  |  |  |             del interp | 
					
						
							|  |  |  |             obj1 = queue.get() | 
					
						
							|  |  |  |             self.assertIs(obj1, queues.UNBOUND) | 
					
						
							|  |  |  |             self.assertEqual(queue.qsize(), 0) | 
					
						
							|  |  |  |             with self.assertRaises(queues.QueueEmpty): | 
					
						
							|  |  |  |                 queue.get_nowait() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         with self.subTest('UNBOUND_ERROR'): | 
					
						
							|  |  |  |             queue = queues.create() | 
					
						
							|  |  |  |             interp = common(queue, queues.UNBOUND_ERROR) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             del interp | 
					
						
							|  |  |  |             self.assertEqual(queue.qsize(), 1) | 
					
						
							|  |  |  |             with self.assertRaises(queues.ItemInterpreterDestroyed): | 
					
						
							|  |  |  |                 queue.get() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             self.assertEqual(queue.qsize(), 0) | 
					
						
							|  |  |  |             with self.assertRaises(queues.QueueEmpty): | 
					
						
							|  |  |  |                 queue.get_nowait() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         with self.subTest('UNBOUND_REMOVE'): | 
					
						
							|  |  |  |             queue = queues.create() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             interp = common(queue, queues.UNBOUND_REMOVE) | 
					
						
							|  |  |  |             del interp | 
					
						
							|  |  |  |             self.assertEqual(queue.qsize(), 0) | 
					
						
							|  |  |  |             with self.assertRaises(queues.QueueEmpty): | 
					
						
							|  |  |  |                 queue.get_nowait() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             queue.put(b'ham', unbound=queues.UNBOUND_REMOVE) | 
					
						
							|  |  |  |             self.assertEqual(queue.qsize(), 1) | 
					
						
							|  |  |  |             interp = common(queue, queues.UNBOUND_REMOVE, 1) | 
					
						
							|  |  |  |             self.assertEqual(queue.qsize(), 3) | 
					
						
							|  |  |  |             queue.put(42, unbound=queues.UNBOUND_REMOVE) | 
					
						
							|  |  |  |             self.assertEqual(queue.qsize(), 4) | 
					
						
							|  |  |  |             del interp | 
					
						
							|  |  |  |             self.assertEqual(queue.qsize(), 2) | 
					
						
							|  |  |  |             obj1 = queue.get() | 
					
						
							|  |  |  |             obj2 = queue.get() | 
					
						
							|  |  |  |             self.assertEqual(obj1, b'ham') | 
					
						
							|  |  |  |             self.assertEqual(obj2, 42) | 
					
						
							|  |  |  |             self.assertEqual(queue.qsize(), 0) | 
					
						
							|  |  |  |             with self.assertRaises(queues.QueueEmpty): | 
					
						
							|  |  |  |                 queue.get_nowait() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_put_cleared_with_subinterpreter_mixed(self): | 
					
						
							|  |  |  |         queue = queues.create() | 
					
						
							|  |  |  |         interp = interpreters.create() | 
					
						
							|  |  |  |         _run_output(interp, dedent(f"""
 | 
					
						
							|  |  |  |             from test.support.interpreters import queues | 
					
						
							|  |  |  |             queue = queues.Queue({queue.id}) | 
					
						
							|  |  |  |             queue.put(1, syncobj=True, unbound=queues.UNBOUND) | 
					
						
							|  |  |  |             queue.put(2, syncobj=True, unbound=queues.UNBOUND_ERROR) | 
					
						
							|  |  |  |             queue.put(3, syncobj=True) | 
					
						
							|  |  |  |             queue.put(4, syncobj=True, unbound=queues.UNBOUND_REMOVE) | 
					
						
							|  |  |  |             queue.put(5, syncobj=True, unbound=queues.UNBOUND) | 
					
						
							|  |  |  |             """))
 | 
					
						
							|  |  |  |         self.assertEqual(queue.qsize(), 5) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         del interp | 
					
						
							|  |  |  |         self.assertEqual(queue.qsize(), 4) | 
					
						
							| 
									
										
										
										
											2023-12-12 10:43:30 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |         obj1 = queue.get() | 
					
						
							| 
									
										
											  
											
												gh-76785: Expand How Interpreter Queues Handle Interpreter Finalization (gh-116431)
Any cross-interpreter mechanism for passing objects between interpreters must be very careful to respect isolation, even when the object is effectively immutable (e.g. int, str).  Here this especially relates to when an interpreter sends one of its objects, and then is destroyed while the inter-interpreter machinery (e.g. queue) still holds a reference to the object.
When I added interpreters.Queue, I dealt with that case (using an atexit hook) by silently removing all items from the queue that were added by the finalizing interpreter.
Later, while working on concurrent.futures.InterpreterPoolExecutor (gh-116430), I noticed it was somewhat surprising when items were silently removed from the queue when the originating interpreter was destroyed.  (See my comment on that PR.) 
 It took me a little while to realize what was going on.  I expect that users, which much less context than I have, would experience the same pain.
My approach, here, to improving the situation is to give users three options:
1. return a singleton (interpreters.queues.UNBOUND) from Queue.get() in place of each removed item
2. raise an exception (interpreters.queues.ItemInterpreterDestroyed) from Queue.get() in place of each removed item
3. existing behavior: silently remove each item (i.e. Queue.get() skips each one)
The default will now be (1), but users can still explicitly opt in any of them, including to the silent removal behavior.
The behavior for each item may be set with the corresponding Queue.put() call. and a queue-wide default may be set when the queue is created.  (This is the same as I did for "synconly".)
											
										 
											2024-07-15 12:49:23 -06:00
										 |  |  |         self.assertIs(obj1, queues.UNBOUND) | 
					
						
							|  |  |  |         self.assertEqual(queue.qsize(), 3) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         with self.assertRaises(queues.ItemInterpreterDestroyed): | 
					
						
							|  |  |  |             queue.get() | 
					
						
							|  |  |  |         self.assertEqual(queue.qsize(), 2) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         obj2 = queue.get() | 
					
						
							|  |  |  |         self.assertIs(obj2, queues.UNBOUND) | 
					
						
							| 
									
										
										
										
											2023-12-12 10:43:30 -07:00
										 |  |  |         self.assertEqual(queue.qsize(), 1) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
											  
											
												gh-76785: Expand How Interpreter Queues Handle Interpreter Finalization (gh-116431)
Any cross-interpreter mechanism for passing objects between interpreters must be very careful to respect isolation, even when the object is effectively immutable (e.g. int, str).  Here this especially relates to when an interpreter sends one of its objects, and then is destroyed while the inter-interpreter machinery (e.g. queue) still holds a reference to the object.
When I added interpreters.Queue, I dealt with that case (using an atexit hook) by silently removing all items from the queue that were added by the finalizing interpreter.
Later, while working on concurrent.futures.InterpreterPoolExecutor (gh-116430), I noticed it was somewhat surprising when items were silently removed from the queue when the originating interpreter was destroyed.  (See my comment on that PR.) 
 It took me a little while to realize what was going on.  I expect that users, which much less context than I have, would experience the same pain.
My approach, here, to improving the situation is to give users three options:
1. return a singleton (interpreters.queues.UNBOUND) from Queue.get() in place of each removed item
2. raise an exception (interpreters.queues.ItemInterpreterDestroyed) from Queue.get() in place of each removed item
3. existing behavior: silently remove each item (i.e. Queue.get() skips each one)
The default will now be (1), but users can still explicitly opt in any of them, including to the silent removal behavior.
The behavior for each item may be set with the corresponding Queue.put() call. and a queue-wide default may be set when the queue is created.  (This is the same as I did for "synconly".)
											
										 
											2024-07-15 12:49:23 -06:00
										 |  |  |         obj3 = queue.get() | 
					
						
							|  |  |  |         self.assertIs(obj3, queues.UNBOUND) | 
					
						
							|  |  |  |         self.assertEqual(queue.qsize(), 0) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_put_cleared_with_subinterpreter_multiple(self): | 
					
						
							|  |  |  |         queue = queues.create() | 
					
						
							|  |  |  |         interp1 = interpreters.create() | 
					
						
							|  |  |  |         interp2 = interpreters.create() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         queue.put(1, syncobj=True) | 
					
						
							|  |  |  |         _run_output(interp1, dedent(f"""
 | 
					
						
							|  |  |  |             from test.support.interpreters import queues | 
					
						
							|  |  |  |             queue = queues.Queue({queue.id}) | 
					
						
							|  |  |  |             obj1 = queue.get() | 
					
						
							|  |  |  |             queue.put(2, syncobj=True, unbound=queues.UNBOUND) | 
					
						
							|  |  |  |             queue.put(obj1, syncobj=True, unbound=queues.UNBOUND_REMOVE) | 
					
						
							|  |  |  |             """))
 | 
					
						
							|  |  |  |         _run_output(interp2, dedent(f"""
 | 
					
						
							|  |  |  |             from test.support.interpreters import queues | 
					
						
							|  |  |  |             queue = queues.Queue({queue.id}) | 
					
						
							|  |  |  |             obj2 = queue.get() | 
					
						
							|  |  |  |             obj1 = queue.get() | 
					
						
							|  |  |  |             """))
 | 
					
						
							|  |  |  |         self.assertEqual(queue.qsize(), 0) | 
					
						
							|  |  |  |         queue.put(3) | 
					
						
							|  |  |  |         _run_output(interp1, dedent("""
 | 
					
						
							|  |  |  |             queue.put(4, syncobj=True, unbound=queues.UNBOUND) | 
					
						
							|  |  |  |             # interp closed here | 
					
						
							|  |  |  |             queue.put(5, syncobj=True, unbound=queues.UNBOUND_REMOVE) | 
					
						
							|  |  |  |             queue.put(6, syncobj=True, unbound=queues.UNBOUND) | 
					
						
							|  |  |  |             """))
 | 
					
						
							|  |  |  |         _run_output(interp2, dedent("""
 | 
					
						
							|  |  |  |             queue.put(7, syncobj=True, unbound=queues.UNBOUND_ERROR) | 
					
						
							|  |  |  |             # interp closed here | 
					
						
							|  |  |  |             queue.put(obj1, syncobj=True, unbound=queues.UNBOUND_ERROR) | 
					
						
							|  |  |  |             queue.put(obj2, syncobj=True, unbound=queues.UNBOUND_REMOVE) | 
					
						
							|  |  |  |             queue.put(8, syncobj=True, unbound=queues.UNBOUND) | 
					
						
							|  |  |  |             """))
 | 
					
						
							|  |  |  |         _run_output(interp1, dedent("""
 | 
					
						
							|  |  |  |             queue.put(9, syncobj=True, unbound=queues.UNBOUND_REMOVE) | 
					
						
							|  |  |  |             queue.put(10, syncobj=True, unbound=queues.UNBOUND) | 
					
						
							|  |  |  |             """))
 | 
					
						
							|  |  |  |         self.assertEqual(queue.qsize(), 10) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         obj3 = queue.get() | 
					
						
							|  |  |  |         self.assertEqual(obj3, 3) | 
					
						
							|  |  |  |         self.assertEqual(queue.qsize(), 9) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         obj4 = queue.get() | 
					
						
							|  |  |  |         self.assertEqual(obj4, 4) | 
					
						
							|  |  |  |         self.assertEqual(queue.qsize(), 8) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         del interp1 | 
					
						
							|  |  |  |         self.assertEqual(queue.qsize(), 6) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # obj5 was removed | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         obj6 = queue.get() | 
					
						
							|  |  |  |         self.assertIs(obj6, queues.UNBOUND) | 
					
						
							|  |  |  |         self.assertEqual(queue.qsize(), 5) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         obj7 = queue.get() | 
					
						
							|  |  |  |         self.assertEqual(obj7, 7) | 
					
						
							|  |  |  |         self.assertEqual(queue.qsize(), 4) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         del interp2 | 
					
						
							|  |  |  |         self.assertEqual(queue.qsize(), 3) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # obj1 | 
					
						
							|  |  |  |         with self.assertRaises(queues.ItemInterpreterDestroyed): | 
					
						
							|  |  |  |             queue.get() | 
					
						
							|  |  |  |         self.assertEqual(queue.qsize(), 2) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # obj2 was removed | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         obj8 = queue.get() | 
					
						
							|  |  |  |         self.assertIs(obj8, queues.UNBOUND) | 
					
						
							|  |  |  |         self.assertEqual(queue.qsize(), 1) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # obj9 was removed | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         obj10 = queue.get() | 
					
						
							|  |  |  |         self.assertIs(obj10, queues.UNBOUND) | 
					
						
							| 
									
										
										
										
											2023-12-12 10:43:30 -07:00
										 |  |  |         self.assertEqual(queue.qsize(), 0) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-12-12 08:24:31 -07:00
										 |  |  |     def test_put_get_different_threads(self): | 
					
						
							|  |  |  |         queue1 = queues.create() | 
					
						
							|  |  |  |         queue2 = queues.create() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         def f(): | 
					
						
							|  |  |  |             while True: | 
					
						
							|  |  |  |                 try: | 
					
						
							|  |  |  |                     obj = queue1.get(timeout=0.1) | 
					
						
							|  |  |  |                     break | 
					
						
							|  |  |  |                 except queues.QueueEmpty: | 
					
						
							|  |  |  |                     continue | 
					
						
							| 
									
										
										
										
											2024-02-28 16:08:08 -07:00
										 |  |  |             queue2.put(obj, syncobj=True) | 
					
						
							| 
									
										
										
										
											2023-12-12 08:24:31 -07:00
										 |  |  |         t = threading.Thread(target=f) | 
					
						
							|  |  |  |         t.start() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         orig = b'spam' | 
					
						
							| 
									
										
										
										
											2024-02-28 16:08:08 -07:00
										 |  |  |         queue1.put(orig, syncobj=True) | 
					
						
							| 
									
										
										
										
											2023-12-12 08:24:31 -07:00
										 |  |  |         obj = queue2.get() | 
					
						
							|  |  |  |         t.join() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertEqual(obj, orig) | 
					
						
							|  |  |  |         self.assertIsNot(obj, orig) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | if __name__ == '__main__': | 
					
						
							|  |  |  |     # Test needs to be a package, so we can do relative imports. | 
					
						
							|  |  |  |     unittest.main() |