mirror of
https://github.com/python/cpython.git
synced 2026-01-05 06:52:26 +00:00
Make the attributes in _bz2 module thread-safe on the free-threading build. Attributes (eof, needs_input, unused_data) are now stored atomically or accessed via mutex-protected getters.
64 lines
2.2 KiB
Python
64 lines
2.2 KiB
Python
import unittest
|
|
|
|
from test.support import import_helper, threading_helper
|
|
from test.support.threading_helper import run_concurrently
|
|
|
|
bz2 = import_helper.import_module("bz2")
|
|
from bz2 import BZ2Compressor, BZ2Decompressor
|
|
|
|
from test.test_bz2 import ext_decompress, BaseTest
|
|
|
|
|
|
NTHREADS = 10
|
|
TEXT = BaseTest.TEXT
|
|
|
|
|
|
@threading_helper.requires_working_threading()
|
|
class TestBZ2(unittest.TestCase):
|
|
def test_compressor(self):
|
|
bz2c = BZ2Compressor()
|
|
|
|
def worker():
|
|
# it should return empty bytes as it buffers data internally
|
|
data = bz2c.compress(TEXT)
|
|
self.assertEqual(data, b"")
|
|
|
|
run_concurrently(worker_func=worker, nthreads=NTHREADS)
|
|
data = bz2c.flush()
|
|
# The decompressed data should be TEXT repeated NTHREADS times
|
|
decompressed = ext_decompress(data)
|
|
self.assertEqual(decompressed, TEXT * NTHREADS)
|
|
|
|
def test_decompressor(self):
|
|
chunk_size = 128
|
|
chunks = [bytes([ord("a") + i]) * chunk_size for i in range(NTHREADS)]
|
|
input_data = b"".join(chunks)
|
|
compressed = bz2.compress(input_data)
|
|
|
|
bz2d = BZ2Decompressor()
|
|
output = []
|
|
|
|
def worker():
|
|
data = bz2d.decompress(compressed, chunk_size)
|
|
self.assertEqual(len(data), chunk_size)
|
|
output.append(data)
|
|
# Read attributes concurrently with other threads decompressing
|
|
self.assertIsInstance(bz2d.eof, bool)
|
|
self.assertIsInstance(bz2d.needs_input, bool)
|
|
self.assertIsInstance(bz2d.unused_data, bytes)
|
|
|
|
run_concurrently(worker_func=worker, nthreads=NTHREADS)
|
|
self.assertEqual(len(output), NTHREADS)
|
|
# Verify the expected chunks (order doesn't matter due to append race)
|
|
self.assertEqual(set(output), set(chunks))
|
|
self.assertTrue(bz2d.eof)
|
|
self.assertFalse(bz2d.needs_input)
|
|
# Each thread added full compressed data to the buffer, but only 1 copy
|
|
# is consumed to produce the output. The rest remains as unused_data.
|
|
self.assertEqual(
|
|
len(bz2d.unused_data), len(compressed) * (NTHREADS - 1)
|
|
)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|