cpython/Lib/test/test_free_threading/test_code.py
Alper a56532771a
gh-144981: Make PyUnstable_Code_SetExtra/GetExtra thread-safe (#144980)
* Make PyUnstable_Code_SetExtra/GetExtra thread-safe
2026-02-20 10:52:18 -08:00

139 lines
4.3 KiB
Python

import unittest
try:
import ctypes
except ImportError:
ctypes = None
from threading import Thread
from unittest import TestCase
from test.support import threading_helper
from test.support.threading_helper import run_concurrently
if ctypes is not None:
capi = ctypes.pythonapi
freefunc = ctypes.CFUNCTYPE(None, ctypes.c_voidp)
RequestCodeExtraIndex = capi.PyUnstable_Eval_RequestCodeExtraIndex
RequestCodeExtraIndex.argtypes = (freefunc,)
RequestCodeExtraIndex.restype = ctypes.c_ssize_t
SetExtra = capi.PyUnstable_Code_SetExtra
SetExtra.argtypes = (ctypes.py_object, ctypes.c_ssize_t, ctypes.c_voidp)
SetExtra.restype = ctypes.c_int
GetExtra = capi.PyUnstable_Code_GetExtra
GetExtra.argtypes = (
ctypes.py_object,
ctypes.c_ssize_t,
ctypes.POINTER(ctypes.c_voidp),
)
GetExtra.restype = ctypes.c_int
# Note: each call to RequestCodeExtraIndex permanently allocates a slot
# (the counter is monotonically increasing), up to MAX_CO_EXTRA_USERS (255).
NTHREADS = 20
@threading_helper.requires_working_threading()
class TestCode(TestCase):
def test_code_attrs(self):
"""Test concurrent accesses to lazily initialized code attributes"""
code_objects = []
for _ in range(1000):
code_objects.append(compile("a + b", "<string>", "eval"))
def run_in_thread():
for code in code_objects:
self.assertIsInstance(code.co_code, bytes)
self.assertIsInstance(code.co_freevars, tuple)
self.assertIsInstance(code.co_varnames, tuple)
threads = [Thread(target=run_in_thread) for _ in range(2)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
@unittest.skipUnless(ctypes, "ctypes is required")
def test_request_code_extra_index_concurrent(self):
"""Test concurrent calls to RequestCodeExtraIndex"""
results = []
def worker():
idx = RequestCodeExtraIndex(freefunc(0))
self.assertGreaterEqual(idx, 0)
results.append(idx)
run_concurrently(worker_func=worker, nthreads=NTHREADS)
# Every thread must get a unique index.
self.assertEqual(len(results), NTHREADS)
self.assertEqual(len(set(results)), NTHREADS)
@unittest.skipUnless(ctypes, "ctypes is required")
def test_code_extra_all_ops_concurrent(self):
"""Test concurrent RequestCodeExtraIndex + SetExtra + GetExtra"""
LOOP = 100
def f():
pass
code = f.__code__
def worker():
idx = RequestCodeExtraIndex(freefunc(0))
self.assertGreaterEqual(idx, 0)
for i in range(LOOP):
ret = SetExtra(code, idx, ctypes.c_voidp(i + 1))
self.assertEqual(ret, 0)
for _ in range(LOOP):
extra = ctypes.c_voidp()
ret = GetExtra(code, idx, extra)
self.assertEqual(ret, 0)
# The slot was set by this thread, so the value must
# be the last one written.
self.assertEqual(extra.value, LOOP)
run_concurrently(worker_func=worker, nthreads=NTHREADS)
@unittest.skipUnless(ctypes, "ctypes is required")
def test_code_extra_set_get_concurrent(self):
"""Test concurrent SetExtra + GetExtra on a shared index"""
LOOP = 100
def f():
pass
code = f.__code__
idx = RequestCodeExtraIndex(freefunc(0))
self.assertGreaterEqual(idx, 0)
def worker():
for i in range(LOOP):
ret = SetExtra(code, idx, ctypes.c_voidp(i + 1))
self.assertEqual(ret, 0)
for _ in range(LOOP):
extra = ctypes.c_voidp()
ret = GetExtra(code, idx, extra)
self.assertEqual(ret, 0)
# Value is set by any writer thread.
self.assertTrue(1 <= extra.value <= LOOP)
run_concurrently(worker_func=worker, nthreads=NTHREADS)
# Every thread's last write is LOOP, so the final value must be LOOP.
extra = ctypes.c_voidp()
ret = GetExtra(code, idx, extra)
self.assertEqual(ret, 0)
self.assertEqual(extra.value, LOOP)
if __name__ == "__main__":
unittest.main()