| 
									
										
										
										
											2016-09-09 17:37:37 -07:00
										 |  |  | import dis | 
					
						
							|  |  |  | import os.path | 
					
						
							|  |  |  | import re | 
					
						
							|  |  |  | import subprocess | 
					
						
							|  |  |  | import sys | 
					
						
							|  |  |  | import types | 
					
						
							|  |  |  | import unittest | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | from test.support import findfile, run_unittest | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def abspath(filename): | 
					
						
							|  |  |  |     return os.path.abspath(findfile(filename, subdir="dtracedata")) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def normalize_trace_output(output): | 
					
						
							|  |  |  |     """Normalize DTrace output for comparison.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     DTrace keeps a per-CPU buffer, and when showing the fired probes, buffers | 
					
						
							|  |  |  |     are concatenated. So if the operating system moves our thread around, the | 
					
						
							|  |  |  |     straight result can be "non-causal". So we add timestamps to the probe | 
					
						
							|  |  |  |     firing, sort by that field, then strip it from the output"""
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # When compiling with '--with-pydebug', strip '[# refs]' debug output. | 
					
						
							|  |  |  |     output = re.sub(r"\[[0-9]+ refs\]", "", output) | 
					
						
							|  |  |  |     try: | 
					
						
							|  |  |  |         result = [ | 
					
						
							|  |  |  |             row.split("\t") | 
					
						
							|  |  |  |             for row in output.splitlines() | 
					
						
							|  |  |  |             if row and not row.startswith('#') | 
					
						
							|  |  |  |         ] | 
					
						
							|  |  |  |         result.sort(key=lambda row: int(row[0])) | 
					
						
							|  |  |  |         result = [row[1] for row in result] | 
					
						
							|  |  |  |         return "\n".join(result) | 
					
						
							|  |  |  |     except (IndexError, ValueError): | 
					
						
							|  |  |  |         raise AssertionError( | 
					
						
							|  |  |  |             "tracer produced unparseable output:\n{}".format(output) | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class TraceBackend: | 
					
						
							|  |  |  |     EXTENSION = None | 
					
						
							|  |  |  |     COMMAND = None | 
					
						
							|  |  |  |     COMMAND_ARGS = [] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def run_case(self, name, optimize_python=None): | 
					
						
							|  |  |  |         actual_output = normalize_trace_output(self.trace_python( | 
					
						
							|  |  |  |             script_file=abspath(name + self.EXTENSION), | 
					
						
							|  |  |  |             python_file=abspath(name + ".py"), | 
					
						
							|  |  |  |             optimize_python=optimize_python)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         with open(abspath(name + self.EXTENSION + ".expected")) as f: | 
					
						
							|  |  |  |             expected_output = f.read().rstrip() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         return (expected_output, actual_output) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def generate_trace_command(self, script_file, subcommand=None): | 
					
						
							|  |  |  |         command = self.COMMAND + [script_file] | 
					
						
							|  |  |  |         if subcommand: | 
					
						
							|  |  |  |             command += ["-c", subcommand] | 
					
						
							|  |  |  |         return command | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def trace(self, script_file, subcommand=None): | 
					
						
							|  |  |  |         command = self.generate_trace_command(script_file, subcommand) | 
					
						
							|  |  |  |         stdout, _ = subprocess.Popen(command, | 
					
						
							|  |  |  |                                      stdout=subprocess.PIPE, | 
					
						
							|  |  |  |                                      stderr=subprocess.STDOUT, | 
					
						
							|  |  |  |                                      universal_newlines=True).communicate() | 
					
						
							|  |  |  |         return stdout | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def trace_python(self, script_file, python_file, optimize_python=None): | 
					
						
							|  |  |  |         python_flags = [] | 
					
						
							|  |  |  |         if optimize_python: | 
					
						
							|  |  |  |             python_flags.extend(["-O"] * optimize_python) | 
					
						
							|  |  |  |         subcommand = " ".join([sys.executable] + python_flags + [python_file]) | 
					
						
							|  |  |  |         return self.trace(script_file, subcommand) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def assert_usable(self): | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             output = self.trace(abspath("assert_usable" + self.EXTENSION)) | 
					
						
							|  |  |  |             output = output.strip() | 
					
						
							| 
									
										
										
										
											2016-11-14 17:14:42 +01:00
										 |  |  |         except (FileNotFoundError, PermissionError) as fnfe: | 
					
						
							| 
									
										
										
										
											2016-09-09 17:37:37 -07:00
										 |  |  |             output = str(fnfe) | 
					
						
							|  |  |  |         if output != "probe: success": | 
					
						
							|  |  |  |             raise unittest.SkipTest( | 
					
						
							|  |  |  |                 "{}(1) failed: {}".format(self.COMMAND[0], output) | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class DTraceBackend(TraceBackend): | 
					
						
							|  |  |  |     EXTENSION = ".d" | 
					
						
							|  |  |  |     COMMAND = ["dtrace", "-q", "-s"] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class SystemTapBackend(TraceBackend): | 
					
						
							|  |  |  |     EXTENSION = ".stp" | 
					
						
							|  |  |  |     COMMAND = ["stap", "-g"] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class TraceTests(unittest.TestCase): | 
					
						
							|  |  |  |     # unittest.TestCase options | 
					
						
							|  |  |  |     maxDiff = None | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # TraceTests options | 
					
						
							|  |  |  |     backend = None | 
					
						
							|  |  |  |     optimize_python = 0 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @classmethod | 
					
						
							|  |  |  |     def setUpClass(self): | 
					
						
							|  |  |  |         self.backend.assert_usable() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def run_case(self, name): | 
					
						
							|  |  |  |         actual_output, expected_output = self.backend.run_case( | 
					
						
							|  |  |  |             name, optimize_python=self.optimize_python) | 
					
						
							|  |  |  |         self.assertEqual(actual_output, expected_output) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_function_entry_return(self): | 
					
						
							|  |  |  |         self.run_case("call_stack") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_verify_call_opcodes(self): | 
					
						
							|  |  |  |         """Ensure our call stack test hits all function call opcodes""" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         opcodes = set(["CALL_FUNCTION", "CALL_FUNCTION_EX", "CALL_FUNCTION_KW"]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         with open(abspath("call_stack.py")) as f: | 
					
						
							|  |  |  |             code_string = f.read() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         def get_function_instructions(funcname): | 
					
						
							|  |  |  |             # Recompile with appropriate optimization setting | 
					
						
							|  |  |  |             code = compile(source=code_string, | 
					
						
							|  |  |  |                            filename="<string>", | 
					
						
							|  |  |  |                            mode="exec", | 
					
						
							|  |  |  |                            optimize=self.optimize_python) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             for c in code.co_consts: | 
					
						
							|  |  |  |                 if isinstance(c, types.CodeType) and c.co_name == funcname: | 
					
						
							|  |  |  |                     return dis.get_instructions(c) | 
					
						
							|  |  |  |             return [] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         for instruction in get_function_instructions('start'): | 
					
						
							|  |  |  |             opcodes.discard(instruction.opname) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertEqual(set(), opcodes) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_gc(self): | 
					
						
							|  |  |  |         self.run_case("gc") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_line(self): | 
					
						
							|  |  |  |         self.run_case("line") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class DTraceNormalTests(TraceTests): | 
					
						
							|  |  |  |     backend = DTraceBackend() | 
					
						
							|  |  |  |     optimize_python = 0 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class DTraceOptimizedTests(TraceTests): | 
					
						
							|  |  |  |     backend = DTraceBackend() | 
					
						
							|  |  |  |     optimize_python = 2 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class SystemTapNormalTests(TraceTests): | 
					
						
							|  |  |  |     backend = SystemTapBackend() | 
					
						
							|  |  |  |     optimize_python = 0 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class SystemTapOptimizedTests(TraceTests): | 
					
						
							|  |  |  |     backend = SystemTapBackend() | 
					
						
							|  |  |  |     optimize_python = 2 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def test_main(): | 
					
						
							|  |  |  |     run_unittest(DTraceNormalTests, DTraceOptimizedTests, SystemTapNormalTests, | 
					
						
							|  |  |  |                  SystemTapOptimizedTests) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | if __name__ == '__main__': | 
					
						
							|  |  |  |     test_main() |