# Mirror of https://github.com/python/cpython.git (synced 2025-10-31 05:31:20 +00:00); 58 lines, 2.2 KiB, Python.
import io
import time
import unittest
import tokenize
from functools import partial
from threading import Thread

from test.support import threading_helper


|  | @threading_helper.requires_working_threading() | ||
|  | class TestTokenize(unittest.TestCase): | ||
|  |     def test_tokenizer_iter(self): | ||
|  |         source = io.StringIO("for _ in a:\n  pass") | ||
|  |         it = tokenize._tokenize.TokenizerIter(source.readline, extra_tokens=False) | ||
|  | 
 | ||
|  |         tokens = [] | ||
|  |         def next_token(it): | ||
|  |             while True: | ||
|  |                 try: | ||
|  |                     r = next(it) | ||
|  |                     tokens.append(tokenize.TokenInfo._make(r)) | ||
|  |                     time.sleep(0.03) | ||
|  |                 except StopIteration: | ||
|  |                     return | ||
|  | 
 | ||
|  |         threads = [] | ||
|  |         for _ in range(5): | ||
|  |             threads.append(Thread(target=partial(next_token, it))) | ||
|  | 
 | ||
|  |         for thread in threads: | ||
|  |             thread.start() | ||
|  | 
 | ||
|  |         for thread in threads: | ||
|  |             thread.join() | ||
|  | 
 | ||
|  |         expected_tokens = [ | ||
|  |             tokenize.TokenInfo(type=1, string='for', start=(1, 0), end=(1, 3), line='for _ in a:\n'), | ||
|  |             tokenize.TokenInfo(type=1, string='_', start=(1, 4), end=(1, 5), line='for _ in a:\n'), | ||
|  |             tokenize.TokenInfo(type=1, string='in', start=(1, 6), end=(1, 8), line='for _ in a:\n'), | ||
|  |             tokenize.TokenInfo(type=1, string='a', start=(1, 9), end=(1, 10), line='for _ in a:\n'), | ||
|  |             tokenize.TokenInfo(type=11, string=':', start=(1, 10), end=(1, 11), line='for _ in a:\n'), | ||
|  |             tokenize.TokenInfo(type=4, string='', start=(1, 11), end=(1, 11), line='for _ in a:\n'), | ||
|  |             tokenize.TokenInfo(type=5, string='', start=(2, -1), end=(2, -1), line='  pass'), | ||
|  |             tokenize.TokenInfo(type=1, string='pass', start=(2, 2), end=(2, 6), line='  pass'), | ||
|  |             tokenize.TokenInfo(type=4, string='', start=(2, 6), end=(2, 6), line='  pass'), | ||
|  |             tokenize.TokenInfo(type=6, string='', start=(2, -1), end=(2, -1), line='  pass'), | ||
|  |             tokenize.TokenInfo(type=0, string='', start=(2, -1), end=(2, -1), line='  pass'), | ||
|  |         ] | ||
|  | 
 | ||
|  |         tokens.sort() | ||
|  |         expected_tokens.sort() | ||
|  |         self.assertListEqual(tokens, expected_tokens) | ||
|  | 
 | ||
|  | 
 | ||
# Run this module's tests when the file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()