cpython/Lib/test/test_free_threading/test_list.py
bkap123 2af2a38302
[3.14] gh-145036: Fix data race for list capacity in free-threading (GH-145365) (#145881)
(cherry picked from commit 9e0802330c)

Co-authored-by: Kumar Aditya <kumaraditya@python.org>
2026-03-12 22:06:12 -04:00

117 lines
3 KiB
Python

import unittest
from threading import Thread, Barrier
from unittest import TestCase
from test.support import threading_helper
NTHREAD = 10
OBJECT_COUNT = 5_000
class C:
def __init__(self, v):
self.v = v
@threading_helper.requires_working_threading()
class TestList(TestCase):
def test_racing_iter_append(self):
l = []
barrier = Barrier(NTHREAD + 1)
def writer_func(l):
barrier.wait()
for i in range(OBJECT_COUNT):
l.append(C(i + OBJECT_COUNT))
def reader_func(l):
barrier.wait()
while True:
count = len(l)
for i, x in enumerate(l):
self.assertEqual(x.v, i + OBJECT_COUNT)
if count == OBJECT_COUNT:
break
writer = Thread(target=writer_func, args=(l,))
readers = []
for x in range(NTHREAD):
reader = Thread(target=reader_func, args=(l,))
readers.append(reader)
reader.start()
writer.start()
writer.join()
for reader in readers:
reader.join()
def test_racing_iter_extend(self):
l = []
barrier = Barrier(NTHREAD + 1)
def writer_func():
barrier.wait()
for i in range(OBJECT_COUNT):
l.extend([C(i + OBJECT_COUNT)])
def reader_func():
barrier.wait()
while True:
count = len(l)
for i, x in enumerate(l):
self.assertEqual(x.v, i + OBJECT_COUNT)
if count == OBJECT_COUNT:
break
writer = Thread(target=writer_func)
readers = []
for x in range(NTHREAD):
reader = Thread(target=reader_func)
readers.append(reader)
reader.start()
writer.start()
writer.join()
for reader in readers:
reader.join()
def test_store_list_int(self):
def copy_back_and_forth(b, l):
b.wait()
for _ in range(100):
l[0] = l[1]
l[1] = l[0]
l = [0, 1]
barrier = Barrier(NTHREAD)
threads = [Thread(target=copy_back_and_forth, args=(barrier, l))
for _ in range(NTHREAD)]
with threading_helper.start_threads(threads):
pass
# gh-145036: race condition with list.__sizeof__()
def test_list_sizeof_free_threaded_build(self):
L = []
def mutate_function():
for _ in range(100):
L.append(1)
L.pop()
def size_function():
for _ in range(100):
L.__sizeof__()
threads = []
for _ in range(4):
threads.append(Thread(target=mutate_function))
threads.append(Thread(target=size_function))
with threading_helper.start_threads(threads):
pass
if __name__ == "__main__":
unittest.main()