| 
									
										
										
										
											2022-09-07 21:09:20 +01:00
										 |  |  | # Test the internal _wmi module on Windows | 
					
						
							|  |  |  | # This is used by the platform module, and potentially others | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import sys | 
					
						
							|  |  |  | import unittest | 
					
						
							| 
									
										
										
										
											2022-09-08 22:02:04 +01:00
										 |  |  | from test.support import import_helper, requires_resource | 
					
						
							| 
									
										
										
										
											2022-09-07 21:09:20 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | # Do this first so test will be skipped if module doesn't exist | 
					
						
							|  |  |  | _wmi = import_helper.import_module('_wmi', required_on=['win']) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class WmiTests(unittest.TestCase): | 
					
						
							|  |  |  |     def test_wmi_query_os_version(self): | 
					
						
							|  |  |  |         r = _wmi.exec_query("SELECT Version FROM Win32_OperatingSystem").split("\0") | 
					
						
							|  |  |  |         self.assertEqual(1, len(r)) | 
					
						
							|  |  |  |         k, eq, v = r[0].partition("=") | 
					
						
							|  |  |  |         self.assertEqual("=", eq, r[0]) | 
					
						
							|  |  |  |         self.assertEqual("Version", k, r[0]) | 
					
						
							|  |  |  |         # Best we can check for the version is that it's digits, dot, digits, anything | 
					
						
							|  |  |  |         # Otherwise, we are likely checking the result of the query against itself | 
					
						
							| 
									
										
										
										
											2022-09-08 22:02:04 +01:00
										 |  |  |         self.assertRegex(v, r"\d+\.\d+.+$", r[0]) | 
					
						
							| 
									
										
										
										
											2022-09-07 21:09:20 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def test_wmi_query_repeated(self): | 
					
						
							|  |  |  |         # Repeated queries should not break | 
					
						
							|  |  |  |         for _ in range(10): | 
					
						
							|  |  |  |             self.test_wmi_query_os_version() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_wmi_query_error(self): | 
					
						
							|  |  |  |         # Invalid queries fail with OSError | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             _wmi.exec_query("SELECT InvalidColumnName FROM InvalidTableName") | 
					
						
							|  |  |  |         except OSError as ex: | 
					
						
							|  |  |  |             if ex.winerror & 0xFFFFFFFF == 0x80041010: | 
					
						
							|  |  |  |                 # This is the expected error code. All others should fail the test | 
					
						
							|  |  |  |                 return | 
					
						
							|  |  |  |         self.fail("Expected OSError") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_wmi_query_repeated_error(self): | 
					
						
							|  |  |  |         for _ in range(10): | 
					
						
							|  |  |  |             self.test_wmi_query_error() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_wmi_query_not_select(self): | 
					
						
							|  |  |  |         # Queries other than SELECT are blocked to avoid potential exploits | 
					
						
							|  |  |  |         with self.assertRaises(ValueError): | 
					
						
							|  |  |  |             _wmi.exec_query("not select, just in case someone tries something") | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-09-08 22:02:04 +01:00
										 |  |  |     @requires_resource('cpu') | 
					
						
							| 
									
										
										
										
											2022-09-07 21:09:20 +01:00
										 |  |  |     def test_wmi_query_overflow(self): | 
					
						
							|  |  |  |         # Ensure very big queries fail | 
					
						
							|  |  |  |         # Test multiple times to ensure consistency | 
					
						
							|  |  |  |         for _ in range(2): | 
					
						
							|  |  |  |             with self.assertRaises(OSError): | 
					
						
							|  |  |  |                 _wmi.exec_query("SELECT * FROM CIM_DataFile") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_wmi_query_multiple_rows(self): | 
					
						
							|  |  |  |         # Multiple instances should have an extra null separator | 
					
						
							|  |  |  |         r = _wmi.exec_query("SELECT ProcessId FROM Win32_Process WHERE ProcessId < 1000") | 
					
						
							|  |  |  |         self.assertFalse(r.startswith("\0"), r) | 
					
						
							|  |  |  |         self.assertFalse(r.endswith("\0"), r) | 
					
						
							|  |  |  |         it = iter(r.split("\0")) | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             while True: | 
					
						
							| 
									
										
										
										
											2022-09-08 22:02:04 +01:00
										 |  |  |                 self.assertRegex(next(it), r"ProcessId=\d+") | 
					
						
							| 
									
										
										
										
											2022-09-07 21:09:20 +01:00
										 |  |  |                 self.assertEqual("", next(it)) | 
					
						
							|  |  |  |         except StopIteration: | 
					
						
							|  |  |  |             pass | 
					
						
							| 
									
										
										
										
											2022-09-08 22:02:04 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def test_wmi_query_threads(self): | 
					
						
							|  |  |  |         from concurrent.futures import ThreadPoolExecutor | 
					
						
							|  |  |  |         query = "SELECT ProcessId FROM Win32_Process WHERE ProcessId < 1000" | 
					
						
							|  |  |  |         with ThreadPoolExecutor(4) as pool: | 
					
						
							|  |  |  |             task = [pool.submit(_wmi.exec_query, query) for _ in range(32)] | 
					
						
							|  |  |  |             for t in task: | 
					
						
							|  |  |  |                 self.assertRegex(t.result(), "ProcessId=") |