mirror of
				https://github.com/python/cpython.git
				synced 2025-10-31 05:31:20 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			199 lines
		
	
	
	
		
			6.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			199 lines
		
	
	
	
		
			6.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# Names exported by "from <this module> import *".
__all__ = ['coroutine',
           'iscoroutinefunction', 'iscoroutine']
 | |
| 
 | |
| import functools
 | |
| import inspect
 | |
| import opcode
 | |
| import os
 | |
| import sys
 | |
| import traceback
 | |
| import types
 | |
| 
 | |
| from . import events
 | |
| from . import futures
 | |
| from .log import logger
 | |
| 
 | |
| 
 | |
# Numeric opcode of the "yield from" bytecode instruction.  It is compared
# against the caller's current instruction in CoroWrapper.send() when the
# workaround for CPython issue #21209 is active.
_YIELD_FROM = opcode.opmap['YIELD_FROM']
 | |
| 
 | |
# If you set _DEBUG to true, @coroutine will wrap the resulting
# generator objects in a CoroWrapper instance (defined below).  That
# instance will log a message when the generator is never iterated
# over, which may happen when you forget to use "yield from" with a
# coroutine call.  Note that the value of the _DEBUG flag is taken
# when the decorator is used, so to be of any use it must be set
# before you define your coroutines.  A downside of using this feature
# is that tracebacks show entries for the CoroWrapper.__next__ method
# when _DEBUG is true.
#
# The default is true only when the PYTHONASYNCIODEBUG environment
# variable is set to a non-empty value and the interpreter is not
# ignoring the environment (sys.flags.ignore_environment is false).
_DEBUG = (not sys.flags.ignore_environment
          and bool(os.environ.get('PYTHONASYNCIODEBUG')))
 | |
| 
 | |
| 
 | |
| # Check for CPython issue #21209
 | |
def has_yield_from_bug():
    """Probe the running interpreter for CPython issue #21209.

    A tuple is sent through a ``yield from`` delegation into a custom
    iterator whose ``send()`` records its positional arguments.  Return
    True if the tuple did not arrive as one single argument, False if it
    was passed through unchanged.
    """
    class Recorder:
        # Minimal iterator whose send() remembers what it was called with.
        def __init__(self):
            self.send_args = None

        def __iter__(self):
            return self

        def __next__(self):
            return 42

        def send(self, *received):
            self.send_args = received
            return None

    def delegate(target):
        yield from target

    payload = (1, 2, 3)
    recorder = Recorder()
    outer = delegate(recorder)

    # Advance to the "yield from" so that the following send() is delegated.
    next(outer)
    outer.send(payload)

    expected = (payload,)
    return recorder.send_args != expected
 | |
# Run the probe once at import time and keep only its result; the helper
# function itself is removed from the module namespace afterwards.
_YIELD_FROM_BUG = has_yield_from_bug()
del has_yield_from_bug
 | |
| 
 | |
| 
 | |
class CoroWrapper:
    """Wrapper for a coroutine (generator) object in _DEBUG mode.

    The wrapper forwards the generator protocol (__next__, send, throw,
    close) and the gi_* attributes to the wrapped generator, remembers
    the stack at creation time, and logs an error from __del__ if the
    generator was never started.
    """

    def __init__(self, gen, func):
        """Wrap generator object *gen* produced by calling *func*."""
        assert inspect.isgenerator(gen), gen
        self.gen = gen
        self.func = func
        # Stack of the caller, used by __repr__ and __del__ to report
        # where the coroutine object was created.
        self._source_traceback = traceback.extract_stack(sys._getframe(1))
        # __name__, __qualname__, __doc__ attributes are set by the coroutine()
        # decorator

    def __repr__(self):
        coro_repr = _format_coroutine(self)
        if self._source_traceback:
            # Last entry: (filename, lineno, ...) of the creation site.
            frame = self._source_traceback[-1]
            coro_repr += ', created at %s:%s' % (frame[0], frame[1])
        return '<%s %s>' % (self.__class__.__name__, coro_repr)

    def __iter__(self):
        return self

    def __next__(self):
        return next(self.gen)

    if _YIELD_FROM_BUG:
        # Fix for CPython issue #21209: using "yield from" and a custom
        # generator, generator.send(tuple) unpacks the tuple instead of passing
        # the tuple unchanged. Check if the caller is a generator using "yield
        # from" to decide if the parameter should be unpacked or not.
        def send(self, *value):
            frame = sys._getframe()
            caller = frame.f_back
            assert caller.f_lasti >= 0
            if caller.f_code.co_code[caller.f_lasti] != _YIELD_FROM:
                # Ordinary call: the single argument is the value to send.
                value = value[0]
            return self.gen.send(value)
    else:
        def send(self, value):
            return self.gen.send(value)

    def throw(self, exc, *args):
        """Raise an exception inside the wrapped generator.

        Any extra positional arguments are forwarded unchanged, so both
        throw(exc) and throw(type, value, traceback) reach the
        generator's own throw() with the same arguments.
        """
        return self.gen.throw(exc, *args)

    def close(self):
        return self.gen.close()

    @property
    def gi_frame(self):
        return self.gen.gi_frame

    @property
    def gi_running(self):
        return self.gen.gi_running

    @property
    def gi_code(self):
        return self.gen.gi_code

    def __del__(self):
        # Be careful accessing self.gen.frame -- self.gen might not exist.
        gen = getattr(self, 'gen', None)
        frame = getattr(gen, 'gi_frame', None)
        # f_lasti == -1 means the generator's frame never executed any
        # instruction, i.e. the coroutine was created but never started.
        if frame is not None and frame.f_lasti == -1:
            msg = '%r was never yielded from' % self
            tb = getattr(self, '_source_traceback', ())
            if tb:
                tb = ''.join(traceback.format_list(tb))
                msg += ('\nCoroutine object created at '
                        '(most recent call last):\n')
                msg += tb.rstrip()
            logger.error(msg)
 | |
| 
 | |
| 
 | |
def coroutine(func):
    """Decorator to mark coroutines.

    If *func* is not a generator function it is wrapped in one that
    calls it and, when the result is a Future or a generator, yields
    from that result.

    When _DEBUG is true at decoration time, each call returns a
    CoroWrapper around the generator; if that coroutine is not yielded
    from before it is destroyed, an error message is logged.

    The returned callable carries an ``_is_coroutine`` attribute set to
    True, which iscoroutinefunction() looks for.
    """
    if inspect.isgeneratorfunction(func):
        coro = func
    else:
        @functools.wraps(func)
        def coro(*args, **kw):
            res = func(*args, **kw)
            if isinstance(res, futures.Future) or inspect.isgenerator(res):
                res = yield from res
            return res

    if not _DEBUG:
        wrapper = coro
    else:
        @functools.wraps(func)
        def wrapper(*args, **kwds):
            w = CoroWrapper(coro(*args, **kwds), func)
            if w._source_traceback:
                # Drop this wrapper's own frame so the recorded stack ends
                # at the code that actually called the coroutine function.
                del w._source_traceback[-1]
            # Not every callable has __name__ (e.g. functools.partial
            # objects), so do not assume the attribute exists.
            w.__name__ = getattr(func, '__name__', None)
            if hasattr(func, '__qualname__'):
                w.__qualname__ = func.__qualname__
            w.__doc__ = getattr(func, '__doc__', None)
            return w

    wrapper._is_coroutine = True  # For iscoroutinefunction().
    return wrapper
 | |
| 
 | |
| 
 | |
def iscoroutinefunction(func):
    """Tell whether *func* was marked by the coroutine() decorator.

    The value of the ``_is_coroutine`` attribute is returned as is;
    False is returned when the attribute is missing.
    """
    try:
        return func._is_coroutine
    except AttributeError:
        return False
 | |
| 
 | |
| 
 | |
# Types accepted by iscoroutine(): plain generator objects and the
# debug-mode CoroWrapper.
_COROUTINE_TYPES = (types.GeneratorType, CoroWrapper)
 | |
| 
 | |
def iscoroutine(obj):
    """Tell whether *obj* is a coroutine object.

    True is returned for instances of any type listed in
    _COROUTINE_TYPES, False otherwise.
    """
    is_coro_object = isinstance(obj, _COROUTINE_TYPES)
    return is_coro_object
 | |
| 
 | |
| 
 | |
def _format_coroutine(coro):
    """Return a short description of coroutine object *coro*.

    The text has the form ``name() <state> <filename>:<lineno>`` where
    the state is "running" while the generator still has a frame and
    "done" once it does not.  For a CoroWrapper around a function that
    is not a generator function, the location is where that wrapped
    function is defined rather than where the generator is executing.
    """
    assert iscoroutine(coro)
    coro_name = getattr(coro, '__qualname__', coro.__name__)

    code = coro.gi_code
    filename = code.co_filename
    running = coro.gi_frame is not None

    wraps_plain_function = (isinstance(coro, CoroWrapper)
                            and not inspect.isgeneratorfunction(coro.func))
    if wraps_plain_function:
        # NOTE(review): the helper is defined in another module; it is
        # assumed here that it may return None when it cannot locate the
        # function's source -- confirm.  Unpacking None would raise a
        # TypeError from inside repr(), so fall back to the generator's
        # own code object in that case.
        source = events._get_function_source(coro.func)
        if source is not None:
            filename, lineno = source
        else:
            lineno = code.co_firstlineno
        state = 'running' if running else 'done'
        return ('%s() %s, defined at %s:%s'
                % (coro_name, state, filename, lineno))

    if running:
        # Report the line the generator is currently suspended at.
        lineno = coro.gi_frame.f_lineno
        return ('%s() running at %s:%s'
                % (coro_name, filename, lineno))

    lineno = code.co_firstlineno
    return ('%s() done, defined at %s:%s'
            % (coro_name, filename, lineno))
 | 
