| 
									
										
										
										
											2001-10-12 20:53:59 +00:00
										 |  |  | import hotshot | 
					
						
							|  |  |  | import hotshot.log | 
					
						
							|  |  |  | import os | 
					
						
							|  |  |  | import pprint | 
					
						
							|  |  |  | import unittest | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import test_support | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | from hotshot.log import ENTER, EXIT, LINE | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def shortfilename(fn): | 
					
						
							|  |  |  |     # We use a really shortened filename since an exact match is made, | 
					
						
							|  |  |  |     # and the source may be either a Python source file or a | 
					
						
							|  |  |  |     # pre-compiled bytecode file. | 
					
						
							|  |  |  |     if fn: | 
					
						
							|  |  |  |         return os.path.splitext(os.path.basename(fn))[0] | 
					
						
							|  |  |  |     else: | 
					
						
							|  |  |  |         return fn | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2001-10-13 03:00:11 +00:00
										 |  |  | class UnlinkingLogReader(hotshot.log.LogReader): | 
					
						
							|  |  |  |     """Extend the LogReader so the log file is unlinked when we're
 | 
					
						
							|  |  |  |     done with it."""
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __init__(self, logfn): | 
					
						
							|  |  |  |         self.__logfn = logfn | 
					
						
							|  |  |  |         hotshot.log.LogReader.__init__(self, logfn) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def next(self, index=None): | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             return hotshot.log.LogReader.next(self) | 
					
						
							|  |  |  |         except (IndexError, StopIteration): | 
					
						
							|  |  |  |             os.unlink(self.__logfn) | 
					
						
							|  |  |  |             raise | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2001-10-12 20:53:59 +00:00
										 |  |  | class HotShotTestCase(unittest.TestCase): | 
					
						
							|  |  |  |     def new_profiler(self, lineevents=0, linetimings=1): | 
					
						
							|  |  |  |         self.logfn = test_support.TESTFN | 
					
						
							|  |  |  |         return hotshot.Profile(self.logfn, lineevents, linetimings) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def get_logreader(self): | 
					
						
							| 
									
										
										
										
											2001-10-13 03:00:11 +00:00
										 |  |  |         log = UnlinkingLogReader(self.logfn) | 
					
						
							| 
									
										
										
										
											2001-10-12 20:53:59 +00:00
										 |  |  |         return log | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def get_events_wotime(self): | 
					
						
							|  |  |  |         L = [] | 
					
						
							|  |  |  |         for event in self.get_logreader(): | 
					
						
							|  |  |  |             what, (filename, lineno, funcname), tdelta = event | 
					
						
							|  |  |  |             L.append((what, (shortfilename(filename), lineno, funcname))) | 
					
						
							|  |  |  |         return L | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def check_events(self, expected): | 
					
						
							|  |  |  |         events = self.get_events_wotime() | 
					
						
							| 
									
										
										
										
											2001-10-18 19:34:00 +00:00
										 |  |  |         if not __debug__: | 
					
						
							|  |  |  |             # Running under -O, so we don't get LINE events | 
					
						
							|  |  |  |             expected = [ev for ev in expected if ev[0] != LINE] | 
					
						
							| 
									
										
										
										
											2001-10-12 20:53:59 +00:00
										 |  |  |         if events != expected: | 
					
						
							|  |  |  |             self.fail( | 
					
						
							|  |  |  |                 "events did not match expectation; got:\n%s\nexpected:\n%s" | 
					
						
							|  |  |  |                 % (pprint.pformat(events), pprint.pformat(expected))) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def run_test(self, callable, events, profiler=None): | 
					
						
							|  |  |  |         if profiler is None: | 
					
						
							|  |  |  |             profiler = self.new_profiler() | 
					
						
							|  |  |  |         profiler.runcall(callable) | 
					
						
							|  |  |  |         profiler.close() | 
					
						
							|  |  |  |         self.check_events(events) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_line_numbers(self): | 
					
						
							|  |  |  |         def f(): | 
					
						
							|  |  |  |             y = 2 | 
					
						
							|  |  |  |             x = 1 | 
					
						
							|  |  |  |         def g(): | 
					
						
							|  |  |  |             f() | 
					
						
							|  |  |  |         f_lineno = f.func_code.co_firstlineno | 
					
						
							|  |  |  |         g_lineno = g.func_code.co_firstlineno | 
					
						
							|  |  |  |         events = [(ENTER, ("test_hotshot", g_lineno, "g")), | 
					
						
							|  |  |  |                   (LINE,  ("test_hotshot", g_lineno, "g")), | 
					
						
							|  |  |  |                   (LINE,  ("test_hotshot", g_lineno+1, "g")), | 
					
						
							|  |  |  |                   (ENTER, ("test_hotshot", f_lineno, "f")), | 
					
						
							|  |  |  |                   (LINE,  ("test_hotshot", f_lineno, "f")), | 
					
						
							|  |  |  |                   (LINE,  ("test_hotshot", f_lineno+1, "f")), | 
					
						
							|  |  |  |                   (LINE,  ("test_hotshot", f_lineno+2, "f")), | 
					
						
							|  |  |  |                   (EXIT,  ("test_hotshot", f_lineno, "f")), | 
					
						
							|  |  |  |                   (EXIT,  ("test_hotshot", g_lineno, "g")), | 
					
						
							|  |  |  |                   ] | 
					
						
							|  |  |  |         self.run_test(g, events, self.new_profiler(lineevents=1)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def test_main(): | 
					
						
							|  |  |  |     test_support.run_unittest(HotShotTestCase) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | if __name__ == "__main__": | 
					
						
							|  |  |  |     test_main() |