mirror of
				https://github.com/python/cpython.git
				synced 2025-11-03 07:01:21 +00:00 
			
		
		
		
	This makes the asyncio REPL (`python -m asyncio`) more usable and similar to the regular REPL. This exposes register_readline() as a top-level function in site.py, but it's intentionally undocumented. Co-authored-by: Carol Willing <carolcode@willingconsulting.com> Co-authored-by: Itamar Oren <itamarost@gmail.com>
		
			
				
	
	
		
			141 lines
		
	
	
	
		
			3.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			141 lines
		
	
	
	
		
			3.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import ast
 | 
						|
import asyncio
 | 
						|
import code
 | 
						|
import concurrent.futures
 | 
						|
import inspect
 | 
						|
import site
 | 
						|
import sys
 | 
						|
import threading
 | 
						|
import types
 | 
						|
import warnings
 | 
						|
 | 
						|
from . import futures
 | 
						|
 | 
						|
 | 
						|
class AsyncIOInteractiveConsole(code.InteractiveConsole):
 | 
						|
 | 
						|
    def __init__(self, locals, loop):
 | 
						|
        super().__init__(locals)
 | 
						|
        self.compile.compiler.flags |= ast.PyCF_ALLOW_TOP_LEVEL_AWAIT
 | 
						|
 | 
						|
        self.loop = loop
 | 
						|
 | 
						|
    def runcode(self, code):
 | 
						|
        future = concurrent.futures.Future()
 | 
						|
 | 
						|
        def callback():
 | 
						|
            global repl_future
 | 
						|
            global repl_future_interrupted
 | 
						|
 | 
						|
            repl_future = None
 | 
						|
            repl_future_interrupted = False
 | 
						|
 | 
						|
            func = types.FunctionType(code, self.locals)
 | 
						|
            try:
 | 
						|
                coro = func()
 | 
						|
            except SystemExit:
 | 
						|
                raise
 | 
						|
            except KeyboardInterrupt as ex:
 | 
						|
                repl_future_interrupted = True
 | 
						|
                future.set_exception(ex)
 | 
						|
                return
 | 
						|
            except BaseException as ex:
 | 
						|
                future.set_exception(ex)
 | 
						|
                return
 | 
						|
 | 
						|
            if not inspect.iscoroutine(coro):
 | 
						|
                future.set_result(coro)
 | 
						|
                return
 | 
						|
 | 
						|
            try:
 | 
						|
                repl_future = self.loop.create_task(coro)
 | 
						|
                futures._chain_future(repl_future, future)
 | 
						|
            except BaseException as exc:
 | 
						|
                future.set_exception(exc)
 | 
						|
 | 
						|
        loop.call_soon_threadsafe(callback)
 | 
						|
 | 
						|
        try:
 | 
						|
            return future.result()
 | 
						|
        except SystemExit:
 | 
						|
            raise
 | 
						|
        except BaseException:
 | 
						|
            if repl_future_interrupted:
 | 
						|
                self.write("\nKeyboardInterrupt\n")
 | 
						|
            else:
 | 
						|
                self.showtraceback()
 | 
						|
 | 
						|
 | 
						|
class REPLThread(threading.Thread):
 | 
						|
 | 
						|
    def run(self):
 | 
						|
        try:
 | 
						|
            banner = (
 | 
						|
                f'asyncio REPL {sys.version} on {sys.platform}\n'
 | 
						|
                f'Use "await" directly instead of "asyncio.run()".\n'
 | 
						|
                f'Type "help", "copyright", "credits" or "license" '
 | 
						|
                f'for more information.\n'
 | 
						|
                f'{getattr(sys, "ps1", ">>> ")}import asyncio'
 | 
						|
            )
 | 
						|
 | 
						|
            console.interact(
 | 
						|
                banner=banner,
 | 
						|
                exitmsg='exiting asyncio REPL...')
 | 
						|
        finally:
 | 
						|
            warnings.filterwarnings(
 | 
						|
                'ignore',
 | 
						|
                message=r'^coroutine .* was never awaited$',
 | 
						|
                category=RuntimeWarning)
 | 
						|
 | 
						|
            loop.call_soon_threadsafe(loop.stop)
 | 
						|
 | 
						|
 | 
						|
if __name__ == '__main__':
 | 
						|
    loop = asyncio.new_event_loop()
 | 
						|
    asyncio.set_event_loop(loop)
 | 
						|
 | 
						|
    repl_locals = {'asyncio': asyncio}
 | 
						|
    for key in {'__name__', '__package__',
 | 
						|
                '__loader__', '__spec__',
 | 
						|
                '__builtins__', '__file__'}:
 | 
						|
        repl_locals[key] = locals()[key]
 | 
						|
 | 
						|
    console = AsyncIOInteractiveConsole(repl_locals, loop)
 | 
						|
 | 
						|
    repl_future = None
 | 
						|
    repl_future_interrupted = False
 | 
						|
 | 
						|
    try:
 | 
						|
        import readline  # NoQA
 | 
						|
    except ImportError:
 | 
						|
        pass
 | 
						|
 | 
						|
    interactive_hook = getattr(sys, "__interactivehook__", None)
 | 
						|
 | 
						|
    if interactive_hook is not None:
 | 
						|
        interactive_hook()
 | 
						|
 | 
						|
    if interactive_hook is site.register_readline:
 | 
						|
        # Fix the completer function to use the interactive console locals
 | 
						|
        try:
 | 
						|
            import rlcompleter
 | 
						|
        except:
 | 
						|
            pass
 | 
						|
        else:
 | 
						|
            completer = rlcompleter.Completer(console.locals)
 | 
						|
            readline.set_completer(completer.complete)
 | 
						|
 | 
						|
    repl_thread = REPLThread()
 | 
						|
    repl_thread.daemon = True
 | 
						|
    repl_thread.start()
 | 
						|
 | 
						|
    while True:
 | 
						|
        try:
 | 
						|
            loop.run_forever()
 | 
						|
        except KeyboardInterrupt:
 | 
						|
            if repl_future and not repl_future.done():
 | 
						|
                repl_future.cancel()
 | 
						|
                repl_future_interrupted = True
 | 
						|
            continue
 | 
						|
        else:
 | 
						|
            break
 |