mirror of
				https://github.com/python/cpython.git
				synced 2025-10-31 05:31:20 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			283 lines
		
	
	
	
		
			8.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			283 lines
		
	
	
	
		
			8.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # Test just the SSL support in the socket module, in a moderately bogus way.
 | |
| 
 | |
| import sys
 | |
| import unittest
 | |
| from test import test_support
 | |
| import socket
 | |
| import errno
 | |
| import threading
 | |
| import subprocess
 | |
| import time
 | |
| import os
 | |
| import urllib
 | |
| import warnings
 | |
| 
 | |
| warnings.filterwarnings(
 | |
|     'ignore',
 | |
|     'socket.ssl.. is deprecated.  Use ssl.wrap_socket.. instead.',
 | |
|     DeprecationWarning)
 | |
| 
 | |
| # Optionally test SSL support, if we have it in the tested platform
 | |
| skip_expected = not hasattr(socket, "ssl")
 | |
| 
 | |
| HOST = test_support.HOST
 | |
| 
 | |
| class ConnectedTests(unittest.TestCase):
 | |
| 
 | |
|     def urlopen(self, host, *args, **kwargs):
 | |
|         # Connecting to remote hosts is flaky.  Make it more robust
 | |
|         # by retrying the connection several times.
 | |
|         for i in range(3):
 | |
|             try:
 | |
|                 return urllib.urlopen(host, *args, **kwargs)
 | |
|             except IOError, e:
 | |
|                 last_exc = e
 | |
|                 continue
 | |
|             except:
 | |
|                 raise
 | |
|         raise last_exc
 | |
| 
 | |
|     def testBasic(self):
 | |
|         socket.RAND_status()
 | |
|         try:
 | |
|             socket.RAND_egd(1)
 | |
|         except TypeError:
 | |
|             pass
 | |
|         else:
 | |
|             print "didn't raise TypeError"
 | |
|         socket.RAND_add("this is a random string", 75.0)
 | |
| 
 | |
|         with test_support.transient_internet():
 | |
|             f = self.urlopen('https://sf.net')
 | |
|         buf = f.read()
 | |
|         f.close()
 | |
| 
 | |
|     def testTimeout(self):
 | |
|         def error_msg(extra_msg):
 | |
|             print >> sys.stderr, """\
 | |
|         WARNING:  an attempt to connect to %r %s, in
 | |
|         testTimeout.  That may be legitimate, but is not the outcome we
 | |
|         hoped for.  If this message is seen often, testTimeout should be
 | |
|         changed to use a more reliable address.""" % (ADDR, extra_msg)
 | |
| 
 | |
|         # A service which issues a welcome banner (without need to write
 | |
|         # anything).
 | |
|         ADDR = "pop.gmail.com", 995
 | |
| 
 | |
|         s = socket.socket()
 | |
|         s.settimeout(30.0)
 | |
|         try:
 | |
|             s.connect(ADDR)
 | |
|         except socket.timeout:
 | |
|             error_msg('timed out')
 | |
|             return
 | |
|         except socket.error, exc:  # In case connection is refused.
 | |
|             if exc.args[0] == errno.ECONNREFUSED:
 | |
|                 error_msg('was refused')
 | |
|                 return
 | |
|             else:
 | |
|                 raise
 | |
| 
 | |
|         ss = socket.ssl(s)
 | |
|         # Read part of return welcome banner twice.
 | |
|         ss.read(1)
 | |
|         ss.read(1)
 | |
|         s.close()
 | |
| 
 | |
| class BasicTests(unittest.TestCase):
 | |
| 
 | |
|     def testRudeShutdown(self):
 | |
|         listener_ready = threading.Event()
 | |
|         listener_gone = threading.Event()
 | |
|         sock = socket.socket()
 | |
|         port = test_support.bind_port(sock)
 | |
| 
 | |
|         # `listener` runs in a thread.  It opens a socket and sits in accept()
 | |
|         # until the main thread connects.  Then it rudely closes the socket,
 | |
|         # and sets Event `listener_gone` to let the main thread know the socket
 | |
|         # is gone.
 | |
|         def listener(s):
 | |
|             s.listen(5)
 | |
|             listener_ready.set()
 | |
|             s.accept()
 | |
|             s = None # reclaim the socket object, which also closes it
 | |
|             listener_gone.set()
 | |
| 
 | |
|         def connector():
 | |
|             listener_ready.wait()
 | |
|             s = socket.socket()
 | |
|             s.connect((HOST, port))
 | |
|             listener_gone.wait()
 | |
|             try:
 | |
|                 ssl_sock = socket.ssl(s)
 | |
|             except socket.sslerror:
 | |
|                 pass
 | |
|             else:
 | |
|                 raise test_support.TestFailed(
 | |
|                       'connecting to closed SSL socket should have failed')
 | |
| 
 | |
|         t = threading.Thread(target=listener, args=(sock,))
 | |
|         t.start()
 | |
|         connector()
 | |
|         t.join()
 | |
| 
 | |
|     def connect(self, s, host_port):
 | |
|         # Connecting to remote hosts is flaky.  Make it more robust
 | |
|         # by retrying the connection several times.
 | |
|         for i in range(3):
 | |
|             try:
 | |
|                 return s.connect(host_port)
 | |
|             except IOError, e:
 | |
|                 last_exc = e
 | |
|                 continue
 | |
|             except:
 | |
|                 raise
 | |
|         raise last_exc
 | |
| 
 | |
|     def test_978833(self):
 | |
|         if not test_support.is_resource_enabled("network"):
 | |
|             return
 | |
|         if test_support.verbose:
 | |
|             print "test_978833 ..."
 | |
| 
 | |
|         import os, httplib, ssl
 | |
|         with test_support.transient_internet():
 | |
|             s = socket.socket(socket.AF_INET)
 | |
|             try:
 | |
|                 self.connect(s, ("svn.python.org", 443))
 | |
|             except IOError:
 | |
|                 print >> sys.stderr, """\
 | |
|         WARNING:  an attempt to connect to svn.python.org:443 failed, in
 | |
|         test_978833.  That may be legitimate, but is not the outcome we
 | |
|         hoped for.  If this message is seen often, test_978833 should be
 | |
|         changed to use a more reliable address."""
 | |
|                 return
 | |
|             fd = s._sock.fileno()
 | |
|             sock = ssl.wrap_socket(s)
 | |
|             s = None
 | |
|             sock.close()
 | |
|             try:
 | |
|                 os.fstat(fd)
 | |
|             except OSError:
 | |
|                 pass
 | |
|             else:
 | |
|                 raise test_support.TestFailed("Failed to close socket")
 | |
| 
 | |
| class OpenSSLTests(unittest.TestCase):
 | |
| 
 | |
|     def testBasic(self):
 | |
|         s = socket.socket()
 | |
|         s.connect((HOST, OpenSSLServer.PORT))
 | |
|         ss = socket.ssl(s)
 | |
|         ss.write("Foo\n")
 | |
|         i = ss.read(4)
 | |
|         self.assertEqual(i, "Foo\n")
 | |
|         s.close()
 | |
| 
 | |
|     def testMethods(self):
 | |
|         # read & write is already tried in the Basic test
 | |
|         # now we'll try to get the server info about certificates
 | |
|         # this came from the certificate I used, one I found in /usr/share/openssl
 | |
|         info = "/C=PT/ST=Queensland/L=Lisboa/O=Neuronio, Lda./OU=Desenvolvimento/CN=brutus.neuronio.pt/emailAddress=sampo@iki.fi"
 | |
| 
 | |
|         s = socket.socket()
 | |
|         s.connect((HOST, OpenSSLServer.PORT))
 | |
|         ss = socket.ssl(s)
 | |
|         cert = ss.server()
 | |
|         self.assertEqual(cert, info)
 | |
|         cert = ss.issuer()
 | |
|         self.assertEqual(cert, info)
 | |
|         s.close()
 | |
| 
 | |
| 
 | |
| class OpenSSLServer(threading.Thread):
 | |
|     PORT = None
 | |
|     def __init__(self):
 | |
|         self.s = None
 | |
|         self.keepServing = True
 | |
|         self._external()
 | |
|         if self.haveServer:
 | |
|             threading.Thread.__init__(self)
 | |
| 
 | |
|     def _external(self):
 | |
|         # let's find the .pem files
 | |
|         curdir = os.path.dirname(__file__) or os.curdir
 | |
|         cert_file = os.path.join(curdir, "ssl_cert.pem")
 | |
|         if not os.access(cert_file, os.F_OK):
 | |
|             raise ValueError("No cert file found! (tried %r)" % cert_file)
 | |
|         key_file = os.path.join(curdir, "ssl_key.pem")
 | |
|         if not os.access(key_file, os.F_OK):
 | |
|             raise ValueError("No key file found! (tried %r)" % key_file)
 | |
| 
 | |
|         try:
 | |
|             # XXX TODO: on Windows, this should make more effort to use the
 | |
|             # openssl.exe that would have been built by the pcbuild.sln.
 | |
|             OpenSSLServer.PORT = test_support.find_unused_port()
 | |
|             args = (OpenSSLServer.PORT, cert_file, key_file)
 | |
|             cmd = "openssl s_server -accept %d -cert %s -key %s -quiet" % args
 | |
|             self.s = subprocess.Popen(cmd.split(), stdin=subprocess.PIPE,
 | |
|                                        stdout=subprocess.PIPE,
 | |
|                                        stderr=subprocess.STDOUT)
 | |
|             time.sleep(1)
 | |
|         except:
 | |
|             self.haveServer = False
 | |
|         else:
 | |
|             # let's try if it is actually up
 | |
|             try:
 | |
|                 s = socket.socket()
 | |
|                 s.connect((HOST, OpenSSLServer.PORT))
 | |
|                 s.close()
 | |
|                 if self.s.stdout.readline() != "ERROR\n":
 | |
|                     raise ValueError
 | |
|             except:
 | |
|                 self.haveServer = False
 | |
|             else:
 | |
|                 self.haveServer = True
 | |
| 
 | |
|     def run(self):
 | |
|         while self.keepServing:
 | |
|             time.sleep(.5)
 | |
|             l = self.s.stdout.readline()
 | |
|             self.s.stdin.write(l)
 | |
| 
 | |
|     def shutdown(self):
 | |
|         self.keepServing = False
 | |
|         if not self.s:
 | |
|             return
 | |
|         if sys.platform == "win32":
 | |
|             subprocess.TerminateProcess(int(self.s._handle), -1)
 | |
|         else:
 | |
|             os.kill(self.s.pid, 15)
 | |
| 
 | |
| def test_main():
 | |
|     if not hasattr(socket, "ssl"):
 | |
|         raise test_support.TestSkipped("socket module has no ssl support")
 | |
| 
 | |
|     tests = [BasicTests]
 | |
| 
 | |
|     if test_support.is_resource_enabled('network'):
 | |
|         tests.append(ConnectedTests)
 | |
| 
 | |
|     # in these platforms we can kill the openssl process
 | |
|     if sys.platform in ("sunos5", "darwin", "linux1",
 | |
|                         "linux2", "win32", "hp-ux11"):
 | |
| 
 | |
|         server = OpenSSLServer()
 | |
|         if server.haveServer:
 | |
|             tests.append(OpenSSLTests)
 | |
|             server.start()
 | |
|     else:
 | |
|         server = None
 | |
| 
 | |
|     thread_info = test_support.threading_setup()
 | |
| 
 | |
|     try:
 | |
|         test_support.run_unittest(*tests)
 | |
|     finally:
 | |
|         if server is not None and server.haveServer:
 | |
|             server.shutdown()
 | |
| 
 | |
|     test_support.threading_cleanup(*thread_info)
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|     test_main()
 | 
