mirror of
				https://github.com/python/cpython.git
				synced 2025-10-26 19:24:34 +00:00 
			
		
		
		
	 40348acc18
			
		
	
	
		40348acc18
		
			
		
	
	
	
	
		
			
			Instead of explicitly enumerating test classes for run_unittest(), use unittest's ability to discover tests. This also makes these tests discoverable and runnable with unittest. load_tests() can be used for dynamically generating tests and adding doctests. setUpModule(), tearDownModule() and addModuleCleanup() can be used for running code before and after all module tests.
		
			
				
	
	
		
			173 lines
		
	
	
	
		
			5.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			173 lines
		
	
	
	
		
			5.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import dis
 | |
| import os.path
 | |
| import re
 | |
| import subprocess
 | |
| import sys
 | |
| import types
 | |
| import unittest
 | |
| 
 | |
| from test.support import findfile
 | |
| 
 | |
| 
 | |
def abspath(filename):
    """Return the absolute path of *filename* as located by findfile().

    The lookup is done with ``subdir="dtracedata"``.
    """
    located = findfile(filename, subdir="dtracedata")
    return os.path.abspath(located)
 | |
| 
 | |
| 
 | |
def normalize_trace_output(output):
    """Return the tracer output ordered by timestamp, timestamps removed.

    DTrace keeps one buffer per CPU and concatenates them when printing the
    fired probes, so if the operating system migrates our thread the raw
    output may be out of causal order.  Every row is therefore expected to
    be "<integer timestamp>\\t<text>"; rows are sorted on the timestamp and
    only the text column is kept.

    Blank rows and rows starting with '#' are ignored.  An AssertionError
    is raised if a row has no integer timestamp or no second column.
    """
    # When compiling with '--with-pydebug', strip '[# refs]' debug output.
    cleaned = re.sub(r"\[[0-9]+ refs\]", "", output)

    rows = []
    for line in cleaned.splitlines():
        if not line or line.startswith('#'):
            continue
        rows.append(line.split("\t"))

    try:
        # sorted() is stable: rows with equal timestamps keep their order.
        ordered = sorted(rows, key=lambda fields: int(fields[0]))
        messages = [fields[1] for fields in ordered]
    except (IndexError, ValueError):
        raise AssertionError(
            "tracer produced unparseable output:\n{}".format(cleaned)
        )
    return "\n".join(messages)
 | |
| 
 | |
| 
 | |
class TraceBackend:
    """Base class describing how to drive an external tracing tool.

    Subclasses provide EXTENSION (suffix of the tracer script files) and
    COMMAND (argv prefix, as a list, used to launch the tracer).
    """
    EXTENSION = None
    COMMAND = None
    COMMAND_ARGS = []

    def run_case(self, name, optimize_python=None):
        """Trace the data script *name* and load its expected output.

        Returns an ``(expected_output, actual_output)`` tuple: the expected
        text comes from "<name><EXTENSION>.expected" (right-stripped), the
        actual text is the normalized tracer output.
        """
        raw_output = self.trace_python(
            script_file=abspath(name + self.EXTENSION),
            python_file=abspath(name + ".py"),
            optimize_python=optimize_python)
        actual_output = normalize_trace_output(raw_output)

        expected_path = abspath(name + self.EXTENSION + ".expected")
        with open(expected_path) as expected_file:
            expected_output = expected_file.read().rstrip()

        return (expected_output, actual_output)

    def generate_trace_command(self, script_file, subcommand=None):
        """Build the tracer argv: COMMAND, the script, then optional "-c" part."""
        # Concatenation builds a fresh list, so the class-level COMMAND is
        # never modified.
        target_args = ["-c", subcommand] if subcommand else []
        return self.COMMAND + [script_file] + target_args

    def trace(self, script_file, subcommand=None):
        """Run the tracer and return its combined stdout/stderr as text."""
        argv = self.generate_trace_command(script_file, subcommand)
        process = subprocess.Popen(argv,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT,
                                   universal_newlines=True)
        captured, _unused = process.communicate()
        return captured

    def trace_python(self, script_file, python_file, optimize_python=None):
        """Trace the current interpreter running *python_file*.

        *optimize_python*, when truthy, is the number of "-O" flags passed
        to the traced interpreter.
        """
        optimize_flags = ["-O"] * optimize_python if optimize_python else []
        interpreter_argv = [sys.executable, *optimize_flags, python_file]
        return self.trace(script_file, " ".join(interpreter_argv))

    def assert_usable(self):
        """Raise unittest.SkipTest unless the tracer reports "probe: success"."""
        try:
            report = self.trace(abspath("assert_usable" + self.EXTENSION))
            report = report.strip()
        except (FileNotFoundError, NotADirectoryError, PermissionError) as exc:
            # The tracer could not be launched; report the OS error text.
            report = str(exc)
        if report == "probe: success":
            return
        raise unittest.SkipTest(
            "{}(1) failed: {}".format(self.COMMAND[0], report)
        )
 | |
| 
 | |
| 
 | |
class DTraceBackend(TraceBackend):
    """TraceBackend configured for the ``dtrace`` command and ".d" scripts."""

    # argv prefix; the script path is appended right after "-s".
    COMMAND = ["dtrace", "-q", "-s"]
    EXTENSION = ".d"
 | |
| 
 | |
| 
 | |
class SystemTapBackend(TraceBackend):
    """TraceBackend configured for the ``stap`` command and ".stp" scripts."""

    # argv prefix; the script path is appended after it.
    COMMAND = ["stap", "-g"]
    EXTENSION = ".stp"
 | |
| 
 | |
| 
 | |
class TraceTests:
    """Mixin holding the trace test cases shared by all backends.

    Concrete test classes combine this mixin with unittest.TestCase and
    set ``backend`` to a TraceBackend instance and ``optimize_python`` to
    the optimization level used for the traced interpreter.
    """

    # unittest.TestCase options
    maxDiff = None

    # TraceTests options
    backend = None
    optimize_python = 0

    @classmethod
    def setUpClass(cls):
        # assert_usable() raises unittest.SkipTest when the tracer cannot be
        # used, which skips every test of the class.
        cls.backend.assert_usable()

    def run_case(self, name):
        # Run the data case *name* through the backend and compare outputs.
        # The backend returns the pair as (expected, actual), in that order.
        expected_output, actual_output = self.backend.run_case(
            name, optimize_python=self.optimize_python)
        self.assertEqual(expected_output, actual_output)

    def test_function_entry_return(self):
        self.run_case("call_stack")

    def test_verify_call_opcodes(self):
        """Ensure our call stack test hits all function call opcodes"""

        opcodes = {"CALL_FUNCTION", "CALL_FUNCTION_EX", "CALL_FUNCTION_KW"}

        with open(abspath("call_stack.py")) as f:
            code_string = f.read()

        def get_function_instructions(funcname):
            # Recompile with appropriate optimization setting
            code = compile(source=code_string,
                           filename="<string>",
                           mode="exec",
                           optimize=self.optimize_python)

            # Functions defined at module level appear as code objects among
            # the module code's constants.
            for c in code.co_consts:
                if isinstance(c, types.CodeType) and c.co_name == funcname:
                    return dis.get_instructions(c)
            return []

        # Whatever is left in the set was never emitted for start().
        for instruction in get_function_instructions('start'):
            opcodes.discard(instruction.opname)

        self.assertEqual(set(), opcodes)

    def test_gc(self):
        self.run_case("gc")

    def test_line(self):
        self.run_case("line")
 | |
| 
 | |
| 
 | |
class DTraceNormalTests(TraceTests, unittest.TestCase):
    """TraceTests cases run through DTraceBackend with optimize_python = 0."""

    optimize_python = 0
    backend = DTraceBackend()
 | |
| 
 | |
| 
 | |
class DTraceOptimizedTests(TraceTests, unittest.TestCase):
    """TraceTests cases run through DTraceBackend with optimize_python = 2."""

    optimize_python = 2
    backend = DTraceBackend()
 | |
| 
 | |
| 
 | |
class SystemTapNormalTests(TraceTests, unittest.TestCase):
    """TraceTests cases run through SystemTapBackend with optimize_python = 0."""

    optimize_python = 0
    backend = SystemTapBackend()
 | |
| 
 | |
| 
 | |
class SystemTapOptimizedTests(TraceTests, unittest.TestCase):
    """TraceTests cases run through SystemTapBackend with optimize_python = 2."""

    optimize_python = 2
    backend = SystemTapBackend()
 | |
| 
 | |
| 
 | |
if __name__ == '__main__':
    # No test_main() exists in this module; rely on unittest's own discovery
    # of the TestCase classes defined above when run as a script.
    unittest.main()
 |