| 
									
										
										
										
											2007-06-07 13:34:10 +00:00
										 |  |  | #!/usr/bin/env python | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import sys | 
					
						
							|  |  |  | import threading | 
					
						
							|  |  |  | import urlparse | 
					
						
							|  |  |  | import urllib2 | 
					
						
							|  |  |  | import BaseHTTPServer | 
					
						
							|  |  |  | import unittest | 
					
						
							|  |  |  | import hashlib | 
					
						
							|  |  |  | from test import test_support | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | # Loopback http server infrastructure | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class LoopbackHttpServer(BaseHTTPServer.HTTPServer): | 
					
						
							|  |  |  |     """HTTP server w/ a few modifications that make it useful for
 | 
					
						
							|  |  |  |     loopback testing purposes. | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __init__(self, server_address, RequestHandlerClass): | 
					
						
							|  |  |  |         BaseHTTPServer.HTTPServer.__init__(self, | 
					
						
							|  |  |  |                                            server_address, | 
					
						
							|  |  |  |                                            RequestHandlerClass) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # Set the timeout of our listening socket really low so | 
					
						
							|  |  |  |         # that we can stop the server easily. | 
					
						
							|  |  |  |         self.socket.settimeout(1.0) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def get_request(self): | 
					
						
							|  |  |  |         """BaseHTTPServer method, overridden.""" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         request, client_address = self.socket.accept() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # It's a loopback connection, so setting the timeout | 
					
						
							|  |  |  |         # really low shouldn't affect anything, but should make | 
					
						
							|  |  |  |         # deadlocks less likely to occur. | 
					
						
							|  |  |  |         request.settimeout(10.0) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         return (request, client_address) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class LoopbackHttpServerThread(threading.Thread): | 
					
						
							|  |  |  |     """Stoppable thread that runs a loopback http server.""" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __init__(self, port, RequestHandlerClass): | 
					
						
							|  |  |  |         threading.Thread.__init__(self) | 
					
						
							|  |  |  |         self._RequestHandlerClass = RequestHandlerClass | 
					
						
							|  |  |  |         self._stop = False | 
					
						
							|  |  |  |         self._port = port | 
					
						
							|  |  |  |         self._server_address = ('127.0.0.1', self._port) | 
					
						
							|  |  |  |         self.ready = threading.Event() | 
					
						
							| 
									
										
										
										
											2007-06-11 04:52:37 +00:00
										 |  |  |         self.error = None | 
					
						
							| 
									
										
										
										
											2007-06-07 13:34:10 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def stop(self): | 
					
						
							|  |  |  |         """Stops the webserver if it's currently running.""" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # Set the stop flag. | 
					
						
							|  |  |  |         self._stop = True | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.join() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def run(self): | 
					
						
							|  |  |  |         protocol = "HTTP/1.0" | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2007-06-11 04:52:37 +00:00
										 |  |  |         try: | 
					
						
							|  |  |  |             self._RequestHandlerClass.protocol_version = protocol | 
					
						
							|  |  |  |             httpd = LoopbackHttpServer(self._server_address, | 
					
						
							|  |  |  |                                        self._RequestHandlerClass) | 
					
						
							| 
									
										
										
										
											2007-06-07 13:34:10 +00:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2007-06-11 04:52:37 +00:00
										 |  |  |             sa = httpd.socket.getsockname() | 
					
						
							|  |  |  |             #print "Serving HTTP on", sa[0], "port", sa[1], "..." | 
					
						
							|  |  |  |         except: | 
					
						
							|  |  |  |             # Fail "gracefully" if we are unable to start. | 
					
						
							|  |  |  |             self.ready.set() | 
					
						
							|  |  |  |             self.error = sys.exc_info()[1] | 
					
						
							|  |  |  |             raise | 
					
						
							| 
									
										
										
										
											2007-06-07 13:34:10 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  |         self.ready.set() | 
					
						
							|  |  |  |         while not self._stop: | 
					
						
							|  |  |  |             httpd.handle_request() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | # Authentication infrastructure | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class DigestAuthHandler: | 
					
						
							|  |  |  |     """Handler for performing digest authentication.""" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __init__(self): | 
					
						
							|  |  |  |         self._request_num = 0 | 
					
						
							|  |  |  |         self._nonces = [] | 
					
						
							|  |  |  |         self._users = {} | 
					
						
							|  |  |  |         self._realm_name = "Test Realm" | 
					
						
							|  |  |  |         self._qop = "auth" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def set_qop(self, qop): | 
					
						
							|  |  |  |         self._qop = qop | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def set_users(self, users): | 
					
						
							|  |  |  |         assert isinstance(users, dict) | 
					
						
							|  |  |  |         self._users = users | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def set_realm(self, realm): | 
					
						
							|  |  |  |         self._realm_name = realm | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def _generate_nonce(self): | 
					
						
							|  |  |  |         self._request_num += 1 | 
					
						
							|  |  |  |         nonce = hashlib.md5(str(self._request_num)).hexdigest() | 
					
						
							|  |  |  |         self._nonces.append(nonce) | 
					
						
							|  |  |  |         return nonce | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def _create_auth_dict(self, auth_str): | 
					
						
							|  |  |  |         first_space_index = auth_str.find(" ") | 
					
						
							|  |  |  |         auth_str = auth_str[first_space_index+1:] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         parts = auth_str.split(",") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         auth_dict = {} | 
					
						
							|  |  |  |         for part in parts: | 
					
						
							|  |  |  |             name, value = part.split("=") | 
					
						
							|  |  |  |             name = name.strip() | 
					
						
							|  |  |  |             if value[0] == '"' and value[-1] == '"': | 
					
						
							|  |  |  |                 value = value[1:-1] | 
					
						
							|  |  |  |             else: | 
					
						
							|  |  |  |                 value = value.strip() | 
					
						
							|  |  |  |             auth_dict[name] = value | 
					
						
							|  |  |  |         return auth_dict | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def _validate_auth(self, auth_dict, password, method, uri): | 
					
						
							|  |  |  |         final_dict = {} | 
					
						
							|  |  |  |         final_dict.update(auth_dict) | 
					
						
							|  |  |  |         final_dict["password"] = password | 
					
						
							|  |  |  |         final_dict["method"] = method | 
					
						
							|  |  |  |         final_dict["uri"] = uri | 
					
						
							|  |  |  |         HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict | 
					
						
							|  |  |  |         HA1 = hashlib.md5(HA1_str).hexdigest() | 
					
						
							|  |  |  |         HA2_str = "%(method)s:%(uri)s" % final_dict | 
					
						
							|  |  |  |         HA2 = hashlib.md5(HA2_str).hexdigest() | 
					
						
							|  |  |  |         final_dict["HA1"] = HA1 | 
					
						
							|  |  |  |         final_dict["HA2"] = HA2 | 
					
						
							|  |  |  |         response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \ | 
					
						
							|  |  |  |                        "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict | 
					
						
							|  |  |  |         response = hashlib.md5(response_str).hexdigest() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         return response == auth_dict["response"] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def _return_auth_challenge(self, request_handler): | 
					
						
							|  |  |  |         request_handler.send_response(407, "Proxy Authentication Required") | 
					
						
							|  |  |  |         request_handler.send_header("Content-Type", "text/html") | 
					
						
							|  |  |  |         request_handler.send_header( | 
					
						
							|  |  |  |             'Proxy-Authenticate', 'Digest realm="%s", ' | 
					
						
							|  |  |  |             'qop="%s",' | 
					
						
							|  |  |  |             'nonce="%s", ' % \ | 
					
						
							|  |  |  |             (self._realm_name, self._qop, self._generate_nonce())) | 
					
						
							|  |  |  |         # XXX: Not sure if we're supposed to add this next header or | 
					
						
							|  |  |  |         # not. | 
					
						
							|  |  |  |         #request_handler.send_header('Connection', 'close') | 
					
						
							|  |  |  |         request_handler.end_headers() | 
					
						
							|  |  |  |         request_handler.wfile.write("Proxy Authentication Required.") | 
					
						
							|  |  |  |         return False | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def handle_request(self, request_handler): | 
					
						
							|  |  |  |         """Performs digest authentication on the given HTTP request
 | 
					
						
							|  |  |  |         handler.  Returns True if authentication was successful, False | 
					
						
							|  |  |  |         otherwise. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         If no users have been set, then digest auth is effectively | 
					
						
							|  |  |  |         disabled and this method will always return True. | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if len(self._users) == 0: | 
					
						
							|  |  |  |             return True | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if not request_handler.headers.has_key('Proxy-Authorization'): | 
					
						
							|  |  |  |             return self._return_auth_challenge(request_handler) | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             auth_dict = self._create_auth_dict( | 
					
						
							|  |  |  |                 request_handler.headers['Proxy-Authorization'] | 
					
						
							|  |  |  |                 ) | 
					
						
							|  |  |  |             if self._users.has_key(auth_dict["username"]): | 
					
						
							|  |  |  |                 password = self._users[ auth_dict["username"] ] | 
					
						
							|  |  |  |             else: | 
					
						
							|  |  |  |                 return self._return_auth_challenge(request_handler) | 
					
						
							|  |  |  |             if not auth_dict.get("nonce") in self._nonces: | 
					
						
							|  |  |  |                 return self._return_auth_challenge(request_handler) | 
					
						
							|  |  |  |             else: | 
					
						
							|  |  |  |                 self._nonces.remove(auth_dict["nonce"]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             auth_validated = False | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # MSIE uses short_path in its validation, but Python's | 
					
						
							|  |  |  |             # urllib2 uses the full path, so we're going to see if | 
					
						
							|  |  |  |             # either of them works here. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             for path in [request_handler.path, request_handler.short_path]: | 
					
						
							|  |  |  |                 if self._validate_auth(auth_dict, | 
					
						
							|  |  |  |                                        password, | 
					
						
							|  |  |  |                                        request_handler.command, | 
					
						
							|  |  |  |                                        path): | 
					
						
							|  |  |  |                     auth_validated = True | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             if not auth_validated: | 
					
						
							|  |  |  |                 return self._return_auth_challenge(request_handler) | 
					
						
							|  |  |  |             return True | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | # Proxy test infrastructure | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class FakeProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler): | 
					
						
							|  |  |  |     """This is a 'fake proxy' that makes it look like the entire
 | 
					
						
							|  |  |  |     internet has gone down due to a sudden zombie invasion.  It main | 
					
						
							|  |  |  |     utility is in providing us with authentication support for | 
					
						
							|  |  |  |     testing. | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     digest_auth_handler = DigestAuthHandler() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def log_message(self, format, *args): | 
					
						
							|  |  |  |         # Uncomment the next line for debugging. | 
					
						
							|  |  |  |         #sys.stderr.write(format % args) | 
					
						
							|  |  |  |         pass | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def do_GET(self): | 
					
						
							|  |  |  |         (scm, netloc, path, params, query, fragment) = urlparse.urlparse( | 
					
						
							|  |  |  |             self.path, 'http') | 
					
						
							|  |  |  |         self.short_path = path | 
					
						
							|  |  |  |         if self.digest_auth_handler.handle_request(self): | 
					
						
							|  |  |  |             self.send_response(200, "OK") | 
					
						
							|  |  |  |             self.send_header("Content-Type", "text/html") | 
					
						
							|  |  |  |             self.end_headers() | 
					
						
							|  |  |  |             self.wfile.write("You've reached %s!<BR>" % self.path) | 
					
						
							|  |  |  |             self.wfile.write("Our apologies, but our server is down due to " | 
					
						
							|  |  |  |                               "a sudden zombie invasion.") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | # Test cases | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class ProxyAuthTests(unittest.TestCase): | 
					
						
							|  |  |  |     URL = "http://www.foo.com" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     PORT = 8080 | 
					
						
							|  |  |  |     USER = "tester" | 
					
						
							|  |  |  |     PASSWD = "test123" | 
					
						
							|  |  |  |     REALM = "TestRealm" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     PROXY_URL = "http://127.0.0.1:%d" % PORT | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def setUp(self): | 
					
						
							|  |  |  |         FakeProxyHandler.digest_auth_handler.set_users({ | 
					
						
							|  |  |  |             self.USER : self.PASSWD | 
					
						
							|  |  |  |             }) | 
					
						
							|  |  |  |         FakeProxyHandler.digest_auth_handler.set_realm(self.REALM) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.server = LoopbackHttpServerThread(self.PORT, FakeProxyHandler) | 
					
						
							|  |  |  |         self.server.start() | 
					
						
							|  |  |  |         self.server.ready.wait() | 
					
						
							| 
									
										
										
										
											2007-06-11 04:52:37 +00:00
										 |  |  |         if self.server.error: | 
					
						
							|  |  |  |             raise self.server.error | 
					
						
							| 
									
										
										
										
											2007-06-07 13:34:10 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  |         handler = urllib2.ProxyHandler({"http" : self.PROXY_URL}) | 
					
						
							|  |  |  |         self._digest_auth_handler = urllib2.ProxyDigestAuthHandler() | 
					
						
							|  |  |  |         self.opener = urllib2.build_opener(handler, self._digest_auth_handler) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def tearDown(self): | 
					
						
							|  |  |  |         self.server.stop() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_proxy_with_bad_password_raises_httperror(self): | 
					
						
							|  |  |  |         self._digest_auth_handler.add_password(self.REALM, self.URL, | 
					
						
							|  |  |  |                                                self.USER, self.PASSWD+"bad") | 
					
						
							|  |  |  |         FakeProxyHandler.digest_auth_handler.set_qop("auth") | 
					
						
							|  |  |  |         self.assertRaises(urllib2.HTTPError, | 
					
						
							|  |  |  |                           self.opener.open, | 
					
						
							|  |  |  |                           self.URL) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_proxy_with_no_password_raises_httperror(self): | 
					
						
							|  |  |  |         FakeProxyHandler.digest_auth_handler.set_qop("auth") | 
					
						
							|  |  |  |         self.assertRaises(urllib2.HTTPError, | 
					
						
							|  |  |  |                           self.opener.open, | 
					
						
							|  |  |  |                           self.URL) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_proxy_qop_auth_works(self): | 
					
						
							|  |  |  |         self._digest_auth_handler.add_password(self.REALM, self.URL, | 
					
						
							|  |  |  |                                                self.USER, self.PASSWD) | 
					
						
							|  |  |  |         FakeProxyHandler.digest_auth_handler.set_qop("auth") | 
					
						
							|  |  |  |         result = self.opener.open(self.URL) | 
					
						
							|  |  |  |         while result.read(): | 
					
						
							|  |  |  |             pass | 
					
						
							|  |  |  |         result.close() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_proxy_qop_auth_int_works_or_throws_urlerror(self): | 
					
						
							|  |  |  |         self._digest_auth_handler.add_password(self.REALM, self.URL, | 
					
						
							|  |  |  |                                                self.USER, self.PASSWD) | 
					
						
							|  |  |  |         FakeProxyHandler.digest_auth_handler.set_qop("auth-int") | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             result = self.opener.open(self.URL) | 
					
						
							|  |  |  |         except urllib2.URLError: | 
					
						
							|  |  |  |             # It's okay if we don't support auth-int, but we certainly | 
					
						
							|  |  |  |             # shouldn't receive any kind of exception here other than | 
					
						
							|  |  |  |             # a URLError. | 
					
						
							|  |  |  |             result = None | 
					
						
							|  |  |  |         if result: | 
					
						
							|  |  |  |             while result.read(): | 
					
						
							|  |  |  |                 pass | 
					
						
							|  |  |  |             result.close() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def test_main(): | 
					
						
							|  |  |  |     # We will NOT depend on the network resource flag | 
					
						
							|  |  |  |     # (Lib/test/regrtest.py -u network) since all tests here are only | 
					
						
							|  |  |  |     # localhost.  However, if this is a bad rationale, then uncomment | 
					
						
							|  |  |  |     # the next line. | 
					
						
							|  |  |  |     #test_support.requires("network") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     test_support.run_unittest(ProxyAuthTests) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | if __name__ == "__main__": | 
					
						
							|  |  |  |     test_main() |