mirror of
				https://github.com/python/cpython.git
				synced 2025-11-03 23:21:29 +00:00 
			
		
		
		
	gh-120635: Avoid leaking processes in test_pyrepl (GH-120676)
If the child process takes longer than SHORT_TIMEOUT seconds to
complete, kill the process but then wait until it completes with no
timeout to not leak child processes.
(cherry picked from commit 0f3e36454d)
Co-authored-by: Victor Stinner <vstinner@python.org>
		
	
			
		
			
				
	
	
		
			894 lines
		
	
	
	
		
			28 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			894 lines
		
	
	
	
		
			28 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import io
 | 
						|
import itertools
 | 
						|
import os
 | 
						|
import rlcompleter
 | 
						|
import select
 | 
						|
import subprocess
 | 
						|
import sys
 | 
						|
from unittest import TestCase, skipUnless
 | 
						|
from unittest.mock import patch
 | 
						|
from test.support import force_not_colorized
 | 
						|
from test.support import SHORT_TIMEOUT
 | 
						|
 | 
						|
from .support import (
 | 
						|
    FakeConsole,
 | 
						|
    handle_all_events,
 | 
						|
    handle_events_narrow_console,
 | 
						|
    more_lines,
 | 
						|
    multiline_input,
 | 
						|
    code_to_events,
 | 
						|
)
 | 
						|
from _pyrepl.console import Event
 | 
						|
from _pyrepl.readline import ReadlineAlikeReader, ReadlineConfig
 | 
						|
from _pyrepl.readline import multiline_input as readline_multiline_input
 | 
						|
 | 
						|
try:
 | 
						|
    import pty
 | 
						|
except ImportError:
 | 
						|
    pty = None
 | 
						|
 | 
						|
class TestCursorPosition(TestCase):
 | 
						|
    def prepare_reader(self, events):
 | 
						|
        console = FakeConsole(events)
 | 
						|
        config = ReadlineConfig(readline_completer=None)
 | 
						|
        reader = ReadlineAlikeReader(console=console, config=config)
 | 
						|
        return reader
 | 
						|
 | 
						|
    def test_up_arrow_simple(self):
 | 
						|
        # fmt: off
 | 
						|
        code = (
 | 
						|
            "def f():\n"
 | 
						|
            "  ...\n"
 | 
						|
        )
 | 
						|
        # fmt: on
 | 
						|
        events = itertools.chain(
 | 
						|
            code_to_events(code),
 | 
						|
            [
 | 
						|
                Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
 | 
						|
            ],
 | 
						|
        )
 | 
						|
 | 
						|
        reader, console = handle_all_events(events)
 | 
						|
        self.assertEqual(reader.cxy, (0, 1))
 | 
						|
        console.move_cursor.assert_called_once_with(0, 1)
 | 
						|
 | 
						|
    def test_down_arrow_end_of_input(self):
 | 
						|
        # fmt: off
 | 
						|
        code = (
 | 
						|
            "def f():\n"
 | 
						|
            "  ...\n"
 | 
						|
        )
 | 
						|
        # fmt: on
 | 
						|
        events = itertools.chain(
 | 
						|
            code_to_events(code),
 | 
						|
            [
 | 
						|
                Event(evt="key", data="down", raw=bytearray(b"\x1bOB")),
 | 
						|
            ],
 | 
						|
        )
 | 
						|
 | 
						|
        reader, console = handle_all_events(events)
 | 
						|
        self.assertEqual(reader.cxy, (0, 2))
 | 
						|
        console.move_cursor.assert_called_once_with(0, 2)
 | 
						|
 | 
						|
    def test_left_arrow_simple(self):
 | 
						|
        events = itertools.chain(
 | 
						|
            code_to_events("11+11"),
 | 
						|
            [
 | 
						|
                Event(evt="key", data="left", raw=bytearray(b"\x1bOD")),
 | 
						|
            ],
 | 
						|
        )
 | 
						|
 | 
						|
        reader, console = handle_all_events(events)
 | 
						|
        self.assertEqual(reader.cxy, (4, 0))
 | 
						|
        console.move_cursor.assert_called_once_with(4, 0)
 | 
						|
 | 
						|
    def test_right_arrow_end_of_line(self):
 | 
						|
        events = itertools.chain(
 | 
						|
            code_to_events("11+11"),
 | 
						|
            [
 | 
						|
                Event(evt="key", data="right", raw=bytearray(b"\x1bOC")),
 | 
						|
            ],
 | 
						|
        )
 | 
						|
 | 
						|
        reader, console = handle_all_events(events)
 | 
						|
        self.assertEqual(reader.cxy, (5, 0))
 | 
						|
        console.move_cursor.assert_called_once_with(5, 0)
 | 
						|
 | 
						|
    def test_cursor_position_simple_character(self):
 | 
						|
        events = itertools.chain(code_to_events("k"))
 | 
						|
 | 
						|
        reader, _ = handle_all_events(events)
 | 
						|
        self.assertEqual(reader.pos, 1)
 | 
						|
 | 
						|
        # 1 for simple character
 | 
						|
        self.assertEqual(reader.cxy, (1, 0))
 | 
						|
 | 
						|
    def test_cursor_position_double_width_character(self):
 | 
						|
        events = itertools.chain(code_to_events("樂"))
 | 
						|
 | 
						|
        reader, _ = handle_all_events(events)
 | 
						|
        self.assertEqual(reader.pos, 1)
 | 
						|
 | 
						|
        # 2 for wide character
 | 
						|
        self.assertEqual(reader.cxy, (2, 0))
 | 
						|
 | 
						|
    def test_cursor_position_double_width_character_move_left(self):
 | 
						|
        events = itertools.chain(
 | 
						|
            code_to_events("樂"),
 | 
						|
            [
 | 
						|
                Event(evt="key", data="left", raw=bytearray(b"\x1bOD")),
 | 
						|
            ],
 | 
						|
        )
 | 
						|
 | 
						|
        reader, _ = handle_all_events(events)
 | 
						|
        self.assertEqual(reader.pos, 0)
 | 
						|
        self.assertEqual(reader.cxy, (0, 0))
 | 
						|
 | 
						|
    def test_cursor_position_double_width_character_move_left_right(self):
 | 
						|
        events = itertools.chain(
 | 
						|
            code_to_events("樂"),
 | 
						|
            [
 | 
						|
                Event(evt="key", data="left", raw=bytearray(b"\x1bOD")),
 | 
						|
                Event(evt="key", data="right", raw=bytearray(b"\x1bOC")),
 | 
						|
            ],
 | 
						|
        )
 | 
						|
 | 
						|
        reader, _ = handle_all_events(events)
 | 
						|
        self.assertEqual(reader.pos, 1)
 | 
						|
 | 
						|
        # 2 for wide character
 | 
						|
        self.assertEqual(reader.cxy, (2, 0))
 | 
						|
 | 
						|
    def test_cursor_position_double_width_characters_move_up(self):
 | 
						|
        for_loop = "for _ in _:"
 | 
						|
 | 
						|
        # fmt: off
 | 
						|
        code = (
 | 
						|
           f"{for_loop}\n"
 | 
						|
            "  ' 可口可乐; 可口可樂'"
 | 
						|
        )
 | 
						|
        # fmt: on
 | 
						|
 | 
						|
        events = itertools.chain(
 | 
						|
            code_to_events(code),
 | 
						|
            [
 | 
						|
                Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
 | 
						|
            ],
 | 
						|
        )
 | 
						|
 | 
						|
        reader, _ = handle_all_events(events)
 | 
						|
 | 
						|
        # cursor at end of first line
 | 
						|
        self.assertEqual(reader.pos, len(for_loop))
 | 
						|
        self.assertEqual(reader.cxy, (len(for_loop), 0))
 | 
						|
 | 
						|
    def test_cursor_position_double_width_characters_move_up_down(self):
 | 
						|
        for_loop = "for _ in _:"
 | 
						|
 | 
						|
        # fmt: off
 | 
						|
        code = (
 | 
						|
           f"{for_loop}\n"
 | 
						|
            "  ' 可口可乐; 可口可樂'"
 | 
						|
        )
 | 
						|
        # fmt: on
 | 
						|
 | 
						|
        events = itertools.chain(
 | 
						|
            code_to_events(code),
 | 
						|
            [
 | 
						|
                Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
 | 
						|
                Event(evt="key", data="left", raw=bytearray(b"\x1bOD")),
 | 
						|
                Event(evt="key", data="down", raw=bytearray(b"\x1bOB")),
 | 
						|
            ],
 | 
						|
        )
 | 
						|
 | 
						|
        reader, _ = handle_all_events(events)
 | 
						|
 | 
						|
        # cursor here (showing 2nd line only):
 | 
						|
        # <  ' 可口可乐; 可口可樂'>
 | 
						|
        #              ^
 | 
						|
        self.assertEqual(reader.pos, 19)
 | 
						|
        self.assertEqual(reader.cxy, (10, 1))
 | 
						|
 | 
						|
    def test_cursor_position_multiple_double_width_characters_move_left(self):
 | 
						|
        events = itertools.chain(
 | 
						|
            code_to_events("' 可口可乐; 可口可樂'"),
 | 
						|
            [
 | 
						|
                Event(evt="key", data="left", raw=bytearray(b"\x1bOD")),
 | 
						|
                Event(evt="key", data="left", raw=bytearray(b"\x1bOD")),
 | 
						|
                Event(evt="key", data="left", raw=bytearray(b"\x1bOD")),
 | 
						|
            ],
 | 
						|
        )
 | 
						|
 | 
						|
        reader, _ = handle_all_events(events)
 | 
						|
        self.assertEqual(reader.pos, 10)
 | 
						|
 | 
						|
        # 1 for quote, 1 for space, 2 per wide character,
 | 
						|
        # 1 for semicolon, 1 for space, 2 per wide character
 | 
						|
        self.assertEqual(reader.cxy, (16, 0))
 | 
						|
 | 
						|
    def test_cursor_position_move_up_to_eol(self):
 | 
						|
        first_line = "for _ in _:"
 | 
						|
        second_line = "  hello"
 | 
						|
 | 
						|
        # fmt: off
 | 
						|
        code = (
 | 
						|
            f"{first_line}\n"
 | 
						|
            f"{second_line}\n"
 | 
						|
             "  h\n"
 | 
						|
             "  hel"
 | 
						|
        )
 | 
						|
        # fmt: on
 | 
						|
 | 
						|
        events = itertools.chain(
 | 
						|
            code_to_events(code),
 | 
						|
            [
 | 
						|
                Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
 | 
						|
                Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
 | 
						|
            ],
 | 
						|
        )
 | 
						|
 | 
						|
        reader, _ = handle_all_events(events)
 | 
						|
 | 
						|
        # Cursor should be at end of line 1, even though line 2 is shorter
 | 
						|
        # for _ in _:
 | 
						|
        #   hello
 | 
						|
        #   h
 | 
						|
        #   hel
 | 
						|
        self.assertEqual(
 | 
						|
            reader.pos, len(first_line) + len(second_line) + 1
 | 
						|
        )  # +1 for newline
 | 
						|
        self.assertEqual(reader.cxy, (len(second_line), 1))
 | 
						|
 | 
						|
    def test_cursor_position_move_down_to_eol(self):
 | 
						|
        last_line = "  hel"
 | 
						|
 | 
						|
        # fmt: off
 | 
						|
        code = (
 | 
						|
            "for _ in _:\n"
 | 
						|
            "  hello\n"
 | 
						|
            "  h\n"
 | 
						|
           f"{last_line}"
 | 
						|
        )
 | 
						|
        # fmt: on
 | 
						|
 | 
						|
        events = itertools.chain(
 | 
						|
            code_to_events(code),
 | 
						|
            [
 | 
						|
                Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
 | 
						|
                Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
 | 
						|
                Event(evt="key", data="down", raw=bytearray(b"\x1bOB")),
 | 
						|
                Event(evt="key", data="down", raw=bytearray(b"\x1bOB")),
 | 
						|
            ],
 | 
						|
        )
 | 
						|
 | 
						|
        reader, _ = handle_all_events(events)
 | 
						|
 | 
						|
        # Cursor should be at end of line 3, even though line 2 is shorter
 | 
						|
        # for _ in _:
 | 
						|
        #   hello
 | 
						|
        #   h
 | 
						|
        #   hel
 | 
						|
        self.assertEqual(reader.pos, len(code))
 | 
						|
        self.assertEqual(reader.cxy, (len(last_line), 3))
 | 
						|
 | 
						|
    def test_cursor_position_multiple_mixed_lines_move_up(self):
 | 
						|
        # fmt: off
 | 
						|
        code = (
 | 
						|
            "def foo():\n"
 | 
						|
            "  x = '可口可乐; 可口可樂'\n"
 | 
						|
            "  y = 'abckdfjskldfjslkdjf'"
 | 
						|
        )
 | 
						|
        # fmt: on
 | 
						|
 | 
						|
        events = itertools.chain(
 | 
						|
            code_to_events(code),
 | 
						|
            13 * [Event(evt="key", data="left", raw=bytearray(b"\x1bOD"))],
 | 
						|
            [Event(evt="key", data="up", raw=bytearray(b"\x1bOA"))],
 | 
						|
        )
 | 
						|
 | 
						|
        reader, _ = handle_all_events(events)
 | 
						|
 | 
						|
        # By moving left, we're before the s:
 | 
						|
        # y = 'abckdfjskldfjslkdjf'
 | 
						|
        #             ^
 | 
						|
        # And we should move before the semi-colon despite the different offset
 | 
						|
        # x = '可口可乐; 可口可樂'
 | 
						|
        #            ^
 | 
						|
        self.assertEqual(reader.pos, 22)
 | 
						|
        self.assertEqual(reader.cxy, (15, 1))
 | 
						|
 | 
						|
    def test_cursor_position_after_wrap_and_move_up(self):
 | 
						|
        # fmt: off
 | 
						|
        code = (
 | 
						|
            "def foo():\n"
 | 
						|
            "  hello"
 | 
						|
        )
 | 
						|
        # fmt: on
 | 
						|
 | 
						|
        events = itertools.chain(
 | 
						|
            code_to_events(code),
 | 
						|
            [
 | 
						|
                Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
 | 
						|
            ],
 | 
						|
        )
 | 
						|
        reader, _ = handle_events_narrow_console(events)
 | 
						|
 | 
						|
        # The code looks like this:
 | 
						|
        # def foo()\
 | 
						|
        # :
 | 
						|
        #   hello
 | 
						|
        # After moving up we should be after the colon in line 2
 | 
						|
        self.assertEqual(reader.pos, 10)
 | 
						|
        self.assertEqual(reader.cxy, (1, 1))
 | 
						|
 | 
						|
 | 
						|
class TestPyReplAutoindent(TestCase):
 | 
						|
    def prepare_reader(self, events):
 | 
						|
        console = FakeConsole(events)
 | 
						|
        config = ReadlineConfig(readline_completer=None)
 | 
						|
        reader = ReadlineAlikeReader(console=console, config=config)
 | 
						|
        return reader
 | 
						|
 | 
						|
    def test_auto_indent_default(self):
 | 
						|
        # fmt: off
 | 
						|
        input_code = (
 | 
						|
            "def f():\n"
 | 
						|
                "pass\n\n"
 | 
						|
        )
 | 
						|
 | 
						|
        output_code = (
 | 
						|
            "def f():\n"
 | 
						|
            "    pass\n"
 | 
						|
            "    "
 | 
						|
        )
 | 
						|
        # fmt: on
 | 
						|
 | 
						|
    def test_auto_indent_continuation(self):
 | 
						|
        # auto indenting according to previous user indentation
 | 
						|
        # fmt: off
 | 
						|
        events = itertools.chain(
 | 
						|
            code_to_events("def f():\n"),
 | 
						|
            # add backspace to delete default auto-indent
 | 
						|
            [
 | 
						|
                Event(evt="key", data="backspace", raw=bytearray(b"\x7f")),
 | 
						|
            ],
 | 
						|
            code_to_events(
 | 
						|
                "  pass\n"
 | 
						|
                  "pass\n\n"
 | 
						|
            ),
 | 
						|
        )
 | 
						|
 | 
						|
        output_code = (
 | 
						|
            "def f():\n"
 | 
						|
            "  pass\n"
 | 
						|
            "  pass\n"
 | 
						|
            "  "
 | 
						|
        )
 | 
						|
        # fmt: on
 | 
						|
 | 
						|
        reader = self.prepare_reader(events)
 | 
						|
        output = multiline_input(reader)
 | 
						|
        self.assertEqual(output, output_code)
 | 
						|
 | 
						|
    def test_auto_indent_prev_block(self):
 | 
						|
        # auto indenting according to indentation in different block
 | 
						|
        # fmt: off
 | 
						|
        events = itertools.chain(
 | 
						|
            code_to_events("def f():\n"),
 | 
						|
            # add backspace to delete default auto-indent
 | 
						|
            [
 | 
						|
                Event(evt="key", data="backspace", raw=bytearray(b"\x7f")),
 | 
						|
            ],
 | 
						|
            code_to_events(
 | 
						|
                "  pass\n"
 | 
						|
                "pass\n\n"
 | 
						|
            ),
 | 
						|
            code_to_events(
 | 
						|
                "def g():\n"
 | 
						|
                  "pass\n\n"
 | 
						|
            ),
 | 
						|
        )
 | 
						|
 | 
						|
        output_code = (
 | 
						|
            "def g():\n"
 | 
						|
            "  pass\n"
 | 
						|
            "  "
 | 
						|
        )
 | 
						|
        # fmt: on
 | 
						|
 | 
						|
        reader = self.prepare_reader(events)
 | 
						|
        output1 = multiline_input(reader)
 | 
						|
        output2 = multiline_input(reader)
 | 
						|
        self.assertEqual(output2, output_code)
 | 
						|
 | 
						|
    def test_auto_indent_multiline(self):
 | 
						|
        # fmt: off
 | 
						|
        events = itertools.chain(
 | 
						|
            code_to_events(
 | 
						|
                "def f():\n"
 | 
						|
                    "pass"
 | 
						|
            ),
 | 
						|
            [
 | 
						|
                # go to the end of the first line
 | 
						|
                Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
 | 
						|
                Event(evt="key", data="\x05", raw=bytearray(b"\x1bO5")),
 | 
						|
                # new line should be autoindented
 | 
						|
                Event(evt="key", data="\n", raw=bytearray(b"\n")),
 | 
						|
            ],
 | 
						|
            code_to_events(
 | 
						|
                "pass"
 | 
						|
            ),
 | 
						|
            [
 | 
						|
                # go to end of last line
 | 
						|
                Event(evt="key", data="down", raw=bytearray(b"\x1bOB")),
 | 
						|
                Event(evt="key", data="\x05", raw=bytearray(b"\x1bO5")),
 | 
						|
                # double newline to terminate the block
 | 
						|
                Event(evt="key", data="\n", raw=bytearray(b"\n")),
 | 
						|
                Event(evt="key", data="\n", raw=bytearray(b"\n")),
 | 
						|
            ],
 | 
						|
        )
 | 
						|
 | 
						|
        output_code = (
 | 
						|
            "def f():\n"
 | 
						|
            "    pass\n"
 | 
						|
            "    pass\n"
 | 
						|
            "    "
 | 
						|
        )
 | 
						|
        # fmt: on
 | 
						|
 | 
						|
        reader = self.prepare_reader(events)
 | 
						|
        output = multiline_input(reader)
 | 
						|
        self.assertEqual(output, output_code)
 | 
						|
 | 
						|
    def test_auto_indent_with_comment(self):
 | 
						|
        # fmt: off
 | 
						|
        events = code_to_events(
 | 
						|
            "def f():  # foo\n"
 | 
						|
                "pass\n\n"
 | 
						|
        )
 | 
						|
 | 
						|
        output_code = (
 | 
						|
            "def f():  # foo\n"
 | 
						|
            "    pass\n"
 | 
						|
            "    "
 | 
						|
        )
 | 
						|
        # fmt: on
 | 
						|
 | 
						|
        reader = self.prepare_reader(events)
 | 
						|
        output = multiline_input(reader)
 | 
						|
        self.assertEqual(output, output_code)
 | 
						|
 | 
						|
    def test_auto_indent_ignore_comments(self):
 | 
						|
        # fmt: off
 | 
						|
        events = code_to_events(
 | 
						|
            "pass  #:\n"
 | 
						|
        )
 | 
						|
 | 
						|
        output_code = (
 | 
						|
            "pass  #:"
 | 
						|
        )
 | 
						|
        # fmt: on
 | 
						|
 | 
						|
        reader = self.prepare_reader(events)
 | 
						|
        output = multiline_input(reader)
 | 
						|
        self.assertEqual(output, output_code)
 | 
						|
 | 
						|
 | 
						|
class TestPyReplOutput(TestCase):
 | 
						|
    def prepare_reader(self, events):
 | 
						|
        console = FakeConsole(events)
 | 
						|
        config = ReadlineConfig(readline_completer=None)
 | 
						|
        reader = ReadlineAlikeReader(console=console, config=config)
 | 
						|
        return reader
 | 
						|
 | 
						|
    def test_basic(self):
 | 
						|
        reader = self.prepare_reader(code_to_events("1+1\n"))
 | 
						|
 | 
						|
        output = multiline_input(reader)
 | 
						|
        self.assertEqual(output, "1+1")
 | 
						|
 | 
						|
    def test_multiline_edit(self):
 | 
						|
        events = itertools.chain(
 | 
						|
            code_to_events("def f():\n...\n\n"),
 | 
						|
            [
 | 
						|
                Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
 | 
						|
                Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
 | 
						|
                Event(evt="key", data="left", raw=bytearray(b"\x1bOD")),
 | 
						|
                Event(evt="key", data="left", raw=bytearray(b"\x1bOD")),
 | 
						|
                Event(evt="key", data="left", raw=bytearray(b"\x1bOD")),
 | 
						|
                Event(evt="key", data="backspace", raw=bytearray(b"\x08")),
 | 
						|
                Event(evt="key", data="g", raw=bytearray(b"g")),
 | 
						|
                Event(evt="key", data="down", raw=bytearray(b"\x1bOB")),
 | 
						|
                Event(evt="key", data="backspace", raw=bytearray(b"\x08")),
 | 
						|
                Event(evt="key", data="delete", raw=bytearray(b"\x7F")),
 | 
						|
                Event(evt="key", data="right", raw=bytearray(b"g")),
 | 
						|
                Event(evt="key", data="backspace", raw=bytearray(b"\x08")),
 | 
						|
                Event(evt="key", data="p", raw=bytearray(b"p")),
 | 
						|
                Event(evt="key", data="a", raw=bytearray(b"a")),
 | 
						|
                Event(evt="key", data="s", raw=bytearray(b"s")),
 | 
						|
                Event(evt="key", data="s", raw=bytearray(b"s")),
 | 
						|
                Event(evt="key", data="\n", raw=bytearray(b"\n")),
 | 
						|
                Event(evt="key", data="\n", raw=bytearray(b"\n")),
 | 
						|
            ],
 | 
						|
        )
 | 
						|
        reader = self.prepare_reader(events)
 | 
						|
 | 
						|
        output = multiline_input(reader)
 | 
						|
        self.assertEqual(output, "def f():\n    ...\n    ")
 | 
						|
        output = multiline_input(reader)
 | 
						|
        self.assertEqual(output, "def g():\n    pass\n    ")
 | 
						|
 | 
						|
    def test_history_navigation_with_up_arrow(self):
 | 
						|
        events = itertools.chain(
 | 
						|
            code_to_events("1+1\n2+2\n"),
 | 
						|
            [
 | 
						|
                Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
 | 
						|
                Event(evt="key", data="\n", raw=bytearray(b"\n")),
 | 
						|
                Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
 | 
						|
                Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
 | 
						|
                Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
 | 
						|
                Event(evt="key", data="\n", raw=bytearray(b"\n")),
 | 
						|
            ],
 | 
						|
        )
 | 
						|
 | 
						|
        reader = self.prepare_reader(events)
 | 
						|
 | 
						|
        output = multiline_input(reader)
 | 
						|
        self.assertEqual(output, "1+1")
 | 
						|
        output = multiline_input(reader)
 | 
						|
        self.assertEqual(output, "2+2")
 | 
						|
        output = multiline_input(reader)
 | 
						|
        self.assertEqual(output, "2+2")
 | 
						|
        output = multiline_input(reader)
 | 
						|
        self.assertEqual(output, "1+1")
 | 
						|
 | 
						|
    def test_history_navigation_with_down_arrow(self):
 | 
						|
        events = itertools.chain(
 | 
						|
            code_to_events("1+1\n2+2\n"),
 | 
						|
            [
 | 
						|
                Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
 | 
						|
                Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
 | 
						|
                Event(evt="key", data="\n", raw=bytearray(b"\n")),
 | 
						|
                Event(evt="key", data="down", raw=bytearray(b"\x1bOB")),
 | 
						|
                Event(evt="key", data="down", raw=bytearray(b"\x1bOB")),
 | 
						|
            ],
 | 
						|
        )
 | 
						|
 | 
						|
        reader = self.prepare_reader(events)
 | 
						|
 | 
						|
        output = multiline_input(reader)
 | 
						|
        self.assertEqual(output, "1+1")
 | 
						|
 | 
						|
    def test_history_search(self):
 | 
						|
        events = itertools.chain(
 | 
						|
            code_to_events("1+1\n2+2\n3+3\n"),
 | 
						|
            [
 | 
						|
                Event(evt="key", data="\x12", raw=bytearray(b"\x12")),
 | 
						|
                Event(evt="key", data="1", raw=bytearray(b"1")),
 | 
						|
                Event(evt="key", data="\n", raw=bytearray(b"\n")),
 | 
						|
                Event(evt="key", data="\n", raw=bytearray(b"\n")),
 | 
						|
            ],
 | 
						|
        )
 | 
						|
 | 
						|
        reader = self.prepare_reader(events)
 | 
						|
 | 
						|
        output = multiline_input(reader)
 | 
						|
        self.assertEqual(output, "1+1")
 | 
						|
        output = multiline_input(reader)
 | 
						|
        self.assertEqual(output, "2+2")
 | 
						|
        output = multiline_input(reader)
 | 
						|
        self.assertEqual(output, "3+3")
 | 
						|
        output = multiline_input(reader)
 | 
						|
        self.assertEqual(output, "1+1")
 | 
						|
 | 
						|
    def test_control_character(self):
 | 
						|
        events = code_to_events("c\x1d\n")
 | 
						|
        reader = self.prepare_reader(events)
 | 
						|
        output = multiline_input(reader)
 | 
						|
        self.assertEqual(output, "c\x1d")
 | 
						|
 | 
						|
 | 
						|
class TestPyReplCompleter(TestCase):
 | 
						|
    def prepare_reader(self, events, namespace):
 | 
						|
        console = FakeConsole(events)
 | 
						|
        config = ReadlineConfig()
 | 
						|
        config.readline_completer = rlcompleter.Completer(namespace).complete
 | 
						|
        reader = ReadlineAlikeReader(console=console, config=config)
 | 
						|
        return reader
 | 
						|
 | 
						|
    @patch("rlcompleter._readline_available", False)
 | 
						|
    def test_simple_completion(self):
 | 
						|
        events = code_to_events("os.getpid\t\n")
 | 
						|
 | 
						|
        namespace = {"os": os}
 | 
						|
        reader = self.prepare_reader(events, namespace)
 | 
						|
 | 
						|
        output = multiline_input(reader, namespace)
 | 
						|
        self.assertEqual(output, "os.getpid()")
 | 
						|
 | 
						|
    def test_completion_with_many_options(self):
 | 
						|
        # Test with something that initially displays many options
 | 
						|
        # and then complete from one of them. The first time tab is
 | 
						|
        # pressed, the options are displayed (which corresponds to
 | 
						|
        # when the repl shows [ not unique ]) and the second completes
 | 
						|
        # from one of them.
 | 
						|
        events = code_to_events("os.\t\tO_AP\t\n")
 | 
						|
 | 
						|
        namespace = {"os": os}
 | 
						|
        reader = self.prepare_reader(events, namespace)
 | 
						|
 | 
						|
        output = multiline_input(reader, namespace)
 | 
						|
        self.assertEqual(output, "os.O_APPEND")
 | 
						|
 | 
						|
    def test_empty_namespace_completion(self):
 | 
						|
        events = code_to_events("os.geten\t\n")
 | 
						|
        namespace = {}
 | 
						|
        reader = self.prepare_reader(events, namespace)
 | 
						|
 | 
						|
        output = multiline_input(reader, namespace)
 | 
						|
        self.assertEqual(output, "os.geten")
 | 
						|
 | 
						|
    def test_global_namespace_completion(self):
 | 
						|
        events = code_to_events("py\t\n")
 | 
						|
        namespace = {"python": None}
 | 
						|
        reader = self.prepare_reader(events, namespace)
 | 
						|
        output = multiline_input(reader, namespace)
 | 
						|
        self.assertEqual(output, "python")
 | 
						|
 | 
						|
    def test_updown_arrow_with_completion_menu(self):
 | 
						|
        """Up arrow in the middle of unfinished tab completion when the menu is displayed
 | 
						|
        should work and trigger going back in history. Down arrow should subsequently
 | 
						|
        get us back to the incomplete command."""
 | 
						|
        code = "import os\nos.\t\t"
 | 
						|
        namespace = {"os": os}
 | 
						|
 | 
						|
        events = itertools.chain(
 | 
						|
            code_to_events(code),
 | 
						|
            [
 | 
						|
                Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
 | 
						|
                Event(evt="key", data="down", raw=bytearray(b"\x1bOB")),
 | 
						|
            ],
 | 
						|
            code_to_events("\n"),
 | 
						|
        )
 | 
						|
        reader = self.prepare_reader(events, namespace=namespace)
 | 
						|
        output = multiline_input(reader, namespace)
 | 
						|
        # This is the first line, nothing to see here
 | 
						|
        self.assertEqual(output, "import os")
 | 
						|
        # This is the second line. We pressed up and down arrows
 | 
						|
        # so we should end up where we were when we initiated tab completion.
 | 
						|
        output = multiline_input(reader, namespace)
 | 
						|
        self.assertEqual(output, "os.")
 | 
						|
 | 
						|
    @patch("_pyrepl.readline._ReadlineWrapper.get_reader")
 | 
						|
    @patch("sys.stderr", new_callable=io.StringIO)
 | 
						|
    def test_completion_with_warnings(self, mock_stderr, mock_get_reader):
 | 
						|
        class Dummy:
 | 
						|
            @property
 | 
						|
            def test_func(self):
 | 
						|
                import warnings
 | 
						|
 | 
						|
                warnings.warn("warnings\n")
 | 
						|
                return None
 | 
						|
 | 
						|
        dummy = Dummy()
 | 
						|
        events = code_to_events("dummy.test_func.\t\n\n")
 | 
						|
        namespace = {"dummy": dummy}
 | 
						|
        reader = self.prepare_reader(events, namespace)
 | 
						|
        mock_get_reader.return_value = reader
 | 
						|
        output = readline_multiline_input(more_lines, ">>>", "...")
 | 
						|
        self.assertEqual(output, "dummy.test_func.__")
 | 
						|
        self.assertEqual(mock_stderr.getvalue(), "")
 | 
						|
 | 
						|
 | 
						|
class TestPasteEvent(TestCase):
 | 
						|
    def prepare_reader(self, events):
 | 
						|
        console = FakeConsole(events)
 | 
						|
        config = ReadlineConfig(readline_completer=None)
 | 
						|
        reader = ReadlineAlikeReader(console=console, config=config)
 | 
						|
        return reader
 | 
						|
 | 
						|
    def test_paste(self):
 | 
						|
        # fmt: off
 | 
						|
        code = (
 | 
						|
            "def a():\n"
 | 
						|
            "  for x in range(10):\n"
 | 
						|
            "    if x%2:\n"
 | 
						|
            "      print(x)\n"
 | 
						|
            "    else:\n"
 | 
						|
            "      pass\n"
 | 
						|
        )
 | 
						|
        # fmt: on
 | 
						|
 | 
						|
        events = itertools.chain(
 | 
						|
            [
 | 
						|
                Event(evt="key", data="f3", raw=bytearray(b"\x1bOR")),
 | 
						|
            ],
 | 
						|
            code_to_events(code),
 | 
						|
            [
 | 
						|
                Event(evt="key", data="f3", raw=bytearray(b"\x1bOR")),
 | 
						|
            ],
 | 
						|
            code_to_events("\n"),
 | 
						|
        )
 | 
						|
        reader = self.prepare_reader(events)
 | 
						|
        output = multiline_input(reader)
 | 
						|
        self.assertEqual(output, code)
 | 
						|
 | 
						|
    def test_paste_mid_newlines(self):
 | 
						|
        # fmt: off
 | 
						|
        code = (
 | 
						|
            "def f():\n"
 | 
						|
            "  x = y\n"
 | 
						|
            "  \n"
 | 
						|
            "  y = z\n"
 | 
						|
        )
 | 
						|
        # fmt: on
 | 
						|
 | 
						|
        events = itertools.chain(
 | 
						|
            [
 | 
						|
                Event(evt="key", data="f3", raw=bytearray(b"\x1bOR")),
 | 
						|
            ],
 | 
						|
            code_to_events(code),
 | 
						|
            [
 | 
						|
                Event(evt="key", data="f3", raw=bytearray(b"\x1bOR")),
 | 
						|
            ],
 | 
						|
            code_to_events("\n"),
 | 
						|
        )
 | 
						|
        reader = self.prepare_reader(events)
 | 
						|
        output = multiline_input(reader)
 | 
						|
        self.assertEqual(output, code)
 | 
						|
 | 
						|
    def test_paste_mid_newlines_not_in_paste_mode(self):
 | 
						|
        # fmt: off
 | 
						|
        code = (
 | 
						|
            "def f():\n"
 | 
						|
                "x = y\n"
 | 
						|
                "\n"
 | 
						|
                "y = z\n\n"
 | 
						|
        )
 | 
						|
 | 
						|
        expected = (
 | 
						|
            "def f():\n"
 | 
						|
            "    x = y\n"
 | 
						|
            "    "
 | 
						|
        )
 | 
						|
        # fmt: on
 | 
						|
 | 
						|
        events = code_to_events(code)
 | 
						|
        reader = self.prepare_reader(events)
 | 
						|
        output = multiline_input(reader)
 | 
						|
        self.assertEqual(output, expected)
 | 
						|
 | 
						|
    def test_paste_not_in_paste_mode(self):
 | 
						|
        # fmt: off
 | 
						|
        input_code = (
 | 
						|
            "def a():\n"
 | 
						|
                "for x in range(10):\n"
 | 
						|
                    "if x%2:\n"
 | 
						|
                        "print(x)\n"
 | 
						|
                    "else:\n"
 | 
						|
                        "pass\n\n"
 | 
						|
        )
 | 
						|
 | 
						|
        output_code = (
 | 
						|
            "def a():\n"
 | 
						|
            "    for x in range(10):\n"
 | 
						|
            "        if x%2:\n"
 | 
						|
            "            print(x)\n"
 | 
						|
            "            else:"
 | 
						|
        )
 | 
						|
        # fmt: on
 | 
						|
 | 
						|
        events = code_to_events(input_code)
 | 
						|
        reader = self.prepare_reader(events)
 | 
						|
        output = multiline_input(reader)
 | 
						|
        self.assertEqual(output, output_code)
 | 
						|
 | 
						|
    def test_bracketed_paste(self):
 | 
						|
        """Test that bracketed paste using \x1b[200~ and \x1b[201~ works."""
 | 
						|
        # fmt: off
 | 
						|
        input_code = (
 | 
						|
            "def a():\n"
 | 
						|
            "  for x in range(10):\n"
 | 
						|
            "\n"
 | 
						|
            "    if x%2:\n"
 | 
						|
            "      print(x)\n"
 | 
						|
            "\n"
 | 
						|
            "    else:\n"
 | 
						|
            "      pass\n"
 | 
						|
        )
 | 
						|
 | 
						|
        output_code = (
 | 
						|
            "def a():\n"
 | 
						|
            "  for x in range(10):\n"
 | 
						|
            "\n"
 | 
						|
            "    if x%2:\n"
 | 
						|
            "      print(x)\n"
 | 
						|
            "\n"
 | 
						|
            "    else:\n"
 | 
						|
            "      pass\n"
 | 
						|
        )
 | 
						|
        # fmt: on
 | 
						|
 | 
						|
        paste_start = "\x1b[200~"
 | 
						|
        paste_end = "\x1b[201~"
 | 
						|
 | 
						|
        events = itertools.chain(
 | 
						|
            code_to_events(paste_start),
 | 
						|
            code_to_events(input_code),
 | 
						|
            code_to_events(paste_end),
 | 
						|
            code_to_events("\n"),
 | 
						|
        )
 | 
						|
        reader = self.prepare_reader(events)
 | 
						|
        output = multiline_input(reader)
 | 
						|
        self.assertEqual(output, output_code)
 | 
						|
 | 
						|
    def test_bracketed_paste_single_line(self):
 | 
						|
        input_code = "oneline"
 | 
						|
 | 
						|
        paste_start = "\x1b[200~"
 | 
						|
        paste_end = "\x1b[201~"
 | 
						|
 | 
						|
        events = itertools.chain(
 | 
						|
            code_to_events(paste_start),
 | 
						|
            code_to_events(input_code),
 | 
						|
            code_to_events(paste_end),
 | 
						|
            code_to_events("\n"),
 | 
						|
        )
 | 
						|
        reader = self.prepare_reader(events)
 | 
						|
        output = multiline_input(reader)
 | 
						|
        self.assertEqual(output, input_code)
 | 
						|
 | 
						|
 | 
						|
@skipUnless(pty, "requires pty")
 | 
						|
class TestMain(TestCase):
 | 
						|
    @force_not_colorized
 | 
						|
    def test_exposed_globals_in_repl(self):
 | 
						|
        expected_output = (
 | 
						|
            "[\'__annotations__\', \'__builtins__\', \'__doc__\', \'__loader__\', "
 | 
						|
            "\'__name__\', \'__package__\', \'__spec__\']"
 | 
						|
        )
 | 
						|
        output, exit_code = self.run_repl(["sorted(dir())", "exit"])
 | 
						|
        if "can\'t use pyrepl" in output:
 | 
						|
            self.skipTest("pyrepl not available")
 | 
						|
        self.assertEqual(exit_code, 0)
 | 
						|
        self.assertIn(expected_output, output)
 | 
						|
 | 
						|
    def test_dumb_terminal_exits_cleanly(self):
 | 
						|
        env = os.environ.copy()
 | 
						|
        env.update({"TERM": "dumb"})
 | 
						|
        output, exit_code = self.run_repl("exit()\n", env=env)
 | 
						|
        self.assertEqual(exit_code, 0)
 | 
						|
        self.assertIn("warning: can\'t use pyrepl", output)
 | 
						|
        self.assertNotIn("Exception", output)
 | 
						|
        self.assertNotIn("Traceback", output)
 | 
						|
 | 
						|
    def run_repl(self, repl_input: str | list[str], env: dict | None = None) -> tuple[str, int]:
 | 
						|
        master_fd, slave_fd = pty.openpty()
 | 
						|
        process = subprocess.Popen(
 | 
						|
            [sys.executable, "-i", "-u"],
 | 
						|
            stdin=slave_fd,
 | 
						|
            stdout=slave_fd,
 | 
						|
            stderr=slave_fd,
 | 
						|
            text=True,
 | 
						|
            close_fds=True,
 | 
						|
            env=env if env else os.environ,
 | 
						|
       )
 | 
						|
        if isinstance(repl_input, list):
 | 
						|
            repl_input = "\n".join(repl_input) + "\n"
 | 
						|
        os.write(master_fd, repl_input.encode("utf-8"))
 | 
						|
 | 
						|
        output = []
 | 
						|
        while select.select([master_fd], [], [], 0.5)[0]:
 | 
						|
            data = os.read(master_fd, 1024).decode("utf-8")
 | 
						|
            if not data:
 | 
						|
                break
 | 
						|
            output.append(data)
 | 
						|
 | 
						|
        os.close(master_fd)
 | 
						|
        os.close(slave_fd)
 | 
						|
        try:
 | 
						|
            exit_code = process.wait(timeout=SHORT_TIMEOUT)
 | 
						|
        except subprocess.TimeoutExpired:
 | 
						|
            process.kill()
 | 
						|
            exit_code = process.wait()
 | 
						|
        return "\n".join(output), exit_code
 |