# Mirror of https://github.com/python/cpython.git (synced 2025-10-29 12:44:56 +00:00)
			
		
		
		
	
		
			
	
	
		
			58 lines
		
	
	
	
		
			2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
		
		
			
		
	
	
			58 lines
		
	
	
	
		
			2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
|   | import unittest | ||
|  | from unittest import mock | ||
|  | 
 | ||
|  | import asyncio | ||
|  | 
 | ||
|  | 
 | ||
class ProtocolsAbsTests(unittest.TestCase):
    """Exercise the default callbacks of asyncio's protocol base classes.

    Each test builds one protocol class with no arguments, asserts that every
    callback it invokes returns ``None``, and asserts that the instance has
    no ``__dict__`` attribute.
    """

    def _assert_noop_protocol(self, proto, calls):
        """Assert *proto*'s callbacks return None and it has no ``__dict__``.

        *calls* is an iterable of ``(method_name, args)`` pairs.  Each method
        is looked up on *proto* and invoked with ``*args``, in the order
        given, and its return value is checked before the next call is made.
        """
        for method_name, args in calls:
            self.assertIsNone(getattr(proto, method_name)(*args))
        self.assertFalse(hasattr(proto, '__dict__'))

    def test_base_protocol(self):
        stub = mock.Mock()
        self._assert_noop_protocol(asyncio.BaseProtocol(), [
            ('connection_made', (stub,)),
            ('connection_lost', (stub,)),
            ('pause_writing', ()),
            ('resume_writing', ()),
        ])

    def test_protocol(self):
        stub = mock.Mock()
        self._assert_noop_protocol(asyncio.Protocol(), [
            ('connection_made', (stub,)),
            ('connection_lost', (stub,)),
            ('data_received', (stub,)),
            ('eof_received', ()),
            ('pause_writing', ()),
            ('resume_writing', ()),
        ])

    def test_buffered_protocol(self):
        stub = mock.Mock()
        self._assert_noop_protocol(asyncio.BufferedProtocol(), [
            ('connection_made', (stub,)),
            ('connection_lost', (stub,)),
            ('get_buffer', (100,)),
            ('buffer_updated', (150,)),
            ('pause_writing', ()),
            ('resume_writing', ()),
        ])

    def test_datagram_protocol(self):
        stub = mock.Mock()
        self._assert_noop_protocol(asyncio.DatagramProtocol(), [
            ('connection_made', (stub,)),
            ('connection_lost', (stub,)),
            ('error_received', (stub,)),
            # The same mock stands in for both positional arguments.
            ('datagram_received', (stub, stub)),
        ])

    def test_subprocess_protocol(self):
        stub = mock.Mock()
        self._assert_noop_protocol(asyncio.SubprocessProtocol(), [
            ('connection_made', (stub,)),
            ('connection_lost', (stub,)),
            ('pipe_data_received', (1, stub)),
            ('pipe_connection_lost', (1, stub)),
            ('process_exited', ()),
        ])