import functools import sys import threading import unittest from test.support import threading_helper threading_helper.requires_working_threading(module=True) def run_with_frame(funcs, runner=None, iters=10): """Run funcs with a frame from another thread that is currently executing. Args: funcs: A function or list of functions that take a frame argument runner: Optional function to run in the executor thread. If provided, it will be called and should return eventually. The frame passed to funcs will be the runner's frame. iters: Number of iterations each func should run """ if not isinstance(funcs, list): funcs = [funcs] frame_var = None e = threading.Event() b = threading.Barrier(len(funcs) + 1) if runner is None: def runner(): j = 0 for i in range(100): j += i def executor(): nonlocal frame_var frame_var = sys._getframe() e.set() b.wait() runner() def func_wrapper(func): e.wait() frame = frame_var b.wait() for _ in range(iters): func(frame) test_funcs = [functools.partial(func_wrapper, f) for f in funcs] threading_helper.run_concurrently([executor] + test_funcs) class TestFrameRaces(unittest.TestCase): def test_concurrent_f_lasti(self): run_with_frame(lambda frame: frame.f_lasti) def test_concurrent_f_lineno(self): run_with_frame(lambda frame: frame.f_lineno) def test_concurrent_f_code(self): run_with_frame(lambda frame: frame.f_code) def test_concurrent_f_back(self): run_with_frame(lambda frame: frame.f_back) def test_concurrent_f_globals(self): run_with_frame(lambda frame: frame.f_globals) def test_concurrent_f_builtins(self): run_with_frame(lambda frame: frame.f_builtins) def test_concurrent_f_locals(self): run_with_frame(lambda frame: frame.f_locals) def test_concurrent_f_trace_read(self): run_with_frame(lambda frame: frame.f_trace) def test_concurrent_f_trace_opcodes_read(self): run_with_frame(lambda frame: frame.f_trace_opcodes) def test_concurrent_repr(self): run_with_frame(lambda frame: repr(frame)) def test_concurrent_f_trace_write(self): def trace_func(frame, event, arg): return trace_func def writer(frame): frame.f_trace = trace_func frame.f_trace = None run_with_frame(writer) def test_concurrent_f_trace_read_write(self): # Test concurrent reads and writes of f_trace on a live frame. def trace_func(frame, event, arg): return trace_func def reader(frame): _ = frame.f_trace def writer(frame): frame.f_trace = trace_func frame.f_trace = None run_with_frame([reader, writer, reader, writer]) def test_concurrent_f_trace_opcodes_write(self): def writer(frame): frame.f_trace_opcodes = True frame.f_trace_opcodes = False run_with_frame(writer) def test_concurrent_f_trace_opcodes_read_write(self): # Test concurrent reads and writes of f_trace_opcodes on a live frame. def reader(frame): _ = frame.f_trace_opcodes def writer(frame): frame.f_trace_opcodes = True frame.f_trace_opcodes = False run_with_frame([reader, writer, reader, writer]) def test_concurrent_frame_clear(self): # Test race between frame.clear() and attribute reads. def create_frame(): x = 1 y = 2 return sys._getframe() frame = create_frame() def reader(): for _ in range(10): try: _ = frame.f_locals _ = frame.f_code _ = frame.f_lineno except ValueError: # Frame may be cleared pass def clearer(): frame.clear() threading_helper.run_concurrently([reader, reader, clearer]) if __name__ == "__main__": unittest.main()