gh-141863: use bytearray.take_bytes in asyncio streams for better performance (#141864)

Author: Cody Maloney, 2025-11-24 07:36:53 -08:00 (committed by GitHub)
parent fb655527d8
commit b484c32d0a
2 changed files with 12 additions and 22 deletions
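
bytearray.take_bytes, new in Python 3.15, returns the buffer's contents as
bytes and removes them from the bytearray in a single call, replacing the
copy-then-clear and slice-then-delete patterns used below. A minimal sketch of
the equivalence this commit relies on, assuming only the semantics visible in
the diff:

    buf = bytearray(b'hello world')

    # Old pattern: copy everything out, then clear the source separately.
    chunk = bytes(buf)
    buf.clear()

    buf = bytearray(b'hello world')

    # New pattern: one call empties the buffer and returns its contents.
    chunk = buf.take_bytes()     # b'hello world'; buf is now empty

    buf = bytearray(b'hello world')

    # With an argument, only the first n bytes are taken; the rest remain.
    head = buf.take_bytes(5)     # b'hello'
    assert bytes(buf) == b' world'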

Lib/asyncio/streams.py

@@ -667,8 +667,7 @@ async def readuntil(self, separator=b'\n'):
             # adds data which makes separator be found. That's why we check for
             # EOF *after* inspecting the buffer.
             if self._eof:
-                chunk = bytes(self._buffer)
-                self._buffer.clear()
+                chunk = self._buffer.take_bytes()
                 raise exceptions.IncompleteReadError(chunk, None)
 
             # _wait_for_data() will resume reading if stream was paused.
@@ -678,10 +677,9 @@ async def readuntil(self, separator=b'\n'):
             raise exceptions.LimitOverrunError(
                 'Separator is found, but chunk is longer than limit', match_start)
 
-        chunk = self._buffer[:match_end]
-        del self._buffer[:match_end]
+        chunk = self._buffer.take_bytes(match_end)
         self._maybe_resume_transport()
-        return bytes(chunk)
+        return chunk
 
     async def read(self, n=-1):
         """Read up to `n` bytes from the stream.
@@ -716,20 +714,16 @@ async def read(self, n=-1):
             # collect everything in self._buffer, but that would
             # deadlock if the subprocess sends more than self.limit
             # bytes. So just call self.read(self._limit) until EOF.
-            blocks = []
-            while True:
-                block = await self.read(self._limit)
-                if not block:
-                    break
-                blocks.append(block)
-            return b''.join(blocks)
+            joined = bytearray()
+            while block := await self.read(self._limit):
+                joined += block
+            return joined.take_bytes()
 
         if not self._buffer and not self._eof:
             await self._wait_for_data('read')
 
         # This will work right even if buffer is less than n bytes
-        data = bytes(memoryview(self._buffer)[:n])
-        del self._buffer[:n]
+        data = self._buffer.take_bytes(min(len(self._buffer), n))
 
         self._maybe_resume_transport()
         return data
@@ -760,18 +754,12 @@ async def readexactly(self, n):
 
         while len(self._buffer) < n:
             if self._eof:
-                incomplete = bytes(self._buffer)
-                self._buffer.clear()
+                incomplete = self._buffer.take_bytes()
                 raise exceptions.IncompleteReadError(incomplete, n)
 
             await self._wait_for_data('readexactly')
 
-        if len(self._buffer) == n:
-            data = bytes(self._buffer)
-            self._buffer.clear()
-        else:
-            data = bytes(memoryview(self._buffer)[:n])
-            del self._buffer[:n]
+        data = self._buffer.take_bytes(n)
 
         self._maybe_resume_transport()
         return data
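
The read(-1) rewrite above also replaces the list-of-blocks plus b''.join()
accumulation with a bytearray that is grown in place and then handed off via
take_bytes, so the loop yields the final bytes object without a separate join
pass. A standalone sketch of that loop shape, with a hypothetical read_chunk()
standing in for self.read(self._limit):

    import asyncio

    async def read_all(chunks):
        """Drain a source of blocks, mirroring StreamReader.read(-1)."""
        it = iter(chunks)

        async def read_chunk():
            # Hypothetical stand-in for self.read(self._limit); b'' = EOF.
            return next(it, b'')

        joined = bytearray()
        # Walrus loop: keep reading until an empty (EOF) block comes back.
        while block := await read_chunk():
            joined += block
        # Single hand-off instead of b''.join(blocks); needs Python 3.15+.
        return joined.take_bytes()

    print(asyncio.run(read_all([b'spam', b'eggs'])))  # b'spameggs'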

Misc/NEWS.d/next/Library/… (new file)

@@ -0,0 +1,2 @@
+Update :ref:`asyncio-streams` to use :meth:`bytearray.take_bytes` for an over
+10% performance improvement on the pyperformance asyncio_tcp benchmark.
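
The 10% figure is from the full pyperformance asyncio_tcp benchmark. A much
smaller, hypothetical micro-benchmark along the same lines (not the
pyperformance harness) can show the per-call difference between the old
copy-and-clear pattern and take_bytes on Python 3.15+; absolute numbers will
vary by machine:

    import timeit

    SETUP = "buf = bytearray(b'x' * 65536)"

    # Old pattern: full copy of the buffer, then a separate clear.
    old = timeit.timeit(
        "chunk = bytes(buf); buf.clear(); buf += b'x' * 65536",
        setup=SETUP, number=10_000)

    # New pattern: a single take_bytes() call, then refill for the next run.
    new = timeit.timeit(
        "chunk = buf.take_bytes(); buf += b'x' * 65536",
        setup=SETUP, number=10_000)

    print(f"bytes+clear: {old:.3f}s  take_bytes: {new:.3f}s")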