gh-116738: Use PyMutex for bz2 module (gh-140555)

The methods are already wrapped with a lock, which makes them thread-safe in
free-threaded build. This replaces `PyThread_acquire_lock` with `PyMutex` and
removes some macros and allocation handling code.

Also add a test for free-threading to ensure we aren't getting data races and
that the locking is working.
This commit is contained in:
Alper 2025-10-27 06:52:30 -07:00 committed by GitHub
parent e8b5cb8f33
commit 9479a62a51
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 65 additions and 37 deletions

View file

@ -0,0 +1,53 @@
import unittest
from test.support import import_helper, threading_helper
from test.support.threading_helper import run_concurrently
bz2 = import_helper.import_module("bz2")
from bz2 import BZ2Compressor, BZ2Decompressor
from test.test_bz2 import ext_decompress, BaseTest
NTHREADS = 10
TEXT = BaseTest.TEXT
@threading_helper.requires_working_threading()
class TestBZ2(unittest.TestCase):
def test_compressor(self):
bz2c = BZ2Compressor()
def worker():
# it should return empty bytes as it buffers data internally
data = bz2c.compress(TEXT)
self.assertEqual(data, b"")
run_concurrently(worker_func=worker, nthreads=NTHREADS)
data = bz2c.flush()
# The decompressed data should be TEXT repeated NTHREADS times
decompressed = ext_decompress(data)
self.assertEqual(decompressed, TEXT * NTHREADS)
def test_decompressor(self):
chunk_size = 128
chunks = [bytes([ord("a") + i]) * chunk_size for i in range(NTHREADS)]
input_data = b"".join(chunks)
compressed = bz2.compress(input_data)
bz2d = BZ2Decompressor()
output = []
def worker():
data = bz2d.decompress(compressed, chunk_size)
self.assertEqual(len(data), chunk_size)
output.append(data)
run_concurrently(worker_func=worker, nthreads=NTHREADS)
self.assertEqual(len(output), NTHREADS)
# Verify the expected chunks (order doesn't matter due to append race)
self.assertEqual(set(output), set(chunks))
if __name__ == "__main__":
unittest.main()

View file

@ -97,20 +97,11 @@ OutputBuffer_OnError(_BlocksOutputBuffer *buffer)
#endif /* ! BZ_CONFIG_ERROR */
#define ACQUIRE_LOCK(obj) do { \
if (!PyThread_acquire_lock((obj)->lock, 0)) { \
Py_BEGIN_ALLOW_THREADS \
PyThread_acquire_lock((obj)->lock, 1); \
Py_END_ALLOW_THREADS \
} } while (0)
#define RELEASE_LOCK(obj) PyThread_release_lock((obj)->lock)
typedef struct {
PyObject_HEAD
bz_stream bzs;
int flushed;
PyThread_type_lock lock;
PyMutex mutex;
} BZ2Compressor;
typedef struct {
@ -126,7 +117,7 @@ typedef struct {
separately. Conversion and looping is encapsulated in
decompress_buf() */
size_t bzs_avail_in_real;
PyThread_type_lock lock;
PyMutex mutex;
} BZ2Decompressor;
#define _BZ2Compressor_CAST(op) ((BZ2Compressor *)(op))
@ -271,12 +262,12 @@ _bz2_BZ2Compressor_compress_impl(BZ2Compressor *self, Py_buffer *data)
{
PyObject *result = NULL;
ACQUIRE_LOCK(self);
PyMutex_Lock(&self->mutex);
if (self->flushed)
PyErr_SetString(PyExc_ValueError, "Compressor has been flushed");
else
result = compress(self, data->buf, data->len, BZ_RUN);
RELEASE_LOCK(self);
PyMutex_Unlock(&self->mutex);
return result;
}
@ -296,14 +287,14 @@ _bz2_BZ2Compressor_flush_impl(BZ2Compressor *self)
{
PyObject *result = NULL;
ACQUIRE_LOCK(self);
PyMutex_Lock(&self->mutex);
if (self->flushed)
PyErr_SetString(PyExc_ValueError, "Repeated call to flush()");
else {
self->flushed = 1;
result = compress(self, NULL, 0, BZ_FINISH);
}
RELEASE_LOCK(self);
PyMutex_Unlock(&self->mutex);
return result;
}
@ -357,13 +348,7 @@ _bz2_BZ2Compressor_impl(PyTypeObject *type, int compresslevel)
return NULL;
}
self->lock = PyThread_allocate_lock();
if (self->lock == NULL) {
Py_DECREF(self);
PyErr_SetString(PyExc_MemoryError, "Unable to allocate lock");
return NULL;
}
self->mutex = (PyMutex){0};
self->bzs.opaque = NULL;
self->bzs.bzalloc = BZ2_Malloc;
self->bzs.bzfree = BZ2_Free;
@ -382,10 +367,8 @@ static void
BZ2Compressor_dealloc(PyObject *op)
{
BZ2Compressor *self = _BZ2Compressor_CAST(op);
assert(!PyMutex_IsLocked(&self->mutex));
BZ2_bzCompressEnd(&self->bzs);
if (self->lock != NULL) {
PyThread_free_lock(self->lock);
}
PyTypeObject *tp = Py_TYPE(self);
tp->tp_free((PyObject *)self);
Py_DECREF(tp);
@ -619,12 +602,12 @@ _bz2_BZ2Decompressor_decompress_impl(BZ2Decompressor *self, Py_buffer *data,
{
PyObject *result = NULL;
ACQUIRE_LOCK(self);
PyMutex_Lock(&self->mutex);
if (self->eof)
PyErr_SetString(PyExc_EOFError, "End of stream already reached");
else
result = decompress(self, data->buf, data->len, max_length);
RELEASE_LOCK(self);
PyMutex_Unlock(&self->mutex);
return result;
}
@ -650,13 +633,7 @@ _bz2_BZ2Decompressor_impl(PyTypeObject *type)
return NULL;
}
self->lock = PyThread_allocate_lock();
if (self->lock == NULL) {
Py_DECREF(self);
PyErr_SetString(PyExc_MemoryError, "Unable to allocate lock");
return NULL;
}
self->mutex = (PyMutex){0};
self->needs_input = 1;
self->bzs_avail_in_real = 0;
self->input_buffer = NULL;
@ -678,15 +655,13 @@ static void
BZ2Decompressor_dealloc(PyObject *op)
{
BZ2Decompressor *self = _BZ2Decompressor_CAST(op);
assert(!PyMutex_IsLocked(&self->mutex));
if(self->input_buffer != NULL) {
PyMem_Free(self->input_buffer);
}
BZ2_bzDecompressEnd(&self->bzs);
Py_CLEAR(self->unused_data);
if (self->lock != NULL) {
PyThread_free_lock(self->lock);
}
PyTypeObject *tp = Py_TYPE(self);
tp->tp_free((PyObject *)self);