| 
									
										
										
										
											2025-04-24 20:43:23 -04:00
										 |  |  | import io | 
					
						
							| 
									
										
										
										
											2025-04-30 09:09:41 -04:00
										 |  |  | import itertools | 
					
						
							| 
									
										
										
										
											2025-04-24 20:43:23 -04:00
										 |  |  | import json | 
					
						
							|  |  |  | import os | 
					
						
							| 
									
										
										
										
											2025-05-06 05:44:49 -04:00
										 |  |  | import re | 
					
						
							| 
									
										
										
										
											2025-04-24 20:43:23 -04:00
										 |  |  | import signal | 
					
						
							|  |  |  | import socket | 
					
						
							|  |  |  | import subprocess | 
					
						
							|  |  |  | import sys | 
					
						
							|  |  |  | import textwrap | 
					
						
							|  |  |  | import unittest | 
					
						
							|  |  |  | import unittest.mock | 
					
						
							| 
									
										
										
										
											2025-05-06 05:44:49 -04:00
										 |  |  | from contextlib import closing, contextmanager, redirect_stdout, redirect_stderr, ExitStack | 
					
						
							|  |  |  | from test.support import is_wasi, cpython_only, force_color, requires_subprocess, SHORT_TIMEOUT | 
					
						
							| 
									
										
										
										
											2025-05-25 23:09:02 +03:00
										 |  |  | from test.support.os_helper import TESTFN, unlink | 
					
						
							|  |  |  | from typing import List | 
					
						
							| 
									
										
										
										
											2025-04-24 20:43:23 -04:00
										 |  |  | 
 | 
					
						
							|  |  |  | import pdb | 
					
						
							|  |  |  | from pdb import _PdbServer, _PdbClient | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-04-25 19:28:25 +02:00
# Skip every test in this module when sys.is_remote_debug_enabled()
# reports that remote debugging is turned off for this interpreter.
if not sys.is_remote_debug_enabled():
    raise unittest.SkipTest('remote debugging is disabled')
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-04-25 13:14:59 +02:00
										 |  |  | @contextmanager | 
					
						
							|  |  |  | def kill_on_error(proc): | 
					
						
							|  |  |  |     """Context manager killing the subprocess if a Python exception is raised.""" | 
					
						
							|  |  |  |     with proc: | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             yield proc | 
					
						
							|  |  |  |         except: | 
					
						
							|  |  |  |             proc.kill() | 
					
						
							|  |  |  |             raise | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-04-24 20:43:23 -04:00
										 |  |  | class MockSocketFile: | 
					
						
							|  |  |  |     """Mock socket file for testing _PdbServer without actual socket connections.""" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __init__(self): | 
					
						
							|  |  |  |         self.input_queue = [] | 
					
						
							|  |  |  |         self.output_buffer = [] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def write(self, data: bytes) -> None: | 
					
						
							|  |  |  |         """Simulate write to socket.""" | 
					
						
							|  |  |  |         self.output_buffer.append(data) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def flush(self) -> None: | 
					
						
							|  |  |  |         """No-op flush implementation.""" | 
					
						
							|  |  |  |         pass | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def readline(self) -> bytes: | 
					
						
							|  |  |  |         """Read a line from the prepared input queue.""" | 
					
						
							|  |  |  |         if not self.input_queue: | 
					
						
							|  |  |  |             return b"" | 
					
						
							|  |  |  |         return self.input_queue.pop(0) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def close(self) -> None: | 
					
						
							|  |  |  |         """Close the mock socket file.""" | 
					
						
							|  |  |  |         pass | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def add_input(self, data: dict) -> None: | 
					
						
							|  |  |  |         """Add input that will be returned by readline.""" | 
					
						
							|  |  |  |         self.input_queue.append(json.dumps(data).encode() + b"\n") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def get_output(self) -> List[dict]: | 
					
						
							|  |  |  |         """Get the output that was written by the object being tested.""" | 
					
						
							|  |  |  |         results = [] | 
					
						
							|  |  |  |         for data in self.output_buffer: | 
					
						
							|  |  |  |             if isinstance(data, bytes) and data.endswith(b"\n"): | 
					
						
							|  |  |  |                 try: | 
					
						
							|  |  |  |                     results.append(json.loads(data.decode().strip())) | 
					
						
							|  |  |  |                 except json.JSONDecodeError: | 
					
						
							|  |  |  |                     pass  # Ignore non-JSON output | 
					
						
							|  |  |  |         self.output_buffer = [] | 
					
						
							|  |  |  |         return results | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-04-30 09:09:41 -04:00
										 |  |  | class PdbClientTestCase(unittest.TestCase): | 
					
						
							|  |  |  |     """Tests for the _PdbClient class.""" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def do_test( | 
					
						
							|  |  |  |         self, | 
					
						
							|  |  |  |         *, | 
					
						
							|  |  |  |         incoming, | 
					
						
							| 
									
										
										
										
											2025-05-05 15:33:59 -04:00
										 |  |  |         simulate_send_failure=False, | 
					
						
							|  |  |  |         simulate_sigint_during_stdout_write=False, | 
					
						
							|  |  |  |         use_interrupt_socket=False, | 
					
						
							| 
									
										
										
										
											2025-04-30 09:09:41 -04:00
										 |  |  |         expected_outgoing=None, | 
					
						
							| 
									
										
										
										
											2025-05-05 15:33:59 -04:00
										 |  |  |         expected_outgoing_signals=None, | 
					
						
							| 
									
										
										
										
											2025-04-30 09:09:41 -04:00
										 |  |  |         expected_completions=None, | 
					
						
							|  |  |  |         expected_exception=None, | 
					
						
							|  |  |  |         expected_stdout="", | 
					
						
							|  |  |  |         expected_stdout_substring="", | 
					
						
							|  |  |  |         expected_state=None, | 
					
						
							|  |  |  |     ): | 
					
						
							|  |  |  |         if expected_outgoing is None: | 
					
						
							|  |  |  |             expected_outgoing = [] | 
					
						
							| 
									
										
										
										
											2025-05-05 15:33:59 -04:00
										 |  |  |         if expected_outgoing_signals is None: | 
					
						
							|  |  |  |             expected_outgoing_signals = [] | 
					
						
							| 
									
										
										
										
											2025-04-30 09:09:41 -04:00
										 |  |  |         if expected_completions is None: | 
					
						
							|  |  |  |             expected_completions = [] | 
					
						
							|  |  |  |         if expected_state is None: | 
					
						
							|  |  |  |             expected_state = {} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         expected_state.setdefault("write_failed", False) | 
					
						
							|  |  |  |         messages = [m for source, m in incoming if source == "server"] | 
					
						
							|  |  |  |         prompts = [m["prompt"] for source, m in incoming if source == "user"] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         input_iter = (m for source, m in incoming if source == "user") | 
					
						
							|  |  |  |         completions = [] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         def mock_input(prompt): | 
					
						
							|  |  |  |             message = next(input_iter, None) | 
					
						
							|  |  |  |             if message is None: | 
					
						
							|  |  |  |                 raise EOFError | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             if req := message.get("completion_request"): | 
					
						
							|  |  |  |                 readline_mock = unittest.mock.Mock() | 
					
						
							|  |  |  |                 readline_mock.get_line_buffer.return_value = req["line"] | 
					
						
							|  |  |  |                 readline_mock.get_begidx.return_value = req["begidx"] | 
					
						
							|  |  |  |                 readline_mock.get_endidx.return_value = req["endidx"] | 
					
						
							|  |  |  |                 unittest.mock.seal(readline_mock) | 
					
						
							|  |  |  |                 with unittest.mock.patch.dict(sys.modules, {"readline": readline_mock}): | 
					
						
							|  |  |  |                     for param in itertools.count(): | 
					
						
							|  |  |  |                         prefix = req["line"][req["begidx"] : req["endidx"]] | 
					
						
							|  |  |  |                         completion = client.complete(prefix, param) | 
					
						
							|  |  |  |                         if completion is None: | 
					
						
							|  |  |  |                             break | 
					
						
							|  |  |  |                         completions.append(completion) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             reply = message["input"] | 
					
						
							|  |  |  |             if isinstance(reply, BaseException): | 
					
						
							|  |  |  |                 raise reply | 
					
						
							| 
									
										
										
										
											2025-05-05 15:33:59 -04:00
										 |  |  |             if isinstance(reply, str): | 
					
						
							|  |  |  |                 return reply | 
					
						
							|  |  |  |             return reply() | 
					
						
							| 
									
										
										
										
											2025-04-30 09:09:41 -04:00
										 |  |  | 
 | 
					
						
							|  |  |  |         with ExitStack() as stack: | 
					
						
							| 
									
										
										
										
											2025-05-05 15:33:59 -04:00
										 |  |  |             client_sock, server_sock = socket.socketpair() | 
					
						
							|  |  |  |             stack.enter_context(closing(client_sock)) | 
					
						
							|  |  |  |             stack.enter_context(closing(server_sock)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             server_sock = unittest.mock.Mock(wraps=server_sock) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             client_sock.sendall( | 
					
						
							|  |  |  |                 b"".join( | 
					
						
							|  |  |  |                     (m if isinstance(m, bytes) else json.dumps(m).encode()) + b"\n" | 
					
						
							|  |  |  |                     for m in messages | 
					
						
							|  |  |  |                 ) | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  |             client_sock.shutdown(socket.SHUT_WR) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             if simulate_send_failure: | 
					
						
							|  |  |  |                 server_sock.sendall = unittest.mock.Mock( | 
					
						
							|  |  |  |                     side_effect=OSError("sendall failed") | 
					
						
							|  |  |  |                 ) | 
					
						
							|  |  |  |                 client_sock.shutdown(socket.SHUT_RD) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             stdout = io.StringIO() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             if simulate_sigint_during_stdout_write: | 
					
						
							|  |  |  |                 orig_stdout_write = stdout.write | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 def sigint_stdout_write(s): | 
					
						
							|  |  |  |                     signal.raise_signal(signal.SIGINT) | 
					
						
							|  |  |  |                     return orig_stdout_write(s) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 stdout.write = sigint_stdout_write | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-04-30 09:09:41 -04:00
										 |  |  |             input_mock = stack.enter_context( | 
					
						
							|  |  |  |                 unittest.mock.patch("pdb.input", side_effect=mock_input) | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  |             stack.enter_context(redirect_stdout(stdout)) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-05-05 15:33:59 -04:00
										 |  |  |             if use_interrupt_socket: | 
					
						
							|  |  |  |                 interrupt_sock = unittest.mock.Mock(spec=socket.socket) | 
					
						
							|  |  |  |                 mock_kill = None | 
					
						
							|  |  |  |             else: | 
					
						
							|  |  |  |                 interrupt_sock = None | 
					
						
							|  |  |  |                 mock_kill = stack.enter_context( | 
					
						
							|  |  |  |                     unittest.mock.patch("os.kill", spec=os.kill) | 
					
						
							|  |  |  |                 ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-04-30 09:09:41 -04:00
										 |  |  |             client = _PdbClient( | 
					
						
							| 
									
										
										
										
											2025-05-05 15:33:59 -04:00
										 |  |  |                 pid=12345, | 
					
						
							|  |  |  |                 server_socket=server_sock, | 
					
						
							|  |  |  |                 interrupt_sock=interrupt_sock, | 
					
						
							| 
									
										
										
										
											2025-04-30 09:09:41 -04:00
										 |  |  |             ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             if expected_exception is not None: | 
					
						
							|  |  |  |                 exception = expected_exception["exception"] | 
					
						
							|  |  |  |                 msg = expected_exception["msg"] | 
					
						
							|  |  |  |                 stack.enter_context(self.assertRaises(exception, msg=msg)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             client.cmdloop() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-05-05 15:33:59 -04:00
										 |  |  |         sent_msgs = [msg.args[0] for msg in server_sock.sendall.mock_calls] | 
					
						
							|  |  |  |         for msg in sent_msgs: | 
					
						
							|  |  |  |             assert msg.endswith(b"\n") | 
					
						
							|  |  |  |         actual_outgoing = [json.loads(msg) for msg in sent_msgs] | 
					
						
							| 
									
										
										
										
											2025-04-30 09:09:41 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-05-05 15:33:59 -04:00
										 |  |  |         self.assertEqual(actual_outgoing, expected_outgoing) | 
					
						
							| 
									
										
										
										
											2025-04-30 09:09:41 -04:00
										 |  |  |         self.assertEqual(completions, expected_completions) | 
					
						
							|  |  |  |         if expected_stdout_substring and not expected_stdout: | 
					
						
							|  |  |  |             self.assertIn(expected_stdout_substring, stdout.getvalue()) | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             self.assertEqual(stdout.getvalue(), expected_stdout) | 
					
						
							|  |  |  |         input_mock.assert_has_calls([unittest.mock.call(p) for p in prompts]) | 
					
						
							|  |  |  |         actual_state = {k: getattr(client, k) for k in expected_state} | 
					
						
							|  |  |  |         self.assertEqual(actual_state, expected_state) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-05-05 15:33:59 -04:00
										 |  |  |         if use_interrupt_socket: | 
					
						
							|  |  |  |             outgoing_signals = [ | 
					
						
							|  |  |  |                 signal.Signals(int.from_bytes(call.args[0])) | 
					
						
							|  |  |  |                 for call in interrupt_sock.sendall.call_args_list | 
					
						
							|  |  |  |             ] | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             assert mock_kill is not None | 
					
						
							|  |  |  |             outgoing_signals = [] | 
					
						
							|  |  |  |             for call in mock_kill.call_args_list: | 
					
						
							|  |  |  |                 pid, signum = call.args | 
					
						
							|  |  |  |                 self.assertEqual(pid, 12345) | 
					
						
							|  |  |  |                 outgoing_signals.append(signal.Signals(signum)) | 
					
						
							|  |  |  |         self.assertEqual(outgoing_signals, expected_outgoing_signals) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-04-30 09:09:41 -04:00
										 |  |  |     def test_remote_immediately_closing_the_connection(self): | 
					
						
							|  |  |  |         """Test the behavior when the remote closes the connection immediately.""" | 
					
						
							|  |  |  |         incoming = [] | 
					
						
							|  |  |  |         expected_outgoing = [] | 
					
						
							|  |  |  |         self.do_test( | 
					
						
							|  |  |  |             incoming=incoming, | 
					
						
							|  |  |  |             expected_outgoing=expected_outgoing, | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_handling_command_list(self): | 
					
						
							|  |  |  |         """Test handling the command_list message.""" | 
					
						
							|  |  |  |         incoming = [ | 
					
						
							|  |  |  |             ("server", {"command_list": ["help", "list", "continue"]}), | 
					
						
							|  |  |  |         ] | 
					
						
							|  |  |  |         self.do_test( | 
					
						
							|  |  |  |             incoming=incoming, | 
					
						
							|  |  |  |             expected_outgoing=[], | 
					
						
							|  |  |  |             expected_state={ | 
					
						
							|  |  |  |                 "pdb_commands": {"help", "list", "continue"}, | 
					
						
							|  |  |  |             }, | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_handling_info_message(self): | 
					
						
							|  |  |  |         """Test handling a message payload with type='info'.""" | 
					
						
							|  |  |  |         incoming = [ | 
					
						
							|  |  |  |             ("server", {"message": "Some message or other\n", "type": "info"}), | 
					
						
							|  |  |  |         ] | 
					
						
							|  |  |  |         self.do_test( | 
					
						
							|  |  |  |             incoming=incoming, | 
					
						
							|  |  |  |             expected_outgoing=[], | 
					
						
							|  |  |  |             expected_stdout="Some message or other\n", | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_handling_error_message(self): | 
					
						
							|  |  |  |         """Test handling a message payload with type='error'.""" | 
					
						
							|  |  |  |         incoming = [ | 
					
						
							|  |  |  |             ("server", {"message": "Some message or other.", "type": "error"}), | 
					
						
							|  |  |  |         ] | 
					
						
							|  |  |  |         self.do_test( | 
					
						
							|  |  |  |             incoming=incoming, | 
					
						
							|  |  |  |             expected_outgoing=[], | 
					
						
							|  |  |  |             expected_stdout="*** Some message or other.\n", | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_handling_other_message(self): | 
					
						
							|  |  |  |         """Test handling a message payload with an unrecognized type.""" | 
					
						
							|  |  |  |         incoming = [ | 
					
						
							|  |  |  |             ("server", {"message": "Some message.\n", "type": "unknown"}), | 
					
						
							|  |  |  |         ] | 
					
						
							|  |  |  |         self.do_test( | 
					
						
							|  |  |  |             incoming=incoming, | 
					
						
							|  |  |  |             expected_outgoing=[], | 
					
						
							|  |  |  |             expected_stdout="Some message.\n", | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_handling_help_for_command(self): | 
					
						
							|  |  |  |         """Test handling a request to display help for a command.""" | 
					
						
							|  |  |  |         incoming = [ | 
					
						
							|  |  |  |             ("server", {"help": "ll"}), | 
					
						
							|  |  |  |         ] | 
					
						
							|  |  |  |         self.do_test( | 
					
						
							|  |  |  |             incoming=incoming, | 
					
						
							|  |  |  |             expected_outgoing=[], | 
					
						
							|  |  |  |             expected_stdout_substring="Usage: ll | longlist", | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_handling_help_without_a_specific_topic(self): | 
					
						
							|  |  |  |         """Test handling a request to display a help overview.""" | 
					
						
							|  |  |  |         incoming = [ | 
					
						
							|  |  |  |             ("server", {"help": ""}), | 
					
						
							|  |  |  |         ] | 
					
						
							|  |  |  |         self.do_test( | 
					
						
							|  |  |  |             incoming=incoming, | 
					
						
							|  |  |  |             expected_outgoing=[], | 
					
						
							|  |  |  |             expected_stdout_substring="type help <topic>", | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_handling_help_pdb(self): | 
					
						
							|  |  |  |         """Test handling a request to display the full PDB manual.""" | 
					
						
							|  |  |  |         incoming = [ | 
					
						
							|  |  |  |             ("server", {"help": "pdb"}), | 
					
						
							|  |  |  |         ] | 
					
						
							|  |  |  |         self.do_test( | 
					
						
							|  |  |  |             incoming=incoming, | 
					
						
							|  |  |  |             expected_outgoing=[], | 
					
						
							|  |  |  |             expected_stdout_substring=">>> import pdb", | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_handling_pdb_prompts(self): | 
					
						
							|  |  |  |         """Test responding to pdb's normal prompts.""" | 
					
						
							|  |  |  |         incoming = [ | 
					
						
							|  |  |  |             ("server", {"command_list": ["b"]}), | 
					
						
							|  |  |  |             ("server", {"prompt": "(Pdb) ", "state": "pdb"}), | 
					
						
							|  |  |  |             ("user", {"prompt": "(Pdb) ", "input": "lst ["}), | 
					
						
							|  |  |  |             ("user", {"prompt": "...   ", "input": "0 ]"}), | 
					
						
							|  |  |  |             ("server", {"prompt": "(Pdb) ", "state": "pdb"}), | 
					
						
							|  |  |  |             ("user", {"prompt": "(Pdb) ", "input": ""}), | 
					
						
							|  |  |  |             ("server", {"prompt": "(Pdb) ", "state": "pdb"}), | 
					
						
							|  |  |  |             ("user", {"prompt": "(Pdb) ", "input": "b ["}), | 
					
						
							|  |  |  |             ("server", {"prompt": "(Pdb) ", "state": "pdb"}), | 
					
						
							|  |  |  |             ("user", {"prompt": "(Pdb) ", "input": "! b ["}), | 
					
						
							|  |  |  |             ("user", {"prompt": "...   ", "input": "b ]"}), | 
					
						
							|  |  |  |         ] | 
					
						
							|  |  |  |         self.do_test( | 
					
						
							|  |  |  |             incoming=incoming, | 
					
						
							|  |  |  |             expected_outgoing=[ | 
					
						
							|  |  |  |                 {"reply": "lst [\n0 ]"}, | 
					
						
							|  |  |  |                 {"reply": ""}, | 
					
						
							|  |  |  |                 {"reply": "b ["}, | 
					
						
							|  |  |  |                 {"reply": "!b [\nb ]"}, | 
					
						
							|  |  |  |             ], | 
					
						
							|  |  |  |             expected_state={"state": "pdb"}, | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_handling_interact_prompts(self): | 
					
						
							|  |  |  |         """Test responding to pdb's interact mode prompts.""" | 
					
						
							|  |  |  |         incoming = [ | 
					
						
							|  |  |  |             ("server", {"command_list": ["b"]}), | 
					
						
							|  |  |  |             ("server", {"prompt": ">>> ", "state": "interact"}), | 
					
						
							|  |  |  |             ("user", {"prompt": ">>> ", "input": "lst ["}), | 
					
						
							|  |  |  |             ("user", {"prompt": "... ", "input": "0 ]"}), | 
					
						
							|  |  |  |             ("server", {"prompt": ">>> ", "state": "interact"}), | 
					
						
							|  |  |  |             ("user", {"prompt": ">>> ", "input": ""}), | 
					
						
							|  |  |  |             ("server", {"prompt": ">>> ", "state": "interact"}), | 
					
						
							|  |  |  |             ("user", {"prompt": ">>> ", "input": "b ["}), | 
					
						
							|  |  |  |             ("user", {"prompt": "... ", "input": "b ]"}), | 
					
						
							|  |  |  |         ] | 
					
						
							|  |  |  |         self.do_test( | 
					
						
							|  |  |  |             incoming=incoming, | 
					
						
							|  |  |  |             expected_outgoing=[ | 
					
						
							|  |  |  |                 {"reply": "lst [\n0 ]"}, | 
					
						
							|  |  |  |                 {"reply": ""}, | 
					
						
							|  |  |  |                 {"reply": "b [\nb ]"}, | 
					
						
							|  |  |  |             ], | 
					
						
							|  |  |  |             expected_state={"state": "interact"}, | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_retry_pdb_prompt_on_syntax_error(self): | 
					
						
							|  |  |  |         """Test re-prompting after a SyntaxError in a Python expression.""" | 
					
						
							|  |  |  |         incoming = [ | 
					
						
							|  |  |  |             ("server", {"prompt": "(Pdb) ", "state": "pdb"}), | 
					
						
							|  |  |  |             ("user", {"prompt": "(Pdb) ", "input": " lst ["}), | 
					
						
							|  |  |  |             ("user", {"prompt": "(Pdb) ", "input": "lst ["}), | 
					
						
							|  |  |  |             ("user", {"prompt": "...   ", "input": " 0 ]"}), | 
					
						
							|  |  |  |         ] | 
					
						
							|  |  |  |         self.do_test( | 
					
						
							|  |  |  |             incoming=incoming, | 
					
						
							|  |  |  |             expected_outgoing=[ | 
					
						
							|  |  |  |                 {"reply": "lst [\n 0 ]"}, | 
					
						
							|  |  |  |             ], | 
					
						
							|  |  |  |             expected_stdout_substring="*** IndentationError", | 
					
						
							|  |  |  |             expected_state={"state": "pdb"}, | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_retry_interact_prompt_on_syntax_error(self): | 
					
						
							|  |  |  |         """Test re-prompting after a SyntaxError in a Python expression.""" | 
					
						
							|  |  |  |         incoming = [ | 
					
						
							|  |  |  |             ("server", {"prompt": ">>> ", "state": "interact"}), | 
					
						
							|  |  |  |             ("user", {"prompt": ">>> ", "input": "!lst ["}), | 
					
						
							|  |  |  |             ("user", {"prompt": ">>> ", "input": "lst ["}), | 
					
						
							|  |  |  |             ("user", {"prompt": "... ", "input": " 0 ]"}), | 
					
						
							|  |  |  |         ] | 
					
						
							|  |  |  |         self.do_test( | 
					
						
							|  |  |  |             incoming=incoming, | 
					
						
							|  |  |  |             expected_outgoing=[ | 
					
						
							|  |  |  |                 {"reply": "lst [\n 0 ]"}, | 
					
						
							|  |  |  |             ], | 
					
						
							|  |  |  |             expected_stdout_substring="*** SyntaxError", | 
					
						
							|  |  |  |             expected_state={"state": "interact"}, | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_handling_unrecognized_prompt_type(self): | 
					
						
							|  |  |  |         """Test fallback to "dumb" single-line mode for unknown states.""" | 
					
						
							|  |  |  |         incoming = [ | 
					
						
							|  |  |  |             ("server", {"prompt": "Do it? ", "state": "confirm"}), | 
					
						
							|  |  |  |             ("user", {"prompt": "Do it? ", "input": "! ["}), | 
					
						
							|  |  |  |             ("server", {"prompt": "Do it? ", "state": "confirm"}), | 
					
						
							|  |  |  |             ("user", {"prompt": "Do it? ", "input": "echo hello"}), | 
					
						
							|  |  |  |             ("server", {"prompt": "Do it? ", "state": "confirm"}), | 
					
						
							|  |  |  |             ("user", {"prompt": "Do it? ", "input": ""}), | 
					
						
							|  |  |  |             ("server", {"prompt": "Do it? ", "state": "confirm"}), | 
					
						
							|  |  |  |             ("user", {"prompt": "Do it? ", "input": "echo goodbye"}), | 
					
						
							|  |  |  |         ] | 
					
						
							|  |  |  |         self.do_test( | 
					
						
							|  |  |  |             incoming=incoming, | 
					
						
							|  |  |  |             expected_outgoing=[ | 
					
						
							|  |  |  |                 {"reply": "! ["}, | 
					
						
							|  |  |  |                 {"reply": "echo hello"}, | 
					
						
							|  |  |  |                 {"reply": ""}, | 
					
						
							|  |  |  |                 {"reply": "echo goodbye"}, | 
					
						
							|  |  |  |             ], | 
					
						
							|  |  |  |             expected_state={"state": "dumb"}, | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-05-05 15:33:59 -04:00
										 |  |  |     def test_sigint_at_prompt(self): | 
					
						
							|  |  |  |         """Test signaling when a prompt gets interrupted.""" | 
					
						
							| 
									
										
										
										
											2025-04-30 09:09:41 -04:00
										 |  |  |         incoming = [ | 
					
						
							|  |  |  |             ("server", {"prompt": "(Pdb) ", "state": "pdb"}), | 
					
						
							| 
									
										
										
										
											2025-05-05 15:33:59 -04:00
										 |  |  |             ( | 
					
						
							|  |  |  |                 "user", | 
					
						
							|  |  |  |                 { | 
					
						
							|  |  |  |                     "prompt": "(Pdb) ", | 
					
						
							|  |  |  |                     "input": lambda: signal.raise_signal(signal.SIGINT), | 
					
						
							|  |  |  |                 }, | 
					
						
							|  |  |  |             ), | 
					
						
							|  |  |  |         ] | 
					
						
							|  |  |  |         self.do_test( | 
					
						
							|  |  |  |             incoming=incoming, | 
					
						
							|  |  |  |             expected_outgoing=[ | 
					
						
							|  |  |  |                 {"signal": "INT"}, | 
					
						
							|  |  |  |             ], | 
					
						
							|  |  |  |             expected_state={"state": "pdb"}, | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_sigint_at_continuation_prompt(self): | 
					
						
							|  |  |  |         """Test signaling when a continuation prompt gets interrupted.""" | 
					
						
							|  |  |  |         incoming = [ | 
					
						
							|  |  |  |             ("server", {"prompt": "(Pdb) ", "state": "pdb"}), | 
					
						
							|  |  |  |             ("user", {"prompt": "(Pdb) ", "input": "if True:"}), | 
					
						
							|  |  |  |             ( | 
					
						
							|  |  |  |                 "user", | 
					
						
							|  |  |  |                 { | 
					
						
							|  |  |  |                     "prompt": "...   ", | 
					
						
							|  |  |  |                     "input": lambda: signal.raise_signal(signal.SIGINT), | 
					
						
							|  |  |  |                 }, | 
					
						
							|  |  |  |             ), | 
					
						
							| 
									
										
										
										
											2025-04-30 09:09:41 -04:00
										 |  |  |         ] | 
					
						
							|  |  |  |         self.do_test( | 
					
						
							|  |  |  |             incoming=incoming, | 
					
						
							|  |  |  |             expected_outgoing=[ | 
					
						
							|  |  |  |                 {"signal": "INT"}, | 
					
						
							|  |  |  |             ], | 
					
						
							|  |  |  |             expected_state={"state": "pdb"}, | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-05-05 15:33:59 -04:00
										 |  |  |     def test_sigint_when_writing(self): | 
					
						
							|  |  |  |         """Test siginaling when sys.stdout.write() gets interrupted.""" | 
					
						
							|  |  |  |         incoming = [ | 
					
						
							|  |  |  |             ("server", {"message": "Some message or other\n", "type": "info"}), | 
					
						
							|  |  |  |         ] | 
					
						
							|  |  |  |         for use_interrupt_socket in [False, True]: | 
					
						
							|  |  |  |             with self.subTest(use_interrupt_socket=use_interrupt_socket): | 
					
						
							|  |  |  |                 self.do_test( | 
					
						
							|  |  |  |                     incoming=incoming, | 
					
						
							|  |  |  |                     simulate_sigint_during_stdout_write=True, | 
					
						
							|  |  |  |                     use_interrupt_socket=use_interrupt_socket, | 
					
						
							|  |  |  |                     expected_outgoing=[], | 
					
						
							|  |  |  |                     expected_outgoing_signals=[signal.SIGINT], | 
					
						
							|  |  |  |                     expected_stdout="Some message or other\n", | 
					
						
							|  |  |  |                 ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-04-30 09:09:41 -04:00
										 |  |  |     def test_eof_at_prompt(self): | 
					
						
							|  |  |  |         """Test signaling when a prompt gets an EOFError.""" | 
					
						
							|  |  |  |         incoming = [ | 
					
						
							|  |  |  |             ("server", {"prompt": "(Pdb) ", "state": "pdb"}), | 
					
						
							|  |  |  |             ("user", {"prompt": "(Pdb) ", "input": EOFError()}), | 
					
						
							|  |  |  |         ] | 
					
						
							|  |  |  |         self.do_test( | 
					
						
							|  |  |  |             incoming=incoming, | 
					
						
							|  |  |  |             expected_outgoing=[ | 
					
						
							|  |  |  |                 {"signal": "EOF"}, | 
					
						
							|  |  |  |             ], | 
					
						
							|  |  |  |             expected_state={"state": "pdb"}, | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_unrecognized_json_message(self): | 
					
						
							|  |  |  |         """Test failing after getting an unrecognized payload.""" | 
					
						
							|  |  |  |         incoming = [ | 
					
						
							|  |  |  |             ("server", {"monty": "python"}), | 
					
						
							|  |  |  |             ("server", {"message": "Some message or other\n", "type": "info"}), | 
					
						
							|  |  |  |         ] | 
					
						
							|  |  |  |         self.do_test( | 
					
						
							|  |  |  |             incoming=incoming, | 
					
						
							|  |  |  |             expected_outgoing=[], | 
					
						
							|  |  |  |             expected_exception={ | 
					
						
							|  |  |  |                 "exception": RuntimeError, | 
					
						
							|  |  |  |                 "msg": 'Unrecognized payload b\'{"monty": "python"}\'', | 
					
						
							|  |  |  |             }, | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_continuing_after_getting_a_non_json_payload(self): | 
					
						
							|  |  |  |         """Test continuing after getting a non JSON payload.""" | 
					
						
							|  |  |  |         incoming = [ | 
					
						
							|  |  |  |             ("server", b"spam"), | 
					
						
							|  |  |  |             ("server", {"message": "Something", "type": "info"}), | 
					
						
							|  |  |  |         ] | 
					
						
							|  |  |  |         self.do_test( | 
					
						
							|  |  |  |             incoming=incoming, | 
					
						
							|  |  |  |             expected_outgoing=[], | 
					
						
							|  |  |  |             expected_stdout="\n".join( | 
					
						
							|  |  |  |                 [ | 
					
						
							|  |  |  |                     "*** Invalid JSON from remote: b'spam\\n'", | 
					
						
							|  |  |  |                     "Something", | 
					
						
							|  |  |  |                 ] | 
					
						
							|  |  |  |             ), | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_write_failing(self): | 
					
						
							|  |  |  |         """Test terminating if write fails due to a half closed socket.""" | 
					
						
							|  |  |  |         incoming = [ | 
					
						
							|  |  |  |             ("server", {"prompt": "(Pdb) ", "state": "pdb"}), | 
					
						
							|  |  |  |             ("user", {"prompt": "(Pdb) ", "input": KeyboardInterrupt()}), | 
					
						
							|  |  |  |         ] | 
					
						
							|  |  |  |         self.do_test( | 
					
						
							|  |  |  |             incoming=incoming, | 
					
						
							|  |  |  |             expected_outgoing=[{"signal": "INT"}], | 
					
						
							| 
									
										
										
										
											2025-05-05 15:33:59 -04:00
										 |  |  |             simulate_send_failure=True, | 
					
						
							| 
									
										
										
										
											2025-04-30 09:09:41 -04:00
										 |  |  |             expected_state={"write_failed": True}, | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_completion_in_pdb_state(self): | 
					
						
							|  |  |  |         """Test requesting tab completions at a (Pdb) prompt.""" | 
					
						
							|  |  |  |         # GIVEN | 
					
						
							|  |  |  |         incoming = [ | 
					
						
							|  |  |  |             ("server", {"prompt": "(Pdb) ", "state": "pdb"}), | 
					
						
							|  |  |  |             ( | 
					
						
							|  |  |  |                 "user", | 
					
						
							|  |  |  |                 { | 
					
						
							|  |  |  |                     "prompt": "(Pdb) ", | 
					
						
							|  |  |  |                     "completion_request": { | 
					
						
							|  |  |  |                         "line": "    mod._", | 
					
						
							|  |  |  |                         "begidx": 8, | 
					
						
							|  |  |  |                         "endidx": 9, | 
					
						
							|  |  |  |                     }, | 
					
						
							|  |  |  |                     "input": "print(\n    mod.__name__)", | 
					
						
							|  |  |  |                 }, | 
					
						
							|  |  |  |             ), | 
					
						
							|  |  |  |             ("server", {"completions": ["__name__", "__file__"]}), | 
					
						
							|  |  |  |         ] | 
					
						
							|  |  |  |         self.do_test( | 
					
						
							|  |  |  |             incoming=incoming, | 
					
						
							|  |  |  |             expected_outgoing=[ | 
					
						
							|  |  |  |                 { | 
					
						
							|  |  |  |                     "complete": { | 
					
						
							|  |  |  |                         "text": "_", | 
					
						
							|  |  |  |                         "line": "mod._", | 
					
						
							|  |  |  |                         "begidx": 4, | 
					
						
							|  |  |  |                         "endidx": 5, | 
					
						
							|  |  |  |                     } | 
					
						
							|  |  |  |                 }, | 
					
						
							|  |  |  |                 {"reply": "print(\n    mod.__name__)"}, | 
					
						
							|  |  |  |             ], | 
					
						
							|  |  |  |             expected_completions=["__name__", "__file__"], | 
					
						
							|  |  |  |             expected_state={"state": "pdb"}, | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-05-04 15:20:28 -04:00
										 |  |  |     def test_multiline_completion_in_pdb_state(self): | 
					
						
							|  |  |  |         """Test requesting tab completions at a (Pdb) continuation prompt.""" | 
					
						
							|  |  |  |         # GIVEN | 
					
						
							|  |  |  |         incoming = [ | 
					
						
							|  |  |  |             ("server", {"prompt": "(Pdb) ", "state": "pdb"}), | 
					
						
							|  |  |  |             ("user", {"prompt": "(Pdb) ", "input": "if True:"}), | 
					
						
							|  |  |  |             ( | 
					
						
							|  |  |  |                 "user", | 
					
						
							|  |  |  |                 { | 
					
						
							|  |  |  |                     "prompt": "...   ", | 
					
						
							|  |  |  |                     "completion_request": { | 
					
						
							|  |  |  |                         "line": "    b", | 
					
						
							|  |  |  |                         "begidx": 4, | 
					
						
							|  |  |  |                         "endidx": 5, | 
					
						
							|  |  |  |                     }, | 
					
						
							|  |  |  |                     "input": "    bool()", | 
					
						
							|  |  |  |                 }, | 
					
						
							|  |  |  |             ), | 
					
						
							|  |  |  |             ("server", {"completions": ["bin", "bool", "bytes"]}), | 
					
						
							|  |  |  |             ("user", {"prompt": "...   ", "input": ""}), | 
					
						
							|  |  |  |         ] | 
					
						
							|  |  |  |         self.do_test( | 
					
						
							|  |  |  |             incoming=incoming, | 
					
						
							|  |  |  |             expected_outgoing=[ | 
					
						
							|  |  |  |                 { | 
					
						
							|  |  |  |                     "complete": { | 
					
						
							|  |  |  |                         "text": "b", | 
					
						
							|  |  |  |                         "line": "! b", | 
					
						
							|  |  |  |                         "begidx": 2, | 
					
						
							|  |  |  |                         "endidx": 3, | 
					
						
							|  |  |  |                     } | 
					
						
							|  |  |  |                 }, | 
					
						
							|  |  |  |                 {"reply": "if True:\n    bool()\n"}, | 
					
						
							|  |  |  |             ], | 
					
						
							|  |  |  |             expected_completions=["bin", "bool", "bytes"], | 
					
						
							|  |  |  |             expected_state={"state": "pdb"}, | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-04-30 09:09:41 -04:00
										 |  |  |     def test_completion_in_interact_state(self): | 
					
						
							|  |  |  |         """Test requesting tab completions at a >>> prompt.""" | 
					
						
							|  |  |  |         incoming = [ | 
					
						
							|  |  |  |             ("server", {"prompt": ">>> ", "state": "interact"}), | 
					
						
							|  |  |  |             ( | 
					
						
							|  |  |  |                 "user", | 
					
						
							|  |  |  |                 { | 
					
						
							|  |  |  |                     "prompt": ">>> ", | 
					
						
							|  |  |  |                     "completion_request": { | 
					
						
							|  |  |  |                         "line": "    mod.__", | 
					
						
							|  |  |  |                         "begidx": 8, | 
					
						
							|  |  |  |                         "endidx": 10, | 
					
						
							|  |  |  |                     }, | 
					
						
							|  |  |  |                     "input": "print(\n    mod.__name__)", | 
					
						
							|  |  |  |                 }, | 
					
						
							|  |  |  |             ), | 
					
						
							|  |  |  |             ("server", {"completions": ["__name__", "__file__"]}), | 
					
						
							|  |  |  |         ] | 
					
						
							|  |  |  |         self.do_test( | 
					
						
							|  |  |  |             incoming=incoming, | 
					
						
							|  |  |  |             expected_outgoing=[ | 
					
						
							|  |  |  |                 { | 
					
						
							|  |  |  |                     "complete": { | 
					
						
							|  |  |  |                         "text": "__", | 
					
						
							|  |  |  |                         "line": "mod.__", | 
					
						
							|  |  |  |                         "begidx": 4, | 
					
						
							|  |  |  |                         "endidx": 6, | 
					
						
							|  |  |  |                     } | 
					
						
							|  |  |  |                 }, | 
					
						
							|  |  |  |                 {"reply": "print(\n    mod.__name__)"}, | 
					
						
							|  |  |  |             ], | 
					
						
							|  |  |  |             expected_completions=["__name__", "__file__"], | 
					
						
							|  |  |  |             expected_state={"state": "interact"}, | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_completion_in_unknown_state(self): | 
					
						
							|  |  |  |         """Test requesting tab completions at an unrecognized prompt.""" | 
					
						
							|  |  |  |         incoming = [ | 
					
						
							|  |  |  |             ("server", {"command_list": ["p"]}), | 
					
						
							|  |  |  |             ("server", {"prompt": "Do it? ", "state": "confirm"}), | 
					
						
							|  |  |  |             ( | 
					
						
							|  |  |  |                 "user", | 
					
						
							|  |  |  |                 { | 
					
						
							|  |  |  |                     "prompt": "Do it? ", | 
					
						
							|  |  |  |                     "completion_request": { | 
					
						
							|  |  |  |                         "line": "_", | 
					
						
							|  |  |  |                         "begidx": 0, | 
					
						
							|  |  |  |                         "endidx": 1, | 
					
						
							|  |  |  |                     }, | 
					
						
							|  |  |  |                     "input": "__name__", | 
					
						
							|  |  |  |                 }, | 
					
						
							|  |  |  |             ), | 
					
						
							|  |  |  |         ] | 
					
						
							|  |  |  |         self.do_test( | 
					
						
							|  |  |  |             incoming=incoming, | 
					
						
							|  |  |  |             expected_outgoing=[ | 
					
						
							|  |  |  |                 {"reply": "__name__"}, | 
					
						
							|  |  |  |             ], | 
					
						
							|  |  |  |             expected_state={"state": "dumb"}, | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_write_failure_during_completion(self): | 
					
						
							|  |  |  |         """Test failing to write to the socket to request tab completions.""" | 
					
						
							|  |  |  |         incoming = [ | 
					
						
							|  |  |  |             ("server", {"prompt": ">>> ", "state": "interact"}), | 
					
						
							|  |  |  |             ( | 
					
						
							|  |  |  |                 "user", | 
					
						
							|  |  |  |                 { | 
					
						
							|  |  |  |                     "prompt": ">>> ", | 
					
						
							|  |  |  |                     "completion_request": { | 
					
						
							|  |  |  |                         "line": "xy", | 
					
						
							|  |  |  |                         "begidx": 0, | 
					
						
							|  |  |  |                         "endidx": 2, | 
					
						
							|  |  |  |                     }, | 
					
						
							|  |  |  |                     "input": "xyz", | 
					
						
							|  |  |  |                 }, | 
					
						
							|  |  |  |             ), | 
					
						
							|  |  |  |         ] | 
					
						
							|  |  |  |         self.do_test( | 
					
						
							|  |  |  |             incoming=incoming, | 
					
						
							|  |  |  |             expected_outgoing=[ | 
					
						
							|  |  |  |                 { | 
					
						
							|  |  |  |                     "complete": { | 
					
						
							|  |  |  |                         "text": "xy", | 
					
						
							|  |  |  |                         "line": "xy", | 
					
						
							|  |  |  |                         "begidx": 0, | 
					
						
							|  |  |  |                         "endidx": 2, | 
					
						
							|  |  |  |                     } | 
					
						
							|  |  |  |                 }, | 
					
						
							|  |  |  |                 {"reply": "xyz"}, | 
					
						
							|  |  |  |             ], | 
					
						
							| 
									
										
										
										
											2025-05-05 15:33:59 -04:00
										 |  |  |             simulate_send_failure=True, | 
					
						
							| 
									
										
										
										
											2025-04-30 09:09:41 -04:00
										 |  |  |             expected_completions=[], | 
					
						
							|  |  |  |             expected_state={"state": "interact", "write_failed": True}, | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_read_failure_during_completion(self): | 
					
						
							|  |  |  |         """Test failing to read tab completions from the socket.""" | 
					
						
							|  |  |  |         incoming = [ | 
					
						
							|  |  |  |             ("server", {"prompt": ">>> ", "state": "interact"}), | 
					
						
							|  |  |  |             ( | 
					
						
							|  |  |  |                 "user", | 
					
						
							|  |  |  |                 { | 
					
						
							|  |  |  |                     "prompt": ">>> ", | 
					
						
							|  |  |  |                     "completion_request": { | 
					
						
							|  |  |  |                         "line": "xy", | 
					
						
							|  |  |  |                         "begidx": 0, | 
					
						
							|  |  |  |                         "endidx": 2, | 
					
						
							|  |  |  |                     }, | 
					
						
							|  |  |  |                     "input": "xyz", | 
					
						
							|  |  |  |                 }, | 
					
						
							|  |  |  |             ), | 
					
						
							|  |  |  |         ] | 
					
						
							|  |  |  |         self.do_test( | 
					
						
							|  |  |  |             incoming=incoming, | 
					
						
							|  |  |  |             expected_outgoing=[ | 
					
						
							|  |  |  |                 { | 
					
						
							|  |  |  |                     "complete": { | 
					
						
							|  |  |  |                         "text": "xy", | 
					
						
							|  |  |  |                         "line": "xy", | 
					
						
							|  |  |  |                         "begidx": 0, | 
					
						
							|  |  |  |                         "endidx": 2, | 
					
						
							|  |  |  |                     } | 
					
						
							|  |  |  |                 }, | 
					
						
							|  |  |  |                 {"reply": "xyz"}, | 
					
						
							|  |  |  |             ], | 
					
						
							|  |  |  |             expected_completions=[], | 
					
						
							|  |  |  |             expected_state={"state": "interact"}, | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_reading_invalid_json_during_completion(self): | 
					
						
							|  |  |  |         """Test receiving invalid JSON when getting tab completions.""" | 
					
						
							|  |  |  |         incoming = [ | 
					
						
							|  |  |  |             ("server", {"prompt": ">>> ", "state": "interact"}), | 
					
						
							|  |  |  |             ( | 
					
						
							|  |  |  |                 "user", | 
					
						
							|  |  |  |                 { | 
					
						
							|  |  |  |                     "prompt": ">>> ", | 
					
						
							|  |  |  |                     "completion_request": { | 
					
						
							|  |  |  |                         "line": "xy", | 
					
						
							|  |  |  |                         "begidx": 0, | 
					
						
							|  |  |  |                         "endidx": 2, | 
					
						
							|  |  |  |                     }, | 
					
						
							|  |  |  |                     "input": "xyz", | 
					
						
							|  |  |  |                 }, | 
					
						
							|  |  |  |             ), | 
					
						
							|  |  |  |             ("server", b'{"completions": '), | 
					
						
							|  |  |  |             ("user", {"prompt": ">>> ", "input": "xyz"}), | 
					
						
							|  |  |  |         ] | 
					
						
							|  |  |  |         self.do_test( | 
					
						
							|  |  |  |             incoming=incoming, | 
					
						
							|  |  |  |             expected_outgoing=[ | 
					
						
							|  |  |  |                 { | 
					
						
							|  |  |  |                     "complete": { | 
					
						
							|  |  |  |                         "text": "xy", | 
					
						
							|  |  |  |                         "line": "xy", | 
					
						
							|  |  |  |                         "begidx": 0, | 
					
						
							|  |  |  |                         "endidx": 2, | 
					
						
							|  |  |  |                     } | 
					
						
							|  |  |  |                 }, | 
					
						
							|  |  |  |                 {"reply": "xyz"}, | 
					
						
							|  |  |  |             ], | 
					
						
							|  |  |  |             expected_stdout_substring="*** json.decoder.JSONDecodeError", | 
					
						
							|  |  |  |             expected_completions=[], | 
					
						
							|  |  |  |             expected_state={"state": "interact"}, | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_reading_empty_json_during_completion(self): | 
					
						
							|  |  |  |         """Test receiving an empty JSON object when getting tab completions.""" | 
					
						
							|  |  |  |         incoming = [ | 
					
						
							|  |  |  |             ("server", {"prompt": ">>> ", "state": "interact"}), | 
					
						
							|  |  |  |             ( | 
					
						
							|  |  |  |                 "user", | 
					
						
							|  |  |  |                 { | 
					
						
							|  |  |  |                     "prompt": ">>> ", | 
					
						
							|  |  |  |                     "completion_request": { | 
					
						
							|  |  |  |                         "line": "xy", | 
					
						
							|  |  |  |                         "begidx": 0, | 
					
						
							|  |  |  |                         "endidx": 2, | 
					
						
							|  |  |  |                     }, | 
					
						
							|  |  |  |                     "input": "xyz", | 
					
						
							|  |  |  |                 }, | 
					
						
							|  |  |  |             ), | 
					
						
							|  |  |  |             ("server", {}), | 
					
						
							|  |  |  |             ("user", {"prompt": ">>> ", "input": "xyz"}), | 
					
						
							|  |  |  |         ] | 
					
						
							|  |  |  |         self.do_test( | 
					
						
							|  |  |  |             incoming=incoming, | 
					
						
							|  |  |  |             expected_outgoing=[ | 
					
						
							|  |  |  |                 { | 
					
						
							|  |  |  |                     "complete": { | 
					
						
							|  |  |  |                         "text": "xy", | 
					
						
							|  |  |  |                         "line": "xy", | 
					
						
							|  |  |  |                         "begidx": 0, | 
					
						
							|  |  |  |                         "endidx": 2, | 
					
						
							|  |  |  |                     } | 
					
						
							|  |  |  |                 }, | 
					
						
							|  |  |  |                 {"reply": "xyz"}, | 
					
						
							|  |  |  |             ], | 
					
						
							|  |  |  |             expected_stdout=( | 
					
						
							|  |  |  |                 "*** RuntimeError: Failed to get valid completions." | 
					
						
							|  |  |  |                 " Got: {}\n" | 
					
						
							|  |  |  |             ), | 
					
						
							|  |  |  |             expected_completions=[], | 
					
						
							|  |  |  |             expected_state={"state": "interact"}, | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-04-24 20:43:23 -04:00
										 |  |  | class RemotePdbTestCase(unittest.TestCase): | 
					
						
							|  |  |  |     """Tests for the _PdbServer class.""" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def setUp(self): | 
					
						
							|  |  |  |         self.sockfile = MockSocketFile() | 
					
						
							|  |  |  |         self.pdb = _PdbServer(self.sockfile) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # Mock some Bdb attributes that are lazily created when tracing starts | 
					
						
							|  |  |  |         self.pdb.botframe = None | 
					
						
							|  |  |  |         self.pdb.quitting = False | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # Create a frame for testing | 
					
						
							|  |  |  |         self.test_globals = {'a': 1, 'b': 2, '__pdb_convenience_variables': {'x': 100}} | 
					
						
							|  |  |  |         self.test_locals = {'c': 3, 'd': 4} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # Create a simple test frame | 
					
						
							|  |  |  |         frame_info = unittest.mock.Mock() | 
					
						
							|  |  |  |         frame_info.f_globals = self.test_globals | 
					
						
							|  |  |  |         frame_info.f_locals = self.test_locals | 
					
						
							|  |  |  |         frame_info.f_lineno = 42 | 
					
						
							|  |  |  |         frame_info.f_code = unittest.mock.Mock() | 
					
						
							|  |  |  |         frame_info.f_code.co_filename = "test_file.py" | 
					
						
							|  |  |  |         frame_info.f_code.co_name = "test_function" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.pdb.curframe = frame_info | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_message_and_error(self): | 
					
						
							|  |  |  |         """Test message and error methods send correct JSON.""" | 
					
						
							|  |  |  |         self.pdb.message("Test message") | 
					
						
							|  |  |  |         self.pdb.error("Test error") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         outputs = self.sockfile.get_output() | 
					
						
							|  |  |  |         self.assertEqual(len(outputs), 2) | 
					
						
							|  |  |  |         self.assertEqual(outputs[0], {"message": "Test message\n", "type": "info"}) | 
					
						
							|  |  |  |         self.assertEqual(outputs[1], {"message": "Test error", "type": "error"}) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_read_command(self): | 
					
						
							|  |  |  |         """Test reading commands from the socket.""" | 
					
						
							|  |  |  |         # Add test input | 
					
						
							|  |  |  |         self.sockfile.add_input({"reply": "help"}) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # Read the command | 
					
						
							|  |  |  |         cmd = self.pdb._read_reply() | 
					
						
							|  |  |  |         self.assertEqual(cmd, "help") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_read_command_EOF(self): | 
					
						
							|  |  |  |         """Test reading EOF command.""" | 
					
						
							|  |  |  |         # Simulate socket closure | 
					
						
							|  |  |  |         self.pdb._write_failed = True | 
					
						
							|  |  |  |         with self.assertRaises(EOFError): | 
					
						
							|  |  |  |             self.pdb._read_reply() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_completion(self): | 
					
						
							|  |  |  |         """Test handling completion requests.""" | 
					
						
							|  |  |  |         # Mock completenames to return specific values | 
					
						
							|  |  |  |         with unittest.mock.patch.object(self.pdb, 'completenames', | 
					
						
							|  |  |  |                                        return_value=["continue", "clear"]): | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Add a completion request | 
					
						
							|  |  |  |             self.sockfile.add_input({ | 
					
						
							|  |  |  |                 "complete": { | 
					
						
							|  |  |  |                     "text": "c", | 
					
						
							|  |  |  |                     "line": "c", | 
					
						
							|  |  |  |                     "begidx": 0, | 
					
						
							|  |  |  |                     "endidx": 1 | 
					
						
							|  |  |  |                 } | 
					
						
							|  |  |  |             }) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Add a regular command to break the loop | 
					
						
							|  |  |  |             self.sockfile.add_input({"reply": "help"}) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Read command - this should process the completion request first | 
					
						
							|  |  |  |             cmd = self.pdb._read_reply() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Verify completion response was sent | 
					
						
							|  |  |  |             outputs = self.sockfile.get_output() | 
					
						
							|  |  |  |             self.assertEqual(len(outputs), 1) | 
					
						
							|  |  |  |             self.assertEqual(outputs[0], {"completions": ["continue", "clear"]}) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # The actual command should be returned | 
					
						
							|  |  |  |             self.assertEqual(cmd, "help") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_do_help(self): | 
					
						
							|  |  |  |         """Test that do_help sends the help message.""" | 
					
						
							|  |  |  |         self.pdb.do_help("break") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         outputs = self.sockfile.get_output() | 
					
						
							|  |  |  |         self.assertEqual(len(outputs), 1) | 
					
						
							|  |  |  |         self.assertEqual(outputs[0], {"help": "break"}) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_interact_mode(self): | 
					
						
							|  |  |  |         """Test interaction mode setup and execution.""" | 
					
						
							|  |  |  |         # First set up interact mode | 
					
						
							|  |  |  |         self.pdb.do_interact("") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # Verify _interact_state is properly initialized | 
					
						
							|  |  |  |         self.assertIsNotNone(self.pdb._interact_state) | 
					
						
							|  |  |  |         self.assertIsInstance(self.pdb._interact_state, dict) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # Test running code in interact mode | 
					
						
							|  |  |  |         with unittest.mock.patch.object(self.pdb, '_error_exc') as mock_error: | 
					
						
							|  |  |  |             self.pdb._run_in_python_repl("print('test')") | 
					
						
							|  |  |  |             mock_error.assert_not_called() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Test with syntax error | 
					
						
							|  |  |  |             self.pdb._run_in_python_repl("if:") | 
					
						
							|  |  |  |             mock_error.assert_called_once() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_registering_commands(self): | 
					
						
							|  |  |  |         """Test registering breakpoint commands.""" | 
					
						
							|  |  |  |         # Mock get_bpbynumber | 
					
						
							|  |  |  |         with unittest.mock.patch.object(self.pdb, 'get_bpbynumber'): | 
					
						
							|  |  |  |             # Queue up some input to send | 
					
						
							|  |  |  |             self.sockfile.add_input({"reply": "commands 1"}) | 
					
						
							|  |  |  |             self.sockfile.add_input({"reply": "silent"}) | 
					
						
							|  |  |  |             self.sockfile.add_input({"reply": "print('hi')"}) | 
					
						
							|  |  |  |             self.sockfile.add_input({"reply": "end"}) | 
					
						
							|  |  |  |             self.sockfile.add_input({"signal": "EOF"}) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Run the PDB command loop | 
					
						
							|  |  |  |             self.pdb.cmdloop() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             outputs = self.sockfile.get_output() | 
					
						
							|  |  |  |             self.assertIn('command_list', outputs[0]) | 
					
						
							|  |  |  |             self.assertEqual(outputs[1], {"prompt": "(Pdb) ", "state": "pdb"}) | 
					
						
							|  |  |  |             self.assertEqual(outputs[2], {"prompt": "(com) ", "state": "commands"}) | 
					
						
							|  |  |  |             self.assertEqual(outputs[3], {"prompt": "(com) ", "state": "commands"}) | 
					
						
							|  |  |  |             self.assertEqual(outputs[4], {"prompt": "(com) ", "state": "commands"}) | 
					
						
							|  |  |  |             self.assertEqual(outputs[5], {"prompt": "(Pdb) ", "state": "pdb"}) | 
					
						
							|  |  |  |             self.assertEqual(outputs[6], {"message": "\n", "type": "info"}) | 
					
						
							|  |  |  |             self.assertEqual(len(outputs), 7) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             self.assertEqual( | 
					
						
							|  |  |  |                 self.pdb.commands[1], | 
					
						
							|  |  |  |                 ["_pdbcmd_silence_frame_status", "print('hi')"], | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_detach(self): | 
					
						
							|  |  |  |         """Test the detach method.""" | 
					
						
							|  |  |  |         with unittest.mock.patch.object(self.sockfile, 'close') as mock_close: | 
					
						
							|  |  |  |             self.pdb.detach() | 
					
						
							|  |  |  |             mock_close.assert_called_once() | 
					
						
							|  |  |  |             self.assertFalse(self.pdb.quitting) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_cmdloop(self): | 
					
						
							|  |  |  |         """Test the command loop with various commands.""" | 
					
						
							|  |  |  |         # Mock onecmd to track command execution | 
					
						
							|  |  |  |         with unittest.mock.patch.object(self.pdb, 'onecmd', return_value=False) as mock_onecmd: | 
					
						
							|  |  |  |             # Add commands to the queue | 
					
						
							|  |  |  |             self.pdb.cmdqueue = ['help', 'list'] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Add a command from the socket for when cmdqueue is empty | 
					
						
							|  |  |  |             self.sockfile.add_input({"reply": "next"}) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Add a second command to break the loop | 
					
						
							|  |  |  |             self.sockfile.add_input({"reply": "quit"}) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Configure onecmd to exit the loop on "quit" | 
					
						
							|  |  |  |             def side_effect(line): | 
					
						
							|  |  |  |                 return line == 'quit' | 
					
						
							|  |  |  |             mock_onecmd.side_effect = side_effect | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Run the command loop | 
					
						
							|  |  |  |             self.pdb.quitting = False # Set this by hand because we don't want to really call set_trace() | 
					
						
							|  |  |  |             self.pdb.cmdloop() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Should have processed 4 commands: 2 from cmdqueue, 2 from socket | 
					
						
							|  |  |  |             self.assertEqual(mock_onecmd.call_count, 4) | 
					
						
							|  |  |  |             mock_onecmd.assert_any_call('help') | 
					
						
							|  |  |  |             mock_onecmd.assert_any_call('list') | 
					
						
							|  |  |  |             mock_onecmd.assert_any_call('next') | 
					
						
							|  |  |  |             mock_onecmd.assert_any_call('quit') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Check if prompt was sent to client | 
					
						
							|  |  |  |             outputs = self.sockfile.get_output() | 
					
						
							|  |  |  |             prompts = [o for o in outputs if 'prompt' in o] | 
					
						
							|  |  |  |             self.assertEqual(len(prompts), 2)  # Should have sent 2 prompts | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-04-26 11:09:09 -07:00
										 |  |  | @requires_subprocess() | 
					
						
							| 
									
										
										
										
											2025-04-24 20:43:23 -04:00
										 |  |  | @unittest.skipIf(is_wasi, "WASI does not support TCP sockets") | 
					
						
							|  |  |  | class PdbConnectTestCase(unittest.TestCase): | 
					
						
							|  |  |  |     """Tests for the _connect mechanism using direct socket communication.""" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def setUp(self): | 
					
						
							|  |  |  |         # Create a server socket that will wait for the debugger to connect | 
					
						
							|  |  |  |         self.server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | 
					
						
							|  |  |  |         self.server_sock.bind(('127.0.0.1', 0))  # Let OS assign port | 
					
						
							|  |  |  |         self.server_sock.listen(1) | 
					
						
							|  |  |  |         self.port = self.server_sock.getsockname()[1] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def _create_script(self, script=None): | 
					
						
							|  |  |  |         # Create a file for subprocess script | 
					
						
							|  |  |  |         if script is None: | 
					
						
							|  |  |  |             script = textwrap.dedent( | 
					
						
							|  |  |  |                 f"""
 | 
					
						
							|  |  |  |                 import pdb | 
					
						
							|  |  |  |                 import sys | 
					
						
							|  |  |  |                 import time | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 def foo(): | 
					
						
							|  |  |  |                     x = 42 | 
					
						
							|  |  |  |                     return bar() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 def bar(): | 
					
						
							|  |  |  |                     return 42 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 def connect_to_debugger(): | 
					
						
							|  |  |  |                     # Create a frame to debug | 
					
						
							|  |  |  |                     def dummy_function(): | 
					
						
							|  |  |  |                         x = 42 | 
					
						
							|  |  |  |                         # Call connect to establish connection | 
					
						
							|  |  |  |                         # with the test server | 
					
						
							|  |  |  |                         frame = sys._getframe()  # Get the current frame | 
					
						
							|  |  |  |                         pdb._connect( | 
					
						
							|  |  |  |                             host='127.0.0.1', | 
					
						
							|  |  |  |                             port={self.port}, | 
					
						
							|  |  |  |                             frame=frame, | 
					
						
							|  |  |  |                             commands="", | 
					
						
							|  |  |  |                             version=pdb._PdbServer.protocol_version(), | 
					
						
							| 
									
										
										
										
											2025-05-05 15:33:59 -04:00
										 |  |  |                             signal_raising_thread=False, | 
					
						
							| 
									
										
										
										
											2025-05-06 01:28:16 -04:00
										 |  |  |                             colorize=False, | 
					
						
							| 
									
										
										
										
											2025-04-24 20:43:23 -04:00
										 |  |  |                         ) | 
					
						
							|  |  |  |                         return x  # This line won't be reached in debugging | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                     return dummy_function() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 result = connect_to_debugger() | 
					
						
							|  |  |  |                 foo() | 
					
						
							|  |  |  |                 print(f"Function returned: {{result}}") | 
					
						
							|  |  |  |                 """)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.script_path = TESTFN + "_connect_test.py" | 
					
						
							|  |  |  |         with open(self.script_path, 'w') as f: | 
					
						
							|  |  |  |             f.write(script) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def tearDown(self): | 
					
						
							|  |  |  |         self.server_sock.close() | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             unlink(self.script_path) | 
					
						
							|  |  |  |         except OSError: | 
					
						
							|  |  |  |             pass | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def _connect_and_get_client_file(self): | 
					
						
							|  |  |  |         """Helper to start subprocess and get connected client file.""" | 
					
						
							|  |  |  |         # Start the subprocess that will connect to our socket | 
					
						
							|  |  |  |         process = subprocess.Popen( | 
					
						
							|  |  |  |             [sys.executable, self.script_path], | 
					
						
							|  |  |  |             stdout=subprocess.PIPE, | 
					
						
							|  |  |  |             stderr=subprocess.PIPE, | 
					
						
							|  |  |  |             text=True | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # Accept the connection from the subprocess | 
					
						
							|  |  |  |         client_sock, _ = self.server_sock.accept() | 
					
						
							|  |  |  |         client_file = client_sock.makefile('rwb') | 
					
						
							|  |  |  |         self.addCleanup(client_file.close) | 
					
						
							|  |  |  |         self.addCleanup(client_sock.close) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         return process, client_file | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def _read_until_prompt(self, client_file): | 
					
						
							|  |  |  |         """Helper to read messages until a prompt is received.""" | 
					
						
							|  |  |  |         messages = [] | 
					
						
							|  |  |  |         while True: | 
					
						
							|  |  |  |             data = client_file.readline() | 
					
						
							|  |  |  |             if not data: | 
					
						
							|  |  |  |                 break | 
					
						
							|  |  |  |             msg = json.loads(data.decode()) | 
					
						
							|  |  |  |             messages.append(msg) | 
					
						
							|  |  |  |             if 'prompt' in msg: | 
					
						
							|  |  |  |                 break | 
					
						
							|  |  |  |         return messages | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def _send_command(self, client_file, command): | 
					
						
							|  |  |  |         """Helper to send a command to the debugger.""" | 
					
						
							|  |  |  |         client_file.write(json.dumps({"reply": command}).encode() + b"\n") | 
					
						
							|  |  |  |         client_file.flush() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_connect_and_basic_commands(self): | 
					
						
							|  |  |  |         """Test connecting to a remote debugger and sending basic commands.""" | 
					
						
							|  |  |  |         self._create_script() | 
					
						
							|  |  |  |         process, client_file = self._connect_and_get_client_file() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-04-25 13:14:59 +02:00
										 |  |  |         with kill_on_error(process): | 
					
						
							| 
									
										
										
										
											2025-04-24 20:43:23 -04:00
										 |  |  |             # We should receive initial data from the debugger | 
					
						
							|  |  |  |             data = client_file.readline() | 
					
						
							|  |  |  |             initial_data = json.loads(data.decode()) | 
					
						
							|  |  |  |             self.assertIn('message', initial_data) | 
					
						
							|  |  |  |             self.assertIn('pdb._connect', initial_data['message']) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # First, look for command_list message | 
					
						
							|  |  |  |             data = client_file.readline() | 
					
						
							|  |  |  |             command_list = json.loads(data.decode()) | 
					
						
							|  |  |  |             self.assertIn('command_list', command_list) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Then, look for the first prompt | 
					
						
							|  |  |  |             data = client_file.readline() | 
					
						
							|  |  |  |             prompt_data = json.loads(data.decode()) | 
					
						
							|  |  |  |             self.assertIn('prompt', prompt_data) | 
					
						
							|  |  |  |             self.assertEqual(prompt_data['state'], 'pdb') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Send 'bt' (backtrace) command | 
					
						
							|  |  |  |             self._send_command(client_file, "bt") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Check for response - we should get some stack frames | 
					
						
							|  |  |  |             messages = self._read_until_prompt(client_file) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Extract text messages containing stack info | 
					
						
							|  |  |  |             text_msg = [msg['message'] for msg in messages | 
					
						
							|  |  |  |                     if 'message' in msg and 'connect_to_debugger' in msg['message']] | 
					
						
							|  |  |  |             got_stack_info = bool(text_msg) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             expected_stacks = [ | 
					
						
							|  |  |  |                 "<module>", | 
					
						
							|  |  |  |                 "connect_to_debugger", | 
					
						
							|  |  |  |             ] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             for stack, msg in zip(expected_stacks, text_msg, strict=True): | 
					
						
							|  |  |  |                 self.assertIn(stack, msg) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             self.assertTrue(got_stack_info, "Should have received stack trace information") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Send 'c' (continue) command to let the program finish | 
					
						
							|  |  |  |             self._send_command(client_file, "c") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Wait for process to finish | 
					
						
							| 
									
										
										
										
											2025-04-25 16:00:26 +02:00
										 |  |  |             stdout, _ = process.communicate(timeout=SHORT_TIMEOUT) | 
					
						
							| 
									
										
										
										
											2025-04-24 20:43:23 -04:00
										 |  |  | 
 | 
					
						
							|  |  |  |             # Check if we got the expected output | 
					
						
							|  |  |  |             self.assertIn("Function returned: 42", stdout) | 
					
						
							|  |  |  |             self.assertEqual(process.returncode, 0) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_breakpoints(self): | 
					
						
							|  |  |  |         """Test setting and hitting breakpoints.""" | 
					
						
							|  |  |  |         self._create_script() | 
					
						
							|  |  |  |         process, client_file = self._connect_and_get_client_file() | 
					
						
							| 
									
										
										
										
											2025-04-25 13:14:59 +02:00
										 |  |  |         with kill_on_error(process): | 
					
						
							| 
									
										
										
										
											2025-04-24 20:43:23 -04:00
										 |  |  |             # Skip initial messages until we get to the prompt | 
					
						
							|  |  |  |             self._read_until_prompt(client_file) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Set a breakpoint at the return statement | 
					
						
							|  |  |  |             self._send_command(client_file, "break bar") | 
					
						
							|  |  |  |             messages = self._read_until_prompt(client_file) | 
					
						
							|  |  |  |             bp_msg = next(msg['message'] for msg in messages if 'message' in msg) | 
					
						
							|  |  |  |             self.assertIn("Breakpoint", bp_msg) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Continue execution until breakpoint | 
					
						
							|  |  |  |             self._send_command(client_file, "c") | 
					
						
							|  |  |  |             messages = self._read_until_prompt(client_file) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Verify we hit the breakpoint | 
					
						
							|  |  |  |             hit_msg = next(msg['message'] for msg in messages if 'message' in msg) | 
					
						
							|  |  |  |             self.assertIn("bar()", hit_msg) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Check breakpoint list | 
					
						
							|  |  |  |             self._send_command(client_file, "b") | 
					
						
							|  |  |  |             messages = self._read_until_prompt(client_file) | 
					
						
							|  |  |  |             list_msg = next(msg['message'] for msg in reversed(messages) if 'message' in msg) | 
					
						
							|  |  |  |             self.assertIn("1   breakpoint", list_msg) | 
					
						
							|  |  |  |             self.assertIn("breakpoint already hit 1 time", list_msg) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Clear breakpoint | 
					
						
							|  |  |  |             self._send_command(client_file, "clear 1") | 
					
						
							|  |  |  |             messages = self._read_until_prompt(client_file) | 
					
						
							|  |  |  |             clear_msg = next(msg['message'] for msg in reversed(messages) if 'message' in msg) | 
					
						
							|  |  |  |             self.assertIn("Deleted breakpoint", clear_msg) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Continue to end | 
					
						
							|  |  |  |             self._send_command(client_file, "c") | 
					
						
							| 
									
										
										
										
											2025-04-25 16:00:26 +02:00
										 |  |  |             stdout, _ = process.communicate(timeout=SHORT_TIMEOUT) | 
					
						
							| 
									
										
										
										
											2025-04-24 20:43:23 -04:00
										 |  |  | 
 | 
					
						
							|  |  |  |             self.assertIn("Function returned: 42", stdout) | 
					
						
							|  |  |  |             self.assertEqual(process.returncode, 0) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_keyboard_interrupt(self): | 
					
						
							|  |  |  |         """Test that sending keyboard interrupt breaks into pdb.""" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         script = textwrap.dedent(f"""
 | 
					
						
							|  |  |  |             import time | 
					
						
							|  |  |  |             import sys | 
					
						
							|  |  |  |             import socket | 
					
						
							|  |  |  |             import pdb | 
					
						
							|  |  |  |             def bar(): | 
					
						
							|  |  |  |                 frame = sys._getframe()  # Get the current frame | 
					
						
							|  |  |  |                 pdb._connect( | 
					
						
							|  |  |  |                     host='127.0.0.1', | 
					
						
							|  |  |  |                     port={self.port}, | 
					
						
							|  |  |  |                     frame=frame, | 
					
						
							|  |  |  |                     commands="", | 
					
						
							|  |  |  |                     version=pdb._PdbServer.protocol_version(), | 
					
						
							| 
									
										
										
										
											2025-05-05 15:33:59 -04:00
										 |  |  |                     signal_raising_thread=True, | 
					
						
							| 
									
										
										
										
											2025-05-06 01:28:16 -04:00
										 |  |  |                     colorize=False, | 
					
						
							| 
									
										
										
										
											2025-04-24 20:43:23 -04:00
										 |  |  |                 ) | 
					
						
							|  |  |  |                 print("Connected to debugger") | 
					
						
							| 
									
										
										
										
											2025-04-25 10:40:18 -07:00
										 |  |  |                 iterations = 50 | 
					
						
							| 
									
										
										
										
											2025-04-24 20:43:23 -04:00
										 |  |  |                 while iterations > 0: | 
					
						
							| 
									
										
										
										
											2025-04-25 10:40:18 -07:00
										 |  |  |                     print("Iteration", iterations, flush=True) | 
					
						
							|  |  |  |                     time.sleep(0.2) | 
					
						
							| 
									
										
										
										
											2025-04-24 20:43:23 -04:00
										 |  |  |                     iterations -= 1 | 
					
						
							|  |  |  |                 return 42 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             if __name__ == "__main__": | 
					
						
							|  |  |  |                 print("Function returned:", bar()) | 
					
						
							|  |  |  |             """)
 | 
					
						
							|  |  |  |         self._create_script(script=script) | 
					
						
							|  |  |  |         process, client_file = self._connect_and_get_client_file() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-05-05 15:33:59 -04:00
										 |  |  |         # Accept a 2nd connection from the subprocess to tell it about signals | 
					
						
							|  |  |  |         signal_sock, _ = self.server_sock.accept() | 
					
						
							|  |  |  |         self.addCleanup(signal_sock.close) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-04-25 13:14:59 +02:00
										 |  |  |         with kill_on_error(process): | 
					
						
							| 
									
										
										
										
											2025-04-24 20:43:23 -04:00
										 |  |  |             # Skip initial messages until we get to the prompt | 
					
						
							|  |  |  |             self._read_until_prompt(client_file) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Continue execution | 
					
						
							|  |  |  |             self._send_command(client_file, "c") | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-04-25 10:40:18 -07:00
										 |  |  |             # Confirm that the remote is already in the while loop. We know | 
					
						
							|  |  |  |             # it's in bar() and we can exit the loop immediately by setting | 
					
						
							|  |  |  |             # iterations to 0. | 
					
						
							|  |  |  |             while line := process.stdout.readline(): | 
					
						
							|  |  |  |                 if line.startswith("Iteration"): | 
					
						
							|  |  |  |                     break | 
					
						
							| 
									
										
										
										
											2025-04-25 14:46:44 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-04-24 20:43:23 -04:00
										 |  |  |             # Inject a script to interrupt the running process | 
					
						
							| 
									
										
										
										
											2025-05-05 15:33:59 -04:00
										 |  |  |             signal_sock.sendall(signal.SIGINT.to_bytes()) | 
					
						
							| 
									
										
										
										
											2025-04-24 20:43:23 -04:00
										 |  |  |             messages = self._read_until_prompt(client_file) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-04-25 10:40:18 -07:00
										 |  |  |             # Verify we got the keyboard interrupt message. | 
					
						
							| 
									
										
										
										
											2025-04-25 14:46:44 +01:00
										 |  |  |             interrupt_msgs = [msg['message'] for msg in messages if 'message' in msg] | 
					
						
							| 
									
										
										
										
											2025-04-25 10:40:18 -07:00
										 |  |  |             expected_msg = [msg for msg in interrupt_msgs if "bar()" in msg] | 
					
						
							| 
									
										
										
										
											2025-04-25 14:46:44 +01:00
										 |  |  |             self.assertGreater(len(expected_msg), 0) | 
					
						
							| 
									
										
										
										
											2025-04-24 20:43:23 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-04-25 14:46:44 +01:00
										 |  |  |             # Continue to end as fast as we can | 
					
						
							| 
									
										
										
										
											2025-04-24 20:43:23 -04:00
										 |  |  |             self._send_command(client_file, "iterations = 0") | 
					
						
							|  |  |  |             self._send_command(client_file, "c") | 
					
						
							| 
									
										
										
										
											2025-04-25 16:00:26 +02:00
										 |  |  |             stdout, _ = process.communicate(timeout=SHORT_TIMEOUT) | 
					
						
							| 
									
										
										
										
											2025-04-24 20:43:23 -04:00
										 |  |  |             self.assertIn("Function returned: 42", stdout) | 
					
						
							|  |  |  |             self.assertEqual(process.returncode, 0) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_handle_eof(self): | 
					
						
							|  |  |  |         """Test that EOF signal properly exits the debugger.""" | 
					
						
							|  |  |  |         self._create_script() | 
					
						
							|  |  |  |         process, client_file = self._connect_and_get_client_file() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-04-25 13:14:59 +02:00
										 |  |  |         with kill_on_error(process): | 
					
						
							| 
									
										
										
										
											2025-04-24 20:43:23 -04:00
										 |  |  |             # Skip initial messages until we get to the prompt | 
					
						
							|  |  |  |             self._read_until_prompt(client_file) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Send EOF signal to exit the debugger | 
					
						
							|  |  |  |             client_file.write(json.dumps({"signal": "EOF"}).encode() + b"\n") | 
					
						
							|  |  |  |             client_file.flush() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # The process should complete normally after receiving EOF | 
					
						
							| 
									
										
										
										
											2025-04-25 16:00:26 +02:00
										 |  |  |             stdout, stderr = process.communicate(timeout=SHORT_TIMEOUT) | 
					
						
							| 
									
										
										
										
											2025-04-24 20:43:23 -04:00
										 |  |  | 
 | 
					
						
							|  |  |  |             # Verify process completed correctly | 
					
						
							|  |  |  |             self.assertIn("Function returned: 42", stdout) | 
					
						
							|  |  |  |             self.assertEqual(process.returncode, 0) | 
					
						
							|  |  |  |             self.assertEqual(stderr, "") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_protocol_version(self): | 
					
						
							|  |  |  |         """Test that incompatible protocol versions are properly detected.""" | 
					
						
							|  |  |  |         # Create a script using an incompatible protocol version | 
					
						
							|  |  |  |         script = textwrap.dedent(f'''
 | 
					
						
							|  |  |  |             import sys | 
					
						
							|  |  |  |             import pdb | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             def run_test(): | 
					
						
							|  |  |  |                 frame = sys._getframe() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 # Use a fake version number that's definitely incompatible | 
					
						
							|  |  |  |                 fake_version = 0x01010101 # A fake version that doesn't match any real Python version | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 # Connect with the wrong version | 
					
						
							|  |  |  |                 pdb._connect( | 
					
						
							|  |  |  |                     host='127.0.0.1', | 
					
						
							|  |  |  |                     port={self.port}, | 
					
						
							|  |  |  |                     frame=frame, | 
					
						
							|  |  |  |                     commands="", | 
					
						
							|  |  |  |                     version=fake_version, | 
					
						
							| 
									
										
										
										
											2025-05-05 15:33:59 -04:00
										 |  |  |                     signal_raising_thread=False, | 
					
						
							| 
									
										
										
										
											2025-05-06 01:28:16 -04:00
										 |  |  |                     colorize=False, | 
					
						
							| 
									
										
										
										
											2025-04-24 20:43:23 -04:00
										 |  |  |                 ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 # This should print if the debugger detaches correctly | 
					
						
							|  |  |  |                 print("Debugger properly detected version mismatch") | 
					
						
							|  |  |  |                 return True | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             if __name__ == "__main__": | 
					
						
							|  |  |  |                 print("Test result:", run_test()) | 
					
						
							|  |  |  |             ''')
 | 
					
						
							|  |  |  |         self._create_script(script=script) | 
					
						
							|  |  |  |         process, client_file = self._connect_and_get_client_file() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-04-25 13:14:59 +02:00
										 |  |  |         with kill_on_error(process): | 
					
						
							| 
									
										
										
										
											2025-04-24 20:43:23 -04:00
										 |  |  |             # First message should be an error about protocol version mismatch | 
					
						
							|  |  |  |             data = client_file.readline() | 
					
						
							|  |  |  |             message = json.loads(data.decode()) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             self.assertIn('message', message) | 
					
						
							|  |  |  |             self.assertEqual(message['type'], 'error') | 
					
						
							|  |  |  |             self.assertIn('incompatible', message['message']) | 
					
						
							|  |  |  |             self.assertIn('protocol version', message['message']) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # The process should complete normally | 
					
						
							| 
									
										
										
										
											2025-04-25 16:00:26 +02:00
										 |  |  |             stdout, stderr = process.communicate(timeout=SHORT_TIMEOUT) | 
					
						
							| 
									
										
										
										
											2025-04-24 20:43:23 -04:00
										 |  |  | 
 | 
					
						
							|  |  |  |             # Verify the process completed successfully | 
					
						
							|  |  |  |             self.assertIn("Test result: True", stdout) | 
					
						
							|  |  |  |             self.assertIn("Debugger properly detected version mismatch", stdout) | 
					
						
							|  |  |  |             self.assertEqual(process.returncode, 0) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_help_system(self): | 
					
						
							|  |  |  |         """Test that the help system properly sends help text to the client.""" | 
					
						
							|  |  |  |         self._create_script() | 
					
						
							|  |  |  |         process, client_file = self._connect_and_get_client_file() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-04-25 13:14:59 +02:00
										 |  |  |         with kill_on_error(process): | 
					
						
							| 
									
										
										
										
											2025-04-24 20:43:23 -04:00
										 |  |  |             # Skip initial messages until we get to the prompt | 
					
						
							|  |  |  |             self._read_until_prompt(client_file) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Request help for different commands | 
					
						
							|  |  |  |             help_commands = ["help", "help break", "help continue", "help pdb"] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             for cmd in help_commands: | 
					
						
							|  |  |  |                 self._send_command(client_file, cmd) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 # Look for help message | 
					
						
							|  |  |  |                 data = client_file.readline() | 
					
						
							|  |  |  |                 message = json.loads(data.decode()) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 self.assertIn('help', message) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 if cmd == "help": | 
					
						
							|  |  |  |                     # Should just contain the command itself | 
					
						
							|  |  |  |                     self.assertEqual(message['help'], "") | 
					
						
							|  |  |  |                 else: | 
					
						
							|  |  |  |                     # Should contain the specific command we asked for help with | 
					
						
							|  |  |  |                     command = cmd.split()[1] | 
					
						
							|  |  |  |                     self.assertEqual(message['help'], command) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 # Skip to the next prompt | 
					
						
							|  |  |  |                 self._read_until_prompt(client_file) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Continue execution to finish the program | 
					
						
							|  |  |  |             self._send_command(client_file, "c") | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-04-25 16:00:26 +02:00
										 |  |  |             stdout, stderr = process.communicate(timeout=SHORT_TIMEOUT) | 
					
						
							| 
									
										
										
										
											2025-04-24 20:43:23 -04:00
										 |  |  |             self.assertIn("Function returned: 42", stdout) | 
					
						
							|  |  |  |             self.assertEqual(process.returncode, 0) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_multi_line_commands(self): | 
					
						
							|  |  |  |         """Test that multi-line commands work properly over remote connection.""" | 
					
						
							|  |  |  |         self._create_script() | 
					
						
							|  |  |  |         process, client_file = self._connect_and_get_client_file() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-04-25 13:14:59 +02:00
										 |  |  |         with kill_on_error(process): | 
					
						
							| 
									
										
										
										
											2025-04-24 20:43:23 -04:00
										 |  |  |             # Skip initial messages until we get to the prompt | 
					
						
							|  |  |  |             self._read_until_prompt(client_file) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Send a multi-line command | 
					
						
							|  |  |  |             multi_line_commands = [ | 
					
						
							|  |  |  |                 # Define a function | 
					
						
							|  |  |  |                 "def test_func():\n    return 42", | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 # For loop | 
					
						
							|  |  |  |                 "for i in range(3):\n    print(i)", | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 # If statement | 
					
						
							|  |  |  |                 "if True:\n    x = 42\nelse:\n    x = 0", | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 # Try/except | 
					
						
							|  |  |  |                 "try:\n    result = 10/2\n    print(result)\nexcept ZeroDivisionError:\n    print('Error')", | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 # Class definition | 
					
						
							|  |  |  |                 "class TestClass:\n    def __init__(self):\n        self.value = 100\n    def get_value(self):\n        return self.value" | 
					
						
							|  |  |  |             ] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             for cmd in multi_line_commands: | 
					
						
							|  |  |  |                 self._send_command(client_file, cmd) | 
					
						
							|  |  |  |                 self._read_until_prompt(client_file) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Test executing the defined function | 
					
						
							|  |  |  |             self._send_command(client_file, "test_func()") | 
					
						
							|  |  |  |             messages = self._read_until_prompt(client_file) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Find the result message | 
					
						
							|  |  |  |             result_msg = next(msg['message'] for msg in messages if 'message' in msg) | 
					
						
							|  |  |  |             self.assertIn("42", result_msg) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Test creating an instance of the defined class | 
					
						
							|  |  |  |             self._send_command(client_file, "obj = TestClass()") | 
					
						
							|  |  |  |             self._read_until_prompt(client_file) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Test calling a method on the instance | 
					
						
							|  |  |  |             self._send_command(client_file, "obj.get_value()") | 
					
						
							|  |  |  |             messages = self._read_until_prompt(client_file) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Find the result message | 
					
						
							|  |  |  |             result_msg = next(msg['message'] for msg in messages if 'message' in msg) | 
					
						
							|  |  |  |             self.assertIn("100", result_msg) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Continue execution to finish | 
					
						
							|  |  |  |             self._send_command(client_file, "c") | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-04-25 16:00:26 +02:00
										 |  |  |             stdout, stderr = process.communicate(timeout=SHORT_TIMEOUT) | 
					
						
							| 
									
										
										
										
											2025-04-24 20:43:23 -04:00
										 |  |  |             self.assertIn("Function returned: 42", stdout) | 
					
						
							|  |  |  |             self.assertEqual(process.returncode, 0) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-05-06 05:44:49 -04:00
										 |  |  | 
 | 
					
						
							|  |  |  | def _supports_remote_attaching(): | 
					
						
							|  |  |  |     PROCESS_VM_READV_SUPPORTED = False | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     try: | 
					
						
							|  |  |  |         from _remote_debugging import PROCESS_VM_READV_SUPPORTED | 
					
						
							|  |  |  |     except ImportError: | 
					
						
							|  |  |  |         pass | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     return PROCESS_VM_READV_SUPPORTED | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | @unittest.skipIf(not sys.is_remote_debug_enabled(), "Remote debugging is not enabled") | 
					
						
							|  |  |  | @unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux" and sys.platform != "win32", | 
					
						
							|  |  |  |                     "Test only runs on Linux, Windows and MacOS") | 
					
						
							|  |  |  | @unittest.skipIf(sys.platform == "linux" and not _supports_remote_attaching(), | 
					
						
							|  |  |  |                     "Testing on Linux requires process_vm_readv support") | 
					
						
							|  |  |  | @cpython_only | 
					
						
							|  |  |  | @requires_subprocess() | 
					
						
							|  |  |  | class PdbAttachTestCase(unittest.TestCase): | 
					
						
							|  |  |  |     def setUp(self): | 
					
						
							|  |  |  |         # Create a server socket that will wait for the debugger to connect | 
					
						
							|  |  |  |         self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | 
					
						
							|  |  |  |         self.sock.bind(('127.0.0.1', 0))  # Let OS assign port | 
					
						
							|  |  |  |         self.sock.listen(1) | 
					
						
							|  |  |  |         self.port = self.sock.getsockname()[1] | 
					
						
							|  |  |  |         self._create_script() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def _create_script(self, script=None): | 
					
						
							|  |  |  |         # Create a file for subprocess script | 
					
						
							|  |  |  |         script = textwrap.dedent( | 
					
						
							|  |  |  |             f"""
 | 
					
						
							|  |  |  |             import socket | 
					
						
							|  |  |  |             import time | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             def foo(): | 
					
						
							|  |  |  |                 return bar() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             def bar(): | 
					
						
							|  |  |  |                 return baz() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             def baz(): | 
					
						
							|  |  |  |                 x = 1 | 
					
						
							|  |  |  |                 # Trigger attach | 
					
						
							|  |  |  |                 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | 
					
						
							|  |  |  |                 sock.connect(('127.0.0.1', {self.port})) | 
					
						
							|  |  |  |                 sock.close() | 
					
						
							|  |  |  |                 count = 0 | 
					
						
							|  |  |  |                 while x == 1 and count < 100: | 
					
						
							|  |  |  |                     count += 1 | 
					
						
							|  |  |  |                     time.sleep(0.1) | 
					
						
							|  |  |  |                 return x | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             result = foo() | 
					
						
							|  |  |  |             print(f"Function returned: {{result}}") | 
					
						
							|  |  |  |             """
 | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.script_path = TESTFN + "_connect_test.py" | 
					
						
							|  |  |  |         with open(self.script_path, 'w') as f: | 
					
						
							|  |  |  |             f.write(script) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def tearDown(self): | 
					
						
							|  |  |  |         self.sock.close() | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             unlink(self.script_path) | 
					
						
							|  |  |  |         except OSError: | 
					
						
							|  |  |  |             pass | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def do_integration_test(self, client_stdin): | 
					
						
							|  |  |  |         process = subprocess.Popen( | 
					
						
							|  |  |  |             [sys.executable, self.script_path], | 
					
						
							|  |  |  |             stdout=subprocess.PIPE, | 
					
						
							|  |  |  |             stderr=subprocess.PIPE, | 
					
						
							|  |  |  |             text=True | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         self.addCleanup(process.stdout.close) | 
					
						
							|  |  |  |         self.addCleanup(process.stderr.close) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # Wait for the process to reach our attachment point | 
					
						
							|  |  |  |         self.sock.settimeout(10) | 
					
						
							|  |  |  |         conn, _ = self.sock.accept() | 
					
						
							|  |  |  |         conn.close() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         client_stdin = io.StringIO(client_stdin) | 
					
						
							|  |  |  |         client_stdout = io.StringIO() | 
					
						
							|  |  |  |         client_stderr = io.StringIO() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.addCleanup(client_stdin.close) | 
					
						
							|  |  |  |         self.addCleanup(client_stdout.close) | 
					
						
							|  |  |  |         self.addCleanup(client_stderr.close) | 
					
						
							|  |  |  |         self.addCleanup(process.wait) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         with ( | 
					
						
							|  |  |  |             unittest.mock.patch("sys.stdin", client_stdin), | 
					
						
							|  |  |  |             redirect_stdout(client_stdout), | 
					
						
							|  |  |  |             redirect_stderr(client_stderr), | 
					
						
							|  |  |  |             unittest.mock.patch("sys.argv", ["pdb", "-p", str(process.pid)]), | 
					
						
							|  |  |  |         ): | 
					
						
							|  |  |  |             try: | 
					
						
							|  |  |  |                 pdb.main() | 
					
						
							|  |  |  |             except PermissionError: | 
					
						
							|  |  |  |                 self.skipTest("Insufficient permissions for remote execution") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         process.wait() | 
					
						
							|  |  |  |         server_stdout = process.stdout.read() | 
					
						
							|  |  |  |         server_stderr = process.stderr.read() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if process.returncode != 0: | 
					
						
							|  |  |  |             print("server failed") | 
					
						
							|  |  |  |             print(f"server stdout:\n{server_stdout}") | 
					
						
							|  |  |  |             print(f"server stderr:\n{server_stderr}") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertEqual(process.returncode, 0) | 
					
						
							|  |  |  |         return { | 
					
						
							|  |  |  |             "client": { | 
					
						
							|  |  |  |                 "stdout": client_stdout.getvalue(), | 
					
						
							|  |  |  |                 "stderr": client_stderr.getvalue(), | 
					
						
							|  |  |  |             }, | 
					
						
							|  |  |  |             "server": { | 
					
						
							|  |  |  |                 "stdout": server_stdout, | 
					
						
							|  |  |  |                 "stderr": server_stderr, | 
					
						
							|  |  |  |             }, | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_attach_to_process_without_colors(self): | 
					
						
							|  |  |  |         with force_color(False): | 
					
						
							|  |  |  |             output = self.do_integration_test("ll\nx=42\n") | 
					
						
							|  |  |  |         self.assertEqual(output["client"]["stderr"], "") | 
					
						
							|  |  |  |         self.assertEqual(output["server"]["stderr"], "") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertEqual(output["server"]["stdout"], "Function returned: 42\n") | 
					
						
							|  |  |  |         self.assertIn("while x == 1", output["client"]["stdout"]) | 
					
						
							|  |  |  |         self.assertNotIn("\x1b", output["client"]["stdout"]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_attach_to_process_with_colors(self): | 
					
						
							|  |  |  |         with force_color(True): | 
					
						
							|  |  |  |             output = self.do_integration_test("ll\nx=42\n") | 
					
						
							|  |  |  |         self.assertEqual(output["client"]["stderr"], "") | 
					
						
							|  |  |  |         self.assertEqual(output["server"]["stderr"], "") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertEqual(output["server"]["stdout"], "Function returned: 42\n") | 
					
						
							|  |  |  |         self.assertIn("\x1b", output["client"]["stdout"]) | 
					
						
							|  |  |  |         self.assertNotIn("while x == 1", output["client"]["stdout"]) | 
					
						
							|  |  |  |         self.assertIn("while x == 1", re.sub("\x1b[^m]*m", "", output["client"]["stdout"])) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-04-24 20:43:23 -04:00
# Run the tests in this module when the file is executed directly.
if __name__ == "__main__":
    unittest.main()