mirror of
				https://github.com/python/cpython.git
				synced 2025-10-30 21:21:22 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			362 lines
		
	
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			362 lines
		
	
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """CGI-savvy HTTP Server.
 | |
| 
 | |
| This module builds on SimpleHTTPServer by implementing GET and POST
 | |
| requests to cgi-bin scripts.
 | |
| 
 | |
| If the os.fork() function is not present (e.g. on Windows),
 | |
| os.popen2() is used as a fallback, with slightly altered semantics; if
 | |
| that function is not present either (e.g. on Macintosh), only Python
 | |
| scripts are supported, and they are executed by the current process.
 | |
| 
 | |
| In all cases, the implementation is intentionally naive -- all
 | |
| requests are executed sychronously.
 | |
| 
 | |
| SECURITY WARNING: DON'T USE THIS CODE UNLESS YOU ARE INSIDE A FIREWALL
 | |
| -- it may execute arbitrary Python code or external programs.
 | |
| 
 | |
| Note that status code 200 is sent prior to execution of a CGI script, so
 | |
| scripts cannot send other status codes such as 302 (redirect).
 | |
| """
 | |
| 
 | |
| 
 | |
| __version__ = "0.4"
 | |
| 
 | |
| __all__ = ["CGIHTTPRequestHandler"]
 | |
| 
 | |
| import os
 | |
| import sys
 | |
| import urllib
 | |
| import BaseHTTPServer
 | |
| import SimpleHTTPServer
 | |
| import select
 | |
| 
 | |
| 
 | |
| class CGIHTTPRequestHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
 | |
| 
 | |
|     """Complete HTTP server with GET, HEAD and POST commands.
 | |
| 
 | |
|     GET and HEAD also support running CGI scripts.
 | |
| 
 | |
|     The POST command is *only* implemented for CGI scripts.
 | |
| 
 | |
|     """
 | |
| 
 | |
|     # Determine platform specifics
 | |
|     have_fork = hasattr(os, 'fork')
 | |
|     have_popen2 = hasattr(os, 'popen2')
 | |
|     have_popen3 = hasattr(os, 'popen3')
 | |
| 
 | |
|     # Make rfile unbuffered -- we need to read one line and then pass
 | |
|     # the rest to a subprocess, so we can't use buffered input.
 | |
|     rbufsize = 0
 | |
| 
 | |
|     def do_POST(self):
 | |
|         """Serve a POST request.
 | |
| 
 | |
|         This is only implemented for CGI scripts.
 | |
| 
 | |
|         """
 | |
| 
 | |
|         if self.is_cgi():
 | |
|             self.run_cgi()
 | |
|         else:
 | |
|             self.send_error(501, "Can only POST to CGI scripts")
 | |
| 
 | |
|     def send_head(self):
 | |
|         """Version of send_head that support CGI scripts"""
 | |
|         if self.is_cgi():
 | |
|             return self.run_cgi()
 | |
|         else:
 | |
|             return SimpleHTTPServer.SimpleHTTPRequestHandler.send_head(self)
 | |
| 
 | |
|     def is_cgi(self):
 | |
|         """Test whether self.path corresponds to a CGI script.
 | |
| 
 | |
|         Return a tuple (dir, rest) if self.path requires running a
 | |
|         CGI script, None if not.  Note that rest begins with a
 | |
|         slash if it is not empty.
 | |
| 
 | |
|         The default implementation tests whether the path
 | |
|         begins with one of the strings in the list
 | |
|         self.cgi_directories (and the next character is a '/'
 | |
|         or the end of the string).
 | |
| 
 | |
|         """
 | |
| 
 | |
|         path = self.path
 | |
| 
 | |
|         for x in self.cgi_directories:
 | |
|             i = len(x)
 | |
|             if path[:i] == x and (not path[i:] or path[i] == '/'):
 | |
|                 self.cgi_info = path[:i], path[i+1:]
 | |
|                 return True
 | |
|         return False
 | |
| 
 | |
|     cgi_directories = ['/cgi-bin', '/htbin']
 | |
| 
 | |
|     def is_executable(self, path):
 | |
|         """Test whether argument path is an executable file."""
 | |
|         return executable(path)
 | |
| 
 | |
|     def is_python(self, path):
 | |
|         """Test whether argument path is a Python script."""
 | |
|         head, tail = os.path.splitext(path)
 | |
|         return tail.lower() in (".py", ".pyw")
 | |
| 
 | |
|     def run_cgi(self):
 | |
|         """Execute a CGI script."""
 | |
|         path = self.path
 | |
|         dir, rest = self.cgi_info
 | |
| 
 | |
|         i = path.find('/', len(dir) + 1)
 | |
|         while i >= 0:
 | |
|             nextdir = path[:i]
 | |
|             nextrest = path[i+1:]
 | |
| 
 | |
|             scriptdir = self.translate_path(nextdir)
 | |
|             if os.path.isdir(scriptdir):
 | |
|                 dir, rest = nextdir, nextrest
 | |
|                 i = path.find('/', len(dir) + 1)
 | |
|             else:
 | |
|                 break
 | |
| 
 | |
|         # find an explicit query string, if present.
 | |
|         i = rest.rfind('?')
 | |
|         if i >= 0:
 | |
|             rest, query = rest[:i], rest[i+1:]
 | |
|         else:
 | |
|             query = ''
 | |
| 
 | |
|         # dissect the part after the directory name into a script name &
 | |
|         # a possible additional path, to be stored in PATH_INFO.
 | |
|         i = rest.find('/')
 | |
|         if i >= 0:
 | |
|             script, rest = rest[:i], rest[i:]
 | |
|         else:
 | |
|             script, rest = rest, ''
 | |
| 
 | |
|         scriptname = dir + '/' + script
 | |
|         scriptfile = self.translate_path(scriptname)
 | |
|         if not os.path.exists(scriptfile):
 | |
|             self.send_error(404, "No such CGI script (%r)" % scriptname)
 | |
|             return
 | |
|         if not os.path.isfile(scriptfile):
 | |
|             self.send_error(403, "CGI script is not a plain file (%r)" %
 | |
|                             scriptname)
 | |
|             return
 | |
|         ispy = self.is_python(scriptname)
 | |
|         if not ispy:
 | |
|             if not (self.have_fork or self.have_popen2 or self.have_popen3):
 | |
|                 self.send_error(403, "CGI script is not a Python script (%r)" %
 | |
|                                 scriptname)
 | |
|                 return
 | |
|             if not self.is_executable(scriptfile):
 | |
|                 self.send_error(403, "CGI script is not executable (%r)" %
 | |
|                                 scriptname)
 | |
|                 return
 | |
| 
 | |
|         # Reference: http://hoohoo.ncsa.uiuc.edu/cgi/env.html
 | |
|         # XXX Much of the following could be prepared ahead of time!
 | |
|         env = {}
 | |
|         env['SERVER_SOFTWARE'] = self.version_string()
 | |
|         env['SERVER_NAME'] = self.server.server_name
 | |
|         env['GATEWAY_INTERFACE'] = 'CGI/1.1'
 | |
|         env['SERVER_PROTOCOL'] = self.protocol_version
 | |
|         env['SERVER_PORT'] = str(self.server.server_port)
 | |
|         env['REQUEST_METHOD'] = self.command
 | |
|         uqrest = urllib.unquote(rest)
 | |
|         env['PATH_INFO'] = uqrest
 | |
|         env['PATH_TRANSLATED'] = self.translate_path(uqrest)
 | |
|         env['SCRIPT_NAME'] = scriptname
 | |
|         if query:
 | |
|             env['QUERY_STRING'] = query
 | |
|         host = self.address_string()
 | |
|         if host != self.client_address[0]:
 | |
|             env['REMOTE_HOST'] = host
 | |
|         env['REMOTE_ADDR'] = self.client_address[0]
 | |
|         authorization = self.headers.getheader("authorization")
 | |
|         if authorization:
 | |
|             authorization = authorization.split()
 | |
|             if len(authorization) == 2:
 | |
|                 import base64, binascii
 | |
|                 env['AUTH_TYPE'] = authorization[0]
 | |
|                 if authorization[0].lower() == "basic":
 | |
|                     try:
 | |
|                         authorization = base64.decodestring(authorization[1])
 | |
|                     except binascii.Error:
 | |
|                         pass
 | |
|                     else:
 | |
|                         authorization = authorization.split(':')
 | |
|                         if len(authorization) == 2:
 | |
|                             env['REMOTE_USER'] = authorization[0]
 | |
|         # XXX REMOTE_IDENT
 | |
|         if self.headers.typeheader is None:
 | |
|             env['CONTENT_TYPE'] = self.headers.type
 | |
|         else:
 | |
|             env['CONTENT_TYPE'] = self.headers.typeheader
 | |
|         length = self.headers.getheader('content-length')
 | |
|         if length:
 | |
|             env['CONTENT_LENGTH'] = length
 | |
|         accept = []
 | |
|         for line in self.headers.getallmatchingheaders('accept'):
 | |
|             if line[:1] in "\t\n\r ":
 | |
|                 accept.append(line.strip())
 | |
|             else:
 | |
|                 accept = accept + line[7:].split(',')
 | |
|         env['HTTP_ACCEPT'] = ','.join(accept)
 | |
|         ua = self.headers.getheader('user-agent')
 | |
|         if ua:
 | |
|             env['HTTP_USER_AGENT'] = ua
 | |
|         co = filter(None, self.headers.getheaders('cookie'))
 | |
|         if co:
 | |
|             env['HTTP_COOKIE'] = ', '.join(co)
 | |
|         # XXX Other HTTP_* headers
 | |
|         # Since we're setting the env in the parent, provide empty
 | |
|         # values to override previously set values
 | |
|         for k in ('QUERY_STRING', 'REMOTE_HOST', 'CONTENT_LENGTH',
 | |
|                   'HTTP_USER_AGENT', 'HTTP_COOKIE'):
 | |
|             env.setdefault(k, "")
 | |
|         os.environ.update(env)
 | |
| 
 | |
|         self.send_response(200, "Script output follows")
 | |
| 
 | |
|         decoded_query = query.replace('+', ' ')
 | |
| 
 | |
|         if self.have_fork:
 | |
|             # Unix -- fork as we should
 | |
|             args = [script]
 | |
|             if '=' not in decoded_query:
 | |
|                 args.append(decoded_query)
 | |
|             nobody = nobody_uid()
 | |
|             self.wfile.flush() # Always flush before forking
 | |
|             pid = os.fork()
 | |
|             if pid != 0:
 | |
|                 # Parent
 | |
|                 pid, sts = os.waitpid(pid, 0)
 | |
|                 # throw away additional data [see bug #427345]
 | |
|                 while select.select([self.rfile], [], [], 0)[0]:
 | |
|                     if not self.rfile.read(1):
 | |
|                         break
 | |
|                 if sts:
 | |
|                     self.log_error("CGI script exit status %#x", sts)
 | |
|                 return
 | |
|             # Child
 | |
|             try:
 | |
|                 try:
 | |
|                     os.setuid(nobody)
 | |
|                 except os.error:
 | |
|                     pass
 | |
|                 os.dup2(self.rfile.fileno(), 0)
 | |
|                 os.dup2(self.wfile.fileno(), 1)
 | |
|                 os.execve(scriptfile, args, os.environ)
 | |
|             except:
 | |
|                 self.server.handle_error(self.request, self.client_address)
 | |
|                 os._exit(127)
 | |
| 
 | |
|         elif self.have_popen2 or self.have_popen3:
 | |
|             # Windows -- use popen2 or popen3 to create a subprocess
 | |
|             import shutil
 | |
|             if self.have_popen3:
 | |
|                 popenx = os.popen3
 | |
|             else:
 | |
|                 popenx = os.popen2
 | |
|             cmdline = scriptfile
 | |
|             if self.is_python(scriptfile):
 | |
|                 interp = sys.executable
 | |
|                 if interp.lower().endswith("w.exe"):
 | |
|                     # On Windows, use python.exe, not pythonw.exe
 | |
|                     interp = interp[:-5] + interp[-4:]
 | |
|                 cmdline = "%s -u %s" % (interp, cmdline)
 | |
|             if '=' not in query and '"' not in query:
 | |
|                 cmdline = '%s "%s"' % (cmdline, query)
 | |
|             self.log_message("command: %s", cmdline)
 | |
|             try:
 | |
|                 nbytes = int(length)
 | |
|             except (TypeError, ValueError):
 | |
|                 nbytes = 0
 | |
|             files = popenx(cmdline, 'b')
 | |
|             fi = files[0]
 | |
|             fo = files[1]
 | |
|             if self.have_popen3:
 | |
|                 fe = files[2]
 | |
|             if self.command.lower() == "post" and nbytes > 0:
 | |
|                 data = self.rfile.read(nbytes)
 | |
|                 fi.write(data)
 | |
|             # throw away additional data [see bug #427345]
 | |
|             while select.select([self.rfile._sock], [], [], 0)[0]:
 | |
|                 if not self.rfile._sock.recv(1):
 | |
|                     break
 | |
|             fi.close()
 | |
|             shutil.copyfileobj(fo, self.wfile)
 | |
|             if self.have_popen3:
 | |
|                 errors = fe.read()
 | |
|                 fe.close()
 | |
|                 if errors:
 | |
|                     self.log_error('%s', errors)
 | |
|             sts = fo.close()
 | |
|             if sts:
 | |
|                 self.log_error("CGI script exit status %#x", sts)
 | |
|             else:
 | |
|                 self.log_message("CGI script exited OK")
 | |
| 
 | |
|         else:
 | |
|             # Other O.S. -- execute script in this process
 | |
|             save_argv = sys.argv
 | |
|             save_stdin = sys.stdin
 | |
|             save_stdout = sys.stdout
 | |
|             save_stderr = sys.stderr
 | |
|             try:
 | |
|                 save_cwd = os.getcwd()
 | |
|                 try:
 | |
|                     sys.argv = [scriptfile]
 | |
|                     if '=' not in decoded_query:
 | |
|                         sys.argv.append(decoded_query)
 | |
|                     sys.stdout = self.wfile
 | |
|                     sys.stdin = self.rfile
 | |
|                     execfile(scriptfile, {"__name__": "__main__"})
 | |
|                 finally:
 | |
|                     sys.argv = save_argv
 | |
|                     sys.stdin = save_stdin
 | |
|                     sys.stdout = save_stdout
 | |
|                     sys.stderr = save_stderr
 | |
|                     os.chdir(save_cwd)
 | |
|             except SystemExit, sts:
 | |
|                 self.log_error("CGI script exit status %s", str(sts))
 | |
|             else:
 | |
|                 self.log_message("CGI script exited OK")
 | |
| 
 | |
| 
 | |
| nobody = None
 | |
| 
 | |
| def nobody_uid():
 | |
|     """Internal routine to get nobody's uid"""
 | |
|     global nobody
 | |
|     if nobody:
 | |
|         return nobody
 | |
|     try:
 | |
|         import pwd
 | |
|     except ImportError:
 | |
|         return -1
 | |
|     try:
 | |
|         nobody = pwd.getpwnam('nobody')[2]
 | |
|     except KeyError:
 | |
|         nobody = 1 + max(map(lambda x: x[2], pwd.getpwall()))
 | |
|     return nobody
 | |
| 
 | |
| 
 | |
| def executable(path):
 | |
|     """Test for executable file."""
 | |
|     try:
 | |
|         st = os.stat(path)
 | |
|     except os.error:
 | |
|         return False
 | |
|     return st.st_mode & 0111 != 0
 | |
| 
 | |
| 
 | |
| def test(HandlerClass = CGIHTTPRequestHandler,
 | |
|          ServerClass = BaseHTTPServer.HTTPServer):
 | |
|     SimpleHTTPServer.test(HandlerClass, ServerClass)
 | |
| 
 | |
| 
 | |
| if __name__ == '__main__':
 | |
|     test()
 | 
