mirror of
				https://github.com/python/cpython.git
				synced 2025-10-31 13:41:24 +00:00 
			
		
		
		
	Issue 9873: the URL parsing functions now accept ASCII encoded byte sequences in addition to character strings
This commit is contained in:
		
							parent
							
								
									43f0c27be7
								
							
						
					
					
						commit
						9fc443cf59
					
				
					 5 changed files with 606 additions and 140 deletions
				
			
		|  | @ -60,6 +60,7 @@ | |||
|                 '0123456789' | ||||
|                 '+-.') | ||||
| 
 | ||||
| # XXX: Consider replacing with functools.lru_cache | ||||
| MAX_CACHE_SIZE = 20 | ||||
| _parse_cache = {} | ||||
| 
 | ||||
|  | @ -69,66 +70,210 @@ def clear_cache(): | |||
|     _safe_quoters.clear() | ||||
| 
 | ||||
| 
 | ||||
| class ResultMixin(object): | ||||
|     """Shared methods for the parsed result objects.""" | ||||
| # Helpers for bytes handling | ||||
| # For 3.2, we deliberately require applications that | ||||
| # handle improperly quoted URLs to do their own | ||||
| # decoding and encoding. If valid use cases are | ||||
| # presented, we may relax this by using latin-1 | ||||
| # decoding internally for 3.3 | ||||
| _implicit_encoding = 'ascii' | ||||
| _implicit_errors = 'strict' | ||||
| 
 | ||||
| def _noop(obj): | ||||
|     return obj | ||||
| 
 | ||||
| def _encode_result(obj, encoding=_implicit_encoding, | ||||
|                         errors=_implicit_errors): | ||||
|     return obj.encode(encoding, errors) | ||||
| 
 | ||||
| def _decode_args(args, encoding=_implicit_encoding, | ||||
|                        errors=_implicit_errors): | ||||
|     return tuple(x.decode(encoding, errors) if x else '' for x in args) | ||||
| 
 | ||||
| def _coerce_args(*args): | ||||
|     # Invokes decode if necessary to create str args | ||||
|     # and returns the coerced inputs along with | ||||
|     # an appropriate result coercion function | ||||
|     #   - noop for str inputs | ||||
|     #   - encoding function otherwise | ||||
|     str_input = isinstance(args[0], str) | ||||
|     for arg in args[1:]: | ||||
|         # We special-case the empty string to support the | ||||
|         # "scheme=''" default argument to some functions | ||||
|         if arg and isinstance(arg, str) != str_input: | ||||
|             raise TypeError("Cannot mix str and non-str arguments") | ||||
|     if str_input: | ||||
|         return args + (_noop,) | ||||
|     return _decode_args(args) + (_encode_result,) | ||||
| 
 | ||||
| # Result objects are more helpful than simple tuples | ||||
| class _ResultMixinStr(object): | ||||
|     """Standard approach to encoding parsed results from str to bytes""" | ||||
|     __slots__ = () | ||||
| 
 | ||||
|     def encode(self, encoding='ascii', errors='strict'): | ||||
|         return self._encoded_counterpart(*(x.encode(encoding, errors) for x in self)) | ||||
| 
 | ||||
| 
 | ||||
| class _ResultMixinBytes(object): | ||||
|     """Standard approach to decoding parsed results from bytes to str""" | ||||
|     __slots__ = () | ||||
| 
 | ||||
|     def decode(self, encoding='ascii', errors='strict'): | ||||
|         return self._decoded_counterpart(*(x.decode(encoding, errors) for x in self)) | ||||
| 
 | ||||
| 
 | ||||
| class _NetlocResultMixinBase(object): | ||||
|     """Shared methods for the parsed result objects containing a netloc element""" | ||||
|     __slots__ = () | ||||
| 
 | ||||
|     @property | ||||
|     def username(self): | ||||
|         netloc = self.netloc | ||||
|         if "@" in netloc: | ||||
|             userinfo = netloc.rsplit("@", 1)[0] | ||||
|             if ":" in userinfo: | ||||
|                 userinfo = userinfo.split(":", 1)[0] | ||||
|             return userinfo | ||||
|         return None | ||||
|         return self._userinfo[0] | ||||
| 
 | ||||
|     @property | ||||
|     def password(self): | ||||
|         netloc = self.netloc | ||||
|         if "@" in netloc: | ||||
|             userinfo = netloc.rsplit("@", 1)[0] | ||||
|             if ":" in userinfo: | ||||
|                 return userinfo.split(":", 1)[1] | ||||
|         return None | ||||
|         return self._userinfo[1] | ||||
| 
 | ||||
|     @property | ||||
|     def hostname(self): | ||||
|         netloc = self.netloc.split('@')[-1] | ||||
|         if '[' in netloc and ']' in netloc: | ||||
|             return netloc.split(']')[0][1:].lower() | ||||
|         elif ':' in netloc: | ||||
|             return netloc.split(':')[0].lower() | ||||
|         elif netloc == '': | ||||
|             return None | ||||
|         else: | ||||
|             return netloc.lower() | ||||
|         hostname = self._hostinfo[0] | ||||
|         if not hostname: | ||||
|             hostname = None | ||||
|         elif hostname is not None: | ||||
|             hostname = hostname.lower() | ||||
|         return hostname | ||||
| 
 | ||||
|     @property | ||||
|     def port(self): | ||||
|         netloc = self.netloc.split('@')[-1].split(']')[-1] | ||||
|         if ':' in netloc: | ||||
|             port = netloc.split(':')[1] | ||||
|             return int(port, 10) | ||||
|         port = self._hostinfo[1] | ||||
|         if port is not None: | ||||
|             port = int(port, 10) | ||||
|         return port | ||||
| 
 | ||||
| 
 | ||||
| class _NetlocResultMixinStr(_NetlocResultMixinBase, _ResultMixinStr): | ||||
|     __slots__ = () | ||||
| 
 | ||||
|     @property | ||||
|     def _userinfo(self): | ||||
|         netloc = self.netloc | ||||
|         userinfo, have_info, hostinfo = netloc.rpartition('@') | ||||
|         if have_info: | ||||
|             username, have_password, password = userinfo.partition(':') | ||||
|             if not have_password: | ||||
|                 password = None | ||||
|         else: | ||||
|             return None | ||||
|             username = password = None | ||||
|         return username, password | ||||
| 
 | ||||
|     @property | ||||
|     def _hostinfo(self): | ||||
|         netloc = self.netloc | ||||
|         _, _, hostinfo = netloc.rpartition('@') | ||||
|         _, have_open_br, bracketed = hostinfo.partition('[') | ||||
|         if have_open_br: | ||||
|             hostname, _, port = bracketed.partition(']') | ||||
|             _, have_port, port = port.partition(':') | ||||
|         else: | ||||
|             hostname, have_port, port = hostinfo.partition(':') | ||||
|         if not have_port: | ||||
|             port = None | ||||
|         return hostname, port | ||||
| 
 | ||||
| 
 | ||||
| class _NetlocResultMixinBytes(_NetlocResultMixinBase, _ResultMixinBytes): | ||||
|     __slots__ = () | ||||
| 
 | ||||
|     @property | ||||
|     def _userinfo(self): | ||||
|         netloc = self.netloc | ||||
|         userinfo, have_info, hostinfo = netloc.rpartition(b'@') | ||||
|         if have_info: | ||||
|             username, have_password, password = userinfo.partition(b':') | ||||
|             if not have_password: | ||||
|                 password = None | ||||
|         else: | ||||
|             username = password = None | ||||
|         return username, password | ||||
| 
 | ||||
|     @property | ||||
|     def _hostinfo(self): | ||||
|         netloc = self.netloc | ||||
|         _, _, hostinfo = netloc.rpartition(b'@') | ||||
|         _, have_open_br, bracketed = hostinfo.partition(b'[') | ||||
|         if have_open_br: | ||||
|             hostname, _, port = bracketed.partition(b']') | ||||
|             _, have_port, port = port.partition(b':') | ||||
|         else: | ||||
|             hostname, have_port, port = hostinfo.partition(b':') | ||||
|         if not have_port: | ||||
|             port = None | ||||
|         return hostname, port | ||||
| 
 | ||||
| 
 | ||||
| from collections import namedtuple | ||||
| 
 | ||||
| class SplitResult(namedtuple('SplitResult', 'scheme netloc path query fragment'), ResultMixin): | ||||
| _DefragResultBase = namedtuple('DefragResult', 'url fragment') | ||||
| _SplitResultBase = namedtuple('SplitResult', 'scheme netloc path query fragment') | ||||
| _ParseResultBase = namedtuple('ParseResult', 'scheme netloc path params query fragment') | ||||
| 
 | ||||
| # For backwards compatibility, alias _NetlocResultMixinStr | ||||
| # ResultBase is no longer part of the documented API, but it is | ||||
| # retained since deprecating it isn't worth the hassle | ||||
| ResultBase = _NetlocResultMixinStr | ||||
| 
 | ||||
| # Structured result objects for string data | ||||
| class DefragResult(_DefragResultBase, _ResultMixinStr): | ||||
|     __slots__ = () | ||||
|     def geturl(self): | ||||
|         if self.fragment: | ||||
|             return self.url + '#' + self.fragment | ||||
|         else: | ||||
|             return self.url | ||||
| 
 | ||||
| class SplitResult(_SplitResultBase, _NetlocResultMixinStr): | ||||
|     __slots__ = () | ||||
|     def geturl(self): | ||||
|         return urlunsplit(self) | ||||
| 
 | ||||
| 
 | ||||
| class ParseResult(namedtuple('ParseResult', 'scheme netloc path params query fragment'), ResultMixin): | ||||
| 
 | ||||
| class ParseResult(_ParseResultBase, _NetlocResultMixinStr): | ||||
|     __slots__ = () | ||||
| 
 | ||||
|     def geturl(self): | ||||
|         return urlunparse(self) | ||||
| 
 | ||||
| # Structured result objects for bytes data | ||||
| class DefragResultBytes(_DefragResultBase, _ResultMixinBytes): | ||||
|     __slots__ = () | ||||
|     def geturl(self): | ||||
|         if self.fragment: | ||||
|             return self.url + b'#' + self.fragment | ||||
|         else: | ||||
|             return self.url | ||||
| 
 | ||||
| class SplitResultBytes(_SplitResultBase, _NetlocResultMixinBytes): | ||||
|     __slots__ = () | ||||
|     def geturl(self): | ||||
|         return urlunsplit(self) | ||||
| 
 | ||||
| class ParseResultBytes(_ParseResultBase, _NetlocResultMixinBytes): | ||||
|     __slots__ = () | ||||
|     def geturl(self): | ||||
|         return urlunparse(self) | ||||
| 
 | ||||
| # Set up the encode/decode result pairs | ||||
| def _fix_result_transcoding(): | ||||
|     _result_pairs = ( | ||||
|         (DefragResult, DefragResultBytes), | ||||
|         (SplitResult, SplitResultBytes), | ||||
|         (ParseResult, ParseResultBytes), | ||||
|     ) | ||||
|     for _decoded, _encoded in _result_pairs: | ||||
|         _decoded._encoded_counterpart = _encoded | ||||
|         _encoded._decoded_counterpart = _decoded | ||||
| 
 | ||||
| _fix_result_transcoding() | ||||
| del _fix_result_transcoding | ||||
| 
 | ||||
| def urlparse(url, scheme='', allow_fragments=True): | ||||
|     """Parse a URL into 6 components: | ||||
|  | @ -136,13 +281,15 @@ def urlparse(url, scheme='', allow_fragments=True): | |||
|     Return a 6-tuple: (scheme, netloc, path, params, query, fragment). | ||||
|     Note that we don't break the components up in smaller bits | ||||
|     (e.g. netloc is a single string) and we don't expand % escapes.""" | ||||
|     url, scheme, _coerce_result = _coerce_args(url, scheme) | ||||
|     tuple = urlsplit(url, scheme, allow_fragments) | ||||
|     scheme, netloc, url, query, fragment = tuple | ||||
|     if scheme in uses_params and ';' in url: | ||||
|         url, params = _splitparams(url) | ||||
|     else: | ||||
|         params = '' | ||||
|     return ParseResult(scheme, netloc, url, params, query, fragment) | ||||
|     result = ParseResult(scheme, netloc, url, params, query, fragment) | ||||
|     return _coerce_result(result) | ||||
| 
 | ||||
| def _splitparams(url): | ||||
|     if '/'  in url: | ||||
|  | @ -167,11 +314,12 @@ def urlsplit(url, scheme='', allow_fragments=True): | |||
|     Return a 5-tuple: (scheme, netloc, path, query, fragment). | ||||
|     Note that we don't break the components up in smaller bits | ||||
|     (e.g. netloc is a single string) and we don't expand % escapes.""" | ||||
|     url, scheme, _coerce_result = _coerce_args(url, scheme) | ||||
|     allow_fragments = bool(allow_fragments) | ||||
|     key = url, scheme, allow_fragments, type(url), type(scheme) | ||||
|     cached = _parse_cache.get(key, None) | ||||
|     if cached: | ||||
|         return cached | ||||
|         return _coerce_result(cached) | ||||
|     if len(_parse_cache) >= MAX_CACHE_SIZE: # avoid runaway growth | ||||
|         clear_cache() | ||||
|     netloc = query = fragment = '' | ||||
|  | @ -191,7 +339,7 @@ def urlsplit(url, scheme='', allow_fragments=True): | |||
|                 url, query = url.split('?', 1) | ||||
|             v = SplitResult(scheme, netloc, url, query, fragment) | ||||
|             _parse_cache[key] = v | ||||
|             return v | ||||
|             return _coerce_result(v) | ||||
|         if url.endswith(':') or not url[i+1].isdigit(): | ||||
|             for c in url[:i]: | ||||
|                 if c not in scheme_chars: | ||||
|  | @ -209,17 +357,18 @@ def urlsplit(url, scheme='', allow_fragments=True): | |||
|         url, query = url.split('?', 1) | ||||
|     v = SplitResult(scheme, netloc, url, query, fragment) | ||||
|     _parse_cache[key] = v | ||||
|     return v | ||||
|     return _coerce_result(v) | ||||
| 
 | ||||
| def urlunparse(components): | ||||
|     """Put a parsed URL back together again.  This may result in a | ||||
|     slightly different, but equivalent URL, if the URL that was parsed | ||||
|     originally had redundant delimiters, e.g. a ? with an empty query | ||||
|     (the draft states that these are equivalent).""" | ||||
|     scheme, netloc, url, params, query, fragment = components | ||||
|     scheme, netloc, url, params, query, fragment, _coerce_result = ( | ||||
|                                                   _coerce_args(*components)) | ||||
|     if params: | ||||
|         url = "%s;%s" % (url, params) | ||||
|     return urlunsplit((scheme, netloc, url, query, fragment)) | ||||
|     return _coerce_result(urlunsplit((scheme, netloc, url, query, fragment))) | ||||
| 
 | ||||
| def urlunsplit(components): | ||||
|     """Combine the elements of a tuple as returned by urlsplit() into a | ||||
|  | @ -227,7 +376,8 @@ def urlunsplit(components): | |||
|     This may result in a slightly different, but equivalent URL, if the URL that | ||||
|     was parsed originally had unnecessary delimiters (for example, a ? with an | ||||
|     empty query; the RFC states that these are equivalent).""" | ||||
|     scheme, netloc, url, query, fragment = components | ||||
|     scheme, netloc, url, query, fragment, _coerce_result = ( | ||||
|                                           _coerce_args(*components)) | ||||
|     if netloc or (scheme and scheme in uses_netloc and url[:2] != '//'): | ||||
|         if url and url[:1] != '/': url = '/' + url | ||||
|         url = '//' + (netloc or '') + url | ||||
|  | @ -237,7 +387,7 @@ def urlunsplit(components): | |||
|         url = url + '?' + query | ||||
|     if fragment: | ||||
|         url = url + '#' + fragment | ||||
|     return url | ||||
|     return _coerce_result(url) | ||||
| 
 | ||||
| def urljoin(base, url, allow_fragments=True): | ||||
|     """Join a base URL and a possibly relative URL to form an absolute | ||||
|  | @ -246,32 +396,33 @@ def urljoin(base, url, allow_fragments=True): | |||
|         return url | ||||
|     if not url: | ||||
|         return base | ||||
|     base, url, _coerce_result = _coerce_args(base, url) | ||||
|     bscheme, bnetloc, bpath, bparams, bquery, bfragment = \ | ||||
|             urlparse(base, '', allow_fragments) | ||||
|     scheme, netloc, path, params, query, fragment = \ | ||||
|             urlparse(url, bscheme, allow_fragments) | ||||
|     if scheme != bscheme or scheme not in uses_relative: | ||||
|         return url | ||||
|         return _coerce_result(url) | ||||
|     if scheme in uses_netloc: | ||||
|         if netloc: | ||||
|             return urlunparse((scheme, netloc, path, | ||||
|                                params, query, fragment)) | ||||
|             return _coerce_result(urlunparse((scheme, netloc, path, | ||||
|                                               params, query, fragment))) | ||||
|         netloc = bnetloc | ||||
|     if path[:1] == '/': | ||||
|         return urlunparse((scheme, netloc, path, | ||||
|                            params, query, fragment)) | ||||
|         return _coerce_result(urlunparse((scheme, netloc, path, | ||||
|                                           params, query, fragment))) | ||||
|     if not path: | ||||
|         path = bpath | ||||
|         if not params: | ||||
|             params = bparams | ||||
|         else: | ||||
|             path = path[:-1] | ||||
|             return urlunparse((scheme, netloc, path, | ||||
|                                 params, query, fragment)) | ||||
|             return _coerce_result(urlunparse((scheme, netloc, path, | ||||
|                                               params, query, fragment))) | ||||
|         if not query: | ||||
|             query = bquery | ||||
|         return urlunparse((scheme, netloc, path, | ||||
|                            params, query, fragment)) | ||||
|         return _coerce_result(urlunparse((scheme, netloc, path, | ||||
|                                           params, query, fragment))) | ||||
|     segments = bpath.split('/')[:-1] + path.split('/') | ||||
|     # XXX The stuff below is bogus in various ways... | ||||
|     if segments[-1] == '.': | ||||
|  | @ -293,8 +444,8 @@ def urljoin(base, url, allow_fragments=True): | |||
|         segments[-1] = '' | ||||
|     elif len(segments) >= 2 and segments[-1] == '..': | ||||
|         segments[-2:] = [''] | ||||
|     return urlunparse((scheme, netloc, '/'.join(segments), | ||||
|                        params, query, fragment)) | ||||
|     return _coerce_result(urlunparse((scheme, netloc, '/'.join(segments), | ||||
|                                       params, query, fragment))) | ||||
| 
 | ||||
| def urldefrag(url): | ||||
|     """Removes any existing fragment from URL. | ||||
|  | @ -303,12 +454,14 @@ def urldefrag(url): | |||
|     the URL contained no fragments, the second element is the | ||||
|     empty string. | ||||
|     """ | ||||
|     url, _coerce_result = _coerce_args(url) | ||||
|     if '#' in url: | ||||
|         s, n, p, a, q, frag = urlparse(url) | ||||
|         defrag = urlunparse((s, n, p, a, q, '')) | ||||
|         return defrag, frag | ||||
|     else: | ||||
|         return url, '' | ||||
|         frag = '' | ||||
|         defrag = url | ||||
|     return _coerce_result(DefragResult(defrag, frag)) | ||||
| 
 | ||||
| def unquote_to_bytes(string): | ||||
|     """unquote_to_bytes('abc%20def') -> b'abc def'.""" | ||||
|  | @ -420,6 +573,7 @@ def parse_qsl(qs, keep_blank_values=False, strict_parsing=False): | |||
| 
 | ||||
|     Returns a list, as G-d intended. | ||||
|     """ | ||||
|     qs, _coerce_result = _coerce_args(qs) | ||||
|     pairs = [s2 for s1 in qs.split('&') for s2 in s1.split(';')] | ||||
|     r = [] | ||||
|     for name_value in pairs: | ||||
|  | @ -435,10 +589,9 @@ def parse_qsl(qs, keep_blank_values=False, strict_parsing=False): | |||
|             else: | ||||
|                 continue | ||||
|         if len(nv[1]) or keep_blank_values: | ||||
|             name = unquote(nv[0].replace('+', ' ')) | ||||
|             value = unquote(nv[1].replace('+', ' ')) | ||||
|             name = _coerce_result(unquote(nv[0].replace('+', ' '))) | ||||
|             value = _coerce_result(unquote(nv[1].replace('+', ' '))) | ||||
|             r.append((name, value)) | ||||
| 
 | ||||
|     return r | ||||
| 
 | ||||
| def unquote_plus(string, encoding='utf-8', errors='replace'): | ||||
|  |  | |||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue
	
	 Nick Coghlan
						Nick Coghlan