mirror of
https://github.com/python/cpython.git
synced 2025-12-08 06:10:17 +00:00
gh-116738: use PyMutex in lzma module (#140711)
Co-authored-by: Kumar Aditya <kumaraditya@python.org>
This commit is contained in:
parent
2befce86e6
commit
c13b59204a
2 changed files with 69 additions and 33 deletions
56
Lib/test/test_free_threading/test_lzma.py
Normal file
56
Lib/test/test_free_threading/test_lzma.py
Normal file
|
|
@ -0,0 +1,56 @@
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
from test.support import import_helper, threading_helper
|
||||||
|
from test.support.threading_helper import run_concurrently
|
||||||
|
|
||||||
|
lzma = import_helper.import_module("lzma")
|
||||||
|
from lzma import LZMACompressor, LZMADecompressor
|
||||||
|
|
||||||
|
from test.test_lzma import INPUT
|
||||||
|
|
||||||
|
|
||||||
|
NTHREADS = 10
|
||||||
|
|
||||||
|
|
||||||
|
@threading_helper.requires_working_threading()
|
||||||
|
class TestLZMA(unittest.TestCase):
|
||||||
|
def test_compressor(self):
|
||||||
|
lzc = LZMACompressor()
|
||||||
|
|
||||||
|
# First compress() outputs LZMA header
|
||||||
|
header = lzc.compress(INPUT)
|
||||||
|
self.assertGreater(len(header), 0)
|
||||||
|
|
||||||
|
def worker():
|
||||||
|
# it should return empty bytes as it buffers data internally
|
||||||
|
data = lzc.compress(INPUT)
|
||||||
|
self.assertEqual(data, b"")
|
||||||
|
|
||||||
|
run_concurrently(worker_func=worker, nthreads=NTHREADS - 1)
|
||||||
|
full_compressed = header + lzc.flush()
|
||||||
|
decompressed = lzma.decompress(full_compressed)
|
||||||
|
# The decompressed data should be INPUT repeated NTHREADS times
|
||||||
|
self.assertEqual(decompressed, INPUT * NTHREADS)
|
||||||
|
|
||||||
|
def test_decompressor(self):
|
||||||
|
chunk_size = 128
|
||||||
|
chunks = [bytes([ord("a") + i]) * chunk_size for i in range(NTHREADS)]
|
||||||
|
input_data = b"".join(chunks)
|
||||||
|
compressed = lzma.compress(input_data)
|
||||||
|
|
||||||
|
lzd = LZMADecompressor()
|
||||||
|
output = []
|
||||||
|
|
||||||
|
def worker():
|
||||||
|
data = lzd.decompress(compressed, chunk_size)
|
||||||
|
self.assertEqual(len(data), chunk_size)
|
||||||
|
output.append(data)
|
||||||
|
|
||||||
|
run_concurrently(worker_func=worker, nthreads=NTHREADS)
|
||||||
|
self.assertEqual(len(output), NTHREADS)
|
||||||
|
# Verify the expected chunks (order doesn't matter due to append race)
|
||||||
|
self.assertSetEqual(set(output), set(chunks))
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
|
|
@ -72,13 +72,6 @@ OutputBuffer_OnError(_BlocksOutputBuffer *buffer)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
#define ACQUIRE_LOCK(obj) do { \
|
|
||||||
if (!PyThread_acquire_lock((obj)->lock, 0)) { \
|
|
||||||
Py_BEGIN_ALLOW_THREADS \
|
|
||||||
PyThread_acquire_lock((obj)->lock, 1); \
|
|
||||||
Py_END_ALLOW_THREADS \
|
|
||||||
} } while (0)
|
|
||||||
#define RELEASE_LOCK(obj) PyThread_release_lock((obj)->lock)
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
PyTypeObject *lzma_compressor_type;
|
PyTypeObject *lzma_compressor_type;
|
||||||
|
|
@ -111,7 +104,7 @@ typedef struct {
|
||||||
lzma_allocator alloc;
|
lzma_allocator alloc;
|
||||||
lzma_stream lzs;
|
lzma_stream lzs;
|
||||||
int flushed;
|
int flushed;
|
||||||
PyThread_type_lock lock;
|
PyMutex mutex;
|
||||||
} Compressor;
|
} Compressor;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
@ -124,7 +117,7 @@ typedef struct {
|
||||||
char needs_input;
|
char needs_input;
|
||||||
uint8_t *input_buffer;
|
uint8_t *input_buffer;
|
||||||
size_t input_buffer_size;
|
size_t input_buffer_size;
|
||||||
PyThread_type_lock lock;
|
PyMutex mutex;
|
||||||
} Decompressor;
|
} Decompressor;
|
||||||
|
|
||||||
#define Compressor_CAST(op) ((Compressor *)(op))
|
#define Compressor_CAST(op) ((Compressor *)(op))
|
||||||
|
|
@ -617,14 +610,14 @@ _lzma_LZMACompressor_compress_impl(Compressor *self, Py_buffer *data)
|
||||||
{
|
{
|
||||||
PyObject *result = NULL;
|
PyObject *result = NULL;
|
||||||
|
|
||||||
ACQUIRE_LOCK(self);
|
PyMutex_Lock(&self->mutex);
|
||||||
if (self->flushed) {
|
if (self->flushed) {
|
||||||
PyErr_SetString(PyExc_ValueError, "Compressor has been flushed");
|
PyErr_SetString(PyExc_ValueError, "Compressor has been flushed");
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
result = compress(self, data->buf, data->len, LZMA_RUN);
|
result = compress(self, data->buf, data->len, LZMA_RUN);
|
||||||
}
|
}
|
||||||
RELEASE_LOCK(self);
|
PyMutex_Unlock(&self->mutex);
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -644,14 +637,14 @@ _lzma_LZMACompressor_flush_impl(Compressor *self)
|
||||||
{
|
{
|
||||||
PyObject *result = NULL;
|
PyObject *result = NULL;
|
||||||
|
|
||||||
ACQUIRE_LOCK(self);
|
PyMutex_Lock(&self->mutex);
|
||||||
if (self->flushed) {
|
if (self->flushed) {
|
||||||
PyErr_SetString(PyExc_ValueError, "Repeated call to flush()");
|
PyErr_SetString(PyExc_ValueError, "Repeated call to flush()");
|
||||||
} else {
|
} else {
|
||||||
self->flushed = 1;
|
self->flushed = 1;
|
||||||
result = compress(self, NULL, 0, LZMA_FINISH);
|
result = compress(self, NULL, 0, LZMA_FINISH);
|
||||||
}
|
}
|
||||||
RELEASE_LOCK(self);
|
PyMutex_Unlock(&self->mutex);
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -820,12 +813,7 @@ Compressor_new(PyTypeObject *type, PyObject *args, PyObject *kwargs)
|
||||||
self->alloc.free = PyLzma_Free;
|
self->alloc.free = PyLzma_Free;
|
||||||
self->lzs.allocator = &self->alloc;
|
self->lzs.allocator = &self->alloc;
|
||||||
|
|
||||||
self->lock = PyThread_allocate_lock();
|
self->mutex = (PyMutex){0};
|
||||||
if (self->lock == NULL) {
|
|
||||||
Py_DECREF(self);
|
|
||||||
PyErr_SetString(PyExc_MemoryError, "Unable to allocate lock");
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
self->flushed = 0;
|
self->flushed = 0;
|
||||||
switch (format) {
|
switch (format) {
|
||||||
|
|
@ -867,10 +855,8 @@ static void
|
||||||
Compressor_dealloc(PyObject *op)
|
Compressor_dealloc(PyObject *op)
|
||||||
{
|
{
|
||||||
Compressor *self = Compressor_CAST(op);
|
Compressor *self = Compressor_CAST(op);
|
||||||
|
assert(!PyMutex_IsLocked(&self->mutex));
|
||||||
lzma_end(&self->lzs);
|
lzma_end(&self->lzs);
|
||||||
if (self->lock != NULL) {
|
|
||||||
PyThread_free_lock(self->lock);
|
|
||||||
}
|
|
||||||
PyTypeObject *tp = Py_TYPE(self);
|
PyTypeObject *tp = Py_TYPE(self);
|
||||||
tp->tp_free(self);
|
tp->tp_free(self);
|
||||||
Py_DECREF(tp);
|
Py_DECREF(tp);
|
||||||
|
|
@ -1146,12 +1132,12 @@ _lzma_LZMADecompressor_decompress_impl(Decompressor *self, Py_buffer *data,
|
||||||
{
|
{
|
||||||
PyObject *result = NULL;
|
PyObject *result = NULL;
|
||||||
|
|
||||||
ACQUIRE_LOCK(self);
|
PyMutex_Lock(&self->mutex);
|
||||||
if (self->eof)
|
if (self->eof)
|
||||||
PyErr_SetString(PyExc_EOFError, "Already at end of stream");
|
PyErr_SetString(PyExc_EOFError, "Already at end of stream");
|
||||||
else
|
else
|
||||||
result = decompress(self, data->buf, data->len, max_length);
|
result = decompress(self, data->buf, data->len, max_length);
|
||||||
RELEASE_LOCK(self);
|
PyMutex_Unlock(&self->mutex);
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -1244,12 +1230,7 @@ _lzma_LZMADecompressor_impl(PyTypeObject *type, int format,
|
||||||
self->lzs.allocator = &self->alloc;
|
self->lzs.allocator = &self->alloc;
|
||||||
self->lzs.next_in = NULL;
|
self->lzs.next_in = NULL;
|
||||||
|
|
||||||
self->lock = PyThread_allocate_lock();
|
self->mutex = (PyMutex){0};
|
||||||
if (self->lock == NULL) {
|
|
||||||
Py_DECREF(self);
|
|
||||||
PyErr_SetString(PyExc_MemoryError, "Unable to allocate lock");
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
self->check = LZMA_CHECK_UNKNOWN;
|
self->check = LZMA_CHECK_UNKNOWN;
|
||||||
self->needs_input = 1;
|
self->needs_input = 1;
|
||||||
|
|
@ -1304,14 +1285,13 @@ static void
|
||||||
Decompressor_dealloc(PyObject *op)
|
Decompressor_dealloc(PyObject *op)
|
||||||
{
|
{
|
||||||
Decompressor *self = Decompressor_CAST(op);
|
Decompressor *self = Decompressor_CAST(op);
|
||||||
|
assert(!PyMutex_IsLocked(&self->mutex));
|
||||||
|
|
||||||
if(self->input_buffer != NULL)
|
if(self->input_buffer != NULL)
|
||||||
PyMem_Free(self->input_buffer);
|
PyMem_Free(self->input_buffer);
|
||||||
|
|
||||||
lzma_end(&self->lzs);
|
lzma_end(&self->lzs);
|
||||||
Py_CLEAR(self->unused_data);
|
Py_CLEAR(self->unused_data);
|
||||||
if (self->lock != NULL) {
|
|
||||||
PyThread_free_lock(self->lock);
|
|
||||||
}
|
|
||||||
PyTypeObject *tp = Py_TYPE(self);
|
PyTypeObject *tp = Py_TYPE(self);
|
||||||
tp->tp_free(self);
|
tp->tp_free(self);
|
||||||
Py_DECREF(tp);
|
Py_DECREF(tp);
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue