Issue #25114, asyncio: add ssl_object extra info to SSL transports

This info is required on Python 3.5 and newer to get specific information on
the SSL object, like getting the binary peer certificate (instead of getting
it as text).
This commit is contained in:
Victor Stinner 2015-09-21 18:06:17 +02:00
parent 808d6416e3
commit f7dc7fb74d
4 changed files with 73 additions and 12 deletions

View file

@ -71,6 +71,8 @@ BaseTransport
- ``'peercert'``: peer certificate; result of - ``'peercert'``: peer certificate; result of
:meth:`ssl.SSLSocket.getpeercert` :meth:`ssl.SSLSocket.getpeercert`
- ``'sslcontext'``: :class:`ssl.SSLContext` instance - ``'sslcontext'``: :class:`ssl.SSLContext` instance
- ``'ssl_object'``: :class:`ssl.SSLObject` or :class:`ssl.SSLSocket`
instance
* pipe: * pipe:
@ -80,6 +82,9 @@ BaseTransport
- ``'subprocess'``: :class:`subprocess.Popen` instance - ``'subprocess'``: :class:`subprocess.Popen` instance
.. versionchanged:: 3.4.4
``'ssl_object'`` info was added to SSL sockets.
ReadTransport ReadTransport
------------- -------------

View file

@ -843,6 +843,7 @@ def _on_handshake(self, start_time):
self._extra.update(peercert=peercert, self._extra.update(peercert=peercert,
cipher=self._sock.cipher(), cipher=self._sock.cipher(),
compression=self._sock.compression(), compression=self._sock.compression(),
ssl_object=self._sock,
) )
self._read_wants_write = False self._read_wants_write = False

View file

@ -295,6 +295,7 @@ class _SSLProtocolTransport(transports._FlowControlMixin,
def __init__(self, loop, ssl_protocol, app_protocol): def __init__(self, loop, ssl_protocol, app_protocol):
self._loop = loop self._loop = loop
# SSLProtocol instance
self._ssl_protocol = ssl_protocol self._ssl_protocol = ssl_protocol
self._app_protocol = app_protocol self._app_protocol = app_protocol
self._closed = False self._closed = False
@ -425,10 +426,12 @@ def __init__(self, loop, app_protocol, sslcontext, waiter,
self._app_protocol = app_protocol self._app_protocol = app_protocol
self._app_transport = _SSLProtocolTransport(self._loop, self._app_transport = _SSLProtocolTransport(self._loop,
self, self._app_protocol) self, self._app_protocol)
# _SSLPipe instance (None until the connection is made)
self._sslpipe = None self._sslpipe = None
self._session_established = False self._session_established = False
self._in_handshake = False self._in_handshake = False
self._in_shutdown = False self._in_shutdown = False
# transport, ex: SelectorSocketTransport
self._transport = None self._transport = None
def _wakeup_waiter(self, exc=None): def _wakeup_waiter(self, exc=None):
@ -591,6 +594,7 @@ def _on_handshake_complete(self, handshake_exc):
self._extra.update(peercert=peercert, self._extra.update(peercert=peercert,
cipher=sslobj.cipher(), cipher=sslobj.cipher(),
compression=sslobj.compression(), compression=sslobj.compression(),
ssl_object=sslobj,
) )
self._app_protocol.connection_made(self._app_transport) self._app_protocol.connection_made(self._app_transport)
self._wakeup_waiter() self._wakeup_waiter()

View file

@ -57,6 +57,17 @@ def osx_tiger():
ONLYKEY = data_file('ssl_key.pem') ONLYKEY = data_file('ssl_key.pem')
SIGNED_CERTFILE = data_file('keycert3.pem') SIGNED_CERTFILE = data_file('keycert3.pem')
SIGNING_CA = data_file('pycacert.pem') SIGNING_CA = data_file('pycacert.pem')
PEERCERT = {'serialNumber': 'B09264B1F2DA21D1',
'version': 1,
'subject': ((('countryName', 'XY'),),
(('localityName', 'Castle Anthrax'),),
(('organizationName', 'Python Software Foundation'),),
(('commonName', 'localhost'),)),
'issuer': ((('countryName', 'XY'),),
(('organizationName', 'Python Software Foundation CA'),),
(('commonName', 'our-ca-server'),)),
'notAfter': 'Nov 13 19:47:07 2022 GMT',
'notBefore': 'Jan 4 19:47:07 2013 GMT'}
class MyBaseProto(asyncio.Protocol): class MyBaseProto(asyncio.Protocol):
@ -596,22 +607,56 @@ def test_create_connection_sock(self):
self.assertGreater(pr.nbytes, 0) self.assertGreater(pr.nbytes, 0)
tr.close() tr.close()
def check_ssl_extra_info(self, client, check_sockname=True,
peername=None, peercert={}):
if check_sockname:
self.assertIsNotNone(client.get_extra_info('sockname'))
if peername:
self.assertEqual(peername,
client.get_extra_info('peername'))
else:
self.assertIsNotNone(client.get_extra_info('peername'))
self.assertEqual(peercert,
client.get_extra_info('peercert'))
# Python disables compression to prevent CRIME attacks by default
self.assertIsNone(client.get_extra_info('compression'))
# test SSL cipher
cipher = client.get_extra_info('cipher')
self.assertIsInstance(cipher, tuple)
self.assertEqual(len(cipher), 3, cipher)
self.assertIsInstance(cipher[0], str)
self.assertIsInstance(cipher[1], str)
self.assertIsInstance(cipher[2], int)
# test SSL object
sslobj = client.get_extra_info('ssl_object')
self.assertIsNotNone(sslobj)
self.assertEqual(sslobj.compression(),
client.get_extra_info('compression'))
self.assertEqual(sslobj.cipher(),
client.get_extra_info('cipher'))
self.assertEqual(sslobj.getpeercert(),
client.get_extra_info('peercert'))
def _basetest_create_ssl_connection(self, connection_fut, def _basetest_create_ssl_connection(self, connection_fut,
check_sockname=True): check_sockname=True,
peername=None):
tr, pr = self.loop.run_until_complete(connection_fut) tr, pr = self.loop.run_until_complete(connection_fut)
self.assertIsInstance(tr, asyncio.Transport) self.assertIsInstance(tr, asyncio.Transport)
self.assertIsInstance(pr, asyncio.Protocol) self.assertIsInstance(pr, asyncio.Protocol)
self.assertTrue('ssl' in tr.__class__.__name__.lower()) self.assertTrue('ssl' in tr.__class__.__name__.lower())
if check_sockname: self.check_ssl_extra_info(tr, check_sockname, peername)
self.assertIsNotNone(tr.get_extra_info('sockname'))
self.loop.run_until_complete(pr.done) self.loop.run_until_complete(pr.done)
self.assertGreater(pr.nbytes, 0) self.assertGreater(pr.nbytes, 0)
tr.close() tr.close()
def _test_create_ssl_connection(self, httpd, create_connection, def _test_create_ssl_connection(self, httpd, create_connection,
check_sockname=True): check_sockname=True, peername=None):
conn_fut = create_connection(ssl=test_utils.dummy_ssl_context()) conn_fut = create_connection(ssl=test_utils.dummy_ssl_context())
self._basetest_create_ssl_connection(conn_fut, check_sockname) self._basetest_create_ssl_connection(conn_fut, check_sockname,
peername)
# ssl.Purpose was introduced in Python 3.4 # ssl.Purpose was introduced in Python 3.4
if hasattr(ssl, 'Purpose'): if hasattr(ssl, 'Purpose'):
@ -629,7 +674,8 @@ def _dummy_ssl_create_context(purpose=ssl.Purpose.SERVER_AUTH, *,
with mock.patch('ssl.create_default_context', with mock.patch('ssl.create_default_context',
side_effect=_dummy_ssl_create_context) as m: side_effect=_dummy_ssl_create_context) as m:
conn_fut = create_connection(ssl=True) conn_fut = create_connection(ssl=True)
self._basetest_create_ssl_connection(conn_fut, check_sockname) self._basetest_create_ssl_connection(conn_fut, check_sockname,
peername)
self.assertEqual(m.call_count, 1) self.assertEqual(m.call_count, 1)
# With the real ssl.create_default_context(), certificate # With the real ssl.create_default_context(), certificate
@ -638,7 +684,8 @@ def _dummy_ssl_create_context(purpose=ssl.Purpose.SERVER_AUTH, *,
conn_fut = create_connection(ssl=True) conn_fut = create_connection(ssl=True)
# Ignore the "SSL handshake failed" log in debug mode # Ignore the "SSL handshake failed" log in debug mode
with test_utils.disable_logger(): with test_utils.disable_logger():
self._basetest_create_ssl_connection(conn_fut, check_sockname) self._basetest_create_ssl_connection(conn_fut, check_sockname,
peername)
self.assertEqual(cm.exception.reason, 'CERTIFICATE_VERIFY_FAILED') self.assertEqual(cm.exception.reason, 'CERTIFICATE_VERIFY_FAILED')
@ -649,7 +696,8 @@ def test_create_ssl_connection(self):
self.loop.create_connection, self.loop.create_connection,
lambda: MyProto(loop=self.loop), lambda: MyProto(loop=self.loop),
*httpd.address) *httpd.address)
self._test_create_ssl_connection(httpd, create_connection) self._test_create_ssl_connection(httpd, create_connection,
peername=httpd.address)
def test_legacy_create_ssl_connection(self): def test_legacy_create_ssl_connection(self):
with test_utils.force_legacy_ssl_support(): with test_utils.force_legacy_ssl_support():
@ -669,7 +717,8 @@ def test_create_ssl_unix_connection(self):
server_hostname='127.0.0.1') server_hostname='127.0.0.1')
self._test_create_ssl_connection(httpd, create_connection, self._test_create_ssl_connection(httpd, create_connection,
check_sockname) check_sockname,
peername=httpd.address)
def test_legacy_create_ssl_unix_connection(self): def test_legacy_create_ssl_unix_connection(self):
with test_utils.force_legacy_ssl_support(): with test_utils.force_legacy_ssl_support():
@ -819,9 +868,7 @@ def test_create_server_ssl(self):
self.assertEqual(3, proto.nbytes) self.assertEqual(3, proto.nbytes)
# extra info is available # extra info is available
self.assertIsNotNone(proto.transport.get_extra_info('sockname')) self.check_ssl_extra_info(client, peername=(host, port))
self.assertEqual('127.0.0.1',
proto.transport.get_extra_info('peername')[0])
# close connection # close connection
proto.transport.close() proto.transport.close()
@ -1023,6 +1070,10 @@ def test_create_server_ssl_verified(self):
server_hostname='localhost') server_hostname='localhost')
client, pr = self.loop.run_until_complete(f_c) client, pr = self.loop.run_until_complete(f_c)
# extra info is available
self.check_ssl_extra_info(client,peername=(host, port),
peercert=PEERCERT)
# close connection # close connection
proto.transport.close() proto.transport.close()
client.close() client.close()