mirror of
				https://github.com/python/cpython.git
				synced 2025-11-04 07:31:38 +00:00 
			
		
		
		
	Issue #23972: updates to asyncio datagram API. By Chris Laws.
This commit is contained in:
		
							parent
							
								
									d17e9785de
								
							
						
					
					
						commit
						b9bf913ab3
					
				
					 7 changed files with 385 additions and 74 deletions
				
			
		| 
						 | 
					@ -283,17 +283,50 @@ Creating connections
 | 
				
			||||||
      (:class:`StreamReader`, :class:`StreamWriter`) instead of a protocol.
 | 
					      (:class:`StreamReader`, :class:`StreamWriter`) instead of a protocol.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
.. coroutinemethod:: BaseEventLoop.create_datagram_endpoint(protocol_factory, local_addr=None, remote_addr=None, \*, family=0, proto=0, flags=0)
 | 
					.. coroutinemethod:: BaseEventLoop.create_datagram_endpoint(protocol_factory, local_addr=None, remote_addr=None, \*, family=0, proto=0, flags=0, reuse_address=None, reuse_port=None, allow_broadcast=None, sock=None)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
   Create datagram connection: socket family :py:data:`~socket.AF_INET` or
 | 
					   Create datagram connection: socket family :py:data:`~socket.AF_INET` or
 | 
				
			||||||
   :py:data:`~socket.AF_INET6` depending on *host* (or *family* if specified),
 | 
					   :py:data:`~socket.AF_INET6` depending on *host* (or *family* if specified),
 | 
				
			||||||
   socket type :py:data:`~socket.SOCK_DGRAM`.
 | 
					   socket type :py:data:`~socket.SOCK_DGRAM`. *protocol_factory* must be a
 | 
				
			||||||
 | 
					   callable returning a :ref:`protocol <asyncio-protocol>` instance.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
   This method is a :ref:`coroutine <coroutine>` which will try to
 | 
					   This method is a :ref:`coroutine <coroutine>` which will try to
 | 
				
			||||||
   establish the connection in the background.  When successful, the
 | 
					   establish the connection in the background.  When successful, the
 | 
				
			||||||
   coroutine returns a ``(transport, protocol)`` pair.
 | 
					   coroutine returns a ``(transport, protocol)`` pair.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
   See the :meth:`BaseEventLoop.create_connection` method for parameters.
 | 
					   Options changing how the connection is created:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					   * *local_addr*, if given, is a ``(local_host, local_port)`` tuple used
 | 
				
			||||||
 | 
					     to bind the socket to locally.  The *local_host* and *local_port*
 | 
				
			||||||
 | 
					     are looked up using :meth:`getaddrinfo`.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					   * *remote_addr*, if given, is a ``(remote_host, remote_port)`` tuple used
 | 
				
			||||||
 | 
					     to connect the socket to a remote address.  The *remote_host* and
 | 
				
			||||||
 | 
					     *remote_port* are looked up using :meth:`getaddrinfo`.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					   * *family*, *proto*, *flags* are the optional address family, protocol
 | 
				
			||||||
 | 
					     and flags to be passed through to :meth:`getaddrinfo` for *host*
 | 
				
			||||||
 | 
					     resolution. If given, these should all be integers from the
 | 
				
			||||||
 | 
					     corresponding :mod:`socket` module constants.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					   * *reuse_address* tells the kernel to reuse a local socket in
 | 
				
			||||||
 | 
					     TIME_WAIT state, without waiting for its natural timeout to
 | 
				
			||||||
 | 
					     expire. If not specified will automatically be set to True on
 | 
				
			||||||
 | 
					     UNIX.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					   * *reuse_port* tells the kernel to allow this endpoint to be bound to the
 | 
				
			||||||
 | 
					     same port as other existing endpoints are bound to, so long as they all
 | 
				
			||||||
 | 
					     set this flag when being created. This option is not supported on Windows
 | 
				
			||||||
 | 
					     and some UNIX's. If the :py:data:`~socket.SO_REUSEPORT` constant is not
 | 
				
			||||||
 | 
					     defined then this capability is unsupported.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					   * *allow_broadcast* tells the kernel to allow this endpoint to send
 | 
				
			||||||
 | 
					     messages to the broadcast address.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					   * *sock* can optionally be specified in order to use a preexisting,
 | 
				
			||||||
 | 
					     already connected, :class:`socket.socket` object to be used by the
 | 
				
			||||||
 | 
					     transport. If specified, *local_addr* and *remote_addr* should be omitted
 | 
				
			||||||
 | 
					     (must be :const:`None`).
 | 
				
			||||||
 | 
					
 | 
				
			||||||
   On Windows with :class:`ProactorEventLoop`, this method is not supported.
 | 
					   On Windows with :class:`ProactorEventLoop`, this method is not supported.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -320,7 +353,7 @@ Creating connections
 | 
				
			||||||
Creating listening connections
 | 
					Creating listening connections
 | 
				
			||||||
------------------------------
 | 
					------------------------------
 | 
				
			||||||
 | 
					
 | 
				
			||||||
.. coroutinemethod:: BaseEventLoop.create_server(protocol_factory, host=None, port=None, \*, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None)
 | 
					.. coroutinemethod:: BaseEventLoop.create_server(protocol_factory, host=None, port=None, \*, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
   Create a TCP server (socket type :data:`~socket.SOCK_STREAM`) bound to
 | 
					   Create a TCP server (socket type :data:`~socket.SOCK_STREAM`) bound to
 | 
				
			||||||
   *host* and *port*.
 | 
					   *host* and *port*.
 | 
				
			||||||
| 
						 | 
					@ -359,6 +392,11 @@ Creating listening connections
 | 
				
			||||||
     expire. If not specified will automatically be set to True on
 | 
					     expire. If not specified will automatically be set to True on
 | 
				
			||||||
     UNIX.
 | 
					     UNIX.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					   * *reuse_port* tells the kernel to allow this endpoint to be bound to the
 | 
				
			||||||
 | 
					     same port as other existing endpoints are bound to, so long as they all
 | 
				
			||||||
 | 
					     set this flag when being created. This option is not supported on
 | 
				
			||||||
 | 
					     Windows.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
   This method is a :ref:`coroutine <coroutine>`.
 | 
					   This method is a :ref:`coroutine <coroutine>`.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
   On Windows with :class:`ProactorEventLoop`, SSL/TLS is not supported.
 | 
					   On Windows with :class:`ProactorEventLoop`, SSL/TLS is not supported.
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -700,8 +700,27 @@ def _create_connection_transport(self, sock, protocol_factory, ssl,
 | 
				
			||||||
    @coroutine
 | 
					    @coroutine
 | 
				
			||||||
    def create_datagram_endpoint(self, protocol_factory,
 | 
					    def create_datagram_endpoint(self, protocol_factory,
 | 
				
			||||||
                                 local_addr=None, remote_addr=None, *,
 | 
					                                 local_addr=None, remote_addr=None, *,
 | 
				
			||||||
                                 family=0, proto=0, flags=0):
 | 
					                                 family=0, proto=0, flags=0,
 | 
				
			||||||
 | 
					                                 reuse_address=None, reuse_port=None,
 | 
				
			||||||
 | 
					                                 allow_broadcast=None, sock=None):
 | 
				
			||||||
        """Create datagram connection."""
 | 
					        """Create datagram connection."""
 | 
				
			||||||
 | 
					        if sock is not None:
 | 
				
			||||||
 | 
					            if (local_addr or remote_addr or
 | 
				
			||||||
 | 
					                    family or proto or flags or
 | 
				
			||||||
 | 
					                    reuse_address or reuse_port or allow_broadcast):
 | 
				
			||||||
 | 
					                # show the problematic kwargs in exception msg
 | 
				
			||||||
 | 
					                opts = dict(local_addr=local_addr, remote_addr=remote_addr,
 | 
				
			||||||
 | 
					                            family=family, proto=proto, flags=flags,
 | 
				
			||||||
 | 
					                            reuse_address=reuse_address, reuse_port=reuse_port,
 | 
				
			||||||
 | 
					                            allow_broadcast=allow_broadcast)
 | 
				
			||||||
 | 
					                problems = ', '.join(
 | 
				
			||||||
 | 
					                    '{}={}'.format(k, v) for k, v in opts.items() if v)
 | 
				
			||||||
 | 
					                raise ValueError(
 | 
				
			||||||
 | 
					                    'socket modifier keyword arguments can not be used '
 | 
				
			||||||
 | 
					                    'when sock is specified. ({})'.format(problems))
 | 
				
			||||||
 | 
					            sock.setblocking(False)
 | 
				
			||||||
 | 
					            r_addr = None
 | 
				
			||||||
 | 
					        else:
 | 
				
			||||||
            if not (local_addr or remote_addr):
 | 
					            if not (local_addr or remote_addr):
 | 
				
			||||||
                if family == 0:
 | 
					                if family == 0:
 | 
				
			||||||
                    raise ValueError('unexpected address family')
 | 
					                    raise ValueError('unexpected address family')
 | 
				
			||||||
| 
						 | 
					@ -737,6 +756,9 @@ def create_datagram_endpoint(self, protocol_factory,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            exceptions = []
 | 
					            exceptions = []
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            if reuse_address is None:
 | 
				
			||||||
 | 
					                reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            for ((family, proto),
 | 
					            for ((family, proto),
 | 
				
			||||||
                 (local_address, remote_address)) in addr_pairs_info:
 | 
					                 (local_address, remote_address)) in addr_pairs_info:
 | 
				
			||||||
                sock = None
 | 
					                sock = None
 | 
				
			||||||
| 
						 | 
					@ -744,7 +766,19 @@ def create_datagram_endpoint(self, protocol_factory,
 | 
				
			||||||
                try:
 | 
					                try:
 | 
				
			||||||
                    sock = socket.socket(
 | 
					                    sock = socket.socket(
 | 
				
			||||||
                        family=family, type=socket.SOCK_DGRAM, proto=proto)
 | 
					                        family=family, type=socket.SOCK_DGRAM, proto=proto)
 | 
				
			||||||
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 | 
					                    if reuse_address:
 | 
				
			||||||
 | 
					                        sock.setsockopt(
 | 
				
			||||||
 | 
					                            socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 | 
				
			||||||
 | 
					                    if reuse_port:
 | 
				
			||||||
 | 
					                        if not hasattr(socket, 'SO_REUSEPORT'):
 | 
				
			||||||
 | 
					                            raise ValueError(
 | 
				
			||||||
 | 
					                                'reuse_port not supported by socket module')
 | 
				
			||||||
 | 
					                        else:
 | 
				
			||||||
 | 
					                            sock.setsockopt(
 | 
				
			||||||
 | 
					                                socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
 | 
				
			||||||
 | 
					                    if allow_broadcast:
 | 
				
			||||||
 | 
					                        sock.setsockopt(
 | 
				
			||||||
 | 
					                            socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
 | 
				
			||||||
                    sock.setblocking(False)
 | 
					                    sock.setblocking(False)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    if local_addr:
 | 
					                    if local_addr:
 | 
				
			||||||
| 
						 | 
					@ -767,8 +801,8 @@ def create_datagram_endpoint(self, protocol_factory,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        protocol = protocol_factory()
 | 
					        protocol = protocol_factory()
 | 
				
			||||||
        waiter = futures.Future(loop=self)
 | 
					        waiter = futures.Future(loop=self)
 | 
				
			||||||
        transport = self._make_datagram_transport(sock, protocol, r_addr,
 | 
					        transport = self._make_datagram_transport(
 | 
				
			||||||
                                                  waiter)
 | 
					            sock, protocol, r_addr, waiter)
 | 
				
			||||||
        if self._debug:
 | 
					        if self._debug:
 | 
				
			||||||
            if local_addr:
 | 
					            if local_addr:
 | 
				
			||||||
                logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
 | 
					                logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
 | 
				
			||||||
| 
						 | 
					@ -804,7 +838,8 @@ def create_server(self, protocol_factory, host=None, port=None,
 | 
				
			||||||
                      sock=None,
 | 
					                      sock=None,
 | 
				
			||||||
                      backlog=100,
 | 
					                      backlog=100,
 | 
				
			||||||
                      ssl=None,
 | 
					                      ssl=None,
 | 
				
			||||||
                      reuse_address=None):
 | 
					                      reuse_address=None,
 | 
				
			||||||
 | 
					                      reuse_port=None):
 | 
				
			||||||
        """Create a TCP server.
 | 
					        """Create a TCP server.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        The host parameter can be a string, in that case the TCP server is bound
 | 
					        The host parameter can be a string, in that case the TCP server is bound
 | 
				
			||||||
| 
						 | 
					@ -857,8 +892,15 @@ def create_server(self, protocol_factory, host=None, port=None,
 | 
				
			||||||
                        continue
 | 
					                        continue
 | 
				
			||||||
                    sockets.append(sock)
 | 
					                    sockets.append(sock)
 | 
				
			||||||
                    if reuse_address:
 | 
					                    if reuse_address:
 | 
				
			||||||
                        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,
 | 
					                        sock.setsockopt(
 | 
				
			||||||
                                        True)
 | 
					                            socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
 | 
				
			||||||
 | 
					                    if reuse_port:
 | 
				
			||||||
 | 
					                        if not hasattr(socket, 'SO_REUSEPORT'):
 | 
				
			||||||
 | 
					                            raise ValueError(
 | 
				
			||||||
 | 
					                                'reuse_port not supported by socket module')
 | 
				
			||||||
 | 
					                        else:
 | 
				
			||||||
 | 
					                            sock.setsockopt(
 | 
				
			||||||
 | 
					                                socket.SOL_SOCKET, socket.SO_REUSEPORT, True)
 | 
				
			||||||
                    # Disable IPv4/IPv6 dual stack support (enabled by
 | 
					                    # Disable IPv4/IPv6 dual stack support (enabled by
 | 
				
			||||||
                    # default on Linux) which makes a single socket
 | 
					                    # default on Linux) which makes a single socket
 | 
				
			||||||
                    # listen on both address families.
 | 
					                    # listen on both address families.
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -297,7 +297,8 @@ def create_connection(self, protocol_factory, host=None, port=None, *,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def create_server(self, protocol_factory, host=None, port=None, *,
 | 
					    def create_server(self, protocol_factory, host=None, port=None, *,
 | 
				
			||||||
                      family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE,
 | 
					                      family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE,
 | 
				
			||||||
                      sock=None, backlog=100, ssl=None, reuse_address=None):
 | 
					                      sock=None, backlog=100, ssl=None, reuse_address=None,
 | 
				
			||||||
 | 
					                      reuse_port=None):
 | 
				
			||||||
        """A coroutine which creates a TCP server bound to host and port.
 | 
					        """A coroutine which creates a TCP server bound to host and port.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        The return value is a Server object which can be used to stop
 | 
					        The return value is a Server object which can be used to stop
 | 
				
			||||||
| 
						 | 
					@ -327,6 +328,11 @@ def create_server(self, protocol_factory, host=None, port=None, *,
 | 
				
			||||||
        TIME_WAIT state, without waiting for its natural timeout to
 | 
					        TIME_WAIT state, without waiting for its natural timeout to
 | 
				
			||||||
        expire. If not specified will automatically be set to True on
 | 
					        expire. If not specified will automatically be set to True on
 | 
				
			||||||
        UNIX.
 | 
					        UNIX.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        reuse_port tells the kernel to allow this endpoint to be bound to
 | 
				
			||||||
 | 
					        the same port as other existing endpoints are bound to, so long as
 | 
				
			||||||
 | 
					        they all set this flag when being created. This option is not
 | 
				
			||||||
 | 
					        supported on Windows.
 | 
				
			||||||
        """
 | 
					        """
 | 
				
			||||||
        raise NotImplementedError
 | 
					        raise NotImplementedError
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -358,7 +364,37 @@ def create_unix_server(self, protocol_factory, path, *,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def create_datagram_endpoint(self, protocol_factory,
 | 
					    def create_datagram_endpoint(self, protocol_factory,
 | 
				
			||||||
                                 local_addr=None, remote_addr=None, *,
 | 
					                                 local_addr=None, remote_addr=None, *,
 | 
				
			||||||
                                 family=0, proto=0, flags=0):
 | 
					                                 family=0, proto=0, flags=0,
 | 
				
			||||||
 | 
					                                 reuse_address=None, reuse_port=None,
 | 
				
			||||||
 | 
					                                 allow_broadcast=None, sock=None):
 | 
				
			||||||
 | 
					        """A coroutine which creates a datagram endpoint.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        This method will try to establish the endpoint in the background.
 | 
				
			||||||
 | 
					        When successful, the coroutine returns a (transport, protocol) pair.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        protocol_factory must be a callable returning a protocol instance.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        socket family AF_INET or socket.AF_INET6 depending on host (or
 | 
				
			||||||
 | 
					        family if specified), socket type SOCK_DGRAM.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        reuse_address tells the kernel to reuse a local socket in
 | 
				
			||||||
 | 
					        TIME_WAIT state, without waiting for its natural timeout to
 | 
				
			||||||
 | 
					        expire. If not specified it will automatically be set to True on
 | 
				
			||||||
 | 
					        UNIX.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        reuse_port tells the kernel to allow this endpoint to be bound to
 | 
				
			||||||
 | 
					        the same port as other existing endpoints are bound to, so long as
 | 
				
			||||||
 | 
					        they all set this flag when being created. This option is not
 | 
				
			||||||
 | 
					        supported on Windows and some UNIX's. If the
 | 
				
			||||||
 | 
					        :py:data:`~socket.SO_REUSEPORT` constant is not defined then this
 | 
				
			||||||
 | 
					        capability is unsupported.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        allow_broadcast tells the kernel to allow this endpoint to send
 | 
				
			||||||
 | 
					        messages to the broadcast address.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        sock can optionally be specified in order to use a preexisting
 | 
				
			||||||
 | 
					        socket object.
 | 
				
			||||||
 | 
					        """
 | 
				
			||||||
        raise NotImplementedError
 | 
					        raise NotImplementedError
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # Pipes and subprocesses.
 | 
					    # Pipes and subprocesses.
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -3,6 +3,7 @@
 | 
				
			||||||
import errno
 | 
					import errno
 | 
				
			||||||
import logging
 | 
					import logging
 | 
				
			||||||
import math
 | 
					import math
 | 
				
			||||||
 | 
					import os
 | 
				
			||||||
import socket
 | 
					import socket
 | 
				
			||||||
import sys
 | 
					import sys
 | 
				
			||||||
import threading
 | 
					import threading
 | 
				
			||||||
| 
						 | 
					@ -790,11 +791,11 @@ def connection_lost(self, exc):
 | 
				
			||||||
class MyDatagramProto(asyncio.DatagramProtocol):
 | 
					class MyDatagramProto(asyncio.DatagramProtocol):
 | 
				
			||||||
    done = None
 | 
					    done = None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def __init__(self, create_future=False):
 | 
					    def __init__(self, create_future=False, loop=None):
 | 
				
			||||||
        self.state = 'INITIAL'
 | 
					        self.state = 'INITIAL'
 | 
				
			||||||
        self.nbytes = 0
 | 
					        self.nbytes = 0
 | 
				
			||||||
        if create_future:
 | 
					        if create_future:
 | 
				
			||||||
            self.done = asyncio.Future()
 | 
					            self.done = asyncio.Future(loop=loop)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def connection_made(self, transport):
 | 
					    def connection_made(self, transport):
 | 
				
			||||||
        self.transport = transport
 | 
					        self.transport = transport
 | 
				
			||||||
| 
						 | 
					@ -1099,6 +1100,19 @@ def test_create_server_no_getaddrinfo(self):
 | 
				
			||||||
        f = self.loop.create_server(MyProto, '0.0.0.0', 0)
 | 
					        f = self.loop.create_server(MyProto, '0.0.0.0', 0)
 | 
				
			||||||
        self.assertRaises(OSError, self.loop.run_until_complete, f)
 | 
					        self.assertRaises(OSError, self.loop.run_until_complete, f)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @mock.patch('asyncio.base_events.socket')
 | 
				
			||||||
 | 
					    def test_create_server_nosoreuseport(self, m_socket):
 | 
				
			||||||
 | 
					        m_socket.getaddrinfo = socket.getaddrinfo
 | 
				
			||||||
 | 
					        m_socket.SOCK_STREAM = socket.SOCK_STREAM
 | 
				
			||||||
 | 
					        m_socket.SOL_SOCKET = socket.SOL_SOCKET
 | 
				
			||||||
 | 
					        del m_socket.SO_REUSEPORT
 | 
				
			||||||
 | 
					        m_socket.socket.return_value = mock.Mock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        f = self.loop.create_server(
 | 
				
			||||||
 | 
					            MyProto, '0.0.0.0', 0, reuse_port=True)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        self.assertRaises(ValueError, self.loop.run_until_complete, f)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @mock.patch('asyncio.base_events.socket')
 | 
					    @mock.patch('asyncio.base_events.socket')
 | 
				
			||||||
    def test_create_server_cant_bind(self, m_socket):
 | 
					    def test_create_server_cant_bind(self, m_socket):
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -1199,6 +1213,128 @@ class Err(OSError):
 | 
				
			||||||
        self.assertRaises(Err, self.loop.run_until_complete, fut)
 | 
					        self.assertRaises(Err, self.loop.run_until_complete, fut)
 | 
				
			||||||
        self.assertTrue(m_sock.close.called)
 | 
					        self.assertTrue(m_sock.close.called)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test_create_datagram_endpoint_sock(self):
 | 
				
			||||||
 | 
					        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
 | 
				
			||||||
 | 
					        fut = self.loop.create_datagram_endpoint(
 | 
				
			||||||
 | 
					            lambda: MyDatagramProto(create_future=True, loop=self.loop),
 | 
				
			||||||
 | 
					            sock=sock)
 | 
				
			||||||
 | 
					        transport, protocol = self.loop.run_until_complete(fut)
 | 
				
			||||||
 | 
					        transport.close()
 | 
				
			||||||
 | 
					        self.loop.run_until_complete(protocol.done)
 | 
				
			||||||
 | 
					        self.assertEqual('CLOSED', protocol.state)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test_create_datagram_endpoint_sock_sockopts(self):
 | 
				
			||||||
 | 
					        fut = self.loop.create_datagram_endpoint(
 | 
				
			||||||
 | 
					            MyDatagramProto, local_addr=('127.0.0.1', 0), sock=object())
 | 
				
			||||||
 | 
					        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        fut = self.loop.create_datagram_endpoint(
 | 
				
			||||||
 | 
					            MyDatagramProto, remote_addr=('127.0.0.1', 0), sock=object())
 | 
				
			||||||
 | 
					        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        fut = self.loop.create_datagram_endpoint(
 | 
				
			||||||
 | 
					            MyDatagramProto, family=1, sock=object())
 | 
				
			||||||
 | 
					        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        fut = self.loop.create_datagram_endpoint(
 | 
				
			||||||
 | 
					            MyDatagramProto, proto=1, sock=object())
 | 
				
			||||||
 | 
					        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        fut = self.loop.create_datagram_endpoint(
 | 
				
			||||||
 | 
					            MyDatagramProto, flags=1, sock=object())
 | 
				
			||||||
 | 
					        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        fut = self.loop.create_datagram_endpoint(
 | 
				
			||||||
 | 
					            MyDatagramProto, reuse_address=True, sock=object())
 | 
				
			||||||
 | 
					        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        fut = self.loop.create_datagram_endpoint(
 | 
				
			||||||
 | 
					            MyDatagramProto, reuse_port=True, sock=object())
 | 
				
			||||||
 | 
					        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        fut = self.loop.create_datagram_endpoint(
 | 
				
			||||||
 | 
					            MyDatagramProto, allow_broadcast=True, sock=object())
 | 
				
			||||||
 | 
					        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test_create_datagram_endpoint_sockopts(self):
 | 
				
			||||||
 | 
					        # Socket options should not be applied unless asked for.
 | 
				
			||||||
 | 
					        # SO_REUSEADDR defaults to on for UNIX.
 | 
				
			||||||
 | 
					        # SO_REUSEPORT is not available on all platforms.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        coro = self.loop.create_datagram_endpoint(
 | 
				
			||||||
 | 
					            lambda: MyDatagramProto(create_future=True, loop=self.loop),
 | 
				
			||||||
 | 
					            local_addr=('127.0.0.1', 0))
 | 
				
			||||||
 | 
					        transport, protocol = self.loop.run_until_complete(coro)
 | 
				
			||||||
 | 
					        sock = transport.get_extra_info('socket')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        reuse_address_default_on = (
 | 
				
			||||||
 | 
					            os.name == 'posix' and sys.platform != 'cygwin')
 | 
				
			||||||
 | 
					        reuseport_supported = hasattr(socket, 'SO_REUSEPORT')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        if reuse_address_default_on:
 | 
				
			||||||
 | 
					            self.assertTrue(
 | 
				
			||||||
 | 
					                sock.getsockopt(
 | 
				
			||||||
 | 
					                    socket.SOL_SOCKET, socket.SO_REUSEADDR))
 | 
				
			||||||
 | 
					        else:
 | 
				
			||||||
 | 
					            self.assertFalse(
 | 
				
			||||||
 | 
					                sock.getsockopt(
 | 
				
			||||||
 | 
					                    socket.SOL_SOCKET, socket.SO_REUSEADDR))
 | 
				
			||||||
 | 
					        if reuseport_supported:
 | 
				
			||||||
 | 
					            self.assertFalse(
 | 
				
			||||||
 | 
					                sock.getsockopt(
 | 
				
			||||||
 | 
					                    socket.SOL_SOCKET, socket.SO_REUSEPORT))
 | 
				
			||||||
 | 
					        self.assertFalse(
 | 
				
			||||||
 | 
					            sock.getsockopt(
 | 
				
			||||||
 | 
					                socket.SOL_SOCKET, socket.SO_BROADCAST))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        transport.close()
 | 
				
			||||||
 | 
					        self.loop.run_until_complete(protocol.done)
 | 
				
			||||||
 | 
					        self.assertEqual('CLOSED', protocol.state)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        coro = self.loop.create_datagram_endpoint(
 | 
				
			||||||
 | 
					            lambda: MyDatagramProto(create_future=True, loop=self.loop),
 | 
				
			||||||
 | 
					            local_addr=('127.0.0.1', 0),
 | 
				
			||||||
 | 
					            reuse_address=True,
 | 
				
			||||||
 | 
					            reuse_port=reuseport_supported,
 | 
				
			||||||
 | 
					            allow_broadcast=True)
 | 
				
			||||||
 | 
					        transport, protocol = self.loop.run_until_complete(coro)
 | 
				
			||||||
 | 
					        sock = transport.get_extra_info('socket')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        self.assertTrue(
 | 
				
			||||||
 | 
					            sock.getsockopt(
 | 
				
			||||||
 | 
					                socket.SOL_SOCKET, socket.SO_REUSEADDR))
 | 
				
			||||||
 | 
					        if reuseport_supported:
 | 
				
			||||||
 | 
					            self.assertTrue(
 | 
				
			||||||
 | 
					                sock.getsockopt(
 | 
				
			||||||
 | 
					                    socket.SOL_SOCKET, socket.SO_REUSEPORT))
 | 
				
			||||||
 | 
					        else:
 | 
				
			||||||
 | 
					            self.assertFalse(
 | 
				
			||||||
 | 
					                sock.getsockopt(
 | 
				
			||||||
 | 
					                    socket.SOL_SOCKET, socket.SO_REUSEPORT))
 | 
				
			||||||
 | 
					        self.assertTrue(
 | 
				
			||||||
 | 
					            sock.getsockopt(
 | 
				
			||||||
 | 
					                socket.SOL_SOCKET, socket.SO_BROADCAST))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        transport.close()
 | 
				
			||||||
 | 
					        self.loop.run_until_complete(protocol.done)
 | 
				
			||||||
 | 
					        self.assertEqual('CLOSED', protocol.state)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @mock.patch('asyncio.base_events.socket')
 | 
				
			||||||
 | 
					    def test_create_datagram_endpoint_nosoreuseport(self, m_socket):
 | 
				
			||||||
 | 
					        m_socket.getaddrinfo = socket.getaddrinfo
 | 
				
			||||||
 | 
					        m_socket.SOCK_DGRAM = socket.SOCK_DGRAM
 | 
				
			||||||
 | 
					        m_socket.SOL_SOCKET = socket.SOL_SOCKET
 | 
				
			||||||
 | 
					        del m_socket.SO_REUSEPORT
 | 
				
			||||||
 | 
					        m_socket.socket.return_value = mock.Mock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        coro = self.loop.create_datagram_endpoint(
 | 
				
			||||||
 | 
					            lambda: MyDatagramProto(loop=self.loop),
 | 
				
			||||||
 | 
					            local_addr=('127.0.0.1', 0),
 | 
				
			||||||
 | 
					            reuse_address=False,
 | 
				
			||||||
 | 
					            reuse_port=True)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def test_accept_connection_retry(self):
 | 
					    def test_accept_connection_retry(self):
 | 
				
			||||||
        sock = mock.Mock()
 | 
					        sock = mock.Mock()
 | 
				
			||||||
        sock.accept.side_effect = BlockingIOError()
 | 
					        sock.accept.side_effect = BlockingIOError()
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -814,6 +814,32 @@ def test_create_server(self):
 | 
				
			||||||
        # close server
 | 
					        # close server
 | 
				
			||||||
        server.close()
 | 
					        server.close()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @unittest.skipUnless(hasattr(socket, 'SO_REUSEPORT'), 'No SO_REUSEPORT')
 | 
				
			||||||
 | 
					    def test_create_server_reuse_port(self):
 | 
				
			||||||
 | 
					        proto = MyProto(self.loop)
 | 
				
			||||||
 | 
					        f = self.loop.create_server(
 | 
				
			||||||
 | 
					            lambda: proto, '0.0.0.0', 0)
 | 
				
			||||||
 | 
					        server = self.loop.run_until_complete(f)
 | 
				
			||||||
 | 
					        self.assertEqual(len(server.sockets), 1)
 | 
				
			||||||
 | 
					        sock = server.sockets[0]
 | 
				
			||||||
 | 
					        self.assertFalse(
 | 
				
			||||||
 | 
					            sock.getsockopt(
 | 
				
			||||||
 | 
					                socket.SOL_SOCKET, socket.SO_REUSEPORT))
 | 
				
			||||||
 | 
					        server.close()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        test_utils.run_briefly(self.loop)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        proto = MyProto(self.loop)
 | 
				
			||||||
 | 
					        f = self.loop.create_server(
 | 
				
			||||||
 | 
					            lambda: proto, '0.0.0.0', 0, reuse_port=True)
 | 
				
			||||||
 | 
					        server = self.loop.run_until_complete(f)
 | 
				
			||||||
 | 
					        self.assertEqual(len(server.sockets), 1)
 | 
				
			||||||
 | 
					        sock = server.sockets[0]
 | 
				
			||||||
 | 
					        self.assertTrue(
 | 
				
			||||||
 | 
					            sock.getsockopt(
 | 
				
			||||||
 | 
					                socket.SOL_SOCKET, socket.SO_REUSEPORT))
 | 
				
			||||||
 | 
					        server.close()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def _make_unix_server(self, factory, **kwargs):
 | 
					    def _make_unix_server(self, factory, **kwargs):
 | 
				
			||||||
        path = test_utils.gen_unix_socket_path()
 | 
					        path = test_utils.gen_unix_socket_path()
 | 
				
			||||||
        self.addCleanup(lambda: os.path.exists(path) and os.unlink(path))
 | 
					        self.addCleanup(lambda: os.path.exists(path) and os.unlink(path))
 | 
				
			||||||
| 
						 | 
					@ -1264,6 +1290,32 @@ def datagram_received(self, data, addr):
 | 
				
			||||||
        self.assertEqual('CLOSED', client.state)
 | 
					        self.assertEqual('CLOSED', client.state)
 | 
				
			||||||
        server.transport.close()
 | 
					        server.transport.close()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test_create_datagram_endpoint_sock(self):
 | 
				
			||||||
 | 
					        sock = None
 | 
				
			||||||
 | 
					        local_address = ('127.0.0.1', 0)
 | 
				
			||||||
 | 
					        infos = self.loop.run_until_complete(
 | 
				
			||||||
 | 
					            self.loop.getaddrinfo(
 | 
				
			||||||
 | 
					                *local_address, type=socket.SOCK_DGRAM))
 | 
				
			||||||
 | 
					        for family, type, proto, cname, address in infos:
 | 
				
			||||||
 | 
					            try:
 | 
				
			||||||
 | 
					                sock = socket.socket(family=family, type=type, proto=proto)
 | 
				
			||||||
 | 
					                sock.setblocking(False)
 | 
				
			||||||
 | 
					                sock.bind(address)
 | 
				
			||||||
 | 
					            except:
 | 
				
			||||||
 | 
					                pass
 | 
				
			||||||
 | 
					            else:
 | 
				
			||||||
 | 
					                break
 | 
				
			||||||
 | 
					        else:
 | 
				
			||||||
 | 
					            assert False, 'Can not create socket.'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        f = self.loop.create_connection(
 | 
				
			||||||
 | 
					            lambda: MyDatagramProto(loop=self.loop), sock=sock)
 | 
				
			||||||
 | 
					        tr, pr = self.loop.run_until_complete(f)
 | 
				
			||||||
 | 
					        self.assertIsInstance(tr, asyncio.Transport)
 | 
				
			||||||
 | 
					        self.assertIsInstance(pr, MyDatagramProto)
 | 
				
			||||||
 | 
					        tr.close()
 | 
				
			||||||
 | 
					        self.loop.run_until_complete(pr.done)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def test_internal_fds(self):
 | 
					    def test_internal_fds(self):
 | 
				
			||||||
        loop = self.create_event_loop()
 | 
					        loop = self.create_event_loop()
 | 
				
			||||||
        if not isinstance(loop, selector_events.BaseSelectorEventLoop):
 | 
					        if not isinstance(loop, selector_events.BaseSelectorEventLoop):
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -789,6 +789,7 @@ Ben Laurie
 | 
				
			||||||
Simon Law
 | 
					Simon Law
 | 
				
			||||||
Julia Lawall
 | 
					Julia Lawall
 | 
				
			||||||
Chris Lawrence
 | 
					Chris Lawrence
 | 
				
			||||||
 | 
					Chris Laws
 | 
				
			||||||
Brian Leair
 | 
					Brian Leair
 | 
				
			||||||
Mathieu Leduc-Hamel
 | 
					Mathieu Leduc-Hamel
 | 
				
			||||||
Amandine Lee
 | 
					Amandine Lee
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -90,6 +90,12 @@ Core and Builtins
 | 
				
			||||||
Library
 | 
					Library
 | 
				
			||||||
-------
 | 
					-------
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					- Issue #23972: Updates asyncio datagram create method allowing reuseport
 | 
				
			||||||
 | 
					  and reuseaddr socket options to be set prior to binding the socket.
 | 
				
			||||||
 | 
					  Mirroring the existing asyncio create_server method the reuseaddr option
 | 
				
			||||||
 | 
					  for datagram sockets defaults to True if the O/S is 'posix' (except if the
 | 
				
			||||||
 | 
					  platform is Cygwin). Patch by Chris Laws.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
- Issue #25304: Add asyncio.run_coroutine_threadsafe().  This lets you
 | 
					- Issue #25304: Add asyncio.run_coroutine_threadsafe().  This lets you
 | 
				
			||||||
  submit a coroutine to a loop from another thread, returning a
 | 
					  submit a coroutine to a loop from another thread, returning a
 | 
				
			||||||
  concurrent.futures.Future.  By Vincent Michel.
 | 
					  concurrent.futures.Future.  By Vincent Michel.
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue