mirror of
https://github.com/python/cpython.git
synced 2025-12-31 04:23:37 +00:00
gh-135444: fix asyncio.DatagramTransport.sendto to account for datagram header size when data cannot be sent (#135445)
Co-authored-by: Kumar Aditya <kumaraditya@python.org>
This commit is contained in:
parent
9d3b53c47f
commit
e3ea861351
5 changed files with 53 additions and 5 deletions
|
|
@ -460,6 +460,8 @@ def _pipe_closed(self, fut):
|
|||
class _ProactorDatagramTransport(_ProactorBasePipeTransport,
|
||||
transports.DatagramTransport):
|
||||
max_size = 256 * 1024
|
||||
_header_size = 8
|
||||
|
||||
def __init__(self, loop, sock, protocol, address=None,
|
||||
waiter=None, extra=None):
|
||||
self._address = address
|
||||
|
|
@ -499,7 +501,7 @@ def sendto(self, data, addr=None):
|
|||
|
||||
# Ensure that what we buffer is immutable.
|
||||
self._buffer.append((bytes(data), addr))
|
||||
self._buffer_size += len(data) + 8 # include header bytes
|
||||
self._buffer_size += len(data) + self._header_size
|
||||
|
||||
if self._write_fut is None:
|
||||
# No current write operations are active, kick one off
|
||||
|
|
@ -526,7 +528,7 @@ def _loop_writing(self, fut=None):
|
|||
return
|
||||
|
||||
data, addr = self._buffer.popleft()
|
||||
self._buffer_size -= len(data)
|
||||
self._buffer_size -= len(data) + self._header_size
|
||||
if self._address is not None:
|
||||
self._write_fut = self._loop._proactor.send(self._sock,
|
||||
data)
|
||||
|
|
|
|||
|
|
@ -1212,6 +1212,7 @@ def close(self):
|
|||
class _SelectorDatagramTransport(_SelectorTransport, transports.DatagramTransport):
|
||||
|
||||
_buffer_factory = collections.deque
|
||||
_header_size = 8
|
||||
|
||||
def __init__(self, loop, sock, protocol, address=None,
|
||||
waiter=None, extra=None):
|
||||
|
|
@ -1285,13 +1286,13 @@ def sendto(self, data, addr=None):
|
|||
|
||||
# Ensure that what we buffer is immutable.
|
||||
self._buffer.append((bytes(data), addr))
|
||||
self._buffer_size += len(data) + 8 # include header bytes
|
||||
self._buffer_size += len(data) + self._header_size
|
||||
self._maybe_pause_protocol()
|
||||
|
||||
def _sendto_ready(self):
|
||||
while self._buffer:
|
||||
data, addr = self._buffer.popleft()
|
||||
self._buffer_size -= len(data)
|
||||
self._buffer_size -= len(data) + self._header_size
|
||||
try:
|
||||
if self._extra['peername']:
|
||||
self._sock.send(data)
|
||||
|
|
@ -1299,7 +1300,7 @@ def _sendto_ready(self):
|
|||
self._sock.sendto(data, addr)
|
||||
except (BlockingIOError, InterruptedError):
|
||||
self._buffer.appendleft((data, addr)) # Try again later.
|
||||
self._buffer_size += len(data)
|
||||
self._buffer_size += len(data) + self._header_size
|
||||
break
|
||||
except OSError as exc:
|
||||
self._protocol.error_received(exc)
|
||||
|
|
|
|||
|
|
@ -566,6 +566,8 @@ def test_sendto(self):
|
|||
self.assertTrue(self.proactor.sendto.called)
|
||||
self.proactor.sendto.assert_called_with(
|
||||
self.sock, data, addr=('0.0.0.0', 1234))
|
||||
self.assertFalse(transport._buffer)
|
||||
self.assertEqual(0, transport._buffer_size)
|
||||
|
||||
def test_sendto_bytearray(self):
|
||||
data = bytearray(b'data')
|
||||
|
|
|
|||
|
|
@ -1497,6 +1497,47 @@ def test_sendto_closing(self):
|
|||
transport.sendto(b'data', (1,))
|
||||
self.assertEqual(transport._conn_lost, 2)
|
||||
|
||||
def test_sendto_sendto_ready(self):
|
||||
data = b'data'
|
||||
|
||||
# First queue up the buffer by having the socket blocked
|
||||
self.sock.sendto.side_effect = BlockingIOError
|
||||
transport = self.datagram_transport()
|
||||
transport.sendto(data, ('0.0.0.0', 12345))
|
||||
self.loop.assert_writer(7, transport._sendto_ready)
|
||||
self.assertEqual(1, len(transport._buffer))
|
||||
self.assertEqual(transport._buffer_size, len(data) + transport._header_size)
|
||||
|
||||
# Now let the socket send the buffer
|
||||
self.sock.sendto.side_effect = None
|
||||
transport._sendto_ready()
|
||||
self.assertTrue(self.sock.sendto.called)
|
||||
self.assertEqual(
|
||||
self.sock.sendto.call_args[0], (data, ('0.0.0.0', 12345)))
|
||||
self.assertFalse(self.loop.writers)
|
||||
self.assertFalse(transport._buffer)
|
||||
self.assertEqual(transport._buffer_size, 0)
|
||||
|
||||
def test_sendto_sendto_ready_blocked(self):
|
||||
data = b'data'
|
||||
|
||||
# First queue up the buffer by having the socket blocked
|
||||
self.sock.sendto.side_effect = BlockingIOError
|
||||
transport = self.datagram_transport()
|
||||
transport.sendto(data, ('0.0.0.0', 12345))
|
||||
self.loop.assert_writer(7, transport._sendto_ready)
|
||||
self.assertEqual(1, len(transport._buffer))
|
||||
self.assertEqual(transport._buffer_size, len(data) + transport._header_size)
|
||||
|
||||
# Now try to send the buffer, it will be added to buffer again if it fails
|
||||
transport._sendto_ready()
|
||||
self.assertTrue(self.sock.sendto.called)
|
||||
self.assertEqual(
|
||||
self.sock.sendto.call_args[0], (data, ('0.0.0.0', 12345)))
|
||||
self.assertTrue(self.loop.writers)
|
||||
self.assertEqual(1, len(transport._buffer))
|
||||
self.assertEqual(transport._buffer_size, len(data) + transport._header_size)
|
||||
|
||||
def test_sendto_ready(self):
|
||||
data = b'data'
|
||||
self.sock.sendto.return_value = len(data)
|
||||
|
|
|
|||
|
|
@ -0,0 +1,2 @@
|
|||
Fix :meth:`asyncio.DatagramTransport.sendto` to account for datagram header size when
|
||||
data cannot be sent.
|
||||
Loading…
Add table
Add a link
Reference in a new issue