mirror of
https://github.com/python/cpython.git
synced 2025-12-08 06:10:17 +00:00
bpo-32947: OpenSSL 1.1.1-pre1 / TLS 1.3 fixes (#5663)
* bpo-32947: OpenSSL 1.1.1-pre1 / TLS 1.3 fixes Misc fixes and workarounds for compatibility with OpenSSL 1.1.1-pre1 and TLS 1.3 support. With OpenSSL 1.1.1, Python negotiates TLS 1.3 by default. Some test cases only apply to TLS 1.2. Other tests currently fail because the threaded or async test servers stop after failure. I'm going to address these issues when OpenSSL 1.1.1 reaches beta. OpenSSL 1.1.1 has added a new option OP_ENABLE_MIDDLEBOX_COMPAT for TLS 1.3. The feature is enabled by default for maximum compatibility with broken middle boxes. Users should be able to disable the hack and CPython's test suite needs it to verify default options. Signed-off-by: Christian Heimes <christian@python.org>
This commit is contained in:
parent
2fa6b9eae0
commit
05d9fe32a1
9 changed files with 129 additions and 72 deletions
|
|
@ -831,6 +831,15 @@ Constants
|
||||||
|
|
||||||
.. versionadded:: 3.3
|
.. versionadded:: 3.3
|
||||||
|
|
||||||
|
.. data:: OP_ENABLE_MIDDLEBOX_COMPAT
|
||||||
|
|
||||||
|
Send dummy Change Cipher Spec (CCS) messages in TLS 1.3 handshake to make
|
||||||
|
a TLS 1.3 connection look more like a TLS 1.2 connection.
|
||||||
|
|
||||||
|
This option is only available with OpenSSL 1.1.1 and later.
|
||||||
|
|
||||||
|
.. versionadded:: 3.8
|
||||||
|
|
||||||
.. data:: OP_NO_COMPRESSION
|
.. data:: OP_NO_COMPRESSION
|
||||||
|
|
||||||
Disable compression on the SSL channel. This is useful if the application
|
Disable compression on the SSL channel. This is useful if the application
|
||||||
|
|
|
||||||
|
|
@ -669,6 +669,9 @@ expected hostname in A-label form (``"xn--pythn-mua.org"``), rather
|
||||||
than the U-label form (``"pythön.org"``). (Contributed by
|
than the U-label form (``"pythön.org"``). (Contributed by
|
||||||
Nathaniel J. Smith and Christian Heimes in :issue:`28414`.)
|
Nathaniel J. Smith and Christian Heimes in :issue:`28414`.)
|
||||||
|
|
||||||
|
The ssl module has preliminary and experimental support for TLS 1.3 and
|
||||||
|
OpenSSL 1.1.1. (Contributed by Christian Heimes in :issue:`32947`,
|
||||||
|
:issue:`20995`, :issue:`29136`, and :issue:`30622`)
|
||||||
|
|
||||||
string
|
string
|
||||||
------
|
------
|
||||||
|
|
|
||||||
|
|
@ -74,6 +74,8 @@ def simple_server_sslcontext():
|
||||||
server_context.load_cert_chain(ONLYCERT, ONLYKEY)
|
server_context.load_cert_chain(ONLYCERT, ONLYKEY)
|
||||||
server_context.check_hostname = False
|
server_context.check_hostname = False
|
||||||
server_context.verify_mode = ssl.CERT_NONE
|
server_context.verify_mode = ssl.CERT_NONE
|
||||||
|
# TODO: fix TLSv1.3 support
|
||||||
|
server_context.options |= ssl.OP_NO_TLSv1_3
|
||||||
return server_context
|
return server_context
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -312,6 +312,8 @@ class SSLConnection(asyncore.dispatcher):
|
||||||
|
|
||||||
def secure_connection(self):
|
def secure_connection(self):
|
||||||
context = ssl.SSLContext()
|
context = ssl.SSLContext()
|
||||||
|
# TODO: fix TLSv1.3 support
|
||||||
|
context.options |= ssl.OP_NO_TLSv1_3
|
||||||
context.load_cert_chain(CERTFILE)
|
context.load_cert_chain(CERTFILE)
|
||||||
socket = context.wrap_socket(self.socket,
|
socket = context.wrap_socket(self.socket,
|
||||||
suppress_ragged_eofs=False,
|
suppress_ragged_eofs=False,
|
||||||
|
|
@ -908,6 +910,8 @@ def test_auth_issued_twice(self):
|
||||||
def test_context(self):
|
def test_context(self):
|
||||||
self.client.quit()
|
self.client.quit()
|
||||||
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
||||||
|
# TODO: fix TLSv1.3 support
|
||||||
|
ctx.options |= ssl.OP_NO_TLSv1_3
|
||||||
ctx.check_hostname = False
|
ctx.check_hostname = False
|
||||||
ctx.verify_mode = ssl.CERT_NONE
|
ctx.verify_mode = ssl.CERT_NONE
|
||||||
self.assertRaises(ValueError, ftplib.FTP_TLS, keyfile=CERTFILE,
|
self.assertRaises(ValueError, ftplib.FTP_TLS, keyfile=CERTFILE,
|
||||||
|
|
@ -940,6 +944,8 @@ def test_ccc(self):
|
||||||
def test_check_hostname(self):
|
def test_check_hostname(self):
|
||||||
self.client.quit()
|
self.client.quit()
|
||||||
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
||||||
|
# TODO: fix TLSv1.3 support
|
||||||
|
ctx.options |= ssl.OP_NO_TLSv1_3
|
||||||
self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
|
self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
|
||||||
self.assertEqual(ctx.check_hostname, True)
|
self.assertEqual(ctx.check_hostname, True)
|
||||||
ctx.load_verify_locations(CAFILE)
|
ctx.load_verify_locations(CAFILE)
|
||||||
|
|
|
||||||
|
|
@ -153,6 +153,8 @@ def cmd_stls(self, arg):
|
||||||
if self.tls_active is False:
|
if self.tls_active is False:
|
||||||
self.push('+OK Begin TLS negotiation')
|
self.push('+OK Begin TLS negotiation')
|
||||||
context = ssl.SSLContext()
|
context = ssl.SSLContext()
|
||||||
|
# TODO: fix TLSv1.3 support
|
||||||
|
context.options |= ssl.OP_NO_TLSv1_3
|
||||||
context.load_cert_chain(CERTFILE)
|
context.load_cert_chain(CERTFILE)
|
||||||
tls_sock = context.wrap_socket(self.socket,
|
tls_sock = context.wrap_socket(self.socket,
|
||||||
server_side=True,
|
server_side=True,
|
||||||
|
|
@ -356,6 +358,8 @@ def test_stls(self):
|
||||||
def test_stls_context(self):
|
def test_stls_context(self):
|
||||||
expected = b'+OK Begin TLS negotiation'
|
expected = b'+OK Begin TLS negotiation'
|
||||||
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
||||||
|
# TODO: fix TLSv1.3 support
|
||||||
|
ctx.options |= ssl.OP_NO_TLSv1_3
|
||||||
ctx.load_verify_locations(CAFILE)
|
ctx.load_verify_locations(CAFILE)
|
||||||
self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
|
self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
|
||||||
self.assertEqual(ctx.check_hostname, True)
|
self.assertEqual(ctx.check_hostname, True)
|
||||||
|
|
@ -396,6 +400,8 @@ def test__all__(self):
|
||||||
|
|
||||||
def test_context(self):
|
def test_context(self):
|
||||||
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
||||||
|
# TODO: fix TLSv1.3 support
|
||||||
|
ctx.options |= ssl.OP_NO_TLSv1_3
|
||||||
ctx.check_hostname = False
|
ctx.check_hostname = False
|
||||||
ctx.verify_mode = ssl.CERT_NONE
|
ctx.verify_mode = ssl.CERT_NONE
|
||||||
self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host,
|
self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host,
|
||||||
|
|
|
||||||
|
|
@ -30,7 +30,8 @@
|
||||||
PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
|
PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
|
||||||
HOST = support.HOST
|
HOST = support.HOST
|
||||||
IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
|
IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
|
||||||
IS_OPENSSL_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
|
IS_OPENSSL_1_1_0 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
|
||||||
|
IS_OPENSSL_1_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1)
|
||||||
PY_SSL_DEFAULT_CIPHERS = sysconfig.get_config_var('PY_SSL_DEFAULT_CIPHERS')
|
PY_SSL_DEFAULT_CIPHERS = sysconfig.get_config_var('PY_SSL_DEFAULT_CIPHERS')
|
||||||
|
|
||||||
def data_file(*name):
|
def data_file(*name):
|
||||||
|
|
@ -54,6 +55,7 @@ def data_file(*name):
|
||||||
BYTES_CAPATH = os.fsencode(CAPATH)
|
BYTES_CAPATH = os.fsencode(CAPATH)
|
||||||
CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
|
CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
|
||||||
CAFILE_CACERT = data_file("capath", "5ed36f99.0")
|
CAFILE_CACERT = data_file("capath", "5ed36f99.0")
|
||||||
|
WRONG_CERT = data_file("wrongcert.pem")
|
||||||
|
|
||||||
CERTFILE_INFO = {
|
CERTFILE_INFO = {
|
||||||
'issuer': ((('countryName', 'XY'),),
|
'issuer': ((('countryName', 'XY'),),
|
||||||
|
|
@ -124,6 +126,7 @@ def data_file(*name):
|
||||||
OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0)
|
OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0)
|
||||||
OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0)
|
OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0)
|
||||||
OP_CIPHER_SERVER_PREFERENCE = getattr(ssl, "OP_CIPHER_SERVER_PREFERENCE", 0)
|
OP_CIPHER_SERVER_PREFERENCE = getattr(ssl, "OP_CIPHER_SERVER_PREFERENCE", 0)
|
||||||
|
OP_ENABLE_MIDDLEBOX_COMPAT = getattr(ssl, "OP_ENABLE_MIDDLEBOX_COMPAT", 0)
|
||||||
|
|
||||||
|
|
||||||
def handle_error(prefix):
|
def handle_error(prefix):
|
||||||
|
|
@ -232,6 +235,7 @@ def testing_context(server_cert=SIGNED_CERTFILE):
|
||||||
|
|
||||||
server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
|
server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
|
||||||
server_context.load_cert_chain(server_cert)
|
server_context.load_cert_chain(server_cert)
|
||||||
|
client_context.load_verify_locations(SIGNING_CA)
|
||||||
|
|
||||||
return client_context, server_context, hostname
|
return client_context, server_context, hostname
|
||||||
|
|
||||||
|
|
@ -1016,7 +1020,8 @@ def test_options(self):
|
||||||
default = (ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
|
default = (ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
|
||||||
# SSLContext also enables these by default
|
# SSLContext also enables these by default
|
||||||
default |= (OP_NO_COMPRESSION | OP_CIPHER_SERVER_PREFERENCE |
|
default |= (OP_NO_COMPRESSION | OP_CIPHER_SERVER_PREFERENCE |
|
||||||
OP_SINGLE_DH_USE | OP_SINGLE_ECDH_USE)
|
OP_SINGLE_DH_USE | OP_SINGLE_ECDH_USE |
|
||||||
|
OP_ENABLE_MIDDLEBOX_COMPAT)
|
||||||
self.assertEqual(default, ctx.options)
|
self.assertEqual(default, ctx.options)
|
||||||
ctx.options |= ssl.OP_NO_TLSv1
|
ctx.options |= ssl.OP_NO_TLSv1
|
||||||
self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options)
|
self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options)
|
||||||
|
|
@ -1778,6 +1783,8 @@ def test_connect_cadata(self):
|
||||||
der = ssl.PEM_cert_to_DER_cert(pem)
|
der = ssl.PEM_cert_to_DER_cert(pem)
|
||||||
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
|
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
|
||||||
ctx.verify_mode = ssl.CERT_REQUIRED
|
ctx.verify_mode = ssl.CERT_REQUIRED
|
||||||
|
# TODO: fix TLSv1.3 support
|
||||||
|
ctx.options |= ssl.OP_NO_TLSv1_3
|
||||||
ctx.load_verify_locations(cadata=pem)
|
ctx.load_verify_locations(cadata=pem)
|
||||||
with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
|
with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
|
||||||
s.connect(self.server_addr)
|
s.connect(self.server_addr)
|
||||||
|
|
@ -1787,6 +1794,8 @@ def test_connect_cadata(self):
|
||||||
# same with DER
|
# same with DER
|
||||||
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
|
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
|
||||||
ctx.verify_mode = ssl.CERT_REQUIRED
|
ctx.verify_mode = ssl.CERT_REQUIRED
|
||||||
|
# TODO: fix TLSv1.3 support
|
||||||
|
ctx.options |= ssl.OP_NO_TLSv1_3
|
||||||
ctx.load_verify_locations(cadata=der)
|
ctx.load_verify_locations(cadata=der)
|
||||||
with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
|
with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
|
||||||
s.connect(self.server_addr)
|
s.connect(self.server_addr)
|
||||||
|
|
@ -2629,7 +2638,10 @@ def test_check_hostname(self):
|
||||||
def test_ecc_cert(self):
|
def test_ecc_cert(self):
|
||||||
client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
||||||
client_context.load_verify_locations(SIGNING_CA)
|
client_context.load_verify_locations(SIGNING_CA)
|
||||||
client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
|
client_context.set_ciphers(
|
||||||
|
'TLS13-AES-128-GCM-SHA256:TLS13-CHACHA20-POLY1305-SHA256:'
|
||||||
|
'ECDHE:ECDSA:!NULL:!aRSA'
|
||||||
|
)
|
||||||
hostname = SIGNED_CERTFILE_ECC_HOSTNAME
|
hostname = SIGNED_CERTFILE_ECC_HOSTNAME
|
||||||
|
|
||||||
server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
|
server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
|
||||||
|
|
@ -2650,6 +2662,9 @@ def test_ecc_cert(self):
|
||||||
def test_dual_rsa_ecc(self):
|
def test_dual_rsa_ecc(self):
|
||||||
client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
||||||
client_context.load_verify_locations(SIGNING_CA)
|
client_context.load_verify_locations(SIGNING_CA)
|
||||||
|
# TODO: fix TLSv1.3 once SSLContext can restrict signature
|
||||||
|
# algorithms.
|
||||||
|
client_context.options |= ssl.OP_NO_TLSv1_3
|
||||||
# only ECDSA certs
|
# only ECDSA certs
|
||||||
client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
|
client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
|
||||||
hostname = SIGNED_CERTFILE_ECC_HOSTNAME
|
hostname = SIGNED_CERTFILE_ECC_HOSTNAME
|
||||||
|
|
@ -2676,6 +2691,8 @@ def test_check_hostname_idn(self):
|
||||||
|
|
||||||
server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
|
server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
|
||||||
server_context.load_cert_chain(IDNSANSFILE)
|
server_context.load_cert_chain(IDNSANSFILE)
|
||||||
|
# TODO: fix TLSv1.3 support
|
||||||
|
server_context.options |= ssl.OP_NO_TLSv1_3
|
||||||
|
|
||||||
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
||||||
context.verify_mode = ssl.CERT_REQUIRED
|
context.verify_mode = ssl.CERT_REQUIRED
|
||||||
|
|
@ -2738,15 +2755,22 @@ def test_wrong_cert(self):
|
||||||
Launch a server with CERT_REQUIRED, and check that trying to
|
Launch a server with CERT_REQUIRED, and check that trying to
|
||||||
connect to it with a wrong client certificate fails.
|
connect to it with a wrong client certificate fails.
|
||||||
"""
|
"""
|
||||||
certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
|
client_context, server_context, hostname = testing_context()
|
||||||
"wrongcert.pem")
|
# load client cert
|
||||||
server = ThreadedEchoServer(CERTFILE,
|
client_context.load_cert_chain(WRONG_CERT)
|
||||||
certreqs=ssl.CERT_REQUIRED,
|
# require TLS client authentication
|
||||||
cacerts=CERTFILE, chatty=False,
|
server_context.verify_mode = ssl.CERT_REQUIRED
|
||||||
connectionchatty=False)
|
# TODO: fix TLSv1.3 support
|
||||||
|
# With TLS 1.3, test fails with exception in server thread
|
||||||
|
server_context.options |= ssl.OP_NO_TLSv1_3
|
||||||
|
|
||||||
|
server = ThreadedEchoServer(
|
||||||
|
context=server_context, chatty=True, connectionchatty=True,
|
||||||
|
)
|
||||||
|
|
||||||
with server, \
|
with server, \
|
||||||
socket.socket() as sock, \
|
client_context.wrap_socket(socket.socket(),
|
||||||
test_wrap_socket(sock, certfile=certfile) as s:
|
server_hostname=hostname) as s:
|
||||||
try:
|
try:
|
||||||
# Expect either an SSL error about the server rejecting
|
# Expect either an SSL error about the server rejecting
|
||||||
# the connection, or a low-level connection reset (which
|
# the connection, or a low-level connection reset (which
|
||||||
|
|
@ -3400,7 +3424,9 @@ def test_version_basic(self):
|
||||||
self.assertIs(s.version(), None)
|
self.assertIs(s.version(), None)
|
||||||
self.assertIs(s._sslobj, None)
|
self.assertIs(s._sslobj, None)
|
||||||
s.connect((HOST, server.port))
|
s.connect((HOST, server.port))
|
||||||
if ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
|
if ssl.OPENSSL_VERSION_INFO >= (1, 1, 1):
|
||||||
|
self.assertEqual(s.version(), 'TLSv1.3')
|
||||||
|
elif ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
|
||||||
self.assertEqual(s.version(), 'TLSv1.2')
|
self.assertEqual(s.version(), 'TLSv1.2')
|
||||||
else: # 0.9.8 to 1.0.1
|
else: # 0.9.8 to 1.0.1
|
||||||
self.assertIn(s.version(), ('TLSv1', 'TLSv1.2'))
|
self.assertIn(s.version(), ('TLSv1', 'TLSv1.2'))
|
||||||
|
|
@ -3412,18 +3438,18 @@ def test_version_basic(self):
|
||||||
def test_tls1_3(self):
|
def test_tls1_3(self):
|
||||||
context = ssl.SSLContext(ssl.PROTOCOL_TLS)
|
context = ssl.SSLContext(ssl.PROTOCOL_TLS)
|
||||||
context.load_cert_chain(CERTFILE)
|
context.load_cert_chain(CERTFILE)
|
||||||
# disable all but TLS 1.3
|
|
||||||
context.options |= (
|
context.options |= (
|
||||||
ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
|
ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
|
||||||
)
|
)
|
||||||
with ThreadedEchoServer(context=context) as server:
|
with ThreadedEchoServer(context=context) as server:
|
||||||
with context.wrap_socket(socket.socket()) as s:
|
with context.wrap_socket(socket.socket()) as s:
|
||||||
s.connect((HOST, server.port))
|
s.connect((HOST, server.port))
|
||||||
self.assertIn(s.cipher()[0], [
|
self.assertIn(s.cipher()[0], {
|
||||||
'TLS13-AES-256-GCM-SHA384',
|
'TLS13-AES-256-GCM-SHA384',
|
||||||
'TLS13-CHACHA20-POLY1305-SHA256',
|
'TLS13-CHACHA20-POLY1305-SHA256',
|
||||||
'TLS13-AES-128-GCM-SHA256',
|
'TLS13-AES-128-GCM-SHA256',
|
||||||
])
|
})
|
||||||
|
self.assertEqual(s.version(), 'TLSv1.3')
|
||||||
|
|
||||||
@unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
|
@unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
|
||||||
def test_default_ecdh_curve(self):
|
def test_default_ecdh_curve(self):
|
||||||
|
|
@ -3452,58 +3478,54 @@ def test_tls_unique_channel_binding(self):
|
||||||
if support.verbose:
|
if support.verbose:
|
||||||
sys.stdout.write("\n")
|
sys.stdout.write("\n")
|
||||||
|
|
||||||
server = ThreadedEchoServer(CERTFILE,
|
client_context, server_context, hostname = testing_context()
|
||||||
certreqs=ssl.CERT_NONE,
|
# TODO: fix TLSv1.3 support
|
||||||
ssl_version=ssl.PROTOCOL_TLS_SERVER,
|
client_context.options |= ssl.OP_NO_TLSv1_3
|
||||||
cacerts=CERTFILE,
|
|
||||||
|
server = ThreadedEchoServer(context=server_context,
|
||||||
chatty=True,
|
chatty=True,
|
||||||
connectionchatty=False)
|
connectionchatty=False)
|
||||||
|
|
||||||
with server:
|
with server:
|
||||||
s = test_wrap_socket(socket.socket(),
|
with client_context.wrap_socket(
|
||||||
server_side=False,
|
socket.socket(),
|
||||||
certfile=CERTFILE,
|
server_hostname=hostname) as s:
|
||||||
ca_certs=CERTFILE,
|
s.connect((HOST, server.port))
|
||||||
cert_reqs=ssl.CERT_NONE,
|
# get the data
|
||||||
ssl_version=ssl.PROTOCOL_TLS_CLIENT)
|
cb_data = s.get_channel_binding("tls-unique")
|
||||||
s.connect((HOST, server.port))
|
if support.verbose:
|
||||||
# get the data
|
sys.stdout.write(
|
||||||
cb_data = s.get_channel_binding("tls-unique")
|
" got channel binding data: {0!r}\n".format(cb_data))
|
||||||
if support.verbose:
|
|
||||||
sys.stdout.write(" got channel binding data: {0!r}\n"
|
|
||||||
.format(cb_data))
|
|
||||||
|
|
||||||
# check if it is sane
|
# check if it is sane
|
||||||
self.assertIsNotNone(cb_data)
|
self.assertIsNotNone(cb_data)
|
||||||
self.assertEqual(len(cb_data), 12) # True for TLSv1
|
self.assertEqual(len(cb_data), 12) # True for TLSv1
|
||||||
|
|
||||||
# and compare with the peers version
|
# and compare with the peers version
|
||||||
s.write(b"CB tls-unique\n")
|
s.write(b"CB tls-unique\n")
|
||||||
peer_data_repr = s.read().strip()
|
peer_data_repr = s.read().strip()
|
||||||
self.assertEqual(peer_data_repr,
|
self.assertEqual(peer_data_repr,
|
||||||
repr(cb_data).encode("us-ascii"))
|
repr(cb_data).encode("us-ascii"))
|
||||||
s.close()
|
|
||||||
|
|
||||||
# now, again
|
# now, again
|
||||||
s = test_wrap_socket(socket.socket(),
|
with client_context.wrap_socket(
|
||||||
server_side=False,
|
socket.socket(),
|
||||||
certfile=CERTFILE,
|
server_hostname=hostname) as s:
|
||||||
ca_certs=CERTFILE,
|
s.connect((HOST, server.port))
|
||||||
cert_reqs=ssl.CERT_NONE,
|
new_cb_data = s.get_channel_binding("tls-unique")
|
||||||
ssl_version=ssl.PROTOCOL_TLS_CLIENT)
|
if support.verbose:
|
||||||
s.connect((HOST, server.port))
|
sys.stdout.write(
|
||||||
new_cb_data = s.get_channel_binding("tls-unique")
|
"got another channel binding data: {0!r}\n".format(
|
||||||
if support.verbose:
|
new_cb_data)
|
||||||
sys.stdout.write(" got another channel binding data: {0!r}\n"
|
)
|
||||||
.format(new_cb_data))
|
# is it really unique
|
||||||
# is it really unique
|
self.assertNotEqual(cb_data, new_cb_data)
|
||||||
self.assertNotEqual(cb_data, new_cb_data)
|
self.assertIsNotNone(cb_data)
|
||||||
self.assertIsNotNone(cb_data)
|
self.assertEqual(len(cb_data), 12) # True for TLSv1
|
||||||
self.assertEqual(len(cb_data), 12) # True for TLSv1
|
s.write(b"CB tls-unique\n")
|
||||||
s.write(b"CB tls-unique\n")
|
peer_data_repr = s.read().strip()
|
||||||
peer_data_repr = s.read().strip()
|
self.assertEqual(peer_data_repr,
|
||||||
self.assertEqual(peer_data_repr,
|
repr(new_cb_data).encode("us-ascii"))
|
||||||
repr(new_cb_data).encode("us-ascii"))
|
|
||||||
s.close()
|
|
||||||
|
|
||||||
def test_compression(self):
|
def test_compression(self):
|
||||||
client_context, server_context, hostname = testing_context()
|
client_context, server_context, hostname = testing_context()
|
||||||
|
|
@ -3528,8 +3550,11 @@ def test_compression_disabled(self):
|
||||||
def test_dh_params(self):
|
def test_dh_params(self):
|
||||||
# Check we can get a connection with ephemeral Diffie-Hellman
|
# Check we can get a connection with ephemeral Diffie-Hellman
|
||||||
client_context, server_context, hostname = testing_context()
|
client_context, server_context, hostname = testing_context()
|
||||||
|
# test scenario needs TLS <= 1.2
|
||||||
|
client_context.options |= ssl.OP_NO_TLSv1_3
|
||||||
server_context.load_dh_params(DHFILE)
|
server_context.load_dh_params(DHFILE)
|
||||||
server_context.set_ciphers("kEDH")
|
server_context.set_ciphers("kEDH")
|
||||||
|
server_context.options |= ssl.OP_NO_TLSv1_3
|
||||||
stats = server_params_test(client_context, server_context,
|
stats = server_params_test(client_context, server_context,
|
||||||
chatty=True, connectionchatty=True,
|
chatty=True, connectionchatty=True,
|
||||||
sni_name=hostname)
|
sni_name=hostname)
|
||||||
|
|
@ -3539,9 +3564,11 @@ def test_dh_params(self):
|
||||||
self.fail("Non-DH cipher: " + cipher[0])
|
self.fail("Non-DH cipher: " + cipher[0])
|
||||||
|
|
||||||
@unittest.skipUnless(HAVE_SECP_CURVES, "needs secp384r1 curve support")
|
@unittest.skipUnless(HAVE_SECP_CURVES, "needs secp384r1 curve support")
|
||||||
|
@unittest.skipIf(IS_OPENSSL_1_1_1, "TODO: Test doesn't work on 1.1.1")
|
||||||
def test_ecdh_curve(self):
|
def test_ecdh_curve(self):
|
||||||
# server secp384r1, client auto
|
# server secp384r1, client auto
|
||||||
client_context, server_context, hostname = testing_context()
|
client_context, server_context, hostname = testing_context()
|
||||||
|
|
||||||
server_context.set_ecdh_curve("secp384r1")
|
server_context.set_ecdh_curve("secp384r1")
|
||||||
server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
|
server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
|
||||||
server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
|
server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
|
||||||
|
|
@ -3572,7 +3599,7 @@ def test_ecdh_curve(self):
|
||||||
pass
|
pass
|
||||||
else:
|
else:
|
||||||
# OpenSSL 1.0.2 does not fail although it should.
|
# OpenSSL 1.0.2 does not fail although it should.
|
||||||
if IS_OPENSSL_1_1:
|
if IS_OPENSSL_1_1_0:
|
||||||
self.fail("mismatch curve did not fail")
|
self.fail("mismatch curve did not fail")
|
||||||
|
|
||||||
def test_selected_alpn_protocol(self):
|
def test_selected_alpn_protocol(self):
|
||||||
|
|
@ -3616,7 +3643,7 @@ def test_alpn_protocols(self):
|
||||||
except ssl.SSLError as e:
|
except ssl.SSLError as e:
|
||||||
stats = e
|
stats = e
|
||||||
|
|
||||||
if (expected is None and IS_OPENSSL_1_1
|
if (expected is None and IS_OPENSSL_1_1_0
|
||||||
and ssl.OPENSSL_VERSION_INFO < (1, 1, 0, 6)):
|
and ssl.OPENSSL_VERSION_INFO < (1, 1, 0, 6)):
|
||||||
# OpenSSL 1.1.0 to 1.1.0e raises handshake error
|
# OpenSSL 1.1.0 to 1.1.0e raises handshake error
|
||||||
self.assertIsInstance(stats, ssl.SSLError)
|
self.assertIsInstance(stats, ssl.SSLError)
|
||||||
|
|
@ -3823,6 +3850,8 @@ def test_sendfile(self):
|
||||||
|
|
||||||
def test_session(self):
|
def test_session(self):
|
||||||
client_context, server_context, hostname = testing_context()
|
client_context, server_context, hostname = testing_context()
|
||||||
|
# TODO: sessions aren't compatible with TLSv1.3 yet
|
||||||
|
client_context.options |= ssl.OP_NO_TLSv1_3
|
||||||
|
|
||||||
# first connection without session
|
# first connection without session
|
||||||
stats = server_params_test(client_context, server_context,
|
stats = server_params_test(client_context, server_context,
|
||||||
|
|
@ -3881,7 +3910,7 @@ def test_session_handling(self):
|
||||||
client_context, server_context, hostname = testing_context()
|
client_context, server_context, hostname = testing_context()
|
||||||
client_context2, _, _ = testing_context()
|
client_context2, _, _ = testing_context()
|
||||||
|
|
||||||
# TODO: session reuse does not work with TLS 1.3
|
# TODO: session reuse does not work with TLSv1.3
|
||||||
client_context.options |= ssl.OP_NO_TLSv1_3
|
client_context.options |= ssl.OP_NO_TLSv1_3
|
||||||
client_context2.options |= ssl.OP_NO_TLSv1_3
|
client_context2.options |= ssl.OP_NO_TLSv1_3
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,2 @@
|
||||||
|
Add OP_ENABLE_MIDDLEBOX_COMPAT and test workaround for TLSv1.3 for future
|
||||||
|
compatibility with OpenSSL 1.1.1.
|
||||||
|
|
@ -5681,6 +5681,10 @@ PyInit__ssl(void)
|
||||||
PyModule_AddIntConstant(m, "OP_NO_COMPRESSION",
|
PyModule_AddIntConstant(m, "OP_NO_COMPRESSION",
|
||||||
SSL_OP_NO_COMPRESSION);
|
SSL_OP_NO_COMPRESSION);
|
||||||
#endif
|
#endif
|
||||||
|
#ifdef SSL_OP_ENABLE_MIDDLEBOX_COMPAT
|
||||||
|
PyModule_AddIntConstant(m, "OP_ENABLE_MIDDLEBOX_COMPAT",
|
||||||
|
SSL_OP_ENABLE_MIDDLEBOX_COMPAT);
|
||||||
|
#endif
|
||||||
|
|
||||||
#ifdef X509_CHECK_FLAG_ALWAYS_CHECK_SUBJECT
|
#ifdef X509_CHECK_FLAG_ALWAYS_CHECK_SUBJECT
|
||||||
PyModule_AddIntConstant(m, "HOSTFLAG_ALWAYS_CHECK_SUBJECT",
|
PyModule_AddIntConstant(m, "HOSTFLAG_ALWAYS_CHECK_SUBJECT",
|
||||||
|
|
|
||||||
|
|
@ -41,24 +41,20 @@
|
||||||
log = logging.getLogger("multissl")
|
log = logging.getLogger("multissl")
|
||||||
|
|
||||||
OPENSSL_OLD_VERSIONS = [
|
OPENSSL_OLD_VERSIONS = [
|
||||||
# "0.9.8zh",
|
"1.0.2",
|
||||||
# "1.0.1u",
|
|
||||||
]
|
]
|
||||||
|
|
||||||
OPENSSL_RECENT_VERSIONS = [
|
OPENSSL_RECENT_VERSIONS = [
|
||||||
"1.0.2",
|
"1.0.2n",
|
||||||
"1.0.2m",
|
"1.1.0g",
|
||||||
"1.1.0g",
|
"1.1.1-pre1",
|
||||||
]
|
]
|
||||||
|
|
||||||
LIBRESSL_OLD_VERSIONS = [
|
LIBRESSL_OLD_VERSIONS = [
|
||||||
# "2.3.10",
|
|
||||||
# "2.4.5",
|
|
||||||
]
|
]
|
||||||
|
|
||||||
LIBRESSL_RECENT_VERSIONS = [
|
LIBRESSL_RECENT_VERSIONS = [
|
||||||
"2.5.3",
|
# "2.6.5",
|
||||||
"2.5.5",
|
|
||||||
]
|
]
|
||||||
|
|
||||||
# store files in ../multissl
|
# store files in ../multissl
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue