mirror of
				https://github.com/python/cpython.git
				synced 2025-10-31 05:31:20 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			124 lines
		
	
	
	
		
			3.3 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable file
		
	
	
	
	
			
		
		
	
	
			124 lines
		
	
	
	
		
			3.3 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable file
		
	
	
	
	
| #! /usr/bin/env python
 | |
| 
 | |
| """A multi-threaded telnet-like server that gives a Python prompt.
 | |
| 
 | |
| This is really a prototype for the same thing in C.
 | |
| 
 | |
| Usage: pysvr.py [port]
 | |
| 
 | |
| For security reasons, it only accepts requests from the current host.
 | |
| This can still be insecure, but restricts violations from people who
 | |
| can log in on your machine.  Use with caution!
 | |
| 
 | |
| """
 | |
| 
 | |
| import sys, os, string, getopt, thread, socket, traceback
 | |
| 
 | |
| PORT = 4000                             # Default port
 | |
| 
 | |
def main():
    """Parse the command line and start the server.

    At most one positional argument is accepted: the TCP port to listen
    on.  When it is omitted the module-level PORT is used.  Any
    command-line problem is reported through usage(), which prints the
    message and exits the process.
    """
    try:
        # No options are defined; getopt is only used to reject unknown ones.
        _opts, args = getopt.getopt(sys.argv[1:], "")
        if len(args) > 1:
            raise getopt.error("Too many arguments.")
    except getopt.error as msg:
        usage(msg)
    if args:
        try:
            port = int(args[0])
        except ValueError as msg:
            usage(msg)
    else:
        port = PORT
    main_thread(port)
 | |
| 
 | |
def usage(msg=None):
    """Print an optional message and the module docstring, then exit.

    Everything is written to standard error.  The process is terminated
    by raising SystemExit with status 2.

    msg -- optional error text (or exception) shown before the docstring.
    """
    # Write to stderr directly instead of rebinding sys.stdout, so nothing
    # stays redirected if a caller catches the SystemExit raised below.
    err = sys.stderr
    if msg:
        err.write("%s\n" % (msg,))
    err.write("\n%s" % (__doc__,))
    sys.exit(2)
 | |
| 
 | |
def main_thread(port):
    """Listen on *port* and hand each accepted connection to a new thread.

    The socket is bound to all interfaces.  A connection is served only
    when the peer's address equals the local address of the accepted
    socket; any other connection is closed immediately and a refusal
    message is printed.  The loop runs until an exception (for example
    KeyboardInterrupt) escapes, at which point the listening socket is
    closed.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.bind(("", port))
        sock.listen(5)
        print("Listening on port %s ..." % (port,))
        while True:
            conn, addr = sock.accept()
            if addr[0] != conn.getsockname()[0]:
                conn.close()
                print("Refusing connection from non-local host %s ."
                      % (addr[0],))
                continue
            thread.start_new_thread(service_thread, (conn, addr))
            # Drop this thread's references; the service thread now owns
            # the connection.
            del conn, addr
    finally:
        # Release the listening socket however the loop was left.
        sock.close()
 | |
| 
 | |
def service_thread(conn, addr):
    """Serve one client connection with an interactive interpreter.

    conn -- the accepted socket.
    addr -- the peer address; only its first item (the host) is used.

    A start message is printed, run_interpreter() is run over file
    objects wrapping the socket, and a completion message is printed
    when it returns normally.  The file objects and the socket are
    closed even if the interpreter loop raises.
    """
    caddr = addr[0]
    ident = str(thread.get_ident())
    sys.stdout.write("Thread %s has connection from %s.\n" % (ident, caddr))
    try:
        stdin = conn.makefile("r")
        try:
            # Buffer size 0: prompts must reach the client immediately.
            stdout = conn.makefile("w", 0)
            try:
                run_interpreter(stdin, stdout)
            finally:
                stdout.close()
        finally:
            stdin.close()
    finally:
        conn.close()
    sys.stdout.write("Thread %s is done.\n" % ident)
 | |
| 
 | |
def run_interpreter(stdin, stdout):
    """Run a read-compile-execute loop over the given file objects.

    stdin  -- file-like object the source lines are read from.
    stdout -- file-like object that receives prompts and output.

    Lines are accumulated until compile_command() reports a complete
    statement, which is then executed by run_command() in a namespace
    private to this call.  A syntax error is reported on *stdout* and
    discards the accumulated source.  The loop ends at end of input or
    when the executed code raises SystemExit; a farewell line is written
    in both cases.
    """
    namespace = {}
    try:
        str(sys.ps1)
    except Exception:
        # sys.ps1 is missing or unusable (it is only guaranteed to exist
        # in interactive mode), so install a default prompt.
        sys.ps1 = ">>> "
    source = ""
    while True:
        stdout.write(sys.ps1)
        line = stdin.readline()
        # NOTE(review): presumably the telnet IAC + EOF byte pair; it is
        # treated exactly like end of input.
        if line[:2] == '\377\354':
            line = ""
        if not line:
            # End of input.  Any pending incomplete statement can never be
            # completed, so stop instead of re-prompting and re-compiling
            # the same source forever.
            break
        if line[-2:] == '\r\n':
            line = line[:-2] + '\n'
        source = source + line
        try:
            code = compile_command(source)
        except SyntaxError as err:
            source = ""
            traceback.print_exception(SyntaxError, err, None, file=stdout)
            continue
        if not code:
            # Statement is still incomplete; keep reading lines.
            continue
        source = ""
        try:
            run_command(code, stdin, stdout, namespace)
        except SystemExit as how:
            if how:
                try:
                    text = str(how)
                except Exception:
                    text = ""
                stdout.write("Exit %s\n" % text)
            break
    stdout.write("\nGoodbye.\n")
 | |
| 
 | |
def run_command(code, stdin, stdout, globals):
    """Execute a compiled statement with the standard streams redirected.

    code    -- code object to execute.
    stdin   -- object installed as sys.stdin while the code runs.
    stdout  -- object installed as both sys.stdout and sys.stderr.
    globals -- dictionary used as the namespace for the executed code.

    SystemExit raised by the code propagates to the caller.  Any other
    exception is reported as a traceback on the redirected stderr and
    then swallowed.  The previous sys.stdin, sys.stdout and sys.stderr
    are restored on the way out.
    """
    saved_streams = sys.stdin, sys.stdout, sys.stderr
    try:
        sys.stdout = sys.stderr = stdout
        sys.stdin = stdin
        try:
            exec(code, globals)
        except SystemExit:
            # Let the caller decide how to end the session.
            raise
        except:
            # Deliberate catch-all: like an interactive prompt, report
            # whatever the user's code raised and carry on.
            exc_type, exc_value, tb = sys.exc_info()
            if tb:
                # Skip this function's own frame in the report.
                tb = tb.tb_next
            traceback.print_exception(exc_type, exc_value, tb)
            del tb
    finally:
        sys.stdin, sys.stdout, sys.stderr = saved_streams
 | |
| 
 | |
| from code import compile_command
 | |
| 
 | |
# Start the server only when run as a script, so that importing this
# module does not open a listening socket as a side effect.
if __name__ == "__main__":
    main()
 | 
