mirror of
				https://github.com/python/cpython.git
				synced 2025-10-30 21:21:22 +00:00 
			
		
		
		
	 45df820591
			
		
	
	
		45df820591
		
	
	
	
	
		
			
			svn+ssh://pythondev@svn.python.org/python/trunk ........ r80552 | victor.stinner | 2010-04-27 23:46:03 +0200 (mar., 27 avril 2010) | 3 lines Issue #7449, part 1: fix test_support.py for Python compiled without thread ........ r80553 | victor.stinner | 2010-04-27 23:47:01 +0200 (mar., 27 avril 2010) | 1 line Issue #7449, part 2: regrtest.py -j option requires thread support ........ r80554 | victor.stinner | 2010-04-27 23:51:26 +0200 (mar., 27 avril 2010) | 9 lines Issue #7449 part 3, test_doctest: import trace module in test_coverage() Importing the trace module fails if the threading module is missing. test_coverage() is only used if test_doctest.py is used with the -c option. This commit allows executing the test suite without thread support. Move "import trace" in test_coverage() and use test_support.import_module('trace'). ........ r80555 | victor.stinner | 2010-04-27 23:56:26 +0200 (mar., 27 avril 2010) | 6 lines Issue #7449, part 4: skip test_multiprocessing if thread support is disabled import threading after _multiprocessing to raise a more relevant error message: "No module named _multiprocessing". _multiprocessing is not compiled without thread support. ........ r80556 | victor.stinner | 2010-04-28 00:01:24 +0200 (mer., 28 avril 2010) | 8 lines Issue #7449, part 5: split Test.test_open() of ctypes/test/test_errno.py * Split Test.test_open() in 2 functions: test_open() and test_thread_open() * Skip test_open() and test_thread_open() if we are unable to find the C library * Skip test_thread_open() if thread support is disabled * Use unittest.skipUnless(os.name == "nt", ...) on test_GetLastError() ........ r80564 | victor.stinner | 2010-04-28 00:59:35 +0200 (mer., 28 avril 2010) | 4 lines Issue #7449, part 6: fix test_hashlib for missing threading module Move @test_support.reap_thread decorator from test_main() to test_threaded_hashing(). ........ 
r80565 | victor.stinner | 2010-04-28 01:01:29 +0200 (mer., 28 avril 2010) | 6 lines Issue #7449, part 7: simplify threading detection in test_capi * Skip TestPendingCalls if threading module is missing * Test if threading module is present or not, instead of testing the presence of _testcapi._test_thread_state ........ r80566 | victor.stinner | 2010-04-28 01:03:16 +0200 (mer., 28 avril 2010) | 4 lines Issue #7449, part 8: don't skip the whole test_asynchat if threading is missing TestFifo can be executed without the threading module ........ r80568 | victor.stinner | 2010-04-28 01:14:58 +0200 (mer., 28 avril 2010) | 6 lines Issue #7449, part 9: fix test_xmlrpclib for missing threading module * Skip testcases using threads if threading module is missing * Use "http://" instead of URL in ServerProxyTestCase if threading is missing because URL is not set in this case ........ r80569 | victor.stinner | 2010-04-28 01:33:58 +0200 (mer., 28 avril 2010) | 6 lines Partial revert of r80556 (Issue #7449, part 5, fix ctypes test) Rewrite r80556: the thread test has to be executed just after the test on libc_open() and so the test cannot be split into two functions (without duplicating code, and I don't want to duplicate code). ........ r80570 | victor.stinner | 2010-04-28 01:51:16 +0200 (mer., 28 avril 2010) | 8 lines Issue #7449, part 10: test_cmd imports trace module using test_support.import_module() Use test_support.import_module() instead of import to raise a SkipTest exception if the import fails. Import trace fails if the threading module is missing. See also part 3: test_doctest: import trace module in test_coverage(). ........ r80571 | victor.stinner | 2010-04-28 01:55:59 +0200 (mer., 28 avril 2010) | 6 lines Issue #7449, last part (11): fix many tests if thread support is disabled * Use try/except ImportError or test_support.import_module() to import thread and threading modules * Add @unittest.skipUnless(threading, ...) to testcases using threads ........
		
			
				
	
	
		
			276 lines
		
	
	
	
		
			9.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			276 lines
		
	
	
	
		
			9.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # test asynchat
 | |
| 
 | |
| from test import support
 | |
| 
 | |
| # If this fails, the test will be skipped.
 | |
| thread = support.import_module('_thread')
 | |
| 
 | |
| import asyncore, asynchat, socket, time
 | |
| import unittest
 | |
| import sys
 | |
| try:
 | |
|     import threading
 | |
| except ImportError:
 | |
|     threading = None
 | |
| 
 | |
# Host the echo client connects to; taken from the test support package.
HOST = support.HOST
# Marker the client sends last; once the echo server has seen it, the server
# stops collecting data and starts echoing back what it received.
SERVER_QUIT = b'QUIT\n'
 | |
| 
 | |
| if threading:
 | |
|     class echo_server(threading.Thread):
 | |
|         # parameter to determine the number of bytes passed back to the
 | |
|         # client each send
 | |
|         chunk_size = 1
 | |
| 
 | |
|         def __init__(self, event):
 | |
|             threading.Thread.__init__(self)
 | |
|             self.event = event
 | |
|             self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 | |
|             self.port = support.bind_port(self.sock)
 | |
|             # This will be set if the client wants us to wait before echoing data
 | |
|             # back.
 | |
|             self.start_resend_event = None
 | |
| 
 | |
|         def run(self):
 | |
|             self.sock.listen(1)
 | |
|             self.event.set()
 | |
|             conn, client = self.sock.accept()
 | |
|             self.buffer = b""
 | |
|             # collect data until quit message is seen
 | |
|             while SERVER_QUIT not in self.buffer:
 | |
|                 data = conn.recv(1)
 | |
|                 if not data:
 | |
|                     break
 | |
|                 self.buffer = self.buffer + data
 | |
| 
 | |
|             # remove the SERVER_QUIT message
 | |
|             self.buffer = self.buffer.replace(SERVER_QUIT, b'')
 | |
| 
 | |
|             if self.start_resend_event:
 | |
|                 self.start_resend_event.wait()
 | |
| 
 | |
|             # re-send entire set of collected data
 | |
|             try:
 | |
|                 # this may fail on some tests, such as test_close_when_done, since
 | |
|                 # the client closes the channel when it's done sending
 | |
|                 while self.buffer:
 | |
|                     n = conn.send(self.buffer[:self.chunk_size])
 | |
|                     time.sleep(0.001)
 | |
|                     self.buffer = self.buffer[n:]
 | |
|             except:
 | |
|                 pass
 | |
| 
 | |
|             conn.close()
 | |
|             self.sock.close()
 | |
| 
 | |
|     class echo_client(asynchat.async_chat):
 | |
| 
 | |
|         def __init__(self, terminator, server_port):
 | |
|             asynchat.async_chat.__init__(self)
 | |
|             self.contents = []
 | |
|             self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
 | |
|             self.connect((HOST, server_port))
 | |
|             self.set_terminator(terminator)
 | |
|             self.buffer = b""
 | |
| 
 | |
|             def handle_connect(self):
 | |
|                 pass
 | |
| 
 | |
|             if sys.platform == 'darwin':
 | |
|                 # select.poll returns a select.POLLHUP at the end of the tests
 | |
|                 # on darwin, so just ignore it
 | |
|                 def handle_expt(self):
 | |
|                     pass
 | |
| 
 | |
|         def collect_incoming_data(self, data):
 | |
|             self.buffer += data
 | |
| 
 | |
|         def found_terminator(self):
 | |
|             self.contents.append(self.buffer)
 | |
|             self.buffer = b""
 | |
| 
 | |
|     def start_echo_server():
 | |
|         event = threading.Event()
 | |
|         s = echo_server(event)
 | |
|         s.start()
 | |
|         event.wait()
 | |
|         event.clear()
 | |
|         time.sleep(0.01) # Give server time to start accepting.
 | |
|         return s, event
 | |
| 
 | |
| 
 | |
@unittest.skipUnless(threading, 'Threading required for this test.')
class TestAsynchat(unittest.TestCase):
    """Drive echo_client against a threaded echo_server.

    ``usepoll`` is forwarded to asyncore.loop() as ``use_poll``; subclasses
    override it to run the same tests with a different polling mechanism.
    """

    usepoll = False
    # Upper bound, in seconds, on how long a test waits for the echo server
    # thread to finish.  Without it a stuck server thread would hang the
    # whole test run instead of failing one test.
    join_timeout = 3

    def setUp(self):
        self._threads = support.threading_setup()

    def tearDown(self):
        support.threading_cleanup(*self._threads)

    def _run_loop(self):
        """Run the asyncore loop with the settings shared by all tests."""
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)

    def _join_server(self, server):
        """Wait for *server* to exit; fail the test if it does not."""
        server.join(self.join_timeout)
        if server.is_alive():
            self.fail("join() timed out")

    def line_terminator_check(self, term, server_chunk):
        """Send two *term*-terminated lines and check both are received.

        *server_chunk* is the number of bytes the server echoes per send.
        """
        event = threading.Event()
        s = echo_server(event)
        s.chunk_size = server_chunk
        s.start()
        event.wait()
        event.clear()
        time.sleep(0.01)  # Give server time to start accepting.
        c = echo_client(term, s.port)
        c.push(b"hello ")
        c.push(b"world" + term)
        c.push(b"I'm not dead yet!" + term)
        c.push(SERVER_QUIT)
        self._run_loop()
        self._join_server(s)

        self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])

    # the line terminator tests below check receiving variously-sized
    # chunks back from the server in order to exercise all branches of
    # async_chat.handle_read

    def test_line_terminator1(self):
        # test one-character terminator
        for chunk in (1, 2, 3):
            self.line_terminator_check(b'\n', chunk)

    def test_line_terminator2(self):
        # test two-character terminator
        for chunk in (1, 2, 3):
            self.line_terminator_check(b'\r\n', chunk)

    def test_line_terminator3(self):
        # test three-character terminator
        for chunk in (1, 2, 3):
            self.line_terminator_check(b'qqq', chunk)

    def numeric_terminator_check(self, termlen):
        # Try reading a fixed number of bytes
        s, _ = start_echo_server()
        c = echo_client(termlen, s.port)
        data = b"hello world, I'm not dead yet!\n"
        c.push(data)
        c.push(SERVER_QUIT)
        self._run_loop()
        self._join_server(s)

        self.assertEqual(c.contents, [data[:termlen]])

    def test_numeric_terminator1(self):
        # check that an integer terminator works (the terminator's type is
        # explicitly checked in async_chat.handle_read)
        self.numeric_terminator_check(1)

    def test_numeric_terminator2(self):
        self.numeric_terminator_check(6)

    def test_none_terminator(self):
        # With a None terminator no message is ever completed: everything
        # echoed back must stay in the client's buffer.
        s, _ = start_echo_server()
        c = echo_client(None, s.port)
        data = b"hello world, I'm not dead yet!\n"
        c.push(data)
        c.push(SERVER_QUIT)
        self._run_loop()
        self._join_server(s)

        self.assertEqual(c.contents, [])
        self.assertEqual(c.buffer, data)

    def test_simple_producer(self):
        s, _ = start_echo_server()
        c = echo_client(b'\n', s.port)
        data = b"hello world\nI'm not dead yet!\n"
        p = asynchat.simple_producer(data + SERVER_QUIT, buffer_size=8)
        c.push_with_producer(p)
        self._run_loop()
        self._join_server(s)

        self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])

    def test_string_producer(self):
        s, _ = start_echo_server()
        c = echo_client(b'\n', s.port)
        data = b"hello world\nI'm not dead yet!\n"
        c.push_with_producer(data + SERVER_QUIT)
        self._run_loop()
        self._join_server(s)

        self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])

    def test_empty_line(self):
        # checks that empty lines are handled correctly
        s, _ = start_echo_server()
        c = echo_client(b'\n', s.port)
        c.push(b"hello world\n\nI'm not dead yet!\n")
        c.push(SERVER_QUIT)
        self._run_loop()
        self._join_server(s)

        self.assertEqual(c.contents,
                         [b"hello world", b"", b"I'm not dead yet!"])

    def test_close_when_done(self):
        s, _ = start_echo_server()
        s.start_resend_event = threading.Event()
        c = echo_client(b'\n', s.port)
        c.push(b"hello world\nI'm not dead yet!\n")
        c.push(SERVER_QUIT)
        c.close_when_done()
        self._run_loop()

        # Only allow the server to start echoing data back to the client after
        # the client has closed its connection.  This prevents a race condition
        # where the server echoes all of its data before we can check that it
        # got any down below.
        s.start_resend_event.set()
        self._join_server(s)

        self.assertEqual(c.contents, [])
        # the server might have been able to send a byte or two back, but this
        # at least checks that it received something and didn't just fail
        # (which could still result in the client not having received anything)
        self.assertGreater(len(s.buffer), 0)
 | |
| 
 | |
| 
 | |
class TestAsynchat_WithPoll(TestAsynchat):
    """Re-run every TestAsynchat test with ``use_poll=True``."""

    usepoll = True
 | |
| 
 | |
class TestHelperFunctions(unittest.TestCase):
    """Checks for the module-level helpers of asynchat."""

    def test_find_prefix_at_end(self):
        # Each pair is (haystack, expected result) for the needle "\r\n".
        expectations = (
            ("qwerty\r", 1),
            ("qwertydkjf", 0),
        )
        for haystack, expected in expectations:
            found = asynchat.find_prefix_at_end(haystack, "\r\n")
            self.assertEqual(found, expected)
 | |
| 
 | |
class TestFifo(unittest.TestCase):
    """Checks for asynchat.fifo: push/pop order, length and emptiness."""

    def test_basic(self):
        queue = asynchat.fifo()
        for item in (7, b'a'):
            queue.push(item)
        self.assertEqual(len(queue), 2)

        # First element comes out first; pop() reports success with a 1.
        self.assertEqual(queue.first(), 7)
        self.assertEqual(queue.pop(), (1, 7))
        self.assertEqual(len(queue), 1)

        self.assertEqual(queue.first(), b'a')
        self.assertEqual(queue.is_empty(), False)
        self.assertEqual(queue.pop(), (1, b'a'))
        self.assertEqual(len(queue), 0)

        # Popping from an empty fifo yields the (0, None) marker.
        self.assertEqual(queue.is_empty(), True)
        self.assertEqual(queue.pop(), (0, None))

    def test_given_list(self):
        initial = [b'x', 17, 3]
        queue = asynchat.fifo(initial)
        self.assertEqual(len(queue), 3)
        # Items must come back in the order of the list given at creation.
        for expected in (b'x', 17, 3):
            self.assertEqual(queue.pop(), (1, expected))
        self.assertEqual(queue.pop(), (0, None))
 | |
| 
 | |
| 
 | |
def test_main(verbose=None):
    """Run every test case class of this module through test.support."""
    test_classes = (
        TestAsynchat,
        TestAsynchat_WithPoll,
        TestHelperFunctions,
        TestFifo,
    )
    support.run_unittest(*test_classes)


if __name__ == "__main__":
    test_main(verbose=True)
 |