| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  | import contextlib | 
					
						
							|  |  |  | import os | 
					
						
							|  |  |  | import sys | 
					
						
							|  |  |  | import tracemalloc | 
					
						
							|  |  |  | import unittest | 
					
						
							|  |  |  | from unittest.mock import patch | 
					
						
							| 
									
										
										
										
											2015-05-06 06:33:17 +03:00
										 |  |  | from test.support.script_helper import (assert_python_ok, assert_python_failure, | 
					
						
							|  |  |  |                                         interpreter_requires_environment) | 
					
						
							|  |  |  | from test import support | 
					
						
							| 
									
										
										
										
											2017-09-07 18:56:24 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-03-22 13:39:05 +01:00
										 |  |  | try: | 
					
						
							|  |  |  |     import _testcapi | 
					
						
							|  |  |  | except ImportError: | 
					
						
							|  |  |  |     _testcapi = None | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  | EMPTY_STRING_SIZE = sys.getsizeof(b'') | 
					
						
							| 
									
										
										
										
											2018-07-25 19:23:53 +02:00
										 |  |  | INVALID_NFRAME = (-1, 2**30) | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-03-22 13:39:05 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  | def get_frames(nframe, lineno_delta): | 
					
						
							|  |  |  |     frames = [] | 
					
						
							|  |  |  |     frame = sys._getframe(1) | 
					
						
							|  |  |  |     for index in range(nframe): | 
					
						
							|  |  |  |         code = frame.f_code | 
					
						
							|  |  |  |         lineno = frame.f_lineno + lineno_delta | 
					
						
							|  |  |  |         frames.append((code.co_filename, lineno)) | 
					
						
							|  |  |  |         lineno_delta = 0 | 
					
						
							|  |  |  |         frame = frame.f_back | 
					
						
							|  |  |  |         if frame is None: | 
					
						
							|  |  |  |             break | 
					
						
							|  |  |  |     return tuple(frames) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def allocate_bytes(size): | 
					
						
							|  |  |  |     nframe = tracemalloc.get_traceback_limit() | 
					
						
							|  |  |  |     bytes_len = (size - EMPTY_STRING_SIZE) | 
					
						
							|  |  |  |     frames = get_frames(nframe, 1) | 
					
						
							|  |  |  |     data = b'x' * bytes_len | 
					
						
							|  |  |  |     return data, tracemalloc.Traceback(frames) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def create_snapshots(): | 
					
						
							|  |  |  |     traceback_limit = 2 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-03-22 12:58:23 +01:00
										 |  |  |     # _tracemalloc._get_traces() returns a list of (domain, size, | 
					
						
							|  |  |  |     # traceback_frames) tuples. traceback_frames is a tuple of (filename, | 
					
						
							|  |  |  |     # line_number) tuples. | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  |     raw_traces = [ | 
					
						
							| 
									
										
										
										
											2016-03-22 12:58:23 +01:00
										 |  |  |         (0, 10, (('a.py', 2), ('b.py', 4))), | 
					
						
							|  |  |  |         (0, 10, (('a.py', 2), ('b.py', 4))), | 
					
						
							|  |  |  |         (0, 10, (('a.py', 2), ('b.py', 4))), | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-03-22 12:58:23 +01:00
										 |  |  |         (1, 2, (('a.py', 5), ('b.py', 4))), | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-03-22 12:58:23 +01:00
										 |  |  |         (2, 66, (('b.py', 1),)), | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-03-22 12:58:23 +01:00
										 |  |  |         (3, 7, (('<unknown>', 0),)), | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  |     ] | 
					
						
							|  |  |  |     snapshot = tracemalloc.Snapshot(raw_traces, traceback_limit) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     raw_traces2 = [ | 
					
						
							| 
									
										
										
										
											2016-03-22 12:58:23 +01:00
										 |  |  |         (0, 10, (('a.py', 2), ('b.py', 4))), | 
					
						
							|  |  |  |         (0, 10, (('a.py', 2), ('b.py', 4))), | 
					
						
							|  |  |  |         (0, 10, (('a.py', 2), ('b.py', 4))), | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-03-22 12:58:23 +01:00
										 |  |  |         (2, 2, (('a.py', 5), ('b.py', 4))), | 
					
						
							|  |  |  |         (2, 5000, (('a.py', 5), ('b.py', 4))), | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-03-22 12:58:23 +01:00
										 |  |  |         (4, 400, (('c.py', 578),)), | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  |     ] | 
					
						
							|  |  |  |     snapshot2 = tracemalloc.Snapshot(raw_traces2, traceback_limit) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     return (snapshot, snapshot2) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def frame(filename, lineno): | 
					
						
							|  |  |  |     return tracemalloc._Frame((filename, lineno)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def traceback(*frames): | 
					
						
							|  |  |  |     return tracemalloc.Traceback(frames) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def traceback_lineno(filename, lineno): | 
					
						
							|  |  |  |     return traceback((filename, lineno)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def traceback_filename(filename): | 
					
						
							|  |  |  |     return traceback_lineno(filename, 0) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class TestTracemallocEnabled(unittest.TestCase): | 
					
						
							|  |  |  |     def setUp(self): | 
					
						
							|  |  |  |         if tracemalloc.is_tracing(): | 
					
						
							|  |  |  |             self.skipTest("tracemalloc must be stopped before the test") | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2013-11-23 12:37:20 +01:00
										 |  |  |         tracemalloc.start(1) | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def tearDown(self): | 
					
						
							|  |  |  |         tracemalloc.stop() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_get_tracemalloc_memory(self): | 
					
						
							|  |  |  |         data = [allocate_bytes(123) for count in range(1000)] | 
					
						
							|  |  |  |         size = tracemalloc.get_tracemalloc_memory() | 
					
						
							|  |  |  |         self.assertGreaterEqual(size, 0) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         tracemalloc.clear_traces() | 
					
						
							|  |  |  |         size2 = tracemalloc.get_tracemalloc_memory() | 
					
						
							|  |  |  |         self.assertGreaterEqual(size2, 0) | 
					
						
							|  |  |  |         self.assertLessEqual(size2, size) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_get_object_traceback(self): | 
					
						
							|  |  |  |         tracemalloc.clear_traces() | 
					
						
							|  |  |  |         obj_size = 12345 | 
					
						
							|  |  |  |         obj, obj_traceback = allocate_bytes(obj_size) | 
					
						
							|  |  |  |         traceback = tracemalloc.get_object_traceback(obj) | 
					
						
							|  |  |  |         self.assertEqual(traceback, obj_traceback) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-10-25 13:31:16 +02:00
										 |  |  |     def test_new_reference(self): | 
					
						
							|  |  |  |         tracemalloc.clear_traces() | 
					
						
							|  |  |  |         # gc.collect() indirectly calls PyList_ClearFreeList() | 
					
						
							|  |  |  |         support.gc_collect() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # Create a list and "destroy it": put it in the PyListObject free list | 
					
						
							|  |  |  |         obj = [] | 
					
						
							|  |  |  |         obj = None | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # Create a list which should reuse the previously created empty list | 
					
						
							|  |  |  |         obj = [] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         nframe = tracemalloc.get_traceback_limit() | 
					
						
							|  |  |  |         frames = get_frames(nframe, -3) | 
					
						
							|  |  |  |         obj_traceback = tracemalloc.Traceback(frames) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         traceback = tracemalloc.get_object_traceback(obj) | 
					
						
							|  |  |  |         self.assertIsNotNone(traceback) | 
					
						
							|  |  |  |         self.assertEqual(traceback, obj_traceback) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  |     def test_set_traceback_limit(self): | 
					
						
							|  |  |  |         obj_size = 10 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2013-11-23 12:37:20 +01:00
										 |  |  |         tracemalloc.stop() | 
					
						
							|  |  |  |         self.assertRaises(ValueError, tracemalloc.start, -1) | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2013-11-23 12:37:20 +01:00
										 |  |  |         tracemalloc.stop() | 
					
						
							|  |  |  |         tracemalloc.start(10) | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  |         obj2, obj2_traceback = allocate_bytes(obj_size) | 
					
						
							|  |  |  |         traceback = tracemalloc.get_object_traceback(obj2) | 
					
						
							|  |  |  |         self.assertEqual(len(traceback), 10) | 
					
						
							|  |  |  |         self.assertEqual(traceback, obj2_traceback) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2013-11-23 12:37:20 +01:00
										 |  |  |         tracemalloc.stop() | 
					
						
							|  |  |  |         tracemalloc.start(1) | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  |         obj, obj_traceback = allocate_bytes(obj_size) | 
					
						
							|  |  |  |         traceback = tracemalloc.get_object_traceback(obj) | 
					
						
							|  |  |  |         self.assertEqual(len(traceback), 1) | 
					
						
							|  |  |  |         self.assertEqual(traceback, obj_traceback) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def find_trace(self, traces, traceback): | 
					
						
							|  |  |  |         for trace in traces: | 
					
						
							| 
									
										
										
										
											2016-03-22 12:58:23 +01:00
										 |  |  |             if trace[2] == traceback._frames: | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  |                 return trace | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.fail("trace not found") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_get_traces(self): | 
					
						
							|  |  |  |         tracemalloc.clear_traces() | 
					
						
							|  |  |  |         obj_size = 12345 | 
					
						
							|  |  |  |         obj, obj_traceback = allocate_bytes(obj_size) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         traces = tracemalloc._get_traces() | 
					
						
							|  |  |  |         trace = self.find_trace(traces, obj_traceback) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertIsInstance(trace, tuple) | 
					
						
							| 
									
										
										
										
											2016-03-22 12:58:23 +01:00
										 |  |  |         domain, size, traceback = trace | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  |         self.assertEqual(size, obj_size) | 
					
						
							|  |  |  |         self.assertEqual(traceback, obj_traceback._frames) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         tracemalloc.stop() | 
					
						
							|  |  |  |         self.assertEqual(tracemalloc._get_traces(), []) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_get_traces_intern_traceback(self): | 
					
						
							|  |  |  |         # dummy wrappers to get more useful and identical frames in the traceback | 
					
						
							|  |  |  |         def allocate_bytes2(size): | 
					
						
							|  |  |  |             return allocate_bytes(size) | 
					
						
							|  |  |  |         def allocate_bytes3(size): | 
					
						
							|  |  |  |             return allocate_bytes2(size) | 
					
						
							|  |  |  |         def allocate_bytes4(size): | 
					
						
							|  |  |  |             return allocate_bytes3(size) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # Ensure that two identical tracebacks are not duplicated | 
					
						
							| 
									
										
										
										
											2013-11-23 12:37:20 +01:00
										 |  |  |         tracemalloc.stop() | 
					
						
							|  |  |  |         tracemalloc.start(4) | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  |         obj_size = 123 | 
					
						
							|  |  |  |         obj1, obj1_traceback = allocate_bytes4(obj_size) | 
					
						
							|  |  |  |         obj2, obj2_traceback = allocate_bytes4(obj_size) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         traces = tracemalloc._get_traces() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-11-30 00:05:07 +01:00
										 |  |  |         obj1_traceback._frames = tuple(reversed(obj1_traceback._frames)) | 
					
						
							|  |  |  |         obj2_traceback._frames = tuple(reversed(obj2_traceback._frames)) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  |         trace1 = self.find_trace(traces, obj1_traceback) | 
					
						
							|  |  |  |         trace2 = self.find_trace(traces, obj2_traceback) | 
					
						
							| 
									
										
										
										
											2016-03-22 12:58:23 +01:00
										 |  |  |         domain1, size1, traceback1 = trace1 | 
					
						
							|  |  |  |         domain2, size2, traceback2 = trace2 | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  |         self.assertIs(traceback2, traceback1) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_get_traced_memory(self): | 
					
						
							|  |  |  |         # Python allocates some internals objects, so the test must tolerate | 
					
						
							|  |  |  |         # a small difference between the expected size and the real usage | 
					
						
							|  |  |  |         max_error = 2048 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # allocate one object | 
					
						
							|  |  |  |         obj_size = 1024 * 1024 | 
					
						
							|  |  |  |         tracemalloc.clear_traces() | 
					
						
							|  |  |  |         obj, obj_traceback = allocate_bytes(obj_size) | 
					
						
							| 
									
										
										
										
											2013-11-27 21:39:49 +01:00
										 |  |  |         size, peak_size = tracemalloc.get_traced_memory() | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  |         self.assertGreaterEqual(size, obj_size) | 
					
						
							| 
									
										
										
										
											2013-11-27 21:39:49 +01:00
										 |  |  |         self.assertGreaterEqual(peak_size, size) | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  |         self.assertLessEqual(size - obj_size, max_error) | 
					
						
							| 
									
										
										
										
											2013-11-27 21:39:49 +01:00
										 |  |  |         self.assertLessEqual(peak_size - size, max_error) | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  |         # destroy the object | 
					
						
							|  |  |  |         obj = None | 
					
						
							| 
									
										
										
										
											2013-11-27 21:39:49 +01:00
										 |  |  |         size2, peak_size2 = tracemalloc.get_traced_memory() | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  |         self.assertLess(size2, size) | 
					
						
							|  |  |  |         self.assertGreaterEqual(size - size2, obj_size - max_error) | 
					
						
							| 
									
										
										
										
											2013-11-27 21:39:49 +01:00
										 |  |  |         self.assertGreaterEqual(peak_size2, peak_size) | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  |         # clear_traces() must reset traced memory counters | 
					
						
							|  |  |  |         tracemalloc.clear_traces() | 
					
						
							|  |  |  |         self.assertEqual(tracemalloc.get_traced_memory(), (0, 0)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # allocate another object | 
					
						
							|  |  |  |         obj, obj_traceback = allocate_bytes(obj_size) | 
					
						
							| 
									
										
										
										
											2013-11-27 21:39:49 +01:00
										 |  |  |         size, peak_size = tracemalloc.get_traced_memory() | 
					
						
							|  |  |  |         self.assertGreaterEqual(size, obj_size) | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2013-11-25 09:29:45 +01:00
										 |  |  |         # stop() also resets traced memory counters | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  |         tracemalloc.stop() | 
					
						
							|  |  |  |         self.assertEqual(tracemalloc.get_traced_memory(), (0, 0)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_clear_traces(self): | 
					
						
							|  |  |  |         obj, obj_traceback = allocate_bytes(123) | 
					
						
							|  |  |  |         traceback = tracemalloc.get_object_traceback(obj) | 
					
						
							|  |  |  |         self.assertIsNotNone(traceback) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         tracemalloc.clear_traces() | 
					
						
							|  |  |  |         traceback2 = tracemalloc.get_object_traceback(obj) | 
					
						
							|  |  |  |         self.assertIsNone(traceback2) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_is_tracing(self): | 
					
						
							|  |  |  |         tracemalloc.stop() | 
					
						
							|  |  |  |         self.assertFalse(tracemalloc.is_tracing()) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         tracemalloc.start() | 
					
						
							|  |  |  |         self.assertTrue(tracemalloc.is_tracing()) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_snapshot(self): | 
					
						
							|  |  |  |         obj, source = allocate_bytes(123) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # take a snapshot | 
					
						
							|  |  |  |         snapshot = tracemalloc.take_snapshot() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # write on disk | 
					
						
							|  |  |  |         snapshot.dump(support.TESTFN) | 
					
						
							|  |  |  |         self.addCleanup(support.unlink, support.TESTFN) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # load from disk | 
					
						
							|  |  |  |         snapshot2 = tracemalloc.Snapshot.load(support.TESTFN) | 
					
						
							|  |  |  |         self.assertEqual(snapshot2.traces, snapshot.traces) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # tracemalloc must be tracing memory allocations to take a snapshot | 
					
						
							|  |  |  |         tracemalloc.stop() | 
					
						
							|  |  |  |         with self.assertRaises(RuntimeError) as cm: | 
					
						
							|  |  |  |             tracemalloc.take_snapshot() | 
					
						
							|  |  |  |         self.assertEqual(str(cm.exception), | 
					
						
							|  |  |  |                          "the tracemalloc module must be tracing memory " | 
					
						
							|  |  |  |                          "allocations to take a snapshot") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_snapshot_save_attr(self): | 
					
						
							|  |  |  |         # take a snapshot with a new attribute | 
					
						
							|  |  |  |         snapshot = tracemalloc.take_snapshot() | 
					
						
							|  |  |  |         snapshot.test_attr = "new" | 
					
						
							|  |  |  |         snapshot.dump(support.TESTFN) | 
					
						
							|  |  |  |         self.addCleanup(support.unlink, support.TESTFN) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-04-19 04:03:41 +00:00
										 |  |  |         # load() should recreate the attribute | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  |         snapshot2 = tracemalloc.Snapshot.load(support.TESTFN) | 
					
						
							|  |  |  |         self.assertEqual(snapshot2.test_attr, "new") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def fork_child(self): | 
					
						
							|  |  |  |         if not tracemalloc.is_tracing(): | 
					
						
							|  |  |  |             return 2 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         obj_size = 12345 | 
					
						
							|  |  |  |         obj, obj_traceback = allocate_bytes(obj_size) | 
					
						
							|  |  |  |         traceback = tracemalloc.get_object_traceback(obj) | 
					
						
							|  |  |  |         if traceback is None: | 
					
						
							|  |  |  |             return 3 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # everything is fine | 
					
						
							|  |  |  |         return 0 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork()') | 
					
						
							|  |  |  |     def test_fork(self): | 
					
						
							|  |  |  |         # check that tracemalloc is still working after fork | 
					
						
							|  |  |  |         pid = os.fork() | 
					
						
							|  |  |  |         if not pid: | 
					
						
							|  |  |  |             # child | 
					
						
							|  |  |  |             exitcode = 1 | 
					
						
							|  |  |  |             try: | 
					
						
							|  |  |  |                 exitcode = self.fork_child() | 
					
						
							|  |  |  |             finally: | 
					
						
							|  |  |  |                 os._exit(exitcode) | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             pid2, status = os.waitpid(pid, 0) | 
					
						
							|  |  |  |             self.assertTrue(os.WIFEXITED(status)) | 
					
						
							|  |  |  |             exitcode = os.WEXITSTATUS(status) | 
					
						
							|  |  |  |             self.assertEqual(exitcode, 0) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class TestSnapshot(unittest.TestCase): | 
					
						
							|  |  |  |     maxDiff = 4000 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_create_snapshot(self): | 
					
						
							| 
									
										
										
										
											2016-03-22 12:58:23 +01:00
										 |  |  |         raw_traces = [(0, 5, (('a.py', 2),))] | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  |         with contextlib.ExitStack() as stack: | 
					
						
							|  |  |  |             stack.enter_context(patch.object(tracemalloc, 'is_tracing', | 
					
						
							|  |  |  |                                              return_value=True)) | 
					
						
							|  |  |  |             stack.enter_context(patch.object(tracemalloc, 'get_traceback_limit', | 
					
						
							|  |  |  |                                              return_value=5)) | 
					
						
							|  |  |  |             stack.enter_context(patch.object(tracemalloc, '_get_traces', | 
					
						
							|  |  |  |                                              return_value=raw_traces)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             snapshot = tracemalloc.take_snapshot() | 
					
						
							|  |  |  |             self.assertEqual(snapshot.traceback_limit, 5) | 
					
						
							|  |  |  |             self.assertEqual(len(snapshot.traces), 1) | 
					
						
							|  |  |  |             trace = snapshot.traces[0] | 
					
						
							|  |  |  |             self.assertEqual(trace.size, 5) | 
					
						
							|  |  |  |             self.assertEqual(len(trace.traceback), 1) | 
					
						
							|  |  |  |             self.assertEqual(trace.traceback[0].filename, 'a.py') | 
					
						
							|  |  |  |             self.assertEqual(trace.traceback[0].lineno, 2) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_filter_traces(self): | 
					
						
							|  |  |  |         snapshot, snapshot2 = create_snapshots() | 
					
						
							|  |  |  |         filter1 = tracemalloc.Filter(False, "b.py") | 
					
						
							|  |  |  |         filter2 = tracemalloc.Filter(True, "a.py", 2) | 
					
						
							|  |  |  |         filter3 = tracemalloc.Filter(True, "a.py", 5) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         original_traces = list(snapshot.traces._traces) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # exclude b.py | 
					
						
							|  |  |  |         snapshot3 = snapshot.filter_traces((filter1,)) | 
					
						
							|  |  |  |         self.assertEqual(snapshot3.traces._traces, [ | 
					
						
							| 
									
										
										
										
											2016-03-22 12:58:23 +01:00
										 |  |  |             (0, 10, (('a.py', 2), ('b.py', 4))), | 
					
						
							|  |  |  |             (0, 10, (('a.py', 2), ('b.py', 4))), | 
					
						
							|  |  |  |             (0, 10, (('a.py', 2), ('b.py', 4))), | 
					
						
							|  |  |  |             (1, 2, (('a.py', 5), ('b.py', 4))), | 
					
						
							|  |  |  |             (3, 7, (('<unknown>', 0),)), | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  |         ]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # filter_traces() must not touch the original snapshot | 
					
						
							|  |  |  |         self.assertEqual(snapshot.traces._traces, original_traces) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # only include two lines of a.py | 
					
						
							|  |  |  |         snapshot4 = snapshot3.filter_traces((filter2, filter3)) | 
					
						
							|  |  |  |         self.assertEqual(snapshot4.traces._traces, [ | 
					
						
							| 
									
										
										
										
											2016-03-22 12:58:23 +01:00
										 |  |  |             (0, 10, (('a.py', 2), ('b.py', 4))), | 
					
						
							|  |  |  |             (0, 10, (('a.py', 2), ('b.py', 4))), | 
					
						
							|  |  |  |             (0, 10, (('a.py', 2), ('b.py', 4))), | 
					
						
							|  |  |  |             (1, 2, (('a.py', 5), ('b.py', 4))), | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  |         ]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # No filter: just duplicate the snapshot | 
					
						
							|  |  |  |         snapshot5 = snapshot.filter_traces(()) | 
					
						
							|  |  |  |         self.assertIsNot(snapshot5, snapshot) | 
					
						
							|  |  |  |         self.assertIsNot(snapshot5.traces, snapshot.traces) | 
					
						
							|  |  |  |         self.assertEqual(snapshot5.traces, snapshot.traces) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-03-10 11:05:07 +01:00
										 |  |  |         self.assertRaises(TypeError, snapshot.filter_traces, filter1) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-03-22 12:58:23 +01:00
										 |  |  |     def test_filter_traces_domain(self): | 
					
						
							|  |  |  |         snapshot, snapshot2 = create_snapshots() | 
					
						
							|  |  |  |         filter1 = tracemalloc.Filter(False, "a.py", domain=1) | 
					
						
							|  |  |  |         filter2 = tracemalloc.Filter(True, "a.py", domain=1) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         original_traces = list(snapshot.traces._traces) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # exclude a.py of domain 1 | 
					
						
							|  |  |  |         snapshot3 = snapshot.filter_traces((filter1,)) | 
					
						
							|  |  |  |         self.assertEqual(snapshot3.traces._traces, [ | 
					
						
							|  |  |  |             (0, 10, (('a.py', 2), ('b.py', 4))), | 
					
						
							|  |  |  |             (0, 10, (('a.py', 2), ('b.py', 4))), | 
					
						
							|  |  |  |             (0, 10, (('a.py', 2), ('b.py', 4))), | 
					
						
							|  |  |  |             (2, 66, (('b.py', 1),)), | 
					
						
							|  |  |  |             (3, 7, (('<unknown>', 0),)), | 
					
						
							|  |  |  |         ]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # include domain 1 | 
					
						
							|  |  |  |         snapshot3 = snapshot.filter_traces((filter1,)) | 
					
						
							|  |  |  |         self.assertEqual(snapshot3.traces._traces, [ | 
					
						
							|  |  |  |             (0, 10, (('a.py', 2), ('b.py', 4))), | 
					
						
							|  |  |  |             (0, 10, (('a.py', 2), ('b.py', 4))), | 
					
						
							|  |  |  |             (0, 10, (('a.py', 2), ('b.py', 4))), | 
					
						
							|  |  |  |             (2, 66, (('b.py', 1),)), | 
					
						
							|  |  |  |             (3, 7, (('<unknown>', 0),)), | 
					
						
							|  |  |  |         ]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_filter_traces_domain_filter(self): | 
					
						
							|  |  |  |         snapshot, snapshot2 = create_snapshots() | 
					
						
							|  |  |  |         filter1 = tracemalloc.DomainFilter(False, domain=3) | 
					
						
							|  |  |  |         filter2 = tracemalloc.DomainFilter(True, domain=3) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # exclude domain 2 | 
					
						
							|  |  |  |         snapshot3 = snapshot.filter_traces((filter1,)) | 
					
						
							|  |  |  |         self.assertEqual(snapshot3.traces._traces, [ | 
					
						
							|  |  |  |             (0, 10, (('a.py', 2), ('b.py', 4))), | 
					
						
							|  |  |  |             (0, 10, (('a.py', 2), ('b.py', 4))), | 
					
						
							|  |  |  |             (0, 10, (('a.py', 2), ('b.py', 4))), | 
					
						
							|  |  |  |             (1, 2, (('a.py', 5), ('b.py', 4))), | 
					
						
							|  |  |  |             (2, 66, (('b.py', 1),)), | 
					
						
							|  |  |  |         ]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # include domain 2 | 
					
						
							|  |  |  |         snapshot3 = snapshot.filter_traces((filter2,)) | 
					
						
							|  |  |  |         self.assertEqual(snapshot3.traces._traces, [ | 
					
						
							|  |  |  |             (3, 7, (('<unknown>', 0),)), | 
					
						
							|  |  |  |         ]) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  |     def test_snapshot_group_by_line(self): | 
					
						
							|  |  |  |         snapshot, snapshot2 = create_snapshots() | 
					
						
							|  |  |  |         tb_0 = traceback_lineno('<unknown>', 0) | 
					
						
							|  |  |  |         tb_a_2 = traceback_lineno('a.py', 2) | 
					
						
							|  |  |  |         tb_a_5 = traceback_lineno('a.py', 5) | 
					
						
							|  |  |  |         tb_b_1 = traceback_lineno('b.py', 1) | 
					
						
							|  |  |  |         tb_c_578 = traceback_lineno('c.py', 578) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # stats per file and line | 
					
						
							|  |  |  |         stats1 = snapshot.statistics('lineno') | 
					
						
							|  |  |  |         self.assertEqual(stats1, [ | 
					
						
							|  |  |  |             tracemalloc.Statistic(tb_b_1, 66, 1), | 
					
						
							|  |  |  |             tracemalloc.Statistic(tb_a_2, 30, 3), | 
					
						
							|  |  |  |             tracemalloc.Statistic(tb_0, 7, 1), | 
					
						
							|  |  |  |             tracemalloc.Statistic(tb_a_5, 2, 1), | 
					
						
							|  |  |  |         ]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # stats per file and line (2) | 
					
						
							|  |  |  |         stats2 = snapshot2.statistics('lineno') | 
					
						
							|  |  |  |         self.assertEqual(stats2, [ | 
					
						
							|  |  |  |             tracemalloc.Statistic(tb_a_5, 5002, 2), | 
					
						
							|  |  |  |             tracemalloc.Statistic(tb_c_578, 400, 1), | 
					
						
							|  |  |  |             tracemalloc.Statistic(tb_a_2, 30, 3), | 
					
						
							|  |  |  |         ]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # stats diff per file and line | 
					
						
							|  |  |  |         statistics = snapshot2.compare_to(snapshot, 'lineno') | 
					
						
							|  |  |  |         self.assertEqual(statistics, [ | 
					
						
							|  |  |  |             tracemalloc.StatisticDiff(tb_a_5, 5002, 5000, 2, 1), | 
					
						
							|  |  |  |             tracemalloc.StatisticDiff(tb_c_578, 400, 400, 1, 1), | 
					
						
							|  |  |  |             tracemalloc.StatisticDiff(tb_b_1, 0, -66, 0, -1), | 
					
						
							|  |  |  |             tracemalloc.StatisticDiff(tb_0, 0, -7, 0, -1), | 
					
						
							|  |  |  |             tracemalloc.StatisticDiff(tb_a_2, 30, 0, 3, 0), | 
					
						
							|  |  |  |         ]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_snapshot_group_by_file(self): | 
					
						
							|  |  |  |         snapshot, snapshot2 = create_snapshots() | 
					
						
							|  |  |  |         tb_0 = traceback_filename('<unknown>') | 
					
						
							|  |  |  |         tb_a = traceback_filename('a.py') | 
					
						
							|  |  |  |         tb_b = traceback_filename('b.py') | 
					
						
							|  |  |  |         tb_c = traceback_filename('c.py') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # stats per file | 
					
						
							|  |  |  |         stats1 = snapshot.statistics('filename') | 
					
						
							|  |  |  |         self.assertEqual(stats1, [ | 
					
						
							|  |  |  |             tracemalloc.Statistic(tb_b, 66, 1), | 
					
						
							|  |  |  |             tracemalloc.Statistic(tb_a, 32, 4), | 
					
						
							|  |  |  |             tracemalloc.Statistic(tb_0, 7, 1), | 
					
						
							|  |  |  |         ]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # stats per file (2) | 
					
						
							|  |  |  |         stats2 = snapshot2.statistics('filename') | 
					
						
							|  |  |  |         self.assertEqual(stats2, [ | 
					
						
							|  |  |  |             tracemalloc.Statistic(tb_a, 5032, 5), | 
					
						
							|  |  |  |             tracemalloc.Statistic(tb_c, 400, 1), | 
					
						
							|  |  |  |         ]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # stats diff per file | 
					
						
							|  |  |  |         diff = snapshot2.compare_to(snapshot, 'filename') | 
					
						
							|  |  |  |         self.assertEqual(diff, [ | 
					
						
							|  |  |  |             tracemalloc.StatisticDiff(tb_a, 5032, 5000, 5, 1), | 
					
						
							|  |  |  |             tracemalloc.StatisticDiff(tb_c, 400, 400, 1, 1), | 
					
						
							|  |  |  |             tracemalloc.StatisticDiff(tb_b, 0, -66, 0, -1), | 
					
						
							|  |  |  |             tracemalloc.StatisticDiff(tb_0, 0, -7, 0, -1), | 
					
						
							|  |  |  |         ]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_snapshot_group_by_traceback(self): | 
					
						
							|  |  |  |         snapshot, snapshot2 = create_snapshots() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # stats per file | 
					
						
							|  |  |  |         tb1 = traceback(('a.py', 2), ('b.py', 4)) | 
					
						
							|  |  |  |         tb2 = traceback(('a.py', 5), ('b.py', 4)) | 
					
						
							|  |  |  |         tb3 = traceback(('b.py', 1)) | 
					
						
							|  |  |  |         tb4 = traceback(('<unknown>', 0)) | 
					
						
							|  |  |  |         stats1 = snapshot.statistics('traceback') | 
					
						
							|  |  |  |         self.assertEqual(stats1, [ | 
					
						
							|  |  |  |             tracemalloc.Statistic(tb3, 66, 1), | 
					
						
							|  |  |  |             tracemalloc.Statistic(tb1, 30, 3), | 
					
						
							|  |  |  |             tracemalloc.Statistic(tb4, 7, 1), | 
					
						
							|  |  |  |             tracemalloc.Statistic(tb2, 2, 1), | 
					
						
							|  |  |  |         ]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # stats per file (2) | 
					
						
							|  |  |  |         tb5 = traceback(('c.py', 578)) | 
					
						
							|  |  |  |         stats2 = snapshot2.statistics('traceback') | 
					
						
							|  |  |  |         self.assertEqual(stats2, [ | 
					
						
							|  |  |  |             tracemalloc.Statistic(tb2, 5002, 2), | 
					
						
							|  |  |  |             tracemalloc.Statistic(tb5, 400, 1), | 
					
						
							|  |  |  |             tracemalloc.Statistic(tb1, 30, 3), | 
					
						
							|  |  |  |         ]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # stats diff per file | 
					
						
							|  |  |  |         diff = snapshot2.compare_to(snapshot, 'traceback') | 
					
						
							|  |  |  |         self.assertEqual(diff, [ | 
					
						
							|  |  |  |             tracemalloc.StatisticDiff(tb2, 5002, 5000, 2, 1), | 
					
						
							|  |  |  |             tracemalloc.StatisticDiff(tb5, 400, 400, 1, 1), | 
					
						
							|  |  |  |             tracemalloc.StatisticDiff(tb3, 0, -66, 0, -1), | 
					
						
							|  |  |  |             tracemalloc.StatisticDiff(tb4, 0, -7, 0, -1), | 
					
						
							|  |  |  |             tracemalloc.StatisticDiff(tb1, 30, 0, 3, 0), | 
					
						
							|  |  |  |         ]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertRaises(ValueError, | 
					
						
							|  |  |  |                           snapshot.statistics, 'traceback', cumulative=True) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_snapshot_group_by_cumulative(self): | 
					
						
							|  |  |  |         snapshot, snapshot2 = create_snapshots() | 
					
						
							|  |  |  |         tb_0 = traceback_filename('<unknown>') | 
					
						
							|  |  |  |         tb_a = traceback_filename('a.py') | 
					
						
							|  |  |  |         tb_b = traceback_filename('b.py') | 
					
						
							|  |  |  |         tb_a_2 = traceback_lineno('a.py', 2) | 
					
						
							|  |  |  |         tb_a_5 = traceback_lineno('a.py', 5) | 
					
						
							|  |  |  |         tb_b_1 = traceback_lineno('b.py', 1) | 
					
						
							|  |  |  |         tb_b_4 = traceback_lineno('b.py', 4) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # per file | 
					
						
							|  |  |  |         stats = snapshot.statistics('filename', True) | 
					
						
							|  |  |  |         self.assertEqual(stats, [ | 
					
						
							|  |  |  |             tracemalloc.Statistic(tb_b, 98, 5), | 
					
						
							|  |  |  |             tracemalloc.Statistic(tb_a, 32, 4), | 
					
						
							|  |  |  |             tracemalloc.Statistic(tb_0, 7, 1), | 
					
						
							|  |  |  |         ]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # per line | 
					
						
							|  |  |  |         stats = snapshot.statistics('lineno', True) | 
					
						
							|  |  |  |         self.assertEqual(stats, [ | 
					
						
							|  |  |  |             tracemalloc.Statistic(tb_b_1, 66, 1), | 
					
						
							|  |  |  |             tracemalloc.Statistic(tb_b_4, 32, 4), | 
					
						
							|  |  |  |             tracemalloc.Statistic(tb_a_2, 30, 3), | 
					
						
							|  |  |  |             tracemalloc.Statistic(tb_0, 7, 1), | 
					
						
							|  |  |  |             tracemalloc.Statistic(tb_a_5, 2, 1), | 
					
						
							|  |  |  |         ]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_trace_format(self): | 
					
						
							|  |  |  |         snapshot, snapshot2 = create_snapshots() | 
					
						
							|  |  |  |         trace = snapshot.traces[0] | 
					
						
							| 
									
										
										
										
											2017-11-30 00:05:07 +01:00
										 |  |  |         self.assertEqual(str(trace), 'b.py:4: 10 B') | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  |         traceback = trace.traceback | 
					
						
							| 
									
										
										
										
											2017-11-30 00:05:07 +01:00
										 |  |  |         self.assertEqual(str(traceback), 'b.py:4') | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  |         frame = traceback[0] | 
					
						
							| 
									
										
										
										
											2017-11-30 00:05:07 +01:00
										 |  |  |         self.assertEqual(str(frame), 'b.py:4') | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def test_statistic_format(self): | 
					
						
							|  |  |  |         snapshot, snapshot2 = create_snapshots() | 
					
						
							|  |  |  |         stats = snapshot.statistics('lineno') | 
					
						
							|  |  |  |         stat = stats[0] | 
					
						
							|  |  |  |         self.assertEqual(str(stat), | 
					
						
							|  |  |  |                          'b.py:1: size=66 B, count=1, average=66 B') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_statistic_diff_format(self): | 
					
						
							|  |  |  |         snapshot, snapshot2 = create_snapshots() | 
					
						
							|  |  |  |         stats = snapshot2.compare_to(snapshot, 'lineno') | 
					
						
							|  |  |  |         stat = stats[0] | 
					
						
							|  |  |  |         self.assertEqual(str(stat), | 
					
						
							|  |  |  |                          'a.py:5: size=5002 B (+5000 B), count=2 (+1), average=2501 B') | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-02-01 04:07:02 +01:00
										 |  |  |     def test_slices(self): | 
					
						
							|  |  |  |         snapshot, snapshot2 = create_snapshots() | 
					
						
							|  |  |  |         self.assertEqual(snapshot.traces[:2], | 
					
						
							|  |  |  |                          (snapshot.traces[0], snapshot.traces[1])) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         traceback = snapshot.traces[0].traceback | 
					
						
							|  |  |  |         self.assertEqual(traceback[:2], | 
					
						
							|  |  |  |                          (traceback[0], traceback[1])) | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-02-16 23:53:38 +01:00
										 |  |  |     def test_format_traceback(self): | 
					
						
							|  |  |  |         snapshot, snapshot2 = create_snapshots() | 
					
						
							|  |  |  |         def getline(filename, lineno): | 
					
						
							|  |  |  |             return '  <%s, %s>' % (filename, lineno) | 
					
						
							|  |  |  |         with unittest.mock.patch('tracemalloc.linecache.getline', | 
					
						
							|  |  |  |                                  side_effect=getline): | 
					
						
							|  |  |  |             tb = snapshot.traces[0].traceback | 
					
						
							|  |  |  |             self.assertEqual(tb.format(), | 
					
						
							| 
									
										
										
										
											2017-11-30 00:05:07 +01:00
										 |  |  |                              ['  File "b.py", line 4', | 
					
						
							|  |  |  |                               '    <b.py, 4>', | 
					
						
							|  |  |  |                               '  File "a.py", line 2', | 
					
						
							|  |  |  |                               '    <a.py, 2>']) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             self.assertEqual(tb.format(limit=1), | 
					
						
							|  |  |  |                              ['  File "a.py", line 2', | 
					
						
							|  |  |  |                               '    <a.py, 2>']) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             self.assertEqual(tb.format(limit=-1), | 
					
						
							|  |  |  |                              ['  File "b.py", line 4', | 
					
						
							|  |  |  |                               '    <b.py, 4>']) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             self.assertEqual(tb.format(most_recent_first=True), | 
					
						
							| 
									
										
										
										
											2014-02-16 23:53:38 +01:00
										 |  |  |                              ['  File "a.py", line 2', | 
					
						
							|  |  |  |                               '    <a.py, 2>', | 
					
						
							|  |  |  |                               '  File "b.py", line 4', | 
					
						
							|  |  |  |                               '    <b.py, 4>']) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-11-30 00:05:07 +01:00
										 |  |  |             self.assertEqual(tb.format(limit=1, most_recent_first=True), | 
					
						
							| 
									
										
										
										
											2014-02-16 23:53:38 +01:00
										 |  |  |                              ['  File "a.py", line 2', | 
					
						
							|  |  |  |                               '    <a.py, 2>']) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-11-30 00:05:07 +01:00
										 |  |  |             self.assertEqual(tb.format(limit=-1, most_recent_first=True), | 
					
						
							|  |  |  |                              ['  File "b.py", line 4', | 
					
						
							|  |  |  |                               '    <b.py, 4>']) | 
					
						
							| 
									
										
										
										
											2014-02-16 23:53:38 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  | class TestFilters(unittest.TestCase): | 
					
						
							|  |  |  |     maxDiff = 2048 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_filter_attributes(self): | 
					
						
							|  |  |  |         # test default values | 
					
						
							|  |  |  |         f = tracemalloc.Filter(True, "abc") | 
					
						
							|  |  |  |         self.assertEqual(f.inclusive, True) | 
					
						
							|  |  |  |         self.assertEqual(f.filename_pattern, "abc") | 
					
						
							|  |  |  |         self.assertIsNone(f.lineno) | 
					
						
							|  |  |  |         self.assertEqual(f.all_frames, False) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # test custom values | 
					
						
							|  |  |  |         f = tracemalloc.Filter(False, "test.py", 123, True) | 
					
						
							|  |  |  |         self.assertEqual(f.inclusive, False) | 
					
						
							|  |  |  |         self.assertEqual(f.filename_pattern, "test.py") | 
					
						
							|  |  |  |         self.assertEqual(f.lineno, 123) | 
					
						
							|  |  |  |         self.assertEqual(f.all_frames, True) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # parameters passed by keyword | 
					
						
							|  |  |  |         f = tracemalloc.Filter(inclusive=False, filename_pattern="test.py", lineno=123, all_frames=True) | 
					
						
							|  |  |  |         self.assertEqual(f.inclusive, False) | 
					
						
							|  |  |  |         self.assertEqual(f.filename_pattern, "test.py") | 
					
						
							|  |  |  |         self.assertEqual(f.lineno, 123) | 
					
						
							|  |  |  |         self.assertEqual(f.all_frames, True) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # read-only attribute | 
					
						
							|  |  |  |         self.assertRaises(AttributeError, setattr, f, "filename_pattern", "abc") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_filter_match(self): | 
					
						
							|  |  |  |         # filter without line number | 
					
						
							|  |  |  |         f = tracemalloc.Filter(True, "abc") | 
					
						
							|  |  |  |         self.assertTrue(f._match_frame("abc", 0)) | 
					
						
							|  |  |  |         self.assertTrue(f._match_frame("abc", 5)) | 
					
						
							|  |  |  |         self.assertTrue(f._match_frame("abc", 10)) | 
					
						
							|  |  |  |         self.assertFalse(f._match_frame("12356", 0)) | 
					
						
							|  |  |  |         self.assertFalse(f._match_frame("12356", 5)) | 
					
						
							|  |  |  |         self.assertFalse(f._match_frame("12356", 10)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         f = tracemalloc.Filter(False, "abc") | 
					
						
							|  |  |  |         self.assertFalse(f._match_frame("abc", 0)) | 
					
						
							|  |  |  |         self.assertFalse(f._match_frame("abc", 5)) | 
					
						
							|  |  |  |         self.assertFalse(f._match_frame("abc", 10)) | 
					
						
							|  |  |  |         self.assertTrue(f._match_frame("12356", 0)) | 
					
						
							|  |  |  |         self.assertTrue(f._match_frame("12356", 5)) | 
					
						
							|  |  |  |         self.assertTrue(f._match_frame("12356", 10)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # filter with line number > 0 | 
					
						
							|  |  |  |         f = tracemalloc.Filter(True, "abc", 5) | 
					
						
							|  |  |  |         self.assertFalse(f._match_frame("abc", 0)) | 
					
						
							|  |  |  |         self.assertTrue(f._match_frame("abc", 5)) | 
					
						
							|  |  |  |         self.assertFalse(f._match_frame("abc", 10)) | 
					
						
							|  |  |  |         self.assertFalse(f._match_frame("12356", 0)) | 
					
						
							|  |  |  |         self.assertFalse(f._match_frame("12356", 5)) | 
					
						
							|  |  |  |         self.assertFalse(f._match_frame("12356", 10)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         f = tracemalloc.Filter(False, "abc", 5) | 
					
						
							|  |  |  |         self.assertTrue(f._match_frame("abc", 0)) | 
					
						
							|  |  |  |         self.assertFalse(f._match_frame("abc", 5)) | 
					
						
							|  |  |  |         self.assertTrue(f._match_frame("abc", 10)) | 
					
						
							|  |  |  |         self.assertTrue(f._match_frame("12356", 0)) | 
					
						
							|  |  |  |         self.assertTrue(f._match_frame("12356", 5)) | 
					
						
							|  |  |  |         self.assertTrue(f._match_frame("12356", 10)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # filter with line number 0 | 
					
						
							|  |  |  |         f = tracemalloc.Filter(True, "abc", 0) | 
					
						
							|  |  |  |         self.assertTrue(f._match_frame("abc", 0)) | 
					
						
							|  |  |  |         self.assertFalse(f._match_frame("abc", 5)) | 
					
						
							|  |  |  |         self.assertFalse(f._match_frame("abc", 10)) | 
					
						
							|  |  |  |         self.assertFalse(f._match_frame("12356", 0)) | 
					
						
							|  |  |  |         self.assertFalse(f._match_frame("12356", 5)) | 
					
						
							|  |  |  |         self.assertFalse(f._match_frame("12356", 10)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         f = tracemalloc.Filter(False, "abc", 0) | 
					
						
							|  |  |  |         self.assertFalse(f._match_frame("abc", 0)) | 
					
						
							|  |  |  |         self.assertTrue(f._match_frame("abc", 5)) | 
					
						
							|  |  |  |         self.assertTrue(f._match_frame("abc", 10)) | 
					
						
							|  |  |  |         self.assertTrue(f._match_frame("12356", 0)) | 
					
						
							|  |  |  |         self.assertTrue(f._match_frame("12356", 5)) | 
					
						
							|  |  |  |         self.assertTrue(f._match_frame("12356", 10)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_filter_match_filename(self): | 
					
						
							|  |  |  |         def fnmatch(inclusive, filename, pattern): | 
					
						
							|  |  |  |             f = tracemalloc.Filter(inclusive, pattern) | 
					
						
							|  |  |  |             return f._match_frame(filename, 0) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertTrue(fnmatch(True, "abc", "abc")) | 
					
						
							|  |  |  |         self.assertFalse(fnmatch(True, "12356", "abc")) | 
					
						
							|  |  |  |         self.assertFalse(fnmatch(True, "<unknown>", "abc")) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertFalse(fnmatch(False, "abc", "abc")) | 
					
						
							|  |  |  |         self.assertTrue(fnmatch(False, "12356", "abc")) | 
					
						
							|  |  |  |         self.assertTrue(fnmatch(False, "<unknown>", "abc")) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_filter_match_filename_joker(self): | 
					
						
							|  |  |  |         def fnmatch(filename, pattern): | 
					
						
							|  |  |  |             filter = tracemalloc.Filter(True, pattern) | 
					
						
							|  |  |  |             return filter._match_frame(filename, 0) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # empty string | 
					
						
							|  |  |  |         self.assertFalse(fnmatch('abc', '')) | 
					
						
							|  |  |  |         self.assertFalse(fnmatch('', 'abc')) | 
					
						
							|  |  |  |         self.assertTrue(fnmatch('', '')) | 
					
						
							|  |  |  |         self.assertTrue(fnmatch('', '*')) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # no * | 
					
						
							|  |  |  |         self.assertTrue(fnmatch('abc', 'abc')) | 
					
						
							|  |  |  |         self.assertFalse(fnmatch('abc', 'abcd')) | 
					
						
							|  |  |  |         self.assertFalse(fnmatch('abc', 'def')) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # a* | 
					
						
							|  |  |  |         self.assertTrue(fnmatch('abc', 'a*')) | 
					
						
							|  |  |  |         self.assertTrue(fnmatch('abc', 'abc*')) | 
					
						
							|  |  |  |         self.assertFalse(fnmatch('abc', 'b*')) | 
					
						
							|  |  |  |         self.assertFalse(fnmatch('abc', 'abcd*')) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # a*b | 
					
						
							|  |  |  |         self.assertTrue(fnmatch('abc', 'a*c')) | 
					
						
							|  |  |  |         self.assertTrue(fnmatch('abcdcx', 'a*cx')) | 
					
						
							|  |  |  |         self.assertFalse(fnmatch('abb', 'a*c')) | 
					
						
							|  |  |  |         self.assertFalse(fnmatch('abcdce', 'a*cx')) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # a*b*c | 
					
						
							|  |  |  |         self.assertTrue(fnmatch('abcde', 'a*c*e')) | 
					
						
							|  |  |  |         self.assertTrue(fnmatch('abcbdefeg', 'a*bd*eg')) | 
					
						
							|  |  |  |         self.assertFalse(fnmatch('abcdd', 'a*c*e')) | 
					
						
							|  |  |  |         self.assertFalse(fnmatch('abcbdefef', 'a*bd*eg')) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-04-13 14:21:02 -04:00
										 |  |  |         # replace .pyc suffix with .py | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  |         self.assertTrue(fnmatch('a.pyc', 'a.py')) | 
					
						
							|  |  |  |         self.assertTrue(fnmatch('a.py', 'a.pyc')) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if os.name == 'nt': | 
					
						
							|  |  |  |             # case insensitive | 
					
						
							|  |  |  |             self.assertTrue(fnmatch('aBC', 'ABc')) | 
					
						
							|  |  |  |             self.assertTrue(fnmatch('aBcDe', 'Ab*dE')) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             self.assertTrue(fnmatch('a.pyc', 'a.PY')) | 
					
						
							|  |  |  |             self.assertTrue(fnmatch('a.py', 'a.PYC')) | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             # case sensitive | 
					
						
							|  |  |  |             self.assertFalse(fnmatch('aBC', 'ABc')) | 
					
						
							|  |  |  |             self.assertFalse(fnmatch('aBcDe', 'Ab*dE')) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             self.assertFalse(fnmatch('a.pyc', 'a.PY')) | 
					
						
							|  |  |  |             self.assertFalse(fnmatch('a.py', 'a.PYC')) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if os.name == 'nt': | 
					
						
							|  |  |  |             # normalize alternate separator "/" to the standard separator "\" | 
					
						
							|  |  |  |             self.assertTrue(fnmatch(r'a/b', r'a\b')) | 
					
						
							|  |  |  |             self.assertTrue(fnmatch(r'a\b', r'a/b')) | 
					
						
							|  |  |  |             self.assertTrue(fnmatch(r'a/b\c', r'a\b/c')) | 
					
						
							|  |  |  |             self.assertTrue(fnmatch(r'a/b/c', r'a\b\c')) | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             # there is no alternate separator | 
					
						
							|  |  |  |             self.assertFalse(fnmatch(r'a/b', r'a\b')) | 
					
						
							|  |  |  |             self.assertFalse(fnmatch(r'a\b', r'a/b')) | 
					
						
							|  |  |  |             self.assertFalse(fnmatch(r'a/b\c', r'a\b/c')) | 
					
						
							|  |  |  |             self.assertFalse(fnmatch(r'a/b/c', r'a\b\c')) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-05-13 00:27:01 -05:00
										 |  |  |         # as of 3.5, .pyo is no longer munged to .py | 
					
						
							|  |  |  |         self.assertFalse(fnmatch('a.pyo', 'a.py')) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  |     def test_filter_match_trace(self): | 
					
						
							|  |  |  |         t1 = (("a.py", 2), ("b.py", 3)) | 
					
						
							|  |  |  |         t2 = (("b.py", 4), ("b.py", 5)) | 
					
						
							|  |  |  |         t3 = (("c.py", 5), ('<unknown>', 0)) | 
					
						
							|  |  |  |         unknown = (('<unknown>', 0),) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         f = tracemalloc.Filter(True, "b.py", all_frames=True) | 
					
						
							|  |  |  |         self.assertTrue(f._match_traceback(t1)) | 
					
						
							|  |  |  |         self.assertTrue(f._match_traceback(t2)) | 
					
						
							|  |  |  |         self.assertFalse(f._match_traceback(t3)) | 
					
						
							|  |  |  |         self.assertFalse(f._match_traceback(unknown)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         f = tracemalloc.Filter(True, "b.py", all_frames=False) | 
					
						
							|  |  |  |         self.assertFalse(f._match_traceback(t1)) | 
					
						
							|  |  |  |         self.assertTrue(f._match_traceback(t2)) | 
					
						
							|  |  |  |         self.assertFalse(f._match_traceback(t3)) | 
					
						
							|  |  |  |         self.assertFalse(f._match_traceback(unknown)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         f = tracemalloc.Filter(False, "b.py", all_frames=True) | 
					
						
							|  |  |  |         self.assertFalse(f._match_traceback(t1)) | 
					
						
							|  |  |  |         self.assertFalse(f._match_traceback(t2)) | 
					
						
							|  |  |  |         self.assertTrue(f._match_traceback(t3)) | 
					
						
							|  |  |  |         self.assertTrue(f._match_traceback(unknown)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         f = tracemalloc.Filter(False, "b.py", all_frames=False) | 
					
						
							|  |  |  |         self.assertTrue(f._match_traceback(t1)) | 
					
						
							|  |  |  |         self.assertFalse(f._match_traceback(t2)) | 
					
						
							|  |  |  |         self.assertTrue(f._match_traceback(t3)) | 
					
						
							|  |  |  |         self.assertTrue(f._match_traceback(unknown)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         f = tracemalloc.Filter(False, "<unknown>", all_frames=False) | 
					
						
							|  |  |  |         self.assertTrue(f._match_traceback(t1)) | 
					
						
							|  |  |  |         self.assertTrue(f._match_traceback(t2)) | 
					
						
							|  |  |  |         self.assertTrue(f._match_traceback(t3)) | 
					
						
							|  |  |  |         self.assertFalse(f._match_traceback(unknown)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         f = tracemalloc.Filter(True, "<unknown>", all_frames=True) | 
					
						
							|  |  |  |         self.assertFalse(f._match_traceback(t1)) | 
					
						
							|  |  |  |         self.assertFalse(f._match_traceback(t2)) | 
					
						
							|  |  |  |         self.assertTrue(f._match_traceback(t3)) | 
					
						
							|  |  |  |         self.assertTrue(f._match_traceback(unknown)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         f = tracemalloc.Filter(False, "<unknown>", all_frames=True) | 
					
						
							|  |  |  |         self.assertTrue(f._match_traceback(t1)) | 
					
						
							|  |  |  |         self.assertTrue(f._match_traceback(t2)) | 
					
						
							|  |  |  |         self.assertFalse(f._match_traceback(t3)) | 
					
						
							|  |  |  |         self.assertFalse(f._match_traceback(unknown)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class TestCommandLine(unittest.TestCase): | 
					
						
							| 
									
										
										
										
											2015-01-22 14:38:00 -08:00
										 |  |  |     def test_env_var_disabled_by_default(self): | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  |         # not tracing by default | 
					
						
							|  |  |  |         code = 'import tracemalloc; print(tracemalloc.is_tracing())' | 
					
						
							|  |  |  |         ok, stdout, stderr = assert_python_ok('-c', code) | 
					
						
							|  |  |  |         stdout = stdout.rstrip() | 
					
						
							|  |  |  |         self.assertEqual(stdout, b'False') | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-05-06 06:33:17 +03:00
										 |  |  |     @unittest.skipIf(interpreter_requires_environment(), | 
					
						
							| 
									
										
										
										
											2015-02-04 00:59:40 -08:00
										 |  |  |                      'Cannot run -E tests when PYTHON env vars are required.') | 
					
						
							| 
									
										
										
										
											2015-01-22 14:38:00 -08:00
										 |  |  |     def test_env_var_ignored_with_E(self): | 
					
						
							|  |  |  |         """PYTHON* environment variables must be ignored when -E is present.""" | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  |         code = 'import tracemalloc; print(tracemalloc.is_tracing())' | 
					
						
							|  |  |  |         ok, stdout, stderr = assert_python_ok('-E', '-c', code, PYTHONTRACEMALLOC='1') | 
					
						
							|  |  |  |         stdout = stdout.rstrip() | 
					
						
							|  |  |  |         self.assertEqual(stdout, b'False') | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-07-25 19:23:53 +02:00
										 |  |  |     def test_env_var_disabled(self): | 
					
						
							|  |  |  |         # tracing at startup | 
					
						
							|  |  |  |         code = 'import tracemalloc; print(tracemalloc.is_tracing())' | 
					
						
							|  |  |  |         ok, stdout, stderr = assert_python_ok('-c', code, PYTHONTRACEMALLOC='0') | 
					
						
							|  |  |  |         stdout = stdout.rstrip() | 
					
						
							|  |  |  |         self.assertEqual(stdout, b'False') | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-01-22 14:38:00 -08:00
										 |  |  |     def test_env_var_enabled_at_startup(self): | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  |         # tracing at startup | 
					
						
							|  |  |  |         code = 'import tracemalloc; print(tracemalloc.is_tracing())' | 
					
						
							|  |  |  |         ok, stdout, stderr = assert_python_ok('-c', code, PYTHONTRACEMALLOC='1') | 
					
						
							|  |  |  |         stdout = stdout.rstrip() | 
					
						
							|  |  |  |         self.assertEqual(stdout, b'True') | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-01-22 14:38:00 -08:00
										 |  |  |     def test_env_limit(self): | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  |         # start and set the number of frames | 
					
						
							|  |  |  |         code = 'import tracemalloc; print(tracemalloc.get_traceback_limit())' | 
					
						
							|  |  |  |         ok, stdout, stderr = assert_python_ok('-c', code, PYTHONTRACEMALLOC='10') | 
					
						
							|  |  |  |         stdout = stdout.rstrip() | 
					
						
							|  |  |  |         self.assertEqual(stdout, b'10') | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-11-15 18:11:45 -08:00
										 |  |  |     def check_env_var_invalid(self, nframe): | 
					
						
							|  |  |  |         with support.SuppressCrashReport(): | 
					
						
							|  |  |  |             ok, stdout, stderr = assert_python_failure( | 
					
						
							|  |  |  |                 '-c', 'pass', | 
					
						
							|  |  |  |                 PYTHONTRACEMALLOC=str(nframe)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if b'ValueError: the number of frames must be in range' in stderr: | 
					
						
							|  |  |  |             return | 
					
						
							|  |  |  |         if b'PYTHONTRACEMALLOC: invalid number of frames' in stderr: | 
					
						
							|  |  |  |             return | 
					
						
							|  |  |  |         self.fail(f"unexpeced output: {stderr!a}") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  |     def test_env_var_invalid(self): | 
					
						
							| 
									
										
										
										
											2018-07-25 19:23:53 +02:00
										 |  |  |         for nframe in INVALID_NFRAME: | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  |             with self.subTest(nframe=nframe): | 
					
						
							| 
									
										
										
										
											2017-11-15 18:11:45 -08:00
										 |  |  |                 self.check_env_var_invalid(nframe) | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def test_sys_xoptions(self): | 
					
						
							|  |  |  |         for xoptions, nframe in ( | 
					
						
							|  |  |  |             ('tracemalloc', 1), | 
					
						
							|  |  |  |             ('tracemalloc=1', 1), | 
					
						
							|  |  |  |             ('tracemalloc=15', 15), | 
					
						
							|  |  |  |         ): | 
					
						
							|  |  |  |             with self.subTest(xoptions=xoptions, nframe=nframe): | 
					
						
							|  |  |  |                 code = 'import tracemalloc; print(tracemalloc.get_traceback_limit())' | 
					
						
							|  |  |  |                 ok, stdout, stderr = assert_python_ok('-X', xoptions, '-c', code) | 
					
						
							|  |  |  |                 stdout = stdout.rstrip() | 
					
						
							|  |  |  |                 self.assertEqual(stdout, str(nframe).encode('ascii')) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-11-15 18:11:45 -08:00
										 |  |  |     def check_sys_xoptions_invalid(self, nframe): | 
					
						
							|  |  |  |         args = ('-X', 'tracemalloc=%s' % nframe, '-c', 'pass') | 
					
						
							|  |  |  |         with support.SuppressCrashReport(): | 
					
						
							|  |  |  |             ok, stdout, stderr = assert_python_failure(*args) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if b'ValueError: the number of frames must be in range' in stderr: | 
					
						
							|  |  |  |             return | 
					
						
							|  |  |  |         if b'-X tracemalloc=NFRAME: invalid number of frames' in stderr: | 
					
						
							|  |  |  |             return | 
					
						
							|  |  |  |         self.fail(f"unexpeced output: {stderr!a}") | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  |     def test_sys_xoptions_invalid(self): | 
					
						
							| 
									
										
										
										
											2018-07-25 19:23:53 +02:00
										 |  |  |         for nframe in INVALID_NFRAME: | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  |             with self.subTest(nframe=nframe): | 
					
						
							| 
									
										
										
										
											2017-11-15 18:11:45 -08:00
										 |  |  |                 self.check_sys_xoptions_invalid(nframe) | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-03-19 20:20:10 +02:00
										 |  |  |     @unittest.skipIf(_testcapi is None, 'need _testcapi') | 
					
						
							| 
									
										
										
										
											2014-06-02 21:36:59 +02:00
										 |  |  |     def test_pymem_alloc0(self): | 
					
						
							|  |  |  |         # Issue #21639: Check that PyMem_Malloc(0) with tracemalloc enabled | 
					
						
							|  |  |  |         # does not crash. | 
					
						
							|  |  |  |         code = 'import _testcapi; _testcapi.test_pymem_alloc0(); 1' | 
					
						
							|  |  |  |         assert_python_ok('-X', 'tracemalloc', '-c', code) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-03-22 13:39:05 +01:00
										 |  |  | @unittest.skipIf(_testcapi is None, 'need _testcapi') | 
					
						
							|  |  |  | class TestCAPI(unittest.TestCase): | 
					
						
							|  |  |  |     maxDiff = 80 * 20 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def setUp(self): | 
					
						
							|  |  |  |         if tracemalloc.is_tracing(): | 
					
						
							|  |  |  |             self.skipTest("tracemalloc must be stopped before the test") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.domain = 5 | 
					
						
							|  |  |  |         self.size = 123 | 
					
						
							|  |  |  |         self.obj = allocate_bytes(self.size)[0] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # for the type "object", id(obj) is the address of its memory block. | 
					
						
							|  |  |  |         # This type is not tracked by the garbage collector | 
					
						
							|  |  |  |         self.ptr = id(self.obj) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def tearDown(self): | 
					
						
							|  |  |  |         tracemalloc.stop() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def get_traceback(self): | 
					
						
							|  |  |  |         frames = _testcapi.tracemalloc_get_traceback(self.domain, self.ptr) | 
					
						
							|  |  |  |         if frames is not None: | 
					
						
							|  |  |  |             return tracemalloc.Traceback(frames) | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             return None | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def track(self, release_gil=False, nframe=1): | 
					
						
							| 
									
										
										
										
											2018-09-17 15:17:29 +03:00
										 |  |  |         frames = get_frames(nframe, 1) | 
					
						
							| 
									
										
										
										
											2016-03-22 13:39:05 +01:00
										 |  |  |         _testcapi.tracemalloc_track(self.domain, self.ptr, self.size, | 
					
						
							|  |  |  |                                     release_gil) | 
					
						
							|  |  |  |         return frames | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def untrack(self): | 
					
						
							|  |  |  |         _testcapi.tracemalloc_untrack(self.domain, self.ptr) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def get_traced_memory(self): | 
					
						
							|  |  |  |         # Get the traced size in the domain | 
					
						
							|  |  |  |         snapshot = tracemalloc.take_snapshot() | 
					
						
							|  |  |  |         domain_filter = tracemalloc.DomainFilter(True, self.domain) | 
					
						
							|  |  |  |         snapshot = snapshot.filter_traces([domain_filter]) | 
					
						
							|  |  |  |         return sum(trace.size for trace in snapshot.traces) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def check_track(self, release_gil): | 
					
						
							|  |  |  |         nframe = 5 | 
					
						
							|  |  |  |         tracemalloc.start(nframe) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         size = tracemalloc.get_traced_memory()[0] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         frames = self.track(release_gil, nframe) | 
					
						
							|  |  |  |         self.assertEqual(self.get_traceback(), | 
					
						
							|  |  |  |                          tracemalloc.Traceback(frames)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertEqual(self.get_traced_memory(), self.size) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_track(self): | 
					
						
							|  |  |  |         self.check_track(False) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_track_without_gil(self): | 
					
						
							|  |  |  |         # check that calling _PyTraceMalloc_Track() without holding the GIL | 
					
						
							|  |  |  |         # works too | 
					
						
							|  |  |  |         self.check_track(True) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_track_already_tracked(self): | 
					
						
							|  |  |  |         nframe = 5 | 
					
						
							|  |  |  |         tracemalloc.start(nframe) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # track a first time | 
					
						
							|  |  |  |         self.track() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # calling _PyTraceMalloc_Track() must remove the old trace and add | 
					
						
							|  |  |  |         # a new trace with the new traceback | 
					
						
							|  |  |  |         frames = self.track(nframe=nframe) | 
					
						
							|  |  |  |         self.assertEqual(self.get_traceback(), | 
					
						
							|  |  |  |                          tracemalloc.Traceback(frames)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_untrack(self): | 
					
						
							|  |  |  |         tracemalloc.start() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.track() | 
					
						
							|  |  |  |         self.assertIsNotNone(self.get_traceback()) | 
					
						
							|  |  |  |         self.assertEqual(self.get_traced_memory(), self.size) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # untrack must remove the trace | 
					
						
							|  |  |  |         self.untrack() | 
					
						
							|  |  |  |         self.assertIsNone(self.get_traceback()) | 
					
						
							|  |  |  |         self.assertEqual(self.get_traced_memory(), 0) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # calling _PyTraceMalloc_Untrack() multiple times must not crash | 
					
						
							|  |  |  |         self.untrack() | 
					
						
							|  |  |  |         self.untrack() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_stop_track(self): | 
					
						
							|  |  |  |         tracemalloc.start() | 
					
						
							|  |  |  |         tracemalloc.stop() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         with self.assertRaises(RuntimeError): | 
					
						
							|  |  |  |             self.track() | 
					
						
							|  |  |  |         self.assertIsNone(self.get_traceback()) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_stop_untrack(self): | 
					
						
							|  |  |  |         tracemalloc.start() | 
					
						
							|  |  |  |         self.track() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         tracemalloc.stop() | 
					
						
							|  |  |  |         with self.assertRaises(RuntimeError): | 
					
						
							|  |  |  |             self.untrack() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  | def test_main(): | 
					
						
							|  |  |  |     support.run_unittest( | 
					
						
							|  |  |  |         TestTracemallocEnabled, | 
					
						
							|  |  |  |         TestSnapshot, | 
					
						
							|  |  |  |         TestFilters, | 
					
						
							|  |  |  |         TestCommandLine, | 
					
						
							| 
									
										
										
										
											2016-03-22 13:39:05 +01:00
										 |  |  |         TestCAPI, | 
					
						
							| 
									
										
										
										
											2013-11-23 12:27:24 +01:00
										 |  |  |     ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | if __name__ == "__main__": | 
					
						
							|  |  |  |     test_main() |