mirror of
https://github.com/python/cpython.git
synced 2026-04-14 07:41:00 +00:00
172 lines
5.3 KiB
Python
172 lines
5.3 KiB
Python
import codecs
|
|
import io
|
|
import _pyio as pyio
|
|
import threading
|
|
from unittest import TestCase
|
|
from test.support import threading_helper
|
|
from test.support.threading_helper import run_concurrently
|
|
from random import randint
|
|
from sys import getsizeof
|
|
|
|
threading_helper.requires_working_threading(module=True)
|
|
|
|
|
|
class ThreadSafetyMixin:
|
|
# Test pretty much everything that can break under free-threading.
|
|
# Non-deterministic, but at least one of these things will fail if
|
|
# BytesIO object is not free-thread safe.
|
|
|
|
def check(self, funcs, *args):
|
|
barrier = threading.Barrier(len(funcs))
|
|
threads = []
|
|
|
|
for func in funcs:
|
|
thread = threading.Thread(target=func, args=(barrier, *args))
|
|
|
|
threads.append(thread)
|
|
|
|
with threading_helper.start_threads(threads):
|
|
pass
|
|
|
|
@threading_helper.requires_working_threading()
|
|
@threading_helper.reap_threads
|
|
def test_free_threading(self):
|
|
"""Test for segfaults and aborts."""
|
|
|
|
def write(barrier, b, *ignore):
|
|
barrier.wait()
|
|
try: b.write(b'0' * randint(100, 1000))
|
|
except ValueError: pass # ignore write fail to closed file
|
|
|
|
def writelines(barrier, b, *ignore):
|
|
barrier.wait()
|
|
b.write(b'0\n' * randint(100, 1000))
|
|
|
|
def truncate(barrier, b, *ignore):
|
|
barrier.wait()
|
|
try: b.truncate(0)
|
|
except BufferError: pass # ignore exported buffer
|
|
|
|
def read(barrier, b, *ignore):
|
|
barrier.wait()
|
|
b.read()
|
|
|
|
def read1(barrier, b, *ignore):
|
|
barrier.wait()
|
|
b.read1()
|
|
|
|
def readline(barrier, b, *ignore):
|
|
barrier.wait()
|
|
b.readline()
|
|
|
|
def readlines(barrier, b, *ignore):
|
|
barrier.wait()
|
|
b.readlines()
|
|
|
|
def readinto(barrier, b, into, *ignore):
|
|
barrier.wait()
|
|
b.readinto(into)
|
|
|
|
def close(barrier, b, *ignore):
|
|
barrier.wait()
|
|
b.close()
|
|
|
|
def getvalue(barrier, b, *ignore):
|
|
barrier.wait()
|
|
b.getvalue()
|
|
|
|
def getbuffer(barrier, b, *ignore):
|
|
barrier.wait()
|
|
b.getbuffer()
|
|
|
|
def iter(barrier, b, *ignore):
|
|
barrier.wait()
|
|
list(b)
|
|
|
|
def getstate(barrier, b, *ignore):
|
|
barrier.wait()
|
|
b.__getstate__()
|
|
|
|
def setstate(barrier, b, st, *ignore):
|
|
barrier.wait()
|
|
b.__setstate__(st)
|
|
|
|
def sizeof(barrier, b, *ignore):
|
|
barrier.wait()
|
|
getsizeof(b)
|
|
|
|
self.check([write] * 10, self.ioclass())
|
|
self.check([writelines] * 10, self.ioclass())
|
|
self.check([write] * 10 + [truncate] * 10, self.ioclass())
|
|
self.check([truncate] + [read] * 10, self.ioclass(b'0\n'*204800))
|
|
self.check([truncate] + [read1] * 10, self.ioclass(b'0\n'*204800))
|
|
self.check([truncate] + [readline] * 10, self.ioclass(b'0\n'*20480))
|
|
self.check([truncate] + [readlines] * 10, self.ioclass(b'0\n'*20480))
|
|
self.check([truncate] + [readinto] * 10, self.ioclass(b'0\n'*204800), bytearray(b'0\n'*204800))
|
|
self.check([close] + [write] * 10, self.ioclass())
|
|
self.check([truncate] + [getvalue] * 10, self.ioclass(b'0\n'*204800))
|
|
self.check([truncate] + [getbuffer] * 10, self.ioclass(b'0\n'*204800))
|
|
self.check([truncate] + [iter] * 10, self.ioclass(b'0\n'*20480))
|
|
self.check([truncate] + [getstate] * 10, self.ioclass(b'0\n'*204800))
|
|
state = self.ioclass(b'123').__getstate__()
|
|
self.check([truncate] + [setstate] * 10, self.ioclass(b'0\n'*204800), state)
|
|
self.check([truncate] + [sizeof] * 10, self.ioclass(b'0\n'*204800))
|
|
|
|
# no tests for seek or tell because they don't break anything
|
|
|
|
class CBytesIOTest(ThreadSafetyMixin, TestCase):
|
|
ioclass = io.BytesIO
|
|
|
|
class PyBytesIOTest(ThreadSafetyMixin, TestCase):
|
|
ioclass = pyio.BytesIO
|
|
|
|
|
|
class IncrementalNewlineDecoderTest(TestCase):
|
|
def make_decoder(self):
|
|
utf8_decoder = codecs.getincrementaldecoder('utf-8')()
|
|
return io.IncrementalNewlineDecoder(utf8_decoder, translate=True)
|
|
|
|
def test_concurrent_reset(self):
|
|
decoder = self.make_decoder()
|
|
|
|
def worker():
|
|
for _ in range(100):
|
|
decoder.reset()
|
|
|
|
run_concurrently(worker_func=worker, nthreads=2)
|
|
|
|
def test_concurrent_decode(self):
|
|
decoder = self.make_decoder()
|
|
|
|
def worker():
|
|
for _ in range(100):
|
|
decoder.decode(b"line\r\n", final=False)
|
|
|
|
run_concurrently(worker_func=worker, nthreads=2)
|
|
|
|
def test_concurrent_getstate_setstate(self):
|
|
decoder = self.make_decoder()
|
|
state = decoder.getstate()
|
|
|
|
def getstate_worker():
|
|
for _ in range(100):
|
|
decoder.getstate()
|
|
|
|
def setstate_worker():
|
|
for _ in range(100):
|
|
decoder.setstate(state)
|
|
|
|
run_concurrently([getstate_worker] * 2 + [setstate_worker] * 2)
|
|
|
|
def test_concurrent_decode_and_reset(self):
|
|
decoder = self.make_decoder()
|
|
|
|
def decode_worker():
|
|
for _ in range(100):
|
|
decoder.decode(b"line\r\n", final=False)
|
|
|
|
def reset_worker():
|
|
for _ in range(100):
|
|
decoder.reset()
|
|
|
|
run_concurrently([decode_worker] * 2 + [reset_worker] * 2)
|