asyncio, Tulip issue 137: In debug mode, save traceback where Future, Task and

Handle objects are created. Pass the traceback to call_exception_handler() in
the 'source_traceback' key.

The traceback is truncated to hide internal calls in asyncio, show only the
traceback from user code.

Add tests for the new source_traceback, and a test for the 'Future/Task
exception was never retrieved' log.
This commit is contained in:
Victor Stinner 2014-06-27 13:52:20 +02:00
parent bbd96c6f47
commit 80f53aa9a0
8 changed files with 180 additions and 26 deletions

View file

@ -21,6 +21,7 @@
import logging import logging
import socket import socket
import subprocess import subprocess
import traceback
import time import time
import os import os
import sys import sys
@ -290,7 +291,10 @@ def call_later(self, delay, callback, *args):
Any positional arguments after the callback will be passed to Any positional arguments after the callback will be passed to
the callback when it is called. the callback when it is called.
""" """
return self.call_at(self.time() + delay, callback, *args) timer = self.call_at(self.time() + delay, callback, *args)
if timer._source_traceback:
del timer._source_traceback[-1]
return timer
def call_at(self, when, callback, *args): def call_at(self, when, callback, *args):
"""Like call_later(), but uses an absolute time.""" """Like call_later(), but uses an absolute time."""
@ -299,6 +303,8 @@ def call_at(self, when, callback, *args):
if self._debug: if self._debug:
self._assert_is_current_event_loop() self._assert_is_current_event_loop()
timer = events.TimerHandle(when, callback, args, self) timer = events.TimerHandle(when, callback, args, self)
if timer._source_traceback:
del timer._source_traceback[-1]
heapq.heappush(self._scheduled, timer) heapq.heappush(self._scheduled, timer)
return timer return timer
@ -312,7 +318,10 @@ def call_soon(self, callback, *args):
Any positional arguments after the callback will be passed to Any positional arguments after the callback will be passed to
the callback when it is called. the callback when it is called.
""" """
return self._call_soon(callback, args, check_loop=True) handle = self._call_soon(callback, args, check_loop=True)
if handle._source_traceback:
del handle._source_traceback[-1]
return handle
def _call_soon(self, callback, args, check_loop): def _call_soon(self, callback, args, check_loop):
if tasks.iscoroutinefunction(callback): if tasks.iscoroutinefunction(callback):
@ -320,6 +329,8 @@ def _call_soon(self, callback, args, check_loop):
if self._debug and check_loop: if self._debug and check_loop:
self._assert_is_current_event_loop() self._assert_is_current_event_loop()
handle = events.Handle(callback, args, self) handle = events.Handle(callback, args, self)
if handle._source_traceback:
del handle._source_traceback[-1]
self._ready.append(handle) self._ready.append(handle)
return handle return handle
@ -344,6 +355,8 @@ def _assert_is_current_event_loop(self):
def call_soon_threadsafe(self, callback, *args): def call_soon_threadsafe(self, callback, *args):
"""Like call_soon(), but thread safe.""" """Like call_soon(), but thread safe."""
handle = self._call_soon(callback, args, check_loop=False) handle = self._call_soon(callback, args, check_loop=False)
if handle._source_traceback:
del handle._source_traceback[-1]
self._write_to_self() self._write_to_self()
return handle return handle
@ -757,7 +770,14 @@ def default_exception_handler(self, context):
for key in sorted(context): for key in sorted(context):
if key in {'message', 'exception'}: if key in {'message', 'exception'}:
continue continue
log_lines.append('{}: {!r}'.format(key, context[key])) value = context[key]
if key == 'source_traceback':
tb = ''.join(traceback.format_list(value))
value = 'Object created at (most recent call last):\n'
value += tb.rstrip()
else:
value = repr(value)
log_lines.append('{}: {}'.format(key, value))
logger.error('\n'.join(log_lines), exc_info=exc_info) logger.error('\n'.join(log_lines), exc_info=exc_info)

View file

@ -11,6 +11,7 @@
import functools import functools
import inspect import inspect
import subprocess import subprocess
import traceback
import threading import threading
import socket import socket
import sys import sys
@ -66,7 +67,8 @@ def _format_callback(func, args, suffix=''):
class Handle: class Handle:
"""Object returned by callback registration methods.""" """Object returned by callback registration methods."""
__slots__ = ['_callback', '_args', '_cancelled', '_loop', '__weakref__'] __slots__ = ('_callback', '_args', '_cancelled', '_loop',
'_source_traceback', '__weakref__')
def __init__(self, callback, args, loop): def __init__(self, callback, args, loop):
assert not isinstance(callback, Handle), 'A Handle is not a callback' assert not isinstance(callback, Handle), 'A Handle is not a callback'
@ -74,6 +76,10 @@ def __init__(self, callback, args, loop):
self._callback = callback self._callback = callback
self._args = args self._args = args
self._cancelled = False self._cancelled = False
if self._loop.get_debug():
self._source_traceback = traceback.extract_stack(sys._getframe(1))
else:
self._source_traceback = None
def __repr__(self): def __repr__(self):
info = [] info = []
@ -91,11 +97,14 @@ def _run(self):
except Exception as exc: except Exception as exc:
cb = _format_callback(self._callback, self._args) cb = _format_callback(self._callback, self._args)
msg = 'Exception in callback {}'.format(cb) msg = 'Exception in callback {}'.format(cb)
self._loop.call_exception_handler({ context = {
'message': msg, 'message': msg,
'exception': exc, 'exception': exc,
'handle': self, 'handle': self,
}) }
if self._source_traceback:
context['source_traceback'] = self._source_traceback
self._loop.call_exception_handler(context)
self = None # Needed to break cycles when an exception occurs. self = None # Needed to break cycles when an exception occurs.
@ -107,7 +116,8 @@ class TimerHandle(Handle):
def __init__(self, when, callback, args, loop): def __init__(self, when, callback, args, loop):
assert when is not None assert when is not None
super().__init__(callback, args, loop) super().__init__(callback, args, loop)
if self._source_traceback:
del self._source_traceback[-1]
self._when = when self._when = when
def __repr__(self): def __repr__(self):

View file

@ -82,10 +82,11 @@ class itself, but instead to have a reference to a helper object
in a discussion about closing files when they are collected. in a discussion about closing files when they are collected.
""" """
__slots__ = ['exc', 'tb', 'loop'] __slots__ = ('loop', 'source_traceback', 'exc', 'tb')
def __init__(self, exc, loop): def __init__(self, future, exc):
self.loop = loop self.loop = future._loop
self.source_traceback = future._source_traceback
self.exc = exc self.exc = exc
self.tb = None self.tb = None
@ -102,11 +103,12 @@ def clear(self):
def __del__(self): def __del__(self):
if self.tb: if self.tb:
msg = 'Future/Task exception was never retrieved:\n{tb}' msg = 'Future/Task exception was never retrieved'
context = { if self.source_traceback:
'message': msg.format(tb=''.join(self.tb)), msg += '\nFuture/Task created at (most recent call last):\n'
} msg += ''.join(traceback.format_list(self.source_traceback))
self.loop.call_exception_handler(context) msg += ''.join(self.tb).rstrip()
self.loop.call_exception_handler({'message': msg})
class Future: class Future:
@ -149,6 +151,10 @@ def __init__(self, *, loop=None):
else: else:
self._loop = loop self._loop = loop
self._callbacks = [] self._callbacks = []
if self._loop.get_debug():
self._source_traceback = traceback.extract_stack(sys._getframe(1))
else:
self._source_traceback = None
def _format_callbacks(self): def _format_callbacks(self):
cb = self._callbacks cb = self._callbacks
@ -196,10 +202,13 @@ def __del__(self):
return return
exc = self._exception exc = self._exception
context = { context = {
'message': 'Future/Task exception was never retrieved', 'message': ('%s exception was never retrieved'
% self.__class__.__name__),
'exception': exc, 'exception': exc,
'future': self, 'future': self,
} }
if self._source_traceback:
context['source_traceback'] = self._source_traceback
self._loop.call_exception_handler(context) self._loop.call_exception_handler(context)
def cancel(self): def cancel(self):
@ -335,7 +344,7 @@ def set_exception(self, exception):
if _PY34: if _PY34:
self._log_traceback = True self._log_traceback = True
else: else:
self._tb_logger = _TracebackLogger(exception, self._loop) self._tb_logger = _TracebackLogger(self, exception)
# Arrange for the logger to be activated after all callbacks # Arrange for the logger to be activated after all callbacks
# have had a chance to call result() or exception(). # have had a chance to call result() or exception().
self._loop.call_soon(self._tb_logger.activate) self._loop.call_soon(self._tb_logger.activate)

View file

@ -195,6 +195,8 @@ def all_tasks(cls, loop=None):
def __init__(self, coro, *, loop=None): def __init__(self, coro, *, loop=None):
assert iscoroutine(coro), repr(coro) # Not a coroutine function! assert iscoroutine(coro), repr(coro) # Not a coroutine function!
super().__init__(loop=loop) super().__init__(loop=loop)
if self._source_traceback:
del self._source_traceback[-1]
self._coro = iter(coro) # Use the iterator just in case. self._coro = iter(coro) # Use the iterator just in case.
self._fut_waiter = None self._fut_waiter = None
self._must_cancel = False self._must_cancel = False
@ -207,10 +209,13 @@ def __init__(self, coro, *, loop=None):
if _PY34: if _PY34:
def __del__(self): def __del__(self):
if self._state == futures._PENDING: if self._state == futures._PENDING:
self._loop.call_exception_handler({ context = {
'task': self, 'task': self,
'message': 'Task was destroyed but it is pending!', 'message': 'Task was destroyed but it is pending!',
}) }
if self._source_traceback:
context['source_traceback'] = self._source_traceback
self._loop.call_exception_handler(context)
futures.Future.__del__(self) futures.Future.__del__(self)
def __repr__(self): def __repr__(self):
@ -620,7 +625,10 @@ def async(coro_or_future, *, loop=None):
raise ValueError('loop argument must agree with Future') raise ValueError('loop argument must agree with Future')
return coro_or_future return coro_or_future
elif iscoroutine(coro_or_future): elif iscoroutine(coro_or_future):
return Task(coro_or_future, loop=loop) task = Task(coro_or_future, loop=loop)
if task._source_traceback:
del task._source_traceback[-1]
return task
else: else:
raise TypeError('A Future or coroutine is required') raise TypeError('A Future or coroutine is required')

View file

@ -406,19 +406,22 @@ def zero_error():
1/0 1/0
def run_loop(): def run_loop():
self.loop.call_soon(zero_error) handle = self.loop.call_soon(zero_error)
self.loop._run_once() self.loop._run_once()
return handle
self.loop.set_debug(True)
self.loop._process_events = mock.Mock() self.loop._process_events = mock.Mock()
mock_handler = mock.Mock() mock_handler = mock.Mock()
self.loop.set_exception_handler(mock_handler) self.loop.set_exception_handler(mock_handler)
run_loop() handle = run_loop()
mock_handler.assert_called_with(self.loop, { mock_handler.assert_called_with(self.loop, {
'exception': MOCK_ANY, 'exception': MOCK_ANY,
'message': test_utils.MockPattern( 'message': test_utils.MockPattern(
'Exception in callback.*zero_error'), 'Exception in callback.*zero_error'),
'handle': MOCK_ANY, 'handle': handle,
'source_traceback': handle._source_traceback,
}) })
mock_handler.reset_mock() mock_handler.reset_mock()

View file

@ -1751,10 +1751,11 @@ def noop(*args):
pass pass
class HandleTests(unittest.TestCase): class HandleTests(test_utils.TestCase):
def setUp(self): def setUp(self):
self.loop = None self.loop = mock.Mock()
self.loop.get_debug.return_value = True
def test_handle(self): def test_handle(self):
def callback(*args): def callback(*args):
@ -1789,7 +1790,8 @@ def callback():
self.loop.call_exception_handler.assert_called_with({ self.loop.call_exception_handler.assert_called_with({
'message': test_utils.MockPattern('Exception in callback.*'), 'message': test_utils.MockPattern('Exception in callback.*'),
'exception': mock.ANY, 'exception': mock.ANY,
'handle': h 'handle': h,
'source_traceback': h._source_traceback,
}) })
def test_handle_weakref(self): def test_handle_weakref(self):
@ -1837,6 +1839,35 @@ def test_handle_repr(self):
% (cb_regex, re.escape(filename), lineno)) % (cb_regex, re.escape(filename), lineno))
self.assertRegex(repr(h), regex) self.assertRegex(repr(h), regex)
def test_handle_source_traceback(self):
loop = asyncio.get_event_loop_policy().new_event_loop()
loop.set_debug(True)
self.set_event_loop(loop)
def check_source_traceback(h):
lineno = sys._getframe(1).f_lineno - 1
self.assertIsInstance(h._source_traceback, list)
self.assertEqual(h._source_traceback[-1][:3],
(__file__,
lineno,
'test_handle_source_traceback'))
# call_soon
h = loop.call_soon(noop)
check_source_traceback(h)
# call_soon_threadsafe
h = loop.call_soon_threadsafe(noop)
check_source_traceback(h)
# call_later
h = loop.call_later(0, noop)
check_source_traceback(h)
# call_at
h = loop.call_later(0, noop)
check_source_traceback(h)
class TimerTests(unittest.TestCase): class TimerTests(unittest.TestCase):

View file

@ -2,8 +2,10 @@
import concurrent.futures import concurrent.futures
import re import re
import sys
import threading import threading
import unittest import unittest
from test import support
from unittest import mock from unittest import mock
import asyncio import asyncio
@ -284,6 +286,63 @@ def test_wrap_future_cancel2(self):
self.assertEqual(f1.result(), 42) self.assertEqual(f1.result(), 42)
self.assertTrue(f2.cancelled()) self.assertTrue(f2.cancelled())
def test_future_source_traceback(self):
self.loop.set_debug(True)
future = asyncio.Future(loop=self.loop)
lineno = sys._getframe().f_lineno - 1
self.assertIsInstance(future._source_traceback, list)
self.assertEqual(future._source_traceback[-1][:3],
(__file__,
lineno,
'test_future_source_traceback'))
@mock.patch('asyncio.base_events.logger')
def test_future_exception_never_retrieved(self, m_log):
self.loop.set_debug(True)
def memroy_error():
try:
raise MemoryError()
except BaseException as exc:
return exc
exc = memroy_error()
future = asyncio.Future(loop=self.loop)
source_traceback = future._source_traceback
future.set_exception(exc)
future = None
test_utils.run_briefly(self.loop)
support.gc_collect()
if sys.version_info >= (3, 4):
frame = source_traceback[-1]
regex = (r'^Future exception was never retrieved\n'
r'future: <Future finished exception=MemoryError\(\)>\n'
r'source_traceback: Object created at \(most recent call last\):\n'
r' File'
r'.*\n'
r' File "%s", line %s, in test_future_exception_never_retrieved\n'
r' future = asyncio\.Future\(loop=self\.loop\)$'
% (frame[0], frame[1]))
exc_info = (type(exc), exc, exc.__traceback__)
m_log.error.assert_called_once_with(mock.ANY, exc_info=exc_info)
else:
frame = source_traceback[-1]
regex = (r'^Future/Task exception was never retrieved\n'
r'Future/Task created at \(most recent call last\):\n'
r' File'
r'.*\n'
r' File "%s", line %s, in test_future_exception_never_retrieved\n'
r' future = asyncio\.Future\(loop=self\.loop\)\n'
r'Traceback \(most recent call last\):\n'
r'.*\n'
r'MemoryError$'
% (frame[0], frame[1]))
m_log.error.assert_called_once_with(mock.ANY, exc_info=False)
message = m_log.error.call_args[0][0]
self.assertRegex(message, re.compile(regex, re.DOTALL))
class FutureDoneCallbackTests(test_utils.TestCase): class FutureDoneCallbackTests(test_utils.TestCase):

View file

@ -1546,6 +1546,7 @@ def kill_me(loop):
raise Exception("code never reached") raise Exception("code never reached")
mock_handler = mock.Mock() mock_handler = mock.Mock()
self.loop.set_debug(True)
self.loop.set_exception_handler(mock_handler) self.loop.set_exception_handler(mock_handler)
# schedule the task # schedule the task
@ -1560,6 +1561,7 @@ def kill_me(loop):
# remove the future used in kill_me(), and references to the task # remove the future used in kill_me(), and references to the task
del coro.gi_frame.f_locals['future'] del coro.gi_frame.f_locals['future']
coro = None coro = None
source_traceback = task._source_traceback
task = None task = None
# no more reference to kill_me() task: the task is destroyed by the GC # no more reference to kill_me() task: the task is destroyed by the GC
@ -1570,6 +1572,7 @@ def kill_me(loop):
mock_handler.assert_called_with(self.loop, { mock_handler.assert_called_with(self.loop, {
'message': 'Task was destroyed but it is pending!', 'message': 'Task was destroyed but it is pending!',
'task': mock.ANY, 'task': mock.ANY,
'source_traceback': source_traceback,
}) })
mock_handler.reset_mock() mock_handler.reset_mock()
@ -1604,6 +1607,17 @@ def coro_noop():
self.assertRegex(message, re.compile(regex, re.DOTALL)) self.assertRegex(message, re.compile(regex, re.DOTALL))
def test_task_source_traceback(self):
self.loop.set_debug(True)
task = asyncio.Task(coroutine_function(), loop=self.loop)
lineno = sys._getframe().f_lineno - 1
self.assertIsInstance(task._source_traceback, list)
self.assertEqual(task._source_traceback[-1][:3],
(__file__,
lineno,
'test_task_source_traceback'))
class GatherTestsBase: class GatherTestsBase: