| 
									
										
										
										
											2001-04-13 14:57:44 +00:00
										 |  |  | import httplib | 
					
						
							|  |  |  | import StringIO | 
					
						
							| 
									
										
										
										
											2003-07-08 12:36:58 +00:00
										 |  |  | import sys | 
					
						
							| 
									
										
										
										
											2007-03-23 18:54:07 +00:00
										 |  |  | import socket | 
					
						
							| 
									
										
										
										
											2003-07-08 12:36:58 +00:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2004-08-07 16:28:14 +00:00
										 |  |  | from unittest import TestCase | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | from test import test_support | 
					
						
							| 
									
										
										
										
											2001-04-13 14:57:44 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  | class FakeSocket: | 
					
						
							| 
									
										
										
										
											2003-07-08 12:36:58 +00:00
										 |  |  |     def __init__(self, text, fileclass=StringIO.StringIO): | 
					
						
							| 
									
										
										
										
											2001-04-13 14:57:44 +00:00
										 |  |  |         self.text = text | 
					
						
							| 
									
										
										
										
											2003-07-08 12:36:58 +00:00
										 |  |  |         self.fileclass = fileclass | 
					
						
							| 
									
										
										
										
											2006-11-12 10:32:47 +00:00
										 |  |  |         self.data = '' | 
					
						
							| 
									
										
										
										
											2001-04-13 14:57:44 +00:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2004-08-07 16:28:14 +00:00
										 |  |  |     def sendall(self, data): | 
					
						
							| 
									
										
										
										
											2006-11-12 10:32:47 +00:00
										 |  |  |         self.data += data | 
					
						
							| 
									
										
										
										
											2004-08-07 16:28:14 +00:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2001-04-13 14:57:44 +00:00
										 |  |  |     def makefile(self, mode, bufsize=None): | 
					
						
							|  |  |  |         if mode != 'r' and mode != 'rb': | 
					
						
							| 
									
										
										
										
											2002-04-01 19:00:50 +00:00
										 |  |  |             raise httplib.UnimplementedFileMode() | 
					
						
							| 
									
										
										
										
											2003-07-08 12:36:58 +00:00
										 |  |  |         return self.fileclass(self.text) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class NoEOFStringIO(StringIO.StringIO): | 
					
						
							|  |  |  |     """Like StringIO, but raises AssertionError on EOF.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     This is used below to test that httplib doesn't try to read | 
					
						
							|  |  |  |     more from the underlying file than it should. | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     def read(self, n=-1): | 
					
						
							|  |  |  |         data = StringIO.StringIO.read(self, n) | 
					
						
							|  |  |  |         if data == '': | 
					
						
							|  |  |  |             raise AssertionError('caller tried to read past EOF') | 
					
						
							|  |  |  |         return data | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def readline(self, length=None): | 
					
						
							|  |  |  |         data = StringIO.StringIO.readline(self, length) | 
					
						
							|  |  |  |         if data == '': | 
					
						
							|  |  |  |             raise AssertionError('caller tried to read past EOF') | 
					
						
							|  |  |  |         return data | 
					
						
							| 
									
										
										
										
											2001-04-13 14:57:44 +00:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2004-08-07 16:28:14 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  | class HeaderTests(TestCase): | 
					
						
							|  |  |  |     def test_auto_headers(self): | 
					
						
							|  |  |  |         # Some headers are added automatically, but should not be added by | 
					
						
							|  |  |  |         # .request() if they are explicitly set. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         import httplib | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         class HeaderCountingBuffer(list): | 
					
						
							|  |  |  |             def __init__(self): | 
					
						
							|  |  |  |                 self.count = {} | 
					
						
							|  |  |  |             def append(self, item): | 
					
						
							|  |  |  |                 kv = item.split(':') | 
					
						
							|  |  |  |                 if len(kv) > 1: | 
					
						
							|  |  |  |                     # item is a 'Key: Value' header string | 
					
						
							|  |  |  |                     lcKey = kv[0].lower() | 
					
						
							|  |  |  |                     self.count.setdefault(lcKey, 0) | 
					
						
							|  |  |  |                     self.count[lcKey] += 1 | 
					
						
							|  |  |  |                 list.append(self, item) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         for explicit_header in True, False: | 
					
						
							|  |  |  |             for header in 'Content-length', 'Host', 'Accept-encoding': | 
					
						
							|  |  |  |                 conn = httplib.HTTPConnection('example.com') | 
					
						
							|  |  |  |                 conn.sock = FakeSocket('blahblahblah') | 
					
						
							|  |  |  |                 conn._buffer = HeaderCountingBuffer() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 body = 'spamspamspam' | 
					
						
							|  |  |  |                 headers = {} | 
					
						
							|  |  |  |                 if explicit_header: | 
					
						
							|  |  |  |                     headers[header] = str(len(body)) | 
					
						
							|  |  |  |                 conn.request('POST', '/', body, headers) | 
					
						
							|  |  |  |                 self.assertEqual(conn._buffer.count[header.lower()], 1) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2006-10-29 20:24:01 +00:00
										 |  |  | class BasicTest(TestCase): | 
					
						
							|  |  |  |     def test_status_lines(self): | 
					
						
							|  |  |  |         # Test HTTP status lines | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         body = "HTTP/1.1 200 Ok\r\n\r\nText" | 
					
						
							|  |  |  |         sock = FakeSocket(body) | 
					
						
							|  |  |  |         resp = httplib.HTTPResponse(sock) | 
					
						
							| 
									
										
										
										
											2003-01-23 18:02:20 +00:00
										 |  |  |         resp.begin() | 
					
						
							| 
									
										
										
										
											2006-10-29 20:24:01 +00:00
										 |  |  |         self.assertEqual(resp.read(), 'Text') | 
					
						
							|  |  |  |         resp.close() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         body = "HTTP/1.1 400.100 Not Ok\r\n\r\nText" | 
					
						
							|  |  |  |         sock = FakeSocket(body) | 
					
						
							|  |  |  |         resp = httplib.HTTPResponse(sock) | 
					
						
							|  |  |  |         self.assertRaises(httplib.BadStatusLine, resp.begin) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_host_port(self): | 
					
						
							|  |  |  |         # Check invalid host_port | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         for hp in ("www.python.org:abc", "www.python.org:"): | 
					
						
							|  |  |  |             self.assertRaises(httplib.InvalidURL, httplib.HTTP, hp) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000", "fe80::207:e9ff:fe9b", 8000), | 
					
						
							|  |  |  |                          ("www.python.org:80", "www.python.org", 80), | 
					
						
							|  |  |  |                          ("www.python.org", "www.python.org", 80), | 
					
						
							|  |  |  |                          ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 80)): | 
					
						
							| 
									
										
										
										
											2004-09-14 21:45:36 +00:00
										 |  |  |             http = httplib.HTTP(hp) | 
					
						
							| 
									
										
										
										
											2006-10-29 20:24:01 +00:00
										 |  |  |             c = http._conn | 
					
						
							|  |  |  |             if h != c.host: self.fail("Host incorrectly parsed: %s != %s" % (h, c.host)) | 
					
						
							|  |  |  |             if p != c.port: self.fail("Port incorrectly parsed: %s != %s" % (p, c.host)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_response_headers(self): | 
					
						
							|  |  |  |         # test response with multiple message headers with the same field name. | 
					
						
							|  |  |  |         text = ('HTTP/1.1 200 OK\r\n' | 
					
						
							|  |  |  |                 'Set-Cookie: Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"\r\n' | 
					
						
							|  |  |  |                 'Set-Cookie: Part_Number="Rocket_Launcher_0001"; Version="1";' | 
					
						
							|  |  |  |                 ' Path="/acme"\r\n' | 
					
						
							|  |  |  |                 '\r\n' | 
					
						
							|  |  |  |                 'No body\r\n') | 
					
						
							|  |  |  |         hdr = ('Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"' | 
					
						
							|  |  |  |                ', ' | 
					
						
							|  |  |  |                'Part_Number="Rocket_Launcher_0001"; Version="1"; Path="/acme"') | 
					
						
							|  |  |  |         s = FakeSocket(text) | 
					
						
							|  |  |  |         r = httplib.HTTPResponse(s) | 
					
						
							|  |  |  |         r.begin() | 
					
						
							|  |  |  |         cookies = r.getheader("Set-Cookie") | 
					
						
							|  |  |  |         if cookies != hdr: | 
					
						
							|  |  |  |             self.fail("multiple headers not combined properly") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_read_head(self): | 
					
						
							|  |  |  |         # Test that the library doesn't attempt to read any data | 
					
						
							|  |  |  |         # from a HEAD request.  (Tickles SF bug #622042.) | 
					
						
							|  |  |  |         sock = FakeSocket( | 
					
						
							|  |  |  |             'HTTP/1.1 200 OK\r\n' | 
					
						
							|  |  |  |             'Content-Length: 14432\r\n' | 
					
						
							|  |  |  |             '\r\n', | 
					
						
							|  |  |  |             NoEOFStringIO) | 
					
						
							|  |  |  |         resp = httplib.HTTPResponse(sock, method="HEAD") | 
					
						
							|  |  |  |         resp.begin() | 
					
						
							|  |  |  |         if resp.read() != "": | 
					
						
							|  |  |  |             self.fail("Did not expect response from HEAD request") | 
					
						
							|  |  |  |         resp.close() | 
					
						
							| 
									
										
										
										
											2003-05-05 16:13:58 +00:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2006-11-12 10:32:47 +00:00
										 |  |  |     def test_send_file(self): | 
					
						
							|  |  |  |         expected = 'GET /foo HTTP/1.1\r\nHost: example.com\r\n' \ | 
					
						
							|  |  |  |                    'Accept-Encoding: identity\r\nContent-Length:' | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         body = open(__file__, 'rb') | 
					
						
							|  |  |  |         conn = httplib.HTTPConnection('example.com') | 
					
						
							|  |  |  |         sock = FakeSocket(body) | 
					
						
							|  |  |  |         conn.sock = sock | 
					
						
							|  |  |  |         conn.request('GET', '/foo', body) | 
					
						
							|  |  |  |         self.assertTrue(sock.data.startswith(expected)) | 
					
						
							| 
									
										
										
										
											2004-08-07 16:28:14 +00:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2006-02-17 22:01:08 +00:00
										 |  |  | class OfflineTest(TestCase): | 
					
						
							|  |  |  |     def test_responses(self): | 
					
						
							|  |  |  |         self.assertEquals(httplib.responses[httplib.NOT_FOUND], "Not Found") | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2007-03-23 18:54:07 +00:00
										 |  |  | PORT = 50003 | 
					
						
							|  |  |  | HOST = "localhost" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class TimeoutTest(TestCase): | 
					
						
							|  |  |  |      | 
					
						
							|  |  |  |     def setUp(self): | 
					
						
							|  |  |  |         self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | 
					
						
							|  |  |  |         self.serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | 
					
						
							|  |  |  |         global PORT | 
					
						
							|  |  |  |         PORT = test_support.bind_port(self.serv, HOST, PORT) | 
					
						
							|  |  |  |         self.serv.listen(1) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def tearDown(self): | 
					
						
							|  |  |  |         self.serv.close() | 
					
						
							|  |  |  |         self.serv = None | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def testTimeoutAttribute(self): | 
					
						
							|  |  |  |         '''This will prove that the timeout gets through
 | 
					
						
							|  |  |  |         HTTPConnection and into the socket. | 
					
						
							|  |  |  |         '''
 | 
					
						
							|  |  |  |         # default | 
					
						
							|  |  |  |         httpConn = httplib.HTTPConnection(HOST, PORT) | 
					
						
							|  |  |  |         httpConn.connect() | 
					
						
							|  |  |  |         self.assertTrue(httpConn.sock.gettimeout() is None) | 
					
						
							|  |  |  |      | 
					
						
							|  |  |  |         # a value | 
					
						
							|  |  |  |         httpConn = httplib.HTTPConnection(HOST, PORT, timeout=10) | 
					
						
							|  |  |  |         httpConn.connect() | 
					
						
							|  |  |  |         self.assertEqual(httpConn.sock.gettimeout(), 10) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # None, having other default | 
					
						
							|  |  |  |         previous = socket.getdefaulttimeout() | 
					
						
							|  |  |  |         socket.setdefaulttimeout(10) | 
					
						
							|  |  |  |         httpConn = httplib.HTTPConnection(HOST, PORT, timeout=None) | 
					
						
							|  |  |  |         httpConn.connect() | 
					
						
							|  |  |  |         socket.setdefaulttimeout(previous) | 
					
						
							|  |  |  |         self.assertEqual(httpConn.sock.gettimeout(), 10) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2004-08-07 16:28:14 +00:00
										 |  |  | def test_main(verbose=None): | 
					
						
							| 
									
										
										
										
											2007-03-23 18:54:07 +00:00
										 |  |  |     test_support.run_unittest(HeaderTests, OfflineTest, BasicTest, TimeoutTest) | 
					
						
							| 
									
										
										
										
											2004-08-07 16:28:14 +00:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2006-10-29 20:24:01 +00:00
										 |  |  | if __name__ == '__main__': | 
					
						
							|  |  |  |     test_main() |