mirror of
				https://github.com/python/cpython.git
				synced 2025-11-04 07:31:38 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			88 lines
		
	
	
	
		
			3.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			88 lines
		
	
	
	
		
			3.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# Test the internal _wmi module on Windows
 | 
						|
# This is used by the platform module, and potentially others
 | 
						|
 | 
						|
import time
 | 
						|
import unittest
 | 
						|
from test.support import import_helper, requires_resource, LOOPBACK_TIMEOUT
 | 
						|
 | 
						|
 | 
						|
# Do this first so test will be skipped if module doesn't exist
 | 
						|
_wmi = import_helper.import_module('_wmi', required_on=['win'])
 | 
						|
 | 
						|
 | 
						|
def wmi_exec_query(query):
 | 
						|
    # gh-112278: WMI maybe slow response when first call.
 | 
						|
    try:
 | 
						|
        return _wmi.exec_query(query)
 | 
						|
    except BrokenPipeError:
 | 
						|
        pass
 | 
						|
    except WindowsError as e:
 | 
						|
        if e.winerror != 258:
 | 
						|
            raise
 | 
						|
    time.sleep(LOOPBACK_TIMEOUT)
 | 
						|
    return _wmi.exec_query(query)
 | 
						|
 | 
						|
 | 
						|
class WmiTests(unittest.TestCase):
 | 
						|
    def test_wmi_query_os_version(self):
 | 
						|
        r = wmi_exec_query("SELECT Version FROM Win32_OperatingSystem").split("\0")
 | 
						|
        self.assertEqual(1, len(r))
 | 
						|
        k, eq, v = r[0].partition("=")
 | 
						|
        self.assertEqual("=", eq, r[0])
 | 
						|
        self.assertEqual("Version", k, r[0])
 | 
						|
        # Best we can check for the version is that it's digits, dot, digits, anything
 | 
						|
        # Otherwise, we are likely checking the result of the query against itself
 | 
						|
        self.assertRegex(v, r"\d+\.\d+.+$", r[0])
 | 
						|
 | 
						|
    def test_wmi_query_repeated(self):
 | 
						|
        # Repeated queries should not break
 | 
						|
        for _ in range(10):
 | 
						|
            self.test_wmi_query_os_version()
 | 
						|
 | 
						|
    def test_wmi_query_error(self):
 | 
						|
        # Invalid queries fail with OSError
 | 
						|
        try:
 | 
						|
            wmi_exec_query("SELECT InvalidColumnName FROM InvalidTableName")
 | 
						|
        except OSError as ex:
 | 
						|
            if ex.winerror & 0xFFFFFFFF == 0x80041010:
 | 
						|
                # This is the expected error code. All others should fail the test
 | 
						|
                return
 | 
						|
        self.fail("Expected OSError")
 | 
						|
 | 
						|
    def test_wmi_query_repeated_error(self):
 | 
						|
        for _ in range(10):
 | 
						|
            self.test_wmi_query_error()
 | 
						|
 | 
						|
    def test_wmi_query_not_select(self):
 | 
						|
        # Queries other than SELECT are blocked to avoid potential exploits
 | 
						|
        with self.assertRaises(ValueError):
 | 
						|
            wmi_exec_query("not select, just in case someone tries something")
 | 
						|
 | 
						|
    @requires_resource('cpu')
 | 
						|
    def test_wmi_query_overflow(self):
 | 
						|
        # Ensure very big queries fail
 | 
						|
        # Test multiple times to ensure consistency
 | 
						|
        for _ in range(2):
 | 
						|
            with self.assertRaises(OSError):
 | 
						|
                wmi_exec_query("SELECT * FROM CIM_DataFile")
 | 
						|
 | 
						|
    def test_wmi_query_multiple_rows(self):
 | 
						|
        # Multiple instances should have an extra null separator
 | 
						|
        r = wmi_exec_query("SELECT ProcessId FROM Win32_Process WHERE ProcessId < 1000")
 | 
						|
        self.assertFalse(r.startswith("\0"), r)
 | 
						|
        self.assertFalse(r.endswith("\0"), r)
 | 
						|
        it = iter(r.split("\0"))
 | 
						|
        try:
 | 
						|
            while True:
 | 
						|
                self.assertRegex(next(it), r"ProcessId=\d+")
 | 
						|
                self.assertEqual("", next(it))
 | 
						|
        except StopIteration:
 | 
						|
            pass
 | 
						|
 | 
						|
    def test_wmi_query_threads(self):
 | 
						|
        from concurrent.futures import ThreadPoolExecutor
 | 
						|
        query = "SELECT ProcessId FROM Win32_Process WHERE ProcessId < 1000"
 | 
						|
        with ThreadPoolExecutor(4) as pool:
 | 
						|
            task = [pool.submit(wmi_exec_query, query) for _ in range(32)]
 | 
						|
            for t in task:
 | 
						|
                self.assertRegex(t.result(), "ProcessId=")
 |