mirror of
				https://github.com/python/cpython.git
				synced 2025-10-31 05:31:20 +00:00 
			
		
		
		
	Issue #24402: Fix input() when stdout.fileno() fails; diagnosed by Eryksun
Also factored out some test cases into a new PtyTests class.
This commit is contained in:
		
							parent
							
								
									ff1f3d9ff1
								
							
						
					
					
						commit
						c9a6ab56cf
					
				
					 3 changed files with 116 additions and 77 deletions
				
			
		|  | @ -1134,82 +1134,6 @@ def test_input(self): | ||||||
|             sys.stdout = savestdout |             sys.stdout = savestdout | ||||||
|             fp.close() |             fp.close() | ||||||
| 
 | 
 | ||||||
|     @unittest.skipUnless(pty, "the pty and signal modules must be available") |  | ||||||
|     def check_input_tty(self, prompt, terminal_input, stdio_encoding=None): |  | ||||||
|         if not sys.stdin.isatty() or not sys.stdout.isatty(): |  | ||||||
|             self.skipTest("stdin and stdout must be ttys") |  | ||||||
|         r, w = os.pipe() |  | ||||||
|         try: |  | ||||||
|             pid, fd = pty.fork() |  | ||||||
|         except (OSError, AttributeError) as e: |  | ||||||
|             os.close(r) |  | ||||||
|             os.close(w) |  | ||||||
|             self.skipTest("pty.fork() raised {}".format(e)) |  | ||||||
|         if pid == 0: |  | ||||||
|             # Child |  | ||||||
|             try: |  | ||||||
|                 # Make sure we don't get stuck if there's a problem |  | ||||||
|                 signal.alarm(2) |  | ||||||
|                 os.close(r) |  | ||||||
|                 # Check the error handlers are accounted for |  | ||||||
|                 if stdio_encoding: |  | ||||||
|                     sys.stdin = io.TextIOWrapper(sys.stdin.detach(), |  | ||||||
|                                                  encoding=stdio_encoding, |  | ||||||
|                                                  errors='surrogateescape') |  | ||||||
|                     sys.stdout = io.TextIOWrapper(sys.stdout.detach(), |  | ||||||
|                                                   encoding=stdio_encoding, |  | ||||||
|                                                   errors='replace') |  | ||||||
|                 with open(w, "w") as wpipe: |  | ||||||
|                     print("tty =", sys.stdin.isatty() and sys.stdout.isatty(), file=wpipe) |  | ||||||
|                     print(ascii(input(prompt)), file=wpipe) |  | ||||||
|             except: |  | ||||||
|                 traceback.print_exc() |  | ||||||
|             finally: |  | ||||||
|                 # We don't want to return to unittest... |  | ||||||
|                 os._exit(0) |  | ||||||
|         # Parent |  | ||||||
|         os.close(w) |  | ||||||
|         os.write(fd, terminal_input + b"\r\n") |  | ||||||
|         # Get results from the pipe |  | ||||||
|         with open(r, "r") as rpipe: |  | ||||||
|             lines = [] |  | ||||||
|             while True: |  | ||||||
|                 line = rpipe.readline().strip() |  | ||||||
|                 if line == "": |  | ||||||
|                     # The other end was closed => the child exited |  | ||||||
|                     break |  | ||||||
|                 lines.append(line) |  | ||||||
|         # Check the result was got and corresponds to the user's terminal input |  | ||||||
|         if len(lines) != 2: |  | ||||||
|             # Something went wrong, try to get at stderr |  | ||||||
|             with open(fd, "r", encoding="ascii", errors="ignore") as child_output: |  | ||||||
|                 self.fail("got %d lines in pipe but expected 2, child output was:\n%s" |  | ||||||
|                           % (len(lines), child_output.read())) |  | ||||||
|         os.close(fd) |  | ||||||
|         # Check we did exercise the GNU readline path |  | ||||||
|         self.assertIn(lines[0], {'tty = True', 'tty = False'}) |  | ||||||
|         if lines[0] != 'tty = True': |  | ||||||
|             self.skipTest("standard IO in should have been a tty") |  | ||||||
|         input_result = eval(lines[1])   # ascii() -> eval() roundtrip |  | ||||||
|         if stdio_encoding: |  | ||||||
|             expected = terminal_input.decode(stdio_encoding, 'surrogateescape') |  | ||||||
|         else: |  | ||||||
|             expected = terminal_input.decode(sys.stdin.encoding)  # what else? |  | ||||||
|         self.assertEqual(input_result, expected) |  | ||||||
| 
 |  | ||||||
|     def test_input_tty(self): |  | ||||||
|         # Test input() functionality when wired to a tty (the code path |  | ||||||
|         # is different and invokes GNU readline if available). |  | ||||||
|         self.check_input_tty("prompt", b"quux") |  | ||||||
| 
 |  | ||||||
|     def test_input_tty_non_ascii(self): |  | ||||||
|         # Check stdin/stdout encoding is used when invoking GNU readline |  | ||||||
|         self.check_input_tty("prompté", b"quux\xe9", "utf-8") |  | ||||||
| 
 |  | ||||||
|     def test_input_tty_non_ascii_unicode_errors(self): |  | ||||||
|         # Check stdin/stdout error handler is used when invoking GNU readline |  | ||||||
|         self.check_input_tty("prompté", b"quux\xe9", "ascii") |  | ||||||
| 
 |  | ||||||
|     # test_int(): see test_int.py for tests of built-in function int(). |     # test_int(): see test_int.py for tests of built-in function int(). | ||||||
| 
 | 
 | ||||||
|     def test_repr(self): |     def test_repr(self): | ||||||
|  | @ -1564,6 +1488,116 @@ def test_construct_singletons(self): | ||||||
|             self.assertRaises(TypeError, tp, 1, 2) |             self.assertRaises(TypeError, tp, 1, 2) | ||||||
|             self.assertRaises(TypeError, tp, a=1, b=2) |             self.assertRaises(TypeError, tp, a=1, b=2) | ||||||
| 
 | 
 | ||||||
|  | @unittest.skipUnless(pty, "the pty and signal modules must be available") | ||||||
|  | class PtyTests(unittest.TestCase): | ||||||
|  |     """Tests that use a pseudo terminal to guarantee stdin and stdout are | ||||||
|  |     terminals in the test environment""" | ||||||
|  | 
 | ||||||
|  |     def fork(self): | ||||||
|  |         try: | ||||||
|  |             return pty.fork() | ||||||
|  |         except (OSError, AttributeError) as e: | ||||||
|  |             self.skipTest("pty.fork() raised {}".format(e)) | ||||||
|  | 
 | ||||||
|  |     def check_input_tty(self, prompt, terminal_input, stdio_encoding=None): | ||||||
|  |         if not sys.stdin.isatty() or not sys.stdout.isatty(): | ||||||
|  |             self.skipTest("stdin and stdout must be ttys") | ||||||
|  |         r, w = os.pipe() | ||||||
|  |         try: | ||||||
|  |             pid, fd = self.fork() | ||||||
|  |         except: | ||||||
|  |             os.close(r) | ||||||
|  |             os.close(w) | ||||||
|  |             raise | ||||||
|  |         if pid == 0: | ||||||
|  |             # Child | ||||||
|  |             try: | ||||||
|  |                 # Make sure we don't get stuck if there's a problem | ||||||
|  |                 signal.alarm(2) | ||||||
|  |                 os.close(r) | ||||||
|  |                 # Check the error handlers are accounted for | ||||||
|  |                 if stdio_encoding: | ||||||
|  |                     sys.stdin = io.TextIOWrapper(sys.stdin.detach(), | ||||||
|  |                                                  encoding=stdio_encoding, | ||||||
|  |                                                  errors='surrogateescape') | ||||||
|  |                     sys.stdout = io.TextIOWrapper(sys.stdout.detach(), | ||||||
|  |                                                   encoding=stdio_encoding, | ||||||
|  |                                                   errors='replace') | ||||||
|  |                 with open(w, "w") as wpipe: | ||||||
|  |                     print("tty =", sys.stdin.isatty() and sys.stdout.isatty(), file=wpipe) | ||||||
|  |                     print(ascii(input(prompt)), file=wpipe) | ||||||
|  |             except: | ||||||
|  |                 traceback.print_exc() | ||||||
|  |             finally: | ||||||
|  |                 # We don't want to return to unittest... | ||||||
|  |                 os._exit(0) | ||||||
|  |         # Parent | ||||||
|  |         os.close(w) | ||||||
|  |         os.write(fd, terminal_input + b"\r\n") | ||||||
|  |         # Get results from the pipe | ||||||
|  |         with open(r, "r") as rpipe: | ||||||
|  |             lines = [] | ||||||
|  |             while True: | ||||||
|  |                 line = rpipe.readline().strip() | ||||||
|  |                 if line == "": | ||||||
|  |                     # The other end was closed => the child exited | ||||||
|  |                     break | ||||||
|  |                 lines.append(line) | ||||||
|  |         # Check the result was got and corresponds to the user's terminal input | ||||||
|  |         if len(lines) != 2: | ||||||
|  |             # Something went wrong, try to get at stderr | ||||||
|  |             with open(fd, "r", encoding="ascii", errors="ignore") as child_output: | ||||||
|  |                 self.fail("got %d lines in pipe but expected 2, child output was:\n%s" | ||||||
|  |                           % (len(lines), child_output.read())) | ||||||
|  |         os.close(fd) | ||||||
|  |         # Check we did exercise the GNU readline path | ||||||
|  |         self.assertIn(lines[0], {'tty = True', 'tty = False'}) | ||||||
|  |         if lines[0] != 'tty = True': | ||||||
|  |             self.skipTest("standard IO in should have been a tty") | ||||||
|  |         input_result = eval(lines[1])   # ascii() -> eval() roundtrip | ||||||
|  |         if stdio_encoding: | ||||||
|  |             expected = terminal_input.decode(stdio_encoding, 'surrogateescape') | ||||||
|  |         else: | ||||||
|  |             expected = terminal_input.decode(sys.stdin.encoding)  # what else? | ||||||
|  |         self.assertEqual(input_result, expected) | ||||||
|  | 
 | ||||||
|  |     def test_input_tty(self): | ||||||
|  |         # Test input() functionality when wired to a tty (the code path | ||||||
|  |         # is different and invokes GNU readline if available). | ||||||
|  |         self.check_input_tty("prompt", b"quux") | ||||||
|  | 
 | ||||||
|  |     def test_input_tty_non_ascii(self): | ||||||
|  |         # Check stdin/stdout encoding is used when invoking GNU readline | ||||||
|  |         self.check_input_tty("prompté", b"quux\xe9", "utf-8") | ||||||
|  | 
 | ||||||
|  |     def test_input_tty_non_ascii_unicode_errors(self): | ||||||
|  |         # Check stdin/stdout error handler is used when invoking GNU readline | ||||||
|  |         self.check_input_tty("prompté", b"quux\xe9", "ascii") | ||||||
|  | 
 | ||||||
|  |     def test_input_no_stdout_fileno(self): | ||||||
|  |         # Issue #24402: If stdin is the original terminal but stdout.fileno() | ||||||
|  |         # fails, do not use the original stdout file descriptor | ||||||
|  |         pid, pty = self.fork() | ||||||
|  |         if pid:  # Parent process | ||||||
|  |             # Ideally this should read and write concurrently using select() | ||||||
|  |             # or similar, to avoid the possibility of a deadlock. | ||||||
|  |             os.write(pty, b"quux\r") | ||||||
|  |             _, status = os.waitpid(pid, 0) | ||||||
|  |             output = os.read(pty, 3000).decode("ascii", "backslashreplace") | ||||||
|  |             os.close(pty) | ||||||
|  |             self.assertEqual(status, 0, output) | ||||||
|  |         else:  # Child process | ||||||
|  |             try: | ||||||
|  |                 self.assertTrue(sys.stdin.isatty(), "stdin not a terminal") | ||||||
|  |                 sys.stdout = io.StringIO()  # Does not support fileno() | ||||||
|  |                 input("prompt") | ||||||
|  |                 self.assertEqual(sys.stdout.getvalue(), "prompt") | ||||||
|  |                 os._exit(0)  # Success! | ||||||
|  |             except: | ||||||
|  |                 sys.excepthook(*sys.exc_info()) | ||||||
|  |             finally: | ||||||
|  |                 os._exit(1)  # Failure | ||||||
|  | 
 | ||||||
| class TestSorted(unittest.TestCase): | class TestSorted(unittest.TestCase): | ||||||
| 
 | 
 | ||||||
|     def test_basic(self): |     def test_basic(self): | ||||||
|  |  | ||||||
|  | @ -10,6 +10,9 @@ Release date: tba | ||||||
| Core and Builtins | Core and Builtins | ||||||
| ----------------- | ----------------- | ||||||
| 
 | 
 | ||||||
|  | - Issue #24402: Fix input() to prompt to the redirected stdout when | ||||||
|  |   sys.stdout.fileno() fails. | ||||||
|  | 
 | ||||||
| - Issue #24806: Prevent builtin types that are not allowed to be subclassed from | - Issue #24806: Prevent builtin types that are not allowed to be subclassed from | ||||||
|   being subclassed through multiple inheritance. |   being subclassed through multiple inheritance. | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -1723,8 +1723,10 @@ builtin_input(PyObject *self, PyObject *args) | ||||||
|     } |     } | ||||||
|     if (tty) { |     if (tty) { | ||||||
|         tmp = _PyObject_CallMethodId(fout, &PyId_fileno, ""); |         tmp = _PyObject_CallMethodId(fout, &PyId_fileno, ""); | ||||||
|         if (tmp == NULL) |         if (tmp == NULL) { | ||||||
|             PyErr_Clear(); |             PyErr_Clear(); | ||||||
|  |             tty = 0; | ||||||
|  |         } | ||||||
|         else { |         else { | ||||||
|             fd = PyLong_AsLong(tmp); |             fd = PyLong_AsLong(tmp); | ||||||
|             Py_DECREF(tmp); |             Py_DECREF(tmp); | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue
	
	 Martin Panter
						Martin Panter