"""Tests that share one lzma compressor/decompressor across several threads."""

import unittest

from test.support import import_helper, threading_helper
from test.support.threading_helper import run_concurrently

# lzma is loaded through import_helper rather than a plain import statement.
lzma = import_helper.import_module("lzma")
from lzma import LZMACompressor, LZMADecompressor

from test.test_lzma import INPUT


# Thread count handed to run_concurrently(). The compressor test uses one
# fewer worker because the main thread performs the first compress() call.
NTHREADS = 10


@threading_helper.requires_working_threading()
class TestLZMA(unittest.TestCase):
    """Drive a single LZMACompressor / LZMADecompressor from many threads."""

    def test_compressor(self):
        """Feed INPUT to one shared compressor from several threads.

        The main thread makes the first compress() call and the workers make
        the remaining ``NTHREADS - 1`` calls, so the flushed stream must
        decompress to INPUT repeated NTHREADS times.
        """
        shared_compressor = LZMACompressor()

        # First compress() outputs LZMA header
        stream_header = shared_compressor.compress(INPUT)
        self.assertGreater(len(stream_header), 0)

        def compress_once():
            # it should return empty bytes as it buffers data internally
            emitted = shared_compressor.compress(INPUT)
            self.assertEqual(emitted, b"")

        run_concurrently(worker_func=compress_once, nthreads=NTHREADS - 1)

        stream_tail = shared_compressor.flush()
        round_tripped = lzma.decompress(stream_header + stream_tail)
        # The decompressed data should be INPUT repeated NTHREADS times
        self.assertEqual(round_tripped, INPUT * NTHREADS)

    def test_decompressor(self):
        """Pull fixed-size chunks out of one shared decompressor in parallel.

        The payload is NTHREADS distinct chunks of ``chunk_size`` bytes each.
        Every worker calls decompress() with ``chunk_size`` as the second
        argument and must receive exactly ``chunk_size`` bytes back.
        """
        chunk_size = 128
        first_byte = ord("a")
        # One chunk per thread, each made of a single repeated letter so that
        # every chunk is distinct and easy to recognise.
        chunks = [
            bytes([byte_value]) * chunk_size
            for byte_value in range(first_byte, first_byte + NTHREADS)
        ]
        compressed = lzma.compress(b"".join(chunks))

        shared_decompressor = LZMADecompressor()
        collected = []

        def decompress_once():
            piece = shared_decompressor.decompress(compressed, chunk_size)
            self.assertEqual(len(piece), chunk_size)
            collected.append(piece)

        run_concurrently(worker_func=decompress_once, nthreads=NTHREADS)

        self.assertEqual(len(collected), NTHREADS)
        # Verify the expected chunks (order doesn't matter due to append race)
        self.assertSetEqual(set(collected), set(chunks))


if __name__ == "__main__":
    unittest.main()