| 
									
										
										
										
											2022-11-16 20:13:32 +01:00
										 |  |  | import unittest | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | from contextlib import contextmanager, ExitStack | 
					
						
							|  |  |  | from test.support import catch_unraisable_exception, import_helper | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | # Skip this test if the _testcapi module isn't available. | 
					
						
							|  |  |  | _testcapi = import_helper.import_module('_testcapi') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class TestDictWatchers(unittest.TestCase): | 
					
						
							|  |  |  |     # types of watchers testcapimodule can add: | 
					
						
							|  |  |  |     EVENTS = 0   # appends dict events as strings to global event list | 
					
						
							|  |  |  |     ERROR = 1    # unconditionally sets and signals a RuntimeException | 
					
						
							|  |  |  |     SECOND = 2   # always appends "second" to global event list | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def add_watcher(self, kind=EVENTS): | 
					
						
							|  |  |  |         return _testcapi.add_dict_watcher(kind) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def clear_watcher(self, watcher_id): | 
					
						
							|  |  |  |         _testcapi.clear_dict_watcher(watcher_id) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @contextmanager | 
					
						
							|  |  |  |     def watcher(self, kind=EVENTS): | 
					
						
							|  |  |  |         wid = self.add_watcher(kind) | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             yield wid | 
					
						
							|  |  |  |         finally: | 
					
						
							|  |  |  |             self.clear_watcher(wid) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def assert_events(self, expected): | 
					
						
							|  |  |  |         actual = _testcapi.get_dict_watcher_events() | 
					
						
							|  |  |  |         self.assertEqual(actual, expected) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def watch(self, wid, d): | 
					
						
							|  |  |  |         _testcapi.watch_dict(wid, d) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def unwatch(self, wid, d): | 
					
						
							|  |  |  |         _testcapi.unwatch_dict(wid, d) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_set_new_item(self): | 
					
						
							|  |  |  |         d = {} | 
					
						
							|  |  |  |         with self.watcher() as wid: | 
					
						
							|  |  |  |             self.watch(wid, d) | 
					
						
							|  |  |  |             d["foo"] = "bar" | 
					
						
							|  |  |  |             self.assert_events(["new:foo:bar"]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_set_existing_item(self): | 
					
						
							|  |  |  |         d = {"foo": "bar"} | 
					
						
							|  |  |  |         with self.watcher() as wid: | 
					
						
							|  |  |  |             self.watch(wid, d) | 
					
						
							|  |  |  |             d["foo"] = "baz" | 
					
						
							|  |  |  |             self.assert_events(["mod:foo:baz"]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_clone(self): | 
					
						
							|  |  |  |         d = {} | 
					
						
							|  |  |  |         d2 = {"foo": "bar"} | 
					
						
							|  |  |  |         with self.watcher() as wid: | 
					
						
							|  |  |  |             self.watch(wid, d) | 
					
						
							|  |  |  |             d.update(d2) | 
					
						
							|  |  |  |             self.assert_events(["clone"]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_no_event_if_not_watched(self): | 
					
						
							|  |  |  |         d = {} | 
					
						
							|  |  |  |         with self.watcher() as wid: | 
					
						
							|  |  |  |             d["foo"] = "bar" | 
					
						
							|  |  |  |             self.assert_events([]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_del(self): | 
					
						
							|  |  |  |         d = {"foo": "bar"} | 
					
						
							|  |  |  |         with self.watcher() as wid: | 
					
						
							|  |  |  |             self.watch(wid, d) | 
					
						
							|  |  |  |             del d["foo"] | 
					
						
							|  |  |  |             self.assert_events(["del:foo"]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_pop(self): | 
					
						
							|  |  |  |         d = {"foo": "bar"} | 
					
						
							|  |  |  |         with self.watcher() as wid: | 
					
						
							|  |  |  |             self.watch(wid, d) | 
					
						
							|  |  |  |             d.pop("foo") | 
					
						
							|  |  |  |             self.assert_events(["del:foo"]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_clear(self): | 
					
						
							|  |  |  |         d = {"foo": "bar"} | 
					
						
							|  |  |  |         with self.watcher() as wid: | 
					
						
							|  |  |  |             self.watch(wid, d) | 
					
						
							|  |  |  |             d.clear() | 
					
						
							|  |  |  |             self.assert_events(["clear"]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_dealloc(self): | 
					
						
							|  |  |  |         d = {"foo": "bar"} | 
					
						
							|  |  |  |         with self.watcher() as wid: | 
					
						
							|  |  |  |             self.watch(wid, d) | 
					
						
							|  |  |  |             del d | 
					
						
							|  |  |  |             self.assert_events(["dealloc"]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_unwatch(self): | 
					
						
							|  |  |  |         d = {} | 
					
						
							|  |  |  |         with self.watcher() as wid: | 
					
						
							|  |  |  |             self.watch(wid, d) | 
					
						
							|  |  |  |             d["foo"] = "bar" | 
					
						
							|  |  |  |             self.unwatch(wid, d) | 
					
						
							|  |  |  |             d["hmm"] = "baz" | 
					
						
							|  |  |  |             self.assert_events(["new:foo:bar"]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_error(self): | 
					
						
							|  |  |  |         d = {} | 
					
						
							|  |  |  |         with self.watcher(kind=self.ERROR) as wid: | 
					
						
							|  |  |  |             self.watch(wid, d) | 
					
						
							|  |  |  |             with catch_unraisable_exception() as cm: | 
					
						
							|  |  |  |                 d["foo"] = "bar" | 
					
						
							|  |  |  |                 self.assertIs(cm.unraisable.object, d) | 
					
						
							|  |  |  |                 self.assertEqual(str(cm.unraisable.exc_value), "boom!") | 
					
						
							|  |  |  |             self.assert_events([]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_two_watchers(self): | 
					
						
							|  |  |  |         d1 = {} | 
					
						
							|  |  |  |         d2 = {} | 
					
						
							|  |  |  |         with self.watcher() as wid1: | 
					
						
							|  |  |  |             with self.watcher(kind=self.SECOND) as wid2: | 
					
						
							|  |  |  |                 self.watch(wid1, d1) | 
					
						
							|  |  |  |                 self.watch(wid2, d2) | 
					
						
							|  |  |  |                 d1["foo"] = "bar" | 
					
						
							|  |  |  |                 d2["hmm"] = "baz" | 
					
						
							|  |  |  |                 self.assert_events(["new:foo:bar", "second"]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_watch_non_dict(self): | 
					
						
							|  |  |  |         with self.watcher() as wid: | 
					
						
							|  |  |  |             with self.assertRaisesRegex(ValueError, r"Cannot watch non-dictionary"): | 
					
						
							|  |  |  |                 self.watch(wid, 1) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_watch_out_of_range_watcher_id(self): | 
					
						
							|  |  |  |         d = {} | 
					
						
							|  |  |  |         with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID -1"): | 
					
						
							|  |  |  |             self.watch(-1, d) | 
					
						
							|  |  |  |         with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID 8"): | 
					
						
							|  |  |  |             self.watch(8, d)  # DICT_MAX_WATCHERS = 8 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_watch_unassigned_watcher_id(self): | 
					
						
							|  |  |  |         d = {} | 
					
						
							|  |  |  |         with self.assertRaisesRegex(ValueError, r"No dict watcher set for ID 1"): | 
					
						
							|  |  |  |             self.watch(1, d) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_unwatch_non_dict(self): | 
					
						
							|  |  |  |         with self.watcher() as wid: | 
					
						
							|  |  |  |             with self.assertRaisesRegex(ValueError, r"Cannot watch non-dictionary"): | 
					
						
							|  |  |  |                 self.unwatch(wid, 1) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_unwatch_out_of_range_watcher_id(self): | 
					
						
							|  |  |  |         d = {} | 
					
						
							|  |  |  |         with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID -1"): | 
					
						
							|  |  |  |             self.unwatch(-1, d) | 
					
						
							|  |  |  |         with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID 8"): | 
					
						
							|  |  |  |             self.unwatch(8, d)  # DICT_MAX_WATCHERS = 8 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_unwatch_unassigned_watcher_id(self): | 
					
						
							|  |  |  |         d = {} | 
					
						
							|  |  |  |         with self.assertRaisesRegex(ValueError, r"No dict watcher set for ID 1"): | 
					
						
							|  |  |  |             self.unwatch(1, d) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_clear_out_of_range_watcher_id(self): | 
					
						
							|  |  |  |         with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID -1"): | 
					
						
							|  |  |  |             self.clear_watcher(-1) | 
					
						
							|  |  |  |         with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID 8"): | 
					
						
							|  |  |  |             self.clear_watcher(8)  # DICT_MAX_WATCHERS = 8 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_clear_unassigned_watcher_id(self): | 
					
						
							|  |  |  |         with self.assertRaisesRegex(ValueError, r"No dict watcher set for ID 1"): | 
					
						
							|  |  |  |             self.clear_watcher(1) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class TestTypeWatchers(unittest.TestCase): | 
					
						
							|  |  |  |     # types of watchers testcapimodule can add: | 
					
						
							|  |  |  |     TYPES = 0    # appends modified types to global event list | 
					
						
							|  |  |  |     ERROR = 1    # unconditionally sets and signals a RuntimeException | 
					
						
							|  |  |  |     WRAP = 2     # appends modified type wrapped in list to global event list | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # duplicating the C constant | 
					
						
							|  |  |  |     TYPE_MAX_WATCHERS = 8 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def add_watcher(self, kind=TYPES): | 
					
						
							|  |  |  |         return _testcapi.add_type_watcher(kind) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def clear_watcher(self, watcher_id): | 
					
						
							|  |  |  |         _testcapi.clear_type_watcher(watcher_id) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @contextmanager | 
					
						
							|  |  |  |     def watcher(self, kind=TYPES): | 
					
						
							|  |  |  |         wid = self.add_watcher(kind) | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             yield wid | 
					
						
							|  |  |  |         finally: | 
					
						
							|  |  |  |             self.clear_watcher(wid) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def assert_events(self, expected): | 
					
						
							|  |  |  |         actual = _testcapi.get_type_modified_events() | 
					
						
							|  |  |  |         self.assertEqual(actual, expected) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def watch(self, wid, t): | 
					
						
							|  |  |  |         _testcapi.watch_type(wid, t) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def unwatch(self, wid, t): | 
					
						
							|  |  |  |         _testcapi.unwatch_type(wid, t) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_watch_type(self): | 
					
						
							|  |  |  |         class C: pass | 
					
						
							|  |  |  |         with self.watcher() as wid: | 
					
						
							|  |  |  |             self.watch(wid, C) | 
					
						
							|  |  |  |             C.foo = "bar" | 
					
						
							|  |  |  |             self.assert_events([C]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_event_aggregation(self): | 
					
						
							|  |  |  |         class C: pass | 
					
						
							|  |  |  |         with self.watcher() as wid: | 
					
						
							|  |  |  |             self.watch(wid, C) | 
					
						
							|  |  |  |             C.foo = "bar" | 
					
						
							|  |  |  |             C.bar = "baz" | 
					
						
							|  |  |  |             # only one event registered for both modifications | 
					
						
							|  |  |  |             self.assert_events([C]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_lookup_resets_aggregation(self): | 
					
						
							|  |  |  |         class C: pass | 
					
						
							|  |  |  |         with self.watcher() as wid: | 
					
						
							|  |  |  |             self.watch(wid, C) | 
					
						
							|  |  |  |             C.foo = "bar" | 
					
						
							|  |  |  |             # lookup resets type version tag | 
					
						
							|  |  |  |             self.assertEqual(C.foo, "bar") | 
					
						
							|  |  |  |             C.bar = "baz" | 
					
						
							|  |  |  |             # both events registered | 
					
						
							|  |  |  |             self.assert_events([C, C]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_unwatch_type(self): | 
					
						
							|  |  |  |         class C: pass | 
					
						
							|  |  |  |         with self.watcher() as wid: | 
					
						
							|  |  |  |             self.watch(wid, C) | 
					
						
							|  |  |  |             C.foo = "bar" | 
					
						
							|  |  |  |             self.assertEqual(C.foo, "bar") | 
					
						
							|  |  |  |             self.assert_events([C]) | 
					
						
							|  |  |  |             self.unwatch(wid, C) | 
					
						
							|  |  |  |             C.bar = "baz" | 
					
						
							|  |  |  |             self.assert_events([C]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_clear_watcher(self): | 
					
						
							|  |  |  |         class C: pass | 
					
						
							|  |  |  |         # outer watcher is unused, it's just to keep events list alive | 
					
						
							|  |  |  |         with self.watcher() as _: | 
					
						
							|  |  |  |             with self.watcher() as wid: | 
					
						
							|  |  |  |                 self.watch(wid, C) | 
					
						
							|  |  |  |                 C.foo = "bar" | 
					
						
							|  |  |  |                 self.assertEqual(C.foo, "bar") | 
					
						
							|  |  |  |                 self.assert_events([C]) | 
					
						
							|  |  |  |             C.bar = "baz" | 
					
						
							|  |  |  |             # Watcher on C has been cleared, no new event | 
					
						
							|  |  |  |             self.assert_events([C]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_watch_type_subclass(self): | 
					
						
							|  |  |  |         class C: pass | 
					
						
							|  |  |  |         class D(C): pass | 
					
						
							|  |  |  |         with self.watcher() as wid: | 
					
						
							|  |  |  |             self.watch(wid, D) | 
					
						
							|  |  |  |             C.foo = "bar" | 
					
						
							|  |  |  |             self.assert_events([D]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_error(self): | 
					
						
							|  |  |  |         class C: pass | 
					
						
							|  |  |  |         with self.watcher(kind=self.ERROR) as wid: | 
					
						
							|  |  |  |             self.watch(wid, C) | 
					
						
							|  |  |  |             with catch_unraisable_exception() as cm: | 
					
						
							|  |  |  |                 C.foo = "bar" | 
					
						
							|  |  |  |                 self.assertIs(cm.unraisable.object, C) | 
					
						
							|  |  |  |                 self.assertEqual(str(cm.unraisable.exc_value), "boom!") | 
					
						
							|  |  |  |             self.assert_events([]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_two_watchers(self): | 
					
						
							|  |  |  |         class C1: pass | 
					
						
							|  |  |  |         class C2: pass | 
					
						
							|  |  |  |         with self.watcher() as wid1: | 
					
						
							|  |  |  |             with self.watcher(kind=self.WRAP) as wid2: | 
					
						
							|  |  |  |                 self.assertNotEqual(wid1, wid2) | 
					
						
							|  |  |  |                 self.watch(wid1, C1) | 
					
						
							|  |  |  |                 self.watch(wid2, C2) | 
					
						
							|  |  |  |                 C1.foo = "bar" | 
					
						
							|  |  |  |                 C2.hmm = "baz" | 
					
						
							|  |  |  |                 self.assert_events([C1, [C2]]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_watch_non_type(self): | 
					
						
							|  |  |  |         with self.watcher() as wid: | 
					
						
							|  |  |  |             with self.assertRaisesRegex(ValueError, r"Cannot watch non-type"): | 
					
						
							|  |  |  |                 self.watch(wid, 1) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_watch_out_of_range_watcher_id(self): | 
					
						
							|  |  |  |         class C: pass | 
					
						
							|  |  |  |         with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID -1"): | 
					
						
							|  |  |  |             self.watch(-1, C) | 
					
						
							|  |  |  |         with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID 8"): | 
					
						
							|  |  |  |             self.watch(self.TYPE_MAX_WATCHERS, C) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_watch_unassigned_watcher_id(self): | 
					
						
							|  |  |  |         class C: pass | 
					
						
							|  |  |  |         with self.assertRaisesRegex(ValueError, r"No type watcher set for ID 1"): | 
					
						
							|  |  |  |             self.watch(1, C) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_unwatch_non_type(self): | 
					
						
							|  |  |  |         with self.watcher() as wid: | 
					
						
							|  |  |  |             with self.assertRaisesRegex(ValueError, r"Cannot watch non-type"): | 
					
						
							|  |  |  |                 self.unwatch(wid, 1) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_unwatch_out_of_range_watcher_id(self): | 
					
						
							|  |  |  |         class C: pass | 
					
						
							|  |  |  |         with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID -1"): | 
					
						
							|  |  |  |             self.unwatch(-1, C) | 
					
						
							|  |  |  |         with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID 8"): | 
					
						
							|  |  |  |             self.unwatch(self.TYPE_MAX_WATCHERS, C) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_unwatch_unassigned_watcher_id(self): | 
					
						
							|  |  |  |         class C: pass | 
					
						
							|  |  |  |         with self.assertRaisesRegex(ValueError, r"No type watcher set for ID 1"): | 
					
						
							|  |  |  |             self.unwatch(1, C) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_clear_out_of_range_watcher_id(self): | 
					
						
							|  |  |  |         with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID -1"): | 
					
						
							|  |  |  |             self.clear_watcher(-1) | 
					
						
							|  |  |  |         with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID 8"): | 
					
						
							|  |  |  |             self.clear_watcher(self.TYPE_MAX_WATCHERS) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_clear_unassigned_watcher_id(self): | 
					
						
							|  |  |  |         with self.assertRaisesRegex(ValueError, r"No type watcher set for ID 1"): | 
					
						
							|  |  |  |             self.clear_watcher(1) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_no_more_ids_available(self): | 
					
						
							|  |  |  |         contexts = [self.watcher() for i in range(self.TYPE_MAX_WATCHERS)] | 
					
						
							|  |  |  |         with ExitStack() as stack: | 
					
						
							|  |  |  |             for ctx in contexts: | 
					
						
							|  |  |  |                 stack.enter_context(ctx) | 
					
						
							|  |  |  |             with self.assertRaisesRegex(RuntimeError, r"no more type watcher IDs"): | 
					
						
							|  |  |  |                 self.add_watcher() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-12-02 09:28:27 -08:00
										 |  |  | class TestCodeObjectWatchers(unittest.TestCase): | 
					
						
							|  |  |  |     @contextmanager | 
					
						
							|  |  |  |     def code_watcher(self, which_watcher): | 
					
						
							|  |  |  |         wid = _testcapi.add_code_watcher(which_watcher) | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             yield wid | 
					
						
							|  |  |  |         finally: | 
					
						
							|  |  |  |             _testcapi.clear_code_watcher(wid) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def assert_event_counts(self, exp_created_0, exp_destroyed_0, | 
					
						
							|  |  |  |                             exp_created_1, exp_destroyed_1): | 
					
						
							|  |  |  |         self.assertEqual( | 
					
						
							|  |  |  |             exp_created_0, _testcapi.get_code_watcher_num_created_events(0)) | 
					
						
							|  |  |  |         self.assertEqual( | 
					
						
							|  |  |  |             exp_destroyed_0, _testcapi.get_code_watcher_num_destroyed_events(0)) | 
					
						
							|  |  |  |         self.assertEqual( | 
					
						
							|  |  |  |             exp_created_1, _testcapi.get_code_watcher_num_created_events(1)) | 
					
						
							|  |  |  |         self.assertEqual( | 
					
						
							|  |  |  |             exp_destroyed_1, _testcapi.get_code_watcher_num_destroyed_events(1)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_code_object_events_dispatched(self): | 
					
						
							|  |  |  |         # verify that all counts are zero before any watchers are registered | 
					
						
							|  |  |  |         self.assert_event_counts(0, 0, 0, 0) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # verify that all counts remain zero when a code object is | 
					
						
							|  |  |  |         # created and destroyed with no watchers registered | 
					
						
							|  |  |  |         co1 = _testcapi.code_newempty("test_watchers", "dummy1", 0) | 
					
						
							|  |  |  |         self.assert_event_counts(0, 0, 0, 0) | 
					
						
							|  |  |  |         del co1 | 
					
						
							|  |  |  |         self.assert_event_counts(0, 0, 0, 0) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # verify counts are as expected when first watcher is registered | 
					
						
							|  |  |  |         with self.code_watcher(0): | 
					
						
							|  |  |  |             self.assert_event_counts(0, 0, 0, 0) | 
					
						
							|  |  |  |             co2 = _testcapi.code_newempty("test_watchers", "dummy2", 0) | 
					
						
							|  |  |  |             self.assert_event_counts(1, 0, 0, 0) | 
					
						
							|  |  |  |             del co2 | 
					
						
							|  |  |  |             self.assert_event_counts(1, 1, 0, 0) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # again with second watcher registered | 
					
						
							|  |  |  |             with self.code_watcher(1): | 
					
						
							|  |  |  |                 self.assert_event_counts(1, 1, 0, 0) | 
					
						
							|  |  |  |                 co3 = _testcapi.code_newempty("test_watchers", "dummy3", 0) | 
					
						
							|  |  |  |                 self.assert_event_counts(2, 1, 1, 0) | 
					
						
							|  |  |  |                 del co3 | 
					
						
							|  |  |  |                 self.assert_event_counts(2, 2, 1, 1) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-12-04 04:38:21 -08:00
										 |  |  |         # verify counts are reset and don't change after both watchers are cleared | 
					
						
							| 
									
										
										
										
											2022-12-02 09:28:27 -08:00
										 |  |  |         co4 = _testcapi.code_newempty("test_watchers", "dummy4", 0) | 
					
						
							| 
									
										
										
										
											2022-12-04 04:38:21 -08:00
										 |  |  |         self.assert_event_counts(0, 0, 0, 0) | 
					
						
							| 
									
										
										
										
											2022-12-02 09:28:27 -08:00
										 |  |  |         del co4 | 
					
						
							| 
									
										
										
										
											2022-12-04 04:38:21 -08:00
										 |  |  |         self.assert_event_counts(0, 0, 0, 0) | 
					
						
							| 
									
										
										
										
											2022-12-02 09:28:27 -08:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def test_clear_out_of_range_watcher_id(self): | 
					
						
							|  |  |  |         with self.assertRaisesRegex(ValueError, r"Invalid code watcher ID -1"): | 
					
						
							|  |  |  |             _testcapi.clear_code_watcher(-1) | 
					
						
							|  |  |  |         with self.assertRaisesRegex(ValueError, r"Invalid code watcher ID 8"): | 
					
						
							|  |  |  |             _testcapi.clear_code_watcher(8)  # CODE_MAX_WATCHERS = 8 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_clear_unassigned_watcher_id(self): | 
					
						
							|  |  |  |         with self.assertRaisesRegex(ValueError, r"No code watcher set for ID 1"): | 
					
						
							|  |  |  |             _testcapi.clear_code_watcher(1) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_allocate_too_many_watchers(self): | 
					
						
							|  |  |  |         with self.assertRaisesRegex(RuntimeError, r"no more code watcher IDs available"): | 
					
						
							|  |  |  |             _testcapi.allocate_too_many_code_watchers() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-11-22 04:06:44 -08:00
										 |  |  | class TestFuncWatchers(unittest.TestCase): | 
					
						
							|  |  |  |     @contextmanager | 
					
						
							|  |  |  |     def add_watcher(self, func): | 
					
						
							|  |  |  |         wid = _testcapi.add_func_watcher(func) | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             yield | 
					
						
							|  |  |  |         finally: | 
					
						
							|  |  |  |             _testcapi.clear_func_watcher(wid) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_func_events_dispatched(self): | 
					
						
							|  |  |  |         events = [] | 
					
						
							|  |  |  |         def watcher(*args): | 
					
						
							|  |  |  |             events.append(args) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         with self.add_watcher(watcher): | 
					
						
							|  |  |  |             def myfunc(): | 
					
						
							|  |  |  |                 pass | 
					
						
							|  |  |  |             self.assertIn((_testcapi.PYFUNC_EVENT_CREATE, myfunc, None), events) | 
					
						
							|  |  |  |             myfunc_id = id(myfunc) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             new_code = self.test_func_events_dispatched.__code__ | 
					
						
							|  |  |  |             myfunc.__code__ = new_code | 
					
						
							|  |  |  |             self.assertIn((_testcapi.PYFUNC_EVENT_MODIFY_CODE, myfunc, new_code), events) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             new_defaults = (123,) | 
					
						
							|  |  |  |             myfunc.__defaults__ = new_defaults | 
					
						
							|  |  |  |             self.assertIn((_testcapi.PYFUNC_EVENT_MODIFY_DEFAULTS, myfunc, new_defaults), events) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             new_defaults = (456,) | 
					
						
							|  |  |  |             _testcapi.set_func_defaults_via_capi(myfunc, new_defaults) | 
					
						
							|  |  |  |             self.assertIn((_testcapi.PYFUNC_EVENT_MODIFY_DEFAULTS, myfunc, new_defaults), events) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             new_kwdefaults = {"self": 123} | 
					
						
							|  |  |  |             myfunc.__kwdefaults__ = new_kwdefaults | 
					
						
							|  |  |  |             self.assertIn((_testcapi.PYFUNC_EVENT_MODIFY_KWDEFAULTS, myfunc, new_kwdefaults), events) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             new_kwdefaults = {"self": 456} | 
					
						
							|  |  |  |             _testcapi.set_func_kwdefaults_via_capi(myfunc, new_kwdefaults) | 
					
						
							|  |  |  |             self.assertIn((_testcapi.PYFUNC_EVENT_MODIFY_KWDEFAULTS, myfunc, new_kwdefaults), events) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Clear events reference to func | 
					
						
							|  |  |  |             events = [] | 
					
						
							|  |  |  |             del myfunc | 
					
						
							|  |  |  |             self.assertIn((_testcapi.PYFUNC_EVENT_DESTROY, myfunc_id, None), events) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_multiple_watchers(self): | 
					
						
							|  |  |  |         events0 = [] | 
					
						
							|  |  |  |         def first_watcher(*args): | 
					
						
							|  |  |  |             events0.append(args) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         events1 = [] | 
					
						
							|  |  |  |         def second_watcher(*args): | 
					
						
							|  |  |  |             events1.append(args) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         with self.add_watcher(first_watcher): | 
					
						
							|  |  |  |             with self.add_watcher(second_watcher): | 
					
						
							|  |  |  |                 def myfunc(): | 
					
						
							|  |  |  |                     pass | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 event = (_testcapi.PYFUNC_EVENT_CREATE, myfunc, None) | 
					
						
							|  |  |  |                 self.assertIn(event, events0) | 
					
						
							|  |  |  |                 self.assertIn(event, events1) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_watcher_raises_error(self): | 
					
						
							|  |  |  |         class MyError(Exception): | 
					
						
							|  |  |  |             pass | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         def watcher(*args): | 
					
						
							|  |  |  |             raise MyError("testing 123") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         with self.add_watcher(watcher): | 
					
						
							|  |  |  |             with catch_unraisable_exception() as cm: | 
					
						
							|  |  |  |                 def myfunc(): | 
					
						
							|  |  |  |                     pass | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 self.assertIs(cm.unraisable.object, myfunc) | 
					
						
							|  |  |  |                 self.assertIsInstance(cm.unraisable.exc_value, MyError) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_clear_out_of_range_watcher_id(self): | 
					
						
							|  |  |  |         with self.assertRaisesRegex(ValueError, r"invalid func watcher ID -1"): | 
					
						
							|  |  |  |             _testcapi.clear_func_watcher(-1) | 
					
						
							|  |  |  |         with self.assertRaisesRegex(ValueError, r"invalid func watcher ID 8"): | 
					
						
							|  |  |  |             _testcapi.clear_func_watcher(8)  # FUNC_MAX_WATCHERS = 8 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_clear_unassigned_watcher_id(self): | 
					
						
							|  |  |  |         with self.assertRaisesRegex(ValueError, r"no func watcher set for ID 1"): | 
					
						
							|  |  |  |             _testcapi.clear_func_watcher(1) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_allocate_too_many_watchers(self): | 
					
						
							|  |  |  |         with self.assertRaisesRegex(RuntimeError, r"no more func watcher IDs"): | 
					
						
							|  |  |  |             _testcapi.allocate_too_many_func_watchers() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-11-16 20:13:32 +01:00
										 |  |  | if __name__ == "__main__": | 
					
						
							|  |  |  |     unittest.main() |