import unittest try: import ctypes except ImportError: ctypes = None from threading import Thread from unittest import TestCase from test.support import threading_helper from test.support.threading_helper import run_concurrently if ctypes is not None: capi = ctypes.pythonapi freefunc = ctypes.CFUNCTYPE(None, ctypes.c_voidp) RequestCodeExtraIndex = capi.PyUnstable_Eval_RequestCodeExtraIndex RequestCodeExtraIndex.argtypes = (freefunc,) RequestCodeExtraIndex.restype = ctypes.c_ssize_t SetExtra = capi.PyUnstable_Code_SetExtra SetExtra.argtypes = (ctypes.py_object, ctypes.c_ssize_t, ctypes.c_voidp) SetExtra.restype = ctypes.c_int GetExtra = capi.PyUnstable_Code_GetExtra GetExtra.argtypes = ( ctypes.py_object, ctypes.c_ssize_t, ctypes.POINTER(ctypes.c_voidp), ) GetExtra.restype = ctypes.c_int # Note: each call to RequestCodeExtraIndex permanently allocates a slot # (the counter is monotonically increasing), up to MAX_CO_EXTRA_USERS (255). NTHREADS = 20 @threading_helper.requires_working_threading() class TestCode(TestCase): def test_code_attrs(self): """Test concurrent accesses to lazily initialized code attributes""" code_objects = [] for _ in range(1000): code_objects.append(compile("a + b", "", "eval")) def run_in_thread(): for code in code_objects: self.assertIsInstance(code.co_code, bytes) self.assertIsInstance(code.co_freevars, tuple) self.assertIsInstance(code.co_varnames, tuple) threads = [Thread(target=run_in_thread) for _ in range(2)] for thread in threads: thread.start() for thread in threads: thread.join() @unittest.skipUnless(ctypes, "ctypes is required") def test_request_code_extra_index_concurrent(self): """Test concurrent calls to RequestCodeExtraIndex""" results = [] def worker(): idx = RequestCodeExtraIndex(freefunc(0)) self.assertGreaterEqual(idx, 0) results.append(idx) run_concurrently(worker_func=worker, nthreads=NTHREADS) # Every thread must get a unique index. self.assertEqual(len(results), NTHREADS) self.assertEqual(len(set(results)), NTHREADS) @unittest.skipUnless(ctypes, "ctypes is required") def test_code_extra_all_ops_concurrent(self): """Test concurrent RequestCodeExtraIndex + SetExtra + GetExtra""" LOOP = 100 def f(): pass code = f.__code__ def worker(): idx = RequestCodeExtraIndex(freefunc(0)) self.assertGreaterEqual(idx, 0) for i in range(LOOP): ret = SetExtra(code, idx, ctypes.c_voidp(i + 1)) self.assertEqual(ret, 0) for _ in range(LOOP): extra = ctypes.c_voidp() ret = GetExtra(code, idx, extra) self.assertEqual(ret, 0) # The slot was set by this thread, so the value must # be the last one written. self.assertEqual(extra.value, LOOP) run_concurrently(worker_func=worker, nthreads=NTHREADS) @unittest.skipUnless(ctypes, "ctypes is required") def test_code_extra_set_get_concurrent(self): """Test concurrent SetExtra + GetExtra on a shared index""" LOOP = 100 def f(): pass code = f.__code__ idx = RequestCodeExtraIndex(freefunc(0)) self.assertGreaterEqual(idx, 0) def worker(): for i in range(LOOP): ret = SetExtra(code, idx, ctypes.c_voidp(i + 1)) self.assertEqual(ret, 0) for _ in range(LOOP): extra = ctypes.c_voidp() ret = GetExtra(code, idx, extra) self.assertEqual(ret, 0) # Value is set by any writer thread. self.assertTrue(1 <= extra.value <= LOOP) run_concurrently(worker_func=worker, nthreads=NTHREADS) # Every thread's last write is LOOP, so the final value must be LOOP. extra = ctypes.c_voidp() ret = GetExtra(code, idx, extra) self.assertEqual(ret, 0) self.assertEqual(extra.value, LOOP) if __name__ == "__main__": unittest.main()