mirror of
				https://github.com/python/cpython.git
				synced 2025-10-26 03:04:41 +00:00 
			
		
		
		
	 2682638ee1
			
		
	
	
		2682638ee1
		
	
	
	
	
		
			
			svn+ssh://pythondev@svn.python.org/python/branches/py3k
................
  r80587 | senthil.kumaran | 2010-04-28 23:09:48 +0530 (Wed, 28 Apr 2010) | 9 lines
  Merged revisions 80583 via svnmerge from
  svn+ssh://pythondev@svn.python.org/python/trunk
  ........
    r80583 | senthil.kumaran | 2010-04-28 22:50:43 +0530 (Wed, 28 Apr 2010) | 3 lines
    Fixed Issue6312 - httplib fails with HEAD requests to pages with "transfer-encoding: chunked"
  ........
................
		
	
			
		
			
				
	
	
		
			413 lines
		
	
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			413 lines
		
	
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# Unit tests for http.client, driven mostly through in-memory fake sockets.
import array
import errno
import io
import socket
from http import client
from unittest import TestCase

from test import support

# Host that the timeout tests connect their client sockets to.
HOST = support.HOST
 | |
| 
 | |
class FakeSocket:
    """Minimal in-memory stand-in for a socket.

    Everything passed to sendall() accumulates in ``data``; makefile()
    hands back a new ``fileclass`` object wrapping the canned ``text``.
    """

    def __init__(self, text, fileclass=io.BytesIO):
        # Canned responses are usually written as str for convenience;
        # store them as bytes so the file object yields bytes.
        if isinstance(text, str):
            text = text.encode("ascii")
        self.text = text
        self.fileclass = fileclass
        self.data = b''

    def sendall(self, data):
        """Record *data* instead of transmitting it."""
        self.data += data

    def makefile(self, mode, bufsize=None):
        """Return a file object over the canned response.

        Only the read modes 'r' and 'rb' are supported; any other mode
        raises client.UnimplementedFileMode.  *bufsize* is ignored.
        """
        if mode not in ('r', 'rb'):
            raise client.UnimplementedFileMode()
        return self.fileclass(self.text)
 | |
| 
 | |
class EPipeSocket(FakeSocket):
    """FakeSocket whose sendall() fails with EPIPE on a trigger payload."""

    def __init__(self, text, pipe_trigger):
        # When sendall() is called with pipe_trigger, raise EPIPE.
        FakeSocket.__init__(self, text)
        self.pipe_trigger = pipe_trigger

    def sendall(self, data):
        """Record *data*, or raise socket.error(EPIPE) if it holds the trigger."""
        if self.pipe_trigger in data:
            raise socket.error(errno.EPIPE, "gotcha")
        self.data += data

    def close(self):
        """Do nothing; there is no real socket to close."""
        pass
 | |
| 
 | |
class NoEOFStringIO(io.BytesIO):
    """Like BytesIO, but raises AssertionError on EOF.

    This is used below to test that http.client doesn't try to read
    more from the underlying file than it should.
    """

    def read(self, n=-1):
        """Read up to *n* bytes; an empty result raises AssertionError."""
        data = io.BytesIO.read(self, n)
        if data == b'':
            raise AssertionError('caller tried to read past EOF')
        return data

    def readline(self, length=None):
        """Read one line; an empty result raises AssertionError."""
        data = io.BytesIO.readline(self, length)
        if data == b'':
            raise AssertionError('caller tried to read past EOF')
        return data
 | |
| 
 | |
class HeaderTests(TestCase):
    """Checks on the headers that HTTPConnection.request() emits."""

    def test_auto_headers(self):
        # Some headers are added automatically, but should not be added by
        # .request() if they are explicitly set.

        class HeaderCountingBuffer(list):
            """List that tallies, per lower-cased name, the headers appended."""

            def __init__(self):
                super().__init__()
                self.count = {}

            def append(self, item):
                kv = item.split(b':')
                if len(kv) > 1:
                    # item is a 'Key: Value' header string
                    lc_key = kv[0].decode('ascii').lower()
                    self.count[lc_key] = self.count.get(lc_key, 0) + 1
                list.append(self, item)

        for explicit_header in True, False:
            for header in 'Content-length', 'Host', 'Accept-encoding':
                conn = client.HTTPConnection('example.com')
                conn.sock = FakeSocket('blahblahblah')
                # Swap in the counting buffer so every buffered header
                # line passes through append() above.
                conn._buffer = HeaderCountingBuffer()

                body = 'spamspamspam'
                headers = {}
                if explicit_header:
                    headers[header] = str(len(body))
                conn.request('POST', '/', body, headers)
                # Whether set explicitly or added automatically, the
                # header must appear exactly once.
                self.assertEqual(conn._buffer.count[header.lower()], 1)
 | |
| 
 | |
class BasicTest(TestCase):
    """Response parsing and request sending tests run over fake sockets."""

    def test_status_lines(self):
        # Test HTTP status lines

        body = "HTTP/1.1 200 Ok\r\n\r\nText"
        sock = FakeSocket(body)
        resp = client.HTTPResponse(sock)
        resp.begin()
        self.assertEqual(resp.read(), b"Text")
        self.assertTrue(resp.isclosed())

        # A non-integer status code must be rejected.
        body = "HTTP/1.1 400.100 Not Ok\r\n\r\nText"
        sock = FakeSocket(body)
        resp = client.HTTPResponse(sock)
        self.assertRaises(client.BadStatusLine, resp.begin)

    def test_partial_reads(self):
        # If we have a length, the system knows when to close itself;
        # same behaviour as when we read the whole thing with read().
        body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
        sock = FakeSocket(body)
        resp = client.HTTPResponse(sock)
        resp.begin()
        self.assertEqual(resp.read(2), b'Te')
        self.assertFalse(resp.isclosed())
        self.assertEqual(resp.read(2), b'xt')
        self.assertTrue(resp.isclosed())

    def test_host_port(self):
        # Check invalid host_port

        for hp in ("www.python.org:abc", "www.python.org:"):
            self.assertRaises(client.InvalidURL, client.HTTPConnection, hp)

        # Valid forms: (input, expected host, expected port).
        for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000",
                          "fe80::207:e9ff:fe9b", 8000),
                         ("www.python.org:80", "www.python.org", 80),
                         ("www.python.org", "www.python.org", 80),
                         ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 80)):
            c = client.HTTPConnection(hp)
            self.assertEqual(h, c.host)
            self.assertEqual(p, c.port)

    def test_response_headers(self):
        # test response with multiple message headers with the same field name.
        text = ('HTTP/1.1 200 OK\r\n'
                'Set-Cookie: Customer="WILE_E_COYOTE"; '
                'Version="1"; Path="/acme"\r\n'
                'Set-Cookie: Part_Number="Rocket_Launcher_0001"; Version="1";'
                ' Path="/acme"\r\n'
                '\r\n'
                'No body\r\n')
        hdr = ('Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"'
               ', '
               'Part_Number="Rocket_Launcher_0001"; Version="1"; Path="/acme"')
        s = FakeSocket(text)
        r = client.HTTPResponse(s)
        r.begin()
        cookies = r.getheader("Set-Cookie")
        self.assertEqual(cookies, hdr)

    def test_read_head(self):
        # Test that the library doesn't attempt to read any data
        # from a HEAD request.  (Tickles SF bug #622042.)
        sock = FakeSocket(
            'HTTP/1.1 200 OK\r\n'
            'Content-Length: 14432\r\n'
            '\r\n',
            NoEOFStringIO)
        resp = client.HTTPResponse(sock, method="HEAD")
        resp.begin()
        if resp.read():
            self.fail("Did not expect response from HEAD request")

    def test_send_file(self):
        expected = (b'GET /foo HTTP/1.1\r\nHost: example.com\r\n'
                    b'Accept-Encoding: identity\r\nContent-Length:')

        # The context manager makes sure the file handle is released even
        # if the request or the assertion fails.
        with open(__file__, 'rb') as body:
            conn = client.HTTPConnection('example.com')
            sock = FakeSocket(body)
            conn.sock = sock
            conn.request('GET', '/foo', body)
            self.assertTrue(sock.data.startswith(expected), '%r != %r' %
                    (sock.data[:len(expected)], expected))

    def test_send(self):
        # send() must accept bytes, array objects and file-like objects.
        expected = b'this is a test this is only a test'
        conn = client.HTTPConnection('example.com')
        sock = FakeSocket(None)
        conn.sock = sock
        conn.send(expected)
        self.assertEqual(expected, sock.data)
        sock.data = b''
        conn.send(array.array('b', expected))
        self.assertEqual(expected, sock.data)
        sock.data = b''
        conn.send(io.BytesIO(expected))
        self.assertEqual(expected, sock.data)

    def test_chunked(self):
        chunked_start = (
            'HTTP/1.1 200 OK\r\n'
            'Transfer-Encoding: chunked\r\n\r\n'
            'a\r\n'
            'hello worl\r\n'
            '1\r\n'
            'd\r\n'
        )
        sock = FakeSocket(chunked_start + '0\r\n')
        resp = client.HTTPResponse(sock, method="GET")
        resp.begin()
        self.assertEqual(resp.read(), b'hello world')
        resp.close()

        # A missing or malformed terminating chunk must surface as
        # IncompleteRead carrying the data read so far.
        for x in ('', 'foo\r\n'):
            sock = FakeSocket(chunked_start + x)
            resp = client.HTTPResponse(sock, method="GET")
            resp.begin()
            try:
                resp.read()
            except client.IncompleteRead as i:
                self.assertEqual(i.partial, b'hello world')
                self.assertEqual(repr(i), 'IncompleteRead(11 bytes read)')
                self.assertEqual(str(i), 'IncompleteRead(11 bytes read)')
            else:
                self.fail('IncompleteRead expected')
            finally:
                resp.close()

    def test_chunked_head(self):
        # A HEAD response advertising chunked encoding has no body to read.
        chunked_start = (
            'HTTP/1.1 200 OK\r\n'
            'Transfer-Encoding: chunked\r\n\r\n'
            'a\r\n'
            'hello world\r\n'
            '1\r\n'
            'd\r\n'
        )
        sock = FakeSocket(chunked_start + '0\r\n')
        resp = client.HTTPResponse(sock, method="HEAD")
        resp.begin()
        self.assertEqual(resp.read(), b'')
        self.assertEqual(resp.status, 200)
        self.assertEqual(resp.reason, 'OK')
        resp.close()

    def test_negative_content_length(self):
        sock = FakeSocket(
            'HTTP/1.1 200 OK\r\nContent-Length: -1\r\n\r\nHello\r\n')
        resp = client.HTTPResponse(sock, method="GET")
        resp.begin()
        self.assertEqual(resp.read(), b'Hello\r\n')
        resp.close()

    def test_incomplete_read(self):
        # Content-Length promises 10 bytes but only 7 are available.
        sock = FakeSocket('HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\nHello\r\n')
        resp = client.HTTPResponse(sock, method="GET")
        resp.begin()
        try:
            resp.read()
        except client.IncompleteRead as i:
            self.assertEqual(i.partial, b'Hello\r\n')
            self.assertEqual(repr(i),
                             "IncompleteRead(7 bytes read, 3 more expected)")
            self.assertEqual(str(i),
                             "IncompleteRead(7 bytes read, 3 more expected)")
        else:
            self.fail('IncompleteRead expected')
        finally:
            resp.close()

    def test_epipe(self):
        # The socket raises EPIPE while the request is being sent; the
        # response must still be readable afterwards.
        sock = EPipeSocket(
            "HTTP/1.0 401 Authorization Required\r\n"
            "Content-type: text/html\r\n"
            "WWW-Authenticate: Basic realm=\"example\"\r\n",
            b"Content-Length")
        conn = client.HTTPConnection("example.com")
        conn.sock = sock
        self.assertRaises(socket.error,
                          conn.request, "PUT", "/url", "body")
        resp = conn.getresponse()
        self.assertEqual(401, resp.status)
        self.assertEqual("Basic realm=\"example\"",
                         resp.getheader("www-authenticate"))
 | |
| 
 | |
class OfflineTest(TestCase):
    """Tests that need neither a real nor a fake socket."""

    def test_responses(self):
        # The status-code table must map 404 to its standard reason phrase.
        self.assertEqual(client.responses[client.NOT_FOUND], "Not Found")
 | |
| 
 | |
class TimeoutTest(TestCase):
    """Check that HTTPConnection's timeout reaches the underlying socket."""

    # Port of the listening socket created by the most recent setUp().
    PORT = None

    def setUp(self):
        self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        TimeoutTest.PORT = support.bind_port(self.serv)
        self.serv.listen(5)

    def tearDown(self):
        self.serv.close()
        self.serv = None

    def testTimeoutAttribute(self):
        # This will prove that the timeout gets through HTTPConnection
        # and into the socket.

        # default -- use global socket timeout
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT)
            httpConn.connect()
        finally:
            # Always restore the process-wide default.
            socket.setdefaulttimeout(None)
        self.assertEqual(httpConn.sock.gettimeout(), 30)
        httpConn.close()

        # no timeout -- do not use global socket default
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT,
                                              timeout=None)
            httpConn.connect()
        finally:
            socket.setdefaulttimeout(None)
        self.assertIsNone(httpConn.sock.gettimeout())
        httpConn.close()

        # a value
        httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT, timeout=30)
        httpConn.connect()
        self.assertEqual(httpConn.sock.gettimeout(), 30)
        httpConn.close()
 | |
| 
 | |
class HTTPSTimeoutTest(TestCase):
    # XXX There should be tests for HTTPS here; there aren't any right now!

    def test_attributes(self):
        # simple test to check it's storing it
        # HTTPSConnection may be absent from http.client, so the check
        # is guarded and passes trivially in that case.
        if hasattr(client, 'HTTPSConnection'):
            h = client.HTTPSConnection(HOST, TimeoutTest.PORT, timeout=30)
            self.assertEqual(h.timeout, 30)
 | |
| 
 | |
class RequestBodyTest(TestCase):
    """Test cases where a request includes a message body."""

    def setUp(self):
        self.conn = client.HTTPConnection('example.com')
        self.conn.sock = self.sock = FakeSocket("")

    def get_headers_and_fp(self):
        """Parse what was sent: return (headers, file positioned at body)."""
        f = io.BytesIO(self.sock.data)
        f.readline()  # read the request line
        message = client.parse_headers(f)
        return message, f

    def test_manual_content_length(self):
        # Set an incorrect content-length so that we can verify that
        # it will not be over-ridden by the library.
        self.conn.request("PUT", "/url", "body",
                          {"Content-Length": "42"})
        message, f = self.get_headers_and_fp()
        self.assertEqual("42", message.get("content-length"))
        self.assertEqual(4, len(f.read()))

    def test_ascii_body(self):
        self.conn.request("PUT", "/url", "body")
        message, f = self.get_headers_and_fp()
        self.assertEqual("text/plain", message.get_content_type())
        self.assertEqual(None, message.get_charset())
        self.assertEqual("4", message.get("content-length"))
        self.assertEqual(b'body', f.read())

    def test_latin1_body(self):
        self.conn.request("PUT", "/url", "body\xc1")
        message, f = self.get_headers_and_fp()
        self.assertEqual("text/plain", message.get_content_type())
        self.assertEqual(None, message.get_charset())
        self.assertEqual("5", message.get("content-length"))
        self.assertEqual(b'body\xc1', f.read())

    def test_bytes_body(self):
        self.conn.request("PUT", "/url", b"body\xc1")
        message, f = self.get_headers_and_fp()
        self.assertEqual("text/plain", message.get_content_type())
        self.assertEqual(None, message.get_charset())
        self.assertEqual("5", message.get("content-length"))
        self.assertEqual(b'body\xc1', f.read())

    def test_file_body(self):
        # Remove the scratch file afterwards, whatever the outcome.
        self.addCleanup(support.unlink, support.TESTFN)
        with open(support.TESTFN, "w") as f:
            f.write("body")
        # A distinct name for the parsed-request file keeps the body file
        # bound to ``body`` so the ``with`` block can close it.
        with open(support.TESTFN) as body:
            self.conn.request("PUT", "/url", body)
        message, f = self.get_headers_and_fp()
        self.assertEqual("text/plain", message.get_content_type())
        self.assertEqual(None, message.get_charset())
        self.assertEqual("4", message.get("content-length"))
        self.assertEqual(b'body', f.read())

    def test_binary_file_body(self):
        # Remove the scratch file afterwards, whatever the outcome.
        self.addCleanup(support.unlink, support.TESTFN)
        with open(support.TESTFN, "wb") as f:
            f.write(b"body\xc1")
        with open(support.TESTFN, "rb") as body:
            self.conn.request("PUT", "/url", body)
        message, f = self.get_headers_and_fp()
        self.assertEqual("text/plain", message.get_content_type())
        self.assertEqual(None, message.get_charset())
        self.assertEqual("5", message.get("content-length"))
        self.assertEqual(b'body\xc1', f.read())
 | |
| 
 | |
def test_main(verbose=None):
    """Run the test case classes listed below through support.run_unittest.

    *verbose* is accepted but not used here.
    """
    support.run_unittest(HeaderTests, OfflineTest, BasicTest, TimeoutTest,
                         HTTPSTimeoutTest, RequestBodyTest)
 | |
| 
 | |
# Allow running this test module directly as a script.
if __name__ == '__main__':
    test_main()
 |